### Stream customers data from cloud storage to Delta Lake
#### 1. Read files from cloud storage using Auto Loader
#### 2. Transform the DataFrame to add the following columns:
- ##### Cloud file path (`file_path`)
- ##### Ingestion date (`ingestion_date`)
#### 3. Write the transformed data stream to a Delta Lake table.

#### 1. Read files from cloud storage using Auto Loader

- ##### Auto Loader supports schema inference and evolution, so the table schema need not be defined explicitly.
- ##### `.format('cloudFiles')` enables Auto Loader.

In [0]:
df_customers_autoloader = (spark
                        .readStream
                        .format('cloudFiles')
                        .option('cloudFiles.format', 'json')
                        .option('cloudFiles.schemaLocation', '/Volumes/gizmobox/landing/operations_volume/customers_autoloader/_schema')
                        .option('cloudFiles.inferColumnTypes', 'True')
                        .load('/Volumes/gizmobox/landing/operations_volume/customers_autoloader/')
                      )

- ##### 1. `.option('cloudFiles.inferColumnTypes', 'True')` makes Auto Loader infer the actual column data types. Without it, every column of a JSON source is inferred as a string.
- ##### 2. `.option('cloudFiles.schemaLocation', '/Volumes/gizmobox/landing/operations_volume/customers_autoloader/_schema')` defines the path where Auto Loader stores the inferred schema and tracks later changes to it.
- ##### 3. `.option('cloudFiles.schemaHints', 'col1 datatype, col2 datatype, col3 datatype')` overrides the data type of specific columns when `.option('cloudFiles.inferColumnTypes', 'True')` infers them incorrectly.

In [0]:
df_customers_autoloader = (spark
                        .readStream
                        .format('cloudFiles')
                        .option('cloudFiles.format', 'json')
                        .option('cloudFiles.schemaLocation', '/Volumes/gizmobox/landing/operations_volume/customers_autoloader/_schema')
                        .option('cloudFiles.inferColumnTypes', 'True')
                        .option('cloudFiles.schemaHints', 'created_timestamp TIMESTAMP,customer_id INTEGER, date_of_birth DATE, member_since DATE')
                        .load('/Volumes/gizmobox/landing/operations_volume/customers_autoloader/')
                      )
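
The hints string in the cell above is easy to mistype once it grows. As a minimal sketch, the string could be assembled from a mapping of column names to SQL types. The helper `build_schema_hints` is hypothetical and not part of Auto Loader; it only produces the comma-separated text that `cloudFiles.schemaHints` expects.

```python
from typing import Mapping


def build_schema_hints(hints: Mapping[str, str]) -> str:
    """Join {column: SQL type} pairs into a 'col TYPE, col TYPE' string."""
    return ', '.join(f'{column} {sql_type}' for column, sql_type in hints.items())


# Same columns and types as the schemaHints option used in this notebook.
customer_hints = build_schema_hints({
    'created_timestamp': 'TIMESTAMP',
    'customer_id': 'INTEGER',
    'date_of_birth': 'DATE',
    'member_since': 'DATE',
})

# The result would then be passed to the reader:
#   .option('cloudFiles.schemaHints', customer_hints)
```

Keeping the hints in a mapping makes it simple to add or remove a column without editing a long string literal.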

#### `.option('pathGlobFilter', '<pattern>')` restricts ingestion to files whose names match the given glob pattern.

In [0]:
df_customers_autoloader = (spark
                        .readStream
                        .format('cloudFiles')
                        .option('cloudFiles.format', 'json')
                        .option('cloudFiles.schemaLocation', '/Volumes/gizmobox/landing/operations_volume/customers_autoloader/_schema')
                        .option('cloudFiles.inferColumnTypes', 'True')
                        .option('cloudFiles.schemaHints', 'created_timestamp TIMESTAMP,customer_id INTEGER, date_of_birth DATE, member_since DATE')
                        .option('pathGlobFilter', 'customers_2024_*.json')
                        .load('/Volumes/gizmobox/landing/operations_volume/customers_autoloader/')
                      )
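
Before starting the stream, it can help to check which file names a pattern would select. The sketch below uses Python's `fnmatch` module, whose rules only approximate the glob syntax Spark applies, so treat it as a rough local preview. The file names and the helper `preview_glob_matches` are made up for illustration.

```python
from fnmatch import fnmatchcase


def preview_glob_matches(file_names, pattern):
    """Return the file names that match a glob pattern (case-sensitive)."""
    return [name for name in file_names if fnmatchcase(name, pattern)]


# Hypothetical file names in the landing folder.
sample_files = [
    'customers_2024_10.json',
    'customers_2024_11.json',
    'customers_2023_12.json',
    'orders_2024_10.json',
]

matched = preview_glob_matches(sample_files, 'customers_2024_*.json')
print(matched)  # ['customers_2024_10.json', 'customers_2024_11.json']
```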

- #### `.option('cloudFiles.schemaEvolutionMode', '<mode>')`, where `<mode>` is one of `addNewColumns`, `rescue`, `failOnNewColumns` or `none`, determines how the stream behaves when a file with a different schema arrives in the ingestion path.

- #### `addNewColumns` is the default when no schema is defined explicitly; when a schema is defined explicitly, `none` is the default. With `addNewColumns`, the stream stops when it encounters a new column and picks up the updated schema when it is restarted.

- #### `.option('mergeSchema', 'True')` should be added to the `writeStream` query so that the schema change is merged into the output table.

In [0]:
df_customers_autoloader = (spark
                        .readStream
                        .format('cloudFiles')
                        .option('cloudFiles.format', 'json')
                        .option('cloudFiles.schemaLocation', '/Volumes/gizmobox/landing/operations_volume/customers_autoloader/_schema')
                        .option('cloudFiles.inferColumnTypes', 'True')
                        .option('cloudFiles.schemaHints', 'created_timestamp TIMESTAMP,customer_id INTEGER, date_of_birth DATE, member_since DATE')
                        .option('pathGlobFilter', 'customers_2024_*.json')
                        .option('cloudFiles.schemaEvolutionMode', 'addNewColumns')
                        .load('/Volumes/gizmobox/landing/operations_volume/customers_autoloader/')
                      )
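
The four schema evolution modes can be summarised in code. The sketch below is a plain Python lookup, not an Auto Loader API. The descriptions paraphrase the documented behaviour, and the helper `check_schema_evolution_mode` is a hypothetical guard that compares the exact spelling used in this notebook, so a typo is caught before the stream starts.

```python
# Summary of what each mode does when a file with new columns arrives.
SCHEMA_EVOLUTION_MODES = {
    'addNewColumns': 'Stream fails; new columns are added to the stored schema '
                     'and are picked up when the stream is restarted.',
    'rescue': 'Schema does not evolve and the stream keeps running; '
              'new columns are captured in the rescued data column.',
    'failOnNewColumns': 'Stream fails and does not restart until the schema is '
                        'updated or the offending file is removed.',
    'none': 'Schema does not evolve and the stream keeps running; '
            'new columns are ignored.',
}


def check_schema_evolution_mode(mode: str) -> str:
    """Return the mode unchanged, or raise ValueError if it is not a known mode."""
    if mode not in SCHEMA_EVOLUTION_MODES:
        valid = ', '.join(SCHEMA_EVOLUTION_MODES)
        raise ValueError(f'Unknown schema evolution mode {mode!r}; expected one of: {valid}')
    return mode


# Hypothetical usage on the reader:
#   .option('cloudFiles.schemaEvolutionMode', check_schema_evolution_mode('addNewColumns'))
```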

#### 2. Transform the DataFrame to add the following columns:
- #### Cloud file path (`file_path`)
- #### Ingestion date (`ingestion_date`)

In [0]:
from pyspark.sql.functions import col, current_timestamp

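# Add audit columns: the source file path (read from the hidden _metadata column)
# and the timestamp at which the row was ingested.
df_customers_stream_autoloader_transformed = (df_customers_autoloader
                                                    .withColumn('file_path', col('_metadata.file_path'))
                                                    .withColumn('ingestion_date', current_timestamp())
                                              )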

#### 3. Write the transformed data stream to Delta Lake table.

In [0]:
customers_streaming_autloader_query = (df_customers_stream_autoloader_transformed
                                      .writeStream
                                      .format('delta')
                                      .option('checkpointLocation', '/Volumes/gizmobox/landing/operations_volume/customers_autoloader/_autoloader_checkpoint')
                                      .option('mergeSchema', 'True')
                                      .toTable('gizmobox.bronze.customers_autoloader_stream')
                            )
display(customers_streaming_autloader_query)

In [0]:
display(spark.sql('Select * from gizmobox.bronze.customers_autoloader_stream'))
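
The write query started above keeps running and waits for new files. For a batch-style run, a possible alternative is the `availableNow` trigger, which processes the files that are currently available and then stops on its own. The sketch below assumes a runtime where `trigger(availableNow=True)` is supported (Spark 3.3 or later); the function name and its parameters are hypothetical.

```python
def write_available_now(df, checkpoint_location, table_name):
    """Start a Delta write that processes the currently available files and then stops."""
    return (df.writeStream
              .format('delta')
              .option('checkpointLocation', checkpoint_location)
              .option('mergeSchema', 'true')
              .trigger(availableNow=True)   # process the backlog, then terminate
              .toTable(table_name))


# Hypothetical usage with the names from this notebook:
#   query = write_available_now(df_customers_stream_autoloader_transformed,
#                               '<checkpoint path>',
#                               'gizmobox.bronze.customers_autoloader_stream')
#   query.awaitTermination()   # returns once the available files are processed
```

Because the checkpoint records which files were already ingested, running this again later would only pick up files that arrived in the meantime.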

- #### A `writeStream` query started this way does not stop on its own. It has to be terminated manually, either by clicking the `Terminate` icon in the query cell or by calling `.stop()` on the query object.

In [0]:
customers_streaming_autloader_query.stop()
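
When several streams were started in one session, or the query variable is no longer at hand, the active queries can be listed through `spark.streams.active` and stopped one by one. This is a minimal sketch; the helper name `stop_active_streams` is hypothetical and the session is passed in as a parameter.

```python
def stop_active_streams(spark):
    """Stop every streaming query still active on the session; return their ids."""
    stopped = []
    for query in spark.streams.active:
        query.stop()
        stopped.append(query.id)
    return stopped


# Hypothetical usage in a notebook cell:
#   stop_active_streams(spark)
```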