In [1]:
import os
import pandas as pd
import numpy as np

In [2]:

# ---------------------------------------------------------
# 0. Paths 
# ---------------------------------------------------------
RAW_BASE_DIR  = "../data/raw"
PROC_BASE_DIR = "../data/processed"

os.makedirs(RAW_BASE_DIR, exist_ok=True)
os.makedirs(PROC_BASE_DIR, exist_ok=True)

taxi_path    = os.path.join(RAW_BASE_DIR,  "nyc_taxi_2023.csv")
zones_path   = os.path.join(RAW_BASE_DIR,  "nyc_taxi_zones.csv")
weather_path = os.path.join(RAW_BASE_DIR,  "weather_2023_nyc.csv")

csv_out_path     = os.path.join(PROC_BASE_DIR, "taxi_weather_zones_merged_2023_sample.csv")
parquet_out_path = os.path.join(PROC_BASE_DIR, "taxi_weather_zones_merged_2023_sample.parquet")
schema_out_path  = os.path.join(PROC_BASE_DIR, "data_schema_sample.txt")

print("Taxi path   :", taxi_path)
print("Zones path  :", zones_path)
print("Weather path:", weather_path)

# ---------------------------------------------------------
# 1. Load datasets 
# ---------------------------------------------------------
TAXI_ROWS = 2_000_000  

print(f"\nLoading first {TAXI_ROWS} rows from taxi data...")
taxi = pd.read_csv(taxi_path, nrows=TAXI_ROWS)
zones = pd.read_csv(zones_path)
weather = pd.read_csv(weather_path)

print("\n=== Initial Shapes ===")
print("Taxi   :", taxi.shape)
print("Zones  :", zones.shape)
print("Weather:", weather.shape)

# ---------------------------------------------------------
# 2. Taxi cleaning
# ---------------------------------------------------------
print("\n=== Cleaning Taxi Data ===")

taxi["tpep_pickup_datetime"] = pd.to_datetime(
    taxi["tpep_pickup_datetime"], errors="coerce"
)
taxi["pickup_date"] = taxi["tpep_pickup_datetime"].dt.floor("D")

for col in ["PULocationID", "DOLocationID"]:
    taxi[col] = pd.to_numeric(taxi[col], errors="coerce")

before_filter = len(taxi)
taxi = taxi[
    (taxi["trip_distance"] > 0) &
    (taxi["fare_amount"] > 0)
].copy()
after_filter = len(taxi)

print(f"Taxi records before filter: {before_filter}")
print(f"Taxi records after  filter: {after_filter}")

# ---------------------------------------------------------
# 3. Zones cleaning
# ---------------------------------------------------------
print("\n=== Cleaning Zones Data ===")

zones.columns = zones.columns.str.strip()
cols_lower = {c.lower(): c for c in zones.columns}
loc_col     = cols_lower.get("locationid")
borough_col = cols_lower.get("borough")
zone_col    = cols_lower.get("zone")

if loc_col is None or borough_col is None or zone_col is None:
    raise KeyError("Zones CSV must contain LocationID, Borough, Zone (case-insensitive).")

zones[loc_col] = pd.to_numeric(zones[loc_col], errors="coerce")

zones_pickup = zones.rename(columns={
    loc_col: "zone_id",
    borough_col: "pickup_borough",
    zone_col: "pickup_zone"
})

zones_dropoff = zones.rename(columns={
    loc_col: "zone_id_do",
    borough_col: "dropoff_borough",
    zone_col: "dropoff_zone"
})

# ---------------------------------------------------------
# 4. Weather cleaning
# ---------------------------------------------------------
print("\n=== Cleaning Weather Data ===")

weather.columns = weather.columns.str.strip()
weather_cols_lower = {c.lower(): c for c in weather.columns}
date_col = weather_cols_lower.get("date")

if date_col is None:
    raise KeyError("Weather CSV must contain a 'date' column.")

weather[date_col] = pd.to_datetime(weather[date_col], errors="coerce")

for col in ["temperature_avg", "precipitation", "is_raining"]:
    if col in weather.columns:
        weather[col] = pd.to_numeric(weather[col], errors="coerce")

# ---------------------------------------------------------
# 5. Merge: Taxi + Pickup Zones
# ---------------------------------------------------------
print("\n=== Step 1: Taxi + Pickup Zones ===")

merged = taxi.merge(
    zones_pickup[["zone_id", "pickup_borough", "pickup_zone"]],
    how="left",
    left_on="PULocationID",
    right_on="zone_id"
).drop(columns=["zone_id"])

print("Shape after pickup merge:", merged.shape)

# ---------------------------------------------------------
# 6. Merge: Taxi + Weather
# ---------------------------------------------------------
print("\n=== Step 2: Taxi + Weather ===")

merged["pickup_date"] = pd.to_datetime(merged["pickup_date"], errors="coerce")
weather["date"]       = pd.to_datetime(weather[date_col], errors="coerce")

merged = merged.merge(
    weather,
    how="left",
    left_on="pickup_date",
    right_on="date",
    suffixes=("", "_weather")
).drop(columns=["date"], errors="ignore")

print("Shape after weather merge:", merged.shape)

# ---------------------------------------------------------
# 7. Merge: Dropoff Zones
# ---------------------------------------------------------
print("\n=== Step 3: Dropoff Zones ===")

merged = merged.merge(
    zones_dropoff[["zone_id_do", "dropoff_borough", "dropoff_zone"]],
    how="left",
    left_on="DOLocationID",
    right_on="zone_id_do"
).drop(columns=["zone_id_do"])

print("Shape after dropoff merge:", merged.shape)

# ---------------------------------------------------------
# 8. Validation & summaries
# ---------------------------------------------------------
print("\n=== Data Validation ===")

original_taxi_count = after_filter
merged_count = len(merged)

print("Original taxi (post-filter):", original_taxi_count)
print("Merged records             :", merged_count)

expected_cols = [
    "tpep_pickup_datetime", "pickup_date",
    "PULocationID", "DOLocationID",
    "trip_distance", "fare_amount",
    "pickup_borough", "pickup_zone",
    "temperature_avg", "precipitation", "is_raining", "temp_category",
    "dropoff_borough", "dropoff_zone"
]

missing_expected = [c for c in expected_cols if c not in merged.columns]
if missing_expected:
    print("⚠️ Missing expected columns:", missing_expected)
else:
    print(" All expected key columns present.")

print("\n=== Dtypes ===")
print(merged.dtypes)

print("\n=== Missing Value Summary ===")
missing_summary = merged.isna().sum()
print(missing_summary)

print("\n=== Date Ranges ===")
print("Taxi pickup_date min:", taxi["pickup_date"].min())
print("Taxi pickup_date max:", taxi["pickup_date"].max())

if "date" in weather.columns:
    print("Weather date min    :", weather["date"].min())
    print("Weather date max    :", weather["date"].max())

print("\n=== Duplicate Rows ===")
print("Duplicates:", merged.duplicated().sum())

print("\n=== Sample merged rows (5) ===")
sample_cols = [c for c in expected_cols if c in merged.columns]
print(merged[sample_cols].head(5))

# ---------------------------------------------------------
# 9. Save outputs 
# ---------------------------------------------------------
print("\n=== Saving Outputs ===")

merged.to_csv(csv_out_path, index=False)
merged.to_parquet(parquet_out_path, index=False)

with open(schema_out_path, "w") as f:
    f.write("Data Schema (sample)\n")
    f.write("====================\n\n")
    for col, dtype in merged.dtypes.items():
        f.write(f"{col}: {dtype}\n")

csv_size_mb     = os.path.getsize(csv_out_path) / (1024**2)
parquet_size_mb = os.path.getsize(parquet_out_path) / (1024**2)

print(f" CSV saved to    : {csv_out_path}  (~{csv_size_mb:.2f} MB)")
print(f" Parquet saved to: {parquet_out_path}  (~{parquet_size_mb:.2f} MB)")
print(f" Schema saved to : {schema_out_path}")

print("\n Sample merge + save completed successfully.")


Taxi path   : ../data/raw/nyc_taxi_2023.csv
Zones path  : ../data/raw/nyc_taxi_zones.csv
Weather path: ../data/raw/weather_2023_nyc.csv

Loading first 2000000 rows from taxi data...

=== Initial Shapes ===
Taxi   : (2000000, 20)
Zones  : (263, 6)
Weather: (365, 5)

=== Cleaning Taxi Data ===
Taxi records before filter: 2000000
Taxi records after  filter: 1959448

=== Cleaning Zones Data ===

=== Cleaning Weather Data ===

=== Step 1: Taxi + Pickup Zones ===
Shape after pickup merge: (1959633, 23)

=== Step 2: Taxi + Weather ===
Shape after weather merge: (1959633, 27)

=== Step 3: Dropoff Zones ===
Shape after dropoff merge: (1960211, 29)

=== Data Validation ===
Original taxi (post-filter): 1959448
Merged records             : 1960211
 All expected key columns present.

=== Dtypes ===
VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime            object
passenger_count                 float64
trip_distance                   float64
Rat