# CSV Auto Loader Streaming: Read from a Volume and Write to a Delta Table

This notebook demonstrates **streaming ingestion** using Auto Loader (cloudFiles) to read CSV files from a Unity Catalog Volume and write to a Delta table.

## Key Features
- **Streaming read** with Auto Loader for incremental processing
- **Schema evolution** with automatic schema detection and storage
- **Checkpoint management** for fault tolerance and exactly-once processing
- **Data transformations** with a calculated column (`age` + `id`) and a processing timestamp
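To make the incremental, exactly-once idea more concrete, here is a toy, pure-Python model of what a checkpoint provides. It is not how Auto Loader is implemented (Auto Loader keeps its own file-tracking state inside the checkpoint directory). It only assumes that each source file is identified by its path and is recorded as processed once its batch commits. The function `run_incremental` and the in-memory `checkpoint` set are hypothetical names used for illustration.

```python
from typing import Callable, Iterable


def run_incremental(
    available_files: Iterable[str],
    checkpoint: set[str],
    process: Callable[[list[str]], None],
) -> list[str]:
    """Process only files not yet recorded in the checkpoint (toy model)."""
    # Discover files that no previous run has committed.
    new_files = sorted(f for f in available_files if f not in checkpoint)
    if new_files:
        # Process the batch first, then record it, so a crash before the
        # commit means the same files are retried rather than skipped.
        process(new_files)
        checkpoint.update(new_files)
    return new_files


# Usage: two runs over a growing directory listing.
checkpoint: set[str] = set()
ingested: list[str] = []

first = run_incremental(["a.csv", "b.csv"], checkpoint, ingested.extend)
second = run_incremental(["a.csv", "b.csv", "c.csv"], checkpoint, ingested.extend)
print(first, second, ingested)
```

The toy covers only the "never re-read a committed file" half. End-to-end exactly-once delivery in Structured Streaming also relies on the Delta sink committing each micro-batch idempotently.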


## Configuration Variables

Define all paths and table names as variables for easy customization:


In [None]:
# Catalog, schema, and table configuration
CATALOG = "jpg"
SCHEMA = "default"
TABLE_NAME = "csv_notebook"
FULL_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{TABLE_NAME}"

# Volume paths configuration
VOLUME_BASE = f"/Volumes/{CATALOG}/{SCHEMA}"
SOURCE_PATH = f"{VOLUME_BASE}/csvs"
SCHEMA_LOCATION = f"{VOLUME_BASE}/schemas/csv_autoloader_schema"
CHECKPOINT_LOCATION = f"{VOLUME_BASE}/checkpoints/csv_autoloader_checkpoint"

# CSV reading options
CSV_HEADER = "true"
CSV_SEPARATOR = ","

print(f"Source Path: {SOURCE_PATH}")
print(f"Target Table: {FULL_TABLE_NAME}")
print(f"Schema Location: {SCHEMA_LOCATION}")
print(f"Checkpoint Location: {CHECKPOINT_LOCATION}")
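If you run several ingestion streams from one notebook, it can help to derive every location from a single place. That way no two streams share a checkpoint or schema location, since a checkpoint belongs to exactly one streaming query. The sketch below packages the variables above into a dataclass. `IngestConfig` and its `source_dir` and `stream_name` fields are hypothetical, and the directory layout simply mirrors the one this notebook already uses.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IngestConfig:
    """Hypothetical helper that derives every path from a few names."""

    catalog: str
    schema: str
    table_name: str
    source_dir: str = "csvs"
    stream_name: str = "csv_autoloader"

    @property
    def full_table_name(self) -> str:
        return f"{self.catalog}.{self.schema}.{self.table_name}"

    @property
    def volume_base(self) -> str:
        return f"/Volumes/{self.catalog}/{self.schema}"

    @property
    def source_path(self) -> str:
        return f"{self.volume_base}/{self.source_dir}"

    @property
    def schema_location(self) -> str:
        # One schema location per stream, keyed by stream_name.
        return f"{self.volume_base}/schemas/{self.stream_name}_schema"

    @property
    def checkpoint_location(self) -> str:
        # One checkpoint per streaming query, keyed by stream_name.
        return f"{self.volume_base}/checkpoints/{self.stream_name}_checkpoint"


cfg = IngestConfig(catalog="jpg", schema="default", table_name="csv_notebook")
print(cfg.full_table_name)
print(cfg.checkpoint_location)
```

With the defaults, `cfg` reproduces the same four values as the variables above. A second stream only needs a different `stream_name` to get its own checkpoint and schema location.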


## Read CSV Files Using Auto Loader (Streaming)


In [None]:
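# Read CSV files from the volume with Auto Loader. The inferred schema is stored in
# SCHEMA_LOCATION; unless cloudFiles.inferColumnTypes is enabled, CSV columns are read as strings.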
df_raw = spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.schemaLocation", SCHEMA_LOCATION) \
  .option("header", CSV_HEADER) \
  .option("sep", CSV_SEPARATOR) \
  .load(SOURCE_PATH)

display(df_raw)
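When no schema is supplied, Auto Loader infers the CSV schema on the first run and stores it under `SCHEMA_LOCATION`. Under the default `addNewColumns` evolution mode, a file containing a previously unseen column makes the stream fail once. The stored schema is widened, and the next run picks the new column up. The pure-Python toy below models only that fail-then-widen loop. `SchemaChanged`, `check_header`, and the `city` column are hypothetical, and the real stream also handles types, rescued data, and restarts for you.

```python
class SchemaChanged(Exception):
    """Toy stand-in for the error a stream raises on previously unseen columns."""

    def __init__(self, evolved: list[str]) -> None:
        super().__init__(f"new columns detected; evolved schema: {evolved}")
        self.evolved = evolved


def check_header(stored: list[str], header: list[str]) -> list[str]:
    """Validate one file's CSV header against the stored schema (toy model)."""
    # Columns in this file that the stored schema has never seen.
    new_cols = [c for c in header if c not in stored]
    if new_cols:
        # Widen the stored schema, then "fail" so a restart can use it.
        raise SchemaChanged(stored + new_cols)
    # A file missing some known columns does not change the schema.
    return stored


stored = ["id", "age"]
try:
    check_header(stored, ["id", "age", "city"])
except SchemaChanged as err:
    stored = err.evolved  # what the schema location would now hold
print(stored)
print(check_header(stored, ["id", "age"]))
```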


In [None]:
from pyspark.sql.functions import col, current_timestamp

# Apply transformations: cast the string columns to int before adding them,
# and record when each row was processed
df_transformed = df_raw \
  .withColumn("age_plus_id", col("age").cast("int") + col("id").cast("int")) \
  .withColumn("processed_timestamp", current_timestamp())

display(df_transformed)
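With ANSI mode off, `col(...).cast("int")` returns null for a value it cannot parse. With ANSI mode on, the cast raises an error instead. In either case, adding anything to null yields null. The plain-Python approximation below models the non-ANSI behavior so you can reason about what lands in `age_plus_id`. `to_int_or_none` and `age_plus_id` are hypothetical helpers, the sample rows are made up, and the helpers accept only plain, optionally signed integer strings. They do not try to reproduce every input Spark's cast accepts, such as whitespace, decimals, or overflow handling.

```python
import re
from typing import Optional

# Plain, optionally signed ASCII integer strings only.
_INT_PATTERN = re.compile(r"[+-]?[0-9]+")


def to_int_or_none(value: Optional[str]) -> Optional[int]:
    """Rough stand-in for a non-ANSI cast("int"): unparseable -> None."""
    if value is None:
        return None
    return int(value) if _INT_PATTERN.fullmatch(value) else None


def age_plus_id(age: Optional[str], id_: Optional[str]) -> Optional[int]:
    """Null-propagating sum, like col("age") + col("id") after the casts."""
    a, b = to_int_or_none(age), to_int_or_none(id_)
    if a is None or b is None:
        return None
    return a + b


rows = [("30", "1"), ("41", "abc"), (None, "3")]
results = [age_plus_id(age, id_) for age, id_ in rows]
print(results)  # [31, None, None]
```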


## Write Transformed Data to Delta Table


In [None]:
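# Write transformed data to the Delta table with the availableNow trigger
# (processes everything available at start, then stops; recommended for serverless).
# mergeSchema lets columns added by Auto Loader schema evolution reach the table.
query = df_transformed.writeStream \
  .format("delta") \
  .outputMode("append") \
  .trigger(availableNow=True) \
  .option("checkpointLocation", CHECKPOINT_LOCATION) \
  .option("mergeSchema", "true") \
  .toTable(FULL_TABLE_NAME)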

query.awaitTermination()
print(f"Successfully processed all available data to {FULL_TABLE_NAME}")
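With `trigger(availableNow=True)`, the query processes everything that is available when it starts and then stops. The work may be split across several micro-batches according to source rate limits such as Auto Loader's `cloudFiles.maxFilesPerTrigger`, which this notebook leaves at its default. Files that land after the start wait for the next run. The toy planner below models that snapshot-then-batch behavior in plain Python. `plan_available_now` and its `max_files_per_batch` parameter are hypothetical stand-ins, and the sorting is only there to make the example deterministic. Auto Loader does not promise that order.

```python
from typing import Iterable


def plan_available_now(
    listing_at_start: Iterable[str],
    already_processed: set[str],
    max_files_per_batch: int,
) -> list[list[str]]:
    """Toy plan for one availableNow run: snapshot, skip done files, batch."""
    if max_files_per_batch < 1:
        raise ValueError("max_files_per_batch must be at least 1")
    # Only files present when the run starts are considered (the snapshot),
    # minus anything a previous run already committed.
    pending = sorted(f for f in listing_at_start if f not in already_processed)
    # Split the pending files into micro-batches of bounded size.
    return [
        pending[i : i + max_files_per_batch]
        for i in range(0, len(pending), max_files_per_batch)
    ]


batches = plan_available_now(
    ["a.csv", "b.csv", "c.csv", "d.csv", "e.csv"],
    already_processed={"a.csv"},
    max_files_per_batch=2,
)
print(batches)  # [['b.csv', 'c.csv'], ['d.csv', 'e.csv']]
```

Rerunning the cell above after new files arrive is what turns this batch-style trigger into incremental ingestion. The checkpoint remembers what earlier runs committed.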
