In [None]:
import os
import numpy as np
import pandas as pd
import geopandas as gpd
from matplotlib import pyplot as plt
import warnings
warnings.filterwarnings('ignore')
# %matplotlib inline
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

In [None]:
# ---------------------------
# Reference Data
# ---------------------------

# TLC base numbers grouped by the high-volume FHV company that dispatches under them.
HVFHV_dict = {
    "Uber": [
        "B02877", "B02866", "B02882", "B02869", "B02617", "B02876",
        "B02865", "B02512", "B02888", "B02864", "B02883", "B02875",
        "B02682", "B02880", "B02870", "B02404", "B02598", "B02765",
        "B02879", "B02867", "B02878", "B02887", "B02872", "B02836",
        "B02884", "B02835", "B02764", "B02889", "B02871", "B02395",
        "B03404",
    ],
    "Lyft": ["B02510", "B02844", "B03406"],
    "Juno": ["B02914", "B02907", "B02908", "B03035"],
    "Via": ["B03136", "B02800"],
}

# Every base number above as one flat list (company order, then list order).
HVFHV_bases = [base for company_bases in HVFHV_dict.values() for base in company_bases]
print(f"high-volume fhv base numbers:{len(HVFHV_bases)}")


# HVFHS licence number -> company name, used when records carry 'hvfhs_license_num'.
company_dict = {
    'HV0002': "Juno",
    'HV0003': "Uber",
    'HV0004': "Via",
    'HV0005': "Lyft",
}

# Taxi-zone shapefile; only the Manhattan zone ids are kept as the study boundary.
tz = gpd.read_file('data/shapefile/taxiZone/geo_export_bb555bf4-8fc5-4144-b5f6-615889d80884.shp')
mh_id = tz.loc[tz['borough'] == 'Manhattan', 'location_i'].unique()

# Congestion-zone ids (rounded to whole numbers), used later by the zone classification.
cg = pd.read_csv('data/shapefile/taxiZone/congestZone.csv')
cg['location_i'] = cg['location_i'].round(0)
c_id = cg['location_i'].unique()

In [None]:
# ---------------------------
# Helper Functions
# ---------------------------

def process_time(df):
    """
    Standardize time column names, extract temporal features, and compute trip duration and speed.

    Column handling:
    - 'pickup_datetime' is renamed to 'PU_time'.
    - 'dropOff_datetime' (checked first) or 'dropoff_datetime' is renamed to 'DO_time'.
    - In the 'dropOff_datetime' schema, 'PUlocationID'/'DOlocationID' are also renamed to
      'PULocationID'/'DOLocationID'.

    Added columns: date, year, month, day, dow (pandas dayofweek, Monday=0), dayID
    (zero-padded 'MMDD' string used as a join key), pu_hour, pu_min, do_hour, do_min,
    trip_duration (seconds) and, when a distance column exists, speed
    (distance per hour; 'trip_distance' is preferred over 'trip_miles').
    Trips with zero duration get a NaN speed instead of +/-inf.

    Returns a new DataFrame (rename() copies), so the caller's frame is not modified.

    Raises:
        ValueError: if neither drop-off column is present.
    """
    if 'dropOff_datetime' in df.columns:
        df = df.rename(columns={
            'pickup_datetime': 'PU_time',
            'dropOff_datetime': 'DO_time',
            'PUlocationID': 'PULocationID',
            'DOlocationID': 'DOLocationID'
        })
    elif 'dropoff_datetime' in df.columns:
        df = df.rename(columns={'pickup_datetime': 'PU_time', 'dropoff_datetime': 'DO_time'})
    else:
        raise ValueError("Neither 'dropOff_datetime' nor 'dropoff_datetime' found.")

    df['PU_time'] = pd.to_datetime(df['PU_time'])
    df['DO_time'] = pd.to_datetime(df['DO_time'])
    pu = df['PU_time'].dt
    do = df['DO_time'].dt

    # Calendar features from the pickup timestamp.
    df['date'] = pu.date
    df['year'] = pu.year
    df['month'] = pu.month
    df['day'] = pu.day
    df['dow'] = pu.dayofweek
    # strftime keeps the 'MMDD' key zero-padded even when NaT values turn month/day into
    # floats (str(8.0).zfill(2) would give '8.0'); NaT rows get NaN instead of a bogus key.
    df['dayID'] = pu.strftime('%m%d')  # for future table join
    df['pu_hour'] = pu.hour
    df['pu_min'] = pu.minute
    df['do_hour'] = do.hour
    df['do_min'] = do.minute
    df['trip_duration'] = (df['DO_time'] - df['PU_time']).dt.total_seconds()

    # Mask zero durations so speed becomes NaN rather than +/-inf (or 0/0).
    duration_hours = df['trip_duration'].where(df['trip_duration'] != 0) / 3600
    if 'trip_distance' in df.columns:
        df['speed'] = df['trip_distance'] / duration_hours
    elif 'trip_miles' in df.columns:
        df['speed'] = df['trip_miles'] / duration_hours
    return df

def filter_boundary_and_time(df, boundary_id):
    """
    Keep only weekday trips that both start and end inside the given zone set.

    boundary_id: collection of zone ids; both PULocationID and DOLocationID must be in it.
    Weekdays are rows with dow <= 4 (Monday=0 under pandas' dayofweek convention).
    """
    inside_boundary = df.PULocationID.isin(boundary_id) & df.DOLocationID.isin(boundary_id)
    on_weekday = df.dow <= 4
    return df[inside_boundary & on_weekday]

def filter_duration(df, min_duration=5 * 60, max_duration=100 * 60):
    """
    Keep trips whose trip_duration (seconds) lies in [min_duration, max_duration], inclusive.

    Rows with a missing duration are dropped.
    """
    within_bounds = df.trip_duration.between(min_duration, max_duration)
    return df[within_bounds]

def basic_numeric_check(df):
    """
    Drop FHV records with non-positive or missing key numeric values.

    A row survives only if trip_miles, trip_duration, speed, base_passenger_fare and
    driver_pay are all strictly greater than zero (NaN compares False and is dropped).
    """
    keep = pd.Series(True, index=df.index)
    for column in ('trip_miles', 'trip_duration', 'speed', 'base_passenger_fare', 'driver_pay'):
        keep &= getattr(df, column) > 0
    return df[keep]


def sanity_check(df):
    """
    Apply plausibility bounds to FHV numeric columns (all bounds inclusive).

    - base_passenger_fare in [2.5, 300]
    - driver_pay in [2, 300]
    - trip_miles between its own 1st percentile and 100
    - speed between its own 1st percentile and 80
    - trip_duration in [300, 6000] seconds (5 to 100 minutes)

    The percentile floors are computed on the frame passed in, before any row is removed.
    """
    miles_floor = df.trip_miles.quantile(0.01)
    speed_floor = df.speed.quantile(0.01)
    plausible = (
        df.base_passenger_fare.between(2.5, 300)
        & df.driver_pay.between(2, 300)
        & df.trip_miles.between(miles_floor, 100)
        & df.speed.between(speed_floor, 80)
        & df.trip_duration.between(5 * 60, 100 * 60)
    )
    return df[plausible]

def zones_check(df, congest_ids=None):
    """
    Label each trip by whether its pickup/drop-off zones are in the congested set.

    Labels written to a new 'zones' column (the input frame is modified in place and returned):
    - 'aa': pickup and drop-off both congested
    - 'ab': pickup congested, drop-off not
    - 'ba': pickup not congested, drop-off congested
    - 'bb': neither congested
    Refer to the accompanying paper for further details.

    Parameters:
    - df: DataFrame with PULocationID and DOLocationID columns.
    - congest_ids: collection of congested zone ids; defaults to the module-level c_id.
    """
    if congest_ids is None:
        congest_ids = c_id
    pu_in = df.PULocationID.isin(congest_ids)
    do_in = df.DOLocationID.isin(congest_ids)
    conditions = [pu_in & do_in, pu_in & ~do_in, ~pu_in & do_in]
    # The three cases above plus the default are exhaustive. A string default is required:
    # np.select's numeric default (0) has no common dtype with string choices on newer NumPy.
    df['zones'] = np.select(conditions, ['aa', 'ab', 'ba'], default='bb')
    return df

def time_filter_month(df, month):
    """
    Keep rows whose PU_time falls in the given calendar month.

    month: 'YYYY-MM' string. The window is [first day of month, first day of next month).
    """
    month_start = pd.to_datetime(f"{month}-01")
    next_month_start = month_start + pd.offsets.MonthBegin()
    pu = df.PU_time
    in_month = (pu >= month_start) & (pu < next_month_start)
    return df[in_month]

def identify_hvfhv(df, HVFHV_dict, method="dispatching"):
    """
    Identify high-volume FHV companies based on base numbers.

    Parameters:
    - df: DataFrame with FHV records. A 'hvfhv' column is added to it in place.
    - HVFHV_dict: Dictionary mapping company names to lists of base numbers. If a base
      appears under several companies, the company listed last wins.
    - method: 'dispatching' (default) reads 'dispatching_base_num'; 'affiliated' reads
      'Affiliated_base_number'.

    Returns:
    DataFrame filtered to only include rows identified as hv-fhv.

    Raises:
    ValueError: if method is not 'dispatching' or 'affiliated'.
    """
    base_columns = {
        "dispatching": "dispatching_base_num",  # adopt dispatch according to TLC's reply
        "affiliated": "Affiliated_base_number",
    }
    if method not in base_columns:
        raise ValueError(f"method must be 'dispatching' or 'affiliated', got {method!r}")

    # One lookup table and a single pass over the column instead of one scan per company.
    base_to_company = {base: company for company, bases in HVFHV_dict.items() for base in bases}
    df['hvfhv'] = df[base_columns[method]].map(base_to_company)
    return df.dropna(subset=['hvfhv'])

def aggregate_pickups(df, company, group_cols=None):
    """
    Count pick-ups for one company per group (by default per date, hour and pickup zone).

    The count is of non-null DOLocationID values in each group and is stored as 'PUn_trips'.
    The PULocationID group column, when present, is renamed to 'locID' in the result.
    """
    keys = (
        ['date', 'year', 'dayID', 'pu_hour', 'PULocationID', 'dow', 'zones']
        if group_cols is None
        else group_cols
    )
    company_trips = df.loc[df.hvfhv == company]
    return (
        company_trips
        .groupby(keys, as_index=False)
        .agg(PUn_trips=('DOLocationID', 'count'))
        .rename(columns={'PULocationID': 'locID'})
    )


In [None]:
# ---------------------------
# Main Processing Pipeline for FHV Records
# ---------------------------

def _read_fhv_month(name, parquet_read):
    """
    Load one month's raw trip records into a pandas DataFrame.

    - parquet_read=True: data/fhv/fhv_tripdata_{name}.parquet via pandas.
    - parquet_read=False and name in 2019-02..2019-07: data/fhv/fhvhv_tripdata_{name}.parquet
      via pyarrow.
    - parquet_read=False otherwise: data/fhv/fhv_tripdata_{name}.parquet via pyarrow, keeping
      only rows whose dropOff_datetime <= pd.Timestamp.max (rows with a null value are dropped too).
    """
    if parquet_read:
        return pd.read_parquet(f"data/fhv/fhv_tripdata_{name}.parquet")

    if name in ('2019-02', '2019-03', '2019-04', '2019-05', '2019-06', '2019-07'):
        return pq.read_table(f"data/fhv/fhvhv_tripdata_{name}.parquet").to_pandas()

    table = pq.read_table(f"data/fhv/fhv_tripdata_{name}.parquet")
    # NOTE(review): presumably removes drop-off timestamps beyond pandas' representable range
    # (pd.Timestamp.max) so that to_pandas() does not fail on them; confirm against the raw data.
    within_range = pc.less_equal(table["dropOff_datetime"], pa.scalar(pd.Timestamp.max))
    return table.filter(within_range).to_pandas()


def process_fhv_record(name, parquet_read=True, hvfhv_identity=True, filtration=True, hvfhv_method="dispatching"):
    """
    Load, clean, and save FHV trip records for one month; return the cleaned DataFrame.

    Parameters:
    - name: Month string in format 'YYYY-MM'.
    - parquet_read: True reads data/fhv/fhv_tripdata_{name}.parquet with pandas. False reads
      with pyarrow: the fhvhv_tripdata file for 2019-02..2019-07, otherwise the fhv_tripdata file
      with rows whose dropOff_datetime exceeds pd.Timestamp.max removed.
    - hvfhv_identity: True identifies companies from base numbers via identify_hvfhv (rows
      with no known base are dropped). False maps the 'hvfhs_license_num' column through
      company_dict (unknown licences become NaN and are kept).
    - filtration: True applies basic_numeric_check and sanity_check (needs fare/pay columns).
    - hvfhv_method: Base-number column used when hvfhv_identity is True
      ('dispatching' or 'affiliated').

    Pipeline: identification -> time features -> Manhattan/weekday filter -> optional numeric
    cleaning -> congestion-zone labels -> restriction to the requested month.
    The result is also written to data/fhv/cleaned_hvfhv/fhvhv_tripdata_{name}.csv.

    Returns:
    Cleaned DataFrame.
    """
    df = _read_fhv_month(name, parquet_read)
    print(f"Processing FHV data for {name}. Initial shape: {df.shape}")

    # Identify hv-fhv companies
    if hvfhv_identity:
        df = identify_hvfhv(df, HVFHV_dict, method=hvfhv_method)
    else:
        df['hvfhv'] = df['hvfhs_license_num'].map(company_dict)

    # Process time columns and extract features
    df = process_time(df)

    # Keep Manhattan-internal trips on weekdays
    df = filter_boundary_and_time(df, mh_id)

    if filtration:
        df = basic_numeric_check(df)
        df = sanity_check(df)

    # Classify trips by congestion zones
    df = zones_check(df)

    # Keep records only within the specified month
    df = time_filter_month(df, name)

    # Save cleaned data
    output_dir = "data/fhv/cleaned_hvfhv"
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f"fhvhv_tripdata_{name}.csv")
    df.to_csv(output_path, index=False)
    return df


In [None]:
# ---------------------------
# Process and Aggregate
# ---------------------------

# Processing all months below may take roughly 20-40 minutes.

def _aggregate_and_save(cleaned_df, month_to_process, companies=('Uber', 'Lyft')):
    """Aggregate hourly pick-ups per company for one month and write each result to CSV."""
    for company_key in companies:
        agg_df = aggregate_pickups(cleaned_df, company_key)
        print("Total pick-ups aggregated for", company_key, ":", agg_df['PUn_trips'].sum())

        # Save aggregated data
        agg_dir = f"data/fhv/agg_hvfhv/{company_key}"
        os.makedirs(agg_dir, exist_ok=True)
        agg_output_path = os.path.join(agg_dir, f"{company_key}_aggtrip_{month_to_process}.csv")
        agg_df.to_csv(agg_output_path, index=False)
        print(f"Aggregated data saved to {agg_output_path}.")


if __name__ == "__main__":
    # Each entry: (months, parquet_read, hvfhv_identity, filtration).
    # The reader and cleaning options are selected per period based on dataset specifics.
    run_plan = [
        (['2017-08', '2017-09', '2017-10', '2017-11', '2017-12',
          '2018-01', '2018-02', '2018-03', '2018-04'], True, True, False),
        (['2018-05', '2018-06', '2018-07', '2018-08', '2018-09',
          '2018-10', '2018-11', '2018-12', '2019-01'], False, True, False),
        (['2019-02', '2019-03', '2019-04', '2019-05', '2019-06', '2019-07'], False, False, True),
    ]

    for months, parquet_read, hvfhv_identity, filtration in run_plan:
        for month_to_process in months:
            cleaned_df = process_fhv_record(
                month_to_process,
                parquet_read=parquet_read,
                hvfhv_identity=hvfhv_identity,
                filtration=filtration,
                hvfhv_method="dispatching",
            )
            _aggregate_and_save(cleaned_df, month_to_process)
    