
# What is Databricks Auto Loader?

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/autoloader/autoloader-edited-anim.gif" style="float:right; margin-left: 10px" />

[Databricks Auto Loader](https://docs.databricks.com/ingestion/auto-loader/index.html) lets you scan a cloud storage folder (S3, ADLS, GCS) and ingest only the new files that arrived since the previous run.

This is called **incremental ingestion**.

Auto Loader can be used in a near real-time stream or in a batch fashion, e.g., running every night to ingest daily data.

Auto Loader provides an exactly-once guarantee when used with a Delta sink: each file is ingested only once, even if the stream is restarted.
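Conceptually, incremental ingestion means remembering which files were already processed and picking up only the rest on the next run. The sketch below illustrates that idea in plain Python. It assumes a local folder, and a JSON file stands in for the checkpoint; the helper `discover_new_files` is hypothetical and is not how Auto Loader works internally (Auto Loader persists discovered-file metadata in a RocksDB store inside its checkpoint location).

```python
import json
import os
import tempfile


def discover_new_files(folder: str, checkpoint_path: str) -> list[str]:
    """Return files in `folder` not yet recorded in the checkpoint, then record them.

    Hypothetical helper: a JSON list of file names stands in for a real checkpoint.
    """
    seen: set[str] = set()
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            seen = set(json.load(f))
    # Only consider regular files, in a deterministic order
    current = sorted(
        name for name in os.listdir(folder)
        if os.path.isfile(os.path.join(folder, name))
    )
    new_files = [name for name in current if name not in seen]
    # Persist the updated state so the next run skips these files
    with open(checkpoint_path, "w") as f:
        json.dump(sorted(seen | set(new_files)), f)
    return new_files


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        landing = os.path.join(root, "user_json")
        os.makedirs(landing)
        checkpoint = os.path.join(root, "checkpoint.json")
        for i in range(2):
            with open(os.path.join(landing, f"part-{i}.json"), "w") as f:
                f.write('{"id": %d}\n' % i)
        print(discover_new_files(landing, checkpoint))  # first run: ['part-0.json', 'part-1.json']
        with open(os.path.join(landing, "part-2.json"), "w") as f:
            f.write('{"id": 2}\n')
        print(discover_new_files(landing, checkpoint))  # second run: ['part-2.json']
```

This sketch records files as soon as they are listed; a real exactly-once pipeline also has to commit the checkpoint atomically with the write to the sink, which is what the Delta sink plus checkpoint handle for you.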

## How Auto Loader simplifies data ingestion

Ingesting data from cloud storage at scale can be hard. Auto Loader makes it easy, offering these benefits:


* **Incremental** & **cost-efficient** ingestion (removes unnecessary listing or state handling)
* **Simple** and **resilient** operation: no tuning or manual code required
* Scalable to **billions of files**
  * Using incremental listing (deprecated, relies on filename order)
  * Leveraging notification + message queue (recommended)
* **Schema inference** and **schema evolution** are handled out of the box for most formats (csv, json, avro, images...)


In [0]:
%run ./_resources/00-setup $reset_all_data=false

In [0]:
display(spark.read.text(volume_folder+'/user_json'))

### Auto Loader basics
Let's create a new Auto Loader stream that will incrementally ingest new incoming files.

In this example we specify the full schema. We also use `cloudFiles.maxFilesPerTrigger` to process one file at a time, simulating a process that adds files one by one.

In [0]:
bronzeDF = (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.maxFilesPerTrigger", "1")  # demo only, remove in a real stream
                .schema("address string, creation_date string, firstname string, lastname string, id bigint")
                .load(volume_folder+'/user_json'))
display(bronzeDF)
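`cloudFiles.maxFilesPerTrigger` caps how many new files each micro-batch processes (Auto Loader's default is 1000). As a plain-Python illustration, with a hypothetical `micro_batches` helper that is not Auto Loader's scheduling code, splitting a backlog of pending files into micro-batches looks like this:

```python
from typing import Iterator


def micro_batches(new_files: list[str], max_files_per_trigger: int) -> Iterator[list[str]]:
    """Yield successive micro-batches of at most `max_files_per_trigger` files.

    Hypothetical illustration of the limit that cloudFiles.maxFilesPerTrigger applies.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    for start in range(0, len(new_files), max_files_per_trigger):
        yield new_files[start:start + max_files_per_trigger]


# With maxFilesPerTrigger=1 (as in the demo), three pending files become three micro-batches
print(list(micro_batches(["a.json", "b.json", "c.json"], 1)))
# → [['a.json'], ['b.json'], ['c.json']]
```

Setting the limit to 1 is only useful to make the demo progress visibly; in production a larger batch amortizes the per-trigger overhead.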

## Schema inference
Specifying the schema manually can be a challenge, especially with dynamic JSON. Notice that the "age" data is missing because we didn't include this column in the schema.

* Schema inference is traditionally expensive and slow at scale, but not with Auto Loader: it efficiently samples the data to infer the schema and stores it under `cloudFiles.schemaLocation` to track schema changes over time.
* By default, Auto Loader infers all JSON columns as strings. Setting `cloudFiles.inferColumnTypes` to `true` infers the actual data types from your JSON instead.

Let's redefine our stream with these features. Notice that we now have all of the JSON fields.

*Notes:*
* *With Delta Live Tables you don't even have to set this option; the engine manages the schema location for you.*
* *Sampling size can be changed with `spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes`*

In [0]:
bronzeDF = (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.schemaLocation", volume_folder+'/inferred_schema')
                .option("cloudFiles.inferColumnTypes", "true")
                .load(volume_folder+'/user_json'))
display(bronzeDF)
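To make the effect of `cloudFiles.inferColumnTypes` concrete, here is a simplified plain-Python sketch of type inference over a sample of JSON lines. The function `infer_json_schema` is hypothetical; it reuses Spark SQL type names, but its widening rules (integers plus decimals become `double`, any other conflict falls back to `string`) are an assumption and much simpler than Spark's actual inference.

```python
import json


def infer_json_schema(lines: list[str], infer_column_types: bool = True) -> dict[str, str]:
    """Infer a flat {column: type} schema from sample JSON lines (simplified sketch)."""

    def type_of(value) -> str:
        # bool must be checked before int: in Python, bool is a subclass of int
        if isinstance(value, bool):
            return "boolean"
        if isinstance(value, int):
            return "bigint"
        if isinstance(value, float):
            return "double"
        return "string"  # strings, nested objects and arrays are all strings in this sketch

    schema: dict[str, str] = {}
    for line in lines:
        record = json.loads(line)
        for column, value in record.items():
            if value is None:
                continue  # nulls carry no type information; always-null columns are omitted
            inferred = type_of(value) if infer_column_types else "string"
            previous = schema.get(column)
            if previous is None or previous == inferred:
                schema[column] = inferred
            elif {previous, inferred} == {"bigint", "double"}:
                schema[column] = "double"  # widen integer + decimal to double
            else:
                schema[column] = "string"  # conflicting types fall back to string
    return schema


sample = [
    '{"id": 1, "firstname": "Alice", "age": 31}',
    '{"id": 2, "firstname": "Bob", "age": 27.5}',
]
print(infer_json_schema(sample))  # {'id': 'bigint', 'firstname': 'string', 'age': 'double'}
print(infer_json_schema(sample, infer_column_types=False))  # every column is 'string'
```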

### Schema hints
You might need to enforce a part of your schema, e.g., to convert a timestamp. This can easily be done with Schema Hints.

In this case, we'll make sure that the `id` is read as `bigint` and not `int`:

In [0]:
bronzeDF = (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.schemaLocation", f"{volume_folder}/inferred_schema")
                .option("cloudFiles.inferColumnTypes", "true")
                .option("cloudFiles.schemaHints", "id bigint")
                .load(volume_folder+'/user_json'))
display(bronzeDF)
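Schema hints take a SQL-style list of `column type` pairs (as in `"id bigint"`) and override only those columns, leaving the rest of the inferred schema untouched. The helper below is a hypothetical plain-Python illustration of that merge; it only handles simple comma-separated hints, so types that themselves contain commas (such as `decimal(10,2)`) or nested fields are out of scope.

```python
def apply_schema_hints(inferred: dict[str, str], hints: str) -> dict[str, str]:
    """Override inferred column types with hints such as "id bigint, creation_date timestamp"."""
    schema = dict(inferred)  # keep inferred columns and their order
    for hint in hints.split(","):
        hint = hint.strip()
        if not hint:
            continue
        column, _, col_type = hint.partition(" ")
        if not col_type.strip():
            raise ValueError(f"Invalid schema hint: {hint!r}")
        # Hinted columns win over inference; in this sketch a hinted column
        # missing from the inferred schema is simply added
        schema[column] = col_type.strip()
    return schema


print(apply_schema_hints({"id": "int", "firstname": "string"}, "id bigint"))
# → {'id': 'bigint', 'firstname': 'string'}
```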

## Schema evolution

Let's wrap the stream definition in a function so we can easily restart it as the schema evolves.

In [0]:
def get_stream():
  return (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .option("cloudFiles.schemaLocation", f"{volume_folder}/inferred_schema")
                .option("cloudFiles.inferColumnTypes", "true")
                .option("cloudFiles.schemaHints", "id bigint")
                .load(volume_folder+'/user_json'))
display(get_stream())

### Incorrect schema
Auto Loader automatically handles data that doesn't match the schema, such as conflicting types: instead of failing, it saves the non-matching values in the `_rescued_data` column.

In [0]:
from pyspark.sql import Row
data = [Row(email="quentin.ambard@databricks.com", firstname="Quentin", id="456455", lastname="Ambard")]
incorrect_data = spark.createDataFrame(data)
incorrect_data.write.format("json").mode("append").save(volume_folder + "/user_json")

In [0]:
wait_for_rescued_data()
# Start the stream and filter on the rescued data column to see how the incorrect data is captured
display(get_stream().filter("_rescued_data is not null"))
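The idea behind the rescued data column can be sketched in plain Python: values that don't fit the expected schema are moved into a JSON string instead of dropping the record. This is a simplified, hypothetical `parse_with_rescue` helper that assumes a flat schema and strict type checks; unknown columns are rescued here too (similar to the `rescue` schema evolution mode). In Auto Loader, `_rescued_data` is likewise a JSON blob that also records the source file path.

```python
import json

# Simplified type checks for a few Spark-style type names (only these are supported)
_TYPE_CHECKS = {
    "bigint": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "double": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "string": lambda v: isinstance(v, str),
    "boolean": lambda v: isinstance(v, bool),
}


def parse_with_rescue(line: str, schema: dict[str, str], source_file: str) -> dict:
    """Parse one JSON line; values that don't match the schema go to _rescued_data."""
    record = json.loads(line)
    row = {column: None for column in schema}
    rescued = {}
    for column, value in record.items():
        expected = schema.get(column)
        if expected is None:
            rescued[column] = value  # column not in the schema
        elif value is None or _TYPE_CHECKS[expected](value):
            row[column] = value
        else:
            rescued[column] = value  # type mismatch, e.g. "456455" for a bigint id
    if rescued:
        rescued["_file_path"] = source_file
        row["_rescued_data"] = json.dumps(rescued)
    else:
        row["_rescued_data"] = None
    return row


schema = {"id": "bigint", "firstname": "string"}
print(parse_with_rescue('{"id": "456455", "firstname": "Quentin"}', schema, "part-0.json"))
# → {'id': None, 'firstname': 'Quentin', '_rescued_data': '{"id": "456455", "_file_path": "part-0.json"}'}
```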

### Adding a new column
By default, the stream fails with an `UnknownFieldException` when it encounters a new column. You then have to restart the stream to include the new column.

Run the next cells: we add a JSON row containing a new column, then start the stream, which fails on that column.

*Notes*:
* *See `cloudFiles.schemaEvolutionMode` for different behaviors and more details.*
* *Don't forget to add `.writeStream.option("mergeSchema", "true")` to dynamically add new columns when writing to a Delta table.*

In [0]:
# Stop all the existing streams
DBDemos.stop_all_streams()

# Add 'new_column'
data = [Row(email="quentin.ambard@databricks.com", firstname="Quentin", id=456454, lastname="Ambard", new_column="test new column value")]
new_row = spark.createDataFrame(data)
new_row.write.format("json").mode("append").save(volume_folder + "/user_json")

In [0]:
# The stream will fail with: org.apache.spark.sql.catalyst.util.UnknownFieldException: Encountered unknown field(s) during parsing: {"new_column":"test new column value"}
display(get_stream())

In [0]:
# We just have to restart it to capture the new data. Let's filter on the new column to make sure we have the proper row 
# (re-run the cell)
display(get_stream().filter('new_column is not null'))
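In the default `addNewColumns` mode, Auto Loader merges the new columns into the schema stored at `cloudFiles.schemaLocation` and then stops the stream with an `UnknownFieldException`, so a plain restart picks up the evolved schema. The plain-Python sketch below mimics that fail-then-restart cycle; `UnknownFieldError` and `check_batch` are hypothetical names, and a dict stands in for the schema location.

```python
class UnknownFieldError(Exception):
    """Hypothetical stand-in for Spark's UnknownFieldException."""

    def __init__(self, new_fields: list[str]):
        super().__init__(f"Encountered unknown field(s) during parsing: {new_fields}")
        self.new_fields = new_fields


def check_batch(records: list[dict], schema_location: dict[str, list[str]]) -> None:
    """Fail on new columns after merging them into the stored schema (addNewColumns-style)."""
    known = schema_location["columns"]
    new_fields: list[str] = []
    for record in records:
        for column in record:
            if column not in known and column not in new_fields:
                new_fields.append(column)
    if new_fields:
        # Append the new columns at the end of the stored schema, then fail
        schema_location["columns"] = known + new_fields
        raise UnknownFieldError(new_fields)


schema_location = {"columns": ["email", "firstname", "id", "lastname"]}
batch = [{"id": 456454, "new_column": "test new column value"}]
try:
    check_batch(batch, schema_location)  # first attempt fails on new_column
except UnknownFieldError as e:
    print(e)
check_batch(batch, schema_location)  # "restart": the evolved schema now accepts the batch
print(schema_location["columns"])
```

Because the schema is updated before the failure, the second call with the same batch succeeds, which mirrors why simply restarting the stream is enough to pick up `new_column`.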

## Ingesting a high volume of input files
Scanning folders with many files to detect new data is an expensive operation, leading to ingestion challenges and higher cloud storage costs.

To solve this issue and list files efficiently, Auto Loader offers two modes:

- Incremental listing with `cloudFiles.useIncrementalListing` (deprecated): relies on file paths being lexically ordered (e.g. `ingestion_path/YYYY-MM-DD`) so that only new data is scanned.
- File notification (recommended): Auto Loader sets up a managed cloud notification service that sends new file names to a queue. See `cloudFiles.useNotifications` for more details.

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/autoloader-mode.png" width="700"/>

Use the notification system option whenever possible.
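Incremental listing works because, when new paths always sort after older ones, a run only needs to look past the last path it processed. Here is a plain-Python sketch of that idea with a hypothetical `files_after` helper; it sorts an in-memory list, whereas a real object store listing would start directly from the last key.

```python
import bisect
from typing import Optional


def files_after(sorted_paths: list[str], last_processed: Optional[str]) -> list[str]:
    """Return the paths strictly after `last_processed` in lexical order."""
    if last_processed is None:
        return list(sorted_paths)  # first run: everything is new
    start = bisect.bisect_right(sorted_paths, last_processed)
    return sorted_paths[start:]


paths = sorted([
    "ingestion_path/2024-01-01/part-0.json",
    "ingestion_path/2024-01-02/part-0.json",
    "ingestion_path/2024-01-03/part-0.json",
])
print(files_after(paths, "ingestion_path/2024-01-01/part-0.json"))
# → ['ingestion_path/2024-01-02/part-0.json', 'ingestion_path/2024-01-03/part-0.json']
```

A file that lands under a path sorting before the high-water mark (for example a late `2023-12-31` folder) would be silently skipped. This dependency on filename order is why notification mode is the recommended option.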

## Support for images
Databricks Auto Loader provides native support for images and binary files.

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/autoloader-images.png" width="800" />

Just set the format accordingly and the engine will do the rest: `.option("cloudFiles.format", "binaryFile")`

Use-cases:

- ETL images into a Delta table using Auto Loader
- Automatically ingest continuously arriving new images
- Easily retrain ML models on new images
- Perform distributed inference using a pandas UDF directly from Delta 
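The `binaryFile` format returns one row per file with the columns `path`, `modificationTime`, `length` and `content`. As a rough local analogy, the hypothetical `read_binary_files` helper below builds the same kind of records from a local directory with plain Python; the `suffix` filter loosely resembles restricting files with the `pathGlobFilter` option (e.g. `*.png`).

```python
import os
from datetime import datetime, timezone


def read_binary_files(folder: str, suffix: str = "") -> list[dict]:
    """Build one record per file, mirroring the binaryFile columns (local sketch)."""
    rows = []
    for name in sorted(os.listdir(folder)):
        path = os.path.join(folder, name)
        if not os.path.isfile(path) or not name.endswith(suffix):
            continue
        stat = os.stat(path)
        with open(path, "rb") as f:
            content = f.read()  # raw bytes, e.g. an encoded image
        rows.append({
            "path": path,
            "modificationTime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
            "length": stat.st_size,
            "content": content,
        })
    return rows
```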

## Deploying robust ingestion jobs in production

Let's see how to use Auto Loader to ingest JSON files, support schema evolution, and automatically restart when a new column is found.

If you need your job to be resilient with regard to an evolving schema, you have multiple options:

* Let the full job fail & configure Databricks Workflows to restart it automatically
* Leverage Delta Live Tables to simplify all the setup (DLT handles everything for you out of the box)
* Wrap the stream in a loop that restarts it when a new column appears.

Here is an example:

In [0]:
def start_stream_restart_on_schema_evolution():
  while True:
    try:
      q = (spark.readStream
                  .format("cloudFiles")
                  .option("cloudFiles.format", "json")
                  .option("cloudFiles.schemaLocation", f"{volume_folder}/inferred_schema")
                  .option("cloudFiles.inferColumnTypes", "true")
                  .load(volume_folder+"/user_json")
                .writeStream
                  .toTable("autoloader_demo_output",
                           checkpointLocation=volume_folder+"/checkpoint",
                           mergeSchema=True)
          )
      q.awaitTermination()
      return q
    except Exception as e:
      # Adding a new column triggers an UnknownFieldException. In this case we just restart the stream:
      if 'UNKNOWN_FIELD_EXCEPTION' in str(e):
        print(f"Going to restart stream after schema change:\n{e}")
      else:
        raise

# Careful: this will run forever. Don't forget to stop your job/notebook after trying it!
# Seeing [INFINITE_STREAMING_TRIGGER_NOT_SUPPORTED] ? Interactive serverless isn't designed for unlimited streaming. See https://docs.databricks.com/en/compute/serverless/limitations.html#streaming
# Use a classic cluster, use the writeStream.trigger(availableNow=True) option instead, or move your code to a Declarative Pipeline!
#start_stream_restart_on_schema_evolution()
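The loop above restarts indefinitely. One possible variation, an assumption rather than part of the original demo, caps the number of restarts so that a job that keeps failing eventually surfaces the error. The retry logic is plain Python, so it can be sketched with a hypothetical `start_query` callable; in the notebook, that callable could be a function that starts the Auto Loader stream and calls `awaitTermination()`.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_schema_restarts(
    start_query: Callable[[], T],
    max_restarts: int = 5,
    marker: str = "UNKNOWN_FIELD_EXCEPTION",
) -> T:
    """Call start_query(), restarting it when the error message contains `marker`."""
    restarts = 0
    while True:
        try:
            return start_query()
        except Exception as e:  # not BaseException, so KeyboardInterrupt still stops the loop
            if marker not in str(e) or restarts >= max_restarts:
                raise  # unrelated error, or too many restarts: propagate it
            restarts += 1
            print(f"Restart {restarts}/{max_restarts} after schema change:\n{e}")
```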

## Conclusion

We've seen how Databricks Auto Loader makes it easy to ingest your files incrementally, infer and evolve schemas, rescue malformed data, and scale to large numbers of files!

You're ready to use it in your projects!

In [0]:
DBDemos.stop_all_streams()