In [0]:
# 02_silver_claims_current_stream.py
# SOURCE:  kardia_bronze.bronze_claims  (Delta Change Data Feed ON)
# OUTPUT:  kardia_silver.silver_claims_current  (SCD‑1: latest row per ClaimID)
# PATTERN: Structured Streaming CDF + foreachBatch upsert (mirrors 02_silver_patients_transform.py style).
# TRIGGER: Incremental batch via trigger(availableNow=True); checkpoint drives progress.
# STATE: Streaming checkpoint replaces the prior __pipeline_metadata version‑tracking table.
#        If the checkpoint is lost, the stream restarts from the current Bronze snapshot
#        (emitted as inserts) and reprocesses it; MERGE keeps the target idempotent.
# AUDIT: Adds _ingest_ts at Silver load time (used downstream in Gold hourly metrics).

from pyspark.sql import functions as F, Window
from delta.tables import DeltaTable

# Table names, streaming checkpoint location, and the CDF row filter used by the cells below.
SILVER_DB       = "kardia_silver"                              # database that holds the Silver target
BRONZE_TABLE    = "kardia_bronze.bronze_claims"                # streaming source, read via Change Data Feed
SILVER_TABLE    = f"{SILVER_DB}.silver_claims_current"         # MERGE target, one row per ClaimID
CHECKPOINT_PATH = "dbfs:/kardia/_checkpoints/silver_claims"    # streaming progress is persisted here
CHANGE_TYPES    = ["insert", "update_postimage"]               # _change_type values kept from the CDF stream

In [0]:
# 1. Ensure Silver DB & target table exist.
#    We declare the Silver schema explicitly (adds _ingest_ts) so downstream
#    code is stable even if Bronze evolves.

# The database must exist before the table DDL can reference it.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {SILVER_DB}")

# Explicit column list for the Silver target; _ingest_ts is the only column
# added on top of the claim attributes. IF NOT EXISTS makes reruns a no-op.
create_silver_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {SILVER_TABLE} (
      ClaimID               STRING,
      PatientID             STRING,
      ProviderID            STRING,
      ClaimAmount           DOUBLE,
      ClaimDate             STRING,
      DiagnosisCode         STRING,
      ProcedureCode         STRING,
      ClaimStatus           STRING,
      ClaimType             STRING,
      ClaimSubmissionMethod STRING,
      _ingest_ts            TIMESTAMP
    ) USING DELTA
    """
spark.sql(create_silver_table_sql)


In [0]:
# 2.  Helper: prepare batch for SCD‑1 upsert.
#     Deduplicate within the micro‑batch by ClaimID ordering on _commit_version desc.
#     Drop CDF system columns; stamp _ingest_ts.

def _prepare_claims_df(batch_df):
    """Reduce a CDF micro-batch to one Silver-ready row per ClaimID.

    Steps:
      1. Discard rows whose ClaimID is NULL. The downstream MERGE joins on
         ``t.ClaimID = s.ClaimID``, which never matches NULL, so such rows
         would be inserted again on every run instead of being upserted.
      2. Keep, for each ClaimID, the row with the highest ``_commit_version``.
         Rows sharing both ClaimID and ``_commit_version`` are tie-broken
         arbitrarily by ``row_number``.
      3. Drop the helper rank column and the CDF system columns.
      4. Stamp ``_ingest_ts`` with the current timestamp.

    Args:
        batch_df: micro-batch DataFrame that carries ``ClaimID`` plus the CDF
            columns ``_change_type``, ``_commit_version`` and
            ``_commit_timestamp``.

    Returns:
        DataFrame with at most one row per non-null ClaimID, without the CDF
        system columns, and with an ``_ingest_ts`` column.
    """
    # NULL keys cannot participate in an equality-based upsert; exclude them up front.
    keyed = batch_df.filter(F.col("ClaimID").isNotNull())

    w = Window.partitionBy("ClaimID").orderBy(F.col("_commit_version").desc())
    latest = (keyed
              .withColumn("row_num", F.row_number().over(w))
              .filter("row_num = 1")
              .drop("row_num", "_change_type", "_commit_version", "_commit_timestamp"))
    return latest.withColumn("_ingest_ts", F.current_timestamp())

In [0]:
# 3 ▸ foreachBatch upsert: SCD‑1 current image per ClaimID.

def upsert_to_silver(batch_df, _):
    """foreachBatch sink: MERGE the prepared micro-batch into the Silver table.

    Matched ClaimIDs are overwritten with the incoming row and unmatched
    ClaimIDs are inserted.

    Args:
        batch_df: micro-batch DataFrame handed over by Structured Streaming.
        _: batch id supplied by foreachBatch; not used.
    """
    silver_ready_df = _prepare_claims_df(batch_df)

    # Use the session attached to the micro-batch instead of the notebook-global
    # `spark`: foreachBatch callbacks may run where the global session object is
    # not available, whereas the batch's own session always is.
    batch_session = batch_df.sparkSession

    (DeltaTable.forName(batch_session, SILVER_TABLE)
               .alias("t")
               .merge(silver_ready_df.alias("s"), "t.ClaimID = s.ClaimID")
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())

In [0]:
# 4 ▸ Build CDF source stream from Bronze Claims.
#     We read the Change Data Feed and keep only inserts and update post-images (see CHANGE_TYPES).
# Predicate selecting the change rows whose _change_type is listed in CHANGE_TYPES.
wanted_change_rows = F.col("_change_type").isin(*CHANGE_TYPES)

# Streaming read of the Bronze table with the Change Data Feed enabled.
bronze_change_feed = (
    spark.readStream
    .format("delta")
    .option("readChangeData", "true")
    .table(BRONZE_TABLE)
)

bronze_cdf = bronze_change_feed.filter(wanted_change_rows)

In [0]:
# 5 ▸ Start incremental availableNow run.
#     Checkpoint tracks source progress; reruns pick up only new commits.
# Configure the sink first: every micro-batch goes through upsert_to_silver,
# progress is stored under CHECKPOINT_PATH, and the availableNow trigger is used.
silver_writer = (
    bronze_cdf.writeStream
    .foreachBatch(upsert_to_silver)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .trigger(availableNow=True)
)

# Launch the run and block this cell until the query terminates.
query = silver_writer.start()
query.awaitTermination()

In [0]:
# 6 ▸ Post‑run sanity checks.
# Look the Silver table up once and reuse the handle for the count and the preview.
silver_claims = spark.table(SILVER_TABLE)
silver_row_count = silver_claims.count()

print(f"Silver claims row count: {silver_row_count}")
print(f"Checkpoint: {CHECKPOINT_PATH}")

# Preview the first 20 rows
display(silver_claims.limit(20))