In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [0]:
# BRONZE: Raw ingestion.
# The schema is declared explicitly rather than using inferSchema=True:
# - Inference costs an extra full pass over the CSV before the real read.
# - Inferred types can change if the source file changes, which would silently
#   change the bronze table's schema.
# The types below are the ones Spark inferred for this file on a previous run.
# NOTE(review): event_time is parsed with Spark's default timestamp parsing —
# confirm event_time has no unexpected nulls after re-running this cell.
EVENTS_SCHEMA = T.StructType([
    T.StructField("event_time", T.TimestampType()),
    T.StructField("event_type", T.StringType()),
    T.StructField("product_id", T.IntegerType()),
    T.StructField("category_id", T.LongType()),
    T.StructField("category_code", T.StringType()),
    T.StructField("brand", T.StringType()),
    T.StructField("price", T.DoubleType()),
    T.StructField("user_id", T.IntegerType()),
    T.StructField("user_session", T.StringType()),
])

raw = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
    schema=EVENTS_SCHEMA,
)

# Stamp every row with the load time, then fully replace the bronze Delta table.
(
    raw.withColumn("ingestion_ts", F.current_timestamp())
    .write.format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events")
)

In [0]:
# Inspect the bronze table: print its schema, then show a 10-row preview.
bronze = spark.read.load(
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events",
    format="delta",
)
bronze.printSchema()
preview = bronze.limit(10)
display(preview)

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- ingestion_ts: timestamp (nullable = true)



event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session,ingestion_ts
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c,2026-01-14T05:19:56.979Z
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc,2026-01-14T05:19:56.979Z
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8,2026-01-14T05:19:56.979Z
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713,2026-01-14T05:19:56.979Z
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d,2026-01-14T05:19:56.979Z
2019-10-01T00:00:05.000Z,view,1480613,2053013561092866779,computers.desktop,pulser,908.62,512742880,0d0d91c2-c9c2-4e81-90a5-86594dec0db9,2026-01-14T05:19:56.979Z
2019-10-01T00:00:08.000Z,view,17300353,2053013553853497655,,creed,380.96,555447699,4fe811e9-91de-46da-90c3-bbd87ed3a65d,2026-01-14T05:19:56.979Z
2019-10-01T00:00:08.000Z,view,31500053,2053013558031024687,,luminarc,41.16,550978835,6280d577-25c8-4147-99a7-abc6048498d6,2026-01-14T05:19:56.979Z
2019-10-01T00:00:10.000Z,view,28719074,2053013565480109009,apparel.shoes.keds,baden,102.71,520571932,ac1cd4e5-a3ce-4224-a2d7-ff660a105880,2026-01-14T05:19:56.979Z
2019-10-01T00:00:11.000Z,view,1004545,2053013555631882655,electronics.smartphone,huawei,566.01,537918940,406c46ed-90a4-4787-a43b-59a410c1a5fb,2026-01-14T05:19:56.979Z


In [0]:
# SILVER: Cleaned data.
# - Keeps rows with 0 < price < 10000. Rows with a null price are dropped too,
#   because a comparison against null is never true.
# - Removes repeated events. The dedup key includes event_type, product_id and
#   user_id: keying on (user_session, event_time) alone would merge distinct
#   events that share a session and timestamp (e.g. a view and a cart in the
#   same second), and dropDuplicates keeps only one row per key, silently
#   losing funnel events.
# - Adds event_date (calendar date of event_time) and a price_tier bucket:
#   price < 10 -> "budget", price < 50 -> "mid", otherwise "premium".
bronze = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events")
silver = (
    bronze
    .filter((F.col("price") > 0) & (F.col("price") < 10000))
    .dropDuplicates(["user_session", "event_time", "event_type", "product_id", "user_id"])
    .withColumn("event_date", F.to_date("event_time"))
    .withColumn(
        "price_tier",
        F.when(F.col("price") < 10, "budget")
        .when(F.col("price") < 50, "mid")
        .otherwise("premium"),
    )
)
silver.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events")

In [0]:
# GOLD: Per-product funnel aggregates.
# - views / purchases: number of DISTINCT users who viewed / purchased the product.
# - revenue: sum of price over purchase events (0 when there are none).
# - conversion_rate: purchases / views * 100; null when the product has no viewers.
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events")

is_view = F.col("event_type") == "view"
is_purchase = F.col("event_type") == "purchase"

product_perf = (
    silver.groupBy("product_id")
    .agg(
        # F.col is required: a bare string value in F.when is a literal, so
        # F.when(cond, "user_id") would count the constant "user_id" (0 or 1).
        # Non-matching rows become null, which countDistinct ignores.
        F.countDistinct(F.when(is_view, F.col("user_id"))).alias("views"),
        F.countDistinct(F.when(is_purchase, F.col("user_id"))).alias("purchases"),
        # sum() of all-null input is null; report 0 for products never purchased.
        F.coalesce(F.sum(F.when(is_purchase, F.col("price"))), F.lit(0.0)).alias("revenue"),
    )
    .withColumn(
        "conversion_rate",
        # Guard against division by zero: stays null when views == 0.
        F.when(F.col("views") > 0, F.col("purchases") / F.col("views") * 100),
    )
)
product_perf.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/ecommerce_data/delta/gold/products")
