### Preprocess Taxi data

In [1]:
import os
import sys
sys.path.append("../")
from scripts.yellow_green_taxi_scrape import get_taxi_data
from scripts.utility import taxi_type_correction
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, stddev, mean, col, unix_timestamp, abs, round, to_date, count, when, dayofweek, sum
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

Scrape the taxi data from the TLC website

In [2]:
get_taxi_data()

Begin 10/2022 for yellow_taxi
Completed 10/2022 for yellow_taxi
Begin 11/2022 for yellow_taxi
Completed 11/2022 for yellow_taxi
Begin 12/2022 for yellow_taxi
Completed 12/2022 for yellow_taxi
Begin 01/2023 for yellow_taxi
Completed 01/2023 for yellow_taxi
Begin 02/2023 for yellow_taxi
Completed 02/2023 for yellow_taxi
Begin 03/2023 for yellow_taxi
Completed 03/2023 for yellow_taxi
Begin 10/2022 for green_taxi
Completed 10/2022 for green_taxi
Begin 11/2022 for green_taxi
Completed 11/2022 for green_taxi
Begin 12/2022 for green_taxi
Completed 12/2022 for green_taxi
Begin 01/2023 for green_taxi
Completed 01/2023 for green_taxi
Begin 02/2023 for green_taxi
Completed 02/2023 for green_taxi
Begin 03/2023 for green_taxi
Completed 03/2023 for green_taxi


In [3]:
# Create a spark session (which will run spark jobs)
spark = (
    SparkSession.builder.appName("Preprocess Taxi Data")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .getOrCreate()
)

# sdf = spark data frame
time_period = {
        "2022": [10,11,12],
        "2023": [1,2,3]
    }

# Read in the green taxi parquet files individually and correctly type cast them
green_sdf_list = []
for year, months in time_period.items():
    for month in months:
        month_str = str(month).zfill(2)
        file_path = f"../data/landing/tlc_data/green_taxi/{year}-{month_str}.parquet"
        df = spark.read.parquet(file_path)
        # Type cast to correct type 
        df_casted = taxi_type_correction(df)
        # Put into data/raw as it is now correctly type casted
        df_casted.write.mode('overwrite').parquet(
            f'../data/raw/tlc_data/green_taxi/{year}-{month_str}.parquet')
        green_sdf_list.append(df_casted)

# Read in the yellow taxi parquet files individually and correctly type cast them and correct
# column name inconsistencies
yellow_sdf_list = []
for year, months in time_period.items():
    for month in months:
        month_str = str(month).zfill(2)
        file_path = f"../data/landing/tlc_data/yellow_taxi/{year}-{month_str}.parquet"
        df = spark.read.parquet(file_path)
        df_casted = taxi_type_correction(df)
        # Put into data/raw as it is now correctly type casted
        df_casted.write.mode('overwrite').parquet(
            f'../data/raw/tlc_data/green_taxi/{year}-{month_str}.parquet')
        df_casted.withColumnRenamed("Airport_fee", "airport_fee")
        yellow_sdf_list.append(df_casted)

# Concatenate the yellow and green taxi dataframes respectively
yellow_sdf = yellow_sdf_list[0]
green_sdf = green_sdf_list[0]
for df in yellow_sdf_list[1:]:
    yellow_sdf = yellow_sdf.union(df)
for df in green_sdf_list[1:]:
    green_sdf = green_sdf.union(df)

num_green_instances, num_green_features = green_sdf.count(), len(green_sdf.columns)
num_yellow_instance, num_yellow_features = yellow_sdf.count(), len(yellow_sdf.columns)
print(f"The shape of green taxi dataframe: {num_green_instances} x {num_green_features}")
print(f"The shape of yellow taxi dataframe: {num_yellow_instance} x {num_yellow_features}")

your 131072x1 screen size is bogus. expect trouble
24/08/24 23:56:59 WARN Utils: Your hostname, DESKTOP-N0VCA8U resolves to a loopback address: 127.0.1.1; using 172.23.106.248 instead (on interface eth0)
24/08/24 23:56:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 23:57:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/24 23:57:03 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/08/24 23:57:03 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/08/24 23:57:03 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
                                                                                

The shape of green taxi dataframe: 409138 x 20
The shape of yellow taxi dataframe: 19712164 x 19


                                                                                

Rename columns to keep them consistent

In [4]:
column_name = {"VendorID": "vendor_id",
               "RatecodeID": "ratecode_id",
               "PULocationID": "pu_location_id",
               "DOLocationID": "do_location_id"}

for key, value in column_name.items():
    green_sdf = green_sdf.withColumnRenamed(key, value)
    yellow_sdf = yellow_sdf.withColumnRenamed(key, value)

In the dictionary descriptions, lpep and tpep are equivalent so generalise to be pep. Ehail fee is not defined in the data dictionary descriptions and only exists in one of the taxi datasets so we remove. Trip_type is also a difference and is irrelvant to the research question so we can also remove this. Store_and_fwd_flag is not relevant to the research question so we can drop this column from both datasets. As airport fee exists for yellow taxis but doesnt for green taxis make a new column for green taxis called airport_fee but set it to 0 for all instances.

In [5]:
# Drop ehail fee and add a column called airport fee with values initialised to 0
green_sdf = green_sdf.drop("ehail_fee", "trip_type", "store_and_fwd_flag")
yellow_sdf = yellow_sdf.drop("store_and_fwd_flag")
green_sdf = green_sdf.withColumn("airport_fee", lit(0))

# Rename the datetime columns to match
green_sdf = (green_sdf.withColumnRenamed("lpep_pickup_datetime", "pep_pickup_datetime")
                      .withColumnRenamed("lpep_dropoff_datetime", "pep_dropoff_datetime"))

yellow_sdf = (yellow_sdf.withColumnRenamed("tpep_pickup_datetime", "pep_pickup_datetime")
                        .withColumnRenamed("tpep_dropoff_datetime", "pep_dropoff_datetime"))

num_green_instances, num_green_features = green_sdf.count(), len(green_sdf.columns)
num_yellow_instance, num_yellow_features = yellow_sdf.count(), len(yellow_sdf.columns)
print(f"The shape of green taxi dataframe: {num_green_instances} x {num_green_features}")
print(f"The shape of yellow taxi dataframe: {num_yellow_instance} x {num_yellow_features}")

The shape of green taxi dataframe: 409138 x 18
The shape of yellow taxi dataframe: 19712164 x 18


Combine the yellow and green taxi dataframes

In [6]:
taxi_df = yellow_sdf.union(green_sdf)

initial_num_data_instances, initial_num_data_features = taxi_df.count(), len(taxi_df.columns)
print(f"The shape of taxi dataframe: {initial_num_data_instances} x {initial_num_data_features}")



The shape of taxi dataframe: 20121302 x 18


                                                                                

Look at the 5-number summary for each of the numerical attributes in the dataframe

In [7]:
numeric_columns = [
    "vendor_id",
    "passenger_count",
    "trip_distance",
    "ratecode_id",
    "pu_location_id",
    "do_location_id",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "airport_fee"
]

taxi_df.select(numeric_columns).describe().show(vertical=True)


24/08/24 23:59:08 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

-RECORD 0------------------------------------
 summary               | count               
 vendor_id             | 20121302            
 passenger_count       | 19469756            
 trip_distance         | 20121302            
 ratecode_id           | 19503681            
 pu_location_id        | 20087377            
 do_location_id        | 20121302            
 payment_type          | 20121302            
 fare_amount           | 20121302            
 extra                 | 20121302            
 mta_tax               | 20121302            
 tip_amount            | 20121302            
 tolls_amount          | 20121302            
 improvement_surcharge | 20121302            
 total_amount          | 20087377            
 congestion_surcharge  | 19469756            
 airport_fee           | 19503681            
-RECORD 1------------------------------------
 summary               | mean                
 vendor_id             | 1.7371428548709225  
 passenger_count       | 1.3742285

Identify and remove any instances where the data is incorrect or inconsistent. We can see some examples of that above where we have a negative total amount, payment_type as -150.0 etc, dates not being within the months that have been scraped, etc.


In [8]:
# Define date range
start_date = "2022-10-01"
end_date = "2023-03-31"

# Filter DataFrame
taxi_df = taxi_df.filter(
    (col("pep_pickup_datetime") >= start_date) &
    (col("pep_dropoff_datetime") <= end_date)
)

# Fix semantic errors in the data
taxi_df = taxi_df.where(
                (col("passenger_count") > 0) &
                (col("trip_distance") > 0) &
                (col("ratecode_id") >= 1) &
                (col("ratecode_id") <= 6) &
                (col("payment_type") >= 1) &
                (col("payment_type") <= 6) &
                (col("fare_amount") >= 2.50) &
                (col("extra") >= 0) &
                (col("mta_tax") >= 0) &
                (col("tip_amount") >= 0) &
                (col("tolls_amount") >= 0) &
                (col("improvement_surcharge") >= 0) &
                (col("total_amount") >= 0) &
                (col("congestion_surcharge") >= 0) &
                (col("airport_fee") >= 0) &
                (col("airport_fee") <= 1.25)
                )

# Define trip time in hours and filter time that is too little or negative
taxi_df = taxi_df.withColumn(
    "trip_time_hours",
    (unix_timestamp(col("pep_dropoff_datetime")) - unix_timestamp(col("pep_pickup_datetime"))) / (60*60)
)

taxi_df = taxi_df.where((col("trip_time_hours") > 0.02))

# Find total_amount and calc_amount (calculated amount) difference to find errors in summation 
taxi_df = taxi_df.withColumn(
                    "calc_total_amount",
                    col("fare_amount") + 
                    col("extra") + 
                    col("mta_tax") + 
                    col("tip_amount") + 
                    col("improvement_surcharge") + 
                    col("congestion_surcharge") +
                    col("tolls_amount") +
                    col("airport_fee"))

taxi_df = taxi_df.withColumn(
    "total_diff",
    round(abs(col("total_amount") - col("calc_total_amount")), 4)
)

# As small errors are bound to happen and airport_fee is often seen to not be added we can leave them as long
# as the total difference is <= 3
taxi_df.filter(
    (col("total_diff") <= 3)
)

num_data_instances, num_data_features = taxi_df.count(), len(taxi_df.columns)
print(f"The shape of taxi dataframe after semantic filtering: {num_data_instances} x {num_data_features}")



The shape of taxi dataframe after semantic filtering: 18112819 x 21


                                                                                

The inconsistency and semantically wrong data has been removed but outliers still exist so we can remove them by assuming a normal distribution and removing values beyond sqrt(2*log(N)) std away from the mean

In [9]:
continuous_columns = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra", "mta_tax",
    "tip_amount", 
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "airport_fee",
    "trip_time_hours"
]

for column in continuous_columns:
    stats = taxi_df.agg(
        mean(column).alias("mean"),
        stddev(column).alias("stddev")
    ).collect()[0]
    
    column_mean = stats["mean"]
    column_stddev = stats["stddev"]

    bound_sd = np.sqrt(2*np.log(taxi_df.count()))
    
    taxi_df = taxi_df.filter(
        (col(column) >= column_mean - bound_sd * column_stddev) &
        (col(column) <= column_mean + bound_sd * column_stddev)
    )

num_data_instances, num_data_features = taxi_df.count(), len(taxi_df.columns)
print(f"The shape of taxi dataframe after outlier removal: {num_data_instances} x {num_data_features}")



The shape of taxi dataframe after outlier removal: 17932561 x 21


                                                                                

This is our cleaned raw data

In [10]:
taxi_df.describe()

                                                                                

summary,vendor_id,passenger_count,trip_distance,ratecode_id,pu_location_id,do_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_time_hours,calc_total_amount,total_diff
count,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0,17932561.0
mean,1.7463752109918935,1.4035802248212066,3.3976104578704387,1.0434077430435065,166.3248044715978,164.32863783371488,1.2034765697994838,16.479031296198638,1.309605258278505,0.4999999107768266,3.0924208527717387,0.4793593737132333,0.6746864042509274,24.37416179184664,2.353252820386335,0.1040310834576277,0.261590260526033,24.99238699959946,0.6267004556683219
stddev,0.4350853548039464,0.9077292178959858,4.242937025709574,0.2275077006905831,64.08221625317614,69.30948343553915,0.4447302501716157,13.918165896855855,1.5703724063125268,0.0001335837853883535,3.207902658574981,1.70571870227484,0.3491340044991151,17.800789592257043,0.5876501097805554,0.3452772721230137,0.2020857037638375,17.811227187108017,1.098710458096016
min,1.0,1.0,0.01,1.0,1.0,1.0,1.0,2.5,0.0,0.3,0.0,0.0,0.0,3.3,0.0,0.0,0.0202777777777777,3.3,0.0
max,2.0,6.0,145.53,6.0,265.0,265.0,4.0,107.0,10.3,0.5,22.8,10.66,1.0,127.38,2.75,1.25,4.660277777777778,129.95000000000002,5.0


As we are looking at fare_amount to get a general idea of the amount per day per zone we can aggregate the mean on day and create an avg_amount_per_hour 

In [11]:
taxi_df = taxi_df.withColumn("date", to_date(col("pep_pickup_datetime")))

# Compute averages avg_total_amount and avg_amount_per_hour
taxi_averages_df = (
    taxi_df.groupBy("pu_location_id", "date")
    .agg(
        count("*").alias("num_pickups"),
        mean((col("total_amount") + col("calc_total_amount")) / 2).alias("avg_total_amount"),
        mean("trip_time_hours").alias("avg_trip_time"),
        mean("fare_amount").alias("avg_fare_amount"),
        mean("passenger_count").alias("avg_passenger_count"),
        mean("trip_distance").alias("avg_trip_distance"),
        mean("extra").alias("avg_extra"),
        mean("mta_tax").alias("avg_mta_tax"),
        mean("tip_amount").alias("avg_tip_amount"),
        mean("tolls_amount").alias("avg_tolls_amount"),
        mean("improvement_surcharge").alias("avg_improvement_surcharge"),
        mean("congestion_surcharge").alias("avg_congestion_surcharge"),
        mean("airport_fee").alias("avg_airport_fee")
    )
).withColumn(
    "avg_amount_per_hour",
    col("avg_total_amount") / col("avg_trip_time")
)

# Compute total daily revenue per zone
daily_zone_revenue_df = (
    taxi_df.groupBy("pu_location_id", "date")
    .agg(
        sum((col("total_amount") + col("calc_total_amount")) / 2).alias("daily_zone_revenue")
    )
)

# Join the daily_zone_revenue_df back with the taxi_averages_df to include the daily_zone_revenue
taxi_averages_df = taxi_averages_df.join(daily_zone_revenue_df, on=["pu_location_id", "date"], how="left")

# Add days of the week (Mon = 1, Tues = 2, etc.)
taxi_averages_df = taxi_averages_df.withColumn("day_of_week",
                   when(dayofweek(taxi_averages_df.date) == 1, 7)
                   .otherwise(dayofweek(taxi_averages_df.date) - 1))

taxi_averages_df = taxi_averages_df.orderBy("date")

num_data_avg_instances, num_data_avg_features = taxi_averages_df.count(), len(taxi_averages_df.columns)
print(f"The shape of taxi averages dataframe: {num_data_instances} x {num_data_features}")



The shape of taxi averages dataframe: 17932561 x 21


                                                                                

In [12]:
# Export into data/curated
taxi_df.write.mode('overwrite').parquet('../data/curated/cleaned_taxi_data.parquet')
taxi_averages_df.write.mode('overwrite').parquet('../data/curated/taxi_averages_data.parquet')

24/08/25 00:12:10 WARN DAGScheduler: Broadcasting large task binary with size 1222.7 KiB
