In [0]:
from pyspark.sql.functions import *
from delta.tables import DeltaTable

In [0]:
# Process Features table to Silver with SCD Type 2.
#
# Part 1 closes the current version of any (Store, Date) whose tracked values changed;
# Part 2 inserts a version for every key that has no current row (brand-new keys plus the
# keys Part 1 just closed). Re-running with unchanged bronze data leaves the table data
# unchanged. The two merges are separate Delta commits: if Part 2 fails after Part 1 has
# committed, re-running this cell inserts the missing current rows.

features_silver_name = "practise.silver.features"
features_keys = ["Store", "Date"]
features_tracked = [
    "Temperature", "Fuel_Price",
    "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5",
    "CPI", "Unemployment", "IsHoliday",
]

# Step 1: Read from bronze and fix data types (try_cast yields NULL for values that don't parse)
df_features = spark.table("practise.bronze.features")
for c in ["MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5", "CPI", "Unemployment"]:
    df_features = df_features.withColumn(c, expr(f"try_cast({c} as double)"))

# One load instant for every SCD timestamp written by this cell, so an expired row's
# effective_end_date equals its replacement's effective_start_date (no gap between versions).
# Captured as epoch microseconds to avoid a timezone round-trip through Python datetime.
load_ts_us = spark.sql("SELECT unix_micros(current_timestamp()) AS us").first()["us"]
load_ts = expr(f"timestamp_micros({load_ts_us})")

# Step 2: Add SCD Type 2 tracking columns
df_features = (
    df_features.drop("Bronze_Ing_Time")
    .withColumn("Silver_Ing_Time", load_ts)
    .withColumn("is_current", lit(True))
    .withColumn("effective_start_date", load_ts)
    .withColumn("effective_end_date", lit(None).cast("timestamp"))
)

# Match on the business key, against the current version only.
key_cond = " AND ".join(f"old.{k} = new.{k}" for k in features_keys) + " AND old.is_current = true"
# Null-safe change detection: `a != b` is NULL (treated as false) when either side is NULL,
# so a MarkDown going NULL -> value (or back) would never be detected and the change lost.
# NOT (a <=> b) is true exactly when the values differ, with NULL treated as a comparable value.
changed_cond = " OR ".join(f"NOT (old.{c} <=> new.{c})" for c in features_tracked)

# NOTE(review): assumes bronze holds at most one row per (Store, Date). Duplicate keys would make
# Part 1 fail (multiple source rows matching one target row) and Part 2 insert duplicate rows.
# Confirm, or deduplicate df_features before merging.

# Step 3: Merge to silver table
if spark.catalog.tableExists(features_silver_name):
    # Part 1: Expire the current version of records whose tracked values changed
    DeltaTable.forName(spark, features_silver_name).alias("old").merge(
        df_features.alias("new"), key_cond
    ).whenMatchedUpdate(
        condition=changed_cond,
        set={"is_current": "false", "effective_end_date": load_ts},
    ).execute()

    # Part 2: Insert new/changed records (every key left without a current version)
    DeltaTable.forName(spark, features_silver_name).alias("old").merge(
        df_features.alias("new"), key_cond
    ).whenNotMatchedInsertAll().execute()

    print("✅ Merged to practise.silver.features (in ONE run)")
else:
    # First time: Create the table
    df_features.write.format("delta").saveAsTable(features_silver_name)
    print("✅ Created practise.silver.features")

In [0]:
# Process Sales table to Silver with SCD Type 2.
#
# Part 1 closes the current version of any (Store, Dept, Date) whose tracked values changed;
# Part 2 inserts a version for every key that has no current row (brand-new keys plus the
# keys Part 1 just closed). Re-running with unchanged bronze data leaves the table data
# unchanged. The two merges are separate Delta commits: if Part 2 fails after Part 1 has
# committed, re-running this cell inserts the missing current rows.

sales_silver_name = "practise.silver.sales"
sales_keys = ["Store", "Dept", "Date"]
sales_tracked = ["Weekly_Sales", "IsHoliday"]

# Step 1: Read from bronze
df_sales = spark.table("practise.bronze.sales")

# One load instant for every SCD timestamp written by this cell, so an expired row's
# effective_end_date equals its replacement's effective_start_date (no gap between versions).
# Captured as epoch microseconds to avoid a timezone round-trip through Python datetime.
load_ts_us = spark.sql("SELECT unix_micros(current_timestamp()) AS us").first()["us"]
load_ts = expr(f"timestamp_micros({load_ts_us})")

# Step 2: Add SCD Type 2 tracking columns
df_sales = (
    df_sales.drop("Bronze_Ing_Time")
    .withColumn("Silver_Ing_Time", load_ts)
    .withColumn("is_current", lit(True))
    .withColumn("effective_start_date", load_ts)
    .withColumn("effective_end_date", lit(None).cast("timestamp"))
)

# Match on the business key, against the current version only.
key_cond = " AND ".join(f"old.{k} = new.{k}" for k in sales_keys) + " AND old.is_current = true"
# Null-safe change detection: `a != b` is NULL (treated as false) when either side is NULL,
# so a change to or from NULL would never expire the old row. NOT (a <=> b) catches it.
changed_cond = " OR ".join(f"NOT (old.{c} <=> new.{c})" for c in sales_tracked)

# NOTE(review): assumes bronze holds at most one row per (Store, Dept, Date). Duplicate keys would
# make Part 1 fail (multiple source rows matching one target row) and Part 2 insert duplicate rows.
# Confirm, or deduplicate df_sales before merging.

# Step 3: Merge to silver table
if spark.catalog.tableExists(sales_silver_name):
    # Part 1: Expire the current version of records whose tracked values changed
    DeltaTable.forName(spark, sales_silver_name).alias("old").merge(
        df_sales.alias("new"), key_cond
    ).whenMatchedUpdate(
        condition=changed_cond,
        set={"is_current": "false", "effective_end_date": load_ts},
    ).execute()

    # Part 2: Insert new/changed records (every key left without a current version)
    DeltaTable.forName(spark, sales_silver_name).alias("old").merge(
        df_sales.alias("new"), key_cond
    ).whenNotMatchedInsertAll().execute()

    print("✅ Merged to practise.silver.sales (in ONE run)")
else:
    # First time: Create the table
    df_sales.write.format("delta").saveAsTable(sales_silver_name)
    print("✅ Created practise.silver.sales")

In [0]:
# Process Stores table to Silver with SCD Type 2.
#
# Part 1 closes the current version of any Store whose tracked values changed;
# Part 2 inserts a version for every key that has no current row (brand-new stores plus the
# stores Part 1 just closed). Re-running with unchanged bronze data leaves the table data
# unchanged. The two merges are separate Delta commits: if Part 2 fails after Part 1 has
# committed, re-running this cell inserts the missing current rows.

stores_silver_name = "practise.silver.stores"
stores_keys = ["Store"]
stores_tracked = ["Type", "Size"]

# Step 1: Read from bronze
df_stores = spark.table("practise.bronze.stores")

# One load instant for every SCD timestamp written by this cell, so an expired row's
# effective_end_date equals its replacement's effective_start_date (no gap between versions).
# Captured as epoch microseconds to avoid a timezone round-trip through Python datetime.
load_ts_us = spark.sql("SELECT unix_micros(current_timestamp()) AS us").first()["us"]
load_ts = expr(f"timestamp_micros({load_ts_us})")

# Step 2: Add SCD Type 2 tracking columns
df_stores = (
    df_stores.drop("Bronze_Ing_Time")
    .withColumn("Silver_Ing_Time", load_ts)
    .withColumn("is_current", lit(True))
    .withColumn("effective_start_date", load_ts)
    .withColumn("effective_end_date", lit(None).cast("timestamp"))
)

# Match on the business key, against the current version only.
key_cond = " AND ".join(f"old.{k} = new.{k}" for k in stores_keys) + " AND old.is_current = true"
# Null-safe change detection: `a != b` is NULL (treated as false) when either side is NULL,
# so a change to or from NULL would never expire the old row. NOT (a <=> b) catches it.
changed_cond = " OR ".join(f"NOT (old.{c} <=> new.{c})" for c in stores_tracked)

# NOTE(review): assumes bronze holds at most one row per Store. Duplicate keys would make Part 1
# fail (multiple source rows matching one target row) and Part 2 insert duplicate rows.
# Confirm, or deduplicate df_stores before merging.

# Step 3: Merge to silver table
if spark.catalog.tableExists(stores_silver_name):
    # Part 1: Expire the current version of records whose tracked values changed
    DeltaTable.forName(spark, stores_silver_name).alias("old").merge(
        df_stores.alias("new"), key_cond
    ).whenMatchedUpdate(
        condition=changed_cond,
        set={"is_current": "false", "effective_end_date": load_ts},
    ).execute()

    # Part 2: Insert new/changed records (every key left without a current version)
    DeltaTable.forName(spark, stores_silver_name).alias("old").merge(
        df_stores.alias("new"), key_cond
    ).whenNotMatchedInsertAll().execute()

    print("✅ Merged to practise.silver.stores (in ONE run)")
else:
    # First time: Create the table
    df_stores.write.format("delta").saveAsTable(stores_silver_name)
    print("✅ Created practise.silver.stores")

In [0]:
# Sanity-check each silver table: all rows (every SCD version) vs. rows flagged as current,
# plus a small preview of the current rows.
for tbl_name in ("features", "sales", "stores"):
    silver_df = spark.table(f"practise.silver.{tbl_name}")
    current_df = silver_df.filter(col("is_current"))
    print(f"\n=== SILVER.{tbl_name.upper()} ===")
    total_rows = silver_df.count()
    current_rows = current_df.count()
    print(f"Total: {total_rows:,} | Current: {current_rows:,}")
    display(current_df.limit(5))

In [0]:
# Inspect the silver stores table without filtering, so expired SCD versions are shown too
stores_silver = spark.table("practise.silver.stores")
display(stores_silver)