## Reference: https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html

In [0]:
# Start from a clean slate: remove the warehouse folder of the `stream` database and the
# streaming folder (source CSV files plus the Auto Loader schema files stored under it by
# earlier runs). recurse=True deletes each directory together with its contents.
dbutils.fs.rm('dbfs:/user/hive/warehouse/stream.db', recurse=True)
dbutils.fs.rm('dbfs:/FileStore/streaming', recurse=True)

Out[1]: True

In [0]:
%sql
-- Recreate the `stream` database from scratch.
-- CASCADE drops the database together with everything it contains (tables, views, functions).
-- Without it, DROP DATABASE defaults to RESTRICT and fails when the database is not empty.
DROP DATABASE IF EXISTS stream CASCADE;
CREATE DATABASE IF NOT EXISTS stream;


## AutoLoader

In [0]:
# Root folder that Auto Loader watches for incoming CSV files; the inferred schema is kept in
# its 'schemaInfer' subfolder. This only names the path, it does not create the folder.
# No trailing slash, so f'{source_dir}/schemaInfer' yields 'dbfs:/FileStore/streaming/schemaInfer'
# (the same form as the hard-coded paths used in later cells) rather than a '//' path.
source_dir = 'dbfs:/FileStore/streaming'

In [0]:
# Why cloudFiles.schemaLocation is required:
# when the stream starts, Auto Loader first samples the incoming files to work out the schema
# of the DataFrame (per the Databricks docs, up to the first 50 GB or 1000 files, whichever
# limit is reached first). It then saves that schema under this path and treats it as the
# schema it expects from then on.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": f'{source_dir}/schemaInfer',
        'header': 'true',
    })
    .load(source_dir)
)

In [0]:
# In the output, every column has the "string" type,
# even though the Citizens column contains integers.

# The reason is that we only told Auto Loader where to store the schema (schemaLocation);
# we did not ask it to infer the column types.

# Inferring column types is something you have to request explicitly,
# so that Auto Loader works out the actual types present in these CSV files.

In [0]:
# when "cloudFiles.inferColumnTypes" is "true", Auto Loader infers the correct data type of each column

In [0]:
# Same reader as before, plus inferColumnTypes so that real column types are inferred
# (e.g. Citizens becomes integer) instead of every CSV column being read as string.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": f'{source_dir}/schemaInfer',
        "cloudFiles.inferColumnTypes": "true",
        'header': 'true',
    })
    .load(source_dir)
)

In [0]:
# List the Auto Loader schema location; it holds the _schemas/ directory.
dbutils.fs.ls(source_dir + '/schemaInfer')

Out[13]: [FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/', name='_schemas/', size=0, modificationTime=0)]

In [0]:
# List the stored schema versions: one numbered file per version, starting at '0'.
dbutils.fs.ls('dbfs:/FileStore/streaming/schemaInfer/_schemas/')
# you can also browse this folder in the DBFS UI

Out[14]: [FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/0', name='0', size=274, modificationTime=1701879153000)]

In [0]:
%sql
-- Read schema version 0 as JSON. Its first line ("v1") is not valid JSON and therefore lands
-- in _corrupt_record; the next line holds the inferred data and partition schemas.
SELECT
    _corrupt_record,
    dataSchemaJson,
    partitionSchemaJson
FROM JSON.`dbfs:/FileStore/streaming/schemaInfer/_schemas/0`

_corrupt_record,dataSchemaJson,partitionSchemaJson
v1,,
,"{""type"":""struct"",""fields"":[{""name"":""Country"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Citizens"",""type"":""integer"",""nullable"":true,""metadata"":{}}]}","{""type"":""struct"",""fields"":[]}"


### SchemaHints

In [0]:
# cloudFiles.schemaHints overrides the type Auto Loader infers for specific columns.
# "Citizens LONG" asks for Citizens to be read as LONG instead of the inferred INT
# (hints are written as "<column> <type>", comma-separated for several columns).
df = spark.readStream\
        .format('cloudFiles')\
        .option("cloudFiles.format","csv")\
        .option("cloudFiles.schemaLocation",f'{source_dir}/schemaInfer')\
        .option("cloudFiles.inferColumnTypes","true")\
        .option('cloudFiles.schemaHints',"Citizens LONG")\
        .option('header','true')\
        .load(source_dir)

In [0]:
# Display the rows of the streaming DataFrame `df`.
display(df)

Country,Citizens,_rescued_data
India,10,
USA,5,
China,10,
India,10,
Canada,40,
Brazil,10,


In [0]:
%sql
-- Inspect schema version 0 again after adding the hint "Citizens LONG".
-- Citizens is still recorded as "integer" here: a schema hint defines the type the stream uses
-- for that column, but it does not rewrite the schema already stored under the schema location.
-- NOTE(review): version 0 was written before the hint was introduced; confirm whether later
-- schema versions record hinted types.
SELECT
    _corrupt_record,
    dataSchemaJson,
    partitionSchemaJson
FROM JSON.`dbfs:/FileStore/streaming/schemaInfer/_schemas/0`

_corrupt_record,dataSchemaJson,partitionSchemaJson
v1,,
,"{""type"":""struct"",""fields"":[{""name"":""Country"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Citizens"",""type"":""integer"",""nullable"":true,""metadata"":{}}]}","{""type"":""struct"",""fields"":[]}"



## Schema Evolution

In [0]:
# Now upload new csv file into 'dbfs:/FileStore/streaming', with name 'Countries_newcolumn1.csv'

In [0]:
# schemaEvolutionMode='rescue': the stored schema is not evolved. Values of columns that are not
# in the schema (e.g. Year, Month) are collected as JSON in the '_rescued_data' column instead.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": f'{source_dir}/schemaInfer',
        'cloudFiles.schemaEvolutionMode': 'rescue',
        'rescuedDataColumn': '_rescued_data',
        "cloudFiles.inferColumnTypes": "true",
        'cloudFiles.schemaHints': "Citizens LONG",
        'header': 'true',
    })
    .load(source_dir)
)

In [0]:
# Rows from Countries_newcolumn1.csv carry their extra Year/Month values (plus the source
# file path) as JSON in _rescued_data; rows from the original files leave it empty.
display(df)

Country,Citizens,_rescued_data
India,10,"{""Month"":""November"",""Year"":""2023"",""_file_path"":""dbfs:/FileStore/streaming/Countries_newcolumn1.csv""}"
USA,10,"{""Month"":""November"",""Year"":""2023"",""_file_path"":""dbfs:/FileStore/streaming/Countries_newcolumn1.csv""}"
China,20,"{""Month"":""November"",""Year"":""2023"",""_file_path"":""dbfs:/FileStore/streaming/Countries_newcolumn1.csv""}"
Brazil,10,"{""Month"":""November"",""Year"":""2023"",""_file_path"":""dbfs:/FileStore/streaming/Countries_newcolumn1.csv""}"
India,10,
USA,5,
China,10,
India,10,
Canada,40,
Brazil,10,


In [0]:
%sql
-- Schema version 0 is unchanged in rescue mode: still only Country and Citizens.
SELECT
    _corrupt_record,
    dataSchemaJson,
    partitionSchemaJson
FROM JSON.`dbfs:/FileStore/streaming/schemaInfer/_schemas/0`

_corrupt_record,dataSchemaJson,partitionSchemaJson
v1,,
,"{""type"":""struct"",""fields"":[{""name"":""Country"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Citizens"",""type"":""integer"",""nullable"":true,""metadata"":{}}]}","{""type"":""struct"",""fields"":[]}"


In [0]:
dbutils.fs.ls('dbfs:/FileStore/streaming/schemaInfer/_schemas/')
# Still only version '0': rescue mode does not evolve the schema (new columns go to _rescued_data).
# Once a mode that evolves the schema runs, a new numbered version file is added to this folder.

Out[22]: [FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/0', name='0', size=274, modificationTime=1701879153000)]


### 02 - addNewColumns - Default

In [0]:
# No schemaEvolutionMode option is set, so the default mode (addNewColumns) applies.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": f'{source_dir}/schemaInfer',
        "cloudFiles.inferColumnTypes": "true",
        'cloudFiles.schemaHints': "Citizens LONG",
        'header': 'true',
    })
    .load(source_dir)
)

In [0]:
display(df)
# The first execution is expected to fail: in addNewColumns mode the stream stops when it meets
# columns missing from the stored schema (Year, Month), after recording the evolved schema.

Country,Citizens,Year,Month,_rescued_data
India,10,2023.0,November,
USA,10,2023.0,November,
China,20,2023.0,November,
Brazil,10,2023.0,November,
India,10,,,
USA,5,,,
China,10,,,
India,10,,,
Canada,40,,,
Brazil,10,,,


In [0]:
# Rerun the same command, because the previous run failed (as expected).
# In the first execution the stream fails on the new columns;
# in the second execution it runs with the evolved schema.
display(df)


In [0]:
dbutils.fs.ls('dbfs:/FileStore/streaming/schemaInfer/_schemas/')
# Evolving the schema added a new version file to this folder:
# there are now two files, '0' (original schema) and '1' (schema including Year and Month),
# also visible in the DBFS UI.

Out[25]: [FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/0', name='0', size=274, modificationTime=1701879153000),
 FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/1', name='1', size=424, modificationTime=1701883002000)]

In [0]:
%sql
-- Schema version 1: the evolved schema, which adds Year and Month after Country and Citizens.
SELECT
    _corrupt_record,
    dataSchemaJson,
    partitionSchemaJson
FROM JSON.`dbfs:/FileStore/streaming/schemaInfer/_schemas/1`

_corrupt_record,dataSchemaJson,partitionSchemaJson
v1,,
,"{""type"":""struct"",""fields"":[{""name"":""Country"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""Citizens"",""type"":""integer"",""nullable"":true,""metadata"":{}},{""name"":""Year"",""type"":""integer"",""nullable"":true,""metadata"":{}},{""name"":""Month"",""type"":""string"",""nullable"":true,""metadata"":{}}]}","{""type"":""struct"",""fields"":[]}"


In [0]:
# Now upload new csv file into 'dbfs:/FileStore/streaming', with name 'Countries_newcolumn2.csv'

In [0]:
# Rerun the same display after uploading Countries_newcolumn2.csv so that its rows appear too.
# NOTE(review): if that file introduces further new columns, the first run may fail again while
# the schema evolves — confirm.

display(df)


### 03- failOnNewColumns

In [0]:
# failOnNewColumns also makes the stream fail, and the stream does not restart until you either
# update the schema (for example, by providing an explicit schema) or remove the files
# that introduce the additional fields.
# In this mode Spark does not change the stored schema automatically.

In [0]:
# List the stored schema versions before switching to failOnNewColumns.
# NOTE(review): the captured output shows only version '0'; it may have been recorded after the
# cleanup in the following cell — confirm.
dbutils.fs.ls('dbfs:/FileStore/streaming/schemaInfer/_schemas/')

Out[33]: [FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/0', name='0', size=274, modificationTime=1701879153000)]

In [0]:
# Remove the evolved schema versions '1' and '2' so that only the original version '0' remains.
# Version 1 came from the addNewColumns run; version 2 was presumably written after
# Countries_newcolumn2.csv was processed — confirm.
dbutils.fs.rm('dbfs:/FileStore/streaming/schemaInfer/_schemas/1')
dbutils.fs.rm('dbfs:/FileStore/streaming/schemaInfer/_schemas/2')

In [0]:
# Verify that only one schema version, name='0', is left.
dbutils.fs.ls('dbfs:/FileStore/streaming/schemaInfer/_schemas/')

In [0]:
# schemaEvolutionMode='failOnNewColumns': the stream fails when new columns show up,
# and the stored schema is left as it is.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": f'{source_dir}/schemaInfer',
        'cloudFiles.schemaEvolutionMode': 'failOnNewColumns',
        "cloudFiles.inferColumnTypes": "true",
        'cloudFiles.schemaHints': "Citizens LONG",
        'header': 'true',
    })
    .load(source_dir)
)

In [0]:
display(df)
# Expected to fail: source files contain columns (Year, Month) that schema version '0' lacks,
# and failOnNewColumns stops the stream instead of evolving the schema.

In [0]:
# Check the stored schema versions: nothing was evolved.

In [0]:
# No new schema version should appear: failOnNewColumns does not evolve the schema.
dbutils.fs.ls('dbfs:/FileStore/streaming/schemaInfer/_schemas/')

Out[36]: [FileInfo(path='dbfs:/FileStore/streaming/schemaInfer/_schemas/0', name='0', size=274, modificationTime=1701879153000)]

In [0]:
# Rerun the same display.
# NOTE(review): under failOnNewColumns the stream keeps failing until the schema is updated or the
# files with extra columns are removed — confirm what this rerun is expected to show.

display(df)


### None

In [0]:
# What the `none` mode does:
# It ignores any new columns: the schema is not evolved and all new columns are dropped.
# Their data is not rescued either, unless you specify a rescued data column.
# The stream does not fail, even when the incoming data has schema changes;
# whatever the schema change, only the data for the defined columns is taken.
# To use it, set the schema evolution mode to "none".

In [0]:
# Although there are additional files in 'dbfs:/FileStore/streaming'
# (Countries_newcolumn1.csv and Countries_newcolumn2.csv),
# Auto Loader simply ignores the additional columns in these files
# and keeps only the columns it needs for further processing.
# This is how the `none` mode works: it ignores everything else
# and strictly uses the defined schema.

In [0]:
# schemaEvolutionMode='none': the schema is not evolved, new columns are ignored,
# and the stream keeps running despite schema changes.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": f'{source_dir}/schemaInfer',
        'cloudFiles.schemaEvolutionMode': 'none',
        "cloudFiles.inferColumnTypes": "true",
        'cloudFiles.schemaHints': "Citizens LONG",
        'header': 'true',
    })
    .load(source_dir)
)

In [0]:
# Only the columns of the stored schema (Country, Citizens) are returned; the extra columns
# from the newer files are dropped, and no _rescued_data column is present.
display(df)

Country,Citizens
India,10
USA,10
China,20
Brazil,10
India,10
USA,10
China,20
Brazil,10
India,10
USA,5
