In [0]:
# silver_encounters_scd1.ipynb
# SOURCE: Reads encounter records from Bronze (with Delta CDF) into Silver.
# OUTPUT: `kardia_silver.silver_encounters`, updated incrementally.
# TRIGGER (selected by the `mode` widget):
#   - batch  : processes the data that is currently available, then exits.
#   - stream : runs continuously with 30s micro-batches.

%pip install -q --no-deps --no-index --find-links=/dbfs/Shared/libs kflow
from kflow.config import BRONZE_DB, SILVER_DB, bronze_table, silver_paths, CHANGE_TYPES

from delta.tables import DeltaTable
from pyspark.sql import functions as F

# Resolve the source/target identifiers for the "encounters" dataset via kflow.
# `S` is the object returned by `silver_paths`; this notebook reads its
# `.table`, `.db` and `.checkpoint` attributes.
S = silver_paths("encounters")
# Bronze table whose change feed is the streaming source.
SRC_TABLE = bronze_table("encounters")
# Silver table that receives the MERGE upserts.
TGT_TABLE = S.table

In [None]:
# Mode widget & flags
# Creating the dropdown is best-effort: outside Databricks `dbutils` is not
# defined (NameError), and the notebook then falls back to batch mode.
try:
    dbutils.widgets.dropdown("mode", "batch", ["batch", "stream"])
except Exception:
    # Deliberately ignored, but only ordinary errors: a bare `except:` would
    # also swallow KeyboardInterrupt / SystemExit and hide cancellations.
    pass
MODE = dbutils.widgets.get("mode") if "dbutils" in globals() else "batch"
# Any value other than "batch" selects the continuous (stream) branch below.
IS_BATCH = (MODE == "batch")
# One checkpoint directory per mode, so batch and stream runs keep separate progress.
CHECKPOINT = f"{S.checkpoint}/{MODE}"

In [0]:
# 1. Bootstrap: create the Silver database and the Encounters table when absent.
#    Both statements use IF NOT EXISTS, so re-running this cell is harmless.
create_db_sql = f"CREATE DATABASE IF NOT EXISTS {S.db}"
create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {TGT_TABLE} (
      encounter_id       STRING  NOT NULL,
      patient_id         STRING  NOT NULL,
      START_TS           TIMESTAMP,
      CODE               STRING,
      DESCRIPTION        STRING,
      REASONCODE         STRING,
      REASONDESCRIPTION  STRING
    ) USING DELTA
    """

spark.sql(create_db_sql)
spark.sql(create_table_sql)

In [0]:
# 2. Define the upsert logic.
#    For each micro-batch, update or insert Silver rows from the Bronze CDF rows.
#    `batch_df` is a static DF containing the new and updated rows of that batch.
#    NOTE(review): the match key is (encounter_id, patient_id), so an encounter
#    whose patient_id changes is inserted as a new row instead of updated —
#    confirm this is intended.
def upsert_to_silver(batch_df, batch_id):
    """MERGE one micro-batch into the Silver encounters table (SCD Type 1).

    The batch is first collapsed to a single row per merge key. Without this,
    a batch holding several changes for the same key makes MERGE fail when
    they all match one existing target row, or inserts all of them as
    duplicates when none of them match yet.

    Args:
        batch_df: Static DataFrame for this micro-batch. When it carries the
            CDF column `_commit_version`, the row with the highest version
            wins for each key. When it does not, the surviving row per key is
            arbitrary; keep `_commit_version` in the projected batch if
            deterministic newest-wins ordering is required.
        batch_id: Micro-batch id supplied by `foreachBatch` (unused).

    Returns:
        None. The Silver table is modified in place.
    """
    if batch_df.isEmpty():
        return

    key_cols = ["encounter_id", "patient_id"]

    if "_commit_version" in batch_df.columns:
        # Local import: the notebook's import cell does not bring in Window.
        from pyspark.sql import Window

        newest_first = (
            Window.partitionBy(*key_cols)
                  .orderBy(F.col("_commit_version").desc())
        )
        one_per_key = (
            batch_df.withColumn("_rn", F.row_number().over(newest_first))
                    .filter(F.col("_rn") == 1)
                    .drop("_rn")
        )
    else:
        # No ordering information available: keep any one row per key.
        one_per_key = batch_df.dropDuplicates(key_cols)

    # CDF metadata is not part of the Silver schema; dropping a column that is
    # absent is a no-op.
    source_df = one_per_key.drop("_change_type", "_commit_version", "_commit_timestamp")

    # Use the session attached to the micro-batch rather than the notebook
    # global, so the function does not depend on `spark` being in scope
    # wherever foreachBatch executes it.
    (DeltaTable.forName(batch_df.sparkSession, TGT_TABLE)
               .alias("t")
               .merge(
                   source_df.alias("s"),
                   "t.encounter_id = s.encounter_id AND t.patient_id = s.patient_id"
               )
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())

In [0]:
# 3a. Stream the change data feed of the Bronze Encounters table, keeping
#     only rows whose `_change_type` is listed in CHANGE_TYPES.
wanted_change = F.col("_change_type").isin(*CHANGE_TYPES)

bronze_cdf = (
    spark.readStream
         .format("delta")
         .option("readChangeFeed", "true")
         .table(SRC_TABLE)
         .where(wanted_change)
)

In [0]:
# 3b. Project the Bronze change rows onto the seven-column Silver schema.
silver_ready = (
    bronze_cdf
        .withColumnRenamed("ID",      "encounter_id")
        .withColumnRenamed("PATIENT", "patient_id")
        .withColumnRenamed("DATE",    "EVENT_DATE_STR")

        # Parse the raw date string into a timestamp (stored as START_TS).
        # The pattern is date-only: if the source ever sends full datetimes,
        # this format must be extended — it will not parse them as written.
        .withColumn("EVENT_TS", F.to_timestamp("EVENT_DATE_STR", "yyyy-MM-dd"))

        # Select final schema (exclude staging columns)
        .selectExpr(
            "encounter_id",
            "patient_id",
            "EVENT_TS as START_TS",
            "CODE",
            "DESCRIPTION",
            "REASONCODE",
            "REASONDESCRIPTION"
        )
)
# NOTE: The date is parsed as midnight in the Spark session time zone
#       (`spark.sql.session.timeZone`). Presumably UTC on this cluster —
#       TODO confirm, since the session default follows the JVM time zone.

In [0]:
# 3c. Upsert every micro-batch into Silver through foreachBatch + MERGE.
#     batch mode  -> availableNow trigger, then block until the query finishes
#     stream mode -> one micro-batch every 30 seconds, query left running
writer = (
    silver_ready.writeStream
                .option("checkpointLocation", CHECKPOINT)
                .foreachBatch(upsert_to_silver)
)

trigger_kwargs = {"availableNow": True} if IS_BATCH else {"processingTime": "30 seconds"}
q = writer.trigger(**trigger_kwargs).start()

if not IS_BATCH:
    print(f"[live] Continuous 30s CDF upserts to {TGT_TABLE} (checkpoint={CHECKPOINT})")
else:
    print(f"[demo] Draining CDF -> {TGT_TABLE} (checkpoint={CHECKPOINT}) …")
    q.awaitTermination()

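# NOTE: This is an SCD Type 1 upsert: MERGE overwrites a matched Silver row in place
#       (last writer wins), so no history of earlier values is kept.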