
# DAY 3 – Advanced Spark Transformations & Analytics

## Objectives
1. Load full e-commerce dataset
2. Perform complex joins
3. Use window functions for running totals
4. Create derived analytical features



## Load Full E-commerce Dataset
Data location:
`/Volumes/workspace/ecommerce/ecommerce_data/`


In [0]:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# inferSchema=True makes Spark scan the files an extra time to guess column types.
events = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/",
    header=True,
    inferSchema=True
)

events.printSchema()
events.show(5)


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|53
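
Since the inferred schema is now known from `printSchema()`, later runs could pass it explicitly and skip the inference scan. The sketch below is one way to do that, not part of the original notebook: `EVENT_SCHEMA`, `DATA_PATH`, and `load_events` are hypothetical names, and it assumes the timestamp format that inference accepted also parses under an explicit `TIMESTAMP` column.

```python
# DDL-style schema string mirroring the printSchema() output above.
EVENT_SCHEMA = (
    "event_time TIMESTAMP, "
    "event_type STRING, "
    "product_id INT, "
    "category_id BIGINT, "
    "category_code STRING, "
    "brand STRING, "
    "price DOUBLE, "
    "user_id INT, "
    "user_session STRING"
)

DATA_PATH = "/Volumes/workspace/ecommerce/ecommerce_data/"


def load_events(spark, path=DATA_PATH):
    """Read the CSV files with a fixed schema (hypothetical helper).

    `spark` is the active SparkSession, as used in the cell above.
    """
    return spark.read.csv(path, header=True, schema=EVENT_SCHEMA)
```

In the notebook this would be called as `events = load_events(spark)`. A fixed schema also makes type changes in new files fail loudly or surface as nulls instead of silently changing a column's type.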


## Complex Join Example
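Aggregate purchase revenue per product, then left-join it back onto the event rows so that every event carries its product's total revenue. Products with no purchases keep their events and get a null `revenue`.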


In [0]:

# Revenue per product
product_revenue = (
    events.filter(F.col("event_type") == "purchase")
          .groupBy("product_id")
          .agg(F.sum("price").alias("revenue"))
)

# Join back to events
events_with_revenue = events.join(
    product_revenue,
    on=["product_id"],
    how="left"
)

events_with_revenue.show(5)


+----------+-------------------+----------+-------------------+--------------------+------+------+---------+--------------------+------------------+
|product_id|         event_time|event_type|        category_id|       category_code| brand| price|  user_id|        user_session|           revenue|
+----------+-------------------+----------+-------------------+--------------------+------+------+---------+--------------------+------------------+
|   3601530|2019-11-01 00:00:01|      view|2053013563810775923|appliances.kitche...|    lg|712.87|518085591|3bfb58cd-7892-48c...|           5037.62|
|   1003461|2019-11-01 00:00:00|      view|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|           3847.96|
|   5000088|2019-11-01 00:00:00|      view|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|            586.71|
|   1306894|2019-11-01 00:00:01|      view|2053013558920217191|  computers.notebook|    hp|360.09|52077268
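
To make the join semantics concrete, here is a plain-Python model of the same two steps (aggregate, then left join). It is only a sketch of the logic, not Spark code, and the toy rows are made up for illustration.

```python
# Toy events (made up, not taken from the dataset).
toy_events = [
    {"product_id": 1, "event_type": "view", "price": 10.0},
    {"product_id": 1, "event_type": "purchase", "price": 10.0},
    {"product_id": 1, "event_type": "purchase", "price": 10.0},
    {"product_id": 2, "event_type": "view", "price": 5.0},
]

# Step 1: sum price over purchase events, keyed by product_id.
revenue = {}
for e in toy_events:
    if e["event_type"] == "purchase":
        revenue[e["product_id"]] = revenue.get(e["product_id"], 0.0) + e["price"]

# Step 2: left join. Every event is kept; a product with no purchases
# gets None, which corresponds to null in Spark.
toy_events_with_revenue = [
    {**e, "revenue": revenue.get(e["product_id"])} for e in toy_events
]

for row in toy_events_with_revenue:
    print(row)
```

Note that the product total is repeated on every event row of that product, so summing the joined `revenue` column would count each product's revenue once per event.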


## Top 5 Products by Revenue


In [0]:

top_products = product_revenue.orderBy(F.desc("revenue")).limit(5)
top_products.show()


+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|   1005115|3.3032381669999998E7|
|   1005105| 2.168460336999998E7|
|   1004249|1.3545407540000029E7|
|   1005135|1.2654328769999996E7|
|   1004767|1.1004748489999987E7|
+----------+--------------------+
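
The revenue column prints in scientific notation with floating-point residue, which is hard to read. The short plain-Python example below shows how to read those values; the numbers are copied from the output above. In Spark itself, `F.round` or `F.format_number` could probably serve the same purpose before calling `show()`.

```python
# Revenue figures exactly as printed by top_products.show().
raw_revenue = {
    1005115: 3.3032381669999998E7,
    1005105: 2.168460336999998E7,
}

# "E7" means "times 10**7"; the trailing digits are binary floating-point
# residue left over from summing many double-precision prices.
readable = {pid: format(value, ",.2f") for pid, value in raw_revenue.items()}

print(readable[1005115])  # 33,032,381.67
print(readable[1005105])  # 21,684,603.37
```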




## Running Totals Using Window Functions


In [0]:

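# Explicit ROWS frame: each row adds exactly one to the count, even when
# several events of a user share the same event_time.
window_spec = (
    Window.partitionBy("user_id")
          .orderBy("event_time")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)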

events_with_running_total = events.withColumn(
    "cumulative_events",
    F.count("*").over(window_spec)
)

events_with_running_total.select(
    "user_id", "event_time", "event_type", "cumulative_events"
).show(10)


+--------+-------------------+----------+-----------------+
| user_id|         event_time|event_type|cumulative_events|
+--------+-------------------+----------+-----------------+
|65800726|2019-11-27 04:33:16|      view|                1|
|65800726|2019-11-27 04:35:24|      view|                2|
|81255481|2019-11-08 07:44:45|      view|                1|
|81255481|2019-11-21 14:11:26|      view|                2|
|82079354|2019-11-28 04:58:01|      view|                1|
|82079354|2019-11-28 04:58:22|      view|                2|
|82079354|2019-11-28 04:59:29|      view|                3|
|82079354|2019-11-28 04:59:54|      view|                4|
|82079354|2019-11-28 05:00:22|      view|                5|
|82079354|2019-11-28 05:00:47|      view|                6|
+--------+-------------------+----------+-----------------+
only showing top 10 rows
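
One subtlety of running counts is how ties are handled. An ordered window with no explicit frame defaults to `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so rows tied on `event_time` all receive the same count, while a `ROWS` frame advances by one per row. The plain-Python model below illustrates the difference; it is a sketch of the semantics rather than Spark code, and the toy rows (with a deliberate tie) are made up.

```python
from itertools import groupby
from operator import itemgetter

# Toy rows of (user_id, event_time); u1 has two events at time 2.
toy_rows = [("u1", 1), ("u1", 2), ("u1", 2), ("u1", 3), ("u2", 1)]


def running_counts(rows):
    """Return (user_id, event_time, rows_frame_count, range_frame_count)."""
    out = []
    ordered = sorted(rows, key=itemgetter(0, 1))
    for user, group in groupby(ordered, key=itemgetter(0)):
        times = [t for _, t in group]
        for i, t in enumerate(times):
            rows_count = i + 1                             # ROWS: one step per row
            range_count = sum(1 for x in times if x <= t)  # RANGE: ties included
            out.append((user, t, rows_count, range_count))
    return out


for row in running_counts(toy_rows):
    print(row)
```

For the two tied events the `ROWS` count reads 2 and 3, whereas the `RANGE` count reads 3 for both.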



## Derived Feature: Conversion Rate by Category


In [0]:

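conversion_rate = (
    # Count events per (category, event type) pair.
    events.groupBy("category_code", "event_type")
          .count()
          # Turn event types into columns; listing the values up front
          # spares Spark a pass to discover them.
          .groupBy("category_code")
          .pivot("event_type", ["cart", "purchase", "view"])
          .sum("count")
          # Purchases per 100 views; null where a category has no purchases.
          .withColumn(
              "conversion_rate",
              F.col("purchase") / F.col("view") * 100
          )
)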

conversion_rate.show()


+--------------------+-----+--------+-------+------------------+
|       category_code| cart|purchase|   view|   conversion_rate|
+--------------------+-----+--------+-------+------------------+
|furniture.living_...| 6521|    2646| 632899|0.4180761859317206|
|      apparel.jumper|  324|      84|  33931|0.2475612271963691|
| stationery.cartrige|  750|     325|  19323|1.6819334471872898|
|       sport.bicycle| 2920|    1374| 234796|0.5851888447844086|
|        apparel.sock|  108|      40|   6076|0.6583278472679395|
|appliances.enviro...|   97|      59|   5488|1.0750728862973762|
|          kids.swing| 1703|     812|  89026|0.9120930963988049|
|auto.accessories....|   96|      18|   3705|0.4858299595141701|
|auto.accessories....| 2929|    1038|  89495|1.1598413319179843|
|electronics.audio...| 1811|     919|  73039|1.2582319035036076|
|  electronics.clocks|89633|   41143|3267223| 1.259265131275092|
|electronics.audio...| 2690|    1119|  95772|1.1683999498809672|
|appliances.kitche...| 69
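
The conversion rate is purchases per 100 views. As a quick sanity check, the arithmetic can be reproduced in plain Python from the counts printed above. The helper below is a hypothetical sketch: unlike the Spark expression, it treats a missing purchase count as zero and returns `None` when there are no views, which is one possible design choice rather than what the cell above does.

```python
def conversion_rate_pct(purchases, views):
    """Purchases per 100 views; None when the rate is undefined."""
    if not views:  # covers both None (null after the pivot) and 0
        return None
    return (purchases or 0) / views * 100


# Counts copied from the first two rows of the output above.
print(conversion_rate_pct(2646, 632899))  # furniture.living_...
print(conversion_rate_pct(84, 33931))     # apparel.jumper

# Hypothetical category with views but no purchase events.
print(conversion_rate_pct(None, 120))
```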


## Export Results


In [0]:

top_products.write.mode("overwrite").parquet(
    "/Volumes/workspace/ecommerce/ecommerce_data/output_all/top_products_by_revenue"
)

conversion_rate.write.mode("overwrite").parquet(
    "/Volumes/workspace/ecommerce/ecommerce_data/output_all/conversion_rate_by_category"
)
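
Both writes share the same base directory and write mode, so they could be wrapped in a small helper. This is a minimal sketch under that assumption; `OUTPUT_BASE` and `export_parquet` are hypothetical names, and the only DataFrame calls used are the `write.mode(...).parquet(...)` chain from the cell above.

```python
import posixpath

OUTPUT_BASE = "/Volumes/workspace/ecommerce/ecommerce_data/output_all"


def export_parquet(df, name, base=OUTPUT_BASE):
    """Write `df` to <base>/<name> as Parquet, replacing any earlier output."""
    path = posixpath.join(base, name)
    df.write.mode("overwrite").parquet(path)
    return path
```

Usage in the notebook would look like `export_parquet(top_products, "top_products_by_revenue")`. Returning the path makes it easy to log where each result landed.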



## Key Takeaways
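- Window functions compute per-user running totals ordered by event time without collapsing rows
- Joining aggregates back onto events enriches each row for feature engineering
- Derived features such as conversion rate feed ML models and BI reports
- Parquet's columnar, compressed format suits analytical storage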
