In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, expr, current_timestamp, explode,
    coalesce, array, when, max as spark_max, count as spark_count
)
from pyspark.sql.types import StringType, TimestampType
from datetime import datetime
from delta.tables import DeltaTable

In [None]:
# -------------------------------------------------------------------
# Input Parameters
# Update these values to match your Fabric environment.
# These are module-level globals read directly by the functions defined
# in later cells (OneLake paths, clinical_* config/log table queries,
# selectExpr column lists), so this cell must run before them.
# -------------------------------------------------------------------

# Fabric workspace name (first segment of the abfss:// OneLake paths)
workspace = 'BUNN_Foundation_NONPROD'

# Lakehouse names
# bronze/silver are used to build OneLake table paths; utilities_lakehouse
# is used as the qualifier for the clinical_document_type,
# clinical_field_mapping and clinical_processing_log tables.
bronze_lakehouse = 'bronze_clinical_lakehouse'
silver_lakehouse = 'silver_clinical_lakehouse'
utilities_lakehouse = 'utilities_lakehouse'

# Bronze source table containing the raw JSON VARIANT column
bronze_table = 'clinical_raw'

# The column in the bronze table that holds the JSON VARIANT data
variant_column = 'raw_data'

# The column in the bronze table used for watermarking
# (rows are selected with watermark_column > last COMPLETED watermark)
watermark_column = 'insert_timestamp'

# Bronze table ID column for lineage tracking
# (also the join key between section-level and root-level fields)
bronze_id_column = 'bronze_id'

# List of target silver tables to process in this run
# (each name is lower-cased to form the silver table folder name)
target_tables = ['OBSERVATION', 'ENCOUNTER']

# Capture the run start time for logging.
# Naive local wall-clock time from datetime.now(); the same value is
# written as run_start into every processing-log row of this run.
run_start = datetime.now()
print(f"Processing started at {run_start}")

In [None]:
# -------------------------------------------------------------------
# Utility: Convert dot-notation JSON path to variant colon notation
# -------------------------------------------------------------------
# The mapping table stores paths as 'a.b.c' (dot notation) for readability.
# Databricks VARIANT columns use colon notation: column:a:b:c
# This function performs that conversion at runtime.

def dot_to_variant_path(dot_path):
    """
    Rewrite a dot-separated JSON path as a colon-separated VARIANT path.

    Every '.' becomes ':'; nothing else (array subscripts, underscores,
    whitespace) is touched.

    Example:
        'organizer.component.observation.code._code'
        becomes
        'organizer:component:observation:code:_code'
    """
    segments = dot_path.split(".")
    return ":".join(segments)

In [None]:
# -------------------------------------------------------------------
# Utility: Watermark management functions
# -------------------------------------------------------------------
# These functions read and update the high watermark in the
# clinical_processing_log table to prevent reprocessing of data.

def get_last_watermark(spark, document_type_code, target_table):
    """
    Look up the high watermark of the latest COMPLETED run.

    Reads MAX(high_watermark) from clinical_processing_log for the given
    document type / target table pair, considering only rows whose
    run_status is 'COMPLETED' (FAILED rows never move the watermark).

    Returns:
        The stored watermark rendered with str(), or the sentinel
        '1900-01-01 00:00:00' when no usable value exists, so that the
        caller's "> watermark" filter selects every bronze row.
    """
    rows = spark.sql(f"""
        SELECT MAX(high_watermark) AS last_watermark
        FROM {utilities_lakehouse}.clinical_processing_log
        WHERE document_type_code = '{document_type_code}'
          AND target_table = '{target_table}'
          AND run_status = 'COMPLETED'
    """).collect()

    # Any falsy value (NULL from MAX over zero rows, or an empty result)
    # is treated as "no watermark yet".
    found = rows[0]["last_watermark"] if rows else None
    if not found:
        print(f"No previous watermark found for {document_type_code}/{target_table}. Processing all rows.")
        return "1900-01-01 00:00:00"

    print(f"Last watermark for {document_type_code}/{target_table}: {found}")
    return str(found)


def insert_processing_log(spark, log_id, document_type_code, target_table,
                          bronze_table_name, run_start, run_end,
                          records_read, records_written, records_failed,
                          high_watermark, run_status):
    """
    Append one run record to clinical_processing_log.

    Values are interpolated positionally into an INSERT ... VALUES
    statement, so their order must match the table's column order.
    Numeric arguments are inserted bare; text/timestamp arguments are
    wrapped in single quotes after str() formatting (no escaping is done).
    get_last_watermark later reads high_watermark back from rows whose
    run_status is 'COMPLETED'.
    """
    insert_statement = f"""
        INSERT INTO {utilities_lakehouse}.clinical_processing_log VALUES (
            {log_id},
            '{document_type_code}',
            '{target_table}',
            '{bronze_table_name}',
            '{run_start}',
            '{run_end}',
            {records_read},
            {records_written},
            {records_failed},
            '{high_watermark}',
            '{run_status}'
        )
    """
    spark.sql(insert_statement)

    print(f"Processing log updated: {document_type_code}/{target_table} -> {run_status}")


def get_next_log_id(spark):
    """
    Return MAX(log_id) + 1 from clinical_processing_log (1 when the table is empty).

    NOTE(review): read-then-insert id allocation is not atomic; two runs
    writing to the log concurrently could obtain the same id.
    """
    next_id_rows = spark.sql(f"""
        SELECT COALESCE(MAX(log_id), 0) + 1 AS next_id
        FROM {utilities_lakehouse}.clinical_processing_log
    """).collect()
    first_row = next_id_rows[0]
    return first_row["next_id"]

In [None]:
# -------------------------------------------------------------------
# Document Type Identification
# -------------------------------------------------------------------
# Checks the JSON VARIANT against the clinical_document_type registry
# to determine which document type parser to use.
# CCDA documents are identified by the presence of a templateId entry
# with _root matching the C-CDA R2.1 standard OID.

def identify_ccda_documents(spark, bronze_df, variant_col):
    """
    Keep only the bronze rows that look like CCDA documents.

    The expected templateId root OID is read from the active 'CCDA' row of
    clinical_document_type (identifier_value). A row matches when any
    element of <variant_col>:templateId has _root equal to that value;
    templateId may be a single object or an array, so it is normalized
    to an array before testing.

    NOTE(review): identifier_path is selected from the config but not
    used; the templateId path is hard-coded here. With several active
    CCDA rows, the first collected row wins (the query has no ORDER BY).

    Returns an empty frame (bronze_df.limit(0)) when no active config
    exists. Otherwise the filtered frame is counted once for logging,
    which triggers a Spark job.
    """
    config_rows = spark.sql(f"""
        SELECT identifier_path, identifier_value
        FROM {utilities_lakehouse}.clinical_document_type
        WHERE document_type_code = 'CCDA'
          AND is_active = true
    """).collect()

    if not config_rows:
        print("WARNING: No active CCDA document type configuration found.")
        return bronze_df.limit(0)

    root_oid = config_rows[0]["identifier_value"]

    # TRY_CAST succeeds for an array of templateIds; a single object fails
    # the cast (NULL) and is wrapped with ARRAY() instead.
    match_predicate = f"""
            EXISTS(
                TRANSFORM(
                    COALESCE(
                        TRY_CAST({variant_col}:templateId AS ARRAY<VARIANT>),
                        ARRAY({variant_col}:templateId)
                    ),
                    t -> t:_root::string
                ),
                v -> v = '{root_oid}'
            )
        """
    matched = bronze_df.filter(expr(match_predicate))

    print(f"Identified {matched.count()} CCDA document(s) in the current batch.")
    return matched

In [None]:
# -------------------------------------------------------------------
# Section Extraction
# -------------------------------------------------------------------
# CCDA documents contain up to 24 clinical sections under
# component.structuredBody.component[]. Each section is identified
# by a LOINC code in section.code._code.
#
# This function explodes the sections array, filters to the target
# section, then normalizes and explodes the entries within it.
# Entries can be a single object or an array, so we handle both.
#
# Some sections (Results, Vital Signs) have an additional nested array
# within each entry (organizer.component[]). When sub_array_path is
# provided, a second explosion is performed so each row represents
# one sub-item (e.g., one observation within an organizer).

def extract_section_entries(df, variant_col, section_loinc_code, sub_array_path=None):
    """
    Extract entries from a specific CCDA section.

    Steps:
    1. Normalize and explode the sections list
       (component.structuredBody.component). It is wrapped the same way as
       entries: explode() needs an ARRAY, and a document with a single
       section may carry it as an object rather than an array.
    2. Filter to the section whose section.code._code equals the LOINC code.
    3. Normalize entries (single object vs array) and explode them so each
       row is one clinical entry.
    4. (Optional) If sub_array_path (dot notation) is set, explode that
       nested array inside each entry so each row is one sub-item.
    5. Drop rows whose _entry is NULL. The ARRAY(<path>) fallback turns a
       missing entry / sub-array into ARRAY(NULL), which would otherwise
       yield one all-NULL row per empty section or non-matching entry.

    Args:
        df: Bronze DataFrame holding bronze_id_column, watermark_column and
            the VARIANT column named by variant_col.
        variant_col: Name of the VARIANT column with the parsed document.
        section_loinc_code: LOINC code of the section to extract.
        sub_array_path: Optional dot-notation path, relative to each entry,
            of a nested array to explode (e.g. 'organizer.component').

    Returns a DataFrame with columns:
        <bronze_id_column>, <watermark_column>, _section_code,
        _section_title, _entry
    When sub_array_path is used, _entry refers to each sub-item.
    """
    def _as_array(path_expr):
        # Arrays pass TRY_CAST unchanged; a single object fails the cast
        # (NULL) and is wrapped in a one-element array instead.
        return (
            f"coalesce(try_cast({path_expr} AS ARRAY<VARIANT>), "
            f"array({path_expr}))"
        )

    # Step 1: Normalize and explode the sections list of each document
    sections_path = f"{variant_col}:component:structuredBody:component"
    sections_df = df.selectExpr(
        f"{bronze_id_column}",
        f"{watermark_column}",
        f"explode({_as_array(sections_path)}) as _section"
    )

    # Step 2: Filter to the target section by LOINC code
    filtered_df = sections_df.filter(
        expr(f"_section:section:code:_code::string = '{section_loinc_code}'")
    )

    # Step 3: Normalize entries and explode
    entries_df = filtered_df.selectExpr(
        f"{bronze_id_column}",
        f"{watermark_column}",
        f"'{section_loinc_code}' as _section_code",
        "_section:section:title::string as _section_title",
        f"explode({_as_array('_section:section:entry')}) as _entry"
    )

    # Step 4: Optional second explosion for sections such as Results and
    # Vital Signs whose entries contain organizer.component[] arrays.
    if sub_array_path:
        sub_variant_path = dot_to_variant_path(sub_array_path)
        print(f"Section {section_loinc_code}: exploding sub-array at '{sub_array_path}'")
        entries_df = entries_df.selectExpr(
            f"{bronze_id_column}",
            f"{watermark_column}",
            "_section_code",
            "_section_title",
            f"explode({_as_array(f'_entry:{sub_variant_path}')}) as _entry"
        )

    # Step 5: Discard placeholder rows produced by ARRAY(NULL) fallbacks
    entries_df = entries_df.filter(col("_entry").isNotNull())

    entry_count = entries_df.count()
    print(f"Section {section_loinc_code}: extracted {entry_count} row(s) after all explosions.")
    return entries_df

In [None]:
# -------------------------------------------------------------------
# Dynamic Field Extraction
# -------------------------------------------------------------------
# Reads field mappings from the config table and builds selectExpr
# statements that extract values from the variant entry using
# colon-path notation. Supports optional transformation SQL.
#
# Two functions are provided:
#   build_extraction_exprs     - for section-level entry fields
#   build_root_extraction_exprs - for document-root-level fields

def build_extraction_exprs(spark, document_type_code, target_table, section_loinc_code):
    """
    Translate the section-entry field mappings of one table/section into
    selectExpr() strings, in column_ordinal order.

    Each active mapping row becomes:
        _entry:<colon path>::<target_data_type> AS <target_column>
    When transformation_sql is set, every '{value}' placeholder in it is
    replaced by that raw extraction before ' AS <target_column>' is appended.

    Returns an empty list (after printing a warning) when no mappings match.
    NOTE(review): entry_sub_array_path is selected but not used here; rows
    are chosen by section code only.
    """
    mapping_rows = spark.sql(f"""
        SELECT target_column, source_json_path, target_data_type,
               transformation_sql, entry_sub_array_path
        FROM {utilities_lakehouse}.clinical_field_mapping
        WHERE document_type_code = '{document_type_code}'
          AND target_table = '{target_table}'
          AND section_loinc_code = '{section_loinc_code}'
          AND path_context = 'section_entry'
          AND is_active = true
        ORDER BY column_ordinal
    """).collect()

    if not mapping_rows:
        print(f"WARNING: No section_entry mappings found for {target_table}/{section_loinc_code}.")
        return []

    def _to_select_expr(mapping):
        # Untransformed value pulled from the exploded entry, cast to the target type
        extracted = f"_entry:{dot_to_variant_path(mapping['source_json_path'])}::{mapping['target_data_type']}"
        template = mapping["transformation_sql"]
        body = template.replace("{value}", extracted) if template else extracted
        return f"{body} AS {mapping['target_column']}"

    exprs = [_to_select_expr(mapping) for mapping in mapping_rows]

    print(f"Built {len(exprs)} extraction expression(s) for {target_table}/{section_loinc_code}.")
    return exprs


def build_root_extraction_exprs(spark, document_type_code, target_table, variant_col):
    """
    Translate the document-root field mappings (path_context = 'root') of
    one table into selectExpr() strings, in column_ordinal order.

    Each active mapping row becomes:
        <variant_col>:<colon path>::<target_data_type> AS <target_column>
    When transformation_sql is set, every '{value}' placeholder in it is
    replaced by that raw extraction before ' AS <target_column>' is appended.
    The caller evaluates these once per document and joins them onto the
    section rows by bronze id.

    Returns an empty list (after printing a notice) when no mappings match.
    """
    mapping_rows = spark.sql(f"""
        SELECT target_column, source_json_path, target_data_type, transformation_sql
        FROM {utilities_lakehouse}.clinical_field_mapping
        WHERE document_type_code = '{document_type_code}'
          AND target_table = '{target_table}'
          AND path_context = 'root'
          AND is_active = true
        ORDER BY column_ordinal
    """).collect()

    if not mapping_rows:
        print(f"No root-level mappings found for {target_table}.")
        return []

    def _to_select_expr(mapping):
        # Root fields are addressed from the document's VARIANT column itself
        extracted = f"{variant_col}:{dot_to_variant_path(mapping['source_json_path'])}::{mapping['target_data_type']}"
        template = mapping["transformation_sql"]
        body = template.replace("{value}", extracted) if template else extracted
        return f"{body} AS {mapping['target_column']}"

    exprs = [_to_select_expr(mapping) for mapping in mapping_rows]

    print(f"Built {len(exprs)} root extraction expression(s) for {target_table}.")
    return exprs

In [None]:
# -------------------------------------------------------------------
# Main Processing Function
# -------------------------------------------------------------------
# Orchestrates the full bronze-to-silver extraction for a given
# document type and target table. Handles watermarking, section
# extraction, field mapping, and silver table writing.

def process_bronze_to_silver(spark, document_type_code, target_table):
    """
    End-to-end processing for one target silver table.

    Steps:
    1. Get the last COMPLETED watermark for this document type / table.
    2. Read bronze rows newer than that watermark, then bound the batch
       from above by its own max watermark. The rows written and the
       watermark recorded then describe the same set of rows, even if
       bronze receives new data while this function runs.
    3. Filter to CCDA documents.
    4. Load the distinct (section LOINC code, sub-array path) pairs for
       this table. Fail if one section is configured with more than one
       sub-array path: build_extraction_exprs selects mappings by section
       code only, so every mapping would be applied to every explosion and
       rows would be duplicated.
    5. For each section, extract entries and apply field mappings.
    6. Union results across sections.
    7. Join root-level fields (patient ID, org ID, etc.) on the bronze id.
    8. Add audit columns and append to the silver delta table.
    9. Record a COMPLETED row in the processing log with the new watermark.

    Returns None. It returns early, without writing a log row, when there
    are no new rows, no CCDA documents, or nothing was extracted.

    Raises:
        ValueError: a section is mapped with conflicting entry_sub_array_path
            values. Spark/SQL errors also propagate to the caller.
    """
    print(f"\n{'='*60}")
    print(f"Begin processing: {document_type_code} -> {target_table}")
    print(f"{'='*60}")

    # Step 1: Get last watermark
    last_watermark = get_last_watermark(spark, document_type_code, target_table)

    # Step 2: Read unprocessed rows from bronze
    bronze_path = (
        f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/"
        f"{bronze_lakehouse}.Lakehouse/Tables/{bronze_table}"
    )
    print(f"Reading bronze table from: {bronze_path}")
    bronze_df = spark.read.format("delta").load(bronze_path)

    # Apply watermark filter to only process new rows. Rows whose
    # watermark is NULL never satisfy ">" and are never selected.
    bronze_df = bronze_df.filter(col(watermark_column) > lit(last_watermark))

    # Row count and batch max come from a single aggregation (one scan).
    batch_stats = bronze_df.agg(
        spark_count(lit(1)).alias("row_count"),
        spark_max(col(watermark_column)).alias("max_wm"),
    ).collect()[0]

    total_rows = batch_stats["row_count"]
    if total_rows == 0:
        print(f"No new rows to process for {target_table}. Exiting.")
        return

    print(f"Found {total_rows} new row(s) to process.")

    # Non-NULL here: every selected row has a non-NULL watermark.
    max_watermark = batch_stats["max_wm"]

    # Bound the batch to (last_watermark, max_watermark]. The plan is lazy
    # and re-executed by every later action (counts, write). Without an
    # upper bound, rows arriving mid-run could be written now and again
    # next run, since the logged watermark would sit below them.
    bronze_df = bronze_df.filter(col(watermark_column) <= lit(max_watermark))

    # Step 3: Filter to CCDA documents
    ccda_df = identify_ccda_documents(spark, bronze_df, variant_column)

    # Emptiness check only; take(1) avoids a second full count.
    if not ccda_df.take(1):
        print("No CCDA documents found in the batch. Exiting.")
        return

    # Step 4: Get distinct section codes and their sub-array paths from config
    section_codes = spark.sql(f"""
        SELECT DISTINCT section_loinc_code, entry_sub_array_path
        FROM {utilities_lakehouse}.clinical_field_mapping
        WHERE document_type_code = '{document_type_code}'
          AND target_table = '{target_table}'
          AND path_context = 'section_entry'
          AND is_active = true
          AND section_loinc_code IS NOT NULL
    """).collect()

    # Build a list of (section_code, sub_array_path) tuples
    section_config = [
        (row["section_loinc_code"], row["entry_sub_array_path"])
        for row in section_codes
    ]
    print(f"Sections to process for {target_table}: {section_config}")

    # A section listed twice means its mappings disagree on the sub-array
    # path; refuse to write duplicated / mis-shaped rows.
    paths_by_code = {}
    for code, sub_array_path in section_config:
        paths_by_code.setdefault(code, set()).add(sub_array_path)
    conflicting = sorted(code for code, paths in paths_by_code.items() if len(paths) > 1)
    if conflicting:
        raise ValueError(
            f"Section(s) {conflicting} for {document_type_code}/{target_table} are mapped "
            "with more than one entry_sub_array_path; all active mappings of a section "
            "must share the same sub-array path."
        )

    # Step 5 & 6: Extract entries from each section and union results
    section_dfs = []
    for code, sub_array_path in section_config:
        print(f"\nExtracting from section {code} (sub_array: {sub_array_path})...")

        # Explode sections, filter to this one, explode entries (and the
        # optional nested sub-array)
        entries_df = extract_section_entries(ccda_df, variant_column, code, sub_array_path)

        # Build extraction expressions for this section from the mapping config
        field_exprs = build_extraction_exprs(spark, document_type_code, target_table, code)

        if not field_exprs:
            print(f"Skipping section {code}: no field expressions found.")
            continue

        # Keep bronze id and section metadata alongside the mapped fields
        # for lineage tracking
        select_list = [
            f"{bronze_id_column}",
            f"{watermark_column}",
            "_section_code",
            "_section_title"
        ] + field_exprs

        section_dfs.append(entries_df.selectExpr(*select_list))

    if not section_dfs:
        print(f"No data extracted for {target_table}. Exiting.")
        return

    # Union all section results by name; columns missing from a section
    # become NULL (allowMissingColumns=True).
    result_df = section_dfs[0]
    for df in section_dfs[1:]:
        result_df = result_df.unionByName(df, allowMissingColumns=True)

    print(f"\nCombined {result_df.count()} row(s) from all sections.")

    # Step 7: Join root-level fields to section-level data
    root_exprs = build_root_extraction_exprs(
        spark, document_type_code, target_table, variant_column
    )

    if root_exprs:
        # Extract root fields once per document
        root_df = ccda_df.selectExpr(
            f"{bronze_id_column}",
            *root_exprs
        )
        # Join root fields to section-level data on bronze_id
        result_df = result_df.join(root_df, on=bronze_id_column, how="left")

    # Build the output projection explicitly. The previous
    # withColumn("BRONZE_ID", ...) + drop(bronze_id_column) lost the lineage
    # column: Spark matches column names case-insensitively by default, so
    # "BRONZE_ID" replaced "bronze_id" in place and the drop then removed it.
    internal_cols = {
        name.lower()
        for name in ("_section_code", "_section_title", watermark_column, bronze_id_column)
    }
    payload_cols = [c for c in result_df.columns if c.lower() not in internal_cols]
    result_df = result_df.select(
        *[col(f"`{c}`") for c in payload_cols],
        col("_section_code").alias("SECTION_LOINC_CODE"),
        col(bronze_id_column).alias("BRONZE_ID"),
        current_timestamp().alias("ROW_INSERT_TIMESTAMP"),
    )

    # Step 8: Write to silver delta table
    silver_path = (
        f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/"
        f"{silver_lakehouse}.Lakehouse/Tables/{target_table.lower()}"
    )

    records_written = result_df.count()
    print(f"Writing {records_written} row(s) to silver table: {target_table}")
    print(f"Silver path: {silver_path}")

    result_df.write.format("delta").mode("append").save(silver_path)
    print(f"Write complete for {target_table}.")

    # Step 9: Update watermark in the processing log
    next_log_id = get_next_log_id(spark)

    insert_processing_log(
        spark=spark,
        log_id=next_log_id,
        document_type_code=document_type_code,
        target_table=target_table,
        bronze_table_name=bronze_table,
        run_start=run_start,
        run_end=datetime.now(),
        records_read=total_rows,
        records_written=records_written,
        records_failed=0,
        high_watermark=max_watermark,
        run_status='COMPLETED'
    )

    print(f"\n{'='*60}")
    print(f"Completed processing: {document_type_code} -> {target_table}")
    print(f"Rows read: {total_rows} | Rows written: {records_written}")
    print(f"Watermark updated to: {max_watermark}")
    print(f"{'='*60}")

In [None]:
# -------------------------------------------------------------------
# Execute Bronze-to-Silver Processing
# -------------------------------------------------------------------
# Process each target table configured for CCDA documents.
# Each table is processed independently with its own watermark.
# If one table fails, the others still run. A failed table's watermark
# is not advanced (only COMPLETED log rows are read back as watermarks),
# so its rows are retried on the next run. Failed tables are listed in
# a summary at the end of this cell.

import traceback

failed_tables = []

for table in target_tables:
    try:
        process_bronze_to_silver(spark, 'CCDA', table)
    except Exception as e:
        # Log the failure so it appears in the processing history
        print(f"FAILED processing {table}: {str(e)}")
        # str(e) alone often hides where the error came from; keep the stack
        traceback.print_exc()
        failed_tables.append(table)

        # Record the failure in the processing log.
        # The FAILED row carries a placeholder watermark and is ignored
        # when the next run looks up its starting watermark.
        try:
            next_log_id = get_next_log_id(spark)
            insert_processing_log(
                spark=spark,
                log_id=next_log_id,
                document_type_code='CCDA',
                target_table=table,
                bronze_table_name=bronze_table,
                run_start=run_start,
                run_end=datetime.now(),
                records_read=0,
                records_written=0,
                records_failed=0,
                high_watermark='1900-01-01 00:00:00',
                run_status='FAILED'
            )
        except Exception as log_err:
            print(f"Additionally failed to write processing log: {str(log_err)}")

print(f"\nAll processing complete. Run started at {run_start}, ended at {datetime.now()}.")
if failed_tables:
    # The loop deliberately continues past failures; make them visible
    # instead of ending on an unconditional "complete" message.
    print(f"WARNING: {len(failed_tables)} table(s) failed and will be retried on the next run: {failed_tables}")

In [None]:
# -------------------------------------------------------------------
# Validation: Test extraction logic against the sample JSON
# -------------------------------------------------------------------
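# This cell loads the sample CCDA JSON file and tests the path
# resolution and extraction logic without writing to silver tables.
# Uncomment and run to validate mappings during development.
#
# Note: the calls below do not pass sub_array_path to
# extract_section_entries, whereas the main pipeline passes the
# configured entry_sub_array_path. For sections that use one (e.g.
# Results or Vital Signs), mappings written relative to the sub-item
# will come back NULL here unless you pass the same path.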
#
# Prerequisites:
#   - The sample JSON file must be uploaded to the lakehouse Files area
#   - Config tables must be seeded (run ccda_config_tables.ipynb first)

# sample_path = "/lakehouse/default/Files/ccda/brnz_ccda_raw_varient.json"
#
# # Load sample JSON as a single-row DataFrame with a VARIANT column (via parse_json)
# sample_json = spark.read.text(sample_path, wholetext=True).selectExpr(
#     "'test-001' as bronze_id",
#     "current_timestamp() as insert_timestamp",
#     "parse_json(value) as raw_data"
# )
# print(f"Loaded sample document. Row count: {sample_json.count()}")
#
# # Test 1: Section extraction for Results (30954-2)
# print("\n--- Testing Results Section Extraction ---")
# entries = extract_section_entries(sample_json, "raw_data", "30954-2")
# print(f"Entries found: {entries.count()}")
# entries.select("_section_code", "_section_title").show(5, truncate=False)
#
# # Test 2: Field extraction expressions for Results
# exprs = build_extraction_exprs(spark, "CCDA", "OBSERVATION", "30954-2")
# if exprs:
#     print("\nGenerated expressions:")
#     for e in exprs:
#         print(f"  {e}")
#     select_list = ["bronze_id", "_section_code"] + exprs
#     result = entries.selectExpr(*select_list)
#     print("\n--- Extracted Observation Fields ---")
#     result.show(5, truncate=False)
#
# # Test 3: Root-level extraction
# root_exprs = build_root_extraction_exprs(spark, "CCDA", "OBSERVATION", "raw_data")
# if root_exprs:
#     print("\nGenerated root expressions:")
#     for e in root_exprs:
#         print(f"  {e}")
#     root_result = sample_json.selectExpr("bronze_id", *root_exprs)
#     print("\n--- Root-Level Fields ---")
#     root_result.show(truncate=False)
#
# # Test 4: Encounter section extraction (46240-8)
# print("\n--- Testing Encounter Section Extraction ---")
# enc_entries = extract_section_entries(sample_json, "raw_data", "46240-8")
# print(f"Encounter entries found: {enc_entries.count()}")
#
# enc_exprs = build_extraction_exprs(spark, "CCDA", "ENCOUNTER", "46240-8")
# if enc_exprs:
#     enc_select = ["bronze_id", "_section_code"] + enc_exprs
#     enc_result = enc_entries.selectExpr(*enc_select)
#     print("\n--- Extracted Encounter Fields ---")
#     enc_result.show(5, truncate=False)
#
# # Test 5: Vital Signs section extraction (8716-3)
# print("\n--- Testing Vital Signs Section Extraction ---")
# vs_entries = extract_section_entries(sample_json, "raw_data", "8716-3")
# print(f"Vital signs entries found: {vs_entries.count()}")
#
# vs_exprs = build_extraction_exprs(spark, "CCDA", "OBSERVATION", "8716-3")
# if vs_exprs:
#     vs_select = ["bronze_id", "_section_code"] + vs_exprs
#     vs_result = vs_entries.selectExpr(*vs_select)
#     print("\n--- Extracted Vital Signs Fields ---")
#     vs_result.show(5, truncate=False)