# Data Cleaning and Aggregation


This notebook performs ETL (Extract, Transform, Load) on NYC taxi and FHV datasets:  
- Cleans and standardizes raw data  
- Aligns column names and data types  
- Handles missing or invalid values  
- Aggregates metrics like fare per mile, fare per minute, average speed  
- Flags CBD trips, peak hours, and weekends  
- Saves cleaned and curated Parquet files for downstream analysis

In [None]:
# Import Spark and supporting libraries used for cleaning and aggregation
from pyspark.sql import functions as F
from pyspark.sql.functions import col, unix_timestamp, round, abs
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import builtins
import pyarrow.parquet as pq
import os

In [None]:
# Helper function
def get_shape(service_type, year, months, base_dir="../data/tlc_data/raw/cleaned"):
    """
    Summarise the size of the cleaned monthly Parquet folders of one service.

    Each month is read from `{base_dir}/{service_type}/{year}-{MM}` using the
    notebook-level `spark` session. Row counts are summed over all months; the
    feature count is taken from the first month that is read.

    Returns a dict with keys "service", "year", "rows" and "features"
    ("features" stays None when `months` is empty).
    """
    row_total = 0
    feature_count = None

    for month_no in months:
        monthly_df = spark.read.parquet(
            f"{base_dir}/{service_type}/{year}-{str(month_no).zfill(2)}"
        )
        row_total += monthly_df.count()

        # Only the first month read decides the reported number of columns
        if feature_count is None:
            feature_count = len(monthly_df.columns)

    return dict(service=service_type, year=year, rows=row_total, features=feature_count)

In [None]:
# Create (or reuse) the notebook-wide Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("TLC eda")
    # Pin the session time zone so timestamp functions behave the same on any machine
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.sql.parquet.cacheMetadata", "true")
    # Render DataFrames eagerly when they are the last expression of a cell
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/02 16:06:52 WARN Utils: Your hostname, Tasneems-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.20.7 instead (on interface en0)
25/09/02 16:06:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/02 16:06:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/02 16:06:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 51273)
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.11/3.11.13/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/homebrew/Cellar/python@3.11/3.11.13/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/homebrew/Cellar/python@3.11/3.11.13/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/homebrew/Cellar/python@3.11/3.11.13/Frameworks/Python.framework/Versions/3.11/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/Users/tasneemzulaiqa/Documents/GitHub/project-1-indi

## 1. Clean Data

### a. Yellow

In [None]:
def estimate_fare(trip_distance, trip_duration_min, ratecode_id, avg_speed):
    """
    Estimate taxi fare based on trip distance, duration, rate code, and average speed.

    - Base fare: $3.00
    - Distance-based ($0.70 per 0.2 mile) if avg_speed >= 12 mph;
      else waiting-time based ($0.70 per minute)
    - Adjust fare by ratecode:
        1 = standard, 2 = JFK flat $70, 3 = +$20, 4 = out-of-city x1.5
    - Any other (or missing) ratecode yields None

    Args:
        trip_distance: trip length in miles (may be None).
        trip_duration_min: trip duration in minutes (may be None).
        ratecode_id: rate code of the trip (may be None).
        avg_speed: average trip speed in mph (may be None).

    Returns:
        The estimated fare rounded to 2 decimals, or None when the rate code is
        not one of 1-4 or when the metric needed for the estimate is missing.
    """
    base_fare = 3.00
    jfk_manhattan = 70.0

    # Spark hands SQL NULLs to a Python UDF as None; comparing None with a
    # number raises TypeError and fails the whole job, so bail out early.
    if avg_speed is None:
        return None

    # Calculate standard fare based on distance and speed
    if avg_speed >= 12:
        if trip_distance is None:
            return None
        metered_fare = 0.70 * (trip_distance / 0.2)
    else:
        if trip_duration_min is None:
            return None
        metered_fare = 0.70 * trip_duration_min

    standard_fare = base_fare + metered_fare

    if ratecode_id == 4:
        # Out-of-city (Nassau/Westchester)
        estimated_fare = standard_fare * 1.5
    elif ratecode_id == 1:
        # Standard city rate
        estimated_fare = standard_fare
    elif ratecode_id == 2:
        # JFK <-> Manhattan flat fare
        estimated_fare = jfk_manhattan
    elif ratecode_id == 3:
        # EWR (Newark) surcharge
        estimated_fare = standard_fare + 20
    else:
        # Unsupported or missing rate code: no estimate
        return None

    # `round` is shadowed by pyspark.sql.functions.round in this notebook,
    # so the Python builtin is referenced explicitly.
    return builtins.round(estimated_fare, 2)

# Spark UDF wrapper around estimate_fare; results are exposed as a double column
estimate_fare_udf = F.udf(estimate_fare, returnType=DoubleType())


In [None]:
# Change year to preprocess each year
YEAR = "2025"
MONTHS = range(1, 7)
TLC_OUTPUT_DIR = '../data/tlc_data/'

def clean_yellow(input_path, zones_df, year, month, output_dir=TLC_OUTPUT_DIR):
    """
    Clean and preprocess NYC yellow taxi data for a given year and month.

    Steps: derive trip duration (minutes) and average speed (mph), attach
    pickup/drop-off zone and borough names, drop rows that fail the sanity
    filters below, and write the result as a single Parquet file to
    `{output_dir}raw/cleaned/yellow/{year}-{MM}`.

    Args:
        input_path: folder of the raw monthly Parquet data to read.
        zones_df: taxi zone lookup with columns LocationID, Zone, Borough, service_zone.
        year: year label used in the output folder name.
        month: month number used (zero-padded) in the output folder name.
        output_dir: root data directory. Bound when this cell runs so that a
            later cell re-assigning TLC_OUTPUT_DIR cannot redirect the output.

    Returns:
        None. The cleaned data is written to disk.
    """

    # Read parquet file
    df = spark.read.parquet(input_path)

    # Calculate trip time in minutes, rounded to 2 decimals
    df = df.withColumn(
        "trip_time",
        F.round((F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 60, 2)
    )

    # Map pickup zones
    pu_zones = zones_df.select(
        F.col("LocationID").alias("pu_location_id"),
        F.col("Zone").alias("pu_zone"),
        F.col("Borough").alias("pu_borough"),
        F.col("service_zone").alias("pu_service_zone")
    )
    df = df.join(pu_zones, on="pu_location_id", how="left")

    # Map drop-off zones (rename columns for joining)
    do_zones = zones_df.select(
        F.col("LocationID").alias("do_location_id"),
        F.col("Zone").alias("do_zone"),
        F.col("Borough").alias("do_borough"),
        F.col("service_zone").alias("do_service_zone")
    )
    df = df.join(do_zones, on="do_location_id", how="left")

    # Drop service zones columns
    df = df.drop("pu_service_zone", "do_service_zone")

    # Filter trip distance outliers
    df = df.filter((F.col("trip_miles") > 0.0) & (F.col("trip_miles") <= 200.0))

    # Filter trip time outliers
    df = df.filter((F.col("trip_time") > 0.0) & (F.col("trip_time") < 300.0))

    # Calculate average speed mph
    df = df.withColumn("avg_speed_mph", F.round(F.col("trip_miles") / (F.col("trip_time") / 60), 2))

    # Filter unreasonable speeds
    df = df.filter((F.col("avg_speed_mph") >= 3) & (F.col("avg_speed_mph") < 80))

    # Filter voided trips
    df = df.filter(F.col("payment_type") != 6)

    # Filter low fare amount
    df = df.filter(F.col("fare_amount") > 3)

    # Calculate estimated fare with the estimate_fare UDF
    df = df.withColumn(
        "estimated_fare",
        estimate_fare_udf(F.col("trip_miles"), F.col("trip_time"), F.col("ratecode_id"), F.col("avg_speed_mph"))
    )

    # Calculate absolute fare difference
    df = df.withColumn("fare_diff", F.abs(F.col("fare_amount") - F.col("estimated_fare")))

    # Filter where estimated fare exists and difference <= 100
    df = df.filter((F.col("estimated_fare").isNotNull()) & (F.col("fare_diff") <= 100.0))

    # Filter passenger count
    df = df.filter((F.col("passenger_count") >= 1) & (F.col("passenger_count") < 6))

    # The CBD congestion fee column is not present in every year's data.
    # Apply the filter only when the column exists, so the same code runs for
    # every year without having to comment this line out by hand.
    if "cbd_congestion_fee" in df.columns:
        df = df.filter(F.col("cbd_congestion_fee") >= 0.0)

    # Filter valid pickup and dropoff zones
    # NOTE(review): assumes the lookup CSV labels unknown/outside zones as
    # "NV"/"NA" - confirm against the zone lookup file in use.
    df = df.filter(
        (F.col("pu_zone").isNotNull()) &
        (~F.col("pu_zone").isin("NV", "NA")) &
        (F.col("do_zone").isNotNull()) &
        (~F.col("do_zone").isin("NV", "NA"))
    )

    # Save cleaned data as a single Parquet part file
    output_path = f'{output_dir}raw/cleaned/yellow/{year}-{str(month).zfill(2)}'
    df.coalesce(1).write.mode('overwrite').parquet(output_path)

    return None


In [7]:
# Taxi zone lookup (LocationID -> zone / borough), shared by all cleaning runs
zones_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../data/taxi_zones/taxi+_zone_lookup.csv")
)

# Clean every configured month of raw yellow-taxi data
for month in MONTHS:
    input_path = f'{TLC_OUTPUT_DIR}raw/yellow/{YEAR}-{str(month).zfill(2)}'
    clean_yellow(input_path=input_path, zones_df=zones_df, year=YEAR, month=month)

25/09/02 16:07:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

### b. HVFHV

In [None]:
# Change year to preprocess each year
YEAR = "2025"
MONTHS = range(1, 7)
TLC_OUTPUT_DIR = '../data/tlc_data/'

def clean_hvfhv(input_path, zones_df, year, month, output_dir=TLC_OUTPUT_DIR):
    """
    Clean and preprocess NYC hvfhv data for a given year and month.

    Steps: attach pickup/drop-off zone and borough names, convert trip_time
    from seconds to minutes, drop rows with missing values in the listed
    columns, apply the sanity filters below, and write the result as a single
    Parquet file to `{output_dir}raw/cleaned/hvfhv/{year}-{MM}`.

    Args:
        input_path: folder of the raw monthly Parquet data to read.
        zones_df: taxi zone lookup with columns LocationID, Zone, Borough, service_zone.
        year: year label used in the output folder name.
        month: month number used (zero-padded) in the output folder name.
        output_dir: root data directory. Bound when this cell runs so that a
            later cell re-assigning TLC_OUTPUT_DIR cannot redirect the output.

    Returns:
        None. The cleaned data is written to disk.
    """
    # Read parquet file
    df = spark.read.parquet(input_path)

    # Map pickup zones
    pu_zones = zones_df.select(
        F.col("LocationID").alias("pu_location_id"),
        F.col("Zone").alias("pu_zone"),
        F.col("Borough").alias("pu_borough"),
        F.col("service_zone").alias("pu_service_zone")
    )
    df = df.join(pu_zones, on="pu_location_id", how="left")

    # Map drop-off zones
    do_zones = zones_df.select(
        F.col("LocationID").alias("do_location_id"),
        F.col("Zone").alias("do_zone"),
        F.col("Borough").alias("do_borough"),
        F.col("service_zone").alias("do_service_zone")
    )
    df = df.join(do_zones, on="do_location_id", how="left")

    # Drop service zone columns
    df = df.drop("pu_service_zone", "do_service_zone")

    # Convert trip_time from seconds to minutes
    df = df.withColumn("trip_time", F.round(F.col("trip_time") / 60, 2))

    # Remove nulls / NaNs in numeric columns.
    # The loop variable is deliberately not called `col`, which would shadow
    # the `col` function imported from pyspark.sql.functions.
    numeric_cols = ["trip_miles", "trip_time", "fare_amount", "driver_pay",
                    "tolls", "tips", "bcf", "sales_tax", "congestion_surcharge", "airport_fee"]
    for column_name in numeric_cols:
        df = df.filter((F.col(column_name).isNotNull()) & (~F.isnan(F.col(column_name))))

    # Remove nulls in datetime and string/categorical columns
    # NOTE(review): if on_scene_datetime / originating_base_num are only
    # populated for some operators, these filters drop all trips of the
    # others - confirm this is intended.
    datetime_cols = ['request_datetime', 'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime']
    string_cols = ["pu_zone", "do_zone", "pu_borough", "do_borough",
                   'originating_base_num', 'dispatching_base_num', 'hvfhs_license_num']
    for column_name in datetime_cols + string_cols:
        df = df.filter(F.col(column_name).isNotNull())

    # Filter trip distance outliers
    df = df.filter((F.col("trip_miles") > 0.0) & (F.col("trip_miles") <= 200.0))

    # Filter trip time outliers
    df = df.filter((F.col("trip_time") > 0.0) & (F.col("trip_time") < 300.0))

    # Calculate average speed (mph)
    df = df.withColumn("avg_speed_mph", F.round(F.col("trip_miles") / (F.col("trip_time") / 60), 2))

    # Filter unreasonable speeds
    df = df.filter((F.col("avg_speed_mph") >= 3) & (F.col("avg_speed_mph") < 80))

    # Filter low fares and driver pay
    df = df.filter((F.col("fare_amount") > 3.0) & (F.col("driver_pay") > 0.0))

    # The CBD congestion fee column is not present in every year's data.
    # Apply the filter only when the column exists, so the same code runs for
    # every year without manual edits.
    if "cbd_congestion_fee" in df.columns:
        df = df.filter(F.col("cbd_congestion_fee") >= 0.0)

    # Filter invalid pickup and dropoff zones
    # NOTE(review): assumes the lookup CSV labels unknown/outside zones as
    # "NV"/"NA" - confirm against the zone lookup file in use.
    df = df.filter(
        (~F.col("pu_zone").isin("NV", "NA")) &
        (~F.col("do_zone").isin("NV", "NA"))
    )

    # Save cleaned data as a single Parquet part file
    output_path = f'{output_dir}raw/cleaned/hvfhv/{year}-{str(month).zfill(2)}'
    df.coalesce(1).write.mode('overwrite').parquet(output_path)

    return None

In [12]:
# Clean every configured month of raw HVFHV data
for month in MONTHS:
    input_path = f'{TLC_OUTPUT_DIR}raw/hvfhv/{YEAR}-{str(month).zfill(2)}'
    clean_hvfhv(input_path=input_path, zones_df=zones_df, year=YEAR, month=month)

                                                                                

In [None]:
# Row / column counts of the cleaned data (January-June of each year)
hvfhv_2024 = get_shape("hvfhv", 2024, range(1, 7))
hvfhv_2025 = get_shape("hvfhv", 2025, range(1, 7))
yellow_2024 = get_shape("yellow", 2024, range(1, 7))
yellow_2025 = get_shape("yellow", 2025, range(1, 7))

# One four-line report per dataset, yellow first
for s in (yellow_2024, yellow_2025, hvfhv_2024, hvfhv_2025):
    print(
        f"{s['service'].upper()} {s['year']}",
        f"  Total rows: {s['rows']:,}",
        f"  Features: {s['features']}",
        "-" * 40,
        sep="\n",
    )


                                                                                

YELLOW 2024
  Total rows: 16,868,542
  Features: 27
----------------------------------------
YELLOW 2025
  Total rows: 16,918,186
  Features: 28
----------------------------------------
HVFHV 2024
  Total rows: 86,058,824
  Features: 29
----------------------------------------
HVFHV 2025
  Total rows: 83,670,510
  Features: 30
----------------------------------------


## 2. Data Aggregation

In [18]:
# Taxi zone LocationIDs treated as the central business district (CBD).
# Kept as a plain set because it is passed straight to Column.isin().
cbd_zones = {
    4, 12, 13, 45, 48, 50, 68, 79, 87, 88,
    90, 100, 107, 113, 114, 125, 137, 144, 148, 158,
    161, 162, 163, 164, 170, 186, 209, 211, 224, 229,
    230, 231, 232, 233, 234, 246, 249, 261,
}

### a. HVFHV dataset

In [None]:
YEAR = "2024"
MONTHS = range(1, 7)
TLC_INPUT_DIR = '../data/tlc_data/raw/cleaned/'
TLC_OUTPUT_DIR = '../data/tlc_data/raw/cleaned/curated/'

def aggregate_hvfhv(input_path, year, month, output_dir=TLC_OUTPUT_DIR):
    """
    Derive trip-level metrics and flags for NYC hvfhv data of one month.

    Adds total_amount, fare_per_mile, fare_per_min, is_cbd, day_of_week,
    is_weekend and is_peak, then writes the result as a single Parquet file to
    `{output_dir}hvfhv/{year}-{MM}`.

    Args:
        input_path: folder of the cleaned monthly Parquet data to read.
        year: year of the data; accepted as str or int ("2024" or 2024).
        month: month number used (zero-padded) in the output folder name.
        output_dir: curated output root. Bound when this cell runs so that a
            later cell re-assigning TLC_OUTPUT_DIR cannot redirect the output.

    Returns:
        None. The curated data is written to disk.
    """

    # Read parquet file
    df = spark.read.parquet(input_path)

    # Calculate total_amount charged for the trip: fare plus tolls, fees,
    # taxes, surcharges and tips. driver_pay is intentionally NOT added: it is
    # the driver's share paid out of the fare, so including it would
    # double-count.
    # NOTE(review): cbd_congestion_fee (present in newer data) is not part of
    # this sum - confirm whether it should be.
    df = df.withColumn(
        "total_amount",
        F.round(
            (F.col("fare_amount") + F.col("tolls") + F.col("bcf") + F.col("sales_tax") +
             F.col("congestion_surcharge") + F.col("airport_fee") + F.col("tips")), 2)
    )

    # Calculate fare per mile (null when the distance is not positive)
    df = df.withColumn(
        "fare_per_mile",
        F.when(F.col("trip_miles") > 0, F.round(F.col("fare_amount") / F.col("trip_miles"), 2)).otherwise(None)
    )

    # Calculate fare per minute (null when the duration is not positive)
    df = df.withColumn(
        "fare_per_min",
        F.when(F.col("trip_time") > 0, F.round(F.col("fare_amount") / F.col("trip_time"), 2)).otherwise(None)
    )

    # Add cbd flag. 2024 data is flagged from the pickup/drop-off zones; other
    # years use the congestion fee. str() makes an int year behave like its
    # string form, and data without the fee column falls back to the zone
    # lookup instead of failing on a missing column.
    use_zone_lookup = str(year) == "2024" or "cbd_congestion_fee" not in df.columns
    if use_zone_lookup:
        df = df.withColumn(
            "is_cbd",
            F.when((F.col("pu_location_id").isin(cbd_zones)) | (F.col("do_location_id").isin(cbd_zones)), 1).otherwise(0)
        )
    else:
        df = df.withColumn("is_cbd", F.when(F.col("cbd_congestion_fee") > 0, 1).otherwise(0))

    # Extract day_of_week (1 = Sunday ... 7 = Saturday) and is_weekend
    df = df.withColumn("day_of_week", F.dayofweek("pickup_datetime"))
    df = df.withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0))

    # Peak flag: pickup hour in 6-10 or 16-20 (both ranges inclusive)
    df = df.withColumn(
        "is_peak",
        F.when((F.hour("pickup_datetime").between(6, 10)) | (F.hour("pickup_datetime").between(16, 20)), 1).otherwise(0)
    )

    # Save curated data as a single Parquet part file
    df.coalesce(1).write.mode("overwrite").parquet(f'{output_dir}hvfhv/{year}-{str(month).zfill(2)}')

    return None


In [44]:
# Build the curated HVFHV dataset for every configured month
for month in MONTHS:
    input_path = f'{TLC_INPUT_DIR}hvfhv/{YEAR}-{str(month).zfill(2)}'
    aggregate_hvfhv(input_path=input_path, year=YEAR, month=month)

                                                                                

### b. Yellow dataset

In [None]:
YEAR = "2025"
MONTHS = range(1, 7)
TLC_INPUT_DIR = '../data/tlc_data/raw/cleaned/'
TLC_OUTPUT_DIR = '../data/tlc_data/raw/cleaned/curated/'

def aggregate_yellow(input_path, year, month, output_dir=TLC_OUTPUT_DIR):
    """
    Derive trip-level metrics and flags for NYC Yellow taxi data of one month.

    Drops the helper columns fare_diff / estimated_fare, adds fare_per_mile,
    fare_per_min, is_cbd, day_of_week, is_weekend and is_peak, then writes the
    result as a single Parquet file to `{output_dir}yellow/{year}-{MM}`.

    Args:
        input_path: folder of the cleaned monthly Parquet data to read.
        year: year of the data; accepted as str or int ("2024" or 2024).
        month: month number used (zero-padded) in the output folder name.
        output_dir: curated output root. Bound when this cell runs so that a
            later cell re-assigning TLC_OUTPUT_DIR cannot redirect the output.

    Returns:
        None. The curated data is written to disk.
    """
    # Read parquet file
    df = spark.read.parquet(input_path)

    # Columns only needed for the fare sanity check during cleaning
    df = df.drop("fare_diff", "estimated_fare")

    # Calculate fare per mile (null when the distance is not positive)
    df = df.withColumn(
        "fare_per_mile",
        F.when(F.col("trip_miles") > 0, F.round(F.col("fare_amount") / F.col("trip_miles"), 2)).otherwise(None)
    )

    # Calculate fare per minute (null when the duration is not positive)
    df = df.withColumn(
        "fare_per_min",
        F.when(F.col("trip_time") > 0, F.round(F.col("fare_amount") / F.col("trip_time"), 2)).otherwise(None)
    )

    # Add cbd flag. 2024 data is flagged from the pickup/drop-off zones; other
    # years use the congestion fee. str() makes an int year behave like its
    # string form, and data without the fee column falls back to the zone
    # lookup instead of failing on a missing column.
    use_zone_lookup = str(year) == "2024" or "cbd_congestion_fee" not in df.columns
    if use_zone_lookup:
        df = df.withColumn(
            "is_cbd",
            F.when((F.col("pu_location_id").isin(cbd_zones)) | (F.col("do_location_id").isin(cbd_zones)), 1).otherwise(0)
        )
    else:
        df = df.withColumn("is_cbd", F.when(F.col("cbd_congestion_fee") > 0, 1).otherwise(0))

    # Extract day_of_week (1 = Sunday ... 7 = Saturday) and is_weekend
    df = df.withColumn("day_of_week", F.dayofweek("pickup_datetime"))
    df = df.withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0))

    # Peak flag: pickup hour in 6-10 or 16-20 (both ranges inclusive)
    df = df.withColumn(
        "is_peak",
        F.when((F.hour("pickup_datetime").between(6, 10)) | (F.hour("pickup_datetime").between(16, 20)), 1).otherwise(0)
    )

    # Save curated data as a single Parquet part file
    df.coalesce(1).write.mode("overwrite").parquet(f'{output_dir}yellow/{year}-{str(month).zfill(2)}')

    return None

In [34]:
# Build the curated yellow-taxi dataset for every configured month
for month in MONTHS:
    input_path = f'{TLC_INPUT_DIR}yellow/{YEAR}-{str(month).zfill(2)}'
    aggregate_yellow(input_path=input_path, year=YEAR, month=month)

                                                                                