In [0]:
# Databricks notebook source
# ------------------------------------------------------------------------------
# Notebook: 01_bronze_ingestion_autoloader
# Purpose : Ingest Chicago Taxi raw data into the bronze layer as Delta by path,
#           adding ingestion metadata and simulating Auto Loader / COPY INTO patterns.
#
# Exam Coverage (Databricks Certified Data Engineer Associate – Exam Guide 2025-07-30)
# - Section 2: Development and Ingestion
#   - Capabilities of notebooks for data engineering workflows.
#   - Auto Loader sources, use cases and syntax (conceptual in CE).
#   - COPY INTO as an ingestion pattern from cloud storage into Delta tables.
# - Section 3: Data Processing & Transformations
#   - Role of the bronze layer in a medallion pipeline.
# - Section 5: Data Governance & Quality
#   - Ingestion-time metadata columns and basic sanity checks.
#
# Key Practices
# - Read CSV with an explicit schema.
# - Write Delta by path under /Volumes/{catalog}/{schema}/{volume}/bronze/.
# - Add ingestion_ts and capture _metadata.file_path when available.
# - Keep ingestion idempotent for demos (overwrite) and document incremental options.
# ------------------------------------------------------------------------------


## Exam Focus – Ingestion Patterns (Section 2)

This notebook reinforces key ingestion patterns for the exam:

- Manual batch ingestion using `spark.read` + `DataFrame.write.format("delta")`.
- Conceptual understanding of **COPY INTO** to load files into Delta tables.
- Conceptual understanding of **Auto Loader** for incremental file ingestion.

Key exam ideas:
- **COPY INTO**:
  - SQL command that loads files from a cloud storage location into a Delta table.
  - Idempotent: files that were already loaded are skipped on re-run, which makes it a good fit for repeatable batch loads.
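- **Auto Loader**:
  - Uses the `cloudFiles` streaming source to incrementally detect and process new files in cloud storage.
  - Supports schema inference and schema evolution; the inferred schema is tracked at the path given by `cloudFiles.schemaLocation`.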
- In Community Edition, these features might not be available, but knowing their
  syntax and use cases is important for the exam.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
spark = SparkSession.builder.getOrCreate()


In [0]:
# Widgets: Unity Catalog location of the volume that hosts the medallion folders.
dbutils.widgets.text("catalog", "taxi_catalog")
dbutils.widgets.text("schema",  "taxi_schema")
dbutils.widgets.text("volume",  "taxi_volume")

# Read widget values. Stray whitespace is stripped and blank values are rejected
# up front; otherwise they would silently produce paths such as "/Volumes///bronze"
# and a malformed view name for the SQL statements later in the notebook.
catalog_name = dbutils.widgets.get("catalog").strip()
schema_name  = dbutils.widgets.get("schema").strip()
volume_name  = dbutils.widgets.get("volume").strip()

blank_widgets = [
    widget
    for widget, value in (("catalog", catalog_name), ("schema", schema_name), ("volume", volume_name))
    if not value
]
assert not blank_widgets, f"Widget value(s) must not be empty: {blank_widgets}"

# Base & medallion paths (volume layout: /Volumes/<catalog>/<schema>/<volume>/<layer>)
base_path   = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}"
bronze_path = f"{base_path}/bronze"
silver_path = f"{base_path}/silver"
gold_path   = f"{base_path}/gold"

# Entities for bronze: Delta data is written by path; a UC view exposes it by name.
bronze_table_path = f"{bronze_path}/chicago_taxi_trips"
bronze_view_name  = f"{catalog_name}.{schema_name}.chicago_taxi_bronze_v"

print("Base:", base_path)
print("Bronze table path:", bronze_table_path)
print("Bronze view:", bronze_view_name)


In [0]:
# Create the bronze / silver / gold folders inside the volume.
# dbutils.fs.mkdirs creates missing directories (and parents) and leaves existing ones alone,
# so re-running this cell is harmless.
for medallion_dir in (bronze_path, silver_path, gold_path):
    dbutils.fs.mkdirs(medallion_dir)

print("Medallion directories ensured under base path")
display(dbutils.fs.ls(base_path))

In [0]:
# Locate the raw CSV that was uploaded into the bronze folder of the volume.
# The medallion folders themselves are created by the previous cell, so they are
# not re-created here.
raw_source_file = "m6dm-c72p.csv"  # name of the uploaded source file; change if the upload is renamed
raw_source_path = f"{bronze_path}/{raw_source_file}"

# Match on the file name rather than on the full path string: the listed path may
# carry a "dbfs:" scheme prefix, while FileInfo.name is just the entry name.
bronze_entries = {entry.name for entry in dbutils.fs.ls(bronze_path)}
assert raw_source_file in bronze_entries, f"Source file not found: {raw_source_path}"
print("OK - Source file:", raw_source_path)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# 1) Explicit schema (avoid inference).
# With an explicit schema, Spark's CSV reader applies it by column position and ignores
# the header names (option `enforceSchema` defaults to true), so the field order below
# must match the column order of the file.
# NOTE(review): confirm this order (and whether the file has extra columns) against the CSV header.
# NOTE(review): if the file's timestamps are not in Spark's default format, set the
#               `timestampFormat` read option; otherwise parsing may fail or yield NULLs.
taxi_schema = StructType([
    StructField("taxi_id", StringType(), True),
    StructField("trip_start_timestamp", TimestampType(), True),
    StructField("trip_end_timestamp", TimestampType(), True),
    StructField("trip_seconds", IntegerType(), True),
    StructField("trip_miles", DoubleType(), True),
    StructField("pickup_census_tract", StringType(), True),
    StructField("dropoff_census_tract", StringType(), True),
    StructField("pickup_community_area", IntegerType(), True),
    StructField("dropoff_community_area", IntegerType(), True),
    StructField("fare", DoubleType(), True),
    StructField("tips", DoubleType(), True),
    StructField("tolls", DoubleType(), True),
    StructField("extras", DoubleType(), True),
    StructField("trip_total", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("company", StringType(), True),
    StructField("pickup_centroid_latitude", DoubleType(), True),
    StructField("pickup_centroid_longitude", DoubleType(), True),
    StructField("dropoff_centroid_latitude", DoubleType(), True),
    StructField("dropoff_centroid_longitude", DoubleType(), True),
])

# 2) Read CSV and add governance metadata (UC-safe: _metadata.file_path)
df_raw = spark.read.option("header", True).schema(taxi_schema).csv(raw_source_path)
df_bronze = (df_raw.withColumn("ingestion_ts", current_timestamp())
                   .withColumn("source_file", col("_metadata.file_path")))

# 3) Write Delta by path (idempotent overwrite for demo).
# overwriteSchema lets a re-run replace the table even after taxi_schema has been edited;
# a plain overwrite with a changed schema is rejected by Delta schema enforcement.
(df_bronze.write.format("delta")
          .mode("overwrite")
          .option("overwriteSchema", "true")
          .save(bronze_table_path))

# 4) Expose UC VIEW over Delta path (tables with LOCATION are blocked in CE+UC)
spark.sql(f"""
  CREATE OR REPLACE VIEW {bronze_view_name} AS
  SELECT * FROM delta.`{bronze_table_path}`
""")

# 5) Basic consistency check: path vs VIEW, and the load must not be empty
cnt_path = spark.sql(f"SELECT COUNT(*) c FROM delta.`{bronze_table_path}`").first()["c"]
cnt_view = spark.sql(f"SELECT COUNT(*) c FROM {bronze_view_name}").first()["c"]
print("Counts -> path:", cnt_path, "| view:", cnt_view)
assert cnt_path == cnt_view and cnt_path > 0, "Bronze ingestion failed or empty."


In [0]:
from delta.tables import DeltaTable

# Inspect the Delta transaction log of the bronze table written by path above.
bronze_delta = DeltaTable.forPath(spark, bronze_table_path)
display(bronze_delta.history(5))  # five most recent commits

# Optional time-travel demo: read the table as of a fixed version.
# Best-effort on purpose (a failure is reported, not raised), but the actual error is
# printed so unrelated problems such as permissions or a bad path are not disguised
# as "no earlier version".
time_travel_version = 0
try:
    (spark.read.format("delta")
          .option("versionAsOf", time_travel_version)
          .load(bronze_table_path)
          .limit(1)
          .count())
    print(f"Time travel OK (version {time_travel_version})")
except Exception as e:
    print(f"Time travel to version {time_travel_version} not available: {e}")


In [0]:
from pyspark.sql import functions as F

# Bronze data-quality gate: table is non-empty, the expected numeric columns exist,
# and none of them contains negative values.
DQ_NON_NEGATIVE_COLS = ["trip_total", "fare", "tips", "tolls", "extras", "trip_seconds", "trip_miles"]

bronze_df = spark.read.format("delta").load(bronze_table_path)

# Emptiness check that stops after the first row instead of counting the whole table.
assert bronze_df.limit(1).count() > 0, "Empty bronze."

# Expected columns present (checked before they are referenced below)
missing_cols = [c for c in DQ_NON_NEGATIVE_COLS if c not in bronze_df.columns]
assert not missing_cols, f"Missing expected column(s): {missing_cols}"

# Non-negative monetary/time/distance: one aggregation pass counts negatives for every
# column at once instead of launching a separate Spark job per column.
# F.when(...) without otherwise() is NULL when the condition is false or NULL, and
# F.count ignores NULLs, so NULL values are not counted as violations.
neg_counts = bronze_df.select(
    [F.count(F.when(F.col(c) < 0, 1)).alias(c) for c in DQ_NON_NEGATIVE_COLS]
).first().asDict()
viol = {c: n for c, n in neg_counts.items() if n > 0}
assert not viol, f"Negative values found in: {viol}"
print("Bronze DQ checks passed.")


In [0]:
%md
#### Reference only: COPY INTO and Auto Loader syntax (not executed)

The snippets below are exam reference material and are deliberately kept in a markdown cell.
Run as-is, they fail on the placeholder `s3://bucket/...` paths (and may be unavailable on Community Edition).
That failure would stop a *Run All* or a scheduled job before the preflight and exit cells run.
To try them, copy them into a SQL / Python cell and replace the placeholders.

**COPY INTO**: an idempotent batch load, where files already loaded into the target are skipped on re-run.
The target table must exist. An empty placeholder table (no columns) lets COPY INTO derive the schema when `mergeSchema` is enabled.

```sql
CREATE TABLE IF NOT EXISTS taxi_catalog.taxi_schema.chicago_taxi_bronze;

COPY INTO taxi_catalog.taxi_schema.chicago_taxi_bronze
FROM 's3://bucket/path/to/chicago_taxi/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'false')
COPY_OPTIONS ('mergeSchema' = 'true');
```

**Auto Loader**: incremental ingestion with the `cloudFiles` streaming source.
Schema inference requires `cloudFiles.schemaLocation`, and a streaming write into a table uses `DataStreamWriter.toTable(...)`.

```python
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", "s3://bucket/checkpoints/chicago_taxi/")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("header", "true")
    .load("s3://bucket/path/to/chicago_taxi/")
    .writeStream
    .option("checkpointLocation", "s3://bucket/checkpoints/chicago_taxi/")
    .toTable("taxi_catalog.taxi_schema.chicago_taxi_bronze"))
```


In [0]:
# Bronze preflight: verify the Delta path exists and is readable before signalling success.
from pyspark.sql.utils import AnalysisException  # not used by the checks below

try:
    _ = dbutils.fs.ls(bronze_table_path)  # folder must exist
    _ = spark.read.format("delta").load(bronze_table_path).limit(1).count()  # table must be readable
except Exception as e:
    # Broad catch on purpose: failures can come from dbutils.fs as well as from Spark.
    # `from e` keeps the original traceback chained to the RuntimeError for debugging.
    raise RuntimeError(
        f"Bronze not ready at {bronze_table_path}. Run the bronze ingestion cells first. Original: {e}"
    ) from e
else:
    print("Bronze preflight: OK ->", bronze_table_path)


In [0]:
# Final cell: return a success status to the caller (e.g. dbutils.notebook.run or a job task).
# It is only reached when every earlier cell completed without raising.
EXIT_STATUS = "OK"
dbutils.notebook.exit(EXIT_STATUS)