# Step A: Load 2019 Yellow Taxi Sample (Development Dataset)

Loaded 4.6 GB NYC Yellow Taxi dataset from Databricks public datasets for distributed processing.



In [0]:
# For TEST ONLY
base = "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow"
files = dbutils.fs.ls(base)
paths_2019_01 = [f.path for f in files if "2019-01" in f.name]

df_dev = (spark.read
              .option("header", "true")
              .option("inferSchema", "true")
              .csv(paths_2019_01))

df_dev.printSchema()
df_dev.show(5)


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+---

In [0]:
# === Load Full Dataset (2017–2020) ===
base = "dbfs:/databricks-datasets/nyctaxi/tripdata/yellow"
files = dbutils.fs.ls(base)

subset_full = [f.path for f in files if any(y in f.name for y in ["2017", "2018", "2019", "2020"])]

df_dev = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(subset_full)
)

df_dev.printSchema()
df_dev.show(5)
print(f"Loaded full dataset with {df_dev.count():,} rows")


root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+-------

# Step B: Load Taxi Zone Reference Data for Joins

In [0]:
zone_path = "dbfs:/databricks-datasets/nyctaxi/taxizone"
zone_df = (spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv(zone_path))

zone_df = (zone_df
           .select("LocationID", "Borough", "Zone")
           .withColumnRenamed("LocationID", "LocationID_key")
           .withColumnRenamed("Borough", "Borough_name")
           .withColumnRenamed("Zone", "Zone_name"))

zone_df.show(5)


+--------------+-------------+--------------------+
|LocationID_key| Borough_name|           Zone_name|
+--------------+-------------+--------------------+
|             1|          EWR|      Newark Airport|
|             2|       Queens|         Jamaica Bay|
|             3|        Bronx|Allerton/Pelham G...|
|             4|    Manhattan|       Alphabet City|
|             5|Staten Island|       Arden Heights|
+--------------+-------------+--------------------+
only showing top 5 rows


# Step C: Column Transformations and Data Filtering


In [0]:
from pyspark.sql import functions as F

cols_needed = [
    "tpep_pickup_datetime","tpep_dropoff_datetime",
    "passenger_count","trip_distance",
    "PULocationID","DOLocationID",
    "fare_amount","tip_amount","total_amount","payment_type","RatecodeID"
]
df1 = df_dev.select([c for c in cols_needed if c in df_dev.columns])

num_cols_double = ["trip_distance","fare_amount","tip_amount","total_amount"]
num_cols_int    = ["passenger_count","PULocationID","DOLocationID","RatecodeID"]

df_cast = df1
for c in num_cols_double:
    if c in df_cast.columns:
        df_cast = df_cast.withColumn(c, F.col(c).cast("double"))
for c in num_cols_int:
    if c in df_cast.columns:
        df_cast = df_cast.withColumn(c, F.col(c).cast("int"))

df_cast = (df_cast
    .withColumn("tpep_pickup_datetime",  F.to_timestamp("tpep_pickup_datetime"))
    .withColumn("tpep_dropoff_datetime", F.to_timestamp("tpep_dropoff_datetime"))
)

df2 = (df_cast
    .withColumn(
        "trip_minutes",
        (F.col("tpep_dropoff_datetime").cast("long") - F.col("tpep_pickup_datetime").cast("long"))/60.0
    )
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
    .withColumn("pickup_dow",  F.date_format("tpep_pickup_datetime","E"))
    .withColumn(
        "tip_rate",
        F.when(F.col("fare_amount") > 0,
               F.col("tip_amount") / F.col("fare_amount"))
         .otherwise(F.lit(None).cast("double"))
    )
)

df_clean = (df2
    .filter((F.col("trip_minutes") >= 1) & (F.col("trip_minutes") <= 180))
    .filter((F.col("trip_distance") >= 0.2) & (F.col("trip_distance") <= 50))
    .filter((F.col("passenger_count") >= 1) & (F.col("passenger_count") <= 6))
    .filter(F.col("total_amount") > 0)
)

df_clean.select("tpep_pickup_datetime","trip_minutes","trip_distance","passenger_count",
                "fare_amount","tip_amount","tip_rate").show(5)


+--------------------+------------------+-------------+---------------+-----------+----------+-------------------+
|tpep_pickup_datetime|      trip_minutes|trip_distance|passenger_count|fare_amount|tip_amount|           tip_rate|
+--------------------+------------------+-------------+---------------+-----------+----------+-------------------+
| 2017-10-01 00:01:50|12.383333333333333|          2.0|              1|       10.0|       0.0|                0.0|
| 2017-10-01 00:02:43| 5.866666666666666|          2.3|              2|        8.0|      1.85|            0.23125|
| 2017-10-01 00:12:08|13.683333333333334|          2.8|              3|       13.0|      1.72| 0.1323076923076923|
| 2017-10-01 00:00:25|10.983333333333333|         1.97|              1|        9.5|       1.0|0.10526315789473684|
| 2017-10-01 00:15:30| 9.683333333333334|         2.17|              1|        9.0|      2.06| 0.2288888888888889|
+--------------------+------------------+-------------+---------------+---------

# Step D: Join Taxi Trips with Zone Information (PU and DO)


In [0]:
df_pu = (df_clean
         .join(zone_df.withColumnRenamed("LocationID_key","PULocationID"),
               on="PULocationID", how="left")
         .withColumnRenamed("Borough_name","PU_Borough")
         .withColumnRenamed("Zone_name","PU_Zone"))

df_joined = (df_pu
             .join(zone_df.withColumnRenamed("LocationID_key","DOLocationID"),
                   on="DOLocationID", how="left")
             .withColumnRenamed("Borough_name","DO_Borough")
             .withColumnRenamed("Zone_name","DO_Zone"))

df_joined.select("PULocationID","PU_Borough","PU_Zone",
                 "DOLocationID","DO_Borough","DO_Zone","trip_minutes","tip_rate").show(5)


+------------+----------+-------------------+------------+----------+--------------------+------------------+-------------------+
|PULocationID|PU_Borough|            PU_Zone|DOLocationID|DO_Borough|             DO_Zone|      trip_minutes|           tip_rate|
+------------+----------+-------------------+------------+----------+--------------------+------------------+-------------------+
|         142| Manhattan|Lincoln Square East|         233| Manhattan| UN/Turtle Bay South|12.383333333333333|                0.0|
|         142| Manhattan|Lincoln Square East|         166| Manhattan| Morningside Heights| 5.866666666666666|            0.23125|
|         151| Manhattan|   Manhattan Valley|         262| Manhattan|      Yorkville East|13.683333333333334| 0.1323076923076923|
|         100| Manhattan|   Garment District|         229| Manhattan|Sutton Place/Turt...|10.983333333333333|0.10526315789473684|
|         141| Manhattan|    Lenox Hill West|         142| Manhattan| Lincoln Square East|

# Step E: GroupBy Aggregations


In [0]:
agg_df = (df_joined
          .groupBy("PU_Borough","pickup_hour")
          .agg(F.count("*").alias("trips"),
               F.avg("trip_distance").alias("avg_miles"),
               F.avg("total_amount").alias("avg_total"),
               F.avg("tip_rate").alias("avg_tip_rate"))
          .orderBy(F.desc("trips"))
         )

agg_df.show(20, truncate=False)


+----------+-----------+--------+------------------+------------------+-------------------+
|PU_Borough|pickup_hour|trips   |avg_miles         |avg_total         |avg_tip_rate       |
+----------+-----------+--------+------------------+------------------+-------------------+
|Manhattan |18         |17482004|2.136958465974506 |15.192062065590234|0.16856183517145382|
|Manhattan |19         |16903389|2.1567067219478986|14.799441652734169|0.16963685194799008|
|Manhattan |17         |15394948|2.226213928751183 |15.833075497822092|0.16094658442483903|
|Manhattan |20         |15339035|2.310482299571011 |14.582196747653878|0.17216770597667894|
|Manhattan |21         |15143904|2.4475205647103944|14.911812764341487|0.1697537024051325 |
|Manhattan |14         |14716392|2.455025166494635 |15.919868422170422|0.1470033868378145 |
|Manhattan |15         |14592458|2.390400012115867 |15.798106106531106|0.1482791990605973 |
|Manhattan |22         |14262112|2.5817350109156436|15.372424419274372|0.1694551

# Step F: SQL Queries on Cleaned Dataset

In [0]:
df_joined.createOrReplaceTempView("nyc_taxi_clean")

# SQL Query 1
sql_1 = spark.sql("""
SELECT payment_type,
       COUNT(*)        AS trips,
       AVG(tip_rate)   AS avg_tip_rate,
       AVG(total_amount) AS avg_total
FROM nyc_taxi_clean
GROUP BY payment_type
ORDER BY trips DESC
""")
sql_1.show(10, truncate=False)

# SQL Query 2
sql_2 = spark.sql("""
SELECT PU_Borough,
       pickup_dow,
       COUNT(*) AS trips
FROM nyc_taxi_clean
GROUP BY PU_Borough, pickup_dow
ORDER BY PU_Borough, trips DESC
""")
sql_2.show(20, truncate=False)


+------------+---------+---------------------+------------------+
|payment_type|trips    |avg_tip_rate         |avg_total         |
+------------+---------+---------------------+------------------+
|1           |208059451|0.23123426126361196  |18.585367609842752|
|2           |88894373 |8.12385093996425E-6  |14.220029456288684|
|3           |1059158  |2.7777545480639173E-4|23.153573432861332|
|4           |336246   |3.359316588845696E-4 |21.558201465592308|
|5           |4        |0.05                 |16.599999999999998|
+------------+---------+---------------------+------------------+

+----------+----------+------+
|PU_Borough|pickup_dow|trips |
+----------+----------+------+
|Bronx     |Sun       |49246 |
|Bronx     |Fri       |48507 |
|Bronx     |Sat       |47886 |
|Bronx     |Thu       |41624 |
|Bronx     |Wed       |41344 |
|Bronx     |Tue       |40982 |
|Bronx     |Mon       |39354 |
|Brooklyn  |Sat       |682931|
|Brooklyn  |Sun       |627858|
|Brooklyn  |Fri       |564034|
|B

# Step G: Performance Analysis – Explain Plan


In [0]:
df_joined.explain(True)
agg_df.explain(True)


== Parsed Logical Plan ==
Project [DOLocationID#13217, PULocationID#13214, tpep_pickup_datetime#13267, tpep_dropoff_datetime#13269, passenger_count#13211, trip_distance#13199, fare_amount#13202, tip_amount#13205, total_amount#13208, payment_type#13042, RatecodeID#13265, trip_minutes#13272, pickup_hour#13274, pickup_dow#13276, tip_rate#13278, PU_Borough#13332, PU_Zone#13333, DO_Borough#13341, Zone_name#13170 AS DO_Zone#13342]
+- Project [DOLocationID#13217, PULocationID#13214, tpep_pickup_datetime#13267, tpep_dropoff_datetime#13269, passenger_count#13211, trip_distance#13199, fare_amount#13202, tip_amount#13205, total_amount#13208, payment_type#13042, RatecodeID#13265, trip_minutes#13272, pickup_hour#13274, pickup_dow#13276, tip_rate#13278, PU_Borough#13332, PU_Zone#13333, Borough_name#13169 AS DO_Borough#13341, Zone_name#13170]
   +- Project [DOLocationID#13217, PULocationID#13214, tpep_pickup_datetime#13267, tpep_dropoff_datetime#13269, passenger_count#13211, trip_distance#13199, fare

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

tip_by_boro = (
    df_joined.groupBy("PU_Borough")
    .agg(F.avg("tip_rate").alias("avg_tip_rate"), F.count("*").alias("trips"))
    .orderBy(F.desc("avg_tip_rate"))
)
tip_by_boro.show()

peak_hour = (
    df_clean.groupBy("pickup_hour")
    .count()
    .orderBy(F.desc("count"))
)
peak_hour.show(5)

avg_dist_pairs = (
    df_joined.groupBy("PU_Borough", "DO_Borough")
    .agg(F.avg("trip_distance").alias("avg_distance"), F.count("*").alias("trips"))
    .filter("trips >= 5000")
    .orderBy(F.desc("avg_distance"))
)
avg_dist_pairs.show(10)


+--------------------+--------------------+---------+
|          PU_Borough|        avg_tip_rate|    trips|
+--------------------+--------------------+---------+
|         Credit card|    6.89351151545218|     4307|
|                 EWR|    6.89351151545218|     4307|
|       Standard Rate|    6.89351151545218|     4307|
|             Unknown|   0.206932773625666|  3736058|
|               Bronx|   0.192775586906355|   308943|
|         Voided trip| 0.19024551733576173|      729|
|          Group ride| 0.19024551733576167|      729|
|              Queens| 0.16113235073694462| 18547394|
|           Manhattan| 0.16086570303507508|270720743|
|                Cash| 0.14109757672825113|      290|
|                 JFK| 0.14109757672825113|      290|
|Nassau or Westche...| 0.13741292244672712|   663869|
|             Dispute| 0.13741292244672706|   663869|
|            Brooklyn| 0.13185189101940958|  3680216|
|       Staten Island| 0.09630268628105244|     6510|
|              Newark|0.0295

# Step H: Write Results to Parquet


In [0]:
agg_df.show(20)

+----------+-----------+--------+------------------+------------------+-------------------+
|PU_Borough|pickup_hour|   trips|         avg_miles|         avg_total|       avg_tip_rate|
+----------+-----------+--------+------------------+------------------+-------------------+
| Manhattan|         18|17482004| 2.136958465974506|15.192062065590234|0.16856183517145382|
| Manhattan|         19|16903389|2.1567067219478986|14.799441652734169|0.16963685194799008|
| Manhattan|         17|15394948| 2.226213928751183|15.833075497822092|0.16094658442483903|
| Manhattan|         20|15339035| 2.310482299571011|14.582196747653878|0.17216770597667894|
| Manhattan|         21|15143904|2.4475205647103944|14.911812764341487| 0.1697537024051325|
| Manhattan|         14|14716392| 2.455025166494635|15.919868422170422| 0.1470033868378145|
| Manhattan|         15|14592458| 2.390400012115867|15.798106106531106| 0.1482791990605973|
| Manhattan|         22|14262112|2.5817350109156436|15.372424419274372|0.1694551

Due to serverless environment restrictions, results were not persisted to DBFS, but displayed in the notebook output instead.