In [0]:
bronze_path = "/Volumes/workspace/ecommerce/bronze/ecommerce_events"
bronze_table = "bronze_ecommerce_events"


In [0]:
from pyspark.sql.functions import (
    current_timestamp,
    to_date,
    col
)

def ingest_to_bronze(df):
    return (
        df
        .withColumn("ingestion_time", current_timestamp())
        .withColumn("event_date", to_date(col("event_time")))
        .withColumn("source_file", col("_metadata.file_path"))

    )

In [0]:
df_oct = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv")
)

df_nov = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv")
)

print('df_oct schema:')
df_oct.printSchema()
print('df_oct row count:', df_oct.count())


df_oct schema:
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

df_oct row count: 42448764


In [0]:
df_oct_bronze = ingest_to_bronze(df_oct)
df_nov_bronze = ingest_to_bronze(df_nov)
print('df_oct_bronze schema:')
df_oct_bronze.printSchema()
print('df_oct_bronze row count:', df_oct_bronze.count())

df_oct_bronze schema:
root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- ingestion_time: timestamp (nullable = false)
 |-- event_date: date (nullable = true)
 |-- source_file: string (nullable = false)

df_oct_bronze row count: 42448764


In [0]:
(
    df_oct_bronze
    .write
    .format("delta")
    .mode("append")
    .partitionBy("event_date")
    .save(bronze_path)
)


In [0]:
(
    df_nov_bronze
    .write
    .format("delta")
    .mode("append")
    .partitionBy("event_date")
    .save(bronze_path)
)


In [0]:
spark.read.format("delta").load(bronze_path) \
    .groupBy("source_file") \
    .count() \
    .show(truncate=False)


+-------------------------------------------------------------+--------+
|source_file                                                  |count   |
+-------------------------------------------------------------+--------+
|dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Nov.csv|67501979|
|dbfs:/Volumes/workspace/ecommerce/ecommerce_data/2019-Oct.csv|42448764|
+-------------------------------------------------------------+--------+



Silver Starts Here

In [0]:
# one time only
%sql
CREATE TABLE IF NOT EXISTS workspace.ecommerce.silver_ecommerce_events
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/silver/ecommerce_events`;


num_affected_rows,num_inserted_rows


In [0]:
bronze_df = spark.read.format("delta").load(
    "/Volumes/workspace/ecommerce/bronze/ecommerce_events"
)


In [0]:
from pyspark.sql.functions import max as spark_max

silver_exists = spark.catalog.tableExists("silver_ecommerce_events")

if silver_exists:
    silver_max_event_time = (
        spark.table("silver_ecommerce_events")
        .select(spark_max("event_time").alias("max_event_time"))
        .collect()[0]["max_event_time"]
    )
else:
    silver_max_event_time = None


In [0]:
lateness_days = 2
# Filter Bronze
from pyspark.sql.functions import col, expr

if silver_max_event_time:
    bronze_incremental_df = bronze_df.filter(
        col("event_time") >= expr(
            f"timestamp('{silver_max_event_time}') - interval {lateness_days} days"
        )
    )
else:
    bronze_incremental_df = bronze_df


In [0]:
from pyspark.sql.functions import lower, trim

standardized_df = (
    bronze_incremental_df
    .withColumn("event_type", lower(trim(col("event_type"))))
    .withColumn("category_code", lower(trim(col("category_code"))))
    .withColumn("brand", lower(trim(col("brand"))))
)


In [0]:
from pyspark.sql.functions import when, lit, current_timestamp
rejected_df = (
    standardized_df
    .withColumn(
        "reject_rule_id",
        when(col("event_time").isNull(), lit("R001"))
        .when(col("event_date").isNull(), lit("R002"))
        .when(col("price") < 0, lit("R003"))
        .when(~col("event_type").isin("view", "cart", "purchase"), lit("R004"))
        .when(col("user_id").isNull(), lit("R005"))
    )
    .filter(col("reject_rule_id").isNotNull())
    .withColumn("reject_layer", lit("silver"))
    .withColumn("reject_time", current_timestamp())
)
# Rejects Append only
rejected_df.write.format("delta") \
    .mode("append") \
    .partitionBy("event_date") \
    .save("/Volumes/workspace/ecommerce/silver/ecommerce_events_rejects")


In [0]:
valid_df = standardized_df.join(
    rejected_df.select("user_session", "product_id", "event_time", "event_type"),
    on=["user_session", "product_id", "event_time", "event_type"],
    how="left_anti"
)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

dedup_window = Window.partitionBy(
    "user_session", "product_id", "event_time", "event_type"
).orderBy(col("ingestion_time").desc())

deduped_df = (
    valid_df
    .withColumn("row_num", row_number().over(dedup_window))
    .filter(col("row_num") == 1)
    .drop("row_num")
)


In [0]:
silver_df = deduped_df.select(
    "event_time",
    "event_type",
    "product_id",
    "category_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
    "event_date"
)


In [0]:
silver_df.createOrReplaceTempView("silver_updates")

In [0]:
#One time Run
(
    silver_df
    .limit(0)  # creates schema only
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/silver/ecommerce_events")
)


In [0]:
%sql
MERGE INTO workspace.ecommerce.silver_ecommerce_events tgt
USING silver_updates src
ON  tgt.user_session = src.user_session
AND tgt.product_id   = src.product_id
AND tgt.event_time   = src.event_time
AND tgt.event_type   = src.event_type

WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
109819992,109819980,0,12


In [0]:
silver_df = spark.table("workspace.ecommerce.silver_ecommerce_events")

print("Total silver rows:", silver_df.count())
silver_df.select("event_date", "event_type").show(5)


Total silver rows: 109820004
+----------+----------+
|event_date|event_type|
+----------+----------+
|2019-10-23|      view|
|2019-10-17|      view|
|2019-11-01|      view|
|2019-10-26|      view|
|2019-10-03|      view|
+----------+----------+
only showing top 5 rows


**Gold Layer Begins**

Schema Design

In [0]:
(
    spark.createDataFrame([], schema="""
        event_date DATE,
        category_code STRING,
        orders BIGINT,
        revenue DOUBLE
    """)
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .save("/Volumes/workspace/ecommerce/gold/daily_revenue")
)


In [0]:
%sql
CREATE TABLE IF NOT EXISTS workspace.ecommerce.gold_daily_revenue
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/gold/daily_revenue`;


num_affected_rows,num_inserted_rows


In [0]:
gold_base_path="/Volumes/workspace/ecommerce/gold/"


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, StringType, DateType
# Schema
product_metrics_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("views", LongType(), True),
    StructField("carts", LongType(), True),
    StructField("purchases", LongType(), True),
    StructField("conversion_rate", DoubleType(), True)
])
# Boostrap Delta
(
    spark.createDataFrame([], product_metrics_schema)
    .write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/product_metrics")
)


In [0]:
%sql
CREATE TABLE IF NOT EXISTS workspace.ecommerce.gold_product_metrics
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/gold/product_metrics`;

num_affected_rows,num_inserted_rows


In [0]:
# Schema
brand_revenue_schema = StructType([
    StructField("brand", StringType(), False),
    StructField("revenue", DoubleType(), True),
    StructField("rank", IntegerType(), True)
])
# BootStrap Delta
(
    spark.createDataFrame([], brand_revenue_schema)
    .write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/brand_revenue")
)


In [0]:
%sql
CREATE TABLE IF NOT EXISTS workspace.ecommerce.gold_brand_revenue
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/gold/brand_revenue`;


num_affected_rows,num_inserted_rows


In [0]:
# Schema
user_funnel_schema = StructType([
    StructField("user_id", IntegerType(), False),
    StructField("viewed", IntegerType(), True),
    StructField("added_to_cart", IntegerType(), True),
    StructField("purchased", IntegerType(), True)
])
# Boostrap Delta
(
    spark.createDataFrame([], user_funnel_schema)
    .write
    .format("delta")
    .mode("overwrite")
    .save(f"{gold_base_path}/user_funnel")
)


In [0]:
%sql
CREATE TABLE IF NOT EXISTS workspace.ecommerce.gold_user_funnel
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/gold/user_funnel`;


num_affected_rows,num_inserted_rows


In [0]:
# Schema
dau_schema = StructType([
    StructField("event_date", DateType(), False),
    StructField("active_users", LongType(), True)
])
# Boostrap Delta
(
    spark.createDataFrame([], dau_schema)
    .write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .save(f"{gold_base_path}/daily_active_users")
)


In [0]:
%sql
CREATE TABLE IF NOT EXISTS workspace.ecommerce.gold_daily_active_users
USING DELTA
AS
SELECT * FROM delta.`/Volumes/workspace/ecommerce/gold/daily_active_users`;


num_affected_rows,num_inserted_rows


In [0]:
%sql
SHOW TABLES IN workspace.ecommerce;


database,tableName,isTemporary
ecommerce,gold_brand_revenue,False
ecommerce,gold_daily_active_users,False
ecommerce,gold_daily_revenue,False
ecommerce,gold_product_metrics,False
ecommerce,gold_user_funnel,False
ecommerce,silver_ecommerce_events,False


In [0]:
from pyspark.sql.functions import max as spark_max, col, lit, date_sub
gold_table = "workspace.ecommerce.gold_daily_revenue"

gold_exists = spark.catalog.tableExists(gold_table)

if gold_exists:
    max_gold_date = (
        spark.table(gold_table)
        .select(spark_max("event_date").alias("max_date"))
        .collect()[0]["max_date"]
    )
else:
    max_gold_date = None


In [0]:
reprocess_days = 2

silver_df = spark.table("workspace.ecommerce.silver_ecommerce_events")

if max_gold_date:
    silver_incremental_df = silver_df.filter(
        col("event_date") >= date_sub(lit(max_gold_date), reprocess_days)
    )
else:
    silver_incremental_df = silver_df


In [0]:
from pyspark.sql.functions import sum, count
daily_revenue_df = (
    silver_incremental_df
    .filter(col("event_type") == "purchase")
    .groupBy("event_date", "category_code")
    .agg(
        count("*").alias("orders"),
        sum("price").alias("revenue")
    )
)


In [0]:
(
    daily_revenue_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "event_date IS NOT NULL")
    .partitionBy("event_date")
    .save("/Volumes/workspace/ecommerce/gold/daily_revenue")
)


In [0]:
dau_df = (
    silver_incremental_df
    .groupBy("event_date")
    .agg(
        count("user_id").alias("active_users")
    )
)
(
    dau_df
    .write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "event_date IS NOT NULL")
    .partitionBy("event_date")
    .save("/Volumes/workspace/ecommerce/gold/daily_active_users")
)


In [0]:
from pyspark.sql.functions import when
product_metrics_df = (
    silver_df
    .groupBy("product_id")
    .agg(
        count(when(col("event_type") == "view", True)).alias("views"),
        count(when(col("event_type") == "cart", True)).alias("carts"),
        count(when(col("event_type") == "purchase", True)).alias("purchases")
    )
    .withColumn(
        "conversion_rate",
        col("purchases") / col("views")
    )
)
(
    product_metrics_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/gold/product_metrics")
)


In [0]:
from pyspark.sql.functions import dense_rank, sum, col, lit
from pyspark.sql.window import Window

brand_revenue_df = (
    silver_df
    .filter(col("event_type") == "purchase")
    .groupBy("brand")
    .agg(sum("price").alias("revenue"))
)

rank_window = Window.partitionBy(lit(1)).orderBy(col("revenue").desc())

brand_revenue_df = brand_revenue_df.withColumn(
    "rank", dense_rank().over(rank_window)
)


(
    brand_revenue_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/gold/brand_revenue")
)


In [0]:
from pyspark.sql.functions import count, when

user_funnel_df = (
    silver_df
    .groupBy("user_id")
    .agg(
        count(when(col("event_type") == "view", True)).cast("int").alias("viewed"),
        count(when(col("event_type") == "cart", True)).cast("int").alias("added_to_cart"),
        count(when(col("event_type") == "purchase", True)).cast("int").alias("purchased")
    )
)

(
    user_funnel_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/workspace/ecommerce/gold/user_funnel")
)


