# Databricks Autoloader
## Introduction to Autoloader

Autoloader (officially "Auto Loader") is a Databricks feature for efficient, scalable, incremental ingestion of new data files as they arrive in cloud storage (e.g., ADLS, S3, GCS). It is exposed as the `cloudFiles` Structured Streaming source and lets you process very large datasets cost-effectively without tracking files manually.

### Key Benefits

- Incrementally and automatically detects new files
- Scalable for large numbers of files
- Supports schema evolution
- Can ingest from multiple cloud providers

## How Autoloader Works

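- **Directory Listing Mode:** The default. Autoloader lists the input directory and processes only the files it has not seen before. It needs no setup beyond read access to the data.
- **File Notification Mode:** Subscribes to cloud-native file event notifications instead of listing the directory. It needs extra cloud permissions but scales better for very large directories or high file volumes.
- **Checkpointing:** Records which files have already been processed in the stream's checkpoint location, so a restarted stream resumes where it left off and does not ingest the same file twice.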
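
The idea behind directory listing plus checkpointing can be illustrated with a small, self-contained toy. This is a conceptual sketch only, not how Autoloader is implemented: the function name `discover_new_files`, the JSON checkpoint file, and the sample file names are all hypothetical, and it runs against a temporary local folder rather than cloud storage.

```python
import json
import tempfile
from pathlib import Path


def discover_new_files(input_dir: Path, checkpoint_file: Path) -> list:
    """Return files in input_dir not yet recorded in checkpoint_file, then record them."""
    # Load the names already processed (empty on the first run).
    if checkpoint_file.exists():
        seen = set(json.loads(checkpoint_file.read_text()))
    else:
        seen = set()

    # "Directory listing": enumerate everything currently in the input folder.
    listed = sorted(p.name for p in input_dir.iterdir() if p.is_file())
    new_files = [name for name in listed if name not in seen]

    # Persist the updated state so the next run skips these files.
    checkpoint_file.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


# Demo on a throwaway local directory (hypothetical file names).
with tempfile.TemporaryDirectory() as tmp:
    input_dir = Path(tmp) / "input"
    input_dir.mkdir()
    checkpoint_file = Path(tmp) / "checkpoint.json"

    (input_dir / "sales_001.csv").write_text("a,b\n1,2\n")
    first_run = discover_new_files(input_dir, checkpoint_file)

    (input_dir / "sales_002.csv").write_text("a,b\n3,4\n")
    second_run = discover_new_files(input_dir, checkpoint_file)

    # Nothing new arrived, so a third run finds no work.
    third_run = discover_new_files(input_dir, checkpoint_file)

print(first_run, second_run, third_run)
```

Each run returns only the files that arrived since the previous run, which is the behavior the checkpoint location provides for a real stream.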

## Common Use Cases

- Streaming ingestion of files into a Delta Lake table
- Near real-time ETL pipelines
- Data lake ingestion patterns

## Example Scenario Using Sample Files

We'll demonstrate how to use Autoloader to ingest the following files:

- `customer_details.csv`
- `product_catalog.csv`
- `sales.csv`

Assume these files arrive incrementally in a folder such as `/FileStore/data/input/`.

## Step-by-Step Example: Using Autoloader

### 1. Basic Autoloader Setup (CSV Example)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Read only from the input folder; pointing at "/FileStore/data/" would also
# scan the checkpoint, schema, and output folders that live beneath it.
input_path = "/FileStore/data/input/"
checkpoint_path = "/FileStore/data/checkpoints/autoloader_demo/"
output_path = "/FileStore/data/out"
schema_location = "/FileStore/data/schema/autoloader_demo/"

# Explicit schema for sales.csv
sales_schema = StructType([
    StructField("SalesOrderNumber", StringType()),
    StructField("SalesOrderLineNumber", StringType()),
    StructField("OrderDate", StringType()),
    StructField("CustomerID", StringType()),
    StructField("Item", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("UnitPrice", DoubleType()),
    StructField("TaxAmount", DoubleType()),
])

sales_df = (
  spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", True)
      # Only the sales files match this schema; skip the other sample CSVs
      .option("pathGlobFilter", "sales*.csv")
      .option("cloudFiles.schemaLocation", schema_location)
      .schema(sales_schema)
      .load(input_path)
)

query = (
  sales_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)  # process all files available now, then stop
    .start(output_path)
)
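
Because the stream uses `availableNow`, it stops on its own once the pending files are processed. To confirm how much was ingested, the following sketch summarizes progress events. It assumes the `query` object from the cell above and that each entry of `query.recentProgress` behaves like a dict with a `numInputRows` key; the helper name `summarize_progress` and the sample events are made up for illustration.

```python
def summarize_progress(progress_events):
    """Count micro-batches that carried data and the rows they ingested."""
    batches = [p for p in progress_events if p.get("numInputRows", 0) > 0]
    return {
        "batches_with_data": len(batches),
        "total_input_rows": sum(p["numInputRows"] for p in batches),
    }


# On a cluster (assumes `query` from the cell above):
#   query.awaitTermination()                      # block until the run finishes
#   print(summarize_progress(query.recentProgress))

# Hand-written sample events, for illustration only.
sample_events = [
    {"batchId": 0, "numInputRows": 120},
    {"batchId": 1, "numInputRows": 30},
    {"batchId": 2, "numInputRows": 0},
]
summary = summarize_progress(sample_events)
print(summary)
```

Note that `recentProgress` keeps only a bounded number of recent events, so for long-running streams this is a spot check rather than a full audit.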


### 2. Autoloader with Schema Evolution
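If you expect the schema to change (for example, new columns appear), let Autoloader infer and evolve the schema instead of supplying a fixed one. In the default `addNewColumns` mode, the stream stops when it encounters a new column, records the updated schema in the schema location, and picks up the new column when restarted. Columns that disappear from later files stay in the schema and are read as null.

In [0]:
sales_df = (
    spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", True)
            # Infer column types; without this, CSV columns are all read as strings
            .option("cloudFiles.inferColumnTypes", True)
            # Default mode when no schema is supplied, stated here for clarity
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            # checkpoint_path already ends with "/", so do not add another
            .option("cloudFiles.schemaLocation", checkpoint_path + "schema/")
            .load(input_path)
)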
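
With schema evolution, a stream can stop with an error when a new column first appears and then succeed when started again. A bounded restart loop is one way to handle that outside a job scheduler. The sketch below is an assumption-laden illustration: `run_with_restarts` and `FakeQuery` are hypothetical names, the loop retries on any exception rather than only schema changes, and the fake query stands in for a real stream so the example runs anywhere.

```python
def run_with_restarts(start_query, max_restarts=3):
    """Start a stream and restart it if it stops with an error.

    start_query: zero-argument callable that builds and starts the stream,
    returning an object with awaitTermination().
    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        try:
            start_query().awaitTermination()
            return restarts
        except Exception as err:  # assumption: schema changes surface here
            if restarts >= max_restarts:
                raise  # give up so that persistent failures stay visible
            restarts += 1
            print(f"Stream stopped ({err}); restart {restarts}/{max_restarts}")


class FakeQuery:
    """Stand-in for a streaming query, used only for this demo."""

    def __init__(self, fail):
        self._fail = fail

    def awaitTermination(self):
        if self._fail:
            raise RuntimeError("new column detected")


# The first start fails (schema changed); the second succeeds.
outcomes = iter([True, False])
restart_count = run_with_restarts(lambda: FakeQuery(next(outcomes)))
print(restart_count)
```

On a cluster, `start_query` would wrap the `readStream` and `writeStream` calls so that each restart rebuilds the stream with the updated schema. In production, a job configured to restart on failure serves the same purpose.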

### 3. Autoloader with Multiple File Types

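A single Autoloader stream reads one file format, set by `cloudFiles.format`. If your input folder has mixed file types (e.g., CSV and JSON), define one stream per format and restrict each stream to its own files with `pathGlobFilter`:

In [0]:
sales_df = (
    spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")  # use "json" in a second stream
            .option("header", True)
            .option("pathGlobFilter", "*.csv")   # skip files of other types
            # A schema location is required when the schema is inferred
            .option("cloudFiles.schemaLocation", schema_location + "csv/")
            .load(input_path)
)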
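
To avoid repeating the reader configuration for each format, the options can be generated by a small helper. This is a sketch under stated assumptions: `cloudfiles_options`, `read_format`, and the schema root `/FileStore/data/schema/mixed` are hypothetical names chosen for illustration, and `read_format` expects an active `spark` session on a Databricks cluster.

```python
def cloudfiles_options(file_format, schema_root, glob=None):
    """Build reader options for one file format, with its own schema folder."""
    options = {
        "cloudFiles.format": file_format,
        # One schema location per format, so inferred schemas do not collide.
        "cloudFiles.schemaLocation": f"{schema_root.rstrip('/')}/{file_format}/",
    }
    if glob:
        options["pathGlobFilter"] = glob  # restrict the stream to matching files
    if file_format == "csv":
        options["header"] = "true"
    return options


def read_format(spark, input_path, file_format, schema_root, glob=None):
    """Create one streaming DataFrame for a single format (needs a cluster)."""
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**cloudfiles_options(file_format, schema_root, glob))
        .load(input_path)
    )


# Option sets for two independent streams over the same folder.
csv_opts = cloudfiles_options("csv", "/FileStore/data/schema/mixed", "*.csv")
json_opts = cloudfiles_options("json", "/FileStore/data/schema/mixed/", "*.json")
print(csv_opts)
print(json_opts)
```

Each resulting stream also needs its own checkpoint location when written out.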

### 4. Hands-On Exercise

**Exercise:**

1. Place all sample files in `/FileStore/data/input/`.
2. Run the above notebook cells to create a Delta table from the ingested data.
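3. Add a new file to the input folder and re-run the write cell. Because the stream uses `availableNow`, it stops once the files present at start are processed; the next run picks up only the new file.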
4. Query the Delta table in a new cell:

In [0]:
spark.read.format("delta").load(output_path).show()

+----------------+--------------------+----------+----------+--------------------+--------+---------+---------+
|SalesOrderNumber|SalesOrderLineNumber| OrderDate|CustomerID|                Item|Quantity|UnitPrice|TaxAmount|
+----------------+--------------------+----------+----------+--------------------+--------+---------+---------+
|         SO20000|                   2|2024-08-24|  CUST2373|      Running Shorts|       4|   278.63|    55.73|
|         SO20001|                   2|2024-10-06|  CUST2779|      Vacuum Cleaner|       4|   111.58|    22.32|
|         SO20002|                   2|2024-08-02|  CUST2732|       Tennis Racket|       6|   152.03|    45.61|
|         SO20003|                   3|2024-09-21|  CUST1815|     Children's Book|       3|     6.62|     0.99|
|         SO20004|                   2|2025-03-18|  CUST2913|                Doll|       7|    31.13|     10.9|
|         SO20005|                   3|2024-12-30|  CUST1241| Vitamin Supplements|       2|   247.52|   
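
For step 3 of the exercise, each new file should land under a name that has not been used before. One possible approach is to add a timestamp to the base name. The helpers `unique_batch_name` and `land_file` below are hypothetical, and the demo copies files within a temporary local folder; on a cluster the target would be the input folder, reached through whatever local path your workspace exposes for it.

```python
import shutil
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def unique_batch_name(base_name, now=None):
    """Insert a UTC timestamp before the extension, e.g. sales_<stamp>.csv."""
    now = now or datetime.now(timezone.utc)
    path = Path(base_name)
    return f"{path.stem}_{now.strftime('%Y%m%dT%H%M%S')}{path.suffix}"


def land_file(source, input_dir, now=None):
    """Copy source into input_dir under a timestamped name; return the new path."""
    target = Path(input_dir) / unique_batch_name(Path(source).name, now)
    shutil.copy(source, target)
    return target


example_name = unique_batch_name("sales.csv", datetime(2024, 8, 24, 10, 15, 0))
print(example_name)  # sales_20240824T101500.csv

# Demo in a throwaway folder standing in for the real input folder.
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "sales.csv"
    source.write_text("SalesOrderNumber,Quantity\nSO1,1\n")
    landing_dir = Path(tmp) / "input"
    landing_dir.mkdir()
    landed = land_file(source, landing_dir, datetime(2024, 8, 24, 10, 15, 0))
    landed_exists = landed.exists()
    landed_name = landed.name
```

Timestamps with one-second resolution can still collide if two files arrive in the same second, so a counter or UUID suffix may be safer for high-frequency drops.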

## Additional Tips

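- **File Naming:** Give every incoming file a unique name. Autoloader tracks processed files by path, so by default a file re-uploaded under an existing name is treated as already ingested and skipped.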
- **Checkpoints:** Use a unique checkpoint directory for each streaming query.
- **Performance:** For production, consider tuning rate-limit options such as `cloudFiles.maxFilesPerTrigger` to bound how many files each micro-batch processes.
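
As a rough illustration of the performance tip, the sketch below builds rate-limit options and estimates how many micro-batches a backlog would take. The helper names `rate_limit_options` and `estimated_batches` are hypothetical, the numbers are examples rather than recommendations, and the estimate ignores any byte-based limit.

```python
import math


def rate_limit_options(max_files=None, max_bytes=None):
    """Build rate-limit options; pass the result to .options(**...) on the reader."""
    options = {}
    if max_files is not None:
        options["cloudFiles.maxFilesPerTrigger"] = str(max_files)
    if max_bytes is not None:
        # Byte-size string such as "10g"
        options["cloudFiles.maxBytesPerTrigger"] = max_bytes
    return options


def estimated_batches(backlog_files, max_files):
    """Micro-batches needed to drain a backlog when only the file limit applies."""
    return math.ceil(backlog_files / max_files)


limits = rate_limit_options(max_files=500)
batches = estimated_batches(backlog_files=2300, max_files=500)
print(limits, batches)
```

Smaller batches reduce memory pressure per batch at the cost of more batches overall, so the right value depends on file sizes and cluster capacity.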