In [0]:
%run ../../config/project_config

In [0]:
%run ../../src/utils

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, current_timestamp

In [0]:
# Columns removed from every bronze source table before the sources are unified.
# NOTE(review): these look like ingestion/audit metadata rather than business
# fields — confirm against the bronze table definitions.
excluded_cols = [
    "load_dt",
    "source_file",
    "_rescued_data",
]

In [0]:
# Fully qualified names (catalog.schema.table) of the bronze tables that feed
# the unified silver dataset. The catalog/schema prefix is shared, so only the
# table names are listed explicitly.
source_tables = [
    f"{CATALOG_NAME}.{SCHEMA_BRONZE}.{bronze_table}"
    for bronze_table in (
        "county_metrics_csv_dlt",
        "county_metrics_json",
        "county_metrics_xml",
    )
]

In [0]:
# Read each bronze source, normalize every column to a nullable string, and
# union the sources by column name into a single DataFrame (`unified_df`).
unified_df = None
for table_name in source_tables:
    raw_df = spark.table(table_name).drop(*excluded_cols)

    # Build all normalized column expressions first and apply them in ONE
    # select(). Calling withColumn() repeatedly in a loop adds a projection per
    # call, which bloats the query plan and can fail on wide tables.
    normalized_cols = []
    for col_name in raw_df.columns:
        as_string = F.col(col_name).cast("string")
        # Empty strings are treated as missing values; everything else is kept
        # as its string representation under the original column name.
        normalized_cols.append(
            F.when(as_string == "", None).otherwise(as_string).alias(col_name)
        )
    raw_df = raw_df.select(*normalized_cols)

    if unified_df is None:
        # First source seeds the union.
        unified_df = raw_df
    else:
        # Sources may not share an identical column set; columns missing on
        # either side are filled with nulls instead of raising.
        unified_df = unified_df.unionByName(raw_df, allowMissingColumns=True)

In [0]:
# Run the unioned bronze data through the shared header-standardization helper.
# NOTE(review): `standardize_dataframe_headers` is not defined in this notebook
# (presumably it comes from the %run'd utils notebook). The following cells
# rely on its output exposing `region_name` and `date` columns — confirm.
silver_df = standardize_dataframe_headers(unified_df)

In [0]:
# A row is malformed when either key column (region_name, date) is null.
missing_region = F.col("region_name").isNull()
missing_date = F.col("date").isNull()
is_malformed = missing_region | missing_date

# Malformed rows are set aside for the quarantine table.
df_quarantine = silver_df.where(is_malformed)

# Well-formed rows are reduced to one row per (region_name, date).
# NOTE(review): no ordering is applied before de-duplication, so which of
# several duplicate rows survives is not controlled here.
rows_with_keys = silver_df.where(~is_malformed)
df_clean = rows_with_keys.drop_duplicates(subset=["region_name", "date"])

In [0]:
# Persist the malformed rows to the quarantine table in the bronze schema.
quarantine_table = f"{CATALOG_NAME}.{SCHEMA_BRONZE}.quarantine_records"

quarantine_writer = (
    df_quarantine.write
    .format("delta")
    .mode("append")  # append so previously quarantined records are retained
    .option("mergeSchema", "true")  # tolerate columns not yet present in the table
)
quarantine_writer.saveAsTable(quarantine_table)

In [0]:
# Publish the cleaned, de-duplicated rows as the silver county_metrics table,
# replacing both its data and its schema on every run.
silver_table = f"{CATALOG_NAME}.{SCHEMA_SILVER}.county_metrics"

silver_writer = (
    df_clean.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .clusterBy("region_name", "date")  # Liquid Clustering on the key columns
)
silver_writer.saveAsTable(silver_table)