In [0]:
# This section demonstrates how to process and analyze simulated e-commerce order data.

# E-commerce customers.
# Column names are declared first; every tuple in the data list follows this order.
customers_ecommerce_schema = ["customer_id", "customer_name", "email", "membership_tier"]
customers_ecommerce_data = [
    (101, "John Doe", "john.doe@example.com", "Gold"),
    (102, "Jane Smith", "jane.smith@example.com", "Silver"),
    (103, "Peter Jones", "peter.j@example.com", "Bronze"),
    (104, "Alice Brown", "alice.b@example.com", "Gold"),
    (105, "Bob White", "bob.w@example.com", "Silver"),
]

# Only column names are supplied, so Spark infers the column types from the Python values.
customers_ecommerce_df = spark.createDataFrame(customers_ecommerce_data, customers_ecommerce_schema)

print("\n--- E-commerce Customers DataFrame ---")
customers_ecommerce_df.printSchema()
customers_ecommerce_df.show()

# E-commerce orders.
# Column names are declared first; every tuple in the data list follows this order.
orders_schema = ["order_id", "customer_id", "order_date", "quantity", "price_per_item", "product_name"]
orders_data = [
    (1, 101, "2023-01-05", 2, 50.00, "Laptop"),
    (2, 102, "2023-01-07", 1, 120.00, "Smartphone"),
    (3, 101, "2023-01-10", 3, 15.00, "Mouse"),
    (4, 103, "2023-02-01", 1, 80.00, "Keyboard"),
    (5, 102, "2023-02-15", 2, 25.00, "Headphones"),
    (6, 104, "2023-03-01", 1, 500.00, "Monitor"),
    (7, 101, "2023-03-10", 1, 100.00, "Webcam"),
    (8, 105, "2023-03-12", 1, 30.00, "USB Drive"),
    (9, 103, "2023-04-05", 2, 10.00, "Mouse Pad"),
    (10, 104, "2023-04-20", 1, 75.00, "Charger"),
    # Deliberate edge case: customer 106 is not present in the customers data.
    (11, 106, "2023-05-01", 1, 200.00, "Tablet"),
    # Deliberate edge case: price_per_item is missing (None).
    (12, 101, "2023-05-05", 1, None, "Speaker"),
]

# Only column names are supplied, so Spark infers the column types;
# order_date therefore starts out as a plain string column.
orders_df = spark.createDataFrame(orders_data, orders_schema)

print("\n--- E-commerce Orders DataFrame ---")
orders_df.printSchema()
orders_df.show()


--- E-commerce Customers DataFrame ---
root
 |-- customer_id: long (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- membership_tier: string (nullable = true)

+-----------+-------------+--------------------+---------------+
|customer_id|customer_name|               email|membership_tier|
+-----------+-------------+--------------------+---------------+
|        101|     John Doe|john.doe@example.com|           Gold|
|        102|   Jane Smith|jane.smith@exampl...|         Silver|
|        103|  Peter Jones| peter.j@example.com|         Bronze|
|        104|  Alice Brown| alice.b@example.com|           Gold|
|        105|    Bob White|   bob.w@example.com|         Silver|
+-----------+-------------+--------------------+---------------+


--- E-commerce Orders DataFrame ---
root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- quantity: long (nullable = tru

In [0]:
from pyspark.sql.functions import *

# Clean the raw orders in one chained expression:
#   1. parse the order_date strings into a real date column;
#   2. replace a missing price_per_item with 0.0 so that total_order_value can be
#      computed later. Filling with 0 is a choice made for this example; real data
#      may call for an average/median instead.
orders_df_cleaned = (
    orders_df
    .withColumn("order_date", to_date(col("order_date")))
    .withColumn("price_per_item", coalesce(col("price_per_item"), lit(0.0)))
)

print("Check the latest Schema")
orders_df_cleaned.printSchema()

print("Check the missing data")
display(orders_df_cleaned)

Check the latest Schema
root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: date (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price_per_item: double (nullable = false)
 |-- product_name: string (nullable = true)

Check the missing data


order_id,customer_id,order_date,quantity,price_per_item,product_name
1,101,2023-01-05,2,50.0,Laptop
2,102,2023-01-07,1,120.0,Smartphone
3,101,2023-01-10,3,15.0,Mouse
4,103,2023-02-01,1,80.0,Keyboard
5,102,2023-02-15,2,25.0,Headphones
6,104,2023-03-01,1,500.0,Monitor
7,101,2023-03-10,1,100.0,Webcam
8,105,2023-03-12,1,30.0,USB Drive
9,103,2023-04-05,2,10.0,Mouse Pad
10,104,2023-04-20,1,75.0,Charger


In [0]:
# Enrich the cleaned orders with the line total (quantity x unit price) and the
# month / year / day-of-month components of the order date.
orders_df_enriched = (
    orders_df_cleaned
    .withColumn("total_order_value", col("quantity") * col("price_per_item"))
    .withColumn("order_month", month("order_date"))
    .withColumn("order_year", year("order_date"))
    .withColumn("order_day", dayofmonth("order_date"))
)

print("\n--- Orders DataFrame after Cleaning and Enrichment ---")
orders_df_enriched.printSchema()
orders_df_enriched.show()


--- Orders DataFrame after Cleaning and Enrichment ---
root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: date (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price_per_item: double (nullable = false)
 |-- product_name: string (nullable = true)
 |-- total_order_value: double (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_day: integer (nullable = true)

+--------+-----------+----------+--------+--------------+------------+-----------------+-----------+----------+---------+
|order_id|customer_id|order_date|quantity|price_per_item|product_name|total_order_value|order_month|order_year|order_day|
+--------+-----------+----------+--------+--------------+------------+-----------------+-----------+----------+---------+
|       1|        101|2023-01-05|       2|          50.0|      Laptop|            100.0|          1|      2023|        5|
|       2|        102|20

In [0]:
print("\n--- 12.2. Joining Orders with Customer Data ---")

# Left outer join: every order is kept, and orders whose customer_id has no match
# in the customers DataFrame get NULL customer_name / membership_tier.
# The select keeps all order columns and adds only the two customer attributes,
# so the result carries a single customer_id column (the one from the orders side).
customer_orders_df = (
    orders_df_enriched
    .join(
        customers_ecommerce_df,
        on=orders_df_enriched["customer_id"] == customers_ecommerce_df["customer_id"],
        how="left_outer",
    )
    .select(
        orders_df_enriched["*"],
        customers_ecommerce_df.customer_name,
        customers_ecommerce_df.membership_tier,
    )
)

print("\n--- Customer Orders Joined DataFrame ---")
customer_orders_df.show()


--- 12.2. Joining Orders with Customer Data ---

--- Customer Orders Joined DataFrame ---
+--------+-----------+----------+--------+--------------+------------+-----------------+-----------+----------+---------+-------------+---------------+
|order_id|customer_id|order_date|quantity|price_per_item|product_name|total_order_value|order_month|order_year|order_day|customer_name|membership_tier|
+--------+-----------+----------+--------+--------------+------------+-----------------+-----------+----------+---------+-------------+---------------+
|       1|        101|2023-01-05|       2|          50.0|      Laptop|            100.0|          1|      2023|        5|     John Doe|           Gold|
|       2|        102|2023-01-07|       1|         120.0|  Smartphone|            120.0|          1|      2023|        7|   Jane Smith|         Silver|
|       3|        101|2023-01-10|       3|          15.0|       Mouse|             45.0|          1|      2023|       10|     John Doe|           Gol

In [0]:
print("\n--- 12.3. Aggregations: Monthly Sales Trends ---")

# Per (year, month): total sales, number of orders and average order value,
# sorted chronologically. Monetary aggregates are rounded to 2 decimals.
monthly_sales_df = (
    customer_orders_df
    .groupBy("order_year", "order_month")
    .agg(
        round(sum("total_order_value"), 2).alias("total_monthly_sales"),
        count("order_id").alias("total_orders_count"),
        round(avg("total_order_value"), 2).alias("average_order_value"),
    )
    .orderBy("order_year", "order_month")
)

print("\n--- Monthly Sales Trends ---")
monthly_sales_df.show()



--- 12.3. Aggregations: Monthly Sales Trends ---

--- Monthly Sales Trends ---
+----------+-----------+-------------------+------------------+-------------------+
|order_year|order_month|total_monthly_sales|total_orders_count|average_order_value|
+----------+-----------+-------------------+------------------+-------------------+
|      2023|          1|              265.0|                 3|              88.33|
|      2023|          2|              130.0|                 2|               65.0|
|      2023|          3|              630.0|                 3|              210.0|
|      2023|          4|               95.0|                 2|               47.5|
|      2023|          5|              200.0|                 2|              100.0|
+----------+-----------+-------------------+------------------+-------------------+



In [0]:
# Section header: numbered like the other analysis cells in this notebook
# (12.2, 12.3, 12.6) instead of printing the sub-header twice.
print("\n--- 12.4. Aggregations: Top 5 Customers by Spending ---")

# Total spending and order count per customer, highest spenders first.
# customer_name / membership_tier are part of the grouping key only so they
# appear in the result; they are NULL for orders whose customer_id had no match
# in the customers data.
top_customers_df = (
    customer_orders_df
    .groupBy("customer_id", "customer_name", "membership_tier")
    .agg(
        round(sum("total_order_value"), 2).alias("total_spending"),
        count("order_id").alias("total_orders"),
    )
    # customer_id is a secondary sort key so that limit(5) picks the same rows
    # on every run when two customers have equal total_spending.
    .orderBy(col("total_spending").desc(), col("customer_id").asc())
    .limit(5)
)

print("\n--- Top 5 Customers by Spending ---")
top_customers_df.show()


--- 12.4. Aggregations: Top 5 Customers by Spending ---

--- Top 5 Customers by Spending ---
+-----------+-------------+---------------+--------------+------------+
|customer_id|customer_name|membership_tier|total_spending|total_orders|
+-----------+-------------+---------------+--------------+------------+
|        104|  Alice Brown|           Gold|         575.0|           2|
|        101|     John Doe|           Gold|         245.0|           4|
|        106|         NULL|           NULL|         200.0|           1|
|        102|   Jane Smith|         Silver|         170.0|           2|
|        103|  Peter Jones|         Bronze|         100.0|           2|
+-----------+-------------+---------------+--------------+------------+



In [0]:
from pyspark.sql.window import Window
# Per-customer window for a cumulative (running) sum of order value.
# - order_id is a tie-breaker so orders placed on the same date have a stable order.
# - The frame is set explicitly to ROWS from the start of the partition to the
#   current row. With only orderBy, Spark uses a RANGE frame, under which rows
#   sharing the same order_date would all show the combined total instead of a
#   row-by-row running total.
window_spec_customer_sales = (
    Window.partitionBy("customer_id")
    .orderBy("order_date", "order_id")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Add the running total (rounded to 2 decimals) alongside the existing order columns.
customer_running_sales_df = customer_orders_df.withColumn(
    "running_total_sales",
    round(sum("total_order_value").over(window_spec_customer_sales), 2)
)
print("\n--- Customer Running Total Sales ---")
customer_running_sales_df.show()



--- Customer Running Total Sales ---
+--------+-----------+----------+--------+--------------+------------+-----------------+-----------+----------+---------+-------------+---------------+-------------------+
|order_id|customer_id|order_date|quantity|price_per_item|product_name|total_order_value|order_month|order_year|order_day|customer_name|membership_tier|running_total_sales|
+--------+-----------+----------+--------+--------------+------------+-----------------+-----------+----------+---------+-------------+---------------+-------------------+
|       1|        101|2023-01-05|       2|          50.0|      Laptop|            100.0|          1|      2023|        5|     John Doe|           Gold|              100.0|
|       3|        101|2023-01-10|       3|          15.0|       Mouse|             45.0|          1|      2023|       10|     John Doe|           Gold|              145.0|
|       7|        101|2023-03-10|       1|         100.0|      Webcam|            100.0|          3|  

In [0]:
def categorize_order_size(total_value):
    """Map an order's total value to a size label.

    Args:
        total_value: Numeric order total, or None.

    Returns:
        "Unknown" when total_value is None, "Small" when it is below 50,
        "Medium" when it is at least 50 and below 200, and "Large" otherwise.
    """
    if total_value is None:
        return "Unknown"

    # Exclusive upper bounds, checked in ascending order; the first match wins.
    size_bands = ((50, "Small"), (200, "Medium"))
    for upper_bound, label in size_bands:
        if total_value < upper_bound:
            return label
    return "Large"

# Wrap the plain-Python categoriser as a Spark UDF that returns a string column.
# The return type is given as the DDL type string "string" rather than
# StringType(): nothing in this notebook imports StringType explicitly, and
# whether it leaks in through `from pyspark.sql.functions import *` depends on
# the PySpark version.
categorize_order_udf = udf(categorize_order_size, "string")

print("\n--- 12.6. UDF: Categorize Order Size ---")
# Label every order with its size category, derived from total_order_value.
orders_with_category_df = customer_orders_df.withColumn(
    "order_size_category",
    categorize_order_udf(col("total_order_value"))
)
print("\n--- Orders with Size Category ---")
orders_with_category_df.select("order_id", "customer_id", "total_order_value", "order_size_category").show()


--- 12.6. UDF: Categorize Order Size ---

--- Orders with Size Category ---
+--------+-----------+-----------------+-------------------+
|order_id|customer_id|total_order_value|order_size_category|
+--------+-----------+-----------------+-------------------+
|       1|        101|            100.0|             Medium|
|       2|        102|            120.0|             Medium|
|       3|        101|             45.0|              Small|
|       4|        103|             80.0|             Medium|
|       5|        102|             50.0|             Medium|
|       6|        104|            500.0|              Large|
|       7|        101|            100.0|             Medium|
|       8|        105|             30.0|              Small|
|       9|        103|             20.0|              Small|
|      10|        104|             75.0|             Medium|
|      11|        106|            200.0|              Large|
|      12|        101|              0.0|              Small|
+-------