In [26]:
import numpy as np
import pandas as pd

# --------------------
# CONSTANTS
# --------------------
# Raw columns that preprocessing reads from the input data
EXPECTED_INPUT_COLUMNS = [
    "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance",
    "RatecodeID", "PULocationID", "DOLocationID",
    "payment_type", "extra", "total_amount",
]

# One-hot column names for the fixed RatecodeID / payment_type codes
EXPECTED_RATECODE_COLUMNS = ["RatecodeID_" + str(code) for code in (1, 2, 3, 4, 5, 6, 99)]
EXPECTED_PAYMENT_COLUMNS = ["payment_type_" + str(code) for code in range(1, 6)]

# Derived datetime feature names: every pickup unit first, then every dropoff unit
EXPECTED_DATETIME_FEATURES = [
    f"{source}_{unit}"
    for source in ("tpep_pickup_datetime", "tpep_dropoff_datetime")
    for unit in ("day", "month", "year", "hour", "minute", "second")
]

# Full, ordered feature list (the label total_amount is not part of it)
EXPECTED_FEATURE_COLUMNS = [
    "passenger_count", "trip_distance", "extra", "PULocationID", "DOLocationID",
    *EXPECTED_RATECODE_COLUMNS,
    *EXPECTED_PAYMENT_COLUMNS,
    *EXPECTED_DATETIME_FEATURES,
    "trip_duration",
]

# --------------------
# FUNCTIONS
# --------------------
def preprocess(df):
    """
    Clean the raw taxi trips and turn them into an all-float64 model table.

    Steps, in order:
    - keep only EXPECTED_INPUT_COLUMNS and drop rows with any NA
    - drop rows with a negative ``extra``
    - keep rows whose ``total_amount`` lies between the 0.1th and 99.9th
      percentile (inclusive) of the remaining rows
    - parse the pickup/dropoff timestamps, derive their day/month/year/hour/
      minute/second features and ``trip_duration`` in minutes, and keep only
      durations in (0, 180]
    - one-hot encode RatecodeID and payment_type over fixed category lists
    - frequency-encode PULocationID and DOLocationID (share of rows, computed
      on the rows that survived the filters above)

    Parameters
    ----------
    df : pandas.DataFrame
        Raw trips containing every column in EXPECTED_INPUT_COLUMNS. The
        timestamp columns may be strings or already datetime64. The input
        frame is not modified.

    Returns
    -------
    df : pandas.DataFrame
        float64 frame with columns EXPECTED_FEATURE_COLUMNS + ["total_amount"]
        and a fresh 0..n-1 index.
    feature_columns : list of str
        Every column of ``df`` except "total_amount".
    skip_normalization_columns : list of str
        Datetime-derived, one-hot encoded and frequency-encoded columns.

    Raises
    ------
    KeyError
        If an expected input column is missing from ``df``.
    """
    datetime_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

    # Column selection and dropna both return new frames, so the caller's df is untouched
    df = df[EXPECTED_INPUT_COLUMNS].dropna()

    # Clean extra values to be non-negative
    # based on the attribute descriptions from Kaggle: https://www.kaggle.com/datasets/diishasiing/revenue-for-cab-drivers/data
    df = df[df["extra"] >= 0]

    # Trim total_amount outliers: keep the central 99.8% of values
    # (0.1th to 99.9th percentile, both ends inclusive)
    lower_limit = df["total_amount"].quantile(0.001)
    upper_limit = df["total_amount"].quantile(0.999)
    # Explicit copy so the column assignments below act on an independent frame
    # instead of a filtered view (avoids SettingWithCopyWarning)
    df = df[df["total_amount"].between(lower_limit, upper_limit)].copy()

    # Convert datetime columns (vectorized). Plain column assignment is required:
    # `df.loc[:, col] = ...` writes into the existing column in place on pandas >= 2,
    # so a string/object column would keep object dtype and `.dt` would fail below.
    for col in datetime_columns:
        df[col] = pd.to_datetime(df[col], errors="coerce")

    # Derive datetime features (pickup first, then dropoff)
    for col in datetime_columns:
        df = get_datetime_features(df, col)

    # Trip duration in minutes
    df["trip_duration"] = (
        df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
    ).dt.total_seconds() / 60

    # Filter out invalid trip durations (<=0 minutes or > 3 hours).
    # Rows whose timestamps failed to parse have a NaN duration and are dropped here too.
    df = df[(df["trip_duration"] > 0) & (df["trip_duration"] <= 180)]

    # Drop original datetime cols
    df = df.drop(columns=datetime_columns)

    # Since there are 6 unique values for RatecodeID, 263 for PULocationID, 262 for DOLocationID and 5 for payment_type
    # taking into account the volume of data, using one hot encoding for ratecodeId and payment_type,
    # using frequency encoding for PULocationID and DOLocationID.
    # Fixed category lists guarantee the same dummy columns regardless of which codes occur.
    df["RatecodeID"] = pd.Categorical(
        df["RatecodeID"].astype("Int64"), categories=[1, 2, 3, 4, 5, 6, 99]
    )
    df["payment_type"] = pd.Categorical(
        df["payment_type"].astype("Int64"), categories=[1, 2, 3, 4, 5]
    )
    df = pd.get_dummies(df, columns=["RatecodeID", "payment_type"], prefix=["RatecodeID", "payment_type"])

    for col in ["PULocationID", "DOLocationID"]:
        freq = df[col].value_counts(normalize=True)
        df[col] = df[col].map(freq).fillna(0)

    # Keep column order consistent; fill_value=0 also creates any expected
    # column that is somehow absent, so no separate "add missing dummies" pass is needed
    df = df.reindex(columns=EXPECTED_FEATURE_COLUMNS + ["total_amount"], fill_value=0)

    # Final safety net against NA values introduced during datetime conversion
    df = df.dropna()

    # Tracking feature_columns and skip_normalization_columns to skip normalization of the attributes
    # that are the derived date-time attributes, were one-hot encoded or frequency encoded above
    encoded_prefixes = (
        "RatecodeID_",
        "payment_type_",
        "tpep_pickup_datetime_",
        "tpep_dropoff_datetime_",
    )
    skip_normalization_columns = [
        col for col in df.columns
        if col.startswith(encoded_prefixes) or col in ("PULocationID", "DOLocationID")
    ]
    feature_columns = [c for c in df.columns if c != "total_amount"]

    # ensuring features and label are float64 as object type arrays cause errors with MPI Allreduce;
    # the index is reset so the result is numbered 0..n-1 like a freshly built frame
    df = df.astype(np.float64).reset_index(drop=True)
    return df, feature_columns, skip_normalization_columns

def get_datetime_features(df, col_name):
    '''
    Return a copy of df extended with day/month/year/hour/minute/second
    columns taken from the datetime column col_name.

    The new columns are named "<col_name>_<unit>" and are appended on the
    right in that unit order; df itself is left unchanged.
    '''
    accessor = df[col_name].dt
    units = ("day", "month", "year", "hour", "minute", "second")
    derived = pd.DataFrame(
        {f"{col_name}_{unit}": getattr(accessor, unit) for unit in units},
        index=df.index,
    )
    return pd.concat([df, derived], axis=1)


In [2]:
import pandas as pd
import os

csv_file = "../../data/nytaxi2022.csv"
pkl_file = "../../data/nytaxi2022.pkl"

# The pickle is a local cache of the parsed CSV: build it on the first run,
# reuse it on every later run.
if not os.path.exists(pkl_file):
    # Slow path: parse the CSV (timestamps included), then cache the result
    df = pd.read_csv(
        csv_file,
        header=0,
        parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    )
    df.to_pickle(pkl_file)
else:
    # Fast path: the cache already exists
    df = pd.read_pickle(pkl_file)

In [27]:
# Run the full preprocessing on the raw frame; besides the processed table it
# returns the feature names and the features to leave out of normalization.
taxi_data, feature_columns, skip_normalization_columns = preprocess(df)

# Quick look at the result: (rows, columns) and the first few rows
print(taxi_data.shape)
print(taxi_data.head())

(38025510, 31)
   passenger_count  trip_distance  extra  PULocationID  DOLocationID  \
0              2.0           3.80    3.0      0.033060      0.043290   
1              1.0           2.10    0.5      0.042263      0.004269   
2              1.0           0.97    0.5      0.005294      0.007412   
3              1.0           1.09    0.5      0.011947      0.024367   
4              1.0           4.30    0.5      0.025087      0.024282   

   RatecodeID_1  RatecodeID_2  RatecodeID_3  RatecodeID_4  RatecodeID_5  ...  \
0           1.0           0.0           0.0           0.0           0.0  ...   
1           1.0           0.0           0.0           0.0           0.0  ...   
2           1.0           0.0           0.0           0.0           0.0  ...   
3           1.0           0.0           0.0           0.0           0.0  ...   
4           1.0           0.0           0.0           0.0           0.0  ...   

   tpep_pickup_datetime_minute  tpep_pickup_datetime_second  \
0       

In [40]:
# Rich display of the first rows (every column is float64 after preprocessing)
taxi_data.head()

Unnamed: 0,passenger_count,trip_distance,extra,PULocationID,DOLocationID,RatecodeID_1,RatecodeID_2,RatecodeID_3,RatecodeID_4,RatecodeID_5,...,tpep_pickup_datetime_minute,tpep_pickup_datetime_second,tpep_dropoff_datetime_day,tpep_dropoff_datetime_month,tpep_dropoff_datetime_year,tpep_dropoff_datetime_hour,tpep_dropoff_datetime_minute,tpep_dropoff_datetime_second,trip_duration,total_amount
0,2.0,3.8,3.0,0.03306,0.04329,1.0,0.0,0.0,0.0,0.0,...,35.0,40.0,1.0,1.0,2022.0,0.0,53.0,29.0,17.816667,21.95
1,1.0,2.1,0.5,0.042263,0.004269,1.0,0.0,0.0,0.0,0.0,...,33.0,43.0,1.0,1.0,2022.0,0.0,42.0,7.0,8.4,13.3
2,1.0,0.97,0.5,0.005294,0.007412,1.0,0.0,0.0,0.0,0.0,...,53.0,21.0,1.0,1.0,2022.0,1.0,2.0,19.0,8.966667,10.56
3,1.0,1.09,0.5,0.011947,0.024367,1.0,0.0,0.0,0.0,0.0,...,25.0,21.0,1.0,1.0,2022.0,0.0,35.0,23.0,10.033333,11.8
4,1.0,4.3,0.5,0.025087,0.024282,1.0,0.0,0.0,0.0,0.0,...,36.0,48.0,1.0,1.0,2022.0,1.0,14.0,20.0,37.533333,30.3


In [41]:
# Print the feature columns and the skip-normalization columns as a sanity check,
# and so they can be copied into the normalization step of the project's main logic
print(feature_columns)
print(skip_normalization_columns)

['passenger_count', 'trip_distance', 'extra', 'PULocationID', 'DOLocationID', 'RatecodeID_1', 'RatecodeID_2', 'RatecodeID_3', 'RatecodeID_4', 'RatecodeID_5', 'RatecodeID_6', 'RatecodeID_99', 'payment_type_1', 'payment_type_2', 'payment_type_3', 'payment_type_4', 'payment_type_5', 'tpep_pickup_datetime_day', 'tpep_pickup_datetime_month', 'tpep_pickup_datetime_year', 'tpep_pickup_datetime_hour', 'tpep_pickup_datetime_minute', 'tpep_pickup_datetime_second', 'tpep_dropoff_datetime_day', 'tpep_dropoff_datetime_month', 'tpep_dropoff_datetime_year', 'tpep_dropoff_datetime_hour', 'tpep_dropoff_datetime_minute', 'tpep_dropoff_datetime_second', 'trip_duration']
['PULocationID', 'DOLocationID', 'RatecodeID_1', 'RatecodeID_2', 'RatecodeID_3', 'RatecodeID_4', 'RatecodeID_5', 'RatecodeID_6', 'RatecodeID_99', 'payment_type_1', 'payment_type_2', 'payment_type_3', 'payment_type_4', 'payment_type_5', 'tpep_pickup_datetime_day', 'tpep_pickup_datetime_month', 'tpep_pickup_datetime_year', 'tpep_pickup_date

In [42]:
# Final column order: features first, the label total_amount last
taxi_data.columns

Index(['passenger_count', 'trip_distance', 'extra', 'PULocationID',
       'DOLocationID', 'RatecodeID_1', 'RatecodeID_2', 'RatecodeID_3',
       'RatecodeID_4', 'RatecodeID_5', 'RatecodeID_6', 'RatecodeID_99',
       'payment_type_1', 'payment_type_2', 'payment_type_3', 'payment_type_4',
       'payment_type_5', 'tpep_pickup_datetime_day',
       'tpep_pickup_datetime_month', 'tpep_pickup_datetime_year',
       'tpep_pickup_datetime_hour', 'tpep_pickup_datetime_minute',
       'tpep_pickup_datetime_second', 'tpep_dropoff_datetime_day',
       'tpep_dropoff_datetime_month', 'tpep_dropoff_datetime_year',
       'tpep_dropoff_datetime_hour', 'tpep_dropoff_datetime_minute',
       'tpep_dropoff_datetime_second', 'trip_duration', 'total_amount'],
      dtype='object')

In [43]:
# Number of rows that survived preprocessing
print(len(taxi_data))

38025510


In [44]:
# Fail fast if preprocessing left any missing value in any column
assert not taxi_data.isna().any().any(), "NaNs remain in dataframe after preprocessing!"

In [45]:
# trip_duration is in minutes; preprocess keeps only values in (0, 180]
taxi_data['trip_duration'].describe()

count    3.802551e+07
mean     1.556626e+01
std      1.256280e+01
min      1.666667e-02
25%      7.366667e+00
50%      1.211667e+01
75%      1.961667e+01
max      1.800000e+02
Name: trip_duration, dtype: float64

In [None]:
# Sanity check on one of the one-hot encoded columns: its values should stay within 0 and 1
taxi_data['payment_type_5'].describe()

count    3.802551e+07
mean     1.051925e-07
std      3.243340e-04
min      0.000000e+00
25%      0.000000e+00
50%      0.000000e+00
75%      0.000000e+00
max      1.000000e+00
Name: payment_type_5, dtype: float64

In [11]:
# Row count of the raw input frame, for comparison with the preprocessed row count
print(len(df))

39656098


In [None]:
# Write the preprocessed table to CSV without the index column.
output_csv = "../../data/processed/nytaxi2022_preprocessed_final.csv"
# to_csv does not create missing parent directories, so make sure the target exists
os.makedirs(os.path.dirname(output_csv), exist_ok=True)
taxi_data.to_csv(output_csv, index=False)