### Ingest Customer Data Into the Bronze Layer With Auto Loader

Define Schema

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Explicit schema for the customers CSV files.
schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_unique_id", StringType(), True),
    # IntegerType drops leading zeros; use StringType if they must be preserved.
    StructField("customer_zip_code_prefix", IntegerType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True),
    # Not present in the CSV files; populated from _metadata after the read.
    StructField("source_file", StringType(), True),
    StructField("source_file_timestamp", TimestampType(), True)
])


Stream Read

In [0]:
# No trailing slash, so the f-string paths below do not contain "//".
checkpoint = "/Volumes/mycatalog/olist_ecommerce_bronze/checkpoints/customers"

In [0]:
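# Read the landing folder incrementally with Auto Loader (cloudFiles).
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    # Keep the declared schema fixed; unexpected columns go to _rescued_data.
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .option("cloudFiles.schemaLocation", f"{checkpoint}/schema")
    .schema(schema)
    .load("/Volumes/mycatalog/olist_ecommerce/olist_landing/customers")
    # Expose the hidden _metadata column so file details can be extracted next.
    .selectExpr("*", "_metadata")
)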


In [0]:
# Promote file-level metadata to regular columns, then drop the struct.
df = (
    df
    .withColumn("source_file", df._metadata.file_name)
    .withColumn(
        "source_file_timestamp",
        df._metadata.file_modification_time.cast("timestamp"),
    )
    .drop("_metadata")
)


Stream Write

In [0]:
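# Process everything currently available, then stop (incremental batch run).
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    # availableNow replaces the deprecated trigger(once=True).
    .trigger(availableNow=True)
    # Let the Delta sink accept columns it does not have yet.
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{checkpoint}/_checkpoint")
    .toTable("mycatalog.olist_ecommerce_bronze.customers")
)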

Validate

In [0]:
%sql
-- SELECT * FROM mycatalog.olist_ecommerce_bronze.customers LIMIT 5
SELECT * FROM mycatalog.olist_ecommerce_bronze.customers WHERE customer_id = '5ea2072bf6d8282cf452c471506c54a3'

### Schema Handling in Structured Streaming (Auto Loader)

1️⃣ When Schema Is Not Explicitly Defined

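If you do not provide a schema when reading with spark.readStream,
Auto Loader infers it from the incoming files and tracks it in the location set by cloudFiles.schemaLocation. For CSV sources, every column is inferred as a string unless cloudFiles.inferColumnTypes is enabled.

When new columns appear in incoming files, the default mode (addNewColumns) stops the stream, records the new columns in the tracked schema, and picks them up once the stream is restarted.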
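
As a minimal sketch of the inference path, the helpers below build the reader options for an Auto Loader read that has no `.schema(...)` call. The function names `inference_options` and `read_with_inference` are hypothetical and exist only for illustration; the option keys are Auto Loader's own.

```python
def inference_options(schema_location, infer_types=True):
    """Build Auto Loader options for a CSV read that relies on schema inference."""
    return {
        "cloudFiles.format": "csv",
        # Inference needs a place to persist the schema it tracks across runs.
        "cloudFiles.schemaLocation": schema_location,
        # Without this, CSV columns are inferred as strings.
        "cloudFiles.inferColumnTypes": str(infer_types).lower(),
        "header": "true",
    }


def read_with_inference(spark, source_path, schema_location):
    """Start a streaming read with no .schema(...) call (hypothetical helper)."""
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**inference_options(schema_location))
        .load(source_path)
    )
```

In this notebook the call would look like `read_with_inference(spark, "/Volumes/mycatalog/olist_ecommerce/olist_landing/customers", f"{checkpoint}/schema")`. No evolution mode is set, so the default for inferred schemas applies.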

2️⃣ When Schema Is Explicitly Defined

If you define a schema manually with .schema(...),
Auto Loader reads only the declared columns, and the schema evolution mode defaults to none.

New or unexpected columns in the input files are silently dropped unless a rescued data column is configured.

The stream continues without failure.

3️⃣ When Schema Enforcement + Drift Capture Is Required

If you want to:

- Enforce a defined schema
- Prevent pipeline failures
- Capture unexpected/new columns

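To get all three, set cloudFiles.schemaEvolutionMode to **rescue** on the reader and enable **mergeSchema** on the Delta writer, so the table can accept columns it does not yet have, such as _rescued_data.

In this ingestion, the reader runs in rescue mode, so unexpected columns are captured in the _rescued_data column and the pipeline continues without failure.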
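
To make the rescue behavior concrete, here is a plain-Python illustration of the idea. It is not Auto Loader's implementation, and the real column's JSON layout may differ (for example, it can carry extra bookkeeping such as the source file path). The `split_rescued` helper and the sample record are made up for this sketch.

```python
import json

# The five columns the customers CSV is expected to contain.
EXPECTED_COLUMNS = [
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
    "customer_city",
    "customer_state",
]


def split_rescued(record, expected=EXPECTED_COLUMNS):
    """Keep declared columns; pack everything else into _rescued_data as JSON."""
    row = {name: record.get(name) for name in expected}
    extras = {k: v for k, v in record.items() if k not in expected}
    # No extras means nothing to rescue, so the column stays null (None).
    row["_rescued_data"] = json.dumps(extras, sort_keys=True) if extras else None
    return row


# Hypothetical record from a file that gained a new column.
drifted = {
    "customer_id": "c-001",
    "customer_unique_id": "u-001",
    "customer_zip_code_prefix": 14409,
    "customer_city": "franca",
    "customer_state": "SP",
    "loyalty_tier": "gold",  # column the declared schema does not know about
}
print(split_rescued(drifted)["_rescued_data"])  # {"loyalty_tier": "gold"}
```

The table schema stays at the declared columns plus one rescue column, and the drifted value remains queryable instead of being lost.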

### Schema Evolution Modes in Auto Loader

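Auto Loader supports four schema evolution modes:

- **addNewColumns** (default when no schema is provided)
→ Stops the stream when new columns are detected, adds them to the schema, and continues with the evolved schema after a restart.

- **rescue**
→ Captures new/unexpected columns in a special _rescued_data column instead of modifying the table schema.

- **failOnNewColumns**
→ Fails the stream if new columns are detected; it does not restart until the schema is updated or the offending files are removed.

- **none** (default when a schema is provided)
→ Ignores new columns without failing; their data is not rescued unless a rescued data column is configured.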
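
Because the default mode depends on whether a schema is supplied, it can help to pin the mode explicitly. The sketch below assumes two hypothetical helpers, `resolve_evolution_mode` and `evolution_option`. It also covers `none`, the mode Auto Loader falls back to when a schema is passed to the reader.

```python
VALID_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def resolve_evolution_mode(requested=None, schema_provided=False):
    """Return the mode that applies, following Auto Loader's documented defaults."""
    if requested is None:
        # Explicit schema -> "none"; inferred schema -> "addNewColumns".
        return "none" if schema_provided else "addNewColumns"
    if requested not in VALID_MODES:
        raise ValueError(f"Unknown schema evolution mode: {requested!r}")
    return requested


def evolution_option(requested=None, schema_provided=False):
    """Build the single reader option that pins the mode explicitly."""
    mode = resolve_evolution_mode(requested, schema_provided)
    return {"cloudFiles.schemaEvolutionMode": mode}
```

For the customers stream, `evolution_option("rescue", schema_provided=True)` yields the same setting the reader above passes by hand, and a misspelled mode raises a `ValueError` when the options are built.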