In [0]:
# Create the Auto Loader input folders in the volume, one per day
# (2010/12/01 through 2010/12/07).
for day in range(1, 8):
    # Zero-pad the day so the folder names are 01..07.
    dbutils.fs.mkdirs(f"/Volumes/dev/bronze/landing/autoloader_input/2010/12/{day:02d}")


In [0]:
# Create the checkpoint location for the Auto Loader streams in the volume.
autoloader_checkpoint_dir = "/Volumes/dev/bronze/landing/checkpoint/autoloader"
dbutils.fs.mkdirs(autoloader_checkpoint_dir)

In [0]:
# Copy the daily retail CSV files for 2010-12-01 .. 2010-12-03 into their
# matching landing folders. Only these three days are copied here.
for day in ("01", "02", "03"):
    dbutils.fs.cp(
        f"/databricks-datasets/definitive-guide/data/retail-data/by-day/2010-12-{day}.csv",
        f"/Volumes/dev/bronze/landing/autoloader_input/2010/12/{day}",
    )


In [0]:
# Read the landing files as a stream using Auto Loader (the "cloudFiles" source).
# Schema location: /Volumes/dev/bronze/landing/checkpoint/autoloader/1/
# File detection modes:
# - Directory listing (uses API calls to detect new files)
# - File notification (uses notification and queue services - requires elevated cloud permissions for setup)

# All reader options gathered in one place; keys and values are passed through unchanged.
autoloader_options = {
    "cloudFiles.format": "csv",
    "pathGlobfilter": "*.csv",
    "header": "true",
    "cloudfiles.schemaHints": "Quantity Int, UnitPrice double",
    "cloudFiles.schemaLocation": "/Volumes/dev/bronze/landing/checkpoint/autoloader/1/",
}

df = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load("/Volumes/dev/bronze/landing/autoloader_input/*/")
)

In [0]:
# Write the stream to the Delta table dev.bronze.Invoice_al_1.
from pyspark.sql.functions import col

# Add a _file column holding the source file name taken from _metadata.
invoices_with_file = df.withColumn("_file", col("_metadata.file_name"))

# Configure the append-mode writer; availableNow=True is the trigger used for this run.
invoice_writer = (
    invoices_with_file.writeStream
    .option("checkpointlocation", "/Volumes/dev/bronze/landing/checkpoint/autoloader/1/")
    .outputMode("append")
    .trigger(availableNow=True)
)

# Start the query; it stays the cell's last expression so the notebook still displays it.
invoice_writer.toTable("dev.bronze.Invoice_al_1")

In [0]:
%sql
-- Show all rows loaded into dev.bronze.Invoice_al_1.
SELECT *
FROM dev.bronze.Invoice_al_1;


In [0]:
%sql
-- Row count per source file.
SELECT
  _file,
  count(1)
FROM dev.bronze.Invoice_al_1
GROUP BY _file

In [0]:
# Write the stream to the Delta table dev.bronze.Invoice_al_1, this time with
# mergeSchema enabled on the writer.

from pyspark.sql.functions import col
(
    df
    # Add a _file column holding the source file name taken from _metadata.
    .withColumn("_file", col("_metadata.file_name"))
    .writeStream
    .option("checkpointLocation", "/Volumes/dev/bronze/landing/checkpoint/autoloader/1/")
    .option("mergeSchema", True)  # allow the write to add new columns to the table schema
    .outputMode("append")
    .trigger(availableNow=True)
    .toTable("dev.bronze.Invoice_al_1")
)

In [0]:
# Add one more daily file to the landing area to check incremental processing.
incremental_source = "/databricks-datasets/definitive-guide/data/retail-data/by-day/2010-12-05.csv"
incremental_target = "/Volumes/dev/bronze/landing/autoloader_input/2010/12/05"

dbutils.fs.cp(incremental_source, incremental_target)

## Auto Loader schema evolution

In [0]:
# Second Auto Loader reader: same source and format options as before, but with
# its own schema location (.../autoloader/2/) and an explicit schema evolution mode.
df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("pathGlobfilter", "*.csv")
    .option("header", "true")
    .option("cloudfiles.schemaHints", "Quantity Int, UnitPrice double")
    .option("cloudFiles.schemaLocation", "/Volumes/dev/bronze/landing/checkpoint/autoloader/2/")
    # "rescue" does not evolve the schema: data that does not match it is kept in the
    # rescued data column instead. "addNewColumns" is the mode that adds new columns.
    # NOTE(review): confirm that "rescue" (not "addNewColumns") is the intended mode.
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load("/Volumes/dev/bronze/landing/autoloader_input/*/"))

In [0]:
# Write the second stream to dev.bronze.Invoice_al_2 using checkpoint folder 2.
from pyspark.sql.functions import col

# Add a _file column holding the source file name taken from _metadata.
invoices_with_file_2 = df.withColumn("_file", col("_metadata.file_name"))

# Append-mode writer with mergeSchema enabled so new columns can be added to the table schema.
invoice_writer_2 = (
    invoices_with_file_2.writeStream
    .option("checkpointLocation", "/Volumes/dev/bronze/landing/checkpoint/autoloader/2/")
    .option("mergeSchema", True)
    .outputMode("append")
    .trigger(availableNow=True)
)

# Start the query; it stays the cell's last expression so the notebook still displays it.
invoice_writer_2.toTable("dev.bronze.Invoice_al_2")