#### Dynamic Schema Evolution Handling & Data Quality Enforcement using Patient Encounter Data for the HealthcareData Lakehouse.

Goal: Demonstrate how Fabric Spark Notebooks can handle incoming data with evolving schemas (new columns) while dynamically applying data quality (DQ) rules defined externally, showcasing Delta Lake's capabilities.

Techniques to Showcase:

1. Defining DQ rules externally (using JSON in this demo).
2. Writing a dynamic PySpark function to apply rules based on definitions and current DataFrame schema.
3. Using Delta Lake CHECK constraints for simple, static rules enforced at write time.
4. Using Delta Lake schema evolution (mergeSchema=true) to handle new columns in source data gracefully during appends.
5. Applying the dynamic DQ function to the evolved table.
6. Analyzing and reporting DQ failures.


###### Reset Demo

In [1]:
%%sql

DROP TABLE IF EXISTS patient_encounters_dq_results;
DROP TABLE IF EXISTS patient_encounters_dynamic;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

#### Cell 1: Setup & Imports

In [2]:
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DateType, DecimalType, StructType, StructField
# Optional: from delta.tables import DeltaTable

print("Setup complete. Imports loaded.")

# --- Configuration ---
landing_zone_path = "Files/landing/healthcare"
rules_json_path = f"{landing_zone_path}/dq_rules.json"
batch1_csv_path = f"{landing_zone_path}/encounters_batch_1.csv"
batch2_csv_path = f"{landing_zone_path}/encounters_batch_2.csv"

# Define Delta table names within the Lakehouse ('Tables' folder)
# Assumes Lakehouse 'HealthcareData' is default or specified
encounter_table = "patient_encounters_dynamic"
dq_results_table = "patient_encounters_dq_results"

print(f"Rules Path: {rules_json_path}")
print(f"Batch 1 Path: {batch1_csv_path}")
print(f"Batch 2 Path: {batch2_csv_path}")
print(f"Encounter Table: {encounter_table}")
print(f"DQ Results Table: {dq_results_table}")

# Note: the DQ function calls F.current_date() when the checks are evaluated,
# so no fixed "today" value is defined here.

Setup complete. Imports loaded.
Rules Path: Files/landing/healthcare/dq_rules.json
Batch 1 Path: Files/landing/healthcare/encounters_batch_1.csv
Batch 2 Path: Files/landing/healthcare/encounters_batch_2.csv
Encounter Table: patient_encounters_dynamic
DQ Results Table: patient_encounters_dq_results


#### Cell 2: Define Dynamic DQ Check Function

In [3]:
def apply_dynamic_dq(df, rules_list):
    """
    Applies data quality rules defined in rules_list to a DataFrame.

    Args:
        df (DataFrame): The input Spark DataFrame.
        rules_list (list): A list of dictionaries, where each dict defines a DQ rule.

    Returns:
        DataFrame: The input DataFrame plus one boolean column named DQ_<rule_id>
                   for each active rule whose column exists in the DataFrame's schema
                   (True = pass, False = fail).
    """
    df_schema = df.schema
    df_cols = df_schema.fieldNames()
    dq_exprs = [] # List to hold generated DQ check expressions

    print(f"Applying {len(rules_list)} DQ rules dynamically...")

    for rule in rules_list:
        rule_id = rule.get("rule_id", "UNKNOWN_RULE")
        col_name = rule.get("column_name")
        check_type = rule.get("check_type")
        is_active = rule.get("is_active", False)
        dq_col_name = f"DQ_{rule_id}" # Name of the resulting boolean column

        if not is_active:
            print(f"  Skipping inactive rule: {rule_id}")
            continue

        if not col_name or not check_type:
            print(f"  Skipping invalid rule definition: {rule_id} (missing column_name or check_type)")
            continue

        # --- Check if the column exists in the DataFrame ---
        if col_name not in df_cols:
            print(f"  Skipping rule '{rule_id}': Column '{col_name}' not found in DataFrame schema.")
            continue

        # --- Build the Spark SQL expression for the check ---
        # NULL handling differs by check type: notFutureDate and valueInRange treat NULL
        # as a pass, while notNull, regexMatch and valueInSet treat NULL as a failure.
        try:
            col_ref = F.col(col_name)

            if check_type == "notNull":
                # Fails only if NULL
                check_expr = col_ref.isNotNull()
            elif check_type == "notFutureDate":
                # Assumes a DateType or TimestampType column; NULL passes, fails if date > today
                check_expr = F.when(col_ref.isNull(), True).otherwise(col_ref <= F.current_date())
            elif check_type == "regexMatch":
                pattern = rule.get("pattern")
                if not pattern:
                    print(f"  Skipping rule '{rule_id}': Missing 'pattern' for regexMatch.")
                    continue  # Do not add a placeholder column for an incomplete rule
                # Fails if value is NULL or doesn't match the regex
                check_expr = F.when(col_ref.isNull(), False).otherwise(col_ref.rlike(pattern))
            elif check_type == "valueInSet":
                allowed_values = rule.get("allowed_values")
                if not isinstance(allowed_values, list):
                    print(f"  Skipping rule '{rule_id}': Missing/invalid 'allowed_values' list for valueInSet.")
                    continue
                # Fails if value is NULL or not in the set
                check_expr = F.when(col_ref.isNull(), False).otherwise(col_ref.isin(allowed_values))
            elif check_type == "valueInRange":
                min_val = rule.get("min_value")
                max_val = rule.get("max_value")
                if min_val is None or max_val is None:
                    print(f"  Skipping rule '{rule_id}': Missing 'min_value' or 'max_value' for valueInRange.")
                    continue
                # NULL passes (e.g. Batch 1 rows written before a new column existed);
                # otherwise fails if outside the inclusive range [min_val, max_val].
                # Spark coerces the literal bounds to the column's type for the comparison.
                check_expr = F.when(col_ref.isNull(), True).otherwise(col_ref.between(min_val, max_val))
            else:
                print(f"  Skipping rule '{rule_id}': Unknown check_type '{check_type}'.")
                continue  # Skip adding this expression

            # Add the generated expression to our list
            dq_exprs.append(check_expr.alias(dq_col_name))
            print(f"  Added check for rule '{rule_id}' on column '{col_name}'.")

        except Exception as e:
            print(f"  ERROR processing rule '{rule_id}': {e}. Skipping this rule.")

    # Select original columns plus the new DQ check columns
    if dq_exprs:
        return df.select(F.col("*"), *dq_exprs)
    else:
        print("No applicable and valid DQ checks were generated.")
        return df # Return original df if no rules applied


print("Dynamic DQ function defined.")

Dynamic DQ function defined.


Demo Point: Walk through the function logic: it takes a DataFrame and a list of rules, iterates over the rules, checks whether each rule's column exists in the DataFrame, builds the appropriate Spark expression based on check_type, and adds the resulting boolean column (True = Pass, False = Fail) to the output DataFrame. Highlight the schema awareness: a rule whose column is missing (if col_name not in df_cols) is skipped with a message instead of causing an error.
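The skip-or-apply decisions the function makes can be modeled without Spark. The sketch below is a pure-Python approximation of that control flow (plan_rules and the REQUIRED_PARAMS table are hypothetical names, not part of the notebook): for a given list of column names, it reports which DQ_ columns apply_dynamic_dq would add and why the other rules are skipped. The DQ006 check type and bounds in the example are placeholders, since the actual DQ006 definition is not shown in this notebook.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical table: the extra keys each check_type needs, per the rule JSON in Cell 5.
REQUIRED_PARAMS: Dict[str, List[str]] = {
    "notNull": [],
    "notFutureDate": [],
    "regexMatch": ["pattern"],
    "valueInSet": ["allowed_values"],
    "valueInRange": ["min_value", "max_value"],
}


def plan_rules(rules: List[Dict[str, Any]], columns: List[str]) -> Tuple[List[str], Dict[str, str]]:
    """Return (DQ_ column names that would be added, {rule_id: skip reason}).

    Models only the decision logic of apply_dynamic_dq; no Spark expressions are built.
    """
    applied: List[str] = []
    skipped: Dict[str, str] = {}
    for rule in rules:
        rule_id = rule.get("rule_id", "UNKNOWN_RULE")
        col_name = rule.get("column_name")
        check_type = rule.get("check_type")
        if not rule.get("is_active", False):
            skipped[rule_id] = "inactive"
        elif not col_name or not check_type:
            skipped[rule_id] = "missing column_name or check_type"
        elif col_name not in columns:  # the schema-awareness check
            skipped[rule_id] = f"column '{col_name}' not in schema"
        elif check_type not in REQUIRED_PARAMS:
            skipped[rule_id] = f"unknown check_type '{check_type}'"
        elif any(rule.get(key) is None for key in REQUIRED_PARAMS[check_type]):
            skipped[rule_id] = "missing rule parameters"
        else:
            applied.append(f"DQ_{rule_id}")
    return applied, skipped


rules = [
    {"rule_id": "DQ001", "column_name": "patient_id", "check_type": "notNull", "is_active": True},
    # Placeholder check type and bounds: the real DQ006 definition is not shown here.
    {"rule_id": "DQ006", "column_name": "quality_metric_score", "check_type": "valueInRange",
     "min_value": 0, "max_value": 100, "is_active": True},
]
batch1_columns = ["encounter_id", "patient_id", "provider_id", "encounter_date",
                  "primary_diagnosis_code", "visit_type", "billable_amount"]
evolved_columns = batch1_columns + ["quality_metric_score"]

print(plan_rules(rules, batch1_columns))   # DQ006 skipped: column not yet in the schema
print(plan_rules(rules, evolved_columns))  # DQ006 applied after schema evolution
```

The same rule list therefore yields different output columns depending only on the schema it is run against, which is what lets one rules file serve both the original and the evolved table.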

#### Cell 3: Load Initial Data, Define Schema & Optional Constraints

In [4]:
# Define expected schema for Batch 1
schema_batch1 = StructType([
    StructField("encounter_id", StringType(), True),
    StructField("patient_id", IntegerType(), True), # Keep as Int for now, handle potential nulls in DQ
    StructField("provider_id", IntegerType(), True),
    StructField("encounter_date", DateType(), True),
    StructField("primary_diagnosis_code", StringType(), True),
    StructField("visit_type", StringType(), True),
    StructField("billable_amount", DecimalType(10, 2), True) # Use Decimal for currency
])

# Load Batch 1 with defined schema
batch1_df = spark.read.csv(batch1_csv_path, header=True, schema=schema_batch1, dateFormat="yyyy-MM-dd")

print("Initial Batch 1 Schema & Data:")
batch1_df.printSchema()
batch1_df.show(truncate=False)

# --- Write initial batch to Delta ---
# Drop table first for clean demo reset using SQL
spark.sql(f"DROP TABLE IF EXISTS {encounter_table}")
batch1_df.write.format("delta").mode("overwrite").saveAsTable(encounter_table)
print(f"Initial encounter data saved to Delta table: {encounter_table}")

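# --- Optional: Add Delta CHECK Constraints for stable rules ---
# Example: Enforce patient_id IS NOT NULL at write time.
# Note: Delta verifies existing rows when a constraint is added, so this ALTER TABLE
# would fail here because row ENC006 has a NULL patient_id. Once a constraint exists,
# any later write containing a violating row is rejected as a whole.
# For the demo, the dynamic check handles this rule instead (uncomment to see the failure).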
# try:
#     print("\nAttempting to add CHECK constraint (Patient ID NOT NULL)...")
#     spark.sql(f"ALTER TABLE {encounter_table} ADD CONSTRAINT patient_id_not_null CHECK (patient_id IS NOT NULL)")
#     print("CHECK constraint added successfully.")
# except Exception as e:
#     print(f"Failed to add CHECK constraint (maybe expected if data violates it): {e}")

# Show initial table content
print("\nInitial Table Content:")
spark.table(encounter_table).show(truncate=False)

Initial Batch 1 Schema & Data:
root
 |-- encounter_id: string (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- provider_id: integer (nullable = true)
 |-- encounter_date: date (nullable = true)
 |-- primary_diagnosis_code: string (nullable = true)
 |-- visit_type: string (nullable = true)
 |-- billable_amount: decimal(10,2) (nullable = true)

+------------+----------+-----------+--------------+----------------------+----------+---------------+
|encounter_id|patient_id|provider_id|encounter_date|primary_diagnosis_code|visit_type|billable_amount|
+------------+----------+-----------+--------------+----------------------+----------+---------------+
|ENC001      |1001      |501        |2025-01-15    |S62.001A              |Office    |150.75         |
|ENC002      |1002      |502        |2025-01-16    |J45.909               |Telehealth|75.00          |
|ENC003      |1003      |501        |2025-01-17    |M54.5                 |Office    |165.50         |
|ENC004      |1001  

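Demo Point: Explain that defining the schema explicitly makes the load robust: column types are fixed up front rather than inferred from the data. Show writing the first batch. Discuss Delta CHECK constraints as a way to enforce some rules directly on the table, noting the trade-off: a write containing even one violating row is rejected, so violating data cannot be landed and reviewed the way the dynamic checks allow. Show the initial table state.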
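If some rules are stable enough to enforce at write time, their JSON definitions could in principle be turned into CHECK constraints rather than maintained twice. The following is a hypothetical helper (check_constraint_ddl, sql_literal, and the dq_<rule_id> constraint naming are assumptions, not part of the notebook) that produces ALTER TABLE ... ADD CONSTRAINT statements for the notNull, valueInSet and valueInRange check types, spelling out the NULL handling explicitly so it matches the dynamic function. notFutureDate (whose result depends on the current date) and regexMatch are left to the dynamic checks in this sketch. Each statement would be executed with spark.sql(...), and only for rules that the existing rows already satisfy.

```python
from typing import Any, Dict, Optional


def sql_literal(value: Any) -> str:
    """Render a string or number as a Spark SQL literal (hypothetical helper)."""
    if isinstance(value, bool):
        raise ValueError("boolean values are not supported in this sketch")
    if isinstance(value, (int, float)):
        return repr(value)
    if isinstance(value, str):
        # Keep the sketch simple: refuse characters that would need escaping.
        if "'" in value or "\\" in value:
            raise ValueError(f"unsupported characters in {value!r}")
        return f"'{value}'"
    raise ValueError(f"unsupported literal type: {type(value).__name__}")


def check_constraint_ddl(table: str, rule: Dict[str, Any]) -> Optional[str]:
    """Build an ALTER TABLE ... ADD CONSTRAINT statement, or return None if the
    rule is not a static rule this sketch can express as a CHECK constraint."""
    col = rule["column_name"]
    check_type = rule["check_type"]
    if check_type == "notNull":
        condition = f"{col} IS NOT NULL"
    elif check_type == "valueInSet":
        values = ", ".join(sql_literal(v) for v in rule["allowed_values"])
        # NULL fails, as in apply_dynamic_dq
        condition = f"{col} IS NOT NULL AND {col} IN ({values})"
    elif check_type == "valueInRange":
        low, high = sql_literal(rule["min_value"]), sql_literal(rule["max_value"])
        # NULL passes, as in apply_dynamic_dq
        condition = f"{col} IS NULL OR {col} BETWEEN {low} AND {high}"
    else:
        return None  # notFutureDate and regexMatch stay in the dynamic checks
    name = f"dq_{rule['rule_id'].lower()}"
    return f"ALTER TABLE {table} ADD CONSTRAINT {name} CHECK ({condition})"


print(check_constraint_ddl("patient_encounters_dynamic",
                           {"rule_id": "DQ001", "column_name": "patient_id", "check_type": "notNull"}))
```

For DQ001 this reproduces the commented-out ALTER TABLE statement in the cell above (with a different constraint name); on this table it would be rejected because ENC006 has a NULL patient_id.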

#### Cell 4: Load Update Batch and Append with Schema Evolution

In [5]:
# Define schema for Batch 2 (includes the new column)
schema_batch2 = StructType([
    StructField("encounter_id", StringType(), True),
    StructField("patient_id", IntegerType(), True),
    StructField("provider_id", IntegerType(), True),
    StructField("encounter_date", DateType(), True),
    StructField("primary_diagnosis_code", StringType(), True),
    StructField("visit_type", StringType(), True),
    StructField("billable_amount", DecimalType(10, 2), True),
    StructField("quality_metric_score", IntegerType(), True) # New column
])

# Load Batch 2
batch2_df = spark.read.csv(batch2_csv_path, header=True, schema=schema_batch2, dateFormat="yyyy-MM-dd")

print("\nUpdate Batch 2 Schema & Data:")
batch2_df.printSchema()
batch2_df.show(truncate=False)

# --- Append Batch 2 using Schema Evolution ---
print(f"\nAppending Batch 2 to Delta table: {encounter_table} with mergeSchema=true...")
try:
    batch2_df.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable(encounter_table)
    print("Append successful with schema evolution.")
except Exception as e:
    print(f"Error appending Batch 2: {e}")


# --- Verify Schema Evolution ---
print("\nTable Schema AFTER appending Batch 2:")
evolved_df = spark.table(encounter_table)
evolved_df.printSchema()

print("\nFull Table Content AFTER appending Batch 2:")
evolved_df.orderBy("encounter_date", "encounter_id").show(truncate=False)


Update Batch 2 Schema & Data:
root
 |-- encounter_id: string (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- provider_id: integer (nullable = true)
 |-- encounter_date: date (nullable = true)
 |-- primary_diagnosis_code: string (nullable = true)
 |-- visit_type: string (nullable = true)
 |-- billable_amount: decimal(10,2) (nullable = true)
 |-- quality_metric_score: integer (nullable = true)

+------------+----------+-----------+--------------+----------------------+----------+---------------+--------------------+
|encounter_id|patient_id|provider_id|encounter_date|primary_diagnosis_code|visit_type|billable_amount|quality_metric_score|
+------------+----------+-----------+--------------+----------------------+----------+---------------+--------------------+
|ENC008      |1005      |503        |2025-02-01    |F32.9                 |Office    |140.00         |85                  |
|ENC009      |1001      |501        |2025-02-05    |S62.001A              |Office    |155

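Demo Point: Show loading the second batch with the new column. Explain the key settings, mode("append") and option("mergeSchema", "true"): without mergeSchema (and unless automatic schema merging is enabled in the Spark configuration), Delta rejects the append because the batch contains a column the table lacks. Show the table schema after the append and highlight that quality_metric_score has been added automatically; rows from Batch 1 read as NULL in that column. Display the full table containing data from both batches.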
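To make the append behavior concrete, here is a conceptual, pure-Python model of what an append with mergeSchema does to the table schema: existing columns keep their order, columns that exist only in the incoming batch are added at the end, and rows written before the change read the new column as NULL (None here). merge_schema and read_rows are hypothetical names; the model simply raises on any type conflict and ignores Delta's finer rules, such as the limited type widening it allows.

```python
from decimal import Decimal
from typing import Any, Dict, List, Tuple

Field = Tuple[str, str]  # (column name, Spark SQL type name)


def merge_schema(existing: List[Field], incoming: List[Field]) -> List[Field]:
    """Keep existing columns in order, append new ones, reject type conflicts."""
    merged = list(existing)
    known = dict(existing)
    for name, dtype in incoming:
        if name not in known:
            merged.append((name, dtype))  # new column goes to the end of the schema
            known[name] = dtype
        elif known[name] != dtype:
            raise TypeError(f"type conflict on '{name}': {known[name]} vs {dtype}")
    return merged


def read_rows(rows: List[Dict[str, Any]], schema: List[Field]) -> List[Dict[str, Any]]:
    """Project stored rows onto the current schema; missing columns read as None."""
    return [{name: row.get(name) for name, _ in schema} for row in rows]


# Abbreviated schemas based on Cells 3 and 4
batch1_schema = [("encounter_id", "string"), ("patient_id", "int"),
                 ("billable_amount", "decimal(10,2)")]
batch2_schema = batch1_schema + [("quality_metric_score", "int")]

table_schema = merge_schema(batch1_schema, batch2_schema)
old_rows = read_rows(
    [{"encounter_id": "ENC001", "patient_id": 1001, "billable_amount": Decimal("150.75")}],
    table_schema,
)
print([name for name, _ in table_schema])
print(old_rows)  # the Batch 1 row now has quality_metric_score = None
```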

#### Cell 5: Load Dynamic DQ Rules

In [6]:
import json  # Already imported in Cell 1; repeated so this cell can run on its own

# Relative path within the Lakehouse Files section (same value as in Cell 1).
# spark.read resolves relative paths against the notebook's default Lakehouse.
rules_json_path = "Files/landing/healthcare/dq_rules.json"

# Load DQ rules from JSON file using Spark's text reader
print(f"Attempting to load DQ rules from: {rules_json_path}")
rules_list = [] # Initialize empty list

try:
    # Read the entire file content into a single string using Spark.
    # wholetext=True returns each file as one row; without it, every line becomes a
    # separate row and .first()[0] would return only the first line of a
    # multi-line (pretty-printed) JSON file.
    rules_file_content = spark.read.text(rules_json_path, wholetext=True).first()[0]

    # Parse the JSON string loaded by Spark into a Python list/object
    rules_list = json.loads(rules_file_content)

    print("Successfully loaded DQ rules using spark.read.text:")
    print(json.dumps(rules_list, indent=2))

except Exception as e:
    print(f"Error loading DQ rules from {rules_json_path} using Spark:")
    print(e)
    # Add specific checks if needed
    if "Path does not exist" in str(e):
        print("\n--- Troubleshooting ---")
        print(f"1. Verify the file exists at '{rules_json_path}' in the Lakehouse Explorer UI.")
        print("2. Ensure the correct Lakehouse is attached as default to this notebook.")
        print("3. As a fallback, try using the full ABFSS path (find IDs in Lakehouse/Workspace settings):")
        print("   Example: full_path = 'abfss://YOUR_WORKSPACE_ID@onelake.dfs.fabric.microsoft.com/YOUR_LAKEHOUSE_ID/Files/landing/healthcare/dq_rules.json'")
        # full_path = 'abfss://...' # Construct actual path if needed
        # rules_file_content = spark.read.text(full_path, wholetext=True).first()[0]
        # rules_list = json.loads(rules_file_content)

    rules_list = [] # Ensure rules_list is empty on error

# Verify rules_list content (will be empty if loading failed)
if not rules_list:
    print("\nWARNING: DQ rules list is empty. Subsequent DQ checks will be skipped.")

Attempting to load DQ rules from: Files/landing/healthcare/dq_rules.json
Successfully loaded DQ rules using spark.read.text:
[
  {
    "rule_id": "DQ001",
    "column_name": "patient_id",
    "check_type": "notNull",
    "is_active": true,
    "description": "Patient ID must not be null"
  },
  {
    "rule_id": "DQ002",
    "column_name": "encounter_date",
    "check_type": "notFutureDate",
    "is_active": true,
    "description": "Encounter date cannot be in the future"
  },
  {
    "rule_id": "DQ003",
    "column_name": "primary_diagnosis_code",
    "check_type": "regexMatch",
    "pattern": "^[A-Z][0-9]{2}(\\.[A-Z0-9]{1,4})?$",
    "is_active": true,
    "description": "Primary Diagnosis Code must be valid ICD-10 format (approx.)"
  },
  {
    "rule_id": "DQ004",
    "column_name": "visit_type",
    "check_type": "valueInSet",
    "allowed_values": [
      "Office",
      "Telehealth",
      "Inpatient",
      "Emergency"
    ],
    "is_active": true,
    "description": "Visit Type

Demo Point: Show the external definition of DQ rules. Explain how this allows rules to be modified, added, or deactivated without changing the core Spark code. (Note: the cell reads the file with spark.read.text using a path relative to the notebook's default Lakehouse; if no default Lakehouse is attached, or the file lives in another Lakehouse, use the full ABFSS path shown in the troubleshooting message.)
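Because the rules now live outside the code, a mistake in a single rule (a typo in check_type, a missing parameter) surfaces only as a skip message from apply_dynamic_dq. A small validation pass right after loading can catch such mistakes earlier. The sketch below is hypothetical (validate_rules is not part of the notebook); it expects the key names used in the JSON above and reports missing keys, unknown check types, missing per-check parameters, and duplicate rule_id values, which would otherwise produce duplicate DQ_ column names.

```python
import json
from typing import Any, List

REQUIRED_KEYS = {"rule_id", "column_name", "check_type", "is_active"}
# Extra keys each check_type needs, matching what apply_dynamic_dq reads.
EXTRA_KEYS = {
    "notNull": set(),
    "notFutureDate": set(),
    "regexMatch": {"pattern"},
    "valueInSet": {"allowed_values"},
    "valueInRange": {"min_value", "max_value"},
}


def validate_rules(text: str) -> List[str]:
    """Parse the rules JSON text and return a list of problems (empty = valid)."""
    rules: Any = json.loads(text)
    if not isinstance(rules, list):
        return ["top-level JSON value must be a list of rule objects"]
    problems: List[str] = []
    seen_ids = set()
    for i, rule in enumerate(rules):
        if not isinstance(rule, dict):
            problems.append(f"entry {i}: not a JSON object")
            continue
        rule_id = rule.get("rule_id", f"entry {i}")
        missing = REQUIRED_KEYS - set(rule)
        if missing:
            problems.append(f"{rule_id}: missing keys {sorted(missing)}")
        if rule_id in seen_ids:
            problems.append(f"{rule_id}: duplicate rule_id")
        seen_ids.add(rule_id)
        check_type = rule.get("check_type")
        if check_type is None:
            continue
        if check_type not in EXTRA_KEYS:
            problems.append(f"{rule_id}: unknown check_type '{check_type}'")
            continue
        absent = EXTRA_KEYS[check_type] - set(rule)
        if absent:
            problems.append(f"{rule_id}: {check_type} needs {sorted(absent)}")
    return problems
```

In Cell 5 this could run as validate_rules(rules_file_content) immediately after the file is read, so problems are printed (or raised) before any checks execute.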

#### Cell 6: Apply Dynamic DQ Checks to Evolved Table

In [7]:
# Read the full evolved table
full_encounters_df = spark.table(encounter_table)

# Apply the dynamic DQ function
if rules_list: # Check if rules were loaded
    dq_results_df = apply_dynamic_dq(full_encounters_df, rules_list)

    print("\nSchema with DQ Check Columns:")
    dq_results_df.printSchema()

    print("\nSample Data with DQ Results (True = Pass):")
    # Select some key columns and the DQ columns to show results
    cols_to_show = ["encounter_id", "patient_id", "encounter_date", "primary_diagnosis_code", "visit_type", "billable_amount", "quality_metric_score"] \
                   + [f"DQ_{rule['rule_id']}" for rule in rules_list if f"DQ_{rule['rule_id']}" in dq_results_df.columns]
    dq_results_df.select(cols_to_show).orderBy("encounter_date", "encounter_id").show(truncate=False)
else:
    print("Skipping DQ checks as rules were not loaded.")

Applying 6 DQ rules dynamically...
  Added check for rule 'DQ001' on column 'patient_id'.
  Added check for rule 'DQ002' on column 'encounter_date'.
  Added check for rule 'DQ003' on column 'primary_diagnosis_code'.
  Added check for rule 'DQ004' on column 'visit_type'.
  Added check for rule 'DQ005' on column 'billable_amount'.
  Added check for rule 'DQ006' on column 'quality_metric_score'.

Schema with DQ Check Columns:
root
 |-- encounter_id: string (nullable = true)
 |-- patient_id: integer (nullable = true)
 |-- provider_id: integer (nullable = true)
 |-- encounter_date: date (nullable = true)
 |-- primary_diagnosis_code: string (nullable = true)
 |-- visit_type: string (nullable = true)
 |-- billable_amount: decimal(10,2) (nullable = true)
 |-- quality_metric_score: integer (nullable = true)
 |-- DQ_DQ001: boolean (nullable = false)
 |-- DQ_DQ002: boolean (nullable = true)
 |-- DQ_DQ003: boolean (nullable = true)
 |-- DQ_DQ004: boolean (nullable = true)
 |-- DQ_DQ005: boolean (n

Demo Point: Show calling the apply_dynamic_dq function. Point out that the output DataFrame now includes new boolean columns (DQ_DQ001, DQ_DQ002, etc.). Highlight specific rows from the sample data and show how they passed or failed certain checks (e.g., ENC006 fails DQ001, ENC007 fails DQ002, ENC010 fails DQ003). Point out that DQ006 could be applied only because the schema evolved: against the Batch 1 schema, the function would have skipped it because quality_metric_score did not exist yet.
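The pass/fail values in the sample output follow the NULL conventions built into each Spark expression. The following pure-Python sketch restates those conventions for single values so they can be checked in isolation. evaluate and evaluate_row are hypothetical names; Python's re.search stands in for Spark's Java-based rlike (the two behave the same for the ICD-10 pattern used here), and the dates and codes in the example are illustrative rather than taken from the batch files.

```python
import re
from datetime import date
from typing import Any, Dict, List


def evaluate(rule: Dict[str, Any], value: Any, today: date) -> bool:
    """True = pass, False = fail, mirroring apply_dynamic_dq's NULL handling."""
    check_type = rule["check_type"]
    if check_type == "notNull":
        return value is not None
    if check_type == "notFutureDate":
        return value is None or value <= today  # NULL passes
    if check_type == "regexMatch":
        return value is not None and re.search(rule["pattern"], value) is not None  # NULL fails
    if check_type == "valueInSet":
        return value is not None and value in rule["allowed_values"]  # NULL fails
    if check_type == "valueInRange":
        return value is None or rule["min_value"] <= value <= rule["max_value"]  # NULL passes
    raise ValueError(f"unknown check_type {check_type!r}")


def evaluate_row(row: Dict[str, Any], rules: List[Dict[str, Any]], today: date) -> Dict[str, bool]:
    """One DQ_<rule_id> flag per active rule whose column exists in the row."""
    return {
        f"DQ_{r['rule_id']}": evaluate(r, row[r["column_name"]], today)
        for r in rules
        if r.get("is_active", False) and r["column_name"] in row
    }


# Same pattern as rule DQ003 in the loaded JSON (after JSON unescaping)
icd10 = {"rule_id": "DQ003", "column_name": "primary_diagnosis_code", "check_type": "regexMatch",
         "pattern": r"^[A-Z][0-9]{2}(\.[A-Z0-9]{1,4})?$", "is_active": True}
no_future = {"rule_id": "DQ002", "column_name": "encounter_date",
             "check_type": "notFutureDate", "is_active": True}

row = {"encounter_id": "ENC001", "encounter_date": date(2025, 1, 15),
       "primary_diagnosis_code": "S62.001A"}
print(evaluate_row(row, [icd10, no_future], today=date(2025, 3, 1)))
```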

#### Cell 7: Analyze DQ Results and Save Failures

In [8]:
# Check if the DataFrame from the previous step exists and is not None
if 'dq_results_df' in locals() and dq_results_df is not None:
    print("\nAnalyzing DQ results...")
    # --- Identify failed records and the rules they failed (using PySpark functions) ---

    # Get the names of the dynamically added DQ check columns (e.g., 'DQ_DQ001', 'DQ_DQ002')
    dq_check_cols = [c for c in dq_results_df.columns if c.startswith("DQ_")]

    # Create an array of rule IDs for checks that failed (where the DQ column is False)
    failed_rules_cols_exprs = []
    if dq_check_cols: # Proceed only if DQ columns were actually generated
        print(f"  Generating failed rules list based on columns: {dq_check_cols}")
        for c in dq_check_cols:
            # Extract the Rule ID part (e.g., 'DQ001' from 'DQ_DQ001')
            rule_id = c[3:]
            # Use when().otherwise() to create a column expression:
            # It yields the rule_id string if the check column 'c' is False, otherwise it yields NULL
            failed_rules_cols_exprs.append(F.when(F.col(c) == False, F.lit(rule_id)).otherwise(F.lit(None)))

        # Create the final array column using F.array() to combine the conditional expressions,
        # and then use the higher-order function F.filter() to remove the NULLs (passed checks)
        failed_rules_array_col = F.filter(F.array(*failed_rules_cols_exprs), lambda x: x.isNotNull())
    else:
        # If no DQ columns were found/generated, create an empty array literal for consistency
        print("  No DQ check columns found to analyze.")
        failed_rules_array_col = F.array().cast("array<string>")


    # Add the 'failed_rules' array and the overall 'dq_status' column
    # Apply these to the dq_results_df which contains the original data + boolean DQ check columns
    dq_summary_df = dq_results_df.withColumn("failed_rules", failed_rules_array_col) \
                               .withColumn("dq_status", F.when(F.size(F.col("failed_rules")) == 0, "PASS").otherwise("FAIL"))

    print("\nDQ Summary (Status and Failed Rules):")
    # Show status and the array of failed rule IDs for all records
    dq_summary_df.select("encounter_id", "patient_id", "dq_status", "failed_rules").orderBy("encounter_date", "encounter_id").show(truncate=False)

    # --- Filter for only failed records ---
    failed_records_df = dq_summary_df.filter(F.col("dq_status") == "FAIL")
    failed_count = failed_records_df.count()  # Triggers a Spark job; call .cache() first if the DataFrame is reused heavily

    print(f"\nIdentified {failed_count} records failing DQ checks.")

    if failed_count > 0:
        print("Failed Records Details (showing status and failed rules):")
        # Show failed records with their failed rules
        failed_records_df.select("encounter_id", "patient_id", "dq_status", "failed_rules").orderBy("encounter_date", "encounter_id").show(truncate=False)

        # --- Save failed records details to another Delta table ---
        # Select original data columns + status + failed rules array for the output table.
        # This avoids saving the intermediate boolean DQ check columns (DQ_DQ001, etc.).
        # We need the original columns from the DataFrame before DQ checks were added.
        # Assuming 'full_encounters_df' from Cell 6 holds that state.
        if 'full_encounters_df' in locals():
            original_data_cols = full_encounters_df.columns
            # Define columns for the final output table
            output_cols = original_data_cols + ["dq_status", "failed_rules"]
            # Select these columns from the already filtered failed_records_df
            df_to_save = failed_records_df.select(output_cols)

            try:
                # Write to the results table, overwriting previous results for the demo
                df_to_save.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(dq_results_table)
                print(f"\nFailed records saved to Delta table: {dq_results_table}")
            except Exception as e:
                print(f"\nError saving DQ results: {e}")
        else:
            print("\nCould not find 'full_encounters_df' to determine original columns. Skipping save.")
    else:
        print("\nNo failed records to save.")
        # Optional: Drop the results table if it exists and there are no failures, for a clean state
        # spark.sql(f"DROP TABLE IF EXISTS {dq_results_table}")
        # print(f"Ensured results table {dq_results_table} is clear as no failures were found.")

else:
    print("DQ results DataFrame ('dq_results_df') not found or is None. Skipping analysis and save.")


Analyzing DQ results...
  Generating failed rules list based on columns: ['DQ_DQ001', 'DQ_DQ002', 'DQ_DQ003', 'DQ_DQ004', 'DQ_DQ005', 'DQ_DQ006']

DQ Summary (Status and Failed Rules):
+------------+----------+---------+------------+
|encounter_id|patient_id|dq_status|failed_rules|
+------------+----------+---------+------------+
|ENC001      |1001      |PASS     |[]          |
|ENC002      |1002      |PASS     |[]          |
|ENC003      |1003      |PASS     |[]          |
|ENC004      |1001      |PASS     |[]          |
|ENC005      |1004      |PASS     |[]          |
|ENC006      |NULL      |FAIL     |[DQ001]     |
|ENC008      |1005      |PASS     |[]          |
|ENC009      |1001      |PASS     |[]          |
|ENC010      |1003      |FAIL     |[DQ003]     |
|ENC011      |1002      |FAIL     |[DQ004]     |
|ENC012      |1004      |FAIL     |[DQ005]     |
|ENC013      |1005      |FAIL     |[DQ006]     |
|ENC014      |1001      |PASS     |[]          |
|ENC007      |1002      |FAIL 

Demo Point: Show how to analyze the boolean DQ columns to create a summary status (PASS/FAIL) and a list of exactly which rules failed for each record. Filter to isolate the failing records and save them for review/correction.
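The PASS/FAIL logic in Cell 7 reduces to a few lines once the boolean flags are available. This pure-Python sketch (summarize is a hypothetical name) applies the same conventions: the DQ_ prefix is stripped to recover the rule ID, only an explicit False counts as a failure (a NULL flag is treated as a pass, as with F.when(F.col(c) == False, ...)), and a record passes when its failed-rule list is empty. It also tallies failures per rule; in Spark, a comparable tally could be produced by exploding failed_rules with F.explode and then using groupBy().count(). The flags below reflect three rows from the summary output (ENC001 passes, ENC006 fails DQ001, ENC010 fails DQ003), abbreviated to two rules.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

Flags = Dict[str, Optional[bool]]  # e.g. {"DQ_DQ001": True, "DQ_DQ002": None, ...}


def summarize(records: Dict[str, Flags]) -> Tuple[Dict[str, Tuple[str, List[str]]], Counter]:
    """Return ({record_id: (dq_status, failed_rules)}, failure count per rule_id)."""
    summary: Dict[str, Tuple[str, List[str]]] = {}
    per_rule: Counter = Counter()
    for record_id, flags in records.items():
        # 'DQ_DQ001'[3:] -> 'DQ001'; only an explicit False is a failure
        failed = [col[3:] for col, ok in flags.items() if ok is False]
        per_rule.update(failed)
        summary[record_id] = ("PASS" if not failed else "FAIL", failed)
    return summary, per_rule


records = {
    "ENC001": {"DQ_DQ001": True, "DQ_DQ003": True},
    "ENC006": {"DQ_DQ001": False, "DQ_DQ003": True},
    "ENC010": {"DQ_DQ001": True, "DQ_DQ003": False},
}
summary, per_rule = summarize(records)
for record_id, (status, failed) in summary.items():
    print(record_id, status, failed)
print(dict(per_rule))
```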

#### Cell 8: Query DQ Results & Discussion

In [9]:
%%sql
-- Query the DQ results table (showing only failed records)
SELECT
    encounter_id,
    patient_id,
    encounter_date,
    primary_diagnosis_code,
    visit_type,
    billable_amount,
    quality_metric_score,
    dq_status,
    failed_rules -- Show array of failed rule IDs
FROM
    patient_encounters_dq_results -- Use table name directly
ORDER BY
    encounter_date, encounter_id;

<Spark SQL result set with 6 rows and 9 fields>

Demo Point: Query the table containing only the failed records (six rows in this run). Discuss the benefits: schema evolution absorbed the new quality_metric_score column without code changes; DQ rules were applied dynamically based on external definitions and the current schema; and each failing record carries the exact rule IDs it violated, ready for remediation. Rules can be added, changed, or deactivated by editing the JSON file rather than the notebook.