
#### **Silver Data Transform**

- Read each MANAGED (or registered) table from the Bronze lakehouse/database.
- Clean & standardize data (e.g., email casing).
- Deduplicate rows by a table's "first" data column + Source (country/partition),
with table-specific overrides where needed.
- Add a SurrogateKey to each row for downstream joins/reporting (planned: the code cell below does not generate SurrogateKey values).
- Incrementally upsert into Silver managed Delta tables via MERGE.

Design notes:
* Tables are discovered dynamically from the Bronze DB catalog.
* First data column = first non-meta column (neither 'Source' nor 'SurrogateKey');
this is used as a simple business key when no explicit PK is known.
* We preserve existing SurrogateKey values on updates so keys remain stable.

In [1]:
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# --- Databases ---
# Bronze is addressed by name; Silver is whichever database the Spark session
# currently has selected.
BRONZE_DB = "Lakehouse_Bz"
SILVER_DB = spark.catalog.currentDatabase()

# Catalog entry types that are processed; entries of any other type are skipped.
_TABLE_TYPES = ("MANAGED", "EXTERNAL", "TABLE")

# Names of every Bronze catalog entry whose type is in _TABLE_TYPES.
_catalog_entries = spark.catalog.listTables(BRONZE_DB)
bronze_tables = [
    entry.name
    for entry in _catalog_entries
    if entry.tableType in _TABLE_TYPES
]

def normalize_columns(df):
    """
    Rename the columns the pipeline relies on to their canonical spelling.

    For each canonical name ('Source', then 'email'), the first column whose
    name matches case-insensitively is renamed to the canonical spelling when
    it is not already spelled that way. All other columns are left untouched.

    Returns the (possibly renamed) DataFrame.
    """
    for canonical in ("Source", "email"):
        target = canonical.lower()
        # First case-insensitive match, or None when the column is absent.
        found = next((name for name in df.columns if name.lower() == target), None)
        if found is None or found == canonical:
            continue
        df = df.withColumnRenamed(found, canonical)

    return df

def _quote(col_name):
    """Backtick-quote a column name so it is safe inside a SQL expression string."""
    return "`" + col_name.replace("`", "``") + "`"


def _dedupe_by_email(frame):
    """Lower-case 'email' and keep one row per distinct non-null address."""
    frame = frame.withColumn("email", F.lower(F.col("email")))
    has_email = F.col("email").isNotNull()
    # dropDuplicates treats NULLs as equal, so deduplicating the whole frame
    # would collapse every row without an email into a single row. Only rows
    # that actually carry an address are deduplicated; the rest pass through.
    with_email = frame.filter(has_email).dropDuplicates(["email"])
    without_email = frame.filter(~has_email)
    return with_email.unionByName(without_email)


# Tables that get the extra email-based dedupe on top of the base dedupe.
_EMAIL_DEDUPE_TABLES = ("customers", "employees")

# Process every discovered Bronze table: read, normalize, dedupe, then either
# create the Silver table (first run) or MERGE into it (later runs). A failure
# on one table is reported and does not stop the remaining tables.
for tbl in bronze_tables:
    src_fqn = f"{BRONZE_DB}.{tbl}"
    dst_fqn = f"{SILVER_DB}.{tbl}"

    # 1) Read Bronze
    try:
        df_src = spark.table(src_fqn)
    except Exception as e:
        print(f"⚠️ {tbl}: cannot read Bronze table — {e}")
        continue

    # 2) Normalize columns
    df = normalize_columns(df_src)

    # 3) Choose dedupe key: first non-meta column
    non_meta = [c for c in df.columns if c.lower() not in ("source", "surrogatekey")]
    if not non_meta:
        print(f"⚠️ {tbl}: no data columns to dedupe on; skipping")
        continue
    id_col = non_meta[0]

    # 4) Base dedupe by (id_col, Source) when Source exists; else by id_col.
    #    The same key columns are reused for the MERGE condition below.
    has_source = "Source" in df.columns
    key_cols = [id_col, "Source"] if has_source else [id_col]
    # NOTE(review): which of several duplicate rows survives is not controlled here.
    df = df.dropDuplicates(key_cols)

    # 5) Table-specific transformations
    if tbl.lower() in _EMAIL_DEDUPE_TABLES and "email" in df.columns:
        df = _dedupe_by_email(df)

    # 6) Create the Silver managed table if missing
    if not spark.catalog.tableExists(dst_fqn):
        writer = (df.write.format("delta")
                    .mode("overwrite")
                    .option("overwriteSchema", "true"))
        if has_source:
            writer = writer.partitionBy("Source")
        writer.saveAsTable(dst_fqn)
        print(f"🆕 {dst_fqn}: created (initial load)")
        continue

    # 7) Incremental MERGE
    try:
        dt = DeltaTable.forName(spark, dst_fqn)

        # Null-safe equality (<=>): with plain '=', a row whose key or Source
        # is NULL never matches its Silver counterpart and would be inserted
        # again on every run.
        cond = " AND ".join(f"t.{_quote(k)} <=> s.{_quote(k)}" for k in key_cols)

        # Inserts take every column from the deduped/clean df.
        insert_values = {c: f"s.{_quote(c)}" for c in df.columns}
        # Updates leave an existing SurrogateKey untouched so keys stay stable.
        update_set = {c: expr for c, expr in insert_values.items()
                      if c.lower() != "surrogatekey"}

        (dt.alias("t")
           .merge(df.alias("s"), cond)
           .whenMatchedUpdate(set=update_set)
           .whenNotMatchedInsert(values=insert_values)
           .execute())

        print(f"✅ {dst_fqn}: merged on ({id_col}{', Source' if has_source else ''})")
    except Exception as e:
        # Best-effort per table: report the failure and move on to the next one.
        print(f"❌ {dst_fqn}: MERGE failed — {e}")


StatementMeta(, a7f9892a-a90a-4b03-82f2-ff145da64880, 3, Finished, Available, Finished)

🆕 lakehouse_sv.customers: created (initial load)
🆕 lakehouse_sv.employees: created (initial load)
🆕 lakehouse_sv.machines: created (initial load)
🆕 lakehouse_sv.mills: created (initial load)
🆕 lakehouse_sv.products: created (initial load)
🆕 lakehouse_sv.warehouses: created (initial load)
🆕 lakehouse_sv.inventorysnapshots: created (initial load)
🆕 lakehouse_sv.machinedowntime: created (initial load)
🆕 lakehouse_sv.machinesensors: created (initial load)
🆕 lakehouse_sv.orders: created (initial load)
🆕 lakehouse_sv.plannedproductions: created (initial load)
🆕 lakehouse_sv.shipments: created (initial load)
🆕 lakehouse_sv.stockmovements: created (initial load)


In [2]:
# Spot check: show the ten rows of the orders table with the latest orderdate.
preview_sql = "SELECT * FROM Lakehouse_Sv.orders order by orderdate desc LIMIT 10"
df = spark.sql(preview_sql)
display(df)

StatementMeta(, a7f9892a-a90a-4b03-82f2-ff145da64880, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3c0987a2-b904-4350-b302-c0b25c643847)

In [3]:
from notebookutils import mssparkutils

# Safety switch: nothing below runs unless this is set to True.
delete_tables = False

if delete_tables:
    # Tables are dropped from the database the Spark session currently points at.
    db = spark.catalog.currentDatabase()
    print(f"Dropping from database: {db}")

    # Explicit list of table names to drop.
    tables = [
        "Customers",
        "Employees",
        "Machines",
        "Mills",
        "Products",
        "Warehouses",
        "InventorySnapshots",
        "MachineDowntime",
        "MachineSensors",
        "Orders",
        "PlannedProductions",
        "Shipments",
        "StockMovements",
    ]

    for table_name in tables:
        fqn = f"{db}.{table_name}"
        # IF EXISTS keeps the cell re-runnable after a partial cleanup.
        spark.sql(f"DROP TABLE IF EXISTS {fqn}")
        print(f"🗑️  Dropped: {fqn}")

StatementMeta(, a7f9892a-a90a-4b03-82f2-ff145da64880, 5, Finished, Available, Finished)