In [0]:
### Parameters ###

from datetime import datetime               # This imports the datetime class from Python, not Spark.
from pyspark.sql import functions as F      # This imports Spark SQL functions (like F.lit, F.sha2, F.current_timestamp) so we can build transformations.

# --- Source identity: which entity is processed and what its primary key is ---
entity = "broker"
pk = "broker_code"

# --- Data lake coordinates (ADLS Gen2) ---
storage_account = "millerinsurancedatalake"
container = "raw"
base = f"abfss://{container}@{storage_account}.dfs.core.windows.net"

# Folder holding one dt=YYYY-MM-DD snapshot per load of this entity.
snapshot_root = f"{base}/landing/onprem/snapshot/{entity}"

# --- Target table in the silver layer ---
silver_schema = "silver"
silver_table = f"{silver_schema}.{entity}"

# Echo the resolved parameters so every run records what it operated on.
for label, value in (
    ("base", base),
    ("snapshot_root", snapshot_root),
    ("silver_schema", silver_schema),
    ("silver_table", silver_table),
    ("pk", pk),
):
    print(f"{label}: {value}")

In [0]:
### Find latest + previous dt=... snapshot entries ###

import re   # Python's regex library, used to extract the date from names like dt=2026-01-22.

# dbutils.fs.ls lists the contents of a directory as FileInfo objects. Each FileInfo has fields such as
# path (full path string), name, size (bytes) and modificationTime (milliseconds since the epoch).
items = dbutils.fs.ls(snapshot_root)

# Collect the date of every entry whose path contains dt=YYYY-MM-DD.
# A set collapses repeated dates so prev_dt can never equal latest_dt;
# ISO-formatted dates sort chronologically as plain strings.
dt_pattern = re.compile(r"dt=(\d{4}-\d{2}-\d{2})")
dt_matches = (dt_pattern.search(it.path) for it in items)
dts = sorted({m.group(1) for m in dt_matches if m})

# Fail with an actionable message instead of a bare IndexError on dts[-1].
if not dts:
    raise ValueError(f"No dt=YYYY-MM-DD snapshot entries found under {snapshot_root}")

latest_dt = dts[-1]
prev_dt = dts[-2] if len(dts) >= 2 else None   # None on the very first load

print(f"prev_dt: {prev_dt}")
print(f"latest_dt: {latest_dt}")

latest_path = f"{snapshot_root}/dt={latest_dt}"

print(f"latest_path: {latest_path}")

In [0]:
### Read latest snapshot CSV ###

# CSV reader: the first row supplies the column names and Spark infers the column types.
csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
raw_snapshot_df = csv_reader.load(latest_path)

# Metadata columns that are carried into silver:
#   snapshot_date  - the dt value of the snapshot that was read
#   ingestion_date - timestamp of this run
new_df = raw_snapshot_df.withColumn("snapshot_date", F.lit(latest_dt))
new_df = new_df.withColumn("ingestion_date", F.current_timestamp())

display(new_df)

# Number of distinct primary-key values in the snapshot.
distinct_pk_count = new_df.select(pk).distinct().count()
display(distinct_pk_count)



In [0]:
### Define "business columns" + compute row hash ###

all_cols = new_df.columns
meta_cols = ["ingestion_date", "snapshot_date"]

# Columns this cell itself adds to new_df further down. They are excluded so that re-running
# the cell (without re-reading the snapshot) does not turn them into business columns
# and change the hash input.
derived_cols = ["row_hash", "deleted_flag", "deleted_date", "last_seen_date"]

# Business columns = everything from the source except the key, metadata and derived columns.
# Source column order is preserved because it determines the hash input order.
non_business_cols = set(meta_cols) | set(derived_cols) | {pk}
business_cols = [c for c in all_cols if c not in non_business_cols]

# Stable hash: every business column is cast to string and NULL is replaced by "",
# then the values are joined with "||" and hashed with SHA-256.
hash_inputs = [F.coalesce(F.col(c).cast("string"), F.lit("")) for c in business_cols]
new_df = new_df.withColumn("row_hash", F.sha2(F.concat_ws("||", *hash_inputs), 256))

# Soft-delete fields (values used for inserts/updates: the row is present, so not deleted)
new_df = (new_df
          .withColumn("deleted_flag", F.lit(False))
          .withColumn("deleted_date", F.lit(None).cast("timestamp"))
          .withColumn("last_seen_date", F.lit(latest_dt)))

# Optional check while developing:
# display(new_df.select(pk, "row_hash", "deleted_flag", "deleted_date", "last_seen_date"))

In [0]:
### Create schema + first-run table if needed ###

# Storage location registered as the schema's managed location.
silver_db_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/silver/_schemas/{silver_schema}"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {silver_schema} MANAGED LOCATION '{silver_db_path}'")

# On the first run the table does not exist yet: seed it with the full latest snapshot.
table_exists = spark.catalog.tableExists(silver_table)

if not table_exists:
    # Column layout of the silver table: key, business columns, then tracking columns.
    initial_cols = [pk] + business_cols + [
        "row_hash",
        "snapshot_date",
        "ingestion_date",
        "deleted_flag",
        "deleted_date",
        "last_seen_date",
    ]
    # NOTE(review): the table is written with an explicit path equal to the schema's
    # managed location above - confirm that sharing this location is intended.
    initial_writer = (new_df.select(initial_cols)
                      .write
                      .format("delta")
                      .mode("overwrite")
                      .option("path", silver_db_path))
    initial_writer.saveAsTable(silver_table)
    print(f"Created table: {silver_table}")

In [0]:
### Build stage ops (I/U/D) vs existing silver

silver_df = spark.table(silver_table)

# Comparison columns of the existing silver rows (soft-deleted rows are included).
silver_active = silver_df.select(pk, "row_hash", "deleted_flag", "deleted_date", "last_seen_date")

# INSERTS: key is in the new snapshot but not in silver
ins = (new_df.alias("n")
       .join(silver_active.alias("s"), on=pk, how="leftanti")
       .withColumn("op", F.lit("I")))

# UPDATES: key is in both, and either the hash changed or the silver row is currently
# soft-deleted. The second condition revives a key that reappears with unchanged values;
# without it such a row is neither I nor U and would stay flagged as deleted.
# The staged row carries deleted_flag = False / deleted_date = NULL from new_df.
upd = (new_df.alias("n")
       .join(silver_active.alias("s"), on=pk, how="inner")
       .where((F.col("n.row_hash") != F.col("s.row_hash")) | (F.col("s.deleted_flag") == True))
       .select("n.*")
       .withColumn("op", F.lit("U")))

# DELETES (missing once): key is in silver but not in the new snapshot, and not already deleted
missing = (silver_df.alias("s")
           .join(new_df.alias("n"), on=pk, how="leftanti")
           .where(F.col("s.deleted_flag") == False)
           .select("s.*")
           .withColumn("op", F.lit("D"))
           # there are no "new" values for deletes; the existing silver values are carried
           # along and only the soft-delete fields are overwritten
           .withColumn("deleted_date", F.current_timestamp())
           .withColumn("deleted_flag", F.lit(True)))

# NOTE(review): rows that are present and unchanged are not staged, so their
# last_seen_date is not refreshed by this run - confirm that is the intended meaning.

# stage = union of the three op sets with a consistent column list
stage_cols = [pk] + business_cols + ["row_hash", "snapshot_date", "ingestion_date", "deleted_flag", "deleted_date", "last_seen_date", "op"]
stage = (ins.select(stage_cols)
         .unionByName(upd.select(stage_cols))
         .unionByName(missing.select(stage_cols), allowMissingColumns=True))

# Number of staged rows per operation
display(stage.groupBy("op").count())


In [0]:
### MERGE stage into silver

def _quote_ident(name):
    """Return `name` backtick-quoted for Spark SQL, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


# The temp view name follows the entity parameter instead of being hard-coded.
stage_view = f"stage_{entity}"
stage.createOrReplaceTempView(stage_view)

# Drop any qualifier prefix ("x.col" -> "col"). Stored under its own name so the shared
# business_cols list is not overwritten by this cell.
merge_business_cols = [c.split(".")[-1] for c in business_cols]

# Columns written on UPDATE: every silver column except the key.
update_cols = merge_business_cols + [
    "row_hash",
    "snapshot_date",
    "ingestion_date",
    "deleted_flag",
    "deleted_date",
    "last_seen_date"
]

# Columns written on INSERT: the key plus everything that UPDATE writes.
insert_cols = [pk] + update_cols

# Identifiers are quoted so column names that are not plain SQL identifiers
# (for example names containing a hyphen) do not break the generated statement.
pk_sql = _quote_ident(pk)
set_clause = ",\n".join(f"t.{_quote_ident(c)} = s.{_quote_ident(c)}" for c in update_cols)
insert_cols_sql = ", ".join(_quote_ident(c) for c in insert_cols)
insert_vals_sql = ", ".join(f"s.{_quote_ident(c)}" for c in insert_cols)

# One MERGE applies all three staged operations:
#   U - overwrite every non-key column with the staged values
#   D - only set the soft-delete fields, leaving business values untouched
#   I - insert the staged row
spark.sql(f"""
MERGE INTO {silver_table} t
USING {stage_view} s
ON t.{pk_sql} = s.{pk_sql}

WHEN MATCHED AND s.op = 'U' THEN
  UPDATE SET {set_clause}

WHEN MATCHED AND s.op = 'D' THEN
  UPDATE SET
    t.deleted_flag = true,
    t.deleted_date = s.deleted_date,
    t.last_seen_date = s.last_seen_date

WHEN NOT MATCHED AND s.op = 'I' THEN
  INSERT ({insert_cols_sql})
  VALUES ({insert_vals_sql})
""")


In [0]:
# Storage path of the silver table. Reuses the location the table was written to
# (silver_db_path) rather than repeating the URL, so the two cannot drift apart.
silver_path = silver_db_path
dbutils.fs.ls(silver_path)  # should show _delta_log + part- files


In [0]:
# Load the silver Delta data directly from its storage path.
df = spark.read.load(silver_path, format="delta")

# Preview the first 20 rows without truncating long values, then return the row count.
df.show(n=20, truncate=False)
df.count()

In [0]:
## 1 Post-run data validation / sanity check

# Expose the silver data as a temp view so it can be queried with SQL.
df.createOrReplaceTempView("broker_current_path")

# Total number of rows, followed by the number of soft-deleted rows.
sanity_queries = (
    "SELECT COUNT(*) AS total_rows FROM broker_current_path",
    "SELECT COUNT(*) AS deleted_rows FROM broker_current_path WHERE deleted_flag = true",
)
for sanity_query in sanity_queries:
    spark.sql(sanity_query).show()

In [0]:
## Row counts: previous snapshot vs latest snapshot vs silver

# Paths are built from the dt values discovered earlier in the notebook, so this cell
# needs no manual adjustment when a new snapshot arrives.
snap_path = f"{snapshot_root}/dt={latest_dt}"
snap = spark.read.option("header", True).csv(snap_path)

if prev_dt is not None:
    original_path = f"{snapshot_root}/dt={prev_dt}"
    original = spark.read.option("header", True).csv(original_path)
    print("original:", original.count())
else:
    # First load: there is no earlier snapshot to compare against.
    original_path = None
    original = None
    print("original: n/a (no previous snapshot)")

print("snapshot:", snap.count())
print("silver:", df.count())

In [0]:
## DQ summary

from pyspark.sql import functions as F

# The primary-key column comes from the notebook parameter `pk`; it is deliberately not
# re-assigned here so this check follows the parameter if the entity changes.

# --- metrics ---
total_rows   = df.count()
deleted_rows = df.filter(F.col("deleted_flag") == True).count()

# Rows without a primary key
pk_null_df = df.filter(F.col(pk).isNull())
pk_nulls   = pk_null_df.count()

# Primary keys that occur more than once among non-deleted rows, most frequent first
dupes_df = (
    df.filter(F.col("deleted_flag") == False)
      .groupBy(pk).count()
      .filter(F.col("count") > 1)
      .orderBy(F.col("count").desc())
)

# Number of distinct keys that are duplicated (one row per key in dupes_df)
dupe_rows = dupes_df.count()

# --- compact summary line ---
print(
    f"Rows={total_rows} | Deleted={deleted_rows} | PK nulls={pk_nulls} | PK dupes={dupe_rows}"
)

# --- only show tables when there's something to investigate ---
if pk_nulls > 0:
    display(pk_null_df.select(pk).limit(50))

if dupe_rows > 0:
    display(dupes_df.limit(50))
