In [1]:
# ---------------------------------------------------------
# Notebook: 02_gold_layer_modeling
# Purpose : Silver (clinical_raw_data_lakehouse.silver_*) -> Gold (clinical_raw_data_lakehouse.gold_*)
# ---------------------------------------------------------

StatementMeta(, a45aa227-ba7b-4d06-a5fa-506842640d2e, 3, Finished, Available, Finished)

In [2]:
from pyspark.sql.functions import col, current_timestamp, lit, max as spark_max

StatementMeta(, a45aa227-ba7b-4d06-a5fa-506842640d2e, 4, Finished, Available, Finished)

In [3]:
# =========================================================
# 1) Reference Lookups (demo mappings)
# =========================================================
# Small code -> description lookups, left-joined onto the Silver fact tables in
# section 4 to enrich them with human-readable labels.
# NOTE(review): these are demo mappings; codes are illustrative and should be
# validated against the official ICD-10 / CPT / LOINC / RxNorm code sets (and
# against the code granularity actually present in Silver) before production use.
icd_ref = spark.createDataFrame(
    data=[
        ("I10", "Hypertension"),
        ("E11", "Type 2 Diabetes"),
        ("J45", "Asthma"),
        ("C50", "Breast Cancer"),
        ("M54", "Back Pain"),
        ("F32", "Depression"),
    ],
    schema=["ICD10Code", "diagnosis_desc"],
)

cpt_ref = spark.createDataFrame(
    data=[
        ("99213", "Office/outpatient visit"),
        ("93000", "Electrocardiogram"),
        ("70450", "CT Head scan"),
        ("45378", "Colonoscopy"),
        ("27447", "Knee replacement"),
    ],
    schema=["CPTCode", "procedure_desc"],
)

loinc_ref = spark.createDataFrame(
    data=[
        ("718-7", "Hemoglobin"),
        ("4548-4", "HbA1c"),
        ("6299-2", "Glucose"),
        ("2093-3", "Cholesterol"),
    ],
    schema=["LOINCCode", "lab_test_desc"],
)

rxnorm_ref = spark.createDataFrame(
    data=[
        ("1049630", "Metformin"),
        ("617314", "Atorvastatin"),
        ("198211", "Lisinopril"),
    ],
    schema=["RxNormCode", "medication_desc"],
)

StatementMeta(, a45aa227-ba7b-4d06-a5fa-506842640d2e, 5, Finished, Available, Finished)

In [4]:
from pyspark.sql.functions import col, current_timestamp, lit, max as spark_max

# =========================================================
# 2) Incremental load helper (Silver -> Gold)
# =========================================================
def incremental_load(df, target_table, key_cols, default_watermark="1990-01-01"):
    """
    Append Silver rows that are newer than the Gold watermark to a Gold table.

    df               : Source DataFrame (from Silver); must have a ``load_date`` column
    target_table     : Target Gold Lakehouse table (written with append mode)
    key_cols         : Natural/business keys for deduplication (non-empty list)
    default_watermark: Watermark used when the target table cannot be read
                       (e.g. first run) or has no ``load_date`` values yet,
                       i.e. load everything newer than this value

    Prints a one-line summary of what was written; returns None.

    Raises ValueError if ``key_cols`` is empty.

    NOTE(review): the watermark is the max Gold ``load_date``, which this
    function overwrites with the Gold write time (Step 4), yet it is compared
    against the Silver ``load_date`` (Step 2). Silver rows whose load_date is
    earlier than the last Gold run (late-arriving or concurrently loaded rows)
    will be skipped. Consider persisting the Silver load_date separately and
    using that as the watermark.
    NOTE(review): deduplication only applies within the incoming batch; a key
    that was already loaded and later changes in Silver is appended again
    (there is no MERGE/upsert), so Gold can hold several rows per key.
    """
    # Names needed only by this helper are imported locally.
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number
    try:
        from pyspark.errors import AnalysisException  # PySpark >= 3.4
    except ImportError:
        from pyspark.sql.utils import AnalysisException  # older PySpark

    if not key_cols:
        raise ValueError("key_cols must contain at least one column")

    # Step 1: Get last load date from target.
    # Only Spark analysis errors (e.g. the table does not exist yet) fall back
    # to the default; a bare `except:` would also hide interrupts and unrelated
    # failures and silently trigger a full reload.
    try:
        last_load = (
            spark.read.table(target_table)
            .agg(spark_max("load_date"))
            .collect()[0][0]
        )
    except AnalysisException:
        last_load = None
    if last_load is None:
        last_load = default_watermark

    # Step 2: Filter using silver.load_date
    df = df.filter(col("load_date") > lit(last_load))

    # Step 3: Deduplicate on the business keys, deterministically keeping the
    # most recent Silver record per key (dropDuplicates keeps an arbitrary row).
    latest_first = Window.partitionBy(*key_cols).orderBy(col("load_date").desc())
    df = (
        df.withColumn("_il_row_num", row_number().over(latest_first))
        .filter(col("_il_row_num") == 1)
        .drop("_il_row_num")
    )

    # Step 4: Add audit column (new load_date for Gold)
    df = df.withColumn("load_date", current_timestamp())

    # Step 5: Write to Gold. Count once; each count() is a full Spark job.
    new_rows = df.count()
    if new_rows > 0:
        df.write.mode("append").saveAsTable(target_table)
        print(f"✅ {new_rows} new rows written to {target_table}")
    else:
        print(f"ℹ️ No new rows for {target_table}")

StatementMeta(, a45aa227-ba7b-4d06-a5fa-506842640d2e, 6, Finished, Available, Finished)

In [5]:
# =========================================================
# 3) Dimensions (Silver -> Gold dimension tables)
# =========================================================
# Patients
# Each dimension: project the Silver columns kept in Gold (plus load_date, which
# incremental_load uses for the watermark) and append rows not yet loaded.

# Patients
dim_patient = (
    spark.read.table("clinical_raw_data_lakehouse.silver_patients")
    .select(
        "PatientId", "FirstName", "LastName", "DOB", "Gender",
        "City", "State", "PostalCode", "load_date",
    )
)
incremental_load(
    dim_patient,
    target_table="clinical_raw_data_lakehouse.gold_dim_patient",
    key_cols=["PatientId"],
)

# Providers
dim_provider = (
    spark.read.table("clinical_raw_data_lakehouse.silver_providers")
    .select("ProviderId", "FirstName", "LastName", "Specialty", "Organization", "load_date")
)
incremental_load(
    dim_provider,
    target_table="clinical_raw_data_lakehouse.gold_dim_provider",
    key_cols=["ProviderId"],
)

# Locations
dim_location = (
    spark.read.table("clinical_raw_data_lakehouse.silver_locations")
    .select("LocationId", "LocationName", "Region", "load_date")
)
incremental_load(
    dim_location,
    target_table="clinical_raw_data_lakehouse.gold_dim_location",
    key_cols=["LocationId"],
)

# Payers
dim_payer = (
    spark.read.table("clinical_raw_data_lakehouse.silver_payers")
    .select("PayerId", "PayerName", "PlanType", "Region", "load_date")
)
incremental_load(
    dim_payer,
    target_table="clinical_raw_data_lakehouse.gold_dim_payer",
    key_cols=["PayerId"],
)

StatementMeta(, a45aa227-ba7b-4d06-a5fa-506842640d2e, 7, Finished, Available, Finished)

ℹ️ No new rows for clinical_raw_data_lakehouse.gold_dim_patient
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_dim_provider
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_dim_location
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_dim_payer


In [6]:

# =========================================================
# 4) Facts (Silver -> Gold fact tables, enriched via the reference lookups)
# =========================================================
# Facts are loaded whole from Silver; coded facts are left-joined to the
# section-1 lookups so unmapped codes are kept (with a null description).

# Encounters
fact_encounter = spark.read.table("clinical_raw_data_lakehouse.silver_encounters")
incremental_load(
    fact_encounter,
    target_table="clinical_raw_data_lakehouse.gold_fact_encounter",
    key_cols=["EncounterId"],
)

# Diagnoses + ICD-10 description
fact_diagnosis = spark.read.table("clinical_raw_data_lakehouse.silver_diagnoses").join(
    icd_ref, on="ICD10Code", how="left"
)
incremental_load(
    fact_diagnosis,
    target_table="clinical_raw_data_lakehouse.gold_fact_diagnosis",
    key_cols=["DiagnosisId"],
)

# Procedures + CPT description
fact_procedure = spark.read.table("clinical_raw_data_lakehouse.silver_procedures").join(
    cpt_ref, on="CPTCode", how="left"
)
incremental_load(
    fact_procedure,
    target_table="clinical_raw_data_lakehouse.gold_fact_procedure",
    key_cols=["ProcedureId"],
)

# Labs + LOINC description
fact_lab = spark.read.table("clinical_raw_data_lakehouse.silver_labs").join(
    loinc_ref, on="LOINCCode", how="left"
)
incremental_load(
    fact_lab,
    target_table="clinical_raw_data_lakehouse.gold_fact_lab",
    key_cols=["LabResultId"],
)

# Medications + RxNorm description
fact_medication = spark.read.table("clinical_raw_data_lakehouse.silver_medications").join(
    rxnorm_ref, on="RxNormCode", how="left"
)
incremental_load(
    fact_medication,
    target_table="clinical_raw_data_lakehouse.gold_fact_medication",
    key_cols=["MedicationId"],
)

# Claims
fact_claim = spark.read.table("clinical_raw_data_lakehouse.silver_claims")
incremental_load(
    fact_claim,
    target_table="clinical_raw_data_lakehouse.gold_fact_claim",
    key_cols=["ClaimId"],
)

# Vitals
fact_vital = spark.read.table("clinical_raw_data_lakehouse.silver_vitals")
incremental_load(
    fact_vital,
    target_table="clinical_raw_data_lakehouse.gold_fact_vital",
    key_cols=["VitalId"],
)

print("🎉 Gold layer incremental load finished. Data stored in clinical_raw_data_lakehouse.gold_")

StatementMeta(, a45aa227-ba7b-4d06-a5fa-506842640d2e, 8, Finished, Available, Finished)

ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_encounter
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_diagnosis
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_procedure
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_lab
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_medication
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_claim
ℹ️ No new rows for clinical_raw_data_lakehouse.gold_fact_vital
🎉 Gold layer incremental load finished. Data stored in clinical_raw_data_lakehouse.gold_
