In [0]:
%run ./01_Project_Config

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from delta.tables import *

In [0]:
# 1. Open a streaming read over the Bronze POS sales table
df_bronze = (
    spark.readStream
    .table(f"{catalog}.bronze.pos_sales")
)

In [0]:
# 2. Standardisation
# Code-like columns are whitespace-trimmed and upper-cased in place.
df_standardized = df_bronze
for code_col in ("channel", "store_id", "customer_id", "product_id", "payment_type", "currency"):
    df_standardized = df_standardized.withColumn(code_col, upper(trim(col(code_col))))

# NULL promo codes default to "NA" and NULL cities to "UNKNOWN" before the
# case normalisation is applied; txn_ts is trimmed and parsed to a timestamp.
df_standardized = (
    df_standardized
    .withColumn("promo_code", upper(trim(coalesce(col("promo_code"), lit("NA")))))
    .withColumn("city", initcap(trim(coalesce(col("city"), lit("UNKNOWN")))))
    .withColumn("txn_ts", to_timestamp(trim(col("txn_ts"))))
)

In [0]:
# 3. Data Quality Checks (Tagging Rows)
# dq_issue holds the message of the first failing rule (NULL when every rule
# passes); is_valid is derived from it so the two columns can never disagree.
#
# NULL unit_price / quantity are rejected explicitly: a bare `<= 0` comparison
# evaluates to NULL for them, which would otherwise fall through as "valid".
# txn_id is the deduplication key used when writing to Silver, so rows without
# one are rejected as well (checked last so existing messages keep priority).
# A NULL discount_pct is still accepted here.
dq_issue_expr = (
    when(col("store_id").isNull(), "Missing Store ID")
    .when(col("unit_price").isNull() | (col("unit_price") <= 0), "Invalid Unit Price")
    .when(col("quantity").isNull() | (col("quantity") <= 0), "Invalid Quantity")
    .when(col("customer_id").isNull(), "Missing Customer ID")
    .when(col("product_id").isNull(), "Missing Product ID")
    .when((col("discount_pct") < 0) | (col("discount_pct") > 100), "Invalid Discount")
    .when(col("txn_id").isNull(), "Missing Transaction ID")
    .otherwise(lit(None))
)

df_tagged = (df_standardized
    .withColumn("is_valid", dq_issue_expr.isNull())
    .withColumn("dq_issue", dq_issue_expr)
)


In [0]:
# 4. Split stream
# Rows that passed every check; the tagging columns are not carried forward
df_clean = df_tagged.where(col("is_valid")).drop("is_valid", "dq_issue")

# Rows that failed a check keep is_valid / dq_issue for inspection
df_quarantine = df_tagged.where(~col("is_valid"))

In [0]:
# 5. ENRICHMENT (Business Logic)
# gross_sales     = quantity * unit_price
# discount_amount = gross_sales * discount_pct / 100
# net_sales       = gross_sales - discount_amount
#
# A NULL discount_pct is not rejected by the range check in the tagging step
# (the comparison yields NULL), so it is treated as a 0% discount here;
# without the coalesce, discount_amount and net_sales would both become NULL.
df_enriched = (df_clean
    .withColumn("gross_sales", col("quantity") * col("unit_price"))
    .withColumn(
        "discount_amount",
        col("gross_sales") * (coalesce(col("discount_pct"), lit(0)) / 100)
    )
    .withColumn("net_sales", col("gross_sales") - col("discount_amount"))
    .withColumn("processed_ts", current_timestamp())
)

In [0]:
# 6. DEDUPLICATION + UPSERT FUNCTION
# Runs through 'foreachBatch' so each micro-batch can be deduplicated with a
# window function and merged into the Delta table as a regular DataFrame.
def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch on txn_id and upsert it into silver.pos_sales.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch (not used).

    A plain append only removes duplicates that arrive inside the same
    micro-batch, so a txn_id delivered again in a later batch would be written
    twice. Merging on txn_id keeps one row per transaction across batches.

    NOTE(review): the append path used ``mergeSchema``; MERGE does not add new
    source columns to the table unless Delta schema evolution is enabled for
    the session. Confirm whether that is required here.
    """
    # Session that owns the micro-batch, instead of the notebook-level global.
    session = microBatchDF.sparkSession
    target_table_name = f"{catalog}.silver.pos_sales"

    # Keep the row with the greatest ingest_ts for every txn_id.
    windowSpec = Window.partitionBy("txn_id").orderBy(col("ingest_ts").desc())
    df_deduped = (microBatchDF
                  .withColumn("row_num", row_number().over(windowSpec))
                  .filter(col("row_num") == 1)
                  .drop("row_num")
                 )

    if not session.catalog.tableExists(target_table_name):
        # First batch: nothing to merge into yet, so the write creates the table.
        (df_deduped.write
         .format("delta")
         .mode("append")
         .option("mergeSchema", "true")
         .saveAsTable(target_table_name)
        )
        return

    (DeltaTable.forName(session, target_table_name).alias("t")
        .merge(df_deduped.alias("s"), "t.txn_id = s.txn_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# 7. Write stream
# Each micro-batch of enriched rows is handed to upsert_to_silver.
print("... Processing Valid Records to Silver")
query_silver = (
    df_enriched.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths['checkpoints'] + "pos_silver")
    .foreachBatch(upsert_to_silver)
    .start()
)

In [0]:
print("... Processing Invalid Records to Quarantine")
query_quarantine = (
    df_quarantine.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths['checkpoints'] + "pos_quarantine")
    .option("mergeSchema", "true")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.pos_sales_quarantine")
)

# Block until both streaming queries have terminated
query_silver.awaitTermination()
query_quarantine.awaitTermination()
print("Silver POS Processing Complete.")

In [0]:
%sql 
select * from retail_lakehouse.silver.pos_sales;

In [0]:
%sql 
select * from retail_lakehouse.quarantine.pos_sales_quarantine

Silver Returns

In [0]:
# Streaming read over the Bronze returns table
df_bronze_ret = (
    spark.readStream
    .table(f"{catalog}.bronze.returns")
)

In [0]:
# 1. Standardize
# Identifiers and return_reason are upper-cased; a NULL return_reason becomes
# "UNKNOWN"; return_ts is trimmed and parsed to a timestamp.
df_standard_ret = (df_bronze_ret
    .withColumn("return_reason", upper(col("return_reason")))
    .withColumn("return_reason", coalesce(col("return_reason"), lit("UNKNOWN")))
    .withColumn("return_id", upper(col("return_id")))
    .withColumn("txn_id", upper(col("txn_id")))
    .withColumn("product_id", upper(col("product_id")))
    # channel must be derived from channel itself: it was previously overwritten
    # with the product_id value. Upper-cased to match the POS standardisation.
    # NOTE(review): assumes bronze.returns carries a `channel` column - confirm.
    .withColumn("channel", upper(trim(col("channel"))))
    .withColumn(
        "return_ts",
        to_timestamp(trim(col("return_ts")))
    )
)

In [0]:
# 2. Tagging (Orphan Checks)
# Rules are evaluated in order; the first matching rule sets reject_reason.
txn_missing = col("txn_id").isNull() | upper(col("txn_id")).contains("UNKNOWN")
qty_invalid = col("return_qty") <= 0
product_missing = col("product_id").isNull()
return_missing = col("return_id").isNull()

df_tagged_ret = (
    df_standard_ret
    .withColumn(
        "is_valid",
        when(txn_missing, False)
        .when(qty_invalid, False)
        .when(product_missing, False)
        .when(return_missing, False)
        .otherwise(True)
    )
    .withColumn(
        "reject_reason",
        when(txn_missing, "Missing Transaction ID")
        .when(qty_invalid, "Invalid Quantity")
        .when(product_missing, "Missing Product ID")
        .when(return_missing, "Missing Return ID")
        .otherwise(None)
    )
)

In [0]:
# 3. Split
# Valid rows drop the tagging columns; rejected rows keep them.
df_clean_ret = df_tagged_ret.where(col("is_valid")).drop("is_valid", "reject_reason")
df_quarantine_ret = df_tagged_ret.where(~col("is_valid"))

In [0]:
# 4. DEDUPLICATION + UPSERT FUNCTION
def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch on return_id and upsert it into silver.returns.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch (not used).

    A plain append only removes duplicates that arrive inside the same
    micro-batch, so a return_id delivered again in a later batch would be
    written twice. Merging on return_id keeps one row per return across batches.

    NOTE(review): the append path used ``mergeSchema``; MERGE does not add new
    source columns to the table unless Delta schema evolution is enabled for
    the session. Confirm whether that is required here.
    """
    # Session that owns the micro-batch, instead of the notebook-level global.
    session = microBatchDF.sparkSession
    target_table_name = f"{catalog}.silver.returns"

    # Keep the row with the greatest ingest_ts for every return_id.
    windowSpec = Window.partitionBy("return_id").orderBy(col("ingest_ts").desc())
    df_deduped = (microBatchDF
                  .withColumn("row_num", row_number().over(windowSpec))
                  .filter(col("row_num") == 1)
                  .drop("row_num")
                 )

    if not session.catalog.tableExists(target_table_name):
        # First batch: nothing to merge into yet, so the write creates the table.
        (df_deduped.write
         .format("delta")
         .mode("append")
         .option("mergeSchema", "true")
         .saveAsTable(target_table_name)
        )
        return

    (DeltaTable.forName(session, target_table_name).alias("t")
        .merge(df_deduped.alias("s"), "t.return_id = s.return_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# 5. Write Streams (Trigger both)
# Valid returns are passed to upsert_to_silver one micro-batch at a time.
print("... Processing Valid Records to Silver")
query_silver = (
    df_clean_ret.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths['checkpoints'] + "returns_silver")
    .foreachBatch(upsert_to_silver)
    .start()
)

In [0]:
print("... Processing Invalid Records to Quarantine")
query_quarantine = (
    df_quarantine_ret.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths['checkpoints'] + "returns_quarantine")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.returns_quarantine")
)

# Block until both streaming queries have terminated
query_silver.awaitTermination()
query_quarantine.awaitTermination()
print("Silver returns Processing Complete.")

In [0]:
%sql 
select * from retail_lakehouse.silver.returns;

In [0]:
%sql
select * from retail_lakehouse.quarantine.returns_quarantine;

Silver Inventory


In [0]:
# 1. Streaming read over the Bronze inventory table
df_bronze = (
    spark.readStream
    .table(f"{catalog}.bronze.inventory")
)

In [0]:
# 2. Standardization
# Key columns are trimmed and upper-cased; snapshot_date goes through to_date.
df_standardized = df_bronze
for key_col in ("store_id", "product_id"):
    df_standardized = df_standardized.withColumn(key_col, upper(trim(col(key_col))))

df_standardized = df_standardized.withColumn("snapshot_date", to_date(col("snapshot_date")))

In [0]:
# 3. Data Quality Tagging
# Rules are checked in order: store_id, product_id, then stock_on_hand.
store_missing = col("store_id").isNull()
product_missing = col("product_id").isNull()
stock_negative = col("stock_on_hand") < 0

df_tagged = (
    df_standardized
    .withColumn(
        "is_valid",
        when(store_missing, False)
        .when(product_missing, False)
        .when(stock_negative, False)
        .otherwise(True)
    )
    .withColumn(
        "dq_issue",
        when(store_missing, "Missing Store ID")
        .when(product_missing, "Missing Product ID")
        .when(stock_negative, "Negative Stock")
        .otherwise(lit(None))
    )
)

In [0]:
# 4. Split Streams
# Valid rows drop the tagging columns; rejected rows keep them.
df_clean = df_tagged.where(col("is_valid")).drop("is_valid", "dq_issue")
df_quarantine = df_tagged.where(~col("is_valid"))

In [0]:
# 5. Enrichment
# Stamp every clean row with the current timestamp
df_enriched = (
    df_clean
    .withColumn("processed_ts", current_timestamp())
)

In [0]:
# 6. Merge Logic (Step 6 Adjusted for Stock Update)
def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch per (store_id, product_id) and upsert it into silver.inventory.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch; only used in log output.

    Matched rows are updated only when the incoming snapshot_date is not older
    than the stored one, so a late-arriving older snapshot cannot overwrite
    newer stock figures.
    """
    # Use the session that owns the micro-batch; this avoids relying on the
    # SparkSession class or the notebook-level `spark` being in scope.
    session = microBatchDF.sparkSession
    target_table_name = f"{catalog}.silver.inventory"

    # 1. Deduplicate the incoming batch: keep the row with the greatest
    #    snapshot_date for each store/product pair.
    windowSpec = Window.partitionBy("store_id", "product_id").orderBy(col("snapshot_date").desc())
    df_deduped = (microBatchDF
                  .withColumn("rn", row_number().over(windowSpec))
                  .filter("rn=1").drop("rn"))

    # 2. Check: does the table exist?
    if not session.catalog.tableExists(target_table_name):
        # If not, create it by writing the first batch
        print(f"Batch {batchId}: Table doesn't exist. Creating new Delta table...")
        (df_deduped.write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(target_table_name)
        )
    else:
        # If it exists, do the MERGE
        print(f"Batch {batchId}: Merging into existing table...")
        deltaTable = DeltaTable.forName(session, target_table_name)
        (deltaTable.alias("t")
            .merge(
                df_deduped.alias("s"),
                "t.store_id = s.store_id AND t.product_id = s.product_id"
            )
            # Skip the update when the source snapshot is older than the target's.
            .whenMatchedUpdateAll(
                condition="t.snapshot_date IS NULL OR s.snapshot_date >= t.snapshot_date"
            )
            .whenNotMatchedInsertAll()
            .execute()
        )

In [0]:
print("Processing Inventory...")

# Valid rows: every micro-batch is handed to upsert_to_silver
q1 = (
    df_enriched.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "inv_silver")
    .foreachBatch(upsert_to_silver)
    .start()
)

# Rejected rows: appended to the quarantine Delta table
q2 = (
    df_quarantine.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "inv_quarantine")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.inventory_quarantine")
)

# Block until both streaming queries have terminated
q1.awaitTermination()
q2.awaitTermination()
print("Inventory Done.")

In [0]:
%sql
select * from retail_lakehouse.silver.inventory

In [0]:
%sql 
select * from retail_lakehouse.quarantine.inventory_quarantine;

Silver Dimension Products

In [0]:
# 1. Streaming read over the Bronze product dimension
df_bronze = (
    spark.readStream
    .table(f"{catalog}.bronze.dim_product")
)

In [0]:
# 2. Standardization (Soft fixes)
# product_id: trimmed + upper-cased; product_name: trimmed only.
df_standardized = (
    df_bronze
    .withColumn("product_id", upper(trim(col("product_id"))))
    .withColumn("product_name", trim(col("product_name")))
)

# category / brand: NULL becomes "UNKNOWN", then trimmed + upper-cased.
for label_col in ("category", "brand"):
    df_standardized = df_standardized.withColumn(
        label_col, upper(trim(coalesce(col(label_col), lit("UNKNOWN"))))
    )

# Price columns are cast to double.
for price_col in ("cost_price", "list_price"):
    df_standardized = df_standardized.withColumn(price_col, col(price_col).cast("double"))

In [0]:
# 3. Data Quality Tagging
# Rules are checked in order: product_id first, then cost_price.
product_id_missing = col("product_id").isNull()
cost_negative = col("cost_price") < 0

df_tagged = (
    df_standardized
    .withColumn(
        "is_valid",
        when(product_id_missing, False)
        .when(cost_negative, False)
        .otherwise(True)
    )
    .withColumn(
        "dq_issue",
        when(product_id_missing, "Missing Product ID")
        .when(cost_negative, "Negative Price")
        .otherwise(lit(None))
    )
)

In [0]:
# 4. Split Good & Bad Records
# Valid rows drop the tagging columns; rejected rows keep them.
df_clean = df_tagged.where(col("is_valid")).drop("is_valid", "dq_issue")
df_quarantine = df_tagged.where(~col("is_valid"))

In [0]:
# 5. Business Enrichment
# Stamp every clean row with the current timestamp
df_enriched = df_clean.withColumn("processed_ts", current_timestamp())

In [0]:
# 6. Deduplication & Merge 

def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch on product_id and upsert it into silver.dim_product.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch (not used).
    """
    # Use the session that owns the micro-batch rather than the notebook-level
    # `spark` global, so the callback does not depend on that outer state.
    session = microBatchDF.sparkSession

    # Keep the row with the greatest ingest_ts for every product_id.
    windowSpec = Window.partitionBy("product_id").orderBy(col("ingest_ts").desc())
    df_deduped = (
        microBatchDF
        .withColumn("row_num", row_number().over(windowSpec))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    # Merge logic
    target_table_name = f"{catalog}.silver.dim_product"

    if not session.catalog.tableExists(target_table_name):
        # First batch: the write creates the table
        df_deduped.write.format("delta").saveAsTable(target_table_name)
    else:
        # Table exists: update matching product_ids, insert the rest
        deltaTable = DeltaTable.forName(session, target_table_name)
        (deltaTable.alias("t")
            .merge(
                df_deduped.alias("s"),
                "t.product_id = s.product_id"
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

In [0]:
# 7. Start Streams

# Valid rows: every micro-batch is handed to upsert_to_silver
print("Processing valid Products to Silver...")

query_silver = (
    df_enriched.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "silver_dim_product")
    .foreachBatch(upsert_to_silver)
    .start()
)

# Rejected rows: appended to the quarantine Delta table
print("Processing invalid Products to Quarantine...")

query_quarantine = (
    df_quarantine.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "quarantine_dim_product")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.dim_product_quarantine")
)

# Block until both streaming queries have terminated
query_silver.awaitTermination()
query_quarantine.awaitTermination()

print("Silver Product Processing Complete.")

In [0]:
%sql 
select * from retail_lakehouse.silver.dim_product;

In [0]:
%sql 
select * from retail_lakehouse.quarantine.dim_product_quarantine;

Silver Dimension Store

In [0]:
# 1. Streaming read over the Bronze store dimension
df_bronze = (
    spark.readStream
    .table(f"{catalog}.bronze.dim_store")
)

In [0]:
# 2. Standardization
# Each step rewrites one column in place.

# store_id: trimmed + upper-cased
df_standardized = df_bronze.withColumn("store_id", upper(trim(col("store_id"))))
# store_name: trimmed only
df_standardized = df_standardized.withColumn("store_name", trim(col("store_name")))
# city: NULL defaults to "UNKNOWN", then trimmed and passed through initcap
df_standardized = df_standardized.withColumn(
    "city", initcap(trim(coalesce(col("city"), lit("UNKNOWN"))))
)
# region: NULL defaults to "UNKNOWN", then trimmed + upper-cased
df_standardized = df_standardized.withColumn(
    "region", upper(trim(coalesce(col("region"), lit("UNKNOWN"))))
)

In [0]:
# 3. Data Quality Tagging
# Single rule: a row is invalid when store_id is NULL.
store_id_missing = col("store_id").isNull()

df_tagged = (
    df_standardized
    .withColumn("is_valid", ~store_id_missing)
    .withColumn(
        "dq_issue",
        when(store_id_missing, "Missing Store ID").otherwise(lit(None))
    )
)

In [0]:
# 4. Split Good & Bad Records
# Valid rows drop the tagging columns; rejected rows keep them.
df_clean = df_tagged.where(col("is_valid")).drop("is_valid", "dq_issue")
df_quarantine = df_tagged.where(~col("is_valid"))

In [0]:
# 5. Enrichment
# Stamp every clean row with the current timestamp
df_enriched = df_clean.withColumn("processed_ts", current_timestamp())

In [0]:
# 6. Deduplication & Merge

def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch on store_id and upsert it into silver.dim_store.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch (not used).
    """
    # Use the session that owns the micro-batch rather than the notebook-level
    # `spark` global, so the callback does not depend on that outer state.
    session = microBatchDF.sparkSession

    # Keep the row with the greatest ingest_ts for every store_id.
    windowSpec = Window.partitionBy("store_id").orderBy(col("ingest_ts").desc())
    df_deduped = (
        microBatchDF
        .withColumn("row_num", row_number().over(windowSpec))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    target_table_name = f"{catalog}.silver.dim_store"

    if not session.catalog.tableExists(target_table_name):
        # First batch: the write creates the table
        df_deduped.write.format("delta").saveAsTable(target_table_name)
    else:
        # Table exists: update matching store_ids, insert the rest
        deltaTable = DeltaTable.forName(session, target_table_name)
        (deltaTable.alias("t")
            .merge(df_deduped.alias("s"), "t.store_id = s.store_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

In [0]:
# 7. Start Streams

# Valid rows: every micro-batch is handed to upsert_to_silver
print("Processing valid Stores to Silver...")

query_silver = (
    df_enriched.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "silver_dim_store")
    .foreachBatch(upsert_to_silver)
    .start()
)

# Rejected rows: appended to the quarantine Delta table
print("Processing invalid Stores to Quarantine...")

query_quarantine = (
    df_quarantine.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "quarantine_dim_store")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.dim_store_quarantine")
)

# Block until both streaming queries have terminated
query_silver.awaitTermination()
query_quarantine.awaitTermination()

print("Silver Store Processing Complete.")

In [0]:
%sql
select * from retail_lakehouse.silver.dim_store;

In [0]:
%sql
select * from retail_lakehouse.quarantine.dim_store_quarantine;

Silver Dimension Customer


In [0]:
# 1. Streaming read over the Bronze customer dimension
df_bronze = (
    spark.readStream
    .table(f"{catalog}.bronze.dim_customer")
)

In [0]:
# 2. Standardization
# Column expressions are named first, then applied in the original order.
customer_id_std = upper(trim(col("customer_id")))
name_std = initcap(trim(col("name")))
# NULL e-mail addresses fall back to a placeholder before lower-casing
email_std = lower(trim(coalesce(col("email"), lit("unknown@nomail.com"))))
# NULL cities fall back to "UNKNOWN" before initcap is applied
city_std = initcap(trim(coalesce(col("city"), lit("UNKNOWN"))))

df_standardized = (
    df_bronze
    .withColumn("customer_id", customer_id_std)
    .withColumn("name", name_std)
    .withColumn("email", email_std)
    .withColumn("city", city_std)
)

In [0]:
# 3. Data Quality Tagging
# Single rule: a row is invalid when customer_id is NULL.
customer_id_missing = col("customer_id").isNull()

df_tagged = (
    df_standardized
    .withColumn("is_valid", ~customer_id_missing)
    .withColumn(
        "dq_issue",
        when(customer_id_missing, "Missing Customer ID").otherwise(lit(None))
    )
)

In [0]:
# 4. Split Good & Bad Records
# Valid rows drop the tagging columns; rejected rows keep them.
df_clean = df_tagged.where(col("is_valid")).drop("is_valid", "dq_issue")
df_quarantine = df_tagged.where(~col("is_valid"))

In [0]:
# 5. Enrichment
# Stamp every clean row with the current timestamp
df_enriched = df_clean.withColumn("processed_ts", current_timestamp())

In [0]:
# 6. Deduplication & Merge

def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch on customer_id and upsert it into silver.dim_customer.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch (not used).
    """
    # Use the session that owns the micro-batch rather than the notebook-level
    # `spark` global, so the callback does not depend on that outer state.
    session = microBatchDF.sparkSession

    # Keep the row with the greatest ingest_ts for every customer_id.
    windowSpec = Window.partitionBy("customer_id").orderBy(col("ingest_ts").desc())
    df_deduped = (
        microBatchDF
        .withColumn("row_num", row_number().over(windowSpec))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    target_table_name = f"{catalog}.silver.dim_customer"

    if not session.catalog.tableExists(target_table_name):
        # First batch: the write creates the table
        df_deduped.write.format("delta").saveAsTable(target_table_name)
    else:
        # Table exists: update matching customer_ids, insert the rest
        deltaTable = DeltaTable.forName(session, target_table_name)
        (deltaTable.alias("t")
            .merge(df_deduped.alias("s"), "t.customer_id = s.customer_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

In [0]:
# 7. Start Streams

# Valid rows: every micro-batch is handed to upsert_to_silver
print("Processing valid Customers to Silver...")

query_silver = (
    df_enriched.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "silver_dim_customer")
    .foreachBatch(upsert_to_silver)
    .start()
)

# Rejected rows: appended to the quarantine Delta table
print("Processing invalid Customers to Quarantine...")

query_quarantine = (
    df_quarantine.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "quarantine_dim_customer")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.dim_customer_quarantine")
)

# Block until both streaming queries have terminated
query_silver.awaitTermination()
query_quarantine.awaitTermination()

print("Silver Customer Processing Complete.")

In [0]:
%sql select * from retail_lakehouse.silver.dim_customer;

In [0]:
%sql select * from retail_lakehouse.quarantine.dim_customer_quarantine;

Silver Dimension Promotion

In [0]:
# 1. Streaming read over the Bronze promotion dimension
df_bronze = (
    spark.readStream
    .table(f"{catalog}.bronze.dim_promotion")
)

In [0]:
# 2. Standardization
# promo_code / promo_type are trimmed + upper-cased (NULL promo_type defaults
# to "STANDARD"); discount_rule is trimmed; both dates go through to_date.
df_standardized = (
    df_bronze
    .withColumn(
        "promo_code",
        upper(trim(col("promo_code")))
    )
    .withColumn(
        "promo_type",
        upper(trim(coalesce(col("promo_type"), lit("STANDARD"))))
    )
    .withColumn(
        "discount_rule",
        trim(col("discount_rule"))
    )
    .withColumn(
        "start_date",
        to_date(trim(col("start_date")))
    )
    .withColumn(
        "end_date",
        to_date(trim(col("end_date")))
    )
)

In [0]:
# 3. Data Quality Tagging
# Single rule: a row is invalid when promo_code is NULL.
promo_code_missing = col("promo_code").isNull()

df_tagged = (
    df_standardized
    .withColumn("is_valid", ~promo_code_missing)
    .withColumn(
        "dq_issue",
        when(promo_code_missing, "Missing Promo Code").otherwise(lit(None))
    )
)

In [0]:
# 4. Split Good & Bad Records
# Valid rows drop the tagging columns; rejected rows keep them.
df_clean = df_tagged.where(col("is_valid")).drop("is_valid", "dq_issue")
df_quarantine = df_tagged.where(~col("is_valid"))

In [0]:
# 5. Enrichment
# Stamp every clean row with the current timestamp
df_enriched = df_clean.withColumn("processed_ts", current_timestamp())

In [0]:
# 6. Deduplication & Merge

def upsert_to_silver(microBatchDF, batchId):
    """Deduplicate one micro-batch on promo_code and upsert it into silver.dim_promotion.

    Args:
        microBatchDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by foreachBatch (not used).
    """
    # Use the session that owns the micro-batch rather than the notebook-level
    # `spark` global, so the callback does not depend on that outer state.
    session = microBatchDF.sparkSession

    # Keep the row with the greatest ingest_ts for every promo_code.
    windowSpec = Window.partitionBy("promo_code").orderBy(col("ingest_ts").desc())
    df_deduped = (
        microBatchDF
        .withColumn("row_num", row_number().over(windowSpec))
        .filter(col("row_num") == 1)
        .drop("row_num")
    )

    target_table_name = f"{catalog}.silver.dim_promotion"

    if not session.catalog.tableExists(target_table_name):
        # First batch: the write creates the table
        df_deduped.write.format("delta").saveAsTable(target_table_name)
    else:
        # Table exists: update matching promo_codes, insert the rest
        deltaTable = DeltaTable.forName(session, target_table_name)
        (deltaTable.alias("t")
            .merge(df_deduped.alias("s"), "t.promo_code = s.promo_code")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

In [0]:
# 7. Start Streams
# Valid rows: every micro-batch is handed to upsert_to_silver
print("Processing valid Promotions to Silver...")

query_silver = (
    df_enriched.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "silver_dim_promotion")
    .foreachBatch(upsert_to_silver)
    .start()
)

# Rejected rows: appended to the quarantine Delta table
print("Processing invalid Promotions to Quarantine...")

query_quarantine = (
    df_quarantine.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", paths["checkpoints"] + "quarantine_dim_promotion")
    .outputMode("append")
    .format("delta")
    .toTable(f"{catalog}.quarantine.dim_promotion_quarantine")
)

# Block until both streaming queries have terminated
query_silver.awaitTermination()
query_quarantine.awaitTermination()

print("Silver Promotion Processing Complete.")

In [0]:
%sql 
select * from retail_lakehouse.silver.dim_promotion;

In [0]:
%sql
select * from retail_lakehouse.quarantine.dim_promotion_quarantine;