In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, expr, current_timestamp, to_timestamp, sha2, concat_ws, coalesce, monotonically_increasing_id
from delta.tables import DeltaTable
from pyspark.sql import Window

# Delta locations in the ADLS Gen2 account: the silver source table and the
# three gold targets written by this notebook.
silver_path = "abfss://silver@hospitalstorageac.dfs.core.windows.net/patient_flow"
gold_dim_patient = "abfss://gold@hospitalstorageac.dfs.core.windows.net/dim_patient"
gold_dim_department = "abfss://gold@hospitalstorageac.dfs.core.windows.net/dim_department"
gold_fact = "abfss://gold@hospitalstorageac.dfs.core.windows.net/fact_patient_flow"

# Register the storage account credential for this Spark session. The value is
# fetched from the Databricks secret scope at call time and is never bound to
# a notebook variable.
spark.conf.set(
    "fs.azure.account.key.hospitalstorageac.dfs.core.windows.net",
    dbutils.secrets.get(
        scope="hospitalanalyticsvaultscope",
        key="storage-connection",
    ),
)

# Load the silver patient-flow Delta table (assumed to be append-only).
raw_silver_df = spark.read.format("delta").load(silver_path)

# Per-patient ordering, newest admission_time first.
w = Window.partitionBy("patient_id").orderBy(F.col("admission_time").desc())

# Reduce to exactly one row per patient: the one ranked first by the window
# above. row_number() breaks ties on admission_time arbitrarily.
# The helper column is removed again so the schema matches the silver table.
ranked_silver_df = raw_silver_df.withColumn("row_num", F.row_number().over(w))
silver_df = ranked_silver_df.where(F.col("row_num") == 1).drop("row_num")

# Patient Dimension (SCD Type 2)
#
# Maintains gold dim_patient with one "current" row per patient plus expired
# history rows. Each run:
#   1. seeds the table on first use,
#   2. expires current rows whose tracked attributes (gender, age) changed,
#   3. appends a fresh current row for every new or changed patient.

# Incoming snapshot: silver_df already holds one (latest) row per patient.
incoming_patient = (
    silver_df
    .select("patient_id", "gender", "age")
    .withColumn("effective_from", current_timestamp())
)

# First run only: create the Delta table from the incoming snapshot.
# monotonically_increasing_id() is unique *within* this single write, which is
# all that is needed for the seed load.
if not DeltaTable.isDeltaTable(spark, gold_dim_patient):
    (
        incoming_patient
        .withColumn("surrogate_key", F.monotonically_increasing_id())
        .withColumn("effective_to", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
        .write.format("delta").mode("overwrite").save(gold_dim_patient)
    )

# Handle used for the in-place expiry MERGE below.
target_patient = DeltaTable.forPath(spark, gold_dim_patient)

# Change detection: SHA-256 over the tracked attributes. NULLs are mapped to
# the literal "NA" so a NULL attribute still yields a comparable hash. The same
# unresolved Column expression is applied to both sides so they cannot drift.
patient_attr_hash = F.sha2(
    F.concat_ws(
        "||",
        F.coalesce(col("gender"), lit("NA")),
        F.coalesce(col("age").cast("string"), lit("NA")),
    ),
    256,
)

incoming_patient = incoming_patient.withColumn("_hash", patient_attr_hash)

target_patient_df = (
    spark.read.format("delta").load(gold_dim_patient)
    .withColumn("_target_hash", patient_attr_hash)
    .select("surrogate_key", "patient_id", "gender", "age", "is_current",
            "_target_hash", "effective_from", "effective_to")
)

# Temp views so the comparisons below can be written in SQL.
incoming_patient.createOrReplaceTempView("incoming_patient_tmp")
target_patient_df.createOrReplaceTempView("target_patient_tmp")

# Highest surrogate key already issued (current AND expired rows). New keys
# must start above it: monotonically_increasing_id() restarts from 0 for every
# DataFrame, so using it for appended batches re-issues keys that already
# exist in the table.
# NOTE: keys that already collided in earlier runs are not repaired here, and
# concurrent runs of this notebook are not coordinated.
max_sk = target_patient_df.agg(F.max("surrogate_key")).first()[0]
next_sk = 0 if max_sk is None else max_sk + 1

# 1) Expire current rows whose attributes changed.
# Matching is done on the natural key (patient_id) rather than on
# surrogate_key, so a key shared by two patients can never expire the wrong
# row, and nothing has to be collected to the driver.
changes_df = spark.sql("""
SELECT DISTINCT t.patient_id
FROM target_patient_tmp t
JOIN incoming_patient_tmp i
  ON t.patient_id = i.patient_id
WHERE t.is_current = true AND t._target_hash <> i._hash
""")

if changes_df.take(1):
    (
        target_patient.alias("t")
        .merge(
            changes_df.alias("s"),
            "t.patient_id = s.patient_id AND t.is_current = true",
        )
        .whenMatchedUpdate(
            set={
                "is_current": expr("false"),
                "effective_to": expr("current_timestamp()"),
            }
        )
        .execute()
    )

# 2) Build the rows to append: patients with no current row (brand new, or
# just expired above) and patients whose current row has a different hash.
# Both predicates are kept so the result is the same whether the target view
# is evaluated against the pre- or post-expiry table version.
# DISTINCT guarantees one new row per patient even if the target happens to
# contain several current rows for the same patient.
new_versions_df = spark.sql("""
SELECT DISTINCT i.patient_id, i.gender, i.age, i.effective_from
FROM incoming_patient_tmp i
LEFT JOIN target_patient_tmp t
  ON i.patient_id = t.patient_id AND t.is_current = true
WHERE t.patient_id IS NULL OR t._target_hash <> i._hash
""")

# Dense, collision-free keys: next_sk, next_sk + 1, ... in patient_id order.
# The un-partitioned window moves the batch to a single task, which is
# acceptable for a per-run dimension increment.
sk_order = Window.orderBy("patient_id")
inserts_df = (
    new_versions_df
    .withColumn(
        "surrogate_key",
        (F.row_number().over(sk_order) + lit(next_sk - 1).cast("long")).cast("long"),
    )
    .withColumn("effective_to", lit(None).cast("timestamp"))
    .withColumn("is_current", lit(True))
    .select("surrogate_key", "patient_id", "gender", "age",
            "effective_from", "effective_to", "is_current")
)

# Append only when there is something to write, to avoid empty Delta commits.
if new_versions_df.take(1):
    inserts_df.write.format("delta").mode("append").save(gold_dim_patient)



# Department Dimension
#
# Natural key of the dimension: one row per (department, hospital_id) pair
# present in silver_df (which at this point holds the latest row per patient).
dept_key_cols = ["department", "hospital_id"]

# Distinct key pairs plus a generated surrogate key. No change-detection hash
# is computed for this dimension.
incoming_dept = (
    silver_df
    .select(*dept_key_cols)
    .dropDuplicates(dept_key_cols)
    .withColumn("surrogate_key", monotonically_increasing_id())
)

# The table is fully rewritten on every run (mode "overwrite", no existence
# check), so surrogate keys are regenerated each time.
# NOTE(review): keys are therefore not stable across runs — confirm that no
# consumer persists department surrogate keys between runs.
(
    incoming_dept
    .select("surrogate_key", *dept_key_cols)
    .write.format("delta")
    .mode("overwrite")
    .save(gold_dim_department)
)



# Fact table: fact_patient_flow
#
# Joins the silver rows to both dimensions to pick up surrogate keys, derives
# stay metrics, and rewrites the gold fact table.
# NOTE(review): silver_df was reduced earlier in this notebook to the latest
# admission per patient, so this produces one fact row per patient rather than
# one per admission event — confirm that this grain is intended.

# Current patient rows only, so each patient_id resolves to a single key.
dim_patient_df = (
    spark.read.format("delta").load(gold_dim_patient)
    .filter(col("is_current") == True)
    .select(col("surrogate_key").alias("surrogate_key_patient"), "patient_id", "gender", "age")
)

dim_dept_df = (
    spark.read.format("delta").load(gold_dim_department)
    .select(col("surrogate_key").alias("surrogate_key_dept"), "department", "hospital_id")
)

# Base fact columns plus the calendar date of the admission.
fact_base = (
    silver_df
    .select("patient_id", "department", "hospital_id", "admission_time", "discharge_time", "bed_id")
    .withColumn("admission_date", F.to_date("admission_time"))
)

# Left joins keep every fact row even when a dimension lookup finds no match
# (the corresponding surrogate key is then NULL).
fact_enriched = (
    fact_base
    .join(dim_patient_df, on="patient_id", how="left")
    .join(dim_dept_df, on=["department", "hospital_id"], how="left")
)

# Metrics:
#  - length_of_stay_hours: discharge minus admission, in hours.
#  - is_currently_admitted: True only when discharge_time lies in the future.
#    NOTE(review): a NULL discharge_time evaluates to False here — confirm
#    whether NULL can occur in silver and should mean "still admitted".
#  - event_ingestion_time: timestamp of this gold build.
fact_enriched = (
    fact_enriched
    .withColumn(
        "length_of_stay_hours",
        (F.unix_timestamp(col("discharge_time")) - F.unix_timestamp(col("admission_time"))) / 3600.0,
    )
    .withColumn(
        "is_currently_admitted",
        F.when(col("discharge_time") > current_timestamp(), lit(True)).otherwise(lit(False)),
    )
    .withColumn("event_ingestion_time", current_timestamp())
)

# Final projection with explicit output column names.
fact_final = fact_enriched.select(
    F.monotonically_increasing_id().alias("fact_id"),
    col("surrogate_key_patient").alias("patient_sk"),
    col("surrogate_key_dept").alias("department_sk"),
    "admission_time",
    "discharge_time",
    "admission_date",
    "length_of_stay_hours",
    "is_currently_admitted",
    "bed_id",
    "event_ingestion_time",
)

# Persist the fact table partitioned by admission_date (helps Synapse /
# date-filtered queries). The partitioning was previously only stated in the
# comment and never applied. overwriteSchema is required so that an existing
# unpartitioned table at this path can be replaced with the partitioned
# layout during the overwrite.
(
    fact_final.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("admission_date")
    .save(gold_fact)
)


# Post-write sanity check: re-read each gold table from storage and print its
# row count next to a label.
gold_row_count_checks = (
    ("Patient dim count:", gold_dim_patient),
    ("Department dim count:", gold_dim_department),
    ("Fact rows:", gold_fact),
)
for check_label, check_path in gold_row_count_checks:
    print(check_label, spark.read.format("delta").load(check_path).count())

Patient dim count: 1165
Department dim count: 49
Fact rows: 1165


In [0]:
# Preview the patient dimension as currently stored in the gold layer.
dim_patient_preview = spark.read.format("delta").load(gold_dim_patient)
display(dim_patient_preview)



patient_id,gender,age,effective_from,surrogate_key,effective_to,is_current
00217efd-e853-4a00-9e9b-68bfa9a681c9,Female,53,2025-09-09T17:25:57.380494Z,0,,True
0032aeb3-0b4f-4113-ac7d-a62f32e78aab,Male,71,2025-09-09T17:25:57.380494Z,1,,True
00370612-0e4b-4d1f-8b10-bdbfdef04548,Male,91,2025-09-09T17:25:57.380494Z,2,,True
005cbd11-a827-4cb0-b719-07fb8ad692d1,Male,73,2025-09-09T17:25:57.380494Z,3,,True
006f802c-a2f0-4f37-9497-135e5bcca42e,Male,73,2025-09-09T17:25:57.380494Z,4,,True
00d0981f-3273-446d-9295-c2095c1804e3,Female,92,2025-09-09T17:25:57.380494Z,5,,True
00fa6095-55a2-4cea-849b-2ddcb658e616,Female,14,2025-09-09T17:25:57.380494Z,6,,True
0114bdab-404f-47a8-99ca-476fe0894c50,Female,27,2025-09-09T17:25:57.380494Z,7,,True
014837b9-81e9-4926-b052-944c8d76ff93,Female,60,2025-09-09T17:25:57.380494Z,8,,True
0156a255-3376-458c-92a6-8f81ed7610c0,Male,88,2025-09-09T17:25:57.380494Z,9,,True


In [0]:
# Preview the department dimension as currently stored in the gold layer.
dim_department_preview = spark.read.format("delta").load(gold_dim_department)
display(dim_department_preview)

surrogate_key,department,hospital_id
0,Pediatrics,5
1,Surgery,3
2,Surgery,2
3,ICU,3
4,Emergency,5
5,Emergency,7
6,Cardiology,1
7,Surgery,1
8,Oncology,3
9,Emergency,6


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, expr, current_timestamp, to_timestamp, sha2, concat_ws, coalesce, monotonically_increasing_id
from delta.tables import DeltaTable
from pyspark.sql import Window

# Delta locations (repeated here so this cell can run on its own).
silver_path = "abfss://silver@hospitalstorageac.dfs.core.windows.net/patient_flow"
gold_dim_patient = "abfss://gold@hospitalstorageac.dfs.core.windows.net/dim_patient"
gold_dim_department = "abfss://gold@hospitalstorageac.dfs.core.windows.net/dim_department"
gold_fact = "abfss://gold@hospitalstorageac.dfs.core.windows.net/fact_patient_flow"

# Register the storage account credential for this Spark session; the value is
# read from the Databricks secret scope at call time.
spark.conf.set(
    "fs.azure.account.key.hospitalstorageac.dfs.core.windows.net",
    dbutils.secrets.get(
        scope="hospitalanalyticsvaultscope",
        key="storage-connection",
    ),
)

# Preview the fact table as currently stored in the gold layer.
fact_preview = spark.read.format("delta").load(gold_fact)
display(fact_preview)

fact_id,patient_sk,department_sk,admission_time,discharge_time,admission_date,length_of_stay_hours,is_currently_admitted,bed_id,event_ingestion_time
0,0,22,2025-09-07T08:25:35.289976Z,2025-09-08T08:25:35.289976Z,2025-09-07,24.0,False,418,2025-09-10T23:26:11.771312Z
1,0,43,2025-09-08T10:20:51.006086Z,2025-09-08T17:20:51.006086Z,2025-09-08,7.0,False,317,2025-09-10T23:26:11.771312Z
2,0,5,2025-09-08T22:35:23.233568Z,2025-09-11T21:35:23.233568Z,2025-09-08,71.0,True,356,2025-09-10T23:26:11.771312Z
3,1,3,2025-09-06T22:23:13.413186Z,2025-09-08T07:23:13.413186Z,2025-09-06,33.0,False,434,2025-09-10T23:26:11.771312Z
4,2,20,2025-09-08T10:17:19.449598Z,2025-09-11T02:17:19.449598Z,2025-09-08,64.0,True,493,2025-09-10T23:26:11.771312Z
5,1,33,2025-09-09T19:31:14.835206Z,2025-09-12T19:31:14.835206Z,2025-09-09,72.0,True,71,2025-09-10T23:26:11.771312Z
6,1,37,2025-09-09T22:26:05.337934Z,2025-09-12T11:26:05.337934Z,2025-09-09,61.0,True,144,2025-09-10T23:26:11.771312Z
7,0,3,2025-09-08T22:36:40.355957Z,2025-09-10T09:36:40.355957Z,2025-09-08,35.0,False,150,2025-09-10T23:26:11.771312Z
8,3,9,2025-09-07T10:23:50.48959Z,2025-09-09T05:23:50.48959Z,2025-09-07,43.0,False,49,2025-09-10T23:26:11.771312Z
9,0,34,2025-09-09T01:46:09.022628Z,2025-09-10T11:46:09.022628Z,2025-09-09,34.0,False,292,2025-09-10T23:26:11.771312Z
