In [6]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql.functions import *
months = ["2025-05","2025-06","2025-07"]

# read the three files directly (they are files named 2025-05, 2025-06, 2025-07)
paths = [
    "Files/Files/bronze/yellow/2025-05",
    "Files/Files/bronze/yellow/2025-06",
    "Files/Files/bronze/yellow/2025-07",
]
dfs = [spark.read.format("parquet").load(p) for p in paths]
bronze = dfs[0]
for d in dfs[1:]:
    bronze = bronze.unionByName(d, allowMissingColumns=True)


keep = [
    "VendorID","tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count",
    "trip_distance","RatecodeID","PULocationID","DOLocationID","payment_type",
    "fare_amount","extra","mta_tax","tip_amount","tolls_amount",
    "improvement_surcharge","congestion_surcharge","airport_fee","total_amount"
]
bronze = bronze.select([c for c in keep if c in bronze.columns])
(bronze.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
       .write.format("delta").mode("overwrite").saveAsTable("bronze_yellow_trips"))


StatementMeta(, 5e456344-8664-4504-8f10-ea58d3693e0d, 8, Finished, Available, Finished)

In [9]:
from pyspark.sql.functions import *

# --- Load Bronze ---
b = spark.table("bronze_yellow_trips")

# --- Make schema drift-safe: add optional columns if missing ---
# (default values keep downstream math valid)
optional_cols = {
    "airport_fee": ("double", 0.0),
    "congestion_surcharge": ("double", 0.0),
    "improvement_surcharge": ("double", 0.0),
    "extra": ("double", 0.0),
    "mta_tax": ("double", 0.0),
    "tip_amount": ("double", 0.0),
    "tolls_amount": ("double", 0.0),
}
for name, (dtype, default) in optional_cols.items():
    if name not in b.columns:
        b = b.withColumn(name, lit(default).cast(dtype))
        print(f"Added missing column: {name}")

# --- Cast, derive, and standardize ---
s = (b.select(
        col("VendorID").cast("int").alias("VendorID"),
        to_timestamp("tpep_pickup_datetime").alias("tpep_pickup_datetime"),
        to_timestamp("tpep_dropoff_datetime").alias("tpep_dropoff_datetime"),
        col("passenger_count").cast("int").alias("passenger_count"),
        col("trip_distance").cast("double").alias("trip_distance"),
        col("RatecodeID").cast("int").alias("RatecodeID"),
        col("PULocationID").cast("int").alias("PULocationID"),
        col("DOLocationID").cast("int").alias("DOLocationID"),
        col("payment_type").cast("int").alias("payment_type"),
        col("fare_amount").cast("double").alias("fare_amount"),
        col("extra").cast("double").alias("extra"),
        col("mta_tax").cast("double").alias("mta_tax"),
        col("tip_amount").cast("double").alias("tip_amount"),
        col("tolls_amount").cast("double").alias("tolls_amount"),
        col("improvement_surcharge").cast("double").alias("improvement_surcharge"),
        col("congestion_surcharge").cast("double").alias("congestion_surcharge"),
        col("airport_fee").cast("double").alias("airport_fee"),
        col("total_amount").cast("double").alias("total_amount")
    )
    .withColumn("trip_minutes", (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))/60.0)
    .withColumn("pickup_date", to_date("tpep_pickup_datetime"))
    .withColumn("pickup_hour", hour("tpep_pickup_datetime"))
)

# --- Data quality filters ---
s = s.filter(
    (col("trip_distance").isNotNull()) & (col("trip_distance") > 0) & (col("trip_distance") < 100) &
    (col("trip_minutes").isNotNull()) & (col("trip_minutes") > 0) & (col("trip_minutes") < 240) &
    (col("fare_amount") >= 0) & (col("total_amount") >= 0)
)

# --- De-duplicate ---
s = s.dropDuplicates([
    "tpep_pickup_datetime","tpep_dropoff_datetime","PULocationID","DOLocationID","total_amount"
])

# --- Write Silver table (partitioned by date) ---
(s.write.format("delta")
   .mode("overwrite")
   .partitionBy("pickup_date")
   .saveAsTable("silver_yellow_trips"))

print("Silver rows:", spark.table("silver_yellow_trips").count())


StatementMeta(, 5e456344-8664-4504-8f10-ea58d3693e0d, 11, Finished, Available, Finished)

Added missing column: airport_fee
Silver rows: 11455283


In [10]:
from pyspark.sql.functions import *

# ========= GOLD: DIMENSIONS =========

# Date dimension from Silver range
s = spark.table("silver_yellow_trips")
rng = s.select(min("pickup_date").alias("min_d"), max("pickup_date").alias("max_d")).collect()[0]

dim_date = (
    spark.range(1)
      .select(sequence(lit(rng["min_d"]), lit(rng["max_d"]), expr("interval 1 day")).alias("d"))
      .select(explode(col("d")).alias("date"))
      .select(
          col("date"),
          date_format("date","yyyyMMdd").cast("int").alias("date_key"),
          year("date").alias("year"),
          month("date").alias("month"),
          dayofweek("date").alias("day_of_week"),
          weekofyear("date").alias("week_of_year"),
          (when(dayofweek("date").isin(1,7), 1).otherwise(0)).alias("is_weekend")
      )
)
dim_date.write.format("delta").mode("overwrite").saveAsTable("gold_dim_date")

# Zone dimension (lookup CSV may be under Files/ or Files/Files/)
zones = None
for p in ["Files/reference/taxi_zone_lookup.csv", "Files/Files/reference/taxi_zone_lookup.csv"]:
    try:
        zones = spark.read.format("csv").option("header", True).load(p)
        break
    except Exception:
        pass
if zones is None:
    raise FileNotFoundError("taxi_zone_lookup.csv not found under Files/reference or Files/Files/reference")

dim_zone = zones.select(
    col("LocationID").cast("int").alias("zone_id"),
    col("Borough"),
    col("Zone"),
    col("service_zone")
)
dim_zone.write.format("delta").mode("overwrite").saveAsTable("gold_dim_zone")

# ========= GOLD: FACT =========

fact = (
    s.select(
        # keys
        date_format(col("pickup_date"), "yyyyMMdd").cast("int").alias("date_key"),
        col("PULocationID").alias("pu_zone_id"),
        col("DOLocationID").alias("do_zone_id"),
        # features / measures
        col("passenger_count"),
        round(col("trip_distance"), 2).alias("trip_distance"),
        round(col("trip_minutes"), 2).alias("trip_minutes"),
        col("pickup_hour"),
        round(col("fare_amount"), 2).alias("fare_amount"),
        round(col("tip_amount"), 2).alias("tip_amount"),
        round(col("tolls_amount"), 2).alias("tolls_amount"),
        round(col("congestion_surcharge"), 2).alias("congestion_surcharge"),
        round(col("airport_fee"), 2).alias("airport_fee"),
        round(col("total_amount"), 2).alias("total_amount"),
        when(col("fare_amount") > 0,
             round(col("tip_amount") / col("fare_amount"), 4)
        ).otherwise(lit(0.0)).alias("tip_pct")
    )
)

(fact.write.format("delta")
     .mode("overwrite")
     .partitionBy("date_key")
     .saveAsTable("gold_fact_trips"))

print(
    "gold_dim_date:", spark.table("gold_dim_date").count(),
    "| gold_dim_zone:", spark.table("gold_dim_zone").count(),
    "| gold_fact_trips:", spark.table("gold_fact_trips").count()
)


StatementMeta(, 5e456344-8664-4504-8f10-ea58d3693e0d, 12, Finished, Available, Finished)

gold_dim_date: 6056 | gold_dim_zone: 265 | gold_fact_trips: 11455283
