In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found anywhere under the Kaggle input directory.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for full_path in (os.path.join(root_dir, name) for name in file_names):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os, json, time, re
from pathlib import Path
from typing import Tuple, Iterable
import requests
from tqdm import tqdm
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# ===================== CONFIG =====================
# Fleets to fetch; each name is substituted into URL_TMPL as {taxi}.
TAXI_TYPES: Iterable[str] = ("yellow", "green")

# Inclusive (year, month) download window.
START_YM: Tuple[int, int] = (2022, 7)
END_YM: Tuple[int, int] = (2025, 6)

# Output folder (relative to the current working directory) and the Kaggle
# dataset identity written into dataset-metadata.json as "<owner>/<slug>".
OUT_DIR = Path("nyc-taxi-jul2022-jun2025")
KAGGLE_OWNER = "ronakal"
KAGGLE_SLUG = "nyc-tlc-trip-records-jul2022-jun2025"

# TLC official file pattern (no scraping required)
URL_TMPL = "https://d37ci6vzurychx.cloudfront.net/trip-data/{taxi}_tripdata_{year}-{month:02d}.parquet"
# ==================================================

def ym_iter(y0, m0, y1, m1):
    """Yield ``(year, month)`` pairs from ``(y0, m0)`` through ``(y1, m1)`` inclusive.

    Yields nothing when the start lies after the end.
    """
    year, month = y0, m0
    # Lexicographic tuple comparison == "year earlier, or same year and month not later".
    while (year, month) <= (y1, m1):
        yield year, month
        if month == 12:
            year, month = year + 1, 1
        else:
            month += 1

def normalize(df: pd.DataFrame, taxi: str, y: int, m: int) -> pd.DataFrame:
    """Harmonize TLC timestamp column names and tag rows with their source.

    ``tpep_*`` (yellow) and ``lpep_*`` (green) pickup/dropoff columns are renamed
    to ``pickup_datetime`` / ``dropoff_datetime`` so both fleets share one schema;
    ``taxi_type``, ``year`` and ``month`` columns are then appended.

    Args:
        df: One monthly trip table as read from Parquet.
        taxi: Fleet name stored in ``taxi_type``.
        y: Year stored in ``year``.
        m: Month stored in ``month``.

    Returns:
        A new DataFrame. ``df`` itself is never modified.
    """
    renames = {
        "tpep_pickup_datetime": "pickup_datetime",
        "lpep_pickup_datetime": "pickup_datetime",
        "tpep_dropoff_datetime": "dropoff_datetime",
        "lpep_dropoff_datetime": "dropoff_datetime",
    }
    # rename() ignores absent keys and returns a new frame, and assign() adds the
    # tag columns to that new frame. The caller's frame is therefore untouched even
    # when no column matches (the old in-place df[...] = ... mutated it in that case).
    return df.rename(columns=renames).assign(taxi_type=taxi, year=y, month=m)

def download(url: str, dest: Path, retries=3, timeout=120):
    """Stream ``url`` to ``dest``, retrying on failure.

    The body is written to a sibling ``<name>.part`` file and renamed onto
    ``dest`` only once it is complete. A failed or interrupted download therefore
    never leaves a truncated file at ``dest``. That matters because ``build()``
    treats any existing ``dest`` as already downloaded and would reuse it forever.

    Args:
        url: Source URL.
        dest: Final file path; parent directories are created if needed.
        retries: Number of attempts before giving up.
        timeout: Request timeout in seconds, passed to ``requests.get``.

    Returns:
        True on success. False after ``retries`` failed attempts, or when
        ``retries`` < 1.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_name(dest.name + ".part")
    for attempt in range(retries):
        try:
            with requests.get(url, stream=True, timeout=timeout) as r:
                r.raise_for_status()
                total = int(r.headers.get("content-length", 0))
                # Content-Length counts bytes on the wire. It is only comparable to
                # the bytes we write when the body is not transfer-compressed.
                expected = 0 if r.headers.get("content-encoding") else total
                written = 0
                with open(tmp, "wb") as f, tqdm(total=total, unit="B", unit_scale=True, desc=dest.name) as pbar:
                    for chunk in r.iter_content(1024 * 512):
                        if chunk:
                            f.write(chunk)
                            written += len(chunk)
                            pbar.update(len(chunk))
            if expected and written != expected:
                raise RuntimeError(f"Incomplete download: got {written} of {expected} bytes")
            if written < 1024:
                raise RuntimeError("Downloaded file suspiciously small")
            # Publish only a complete file (atomic on the same filesystem).
            tmp.replace(dest)
            return True
        except Exception as e:
            tmp.unlink(missing_ok=True)
            if attempt == retries - 1:
                print(f"  -> giving up: {e}")
                return False
            time.sleep(1 + attempt)
    return False

def _download_monthlies(raw: Path):
    """Download every (month, taxi type) file in the configured window into ``raw``.

    Returns ``(path, taxi, year, month)`` tuples for the files now on disk.
    Files whose download failed are left out.
    """
    files = []
    for y, m in ym_iter(*START_YM, *END_YM):
        for taxi in TAXI_TYPES:
            dest = raw / f"{taxi}_tripdata_{y}-{m:02d}.parquet"
            if not dest.exists():
                print(f"Downloading {dest.name}")
                if not download(URL_TMPL.format(taxi=taxi, year=y, month=m), dest):
                    continue
            files.append((dest, taxi, y, m))
    return files


def _load_table(p: Path, taxi: str, y: int, m: int) -> "pa.Table":
    """Read one monthly Parquet file, normalize it, and convert it to an Arrow table."""
    df = normalize(pd.read_parquet(p, engine="pyarrow"), taxi, y, m)
    return pa.Table.from_pandas(df, preserve_index=False)


def _merge_schemas(a: "pa.Schema", b: "pa.Schema") -> "pa.Schema":
    """Unify two schemas, widening numeric types (e.g. int32 + int64) when supported."""
    try:
        # Default mode only promotes null types, so e.g. an int32 column in one
        # month and int64 in another would be rejected.
        return pa.unify_schemas([a, b], promote_options="permissive")
    except TypeError:
        # pyarrow < 14 has no ``promote_options``. A genuine conflict raises
        # ArrowTypeError (a TypeError subclass) and also lands here. The
        # default-mode call then re-raises it.
        return pa.unify_schemas([a, b])


def _unified_schema(files):
    """Read every file once and merge their schemas. Returns None if none could be read."""
    unified = None
    for p, taxi, y, m in files:
        try:
            schema = _load_table(p, taxi, y, m).schema
            unified = schema if unified is None else _merge_schemas(unified, schema)
        except Exception as e:
            # The file is still attempted in the write pass. There it is cast to
            # the unified schema, or reported and skipped if that fails.
            print(f"[schema] skip {p.name}: {e}")
    return unified


def _conform(tbl, schema):
    """Reorder and cast ``tbl`` to ``schema``, filling columns it lacks with nulls.

    Raises ValueError naming the column when a present column cannot be cast.
    """
    present = set(tbl.schema.names)
    cols = []
    for field in schema:
        if field.name not in present:
            cols.append(pa.nulls(len(tbl), type=field.type))
            continue
        col = tbl[field.name]
        if not col.type.equals(field.type):
            try:
                col = col.cast(field.type)
            except Exception as e:
                raise ValueError(f"column {field.name!r}: cannot cast {col.type} -> {field.type}: {e}") from e
        cols.append(col)
    return pa.Table.from_arrays(cols, schema=schema)


def _write_year(y, items, schema):
    """Append each month of year ``y`` to ``OUT_DIR/nyc_taxi_<y>.parquet``, one month at a time.

    Returns True if at least one month was written.
    """
    outp = OUT_DIR / f"nyc_taxi_{y}.parquet"
    writer = None
    try:
        for p, taxi, m in sorted(items, key=lambda item: item[2]):
            try:
                tbl = _conform(_load_table(p, taxi, y, m), schema)
                if writer is None:
                    writer = pq.ParquetWriter(outp, schema, compression="snappy")
                writer.write_table(tbl)
            except Exception as e:
                print(f"[write {y}] skip {p.name}: {e}")
    finally:
        if writer is not None:
            writer.close()
    if writer is None:
        # Previously "✔ wrote" was printed here even though no file was produced.
        print(f"✘ no months could be written for {y}; {outp.name} left untouched")
        return False
    print(f"✔ wrote {outp}")
    return True


def _write_metadata():
    """Write the Kaggle ``dataset-metadata.json`` into OUT_DIR."""
    meta = {
        "title": "NYC TLC Trip Records (Jul 2022–Jun 2025, official Parquet)",
        "id": f"{KAGGLE_OWNER}/{KAGGLE_SLUG}",
        "licenses": [{"name": "CC0-1.0"}],
        "description": (
            "Official monthly Parquet files from NYC TLC, filtered to Jul 2022–Jun 2025. "
            "Light normalization: pickup/dropoff datetime harmonized; taxi_type/year/month added. "
            "Source: NYC TLC Trip Record Data."
        ),
        "tags": [
            {"name": "transportation"},
            {"name": "new-york"},
            {"name": "public-datasets"},
            {"name": "parquet"},
            {"name": "timeseries"}
        ],
    }
    (OUT_DIR / "dataset-metadata.json").write_text(json.dumps(meta, indent=2))
    print("✔ prepared dataset-metadata.json")


def build():
    """Run the pipeline end to end.

    Downloads the monthly files, unifies their schemas, writes one Parquet file
    per year into OUT_DIR, and emits the Kaggle dataset-metadata.json.

    Raises:
        SystemExit: if nothing was downloaded or no schema could be inferred.
    """
    raw = OUT_DIR / "raw"
    raw.mkdir(parents=True, exist_ok=True)

    # 1) Download monthlies
    files = _download_monthlies(raw)
    if not files:
        raise SystemExit("No files downloaded—check TAXI_TYPES and date window.")

    # 2) Build a unified schema (single pass)
    unified = _unified_schema(files)
    if unified is None:
        raise SystemExit("Failed to infer unified schema.")

    # 3) Write per-year combined Parquet, appending one month at a time.
    #    The window filter handles the partial first and last years.
    by_year = {}
    for p, taxi, y, m in files:
        if START_YM <= (y, m) <= END_YM:
            by_year.setdefault(y, []).append((p, taxi, m))
    for y, items in sorted(by_year.items()):
        _write_year(y, items, unified)

    # 4) Kaggle metadata
    _write_metadata()

# Entry point. Inside a notebook cell __name__ is "__main__", so this runs on execution.
if __name__ == "__main__":
    # Ensure the output root exists before the pipeline writes into it.
    OUT_DIR.mkdir(exist_ok=True, parents=True)
    build()

In [None]:
import os
print(os.getcwd())  # shows current directory
!ls -lh
!ls -lh nyc-taxi-jul2022-jun2025

In [None]:
# Same check using absolute paths under /kaggle/working (the directory preserved as notebook output).
# NOTE(review): the folder name below duplicates OUT_DIR from the CONFIG cell; keep them in sync.
!ls -lh /kaggle/working
!ls -lh /kaggle/working/nyc-taxi-jul2022-jun2025