In [0]:
from pyspark.sql.functions import col, lower, trim, current_timestamp, row_number, desc, lit, when
from pyspark.sql.window import Window
from delta.tables import DeltaTable
from functools import reduce

def _silver_table_names(entity_name):
    """Return the ``(silver_table, history_table)`` names for an entity."""
    # NOTE(review): orders / order_items receive the `dim_` prefix and every
    # other entity `fact_`. That looks inverted relative to the usual
    # dimensional-modelling convention, but the names are persisted in the
    # catalog, so they are kept exactly as they were.
    prefix = "dim" if entity_name in ("orders", "order_items") else "fact"
    return (
        f"sales_project.silver.{prefix}_{entity_name}",
        f"sales_project.silver.history_{prefix}_{entity_name}",
    )


def _clean_bronze(df_bronze, primary_key):
    """Apply the data-quality rules and keep the latest Bronze row per key.

    Rules: trim all string columns, lowercase ``email`` when present, drop rows
    whose primary key is null (or empty, for string keys), then keep only the
    row with the greatest ``ingested_at`` for each primary key.
    """
    column_types = dict(df_bronze.dtypes)

    # Trim every string column in one projection instead of one withColumn
    # (and one dtype lookup table) per column.
    df_clean = df_bronze.select(
        *[
            trim(col(name)).alias(name) if dtype == "string" else col(name)
            for name, dtype in df_bronze.dtypes
        ]
    )

    if "email" in df_clean.columns:
        df_clean = df_clean.withColumn("email", lower(col("email")))

    # Only string keys can be "empty". Comparing a non-string key with ""
    # makes Spark cast "" to the key's type, which yields NULL (dropping every
    # row) or raises under ANSI mode.
    pk_is_valid = col(primary_key).isNotNull()
    if column_types.get(primary_key) == "string":
        pk_is_valid = pk_is_valid & (col(primary_key) != "")
    df_clean = df_clean.filter(pk_is_valid)

    # Latest record per key; rows tied on ingested_at are picked arbitrarily.
    latest_first = Window.partitionBy(primary_key).orderBy(desc("ingested_at"))
    return (
        df_clean.withColumn("rn", row_number().over(latest_first))
        .filter(col("rn") == 1)
        .drop("rn")
    )


def _classify_changes(df_bronze_latest, df_silver, primary_key, metadata_columns):
    """Full-outer-join Bronze with Silver and label every key with a change type.

    The result has the Bronze columns plus ``change_type``:
    ``I`` (key only in Bronze), ``D`` (key only in Silver), ``U`` (key in both
    and at least one compared column differs) or ``NC`` (no change).
    """
    joined = df_bronze_latest.alias("b").join(
        df_silver.alias("s"),
        col(f"b.{primary_key}") == col(f"s.{primary_key}"),
        "outer",
    )
    in_bronze = col(f"b.{primary_key}").isNotNull()
    in_silver = col(f"s.{primary_key}").isNotNull()

    # The key and the technical columns never count as a business change.
    ignored = {primary_key, *metadata_columns}
    # Null-safe inequality: a value moving to/from NULL must count as a change
    # (a plain != evaluates to NULL in that case and would be missed).
    differences = [
        ~col(f"b.{name}").eqNullSafe(col(f"s.{name}"))
        for name in df_bronze_latest.columns
        if name not in ignored
    ]
    # lit(False) seed keeps reduce() valid when there is nothing to compare.
    any_difference = reduce(lambda left, right: left | right, differences, lit(False))

    # Take the whole row from Bronze whenever Bronze has the key, so a field
    # updated to NULL is not silently back-filled with the old Silver value.
    # Rows that exist only in Silver (deletes) fall back to the Silver values.
    resolved_columns = [
        when(in_bronze, col(f"b.{name}")).otherwise(col(f"s.{name}")).alias(name)
        for name in df_bronze_latest.columns
    ]
    change_type = (
        when(~in_silver, lit("I"))
        .when(~in_bronze, lit("D"))
        .when(any_difference, lit("U"))
        .otherwise(lit("NC"))
        .alias("change_type")
    )
    return joined.select(*resolved_columns, change_type)


def transform_to_silver(entity_name, primary_key, metadata_columns=("ingested_at",)):
    """Load one Bronze entity into its Silver table, archiving replaced rows.

    On the first run (Silver table missing) the cleaned Bronze snapshot is
    written with ``active_flag='Y'`` and ``change_type='NC'``. On later runs the
    Bronze snapshot is compared with Silver: Silver rows that are about to be
    updated or deleted are appended to the history table, and Silver is then
    brought in line with Bronze in a single Delta MERGE.

    Bronze is treated as a full snapshot: a key present in Silver but absent
    from Bronze is classified as a delete.

    Relies on the notebook-global ``spark`` session.

    Args:
        entity_name: Bronze folder name under the landing zone; also used to
            derive the Silver and history table names.
        primary_key: Name of the column that uniquely identifies a row.
        metadata_columns: Technical columns ignored when deciding whether a
            row was updated. Defaults to ``("ingested_at",)``.

    Returns:
        None. Results are written to the Silver and history Delta tables.
    """
    # 1. Paths and Table Names
    bronze_path = f"/Volumes/sales_project/bronze/landing_zone_metadata/data/{entity_name}"
    silver_table_name, history_table_name = _silver_table_names(entity_name)

    print(f"Processing: {entity_name} -> {silver_table_name}")

    # 2. Read, apply DQM and deduplicate
    df_bronze_latest = _clean_bronze(
        spark.read.format("delta").load(bronze_path), primary_key
    )

    # 3. Day 1: Initialize Silver Table
    if not spark.catalog.tableExists(silver_table_name):
        print(f"Day 1 load for {entity_name}")
        (df_bronze_latest
         .withColumn("active_flag", lit("Y"))
         .withColumn("change_type", lit("NC"))
         .withColumn("silver_updated_at", current_timestamp())
         .write.format("delta").saveAsTable(silver_table_name))
        return

    # 4. Day 2+: Compare Silver and Bronze
    df_silver = spark.table(silver_table_name)
    df_changes = _classify_changes(
        df_bronze_latest, df_silver, primary_key, metadata_columns
    )

    # 5. Archive Logic (Move to History)
    # Carry the *incoming* change type under a separate name: Silver has its
    # own change_type column, and the history flag must be derived from what is
    # happening now, not from how the row originally arrived in Silver.
    changed_keys = (
        df_changes.filter(col("change_type").isin("U", "D"))
        .select(col(primary_key), col("change_type").alias("incoming_change_type"))
    )
    df_history_final = (
        df_silver.join(changed_keys, primary_key)
        # History flags: N for superseded (updated) rows, D for deleted rows.
        .withColumn(
            "active_flag",
            when(col("incoming_change_type") == "U", lit("N")).otherwise(lit("D")),
        )
        .withColumn("archived_at", current_timestamp())
        .drop("incoming_change_type")
    )

    if not df_history_final.isEmpty():
        (df_history_final.write.format("delta").mode("append")
         .option("mergeSchema", "true").saveAsTable(history_table_name))

    # 6. Update Silver Table (Current Snapshot)
    # One MERGE applies deletes, updates and inserts in a single atomic commit.
    # It avoids collecting every changed key onto the driver, and the lazy
    # change set is evaluated against Silver before Silver is modified.
    df_to_apply = (
        df_changes.filter(col("change_type").isin("I", "U", "D"))
        .withColumn("active_flag", lit("Y"))
        .withColumn("silver_updated_at", current_timestamp())
    )

    if not df_to_apply.isEmpty():
        (DeltaTable.forName(spark, silver_table_name).alias("t")
         .merge(df_to_apply.alias("c"), f"t.{primary_key} = c.{primary_key}")
         .whenMatchedDelete(condition="c.change_type = 'D'")
         .whenMatchedUpdateAll(condition="c.change_type = 'U'")
         .whenNotMatchedInsertAll(condition="c.change_type = 'I'")
         .execute())

    print(f"Success: {entity_name} processed according to Day 2 logic.")

In [0]:
# Entities to load into Silver, each paired with the column that uniquely
# identifies one of its rows.
entity_list = [
    {"name": table_name, "pk": key_column}
    for table_name, key_column in (
        ("customers", "customer_id"),
        ("products", "product_id"),
        ("orders", "order_id"),
        ("order_items", "order_item_id"),
    )
]

# Make sure the target schema exists before any table is written to it.
spark.sql("CREATE SCHEMA IF NOT EXISTS sales_project.silver")

# Process the entities one after another, in the order listed above.
for entity in entity_list:
    transform_to_silver(
        entity["name"],
        entity["pk"],
    )

In [0]:
# Inspect the Silver customers table. Immediately after the initial (Day 1)
# load, every row is written with active_flag = 'Y' and change_type = 'NC'.
display(
    spark.table("sales_project.silver.fact_customers")
)