In [0]:
from pyspark.sql import functions as F


In [0]:
# Raw source for the bronze layer: the 2019-Oct events CSV from the ecommerce volume.
# Only the header option is set (no schema / inferSchema), so Spark reads every
# column as a string.
raw = spark.read.csv(
    "/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv",
    header=True,
)


In [0]:
# Bronze = raw rows plus the timestamp at which this run ingested them.
bronze = raw.withColumn("ingestion_ts", F.current_timestamp())


In [0]:

%sql
-- Sanity check: list the volumes in the workspace.ecommerce schema.
-- No other cell consumes this output; presumably it is here to confirm the
-- volumes used by the reads/writes in this notebook exist.
SHOW VOLUMES IN workspace.ecommerce;

In [0]:
# Persist the bronze frame as a Delta table, replacing whatever was there before.
bronze.write.save(
    "/Volumes/workspace/ecommerce/medallion/bronze/events",
    format="delta",
    mode="overwrite",
)


In [0]:
from pyspark.sql.functions import (
    col, when, to_date, current_timestamp
)

# Silver layer: re-read bronze from its Delta location, keep well-formed events,
# de-duplicate them, derive helper columns, and persist the cleaned rows.
bronze = spark.read.load(
    "/Volumes/workspace/ecommerce/medallion/bronze/events",
    format="delta",
)

silver = (
    bronze
    # A row survives only if it has a user_id AND an allow-listed event_type.
    # NOTE(review): any event_type other than "view"/"click"/"purchase" is
    # silently dropped here -- confirm this list matches the source data.
    .where(
        col("user_id").isNotNull()
        & col("event_type").isin("view", "click", "purchase")
    )
    # Keep one row per (user_id, event_time, event_type). Rows that share this
    # key but differ in other columns collapse to one arbitrary survivor.
    # NOTE(review): confirm that is intended.
    .dropDuplicates(["user_id", "event_time", "event_type"])
    .withColumn("event_date", to_date(col("event_time")))  # calendar date of the event
    .withColumn("processed_ts", current_timestamp())       # when this run processed the row
)

# The derived columns stay on the in-memory `silver` frame only. Removing them
# means the persisted table has exactly bronze's columns, which the existing
# target table's schema is expected to match.
silver_to_write = silver.drop("event_date", "processed_ts")

silver_to_write.write.save(
    "/Volumes/workspace/ecommerce/medallion/silver/events",
    format="delta",
    mode="overwrite",
)


In [0]:

# Preview the first 20 rows (values truncated) of the in-memory silver frame,
# including the event_date / processed_ts columns that are not persisted.
silver.show(n=20, truncate=True)


In [0]:
from pyspark.sql.functions import count

# Gold: total number of events per event_type.
# The input is the persisted silver Delta table, not the in-memory `silver`
# DataFrame, for two reasons:
#   * the cell depends only on what was actually written, not on kernel state;
#   * `silver` is not cached, so aggregating it would re-run the bronze read,
#     filter and dropDuplicates shuffle.
# Only event_type is used here, so the result matches aggregating `silver`.
silver_events = spark.read.format("delta") \
    .load("/Volumes/workspace/ecommerce/medallion/silver/events")

gold = silver_events.groupBy("event_type") \
    .agg(count("*").alias("total_events"))

gold.write.format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/medallion/gold/event_summary")


In [0]:
# `count` is imported here too so this cell does not depend on another cell's
# imports having been run first.
from pyspark.sql.functions import count, countDistinct

# Gold: per event_type, the total number of events and the number of distinct
# users who produced them.
# The input is the persisted silver Delta table rather than the uncached
# in-memory `silver` frame. That avoids recomputing the bronze -> silver lineage
# and keeps the cell independent of kernel state. Only event_type and user_id
# are used, so the aggregates are unchanged.
silver_events = spark.read.format("delta") \
    .load("/Volumes/workspace/ecommerce/medallion/silver/events")

platform_metrics = silver_events.groupBy("event_type") \
    .agg(
        count("*").alias("total_events"),
        countDistinct("user_id").alias("active_users")
    )

platform_metrics.write.format("delta") \
    .mode("overwrite") \
    .save("/Volumes/workspace/ecommerce/medallion/gold/platform_metrics")
