# Week 8: Spark Streaming in Databricks – Auto Loader & Ingestion Patterns (v2)

**Goals:** incremental ingestion, discovery modes, checkpoints, schema evolution, Bronze→Silver

In [0]:
%sql
-- Reset state left behind by a previous run of this notebook.
-- IF EXISTS keeps the cell from failing on a workspace where the catalog was never created.
-- CASCADE is destructive: it removes every schema, volume and table inside the catalog.
drop catalog if exists al_catalog cascade

In [0]:
%sql
-- Create the catalog, schema and volume behind the default `base_path`
-- (/Volumes/al_catalog/autoloader_schema/autoloader_volume) that the Python cells read and write.
-- IF NOT EXISTS makes the cell safe to re-run.
create catalog if not exists al_catalog;

create schema if not exists al_catalog.autoloader_schema;
create volume if not exists al_catalog.autoloader_schema.autoloader_volume;




## 0. Setup (Paths & Widgets)

In [0]:

# Widgets let the notebook be pointed at another volume or file format without editing code.
dbutils.widgets.text("base_path", "/Volumes/al_catalog/autoloader_schema/autoloader_volume")
dbutils.widgets.text("format", "json")  # json | csv | parquet

# Strip a trailing slash so the derived paths below never contain "//".
BASE = dbutils.widgets.get("base_path").rstrip("/")

# Every location used by the notebook hangs off BASE.
INPUT_PATH, CHECKPOINT, SCHEMA_LOC = (
    f"{BASE}/input",
    f"{BASE}/checkpoints/bronze",
    f"{BASE}/schema/bronze",
)
BRONZE_PATH, SILVER_PATH = f"{BASE}/tables/bronze", f"{BASE}/tables/silver"

# Show the resolved locations.
display(
    dict(
        INPUT_PATH=INPUT_PATH,
        CHECKPOINT=CHECKPOINT,
        SCHEMA_LOC=SCHEMA_LOC,
        BRONZE_PATH=BRONZE_PATH,
        SILVER_PATH=SILVER_PATH,
    )
)


## 1. Generate Sample Files

In [0]:

from pyspark.sql.functions import current_timestamp, expr, lit

fmt = dbutils.widgets.get("format")
out = f"{INPUT_PATH}/{fmt}"

# Ten rows (ids 30-39): category alternates A/B by id parity, amount is a random
# value rounded to 2 decimals, ingest_ts is the time of the write.
df = spark.range(30, 40).select(
    "id",
    expr("CASE WHEN id % 2 = 0 THEN 'A' ELSE 'B' END").alias("category"),
    expr("round(rand()*100,2)").alias("amount"),
    current_timestamp().alias("ingest_ts"),
)

# Append so repeated runs keep adding new input files for the stream to discover.
df.write.format(fmt).mode("append").option("header", "true").save(out)
print("Seeded files:", out)


In [0]:
# Read the seeded files back using the same format they were written with: the input
# path already depends on `fmt`, so a hard-coded "json" reader breaks for csv/parquet.
# The header option mirrors the writer in the seeding cell.
df = spark.read.format(fmt).option("header", "true").load(f"{INPUT_PATH}/{fmt}")
df.display()

## 2. Start Auto Loader (Directory Listing)

In [0]:

fmt = dbutils.widgets.get("format")

# Auto Loader source over the seeded input directory; the inferred schema is
# persisted under SCHEMA_LOC.
raw_stream = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", fmt)
    .option("cloudFiles.schemaLocation", SCHEMA_LOC)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(f"{INPUT_PATH}/{fmt}")
)

# Append into the Bronze Delta table; CHECKPOINT tracks which files were already ingested.
bronze_query = (raw_stream.writeStream.format("delta")
    .option("checkpointLocation", CHECKPOINT)
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(BRONZE_PATH))

# start() returns immediately. Block until the availableNow run has finished so
# the cells that read BRONZE_PATH next do not race the first commit.
bronze_query.awaitTermination()
bronze_query


In [0]:
# Use the widget-derived BRONZE_PATH so this cell follows `base_path` instead of a hard-coded volume.
spark.read.format("delta").load(BRONZE_PATH).display()

### Monitor Bronze

In [0]:

import time

# Current state of the Bronze streaming query.
print(bronze_query.status)

# Sample the Bronze row count three times, five seconds apart.
for attempt in range(3):
    time.sleep(5)
    bronze_rows = spark.read.format("delta").load(BRONZE_PATH).count()
    print("Bronze count:", bronze_rows)


## 3. Schema Evolution (Add Column)

In [0]:

from pyspark.sql.functions import lit

fmt = dbutils.widgets.get("format")

# Five rows (ids 20-24) carrying an extra `source` column that the seeded files do not have.
evo = spark.range(20, 25).select(
    "id",
    lit("C").alias("category"),
    lit(42.0).alias("amount"),
    lit("extra").alias("source"),
)

# Land them in the same input directory the stream is watching.
evo.write.format(fmt).mode("append").option("header", "true").save(f"{INPUT_PATH}/{fmt}")
print("Appended files with new 'source' column.")


In [0]:
from pyspark.sql.utils import StreamingQueryException

fmt = dbutils.widgets.get("format")

# Same Auto Loader source as before; the input directory now also holds files
# with the extra `source` column.
raw_stream = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", fmt)
    .option("cloudFiles.schemaLocation", SCHEMA_LOC)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(f"{INPUT_PATH}/{fmt}")
)
bronze_query = (raw_stream.writeStream.format("delta")
    .option("checkpointLocation", CHECKPOINT)
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(BRONZE_PATH))

# With Auto Loader's default schema evolution mode, the run that first meets a new
# column is documented to record it in the schema location and then stop with an
# error; starting the stream again continues with the evolved schema. Wait for the
# run here and show that failure instead of letting it surface asynchronously.
try:
    bronze_query.awaitTermination()
except StreamingQueryException as err:
    print("Stream stopped (expected the first time a new column is seen); start it again to continue:")
    print(err)
bronze_query

In [0]:
# Use the widget-derived BRONZE_PATH so this cell follows `base_path` instead of a hard-coded volume.
spark.read.format("delta").load(BRONZE_PATH).display()

## Run with schema evolution (mergeSchema = true)


In [0]:
fmt = dbutils.widgets.get("format")

# Start the stream again from the same checkpoint and schema location.
raw_stream = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", fmt)
    .option("cloudFiles.schemaLocation", SCHEMA_LOC)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(f"{INPUT_PATH}/{fmt}")
)

# mergeSchema lets the Delta sink add columns that are new to the Bronze table.
bronze_query = (raw_stream.writeStream.format("delta")
    .option("checkpointLocation", CHECKPOINT)
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(BRONZE_PATH))

# start() returns immediately. Block until the availableNow run has finished so
# later cells read the committed result rather than a table that is still being written.
bronze_query.awaitTermination()
bronze_query

### Validate

In [0]:

import time

# Pause before reading - presumably to let the Bronze run started earlier finish.
time.sleep(10)

# Inspect the Bronze schema and the first 30 rows by id.
br = spark.read.format("delta").load(BRONZE_PATH)
br.printSchema()
display(br.orderBy("id").limit(30))


## 4. Silver Stream

In [0]:

from pyspark.sql.functions import col

# Stream the Bronze Delta table and keep only rows that have an amount.
silver_df = spark.readStream.format("delta").load(BRONZE_PATH)
silver_clean = silver_df.where(col("amount").isNotNull())

# Continuous append into Silver, one micro-batch every 15 seconds, with its own checkpoint.
silver_query = (
    silver_clean.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{BASE}/checkpoints/silver")
    .trigger(processingTime="15 seconds")
    .start(SILVER_PATH)
)
silver_query


## 5. File Notifications Template (Azure)

In [0]:

# Requires Event Grid + Queue configured on ADLS
# Template: the <container>, <account> and <prefix> placeholders must be filled in before running.
abfss_path = "abfss://<container>@<account>.dfs.core.windows.net/<prefix>"

# Auto Loader in file-notification mode (useNotifications) instead of directory listing.
notif_stream = (
    spark.readStream.format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": f"{BASE}/schema/bronze_abfss",
        "cloudFiles.useNotifications": "true",
    })
    .load(abfss_path)
)

# Separate checkpoint and target table for this stream.
notif_query = (
    notif_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{BASE}/checkpoints/bronze_abfss")
    .start(f"{BASE}/tables/bronze_abfss")
)
notif_query


## 6. Backfill with availableNow

In [0]:

# Resolve the widget once and reuse it for both the reader format and the input path.
backfill_fmt = dbutils.widgets.get("format")

# Independent schema location, checkpoint and target table, so this run sees every
# input file regardless of what the Bronze stream has already ingested.
backfill = (
    spark.readStream.format("cloudFiles")
    .options(**{
        "cloudFiles.format": backfill_fmt,
        "cloudFiles.schemaLocation": f"{BASE}/schema/backfill",
    })
    .load(f"{INPUT_PATH}/{backfill_fmt}")
)
backfill_query = (
    backfill.writeStream
    .format("delta")
    .option("checkpointLocation", f"{BASE}/checkpoints/backfill")
    .trigger(availableNow=True)
    .start(f"{BASE}/tables/backfill")
)

# Block until the one-off run completes.
backfill_query.awaitTermination()


## 7. Bad Records Path

In [0]:

fmt = dbutils.widgets.get("format")

# Auto Loader reader configured with a badRecordsPath under BASE and its own schema location.
bad_reader_options = {
    "cloudFiles.format": fmt,
    "cloudFiles.schemaLocation": f"{BASE}/schema/bad",
    "badRecordsPath": f"{BASE}/bad_records",
}
raw_stream_bad = (
    spark.readStream.format("cloudFiles")
    .options(**bad_reader_options)
    .load(f"{INPUT_PATH}/{fmt}")
)

# Confirms this is a streaming DataFrame; nothing is read until a sink is started.
raw_stream_bad.isStreaming


In [0]:
# Render the streaming DataFrame in the notebook.
display(raw_stream_bad)

## 8. Cleanup

In [0]:

# Stop every streaming query still running in this Spark session.
for active_query in spark.streams.active:
    # Queries started without a name are reported by id.
    label = active_query.name or active_query.id
    print("Stopping:", label)
    active_query.stop()


If you want a merge upsert (MERGE) instead of an append-only write:

In [0]:
fmt = dbutils.widgets.get("format")
raw_stream = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", fmt)
    .option("cloudFiles.schemaLocation", SCHEMA_LOC)
    .option("cloudFiles.inferColumnTypes", "true")
    .load(f"{INPUT_PATH}/{fmt}")
)

In [0]:
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

def merge_upsert(microBatchOutputDF, batchId):
    """Upsert one micro-batch into the Bronze Delta table, keyed on ``id``.

    The batch is first reduced to one row per ``id`` (the row with the latest
    ``ingest_ts``), because MERGE fails when several source rows match the same
    target row. Matching ids are updated, new ids are inserted.

    Args:
        microBatchOutputDF: DataFrame holding the rows of the current micro-batch.
        batchId: Micro-batch id supplied by Structured Streaming (unused).
    """
    # Use the session attached to the micro-batch rather than the notebook global.
    session = microBatchOutputDF.sparkSession

    # Rank rows per id, newest ingest_ts first, and keep only the top-ranked row.
    windowSpec = Window.partitionBy("id").orderBy(desc("ingest_ts"))
    dedupedDF = (microBatchOutputDF
                 .withColumn("row_num", row_number().over(windowSpec))
                 .filter("row_num = 1")
                 .drop("row_num"))

    # First run against an empty location: there is nothing to merge into yet,
    # so the de-duplicated batch becomes the initial table.
    if not DeltaTable.isDeltaTable(session, BRONZE_PATH):
        dedupedDF.write.format("delta").mode("append").save(BRONZE_PATH)
        return

    # Writer options such as mergeSchema are not applied to a MERGE issued from
    # foreachBatch; schema evolution for MERGE is switched on through this setting.
    session.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    (DeltaTable.forPath(session, BRONZE_PATH).alias("t")
        .merge(dedupedDF.alias("u"), "u.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

# foreachBatch is the sink here, so no output format or path is given to the writer:
# the target table is written by merge_upsert itself.
# NOTE(review): this reuses the checkpoint of the append-only Bronze stream, so only
# files not yet ingested by that stream are merged - confirm that is intended, or
# give this query its own checkpoint location.
bronze_query = (raw_stream.writeStream
    .foreachBatch(merge_upsert)
    .option("checkpointLocation", CHECKPOINT)
    .outputMode("append")
    .trigger(availableNow=True)
    .start())

bronze_query.awaitTermination()
display(spark.read.format("delta").load(BRONZE_PATH))

In [0]:
from pyspark.sql.functions import current_timestamp, expr, lit

fmt = 'json'
out = "abfss://raw@adlssummer2025.dfs.core.windows.net/autoloader_input/input/json"

# Five rows (ids 0-4): category alternates A/B by id parity, amount is a random
# value rounded to 4 decimals, ingest_ts is the current timestamp.
df = spark.range(0, 5).select(
    "id",
    expr("CASE WHEN id % 2 = 0 THEN 'A' ELSE 'B' END").alias("category"),
    expr("round(rand()*100,4)").alias("amount"),
    current_timestamp().alias("ingest_ts"),
)
df.display()

# Append the rows as new files under the ADLS input directory.
df.write.format(fmt).mode("append").option("header", "true").save(out)