In [1]:
from pyspark.sql import functions as F

# Location of the raw (bronze) taxi trip parquet files.
BRONZE_PATH = "Files/bronze/taxi/taxi_trips/trip-data"

# Load the bronze data unchanged: no explicit schema, no filtering.
df_bronze = spark.read.parquet(BRONZE_PATH)

# Print the schema Spark read from the files so the column names and types
# used by the following cells can be checked against it.
df_bronze.printSchema()


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 3, Finished, Available, Finished)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [2]:
# Add timestamp and calendar-date columns for pickup and dropoff.
# This takes two passes: the date columns are computed from the *_ts columns,
# which only exist after the first pass has been applied.
df_silver = (
    df_bronze
    .withColumns(
        {
            "pickup_ts": F.to_timestamp(F.col("tpep_pickup_datetime")),
            "dropoff_ts": F.to_timestamp(F.col("tpep_dropoff_datetime")),
        }
    )
    .withColumns(
        {
            "pickup_date": F.to_date(F.col("pickup_ts")),
            "dropoff_date": F.to_date(F.col("dropoff_ts")),
        }
    )
)


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 4, Finished, Available, Finished)

In [3]:
# Data-quality gate: a row is kept only when every rule below is true.
# A rule that evaluates to NULL (e.g. a missing passenger_count) also drops
# the row, exactly as a chain of separate filters would.
df_silver = df_silver.where(
    F.col("pickup_ts").isNotNull()
    & F.col("dropoff_ts").isNotNull()
    & (F.col("trip_distance") > 0)        # strictly positive distance
    & (F.col("total_amount") > 0)         # strictly positive charge
    & (F.col("passenger_count") >= 0)     # zero passengers is allowed
)


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 5, Finished, Available, Finished)

In [4]:
# Rename the pickup/dropoff location id columns to the snake_case names used
# by the rest of the notebook (old name -> new name).
df_silver = df_silver.withColumnsRenamed(
    {
        "PULocationID": "pu_zone_id",
        "DOLocationID": "do_zone_id",
    }
)


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 6, Finished, Available, Finished)

In [5]:
# Final silver column set, in output order. Columns that are not listed here
# (e.g. RatecodeID, store_and_fwd_flag and the original tpep_* datetime
# columns) are dropped by the projection.
SILVER_COLUMNS = [
    # trip identity and time
    "VendorID",
    "pickup_ts",
    "dropoff_ts",
    "pickup_date",
    "dropoff_date",
    # zones
    "pu_zone_id",
    "do_zone_id",
    # trip attributes
    "passenger_count",
    "trip_distance",
    "payment_type",
    # amounts
    "fare_amount",
    "tip_amount",
    "tolls_amount",
    "extra",
    "mta_tax",
    "improvement_surcharge",
    "congestion_surcharge",
    "Airport_fee",
    "cbd_congestion_fee",
    "total_amount",
]

df_silver = df_silver.select(*SILVER_COLUMNS)


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 7, Finished, Available, Finished)

In [6]:
# Target location of the silver fact data.
SILVER_PATH = "Files/silver/taxi/fact_taxi_trips"

# Replace whatever is at SILVER_PATH with the current df_silver, written as
# parquet and partitioned by pickup_date.
df_silver.write.save(
    SILVER_PATH,
    format="parquet",
    mode="overwrite",
    partitionBy="pickup_date",
)


StatementMeta(, bf80406a-f329-4596-bb75-8c328c1635bd, 8, Finished, Available, Finished)

In [None]:
# Re-runnable silver write to the same location as the write cell above.
# It is partitioned by pickup_date and reuses SILVER_PATH so that running this
# cell does not replace the partitioned layout with an unpartitioned one, and
# so the target path is defined in only one place.
# NOTE(review): this cell now duplicates the previous write cell; confirm
# whether it is still needed at all.
(
    df_silver
    .write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("pickup_date")
    .save(SILVER_PATH)
)


In [6]:
from pyspark.sql.functions import (
    col, hour, unix_timestamp, when
)

# Enrich silver trips with time-of-day features and the trip duration.
#   pickup_hour       - hour of day extracted from pickup_ts
#   trip_duration_min - (dropoff - pickup) in seconds, divided by 60
#   rush_hour         - "morning" for hours 7..10 inclusive,
#                       "evening" for hours 16..19 inclusive,
#                       "off_peak" otherwise
# Trips whose duration is not strictly positive are removed at the end.
df_enriched = (
    df_silver
    .withColumn("pickup_hour", hour("pickup_ts"))
    .withColumn(
        "trip_duration_min",
        (unix_timestamp(col("dropoff_ts")) - unix_timestamp(col("pickup_ts"))) / 60,
    )
    .withColumn(
        "rush_hour",
        when((col("pickup_hour") >= 7) & (col("pickup_hour") <= 10), "morning")
        .when((col("pickup_hour") >= 16) & (col("pickup_hour") <= 19), "evening")
        .otherwise("off_peak"),
    )
    .where(col("trip_duration_min") > 0)
)


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 8, Finished, Available, Finished)

In [7]:
def save_gold_table(df, table_name, schema="dbo", mode="overwrite"):
    """Persist a DataFrame as a Delta table.

    Args:
        df: DataFrame to write; its ``write`` attribute is used.
        table_name: Unqualified name of the target table. Must be non-empty.
        schema: Schema the table is created in. Defaults to ``"dbo"``.
            Pass ``None`` or ``""`` to save under the unqualified name.
        mode: Save mode passed to the writer. Defaults to ``"overwrite"``,
            which replaces the table contents.

    Raises:
        ValueError: If ``table_name`` is empty or only whitespace.
    """
    # Fail early with a clear message instead of asking the writer to save
    # a table literally named "dbo.".
    if not table_name or not str(table_name).strip():
        raise ValueError("table_name must be a non-empty string")

    qualified_name = f"{schema}.{table_name}" if schema else str(table_name)

    (
        df
        .write
        .format("delta")
        .mode(mode)
        .saveAsTable(qualified_name)
    )


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 9, Finished, Available, Finished)

In [8]:
from pyspark.sql.functions import count, avg, sum

# Per (date, hour, rush-hour label): trip count, mean duration, mean fare and
# summed total_amount. The F.-qualified functions are used so the aggregation
# does not depend on which `sum` is bound in the notebook namespace.
peak_time_metrics = [
    F.count("*").alias("trips_cnt"),
    F.avg("trip_duration_min").alias("avg_duration_min"),
    F.avg("fare_amount").alias("avg_fare"),
    F.sum("total_amount").alias("total_revenue"),
]

gold_peak_times = (
    df_enriched
    .groupBy("pickup_date", "pickup_hour", "rush_hour")
    .agg(*peak_time_metrics)
    # NOTE(review): the sort is applied before the table write; confirm that
    # consumers do not rely on row order when reading the table back.
    .sort("pickup_hour")
)

save_gold_table(gold_peak_times, "gold_peak_times")

StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 10, Finished, Available, Finished)

In [9]:
# Per (date, pickup zone): trip count, mean duration and summed total_amount,
# sorted with the busiest date/zone combinations first.
peak_zone_metrics = [
    F.count("*").alias("trips_cnt"),
    F.avg("trip_duration_min").alias("avg_duration_min"),
    F.sum("total_amount").alias("total_revenue"),
]

gold_peak_zones = (
    df_enriched
    .groupBy("pickup_date", "pu_zone_id")
    .agg(*peak_zone_metrics)
    .orderBy("trips_cnt", ascending=False)
)

save_gold_table(gold_peak_zones, "gold_peak_zones")

StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 11, Finished, Available, Finished)

In [10]:
# Pickup zone ids treated as airports by the airport aggregation.
# Kept as a list because it is passed directly to Column.isin().
AIRPORT_ZONES = [
    132,  # JFK
    138,  # LGA
]


StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 12, Finished, Available, Finished)

In [11]:
# Airport pickups only: per (date, airport zone, rush-hour label) trip count,
# mean duration, mean fare and summed total_amount.
airport_trips = df_enriched.where(F.col("pu_zone_id").isin(AIRPORT_ZONES))

gold_airports = (
    airport_trips
    .groupBy("pickup_date", "pu_zone_id", "rush_hour")
    .agg(
        F.count("*").alias("trips_cnt"),
        F.avg("trip_duration_min").alias("avg_duration_min"),
        F.avg("fare_amount").alias("avg_fare"),
        F.sum("total_amount").alias("total_revenue"),
    )
)

save_gold_table(gold_airports, "gold_airports")

StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 13, Finished, Available, Finished)

In [12]:
# Label each trip with a duration bucket, then count trips and average the
# fare per (date, bucket). The conditions are evaluated top to bottom, so each
# bucket's upper bound is inclusive and its lower bound is exclusive; anything
# not matched by a bound (durations above 40 minutes) lands in "40+".
duration_min = F.col("trip_duration_min")

duration_bucket = (
    F.when(duration_min <= 5, "0–5")
    .when(duration_min <= 10, "5–10")
    .when(duration_min <= 20, "10–20")
    .when(duration_min <= 40, "20–40")
    .otherwise("40+")
)

gold_trip_duration_bins = (
    df_enriched
    .withColumn("duration_bucket", duration_bucket)
    .groupBy("pickup_date", "duration_bucket")
    .agg(
        F.count("*").alias("trips_cnt"),
        F.avg("fare_amount").alias("avg_fare"),
    )
)

save_gold_table(gold_trip_duration_bins, "gold_trip_duration_bins")

StatementMeta(, 420d2f50-5436-4b87-8ef0-4894fbd43e9c, 14, Finished, Available, Finished)