# Lakehouse Schema Evolution Rules
---

In [15]:
# Set environment variables
RESOURCE_PATH="."
ADD_COLUMNS_PATH = f"{RESOURCE_PATH}/add_columns"
DROP_COLUMNS_PATH = f"{RESOURCE_PATH}/drop_columns"
CHANGE_COLUMNS_PATH = f"{RESOURCE_PATH}/change_column_types"
OUTPUT_PATH="./output"
ADD_COLUMNS_OUTPUT_PATH = f"{OUTPUT_PATH}/add_columns"
DROP_COLUMNS_OUTPUT_PATH = f"{OUTPUT_PATH}/drop_columns"
CHANGE_COLUMNS_OUTPUT_PATH = f"{OUTPUT_PATH}/change_column_types"

import warnings
warnings.filterwarnings('ignore')

# Adding Data Columns (Part I)
##  Add Columns in flat schema
---

In [16]:
# Add Columns in flat schema
flat_schema_df = spark.read.format("json").option("multiLine","true") \
                      .load(f"{ADD_COLUMNS_PATH}/flat_schema/flat_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
flat_schema_df.printSchema()
flat_schema_df.write.format("delta").mode("overwrite").save(f"{ADD_COLUMNS_OUTPUT_PATH}/flat_schema.delta")
flat_schema_df_reread = spark.read.format("delta").load(f"{ADD_COLUMNS_OUTPUT_PATH}/flat_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
flat_schema_df_reread.select("record_number","name", "description").show(truncate=False)

# Read in json data files with columns added to the flat schema
flat_schema_df_latest = spark.read.format("json").option("multiLine","true") \
                             .load(f"{ADD_COLUMNS_PATH}/flat_schema/flat_schema_add_columns.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
flat_schema_df_latest.printSchema()
flat_schema_df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{ADD_COLUMNS_OUTPUT_PATH}/flat_schema.delta")
flat_schema_df_latest_reread = spark.read.format("delta").load(f"{ADD_COLUMNS_OUTPUT_PATH}/flat_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
flat_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
flat_schema_df_latest_reread.show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- record_number: long (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+-------------+-----+------------------------------+
|record_number|name |description                   |
+-------------+-----+------------------------------+
|1            |oftra|One framework to rule them all|
+-------------+-----+------------------------------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- description: string (nullable = true)
 |-- language: string (nullable = true)
 |-- name: string (nullable = true)
 |-- record_number: long (nullable = true)
 |-- version: string (nullable = true)

---------------------------------------
Merged sche

# Adding Data Columns (Part I)
## Add Columns inside struct schema
---

In [17]:
# Add Columns inside struct schema
struct_schema_df = spark.read.format("json").option("multiLine","true") \
                        .load(f"{ADD_COLUMNS_PATH}/struct_schema/struct_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
struct_schema_df.printSchema()
struct_schema_df.write.format("delta").mode("overwrite").save(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema.delta")
struct_schema_df_reread = spark.read.format("delta").load(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.show(truncate=False)

# Read in json data with added columns to the struct schema
struct_schema_df_latest = spark.read.format("json").option("multiLine","true") \
                        .load(f"{ADD_COLUMNS_PATH}/struct_schema/struct_schema_add_columns.json")
print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                       .option("mergeSchema", "true") \
                       .save(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema.delta")
struct_schema_df_latest_reread = spark.read.format("delta").load(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
struct_schema_df_latest_reread.show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- project: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- record_number: long (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+------------------------------------------+
|project                                   |
+------------------------------------------+
|{One framework to rule them all, oftra, 1}|
+------------------------------------------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- project: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- language: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- record_number: long (nullable = true)
 |-- version: stri

# Adding Data Columns (Part I)
## Add Columns in struct nested in another struct or nested inside an array
--- 

In [18]:
# Add Columns in a struct schema inside an array
from pyspark.sql.functions import explode
ordered_df = spark.read.format("json").option("multiLine","true") \
                  .load(f"{ADD_COLUMNS_PATH}/struct_schema/struct_schema_inside_array.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")

ordered_df.printSchema()
ordered_df.write.format("delta").mode("overwrite") \
          .save(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")
struct_schema_df_reread = spark.read.format("delta") \
                               .load(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.select(explode("projects").alias("project")).show(truncate=False)

# Read in some json files with a couple of added columns to the flat schema
struct_schema_df_latest = spark.read.format("json").option("multiLine","true") \
                               .load(f"{ADD_COLUMNS_PATH}/struct_schema/struct_schema_inside_array_add_columns.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                       .option("mergeSchema", "true") \
                       .save(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")
struct_schema_df_latest_reread = spark.read.format("delta") \
                                      .load(f"{ADD_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("--------------------------------------------------------")
print("Exploded Array Output when querying the original dataset")
print("--------------------------------------------------------")
struct_schema_df_latest_reread.select(explode("projects").alias("project"), "version").show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- projects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- record_number: long (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+------------------+
|project           |
+------------------+
|{Python, oftra, 5}|
|{Python, oftra, 6}|
+------------------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- projects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- language: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- record_number: long (nullab

# Adding Data Columns (Part I)
## Add Columns in array of arrays
--- 

In [19]:
# Adding columns inside an array of array of arrays
from pyspark.sql.functions import explode
df = spark.read.format("json").option("multiLine","true").load(f"{ADD_COLUMNS_PATH}/array_schema/matrix.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")

df.printSchema()
df.write.format("delta").mode("overwrite").save(f"{ADD_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")
df_reread = spark.read.format("delta").load(f"{ADD_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
df_reread.select(explode("matrix").alias("matrix")).show(truncate=False)

# Read in json data with added array inside array of arrays
df_latest = spark.read.format("json").option("multiLine","true") \
                        .load(f"{ADD_COLUMNS_PATH}/array_schema/tensor.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
df_latest.printSchema()
df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{ADD_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")
df_latest_reread = spark.read.format("delta").load(f"{ADD_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
df_latest_reread.select(explode("matrix").alias("matrix")).show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- matrix: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: long (containsNull = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+--------------------+
|matrix              |
+--------------------+
|[[10, 20], [30, 40]]|
|[[50, 60], [70, 80]]|
+--------------------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- matrix: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: long (containsNull = true)



AnalysisException: Failed to merge fields 'matrix' and 'matrix'. Failed to merge incompatible data types LongType and ArrayType(LongType,true)

# Drop Columns (Part II)
## Drop Columns in a flat schema
---

In [20]:
# Drop Columns in a flat schema
flat_schema_df = spark.read.format("json").option("multiLine","true") \
                      .load(f"{DROP_COLUMNS_PATH}/flat_schema/flat_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
ordered_df = flat_schema_df.select("record_number","name","description","language","version")
ordered_df.printSchema()
ordered_df.write.format("delta").mode("overwrite").save(f"{DROP_COLUMNS_OUTPUT_PATH}/flat_schema.delta")

flat_schema_df_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/flat_schema.delta")
print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
flat_schema_df_reread.show(truncate=False)

# Read in json data with a couple of deleted columns to the flat schema
flat_schema_df_latest = spark.read.format("json") \
                             .schema("record_number long, name string, description string") \
                             .option("multiLine","true") \
                             .load(f"{DROP_COLUMNS_PATH}/flat_schema/flat_schema_drop_columns.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
flat_schema_df_latest.printSchema()
flat_schema_df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{DROP_COLUMNS_OUTPUT_PATH}/flat_schema.delta")
flat_schema_df_latest_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/flat_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
flat_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
flat_schema_df_latest_reread.show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- record_number: long (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- language: string (nullable = true)
 |-- version: string (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+-------------+-----+------------------------------+--------+-------+
|record_number|name |description                   |language|version|
+-------------+-----+------------------------------+--------+-------+
|1            |oftra|One framework to rule them all|Python  |3.11   |
+-------------+-----+------------------------------+--------+-------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- record_number: long (nullable = true)
 |-- name: string (nullable = true)
 |-- descr

# Drop Columns (Part II)
## Drop Columns in a struct schema
---

In [21]:
# Drop Columns in a struct schema
struct_schema_df = spark.read.format("json").option("multiLine","true").load(f"{DROP_COLUMNS_PATH}/struct_schema/struct_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
struct_schema_df.printSchema()
struct_schema_df.write.format("delta").mode("overwrite").save(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema.delta")
struct_schema_df = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.show(truncate=False)

# Read in some json files with a couple of deleted columns to the flat schema
struct_schema_df_latest = spark.read.format("json").option("multiLine","true") \
                        .load(f"{DROP_COLUMNS_PATH}/struct_schema/struct_schema_drop_columns.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema.delta")
struct_schema_df_latest_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
struct_schema_df_latest_reread.show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- project: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- language: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- record_number: long (nullable = true)
 |-- version: string (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+----------------------------------------+
|projects                                |
+----------------------------------------+
|[{Python, oftra, 3}, {Python, oftra, 4}]|
|[{Python, oftra, 5}, {Python, oftra, 6}]|
+----------------------------------------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- project: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- name: string (nullable = 

# Drop Columns (Part II)
## Drop Columns in a struct schema inside an array
--- 

In [22]:
# Drop Columns in a struct schema inside an array
from pyspark.sql.functions import explode
ordered_df = spark.read.format("json").option("multiLine","true").load(f"{DROP_COLUMNS_PATH}/struct_schema/struct_schema_inside_array.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")

ordered_df.printSchema()
ordered_df.write.format("delta").mode("overwrite").save(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")
struct_schema_df_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.select(explode("projects").alias("project"), "version").show(truncate=False)

# Read in some json files with a couple of deleted columns to the flat schema
struct_schema_df_latest = spark.read.format("json").option("multiLine","true") \
                        .load(f"{DROP_COLUMNS_PATH}/struct_schema/struct_schema_inside_array_drop_columns.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")
struct_schema_df_latest_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/struct_schema_inside_array.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
struct_schema_df_latest_reread.select(explode("projects").alias("project"), "version").show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- projects: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- language: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- record_number: long (nullable = true)
 |-- version: string (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+--------------------------------------------------+-------+
|project                                           |version|
+--------------------------------------------------+-------+
|{One framework to rule them all, Python, oftra, 3}|3.11   |
|{One framework to rule them all, Python, oftra, 4}|3.11   |
+--------------------------------------------------+-------+

---------------------------------------
Schema shape of newer incoming 

# Drop Columns (Part II)
## Drop Columns in free form array of arrays
---

In [23]:
# Drop Columns in free form array of arrays

from pyspark.sql.functions import explode
df = spark.read.format("json").option("multiLine","true").load(f"{DROP_COLUMNS_PATH}/array_schema/tensor.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")

df.printSchema()
df.write.format("delta").mode("overwrite").save(f"{DROP_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")
df_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
df_reread.select(explode("matrix").alias("matrix")).show(truncate=False)

# Read in some json files with a couple of deleted columns in the array of array
df_latest = spark.read.format("json").option("multiLine","true") \
                        .load(f"{DROP_COLUMNS_PATH}/array_schema/matrix.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
df_latest.printSchema()
df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{DROP_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")
df_latest_reread = spark.read.format("delta").load(f"{DROP_COLUMNS_OUTPUT_PATH}/array_inside_array.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
df_latest_reread.select(explode("matrix").alias("matrix")).show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- matrix: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: long (containsNull = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+-------------------------------------------+
|matrix                                     |
+-------------------------------------------+
|[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]       |
|[[[9, 10], [11, 12]], [[13, 14], [15, 16]]]|
+-------------------------------------------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- matrix: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: ar

AnalysisException: Failed to merge fields 'matrix' and 'matrix'. Failed to merge incompatible data types ArrayType(LongType,true) and LongType

# Change Column Data Types (Part III)
## Change column types in flat schema
---

In [24]:
# Change column types in flat schema

flat_schema = f"rn_byte_short byte, rn_byte_int byte, rn_short_int short, \
                         rn_int_long long, name string, description string"
flat_schema_df = spark.read.format("json") \
                      .schema(flat_schema) \
                      .option("multiLine","true") \
                      .load(f"{CHANGE_COLUMNS_PATH}/flat_schema/flat_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
flat_schema_df.printSchema()
flat_schema_df.write.format("delta").mode("overwrite") \
              .save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/flat_schema.delta")
flat_schema_df_reread = spark.read.format("delta") \
                             .load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/flat_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
flat_schema_df_reread.show(truncate=False)

# Read in json files with widened types
widened_schema = f"rn_byte_short short, rn_byte_int int, rn_short_int integer, \
                         rn_int_long long, name string, description string"
                         
flat_schema_df_latest = spark.read.format("json") \
                        .schema(widened_schema) \
                        .option("multiLine","true") \
                        .load(f"{CHANGE_COLUMNS_PATH}/flat_schema/flat_schema_change_column_types.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
flat_schema_df_latest.printSchema()
flat_schema_df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/flat_schema.delta")
flat_schema_df_latest_reread = spark.read.format("delta").load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/flat_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
flat_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
flat_schema_df_latest_reread.show(truncate=False)

---------------------------------
Schema shape of original data is 
---------------------------------
root
 |-- rn_byte_short: byte (nullable = true)
 |-- rn_byte_int: byte (nullable = true)
 |-- rn_short_int: short (nullable = true)
 |-- rn_int_long: long (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)

-----------------------------------------
Output when querying the original dataset
-----------------------------------------
+-------------+-----------+------------+-----------+-----+-----------+
|rn_byte_short|rn_byte_int|rn_short_int|rn_int_long|name |description|
+-------------+-----------+------------+-----------+-----+-----------+
|1            |1          |200         |1          |oftra|Only oftra |
+-------------+-----------+------------+-----------+-----+-----------+

---------------------------------------
Schema shape of newer incoming data is 
---------------------------------------
root
 |-- rn_byte_short: short (nullable = 

# Change Column Data Types (Part III)
## Change column types in struct schema
---

In [None]:
# Change column types in struct schema
struct_schema = f"project STRUCT<rn_byte_short:byte, rn_byte_int:byte, rn_short_int:short, \
                         rn_int_long:long, name:string, description:string>"
struct_schema_df = spark.read.format("json") \
                        .schema(struct_schema) \
                        .option("multiLine","true") \
                        .load(f"{CHANGE_COLUMNS_PATH}/struct_schema/struct_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
struct_schema_df.printSchema()
struct_schema_df.write.format("delta").mode("overwrite") \
                .save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_schema.delta")
struct_schema_df_reread = spark.read.format("delta") \
                               .load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.show(truncate=False)

# Read in json files with widened types
widened_schema = f"project STRUCT<rn_byte_short:short, rn_byte_int:int, rn_short_int:integer, \
                         rn_int_long:long, name:string, description:string>"
                         
struct_schema_df_latest = spark.read.format("json") \
                        .schema(widened_schema) \
                        .option("multiLine","true") \
                        .load(f"{CHANGE_COLUMNS_PATH}/struct_schema/struct_schema_change_column_types.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                     .option("mergeSchema", "true").save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_schema.delta")
struct_schema_df_latest_reread = spark.read.format("delta").load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
struct_schema_df_latest_reread.show(truncate=False)

# Change Column Data Types (Part III)
## Change column types in struct inside array schema
---

In [None]:
# Change column types in struct inside array schema
from pyspark.sql.functions import explode, col
struct_schema = f"projects ARRAY<STRUCT<rn_byte_short:byte, rn_byte_int:byte, rn_short_int:short, \
                         rn_int_long:long, name:string, description:string>>"
struct_schema_df = spark.read.format("json") \
                        .schema(struct_schema) \
                        .option("multiLine","true") \
                        .load(f"{CHANGE_COLUMNS_PATH}/array_schema/struct_in_array_schema.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
struct_schema_df.printSchema()
struct_schema_df.write.format("delta").mode("overwrite") \
                .save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_in_array_schema.delta")
struct_schema_df_reread = spark.read.format("delta") \
                               .load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_in_array_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.select(explode("projects").alias("project")).show(truncate=False)

# Read in json files with widened types
widened_schema = f"projects ARRAY<STRUCT<rn_byte_short:short, rn_byte_int:int, rn_short_int:integer, \
                         rn_int_long:long, name:string, description:string>>"
                         
struct_schema_df_latest = spark.read.format("json") \
                        .schema(widened_schema) \
                        .option("multiLine","true") \
                        .load(f"{CHANGE_COLUMNS_PATH}/array_schema/struct_in_array_change_colum_types.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                       .option("mergeSchema", "true") \
                       .save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_in_array_schema.delta")
struct_schema_df_latest_reread = spark.read.format("delta") \
                                      .load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/struct_in_array_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
struct_schema_df_latest_reread.select(explode("projects").alias("project")).show(truncate=False)

# Change Column Data Types (Part III)
## Change column types in array inside array schema
---

In [None]:
# Change column types in array inside array schema
from pyspark.sql.functions import explode, col
struct_schema = f"matrix ARRAY<ARRAY<ARRAY<short>>>"
struct_schema_df = spark.read.format("json") \
                .schema(struct_schema) \
                 .option("multiLine","true") \
                 .load(f"{CHANGE_COLUMNS_PATH}/array_schema/matrix.json")

print("---------------------------------")
print("Schema shape of original data is ")
print("---------------------------------")
struct_schema_df.printSchema()
struct_schema_df.write.format("delta").mode("overwrite") \
                .save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/array_in_array_schema.delta")
struct_schema_df_reread = spark.read.format("delta") \
                               .load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/array_in_array_schema.delta")

print("-----------------------------------------")
print("Output when querying the original dataset")
print("-----------------------------------------")
struct_schema_df_reread.select(explode("matrix").alias("matrix")).show(truncate=False)

# Read in json files with widened types
widened_schema = f"matrix ARRAY<ARRAY<ARRAY<int>>>"
                         
struct_schema_df_latest = spark.read.format("json") \
                               .schema(widened_schema) \
                               .option("multiLine","true") \
                               .load(f"{CHANGE_COLUMNS_PATH}/array_schema/matrix_int.json")

print("---------------------------------------")
print("Schema shape of newer incoming data is ")
print("---------------------------------------")
struct_schema_df_latest.printSchema()
struct_schema_df_latest.write.format("delta").mode("append") \
                       .option("mergeSchema", "true") \
                       .save(f"{CHANGE_COLUMNS_OUTPUT_PATH}/array_in_array_schema.delta")
struct_schema_df_latest_reread = spark.read.format("delta") \
                                      .load(f"{CHANGE_COLUMNS_OUTPUT_PATH}/array_in_array_schema.delta")

print("---------------------------------------")
print("Merged schema of target delta table is ")
print("---------------------------------------")
struct_schema_df_latest_reread.printSchema()

print("-----------------------------------")
print("Output when querying merged dataset")
print("-----------------------------------")
struct_schema_df_latest_reread.select(explode("matrix").alias("matrix")).show(truncate=False)