### Objective:

The objective of this notebook is to demonstrate core data manipulation techniques in PySpark using a sample e-commerce dataset. Specifically, it covers the following tasks:

1. **Selecting specific columns**: Extracting relevant columns for analysis from a larger dataset.
2. **Filtering data**: Using conditional filtering to focus on specific types of events, such as purchases.
3. **Joining DataFrames**: Merging datasets to enrich data with additional information, such as user details.
4. **Creating new columns**: Adding new features based on existing data to enhance the dataset for analysis.
5. **Union of datasets**: Combining multiple datasets into a single DataFrame for comprehensive analysis.
6. **Removing duplicates**: Dropping repeated rows so that each event appears only once in the combined dataset.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, DoubleType
import random
from datetime import datetime, timedelta
import uuid
from pyspark.sql.functions import when, lower, trim, col


# Create (or reuse) the Spark session shared by every cell in this notebook
spark = (
    SparkSession.builder
    .appName("Data Transformation")
    .getOrCreate()
)

In [None]:
# Define a helper function to generate random dates
def generate_random_dates(start_date, end_date, n):
    """Return ``n`` random timestamps lying between two bounds (inclusive).

    Args:
        start_date: Lower bound as a string in "%Y-%m-%d %H:%M:%S" format.
        end_date: Upper bound as a string in the same format.
        n: Number of timestamps to generate.

    Returns:
        A list of ``n`` strings in "%Y-%m-%d %H:%M:%S" format, each one
        satisfying ``start_date <= value <= end_date``.

    Raises:
        ValueError: If either bound does not match the expected format, or if
            ``end_date`` is earlier than ``start_date``.
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    start_timestamp = datetime.strptime(start_date, fmt)
    end_timestamp = datetime.strptime(end_date, fmt)
    if end_timestamp < start_timestamp:
        raise ValueError("end_date must not be earlier than start_date")

    # Sample one offset over the whole span in seconds. Combining a random day
    # with an independent random second-of-day could land after end_date
    # (e.g. last day + 86400 s) and ignored the time-of-day part of the bounds.
    span_seconds = int((end_timestamp - start_timestamp).total_seconds())

    return [
        (start_timestamp + timedelta(seconds=random.randint(0, span_seconds))).strftime(fmt)
        for _ in range(n)
    ]

# Pools of possible values to sample from. The brand and event-type pools have
# their own names so that the generated columns further down (``brands``,
# ``event_types``) do not reuse a name for two different things.
products = [1003461, 5000088, 17302664, 3601530, 1004775, 1306894, 1306421, 1590604, 12708937, 1004258]
categories = ["electronics.smartphone", "appliances.sewing_machine", "appliances.kitchen.washer", "computers.notebook", "furniture.living_room.sofa"]
brand_choices = ["xiaomi", "janome", "creed", "lg", "hp", "rondell", "michelin", "apple", "samsung", "huawei"]
event_type_choices = ["view", "click", "purchase"]

# Parameters of the sample
n = 1000  # number of rows
start_date = "2020-01-01 00:00:00"
end_date = "2020-01-31 23:59:59"

# Seed the generator so that the exact same sample is produced on every run
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Generate one list per column, each of length n
dates = generate_random_dates(start_date, end_date, n)
event_types = [random.choice(event_type_choices) for _ in range(n)]
product_ids = [random.choice(products) for _ in range(n)]
category_codes = [random.choice(categories) for _ in range(n)]
brands = [random.choice(brand_choices) for _ in range(n)]
prices = [round(random.uniform(20.0, 1000.0), 2) for _ in range(n)]  # random prices between 20 and 1000
user_ids = [random.randint(100000000, 999999999) for _ in range(n)]
# uuid.uuid4() does not use the seeded generator, so build the version-4 UUIDs
# from seeded random bits to keep the session ids reproducible as well.
user_sessions = [str(uuid.UUID(int=random.getrandbits(128), version=4)) for _ in range(n)]

# Create the DataFrame: one tuple per row, columns named in the same order
data = list(zip(dates, event_types, product_ids, category_codes, brands, prices, user_ids, user_sessions))
columns = ["event_time", "event_type", "product_id", "category_code", "brand", "price", "user_id", "user_session"]

df = spark.createDataFrame(data, columns)

# Show a sample of the data
df.show(5)

+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
|2020-01-20 15:30:59|      view|   1004775|furniture.living_...| rondell|704.97|263911838|3781c42e-c9c4-4f4...|
|2020-01-01 16:35:12|      view|   1004258|appliances.sewing...|      hp|574.02|400484249|e7674edf-4cea-460...|
|2020-01-31 13:30:38|  purchase|  12708937|electronics.smart...| samsung|615.45|887158728|6e11e949-8a63-4b1...|
|2020-01-04 13:56:48|      view|   1004775|electronics.smart...|  xiaomi|718.05|512075797|6eb2d586-5879-483...|
|2020-01-01 05:17:50|  purchase|   3601530|electronics.smart...|michelin|752.82|841077968|37980ab3-3c3c-439...|
+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
only showing top 5 rows

In [None]:
from pyspark.sql import functions as F

# Task 1: Keep only the columns needed for the analysis, using select()
df_selected_columns = df.select(
    ["user_id", "event_type", "price", "category_code", "brand"]
)
df_selected_columns.show(n=5)

+---------+----------+------+--------------------+--------+
|  user_id|event_type| price|       category_code|   brand|
+---------+----------+------+--------------------+--------+
|263911838|      view|704.97|furniture.living_...| rondell|
|400484249|      view|574.02|appliances.sewing...|      hp|
|887158728|  purchase|615.45|electronics.smart...| samsung|
|512075797|      view|718.05|electronics.smart...|  xiaomi|
|841077968|  purchase|752.82|electronics.smart...|michelin|
+---------+----------+------+--------------------+--------+
only showing top 5 rows



In [None]:
# Task 2: Keep only the purchase events, using filter()
df_filtered = df.filter(
    df["event_type"] == "purchase"
)
df_filtered.show(n=5)

+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
|2020-01-31 13:30:38|  purchase|  12708937|electronics.smart...| samsung|615.45|887158728|6e11e949-8a63-4b1...|
|2020-01-01 05:17:50|  purchase|   3601530|electronics.smart...|michelin|752.82|841077968|37980ab3-3c3c-439...|
|2020-01-04 14:25:11|  purchase|   1004775|furniture.living_...|  janome|252.31|354196380|b6a260bf-5b42-48f...|
|2020-01-11 20:58:22|  purchase|  12708937|appliances.kitche...|  janome|916.72|440091732|e48a0688-5fe6-44d...|
|2020-01-19 09:16:35|  purchase|   1003461|  computers.notebook|   creed|289.52|151430021|a9814573-4998-489...|
+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
only showing top 5 rows

In [None]:
# Task 3: Add a derived column using withColumn()
# "total_spent" carries the item price of each purchase row
df_with_new_column = df_filtered.withColumn(
    "total_spent",
    df_filtered["price"],
)
df_with_new_column.show(n=5)

+-------------------+----------+----------+--------------------+-------+------+---------+--------------------+-----------+
|         event_time|event_type|product_id|       category_code|  brand| price|  user_id|        user_session|total_spent|
+-------------------+----------+----------+--------------------+-------+------+---------+--------------------+-----------+
|2020-01-30 11:17:19|  purchase|   3601530|appliances.kitche...|samsung| 68.73|863774057|1cb054d2-9d31-495...|      68.73|
|2020-01-15 15:17:57|  purchase|   1004258|appliances.sewing...| janome|938.82|994591302|c869d3e9-4181-482...|     938.82|
|2020-01-16 00:43:48|  purchase|   1306421|  computers.notebook|  apple|  71.9|732924385|e1569fef-1180-455...|       71.9|
|2020-01-29 17:12:03|  purchase|   1004258|electronics.smart...| huawei|129.14|516651597|cd435153-81de-419...|     129.14|
|2020-01-29 23:49:36|  purchase|   1004775|appliances.kitche...|  creed|810.12|596983095|a409d0b4-0d90-46c...|     810.12|
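+-------------------+----------+----------+--------------------+-------+------+---------+--------------------+-----------+
only showing top 5 rows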

In [None]:
# Task 4: Append the rows of a second dataset to the first, using union()
# Build a small second DataFrame with the same columns, in the same order,
# as the main one (union() matches columns by position, not by name).
columns_new_data = [
    "event_time",
    "event_type",
    "product_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
]
new_data = [
    ("2020-01-02 14:22:00", "view", 1003461, "electronics.smartphone", "xiaomi", 299.99, 100000001, "session_123"),
    ("2020-01-03 10:12:30", "purchase", 5000088, "appliances.sewing_machine", "janome", 450.00, 100000002, "session_124"),
]
df_new_data = spark.createDataFrame(new_data, columns_new_data)

# Stack the extra rows underneath the original events
df_union = df.union(df_new_data)
df_union.show(n=5)

+-------------------+----------+----------+--------------------+-------+------+---------+--------------------+
|         event_time|event_type|product_id|       category_code|  brand| price|  user_id|        user_session|
+-------------------+----------+----------+--------------------+-------+------+---------+--------------------+
|2020-01-26 05:33:18|      view|  12708937|furniture.living_...| huawei|474.52|788453398|7df9551d-037e-4c6...|
|2020-01-30 11:17:19|  purchase|   3601530|appliances.kitche...|samsung| 68.73|863774057|1cb054d2-9d31-495...|
|2020-01-21 23:22:58|      view|   1003461|appliances.sewing...|     hp| 219.7|307479202|18c9fc49-cbcc-4d1...|
|2020-01-06 20:32:59|     click|   3601530|electronics.smart...| xiaomi|168.63|381995390|eba3d659-9794-49e...|
|2020-01-25 13:31:55|     click|   1306894|  computers.notebook|     hp|452.13|106348353|8c27be35-3171-42c...|
+-------------------+----------+----------+--------------------+-------+------+---------+--------------------+
only showing top 5 rows

In [None]:
# Task 5: Drop rows that are exact duplicates of another row, using dropDuplicates()
# With no column subset given, rows are compared across all columns
df_no_duplicates = df_union.dropDuplicates()
df_no_duplicates.show(n=5)

+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
|2020-01-19 17:19:58|  purchase|  12708937|appliances.kitche...|      lg| 497.0|912991390|f2f59fb2-7d55-401...|
|2020-01-18 16:04:39|      view|  17302664|appliances.kitche...|   creed|777.55|360816102|9eaffb87-3106-4c7...|
|2020-01-25 09:06:41|  purchase|   1306894|  computers.notebook|michelin|951.74|795091067|456afead-7575-4e8...|
|2020-01-14 08:02:31|  purchase|   3601530|appliances.sewing...| samsung|238.15|141455266|144f9bc5-b93a-46c...|
|2020-01-03 18:57:47|     click|   1306894|appliances.sewing...|  janome|143.65|989021927|b0608687-d6cb-4b1...|
+-------------------+----------+----------+--------------------+--------+------+---------+--------------------+
only showing top 5 rows

In [None]:
# Task 6: Total amount spent by each customer, using groupBy()
# Only "purchase" events represent money actually spent. "view" and "click"
# rows also carry a price, so they are counted as 0 instead of being summed;
# every customer still gets a row, with 0.0 when they bought nothing.
purchase_amount = F.when(F.col("event_type") == "purchase", F.col("price")).otherwise(0.0)
df_grouped = df_no_duplicates.groupBy("user_id").agg(F.sum(purchase_amount).alias("total_spent"))
df_grouped.show(5)

+---------+-----------+
|  user_id|total_spent|
+---------+-----------+
|100407490|     896.82|
|504954100|     290.05|
|573467949|     873.62|
|785327888|     166.81|
|236257325|     506.64|
+---------+-----------+
only showing top 5 rows

