In [0]:
from pyspark.sql.functions import current_timestamp

# Load the raw October 2019 CSV export: the first row is treated as the header
# and Spark is asked to infer the column types from the data.
df_bronze = (
    spark.read
    .options(header=True, inferschema=True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)

# Append an ingestion timestamp column and overwrite the Delta files stored at
# the bronze path. `df_bronze` itself is left without the extra column.
(
    df_bronze
    .withColumn("ingesttime", current_timestamp())
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/bronze/ecommerce_data_2019_oct")
)

In [0]:
# Re-read the raw CSV (header row, inferred schema) so this cell can be run
# without relying on the DataFrame produced by an earlier cell.
df_bronze = (
    spark.read
    .options(header=True, inferschema=True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)

# Register the timestamped rows as the bronze Delta table, replacing whatever
# the table previously contained.
(
    df_bronze
    .withColumn("ingesttime", current_timestamp())
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.bronze.ecommerce_data_2019_oct")
)

In [0]:
from pyspark.sql.functions import col, trim

# Load the bronze table that feeds the silver stage
df_bronze = spark.read.table("workspace.bronze.ecommerce_data_2019_oct")

# Cleaning and validation, applied in order:
#   1. remove rows that are identical across every column
#   2. keep rows whose price is present and strictly positive
#   3. strip surrounding whitespace from the user and product identifiers
# NOTE(review): trim() produces string columns, so if bronze inferred these ids
# as numeric they end up stored as strings in silver - confirm that is intended.
df_silver = (
    df_bronze
    .dropDuplicates()
    .where((col("price") > 0) & col("price").isNotNull())
    .withColumn("user_id", trim(col("user_id")))
    .withColumn("product_id", trim(col("product_id")))
)

# Persist the cleaned rows as the silver Delta table, replacing prior contents
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("workspace.silver.ecommerce_data_2019_oct")
)

In [0]:
import pyspark.sql.functions as F

# Read from silver table
df_silver = spark.read.table("workspace.silver.ecommerce_data_2019_oct")

# Per-product aggregates, one output row per (product_id, category_id).
# F.when() without .otherwise() yields NULL for non-matching rows, and both
# countDistinct and sum skip NULLs, so each metric only sees rows of its own
# event type:
#   views     - distinct users with at least one "view" event
#   purchases - distinct users with at least one "purchase" event
#   revenue   - summed price over "purchase" events (NULL when there are none)
#
# The user id has to be passed as F.col("user_id"). A bare string given as the
# value argument of F.when() is treated as a literal, not a column name, which
# would make every distinct count collapse to 0 or 1.
df_gold = (
    df_silver
    .groupBy("product_id", "category_id")
    .agg(
        F.countDistinct(
            F.when(F.col("event_type") == "view", F.col("user_id"))
        ).alias("views"),
        F.countDistinct(
            F.when(F.col("event_type") == "purchase", F.col("user_id"))
        ).alias("purchases"),
        F.sum(
            F.when(F.col("event_type") == "purchase", F.col("price"))
        ).alias("revenue"),
    )
)

# Conversion rate as a percentage of viewers who purchased. It is left NULL
# for products with no viewers, so the division never sees a zero denominator.
conversion_rate_expr = F.when(
    F.col("views") > 0,
    F.col("purchases") / F.col("views") * 100,
)
df_gold = df_gold.withColumn("conversion_rate", conversion_rate_expr)

# Save to gold layer, replacing any previous contents of the table
df_gold.write.format("delta").mode("overwrite").saveAsTable("workspace.gold.products")
