# Introducing Data Ingestion
Data ingestion is the process of loading data from files into Delta Lake tables.

**Challenge in Traditional Pipelines**

- Every run reprocesses all files (even those already loaded).
- Leads to:
  - Higher compute costs.
  - Longer runtimes.
  - Extra deduplication work, especially with large datasets.

**Solution: Incremental Data Ingestion**

- Loads only newly arrived files since the last ingestion cycle.
- Benefits:
  - Faster processing.
  - Lower resource usage.
  - Avoids reprocessing old data.
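
The idea can be illustrated without Spark. This is a minimal sketch, assuming a hypothetical in-memory set `ingested` that stands in for the state an incremental ingestion tool keeps for you. The function name `select_new_files` and the file names are illustrative only.

```python
def select_new_files(available, ingested):
    """Return files not yet ingested, preserving arrival order."""
    return [f for f in available if f not in ingested]


# Hypothetical state: files loaded by earlier runs.
ingested = set()

# First cycle: two files have arrived, both are new.
first_batch = select_new_files(["part-001.csv", "part-002.csv"], ingested)
ingested.update(first_batch)

# Second cycle: one more file arrived; only that file is selected.
second_batch = select_new_files(
    ["part-001.csv", "part-002.csv", "part-003.csv"], ingested
)
ingested.update(second_batch)

# Third cycle: nothing new, so there is nothing to load.
third_batch = select_new_files(
    ["part-001.csv", "part-002.csv", "part-003.csv"], ingested
)
```

A traditional pipeline corresponds to skipping the `ingested` check, so every cycle would return the full file list.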

Databricks offers two primary mechanisms for incremental ingestion:

1. **COPY INTO**
2. **Auto Loader**

## COPY INTO Command
  - A SQL command that loads data from a file location into a Delta table.
  - Key Characteristics:
    - **Idempotent**: Running it multiple times won’t reprocess files already ingested.
    - **Incremental**: Each run loads only new files detected in the source location.

**COPY INTO Syntax**

```
COPY INTO <target_table>
FROM '<path_to_files>'
FILEFORMAT = <format>
FORMAT_OPTIONS (<format_options>)
COPY_OPTIONS (<copy_options>)
```
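
To show how the placeholders fit together, here is a small sketch that assembles the statement as a string. The helper `build_copy_into`, the table name `my_table`, and the path `/path/to/files` are hypothetical, and the helper does no quoting or escaping, so it is not suitable for untrusted input.

```python
def build_copy_into(target_table, source_path, file_format,
                    format_options=None, copy_options=None):
    """Assemble a COPY INTO statement from its parts (illustrative helper)."""

    def render(options):
        # Render {'header': 'true'} as "'header' = 'true'".
        return ", ".join(f"'{k}' = '{v}'" for k, v in options.items())

    lines = [
        f"COPY INTO {target_table}",
        f"FROM '{source_path}'",
        f"FILEFORMAT = {file_format}",
    ]
    # Both option clauses are optional, so add them only when given.
    if format_options:
        lines.append(f"FORMAT_OPTIONS ({render(format_options)})")
    if copy_options:
        lines.append(f"COPY_OPTIONS ({render(copy_options)})")
    return "\n".join(lines)


statement = build_copy_into(
    "my_table",
    "/path/to/files",
    "CSV",
    format_options={"header": "true"},
    copy_options={"mergeSchema": "true"},
)
print(statement)
```

In a notebook, the resulting string could be passed to `spark.sql(statement)`.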

In [0]:
%run ./Generate-Sample-Data

In [0]:
dbutils.fs.rm("/dataingestion/product/", True)
generate_product_csv(num_records=10)
dbutils.fs.ls("/dataingestion/product")

In [0]:
%sql
DROP TABLE IF EXISTS product_delta;
CREATE TABLE product_delta (
  product_id STRING,
  product_name STRING,
  category STRING,
  price STRING,
  created_at STRING
)

In [0]:
spark.sql("SELECT * FROM product_delta").show()

In [0]:
def copyinto():
    # Load any files not yet ingested from the source folder into product_delta.
    # COPY_OPTIONS accepts 'force' and 'mergeSchema'.
    spark.sql(
        """
        COPY INTO product_delta
        FROM '/dataingestion/product'
        FILEFORMAT = CSV
        FORMAT_OPTIONS (
          'header' = 'true',
          'delimiter' = ','
        )
        COPY_OPTIONS (
          'mergeSchema' = 'true'
        )
        """
    )

In [0]:
copyinto()

In [0]:
spark.sql("SELECT * FROM product_delta").show()

In [0]:
generate_product_csv(num_records=5)
display(dbutils.fs.ls("/dataingestion/product"))
spark.sql("SELECT count(1) FROM product_delta").show()

In [0]:
copyinto()

In [0]:
spark.sql("SELECT count(1) FROM product_delta").show()

## Auto Loader
- Uses Spark Structured Streaming to continuously detect and load new files using readStream and writeStream.
- Key features:
  - Highly scalable: Can process billions of files.
  - High ingestion throughput: Supports millions of files per hour.
  - Checkpointing ensures exactly-once processing and recovery after failures.
  - Supports automatic schema inference and evolution.

**Basic Usage Example:**
```
spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "<source_format>") \
  .load("/path/to/files") \
  .writeStream \
  .option("checkpointLocation", "<checkpoint_path>") \
  .table("<table_name>")
```

**Here’s how this works step by step:**

- Setting `.format("cloudFiles")` activates Auto Loader.
- `cloudFiles.format` specifies the format of incoming files (e.g., csv, json, parquet).
- The `.load()` call defines the source folder that Auto Loader monitors for new files.
- `.writeStream` writes the ingested data to the target table or location.
- `checkpointLocation` records progress so processing can resume if interrupted.

### Schema Management
Auto Loader includes built-in schema detection to simplify working with new data:

- **Automatic Schema Inference:**
  
  Auto Loader can detect the schema without requiring you to define it up front.

- **Schema Evolution:**
  
  If new columns appear in incoming data, Auto Loader can automatically update the schema of the destination table.

  To enable this, set: `.option("mergeSchema", "true")`

- **Schema Storage:**
  
  To avoid re-inferring the schema every time your stream starts (which can be slow), Auto Loader stores the inferred schema and its later changes in a dedicated location. This location is required when the schema is inferred: `.option("cloudFiles.schemaLocation", "<schema_storage_path>")`

- **Data Type Inference for Untyped Formats:**
  > Important: Behavior depends on your file format:

  - **Typed formats** (e.g., Parquet): Auto Loader reads column types from the file metadata.

  - **Untyped formats** (e.g., CSV, JSON): By default, all columns are inferred as STRING.

  To automatically detect proper data types (e.g., INTEGER, DOUBLE), enable this option: `.option("cloudFiles.inferColumnTypes", "true")`
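
The difference between the two behaviors can be pictured with a toy example. This is a deliberately simplified stand-in, not Auto Loader's actual inference logic: the functions `infer_type` and `infer_schema`, the sample rows, and the three-type hierarchy are all assumptions made for illustration.

```python
def infer_type(values):
    """Return the narrowest of INT, DOUBLE, STRING that fits all values."""

    def all_parse(cast):
        try:
            for value in values:
                cast(value)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "INT"
    if all_parse(float):
        return "DOUBLE"
    return "STRING"


def infer_schema(rows, infer_column_types=False):
    """Map each column name to a type name for a list of CSV-like dict rows."""
    columns = list(rows[0].keys())
    if not infer_column_types:
        # Default for untyped formats: every column is a string.
        return {name: "STRING" for name in columns}
    return {name: infer_type([row[name] for row in rows]) for name in columns}


# Hypothetical sample rows, as raw text read from a CSV file.
sample_rows = [
    {"product_id": "1", "product_name": "Widget", "price": "19.99"},
    {"product_id": "2", "product_name": "Gadget", "price": "5"},
]

default_schema = infer_schema(sample_rows)
typed_schema = infer_schema(sample_rows, infer_column_types=True)
print(default_schema)
print(typed_schema)
```

Without the flag every column comes back as `STRING`; with it, `product_id` and `price` get numeric types while `product_name` stays a string.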

### Example with All Recommended Options

In [0]:
dbutils.fs.rm("/dataingestion", True)
spark.sql("DROP TABLE IF EXISTS new_product_delta")

In [0]:
generate_product_csv(num_records=10)
display(dbutils.fs.ls("/dataingestion/product"))

In [0]:
def autoLoadFunc():
    # Start an Auto Loader stream that reads CSV files from the product folder
    # and appends them to the new_product_delta table.
    (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaLocation", "/dataingestion/schema")
        .load("/dataingestion/product")
        .writeStream
        .option("checkpointLocation", "/dataingestion/checkpoint")
        .option("mergeSchema", "true")
        .outputMode("append")
        .table("new_product_delta"))

In [0]:
autoLoadFunc()

In [0]:
%sql
DESCRIBE TABLE new_product_delta;
-- DESCRIBE EXTENDED new_product_delta;
-- SHOW TABLES;

In [0]:
spark.sql("SELECT count(1) FROM new_product_delta").show()

In [0]:
generate_product_csv(num_records=5)
display(dbutils.fs.ls("/dataingestion/product"))

In [0]:
spark.sql("SELECT * FROM new_product_delta").show()

### How does Auto Loader schema evolution work?
Auto Loader detects the addition of new columns as it processes your data. When Auto Loader detects a new column, the stream stops with an **`UnknownFieldException`**. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged.

Databricks recommends configuring Auto Loader streams with Lakeflow Jobs to restart automatically after such schema changes.
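
The merge behavior described above (new columns appended at the end, existing types untouched) can be sketched in plain Python. This is an illustration under stated assumptions, not Auto Loader's implementation: the function `evolve_schema` and the representation of a schema as a list of `(name, type)` tuples are hypothetical.

```python
def evolve_schema(current, incoming):
    """Append columns that appear only in `incoming` to the end of `current`.

    Both arguments are lists of (column_name, data_type) tuples.
    Existing columns keep their position and data type.
    """
    known = {name for name, _ in current}
    added = [(name, dtype) for name, dtype in incoming if name not in known]
    return current + added


# Schema stored from earlier micro-batches.
current = [("product_id", "INT"), ("price", "DOUBLE")]

# Schema seen in the latest micro-batch: one new column, and product_id
# arrives with a different type, which is ignored for existing columns.
incoming = [("product_id", "STRING"), ("price", "DOUBLE"), ("discount", "DOUBLE")]

evolved = evolve_schema(current, incoming)
print(evolved)
```

Here `discount` lands at the end of the schema, and `product_id` keeps its original `INT` type.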

In [0]:
generate_product_csv_new(num_records=20)

In [0]:
spark.sql("SELECT * FROM new_product_delta").display()

In [0]:
autoLoadFunc()

In [0]:
spark.sql("SELECT * FROM new_product_delta").display()

### What is _rescued_data?
`_rescued_data` is a special column that Auto Loader adds automatically when it infers the schema. It holds data that could not be parsed into the regular columns, for example when:

- Incoming data has unexpected columns (i.e., columns missing from the schema in use).
- A value does not match the expected schema (e.g., a type mismatch, or a column name whose casing differs from the schema).

Instead of failing or dropping the unexpected data, Auto Loader captures it in the `_rescued_data` column as a JSON string so nothing is lost.
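
A rough picture of the rescue step, written in plain Python. This is a simplified sketch, not how Auto Loader is implemented: the function `rescue_row`, the use of Python casts as a schema, and the sample rows are assumptions, and the real column also records details such as the source file path.

```python
import json


def rescue_row(row, schema):
    """Cast a raw row to `schema` and collect what does not fit.

    `row` maps column names to raw string values; `schema` maps expected
    column names to Python casts such as int or float.
    """
    kept = {name: None for name in schema}
    rescued = {}
    for name, value in row.items():
        cast = schema.get(name)
        if cast is None:
            # Column is not part of the expected schema.
            rescued[name] = value
            continue
        try:
            kept[name] = cast(value)
        except ValueError:
            # Value cannot be parsed as the expected type.
            rescued[name] = value
    kept["_rescued_data"] = json.dumps(rescued) if rescued else None
    return kept


schema = {"product_id": int, "price": float}
clean = rescue_row({"product_id": "7", "price": "9.5"}, schema)
messy = rescue_row({"product_id": "8", "price": "n/a", "color": "red"}, schema)
print(clean)
print(messy)
```

The clean row has an empty `_rescued_data`, while the messy row keeps its unparseable `price` and its extra `color` field inside the JSON string.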

In [0]:
%sql
DESCRIBE HISTORY new_product_delta

### Clean Up

In [0]:
# Print & Stop all active queries
for q in spark.streams.active:
    print(f"Name: {q.name}, Id: {q.id}, IsActive: {q.isActive}")
    if q.isActive:
        q.stop()

In [0]:
%sql
DROP TABLE IF EXISTS product_delta;
DROP TABLE IF EXISTS new_product_delta;

In [0]:
%fs rm -r /dataingestion