In [0]:
#Import
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, expr, current_timestamp, to_timestamp, sha2, concat_ws, coalesce, monotonically_increasing_id
from delta.tables import DeltaTable
from pyspark.sql import Window

In [0]:
#ADLS configuration 
spark.conf.set(
  "fs.azure.account.key.<storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>"
)

In [0]:
# Paths
silver_path = "abfss://<silver-container>@<storage-account-name>.dfs.core.windows.net/patient_flow"
gold_dim_patient = "abfss://<gold-container>@<storage-account-name>.dfs.core.windows.net/dim_patient"
gold_dim_department = "abfss://<gold-container>@<storage-account-name>.dfs.core.windows.net/dim_department"
gold_fact = "abfss://<gold-container>@<storage-account-name>.dfs.core.windows.net/fact_patient_flow"

In [0]:
# Read silver data (assume append-only)
silver_df = spark.read.format("delta").load(silver_path)

In [0]:
# Define window for latest admission per patient
w = Window.partitionBy("patient_id").orderBy(F.col("admission_time").desc())

In [0]:
silver_df = (
    silver_df
    .withColumn("row_num", F.row_number().over(w))  # Rank by latest admission_time
    .filter(F.col("row_num") == 1)                  # Keep only latest row
    .drop("row_num")
)

In [0]:
#Patient Dimension Table Creation
# Prepare incoming dimension records (deduplicated per patient, latest record)
incoming_patient = (silver_df
                    .select("patient_id", "gender", "age")
                    .withColumn("effective_from", current_timestamp())
                   )

In [0]:
# Create target if not exists
if not DeltaTable.isDeltaTable(spark, gold_dim_patient):
    # initialize table with schema and empty data
    incoming_patient.withColumn("surrogate_key", F.monotonically_increasing_id()) \
                    .withColumn("effective_to", lit(None).cast("timestamp")) \
                    .withColumn("is_current", lit(True)) \
                    .write.format("delta").mode("overwrite").save(gold_dim_patient)

In [0]:
# Load target as DeltaTable
target_patient = DeltaTable.forPath(spark, gold_dim_patient)

In [0]:
# Create an expression to detect attribute changes (hash or explicit comparisons)
# We'll use a simple concat hash to detect changes
# Creare SHA2 secure hashging uniquet to each patient
incoming_patient = incoming_patient.withColumn(
    "_hash",
    F.sha2(F.concat_ws("||", F.coalesce(col("gender"), lit("NA")), F.coalesce(col("age").cast("string"), lit("NA"))), 256)
)

In [0]:
# Bring target current hash
target_patient_df = spark.read.format("delta").load(gold_dim_patient).withColumn(
    "_target_hash",
    F.sha2(F.concat_ws("||", F.coalesce(col("gender"), lit("NA")), F.coalesce(col("age").cast("string"), lit("NA"))), 256)
).select("surrogate_key", "patient_id", "gender", "age", "is_current", "_target_hash", "effective_from", "effective_to")


In [0]:
# Create temp views for merge
incoming_patient.createOrReplaceTempView("incoming_patient_tmp")
target_patient_df.createOrReplaceTempView("target_patient_tmp")

In [0]:
%sql
SELECT * FROM incoming_patient_tmp;

patient_id,gender,age,effective_from,_hash
00e40388-0843-491d-bc72-266fdbc1ad15,Female,1,2025-09-23T07:52:50.481269Z,b08a19b1b8e69e6e60e9bdaa5b3a4f9a2b60bfd0b004ed37a88f5f3f4e16d4ef
010d7c5e-fcc0-4998-96d0-d4a1f45d3dc8,Female,71,2025-09-23T07:52:50.481269Z,621b79738e02f2d318b2107a2953bea1caba6328ae8a8953436ba29ff9d9230f
014b817b-ec75-4ae5-8f2b-94c4fa27692a,Female,72,2025-09-23T07:52:50.481269Z,b1edc496cdb013da764f334ef08ddd6dfbbd1ab945145b85e720f664b92598c2
01561f53-e69c-435b-b2aa-129ba72343a3,Female,7,2025-09-23T07:52:50.481269Z,c298370ca500a76e71094a96f5d48862408fe9a2d56034cd3294ffd7427b45b2
0193dedc-51fa-458e-b003-252711fa8397,Female,5,2025-09-23T07:52:50.481269Z,a9e40a4bc7152a9c633e26da659cad657fe21b617a2bd7946cfd703cc4d33b78
01ba7e1e-382a-48be-bd8d-26bbb4a40550,Female,29,2025-09-23T07:52:50.481269Z,2bda94ac9d1a5a873cfaf1651f547dfd83029a573eb8d9c5fb761cccb061dca0
01cb2862-68c7-454c-8b29-db469a9acdc0,Male,95,2025-09-23T07:52:50.481269Z,4552ce552d7ab6025fec8f87ada802894bf8d1c524192abba66b818d4772c8d8
0265cc8f-ebe1-4f69-ab52-9a0b620781d8,Female,44,2025-09-23T07:52:50.481269Z,28b2ce5cdc4673c15f3b2697a66ec2cfb5aff7c3902938f07ed23575009af7b7
0267d11a-22d3-42e5-9355-01023c63d169,Female,22,2025-09-23T07:52:50.481269Z,4ecbe9e979ade08f9e0e078a14f47283d86a7004caf34c76f36d555075cdb511
02f9fd4b-7a75-4597-b843-c03edf5e1bc6,Female,20,2025-09-23T07:52:50.481269Z,81396fc99347dae73bb5165d4d9919258f7381f180fa12edbf6d02e002a23369


In [0]:
%sql
SELECT * FROM target_patient_tmp;

surrogate_key,patient_id,gender,age,is_current,_target_hash,effective_from,effective_to
0,00e40388-0843-491d-bc72-266fdbc1ad15,Female,1,True,b08a19b1b8e69e6e60e9bdaa5b3a4f9a2b60bfd0b004ed37a88f5f3f4e16d4ef,2025-09-23T07:25:10.759868Z,
1,010d7c5e-fcc0-4998-96d0-d4a1f45d3dc8,Female,71,True,621b79738e02f2d318b2107a2953bea1caba6328ae8a8953436ba29ff9d9230f,2025-09-23T07:25:10.759868Z,
2,014b817b-ec75-4ae5-8f2b-94c4fa27692a,Female,72,True,b1edc496cdb013da764f334ef08ddd6dfbbd1ab945145b85e720f664b92598c2,2025-09-23T07:25:10.759868Z,
3,01561f53-e69c-435b-b2aa-129ba72343a3,Female,7,True,c298370ca500a76e71094a96f5d48862408fe9a2d56034cd3294ffd7427b45b2,2025-09-23T07:25:10.759868Z,
4,0193dedc-51fa-458e-b003-252711fa8397,Female,5,True,a9e40a4bc7152a9c633e26da659cad657fe21b617a2bd7946cfd703cc4d33b78,2025-09-23T07:25:10.759868Z,
5,01ba7e1e-382a-48be-bd8d-26bbb4a40550,Female,29,True,2bda94ac9d1a5a873cfaf1651f547dfd83029a573eb8d9c5fb761cccb061dca0,2025-09-23T07:25:10.759868Z,
6,01cb2862-68c7-454c-8b29-db469a9acdc0,Male,95,True,4552ce552d7ab6025fec8f87ada802894bf8d1c524192abba66b818d4772c8d8,2025-09-23T07:25:10.759868Z,
7,0265cc8f-ebe1-4f69-ab52-9a0b620781d8,Female,44,True,28b2ce5cdc4673c15f3b2697a66ec2cfb5aff7c3902938f07ed23575009af7b7,2025-09-23T07:25:10.759868Z,
8,0267d11a-22d3-42e5-9355-01023c63d169,Female,22,True,4ecbe9e979ade08f9e0e078a14f47283d86a7004caf34c76f36d555075cdb511,2025-09-23T07:25:10.759868Z,
9,02f9fd4b-7a75-4597-b843-c03edf5e1bc6,Female,20,True,81396fc99347dae73bb5165d4d9919258f7381f180fa12edbf6d02e002a23369,2025-09-23T07:25:10.759868Z,


### We'll implement in two steps using Delta MERGE (safe & explicit)

In [0]:
# 1) Mark old current rows as not current where changed
changes_df = spark.sql("""
SELECT t.surrogate_key, t.patient_id
FROM target_patient_tmp t
JOIN incoming_patient_tmp i
  ON t.patient_id = i.patient_id
WHERE t.is_current = true AND t._target_hash <> i._hash
""")

changed_keys = [row['surrogate_key'] for row in changes_df.collect()]

if changed_keys:
    # Update existing current records: set is_current=false and effective_to=current_timestamp()
    target_patient.update(
        condition = expr("is_current = true AND surrogate_key IN ({})".format(",".join([str(k) for k in changed_keys]))),
        set = {
            "is_current": expr("false"),
            "effective_to": expr("current_timestamp()")
        }
    )

In [0]:
# 2) Insert new rows for changed & new records
# Build insert DF: join incoming with target to figure new inserts where either not exists or changed
inserts_df = spark.sql("""
SELECT i.patient_id, i.gender, i.age, i.effective_from, i._hash
FROM incoming_patient_tmp i
LEFT JOIN target_patient_tmp t
  ON i.patient_id = t.patient_id AND t.is_current = true
WHERE t.patient_id IS NULL OR t._target_hash <> i._hash
""").withColumn("surrogate_key", F.monotonically_increasing_id()) \
  .withColumn("effective_to", lit(None).cast("timestamp")) \
  .withColumn("is_current", lit(True)) \
  .select("surrogate_key", "patient_id", "gender", "age", "effective_from", "effective_to", "is_current")

# Append new rows
if inserts_df.count() > 0:
    inserts_df.write.format("delta").mode("append").save(gold_dim_patient)

#### Department Dimension Table Creation 

In [0]:
# prepare incoming (latest per patient feed snapshot)
incoming_dept = (silver_df
                 .select("department", "hospital_id")
                )

In [0]:
# add hash and dedupe incoming (one row per natural key)
incoming_dept = incoming_dept.dropDuplicates(["department", "hospital_id"]) \
    .withColumn("surrogate_key", monotonically_increasing_id())

In [0]:
# initialize table if missing
incoming_dept.select("surrogate_key", "department", "hospital_id") \
    .write.format("delta").mode("overwrite").save(gold_dim_department)

#### Create Fact table

In [0]:
# Read current dims (filter is_current=true)
dim_patient_df = (spark.read.format("delta").load(gold_dim_patient)
                  .filter(col("is_current") == True)
                  .select(col("surrogate_key").alias("surrogate_key_patient"), "patient_id", "gender", "age"))

dim_dept_df = (spark.read.format("delta").load(gold_dim_department)
               .select(col("surrogate_key").alias("surrogate_key_dept"), "department", "hospital_id"))

In [0]:
# Build base fact from silver events
fact_base = (silver_df
             .select("patient_id", "department", "hospital_id", "admission_time", "discharge_time", "bed_id")
             .withColumn("admission_date", F.to_date("admission_time"))
            )

In [0]:
# Join to get surrogate keys
fact_enriched = (fact_base
                 .join(dim_patient_df, on="patient_id", how="left")
                 .join(dim_dept_df, on=["department", "hospital_id"], how="left")
                )

In [0]:
# Compute metrics
fact_enriched = fact_enriched.withColumn("length_of_stay_hours",
                                         (F.unix_timestamp(col("discharge_time")) - F.unix_timestamp(col("admission_time"))) / 3600.0) \
                             .withColumn("is_currently_admitted", F.when(col("discharge_time") > current_timestamp(), lit(True)).otherwise(lit(False))) \
                             .withColumn("event_ingestion_time", current_timestamp())

In [0]:
# Let's make column names explicit instead:
fact_final = fact_enriched.select(
    F.monotonically_increasing_id().alias("fact_id"),
    col("surrogate_key_patient").alias("patient_sk"),
    col("surrogate_key_dept").alias("department_sk"),
    "admission_time",
    "discharge_time",
    "admission_date",
    "length_of_stay_hours",
    "is_currently_admitted",
    "bed_id",
    "event_ingestion_time"
)

In [0]:
# Persist fact table partitioned by admission_date (helps Synapse / queries)
fact_final.write.format("delta").mode("overwrite").save(gold_fact)

In [0]:
# Quick sanity checks
print("Patient dim count:", spark.read.format("delta").load(gold_dim_patient).count())
print("Department dim count:", spark.read.format("delta").load(gold_dim_department).count())
print("Fact rows:", spark.read.format("delta").load(gold_fact).count())

Patient dim count: 1084
Department dim count: 49
Fact rows: 1084


In [0]:
#gold dim patients
display(spark.read.format("delta").load(gold_dim_patient))

patient_id,gender,age,effective_from,surrogate_key,effective_to,is_current
00e40388-0843-491d-bc72-266fdbc1ad15,Female,1,2025-09-23T07:25:10.759868Z,0,,True
010d7c5e-fcc0-4998-96d0-d4a1f45d3dc8,Female,71,2025-09-23T07:25:10.759868Z,1,,True
014b817b-ec75-4ae5-8f2b-94c4fa27692a,Female,72,2025-09-23T07:25:10.759868Z,2,,True
01561f53-e69c-435b-b2aa-129ba72343a3,Female,7,2025-09-23T07:25:10.759868Z,3,,True
0193dedc-51fa-458e-b003-252711fa8397,Female,5,2025-09-23T07:25:10.759868Z,4,,True
01ba7e1e-382a-48be-bd8d-26bbb4a40550,Female,29,2025-09-23T07:25:10.759868Z,5,,True
01cb2862-68c7-454c-8b29-db469a9acdc0,Male,95,2025-09-23T07:25:10.759868Z,6,,True
0265cc8f-ebe1-4f69-ab52-9a0b620781d8,Female,44,2025-09-23T07:25:10.759868Z,7,,True
0267d11a-22d3-42e5-9355-01023c63d169,Female,22,2025-09-23T07:25:10.759868Z,8,,True
02f9fd4b-7a75-4597-b843-c03edf5e1bc6,Female,20,2025-09-23T07:25:10.759868Z,9,,True


In [0]:
display(spark.read.format("delta").load(gold_dim_patient).filter("patient_id = 'edf0f166-6d42-447d-89dd-2f8f1277c55c'")) 

patient_id,gender,age,effective_from,surrogate_key,effective_to,is_current
edf0f166-6d42-447d-89dd-2f8f1277c55c,Female,22,2025-09-23T07:45:58.54305Z,0,,True


In [0]:
#gold dim department
display(spark.read.format("delta").load(gold_dim_department))

surrogate_key,department,hospital_id
0,Pediatrics,5
1,Surgery,3
2,Surgery,2
3,ICU,3
4,Emergency,5
5,Emergency,7
6,Cardiology,1
7,Surgery,1
8,Oncology,3
9,Emergency,6


In [0]:
#gold fact
display(spark.read.format("delta").load(gold_fact))

fact_id,patient_sk,department_sk,admission_time,discharge_time,admission_date,length_of_stay_hours,is_currently_admitted,bed_id,event_ingestion_time
0,0,0,2025-09-23T04:57:46.059044Z,2025-09-25T08:57:46.059044Z,2025-09-23,52.0,True,462,2025-09-23T07:52:56.841726Z
1,1,32,2025-09-22T00:04:41.333079Z,2025-09-24T10:04:41.333079Z,2025-09-22,58.0,True,163,2025-09-23T07:52:56.841726Z
2,2,18,2025-09-21T17:56:18.002118Z,2025-09-23T21:56:18.002118Z,2025-09-21,52.0,True,209,2025-09-23T07:52:56.841726Z
3,3,26,2025-09-20T19:57:32.050569Z,2025-09-21T02:57:32.050569Z,2025-09-20,7.0,False,150,2025-09-23T07:52:56.841726Z
4,4,25,2025-09-23T03:56:09.997618Z,2025-09-25T10:56:09.997618Z,2025-09-23,55.0,True,116,2025-09-23T07:52:56.841726Z
5,5,25,2025-09-22T12:54:33.931322Z,2025-09-23T09:54:33.931322Z,2025-09-22,21.0,True,118,2025-09-23T07:52:56.841726Z
6,6,19,2025-09-23T05:53:05.871839Z,2025-09-25T18:53:05.871839Z,2025-09-23,61.0,True,491,2025-09-23T07:52:56.841726Z
7,7,46,2025-09-22T19:57:45.058576Z,2025-09-23T03:57:45.058576Z,2025-09-22,8.0,False,453,2025-09-23T07:52:56.841726Z
8,8,26,2025-09-23T04:01:11.187045Z,2025-09-25T07:01:11.187045Z,2025-09-23,51.0,True,221,2025-09-23T07:52:56.841726Z
9,9,3,2025-09-21T21:59:04.10988Z,2025-09-23T21:59:04.10988Z,2025-09-21,48.0,True,123,2025-09-23T07:52:56.841726Z
