1. Import Modules

In [0]:
from pyspark.sql.functions import (
    col, hour, dayofweek, unix_timestamp, round, avg
    )

2. Create Database

In [0]:
%sql
-- Create the database that the Delta tables in this notebook are saved into
-- (tables are written as nyc_taxi.<table_name>); does nothing if it already exists.
CREATE DATABASE IF NOT EXISTS nyc_taxi

3. Load Data

In [0]:
# Read the yellow-taxi trip file (2025-05, per the file name).
# Parquet files carry their own schema, so the text-format parser options
# "inferschema" and "mode" are not Parquet options; they are omitted here
# to avoid implying they do anything.
df = (
    spark.read
    .format("parquet")
    .load("/Volumes/workspace/nyc_taxi_data/data/yellow_tripdata_2025-05.parquet")
)
# Preview a handful of rows only; never render the full frame.
display(df.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
1,2025-05-01T00:07:06.000,2025-05-01T00:24:15.000,1,3.7,1,N,140,202,1,18.4,4.25,0.5,4.85,0.0,1.0,29.0,2.5,0.0,0.75
2,2025-05-01T00:07:44.000,2025-05-01T00:14:27.000,1,1.03,1,N,234,161,1,8.6,1.0,0.5,4.3,0.0,1.0,18.65,2.5,0.0,0.75
2,2025-05-01T00:15:56.000,2025-05-01T00:23:53.000,1,1.57,1,N,161,234,2,10.0,1.0,0.5,0.0,0.0,1.0,15.75,2.5,0.0,0.75
2,2025-05-01T00:00:09.000,2025-05-01T00:25:29.000,1,9.48,1,N,138,90,1,40.8,6.0,0.5,11.7,6.94,1.0,71.94,2.5,1.75,0.75
2,2025-05-01T00:45:07.000,2025-05-01T00:52:45.000,1,1.8,1,N,90,231,1,10.0,1.0,0.5,1.5,0.0,1.0,17.25,2.5,0.0,0.75


In [0]:
# Taxi zone lookup table: a CSV whose first row holds the column names.
# No schema inference is requested, so every column is read as a string.
df_zones = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("/databricks-datasets/nyctaxi/taxizone/taxi_zone_lookup.csv")
)

4. Clean Data

In [0]:
def clean_data(df):
    """Drop trip records that fail basic validity checks.

    A row is kept only when all of the following hold:
      * passenger_count, trip_distance and fare_amount are each > 0
      * tpep_pickup_datetime and tpep_dropoff_datetime are both non-null

    Rows where one of the compared columns is null are dropped as well,
    because the comparison does not evaluate to true for them.

    Args:
        df: Spark DataFrame containing the five columns named above.

    Returns:
        A new DataFrame with the same columns and only the valid rows.
    """
    has_passengers = col("passenger_count") > 0
    has_distance = col("trip_distance") > 0
    has_fare = col("fare_amount") > 0
    has_pickup_time = col("tpep_pickup_datetime").isNotNull()
    has_dropoff_time = col("tpep_dropoff_datetime").isNotNull()

    is_valid_trip = (
        has_passengers
        & has_distance
        & has_fare
        & has_pickup_time
        & has_dropoff_time
    )
    return df.filter(is_valid_trip)

In [0]:
df_clean = clean_data(df)

# Every count() launches a Spark job over the data, so compute each count
# once and reuse the numbers instead of counting both frames twice.
rows_before = df.count()
rows_after = df_clean.count()

print("Before filtering:", rows_before)
print("After filtering:", rows_after)
print('Removed Entries:', rows_before - rows_after)

Before filtering: 4591845
After filtering: 3254702
Removed Entries: 1337143


5. Feature Engineering

In [0]:
def enrich_df(df):
    """Add pickup-time, duration and tip-rate features to trip records.

    Args:
        df: Spark DataFrame with tpep_pickup_datetime, tpep_dropoff_datetime,
            tip_amount and fare_amount columns. fare_amount is expected to be
            non-zero (clean_data keeps only fare_amount > 0), since it is the
            divisor for tip_percent.

    Returns:
        The input DataFrame with four added columns:
          * pickup_hour           - hour of the pickup timestamp
          * pickup_dayofweek      - day of week of the pickup timestamp
          * trip_duration_minutes - whole minutes between pickup and dropoff
          * tip_percent           - tip_amount / fare_amount * 100, 2 decimals
    """
    # Elapsed trip time in seconds (difference of epoch-second timestamps).
    duration_seconds = (
        unix_timestamp("tpep_dropoff_datetime")
        - unix_timestamp("tpep_pickup_datetime")
    )

    return (
        df
        # Hour of the day (0-23) for hourly aggregation & dashboards
        .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))

        # Day of week (1 = Sunday ... 7 = Saturday) to compare weekdays & weekends
        .withColumn("pickup_dayofweek", dayofweek(col("tpep_pickup_datetime")))

        # Trip duration in whole minutes. The integer cast discards the
        # fractional part, so the former round(..., 2) before it had no effect
        # on the stored value and has been removed; the column stays an integer.
        .withColumn(
            "trip_duration_minutes",
            (duration_seconds / 60).cast("integer"),
        )

        # Tip as a % of fare_amount (not total_amount), rounded to 2 decimals
        .withColumn(
            "tip_percent",
            round((col("tip_amount") / col("fare_amount")) * 100, 2),
        )
    )

df_enriched = enrich_df(df_clean)
display(df_enriched.limit(10))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee,pickup_hour,pickup_dayofweek,trip_duration_minutes,tip_percent
1,2025-05-01T00:07:06.000,2025-05-01T00:24:15.000,1,3.7,1,N,140,202,1,18.4,4.25,0.5,4.85,0.0,1.0,29.0,2.5,0.0,0.75,0,5,17,26.36
2,2025-05-01T00:07:44.000,2025-05-01T00:14:27.000,1,1.03,1,N,234,161,1,8.6,1.0,0.5,4.3,0.0,1.0,18.65,2.5,0.0,0.75,0,5,6,50.0
2,2025-05-01T00:15:56.000,2025-05-01T00:23:53.000,1,1.57,1,N,161,234,2,10.0,1.0,0.5,0.0,0.0,1.0,15.75,2.5,0.0,0.75,0,5,7,0.0
2,2025-05-01T00:00:09.000,2025-05-01T00:25:29.000,1,9.48,1,N,138,90,1,40.8,6.0,0.5,11.7,6.94,1.0,71.94,2.5,1.75,0.75,0,5,25,28.68
2,2025-05-01T00:45:07.000,2025-05-01T00:52:45.000,1,1.8,1,N,90,231,1,10.0,1.0,0.5,1.5,0.0,1.0,17.25,2.5,0.0,0.75,0,5,7,15.0
2,2025-05-01T00:09:24.000,2025-05-01T00:22:04.000,1,5.11,1,N,138,226,1,22.6,6.0,0.5,6.02,0.0,1.0,37.87,0.0,1.75,0.0,0,5,12,26.64
2,2025-04-30T23:50:34.000,2025-04-30T23:56:06.000,2,0.99,1,N,234,79,1,7.9,1.0,0.5,2.73,0.0,1.0,16.38,2.5,0.0,0.75,23,4,5,34.56
2,2025-05-01T00:04:45.000,2025-05-01T00:07:43.000,1,0.47,1,N,114,144,2,5.1,1.0,0.5,0.0,0.0,1.0,10.85,2.5,0.0,0.75,0,5,2,0.0
7,2025-05-01T00:22:31.000,2025-05-01T00:22:31.000,1,1.09,1,N,229,43,1,8.6,0.0,0.5,2.87,0.0,1.0,17.22,2.5,0.0,0.75,0,5,0,33.37
2,2025-05-01T00:09:36.000,2025-05-01T00:16:24.000,1,1.33,1,N,158,125,1,8.6,1.0,0.5,2.87,0.0,1.0,17.22,2.5,0.0,0.75,0,5,6,33.37


6. Aggregation

In [0]:
def agg_df(df):
    """Summarise fares and tip rates per pickup hour.

    Args:
        df: Spark DataFrame with pickup_hour, fare_amount and tip_percent
            columns.

    Returns:
        One row per pickup_hour with avg_fare and avg_tip (both rounded to
        2 decimals), ordered by pickup_hour.
    """
    mean_fare = round(avg("fare_amount"), 2).alias('avg_fare')
    mean_tip = round(avg("tip_percent"), 2).alias('avg_tip')

    per_hour = df.groupBy('pickup_hour').agg(mean_fare, mean_tip)
    return per_hour.orderBy('pickup_hour')


df_summary = agg_df(df_enriched)
display(df_summary.limit(5))


pickup_hour,avg_fare,avg_tip
0,21.13,21.71
1,19.68,21.52
2,17.0,21.44
3,17.71,20.99
4,23.75,20.3


7. Save Data to Delta Table

In [0]:
# Save a DataFrame to the catalog as a Delta table
def save_to_delta(df, table_name, database='nyc_taxi', mode='overwrite'):
    """Write a DataFrame to a Delta table and hand the DataFrame back.

    Args:
        df: DataFrame to persist.
        table_name: Name of the target table, without the database prefix.
        database: Database the table is written into. Defaults to
            'nyc_taxi', the value that was previously hard-coded.
        mode: Save mode passed to the writer. Defaults to 'overwrite',
            the value that was previously hard-coded.

    Returns:
        The same ``df`` that was passed in, so calls can be chained.
    """
    df.write.format('delta').mode(mode).saveAsTable(f'{database}.{table_name}')
    return df

In [0]:
# Persist every stage of the pipeline, previewing five rows of each frame
# right after it is written. Order matches the pipeline output listing below.
frames_and_tables = [
    (df_summary, 'hourly_summary'),
    (df_enriched, 'df_enriched'),
    (df_clean, 'df_clean'),
    (df_zones, 'nyc_taxi_zone_lookup'),
]

for frame, table in frames_and_tables:
    save_to_delta(frame, table)
    display(frame.limit(5))



pickup_hour,avg_fare,avg_tip
0,21.13,21.71
1,19.68,21.52
2,17.0,21.44
3,17.71,20.99
4,23.75,20.3


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee,pickup_hour,pickup_dayofweek,trip_duration_minutes,tip_percent
1,2025-05-01T00:07:06.000,2025-05-01T00:24:15.000,1,3.7,1,N,140,202,1,18.4,4.25,0.5,4.85,0.0,1.0,29.0,2.5,0.0,0.75,0,5,17,26.36
2,2025-05-01T00:07:44.000,2025-05-01T00:14:27.000,1,1.03,1,N,234,161,1,8.6,1.0,0.5,4.3,0.0,1.0,18.65,2.5,0.0,0.75,0,5,6,50.0
2,2025-05-01T00:15:56.000,2025-05-01T00:23:53.000,1,1.57,1,N,161,234,2,10.0,1.0,0.5,0.0,0.0,1.0,15.75,2.5,0.0,0.75,0,5,7,0.0
2,2025-05-01T00:00:09.000,2025-05-01T00:25:29.000,1,9.48,1,N,138,90,1,40.8,6.0,0.5,11.7,6.94,1.0,71.94,2.5,1.75,0.75,0,5,25,28.68
2,2025-05-01T00:45:07.000,2025-05-01T00:52:45.000,1,1.8,1,N,90,231,1,10.0,1.0,0.5,1.5,0.0,1.0,17.25,2.5,0.0,0.75,0,5,7,15.0


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
1,2025-05-01T00:07:06.000,2025-05-01T00:24:15.000,1,3.7,1,N,140,202,1,18.4,4.25,0.5,4.85,0.0,1.0,29.0,2.5,0.0,0.75
2,2025-05-01T00:07:44.000,2025-05-01T00:14:27.000,1,1.03,1,N,234,161,1,8.6,1.0,0.5,4.3,0.0,1.0,18.65,2.5,0.0,0.75
2,2025-05-01T00:15:56.000,2025-05-01T00:23:53.000,1,1.57,1,N,161,234,2,10.0,1.0,0.5,0.0,0.0,1.0,15.75,2.5,0.0,0.75
2,2025-05-01T00:00:09.000,2025-05-01T00:25:29.000,1,9.48,1,N,138,90,1,40.8,6.0,0.5,11.7,6.94,1.0,71.94,2.5,1.75,0.75
2,2025-05-01T00:45:07.000,2025-05-01T00:52:45.000,1,1.8,1,N,90,231,1,10.0,1.0,0.5,1.5,0.0,1.0,17.25,2.5,0.0,0.75


LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
