# Data Preprocessing

This notebook preprocesses and aggregates taxi trip data to daily zone-level for modeling, integrates external datasets, and creates features for service consistency analysis.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TLC_Data_Preprocessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/14 08:42:11 WARN Utils: Your hostname, Jordans-MBP.local, resolves to a loopback address: 127.0.0.1; using 10.13.252.224 instead (on interface en0)
25/08/14 08:42:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/14 08:42:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Load Raw Datasets

In [2]:
# Load taxi data (2024 training period)
yellow_df = spark.read.parquet("../data/interim/yellow_2024_complete.parquet")
green_df = spark.read.parquet("../data/interim/green_2024_complete.parquet")

# Load external data
zones_df = spark.read.parquet("../data/processed/taxi_zones.parquet")
weather_df = spark.read.parquet("../data/processed/weather_data.parquet")
census_df = spark.read.parquet("../data/processed/census_data.parquet")

print(f"Raw data: Yellow= {yellow_df.count():,}, Green= {green_df.count():,}")
print(f"External: {zones_df.count()} zones, {weather_df.count()} weather days, {census_df.count()} census zones")

                                                                                

Raw data: Yellow= 20,332,093, Green= 339,807
External: 265 zones, 363 weather days, 668 census zones


In [3]:
yellow_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- data_month: string (nullable = true)



## 2. Data Cleaning and Filtering

In [4]:
# Filter valid trips (valid zones, reasonable trip characteristics)
def clean_taxi_data(df, pickup_col, dropoff_col, datetime_col):
    return df.filter(
        (col("PULocationID") >= 1) & (col("PULocationID") <= 263) &
        (col("DOLocationID") >= 1) & (col("DOLocationID") <= 263) &
        col("PULocationID").isNotNull() & col("DOLocationID").isNotNull() &
        (col("trip_distance") >= 0) & (col("trip_distance") <= 200) &
        col(datetime_col).isNotNull()
    )

# Clean datasets
yellow_clean = clean_taxi_data(yellow_df, "PULocationID", "DOLocationID", "tpep_pickup_datetime")
green_clean = clean_taxi_data(green_df, "PULocationID", "DOLocationID", "lpep_pickup_datetime")

yellow_clean_count = yellow_clean.count()
green_clean_count = green_clean.count()

print(f"After cleaning: Yellow={yellow_clean_count:,} ({yellow_clean_count/yellow_df.count()*100:.1f}%)")
print(f"After cleaning: Green={green_clean_count:,} ({green_clean_count/green_df.count()*100:.1f}%)")

                                                                                

After cleaning: Yellow=20,122,742 (99.0%)
After cleaning: Green=335,640 (98.8%)


In [5]:
missing_yellow_passengers = yellow_clean.filter(col("passenger_count").isNull()).count()
missing_green_passengers = green_clean.filter(col("passenger_count").isNull()).count()
print(f"Missing passenger counts: Yellow={missing_yellow_passengers}, Green={missing_green_passengers}")

Missing passenger counts: Yellow=1969910, Green=13990


In [8]:
# Calculate median passenger count and impute nulls
yellow_median = yellow_clean.select(expr("percentile_approx(passenger_count, 0.5)")).collect()[0][0]
green_median = green_clean.select(expr("percentile_approx(passenger_count, 0.5)")).collect()[0][0]

# Round to whole number using Python's round function
yellow_median = int(yellow_median)
green_median = int(green_median)

# Impute nulls with median
yellow_clean = yellow_clean.fillna({"passenger_count": yellow_median})
green_clean = green_clean.fillna({"passenger_count": green_median})

print(f"Imputed nulls with median: Yellow={yellow_median}, Green={green_median}")



Imputed nulls with median: Yellow=1, Green=1


                                                                                

In [9]:
missing_yellow_passengers = yellow_clean.filter(col("passenger_count").isNull()).count()
missing_green_passengers = green_clean.filter(col("passenger_count").isNull()).count()
print(f"Missing passenger counts: Yellow={missing_yellow_passengers}, Green={missing_green_passengers}")

Missing passenger counts: Yellow=0, Green=0


## Aggregate to Daily Zone Level

In [10]:
# Aggregate yellow taxi to daily zone level
yellow_daily = yellow_clean.select(
    col("PULocationID").alias("LocationID"),
    date_format("tpep_pickup_datetime", "yyyy-MM-dd").alias("date"),
    col("trip_distance"),
    col("passenger_count")
).groupBy("LocationID", "date").agg(
    count("*").alias("trip_count"),
    avg("trip_distance").alias("avg_trip_distance"),
    sum("passenger_count").alias("total_passengers")
).withColumn("taxi_type", lit("yellow"))

# Aggregate green taxi to daily zone level  
green_daily = green_clean.select(
    col("PULocationID").alias("LocationID"),
    date_format("lpep_pickup_datetime", "yyyy-MM-dd").alias("date"),
    col("trip_distance"), 
    col("passenger_count")
).groupBy("LocationID", "date").agg(
    count("*").alias("trip_count"),
    avg("trip_distance").alias("avg_trip_distance"),
    sum("passenger_count").alias("total_passengers")
).withColumn("taxi_type", lit("green"))

# Combine yellow and green daily data
combined_daily = yellow_daily.union(green_daily)

# Aggregate both taxi types to single daily zone level
daily_zone_data = combined_daily.groupBy("LocationID", "date").agg(
    sum("trip_count").alias("daily_trips"),
    avg("avg_trip_distance").alias("avg_distance"),
    sum("total_passengers").alias("daily_passengers")
).withColumn("date", to_date(col("date")))

print(f"Daily zone aggregation: {daily_zone_data.count():,} zone-day records")

# Check aggregation results
date_range = daily_zone_data.select(min("date"), max("date")).collect()[0]
zone_coverage = daily_zone_data.select(count_distinct("LocationID")).collect()[0][0]
print(f"Date range: {date_range[0]} to {date_range[1]}")
print(f"Zones with activity: {zone_coverage}")

                                                                                

Daily zone aggregation: 41,670 zone-day records




Date range: 2002-12-31 to 2026-06-26
Zones with activity: 260


                                                                                

In [11]:
daily_zone_data.describe().show()

25/08/14 08:45:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------------+-----------------+------------------+------------------+
|summary|       LocationID|      daily_trips|      avg_distance|  daily_passengers|
+-------+-----------------+-----------------+------------------+------------------+
|  count|            41670|            41670|             41670|             41670|
|   mean|133.8241420686345|490.9618910487161| 5.074548956623695| 638.1779457643388|
| stddev|75.98234935653612|1095.038095269522|3.3930462328990587|1435.5483376644302|
|    min|                1|                1|               0.0|                 0|
|    max|              263|             7922|              64.7|             10066|
+-------+-----------------+-----------------+------------------+------------------+



                                                                                

## 4. Add Temporal Features

In [12]:
# Add temporal features for modeling
daily_with_temporal = daily_zone_data.withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .withColumn("day_of_week", dayofweek("date")) \
    .withColumn("day_of_month", dayofmonth("date")) \
    .withColumn("week_of_year", weekofyear("date")) \
    .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), 1).otherwise(0))

print("Added temporal features: year, month, day_of_week, day_of_month, week_of_year, is_weekend")

Added temporal features: year, month, day_of_week, day_of_month, week_of_year, is_weekend


In [13]:
daily_with_temporal.show(5, truncate=False)



+----------+----------+-----------+------------------+----------------+----+-----+-----------+------------+------------+----------+
|LocationID|date      |daily_trips|avg_distance      |daily_passengers|year|month|day_of_week|day_of_month|week_of_year|is_weekend|
+----------+----------+-----------+------------------+----------------+----+-----+-----------+------------+------------+----------+
|201       |2024-01-02|2          |11.7              |2               |2024|1    |3          |2           |1           |0         |
|147       |2024-01-02|2          |2.045             |2               |2024|1    |3          |2           |1           |0         |
|238       |2024-01-03|1700       |1.1158652148322543|2232            |2024|1    |4          |3           |1           |0         |
|152       |2024-01-03|61         |2.57564858490566  |75              |2024|1    |4          |3           |1           |0         |
|177       |2024-01-04|8          |4.597857142857142 |8               |2024|

                                                                                

In [14]:
daily_with_temporal.select('year', 'month', 'day_of_week', 'day_of_month', 'week_of_year', 'is_weekend').describe().show()



+-------+-------------------+------------------+-----------------+-----------------+------------------+-------------------+
|summary|               year|             month|      day_of_week|     day_of_month|      week_of_year|         is_weekend|
+-------+-------------------+------------------+-----------------+-----------------+------------------+-------------------+
|  count|              41670|             41670|            41670|            41670|             41670|              41670|
|   mean| 2023.9927285817134|3.5303575713942883|3.997048236141109|15.70026397888169|13.620878329733621|0.28509719222462204|
| stddev|0.36498172884833413| 1.717245819291182|1.998851556161848|8.775644390127718|7.5137456507089775| 0.4514661388621985|
|    min|               2002|                 1|                1|                1|                 1|                  0|
|    max|               2026|                12|                7|               31|                52|                  1|
+-------

                                                                                

Removing years outside of 2024

In [15]:
filtered_daily = daily_with_temporal.filter(col("year") == 2024)

print(f"before filtering: {daily_with_temporal.count():,} records, after filtering: {filtered_daily.count():,} records")




before filtering: 41,670 records, after filtering: 41,641 records


                                                                                

## Integrate External Data

In [16]:
# Join with weather data
daily_with_weather = filtered_daily.join(weather_df, "date", "left")

# Join with census data
daily_with_census = daily_with_weather.join(census_df, "LocationID", "left")


# Join with zone information
final_daily_data = daily_with_census.join(
    zones_df.select("LocationID", "Zone", "Borough", "service_zone"), 
    "LocationID", "left"
)

print(f"Final integrated dataset: {final_daily_data.count():,} records")

# Check integration success
integration_stats = final_daily_data.select(
    sum(col("temperature_avg").isNull().cast("int")).alias("missing_weather"),
    sum(col("Median_Income").isNull().cast("int")).alias("missing_census"),
    sum(col("Zone").isNull().cast("int")).alias("missing_zone_info")
).collect()[0]

print(f"Integration quality: Weather={integration_stats['missing_weather']}, Census={integration_stats['missing_census']}, Zones={integration_stats['missing_zone_info']}")

                                                                                

Final integrated dataset: 110,738 records




Integration quality: Weather=10, Census=268, Zones=0


                                                                                

In [17]:
final_daily_data.describe().show()

[Stage 207:>                                                        (0 + 1) / 1]

+-------+------------------+------------------+------------------+------------------+------+------------------+------------------+-----------------+------------------+-------------------+------------------+-----------------+------------------+-----------------+------------------+---------------------+------------------+--------------------+-------------+------------+
|summary|        LocationID|       daily_trips|      avg_distance|  daily_passengers|  year|             month|       day_of_week|     day_of_month|      week_of_year|         is_weekend|   temperature_avg| precipitation_mm|           snow_mm|    Median_Income|Percent_No_Vehicle|No_Vehicle_Households|  Total_Households|                Zone|      Borough|service_zone|
+-------+------------------+------------------+------------------+------------------+------+------------------+------------------+-----------------+------------------+-------------------+------------------+-----------------+------------------+-----------------

                                                                                

In [18]:
clean_final = final_daily_data.dropna()

In [20]:
clean_final.describe().show()

[Stage 224:>                                                        (0 + 1) / 1]

+-------+------------------+------------------+-----------------+------------------+------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------+------------------+---------------------+------------------+--------------------+-------------+------------+
|summary|        LocationID|       daily_trips|     avg_distance|  daily_passengers|  year|             month|       day_of_week|      day_of_month|      week_of_year|         is_weekend|   temperature_avg| precipitation_mm|           snow_mm|    Median_Income|Percent_No_Vehicle|No_Vehicle_Households|  Total_Households|                Zone|      Borough|service_zone|
+-------+------------------+------------------+-----------------+------------------+------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------

                                                                                

In [21]:
clean_final.write.mode("overwrite").parquet("../data/processed/daily_zone_final_2024.parquet")


                                                                                