In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *

In [23]:
spark = SparkSession.builder.appName("EcommerceAnalysis").getOrCreate()

# Create Customers DataFrame
customers_data = [
    (1, "Alice", "alice@example.com", "2022-01-15"),
    (2, "Bob", "bob@example.com", "2022-03-10"),
    (3, "Charlie", "charlie@example.com", "2022-05-20"),
    (4, "David", "david@example.com", "2022-07-05")
]
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name", "email", "join_date"])

# Create Orders DataFrame
orders_data = [
    (101, 1, "2023-01-10", 150.50),
    (102, 2, "2023-01-12", 200.00),
    (103, 1, "2023-02-05", 75.25),
    (104, 3, "2023-02-18", 300.75),
    (105, 4, "2023-03-01", 50.00),
    (106, 2, "2023-03-15", 120.00)
]
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "order_date", "total_amount"])

# Create Products DataFrame
products_data = [
    (1001, "Laptop", 999.99, "Electronics"),
    (1002, "Smartphone", 599.99, "Electronics"),
    (1003, "Headphones", 99.99, "Accessories"),
    (1004, "Keyboard", 49.99, "Accessories")
]
products_df = spark.createDataFrame(products_data, ["product_id", "name", "price", "category"])

order_items_data = [
    (101, 1001, 1, 999.99),  # order_id, product_id, quantity, price
    (101, 1003, 1, 99.99),
    (102, 1002, 1, 599.99),
    (103, 1004, 2, 49.99),   # 2 keyboards
    (104, 1001, 1, 999.99),
    (104, 1002, 1, 599.99),
    (105, 1003, 1, 99.99),
    (106, 1004, 3, 49.99)    # 3 keyboards
]

order_items_df = spark.createDataFrame(order_items_data, schema=['order_id', 'product_id', 'quantity', 'price'])

# Show schema for verification
print("Customers Schema:")
customers_df.printSchema()
print("\nOrders Schema:")
orders_df.printSchema()
print("\nProducts Schema:")
products_df.printSchema()
print("\nOrder Items Schema:")
order_items_df.printSchema()

Customers Schema:
root
 |-- customer_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- join_date: string (nullable = true)


Orders Schema:
root
 |-- order_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- total_amount: double (nullable = true)


Products Schema:
root
 |-- product_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- category: string (nullable = true)


Order Items Schema:
root
 |-- order_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- price: double (nullable = true)



In [4]:
# 1. Filtering: Find all customers who joined after '2022-03-01'
filtered_customers = customers_df.filter(col("join_date") > "2022-03-01")
filtered_customers.show()

# 2. Sorting: Sort orders by total_amount in descending order
sorted_orders = orders_df.orderBy(col("total_amount").desc())
sorted_orders.show()

# 3. Aggregation: Calculate the total revenue generated per customer (include customer name)
customer_revenue = orders_df.join(customers_df, "customer_id") \
    .groupBy("customer_id", "name") \
    .agg(sum("total_amount").alias("total_revenue"))
customer_revenue.show()

# 4. Joining: List all orders with customer details (name, email) and order details (order_date, total_amount)
order_details = orders_df.join(customers_df, "customer_id") \
    .select("order_id", "name", "email", "order_date", "total_amount")
order_details.show()

# 5. Advanced Aggregation: Find the average order amount for each customer, but only for those with at least 2 orders
customer_avg = orders_df.groupBy("customer_id") \
    .agg(
        avg("total_amount").alias("avg_order_amount"),
        count("order_id").alias("order_count")
    ) \
    .filter(col("order_count") >= 2) \
    .join(customers_df, "customer_id") \
    .select("customer_id", "name", "avg_order_amount", "order_count")
customer_avg.show()

                                                                                

+-----------+-------+-------------------+----------+
|customer_id|   name|              email| join_date|
+-----------+-------+-------------------+----------+
|          2|    Bob|    bob@example.com|2022-03-10|
|          3|Charlie|charlie@example.com|2022-05-20|
|          4|  David|  david@example.com|2022-07-05|
+-----------+-------+-------------------+----------+



                                                                                

+--------+-----------+----------+------------+
|order_id|customer_id|order_date|total_amount|
+--------+-----------+----------+------------+
|     104|          3|2023-02-18|      300.75|
|     102|          2|2023-01-12|       200.0|
|     101|          1|2023-01-10|       150.5|
|     106|          2|2023-03-15|       120.0|
|     103|          1|2023-02-05|       75.25|
|     105|          4|2023-03-01|        50.0|
+--------+-----------+----------+------------+



                                                                                

+-----------+-------+-------------+
|customer_id|   name|total_revenue|
+-----------+-------+-------------+
|          1|  Alice|       225.75|
|          2|    Bob|        320.0|
|          3|Charlie|       300.75|
|          4|  David|         50.0|
+-----------+-------+-------------+

+--------+-------+-------------------+----------+------------+
|order_id|   name|              email|order_date|total_amount|
+--------+-------+-------------------+----------+------------+
|     101|  Alice|  alice@example.com|2023-01-10|       150.5|
|     103|  Alice|  alice@example.com|2023-02-05|       75.25|
|     102|    Bob|    bob@example.com|2023-01-12|       200.0|
|     106|    Bob|    bob@example.com|2023-03-15|       120.0|
|     104|Charlie|charlie@example.com|2023-02-18|      300.75|
|     105|  David|  david@example.com|2023-03-01|        50.0|
+--------+-------+-------------------+----------+------------+



                                                                                

+-----------+-----+----------------+-----------+
|customer_id| name|avg_order_amount|order_count|
+-----------+-----+----------------+-----------+
|          1|Alice|         112.875|          2|
|          2|  Bob|           160.0|          2|
+-----------+-----+----------------+-----------+



In [5]:
# Problem 6: Distinct Counts - Find how many unique months customers placed orders in
# Expected Output: Single number representing count of distinct months (YYYY-MM format)
# Hint: First extract month from order_date, then count distinct values

# Your PySpark solution here
# Start with orders_df, transform order_date to 'yyyy-MM' format, then count distinct months
orders_df.select(
        date_format(col('order_date'), 'yyyy-MM').alias('year-month')
    ).agg(
        countDistinct(col('year-month')).alias('month_cnt')
    ).show()

+---------+
|month_cnt|
+---------+
|        3|
+---------+



In [6]:
# Problem 7: Time Analysis - Find the month with the highest total revenue
# Expected Output: Single row with [month_year, total_revenue] for the top month
# Hint: Combine date_format, groupBy, sum, and orderBy

orders_df.select(
    date_format(col('order_date'), 'yyyy-MM').alias('year-month'),
    col('total_amount')
).groupBy(col('year-month')) \
.agg(
    sum(col('total_amount')).alias('total_revenue')
).orderBy(col('total_revenue').desc()) \
.limit(1) \
.show()

+----------+-------------+
|year-month|total_revenue|
+----------+-------------+
|   2023-02|        376.0|
+----------+-------------+



In [7]:
# Problem 8: Customer Order Frequency - Find customers who placed orders in at least 2 distinct months
# Expected Output: DataFrame with [customer_id, name, distinct_month_count]
# Should include Alice (Jan+Feb) and Bob (Jan+Mar) from sample data

orders_df.select(
    date_format('order_date', 'yyyy-MM').alias('year-month'),
    col('customer_id')
).join(customers_df, on='customer_id', how='inner') \
.groupBy(col('customer_id'), col('name')) \
.agg(
    countDistinct(col('year-month')).alias('distinct_month_count')
) \
.filter(col('distinct_month_count') >= 2) \
.show()

+-----------+-----+--------------------+
|customer_id| name|distinct_month_count|
+-----------+-----+--------------------+
|          1|Alice|                   2|
|          2|  Bob|                   2|
+-----------+-----+--------------------+



In [8]:
# Problem 9: Customer Spending Comparison - Find customers who spent more than the average customer spend
# Expected Output: DataFrame with [customer_id, name, total_spend] where total_spend > overall_avg
# Note: overall_avg = average of all customers' total spending

customer_spend = orders_df.groupBy('customer_id') \
    .agg(sum(col('total_amount')).alias('total_spend'))

avg_customer_spend = customer_spend.agg(avg(col('total_spend')).alias('avg_customer_spend'))

above_avg_customers = customer_spend.join(customers_df, on='customer_id', how='inner') \
    .join(avg_customer_spend, how='cross') \
    .filter(col('total_spend') > col('avg_customer_spend')) \
    .select(col('customer_id'), col('name'), col('total_spend'))

above_avg_customers.show()


+-----------+-------+-----------+
|customer_id|   name|total_spend|
+-----------+-------+-----------+
|          1|  Alice|     225.75|
|          2|    Bob|      320.0|
|          3|Charlie|     300.75|
+-----------+-------+-----------+



In [9]:
# Problem 10: Window Functions - Find each customer's most expensive order
# Expected Output: DataFrame with [customer_id, name, order_id, order_date, max_order_amount]
# Should show one row per customer with their highest-value order

ranked_orders = orders_df.withColumn('rank', rank().over(Window.partitionBy('customer_id').orderBy(col('total_amount').desc())))

top_orders = ranked_orders.join(customers_df, 'customer_id') \
    .filter(col('rank') == 1) \
    .select(
        col('customer_id'), 
        col('name'), 
        col('order_id'), 
        col('order_date'), 
        col('total_amount').alias('max_order_amount')
    )

top_orders.show()

+-----------+-------+--------+----------+----------------+
|customer_id|   name|order_id|order_date|max_order_amount|
+-----------+-------+--------+----------+----------------+
|          1|  Alice|     101|2023-01-10|           150.5|
|          2|    Bob|     102|2023-01-12|           200.0|
|          3|Charlie|     104|2023-02-18|          300.75|
|          4|  David|     105|2023-03-01|            50.0|
+-----------+-------+--------+----------+----------------+



In [25]:
# Problem 11: Time Between Orders - Calculate the days between each customer's first and most recent order
# Expected Output: DataFrame with [customer_id, name, days_between_orders]
# Should show the time span in days for each customer with >1 order

time_between_orders = (
    orders_df
    .groupBy('customer_id')
    .agg(
        min('order_date').alias('first_order_date'),
        max('order_date').alias('last_order_date'),
        count('order_id').alias('order_count')
    )
    .filter(col('order_count') > 1)
    .withColumn('days_between_orders', datediff(col('last_order_date'), col('first_order_date')))
    .join(customers_df, 'customer_id')
    .select('customer_id', 'name', 'days_between_orders')
)

time_between_orders.show()

+-----------+-----+-------------------+
|customer_id| name|days_between_orders|
+-----------+-----+-------------------+
|          1|Alice|                 26|
|          2|  Bob|                 62|
+-----------+-----+-------------------+



In [43]:
# Problem 12: Order Frequency - Calculate the average time between orders (in days) for each customer
# Expected Output: DataFrame with [customer_id, name, avg_days_between_orders]
# Should show the mean time between consecutive orders for customers with ≥2 orders

avg_time_between_orders = (
    orders_df
    .withColumn('prev_order_date', lag('order_date').over(Window.partitionBy('customer_id').orderBy('order_date')))
    .withColumn('days_between', datediff(col('order_date'), col('prev_order_date')))
    .groupBy('customer_id')
    .agg(
        avg('days_between').alias('avg_days_between_orders'),
        count('order_id').alias('order_count')
    )
    .filter(
        (col('order_count') >= 2) &
        (col('avg_days_between_orders').isNotNull())
    )
    .join(customers_df, 'customer_id')
    .select(
        'customer_id',
        'name',
        'avg_days_between_orders'
    )
)

avg_time_between_orders.show()

+-----------+-----+-----------------------+
|customer_id| name|avg_days_between_orders|
+-----------+-----+-----------------------+
|          1|Alice|                   26.0|
|          2|  Bob|                   62.0|
+-----------+-----+-----------------------+



In [44]:
# Problem 14 (Revised): Customer Order Patterns - Find customers who frequently order the same total amount
# Expected Output: DataFrame with [amount, customer_count, example_customer_id]
# Shows which order amounts appear most frequently across customers

customer_order_patterns = (
    orders_df
    .groupBy('total_amount')
    .agg(
        count('customer_id').alias('customer_count'),
        min('customer_id').alias('example_customer_id')
    )
)

customer_order_patterns.show()

+------------+--------------+-------------------+
|total_amount|customer_count|example_customer_id|
+------------+--------------+-------------------+
|       150.5|             1|                  1|
|       200.0|             1|                  2|
|       75.25|             1|                  1|
|      300.75|             1|                  3|
|        50.0|             1|                  4|
|       120.0|             1|                  2|
+------------+--------------+-------------------+



In [46]:
# Problem 15: Customer Segmentation - Classify customers into spending tiers
# Expected Output: DataFrame with [customer_id, name, total_spend, tier]
# Tiers: 
#   Platinum: > $500 total spend
#   Gold: $300-$500
#   Silver: $100-$300
#   Bronze: < $100

customer_segmentation = (
    orders_df
    .groupBy('customer_id')
    .agg(
        sum('total_amount').alias('total_spend')
    )
    .withColumn('tier', 
                when(col('total_spend') > 500, 'Platinum')
                .when((col('total_spend') > 300) & (col('total_spend') <= 500), 'Gold')
                .when((col('total_spend') > 100) & (col('total_spend') <= 300), 'Silver')
                .otherwise('Bronze')
    )
    .join(customers_df, 'customer_id')
    .select(
        'customer_id',
        'name',
        'total_spend',
        'tier'
    )
)

customer_segmentation.show()

+-----------+-------+-----------+------+
|customer_id|   name|total_spend|  tier|
+-----------+-------+-----------+------+
|          1|  Alice|     225.75|Silver|
|          2|    Bob|      320.0|  Gold|
|          3|Charlie|     300.75|  Gold|
|          4|  David|       50.0|Bronze|
+-----------+-------+-----------+------+



In [16]:
# Problem 16: Time-Based Analysis - Calculate the 30-day rolling average order amount per customer
# Expected Output: DataFrame with [customer_id, order_date, total_amount, rolling_avg_30d]
# Should show the average order amount for each customer over their preceding 30-day window
rolling_avg = (
    orders_df
    .withColumn('rolling_avg_30d', 
                avg('total_amount')
                .over(
                    Window
                    .partitionBy('customer_id')
                    .orderBy(col('order_date').cast('timestamp').cast('long'))
                    .rangeBetween(-30 * 86400, 0)
                )
    )
    .select(
        'customer_id',
        'order_date',
        'total_amount',
        'rolling_avg_30d'
    )
)

rolling_avg.show()

+-----------+----------+------------+---------------+
|customer_id|order_date|total_amount|rolling_avg_30d|
+-----------+----------+------------+---------------+
|          1|2023-01-10|       150.5|          150.5|
|          1|2023-02-05|       75.25|        112.875|
|          2|2023-01-12|       200.0|          200.0|
|          2|2023-03-15|       120.0|          120.0|
|          3|2023-02-18|      300.75|         300.75|
|          4|2023-03-01|        50.0|           50.0|
+-----------+----------+------------+---------------+



In [21]:
# Problem 17: Customer Churn Prediction - Identify customers at risk of churning 
# (No orders for ≥60 days from their last order date)
# Expected Output: DataFrame with [customer_id, name, last_order_date, days_since_last_order]
# Should only include customers with ≥1 order who meet the 60-day threshold

potential_churn = (
    orders_df
    .groupBy('customer_id')
    .agg(
        max('order_date').alias('last_order_date'),
        count('order_id').alias('order_count')
    )
    .withColumn('days_since_last_order', datediff(current_date(), col('last_order_date')))
    .filter(
        (col('days_since_last_order') >= 60) &
        (col('order_count') >= 1)
    )
    .join(customers_df, 'customer_id')
    .select(
        'customer_id',
        'name',
        'last_order_date',
        'days_since_last_order'
    )
)

potential_churn.show()



+-----------+-------+---------------+---------------------+
|customer_id|   name|last_order_date|days_since_last_order|
+-----------+-------+---------------+---------------------+
|          1|  Alice|     2023-02-05|                  784|
|          2|    Bob|     2023-03-15|                  746|
|          3|Charlie|     2023-02-18|                  771|
|          4|  David|     2023-03-01|                  760|
+-----------+-------+---------------+---------------------+



                                                                                

In [30]:
# Problem 18A: Frequently Bought Together Pairs
# Find product pairs that appear together in orders more than twice
# 
# Expected Output: DataFrame with [product1_id, product1_name, product2_id, product2_name, pair_count]

frequent_pairs = (
    order_items_df.alias('oi1')
    .join(order_items_df.alias('oi2'),
        (col('oi1.order_id') == col('oi2.order_id')) &
        (col('oi1.product_id') < col('oi2.product_id'))
    )
    .groupBy(col('oi1.product_id'), col('oi2.product_id'))
    .agg(count('oi1.order_id').alias('pair_count'))
    # .filter(col('pair_count') > 2)
    .join(products_df.alias('p1'), col('oi1.product_id') == col('p1.product_id'))
    .join(products_df.alias('p2'), col('oi2.product_id') == col('p2.product_id'))
    .select(
        col('oi1.product_id').alias('product1_id'),
        col('p1.name').alias('product1_name'),
        col('oi2.product_id').alias('product2_id'),
        col('p2.name').alias('product2_name'),
        'pair_count'
    )
    .orderBy(col('pair_count').desc())
)

frequent_pairs.show()

+-----------+-------------+-----------+-------------+----------+
|product1_id|product1_name|product2_id|product2_name|pair_count|
+-----------+-------------+-----------+-------------+----------+
|       1001|       Laptop|       1002|   Smartphone|         1|
|       1001|       Laptop|       1003|   Headphones|         1|
+-----------+-------------+-----------+-------------+----------+



In [41]:
# Problem 19: Customer Product Preferences - For each customer, find their most purchased product category
# 
# Expected Output: DataFrame with [customer_id, name, favorite_category, purchase_count]
# 
# Rules:
# 1. Calculate category counts from order_items and products
# 2. For ties, select the category with the highest total spending
# 3. Include all customers (even those with no orders)
# 
# Example:
# Alice: 3 Electronics purchases ($1200 total) vs 2 Accessories ($200) → Electronics

preferred_category = (
    order_items_df.alias('oi')
    .join(products_df.alias('p'), 'product_id')
    .join(orders_df.alias('o'), 'order_id')
    .join(customers_df.alias('c'), 'customer_id', 'right')
    .groupBy('customer_id', col('c.name'), 'category')
    .agg(
        count('order_id').alias('purchase_count'),
        sum('total_amount').alias('total_spend')
    )
    .withColumn('rank', 
                rank()
                .over(
                    Window
                    .partitionBy('customer_id')
                    .orderBy(
                        col('purchase_count').desc(),
                        col('total_spend').desc()
                    )
                )
    )
    .filter(col('rank') == 1)
    .select(
        'customer_id',
        'name',
        col('category').alias('favorite_category'),
        'purchase_count'
    )
)

preferred_category.show()

+-----------+-------+-----------------+--------------+
|customer_id|   name|favorite_category|purchase_count|
+-----------+-------+-----------------+--------------+
|          1|  Alice|      Accessories|             2|
|          2|    Bob|      Electronics|             1|
|          3|Charlie|      Electronics|             2|
|          4|  David|      Accessories|             1|
+-----------+-------+-----------------+--------------+



In [49]:
# Problem 20: Customer Value Segmentation - Classify customers into value tiers based on both spending and order frequency
# 
# Expected Output: DataFrame with [customer_id, name, total_spend, order_count, value_tier]
# 
# Segmentation Rules:
# ⭐ High Value: Top 20% by spend AND top 20% by order count  
# 💎 Consistent: Not high-value but top 50% in both metrics  
# 🛒 Occasional: Bottom 50% in spend but top 50% in frequency  
# 💸 Big Spenders: Top 20% in spend but bottom 50% in frequency  
# 🐌 Low Activity: Bottom 50% in both metrics
# 
# Requirements:
# 1. Use percent_rank() for percentile calculations
# 2. Handle ties at threshold boundaries
# 3. Include all customers (even those with no orders)
# 
# Example:
# Alice: total_spend=$1200 (85th %ile), order_count=5 (90th %ile) → ⭐ High Value

customer_value_segmentation = (
    customers_df
    .join(orders_df, 'customer_id', 'left')
    .groupBy('customer_id', 'name')
    .agg(
        coalesce(sum('total_amount'), lit(0)).alias('total_spend'),
        coalesce(countDistinct('order_id'), lit(0)).alias('order_count')
    )
    .withColumn(
        'total_spend_pct_rank',
        percent_rank()
        .over(
            Window
            .orderBy(col('total_spend').desc())
        )
    )
    .withColumn(
        'order_count_pct_rank',
        percent_rank()
        .over(
            Window
            .orderBy(col('order_count').desc())
        )
    )
    .withColumn(
        'value_tier',
        when((col('total_spend_pct_rank') <= 0.2) & (col('order_count_pct_rank') <= 0.2), 'High Value')
        .when(
            ((col('total_spend_pct_rank') <= 0.5) & (col('order_count_pct_rank') <= 0.5)) & 
            ~((col('total_spend_pct_rank') <= 0.2) & (col('order_count_pct_rank') <= 0.2)), 'Consistent')
        .when((col('total_spend_pct_rank') > 0.5) & (col('order_count_pct_rank') <= 0.5), 'Occasional')
        .when((col('total_spend_pct_rank') <= 0.2) & (col('order_count_pct_rank') > 0.5), 'Big Spenders')
        .otherwise('Low Activity')
    )
    .select(
        'customer_id',
        'name',
        'total_spend',
        'order_count',
        'value_tier'
    )

)

customer_value_segmentation.show()

25/03/30 11:34:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 11:34:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 11:34:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 11:34:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 11:34:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 11:34:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 1

+-----------+-------+-----------+-----------+------------+
|customer_id|   name|total_spend|order_count|  value_tier|
+-----------+-------+-----------+-----------+------------+
|          2|    Bob|      320.0|          2|  High Value|
|          1|  Alice|     225.75|          2|  Occasional|
|          3|Charlie|     300.75|          1|Low Activity|
|          4|  David|       50.0|          1|Low Activity|
+-----------+-------+-----------+-----------+------------+

