In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Session-level Spark settings, applied before any Spark action so that every query and
# stream started later in this notebook runs with them.
SILVER_SPARK_CONFS = {
    # Delta file-size management on write.
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true",
    # Shuffle parallelism and Adaptive Query Execution features.
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.streaming.schemaInference": "true",
    # Broadcast-join wait timeout.
    "spark.sql.broadcastTimeout": "1200",
}
for conf_key, conf_value in SILVER_SPARK_CONFS.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
%sql
-- Silver layer: the schema itself plus the volume that holds the streaming checkpoints
-- (/Volumes/retailplex_platform/silver/checkpoints/...) used by the streams below.
CREATE SCHEMA IF NOT EXISTS retailplex_platform.silver;

CREATE VOLUME IF NOT EXISTS retailplex_platform.silver.checkpoints
  COMMENT 'Volume for silver checkpoints';

In [0]:
# DDL-style schema strings passed to from_json() to parse the bronze `data` payload of each
# topic (customers_schema, products_schema, orders_schema and events_cdc_schema are used by
# process_customer_data, process_product_data, process_order_data and
# process_event_cdc_data respectively).
# Silver Schema - Customers Table
customers_schema = """
customer_id STRING,
first_name STRING,
last_name STRING,
email STRING,
registration_date TIMESTAMP,
customer_segment STRING,
age INTEGER,
gender STRING,
city STRING,
state STRING,
country STRING,
phone STRING
"""

# Silver Schema - Products Table
products_schema = """
product_id STRING,
product_name STRING,
category STRING,
subcategory STRING,
brand STRING,
price DECIMAL(10,2),
cost DECIMAL(10,2),
stock_quantity INTEGER,
supplier_id STRING,
launch_date TIMESTAMP,
weight_kg DECIMAL(5,2)
"""

# Silver Schema - Orders Table
orders_schema = """
order_id STRING,
customer_id STRING,
order_date TIMESTAMP,
order_status STRING,
total_amount DECIMAL(10,2),
quantity INTEGER,
discount_amount DECIMAL(10,2),
tax_amount DECIMAL(10,2),
shipping_cost DECIMAL(10,2),
payment_method STRING,
shipping_address STRING
"""

# Silver Schema - Events Table (parsed into silver.event_cdc; note: no event timestamp field)
events_cdc_schema = """
event_id STRING,
event_type STRING,
customer_id STRING,
product_id STRING,
session_id STRING,
page_url STRING,
device_type STRING,
browser STRING,
ip_address STRING,
referrer STRING,
event_duration INTEGER
"""


In [0]:
%sql
-- Silver target for the `products` topic; upserted on product_id by merge_batch_product.
CREATE TABLE IF NOT EXISTS retailplex_platform.silver.product (
  product_id          STRING,
  product_name        STRING,
  category            STRING,
  subcategory         STRING,
  brand               STRING,
  price               DECIMAL(10,2),
  cost                DECIMAL(10,2),
  stock_quantity      INT,
  supplier_id         STRING,
  launch_date         TIMESTAMP,
  weight_kg           DECIMAL(5,2),
  processed_timestamp TIMESTAMP
)
USING DELTA;

In [0]:
%sql
-- Silver target for the `orders` topic; upserted on (order_id, order_date) by merge_batch_order.
-- `order` is also an SQL keyword, so the table name is back-quoted here.
CREATE TABLE IF NOT EXISTS retailplex_platform.silver.`order` (
  order_id            STRING,
  customer_id         STRING,
  order_date          TIMESTAMP,
  order_status        STRING,
  total_amount        DECIMAL(10,2),
  quantity            INT,
  discount_amount     DECIMAL(10,2),
  tax_amount          DECIMAL(10,2),
  shipping_cost       DECIMAL(10,2),
  payment_method      STRING,
  shipping_address    STRING,
  processed_timestamp TIMESTAMP
)
USING DELTA;

In [0]:
%sql
-- Silver customer dimension, kept as a type-2 history by merge_batch_customer:
-- active_ind = 1 marks a customer's current version; valid_from / valid_to bound each version.
CREATE TABLE IF NOT EXISTS retailplex_platform.silver.customer (
  customer_id       STRING,
  first_name        STRING,
  last_name         STRING,
  email             STRING,
  registration_date TIMESTAMP,
  customer_segment  STRING,
  age               INT,
  gender            STRING,
  city              STRING,
  state             STRING,
  country           STRING,
  phone             STRING,
  active_ind        INT,
  valid_from        TIMESTAMP,
  valid_to          TIMESTAMP
)
USING DELTA;

In [0]:
%sql
-- Landing table for the `events` topic, upserted on event_id by merge_events_cdc.
-- Change Data Feed is enabled because silver.event is built from this table's row changes.
CREATE TABLE IF NOT EXISTS retailplex_platform.silver.event_cdc (
  event_id       STRING,
  event_type     STRING,
  customer_id    STRING,
  product_id     STRING,
  session_id     STRING,
  page_url       STRING,
  device_type    STRING,
  browser        STRING,
  ip_address     STRING,
  referrer       STRING,
  event_duration INT
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
%sql
-- Latest state of every event, maintained from event_cdc's change feed by merge_latest_events_batch.
CREATE TABLE IF NOT EXISTS retailplex_platform.silver.event (
  event_id            STRING,
  event_type          STRING,
  customer_id         STRING,
  product_id          STRING,
  session_id          STRING,
  page_url            STRING,
  device_type         STRING,
  browser             STRING,
  ip_address          STRING,
  referrer            STRING,
  event_duration      INT,
  processed_timestamp TIMESTAMP
)
USING DELTA;

Step 1: Read the bronze multiplex stream

In [0]:

# One streaming read of the bronze multiplex table; each processing step filters it by `topic`.
bronze_table_name = "retailplex_platform.bronze.multiplex_stream"
bronzeDF = spark.readStream.table(bronze_table_name)

Step 2: Process order data

In [0]:
# Define the merge function for foreachBatch
def merge_batch_order(batchDF, batchId):
    """Upsert one micro-batch of parsed orders into ``retailplex_platform.silver.order``.

    Used as the ``foreachBatch`` sink of the orders stream. The batch is exposed to SQL
    as the temp view ``batch_orders`` on its own Spark session and merged on
    ``(order_id, order_date)``: matching rows get every non-key column overwritten,
    unseen keys are inserted.

    Args:
        batchDF: micro-batch DataFrame with the ``orders_schema`` columns plus
            ``processed_timestamp`` (as built in ``process_order_data``).
        batchId: micro-batch id supplied by Structured Streaming; unused.
    """
    session = batchDF.sparkSession
    batchDF.createOrReplaceTempView("batch_orders")

    merge_statement = """
        MERGE INTO retailplex_platform.silver.order AS target
        USING batch_orders AS source
        ON target.order_id = source.order_id AND target.order_date = source.order_date
        WHEN MATCHED THEN
            UPDATE SET
                target.customer_id = source.customer_id,
                target.order_status = source.order_status,
                target.total_amount = source.total_amount,
                target.quantity = source.quantity,
                target.discount_amount  = source.discount_amount ,
                target.tax_amount  = source.tax_amount ,
                target.shipping_cost  = source.shipping_cost ,
                target.payment_method  = source.payment_method ,
                target.shipping_address  = source.shipping_address ,
                target.processed_timestamp = source.processed_timestamp
        WHEN NOT MATCHED THEN
            INSERT (
                order_id,
                customer_id,
                order_date,
                order_status,
                total_amount,
                quantity,
                discount_amount,
                tax_amount,
                shipping_cost,
                payment_method,
                shipping_address,
                processed_timestamp
            )
            VALUES (
                source.order_id,
                source.customer_id,
                source.order_date,
                source.order_status,
                source.total_amount,
                source.quantity,
                source.discount_amount,
                source.tax_amount,
                source.shipping_cost,
                source.payment_method,
                source.shipping_address,
                source.processed_timestamp
            );
    """
    session.sql(merge_statement)


In [0]:
def process_order_data():
    """Stream `orders` records from bronze into silver.order (one availableNow run).

    Parses the bronze ``data`` payload of the ``orders`` topic with ``orders_schema``,
    stamps each row with ``processed_timestamp`` and merges every micro-batch through
    ``merge_batch_order``. Blocks until the run has finished.

    Duplicates are removed per micro-batch rather than with a streaming watermark. A
    10-second watermark on ``order_date`` made the dedup operator silently drop any order
    whose ``order_date`` lagged the watermark, and it discarded later status changes of
    an order still held in dedup state. Repeats across batches are harmless because
    the MERGE updates the existing row.

    NOTE(review): the previous version of this query was stateful (dropDuplicates); if its
    checkpoint already exists, verify that restarting on it works, or use a fresh one.
    """
    ordersDF = (
        bronzeDF.filter("topic = 'orders'")
        .select(F.from_json(F.col("data").cast("string"), orders_schema).alias("order"))
        .select("order.*", F.current_timestamp().alias("processed_timestamp"))
    )

    def _dedup_and_merge(batchDF, batchId):
        # MERGE rejects several source rows matching one target row, so keep a single
        # row per merge key within the batch. Which duplicate survives is arbitrary.
        merge_batch_order(batchDF.dropDuplicates(["order_id", "order_date"]), batchId)

    ordersQuery = (
        ordersDF.writeStream
        .option("checkpointLocation", "/Volumes/retailplex_platform/silver/checkpoints/order")
        .trigger(availableNow=True)
        .foreachBatch(_dedup_and_merge)
        .start()
    )

    ordersQuery.awaitTermination()

In [0]:
# Run the orders stream once (availableNow trigger) and block until it finishes.
process_order_data()

Step 3: Process product data

In [0]:
def merge_batch_product(batchDF, batchId):
    """Upsert one micro-batch of parsed products into ``retailplex_platform.silver.product``.

    ``foreachBatch`` sink of the products stream. The batch is first reduced to one row
    per ``product_id`` (MERGE cannot apply several source rows to the same target row),
    then merged on ``product_id``. Matches are overwritten and new ids are inserted. All
    ``products_schema`` columns are carried, including ``subcategory``, which was
    previously dropped.

    Args:
        batchDF: micro-batch with the ``products_schema`` columns plus ``processed_timestamp``.
        batchId: micro-batch id supplied by Structured Streaming; unused.
    """
    # Register batch dataframe as a temp view for SQL
    batchDF.createOrReplaceTempView("raw_batch_products")

    spark = batchDF.sparkSession

    # Deduplicate source by product_id.
    # NOTE(review): processed_timestamp comes from current_timestamp() in the caller, so it is
    # presumably identical for all rows of a batch and the surviving duplicate is arbitrary;
    # order by a source-side change column instead if bronze provides one.
    spark.sql("""
    CREATE OR REPLACE TEMP VIEW batch_products AS
    SELECT product_id,
           product_name,
           category,
           subcategory,
           brand,
           price,
           cost,
           stock_quantity,
           supplier_id,
           launch_date,
           weight_kg,
           processed_timestamp
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY processed_timestamp DESC) AS rn
      FROM raw_batch_products
    ) dedup
    WHERE rn = 1
    """)

    # Now perform the merge using the deduplicated view
    spark.sql("""
        MERGE INTO retailplex_platform.silver.product AS target
        USING batch_products AS source
        ON target.product_id = source.product_id
        WHEN MATCHED THEN
            UPDATE SET
                target.product_name = source.product_name,
                target.category = source.category,
                target.subcategory = source.subcategory,
                target.brand = source.brand,
                target.price = source.price,
                target.cost = source.cost,
                target.stock_quantity = source.stock_quantity,
                target.supplier_id = source.supplier_id,
                target.launch_date = source.launch_date,
                target.weight_kg = source.weight_kg,
                target.processed_timestamp = source.processed_timestamp
        WHEN NOT MATCHED THEN
            INSERT (
                product_id,
                product_name,
                category,
                subcategory,
                brand,
                price,
                cost,
                stock_quantity,
                supplier_id,
                launch_date,
                weight_kg,
                processed_timestamp
            )
            VALUES (
                source.product_id,
                source.product_name,
                source.category,
                source.subcategory,
                source.brand,
                source.price,
                source.cost,
                source.stock_quantity,
                source.supplier_id,
                source.launch_date,
                source.weight_kg,
                source.processed_timestamp
            )
    """)


In [0]:
def process_product_data():
    """Stream `products` records from bronze into silver.product (one availableNow run).

    The bronze stream is exposed to SQL as ``bronze_product_stream``. The JSON ``data``
    payload of the ``products`` topic is parsed with ``products_schema``, and every row
    gets a ``processed_timestamp``. Each micro-batch is merged by ``merge_batch_product``.
    Blocks until the run has finished.
    """
    bronzeDF.createOrReplaceTempView("bronze_product_stream")

    # Parse the JSON payload of the `products` topic.
    products_stream = spark.sql(f"""
        SELECT products.*, current_timestamp() AS processed_timestamp
        FROM (
            SELECT from_json(cast(data AS string), '{products_schema}') AS products
            FROM bronze_product_stream
            WHERE topic = 'products'
        )
    """)

    query = (
        products_stream.writeStream
        .format("delta")
        .option("checkpointLocation", "/Volumes/retailplex_platform/silver/checkpoints/product")
        .trigger(availableNow=True)
        .foreachBatch(merge_batch_product)
        .start()
    )
    query.awaitTermination()

In [0]:
# Run the products stream once (availableNow trigger) and block until it finishes.
process_product_data()

Step 4: Process customer data (type-2 history)

In [0]:
# Define the merge function for foreachBatch
def merge_batch_customer(batchDF, batchId):
    """Apply one micro-batch of customer records to the type-2 history in silver.customer.

    ``foreachBatch`` sink of the customers stream. Three statements run on the batch's
    own Spark session:

    1. ``staged_updates`` view: one row per ``customer_id`` from the batch.
    2. MERGE: closes the active version (``active_ind = 0``, ``valid_to`` = now) of
       every customer whose tracked attributes differ from the staged row.
    3. INSERT: adds an active version (``active_ind = 1``, ``valid_from`` = now,
       ``valid_to`` NULL) for every staged customer that has no active row left. These
       are brand-new customers and the ones closed in step 2.

    Attributes are compared NULL-safely (``NOT (a <=> b)``). With plain ``<>``, a change
    from or to NULL evaluates to NULL rather than TRUE, so it was never recorded.

    Args:
        batchDF: micro-batch containing at least the ``customers_schema`` columns.
        batchId: micro-batch id supplied by Structured Streaming; unused.
    """
    batchDF.createOrReplaceTempView("batch_customers")
    spark = batchDF.sparkSession

    # Attributes whose change opens a new history version.
    tracked_columns = (
        "first_name", "last_name", "email", "registration_date", "customer_segment",
        "age", "gender", "city", "state", "country", "phone",
    )

    def _any_changed(left, right):
        """NULL-safe SQL predicate: TRUE when any tracked column differs between two aliases."""
        return " OR\n         ".join(
            f"NOT ({left}.{name} <=> {right}.{name})" for name in tracked_columns
        )

    # One staged row per customer_id.
    # NOTE(review): current_timestamp() has the same value for every row, so the surviving
    # duplicate is arbitrary; order by a source-side change timestamp if bronze has one.
    spark.sql("""
    CREATE OR REPLACE TEMP VIEW staged_updates AS
    SELECT customer_id,
           first_name,
           last_name,
           email,
           registration_date,
           customer_segment,
           age,
           gender,
           city,
           state,
           country,
           phone,
           current_timestamp() AS valid_from
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY current_timestamp() DESC) AS rn
      FROM batch_customers
    ) dedup
    WHERE rn = 1
    """)

    # Step 1: expire the active version of every customer whose attributes changed.
    spark.sql(f"""
    MERGE INTO retailplex_platform.silver.customer AS target
    USING staged_updates AS source
    ON target.customer_id = source.customer_id
       AND target.active_ind = 1
       AND (
         {_any_changed("target", "source")}
       )
    WHEN MATCHED THEN
      UPDATE SET active_ind = 0,
                 valid_to = current_timestamp()
    """)

    # Step 2: insert an active version for new customers and for those expired in step 1
    # (column order follows the silver.customer table definition).
    spark.sql(f"""
    INSERT INTO retailplex_platform.silver.customer
    SELECT
      su.customer_id,
      su.first_name,
      su.last_name,
      su.email,
      su.registration_date,
      su.customer_segment,
      su.age,
      su.gender,
      su.city,
      su.state,
      su.country,
      su.phone,
      1 AS active_ind,
      current_timestamp() AS valid_from,
      NULL AS valid_to
    FROM staged_updates su
    LEFT JOIN retailplex_platform.silver.customer t
      ON su.customer_id = t.customer_id
      AND t.active_ind = 1
    WHERE t.customer_id IS NULL
       OR (
         {_any_changed("t", "su")}
       )
    """)

In [0]:
def process_customer_data():
    """Stream `customers` records from bronze into silver.customer (one availableNow run).

    The bronze stream is exposed to SQL as ``bronze_customer_stream``. The JSON ``data``
    payload of the ``customers`` topic is parsed with ``customers_schema``, and each
    micro-batch is handed to ``merge_batch_customer``. Blocks until the run has finished.
    """
    bronzeDF.createOrReplaceTempView("bronze_customer_stream")

    # Parse the `customers` payload. The active_ind / valid_from / valid_to columns added here
    # are not read by merge_batch_customer, which sets those values itself.
    customer_updates = spark.sql(f"""
    SELECT customers.*, 
            1 as active_ind,
           current_timestamp() AS valid_from, 
           NULL AS valid_to
    FROM (
        SELECT from_json(cast(data AS string), '{customers_schema}') AS customers
        FROM bronze_customer_stream
        WHERE topic = 'customers'
    )
        """)

    streaming_query = (
        customer_updates.writeStream
        .format("delta")
        .option("checkpointLocation", "/Volumes/retailplex_platform/silver/checkpoints/customer")
        .trigger(availableNow=True)
        .foreachBatch(merge_batch_customer)
        .start()
    )
    streaming_query.awaitTermination()

In [0]:
# Run the customers stream once (availableNow trigger) and block until it finishes.
process_customer_data()

Step 5: Process event data into event_cdc

In [0]:
def merge_events_cdc(batch_df, batch_id):
    """Upsert one micro-batch of parsed events into ``retailplex_platform.silver.event_cdc``.

    ``foreachBatch`` sink of the events stream; merges on ``event_id`` (matches are
    overwritten, new ids inserted). Empty micro-batches are skipped entirely. Previously
    the MERGE was issued even for an empty batch while its SQL text was only defined for
    non-empty ones, which raised ``UnboundLocalError``.

    Args:
        batch_df: micro-batch with the ``events_cdc_schema`` columns.
        batch_id: micro-batch id supplied by Structured Streaming; unused.
    """
    if batch_df.isEmpty():
        return

    # MERGE fails when several source rows match the same target row, and duplicate
    # unmatched rows would be inserted twice, so keep one row per event_id.
    # NOTE(review): there is no ordering column, so which duplicate survives is arbitrary.
    batch_df.dropDuplicates(["event_id"]).createOrReplaceTempView("batch_events")

    batch_df.sparkSession.sql("""
                MERGE INTO retailplex_platform.silver.event_cdc AS target
                USING batch_events AS source
                ON target.event_id = source.event_id
                WHEN MATCHED THEN
                    UPDATE SET
                        event_type = source.event_type,
                        customer_id = source.customer_id,
                        product_id = source.product_id,
                        session_id = source.session_id,
                        page_url = source.page_url,
                        device_type = source.device_type,
                        browser = source.browser,
                        ip_address = source.ip_address,
                        referrer = source.referrer,
                        event_duration = source.event_duration
                WHEN NOT MATCHED THEN
                    INSERT (event_id, event_type, customer_id, product_id, session_id, 
                        page_url, device_type, browser, ip_address, referrer, event_duration)
                    VALUES (source.event_id, source.event_type, source.customer_id, 
                        source.product_id, source.session_id, source.page_url,
                        source.device_type, source.browser, source.ip_address, 
                        source.referrer, source.event_duration)
            """)


In [0]:
def process_event_cdc_data():
    """Stream `events` records from bronze into silver.event_cdc (one availableNow run).

    The JSON ``data`` payload of the ``events`` topic is parsed with ``events_cdc_schema``
    and each micro-batch is merged by ``merge_events_cdc``. Blocks until the run has finished.
    """
    events_payload = F.from_json(F.col("data").cast("string"), events_cdc_schema)
    parsed_events = (
        bronzeDF
        .filter("topic = 'events'")
        .select(events_payload.alias("events"))
        .select("events.*")
    )

    query = (
        parsed_events.writeStream
        .format("delta")
        .foreachBatch(merge_events_cdc)
        .option("checkpointLocation", "/Volumes/retailplex_platform/silver/checkpoints/event_cdc")
        .trigger(availableNow=True)
        .start()
    )
    query.awaitTermination()

In [0]:
# Run the events -> event_cdc stream once (availableNow trigger) and block until it finishes.
process_event_cdc_data()

Step 6: Process the event_cdc change feed into event

In [0]:
# Define the merge function for foreachBatch
def merge_latest_events_batch(batchDF, batchId):
    """Apply one micro-batch of event_cdc's change feed to ``retailplex_platform.silver.event``.

    ``foreachBatch`` sink of the stream built in ``process_event_data``. For every
    ``event_id``, only its newest change in the batch (highest ``_commit_version``) among
    insert / update_postimage / delete rows is kept. ``update_preimage`` rows are ignored.
    The survivor is then merged on ``event_id``:

    * insert / update_postimage: update the existing row, or insert a new one;
    * delete: remove the existing row.

    ``processed_timestamp`` in the target is set to the time of this merge.

    Args:
        batchDF: change-feed micro-batch (event_cdc columns plus ``_change_type``,
            ``_commit_version`` and ``_commit_timestamp``).
        batchId: micro-batch id supplied by Structured Streaming; unused.
    """
    # row_number (not rank): ties within one commit version must still leave exactly one
    # source row per event_id, because MERGE rejects several source rows for one target row.
    latest_first = Window.partitionBy("event_id").orderBy(F.col("_commit_version").desc())

    (batchDF.filter(F.col("_change_type").isin(["insert", "update_postimage", "delete"]))
        .withColumn("rn", F.row_number().over(latest_first))
        .filter("rn = 1")
        .drop("rn")
        .createOrReplaceTempView("batch_events"))

    # The change feed is consumed in commit-version order and each batch keeps only the
    # newest change per event, so matched non-delete changes are applied unconditionally.
    # The former guard compared the target's merge wall-clock time with the source commit
    # time. That skipped valid updates whose commit happened before the previous
    # micro-batch was merged.
    batchDF.sparkSession.sql("""
        MERGE INTO retailplex_platform.silver.event AS target
        USING batch_events AS source
        ON target.event_id = source.event_id
        WHEN MATCHED AND source._change_type != 'delete' THEN
            UPDATE SET
                target.event_type = source.event_type,
                target.customer_id = source.customer_id,
                target.product_id = source.product_id,
                target.session_id = source.session_id,
                target.page_url = source.page_url,
                target.device_type = source.device_type,
                target.browser = source.browser,
                target.ip_address = source.ip_address,
                target.referrer = source.referrer,
                target.event_duration = source.event_duration,
                target.processed_timestamp = current_timestamp()
        WHEN MATCHED AND source._change_type = 'delete' THEN
            DELETE
        WHEN NOT MATCHED AND source._change_type != 'delete' THEN
            INSERT (
                event_id,
                event_type,
                customer_id,
                product_id,
                session_id,
                page_url,
                device_type,
                browser,
                ip_address,
                referrer,
                event_duration,
                processed_timestamp
            )
            VALUES (
                source.event_id,
                source.event_type,
                source.customer_id,
                source.product_id,
                source.session_id,
                source.page_url,
                source.device_type,
                source.browser,
                source.ip_address,
                source.referrer,
                source.event_duration,
                current_timestamp()
            )
    """)

In [0]:
def process_event_data():
    """Stream the Change Data Feed of silver.event_cdc into silver.event (one availableNow run).

    Reads row-level changes of ``retailplex_platform.silver.event_cdc`` starting at
    version 0 (presumably only honoured while the checkpoint is new; confirm) and merges
    each micro-batch through ``merge_latest_events_batch``. Blocks until the run has finished.
    """
    change_feed = (
        spark.readStream
        .option("readChangeFeed", "true")
        .option("startingVersion", 0)
        .table("retailplex_platform.silver.event_cdc")
    )

    # NOTE(review): the query has no aggregation and writes through foreachBatch, so the
    # "update" output mode presumably behaves like append here; confirm before relying on it.
    query = (
        change_feed.writeStream
        .format("delta")
        .outputMode("update")
        .option("checkpointLocation", "/Volumes/retailplex_platform/silver/checkpoints/event")
        .trigger(availableNow=True)
        .foreachBatch(merge_latest_events_batch)
        .start()
    )
    query.awaitTermination()


In [0]:
# Run the event_cdc change-feed -> event stream once (availableNow trigger) and block until it finishes.
process_event_data()

In [0]:
%sql
USE CATALOG retailplex_platform;

-- Typed, trimmed, one-row-per-segment view over bronze.customer_segments.
-- Dedup is keyed on the same CAST expression the view exposes as segment_id, so raw ids
-- that only differ in formatting (if stored as text) cannot yield duplicate output keys.
-- NOTE(review): ordering by the partition key leaves the surviving duplicate arbitrary.
-- NOTE(review): _ingested_at is CURRENT_TIMESTAMP() evaluated at query time, not ingestion time.
CREATE OR REPLACE VIEW retailplex_platform.silver.broadcast_customer_segments AS
SELECT
  CAST(segment_id AS INT)                               AS segment_id,
  TRIM(segment_name)                                    AS segment_name,
  TRIM(segment_description)                             AS segment_description,
  CAST(min_spend_threshold AS DECIMAL(18,2))            AS min_spend_threshold,
  CAST(discount_percentage AS DECIMAL(5,2))             AS discount_percentage,
  CAST(priority_support AS BOOLEAN)                     AS priority_support,
  CAST(free_shipping_threshold AS DECIMAL(18,2))        AS free_shipping_threshold,
  CURRENT_TIMESTAMP()                                   AS _ingested_at
FROM retailplex_platform.bronze.customer_segments
QUALIFY ROW_NUMBER() OVER (PARTITION BY CAST(segment_id AS INT) ORDER BY segment_id) = 1;


In [0]:
%sql
-- Typed, trimmed, one-row-per-category view over bronze.product_categories.
-- Fully qualified so the cell does not depend on an earlier USE CATALOG; dedup is keyed on
-- the same CAST expression the view exposes as category_id.
-- NOTE(review): ordering by the partition key leaves the surviving duplicate arbitrary.
-- NOTE(review): _ingested_at is CURRENT_TIMESTAMP() evaluated at query time, not ingestion time.
CREATE OR REPLACE VIEW retailplex_platform.silver.broadcast_product_categories AS
SELECT
  CAST(category_id AS INT)                   AS category_id,
  TRIM(category_name)                        AS category_name,
  TRIM(category_description)                 AS category_description,
  CAST(commission_rate AS DECIMAL(5,4))      AS commission_rate,
  CAST(tax_rate AS DECIMAL(5,4))             AS tax_rate,
  CAST(return_policy_days AS INT)            AS return_policy_days,
  CURRENT_TIMESTAMP()                        AS _ingested_at
FROM retailplex_platform.bronze.product_categories
QUALIFY ROW_NUMBER() OVER (PARTITION BY CAST(category_id AS INT) ORDER BY category_id) = 1;


In [0]:
%sql
-- Typed, trimmed, one-row-per-subcategory view over bronze.product_subcategories.
-- Fully qualified so the cell does not depend on an earlier USE CATALOG; dedup is keyed on
-- the same CAST expression the view exposes as subcategory_id.
-- NOTE(review): ordering by the partition key leaves the surviving duplicate arbitrary.
-- NOTE(review): _ingested_at is CURRENT_TIMESTAMP() evaluated at query time, not ingestion time.
CREATE OR REPLACE VIEW retailplex_platform.silver.broadcast_product_subcategories AS
SELECT
  CAST(subcategory_id AS INT)            AS subcategory_id,
  TRIM(subcategory_name)                 AS subcategory_name,
  CAST(category_id AS INT)               AS category_id,
  TRIM(subcategory_description)          AS subcategory_description,
  TRIM(storage_requirements)             AS storage_requirements,
  CAST(fragile_flag AS BOOLEAN)          AS fragile_flag,
  CURRENT_TIMESTAMP()                    AS _ingested_at
FROM retailplex_platform.bronze.product_subcategories
QUALIFY ROW_NUMBER() OVER (PARTITION BY CAST(subcategory_id AS INT) ORDER BY subcategory_id) = 1;


In [0]:
%sql
-- Typed, trimmed, one-row-per-supplier view over bronze.suppliers.
-- Dedup is keyed on TRIM(supplier_id), the value the view exposes, so raw ids that differ
-- only by surrounding spaces cannot yield duplicate supplier_id rows.
-- NOTE(review): ordering by the partition key leaves the surviving duplicate arbitrary.
-- NOTE(review): _ingested_at is CURRENT_TIMESTAMP() evaluated at query time, not ingestion time.
CREATE OR REPLACE VIEW retailplex_platform.silver.broadcast_suppliers AS
SELECT
  TRIM(supplier_id)                        AS supplier_id,
  TRIM(supplier_name)                      AS supplier_name,
  TRIM(supplier_country)                   AS supplier_country,
  TRIM(supplier_region)                    AS supplier_region,
  TRIM(contact_email)                      AS contact_email,
  CAST(payment_terms_days AS INT)          AS payment_terms_days,
  CAST(quality_rating AS DECIMAL(3,1))     AS quality_rating,
  CAST(delivery_time_days AS INT)          AS delivery_time_days,
  CAST(minimum_order_value AS DECIMAL(18,2)) AS minimum_order_value,
  CURRENT_TIMESTAMP()                      AS _ingested_at
FROM retailplex_platform.bronze.suppliers
QUALIFY ROW_NUMBER() OVER (PARTITION BY TRIM(supplier_id) ORDER BY supplier_id) = 1;




In [0]:
%sql
-- Normalised, one-row-per-state view over bronze.geography.
-- Dedup is keyed on UPPER(TRIM(state_code)), the value the view exposes, so raw codes that
-- differ only in case or surrounding spaces cannot yield duplicate state_code rows.
-- NOTE(review): ordering by the partition key leaves the surviving duplicate arbitrary.
-- NOTE(review): _ingested_at is CURRENT_TIMESTAMP() evaluated at query time, not ingestion time.
CREATE OR REPLACE VIEW retailplex_platform.silver.geography AS
SELECT
  UPPER(TRIM(state_code))        AS state_code,
  TRIM(state_name)               AS state_name,
  TRIM(region)                   AS region,
  UPPER(TRIM(country_code))      AS country_code,
  TRIM(country_name)             AS country_name,
  TRIM(timezone)                 AS timezone,
  CAST(sales_tax_rate AS DECIMAL(5,4)) AS sales_tax_rate,
  CAST(shipping_zone AS INT)     AS shipping_zone,
  CAST(population_millions AS DECIMAL(6,1)) AS population_millions,
  CURRENT_TIMESTAMP()            AS _ingested_at
FROM retailplex_platform.bronze.geography
QUALIFY ROW_NUMBER() OVER (PARTITION BY UPPER(TRIM(state_code)) ORDER BY state_code) = 1;


In [0]:
%sql
-- Ad-hoc inspection of the customer history table.
SELECT *
FROM retailplex_platform.silver.customer

In [0]:
%sql
-- Ad-hoc inspection of every change recorded in event_cdc's Change Data Feed since version 0.
SELECT *
FROM table_changes('retailplex_platform.silver.event_cdc', 0)



In [0]:
# Destructive maintenance step: drops silver.event_cdc. Disabled by default so that
# "Run All" does not wipe the table right after the events stream has loaded it.
# NOTE(review): dropping the table presumably also invalidates the change-feed checkpoint
# used by process_event_data (and event_cdc would not be re-filled from bronze, whose
# stream checkpoint has already advanced); reset those checkpoints too if you enable this.
RESET_EVENT_CDC = False
if RESET_EVENT_CDC:
    spark.sql("drop table if exists retailplex_platform.silver.event_cdc")

In [0]:
%sql
-- Ad-hoc inspection of the raw bronze multiplex table.
SELECT *
FROM retailplex_platform.bronze.multiplex_stream

In [0]:
# Latest commit version of the Delta table stored at `first_silver_path`.
# DESCRIBE HISTORY cannot be used as a subquery in FROM, so aggregate its result DataFrame.
# NOTE(review): `first_silver_path` is not defined in any visible cell of this notebook;
# define it (the path of the Delta table to inspect) before running this cell.
latest_version = (
    spark.sql(f"DESCRIBE HISTORY delta.`{first_silver_path}`")
    .agg(F.max("version"))
    .collect()[0][0]
)