# Zenith Online - Data Transformation Pipeline (Unity Catalog)

This notebook implements the full Bronze-Silver-Gold ETL pipeline for Zenith Online, fully integrated with Databricks Unity Catalog. It is designed to be run after the data generator has populated the landing zone volume.

## Pipeline Initialization

Import the required libraries and define the paths and table names used for Unity Catalog integration. Databricks already provides the `spark` session, so the notebook does not create one.

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window
import pandas as pd

# In Databricks, the SparkSession `spark` is already created for you.

In [0]:
# The data generator wrote data to these UC Volume paths
RAW_STREAMING_PATH = "/Volumes/zenith_online/00_landing/streaming/user_events"
RAW_BATCH_CUSTOMERS_PATH = "/Volumes/zenith_online/00_landing/batch/customers"
RAW_BATCH_PRODUCTS_PATH = "/Volumes/zenith_online/00_landing/batch/products"

# Define the three-level namespace for Unity Catalog
CATALOG_NAME = "zenith_online"
BRONZE_SCHEMA = "01_bronze"
SILVER_SCHEMA = "02_silver"
GOLD_SCHEMA = "03_gold"

# Define UC Volume paths for streaming checkpoints and schema metadata
CHECKPOINT_BASE_PATH = f"/Volumes/{CATALOG_NAME}/_system/checkpoints"
SCHEMA_BASE_PATH = f"/Volumes/{CATALOG_NAME}/_system/schemas"

# Full table names
BRONZE_EVENTS_TABLE = f"{CATALOG_NAME}.{BRONZE_SCHEMA}.bronze_user_events"
BRONZE_CUSTOMERS_TABLE = f"{CATALOG_NAME}.{BRONZE_SCHEMA}.bronze_customer_profiles"
BRONZE_PRODUCTS_TABLE = f"{CATALOG_NAME}.{BRONZE_SCHEMA}.bronze_product_details"
SILVER_TABLE = f"{CATALOG_NAME}.{SILVER_SCHEMA}.silver_sessionized_activity"
GOLD_DAILY_PRODUCT_TABLE = f"{CATALOG_NAME}.{GOLD_SCHEMA}.gold_daily_product_performance"
GOLD_CUSTOMER_SUMMARY_TABLE = f"{CATALOG_NAME}.{GOLD_SCHEMA}.customer_purchase_summary"

## Unity Catalog Environment Setup

Create the catalog and schemas if they do not exist. This ensures all downstream tables are created in the correct namespace.

In [0]:
# Create the Catalog and Schemas if they do not exist
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}")
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {BRONZE_SCHEMA}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SILVER_SCHEMA}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {GOLD_SCHEMA}")

print(f"Unity Catalog environment '{CATALOG_NAME}' is ready.")
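
The schema names in this notebook begin with a digit (`01_bronze`, `02_silver`, `03_gold`). As a defensive measure, the identifiers can be wrapped in backticks before they are interpolated into SQL. The sketch below is plain Python and only builds the statement strings. The helper names `quote_identifier`, `qualified_name`, and `schema_ddl` are hypothetical and not part of the notebook.

```python
def quote_identifier(name: str) -> str:
    """Wrap a Spark SQL identifier in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Build a three-level Unity Catalog name: catalog.schema.table."""
    return ".".join(quote_identifier(part) for part in (catalog, schema, table))


def schema_ddl(catalog: str, schemas: list[str]) -> list[str]:
    """Return idempotent CREATE SCHEMA statements for the given schema names."""
    return [
        f"CREATE SCHEMA IF NOT EXISTS {quote_identifier(catalog)}.{quote_identifier(schema)}"
        for schema in schemas
    ]


statements = schema_ddl("zenith_online", ["01_bronze", "02_silver", "03_gold"])
for statement in statements:
    # In the notebook, each statement would be passed to spark.sql(...).
    print(statement)
```

Because every statement uses `IF NOT EXISTS`, rerunning the setup cell is harmless.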


## 01 - Bronze Layer: Raw Data Ingestion

### Streaming User Events
Ingest raw user event data from JSON files using Auto Loader and write to a Delta table in the Bronze schema.

In [0]:
print("Starting Bronze Layer: Streaming Events Ingestion")

event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_timestamp", StringType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("session_id", StringType(), True)
])

bronze_events_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{SCHEMA_BASE_PATH}/bronze_events")
    .schema(event_schema)
    .load(RAW_STREAMING_PATH)
    .withColumn("ingestion_timestamp", F.current_timestamp())
    .withColumn("event_dt", F.to_timestamp("event_timestamp"))
)

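bronze_events_query = (
    bronze_events_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/bronze_events")
    .trigger(availableNow=True)
    .toTable(BRONZE_EVENTS_TABLE)
)

# toTable() starts the query asynchronously. Block until all available files are
# processed so that downstream layers read a fully loaded Bronze table.
bronze_events_query.awaitTermination()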

print(f"Bronze Layer: Streaming events processing complete. Table `{BRONZE_EVENTS_TABLE}` is updated.")

### Batch Customer & Product Data
Ingest customer profiles (CSV) and product details (Parquet) into Bronze tables for further enrichment.

In [0]:
print("Starting Bronze Layer: Batch Ingestion")

customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("signup_date", StringType(), True),
    StructField("location", StringType(), True)
])
customers_df = spark.read.format("csv").schema(customer_schema).option("header", "true").load(RAW_BATCH_CUSTOMERS_PATH)
customers_df.write.format("delta").mode("overwrite").saveAsTable(BRONZE_CUSTOMERS_TABLE)
print(f"Bronze Layer: Wrote customer profiles to `{BRONZE_CUSTOMERS_TABLE}`.")

products_df = spark.read.format("parquet").load(RAW_BATCH_PRODUCTS_PATH)
products_df.write.format("delta").mode("overwrite").saveAsTable(BRONZE_PRODUCTS_TABLE)
print(f"Bronze Layer: Wrote product details to `{BRONZE_PRODUCTS_TABLE}`.")

## 02 - Silver Layer: Clean, Enrich, and Deduplicate

**Goal:** Read from the Bronze UC tables, enrich the data, and write the cleaned result to a Silver UC table (`zenith_online.02_silver.silver_sessionized_activity`).

### Define Region Categorization Logic

Create a Pandas UDF to categorize customer locations into regions for downstream analytics.

In [0]:
@F.pandas_udf(StringType())
def categorize_region(locations: pd.Series) -> pd.Series:
    east_coast = ['New York', 'Philadelphia']
    west_coast = ['Los Angeles', 'San Diego']
    def get_region(location):
        # Keep missing locations as null so the Gold-layer data quality check can count them.
        if pd.isna(location): return None
        if location in east_coast: return 'East Coast'
        elif location in west_coast: return 'West Coast'
        else: return 'Central'
    return locations.apply(get_region)

print("Silver Layer: Pandas UDF `categorize_region` created.")
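
The categorization rule can be checked without a cluster. The following plain-Python sketch expresses the same lookup as a dictionary. The names `REGION_BY_CITY` and `region_for` are hypothetical. The sketch keeps a missing location as `None` rather than mapping it to `'Central'`, an assumption made so that unknown locations stay countable downstream.

```python
from typing import Optional

# City-to-region lookup; any city not listed falls back to "Central".
REGION_BY_CITY = {
    "New York": "East Coast",
    "Philadelphia": "East Coast",
    "Los Angeles": "West Coast",
    "San Diego": "West Coast",
}


def region_for(location: Optional[str]) -> Optional[str]:
    """Return the region for a city, or None when the location is missing."""
    if location is None:
        return None
    return REGION_BY_CITY.get(location, "Central")


sample_locations = ["New York", "San Diego", "Chicago", None]
regions = [region_for(location) for location in sample_locations]
print(regions)
```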


### Silver Layer Transformation

Deduplicate streaming events, join with customer and product data, enrich with region, and write to the Silver Delta table.

In [0]:
print("Starting Silver Layer: Enriching and Cleaning Events")

bronze_events_stream_df = spark.readStream.table(BRONZE_EVENTS_TABLE)
customers_df = spark.read.table(BRONZE_CUSTOMERS_TABLE)
products_df = spark.read.table(BRONZE_PRODUCTS_TABLE)

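# Note: `event_dt` is not part of the deduplication key, so the watermark does not
# bound the deduplication state here. Include it in the key (or use
# dropDuplicatesWithinWatermark on Spark 3.5+) if state growth becomes a concern.
deduplicated_stream_df = (
    bronze_events_stream_df
    .withWatermark("event_dt", "3 minutes")
    .dropDuplicates(["event_id"])
)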

enriched_df = (
    deduplicated_stream_df
    .join(
        F.broadcast(products_df), 
        deduplicated_stream_df.product_id == products_df.product_id,
        "inner"
    )
    .join(
        customers_df,
        deduplicated_stream_df.user_id == customers_df.customer_id,
        "left"
    )
    .withColumn("region", categorize_region(F.col("location")))
    .withColumnRenamed("event_type", "action")
    .withColumn("event_date", F.to_date(F.col("event_dt")))
    .select(
        "event_id", "event_dt", "event_date", "action",
        deduplicated_stream_df.user_id, "region",
        deduplicated_stream_df.product_id, "product_name", "category", "price"
    )
)

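silver_activity_query = (
    enriched_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{CHECKPOINT_BASE_PATH}/silver_activity")
    .trigger(availableNow=True)
    .toTable(SILVER_TABLE)
)

# Block until the availableNow run finishes so the Gold layer reads a complete Silver table.
silver_activity_query.awaitTermination()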

print(f"Silver Layer: Enriched activity stream processing complete. Table `{SILVER_TABLE}` is updated.")
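
The combined effect of the deduplication, the inner join on products, and the left join on customers can be illustrated on a handful of records. This is a plain-Python sketch, not the Spark implementation. The sample data and the `enrich` helper are hypothetical.

```python
events = [
    {"event_id": "e1", "user_id": 1, "product_id": 10, "event_type": "view_product"},
    {"event_id": "e1", "user_id": 1, "product_id": 10, "event_type": "view_product"},  # replayed duplicate
    {"event_id": "e2", "user_id": 2, "product_id": 99, "event_type": "purchase"},  # unknown product
    {"event_id": "e3", "user_id": 3, "product_id": 10, "event_type": "purchase"},  # unknown customer
]
products = {10: {"product_name": "Widget", "category": "Tools", "price": 5.0}}
customers = {1: {"location": "New York"}, 2: {"location": "San Diego"}}


def enrich(events, products, customers):
    """Deduplicate on event_id, inner-join products, left-join customers."""
    seen_event_ids = set()
    rows = []
    for event in events:
        if event["event_id"] in seen_event_ids:
            continue  # duplicate event_id: keep only the first occurrence
        seen_event_ids.add(event["event_id"])

        product = products.get(event["product_id"])
        if product is None:
            continue  # inner join: events without a matching product are dropped

        customer = customers.get(event["user_id"], {})  # left join: keep the event
        rows.append({
            "event_id": event["event_id"],
            "action": event["event_type"],
            "user_id": event["user_id"],
            "location": customer.get("location"),  # None when no customer matched
            **product,
        })
    return rows


rows = enrich(events, products, customers)
for row in rows:
    print(row)
```

In this sample, `e2` disappears because its product is unknown, while `e3` survives with a missing location because the customer join is a left join.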

## 03 - Gold Layer: Business-Focused Aggregations

Produce analytics-ready tables for business intelligence and reporting.

### Daily Product Performance (Handling Data Skew)

Aggregate Silver data to compute daily product metrics, using salting to mitigate data skew for popular products.

In [0]:
print("Starting Gold Layer: Daily Product Performance Aggregation")
silver_df = spark.read.table(SILVER_TABLE)

SALTING_FACTOR = 5
salted_df = silver_df.withColumn("salt", (F.rand() * SALTING_FACTOR).cast("int"))

salted_agg_df = (
    salted_df.groupBy("event_date", "product_id", "product_name", "category", "salt")
    .agg(
        F.count(F.when(F.col("action") == "view_product", 1)).alias("views"),
        F.count(F.when(F.col("action") == "add_to_cart", 1)).alias("adds_to_cart"),
        F.count(F.when(F.col("action") == "purchase", 1)).alias("purchases"),
        F.sum(F.when(F.col("action") == "purchase", F.col("price")).otherwise(0)).alias("daily_revenue")
    )
)

daily_product_performance_df = (
    salted_agg_df.groupBy("event_date", "product_id", "product_name", "category")
    .agg(
        F.sum("views").alias("total_views"),
        F.sum("adds_to_cart").alias("total_adds_to_cart"),
        F.sum("purchases").alias("total_purchases"),
        F.sum("daily_revenue").alias("total_revenue")
    )
)

window_spec = Window.partitionBy("event_date", "category").orderBy(F.col("total_revenue").desc())
final_product_gold_df = daily_product_performance_df.withColumn("revenue_rank", F.rank().over(window_spec))

(
    final_product_gold_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("event_date")
    .saveAsTable(GOLD_DAILY_PRODUCT_TABLE)
)

print(f"Gold Layer: Wrote to `{GOLD_DAILY_PRODUCT_TABLE}` after handling skew.")
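
To see why the salted two-stage aggregation gives the same totals as a direct group-by, consider this plain-Python sketch. It assigns each record a random salt, aggregates per (product, salt) pair, and then sums the partial counts per product. The helper `salted_counts`, the sample data, and the fixed seed are illustrative assumptions.

```python
import random
from collections import Counter

SALTING_FACTOR = 5


def salted_counts(product_ids, salting_factor, seed=0):
    """Count records per product in two stages, spreading hot keys across salts."""
    rng = random.Random(seed)

    # Stage 1: partial counts keyed by (product_id, salt).
    partial = Counter()
    for product_id in product_ids:
        salt = rng.randrange(salting_factor)
        partial[(product_id, salt)] += 1

    # Stage 2: drop the salt and sum the partial counts per product.
    totals = Counter()
    for (product_id, _salt), count in partial.items():
        totals[product_id] += count
    return partial, totals


# Product 1 is "hot": it has 100x the records of product 2.
product_ids = [1] * 1000 + [2] * 10
partial, totals = salted_counts(product_ids, SALTING_FACTOR)
print(dict(totals))
```

The hot product's records are split across at most `SALTING_FACTOR` groups in the first stage, so no single group has to process all of them. The second stage only combines a few small partial results.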


In [0]:
display(final_product_gold_df)
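
The `revenue_rank` column uses `rank()`, which gives tied rows the same rank and leaves a gap after them. A minimal plain-Python sketch of that behavior, with a hypothetical helper `rank_desc` and made-up revenue figures:

```python
def rank_desc(values):
    """rank() over a descending order: 1 + the number of strictly greater values."""
    return [1 + sum(other > value for other in values) for value in values]


revenues = [300.0, 300.0, 120.0, 50.0]
ranks = rank_desc(revenues)
print(list(zip(revenues, ranks)))
```

If consecutive ranks without gaps are preferred, `F.dense_rank()` is the usual alternative.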

### Customer Purchase Summary

Aggregate customer-level purchase metrics and perform a data quality check for missing regions.

In [0]:
print("Starting Gold Layer: Customer Purchase Summary")
silver_df = spark.read.table(SILVER_TABLE)

# --- Data Quality Check (Serverless-Compatible Method) ---
# Accumulators need direct sparkContext access, which serverless compute does not provide,
# so the accumulator-based version below is kept only for reference:
# unknown_location_count = spark.sparkContext.accumulator(0)
# def count_unknown_locations(region):
#     if region is None:
#         global unknown_location_count
#         unknown_location_count += 1
#     return region
# count_unknown_udf = F.udf(count_unknown_locations, StringType())
# Instead, run a count() action on a filtered DataFrame, which gathers the same metric.
unknown_location_count = silver_df.filter(F.col("region").isNull()).count()

print(f"Data Quality Check: Found {unknown_location_count} events with unknown customer locations.")

customer_summary_df = (
    silver_df.filter(F.col("action") == 'purchase')
    .groupBy("user_id", "region")
    .agg(
        F.sum("price").alias("total_purchase_value"),
        F.count("event_id").alias("total_purchases"),
        F.approx_count_distinct("product_id").alias("distinct_products_purchased"),
        F.max("event_dt").alias("last_purchase_timestamp")
    )
    .orderBy(F.col("total_purchase_value").desc())
)

(
    customer_summary_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(GOLD_CUSTOMER_SUMMARY_TABLE)
)
print(f"Gold Layer: Wrote to `{GOLD_CUSTOMER_SUMMARY_TABLE}`.")

## Answering Business Questions

Now that our Gold tables exist in Unity Catalog, we can query them directly with SQL.


### Business Question 1: What are the top 5 most purchased products for the most recent day?

Query the Gold product performance table to find the most popular products.

In [0]:
print("--- Top 5 Purchased Products (Most Recent Day) ---")
spark.sql(f"""
    SELECT 
        event_date,
        product_name,
        category,
        total_purchases,
        total_revenue
    FROM {GOLD_DAILY_PRODUCT_TABLE}
    WHERE event_date = (SELECT MAX(event_date) FROM {GOLD_DAILY_PRODUCT_TABLE})
    ORDER BY total_purchases DESC
    LIMIT 5
""").show()


### Business Question 2: What is the daily sales revenue per category?

Summarize daily revenue by product category for trend analysis.

In [0]:
print("--- Daily Sales Revenue per Category ---")
spark.sql(f"""
    SELECT 
        event_date,
        category,
        SUM(total_revenue) as category_revenue
    FROM {GOLD_DAILY_PRODUCT_TABLE}
    GROUP BY event_date, category
    ORDER BY event_date DESC, category_revenue DESC
""").show(truncate=False)


### Business Question 3: Who are the top 10 customers by purchase value?

Identify the highest-value customers based on total purchase value.

In [0]:
print("--- Top 10 Customers by Purchase Value ---")
spark.sql(f"""
    SELECT
        user_id,
        region,
        total_purchase_value,
        total_purchases,
        last_purchase_timestamp
    FROM {GOLD_CUSTOMER_SUMMARY_TABLE}
    ORDER BY total_purchase_value DESC
    LIMIT 10
""").show()

## End of Pipeline

This notebook has processed data through the Bronze, Silver, and Gold layers within Unity Catalog, producing analytics-ready Gold tables and exercising capabilities covered by the Spark Developer certification exam.