In [0]:
# Notebook parameter `src`: the name of the source sub-folder to process.
# It is interpolated into the raw input path and the Bronze checkpoint/data paths.
# Default is an empty string.
dbutils.widgets.text(name="src", defaultValue="")

In [0]:
# Resolve the source name from the `src` widget and fail fast if it is missing.
# An empty value would turn the raw path into ".../rawdata//", the parent folder that
# holds every source. It would also turn the Bronze paths into ".../bronzevolume//...",
# silently ingesting all sources into a single unnamed Bronze location.
src_value = dbutils.widgets.get("src").strip()
if not src_value:
    raise ValueError(
        "Widget 'src' is empty: set it to the source folder name to ingest (e.g. 'customers')."
    )
src_value

In [0]:
# Build a streaming DataFrame that discovers new CSV files for this source with
# Databricks Auto Loader (the "cloudFiles" source).
raw_csv_dir = f"/Volumes/workspace/flight_raw/rawvolume/rawdata/{src_value}/"

# cloudFiles.schemaLocation persists the inferred schema (and any recorded schema changes),
# so the schema is not re-inferred from scratch on every run.
# Tracking which files were already processed is done by the writer's checkpointLocation,
# not by this option.
bronze_schema_dir = f"/Volumes/workspace/flight_bronze/bronzevolume/{src_value}/checkpoint"

autoloader_options = {
    # Incoming files are CSV.
    # NOTE(review): the CSV `header` option is not set and defaults to false, so a header
    # row in the files would be read as data (columns _c0, _c1, ...). Confirm against the
    # raw files.
    "cloudFiles.format": "csv",
    "cloudFiles.schemaLocation": bronze_schema_dir,
    # "rescue": the schema is never evolved and the stream does not fail on schema changes.
    # Values that do not fit the schema are captured in the `_rescued_data` column.
    "cloudFiles.schemaEvolutionMode": "rescue",
}

df = (
    spark.readStream
    .format("cloudFiles")
    .options(**autoloader_options)
    .load(raw_csv_dir)
)


In [0]:
# Write the Auto Loader stream into the Bronze Delta location for this source as an
# incremental batch: process every input file not yet committed, then stop.
bronze_query = (
    df.writeStream.format("delta")
    # Append mode: new rows are added; existing Bronze rows are never modified.
    .outputMode("append")
    # availableNow: process all data available at start (possibly across several
    # micro-batches), then stop. This replaces the deprecated trigger(once=True).
    .trigger(availableNow=True)
    # Checkpoint: records committed progress, so re-runs pick up only new files
    # and a failed run can resume where it left off.
    .option("checkpointLocation", f"/Volumes/workspace/flight_bronze/bronzevolume/{src_value}/checkpoint")
    # Final storage path for the Bronze Delta table.
    .option("path", f"/Volumes/workspace/flight_bronze/bronzevolume/{src_value}/data")
    .start()
)

# start() returns immediately and the query runs in the background. Block until the
# batch finishes, so that:
#   - later cells read fully committed data (on a first run the Delta table does not
#     exist until the first commit);
#   - a failed stream raises here instead of being silently ignored.
bronze_query.awaitTermination()


In [0]:
# Preview the Bronze Delta table for the source selected by the `src` widget.
# The previous version hard-coded the "customers" folder, so other sources showed the
# wrong table. Limit the output to a sample instead of rendering the whole table.
bronze_preview_df = (
    spark.read.format("delta")
    .load(f"/Volumes/workspace/flight_bronze/bronzevolume/{src_value}/data")
)
display(bronze_preview_df.limit(100))