In [0]:
from pyspark import pipelines as pl
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Auto Loader source path -- point this at your local test files or mounted volume.
volume_path = "/".join([
    "/Volumes/workspace/damg7370/datastore",
    "json file",        # directory name contains a space
    "customer_*.json",  # wildcard: every customer_*.json file in the directory
])


In [0]:
# Bronze streaming table, rescue mode: the inferred schema stays fixed and
# unexpected fields are kept in _rescued_data instead of new columns.
pl.create_streaming_table("demo_cust_bronze_sd_rescue")

@pl.append_flow(
  target = "demo_cust_bronze_sd_rescue",
  name = "demo_cust_bronze_sd_rescue_ingest_flow"
)
def demo_cust_bronze_sd_rescue_ingest_flow():
  """Stream the customer JSON files at ``volume_path`` into the rescue-mode bronze table.

  Every row is tagged with ``ingestion_datetime`` (current_timestamp()) and
  ``source_filename`` (taken from the ``_metadata.file_path`` column).
  """
  autoloader_options = {
      "cloudFiles.format": "json",
      "cloudFiles.inferColumnTypes": "true",
      "cloudFiles.schemaEvolutionMode": "rescue",
  }
  raw_stream = spark.readStream.format("cloudFiles").options(**autoloader_options).load(volume_path)
  return (
      raw_stream
      .withColumn("ingestion_datetime", current_timestamp())
      .withColumn("source_filename", col("_metadata.file_path"))
  )


In [0]:
# Function to handle DATATYPE changes by consuming _rescued_data JSON map and casting to target types
def process__rescue_data_datatype_change(df, target_schema: StructType, clear_rescued_data: bool = True):
    """Re-type columns using values that Auto Loader rescued into ``_rescued_data``.

    For every field in ``target_schema`` the column is rebuilt as:
      * the value stored under that key in ``_rescued_data``, cast to the
        field's type, when the key is present; otherwise
      * the existing column cast to the field's type, or NULL when the
        DataFrame has no such column yet.

    Args:
        df: DataFrame with a string ``_rescued_data`` column holding a JSON object.
        target_schema: fields to re-type; only each field's ``name`` and
            ``dataType`` are used.
        clear_rescued_data: when True (the default, original behaviour)
            ``_rescued_data`` is set to NULL on every row, discarding any rescued
            keys this function did not handle. When False, only the handled
            keys and the ``_file_path`` key are removed, and ``_rescued_data``
            becomes NULL only if nothing else is left. This lets an
            ``_rescued_data IS NULL`` expectation still catch unhandled data.
            Remaining entries are re-serialised with ``to_json``, so their
            values come back as JSON strings.

    Returns:
        The transformed DataFrame, with no helper columns left behind.
    """
    rescued_map = "_rescued_data_modified"
    # Parse the _rescued_data JSON into a MAP<string,string> helper column.
    df = df.withColumn(
        rescued_map,
        from_json(col("_rescued_data"), MapType(StringType(), StringType())),
    )
    # Spark resolves column names case-insensitively by default, so compare lowercased names.
    existing_columns = {name.lower() for name in df.columns}

    for field in target_schema.fields:
        column_name = field.name
        data_type = field.dataType

        # Function API instead of an f-string SQL expr(): names containing quotes stay safe.
        # A NULL map yields NULL here, which `when` treats as false.
        key_present = map_contains_key(col(rescued_map), column_name)
        # col() on a missing column fails analysis; fall back to NULL instead.
        current_value = col(column_name) if column_name.lower() in existing_columns else lit(None)

        # Each branch is cast on its own; no second cast is needed afterwards.
        df = df.withColumn(
            column_name,
            when(key_present, col(rescued_map).getItem(column_name).cast(data_type))
            .otherwise(current_value.cast(data_type)),
        )
        existing_columns.add(column_name.lower())

    if clear_rescued_data:
        # Original behaviour: treat everything in _rescued_data as consumed.
        df = df.withColumn("_rescued_data", lit(None).cast(StringType()))
    else:
        handled_keys = [field.name for field in target_schema.fields] + ["_file_path"]
        leftover = map_filter(col(rescued_map), lambda k, v: ~k.isin(handled_keys))
        # size() of a NULL map is -1 or NULL depending on config; either way `> 0` is not true.
        df = df.withColumn(
            "_rescued_data",
            when(size(leftover) > 0, to_json(leftover)).otherwise(lit(None).cast(StringType())),
        )

    return df.drop(rescued_map)


# Function to extract new fields (present in _rescued_data) and add them as columns (string typed)
def process__rescue_data_new_fields(df, new_field_keys=None):
    """Promote keys found in ``_rescued_data`` to top-level string columns.

    Which keys are promoted depends on the input:
      * ``new_field_keys`` given: exactly those keys (works for streaming input);
      * batch DataFrame: every distinct key found in the data, via ``collect()``;
      * streaming DataFrame without ``new_field_keys``: none, because a
        streaming DataFrame cannot be collected.

    The ``_file_path`` key is never promoted. Keys that already exist as
    columns are skipped: those are rescued type mismatches, and overwriting the
    column would null it on every row that lacks the key.

    Args:
        df: DataFrame with a string ``_rescued_data`` column holding a JSON object.
        new_field_keys: optional iterable of key names to promote.

    Returns:
        ``df`` with one StringType column per promoted key; helper columns removed.
    """
    rescued_map = "_rescued_data_json_to_map"
    # Parse the JSON into a MAP<string,string>; NULL _rescued_data gives a NULL map.
    df = df.withColumn(
        rescued_map,
        from_json(col("_rescued_data"), MapType(StringType(), StringType())),
    )

    if new_field_keys is not None:
        candidate_keys = list(new_field_keys)
    elif df.isStreaming:
        # collect() is not allowed on a streaming DataFrame; the caller must supply the keys.
        candidate_keys = []
    else:
        distinct_keys = df.select(explode(map_keys(col(rescued_map))).alias("rescued_key")).distinct()
        candidate_keys = [row["rescued_key"] for row in distinct_keys.collect()]

    # Spark resolves column names case-insensitively by default, so compare lowercased names.
    existing_columns = {name.lower() for name in df.columns}
    for key in candidate_keys:
        if key == "_file_path" or key.lower() in existing_columns:
            continue
        df = df.withColumn(key, col(rescued_map).getItem(key).cast(StringType()))
        existing_columns.add(key.lower())

    # Drop the helper column.
    return df.drop(rescued_map)


In [0]:
# Target types for columns whose datatype changed (simulated: signupDate becomes a DATE).
updated_datatypes = StructType().add("signupDate", DateType(), True)

# Silver streaming table (rescue version): rows are dropped unless
# _rescued_data is NULL and CustomerID is present.
pl.create_streaming_table(
  name = "demo_cust_silver_sd_rescue",
  expect_all_or_drop = {
    "no_rescued_data": "_rescued_data IS NULL",
    "valid_id": "CustomerID IS NOT NULL",
  }
)

@pl.append_flow(
  target = "demo_cust_silver_sd_rescue",
  name = "demo_cust_silver_sd_rescue_clean_flow"
)
def demo_cust_silver_sd_rescue_clean_flow():
  """Clean the rescue-mode bronze stream into the silver table.

  First promotes rescued new fields, then re-types the columns listed in
  ``updated_datatypes``. NOTE(review): process__rescue_data_datatype_change,
  called this way, sets _rescued_data to NULL on every row, so the
  ``no_rescued_data`` expectation never drops anything. As called here on a
  streaming source, process__rescue_data_new_fields promotes no columns.
  """
  bronze_stream = spark.readStream.table("demo_cust_bronze_sd_rescue")
  return (
      bronze_stream
      .transform(process__rescue_data_new_fields)
      .transform(process__rescue_data_datatype_change, updated_datatypes)
  )


When using Auto Loader, `schemaEvolutionMode = "rescue"` is defensive. Unexpected fields are stored in `_rescued_data`, so the canonical schema stays stable and data engineers choose when and how to promote new fields and handle datatype changes. `schemaEvolutionMode = "addNewColumns"` is aggressive: it mutates the schema by adding new top-level columns automatically. Auto Loader stops the stream when it first sees a new column and resumes with the evolved schema on restart. This is convenient when producers are stable but risky when producers are noisy or untrusted.

In our experiment, rescue kept the bronze schema unchanged and placed new fields into `_rescued_data` for programmatic resolution, while addNewColumns added the new fields directly to the table schema. Datatype changes were safer to handle with rescue because we could parse and cast the rescued values deterministically. addNewColumns does not evolve the types of existing columns, so datatype changes caused inference conflicts or required additional error handling.

In [0]:
# Separate bronze table for the 'addNewColumns' experiment: new fields become
# top-level columns instead of landing in _rescued_data.
pl.create_streaming_table("demo_cust_bronze_sd_addnew")

@pl.append_flow(
  target = "demo_cust_bronze_sd_addnew",
  name = "demo_cust_bronze_sd_addnew_ingest_flow"
)
def demo_cust_bronze_sd_addnew_ingest_flow():
  """Stream the customer JSON files at ``volume_path`` into the addNewColumns bronze table.

  Every row is tagged with ``ingestion_datetime`` (current_timestamp()) and
  ``source_filename`` (taken from the ``_metadata.file_path`` column).
  """
  autoloader_options = {
      "cloudFiles.format": "json",
      "cloudFiles.inferColumnTypes": "true",
      "cloudFiles.schemaEvolutionMode": "addNewColumns",
  }
  raw_stream = spark.readStream.format("cloudFiles").options(**autoloader_options).load(volume_path)
  return (
      raw_stream
      .withColumn("ingestion_datetime", current_timestamp())
      .withColumn("source_filename", col("_metadata.file_path"))
  )

# Silver streaming table for the addNewColumns experiment: rows without a CustomerID are dropped.
pl.create_streaming_table(
  name = "demo_cust_silver_sd_addnew",
  expect_all_or_drop = {"valid_id": "CustomerID IS NOT NULL"}
)

@pl.append_flow(
  target = "demo_cust_silver_sd_addnew",
  name = "demo_cust_silver_sd_addnew_clean_flow"
)
def demo_cust_silver_sd_addnew_clean_flow():
  """Clean the addNewColumns bronze stream into the silver table.

  New fields already arrive as top-level columns, so _rescued_data is not
  parsed here. The only datatype handling is converting signupDate with
  to_date(). NOTE(review): signupDate values that to_date() cannot parse may
  become NULL or raise, depending on the session's ANSI setting; confirm.
  """
  return (
      spark.readStream.table("demo_cust_bronze_sd_addnew")
      .withColumn("signupDate", to_date(col("signupDate")))
  )
