In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Build the SparkSession used by every cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Schema Evolution Example")
    .getOrCreate()
)

In [8]:
# Sample employee records as [emp_no, raw JSON payload string] pairs.
# The same kind of payload could equally come from an API response.
# Note: the EMP003 payload has no "lname" key, unlike the other two rows.
_data = [
['EMP001', '{"dept" : "account", "fname": "Ramesh", "lname":"Singh", "skills": ["excel", "tally", "word"]}'],
['EMP002', '{"dept" : "sales", "fname": "Siv", "lname": "Kumar", "skills": ["biking", "sales"]}'],
['EMP003', '{"dept" : "hr", "fname": "MS Raghvan", "skills": ["communication", "soft-skills"]}']
]

In [10]:
# Column names for the two-element rows in _data
_cols = ['emp_no', 'raw_data']

# Build the raw DataFrame, then inspect its schema and its untruncated contents
df_raw = spark.createDataFrame(_data, schema=_cols)
df_raw.printSchema()
df_raw.show(n=3, truncate=False)

root
 |-- emp_no: string (nullable = true)
 |-- raw_data: string (nullable = true)

+------+----------------------------------------------------------------------------------------------+
|emp_no|raw_data                                                                                      |
+------+----------------------------------------------------------------------------------------------+
|EMP001|{"dept" : "account", "fname": "Ramesh", "lname":"Singh", "skills": ["excel", "tally", "word"]}|
|EMP002|{"dept" : "sales", "fname": "Siv", "lname": "Kumar", "skills": ["biking", "sales"]}           |
|EMP003|{"dept" : "hr", "fname": "MS Raghvan", "skills": ["communication", "soft-skills"]}            |
+------+----------------------------------------------------------------------------------------------+



In [13]:
# Infer a schema for the JSON payloads by letting Spark's JSON reader scan the
# raw_data strings; json_schema is what the parsing step below consumes.
json_schema_df = spark.read.json(
    df_raw.rdd.map(lambda record: record["raw_data"])
)
json_schema = json_schema_df.schema

In [14]:
# Parse each raw JSON string into a struct column using the inferred schema,
# then drop the original string column
from pyspark.sql.functions import from_json
df_details = (
    df_raw
    .withColumn("parsed_data", from_json(df_raw.raw_data, json_schema))
    .drop("raw_data")
)
df_details.printSchema()

root
 |-- emp_no: string (nullable = true)
 |-- parsed_data: struct (nullable = true)
 |    |-- dept: string (nullable = true)
 |    |-- fname: string (nullable = true)
 |    |-- lname: string (nullable = true)
 |    |-- skills: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



In [16]:
# Let's verify the parse: expand the parsed_data struct into one column per field
(
    df_details
    .select("emp_no", "parsed_data.*")
    .show(n=10, truncate=False)
)

+------+-------+----------+-----+----------------------------+
|emp_no|dept   |fname     |lname|skills                      |
+------+-------+----------+-----+----------------------------+
|EMP001|account|Ramesh    |Singh|[excel, tally, word]        |
|EMP002|sales  |Siv       |Kumar|[biking, sales]             |
|EMP003|hr     |MS Raghvan|NULL |[communication, soft-skills]|
+------+-------+----------+-----+----------------------------+



In [17]:
# Flatten the skills array: one output row per (employee, skill) pair
from pyspark.sql.functions import explode
(
    df_details
    .select(
        "emp_no",
        "parsed_data.dept",
        "parsed_data.fname",
        "parsed_data.lname",
        explode("parsed_data.skills").alias("skills"),
    )
    .show(n=100, truncate=False)
)

+------+-------+----------+-----+-------------+
|emp_no|dept   |fname     |lname|skills       |
+------+-------+----------+-----+-------------+
|EMP001|account|Ramesh    |Singh|excel        |
|EMP001|account|Ramesh    |Singh|tally        |
|EMP001|account|Ramesh    |Singh|word         |
|EMP002|sales  |Siv       |Kumar|biking       |
|EMP002|sales  |Siv       |Kumar|sales        |
|EMP003|hr     |MS Raghvan|NULL |communication|
|EMP003|hr     |MS Raghvan|NULL |soft-skills  |
+------+-------+----------+-----+-------------+



In [31]:
from pyspark.sql.functions import lit

def process_and_merge_data(existing_df, new_data):
    """
    Parse a batch of raw JSON records and append it to the processed DataFrame.

    The JSON schema is inferred from the batch itself, so each batch may carry
    a different set of fields. Columns that exist on only one side of the
    union are null-filled by ``unionByName(allowMissingColumns=True)``.

    Args:
    existing_df (DataFrame | None): Previously processed DataFrame, or None
        on the initial load.
    new_data (list): New rows as ``[emp_no, raw_json_string]`` pairs.

    Returns:
    DataFrame | None: ``existing_df`` with the parsed batch appended, or just
        the parsed batch when ``existing_df`` is None. When ``new_data`` is
        empty or None, ``existing_df`` is returned unchanged (which is None
        if nothing has been processed yet).

    NOTE(review): a field whose *type* changes between batches (for example
    ``skills`` going from an array to a struct) is not reconciled here; the
    union is expected to raise for such a batch, and null-filling of fields
    nested inside ``parsed_data`` depends on the Spark version -- confirm
    both against the Spark release in use.
    """
    # Nothing to parse: createDataFrame cannot infer a schema from an empty
    # list, so hand back what we already have instead of raising.
    if not new_data:
        return existing_df

    # Wrap the incoming rows in a DataFrame with the expected column names
    new_df_raw = spark.createDataFrame(new_data, ["emp_no", "raw_data"])

    # Infer the JSON schema from this batch's payloads only
    inferred_schema = spark.read.json(
        new_df_raw.rdd.map(lambda row: row.raw_data)
    ).schema

    # Parse the payloads with the inferred schema and drop the raw string
    new_df_processed = new_df_raw.withColumn(
        "parsed_data",
        from_json(new_df_raw["raw_data"], inferred_schema)
    ).drop("raw_data")

    # Initial load: there is nothing to merge with yet
    if existing_df is None:
        return new_df_processed

    # unionByName matches columns by name and, with allowMissingColumns=True,
    # adds null columns for whatever one side lacks, so no manual
    # lit(None) alignment of the two DataFrames is needed beforehand.
    return existing_df.unionByName(new_df_processed, allowMissingColumns=True)


In [32]:
# Initial load: nothing has been processed yet, so there is no DataFrame to merge into
df_existing = None

# Old data: the first batch of [emp_no, raw JSON payload] rows
_data = [
    ['EMP001', '{"dept" : "account", "fname": "Ramesh", "lname":"Singh", "skills": ["excel", "tally", "word"]}'],
    ['EMP002', '{"dept" : "sales", "fname": "Siv", "lname": "Kumar", "skills": ["biking", "sales"]}'],
    ['EMP003', '{"dept" : "hr", "fname": "MS Raghvan", "skills": ["communication", "soft-skills"]}'],
]
df_existing = process_and_merge_data(existing_df=df_existing, new_data=_data)

# Show the processed result without truncating the struct column
df_existing.show(truncate=False)

+------+----------------------------------------------------+
|emp_no|parsed_data                                         |
+------+----------------------------------------------------+
|EMP001|{account, Ramesh, Singh, [excel, tally, word]}      |
|EMP002|{sales, Siv, Kumar, [biking, sales]}                |
|EMP003|{hr, MS Raghvan, NULL, [communication, soft-skills]}|
+------+----------------------------------------------------+



In [34]:

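# NOTE(review): this cell is disabled. The batch below adds `age`/`details` and
# changes `skills` from an array to a struct; presumably it was commented out
# because merging it into df_existing fails on the changed `skills` type --
# confirm before re-enabling.
# # New data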
# new_data = [
#     ['EMP001', '{"dept" : "account", "fname": "Ramesh", "age": 30, "skills": {"primary": "excel", "secondary": "tally"}}'],
#     ['EMP002', '{"dept" : "sales", "fname": "Siv", "age": 25, "details": {"city": "Mumbai", "zip": "400001"}}'],
#     ['EMP003', '{"dept" : "hr", "fname": "MS Raghvan", "skills": {"primary": "soft-skills"}}']
# ]
# df_existing = process_and_merge_data(df_existing, new_data)

# # Display the final DataFrame
# df_existing.printSchema()
# df_existing.show(truncate=False)