In [0]:
#Libraries management
from pyspark import pipelines as pl
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Auto Loader source: glob that matches every customer_*.json file in the schema-drift volume
volume_path = "/Volumes/workspace/damg7370/datastore/schema_drift/customer_*.json"

In [0]:
# Bronze layer: streaming table demo_cust_bronze_sd that receives the raw customer JSON records
pl.create_streaming_table("demo_cust_bronze_sd")

# Append flow that incrementally loads the raw files into the bronze table
@pl.append_flow(
  target = "demo_cust_bronze_sd", # streaming table this flow writes into
  name = "demo_cust_bronze_sd_ingest_flow" # name of the flow itself
)
def demo_cust_bronze_sd_ingest_flow():
  """Read the customer JSON files with Auto Loader and tag each row with load metadata.

  Column types are inferred from the files. The schema evolution mode is "rescue"
  rather than "failOnNewColumns", so records whose fields do not match the inferred
  schema do not stop the stream; the silver layer processes them from `_rescued_data`.

  Returns:
    Streaming DataFrame of the raw records plus `ingestion_datetime` (current
    timestamp at processing) and `source_filename` (path of the file the row came from).
  """
  raw_customers = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.inferColumnTypes", "true")  # infer column data types from the files
      # Alternative: .option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
      # customer_data_1.json and customer_data_2.json have different schemas, so that mode
      # fails with [UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] and stops processing.
      .option("cloudFiles.schemaEvolutionMode", "rescue")
      .load(volume_path)
  )
  return (
    raw_customers
      .withColumn("ingestion_datetime", current_timestamp())
      .withColumn("source_filename", col("_metadata.file_path"))
  )

In [0]:

# NOTE: superseded first draft of the silver table and flow, disabled by wrapping it in a
# bare string literal so it never executes. It has no _rescued_data handling, so the
# "no_rescued_data" expectation would drop every row whose _rescued_data is non-null.
# The active definition in the last cell registers the same names
# ("demo_cust_silver_sd" / "demo_cust_silver_sd_clean_flow"), so do not re-enable
# this alongside it.
'''
pl.create_streaming_table(
   name = "demo_cust_silver_sd",
   expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "CustomerID IS NOT NULL"}
   )
@pl.append_flow(
   target = "demo_cust_silver_sd",
   name = "demo_cust_silver_sd_clean_flow"
 )
def demo_cust_silver_sd_clean_flow():
   return (
       spark.readStream.table("demo_cust_bronze_sd")
   )
'''

In [0]:

def process__rescue_data_datatype_change(df, target_schema: StructType):
    """Merge values captured in ``_rescued_data`` back into typed columns.

    For every field of ``target_schema``:
      * rows whose ``_rescued_data`` JSON object has a key equal to the field name
        take that rescued value, cast to the field's data type;
      * all other rows keep their existing value for the column, cast to that type;
      * if the field is not a column of ``df`` at all, it is created and rows
        without a rescued value get NULL (instead of the query failing analysis).

    Afterwards ``_rescued_data`` is set to NULL on every row.

    Args:
        df: DataFrame (batch or streaming) with a ``_rescued_data`` string column
            holding a JSON object.
        target_schema: columns to enforce and the data type each must have.

    Returns:
        ``df`` with the target columns merged and cast, and ``_rescued_data`` nulled.
    """
    # Parse the rescued JSON object into a MAP<STRING, STRING> of field name -> raw value
    df = df.withColumn(
        "_rescued_data_modified",
        from_json(col("_rescued_data"), MapType(StringType(), StringType())),
    )
    rescued_map = col("_rescued_data_modified")

    # Spark resolves column names case-insensitively by default, so check existence the same way
    existing_names = {name.lower() for name in df.columns}

    for field in target_schema.fields:
        column_name = field.name
        data_type = field.dataType

        # Built with the Column API rather than an f-string SQL expression, so a field
        # name containing a quote cannot break (or inject into) the condition.
        # A NULL map yields a NULL condition, which `when` treats as false.
        key_condition = rescued_map.isNotNull() & array_contains(map_keys(rescued_map), column_name)

        # Value used when the row did not rescue this field
        if column_name.lower() in existing_names:
            fallback = col(column_name).cast(data_type)
        else:
            fallback = lit(None).cast(data_type)

        df = df.withColumn(
            column_name,
            when(key_condition, rescued_map.getItem(column_name).cast(data_type)).otherwise(fallback),
        )
        existing_names.add(column_name.lower())

    df = df.drop("_rescued_data_modified")

    # _rescued_data is cleared so the silver table's "_rescued_data IS NULL" expectation
    # does not drop the processed rows.
    # NOTE(review): this also discards rescued keys that are not in target_schema and were
    # not promoted to columns, and it makes that expectation unable to drop anything.
    df = df.withColumn("_rescued_data", lit(None).cast(StringType()))
    return df
    

In [0]:

# Function to handle adding NEW FIELDS 
def process__rescue_data_new_fields(df, expected_keys=None):
    """Promote keys found in ``_rescued_data`` to new top-level string columns.

    Each promoted key becomes a column holding the rescued raw value as a string,
    or NULL on rows that did not rescue that key. The Auto Loader ``_file_path`` key
    is never promoted. Keys that already exist as columns are skipped: overwriting
    them would replace every row's value (NULL where the key was not rescued).
    Type changes for existing columns belong to ``process__rescue_data_datatype_change``.

    Keys to promote, in order of precedence:
      * ``expected_keys``, when given. This is the only way to add columns to a
        streaming DataFrame, whose rows cannot be collected up front.
      * For a batch DataFrame, the distinct keys present in the data (this runs a
        Spark job); they are sorted so the column order is deterministic.
      * Otherwise (streaming without ``expected_keys``), none.

    Args:
        df: DataFrame with a ``_rescued_data`` string column holding a JSON object.
        expected_keys: optional iterable of rescued key names to promote.

    Returns:
        ``df`` with the helper columns ``_rescued_data_json_to_map`` and
        ``_rescued_data_map_keys`` plus one string column per promoted key.
    """
    # Parse every field of _rescued_data into a MAP<STRING, STRING>
    df = df.withColumn(
        "_rescued_data_json_to_map",
        from_json(col("_rescued_data"), MapType(StringType(), StringType())),
    )

    # Array of the rescued keys per row
    # NOTE(review): this and the map column above are returned to the caller (and so end
    # up in the silver table); consider dropping them once nothing downstream needs them.
    df = df.withColumn("_rescued_data_map_keys", map_keys(col("_rescued_data_json_to_map")))

    if expected_keys is not None:
        candidate_keys = list(expected_keys)
    elif not df.isStreaming:
        # Batch only: collect every distinct key that occurs in the data
        key_rows = (
            df.select(explode(col("_rescued_data_map_keys")).alias("rescued_key"))
            .distinct()
            .collect()
        )
        candidate_keys = sorted(row["rescued_key"] for row in key_rows)
    else:
        # A streaming DataFrame cannot be collected; without expected_keys nothing is added
        candidate_keys = []

    # Spark resolves column names case-insensitively by default, so compare the same way
    taken_names = {name.lower() for name in df.columns}
    for key in candidate_keys:
        if key == "_file_path" or key.lower() in taken_names:
            continue
        df = df.withColumn(
            key,
            col("_rescued_data_json_to_map").getItem(key).cast(StringType()),
        )
        taken_names.add(key.lower())

    # TODO: infer a data type for new columns instead of always using StringType.

    return df


In [0]:

# Data types enforced on the silver table; rescued values for these fields are cast back into them
updated_datatypes = StructType([
  # signupDate as a nullable DATE column (pass False as the third argument to make it NOT NULL)
  StructField("signupDate", DateType(), True)
])

# Silver layer: rows failing either expectation are dropped
pl.create_streaming_table(
  name = "demo_cust_silver_sd",
  expect_all_or_drop = {
    "no_rescued_data": "_rescued_data IS NULL",
    "valid_id": "CustomerID IS NOT NULL",
  }
)

@pl.append_flow(
  target = "demo_cust_silver_sd",
  name = "demo_cust_silver_sd_clean_flow"
)
def demo_cust_silver_sd_clean_flow():
  """Stream the bronze rows into silver after folding `_rescued_data` back into columns.

  `process__rescue_data_new_fields` runs first; on this streaming source it cannot
  discover rescued keys by itself. Then `process__rescue_data_datatype_change` merges
  and casts the fields listed in `updated_datatypes` and clears `_rescued_data`.
  """
  bronze_stream = spark.readStream.table("demo_cust_bronze_sd")
  with_new_fields = process__rescue_data_new_fields(bronze_stream)
  return process__rescue_data_datatype_change(with_new_fields, updated_datatypes)
