# Stream Customers Data From Cloud Files to Delta Lake using Auto Loader
1. Read files from cloud storage using Auto Loader
2. Transform the dataframe to add the following columns
- file_path: Cloud File Path
- ingestion_date: Current Timestamp
3. Write the Transformed data stream to Delta Lake Table

### 1. Read files from cloud storage using Auto Loader

In [0]:
customers_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox/landing/operational_data/customers_autoloader/_schema") \
    .load('/Volumes/gizmobox/landing/operational_data/customers_autoloader/')

In the above cell, note the following points:
1. Every column has the string data type. Because we have not asked Auto Loader to infer the data types, it assigns string to every column.
2. Auto Loader adds a _rescued_data column as a string. If it can't process some data because of schema evolution or a similar issue, it writes that data into _rescued_data, where it can be handled later.
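To make the idea behind _rescued_data more concrete, here is a minimal pure-Python sketch of setting aside fields that do not fit an expected set of columns. It is a conceptual illustration only, not Auto Loader's implementation; the function name split_rescued and the sample field names (customer_id, customer_name, loyalty_tier) are hypothetical.

```python
import json


def split_rescued(record: dict, expected_columns: set) -> dict:
    """Keep the expected columns and pack everything else into a JSON string."""
    # Expected columns are always present in the output (None when missing).
    kept = {name: record.get(name) for name in sorted(expected_columns)}
    # Anything outside the expected columns is set aside instead of being dropped.
    extra = {k: v for k, v in record.items() if k not in expected_columns}
    kept["_rescued_data"] = json.dumps(extra) if extra else None
    return kept


# Hypothetical record with one field the expected schema does not know about.
row = split_rescued(
    {"customer_id": "101", "customer_name": "Ada", "loyalty_tier": "gold"},
    expected_columns={"customer_id", "customer_name"},
)
print(row)
```

The unexpected field survives as a JSON string, so it can be parsed and handled in a later step rather than being lost.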

#### Infer the Column Types

In [0]:
customers_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox/landing/operational_data/customers_autoloader/_schema") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load('/Volumes/gizmobox/landing/operational_data/customers_autoloader/')

Auto Loader tries its best to infer data types from a sample of the data, but the result might not be 100% correct for every column. Because dates and timestamps are wrapped in quotes in JSON, it identifies them as strings. A column that contains a mix of strings and numbers is also identified as a string.
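The point about quoted dates can be seen with plain Python, independent of Spark. The short sketch below parses a hypothetical sample record (customer_id and the values are made up; only the date_of_birth column name comes from this notebook) and shows that JSON itself carries the date as a string.

```python
import json

# Hypothetical sample record; JSON has no native date type, so the date is quoted.
sample = '{"customer_id": 101, "date_of_birth": "1990-05-17"}'
record = json.loads(sample)

# Map each field to the Python type the JSON parser produced for it.
value_types = {name: type(value).__name__ for name, value in record.items()}
print(value_types)  # {'customer_id': 'int', 'date_of_birth': 'str'}
```

Any type inference that works from the raw JSON values starts from this same information, which is why such columns need an explicit hint to become date or timestamp.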
To influence the data types Auto Loader sets for a few columns, we can use the cloudFiles.schemaHints option.

In [0]:
customers_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox/landing/operational_data/customers_autoloader/_schema") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaHints", "date_of_birth date, member_since date, created_timestamp timestamp") \
    .load('/Volumes/gizmobox/landing/operational_data/customers_autoloader/')
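When the list of hinted columns grows, typing the hint string by hand becomes error-prone. As a small optional sketch, the string can be built from a dictionary; build_schema_hints is a hypothetical helper introduced here, not part of Auto Loader.

```python
def build_schema_hints(hints: dict) -> str:
    """Join column/type pairs into the 'name type, name type' form used above."""
    return ", ".join(f"{column} {data_type}" for column, data_type in hints.items())


schema_hints = build_schema_hints({
    "date_of_birth": "date",
    "member_since": "date",
    "created_timestamp": "timestamp",
})
print(schema_hints)  # date_of_birth date, member_since date, created_timestamp timestamp
```

The resulting value can then be passed as .option("cloudFiles.schemaHints", schema_hints) in place of the literal string.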

### 2. Transform the dataframe to add the following columns
- file_path: Cloud File Path
- ingestion_date: Current Timestamp

In [0]:
from pyspark.sql.functions import col, current_timestamp

# _metadata.file_path holds the source file path of each record
customers_transformed_df = customers_df.withColumn("file_path", col("_metadata.file_path")) \
                                        .withColumn("ingestion_date", current_timestamp())

### 3. Write the Transformed data stream to Delta Lake Table

In [0]:
streaming_query = customers_transformed_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/gizmobox/landing/operational_data/customers_autoloader/_checkpoint_stream") \
    .toTable("gizmobox.bronze.customers_autoloader")
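The query started above keeps running until it is stopped. If the goal is only to process the files that are currently available and then finish, one possible alternative is the availableNow trigger. The sketch below assumes a runtime that supports trigger(availableNow=True); write_available_files is a hypothetical helper name, and it takes the streaming DataFrame as a parameter rather than creating it.

```python
def write_available_files(df, checkpoint_location: str, table_name: str):
    """Process all files available right now, then let the stream stop on its own."""
    query = (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_location)
        .trigger(availableNow=True)  # process the current backlog, then terminate
        .toTable(table_name)
    )
    # Block until the backlog has been written to the table.
    query.awaitTermination()
    return query
```

It could be called with customers_transformed_df, the checkpoint path, and the table name used in the cell above. With this trigger the query ends by itself, so an explicit stop() is not needed to release it.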

In [0]:
streaming_query.stop()

In [0]:
%sql
select *
from gizmobox.bronze.customers_autoloader;