In [0]:
from pyspark.sql.types import LongType, DoubleType, TimestampType

# Target Spark type for each TLC trip-record column, applied by exact (case-sensitive)
# column name to every monthly file before it is appended to the bronze table.
# Columns that a given file does not contain are skipped, so yellow-only and
# green-only columns can both be listed here.
CASTS = {
    # Identifiers / codes
    "VendorID": LongType(),
    "RatecodeID": LongType(),
    "PULocationID": LongType(),
    "DOLocationID": LongType(),
    "payment_type": LongType(),
    "trip_type": LongType(),
    # Trip facts
    "passenger_count": LongType(),
    "tpep_pickup_datetime": TimestampType(),
    "tpep_dropoff_datetime": TimestampType(),
    "trip_distance": DoubleType(),
    # Money columns
    "fare_amount": DoubleType(),
    "extra": DoubleType(),
    "mta_tax": DoubleType(),
    "tip_amount": DoubleType(),
    "tolls_amount": DoubleType(),
    "improvement_surcharge": DoubleType(),
    "total_amount": DoubleType(),
    "congestion_surcharge": DoubleType(),
    "airport_fee": DoubleType(),
    # A fee amount like the columns above; LongType would truncate fractional values.
    "ehail_fee": DoubleType(),
}

In [0]:
import requests, os
from pyspark.sql.functions import coalesce, col, to_timestamp, month, year, lit

# Landing location for raw downloads, and the bronze table they are loaded into.
landing_path = "/Volumes/workspace/default/landing/nyc_taxi_trip_records"
schema_name = "bronze"
table_name = "nyc_taxi_trip_records"

# Make sure the schema exists, then start from an empty table so that re-running
# the notebook (which appends month by month) does not load the same data twice.
for statement in (
    f"CREATE DATABASE IF NOT EXISTS {schema_name}",
    f"DROP TABLE IF EXISTS {schema_name}.{table_name}",
):
    spark.sql(statement)

# The download loop writes files directly into this directory.
os.makedirs(landing_path, exist_ok=True)

def _harmonize_trip_schema(df, taxi_color: str):
    """Align a raw TLC trip DataFrame to the bronze table's column names and types.

    - Green files name their timestamps ``lpep_*``; they are renamed to the ``tpep_*`` names.
    - ``Airport_fee`` (capital A) is normalised to ``airport_fee``.
    - ``ehail_fee`` is dropped.
    - ``trip_type`` is added as a typed null column when the file lacks it.
    - Every column listed in ``CASTS`` is cast to its declared type; other columns pass
      through untouched and column order is preserved.

    Args:
        df: DataFrame read from one monthly Parquet file.
        taxi_color: Feed name; only ``"green"`` triggers the timestamp rename.

    Returns:
        The harmonised DataFrame.
    """
    if taxi_color == "green":
        df = (df
              .withColumnRenamed("lpep_pickup_datetime", "tpep_pickup_datetime")
              .withColumnRenamed("lpep_dropoff_datetime", "tpep_dropoff_datetime"))

    if "Airport_fee" in df.columns:
        if "airport_fee" in df.columns:
            # Both spellings exist side by side (presumably only resolvable with
            # spark.sql.caseSensitive=true): keep the first non-null value.
            df = (df
                  .withColumn("airport_fee", coalesce(col("airport_fee"), col("Airport_fee")))
                  .drop("Airport_fee"))
        else:
            # Plain rename. withColumn("airport_fee", ...) + drop("Airport_fee") must not be
            # used here: under Spark's default case-insensitive resolution the drop also
            # matches the new "airport_fee" column and silently discards the data.
            df = df.withColumnRenamed("Airport_fee", "airport_fee")

    if "ehail_fee" in df.columns:
        df = df.drop("ehail_fee")

    if "trip_type" not in df.columns:
        # Give the placeholder its declared type instead of a bare NullType literal.
        df = df.withColumn("trip_type", lit(None).cast(CASTS["trip_type"]))

    # One select rather than a withColumn per column keeps the logical plan small.
    return df.select([
        col(name).cast(CASTS[name]).alias(name) if name in CASTS else col(name)
        for name in df.columns
    ])


def ingest_to_bronze(parquet_file: str, taxi_color: str):
    """Read one monthly TLC trip-record Parquet file and append it to the bronze Delta table.

    The file is first harmonised to a common schema (``_harmonize_trip_schema``), then
    tagged with ``year``/``month`` (derived from ``tpep_pickup_datetime``) and a
    ``taxi_color`` column.

    Args:
        parquet_file: Path of the Parquet file to ingest.
        taxi_color: Feed the file came from (e.g. ``"yellow"`` or ``"green"``); stored
            verbatim in the ``taxi_color`` column.

    Side effects:
        Appends to ``{schema_name}.{table_name}`` as Delta, partitioned by year/month,
        with ``mergeSchema`` so columns present in only some feeds are added to the table.
    """
    df = _harmonize_trip_schema(spark.read.parquet(parquet_file), taxi_color)

    # Partitions come from each row's pickup timestamp, not from the file's nominal
    # month, so any out-of-month pickups land in their own year/month partitions.
    df = (
        df
        .withColumn("year", year("tpep_pickup_datetime"))
        .withColumn("month", month("tpep_pickup_datetime"))
        .withColumn("taxi_color", lit(taxi_color))
    )
    (
        df.write
          .format("delta")
          .mode("append")
          .option("mergeSchema", "true")
          .partitionBy("year", "month")
          .saveAsTable(f"{schema_name}.{table_name}")
    )
colors = ["yellow", "green"]

# Which monthly files to fetch. Deliberately not named `year`/`month`: those names
# are the PySpark functions ingest_to_bronze uses and must not be shadowed.
TRIP_YEAR = 2023
TRIP_MONTHS = range(1, 6)        # January..May inclusive
DOWNLOAD_TIMEOUT_S = 60          # applies to connect and to each read; without it requests may block forever
DOWNLOAD_CHUNK_BYTES = 1 << 20   # stream to disk in 1 MiB chunks instead of buffering the whole file

for color in colors:
    for m in TRIP_MONTHS:
        file_name = f"{color}_tripdata_{TRIP_YEAR}-{m:02d}.parquet"
        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}"
        local_fp = f"{landing_path}/{file_name}"

        # Download into the landing volume; the response is closed even if writing fails.
        with requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT_S) as resp:
            resp.raise_for_status()
            with open(local_fp, "wb") as f:
                for chunk in resp.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                    f.write(chunk)

        ingest_to_bronze(local_fp, color)
