# Import required libraries and functions

In [0]:
from pyspark.sql import functions as f
from delta.tables import DeltaTable

# Ingest Raw Data

In [0]:
raw = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv",
    header=True,
    inferSchema=True
)

raw.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:

# Print Data
display(raw.limit(3))

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-11-01T00:00:00.000Z,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
2019-11-01T00:00:00.000Z,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2019-11-01T00:00:01.000Z,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387


# Bronze Layer - Raw Data Ingestion

In [0]:
bronze = raw.withColumn(
    "product_name",
    f.split(f.col("category_code"), r"\.").getItem(1)  # second part after dot
).filter(f.col("product_name").isNotNull())

bronze = bronze \
    .withColumn("ingestion_ts", f.current_timestamp()) \
    .withColumn("source_file", f.col("_metadata.file_path")) \
    .select(
        "event_time", "event_type", "product_id", "product_name",
        "category_id", "category_code", "brand", "price",
        "user_id", "user_session", "ingestion_ts", "source_file"
    )

bronze.write.format("delta") \
    .mode("append") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events")

In [0]:
# Verify Bronze table
# Display first 3 rows
display(bronze.limit(3))           

# Verify schema, including ingestion_ts & source_file
bronze.printSchema()    
     

event_time,event_type,product_id,product_name,category_id,category_code,brand,price,user_id,user_session,ingestion_ts,source_file
2019-11-01T00:00:00.000Z,view,1003461,smartphone,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33,2026-01-14T17:39:30.961Z,dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv
2019-11-01T00:00:00.000Z,view,5000088,sewing_machine,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283,2026-01-14T17:39:30.961Z,dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv
2019-11-01T00:00:01.000Z,view,3601530,kitchen,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f,2026-01-14T17:39:30.961Z,dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- ingestion_ts: timestamp (nullable = false)
 |-- source_file: string (nullable = false)



# Silver Layer - Cleaning & Validation

In [0]:
# 1. Read Bronze Delta
bronze = spark.read.format("delta") \
    .load("/Volumes/workspace/ecommerce/ecommerce_data/delta/bronze/events")

In [0]:
# 2. Clean and validate
silver = bronze \
.filter(f.col("price") > 0) \
.filter(f.col("price") < 10000) \
.dropDuplicates(["user_session", "event_time"]) \
.withColumn("event_date", f.to_date(f.col("event_time"))) \
.withColumn("price_tier", 
            f.when(f.col("price") < 10, "budget")
            .when(f.col("price") < 50, "mid")
            .otherwise("premium"))
     

In [0]:
# 3. Write to Delta Silver
silver.write.format("delta") \
.mode("overwrite") \
.save("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events")

In [0]:

# 4. Verify Silver data
display(silver.limit(3))
silver.printSchema()

event_time,event_type,product_id,product_name,category_id,category_code,brand,price,user_id,user_session,ingestion_ts,source_file,event_date,price_tier
2019-11-17T08:43:01.000Z,view,1005105,smartphone,2053013555631882655,electronics.smartphone,apple,1363.95,543296136,b6c1c551-d7cb-406c-a943-5c58dc0db10d,2026-01-14T17:35:26.974Z,dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv,2019-11-17,premium
2019-11-17T08:43:08.000Z,view,1480279,desktop,2053013561092866779,computers.desktop,hp,967.82,546350875,cd00b163-df39-4d2b-b9a5-d97185772e05,2026-01-14T17:35:26.974Z,dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv,2019-11-17,premium
2019-11-17T08:43:30.000Z,view,28722200,shoes,2053013565228450757,apparel.shoes,respect,57.4,549429277,cf0690b2-ecc1-44de-83cf-018743ebaeea,2026-01-14T17:35:26.974Z,dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv,2019-11-17,premium


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- ingestion_ts: timestamp (nullable = true)
 |-- source_file: string (nullable = true)
 |-- event_date: date (nullable = true)
 |-- price_tier: string (nullable = false)



# Gold Layer - Business Aggregates

In [0]:
# 1. Read Silver Delta
silver = spark.read.format("delta") \
    .load("/Volumes/workspace/ecommerce/ecommerce_data/delta/silver/events")


In [0]:
# 2. Aggregate metrics per product
product_perf = silver.groupBy("product_id", "product_name") \
    .agg(
        f.countDistinct(f.when(f.col("event_type") == "view", f.col("user_id"))).alias("views"),
        f.countDistinct(f.when(f.col("event_type") == "purchase", f.col("user_id"))).alias("purchases"),
        f.sum(f.when(f.col("event_type") == "purchase", f.col("price"))).alias("revenue")
    ) \
 .withColumn("conversion_rate",
              f.round(f.expr("try_divide(purchases, views) * 100"), 3) 
             )
     

In [0]:
# 3. Write to Delta Gold
product_perf.write.format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/gold/products")

In [0]:
# 4. Verify Gold layer
display(product_perf.limit(5))
product_perf.printSchema()
     

product_id,product_name,views,purchases,revenue,conversion_rate
6301864,kitchen,489,15,700.1400000000001,3.067
4700518,accessories,401,7,1283.95,1.746
40900042,tools,1557,15,634.2399999999999,0.963
4000192,tools,1155,40,8280.23,3.463
5100539,clocks,2141,43,2256.85,2.008


root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- views: long (nullable = false)
 |-- purchases: long (nullable = false)
 |-- revenue: double (nullable = true)
 |-- conversion_rate: double (nullable = true)

