In [0]:
# Imports
from pyspark.sql.types import *
from pyspark.sql.functions import (
    col,
    length,
    trim,
    regexp_replace,
    regexp_extract,
    to_date,
    to_timestamp,
    when,
    lpad
)
from pyspark.sql import DataFrame


In [0]:
# Set widgets for use in a Databricks job
dbutils.widgets.text("source", "")  # Leave blank for manual multi-table mode
dbutils.widgets.text("bronze_base_path", "/FileStore/bronze")
dbutils.widgets.text("silver_base_path", "/FileStore/silver")

# Retrieve widget values.
# The source name is stripped so that a padded or whitespace-only value
# (easy to enter by accident in the widget box or a job parameter) is not
# mistaken for a table name.
source = dbutils.widgets.get("source").strip()
bronze_base_path = dbutils.widgets.get("bronze_base_path")
silver_base_path = dbutils.widgets.get("silver_base_path")

# A non-blank source selects exactly one table; a blank one selects every
# table listed below.
if source:
    sources_to_process = [source]
else:
    sources_to_process = ["patients", "encounters", "providers", "organizations", "conditions"]

In [0]:
def _strip_trailing_digits(column_name: str):
    """Return a Column expression: trim `column_name`, then drop a trailing run of digits."""
    return regexp_replace(trim(col(column_name)), r"\d+$", "")


def _zip5(column_name: str):
    """
    Return a Column expression that renders `column_name` as a 5-character ZIP string.

    Values shorter than 5 characters are left-padded with "0"; anything else
    is cut to its first 5 characters. The column is cast to string once and
    that same expression is used for the length test, the padding and the
    truncation.
    """
    zip_str = col(column_name).cast("string")
    # NOTE(review): an empty-string ZIP has length 0 and would be padded to
    # "00000" — confirm whether blank ZIPs can reach this point.
    return when(length(zip_str) < 5, lpad(zip_str, 5, "0")).otherwise(zip_str.substr(1, 5))


def silver_transform(df: DataFrame, source: str) -> DataFrame:
    """
    Applies source-specific transformation logic for Silver layer.

    Parameters:
        df (DataFrame): Input DataFrame from Bronze layer
        source (str): Source name; one of 'patients', 'encounters',
            'providers', 'organizations', 'conditions'

    Returns:
        DataFrame: Transformed DataFrame ready for Silver layer. Existing
        columns are cleaned in place; the derived columns 'zip_clean'
        (providers), 'zip_cleaned' (organizations) and 'condition_type'
        (conditions) are added.

    Raises:
        ValueError: If `source` is not one of the supported source names.
    """

    if source == "patients":
        # Synthea adds digits to end of names — strip them from every name part
        for name_col in ("first", "middle", "last", "maiden", "suffix"):
            df = df.withColumn(name_col, _strip_trailing_digits(name_col))
        df = (
            df.withColumn("birthdate", to_date("birthdate"))
              .withColumn("deathdate", to_date("deathdate"))
        )

    elif source == "encounters":
        # Timestamp cast, text field cleanup and numeric casts for cost fields
        df = (
            df.withColumn("start", to_timestamp("start"))
              .withColumn("stop", to_timestamp("stop"))
              .withColumn("encounterclass", trim(col("encounterclass")))
              .withColumn("description", trim(col("description")))
              .withColumn("reasondescription", trim(col("reasondescription")))
              .withColumn("base_encounter_cost", col("base_encounter_cost").cast("double"))
              .withColumn("total_claim_cost", col("total_claim_cost").cast("double"))
              .withColumn("payer_coverage", col("payer_coverage").cast("double"))
        )

    elif source == "providers":
        # Trim digits from names and clean location fields.
        # The name pattern rewrites "<text><digits> <text><digits>" to
        # "<text> <text>"; values that do not contain that shape are left as is.
        df = (
            df.withColumn("name", regexp_replace(trim(col("name")), r"(\D+)\d+\s+(\D+)\d+", r"\1 \2"))
              .withColumn("speciality", trim(col("speciality")))
              .withColumn("address", trim(col("address")))
              .withColumn("city", trim(col("city")))
              .withColumn("state", trim(col("state")))
              # NOTE(review): this column is 'zip_clean' here but 'zip_cleaned'
              # for organizations; names are kept as-is so the output schema
              # does not change.
              .withColumn("zip_clean", _zip5("zip"))
        )

    elif source == "organizations":
        # Clean contact and location info, fix ZIPs and phone format
        df = (
            df.withColumn("name", trim(col("name")))
              .withColumn("address", trim(col("address")))
              .withColumn("city", trim(col("city")))
              .withColumn("state", trim(col("state")))
              .withColumn("zip_cleaned", _zip5("zip"))
              # Keep digits only, then the first 10 of them. Removing every
              # non-digit also removes surrounding whitespace, so no separate
              # trim of the phone column is needed.
              .withColumn("phone", regexp_replace(col("phone"), r"[^0-9]", "").substr(1, 10))
              .withColumn("revenue", col("revenue").cast("double"))
              .withColumn("utilization", col("utilization").cast("int"))
        )

    elif source == "conditions":
        # Cast dates and extract condition type from description.
        # condition_type is the text inside the first "(...)" of the trimmed
        # description.
        # NOTE(review): regexp_extract yields "" (not null) when nothing
        # matches — confirm that is what downstream consumers expect.
        df = (
            df.withColumn("start", to_date("start"))
              .withColumn("stop", to_date("stop"))
              .withColumn("description", trim(col("description")))
              .withColumn("condition_type", regexp_extract(col("description"), r"\((.*?)\)", 1))
        )

    else:
        raise ValueError(f"Unknown source: {source}")

    return df


In [0]:
# Bronze -> Silver load for every requested source.
# Each source is processed independently so one bad table does not block the
# others, but failures are remembered and re-raised after the loop: printing
# alone would let a Databricks job run finish as "successful" even though
# some Silver tables were never written.
failed_sources = {}

for src in sources_to_process:
    input_path = f"{bronze_base_path}/{src}"
    output_path = f"{silver_base_path}/{src}"

    print(f"🔄 Processing {src} from {input_path} -> {output_path}")

    try:
        df_bronze = spark.read.format("delta").load(input_path)
        df_silver = silver_transform(df_bronze, src)
        # Full refresh: the Silver table at output_path is replaced on every run.
        df_silver.write.format("delta").mode("overwrite").save(output_path)
        print(f"✅ Successfully wrote Silver table: {src}")
    except Exception as e:
        failed_sources[src] = e
        print(f"❌ Error processing {src}: {e}")

if failed_sources:
    # Surface the failure to the caller / job scheduler once all sources
    # have been attempted.
    raise RuntimeError(
        f"Silver load failed for {len(failed_sources)} of {len(sources_to_process)} "
        f"source(s): {', '.join(failed_sources)}"
    )


🔄 Processing patients from /FileStore/bronze/patients → /FileStore/silver/patients
✅ Successfully wrote Silver table: patients
🔄 Processing encounters from /FileStore/bronze/encounters → /FileStore/silver/encounters
✅ Successfully wrote Silver table: encounters
🔄 Processing providers from /FileStore/bronze/providers → /FileStore/silver/providers
✅ Successfully wrote Silver table: providers
🔄 Processing organizations from /FileStore/bronze/organizations → /FileStore/silver/organizations
✅ Successfully wrote Silver table: organizations
🔄 Processing conditions from /FileStore/bronze/conditions → /FileStore/silver/conditions
✅ Successfully wrote Silver table: conditions
