### Bronze: raw ingestion

In [0]:
# Read the 2019-Nov.csv export: the first row supplies the column names and
# the column types are inferred from the data.
raw = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
)
raw.show(n=5)

In [0]:
from pyspark.sql import functions as F
# Stamp every raw row with the load timestamp, then overwrite the bronze
# Delta location with the result.
(
    raw
    .withColumn("ingestion_time", F.current_timestamp())
    .write
    .mode("overwrite")
    .format("delta")
    .save("/Volumes/workspace/ecommerce/delta/bronze/events")
)

### Silver: cleaning & validation

In [0]:
# Re-load the bronze events from their Delta location as the silver input.
bronze = spark.read.load(
    "/Volumes/workspace/ecommerce/delta/bronze/events",
    format="delta",
)


In [0]:
from pyspark.sql import functions as F
def get_final_category(category_code):
    """Return the last dot-separated segment of ``category_code``.

    ``None`` is passed through unchanged; a code that contains no dot is
    returned as-is.
    """
    if category_code is not None:
        # rpartition yields the text after the final "." (or the whole
        # string when there is no "." at all).
        return category_code.rpartition(".")[2]
    return None


# Silver layer: validate prices, de-duplicate events and derive the reporting
# columns (event_date, price_tier, product_category) before persisting.
silver = (
    bronze
    # Keep only rows whose price lies strictly between 0 and 10000.
    .filter((F.col("price") > 0) & (F.col("price") < 10000))
    # One row per (user_session, event_time) pair; which of the duplicate
    # rows survives is not deterministic.
    .dropDuplicates(["user_session", "event_time"])
    .withColumn("event_date", F.to_date(F.col("event_time")))
    # Bucket the price; the upper bound of each tier is exclusive.
    .withColumn(
        "price_tier",
        F.when(F.col("price") < 100, "budget")
        .when(F.col("price") < 200, "affordable")
        .when(F.col("price") < 500, "midrange")
        .when(F.col("price") < 1000, "luxury")
        .otherwise("ultra_luxury"),
    )
    .filter(F.col("category_code").isNotNull())
    # Last dot-separated segment of category_code. A native Spark expression
    # is used instead of a Python UDF so rows are not shipped to Python
    # workers just to split a string.
    .withColumn(
        "product_category",
        F.substring_index(F.col("category_code"), ".", -1),
    )
)

silver.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/delta/silver/events")
              

In [0]:
# Preview the first five silver rows. `silver` here is the DataFrame built
# from `bronze`, so this evaluates that pipeline again rather than reading
# the saved Delta files.
silver.show(n=5)

### Gold: business aggregates

In [0]:
# Gold layer: smartphone purchase performance per brand and price tier,
# computed from the persisted silver events.
silver = spark.read.format("delta").load("/Volumes/workspace/ecommerce/delta/silver/events")

# The chain is wrapped in parentheses rather than joined with backslashes, so
# the statement no longer depends on a trailing continuation being followed by
# a blank line.
brand_smartphone_perf = (
    silver
    .filter(
        (F.col("product_category") == "smartphone")
        & (F.col("event_type") == "purchase")
    )
    .groupBy("brand", "price_tier")
    .agg(
        # NOTE(review): this counts distinct buyers, not purchase rows, so a
        # user who buys twice is counted once. Confirm that this is what
        # "quantities_sale" is meant to report.
        F.countDistinct("user_id").alias("quantities_sale"),
        # Total price of the purchases, rounded to zero decimal places.
        F.round(F.sum("price")).alias("revenue"),
    )
    .orderBy(F.col("revenue").desc())
)

brand_smartphone_perf.write.format("delta").mode("overwrite").save("/Volumes/workspace/ecommerce/delta/gold/smartphones")
        

In [0]:
# Print the top of the aggregate (20 rows, the default for show()).
brand_smartphone_perf.show(n=20)