## NY Taxi Data Cleaning Script

### Building Tensor and Triplets, Multiple Data Files, Ride Type Not Combined

Source data: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

I used these files:

- yellow_tripdata_2020-03.parquet
- yellow_tripdata_2020-04.parquet
- green_tripdata_2020-03.parquet
- green_tripdata_2020-04.parquet
- fhv_tripdata_2020-03.parquet
- fhv_tripdata_2020-04.parquet 

In [None]:
import numpy as np
import pandas as pd
import calendar
from pathlib import Path
from typing import List, Tuple

def build_tensor_and_triplets(
    yellow_parquets:    List[Path],
    green_parquets:     List[Path],
    rideshare_parquets: List[Path],
    year:               int,
    start_month:        int,
    end_month:          int,
    out_prefix:         str = "taxi_and_rideshare"
) -> Tuple[
    pd.DataFrame,      # triplets_sparse
    pd.DataFrame,      # triplets_full
    np.ndarray,        # tensor X
    pd.DatetimeIndex,  # time_index
    pd.DataFrame       # location_map (loc_idx -> ride_type, origZoneID, composite_key)
]:
    """
    Build a sparse triplets table and a dense (PU × DO × Time) tensor, but treat
    each (ride_type, zoneID) as its own unique location.
    Produce a location_map to see which composite locations share the same original zoneID across types.

    Parameters
    ----------
    yellow_parquets : List[Path]
        Paths to yellow‐taxi Parquet files (e.g. ["yellow_tripdata_2020-01.parquet", ...]).
        Each must contain at least: "PULocationID" (int/float) and "DOLocationID", plus
        a timestamp column named "tpep_pickup_datetime".
    green_parquets : List[Path]
        Paths to green‐taxi Parquet files. Each must contain "PULocationID", "DOLocationID",
        and a timestamp column "lpep_pickup_datetime".
    rideshare_parquets : List[Path]
        Paths to rideshare (FHV) Parquet files. Each must contain "PUlocationID" (note
        the lowercase "l"), "DOlocationID", and a timestamp column "pickup_datetime".
    year : int
        Four‐digit year, e.g. 2020.
    start_month : int
        Starting month (1=Jan … 12=Dec). Must satisfy 1 <= start_month <= end_month <= 12.
    end_month : int
        Ending month (1=Jan … 12=Dec). Must satisfy start_month <= end_month <= 12.
    out_prefix : str, default "taxi_and_rideshare"
        Prefix for writing out:
          - {out_prefix}_triplets.parquet
          - {out_prefix}_triplets_zeroes.parquet
          - {out_prefix}_tensor.npy

    Returns
    -------
    triplets_sparse : pd.DataFrame
        DataFrame with columns [PU_idx, DO_idx, t_idx, trip_count] for all positive counts,
        where PU_idx and DO_idx refer to the NEW composite‐location indices (0..N−1).
    triplets_full : pd.DataFrame
        Same as "triplets_sparse" but with EVERY combination (PU_idx, DO_idx, t_idx) filled in (zeros where no trips).
    X : np.ndarray
        Dense NumPy array of shape (N, N, T), where N = total number of unique
        (ride_type × zoneID) composites, and T = # of hours from (start_month, start_day=1)
        through the last hour of (end_month, last_day) at 23:00.  
        Entry X[i,j,k] = number of trips from composite_loc i -> composite_loc j during hour‐index k.
    time_index : pd.DatetimeIndex
        The hourly index from year‐start_month‐01 00:00 through year‐end_month‐(last day) 23:00.
    location_map : pd.DataFrame
        A DataFrame of shape (N, 3) with columns:
          • loc_idx         (int, 0..N−1)  
          • ride_type       (string: "yellow" | "green" | "rideshare")  
          • orig_zone_id    (int)  (the original numeric zone ID)  
          • composite_key   (string: e.g. "yellow_264", "rideshare_  5", …)  

        Multiple rows in "location_map" with the same "orig_zone_id"
        but different "ride_type", then composite locations refer to the same
        underlying zone.  

    Raises
    ------
    ValueError
        If start_month/end_month are invalid, or if no rows remain after filtering.
    """

    # 1) Validate months
    if not (1 <= start_month <= 12 and 1 <= end_month <= 12 and start_month <= end_month):
        raise ValueError("start_month and end_month must be in 1..12, and start_month ≤ end_month.")

    # 2) Compute the absolute start/end timestamps for filtering
    start_ts = pd.Timestamp(year, start_month, 1, 0, 0)
    last_day_end = calendar.monthrange(year, end_month)[1]
    end_ts = pd.Timestamp(year, end_month, last_day_end, 23, 59)

    # 3) Helper to load & filter one "category" (yellow/green/rideshare).
    #    It also renames FHV's lowercase‐l columns to match taxi's uppercase‐"L".
    def _load_and_filter(
        paths: List[Path],
        time_col: str,
        ride_type: str,
        rename_map: dict = None
    ) -> pd.DataFrame:
        """
        For each parquet in 'paths', read it, optionally rename columns, parse 'time_col',
        drop rows with missing PU/DO, filter to [start_ts..end_ts], floor to the hour,
        and return a concatenated DataFrame with columns:
          [ride_type, PULocationID, DOLocationID, time_hour]
        """
        pieces = []
        for pq_path in paths:
            df = pd.read_parquet(pq_path).copy()

            # (a) If we need to rename (FHV uses lowercase-l), do so now:
            if rename_map:
                df = df.rename(columns=rename_map)

            # (b) Parse the timestamp:
            df["time"] = pd.to_datetime(df[time_col], errors="coerce")

            # (c) Drop any row where PU or DO is NaN
            df = df.dropna(subset=["PULocationID", "DOLocationID"])

            # (d) Keep only rows within [start_ts .. end_ts]
            df = df[(df["time"] >= start_ts) & (df["time"] <= end_ts)]
            if df.empty:
                continue

            # (e) Floor to the hour
            df["time_hour"] = df["time"].dt.floor("H")

            # (f) Keep exactly these columns—and add a literal 'ride_type':
            df = df[["PULocationID", "DOLocationID", "time_hour"]].copy()
            df["ride_type"] = ride_type

            pieces.append(df)

        if not pieces:
            # If no file yielded any rows, return an empty DF with the right columns
            return pd.DataFrame(columns=["PULocationID", "DOLocationID", "time_hour", "ride_type"])
        return pd.concat(pieces, ignore_index=True)

    # 4) Load each category:
    df_yellow = _load_and_filter(
        yellow_parquets,
        time_col="tpep_pickup_datetime",
        ride_type="yellow",
        rename_map=None
    )
    df_green = _load_and_filter(
        green_parquets,
        time_col="lpep_pickup_datetime",
        ride_type="green",
        rename_map=None
    )
    df_rideshare = _load_and_filter(
        rideshare_parquets,
        time_col="pickup_datetime",
        ride_type="rideshare",
        # rename the lowercase‐l columns to uppercase‐L so they match "PULocationID" etc.
        rename_map={"PUlocationID": "PULocationID", "DOlocationID": "DOLocationID"}
    )

    # 5) Concatenate all three types into one big DataFrame
    df_all = pd.concat([df_yellow, df_green, df_rideshare], ignore_index=True)
    if df_all.empty:
        raise ValueError(f"No rows found between {start_ts} and {end_ts} in any file.")

    # 6) Build a "composite key" for each location = f"{ride_type}_{origZoneID}".
    #    (We'll do this for both pickup and dropoff columns.)
    df_all["PU_composite"] = (
        df_all["ride_type"] + "_" + df_all["PULocationID"].astype(int).astype(str)
    )
    df_all["DO_composite"] = (
        df_all["ride_type"] + "_" + df_all["DOLocationID"].astype(int).astype(str)
    )

    # 7) Gather all unique composite‐keys (pickup ∪ dropoff)
    pu_uniques = df_all["PU_composite"].unique()
    do_uniques = df_all["DO_composite"].unique()
    all_composites = np.unique(np.concatenate([pu_uniques, do_uniques])).tolist()

    # 8) Create a lookup: composite_key -> loc_idx (0 .. N−1)
    #    (We sort so that it's deterministic, but any ordering is fine.)
    all_composites.sort()
    composite2idx = {key: idx for idx, key in enumerate(all_composites)}
    N = len(all_composites)
    print(f"Found {N} distinct (ride_type × zone) composites.")

    # 9) Build a "location_map" DataFrame so you know:
    #    loc_idx | ride_type | orig_zone_id | composite_key
    #    ---------------------------------------------------
    #    0       | "green"   |  10          | "green_10"
    #    1       | "rideshare"| 10          | "rideshare_10"
    #    2       | "yellow"  | 264          | "yellow_264"
    #    … etc …
    #
    loc_rows = []
    for composite_key, loc_idx in composite2idx.items():
        ride, orig_str = composite_key.split("_", 1)
        loc_rows.append({
            "loc_idx":      loc_idx,
            "ride_type":    ride,
            "orig_zone_id": int(orig_str),
            "composite_key": composite_key
        })
    location_map = pd.DataFrame(loc_rows).sort_values("loc_idx").reset_index(drop=True)

    # 10) Now map every row in df_all to two integer indices:
    #     PU_idx_local = composite2idx[ PU_composite ]
    #     DO_idx_local = composite2idx[ DO_composite ]
    df_all["PU_idx"] = df_all["PU_composite"].map(composite2idx).astype(int)
    df_all["DO_idx"] = df_all["DO_composite"].map(composite2idx).astype(int)

    # 11) Build the hourly time_index from start_ts (00:00) through the last full hour
    #     before end_ts. E.g. if end_ts = 2020-04-30 23:59, last hour = 2020-04-30 23:00.
    last_hour = pd.Timestamp(year, end_month, last_day_end, 23, 0)
    time_index = pd.date_range(start=start_ts, end=last_hour, freq="H")
    T = len(time_index)
    print("Total hours T =", T)

    # 12) Create a mapping for time -> index
    T2idx = {ts: idx for idx, ts in enumerate(time_index)}

    # 13) Map "time_hour" -> integer hour‐index
    df_all["t_idx"] = df_all["time_hour"].map(T2idx).astype(int)

    # 14) Build sparse triplets by grouping on (PU_idx, DO_idx, t_idx)
    triplets_sparse = (
        df_all
        .assign(trip_count=1)
        .groupby(["PU_idx", "DO_idx", "t_idx"], sort=False)["trip_count"]
        .sum()
        .reset_index()
    )
    sparse_path = f"{out_prefix}_triplets.parquet"
    triplets_sparse.to_parquet(sparse_path, index=False)
    print("DONE - Saved sparse triplets ->", sparse_path)

    # 15) Build the "full" cartesian product of (PU_idx, DO_idx, t_idx), filling zeros
    full_idx = pd.MultiIndex.from_product(
        [range(N), range(N), range(T)],
        names=["PU_idx", "DO_idx", "t_idx"]
    )
    triplets_full = (
        triplets_sparse
        .set_index(["PU_idx", "DO_idx", "t_idx"])
        .reindex(full_idx, fill_value=0)
        .reset_index()
    )
    full_path = f"{out_prefix}_triplets_zeroes.parquet"
    triplets_full.to_parquet(full_path, index=False)
    print("DONE - Saved full triplets ->", full_path)

    # 16) Build the dense tensor X of shape (N, N, T)
    X = np.zeros((N, N, T), dtype=int)
    X[
        triplets_full["PU_idx"].values,
        triplets_full["DO_idx"].values,
        triplets_full["t_idx"].values
    ] = triplets_full["trip_count"].values

    tensor_path = f"{out_prefix}_tensor.npy"
    np.save(tensor_path, X)
    print("DONE - Saved tensor ->", tensor_path, "shape=", X.shape)

    # SANITY CHECKS
    assert X.shape == (N, N, T), f"Tensor shape mismatch: {X.shape} vs ({N},{N},{T})"
    nnz = np.count_nonzero(X)
    assert len(triplets_sparse) == nnz, f"FAIL - sparse count {len(triplets_sparse)} vs nonzero {nnz}"
    expected_full = N * N * T
    assert len(triplets_full) == expected_full, (
        f"FAIL - full count {len(triplets_full)} vs expected {expected_full}"
    )
    # Check that rebuilding from triplets_full gives X exactly
    Y_full = np.zeros_like(X)
    Y_full[
        triplets_full["PU_idx"].values,
        triplets_full["DO_idx"].values,
        triplets_full["t_idx"].values
    ] = triplets_full["trip_count"].values
    assert np.array_equal(Y_full, X), "FAIL - full table =/= tensor"

    print("SUCCESS - All files consistent and correct.")

    return triplets_sparse, triplets_full, X, time_index, location_map

In [None]:
location_map.to_csv("taxi_yellow_green_rideshare_distinct_march_to_apr2020_locations_map.csv")

In [None]:
yellow_files = [
    Path("Raw Data/yellow_tripdata_2020-03.parquet"),
    Path("Raw Data/yellow_tripdata_2020-04.parquet"),
]
green_files  = [
    Path("Raw Data/green_tripdata_2020-03.parquet"),
    Path("Raw Data/green_tripdata_2020-04.parquet"),
]
rideshare_files  = [
    Path("Raw Data/fhv_tripdata_2020-03.parquet"),
    Path("Raw Data/fhv_tripdata_2020-04.parquet"),
] 

In [None]:
sparse_df, full_df, tensor, time_index, location_map = build_tensor_and_triplets(
    yellow_parquets    = yellow_files,
    green_parquets     = green_files,
    rideshare_parquets = rideshare_files,
    year               = 2020,
    start_month        = 3,
    end_month          = 4,
    out_prefix         = "taxi_yellow_green_rideshare_distinct_march_to_apr2020"
)