# Stream Customer Data From Cloud Files to Delta Lake Using Auto Loader
1. Read files from cloud storage using the DataStreamReader API
2. Transform the DataFrame to add the following columns
    - file path: cloud file path
    - ingestion date: current timestamp
3. Write the transformed data stream to a Delta Lake table

## The Spark Structured Streaming file source has some limitations when processing large amounts of data
  - Inefficient File Listing (It repeatedly scans the entire directory to find new files. This can be slow and expensive when dealing with millions of files).
  - Scalability Issues (Duplicate file detection is achieved by storing the list of already processed files in an in-memory map on the driver. It does not scale well).
  - Schema Evolution Problems (Users must manually define the schema before the stream starts. If a new column appears later, it may cause data loss or require manual intervention).
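
The first two limitations can be illustrated with a toy model in plain Python. This is only a sketch of the listing-plus-in-memory-set idea, not Spark's actual implementation; the class `ListingFileSource`, its `poll` method, and the file names are hypothetical and exist only for this illustration.

```python
import os
import tempfile


class ListingFileSource:
    """Toy model of listing-based file discovery (not Spark's real code)."""

    def __init__(self, directory):
        self.directory = directory
        self.seen = set()         # grows with every processed file and lives in memory
        self.entries_scanned = 0  # total directory entries examined across all polls

    def poll(self):
        """Return files not seen before; every call lists the whole directory."""
        names = sorted(os.listdir(self.directory))
        self.entries_scanned += len(names)
        new_files = [name for name in names if name not in self.seen]
        self.seen.update(new_files)
        return new_files


def demo():
    with tempfile.TemporaryDirectory() as directory:
        source = ListingFileSource(directory)
        # Three files arrive before the first poll.
        for i in range(3):
            open(os.path.join(directory, f"customers_{i}.json"), "w").close()
        first = source.poll()
        # One more file arrives; the next poll still lists all four entries.
        open(os.path.join(directory, "customers_3.json"), "w").close()
        second = source.poll()
        return first, second, source.entries_scanned


first, second, scanned = demo()
print(first, second, scanned)
```

The second poll returns only the one new file, yet it still has to list every entry in the directory, and the `seen` set keeps growing for as long as the stream runs.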

## Auto Loader
  - Efficient File Detection using Cloud Services (In file notification mode, it uses cloud notification services, such as AWS S3 event notifications, to track new files instead of repeatedly listing the directory)
  - Scalability Improvements (It replaces in-memory tracking with state persisted in the checkpoint location on cloud storage)
  - Schema Evolution & Resiliency (Supports automatic schema inference from sample files. Also supports automatic schema evolution by adding new columns or rescuing unexpected data in a separate rescued data column, `_rescued_data` by default)
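
These features are controlled through `cloudFiles.*` reader options. The sketch below collects a few of them in a plain dictionary so the choices are visible in one place. The helper `autoloader_options` and the path `/tmp/example/_schema` are hypothetical and not part of this notebook. Whether file notification mode is available depends on the storage location and workspace setup, so it is left off by default here.

```python
def autoloader_options(schema_location, file_format="json",
                       evolution_mode="addNewColumns", use_notifications=False):
    """Build a dict of Auto Loader reader options (hypothetical helper)."""
    allowed_modes = {"addNewColumns", "rescue", "failOnNewColumns", "none"}
    if evolution_mode not in allowed_modes:
        raise ValueError(f"unknown schema evolution mode: {evolution_mode}")
    return {
        "cloudFiles.format": file_format,
        # Where the inferred schema and its later versions are stored
        "cloudFiles.schemaLocation": schema_location,
        # "addNewColumns" evolves the schema; "rescue" keeps the schema fixed
        # and routes unexpected fields to the rescued data column
        "cloudFiles.schemaEvolutionMode": evolution_mode,
        # "true" switches from directory listing to file notification mode
        "cloudFiles.useNotifications": str(use_notifications).lower(),
    }


# Hypothetical schema location, used only for this example
opts = autoloader_options("/tmp/example/_schema", evolution_mode="rescue")
print(opts)

# In a Databricks notebook the dict could be passed to the reader, for example:
# df = spark.readStream.format("cloudFiles").options(**opts).load("<source path>")
```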

## 1. Read files from cloud storage using the DataStreamReader API

In [0]:
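customers_df = (
    spark.readStream
    .format("cloudFiles")  # Auto Loader source
    .option("cloudFiles.format", "json")  # format of the incoming files
    # Location where Auto Loader stores the inferred schema and its later versions
    .option("cloudFiles.schemaLocation", "/Volumes/gizmobox/landing/operational_data/Customers_autoloader/_schema")
    # Infer column types instead of reading every JSON field as a string
    .option("cloudFiles.inferColumnTypes", "true")
    # Override the inferred types for these columns
    .option("cloudFiles.schemaHints", "date_of_birth DATE, member_since DATE, created_timestamp TIMESTAMP")
    .load("/Volumes/gizmobox/landing/operational_data/Customers_autoloader/")
)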

## 2. Transform the DataFrame to add the file path and ingestion date columns

In [0]:
from pyspark.sql.functions import current_timestamp, col

customer_transformed_df = (
    customers_df
    .withColumn("file_path", col("_metadata.file_path"))  # source file of each record
    .withColumn("ingestion_date", current_timestamp())  # time the record was ingested
)

## 3. Write the transformed data stream to a Delta Lake table

In [0]:
# Uncomment only when you want to start streaming. Make sure to stop the query after use.
# streaming_query = (
#         customer_transformed_df.writeStream
#             .format("delta")
#             .option("checkpointLocation", "/Volumes/gizmobox/landing/operational_data/Customers_autoloader/_checkpoint_stream")
#             .toTable("gizmobox.bronze.customers_autoloader")
# )
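
If the stream should not keep running, one option is an `availableNow` trigger, which processes the files that are currently available and then stops the query on its own. The sketch below assumes a runtime that supports this trigger (Spark 3.3 or later). The function name `write_bronze_once` is hypothetical and not part of this notebook.

```python
def write_bronze_once(stream_df, checkpoint_path, table_name):
    """Start a query that processes all currently available files, then stops itself."""
    return (
        stream_df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # process the backlog, then terminate
        .toTable(table_name)
    )


# In the notebook this could be called as follows (left commented out, like the cell above):
# query = write_bronze_once(
#     customer_transformed_df,
#     "/Volumes/gizmobox/landing/operational_data/Customers_autoloader/_checkpoint_stream",
#     "gizmobox.bronze.customers_autoloader",
# )
# query.awaitTermination()  # returns once the available files have been processed
```

Because the checkpoint records which files were already ingested, running the same call again later picks up only the files that arrived in the meantime.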

In [0]:
%sql
select * from gizmobox.bronze.customers_autoloader;

In [0]:
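# Stop the stream started in step 3. This raises a NameError if that cell was never run uncommented.
streaming_query.stop()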
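
When it is unclear whether the write cell was run, a more defensive way to clean up is to stop whatever queries are still active on the session. This is a minimal sketch; `stop_active_streams` is a hypothetical helper, and it relies only on `spark.streams.active` and `StreamingQuery.stop()`.

```python
def stop_active_streams(spark):
    """Stop every active streaming query on the session and return their labels."""
    stopped = []
    for query in spark.streams.active:
        query.stop()
        # Unnamed queries have name None, so fall back to the query id
        stopped.append(query.name or str(query.id))
    return stopped


# In the notebook:
# stop_active_streams(spark)
```

Note that this stops all streaming queries on the session, not only the one started in this notebook.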