## BRONZE LAYER: Raw ingestion

In [0]:
from pyspark.sql import functions as F

# Step 1: Load the raw November 2019 CSV.
# The first row is used as the header and column types are inferred by Spark.
raw_csv_path = "/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv"
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)

# Step 2: Stamp each row with the load time; no source column is modified,
# so the data is still raw.
df_bronze = df.withColumn("ingestion_ts", F.current_timestamp())

# Quick look at a few rows and at the inferred schema.
df_bronze.show(5)
df_bronze.printSchema()

# Step 3: Save the Bronze layer as a Delta table registered by name
# (no explicit path is given), replacing any previous contents.
bronze_writer = df_bronze.write.format("delta").mode("overwrite")
bronze_writer.saveAsTable("bronze_df_nov")

# Step 4: Read the table back and print five rows to confirm the write.
bronze_check = spark.table("bronze_df_nov")
bronze_check.show(5)

+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code| brand| price|  user_id|        user_session|        ingestion_ts|
+-------------------+----------+----------+-------------------+--------------------+------+------+---------+--------------------+--------------------+
|2019-11-01 00:00:00|      view|   1003461|2053013555631882655|electronics.smart...|xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|2026-01-14 06:36:...|
|2019-11-01 00:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|janome|293.65|530496790|8e5f4f83-366c-4f7...|2026-01-14 06:36:...|
|2019-11-01 00:00:01|      view|  17302664|2053013553853497655|                NULL| creed| 28.31|561587266|755422e7-9040-477...|2026-01-14 06:36:...|
|2019-11-01 00:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|    lg|712

## Silver Layer – Cleaned & Trusted Data

In [0]:
from pyspark.sql import functions as F

# Step 1: Load the Bronze table by name through the DataFrame reader.
bronze_table_name = "bronze_df_nov"
df_bronze = spark.read.table(bronze_table_name)

In [0]:
# Step 2: Project the ten Bronze columns in a fixed order.
# Every name is selected unchanged (they are already snake_case), so this
# step pins the column set and order rather than renaming anything.
bronze_columns = [
    "event_time",
    "event_type",
    "product_id",
    "category_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
    "ingestion_ts",
]
df_bronze = df_bronze.select(*bronze_columns)

In [0]:
# Step 3: Handle nulls.
# A row is discarded when any of the critical columns below is null.
critical_columns = ["event_time", "user_session", "product_id"]
df_clean = df_bronze.na.drop(subset=critical_columns)

In [0]:
# Replace nulls in the non-critical columns with placeholder values.
# Note: the Step 4 filter keeps only rows with price > 0, so rows whose
# price is set to 0 here are removed again by that step.
null_replacements = {
    "brand": "unknown",
    "category_code": "unknown",
    "price": 0,
}
df_clean = df_clean.na.fill(null_replacements)

In [0]:
# Step 4: Remove price outliers.
# Only rows with 0 < price < 10000 are kept (both bounds exclusive).
price_col = F.col("price")
price_in_range = (price_col > 0) & (price_col < 10000)
df_clean = df_clean.where(price_in_range)

In [0]:
# Step 5: Deduplicate events.
# An event is identified by session, timestamp, product AND event type.
# event_type has to be part of the key: without it, two different events
# (e.g. a view and a purchase) for the same product that share a timestamp
# within one session are treated as duplicates, and dropDuplicates keeps
# only one of them. That can discard purchase rows, which the Gold-layer
# aggregations select on via event_type == "purchase".
dedup_key = ["user_session", "event_time", "event_type", "product_id"]
df_clean = df_clean.dropDuplicates(dedup_key)

In [0]:
# Step 6: Add derived columns.
# price_tier buckets: price < 10 -> "budget", 10 <= price < 50 -> "mid",
# everything else -> "premium".
tier_price = F.col("price")
price_tier_expr = (
    F.when(tier_price < 10, "budget")
    .when(tier_price < 50, "mid")
    .otherwise("premium")
)

# event_date is the calendar date taken from event_time.
df_clean = df_clean.withColumn("event_date", F.to_date("event_time"))
df_clean = df_clean.withColumn("price_tier", price_tier_expr)

In [0]:
# Step 7: Save the cleaned data as the Silver Delta table,
# replacing any previous contents of that table.
silver_writer = df_clean.write.format("delta").mode("overwrite")
silver_writer.saveAsTable("silver_df_nov_realworld")

In [0]:
# Step 8: Read the Silver table back and print its first five rows.
silver_check = spark.table("silver_df_nov_realworld")
silver_check.show(5)

+-------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+--------------------+----------+----------+
|         event_time|event_type|product_id|        category_id|       category_code|    brand| price|  user_id|        user_session|        ingestion_ts|event_date|price_tier|
+-------------------+----------+----------+-------------------+--------------------+---------+------+---------+--------------------+--------------------+----------+----------+
|2019-11-17 08:43:12|      view|  17300671|2053013553853497655|             unknown|  unknown| 86.12|537619570|e9783f60-ca2c-404...|2026-01-14 06:36:...|2019-11-17|   premium|
|2019-11-17 08:43:19|      view| 100007591|2176851562834428674|construction.tool...| magnetta| 21.11|535250502|defdddab-4d28-459...|2026-01-14 06:36:...|2019-11-17|       mid|
|2019-11-17 08:43:29|      view|  28718397|2053013565228450757|       apparel.shoes|   rieker|103.99|512853623|36e5a7e8-

## GOLD LAYER

In [0]:
# Step 0: Load the Silver table by name through the DataFrame reader.
silver_table_name = "silver_df_nov_realworld"
df_silver = spark.read.table(silver_table_name)

In [0]:
# Step 1: Product-level metrics.
# `when` without `otherwise` yields null for non-matching rows, so each
# aggregate below only sees rows of the event type it is conditioned on.
#   views     - number of distinct users with a "view" event
#   purchases - number of distinct users with a "purchase" event
#   revenue   - sum of price over "purchase" events
product_is_view = F.col("event_type") == "view"
product_is_purchase = F.col("event_type") == "purchase"

product_keys = ["product_id", "category_id", "category_code", "brand"]
product_totals = df_silver.groupBy(*product_keys).agg(
    F.countDistinct(F.when(product_is_view, F.col("user_id"))).alias("views"),
    F.countDistinct(F.when(product_is_purchase, F.col("user_id"))).alias("purchases"),
    F.sum(F.when(product_is_purchase, F.col("price"))).alias("revenue"),
)

# conversion_rate is purchases per view, as a percentage; 0 when there are no views.
product_conversion = F.when(
    F.col("views") > 0,
    F.col("purchases") / F.col("views") * 100,
).otherwise(0)
df_gold_product = product_totals.withColumn("conversion_rate", product_conversion)

In [0]:
# Step 2: Category-level metrics.
# Same measures as the product-level table, grouped by category only:
# distinct viewing users, distinct purchasing users, and purchase revenue.
category_is_view = F.col("event_type") == "view"
category_is_purchase = F.col("event_type") == "purchase"

category_keys = ["category_id", "category_code"]
category_totals = df_silver.groupBy(*category_keys).agg(
    F.countDistinct(F.when(category_is_view, F.col("user_id"))).alias("views"),
    F.countDistinct(F.when(category_is_purchase, F.col("user_id"))).alias("purchases"),
    F.sum(F.when(category_is_purchase, F.col("price"))).alias("revenue"),
)

# conversion_rate is purchases per view, as a percentage; 0 when there are no views.
category_conversion = F.when(
    F.col("views") > 0,
    F.col("purchases") / F.col("views") * 100,
).otherwise(0)
df_gold_category = category_totals.withColumn("conversion_rate", category_conversion)

In [0]:
# Step 3: Daily product-level metrics (time series).
# One row per (product_id, event_date) with distinct viewing users,
# distinct purchasing users, and purchase revenue for that day.
daily_is_view = F.col("event_type") == "view"
daily_is_purchase = F.col("event_type") == "purchase"

daily_keys = ["product_id", "event_date"]
daily_totals = df_silver.groupBy(*daily_keys).agg(
    F.countDistinct(F.when(daily_is_view, F.col("user_id"))).alias("daily_views"),
    F.countDistinct(F.when(daily_is_purchase, F.col("user_id"))).alias("daily_purchases"),
    F.sum(F.when(daily_is_purchase, F.col("price"))).alias("daily_revenue"),
)

# daily_conversion_rate is purchases per view for the day, as a percentage;
# 0 when the day has no views.
daily_conversion = F.when(
    F.col("daily_views") > 0,
    F.col("daily_purchases") / F.col("daily_views") * 100,
).otherwise(0)
df_gold_daily = daily_totals.withColumn("daily_conversion_rate", daily_conversion)

In [0]:
# Step 4: Save each Gold DataFrame as a Delta table, replacing previous contents.
# Tables are written in the order listed: product, category, daily.
gold_outputs = [
    (df_gold_product, "gold_df_product_nov"),
    (df_gold_category, "gold_df_category_nov"),
    (df_gold_daily, "gold_df_daily_nov"),
]
for gold_df, gold_table_name in gold_outputs:
    gold_df.write.format("delta").mode("overwrite").saveAsTable(gold_table_name)


In [0]:
# Step 5: Verify.
# Print a label and the first five rows of each Gold table.
# Labels fixed: "Prodcut" was a typo, and the daily table holds product-level
# daily metrics (views, purchases, revenue), not only purchases.
print("Product-level Gold Table:")
spark.table("gold_df_product_nov").show(5)

print("Category-level Gold Table:")
spark.table("gold_df_category_nov").show(5)

print("Daily Product-level Gold Table:")
spark.table("gold_df_daily_nov").show(5)

Prodcut-level Gold Table:
+----------+-------------------+--------------------+--------+-----+---------+------------------+------------------+
|product_id|        category_id|       category_code|   brand|views|purchases|           revenue|   conversion_rate|
+----------+-------------------+--------------------+--------+-----+---------+------------------+------------------+
|  15900101|2053013558190408249|             unknown|elenberg|  106|        1|              9.24|0.9433962264150944|
|  42300002|2095518921321874323|furniture.living_...| unknown| 1668|       12|           2064.86|0.7194244604316548|
|  12709463|2053013553559896355|             unknown|  maxxis|  647|       38|1799.4900000000002| 5.873261205564142|
|   3601376|2053013563810775923|appliances.kitche...|   candy| 1242|       28| 7464.900000000001| 2.254428341384863|
|  18001588|2053013558525952589|             unknown|   deppa|  455|        8| 57.70000000000001|1.7582417582417582|
+----------+-------------------+------

In [0]:




# Step 5: Verify.
# For each Gold table, print its label and then its first five rows,
# in the order product, category, daily.
gold_previews = [
    ("Product-level Gold Table:", "gold_df_product_nov"),
    ("Category-level Gold Table:", "gold_df_category_nov"),
    ("Daily Product-level Gold Table:", "gold_df_daily_nov"),
]
for preview_label, preview_table in gold_previews:
    print(preview_label)
    spark.table(preview_table).show(5)


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:141)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:486)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:768)
	at com.data