## 1. Load full e-commerce dataset

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Load full e-commerce dataset
events = spark.read.csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv", header=True, inferSchema=True)

# Convert event_time to timestamp for proper window operations
events = events.withColumn("event_time", F.to_timestamp("event_time"))

# Show basic info
print("Dataset overview:")
events.select("event_type", "category_code", "price", "user_id", "product_id").show(10)
print(f"Total events: {events.count()}")
print(f"Unique users: {events.select('user_id').distinct().count()}")

Dataset overview:
+----------+--------------------+-------+---------+----------+
|event_type|       category_code|  price|  user_id|product_id|
+----------+--------------------+-------+---------+----------+
|      view|                NULL|  35.79|541312140|  44600062|
|      view|appliances.enviro...|   33.2|554748717|   3900821|
|      view|furniture.living_...|  543.1|519107250|  17200506|
|      view|  computers.notebook| 251.74|550050854|   1307067|
|      view|electronics.smart...|1081.98|535871217|   1004237|
|      view|   computers.desktop| 908.62|512742880|   1480613|
|      view|                NULL| 380.96|555447699|  17300353|
|      view|                NULL|  41.16|550978835|  31500053|
|      view|  apparel.shoes.keds| 102.71|520571932|  28719074|
|      view|electronics.smart...| 566.01|537918940|   1004545|
+----------+--------------------+-------+---------+----------+
only showing top 10 rows
Total events: 42448764
Unique users: 3022290


## 2. Perform complex joins

In [0]:
# Create user summary table
user_summary = events.groupBy("user_id").agg(
    F.count("*").alias("total_events"),
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
    F.avg("price").alias("avg_spent"),
    F.max("event_time").alias("last_activity")
)

# Create product summary table
product_summary = events.groupBy("product_id", "category_code").agg(
    F.count("*").alias("total_views"),
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("total_sales"),
    F.avg("price").alias("avg_price")
)

# COMPLEX JOIN: Combine user and product info with event details
joined_data = events.alias("e") \
    .join(user_summary.alias("u"), "user_id", "left") \
    .join(product_summary.alias("p"), "product_id", "left") \
    .select(
        "e.*",
        "u.total_events",
        "u.purchases",
        "u.avg_spent",
        "p.total_views",
        "p.total_sales",
        "p.avg_price"
    )

print("\n=== After Complex Join ===")
joined_data.select("user_id", "product_id", "event_type", "total_events", "total_views").show(10)


=== After Complex Join ===
+---------+----------+----------+------------+-----------+
|  user_id|product_id|event_type|total_events|total_views|
+---------+----------+----------+------------+-----------+
|512515208|   1004133|      view|         146|      25270|
|554748717|   3900821|      view|           3|       3069|
|512895240|   1201459|      view|         230|       2089|
|515912492|  12500217|      view|         112|        420|
|513243015|   4804056|      view|         191|     214234|
|514109745|  22700601|      view|         133|       4111|
|515547719|   1004934|      cart|          54|      21871|
|539402318|   5701192|      view|           8|       2436|
|556942357|   1305996|      view|          23|       6225|
|550117627|  12600003|      view|         407|       9752|
+---------+----------+----------+------------+-----------+
only showing top 10 rows


## 3. Calculate running totals with window functions

In [0]:
# Window for user-based running totals
user_window = Window.partitionBy("user_id").orderBy("event_time")

# Add running calculations
events_with_windows = events.withColumn("user_event_sequence", F.row_number().over(user_window)) \
    .withColumn("user_running_total", F.sum("price").over(user_window)) \
    .withColumn("avg_price_so_far", F.avg("price").over(user_window))

# Time-based window (last 24 hours for same user)
time_window = Window.partitionBy("user_id") \
    .orderBy(F.unix_timestamp("event_time")) \
    .rangeBetween(-86400, 0)  # 24 hours in seconds

events_with_windows = events_with_windows.withColumn(
    "events_last_24h", 
    F.count("*").over(time_window)
)

print("\n=== Window Functions - Running Totals ===")
events_with_windows.select(
    "user_id", "event_time", "event_type", "price", 
    "user_event_sequence", "user_running_total", "events_last_24h"
).filter(F.col("user_id") == 514148024).orderBy("event_time").show(10)


=== Window Functions - Running Totals ===
+-------+----------+----------+-----+-------------------+------------------+---------------+
|user_id|event_time|event_type|price|user_event_sequence|user_running_total|events_last_24h|
+-------+----------+----------+-----+-------------------+------------------+---------------+
+-------+----------+----------+-----+-------------------+------------------+---------------+



## Basic running count with window [functions](url)

In [0]:
window = Window.partitionBy("user_id").orderBy("event_time")
events_with_count = events.withColumn("event_number", F.row_number().over(window)) \
                         .withColumn("cumulative_events", F.count("*").over(window))

print("1. Basic Window:")
events_with_count.select("user_id", "event_time", "event_type", "event_number", "cumulative_events") \
                .filter(F.col("user_id") == 554748717) \
                .orderBy("event_time") \
                .show(5)

# Running sum of prices per user
events_with_total = events.withColumn("running_total", F.sum("price").over(window))

print("\n2. Running Total per User:")
events_with_total.select("user_id", "event_time", "event_type", "price", "running_total") \
                .filter(F.col("user_id") == 554748717) \
                .orderBy("event_time") \
                .show(5)

1. Basic Window:
+---------+-------------------+----------+------------+-----------------+
|  user_id|         event_time|event_type|event_number|cumulative_events|
+---------+-------------------+----------+------------+-----------------+
|554748717|2019-10-01 00:00:00|      view|           1|                1|
|554748717|2019-10-01 00:00:25|      view|           2|                2|
|554748717|2019-10-01 00:01:25|      view|           3|                3|
+---------+-------------------+----------+------------+-----------------+


2. Running Total per User:
+---------+-------------------+----------+------+-------------+
|  user_id|         event_time|event_type| price|running_total|
+---------+-------------------+----------+------+-------------+
|554748717|2019-10-01 00:00:00|      view|  33.2|         33.2|
|554748717|2019-10-01 00:00:25|      view|122.18|       155.38|
|554748717|2019-10-01 00:01:25|      view|122.18|       277.56|
+---------+-------------------+----------+------+---

## 4. Create derived features

In [0]:
# A. Purchase conversion features
conversion_features = events.groupBy("user_id", "category_code").agg(
    F.sum(F.when(F.col("event_type") == "view", 1).otherwise(0)).alias("views"),
    F.sum(F.when(F.col("event_type") == "cart", 1).otherwise(0)).alias("cart_adds"),
    F.sum(F.when(F.col("event_type") == "purchase", 1).otherwise(0)).alias("purchases")
).withColumn(
    "view_to_cart_rate", 
    F.when(F.col("views") > 0, F.col("cart_adds") / F.col("views") * 100).otherwise(0)
).withColumn(
    "cart_to_purchase_rate",
    F.when(F.col("cart_adds") > 0, F.col("purchases") / F.col("cart_adds") * 100).otherwise(0)
).withColumn(
    "overall_conversion_rate",
    F.when(F.col("views") > 0, F.col("purchases") / F.col("views") * 100).otherwise(0)
)

print("\n=== Derived Features - Conversion Rates ===")
conversion_features.orderBy(F.desc("overall_conversion_rate")).show(10)

# B. Time-based features
events_with_time_features = events.withColumn("event_hour", F.hour("event_time")) \
    .withColumn("event_day", F.dayofmonth("event_time")) \
    .withColumn("day_of_week", F.dayofweek("event_time")) \
    .withColumn("is_weekend", F.when((F.col("day_of_week") == 1) | (F.col("day_of_week") == 7), 1).otherwise(0))

# C. Price segment features
events_with_segments = events.withColumn(
    "price_segment",
    F.when(F.col("price") < 50, "low")
     .when(F.col("price") < 200, "medium")
     .otherwise("high")
)


=== Derived Features - Conversion Rates ===
+---------+--------------------+-----+---------+---------+-----------------+---------------------+-----------------------+
|  user_id|       category_code|views|cart_adds|purchases|view_to_cart_rate|cart_to_purchase_rate|overall_conversion_rate|
+---------+--------------------+-----+---------+---------+-----------------+---------------------+-----------------------+
|522054469|electronics.smart...|    1|        3|        3|            300.0|                100.0|                  300.0|
|547014715|electronics.smart...|    1|        0|        3|              0.0|                  0.0|                  300.0|
|560080717|electronics.smart...|    1|        1|        2|            100.0|                200.0|                  200.0|
|557384158|electronics.smart...|    1|        3|        2|            300.0|    66.66666666666666|                  200.0|
|560693783|electronics.smart...|    1|        2|        2|            200.0|                10

In [0]:
# Top 5 products by revenue
revenue = events.filter(F.col("event_type") == "purchase") \
    .groupBy("product_id") \
    .agg(F.sum("price").alias("revenue")) \
    .orderBy(F.desc("revenue")).limit(5)

print("\n=== Top 5 Products by Revenue ===")
revenue.show()

# Running total per user (as per their example)
window = Window.partitionBy("user_id").orderBy("event_time")
events_with_cumulative = events.withColumn("cumulative_events", F.count("*").over(window))

# Conversion rate by category (as per their example)
conversion_by_category = events.groupBy("category_code", "event_type").count() \
    .groupBy("category_code") \
    .pivot("event_type").sum("count") \
    .withColumn("conversion_rate", F.col("purchase")/F.col("view")*100) \
    .fillna(0)

print("\n=== Conversion Rate by Category ===")
conversion_by_category.orderBy(F.desc("conversion_rate")).show(10)


=== Top 5 Products by Revenue ===
+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|   1005115|1.2406807350000003E7|
|   1005105|1.0239248679999996E7|
|   1004249|   6730112.920000011|
|   1005135|   5567806.640000007|
|   1004767|   5430723.430000007|
+----------+--------------------+


=== Conversion Rate by Category ===
+--------------------+------+--------+--------+------------------+
|       category_code|  cart|purchase|    view|   conversion_rate|
+--------------------+------+--------+--------+------------------+
|electronics.smart...|549765|  338018|10619448| 3.183009135691422|
|   kids.fmcg.diapers|    98|     768|   24203|3.1731603520224763|
|electronics.audio...| 51143|   30503| 1018542| 2.994770956916848|
|     appliances.iron|  4095|    3653|  157645|2.3172317548923216|
|appliances.kitche...|  4717|    3709|  164954|2.2485056439977207|
|medicine.tools.to...|   522|     310|   13974|2.2184056104193504|
|appliances.person

## 6. Save the most important results

In [0]:
# Save key results
output_path = "/Volumes/workspace/ecommerce/analysis_results/"

# Save the enriched dataset with window functions
# events_with_windows.write.mode("overwrite").parquet(f"{output_path}/events_with_windows")

# Save user engagement scores
# user_engagement.write.mode("overwrite").parquet(f"{output_path}/user_engagement_scores")

# Save conversion features
# conversion_features.write.mode("overwrite").parquet(f"{output_path}/conversion_features")

# print("\n" + "="*50)
# print("ANALYSIS COMPLETE!")
# print(f"Key results saved to: {output_path}")
# print("="*50)