## 📘 Section 4: Data Modeling and Transformation

### 🧪 Lab 4.8: Modeling Patient Encounters for Analytics

**Objective:** Design and conceptually implement the transformation of raw patient encounter data (from Bronze) into a structured Silver layer table and then into a simple Gold layer dimensional model.

**Assumed Bronze Layer Data (from previous ingestion labs):**
* `Bronze_HL7_Encounters_Raw` (parsed from HL7 ADT messages in Lakehouse: `HealthDataLH_YourName`)
* `Bronze_FHIR_Encounters_Parsed` (parsed JSON from FHIR API in Lakehouse: `HealthDataLH_YourName`)
* A `Bronze_Patients_Raw` table. For this lab, the cell below creates it with sample data (the schema is inferred from the sample rows).

**Creating Sample `Bronze_Patients_Raw` Data (PySpark Notebook Cell):**
This would typically be ingested from an EHR dump or FHIR Patient resources.

In [None]:
from pyspark.sql import Row
import datetime

lakehouse_name = "HealthDataLH_YourName" # Replace YourName
bronze_patients_table = f"{lakehouse_name}.Bronze_Patients_Raw"

# Sample patient data (could come from various sources)
patient_data = [
    Row(PatientSourceID="PATID12345", MRN="MRN001", SSN_Token="TOKEN_SSN1", FirstName="John", LastName="Doe", DateOfBirth=datetime.date(1980, 1, 1), Gender="M", Race="WH", Ethnicity="N", ZipCode="90210", SourceSystem="EHR_SystemA", IngestionTimestamp=datetime.datetime.now()),
    Row(PatientSourceID="PATID67890", MRN="MRN002", SSN_Token="TOKEN_SSN2", FirstName="Jane", LastName="Roe", DateOfBirth=datetime.date(1975, 3, 15), Gender="F", Race="AS", Ethnicity="N", ZipCode="90211", SourceSystem="EHR_SystemA", IngestionTimestamp=datetime.datetime.now()),
    Row(PatientSourceID="PATFHIR001", MRN="MRN003", SSN_Token="TOKEN_SSN3", FirstName="Walter", LastName="White", DateOfBirth=datetime.date(1965, 9, 7), Gender="M", Race="WH", Ethnicity="N", ZipCode="87101", SourceSystem="FHIR_API", IngestionTimestamp=datetime.datetime.now()),
    Row(PatientSourceID="EXTRALONGPATIENTID004", MRN="MRN004", SSN_Token="TOKEN_SSN4", FirstName="Sarah", LastName="Connor", DateOfBirth=datetime.date(1985, 5, 10), Gender="F", Race="WH", Ethnicity="H", ZipCode="90001", SourceSystem="EHR_SystemB", IngestionTimestamp=datetime.datetime.now()),
    Row(PatientSourceID="PATID12345", MRN="MRN001", SSN_Token="TOKEN_SSN1", FirstName="John", LastName="Doe", DateOfBirth=datetime.date(1980, 1, 1), Gender="M", Race="WH", Ethnicity="N", ZipCode="90210", SourceSystem="EHR_SystemA_OldRecord", IngestionTimestamp=datetime.datetime.now()-datetime.timedelta(days=10)) # Duplicate for testing dedupe
]

patients_df = spark.createDataFrame(patient_data)

patients_df.write.format("delta").mode("overwrite").saveAsTable(bronze_patients_table)
print(f"Sample data written to {bronze_patients_table}")
spark.read.table(bronze_patients_table).show()

**Assumed Silver Layer Data (from Lab 3.8):**
* `Silver_Unified_Encounters` (in Lakehouse: `HealthDataLH_YourName`)

**Lab Tasks (Conceptual Steps & Code):**

**1. (Re-run/Verify) Create `Silver_Unified_Encounters` Table:**
    * **Tool:** Spark Notebook (from Lab 3.8).
    * **Logic:** Ensure the PySpark code from Lab 3.8, Task 3 (Conform and Integrate into a Silver Layer Table) has been run and the `Silver_Unified_Encounters` table exists in your `HealthDataLH_YourName` Lakehouse. This table should have columns like `SourceSystemEncounterID`, `PatientIdentifier`, `AdmissionDateTime`, `DischargeDateTime`, `EncounterType`, `AttendingProviderIdentifier`, `PrimaryDiagnosisCode`, `SourceSystem`, `SilverLoadTimestamp`, `EncounterSK`.

---
**2. Create `Dim_Patient` (Dimension Table - Gold Layer):**
    * **Tool:** Spark Notebook.
    * **Notebook Name:** `Create_Dim_Patient_Gold_YourName`
    * **Logic (PySpark Code):**

In [None]:
from pyspark.sql.functions import col, lit, current_timestamp, expr, monotonically_increasing_id, md5, concat_ws, year, month, dayofmonth, datediff, current_date, coalesce
import datetime

lakehouse_name = "HealthDataLH_YourName" # Replace YourName
bronze_patients_table = f"{lakehouse_name}.Bronze_Patients_Raw"
dim_patient_table = f"{lakehouse_name}.Dim_Patient" # Gold Layer Table

# Read from Bronze_Patients_Raw table
try:
    raw_patients_df = spark.read.table(bronze_patients_table)
except Exception as e:
    print(f"Error reading table {bronze_patients_table}: {e}")
    # Fabric notebooks provide mssparkutils; dbutils is Databricks-specific
    mssparkutils.notebook.exit(f"Failed to read {bronze_patients_table}")

# --- De-duplicate patient records ---
# Assuming MRN is a good candidate for a natural key for de-duplication
# Keep the record with the latest ingestion timestamp in case of duplicates
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# One window per MRN, newest ingestion first, so row_number() == 1 is the latest record
window_spec_patient = Window.partitionBy("MRN").orderBy(col("IngestionTimestamp").desc())

deduplicated_patients_df = raw_patients_df.withColumn("row_num", row_number().over(window_spec_patient)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

# --- Select and Transform for Dimension Table ---
dim_patient_df = deduplicated_patients_df.select(
    col("MRN").alias("PatientNaturalKey"), # Natural Key from source system (e.g., MRN)
    col("PatientSourceID").alias("SourcePatientID"),
    col("FirstName"),
    col("LastName"),
    col("DateOfBirth").cast("date"),
    col("Gender"), # Consider mapping to standard codes if necessary
    col("Race"),   # Consider mapping
    col("Ethnicity"), # Consider mapping
    col("ZipCode"),
    col("SourceSystem").alias("PatientSourceSystem")
).withColumn(
    "PatientKey", md5(col("PatientNaturalKey")) # Surrogate Key (MD5 hash of MRN for simplicity)
    # For a robust SK, an incrementing ID or a more sophisticated hash might be used.
    # monotonically_increasing_id() can be used but has caveats in distributed environments for strict sequential IDs.
).withColumn(
    "FullName", expr("concat(FirstName, ' ', LastName)")
).withColumn(
    "Age", expr("floor(datediff(current_date(), DateOfBirth) / 365.25)").cast("int") # Calculated Age
).withColumn(
    "GoldLoadTimestamp", current_timestamp()
).withColumn(
    "EffectiveStartDate", lit(datetime.date(1900, 1, 1)).cast("date") # For Type 2 SCD, default
).withColumn(
    "EffectiveEndDate", lit(None).cast("date") # For Type 2 SCD, default
).withColumn(
    "IsCurrent", lit(True).cast("boolean") # For Type 2 SCD
)

# Reorder columns for clarity in the dimension table
final_dim_patient_df = dim_patient_df.select(
    "PatientKey", "PatientNaturalKey", "SourcePatientID", "FirstName", "LastName", "FullName", "DateOfBirth", "Age", "Gender",
    "Race", "Ethnicity", "ZipCode", "PatientSourceSystem",
    "EffectiveStartDate", "EffectiveEndDate", "IsCurrent", "GoldLoadTimestamp"
)

# Write to Gold Layer Dim_Patient table
final_dim_patient_df.write.format("delta").mode("overwrite").saveAsTable(dim_patient_table) # "overwrite" is fine for this lab; SCD Type 2 updates would use a Delta MERGE instead
print(f"Successfully created/updated {dim_patient_table}")
final_dim_patient_df.show(truncate=False)
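
The three derivations in this cell (keep the latest record per MRN, hash the MRN into `PatientKey`, approximate `Age`) can be checked without a Spark session. The following is a minimal plain-Python sketch, not the notebook's implementation: the rows, the fixed timestamps, and the helper names `latest_per_mrn`, `patient_key`, and `approx_age` are all hypothetical stand-ins for illustration.

```python
import hashlib
from datetime import date, datetime

# Hypothetical stand-ins for three Bronze_Patients_Raw rows (two share MRN001).
rows = [
    {"MRN": "MRN001", "SourceSystem": "EHR_SystemA",
     "DateOfBirth": date(1980, 1, 1), "IngestionTimestamp": datetime(2024, 1, 11)},
    {"MRN": "MRN001", "SourceSystem": "EHR_SystemA_OldRecord",
     "DateOfBirth": date(1980, 1, 1), "IngestionTimestamp": datetime(2024, 1, 1)},
    {"MRN": "MRN002", "SourceSystem": "EHR_SystemA",
     "DateOfBirth": date(1975, 3, 15), "IngestionTimestamp": datetime(2024, 1, 11)},
]


def latest_per_mrn(records):
    """Keep one record per MRN: the one with the newest IngestionTimestamp."""
    latest = {}
    for rec in records:
        current = latest.get(rec["MRN"])
        if current is None or rec["IngestionTimestamp"] > current["IngestionTimestamp"]:
            latest[rec["MRN"]] = rec
    return list(latest.values())


def patient_key(mrn):
    """MD5 hex digest of the MRN's UTF-8 bytes (always 32 hex characters)."""
    return hashlib.md5(mrn.encode("utf-8")).hexdigest()


def approx_age(dob, as_of):
    """Same approximation as the notebook: elapsed days / 365.25, floored."""
    return int((as_of - dob).days // 365.25)


for rec in latest_per_mrn(rows):
    print(rec["MRN"], rec["SourceSystem"], patient_key(rec["MRN"]),
          approx_age(rec["DateOfBirth"], date(2024, 1, 11)))
```

Because the key is a pure function of the MRN, re-running the load produces the same `PatientKey` for the same patient, which is what lets `Fact_Encounter` be rebuilt independently. The age is only an approximation and goes stale once stored, so in practice it is often computed at query time instead.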

---
**3. Create `Fact_Encounter` (Fact Table - Gold Layer):**
    * **Tool:** Spark Notebook.
    * **Notebook Name:** `Create_Fact_Encounter_Gold_YourName`
    * **Logic (PySpark Code):**
        *(This assumes `Dim_Patient` has already been created. The cell below builds a simplified `Dim_Date` if one does not exist and skips `Dim_Provider`, keeping the focus on `Fact_Encounter`. In a real scenario, both would be properly built dimensions.)*

In [None]:
from pyspark.sql.functions import col, lit, current_timestamp, expr, to_date, datediff, year, month, dayofmonth, coalesce, when, explode
import datetime

lakehouse_name = "HealthDataLH_YourName" # Replace YourName
silver_unified_encounters_table = f"{lakehouse_name}.Silver_Unified_Encounters"
dim_patient_table = f"{lakehouse_name}.Dim_Patient"
fact_encounter_table = f"{lakehouse_name}.Fact_Encounter" # Gold Layer Table

# --- (Optional) Create a simple Dim_Date if not existing for lookup ---
# This is a very basic Dim_Date for joining. A full Dim_Date is more comprehensive.
dim_date_table = f"{lakehouse_name}.Dim_Date"
try:
    spark.read.table(dim_date_table).limit(1).collect()
    print(f"{dim_date_table} already exists.")
except Exception:  # table missing or unreadable, so build it
    print(f"Creating simplified {dim_date_table}...")
    date_range_df = spark.sql("SELECT sequence(to_date('2020-01-01'), to_date('2030-12-31'), interval 1 day) as date_array")
    dates_df = date_range_df.select(explode(col("date_array")).alias("FullDate"))
    simple_dim_date_df = dates_df.select(
        expr("replace(cast(FullDate as string), '-', '')").cast("int").alias("DateKey"), # Format YYYYMMDD
        col("FullDate").cast("date"),
        year(col("FullDate")).alias("Year"),
        month(col("FullDate")).alias("Month"),
        dayofmonth(col("FullDate")).alias("Day")
    )
    simple_dim_date_df.write.format("delta").mode("overwrite").saveAsTable(dim_date_table)
    print(f"Simplified {dim_date_table} created.")

# Read source tables
try:
    encounters_df = spark.read.table(silver_unified_encounters_table)
    patients_dim_df = spark.read.table(dim_patient_table).select("PatientKey", "PatientNaturalKey") # Only need keys for join
    date_dim_df = spark.read.table(dim_date_table).select("DateKey", "FullDate")
except Exception as e:
    print(f"Error reading source tables for Fact_Encounter: {e}")
    # Fabric notebooks provide mssparkutils; dbutils is Databricks-specific
    mssparkutils.notebook.exit("Failed to read source tables.")

# Join Encounters with Dim_Patient to get PatientKey
# Assuming encounters_df.PatientIdentifier corresponds to patients_dim_df.PatientNaturalKey (e.g., MRN)
fact_df_intermediate = encounters_df.join(
    patients_dim_df,
    encounters_df.PatientIdentifier == patients_dim_df.PatientNaturalKey,
    "left_outer"
).select(
    encounters_df["*"], # Select all columns from encounters_df
    patients_dim_df["PatientKey"]
)

# Join with Dim_Date for AdmissionDateKey
fact_df_intermediate = fact_df_intermediate.join(
    date_dim_df.alias("dim_admission_date"),
    to_date(fact_df_intermediate.AdmissionDateTime) == col("dim_admission_date.FullDate"),
    "left_outer"
).select(
    fact_df_intermediate["*"],
    col("dim_admission_date.DateKey").alias("AdmissionDateKey")
)

# Join with Dim_Date for DischargeDateKey
fact_df_intermediate = fact_df_intermediate.join(
    date_dim_df.alias("dim_discharge_date"),
    to_date(fact_df_intermediate.DischargeDateTime) == col("dim_discharge_date.FullDate"),
    "left_outer"
).select(
    fact_df_intermediate["*"],
    col("dim_discharge_date.DateKey").alias("DischargeDateKey")
)

# --- Select Measures and Foreign Keys for Fact Table ---
fact_encounter_df = fact_df_intermediate.withColumn(
    "LengthOfStayInDays",
    when(col("DischargeDateTime").isNotNull() & col("AdmissionDateTime").isNotNull(),
         datediff(to_date(col("DischargeDateTime")), to_date(col("AdmissionDateTime")))
    ).otherwise(None).cast("int")
).select(
    col("EncounterSK").alias("EncounterKey"), # Surrogate key from Silver layer or newly generated
    col("PatientKey"), # Foreign Key from Dim_Patient
    col("AdmissionDateKey"), # Foreign Key from Dim_Date
    col("DischargeDateKey"), # Foreign Key from Dim_Date
    # Add ProviderKey if Dim_Provider exists and is joined
    # col("ProviderKey"),
    col("SourceSystemEncounterID").alias("EncounterNaturalKey"), # Natural key from source
    col("EncounterType"),
    col("PrimaryDiagnosisCode"),
    col("AttendingProviderIdentifier"), # Could be a degenerate dimension or FK to Dim_Provider
    col("LengthOfStayInDays"),
    # Add other measures like TotalCharges if available
    # lit(1).alias("EncounterCount"), # Useful measure
    col("SourceSystem"),
    current_timestamp().alias("GoldLoadTimestamp")
).filter(col("PatientKey").isNotNull()) # Only load encounters that have a matching patient in Dim_Patient

# Write to Gold Layer Fact_Encounter table
fact_encounter_df.write.format("delta").mode("overwrite").saveAsTable(fact_encounter_table)
print(f"Successfully created/updated {fact_encounter_table}")
fact_encounter_df.show(truncate=False)
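
To make the two derived values concrete, here is a small plain-Python sketch of how an `AdmissionDateKey`/`DischargeDateKey` and `LengthOfStayInDays` come out for a single encounter. It mirrors the logic above (YYYYMMDD integer keys, calendar-day difference, null when either timestamp is missing) but is not the Spark code itself; the helper names and sample timestamps are hypothetical.

```python
from datetime import datetime


def date_key(ts):
    """YYYYMMDD integer for a timestamp, or None when the timestamp is missing."""
    return int(ts.strftime("%Y%m%d")) if ts is not None else None


def length_of_stay_days(admitted, discharged):
    """Calendar days between admission and discharge; None if either is missing."""
    if admitted is None or discharged is None:
        return None
    # Compare dates, not timestamps, so time of day does not affect the count.
    return (discharged.date() - admitted.date()).days


admit = datetime(2024, 3, 1, 23, 30)
discharge = datetime(2024, 3, 3, 8, 15)

print(date_key(admit), date_key(discharge), length_of_stay_days(admit, discharge))
# A same-day encounter yields 0 days; an open encounter yields None.
print(length_of_stay_days(datetime(2024, 3, 1, 8, 0), datetime(2024, 3, 1, 17, 0)))
print(length_of_stay_days(admit, None))
```

Two consequences are worth keeping in mind when reading the fact table: same-day visits have a length of stay of 0, and any admission or discharge date outside the simplified `Dim_Date` range (2020-01-01 to 2030-12-31) gets a null date key from the left join.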

---
**4. (Optional) Create `Patient_Visit_Analytics_View` (Gold Layer View):**
    * **Tool:** SQL Endpoint of the `HealthDataLH_YourName` Lakehouse or a Fabric Warehouse.
    * **Logic (SQL Code):**
        Open a new SQL query window connected to your Lakehouse SQL endpoint.

        ```sql
        -- Ensure you are in the context of your Lakehouse, e.g., USE HealthDataLH_YourName;
        -- Or fully qualify table names if needed: HealthDataLH_YourName.dbo.Fact_Encounter

        CREATE OR ALTER VIEW Patient_Visit_Analytics_View AS
        SELECT
            p.PatientNaturalKey AS PatientMRN,
            p.FullName AS PatientFullName,
            p.DateOfBirth AS PatientDateOfBirth,
            p.Age AS PatientAge,
            p.Gender AS PatientGender,
            fe.EncounterNaturalKey,
            fe.EncounterType,
            fe.PrimaryDiagnosisCode,
            adm_dt.FullDate AS AdmissionDate,
            dis_dt.FullDate AS DischargeDate,
            fe.LengthOfStayInDays,
            fe.AttendingProviderIdentifier,
            fe.SourceSystem AS EncounterSourceSystem
            -- Add more fields from dimensions (Dim_Provider, Dim_Diagnosis) as needed
        FROM
            HealthDataLH_YourName.dbo.Fact_Encounter fe
        JOIN
            HealthDataLH_YourName.dbo.Dim_Patient p ON fe.PatientKey = p.PatientKey
        LEFT JOIN
            HealthDataLH_YourName.dbo.Dim_Date adm_dt ON fe.AdmissionDateKey = adm_dt.DateKey
        LEFT JOIN
            HealthDataLH_YourName.dbo.Dim_Date dis_dt ON fe.DischargeDateKey = dis_dt.DateKey
        -- LEFT JOIN
        --     HealthDataLH_YourName.dbo.Dim_Provider dp ON fe.ProviderKey = dp.ProviderKey -- If Dim_Provider exists
        ;

        -- Test the view
        SELECT TOP 10 * FROM Patient_Visit_Analytics_View;
        ```

---
**Discussion Questions:**
1.  **In `Dim_Patient`, why is it generally better to use a system-generated `PatientKey` (surrogate key) as the primary key instead of using the source system's natural key (the MRN, stored as `PatientNaturalKey`) directly?**
    * **Stability:** Natural keys (like MRN) can sometimes change, be reassigned, or have errors in the source system. Surrogate keys are controlled within the data warehouse and remain stable, protecting the fact tables from these source system changes.
    * **Performance:** Surrogate keys are typically integers (or a fixed-length hash like in our example), which are more efficient for joins in database systems compared to potentially long or composite string-based natural keys.
    * **Decoupling:** Using surrogate keys decouples the data warehouse model from the operational source system's keying structure. This allows the source system to evolve its keys without breaking the warehouse.
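    * **Handling Slowly Changing Dimensions (SCDs):** Surrogate keys are essential for implementing SCD Type 2, where you need to track historical changes to dimension attributes. A new surrogate key is assigned for each version of a patient's record. Note that the `md5(MRN)` key used in this lab is identical for every version of a patient, so a real Type 2 implementation would need a key that also varies by version (for example, a hash that includes `EffectiveStartDate`, or a generated sequence).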
    * **Integration of Multiple Sources:** If patients come from multiple source systems with different natural key formats or potential overlaps, a surrogate key provides a single, unambiguous way to identify a unique patient entity in the warehouse.
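
The SCD Type 2 point can be illustrated with a short plain-Python sketch. It is one possible approach under stated assumptions, not the lab's implementation: the surrogate key here hashes the natural key together with the version's start date (an assumption made so each version gets its own key), and `versioned_key`, `apply_scd2_change`, and the changed ZIP code are hypothetical.

```python
import hashlib
from datetime import date


def versioned_key(natural_key, effective_start):
    """Hypothetical surrogate key: hash of natural key plus version start date."""
    raw = f"{natural_key}|{effective_start.isoformat()}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


def apply_scd2_change(dim_rows, natural_key, changes, change_date):
    """Close the current row for natural_key and append a new current version."""
    result = []
    for row in dim_rows:
        if row["PatientNaturalKey"] == natural_key and row["IsCurrent"]:
            # Expire the old version but keep it for history.
            closed = {**row, "EffectiveEndDate": change_date, "IsCurrent": False}
            # The new version carries the changed attributes and a new key.
            new_version = {
                **row,
                **changes,
                "PatientKey": versioned_key(natural_key, change_date),
                "EffectiveStartDate": change_date,
                "EffectiveEndDate": None,
                "IsCurrent": True,
            }
            result.extend([closed, new_version])
        else:
            result.append(row)
    return result


dim = [{
    "PatientKey": versioned_key("MRN001", date(1900, 1, 1)),
    "PatientNaturalKey": "MRN001",
    "ZipCode": "90210",
    "EffectiveStartDate": date(1900, 1, 1),
    "EffectiveEndDate": None,
    "IsCurrent": True,
}]

dim = apply_scd2_change(dim, "MRN001", {"ZipCode": "94105"}, date(2024, 6, 1))
for row in dim:
    print(row["ZipCode"], row["EffectiveStartDate"], row["EffectiveEndDate"], row["IsCurrent"])
```

Fact rows loaded before the change keep pointing at the old `PatientKey`, so historical reports still show the ZIP code that was valid at the time of the encounter. With the natural key alone as the primary key, both versions would collide.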

2.  **How can Microsoft Purview (discussed in later sections) help in understanding the lineage of data as it moves from `Bronze_HL7_Encounters_Raw` to `Silver_Unified_Encounters` and finally into `Fact_Encounter`?**
    * **Automated Lineage Tracking:** Microsoft Purview can scan Fabric items (Lakehouses, Notebooks, Data Factory pipelines) and automatically capture data lineage. It visualizes how data flows from source tables (e.g., `Bronze_HL7_Encounters_Raw`) through transformation processes (e.g., the Spark Notebooks that create `Silver_Unified_Encounters` and `Fact_Encounter`) to the final analytical tables and even into Power BI reports.
    * **Impact Analysis:** If there's a change proposed to `Bronze_HL7_Encounters_Raw` or a transformation rule, lineage helps identify all downstream tables, reports, and processes (like `Silver_Unified_Encounters`, `Fact_Encounter`) that will be affected.
    * **Troubleshooting & Root Cause Analysis:** If data quality issues are found in `Fact_Encounter`, lineage allows data stewards or engineers to trace back to the origin of the data and the transformations applied at each step, helping to pinpoint where the issue was introduced.
    * **Compliance and Governance:** For regulations like HIPAA, demonstrating data provenance and how sensitive data elements are processed and transformed is critical. Purview's lineage provides this auditable trail.
    * **Data Discovery:** Business users or analysts can use lineage to understand where the data in their reports or analytical models originates, increasing trust and understanding of the data.

3.  **If a new field, like `DischargeDisposition`, needs to be added to the `Silver_Unified_Encounters` table later, how does using Delta Lake format for your Lakehouse tables make this process easier?**
    * **Schema Evolution:** Delta Lake supports schema evolution, which means you can add new columns (like `DischargeDisposition`) to an existing Delta table without rewriting the entire table or breaking existing queries that don't use the new column.
        * The Spark Notebook or Dataflow Gen2 performing the transformation to `Silver_Unified_Encounters` can be updated to include the new `DischargeDisposition` field.
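        * When this updated job runs and writes to the Delta table, the new column is added to the table's schema automatically, provided the write sets the `mergeSchema` option to `true` (for appends) or `overwriteSchema` to `true` (when the table is overwritten). Without one of these options, schema enforcement rejects the write.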
    * **Schema Enforcement:** While allowing evolution, Delta Lake also offers schema enforcement. This prevents accidental schema changes due to bad data, but for intentional additions like `DischargeDisposition`, schema evolution handles it gracefully.
    * **No Downtime for Readers:** Existing queries and downstream processes that read from `Silver_Unified_Encounters` but do not yet reference the new `DischargeDisposition` column will continue to work without modification. They will simply not see the new column until they are updated to use it.
    * **Data Backfill:** After adding the column, historical records will have null values for `DischargeDisposition`. You can then run a separate job to backfill this new column for existing records if the data is available.
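
As a rough sketch of the schema-evolution step, the append below opts in to `mergeSchema` so that a DataFrame carrying the extra `DischargeDisposition` column can be written to the existing table. The helper name `append_with_new_columns` and the DataFrame name in the usage comment are hypothetical; the function only assumes it is handed a Spark DataFrame whose other columns already match the table.

```python
def append_with_new_columns(df, table_name):
    """Append df to a Delta table, letting Delta add any columns the table lacks."""
    (
        df.write.format("delta")
        .mode("append")
        # Opt in to schema evolution; without it the extra column fails enforcement.
        .option("mergeSchema", "true")
        .saveAsTable(table_name)
    )


# Hypothetical usage inside the Silver notebook, where
# encounters_with_disposition_df already includes DischargeDisposition:
# append_with_new_columns(
#     encounters_with_disposition_df,
#     "HealthDataLH_YourName.Silver_Unified_Encounters",
# )
```

Setting the option per write, rather than globally, keeps schema enforcement active for every other job that writes to the table.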