### Ingest raw data into Bronze

In [0]:
# Clean slate: drop the bronze table and wipe the Auto Loader state (checkpoint
# and schema-tracking directory) so the streaming ingest starts from scratch.
# The paths are defined here, with the same values the Auto Loader cell uses,
# because this cell runs first; referencing them before that cell has executed
# would raise NameError on a fresh kernel.
checkpoint_path = "/Volumes/project/infra/checkpoints/taxi/data"
schema_path = "/Volumes/project/infra/checkpoints/taxi/schema"

dbutils.fs.rm(checkpoint_path, recurse=True)
# Also clear the stored inferred schema; otherwise Auto Loader reuses it.
dbutils.fs.rm(schema_path, recurse=True)
spark.sql("DROP TABLE IF EXISTS project.taxi_bronze.trips_raw")

DataFrame[]

In [0]:
from pyspark.sql import functions as F

# Batch read of the raw yellow-trip parquet files. Pointing basePath at the
# root makes the year=*/month=* directory levels show up as `year` and `month`
# columns on every row.
raw_root = "/Volumes/project/default/my_data_volume/yellow_trips"

raw = spark.read.option("basePath", raw_root).parquet(f"{raw_root}/year=*/month=*")

# Quick visual sanity check of the schema and a handful of rows.
display(raw.limit(10))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,year,month
1,2022-02-01T00:06:58.000,2022-02-01T00:19:24.000,1.0,5.4,1.0,N,138,252,1,17.0,1.75,0.5,3.9,0.0,0.3,23.45,0.0,1.25,2022,2
1,2022-02-01T00:38:22.000,2022-02-01T00:55:55.000,1.0,6.4,1.0,N,138,41,2,21.0,1.75,0.5,0.0,6.55,0.3,30.1,0.0,1.25,2022,2
1,2022-02-01T00:03:20.000,2022-02-01T00:26:59.000,1.0,12.5,1.0,N,138,200,2,35.5,1.75,0.5,0.0,6.55,0.3,44.6,0.0,1.25,2022,2
2,2022-02-01T00:08:00.000,2022-02-01T00:28:05.000,1.0,9.88,1.0,N,239,200,2,28.0,0.5,0.5,0.0,3.0,0.3,34.8,2.5,0.0,2022,2
2,2022-02-01T00:06:48.000,2022-02-01T00:33:07.000,1.0,12.16,1.0,N,138,125,1,35.5,0.5,0.5,8.11,0.0,0.3,48.66,2.5,1.25,2022,2
1,2022-02-01T00:57:23.000,2022-02-01T01:07:31.000,2.0,2.3,1.0,N,140,142,1,10.0,3.0,0.5,2.75,0.0,0.3,16.55,2.5,0.0,2022,2
1,2022-02-01T00:34:17.000,2022-02-01T01:01:32.000,1.0,7.5,1.0,N,140,36,1,25.0,3.0,0.5,5.0,0.0,0.3,33.8,2.5,0.0,2022,2
2,2022-02-01T00:03:26.000,2022-02-01T00:07:30.000,1.0,0.88,1.0,N,48,68,1,5.0,0.5,0.5,1.76,0.0,0.3,10.56,2.5,0.0,2022,2
2,2022-02-01T00:13:31.000,2022-02-01T00:31:41.000,1.0,6.48,1.0,N,142,244,1,21.0,0.5,0.5,4.96,0.0,0.3,29.76,2.5,0.0,2022,2
2,2022-02-01T00:52:11.000,2022-02-01T01:02:48.000,1.0,3.29,1.0,N,238,116,1,12.0,0.5,0.5,2.66,0.0,0.3,15.96,0.0,0.0,2022,2


In [0]:
from pyspark.sql import functions as F

# One-shot batch load of `raw` into the bronze Delta table, partitioned by the
# directory-derived year/month columns, replacing any previous table.
# NOTE(review): the Auto Loader cell below also writes
# project.taxi_bronze.trips_raw; with a fresh checkpoint and
# includeExistingFiles=true it re-reads every file and appends, so running both
# cells leaves each row in the table twice. Keep only one bronze load path.
spark.sql('''
    DROP TABLE IF EXISTS project.taxi_bronze.trips_raw;
''')

# Pin the partition columns to int and stamp each row with the load time.
bronze = raw.withColumns({
    "year": F.col("year").cast("int"),
    "month": F.col("month").cast("int"),
    "ingest_ts": F.current_timestamp(),
})

(
    bronze.write
    .format("delta")
    .partitionBy("year", "month")
    .mode("overwrite")
    .saveAsTable("project.taxi_bronze.trips_raw")
)



In [0]:
%sql
-- Create the schema and volume that hold the Auto Loader checkpoint and
-- schema-tracking directories (/Volumes/project/infra/checkpoints/taxi/...).
-- IF NOT EXISTS makes both statements safe to re-run.
CREATE SCHEMA IF NOT EXISTS project.infra;
CREATE VOLUME IF NOT EXISTS project.infra.checkpoints;


In [0]:
from pyspark.sql import functions as F

# Incremental bronze ingest with Auto Loader: every parquet file under
# source_path (including files already present, per includeExistingFiles) is
# appended to the bronze Delta table; progress is tracked in checkpoint_path
# and the inferred schema in schema_path.
# NOTE(review): with a fresh checkpoint this re-reads every existing file; if
# the batch bronze cell has already loaded the same files into this table, the
# rows end up duplicated.
source_path = "/Volumes/project/default/my_data_volume/yellow_trips/"
schema_path = "/Volumes/project/infra/checkpoints/taxi/schema"
checkpoint_path = "/Volumes/project/infra/checkpoints/taxi/data"


df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", schema_path)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.includeExistingFiles", "true")
        # Take year/month from the Hive-style directories (year=YYYY/month=MM),
        # as the batch bronze load does via basePath. The silver step compares
        # them against the pickup timestamp to drop mis-dated trips, so they
        # must not be re-derived from tpep_pickup_datetime here.
        .option("cloudFiles.partitionColumns", "year,month")
        .option("rescuedDataColumn", "_rescued_data")
        .load(source_path)
        .select(
            "*",
            # Lineage: which file each row came from and when it was modified.
            F.col("_metadata.file_path").alias("source_file"),
            F.col("_metadata.file_modification_time").alias("source_file_mod_time")
        )
        .withColumn("ingest_ts", F.current_timestamp())
        .withColumn(
            "tpep_pickup_datetime",
            F.col("tpep_pickup_datetime").cast("timestamp")
        )
        # Normalize directory values such as "01" to int, matching the batch load.
        .withColumn("year", F.col("year").cast("int"))
        .withColumn("month", F.col("month").cast("int"))
)

# Start the streaming write and keep a handle to the query.
query = (
    df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .partitionBy("year", "month")
        .trigger(availableNow=True)
        .queryName("autoloader_taxi_bronze")
        .toTable("project.taxi_bronze.trips_raw")
)

# availableNow processes the current backlog and then stops; block until it does.
query.awaitTermination()

print("Stream completed successfully!")

Stream completed successfully!


In [0]:
%sql
-- Spot-check the lineage columns added during bronze ingest on a few rows.
select
  t.source_file,
  t.source_file_mod_time,
  t.ingest_ts
from project.taxi_bronze.trips_raw as t
limit 10;


source_file,source_file_mod_time,ingest_ts
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=01/yellow_tripdata_2022-01.parquet,2025-12-18T21:25:20.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=01/yellow_tripdata_2022-01.parquet,2025-12-18T21:25:20.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=01/yellow_tripdata_2022-01.parquet,2025-12-18T21:25:20.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=01/yellow_tripdata_2022-01.parquet,2025-12-18T21:25:20.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=02/yellow_tripdata_2022-02.parquet,2025-12-18T23:06:13.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=02/yellow_tripdata_2022-02.parquet,2025-12-18T23:06:13.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=02/yellow_tripdata_2022-02.parquet,2025-12-18T23:06:13.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=02/yellow_tripdata_2022-02.parquet,2025-12-18T23:06:13.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=02/yellow_tripdata_2022-02.parquet,2025-12-18T23:06:13.000Z,2025-12-22T16:41:11.864Z
/Volumes/project/default/my_data_volume/yellow_trips/year=2022/month=02/yellow_tripdata_2022-02.parquet,2025-12-18T23:06:13.000Z,2025-12-22T16:41:11.864Z


### Filter and clean data into Silver

In [0]:
# Silver: parse timestamps, derive calendar and duration columns, drop invalid
# or mis-dated trips, and project to a typed schema. The table is rebuilt from
# scratch on every run.
spark.sql('''
    DROP TABLE IF EXISTS project.taxi_silver.trips ;
''')

bronze_df = spark.read.table("project.taxi_bronze.trips_raw")

silver = (
    bronze_df
    # Parsed timestamps come first because every derived column below reads them.
    .withColumns({
        "pickup_ts": F.to_timestamp("tpep_pickup_datetime"),
        "dropoff_ts": F.to_timestamp("tpep_dropoff_datetime"),
    })
    .withColumns({
        "pickup_date": F.to_date("pickup_ts"),
        "pickup_hour": F.hour("pickup_ts"),
        "pickup_year": F.year("pickup_ts"),
        "pickup_month": F.month("pickup_ts"),
        # Difference of whole-second epoch values, expressed in minutes.
        "trip_minutes": (F.unix_timestamp("dropoff_ts") - F.unix_timestamp("pickup_ts")) / 60.0,
    })
    # A row is kept only if every predicate is true (a NULL result drops it).
    .filter(
        F.col("pickup_ts").isNotNull()
        & F.col("dropoff_ts").isNotNull()
        # Duration between 1 minute and 4 hours, inclusive.
        & F.col("trip_minutes").between(1, 240)
        & (F.col("trip_distance") > 0)
        & (F.col("total_amount") > 0)
        # Pickup must fall in the same year/month as the bronze row's year/month.
        & (F.col("pickup_year") == F.col("year"))
        & (F.col("pickup_month") == F.col("month"))
    )
    # Final silver schema: renamed location ids and explicit numeric types.
    .select(
        "pickup_ts",
        "dropoff_ts",
        "pickup_date",
        "pickup_hour",
        F.col("PULocationID").cast("int").alias("pickup_location_id"),
        F.col("DOLocationID").cast("int").alias("dropoff_location_id"),
        F.col("passenger_count").cast("int"),
        F.col("trip_distance").cast("double"),
        F.col("fare_amount").cast("double"),
        F.col("tip_amount").cast("double"),
        F.col("total_amount").cast("double"),
        "trip_minutes",
        "year",
        "month",
    )
)

# NOTE(review): this creates one partition per pickup day; confirm that this
# granularity suits the table's size.
(
    silver.write
    .format("delta")
    .partitionBy("pickup_date")
    .mode("overwrite")
    .saveAsTable("project.taxi_silver.trips")
)




In [0]:
# Load the cleaned silver trips as the working DataFrame for the next steps.
df = spark.table("project.taxi_silver.trips")

