**Schema evolution** allows a pipeline to adapt its schema as the structure of incoming data changes. This is a common use case in data ingestion.

**option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")**
- **failOnNewColumns** enforces a strict schema: when new columns appear, the stream fails and the schema is not evolved. Switch to either `addNewColumns` or `rescue` if you want to accept schema changes
- **addNewColumns** automatically adds new columns to the schema and continues processing once the stream restarts with the evolved schema (existing columns keep their data types)
- **rescue** doesn't evolve the schema, and the stream does not fail on schema changes. All unexpected fields are captured in a special column called **_rescued_data** as a JSON string

**Note:**
- `addNewColumns` mode is the **default** when a _**schema is not provided**_
- **none** is the default when _**you provide a schema**_ (`addNewColumns` is not allowed when the schema of the stream is provided)
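To make the modes concrete, here is a toy, pure-Python model of how each one treats a record carrying a field that is missing from the current schema. It is a simplified sketch, not Auto Loader's implementation: `evolve_record`, `default_mode`, `SchemaChangeError`, and the list-based schema are hypothetical. It only models new fields (real rescue mode also captures type mismatches) and skips the stop-and-restart that real `addNewColumns` performs. The `_file_path` key mirrors the source path that the rescued JSON carries.

```python
import json


class SchemaChangeError(Exception):
    """Hypothetical stand-in for the error Auto Loader raises on new fields."""


def default_mode(schema_provided: bool) -> str:
    """Default evolution mode, per the note above."""
    return "none" if schema_provided else "addNewColumns"


def evolve_record(record: dict, schema: list, mode: str, file_path: str):
    """Return (row, schema) for one JSON record under a toy evolution mode."""
    unknown = {k: v for k, v in record.items() if k not in schema}
    if unknown and mode == "failOnNewColumns":
        # Strict: any new field stops processing.
        raise SchemaChangeError(f"new fields {sorted(unknown)} in {file_path}")
    if mode == "addNewColumns":
        # Extend the schema with the new fields (in arrival order) and keep their values.
        schema = schema + list(unknown)
        return {c: record.get(c) for c in schema}, schema
    row = {c: record.get(c) for c in schema}
    if mode == "rescue":
        # Schema stays fixed; unknown fields are kept as a JSON string.
        row["_rescued_data"] = (
            json.dumps({**unknown, "_file_path": file_path}) if unknown else None
        )
    # Mode "none" (or failOnNewColumns with no new fields): unknown fields are ignored.
    return row, schema


base = ["CustomerID", "SignupDate"]
rec = {"CustomerID": 1, "SignupDate": "2024-01-05", "age": 31}
print(evolve_record(rec, base, "rescue", "customer_data_2.json")[0])
```

In rescue mode the schema returned is unchanged and `age` only survives inside `_rescued_data`, which is why the silver layer later has to parse that column.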

Let's understand what the expectation **_rescued_data** IS NULL enforces:
- No schema-mismatch or data-parsing errors occurred while ingesting the row
- All incoming data aligns with the defined schema
- Non-conforming rows can raise an alert, be dropped, or fail the pipeline, depending on the expectation action (this notebook drops them with `expect_all_or_drop`)

This enforces strict data quality: only rows that match the schema and are parsed correctly are accepted. It also helps detect errors at an early stage of the pipeline.

**Note**: A non-null _rescued_data means the row either does not adhere to the schema or hit a parsing error.
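The difference between the expectation actions can be modeled in plain Python. This is a hypothetical sketch: `apply_expectations`, `ExpectationFailed`, and the `action` values `"warn"`, `"drop"`, and `"fail"` are illustrative names that loosely mirror the pipeline's `expect_all`, `expect_all_or_drop`, and `expect_all_or_fail` options; they are not the pipeline engine itself.

```python
from typing import Callable, Dict, List, Tuple

Row = dict
Rule = Callable[[Row], bool]


class ExpectationFailed(Exception):
    """Hypothetical error raised by the 'fail' action."""


def apply_expectations(
    rows: List[Row], rules: Dict[str, Rule], action: str
) -> Tuple[List[Row], Dict[str, int]]:
    """Evaluate every rule on every row; return kept rows and per-rule violation counts."""
    violations = {name: 0 for name in rules}
    kept: List[Row] = []
    for row in rows:
        failed = [name for name, rule in rules.items() if not rule(row)]
        for name in failed:
            violations[name] += 1
        if failed and action == "fail":
            raise ExpectationFailed(f"{failed} violated by {row}")
        if failed and action == "drop":
            continue  # non-conforming row never reaches the target table
        kept.append(row)  # "warn": the row is kept and the violation is only counted
    return kept, violations


# Same two rules as the silver table's expect_all_or_drop in this notebook.
rules = {
    "no_rescued_data": lambda r: r.get("_rescued_data") is None,
    "valid_id": lambda r: r.get("CustomerID") is not None,
}
```

Counting violations even in `"drop"` mode is what makes the early error detection useful: the rows disappear from silver, but the counts still show how many arrived with rescued data.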

**Medallion architecture**
- Bronze (DLT format) table -> reads from storage (raw data such as CSV, TSV, JSON, XML, Parquet, database sources, etc.) and creates a bronze table that represents the source data as-is
- Silver (DLT format) table -> applies the required expectations/validations/schema data type changes/business policies/etc.
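A minimal plain-Python sketch of the two layers, assuming records arrive as dicts: bronze keeps each record as-is and only appends lineage columns (the same `ingestion_datetime` and `source_filename` columns the bronze flow in this notebook adds), while silver casts selected columns and drops rows that fail validation. The functions `to_bronze` and `to_silver` and the `silver_types` mapping are hypothetical.

```python
from datetime import date, datetime, timezone


def to_bronze(records, source_filename):
    """Bronze: keep records as-is and only append lineage metadata."""
    now = datetime.now(timezone.utc)
    return [
        {**r, "ingestion_datetime": now, "source_filename": source_filename}
        for r in records
    ]


def to_silver(bronze_rows, silver_types):
    """Silver: cast selected columns; drop rows without an ID or with uncastable values."""
    out = []
    for row in bronze_rows:
        if row.get("CustomerID") is None:
            continue  # validation rule: CustomerID must be present
        try:
            typed = {
                col: (cast(row[col]) if row.get(col) is not None else None)
                for col, cast in silver_types.items()
            }
        except (ValueError, TypeError):
            continue  # value does not fit the target type
        out.append({**row, **typed})
    return out


# Hypothetical target types for two columns used in this notebook.
silver_types = {"SignupDate": date.fromisoformat, "age": int}
```

Bronze never rejects anything, so a bad `SignupDate` stays available for reprocessing; only silver decides what counts as valid.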

**Files to upload one at a time (run the pipeline after each upload) for the demo**
- customer_data_1.json: Base file (the first file uploaded)
- customer_data_2.json: Adds columns (age, gender, loyaltyStatus), changes values for existing customers, and adds new customer(s)
- customer_data_3.json: No structural changes; changes values for existing customers and adds new row(s)
- customer_data_4.json: Adds a new column (CreditScore) and changes values for existing customers; no new rows
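Before uploading, the sequence can be sanity-checked by listing which columns each file introduces for the first time. This is a hypothetical sketch: `schema_changes` is an illustrative helper, and the sample rows only reuse column names that appear in this notebook; the values are placeholders, not the actual demo data.

```python
from typing import Dict, List


def schema_changes(files: Dict[str, List[dict]]) -> Dict[str, List[str]]:
    """For each file (in upload order), list the columns not seen in any earlier file."""
    seen = set()
    changes = {}
    for name, records in files.items():  # dicts preserve insertion (upload) order
        new_cols: List[str] = []
        for rec in records:
            for key in rec:
                if key not in seen and key not in new_cols:
                    new_cols.append(key)
        changes[name] = new_cols
        seen.update(new_cols)
    return changes


# Placeholder rows: only the column names come from this notebook.
demo_files = {
    "customer_data_1.json": [{"CustomerID": 1, "SignupDate": "2024-01-05"}],
    "customer_data_2.json": [
        {"CustomerID": 1, "SignupDate": "2024-01-05", "age": 30, "gender": "x", "loyaltyStatus": "x"},
        {"CustomerID": 2, "SignupDate": "2024-03-10", "age": 40, "gender": "y", "loyaltyStatus": "y"},
    ],
    "customer_data_3.json": [
        {"CustomerID": 3, "SignupDate": "2024-04-02", "age": 28, "gender": "x", "loyaltyStatus": "y"},
    ],
    "customer_data_4.json": [
        {"CustomerID": 1, "SignupDate": "2024-01-05", "age": 31, "gender": "x", "loyaltyStatus": "x", "CreditScore": 700},
    ],
}
print(schema_changes(demo_files))
```

Files 2 and 4 are the ones that exercise schema evolution; file 3 only tests value changes and new rows under an unchanged schema.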




In [0]:
#Libraries management
from pyspark import pipelines as pl
from pyspark.sql.functions import *
from pyspark.sql.types import *

volume_path="/Volumes/workspace/sd_schema/datastore/customer_*.json" 


In [0]:
# Bronze layer table: cust_bronze_sd_rescue
pl.create_streaming_table("cust_bronze_sd_rescue")

# Ingest the raw data into the bronze table using append flow
@pl.append_flow(
  target = "cust_bronze_sd_rescue", #object name
  name = "cust_bronze_sd_rescue_ingest_flow" #flow name
)
def cust_bronze_sd_rescue_ingest_flow():
  df = (
      spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.inferColumnTypes", "true") # infer column types (without this, JSON columns are inferred as strings)
          #.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns") # customer_data_2.json has a different schema than customer_data_1.json, so the stream fails with an [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] exception and stops processing
          .option("cloudFiles.schemaEvolutionMode", "rescue")
          .load(volume_path)
  )
  return df.withColumn("ingestion_datetime", current_timestamp())\
           .withColumn("source_filename", col("_metadata.file_path")) 


In [0]:
# Function to handle DATATYPE changes
# There are several ways to process fields whose data type changes:
#    (*) Without overwriting data in the silver layer
#        Create an additional table every time a column's data type changes, then create a view on top that UNIONs all of these tables
#            PROS: No data is overwritten, so nothing needs to be reloaded
#            CONS: A new table is created every time a data type changes
#
#    (*) Overwrite data in both the Bronze and Silver layers
#        I have not explored how this works for streaming tables, so treat this option as untested
#        As in the other options, the _rescued_data column is used to check the quality expectation for the schema update
#            PROS: Fewer code changes, because _rescued_data needs no additional handling logic
#            CONS: All raw data must be retained from the beginning, and both tables must be reloaded
#
#    (*) Merge and overwrite data in the silver layer (the function below implements this option)
#            PROS: No need to reload the bronze table, because _rescued_data holds all changed values and can be used for processing
#            CONS: The silver table must be completely reloaded
# NOTE: None of the options above handles column renames; that requires additional logic.
#       One approach is to follow the view-based option: load the renamed column as a new field, then in the view drop the old column and use the renamed one.
#       However, the data still has to be merged in the silver layer, so the silver table must be reloaded.

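def process__rescue_data_datatype_change(df, target_schema: StructType):
    """For each target_schema field that already exists in df, cast it to the target type,
    preferring the value found in _rescued_data (parsed as a string map) over the existing column."""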
    df = df.withColumn(
        "_rescued_data_modified", 
        from_json(col("_rescued_data"), MapType(StringType(), StringType()))
    )
    
    for field in target_schema.fields:
        data_type = field.dataType
        column_name = field.name
        
        if column_name in df.columns:
            # Check if value is in rescued_data
            key_condition = expr(f"_rescued_data_modified IS NOT NULL AND map_contains_key(_rescued_data_modified, '{column_name}')")
            
            # Use rescued value if present, otherwise use existing column
            rescued_value = when(
                key_condition, 
                col("_rescued_data_modified").getItem(column_name).cast(data_type)
            ).otherwise(col(column_name).cast(data_type))
            
            df = df.withColumn(column_name, rescued_value)
    
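    df = df.drop('_rescued_data_modified')
    # Handled fields are now typed columns, so clear _rescued_data to let rows pass the
    # no_rescued_data expectation (note: any other rescued fields are discarded as well)
    df = df.withColumn('_rescued_data', lit(None).cast(StringType()))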
    
    return df

In [0]:
# Function to handle adding NEW FIELDS 
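def process__rescue_data_new_fields(df, expected_fields: list):
    """Promote each expected field from _rescued_data to a top-level column: create it (as a string)
    if missing, otherwise overwrite the existing value wherever the rescued value is non-null."""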
    df = df.withColumn(
        "_rescued_data_json_to_map", 
        from_json(col("_rescued_data"), MapType(StringType(), StringType()))
    )
    
    for key in expected_fields:
        # _file_path is the source path Auto Loader stores in the rescued JSON, not a data field
        if key != "_file_path":
            if key not in df.columns:
                df = df.withColumn(
                    key,
                    col("_rescued_data_json_to_map").getItem(key)
                )
            else:
                df = df.withColumn(
                    key,
                    when(
                        col("_rescued_data_json_to_map").getItem(key).isNotNull(),
                        col("_rescued_data_json_to_map").getItem(key)
                    ).otherwise(col(key))
                )
    
    df = df.drop('_rescued_data_json_to_map')
    return df

In [0]:
# # -----------------------------------------------------------------------------------------------------
# # Plain implementation without processing the _rescued_data field. Use this when you upload customer_data_1.json
# # -----------------------------------------------------------------------------------------------------
# pl.create_streaming_table(
#   name = "cust_silver_sd_rescue",
#   expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "CustomerID IS NOT NULL"}
#   )
# @pl.append_flow(
#   target = "cust_silver_sd_rescue",
#   name = "cust_silver_sd_rescue_clean_flow"
# )
# def cust_silver_sd_rescue_clean_flow():
#   return (
#       spark.readStream.table("cust_bronze_sd_rescue")
#   )

In [0]:
# -----------------------------------------------------------------------------------------------------
# Uncomment this code before uploading customer_data_2.json, then upload the file and run the pipeline
# -----------------------------------------------------------------------------------------------------
# When customer_data_2.json is processed, new fields must be added to the schema, and at the same time we plan a data type change
# for an existing field that arrived with customer_data_1.json. Because an existing field's data type changes, run the pipeline
# with a full table refresh.
updated_datatypes = StructType([
    StructField("SignupDate", DateType(), True),
    StructField("age", IntegerType(), True),
    StructField("CreditScore", IntegerType(), True)
])


expected_new_fields = [
    # "age",             
    # "gender",          
    # "loyaltyStatus",   
    "CreditScore"      
]

pl.create_streaming_table(
  name = "cust_silver_sd_rescue",
  expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "CustomerID IS NOT NULL"}
)

@pl.append_flow(
  target = "cust_silver_sd_rescue",
  name = "cust_silver_sd_rescue_clean_flow"
)
def demo_cust_silver_sd_clean_flow():
  df = (
    spark.readStream.table("cust_bronze_sd_rescue")
  )
  df = process__rescue_data_new_fields(df, expected_new_fields)
  df = process__rescue_data_datatype_change(df, updated_datatypes)
  return df

