# Machine learning and deep learning models

In [1]:
from __future__ import annotations

# Standard library
import gc
import itertools
import json
import math
import os
import random
import time
from dataclasses import dataclass
from glob import glob
from pathlib import Path
from typing import Iterator, Optional, Tuple

# Third-party libraries
import duckdb
import numpy as np
import pandas as pd
import polars as pl
import torch
import torch.nn as nn
import torch.optim as optim
from collections import defaultdict
from math import sqrt
from torch.utils.data import DataLoader, Dataset
from xgboost import XGBRegressor

# scikit-learn
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

## Common data partitioning

To ensure comparability across models, consistent data partitions were used at each modeling level:

* The same train, validation, and test segments were applied to all models:

  * Training period: 2017–2021
  * Validation period: 2022
  * Test period: 2023–2024

* For hourly models, temporal ordering was preserved within each split, and rolling-origin evaluation was used to improve robustness:

  * Models were trained on data spanning [start … t] and one-step-ahead forecasts were generated for t + 1, iterating through the test period.

In [2]:
# Common data partitioning

BASE_DIR = "/Users/zoltanjelovich/Documents/ISEG/MFW/data"

# Targets
PATH_Y_DAILY_SYSTEM  = f"{BASE_DIR}/targets/system/y_daily_system.parquet"
PATH_Y_HOURLY_SYSTEM = f"{BASE_DIR}/targets/system/y_hourly_system.parquet"
PATH_Y_HOURLY_STATION = f"{BASE_DIR}/targets/station/y_hourly_station.parquet"

# Partition windows
TRAIN_START = pd.Timestamp("2017-01-01")
TRAIN_END   = pd.Timestamp("2021-12-31 23:00:00")
VAL_START   = pd.Timestamp("2022-01-01")
VAL_END     = pd.Timestamp("2022-12-31 23:00:00")
TEST_START  = pd.Timestamp("2023-01-01")
TEST_END    = pd.Timestamp("2024-12-31 23:00:00")

def _to_ts(x) -> pd.Timestamp:
    return pd.Timestamp(x)

def add_split_label(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
    out = df.copy()
    out[time_col] = pd.to_datetime(out[time_col])

    t = out[time_col]
    split = np.where((t >= TRAIN_START) & (t <= TRAIN_END), "train",
             np.where((t >= VAL_START)   & (t <= VAL_END),   "val",
             np.where((t >= TEST_START)  & (t <= TEST_END),  "test", None)))

    out["split"] = split
    out = out[out["split"].notna()].copy()
    return out

In [3]:
# Apply to system-level targets (daily + hourly)

# Daily system target
y_daily_system = pd.read_parquet(PATH_Y_DAILY_SYSTEM)
y_daily_system = add_split_label(y_daily_system, time_col="date")
print(y_daily_system["split"].value_counts(dropna=False))
print(y_daily_system["date"].min(), "→", y_daily_system["date"].max())

# Hourly system target
y_hourly_system = pd.read_parquet(PATH_Y_HOURLY_SYSTEM)
y_hourly_system = add_split_label(y_hourly_system, time_col="ts_hour")
y_hourly_system = y_hourly_system.sort_values("ts_hour")
print(y_hourly_system["split"].value_counts(dropna=False))
print(y_hourly_system["ts_hour"].min(), "→", y_hourly_system["ts_hour"].max())

split
train    1826
test      731
val       365
Name: count, dtype: int64
2017-01-01 00:00:00 → 2024-12-31 00:00:00
split
train    43824
test     17544
val       8760
Name: count, dtype: int64
2017-01-01 00:00:00 → 2024-12-31 23:00:00


In [4]:
# Apply to station-level hourly target

y_hourly_station = pd.read_parquet(PATH_Y_HOURLY_STATION, columns=["ts_hour", "station_id", "departures", "arrivals", "net"])
y_hourly_station = add_split_label(y_hourly_station, time_col="ts_hour")
y_hourly_station = y_hourly_station.sort_values(["ts_hour", "station_id"])

print(y_hourly_station["split"].value_counts())
print(y_hourly_station["ts_hour"].min(), "→", y_hourly_station["ts_hour"].max())
print("Unique stations:", y_hourly_station["station_id"].nunique())

split
test     22157137
train    21764585
val       8255194
Name: count, dtype: int64
2017-01-01 00:00:00 → 2024-12-31 23:00:00
Unique stations: 3594


In [5]:
# Rolling-origin “expanding window” iterator (forecast t+1)

@dataclass(frozen=True)
class RollingOriginStep:
    train_end: pd.Timestamp
    pred_time: pd.Timestamp

def rolling_origin_hours(
    test_start: pd.Timestamp,
    test_end: pd.Timestamp,
    train_start: pd.Timestamp = TRAIN_START,
    min_train_end: pd.Timestamp = TRAIN_END,
    step_hours: int = 1,
) -> Iterator[RollingOriginStep]:
    # First prediction time is the first hour in test period
    pred_time = test_start

    # Corresponding train_end is one hour before the prediction
    train_end = pred_time - pd.Timedelta(hours=1)

    # Enforce minimum training end
    if train_end < min_train_end:
        train_end = min_train_end
        pred_time = train_end + pd.Timedelta(hours=1)

    while pred_time <= test_end:
        yield RollingOriginStep(train_end=train_end, pred_time=pred_time)
        pred_time = pred_time + pd.Timedelta(hours=step_hours)
        train_end = pred_time - pd.Timedelta(hours=1)

# Preview the first 5 steps
steps = list(itertools.islice(rolling_origin_hours(TEST_START, TEST_END), 5))
steps

[RollingOriginStep(train_end=Timestamp('2022-12-31 23:00:00'), pred_time=Timestamp('2023-01-01 00:00:00')),
 RollingOriginStep(train_end=Timestamp('2023-01-01 00:00:00'), pred_time=Timestamp('2023-01-01 01:00:00')),
 RollingOriginStep(train_end=Timestamp('2023-01-01 01:00:00'), pred_time=Timestamp('2023-01-01 02:00:00')),
 RollingOriginStep(train_end=Timestamp('2023-01-01 02:00:00'), pred_time=Timestamp('2023-01-01 03:00:00')),
 RollingOriginStep(train_end=Timestamp('2023-01-01 03:00:00'), pred_time=Timestamp('2023-01-01 04:00:00'))]

## Feature sets per level

Consistent feature families were used across modeling levels:

* For system-level machine learning models:

  * Target variables included `y_hourly_system` and `y_daily_system` for one-step-ahead forecasting.
  * Feature sets included time-of-day and time-of-week indicators, weather variables, air quality index (AQI) measures, and phase indicators.

* For station-level machine learning and deep learning models:

  * The primary target variable was `y_hourly_station`, with log transformation applied where appropriate.
  * Feature sets included:

    * Temporal features such as hour, day-of-week, seasonality, and holiday indicators.
    * Weather variables.
    * Built environment and demographic attributes represented as station-level static features.
    * Network features including centrality measures, degree, and cluster membership.
    * Contextual variables such as traffic measures and bicycle counts.

* For cluster-level models:

  * Feature sets followed the station-level structure but were aggregated at the cluster level, enabling simpler model specifications and direct comparison with station-level results.

In [6]:
# Hourly system

BASE_DIR = "/Users/zoltanjelovich/Documents/ISEG/MFW/data"
PATH_X_HOURLY_SYSTEM = f"{BASE_DIR}/features/system/X_hourly_system.parquet"

# Split boundaries
TRAIN_START = pd.Timestamp("2017-01-01")
TRAIN_END   = pd.Timestamp("2021-12-31 23:00:00")
VAL_START   = pd.Timestamp("2022-01-01")
VAL_END     = pd.Timestamp("2022-12-31 23:00:00")
TEST_START  = pd.Timestamp("2023-01-01")
TEST_END    = pd.Timestamp("2024-12-31 23:00:00")

def add_split_label_on_time(df: pd.DataFrame, time_col: str) -> pd.DataFrame:
    out = df.copy()
    t = pd.to_datetime(out[time_col])
    out[time_col] = t

    split = np.where((t >= TRAIN_START) & (t <= TRAIN_END), "train",
             np.where((t >= VAL_START)   & (t <= VAL_END),   "val",
             np.where((t >= TEST_START)  & (t <= TEST_END),  "test", None)))

    out["split"] = split
    return out[out["split"].notna()].copy()

# Read hourly system features
df = pd.read_parquet(PATH_X_HOURLY_SYSTEM)
df["ts_hour"] = pd.to_datetime(df["ts_hour"])
df = df.sort_values("ts_hour").reset_index(drop=True)

# Create 1-step-ahead target and prediction timestamp
df["pred_time"] = df["ts_hour"] + pd.Timedelta(hours=1)
df["y"] = df["trips"].shift(-1)

# Drop last row (no t+1 target) and any rows where y is missing
df = df.dropna(subset=["y"]).copy()
df["y"] = df["y"].astype(np.float64)

# Assign split based on pred_time (target time)
df = add_split_label_on_time(df, time_col="pred_time")

# Define feature columns
drop_cols = {"ts_hour", "pred_time", "y", "trips"}
feature_cols = [c for c in df.columns if c not in drop_cols]

# Basic preprocessing: one-hot encode object columns, cast bools to int
X = df[feature_cols].copy()
y = df["y"].copy()
meta = df[["pred_time", "split"]].copy()

obj_cols = X.select_dtypes(include=["object"]).columns.tolist()
if obj_cols:
    X = pd.get_dummies(X, columns=obj_cols, drop_first=False)

bool_cols = X.select_dtypes(include=["bool"]).columns.tolist()
for c in bool_cols:
    X[c] = X[c].astype(np.int8)

# Sanity checks
print("Hourly system shapes:", X.shape, y.shape, meta.shape)
print(meta["split"].value_counts())
print("Pred-time range:", meta["pred_time"].min(), "→", meta["pred_time"].max())

Hourly system shapes: (70127, 84) (70127,) (70127, 2)
split
train    43823
test     17544
val       8760
Name: count, dtype: int64
Pred-time range: 2017-01-01 01:00:00 → 2024-12-31 23:00:00


In [7]:
# Daily system (features at t, trips at t+1 day)

PATH_X_DAILY_SYSTEM = f"{BASE_DIR}/features/system/X_daily_system.parquet"

dfd = pd.read_parquet(PATH_X_DAILY_SYSTEM)
dfd["date"] = pd.to_datetime(dfd["date"])
dfd = dfd.sort_values("date").reset_index(drop=True)

dfd["pred_date"] = dfd["date"] + pd.Timedelta(days=1)
dfd["y"] = dfd["trips"].shift(-1)

dfd = dfd.dropna(subset=["y"]).copy()
dfd["y"] = dfd["y"].astype(np.float64)

# Split by target date (pred_date)
dfd = add_split_label_on_time(dfd, time_col="pred_date")

drop_cols = {"date", "pred_date", "y", "trips"}
feature_cols = [c for c in dfd.columns if c not in drop_cols]

Xd = dfd[feature_cols].copy()
yd = dfd["y"].copy()
meta_d = dfd[["pred_date", "split"]].copy()

obj_cols = Xd.select_dtypes(include=["object"]).columns.tolist()
if obj_cols:
    Xd = pd.get_dummies(Xd, columns=obj_cols, drop_first=False)

bool_cols = Xd.select_dtypes(include=["bool"]).columns.tolist()
for c in bool_cols:
    Xd[c] = Xd[c].astype(np.int8)

print("Daily system shapes:", Xd.shape, yd.shape, meta_d.shape)
print(meta_d["split"].value_counts())
print("Pred-date range:", meta_d["pred_date"].min(), "→", meta_d["pred_date"].max())

Daily system shapes: (2921, 77) (2921,) (2921, 2)
split
train    1825
test      731
val       365
Name: count, dtype: int64
Pred-date range: 2017-01-02 00:00:00 → 2024-12-31 00:00:00


In [8]:
# # Create a supervised station-level dataset (departures at t+1)

# BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
# X_DIR = BASE_DIR / "features" / "station" / "hourly"
# OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_supervised_yearly_polars"
# OUT_DIR.mkdir(parents=True, exist_ok=True)

# years = range(2017, 2025)

# for y in years:
#     src = X_DIR / f"X_hourly_station_{y}.parquet"
#     assert src.exists(), f"Missing: {src}"

#     out_path = OUT_DIR / f"station_hourly_supervised_{y}.parquet"

#     lf = pl.scan_parquet(str(src))
#     lf2 = (
#         lf
#         .sort(["station_id", "ts_hour"])
#         .with_columns([
#             (pl.col("ts_hour") + pl.duration(hours=1)).alias("pred_time"),
#             pl.col("departures").shift(-1).over("station_id").alias("y"),
#         ])
#         .filter(pl.col("y").is_not_null())
#         .with_columns([
#             pl.when(pl.col("pred_time") < pl.datetime(2022, 1, 1, 0, 0, 0))
#               .then(pl.lit("train"))
#               .when(pl.col("pred_time") < pl.datetime(2023, 1, 1, 0, 0, 0))
#               .then(pl.lit("val"))
#               .otherwise(pl.lit("test"))
#               .alias("split")
#         ])
#     )

#     # Stream directly to parquet (no full in-memory collect)
#     lf2.sink_parquet(str(out_path))
#     print(f"Wrote {out_path.name}")

In [9]:
# Verification (counts + min/max pred_time)

BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_supervised_yearly"
pattern = str(OUT_DIR / "station_hourly_supervised_*.parquet")

lf = pl.scan_parquet(pattern)

counts = lf.group_by("split").agg(pl.len().alias("n")).sort("split").collect()
rng = lf.select([
    pl.col("pred_time").min().alias("min_pred"),
    pl.col("pred_time").max().alias("max_pred"),
]).collect()

print(counts)
print(rng)

shape: (3, 2)
┌───────┬──────────┐
│ split ┆ n        │
│ ---   ┆ ---      │
│ str   ┆ u32      │
╞═══════╪══════════╡
│ test  ┆ 41653875 │
│ train ┆ 48151826 │
│ val   ┆ 16046488 │
└───────┴──────────┘
shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ min_pred            ┆ max_pred            │
│ ---                 ┆ ---                 │
│ datetime[μs]        ┆ datetime[μs]        │
╞═════════════════════╪═════════════════════╡
│ 2017-01-01 01:00:00 ┆ 2024-12-31 23:00:00 │
└─────────────────────┴─────────────────────┘


In [10]:
# Cardinalities on a bounded sample

BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
IN_DIR = BASE_DIR / "modeling" / "station" / "hourly_supervised_yearly"
pattern = str(IN_DIR / "station_hourly_supervised_*.parquet")

lf = pl.scan_parquet(pattern)

schema = lf.collect_schema()
string_cols = [c for c, dt in schema.items() if dt == pl.Utf8]

print("String columns:", string_cols)

SAMPLE_N = 1_000_000
sample_lf = lf.head(SAMPLE_N)

card_df = (
    sample_lf.select([pl.col(c).n_unique().alias(c) for c in string_cols])
    .collect()
)
card = card_df.to_dicts()[0]
card_sorted = sorted(card.items(), key=lambda x: x[1], reverse=True)

card_sorted[:30]

String columns: ['station_id', 'holiday_short', 'holiday_name', 'pandemic_regime', 'station_name', 'borough_name', 'nta_code', 'nta_name', 'tract_geoid', 'tract_name', 'state_fips', 'county_fips', 'tract_code', 'county_name', 'pluto_zipcode', 'pluto_landuse_label', 'aqi_category', 'aqi_defining_parameter', 'split']


[('station_id', 115),
 ('station_name', 115),
 ('tract_geoid', 93),
 ('tract_name', 90),
 ('tract_code', 83),
 ('pluto_zipcode', 30),
 ('nta_code', 27),
 ('nta_name', 27),
 ('holiday_short', 11),
 ('holiday_name', 11),
 ('pluto_landuse_label', 11),
 ('aqi_category', 4),
 ('borough_name', 3),
 ('county_fips', 3),
 ('county_name', 3),
 ('aqi_defining_parameter', 3),
 ('state_fips', 2),
 ('pandemic_regime', 1),
 ('split', 1)]

In [11]:
# # Write ML-ready datasets

# BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
# IN_DIR = BASE_DIR / "modeling" / "station" / "hourly_supervised_yearly_polars"
# OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml"
# OUT_DIR.mkdir(parents=True, exist_ok=True)

# pattern = str(IN_DIR / "station_hourly_supervised_*.parquet")
# lf = pl.scan_parquet(pattern)
# schema = lf.collect_schema()

# META_COLS = {"ts_hour", "pred_time", "split", "station_id"}
# TARGET_COLS = {"y", "departures", "arrivals", "net"}

# DROP_STRING_COLS = {
#     "station_name",
#     "tract_geoid", "tract_name", "tract_code",
#     "nta_code", "nta_name",
#     "pluto_zipcode",
#     "state_fips", "county_fips", "county_name",
# }

# KEEP_STRING_COLS = {
#     "holiday_short",
#     "holiday_name",
#     "pluto_landuse_label",
#     "aqi_category",
#     "aqi_defining_parameter",
#     "borough_name",
#     "pandemic_regime",
# }

# # Build feature column list

# feature_cols = []
# for c, dt in schema.items():
#     if c in META_COLS or c in TARGET_COLS:
#         continue
#     if dt == pl.Utf8 and c not in KEEP_STRING_COLS:
#         continue
#     feature_cols.append(c)

# print(f"Number of ML features: {len(feature_cols)}")

# # Prepare base frame (meta + y + features)

# base_lf = lf.select(
#     ["station_id", "pred_time", "split", "y"] + feature_cols
# )

# # Cast booleans to int8
# bool_cols = [c for c in feature_cols if schema.get(c) == pl.Boolean]
# if bool_cols:
#     base_lf = base_lf.with_columns(
#         [pl.col(c).cast(pl.Int8) for c in bool_cols]
#     )

# # Write split-specific datasets

# for split in ["train", "val", "test"]:
#     out_path = OUT_DIR / f"{split}.parquet"
#     (
#         base_lf
#         .filter(pl.col("split") == split)
#         .drop("split")
#         .sink_parquet(str(out_path))
#     )
#     print(f"Wrote {out_path}")

In [12]:
BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml"

for split in ["train", "val", "test"]:
    lf = pl.scan_parquet(str(OUT_DIR / f"{split}.parquet"))
    print(f"\n{split.upper()}")
    print(lf.select(pl.len()).collect())
    print("Columns:", len(lf.collect_schema()))


TRAIN
shape: (1, 1)
┌──────────┐
│ len      │
│ ---      │
│ u32      │
╞══════════╡
│ 48151826 │
└──────────┘
Columns: 129

VAL
shape: (1, 1)
┌──────────┐
│ len      │
│ ---      │
│ u32      │
╞══════════╡
│ 16046488 │
└──────────┘
Columns: 129

TEST
shape: (1, 1)
┌──────────┐
│ len      │
│ ---      │
│ u32      │
╞══════════╡
│ 41653875 │
└──────────┘
Columns: 129


In [13]:
# # Identify traffic columns and confirm substitutes exist

# BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
# IN_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml"
# OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml_v2_no_traffic"
# OUT_DIR.mkdir(parents=True, exist_ok=True)

# # Scan schema from train split (schemas are identical across splits)
# schema = pl.scan_parquet(str(IN_DIR / "train.parquet")).collect_schema()
# cols = schema.names()

# traffic_cols = [c for c in cols if c.startswith("traffic_volume_")]

# # Functional substitutes
# substitute_candidates = [
#     "bike_count_nearest_counter",
#     # PLUTO / built environment intensity (keep those that exist)
#     "pluto_pct_residential",
#     "pluto_pct_commercial",
#     "pluto_far",
#     "pluto_units_density_per_10k_sqft",
#     "nearest_pluto_distance_m",
#     # Land-use mix proxies (keep those that exist)
#     "landuse_entropy",
#     "residential_share",
#     "commercial_share",
#     "industrial_share",
#     "institutional_share",
#     # ACS contextual proxies (keep those that exist)
#     "acs_population",
#     "acs_pop_density",
#     "acs_median_income",
#     "acs_pct_bike_commute",
#     "acs_pct_transit_commute",
#     "acs_pct_no_vehicle",
#     "acs_pct_bachelors_plus",
# ]

# present_substitutes = [c for c in substitute_candidates if c in cols]

# print("Traffic cols to drop:", traffic_cols)
# print("Present substitutes:", present_substitutes)
# print("N traffic cols:", len(traffic_cols))
# print("N present substitutes:", len(present_substitutes))

In [14]:
# # Write updated split files

# BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
# IN_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml"
# OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml_v2_no_traffic"
# OUT_DIR.mkdir(parents=True, exist_ok=True)

# # Determine columns to drop from schema
# schema = pl.scan_parquet(str(IN_DIR / "train.parquet")).collect_schema()
# cols = schema.names()
# traffic_cols = [c for c in cols if c.startswith("traffic_volume_")]

# assert len(traffic_cols) > 0, "No traffic_volume_* columns found to drop."

# for split in ["train", "val", "test"]:
#     in_path = IN_DIR / f"{split}.parquet"
#     out_path = OUT_DIR / f"{split}.parquet"

#     lf = pl.scan_parquet(str(in_path)).drop(traffic_cols)
#     lf.sink_parquet(str(out_path))
#     print(f"Wrote {out_path}")

In [15]:
# Confirm traffic cols are gone + substitutes remain + missingness improved

BASE_DIR = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
OUT_DIR = BASE_DIR / "modeling" / "station" / "hourly_ml"

# Confirm columns removed
schema2 = pl.scan_parquet(str(OUT_DIR / "train.parquet")).collect_schema()
cols2 = schema2.names()
print("Any traffic_volume_* remaining?", any(c.startswith("traffic_volume_") for c in cols2))

# Re-run missingness on a sample to confirm the "worst offenders" changed
SAMPLE_N = 1_000_000
sample = pl.scan_parquet(str(OUT_DIR / "train.parquet")).head(SAMPLE_N)

null_rates = (
    sample.select([
        (pl.col(c).null_count() / SAMPLE_N).alias(c)
        for c in sample.columns
        if c not in {"station_id", "pred_time", "y"}
    ])
    .collect()
    .to_dicts()[0]
)

worst = sorted(null_rates.items(), key=lambda x: x[1], reverse=True)[:15]
worst

Any traffic_volume_* remaining? False


  for c in sample.columns


[('bike_count_nearest_counter', 0.350524),
 ('pluto_pct_residential', 0.148903),
 ('pluto_pct_commercial', 0.148903),
 ('aqi_county', 0.081977),
 ('aqi_category', 0.081977),
 ('aqi_defining_parameter', 0.081977),
 ('aqi_county_usg_or_worse', 0.081977),
 ('aqi_county_unhealthy_or_worse', 0.081977),
 ('acs_population', 0.061313),
 ('acs_median_income', 0.061313),
 ('acs_pct_bike_commute', 0.061313),
 ('acs_pct_transit_commute', 0.061313),
 ('acs_pct_no_vehicle', 0.061313),
 ('acs_pct_bachelors_plus', 0.061313),
 ('acs_bike_commute_share', 0.061313)]

In [16]:
BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")

XH_PATH = BASE / "features/cluster/X_hourly_cluster.parquet"
XD_PATH = BASE / "features/cluster/X_daily_cluster.parquet"

OUT_H = BASE / "modeling/cluster/hourly_ml"
OUT_D = BASE / "modeling/cluster/daily_ml"
OUT_H.mkdir(parents=True, exist_ok=True)
OUT_D.mkdir(parents=True, exist_ok=True)

# Same split boundaries
TRAIN_END_H = pl.datetime(2021, 12, 31, 23, 0, 0)
VAL_END_H   = pl.datetime(2022, 12, 31, 23, 0, 0)

TRAIN_END_D = pl.datetime(2021, 12, 31, 0, 0, 0)
VAL_END_D   = pl.datetime(2022, 12, 31, 0, 0, 0)

def add_split_hourly(pred_time_expr: pl.Expr) -> pl.Expr:
    return (
        pl.when(pred_time_expr <= TRAIN_END_H).then(pl.lit("train"))
         .when(pred_time_expr <= VAL_END_H).then(pl.lit("val"))
         .otherwise(pl.lit("test"))
         .alias("split")
    )

def add_split_daily(pred_date_expr: pl.Expr) -> pl.Expr:
    return (
        pl.when(pred_date_expr <= TRAIN_END_D).then(pl.lit("train"))
         .when(pred_date_expr <= VAL_END_D).then(pl.lit("val"))
         .otherwise(pl.lit("test"))
         .alias("split")
    )

# # Hourly cluster supervised
# lf_h = pl.scan_parquet(str(XH_PATH))

# # Exclude identifiers/time and leakage columns from ML feature set.
# ID_TIME_COLS_H = {"ts_hour", "cluster_id", "cluster_label", "date", "year", "month", "day", "hour", "dow"}
# DROP_FROM_X_H = {"departures", "arrivals", "net"}  # contemporaneous values at ts_hour (avoid trivial leakage)

# schema_h = lf_h.collect_schema()
# all_cols_h = schema_h.names()

# ml_features_h = [
#     c for c in all_cols_h
#     if (c not in ID_TIME_COLS_H) and (c not in DROP_FROM_X_H)
# ]

# hourly_supervised = (
#     lf_h
#     .sort(["cluster_id", "ts_hour"])
#     .with_columns([
#         # 1-step-ahead target within cluster
#         pl.col("departures").shift(-1).over("cluster_id").alias("y"),
#         (pl.col("ts_hour") + pl.duration(hours=1)).alias("pred_time"),
#     ])
#     .with_columns([
#         add_split_hourly(pl.col("pred_time")),
#     ])
#     .filter(pl.col("y").is_not_null())
#     .select(
#         ["pred_time", "cluster_id", "cluster_label", "y", "split"] + ml_features_h
#     )
# )

# # Write split files
# for s in ["train", "val", "test"]:
#     out_path = OUT_H / f"{s}.parquet"
#     hourly_supervised.filter(pl.col("split") == s).sink_parquet(str(out_path))
#     print("Wrote", out_path)

# # Daily cluster supervised
# lf_d = pl.scan_parquet(str(XD_PATH))

# ID_TIME_COLS_D = {"date", "cluster_id", "cluster_label", "year", "month", "day", "dow"}
# DROP_FROM_X_D = {"departures"}

# schema_d = lf_d.collect_schema()
# all_cols_d = schema_d.names()

# ml_features_d = [
#     c for c in all_cols_d
#     if (c not in ID_TIME_COLS_D) and (c not in DROP_FROM_X_D)
# ]

# daily_supervised = (
#     lf_d
#     .sort(["cluster_id", "date"])
#     .with_columns([
#         pl.col("departures").shift(-1).over("cluster_id").alias("y"),
#         (pl.col("date") + pl.duration(days=1)).alias("pred_date"),
#     ])
#     .with_columns([
#         add_split_daily(pl.col("pred_date")),
#     ])
#     .filter(pl.col("y").is_not_null())
#     .select(
#         ["pred_date", "cluster_id", "cluster_label", "y", "split"] + ml_features_d
#     )
# )

# for s in ["train", "val", "test"]:
#     out_path = OUT_D / f"{s}.parquet"
#     daily_supervised.filter(pl.col("split") == s).sink_parquet(str(out_path))
#     print("Wrote", out_path)

# Quick sanity checks
train_h = pl.scan_parquet(str(OUT_H / "train.parquet"))
val_h   = pl.scan_parquet(str(OUT_H / "val.parquet"))
test_h  = pl.scan_parquet(str(OUT_H / "test.parquet"))

print("\nHourly cluster rows by split:")
print(pl.concat([
    train_h.select(pl.lit("train").alias("split"), pl.len().alias("n")),
    val_h.select(pl.lit("val").alias("split"), pl.len().alias("n")),
    test_h.select(pl.lit("test").alias("split"), pl.len().alias("n")),
]).collect())

print("\nHourly pred_time range:")
print(pl.scan_parquet(str(OUT_H / "train.parquet")).select(
    pl.min("pred_time").alias("min_pred"),
    pl.max("pred_time").alias("max_pred"),
).collect())

train_d = pl.scan_parquet(str(OUT_D / "train.parquet"))
val_d   = pl.scan_parquet(str(OUT_D / "val.parquet"))
test_d  = pl.scan_parquet(str(OUT_D / "test.parquet"))

print("\nDaily cluster rows by split:")
print(pl.concat([
    train_d.select(pl.lit("train").alias("split"), pl.len().alias("n")),
    val_d.select(pl.lit("val").alias("split"), pl.len().alias("n")),
    test_d.select(pl.lit("test").alias("split"), pl.len().alias("n")),
]).collect())

print("\nDaily pred_date range:")
print(pl.scan_parquet(str(OUT_D / "train.parquet")).select(
    pl.min("pred_date").alias("min_pred"),
    pl.max("pred_date").alias("max_pred"),
).collect())


Hourly cluster rows by split:
shape: (3, 2)
┌───────┬────────┐
│ split ┆ n      │
│ ---   ┆ ---    │
│ str   ┆ u32    │
╞═══════╪════════╡
│ train ┆ 108369 │
│ val   ┆ 26277  │
│ test  ┆ 52626  │
└───────┴────────┘

Hourly pred_time range:
shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ min_pred            ┆ max_pred            │
│ ---                 ┆ ---                 │
│ datetime[μs]        ┆ datetime[μs]        │
╞═════════════════════╪═════════════════════╡
│ 2017-01-01 01:00:00 ┆ 2021-12-31 23:00:00 │
└─────────────────────┴─────────────────────┘

Daily cluster rows by split:
shape: (3, 2)
┌───────┬──────┐
│ split ┆ n    │
│ ---   ┆ ---  │
│ str   ┆ u32  │
╞═══════╪══════╡
│ train ┆ 4545 │
│ val   ┆ 1095 │
│ test  ┆ 2193 │
└───────┴──────┘

Daily pred_date range:
shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ min_pred            ┆ max_pred            │
│ ---                 ┆ ---                 │
│ datetime[μs]        ┆ datetime[μs]        │
╞═══

In [17]:
# BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")

# XH = BASE / "features/system/X_hourly_system.parquet"
# OUT_H = BASE / "modeling/system/hourly_ml"
# OUT_H.mkdir(parents=True, exist_ok=True)

# lf = pl.scan_parquet(str(XH)).sort("ts_hour")

# system_hourly = (
#     lf
#     .with_columns([
#         (pl.col("ts_hour") + pl.duration(hours=1)).alias("pred_time"),
#         pl.col("trips").shift(-1).alias("y"),
#     ])
#     .filter(pl.col("y").is_not_null())
#     .with_columns([
#         pl.when(pl.col("pred_time") <= pl.datetime(2021, 12, 31, 23, 0, 0)).then(pl.lit("train"))
#           .when(pl.col("pred_time") <= pl.datetime(2022, 12, 31, 23, 0, 0)).then(pl.lit("val"))
#           .otherwise(pl.lit("test"))
#           .alias("split")
#     ])
# )

# # Drop contemporaneous outcome to avoid leakage
# drop_cols = ["trips", "ts_hour"]

# for s in ["train", "val", "test"]:
#     out_path = OUT_H / f"{s}.parquet"
#     (
#         system_hourly
#         .filter(pl.col("split") == s)
#         .drop(["split"] + drop_cols)
#         .sink_parquet(str(out_path))
#     )
#     print("Wrote", out_path)

In [18]:
# XD = BASE / "features/system/X_daily_system.parquet"
# OUT_D = BASE / "modeling/system/daily_ml"
# OUT_D.mkdir(parents=True, exist_ok=True)

# lfd = pl.scan_parquet(str(XD)).sort("date")

# system_daily = (
#     lfd
#     .with_columns([
#         (pl.col("date") + pl.duration(days=1)).alias("pred_date"),
#         pl.col("trips").shift(-1).alias("y"),
#     ])
#     .filter(pl.col("y").is_not_null())
#     .with_columns([
#         pl.when(pl.col("pred_date") <= pl.datetime(2021, 12, 31)).then(pl.lit("train"))
#           .when(pl.col("pred_date") <= pl.datetime(2022, 12, 31)).then(pl.lit("val"))
#           .otherwise(pl.lit("test"))
#           .alias("split")
#     ])
# )

# drop_cols = ["trips", "date"]

# for s in ["train", "val", "test"]:
#     out_path = OUT_D / f"{s}.parquet"
#     (
#         system_daily
#         .filter(pl.col("split") == s)
#         .drop(["split"] + drop_cols)
#         .sink_parquet(str(out_path))
#     )
#     print("Wrote", out_path)

In [19]:
# BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")

# DATASETS = [
#     BASE / "modeling/system/hourly_ml",
#     BASE / "modeling/system/daily_ml",
#     BASE / "modeling/cluster/hourly_ml",
#     BASE / "modeling/cluster/daily_ml",
#     BASE / "modeling/station/hourly_ml",
# ]

# HOLIDAY_COLS = ["holiday_short", "holiday_name", "holiday_id"]

# for ds_dir in DATASETS:
#     for split in ["train", "val", "test"]:
#         path = ds_dir / f"{split}.parquet"
#         lf = pl.scan_parquet(str(path))
#         schema = lf.collect_schema()
#         cols = schema.names()

#         present = [c for c in HOLIDAY_COLS if c in cols]
#         if not present:
#             continue

#         # is_holiday = 1 if any holiday field is non-null
#         is_holiday = (
#             pl.any_horizontal([pl.col(c).is_not_null() for c in present])
#             .cast(pl.Int8)
#             .alias("is_holiday")
#         )

#         out = (
#             lf.with_columns([is_holiday])
#               .drop(present)
#         )

#         tmp = ds_dir / f"_{split}.tmp.parquet"
#         out.sink_parquet(str(tmp))
#         tmp.replace(path)

#     print("Updated:", ds_dir)

## Random Forest and XGBoost

At the system, station, and cluster levels, tree-based models were implemented as follows:

* Tabular feature matrices were constructed with lagged demand variables:

  * Lagged target values (e.g., `y_{t−1}`, `y_{t−2}`, `y_{t−24}`) were included as features to capture autoregressive effects.

* Hyperparameter tuning was conducted using validation data:

  * For Random Forest models, the number of trees, maximum tree depth, and minimum samples per split and leaf were tuned.
  * For XGBoost models, the learning rate, maximum depth, subsampling rate, and number of estimators were tuned.

Validation sets were used to select hyperparameters, after which models were retrained on the combined training and validation data for final evaluation on the test set.

Model performance was assessed using RMSE, MAE, and MAPE for one-step-ahead forecasts, with multi-step forecasts evaluated where applicable using recursive prediction.

### Random Forest baseline (system-level hourly)

In [20]:
# Load and split data

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
SYS_H = BASE / "modeling/system/hourly_ml"

def load_split(split):
    return pd.read_parquet(SYS_H / f"{split}.parquet")

train_df = load_split("train")
val_df   = load_split("val")
test_df  = load_split("test")

TARGET = "y"
DROP_COLS = ["pred_time"]

X_train = train_df.drop(columns=[TARGET] + DROP_COLS)
y_train = train_df[TARGET]

X_val = val_df.drop(columns=[TARGET] + DROP_COLS)
y_val = val_df[TARGET]

X_test = test_df.drop(columns=[TARGET] + DROP_COLS)
y_test = test_df[TARGET]

print(X_train.shape, X_val.shape, X_test.shape)

(43823, 54) (8760, 54) (17544, 54)


In [21]:
cat_cols = ["day_type", "pandemic_regime"]

def drop_datetime_cols(X: pd.DataFrame) -> pd.DataFrame:
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        print("Dropping datetime columns:", dt_cols)
        return X.drop(columns=dt_cols)
    return X

X_train = drop_datetime_cols(X_train)
X_val   = drop_datetime_cols(X_val)
X_test  = drop_datetime_cols(X_test)

obj_cols = [c for c in X_train.columns if X_train[c].dtype == "object" and c not in cat_cols]
if obj_cols:
    print("Dropping unexpected object columns:", obj_cols)
    X_train = X_train.drop(columns=obj_cols)
    X_val   = X_val.drop(columns=obj_cols)
    X_test  = X_test.drop(columns=obj_cols)

# Recompute numeric columns after dropping
num_cols = [c for c in X_train.columns if c not in cat_cols]

print("Final X shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Categoricals:", cat_cols)
print("Num cols:", len(num_cols))

Dropping datetime columns: ['date']
Dropping datetime columns: ['date']
Dropping datetime columns: ['date']
Final X shapes: (43823, 53) (8760, 53) (17544, 53)
Categoricals: ['day_type', 'pandemic_regime']
Num cols: 51


In [22]:
# One-hot encode categoricals

cat_cols = ["day_type", "pandemic_regime"]
num_cols = [c for c in X_train.columns if c not in cat_cols]

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

In [23]:
# Hyperparameter tuning

param_grid = [
    {"rf__n_estimators": 300, "rf__max_depth": 15, "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 300, "rf__max_depth": 25, "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 600, "rf__max_depth": 15, "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 600, "rf__max_depth": 25, "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 600, "rf__max_depth": None, "rf__min_samples_leaf": 10},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("rf", RandomForestRegressor(random_state=42, n_jobs=-1)),
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5

    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params, best_rmse = min(results, key=lambda x: x[1])
print("\nBest params:", best_params)
print("Best val RMSE:", round(best_rmse, 2))

{'rf__n_estimators': 300, 'rf__max_depth': 15, 'rf__min_samples_leaf': 5} → val RMSE: 633.73
{'rf__n_estimators': 300, 'rf__max_depth': 25, 'rf__min_samples_leaf': 5} → val RMSE: 630.86
{'rf__n_estimators': 600, 'rf__max_depth': 15, 'rf__min_samples_leaf': 5} → val RMSE: 634.16
{'rf__n_estimators': 600, 'rf__max_depth': 25, 'rf__min_samples_leaf': 5} → val RMSE: 631.26
{'rf__n_estimators': 600, 'rf__max_depth': None, 'rf__min_samples_leaf': 10} → val RMSE: 651.49

Best params: {'rf__n_estimators': 300, 'rf__max_depth': 25, 'rf__min_samples_leaf': 5}
Best val RMSE: 630.86


In [24]:
# Retrain on train + val, evaluate on test

# Combine train + val
X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

# Build pipeline
final_model = Pipeline(
    steps=[
        ("prep", preprocess),
        ("rf", RandomForestRegressor(random_state=42, n_jobs=-1)),
    ]
)

# Apply tuned params (these include rf__ prefixes)
final_model.set_params(**best_params)

# Fit and predict
final_model.fit(X_trainval, y_trainval)
test_pred = final_model.predict(X_test)

# Metrics (RMSE without squared=)
rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

# Safe MAPE (handles zeros)
eps = 1.0
mape = np.mean(np.abs(y_test.to_numpy() - test_pred) / np.maximum(np.abs(y_test.to_numpy()), eps))

print("\nTEST METRICS (system hourly RF)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (system hourly RF)
RMSE: 1181.33
MAE : 688.08
MAPE: 0.2346
R²  : 0.8997


### XGBoost baseline (system-level hourly)

In [25]:
# Load data and build X/y

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
SYS_H = BASE / "modeling/system/hourly_ml"

def load_split(split):
    return pd.read_parquet(SYS_H / f"{split}.parquet")

train_df = load_split("train")
val_df   = load_split("val")
test_df  = load_split("test")

TARGET = "y"
DROP_COLS = ["pred_time"]  # not a feature

X_train = train_df.drop(columns=[TARGET] + DROP_COLS)
y_train = train_df[TARGET]

X_val = val_df.drop(columns=[TARGET] + DROP_COLS)
y_val = val_df[TARGET]

X_test = test_df.drop(columns=[TARGET] + DROP_COLS)
y_test = test_df[TARGET]

# Drop datetime columns if present (version-proof)
def drop_datetime_cols(X: pd.DataFrame) -> pd.DataFrame:
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        print("Dropping datetime columns:", dt_cols)
        return X.drop(columns=dt_cols)
    return X

X_train = drop_datetime_cols(X_train)
X_val   = drop_datetime_cols(X_val)
X_test  = drop_datetime_cols(X_test)

# Known categoricals
cat_cols = ["day_type", "pandemic_regime"]

# Safety: drop any unexpected object columns besides categoricals
obj_cols = [c for c in X_train.columns if X_train[c].dtype == "object" and c not in cat_cols]
if obj_cols:
    print("Dropping unexpected object columns:", obj_cols)
    X_train = X_train.drop(columns=obj_cols)
    X_val   = X_val.drop(columns=obj_cols)
    X_test  = X_test.drop(columns=obj_cols)

num_cols = [c for c in X_train.columns if c not in cat_cols]

print("Final X shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Num cols:", len(num_cols), "| Cat cols:", cat_cols)

Dropping datetime columns: ['date']
Dropping datetime columns: ['date']
Dropping datetime columns: ['date']
Final X shapes: (43823, 53) (8760, 53) (17544, 53)
Num cols: 51 | Cat cols: ['day_type', 'pandemic_regime']


In [26]:
# Preprocess one-hot categoricals

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

In [27]:
# Tune XGBoost on validation RMSE

param_grid = [
    {"xgb__n_estimators": 500,  "xgb__learning_rate": 0.05, "xgb__max_depth": 6,  "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 800,  "xgb__learning_rate": 0.05, "xgb__max_depth": 6,  "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 800,  "xgb__learning_rate": 0.05, "xgb__max_depth": 8,  "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 1200, "xgb__learning_rate": 0.03, "xgb__max_depth": 6,  "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 1200, "xgb__learning_rate": 0.03, "xgb__max_depth": 8,  "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
]

# ---- fail fast if the wrong grid is being used (prevents "nonsense tuning") ----
bad = [p for p in param_grid if not any(k.startswith("xgb__") for k in p)]
if bad:
    raise ValueError(
        "param_grid contains non-XGBoost parameter dict(s). "
        f"Example keys: {list(bad[0].keys())}. "
        "Expected keys like 'xgb__n_estimators', 'xgb__max_depth', etc."
    )

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("xgb", XGBRegressor(
                random_state=42,
                n_jobs=1,
                objective="reg:squarederror",
                tree_method="hist",
                reg_lambda=1.0,
                min_child_weight=1,
            ))
        ]
    )

    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

    # cleanup helps avoid peak-memory creep in notebooks
    del model, val_pred
    gc.collect()

best_params, best_rmse = min(results, key=lambda x: x[1])
print("\nBest params:", best_params)
print("Best val RMSE:", round(best_rmse, 2))

{'xgb__n_estimators': 500, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 6, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 632.97
{'xgb__n_estimators': 800, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 6, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 636.26
{'xgb__n_estimators': 800, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 8, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 618.69
{'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 6, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 632.3
{'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 8, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 619.06

Best params: {'xgb__n_estimators': 800, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 8, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8}
Best val RMSE: 618.69


In [28]:
# Retrain on train+val and evaluate on test

X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model = Pipeline(
    steps=[
        ("prep", preprocess),
        ("xgb", XGBRegressor(
            random_state=42,
            n_jobs=1,
            objective="reg:squarederror",
            tree_method="hist",
            reg_lambda=1.0,
            min_child_weight=1,
        ))
    ]
)
final_model.set_params(**best_params)

final_model.fit(X_trainval, y_trainval)
test_pred = final_model.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

# Safe MAPE (handles zeros)
eps = 1.0
mape = np.mean(np.abs(y_test.to_numpy() - test_pred) / np.maximum(np.abs(y_test.to_numpy()), eps))

print("\nTEST METRICS (system hourly XGBoost)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (system hourly XGBoost)
RMSE: 1398.48
MAE : 815.34
MAPE: 0.2512
R²  : 0.8594


### Random Forest (system-level daily)

In [29]:
# Load system-daily data and build X / y

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
SYS_D = BASE / "modeling/system/daily_ml"

def load_split(split):
    return pd.read_parquet(SYS_D / f"{split}.parquet")

train_df = load_split("train")
val_df   = load_split("val")
test_df  = load_split("test")

TARGET = "y"
DROP_COLS = ["pred_date"]

X_train = train_df.drop(columns=[TARGET] + DROP_COLS)
y_train = train_df[TARGET]

X_val = val_df.drop(columns=[TARGET] + DROP_COLS)
y_val = val_df[TARGET]

X_test = test_df.drop(columns=[TARGET] + DROP_COLS)
y_test = test_df[TARGET]

In [30]:
# Drop datetime columns + identify categoricals

def drop_datetime_cols(X: pd.DataFrame) -> pd.DataFrame:
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        print("Dropping datetime columns:", dt_cols)
        return X.drop(columns=dt_cols)
    return X

X_train = drop_datetime_cols(X_train)
X_val   = drop_datetime_cols(X_val)
X_test  = drop_datetime_cols(X_test)

cat_cols = ["day_type", "pandemic_regime"]

# Drop unexpected object columns
obj_cols = [c for c in X_train.columns if X_train[c].dtype == "object" and c not in cat_cols]
if obj_cols:
    print("Dropping unexpected object columns:", obj_cols)
    X_train = X_train.drop(columns=obj_cols)
    X_val   = X_val.drop(columns=obj_cols)
    X_test  = X_test.drop(columns=obj_cols)

num_cols = [c for c in X_train.columns if c not in cat_cols]

print("Final X shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Num cols:", len(num_cols), "| Cat cols:", cat_cols)

Final X shapes: (1825, 47) (365, 47) (731, 47)
Num cols: 45 | Cat cols: ['day_type', 'pandemic_regime']


In [31]:
# Preprocessing

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

In [32]:
# RF validation-based hyperparameter tuning

param_grid = [
    {"rf__n_estimators": 200, "rf__max_depth": 10,  "rf__min_samples_leaf": 3},
    {"rf__n_estimators": 300, "rf__max_depth": 15,  "rf__min_samples_leaf": 3},
    {"rf__n_estimators": 300, "rf__max_depth": 20,  "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 500, "rf__max_depth": 20,  "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 500, "rf__max_depth": None,"rf__min_samples_leaf": 10},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("rf", RandomForestRegressor(
                random_state=42,
                n_jobs=-1
            )),
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params, best_rmse = min(results, key=lambda x: x[1])
print("\nBest params:", best_params)
print("Best val RMSE:", round(best_rmse, 2))

{'rf__n_estimators': 200, 'rf__max_depth': 10, 'rf__min_samples_leaf': 3} → val RMSE: 17933.28
{'rf__n_estimators': 300, 'rf__max_depth': 15, 'rf__min_samples_leaf': 3} → val RMSE: 17744.04
{'rf__n_estimators': 300, 'rf__max_depth': 20, 'rf__min_samples_leaf': 5} → val RMSE: 17584.48
{'rf__n_estimators': 500, 'rf__max_depth': 20, 'rf__min_samples_leaf': 5} → val RMSE: 17609.86
{'rf__n_estimators': 500, 'rf__max_depth': None, 'rf__min_samples_leaf': 10} → val RMSE: 17379.54

Best params: {'rf__n_estimators': 500, 'rf__max_depth': None, 'rf__min_samples_leaf': 10}
Best val RMSE: 17379.54


In [33]:
# Retrain on train+val and evaluate on test

X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model = Pipeline(
    steps=[
        ("prep", preprocess),
        ("rf", RandomForestRegressor(
            random_state=42,
            n_jobs=-1
        )),
    ]
)
final_model.set_params(**best_params)

final_model.fit(X_trainval, y_trainval)
test_pred = final_model.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

# Safe MAPE
eps = 1.0
mape = np.mean(np.abs(y_test.to_numpy() - test_pred) /
               np.maximum(np.abs(y_test.to_numpy()), eps))

print("\nTEST METRICS (system daily RF)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (system daily RF)
RMSE: 31043.79
MAE : 24631.25
MAPE: 0.2556
R²  : 0.4215


### XGBoost (system-level daily)

In [34]:
# Load + build X/y (daily)

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
SYS_D = BASE / "modeling/system/daily_ml"

def load_split(split):
    return pd.read_parquet(SYS_D / f"{split}.parquet")

train_df = load_split("train")
val_df   = load_split("val")
test_df  = load_split("test")

TARGET = "y"
DROP_COLS = ["pred_date"]

X_train = train_df.drop(columns=[TARGET] + DROP_COLS)
y_train = train_df[TARGET]

X_val = val_df.drop(columns=[TARGET] + DROP_COLS)
y_val = val_df[TARGET]

X_test = test_df.drop(columns=[TARGET] + DROP_COLS)
y_test = test_df[TARGET]

# Drop datetime columns if any
def drop_datetime_cols(X: pd.DataFrame) -> pd.DataFrame:
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        print("Dropping datetime columns:", dt_cols)
        return X.drop(columns=dt_cols)
    return X

X_train = drop_datetime_cols(X_train)
X_val   = drop_datetime_cols(X_val)
X_test  = drop_datetime_cols(X_test)

cat_cols = ["day_type", "pandemic_regime"]

# Drop unexpected object columns besides categoricals
obj_cols = [c for c in X_train.columns if X_train[c].dtype == "object" and c not in cat_cols]
if obj_cols:
    print("Dropping unexpected object columns:", obj_cols)
    X_train = X_train.drop(columns=obj_cols)
    X_val   = X_val.drop(columns=obj_cols)
    X_test  = X_test.drop(columns=obj_cols)

num_cols = [c for c in X_train.columns if c not in cat_cols]

print("Final X shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Num cols:", len(num_cols), "| Cat cols:", cat_cols)

Final X shapes: (1825, 47) (365, 47) (731, 47)
Num cols: 45 | Cat cols: ['day_type', 'pandemic_regime']


In [35]:
# Preprocess one-hot categoricals

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

In [36]:
# Tune XGBoost on validation RMSE daily

param_grid = [
    {"xgb__n_estimators": 600,  "xgb__learning_rate": 0.05, "xgb__max_depth": 3, "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 900,  "xgb__learning_rate": 0.05, "xgb__max_depth": 3, "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 900,  "xgb__learning_rate": 0.05, "xgb__max_depth": 4, "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 1200, "xgb__learning_rate": 0.03, "xgb__max_depth": 3, "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
    {"xgb__n_estimators": 1200, "xgb__learning_rate": 0.03, "xgb__max_depth": 4, "xgb__subsample": 0.8, "xgb__colsample_bytree": 0.8},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("xgb", XGBRegressor(
                random_state=42,
                n_jobs=1,
                objective="reg:squarederror",
                tree_method="hist",
                reg_lambda=1.0,
                min_child_weight=1,
            ))
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params, best_rmse = min(results, key=lambda x: x[1])
print("\nBest params:", best_params)
print("Best val RMSE:", round(best_rmse, 2))

{'xgb__n_estimators': 600, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 3, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 17076.47
{'xgb__n_estimators': 900, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 3, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 17198.81
{'xgb__n_estimators': 900, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 4, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 17544.31
{'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 3, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 16999.79
{'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 4, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8} → val RMSE: 16913.64

Best params: {'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 4, 'xgb__subsample': 0.8, 'xgb__colsample_bytree': 0.8}
Best val RMSE: 16913.64


In [37]:
# Retrain on train+val and evaluate on test

X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model = Pipeline(
    steps=[
        ("prep", preprocess),
        ("xgb", XGBRegressor(
            random_state=42,
            n_jobs=1,
            objective="reg:squarederror",
            tree_method="hist",
            reg_lambda=1.0,
            min_child_weight=1,
        ))
    ]
)
final_model.set_params(**best_params)

final_model.fit(X_trainval, y_trainval)
test_pred = final_model.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

eps = 1.0
mape = np.mean(np.abs(y_test.to_numpy() - test_pred) /
               np.maximum(np.abs(y_test.to_numpy()), eps))

print("\nTEST METRICS (system daily XGBoost)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (system daily XGBoost)
RMSE: 36617.58
MAE : 29242.08
MAPE: 0.2781
R²  : 0.1951


### Random Forest (cluster-level hourly)

In [38]:
BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
CL_H = BASE / "modeling/cluster/hourly_ml"

def load_split(split: str) -> pd.DataFrame:
    return pd.read_parquet(CL_H / f"{split}.parquet")

train_df = load_split("train")
val_df   = load_split("val")
test_df  = load_split("test")

TARGET = "y"
DROP_COLS = ["pred_time"]

X_train = train_df.drop(columns=[TARGET] + DROP_COLS)
y_train = train_df[TARGET]

X_val = val_df.drop(columns=[TARGET] + DROP_COLS)
y_val = val_df[TARGET]

X_test = test_df.drop(columns=[TARGET] + DROP_COLS)
y_test = test_df[TARGET]

print("Initial X shapes:", X_train.shape, X_val.shape, X_test.shape)

Initial X shapes: (108369, 105) (26277, 105) (52626, 105)


In [39]:
def drop_datetime_cols(X: pd.DataFrame) -> pd.DataFrame:
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        print("Dropping datetime columns:", dt_cols)
        return X.drop(columns=dt_cols)
    return X

X_train = drop_datetime_cols(X_train)
X_val   = drop_datetime_cols(X_val)
X_test  = drop_datetime_cols(X_test)

# Drop leakage column if present
for df in (X_train, X_val, X_test):
    if "split" in df.columns:
        df.drop(columns=["split"], inplace=True)

# Auto-detect categoricals (object/string), keep only low-cardinality ones
cat_cols = list(X_train.select_dtypes(include=["object"]).columns)

LOW_CARD_MAX = 50
kept_cat_cols = []
dropped_high_card = []
for c in cat_cols:
    n_unique = X_train[c].nunique(dropna=True)
    if n_unique <= LOW_CARD_MAX:
        kept_cat_cols.append(c)
    else:
        dropped_high_card.append((c, n_unique))

if dropped_high_card:
    print("Dropping high-cardinality categoricals:", dropped_high_card)

cat_cols = kept_cat_cols
num_cols = [c for c in X_train.columns if c not in cat_cols]

print("Final X shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Categoricals (final):", cat_cols)
print("Num cols:", len(num_cols))

Final X shapes: (108369, 104) (26277, 104) (52626, 104)
Categoricals (final): ['cluster_label', 'pandemic_regime']
Num cols: 102


In [40]:
# One-hot categoricals

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

In [41]:
# RF tuning on validation RMSE

param_grid = [
    {"rf__n_estimators": 300, "rf__max_depth": 15,  "rf__min_samples_leaf": 3},
    {"rf__n_estimators": 300, "rf__max_depth": 25,  "rf__min_samples_leaf": 3},
    {"rf__n_estimators": 600, "rf__max_depth": 15,  "rf__min_samples_leaf": 3},
    {"rf__n_estimators": 600, "rf__max_depth": 25,  "rf__min_samples_leaf": 3},
    {"rf__n_estimators": 600, "rf__max_depth": None,"rf__min_samples_leaf": 5},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("rf", RandomForestRegressor(random_state=42, n_jobs=-1)),
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params, best_rmse = min(results, key=lambda x: x[1])
print("\nBest params:", best_params)
print("Best val RMSE:", round(best_rmse, 2))

{'rf__n_estimators': 300, 'rf__max_depth': 15, 'rf__min_samples_leaf': 3} → val RMSE: 279.16
{'rf__n_estimators': 300, 'rf__max_depth': 25, 'rf__min_samples_leaf': 3} → val RMSE: 276.37
{'rf__n_estimators': 600, 'rf__max_depth': 15, 'rf__min_samples_leaf': 3} → val RMSE: 278.98
{'rf__n_estimators': 600, 'rf__max_depth': 25, 'rf__min_samples_leaf': 3} → val RMSE: 276.11
{'rf__n_estimators': 600, 'rf__max_depth': None, 'rf__min_samples_leaf': 5} → val RMSE: 279.67

Best params: {'rf__n_estimators': 600, 'rf__max_depth': 25, 'rf__min_samples_leaf': 3}
Best val RMSE: 276.11


In [42]:
# Retrain on train+val and evaluate on test

X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model = Pipeline(
    steps=[
        ("prep", preprocess),
        ("rf", RandomForestRegressor(random_state=42, n_jobs=-1)),
    ]
)
final_model.set_params(**best_params)

final_model.fit(X_trainval, y_trainval)
test_pred = final_model.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

eps = 1.0
mape = np.mean(np.abs(y_test.to_numpy() - test_pred) /
               np.maximum(np.abs(y_test.to_numpy()), eps))

print("\nTEST METRICS (cluster hourly RF — corrected, no leakage)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (cluster hourly RF — corrected, no leakage)
RMSE: 409.47
MAE : 205.35
MAPE: 0.1978
R²  : 0.9516


In [43]:
assert "split" not in X_train.columns and "split" not in X_val.columns and "split" not in X_test.columns
print("✔ 'split' column is not present in any X split")

✔ 'split' column is not present in any X split


### XGBoost (cluster-level hourly)

In [44]:
# XGBoost tuning

param_grid = [
    {"xgb__n_estimators": 600,  "xgb__learning_rate": 0.05, "xgb__max_depth": 5},
    {"xgb__n_estimators": 800,  "xgb__learning_rate": 0.05, "xgb__max_depth": 5},
    {"xgb__n_estimators": 800,  "xgb__learning_rate": 0.05, "xgb__max_depth": 7},
    {"xgb__n_estimators": 1000, "xgb__learning_rate": 0.03, "xgb__max_depth": 5},
    {"xgb__n_estimators": 1000, "xgb__learning_rate": 0.03, "xgb__max_depth": 7},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("xgb", XGBRegressor(
                random_state=42,
                n_jobs=1,
                objective="reg:squarederror",
                tree_method="hist",
                subsample=0.8,
                colsample_bytree=0.8,
                reg_lambda=1.0,
            ))
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params_xgb, best_rmse_xgb = min(results, key=lambda x: x[1])
print("\nBest params:", best_params_xgb)
print("Best val RMSE:", round(best_rmse_xgb, 2))

{'xgb__n_estimators': 600, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 5} → val RMSE: 280.87
{'xgb__n_estimators': 800, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 5} → val RMSE: 279.0
{'xgb__n_estimators': 800, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 7} → val RMSE: 273.41
{'xgb__n_estimators': 1000, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 5} → val RMSE: 278.46
{'xgb__n_estimators': 1000, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 7} → val RMSE: 274.03

Best params: {'xgb__n_estimators': 800, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 7}
Best val RMSE: 273.41


In [45]:
# Retrain on train+val and evaluate on test

X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model_xgb = Pipeline(
    steps=[
        ("prep", preprocess),
        ("xgb", XGBRegressor(
            random_state=42,
            n_jobs=1,
            objective="reg:squarederror",
            tree_method="hist",
            subsample=0.8,
            colsample_bytree=0.8,
            reg_lambda=1.0,
        ))
    ]
)
final_model_xgb.set_params(**best_params_xgb)

final_model_xgb.fit(X_trainval, y_trainval)
test_pred = final_model_xgb.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

eps = 1.0
mape = np.mean(np.abs(y_test.to_numpy() - test_pred) /
               np.maximum(np.abs(y_test.to_numpy()), eps))

print("\nTEST METRICS (cluster hourly XGBoost)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (cluster hourly XGBoost)
RMSE: 495.41
MAE : 237.72
MAPE: 0.2012
R²  : 0.9291


### Random Forest (cluster-level daily)

In [46]:
# Load cluster daily data and build X/y

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
CL_D = BASE / "modeling/cluster/daily_ml"

def load_split(split: str) -> pd.DataFrame:
    return pd.read_parquet(CL_D / f"{split}.parquet")

train_df = load_split("train")
val_df   = load_split("val")
test_df  = load_split("test")

print("Loaded cluster DAILY shapes:", train_df.shape, val_df.shape, test_df.shape)

TARGET = "y"
DROP_COLS = ["pred_date"]  # daily key

X_train = train_df.drop(columns=[TARGET] + DROP_COLS)
y_train = train_df[TARGET]

X_val = val_df.drop(columns=[TARGET] + DROP_COLS)
y_val = val_df[TARGET]

X_test = test_df.drop(columns=[TARGET] + DROP_COLS)
y_test = test_df[TARGET]

print("Initial X shapes:", X_train.shape, X_val.shape, X_test.shape)

Loaded cluster DAILY shapes: (4545, 103) (1095, 103) (2193, 103)
Initial X shapes: (4545, 101) (1095, 101) (2193, 101)


In [47]:
# Drop datetime columns + remove leakage + set cat/num cols

def drop_datetime_cols(X: pd.DataFrame) -> pd.DataFrame:
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        print("Dropping datetime columns:", dt_cols)
        return X.drop(columns=dt_cols)
    return X

X_train = drop_datetime_cols(X_train)
X_val   = drop_datetime_cols(X_val)
X_test  = drop_datetime_cols(X_test)

# Drop leakage column if present (this caused your error: 'train'/'val'/'test' strings)
for df in (X_train, X_val, X_test):
    if "split" in df.columns:
        df.drop(columns=["split"], inplace=True)

# Cluster daily categoricals should match hourly cluster setup (minus split)
cat_cols = [c for c in X_train.select_dtypes(include=["object"]).columns]

LOW_CARD_MAX = 50
cat_cols = [c for c in cat_cols if X_train[c].nunique(dropna=True) <= LOW_CARD_MAX]

num_cols = [c for c in X_train.columns if c not in cat_cols]

print("Final X shapes:", X_train.shape, X_val.shape, X_test.shape)
print("Categoricals (final):", cat_cols)
print("Num cols:", len(num_cols))

Final X shapes: (4545, 100) (1095, 100) (2193, 100)
Categoricals (final): ['cluster_label', 'pandemic_regime']
Num cols: 98


In [48]:
# Preprocess + RF tuning (validation RMSE)

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols),
        ("num", "passthrough", num_cols),
    ]
)

param_grid = [
    {"rf__n_estimators": 300, "rf__max_depth": 10,  "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 500, "rf__max_depth": 15,  "rf__min_samples_leaf": 5},
    {"rf__n_estimators": 500, "rf__max_depth": 20,  "rf__min_samples_leaf": 10},
    {"rf__n_estimators": 800, "rf__max_depth": None,"rf__min_samples_leaf": 10},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("rf", RandomForestRegressor(random_state=42, n_jobs=-1)),
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params_rf, best_rmse_rf = min(results, key=lambda x: x[1])
print("\nBest params:", best_params_rf)
print("Best val RMSE:", round(best_rmse_rf, 2))

{'rf__n_estimators': 300, 'rf__max_depth': 10, 'rf__min_samples_leaf': 5} → val RMSE: 7497.6
{'rf__n_estimators': 500, 'rf__max_depth': 15, 'rf__min_samples_leaf': 5} → val RMSE: 7487.59
{'rf__n_estimators': 500, 'rf__max_depth': 20, 'rf__min_samples_leaf': 10} → val RMSE: 7464.15
{'rf__n_estimators': 800, 'rf__max_depth': None, 'rf__min_samples_leaf': 10} → val RMSE: 7449.23

Best params: {'rf__n_estimators': 800, 'rf__max_depth': None, 'rf__min_samples_leaf': 10}
Best val RMSE: 7449.23


In [49]:
# Final test evaluation

# Combine train and validation
X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model = Pipeline(
    steps=[
        ("prep", preprocess),
        ("rf", RandomForestRegressor(
            random_state=42,
            n_jobs=-1
        )),
    ]
)
final_model.set_params(**best_params_rf)

final_model.fit(X_trainval, y_trainval)

# Test prediction
test_pred = final_model.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

# Safe MAPE
eps = 1.0
mape = np.mean(
    np.abs(y_test.to_numpy() - test_pred) /
    np.maximum(np.abs(y_test.to_numpy()), eps)
)

print("\nTEST METRICS (cluster daily RF)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (cluster daily RF)
RMSE: 11197.57
MAE : 6870.58
MAPE: 0.2273
R²  : 0.8615


### XGBoost (cluster-level daily)

In [50]:
# XGBoost validation tuning

param_grid = [
    {"xgb__n_estimators": 600,  "xgb__learning_rate": 0.05, "xgb__max_depth": 3},
    {"xgb__n_estimators": 900,  "xgb__learning_rate": 0.05, "xgb__max_depth": 3},
    {"xgb__n_estimators": 900,  "xgb__learning_rate": 0.05, "xgb__max_depth": 4},
    {"xgb__n_estimators": 1200, "xgb__learning_rate": 0.03, "xgb__max_depth": 3},
    {"xgb__n_estimators": 1200, "xgb__learning_rate": 0.03, "xgb__max_depth": 4},
]

results = []

for params in param_grid:
    model = Pipeline(
        steps=[
            ("prep", preprocess),
            ("xgb", XGBRegressor(
                random_state=42,
                n_jobs=1,
                objective="reg:squarederror",
                tree_method="hist",
                subsample=0.8,
                colsample_bytree=0.8,
                reg_lambda=1.0,
            ))
        ]
    )
    model.set_params(**params)

    model.fit(X_train, y_train)
    val_pred = model.predict(X_val)

    rmse = mean_squared_error(y_val, val_pred) ** 0.5
    results.append((params, rmse))
    print(params, "→ val RMSE:", round(rmse, 2))

best_params_xgb, best_rmse_xgb = min(results, key=lambda x: x[1])
print("\nBest params:", best_params_xgb)
print("Best val RMSE:", round(best_rmse_xgb, 2))

{'xgb__n_estimators': 600, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 3} → val RMSE: 7282.3
{'xgb__n_estimators': 900, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 3} → val RMSE: 7318.45
{'xgb__n_estimators': 900, 'xgb__learning_rate': 0.05, 'xgb__max_depth': 4} → val RMSE: 7245.45
{'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 3} → val RMSE: 7279.9
{'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 4} → val RMSE: 7216.31

Best params: {'xgb__n_estimators': 1200, 'xgb__learning_rate': 0.03, 'xgb__max_depth': 4}
Best val RMSE: 7216.31


In [51]:
# Final train+val -> test evaluation

X_trainval = pd.concat([X_train, X_val], axis=0)
y_trainval = pd.concat([y_train, y_val], axis=0)

final_model_xgb = Pipeline(
    steps=[
        ("prep", preprocess),
        ("xgb", XGBRegressor(
            random_state=42,
            n_jobs=1,
            objective="reg:squarederror",
            tree_method="hist",
            subsample=0.8,
            colsample_bytree=0.8,
            reg_lambda=1.0,
        ))
    ]
)
final_model_xgb.set_params(**best_params_xgb)

final_model_xgb.fit(X_trainval, y_trainval)
test_pred = final_model_xgb.predict(X_test)

rmse = mean_squared_error(y_test, test_pred) ** 0.5
mae  = mean_absolute_error(y_test, test_pred)
r2   = r2_score(y_test, test_pred)

eps = 1.0
mape = np.mean(
    np.abs(y_test.to_numpy() - test_pred) /
    np.maximum(np.abs(y_test.to_numpy()), eps)
)

print("\nTEST METRICS (cluster daily XGBoost)")
print(f"RMSE: {rmse:.2f}")
print(f"MAE : {mae:.2f}")
print(f"MAPE: {mape:.4f}")
print(f"R²  : {r2:.4f}")


TEST METRICS (cluster daily XGBoost)
RMSE: 13726.23
MAE : 7867.53
MAPE: 0.2264
R²  : 0.7920


## CNN–LSTM models

For station-level short-term forecasting, convolutional–recurrent models were implemented as follows:

* Input construction:

  * For each station, input sequences of length `T` (e.g., the previous 24 or 48 hours) were constructed using only information available up to time *t*.

    * Included past demand values (`y_{t−1}, y_{t−2}, …`).
    * Included historical weather variables.
    * Included calendar variables such as hour of day, day type, and holiday indicators.
  * Future-aware rollups (e.g., centered or forward-looking rolling features) were avoided, and all rolling features were strictly backward-looking.
  * Static station features were incorporated:

    * High-cardinality identity features (e.g., `station_id`) were encoded using embeddings.
    * Low-cardinality categorical features (e.g., cluster, borough) were encoded using embeddings or one-hot encoding.
    * Built-environment and demographic variables were included as numeric features and concatenated with the learned sequence representation.

* Architecture:

  * One-dimensional convolutional layers were applied over the temporal dimension to extract short-term patterns such as intra-day dynamics.
  * LSTM layers were stacked after the convolutional block to capture longer-term temporal dependencies.
  * A dense output layer was used to predict `y_{t+1}`, with extensions to multi-horizon forecasting implemented where applicable using recursive or direct strategies.

* Training strategy:

  * A global CNN–LSTM model was trained across all stations, with a station identity embedding used to capture station-specific effects. This approach scaled efficiently and avoided training separate models for each station.
  * Where feasible, a hybrid extension inspired by Van der Wielen et al. was explored, augmenting the global model with lightweight station-specific bias or residual corrections rather than full station-level neural networks.

* Evaluation:

  * Station-level RMSE, MAE, and MAPE were computed and aggregated using:

    * Macro-averaging (equal weight per station) as the primary evaluation metric.
    * Volume-weighted averaging as a secondary metric to reflect system-level demand.
  * CNN–LSTM performance was compared against:

    * Classical time-series baselines (e.g., ARIMA) where available.
    * A fast tabular baseline trained on identical data splits (HistGradientBoosting), used in place of full station-level Random Forest or XGBoost models when computational constraints applied.

In [52]:
# Parameters and paths

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")

IN_DIR  = BASE / "modeling/station/hourly_ml"
OUT_DIR = BASE / "modeling/station/hourly_cnn_ready"
OUT_DIR.mkdir(parents=True, exist_ok=True)

train_in = IN_DIR / "train.parquet"
val_in   = IN_DIR / "val.parquet"
test_in  = IN_DIR / "test.parquet"

train_out = OUT_DIR / "train.parquet"
val_out   = OUT_DIR / "val.parquet"
test_out  = OUT_DIR / "test.parquet"

TARGET = "y"
ID_COL = "station_id"
TIME_COL = "pred_time"

print(train_in.exists(), val_in.exists(), test_in.exists())

True True True


In [53]:
# Build consistent categorical mappings from TRAIN and apply to all splits

MAP_PATH = OUT_DIR / "category_maps.json"

# def get_schema_cols(path: Path):
#     schema = pl.scan_parquet(path).collect_schema()
#     names = schema.names()
#     utf8_cols = [c for c, dt in schema.items() if dt == pl.Utf8]
#     return names, utf8_cols

# cols_train, utf8_train = get_schema_cols(train_in)

# if TARGET not in cols_train:
#     raise ValueError(f"Missing target column '{TARGET}' in {train_in}")
# if ID_COL not in cols_train:
#     raise ValueError(f"Missing station id column '{ID_COL}' in {train_in}")
# if TIME_COL not in cols_train:
#     raise ValueError(f"Missing time column '{TIME_COL}' in {train_in}")

# # We'll encode all string cols except station_id (station_id handled separately -> station_idx)
# encode_cols = [c for c in utf8_train if c != ID_COL]

# # Drop leakage columns if present
# DROP_COLS = [c for c in ["split"] if c in cols_train]

# print("String cols to encode (train):", encode_cols[:20], "… total:", len(encode_cols))
# print("Leakage cols to drop:", DROP_COLS)

# # Build category maps from TRAIN only (consistent coding across splits)
# maps = {}
# for c in encode_cols:
#     # include None explicitly as "missing"
#     vals = (
#         pl.scan_parquet(train_in)
#         .select(pl.col(c))
#         .unique()
#         .collect()[c]
#         .to_list()
#     )
#     # deterministic order; keep None first if present
#     vals_sorted = sorted([v for v in vals if v is not None])
#     if any(v is None for v in vals):
#         vals_sorted = [None] + vals_sorted
#     maps[c] = vals_sorted

# # station_id mapping -> station_idx (embedding index)
# station_vals = (
#     pl.scan_parquet(train_in)
#     .select(pl.col(ID_COL))
#     .unique()
#     .collect()[ID_COL]
#     .to_list()
# )
# station_vals_sorted = sorted([v for v in station_vals if v is not None])
# maps["__station_id__"] = station_vals_sorted

# with open(MAP_PATH, "w") as f:
#     json.dump(maps, f)

# print("Wrote category maps:", MAP_PATH)
# print("N stations (train):", len(maps["__station_id__"]))

In [54]:
# # Apply mappings and write CNN-ready parquets

# def apply_maps_and_write(in_path: Path, out_path: Path, maps: dict):
#     lf = pl.scan_parquet(in_path)

#     # drop leakage
#     for c in DROP_COLS:
#         if c in lf.collect_schema().names():
#             lf = lf.drop(c)

#     # station_idx
#     station_map = {sid: i for i, sid in enumerate(maps["__station_id__"])}
#     lf = lf.with_columns(
#         pl.col(ID_COL).replace(station_map, default=None).cast(pl.Int32).alias("station_idx")
#     )

#     # encode other Utf8 columns
#     schema = lf.collect_schema()
#     utf8_cols = [c for c, dt in schema.items() if dt == pl.Utf8 and c != ID_COL]
#     for c in utf8_cols:
#         if c not in maps:
#             lf = lf.drop(c)
#             continue
#         cmap = {v: i for i, v in enumerate(maps[c])}
#         lf = lf.with_columns(
#             pl.col(c).replace(cmap, default=None).cast(pl.Int16).alias(f"{c}_code")
#         ).drop(c)

#     # downcast floats for DL efficiency where safe
#     # (leave TIME_COL as datetime; we need it to form sequences)
#     out_schema = lf.collect_schema()
#     float_cols = [c for c, dt in out_schema.items() if dt in (pl.Float64,)]
#     if float_cols:
#         lf = lf.with_columns([pl.col(c).cast(pl.Float32) for c in float_cols])

#     # ensure target is float32
#     lf = lf.with_columns(pl.col(TARGET).cast(pl.Float32))

#     # write
#     lf.sink_parquet(out_path)
#     print("Wrote", out_path)

# # load maps from file to avoid accidental drift
# with open(MAP_PATH, "r") as f:
#     maps_loaded = json.load(f)

# apply_maps_and_write(train_in, train_out, maps_loaded)
# apply_maps_and_write(val_in,   val_out,   maps_loaded)
# apply_maps_and_write(test_in,  test_out,  maps_loaded)

In [55]:
# Quick sanity check (schemas, key columns)

def quick_check(path: Path):
    lf = pl.scan_parquet(path)
    schema = lf.collect_schema()
    cols = schema.names()
    assert TARGET in cols and ID_COL in cols and TIME_COL in cols and "station_idx" in cols
    # any Utf8?
    utf8 = [c for c, dt in schema.items() if dt == pl.Utf8]
    n = lf.select(pl.len()).collect().item()
    tminmax = lf.select([pl.col(TIME_COL).min().alias("min"), pl.col(TIME_COL).max().alias("max")]).collect()
    print(path.name, "rows:", n, "| utf8 remaining:", utf8)
    print(tminmax)

quick_check(train_out)
quick_check(val_out)
quick_check(test_out)

train.parquet rows: 48151826 | utf8 remaining: ['station_id']
shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ min                 ┆ max                 │
│ ---                 ┆ ---                 │
│ datetime[μs]        ┆ datetime[μs]        │
╞═════════════════════╪═════════════════════╡
│ 2017-01-01 01:00:00 ┆ 2021-12-31 23:00:00 │
└─────────────────────┴─────────────────────┘
val.parquet rows: 16046488 | utf8 remaining: ['station_id']
shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ min                 ┆ max                 │
│ ---                 ┆ ---                 │
│ datetime[μs]        ┆ datetime[μs]        │
╞═════════════════════╪═════════════════════╡
│ 2022-01-01 01:00:00 ┆ 2022-12-31 23:00:00 │
└─────────────────────┴─────────────────────┘
test.parquet rows: 41653875 | utf8 remaining: ['station_id']
shape: (1, 2)
┌─────────────────────┬─────────────────────┐
│ min                 ┆ max                 │
│ ---                 ┆ ---            

In [56]:
# Choose sequence settings + station sample

SEQ_DIR = BASE / "modeling/station/cnn_sequences"
SEQ_DIR.mkdir(parents=True, exist_ok=True)

T = 24        # try 24 first; later repeat with 48
H = 1         # 1-step ahead
N_STATIONS = 500   # increase later if time allows
SEED = 42

# Get a station sample from TRAIN
train_ids = (
    pl.scan_parquet(train_out)
    .select(["station_idx"])
    .unique()
    .collect()["station_idx"]
    .to_list()
)

rng = np.random.default_rng(SEED)
station_sample = rng.choice(train_ids, size=min(N_STATIONS, len(train_ids)), replace=False).tolist()
station_sample_set = set(station_sample)

print("Sampled stations:", len(station_sample))

Sampled stations: 500


In [57]:
# Build sequences and save as NPZ per split

SEQ_DIR = BASE / "modeling/station/cnn_sequences"
SEQ_DIR.mkdir(parents=True, exist_ok=True)

T = 24
H = 1

MAX_SEQ_PER_SPLIT = 300_000   # cap total sequences per split (adjust up later)
CHUNK_SIZE = 50_000           # sequences per chunk file
SEED = 42

def get_feature_cols(split_path: Path):
    names = pl.scan_parquet(split_path).collect_schema().names()
    return [c for c in names if c not in [TIME_COL, ID_COL, TARGET, "station_idx"]]

def station_sequence_generator(split_path: Path, station_idx: int, feature_cols: list, T: int, H: int):
    # Collect only one station at a time (bounded memory)
    g = (
        pl.scan_parquet(split_path)
        .filter(pl.col("station_idx") == station_idx)
        .select([TIME_COL, "station_idx", TARGET] + feature_cols)
        .collect()
        .sort(TIME_COL)
    )

    if g.height < T + H:
        return  # no sequences

    y_arr = g[TARGET].to_numpy()
    X_feat = g.select(feature_cols).to_numpy()

    for i in range(T, g.height - H + 1):
        X_seq = X_feat[i - T:i, :]
        y_next = float(y_arr[i + H - 1])
        yield X_seq, station_idx, y_next

# def write_sequences_chunked(split_path: Path, out_prefix: str, station_idxs: list, T: int, H: int,
#                             max_total: int, chunk_size: int, seed: int = 42):
#     rng = np.random.default_rng(seed)
#     station_idxs = list(station_idxs)
#     rng.shuffle(station_idxs)

#     feature_cols = get_feature_cols(split_path)
#     n_feat = len(feature_cols)
#     print(split_path.name, "| features:", n_feat, "| stations:", len(station_idxs))

#     X_buf, S_buf, y_buf = [], [], []
#     total_written = 0
#     chunk_id = 0

#     def flush():
#         nonlocal chunk_id, X_buf, S_buf, y_buf, total_written
#         if not X_buf:
#             return
#         X = np.stack(X_buf).astype(np.float32)
#         S = np.array(S_buf, dtype=np.int32)
#         y = np.array(y_buf, dtype=np.float32)

#         out_path = SEQ_DIR / f"{out_prefix}_T{T}_chunk{chunk_id:03d}.npz"
#         np.savez_compressed(out_path, X=X, S=S, y=y, feature_cols=np.array(feature_cols))
#         print("Wrote", out_path.name, "| sequences:", X.shape[0])
#         total_written += X.shape[0]
#         chunk_id += 1
#         X_buf, S_buf, y_buf = [], [], []

#     for sid in station_idxs:
#         if total_written >= max_total:
#             break

#         for X_seq, s, y_next in station_sequence_generator(split_path, sid, feature_cols, T, H):
#             X_buf.append(X_seq)
#             S_buf.append(s)
#             y_buf.append(y_next)

#             if len(X_buf) >= chunk_size:
#                 flush()
#                 if total_written >= max_total:
#                     break

#         if total_written >= max_total:
#             break

#     flush()
#     print(f"Done {out_prefix}: total_written={total_written}, chunks={chunk_id}, n_features={n_feat}")

# # Build chunked sequences for each split
# write_sequences_chunked(train_out, "train", station_sample, T, H, MAX_SEQ_PER_SPLIT, CHUNK_SIZE, SEED)
# write_sequences_chunked(val_out,   "val",   station_sample, T, H, MAX_SEQ_PER_SPLIT // 2, CHUNK_SIZE, SEED)
# write_sequences_chunked(test_out,  "test",  station_sample, T, H, MAX_SEQ_PER_SPLIT // 2, CHUNK_SIZE, SEED)

In [58]:
# Quick inventory of what was written

def list_chunks(prefix):
    files = sorted(glob(str(SEQ_DIR / f"{prefix}_T{T}_chunk*.npz")))
    print(prefix, "chunks:", len(files))
    if files:
        sample = np.load(files[0], allow_pickle=True)
        print("Example file:", Path(files[0]).name, "| X:", sample["X"].shape)

list_chunks("train")
list_chunks("val")
list_chunks("test")

train chunks: 6
Example file: train_T24_chunk000.npz | X: (50000, 24, 126)
val chunks: 3
Example file: val_T24_chunk000.npz | X: (50000, 24, 126)
test chunks: 3
Example file: test_T24_chunk000.npz | X: (50000, 24, 126)


In [59]:
# Sample rows + build X/y

ROW_N_TRAIN = 300_000
ROW_N_VAL   = 100_000
ROW_N_TEST  = 100_000
SEED = 42

def row_sample_to_pandas(path, n, seed=42):
    lf = pl.scan_parquet(path).with_row_index("row_nr")
    mod_base = max(n * 3, 1)
    df = (
        lf.with_columns((((pl.col("row_nr") * 1103515245 + seed) & 0x7FFFFFFF) % mod_base).alias("_k"))
          .filter(pl.col("_k") < n)
          .drop(["row_nr", "_k"])
          .limit(n)
          .collect()
          .to_pandas()
    )
    return df

def prep_tabular(df):
    y = df[TARGET].astype("float32")
    X = df.drop(columns=[TARGET, ID_COL, TIME_COL], errors="ignore")

    # Drop any datetime-like columns defensively
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        X = X.drop(columns=dt_cols)

    # ensure numeric dtypes only (everything should already be numeric/codes)
    for c in X.columns:
        if X[c].dtype == "object":
            X[c] = pd.to_numeric(X[c], errors="coerce")

    return X, y

tr = row_sample_to_pandas(train_out, ROW_N_TRAIN, SEED)
va = row_sample_to_pandas(val_out,   ROW_N_VAL,   SEED)
te = row_sample_to_pandas(test_out,  ROW_N_TEST,  SEED)

X_tr, y_tr = prep_tabular(tr)
X_va, y_va = prep_tabular(va)
X_te, y_te = prep_tabular(te)

def nan_rate(X):
    return float(np.isnan(X.to_numpy()).mean())

print("Shapes:", X_tr.shape, X_va.shape, X_te.shape)
print("NaN rate (train/val/test):", nan_rate(X_tr), nan_rate(X_va), nan_rate(X_te))

Shapes: (300000, 126) (100000, 126) (100000, 126)
NaN rate (train/val/test): 0.018050925925925925 0.012362698412698412 0.01719547619047619


In [60]:
# HistGradientBoostingRegressor

hgb = HistGradientBoostingRegressor(
    loss="squared_error",
    learning_rate=0.05,
    max_depth=10,
    max_iter=300,
    random_state=42
)

hgb.fit(X_tr, y_tr)

val_pred = hgb.predict(X_va)
test_pred = hgb.predict(X_te)

def safe_mape(y_true, y_pred, eps=1.0):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return np.mean(np.abs(y_true - y_pred) / np.maximum(np.abs(y_true), eps))

print("HGB (station hourly, sampled rows)")
print("VAL  RMSE:", (mean_squared_error(y_va, val_pred) ** 0.5))
print("TEST RMSE:", (mean_squared_error(y_te, test_pred) ** 0.5))
print("TEST MAE :", mean_absolute_error(y_te, test_pred))
print("TEST MAPE:", safe_mape(y_te, test_pred))
print("TEST R²  :", r2_score(y_te, test_pred))

HGB (station hourly, sampled rows)
VAL  RMSE: 0.638553394150442
TEST RMSE: 0.6521865668536464
TEST MAE : 0.4104449759257502
TEST MAPE: 0.360338323659901
TEST R²  : 0.024983362006145482


In [61]:
# Define improved chunk writer (balanced + includes pred_time)

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
CNN_DIR = BASE / "modeling/station/hourly_cnn_ready"
SEQ_DIR = BASE / "modeling/station/cnn_sequences_v2"
SEQ_DIR.mkdir(parents=True, exist_ok=True)

train_pq = CNN_DIR / "train.parquet"
val_pq   = CNN_DIR / "val.parquet"
test_pq  = CNN_DIR / "test.parquet"

TIME_COL = "pred_time"
TARGET = "y"
ID_COL = "station_id"

T = 24
H = 1
SEED = 42

# caps
MAX_SEQ_PER_SPLIT = 300_000
CHUNK_SIZE = 50_000
MAX_SEQ_PER_STATION = 1_000   # key fix for diversity (adjust 500–2000)

def get_feature_cols(split_path: Path):
    names = pl.scan_parquet(split_path).collect_schema().names()

    drop = {TIME_COL, ID_COL, TARGET, "station_idx", "date"}  # drop 'date' explicitly
    feat_cols = [c for c in names if c not in drop]

    # defensive: ensure no datetime columns slip through
    schema = pl.scan_parquet(split_path).collect_schema()
    for c in list(feat_cols):
        if str(schema[c]).startswith("Datetime") or str(schema[c]).startswith("Date"):
            feat_cols.remove(c)

    return feat_cols

def station_sequence_generator(split_path: Path, station_idx: int, feature_cols: list, T: int, H: int, max_per_station: int):
    g = (
        pl.scan_parquet(split_path)
        .filter(pl.col("station_idx") == station_idx)
        .select([TIME_COL, "station_idx", TARGET] + feature_cols)
        .collect()
        .sort(TIME_COL)
    )

    if g.height < T + H:
        return

    times = g[TIME_COL].to_numpy()
    y_arr = g[TARGET].to_numpy()
    X_feat = g.select(feature_cols).to_numpy()

    n_written = 0
    for i in range(T, g.height - H + 1):
        X_seq = X_feat[i - T:i, :]
        y_next = float(y_arr[i + H - 1])
        pred_time = times[i]  # target time t

        yield X_seq, station_idx, y_next, pred_time

        n_written += 1
        if n_written >= max_per_station:
            break

def write_sequences_chunked(split_path: Path, out_prefix: str, station_idxs: list, T: int, H: int,
                            max_total: int, chunk_size: int, max_per_station: int, seed: int = 42):
    rng = np.random.default_rng(seed)
    station_idxs = list(station_idxs)
    rng.shuffle(station_idxs)

    feature_cols = get_feature_cols(split_path)
    n_feat = len(feature_cols)
    print(out_prefix, "| features:", n_feat, "| stations:", len(station_idxs))

    X_buf, S_buf, y_buf, t_buf = [], [], [], []
    total_written = 0
    chunk_id = 0

    def flush():
        nonlocal chunk_id, X_buf, S_buf, y_buf, t_buf, total_written
        if not X_buf:
            return
        X = np.stack(X_buf).astype(np.float32)
        S = np.array(S_buf, dtype=np.int32)
        y = np.array(y_buf, dtype=np.float32)
        pt = np.array(t_buf)  # numpy datetime64

        out_path = SEQ_DIR / f"{out_prefix}_T{T}_chunk{chunk_id:03d}.npz"
        np.savez_compressed(out_path, X=X, S=S, y=y, pred_time=pt, feature_cols=np.array(feature_cols))
        print("Wrote", out_path.name, "| sequences:", X.shape[0])
        total_written += X.shape[0]
        chunk_id += 1
        X_buf, S_buf, y_buf, t_buf = [], [], [], []

    for sid in station_idxs:
        if total_written >= max_total:
            break

        for X_seq, s, y_next, pred_time in station_sequence_generator(split_path, sid, feature_cols, T, H, max_per_station):
            X_buf.append(X_seq)
            S_buf.append(int(s))
            y_buf.append(float(y_next))
            t_buf.append(pred_time)

            if len(X_buf) >= chunk_size:
                flush()
                if total_written >= max_total:
                    break

        if total_written >= max_total:
            break

    flush()
    print(f"Done {out_prefix}: total_written={total_written}, chunks={chunk_id}, n_features={n_feat}")

In [62]:
# Regenerate train/val/test chunks

write_sequences_chunked(train_pq, "train", station_sample, T, H, MAX_SEQ_PER_SPLIT, CHUNK_SIZE, MAX_SEQ_PER_STATION, SEED)
write_sequences_chunked(val_pq,   "val",   station_sample, T, H, MAX_SEQ_PER_SPLIT // 2, CHUNK_SIZE, MAX_SEQ_PER_STATION, SEED)
write_sequences_chunked(test_pq,  "test",  station_sample, T, H, MAX_SEQ_PER_SPLIT // 2, CHUNK_SIZE, MAX_SEQ_PER_STATION, SEED)

train | features: 125 | stations: 500
Wrote train_T24_chunk000.npz | sequences: 50000
Wrote train_T24_chunk001.npz | sequences: 50000
Wrote train_T24_chunk002.npz | sequences: 50000
Wrote train_T24_chunk003.npz | sequences: 50000
Wrote train_T24_chunk004.npz | sequences: 50000
Wrote train_T24_chunk005.npz | sequences: 50000
Done train: total_written=300000, chunks=6, n_features=125
val | features: 125 | stations: 500
Wrote val_T24_chunk000.npz | sequences: 50000
Wrote val_T24_chunk001.npz | sequences: 50000
Wrote val_T24_chunk002.npz | sequences: 50000
Done val: total_written=150000, chunks=3, n_features=125
test | features: 125 | stations: 500
Wrote test_T24_chunk000.npz | sequences: 50000
Wrote test_T24_chunk001.npz | sequences: 50000
Wrote test_T24_chunk002.npz | sequences: 50000
Done test: total_written=150000, chunks=3, n_features=125


In [63]:
# Quick re-check (diversity + date removed + pred_time present)

def load_first(prefix):
    files = sorted(glob(str(SEQ_DIR / f"{prefix}_T{T}_chunk*.npz")))
    d = np.load(files[0], allow_pickle=True)
    return d

d = load_first("train")
print("Keys:", list(d.keys()))
print("X:", d["X"].shape)
print("Unique stations in first chunk:", len(np.unique(d["S"])))
print("Has pred_time:", "pred_time" in d.files)
print("Has feature_cols:", "feature_cols" in d.files)
print("Contains 'date' feature:", "date" in list(d["feature_cols"]))

Keys: ['X', 'S', 'y', 'pred_time', 'feature_cols']
X: (50000, 24, 125)
Unique stations in first chunk: 50
Has pred_time: True
Has feature_cols: True
Contains 'date' feature: False


In [64]:
# Alignment check becomes unambiguous (uses pred_time)

# load one train chunk
files = sorted(glob(str(SEQ_DIR / f"train_T{T}_chunk*.npz")))
d = np.load(files[0], allow_pickle=True)

X = d["X"]
S = d["S"]
y = d["y"]
pt = d["pred_time"]

i = 100  # pick any index
sid = int(S[i])
pred_time = pt[i]
y_npz = float(y[i])

g = (
    pl.scan_parquet(train_pq)
    .filter((pl.col("station_idx") == sid) & (pl.col(TIME_COL) == pl.lit(pred_time)))
    .select([TIME_COL, "station_idx", TARGET])
    .collect()
)

print("NPZ station_idx:", sid)
print("NPZ pred_time:", pred_time)
print("NPZ y:", y_npz)
print("Source match rows:", g.shape[0])
print(g)

NPZ station_idx: 1569
NPZ pred_time: 2017-01-06T05:00:00.000000
NPZ y: 0.0
Source match rows: 1
shape: (1, 3)
┌─────────────────────┬─────────────┬─────┐
│ pred_time           ┆ station_idx ┆ y   │
│ ---                 ┆ ---         ┆ --- │
│ datetime[μs]        ┆ i32         ┆ f32 │
╞═════════════════════╪═════════════╪═════╡
│ 2017-01-06 05:00:00 ┆ 1569        ┆ 0.0 │
└─────────────────────┴─────────────┴─────┘


In [65]:
BASE = "/Users/zoltanjelovich/Documents/ISEG/MFW/data"
TRAIN_PQ = f"{BASE}/modeling/station/hourly_cnn_ready/train.parquet"
VAL_PQ   = f"{BASE}/modeling/station/hourly_cnn_ready/val.parquet"
TEST_PQ  = f"{BASE}/modeling/station/hourly_cnn_ready/test.parquet"

TIME_COL = "pred_time"
TARGET = "y"
ID_COL = "station_id"

station_sample_set = set(station_sample)

ROW_N_TRAIN = 300_000
ROW_N_VAL   = 100_000
ROW_N_TEST  = 100_000
SEED = 42

def sample_rows_from_stations(path, station_idxs, n_rows, seed=42):
    lf = (
        pl.scan_parquet(path)
        .filter(pl.col("station_idx").is_in(list(station_idxs)))
        .with_row_index("row_nr")
    )
    mod_base = max(n_rows * 5, 1)
    df = (
        lf.with_columns((((pl.col("row_nr") * 1103515245 + seed) & 0x7FFFFFFF) % mod_base).alias("_k"))
          .filter(pl.col("_k") < n_rows)
          .drop(["row_nr", "_k"])
          .limit(n_rows)
          .collect()
          .to_pandas()
    )
    return df

def prep_tabular(df):
    y = df[TARGET].astype("float32")
    X = df.drop(columns=[TARGET, ID_COL, TIME_COL], errors="ignore")

    # Drop datetime-like columns defensively
    dt_cols = list(X.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    if dt_cols:
        X = X.drop(columns=dt_cols)

    # Ensure numeric
    for c in X.columns:
        if X[c].dtype == "object":
            X[c] = pd.to_numeric(X[c], errors="coerce")
    return X, y

tr = sample_rows_from_stations(TRAIN_PQ, station_sample_set, ROW_N_TRAIN, SEED)
va = sample_rows_from_stations(VAL_PQ,   station_sample_set, ROW_N_VAL,   SEED)
te = sample_rows_from_stations(TEST_PQ,  station_sample_set, ROW_N_TEST,  SEED)

X_tr, y_tr = prep_tabular(tr)
X_va, y_va = prep_tabular(va)
X_te, y_te = prep_tabular(te)

hgb = HistGradientBoostingRegressor(
    loss="squared_error",
    learning_rate=0.05,
    max_depth=10,
    max_iter=300,
    random_state=42
)
hgb.fit(X_tr, y_tr)

val_pred = hgb.predict(X_va)
test_pred = hgb.predict(X_te)

def safe_mape(y_true, y_pred, eps=1.0):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return float(np.mean(np.abs(y_true - y_pred) / np.maximum(np.abs(y_true), eps)))

print("HGB baseline aligned to station_sample")
print("VAL  RMSE:", float(mean_squared_error(y_va, val_pred) ** 0.5))
print("TEST RMSE:", float(mean_squared_error(y_te, test_pred) ** 0.5))
print("TEST MAE :", float(mean_absolute_error(y_te, test_pred)))
print("TEST MAPE:", safe_mape(y_te, test_pred))
print("TEST R²  :", float(r2_score(y_te, test_pred)))

HGB baseline aligned to station_sample
VAL  RMSE: 1.5562880875275398
TEST RMSE: 1.6967175269325334
TEST MAE : 1.0317997285750538
TEST MAPE: 0.5714711954635572
TEST R²  : 0.560791202878895


In [66]:
# Imports, paths, and a fast NPZ inventory check

# Repro/determinism
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("DEVICE:", DEVICE)

BASE = Path("/Users/zoltanjelovich/Documents/ISEG/MFW/data")
SEQ_DIR = BASE / "modeling/station/cnn_sequences_v2"

T = 24
train_files = sorted(glob(str(SEQ_DIR / f"train_T{T}_chunk*.npz")))
val_files   = sorted(glob(str(SEQ_DIR / f"val_T{T}_chunk*.npz")))
test_files  = sorted(glob(str(SEQ_DIR / f"test_T{T}_chunk*.npz")))

assert len(train_files) > 0 and len(val_files) > 0 and len(test_files) > 0, "No NPZ chunks found"
print("Chunks | train:", len(train_files), "| val:", len(val_files), "| test:", len(test_files))

def peek_npz(path):
    d = np.load(path, allow_pickle=True)
    X, S, y = d["X"], d["S"], d["y"]
    assert X.ndim == 3, "X must be (N,T,F)"
    assert S.ndim == 1 and y.ndim == 1
    assert X.shape[0] == S.shape[0] == y.shape[0]
    assert "pred_time" in d.files, "pred_time missing (needed for traceability)"
    assert "feature_cols" in d.files, "feature_cols missing (needed for consistent ordering)"
    return X.shape, S.dtype, y.dtype, len(np.unique(S)), d["feature_cols"]

for name, files in [("train", train_files), ("val", val_files), ("test", test_files)]:
    shp, sd, yd, nunq, fcols = peek_npz(files[0])
    print(f"{name} example:", Path(files[0]).name, "| X:", shp, "| S dtype:", sd, "| y dtype:", yd, "| unique stations:", nunq)

# Ensure identical feature ordering across splits
train_fcols = np.load(train_files[0], allow_pickle=True)["feature_cols"].tolist()
val_fcols   = np.load(val_files[0], allow_pickle=True)["feature_cols"].tolist()
test_fcols  = np.load(test_files[0], allow_pickle=True)["feature_cols"].tolist()

assert train_fcols == val_fcols == test_fcols, "Feature order mismatch across splits!"
print("✔ feature_cols identical across train/val/test | n_features:", len(train_fcols))

DEVICE: cpu
Chunks | train: 6 | val: 3 | test: 3
train example: train_T24_chunk000.npz | X: (50000, 24, 125) | S dtype: int32 | y dtype: float32 | unique stations: 50
val example: val_T24_chunk000.npz | X: (50000, 24, 125) | S dtype: int32 | y dtype: float32 | unique stations: 50
test example: test_T24_chunk000.npz | X: (50000, 24, 125) | S dtype: int32 | y dtype: float32 | unique stations: 50
✔ feature_cols identical across train/val/test | n_features: 125


In [67]:
# Robust NPZ chunk dataset (lazy loading, NaN/Inf handling)

class NPZChunkDataset(Dataset):
    def __init__(self, npz_files, return_time=False):
        self.files = list(npz_files)
        self.return_time = return_time

        # Build global index: map global row
        self.counts = []
        for fp in self.files:
            with np.load(fp, allow_pickle=True) as d:
                self.counts.append(int(d["X"].shape[0]))
        self.offsets = np.cumsum([0] + self.counts)  # length = n_files + 1
        self.n = int(self.offsets[-1])

        # Cache one currently-open file to reduce IO overhead
        self._cache_file_idx = None
        self._cache_data = None

    def __len__(self):
        return self.n

    def _load_file(self, file_idx):
        if self._cache_file_idx == file_idx and self._cache_data is not None:
            return self._cache_data
        # close previous
        self._cache_data = np.load(self.files[file_idx], allow_pickle=True)
        self._cache_file_idx = file_idx
        return self._cache_data

    def __getitem__(self, idx):
        # locate file
        file_idx = int(np.searchsorted(self.offsets, idx, side="right") - 1)
        local_idx = int(idx - self.offsets[file_idx])

        d = self._load_file(file_idx)

        X = d["X"][local_idx]  # (T,F)
        s = d["S"][local_idx]
        y = d["y"][local_idx]

        # Replace NaN/Inf in X with 0.0
        if not np.isfinite(X).all():
            X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        # Convert
        X = torch.from_numpy(X).float()          # (T,F)
        s = torch.tensor(int(s)).long()          # embedding index
        y = torch.tensor(float(y)).float()

        if self.return_time:
            pt = d["pred_time"][local_idx]
            return X, s, y, pt
        return X, s, y

In [68]:
# Feature standardization

def compute_train_norm_stats(train_files, max_chunks=None):
    sum_x = None
    sum_x2 = None
    n_total = 0

    files = train_files if max_chunks is None else train_files[:max_chunks]
    for fp in files:
        d = np.load(fp, allow_pickle=True)
        X = d["X"]
        X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        # Flatten across N*T
        X2d = X.reshape(-1, X.shape[-1]).astype(np.float64)
        if sum_x is None:
            sum_x = X2d.sum(axis=0)
            sum_x2 = (X2d ** 2).sum(axis=0)
        else:
            sum_x += X2d.sum(axis=0)
            sum_x2 += (X2d ** 2).sum(axis=0)

        n_total += X2d.shape[0]

    mean = sum_x / n_total
    var = (sum_x2 / n_total) - (mean ** 2)
    std = np.sqrt(np.maximum(var, 1e-12))
    return mean.astype(np.float32), std.astype(np.float32)

# Compute stats from train only
mean, std = compute_train_norm_stats(train_files, max_chunks=None)
print("Computed mean/std | n_features:", len(mean))
print("std min/max:", float(std.min()), float(std.max()))
assert np.all(std > 0), "std must be > 0"

# Store for reuse
NORM_PATH = SEQ_DIR / f"norm_stats_T{T}.npz"
np.savez_compressed(NORM_PATH, mean=mean, std=std, feature_cols=np.array(train_fcols))
print("Wrote:", NORM_PATH)

Computed mean/std | n_features: 125
std min/max: 9.999999974752427e-07 2045011.375
Wrote: /Users/zoltanjelovich/Documents/ISEG/MFW/data/modeling/station/cnn_sequences_v2/norm_stats_T24.npz


In [69]:
# Normalized dataset wrapper

class NormalizedDataset(Dataset):
    def __init__(self, base_ds, mean, std):
        self.base = base_ds
        self.mean = torch.from_numpy(mean).float()
        self.std = torch.from_numpy(std).float()

    def __len__(self):
        return len(self.base)

    def __getitem__(self, idx):
        X, s, y = self.base[idx]
        X = (X - self.mean) / self.std
        X = torch.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
        return X, s, y

# Recreate base datasets
train_ds = NPZChunkDataset(train_files)
val_ds   = NPZChunkDataset(val_files)
test_ds  = NPZChunkDataset(test_files)

# Load normalization stats
norm = np.load(NORM_PATH)
mean = norm["mean"]
std  = norm["std"]

# Create normalized datasets
train_ds_n = NormalizedDataset(train_ds, mean, std)
val_ds_n   = NormalizedDataset(val_ds, mean, std)
test_ds_n  = NormalizedDataset(test_ds, mean, std)

# Recreate loaders
train_loader = DataLoader(train_ds_n, batch_size=256, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_ds_n, batch_size=256, shuffle=False)
test_loader  = DataLoader(test_ds_n, batch_size=256, shuffle=False)

# Quick sanity batch
Xb, sb, yb = next(iter(train_loader))
print("X mean:", float(Xb.mean()), "X std:", float(Xb.std()))
print("Shapes:", Xb.shape, sb.shape, yb.shape)

X mean: 0.00304386205971241 X std: 0.943386435508728
Shapes: torch.Size([256, 24, 125]) torch.Size([256]) torch.Size([256])


In [70]:
# Define the CNN–LSTM with station embedding

class CNNLSTM(nn.Module):
    def __init__(
        self,
        n_features: int,
        n_stations: int,
        emb_dim: int = 16,
        conv_channels: int = 64,
        conv_kernel: int = 3,
        lstm_hidden: int = 64,
        lstm_layers: int = 1,
        dropout: float = 0.1,
    ):
        super().__init__()

        self.station_emb = nn.Embedding(n_stations, emb_dim)

        # Conv over time dimension
        self.conv = nn.Sequential(
            nn.Conv1d(in_channels=n_features, out_channels=conv_channels, kernel_size=conv_kernel, padding=conv_kernel//2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Conv1d(in_channels=conv_channels, out_channels=conv_channels, kernel_size=conv_kernel, padding=conv_kernel//2),
            nn.ReLU(),
        )

        # LSTM expects (B, T, C)
        self.lstm = nn.LSTM(
            input_size=conv_channels,
            hidden_size=lstm_hidden,
            num_layers=lstm_layers,
            batch_first=True,
            dropout=dropout if lstm_layers > 1 else 0.0,
        )

        self.head = nn.Sequential(
            nn.Linear(lstm_hidden + emb_dim, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 1),
        )

    def forward(self, x_seq, station_idx):
        x = x_seq.transpose(1, 2)
        x = self.conv(x)
        x = x.transpose(1, 2)

        out, _ = self.lstm(x)
        h_last = out[:, -1, :]

        s_emb = self.station_emb(station_idx)
        z = torch.cat([h_last, s_emb], dim=1)

        yhat = self.head(z).squeeze(1)
        return yhat

In [71]:
# Instantiate model + forward pass

Xb, sb, yb = next(iter(train_loader))
n_features = Xb.shape[-1]
n_stations = int(sb.max().item()) + 1

model = CNNLSTM(
    n_features=n_features,
    n_stations=n_stations,
    emb_dim=16,
    conv_channels=64,
    conv_kernel=3,
    lstm_hidden=64,
    lstm_layers=1,
    dropout=0.1,
).to(DEVICE)

print("n_features:", n_features, "| n_stations:", n_stations)
print("Model parameters:", sum(p.numel() for p in model.parameters()))

# Forward pass
Xb = Xb.to(DEVICE)
sb = sb.to(DEVICE)
with torch.no_grad():
    yhat = model(Xb, sb)

print("yhat shape:", yhat.shape, "| yhat dtype:", yhat.dtype)
print("yhat summary:", float(yhat.min()), float(yhat.mean()), float(yhat.max()))
assert yhat.shape == yb.to(DEVICE).shape, "Output shape must match y shape"
print("✔ Forward pass OK")

n_features: 125 | n_stations: 2690
Model parameters: 117985
yhat shape: torch.Size([256]) | yhat dtype: torch.float32
yhat summary: -0.20163452625274658 0.09736275672912598 0.4507068991661072
✔ Forward pass OK


In [72]:
# Quick loss computation sanity check

criterion = nn.MSELoss()
yb = yb.to(DEVICE)

with torch.no_grad():
    loss = criterion(yhat, yb)
print("MSE loss (single batch):", float(loss))

MSE loss (single batch): 6.798551082611084


In [73]:
# Metrics + one-epoch train/eval helpers

def rmse(y_true, y_pred):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

def mae(y_true, y_pred):
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    return float(np.mean(np.abs(y_true - y_pred)))

def safe_mape(y_true, y_pred, eps=1.0):
    # eps=1 avoids exploding MAPE when y is 0-heavy
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    denom = np.maximum(np.abs(y_true), eps)
    return float(np.mean(np.abs(y_true - y_pred) / denom))

@torch.no_grad()
def eval_epoch(model, loader, device=DEVICE, max_batches=None, log_every=50):
    model.eval()
    ys, yhats = [], []
    for b, batch in enumerate(loader):
        if (b + 1) % log_every == 0:
            print(f"  val batch {b+1}")

        if max_batches is not None and b >= max_batches:
            break

        X, s, y = batch
        X, s = X.to(device), s.to(device)
        yhat = model(X, s).detach().cpu().numpy()
        ys.append(y.numpy())
        yhats.append(yhat)

    y_true = np.concatenate(ys)
    y_pred = np.concatenate(yhats)
    return {"rmse": rmse(y_true, y_pred), "mae": mae(y_true, y_pred), "mape": safe_mape(y_true, y_pred)}

def train_epoch(model, loader, optimizer, criterion, device=DEVICE, max_batches=None, log_every=50):
    model.train()
    running = 0.0
    n = 0

    for b, batch in enumerate(loader):
        if (b + 1) % log_every == 0:
            print(f"  train batch {b+1}")

        if max_batches is not None and b >= max_batches:
            break

        X, s, y = batch
        X, s, y = X.to(device), s.to(device), y.to(device)

        optimizer.zero_grad(set_to_none=True)
        yhat = model(X, s)
        loss = criterion(yhat, y)
        loss.backward()
        optimizer.step()

        running += float(loss.item()) * X.shape[0]
        n += X.shape[0]

    return running / max(n, 1)

In [74]:
def quick_station_idx_stats(seq_dir: Path, split: str, T: int, n_chunks=2):
    files = sorted(seq_dir.glob(f"{split}_T{T}_chunk*.npz"))
    assert len(files) > 0, f"No NPZ chunks found for {split} T={T} in {seq_dir}"
    files = files[:n_chunks]

    mins, maxs, uniq = [], [], set()
    for fp in files:
        d = np.load(fp, allow_pickle=False)
        S = d["S"]
        mins.append(int(S.min()))
        maxs.append(int(S.max()))
        uniq.update(np.unique(S[:5000]).tolist())  # sample uniques cheaply

    return {
        "split": split,
        "files_checked": len(files),
        "min": min(mins),
        "max": max(maxs),
        "n_unique_sample": len(uniq),
    }

T = 24
print(quick_station_idx_stats(SEQ_DIR, "train", T, n_chunks=2))
print(quick_station_idx_stats(SEQ_DIR, "val",   T, n_chunks=2))
print(quick_station_idx_stats(SEQ_DIR, "test",  T, n_chunks=2))

{'split': 'train', 'files_checked': 2, 'min': 17, 'max': 2636, 'n_unique_sample': 10}
{'split': 'val', 'files_checked': 2, 'min': 93, 'max': 2657, 'n_unique_sample': 10}
{'split': 'test', 'files_checked': 2, 'min': 129, 'max': 2657, 'n_unique_sample': 10}


In [75]:
# Get max station_idx across splits (check a few chunks each)
stats_train = quick_station_idx_stats(SEQ_DIR, "train", T, n_chunks=3)
stats_val   = quick_station_idx_stats(SEQ_DIR, "val",   T, n_chunks=2)
stats_test  = quick_station_idx_stats(SEQ_DIR, "test",  T, n_chunks=2)

max_sid = max(stats_train["max"], stats_val["max"], stats_test["max"])
n_stations = max_sid + 1
print("max station_idx:", max_sid, "| n_stations for embedding:", n_stations)

# n_features should come from feature_cols in the NPZ to avoid mismatch after restart
ex = np.load(sorted(SEQ_DIR.glob(f"train_T{T}_chunk*.npz"))[0], allow_pickle=False)
n_features = int(ex["X"].shape[-1])
print("n_features:", n_features)

# Re-instantiate model
model = CNNLSTM(
    n_features=n_features,
    n_stations=n_stations,
    emb_dim=16,
    conv_channels=32,
    lstm_hidden=64,
    lstm_layers=1,
    dropout=0.1
).to(DEVICE)

# Quick sanity: embedding size
print("Embedding num_embeddings:", model.station_emb.num_embeddings)

# Hard assert using one real chunk
S_check = ex["S"]
assert int(S_check.max()) < model.station_emb.num_embeddings, "Embedding still too small!"
print("station_idx within embedding range for this chunk")

max station_idx: 2657 | n_stations for embedding: 2658
n_features: 125
Embedding num_embeddings: 2658
station_idx within embedding range for this chunk


In [76]:
def station_idx_range_all_chunks(seq_dir: Path, split: str, T: int):
    files = sorted(seq_dir.glob(f"{split}_T{T}_chunk*.npz"))
    smin, smax = None, None
    bad = []

    for fp in files:
        d = np.load(fp, allow_pickle=False)
        S = d["S"]
        mn, mx = int(S.min()), int(S.max())

        if smin is None or mn < smin: smin = mn
        if smax is None or mx > smax: smax = mx

        if mn < 0:
            bad.append((fp.name, mn, mx))

    return {"min": smin, "max": smax, "n_files": len(files), "neg_chunks": bad}

for split in ["train", "val", "test"]:
    r = station_idx_range_all_chunks(SEQ_DIR, split, T)
    print(split, r["min"], r["max"], "files:", r["n_files"])
    if r["neg_chunks"]:
        print("  negative station_idx found in:", r["neg_chunks"][:5])

train 17 2689 files: 6
val 93 2689 files: 3
test 129 2689 files: 3


In [77]:
# Compute from full scan
r_train = station_idx_range_all_chunks(SEQ_DIR, "train", T)
r_val   = station_idx_range_all_chunks(SEQ_DIR, "val",   T)
r_test  = station_idx_range_all_chunks(SEQ_DIR, "test",  T)

n_stations = max(r_train["max"], r_val["max"], r_test["max"]) + 1
print("n_stations:", n_stations)

model = CNNLSTM(
    n_features=n_features,
    n_stations=n_stations,
    emb_dim=16,
    conv_channels=32,
    lstm_hidden=64,
    lstm_layers=1,
    dropout=0.1
).to(DEVICE)

print("Embedding num_embeddings:", model.station_emb.num_embeddings)

n_stations: 2690
Embedding num_embeddings: 2690


In [78]:
# Helpers: iterate chunks + minibatches
def list_chunks(seq_dir: Path, split: str, T: int):
    return sorted(seq_dir.glob(f"{split}_T{T}_chunk*.npz"))

def iter_minibatches_from_chunk(X, S, y, batch_size=1024, shuffle=True, seed=42):
    n = X.shape[0]
    idx = np.arange(n)
    if shuffle:
        rng = np.random.default_rng(seed)
        rng.shuffle(idx)
    for i in range(0, n, batch_size):
        j = idx[i:i+batch_size]
        yield X[j], S[j], y[j]

@torch.no_grad()
def eval_over_chunks(model, seq_dir: Path, split: str, T: int, mean_t, std_t,
                     batch_size=2048, max_batches=200):
    model.eval()
    ys, yhats = [], []
    n_batches = 0

    for fp in list_chunks(seq_dir, split, T):
        data = np.load(fp, allow_pickle=False)
        X = torch.from_numpy(data["X"]).to(torch.float32)
        S = torch.from_numpy(data["S"]).to(torch.int64)
        y = torch.from_numpy(data["y"]).to(torch.float32)

        # normalize (broadcast over batch/time)
        X = (X - mean_t) / std_t
        X = torch.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        # minibatch through this chunk
        for xb, sb, yb in iter_minibatches_from_chunk(X.numpy(), S.numpy(), y.numpy(),
                                                      batch_size=batch_size, shuffle=False):
            xb = torch.from_numpy(xb).to(DEVICE)
            sb = torch.from_numpy(sb).to(DEVICE)
            yb = torch.from_numpy(yb).to(DEVICE)

            yhat = model(xb, sb).detach().cpu().numpy()
            ys.append(yb.detach().cpu().numpy())
            yhats.append(yhat)

            n_batches += 1
            if max_batches is not None and n_batches >= max_batches:
                y_true = np.concatenate(ys)
                y_pred = np.concatenate(yhats)
                return {
                    "rmse": float(np.sqrt(np.mean((y_true - y_pred) ** 2))),
                    "mae":  float(np.mean(np.abs(y_true - y_pred))),
                    "mape": safe_mape(y_true, y_pred),
                }

    y_true = np.concatenate(ys)
    y_pred = np.concatenate(yhats)
    return {
        "rmse": float(np.sqrt(np.mean((y_true - y_pred) ** 2))),
        "mae":  float(np.mean(np.abs(y_true - y_pred))),
        "mape": safe_mape(y_true, y_pred),
    }

def train_one_epoch_over_chunks(model, seq_dir: Path, split: str, T: int, optimizer, criterion,
                                mean_t, std_t, batch_size=1024, max_chunks=None, seed=42):
    model.train()
    total_loss = 0.0
    total_n = 0

    chunk_files = list_chunks(seq_dir, split, T)
    if max_chunks is not None:
        chunk_files = chunk_files[:max_chunks]

    for ci, fp in enumerate(chunk_files):
        data = np.load(fp, allow_pickle=False)
        X = torch.from_numpy(data["X"]).to(torch.float32)
        S = torch.from_numpy(data["S"]).to(torch.int64)
        y = torch.from_numpy(data["y"]).to(torch.float32)

        # Normalize
        X = (X - mean_t) / std_t
        X = torch.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        # Minibatch loop
        Xn, Sn, yn = X.numpy(), S.numpy(), y.numpy()
        for xb, sb, yb in iter_minibatches_from_chunk(Xn, Sn, yn, batch_size=batch_size, shuffle=True, seed=seed+ci):
            xb = torch.from_numpy(xb).to(DEVICE)
            sb = torch.from_numpy(sb).to(DEVICE)
            yb = torch.from_numpy(yb).to(DEVICE)

            optimizer.zero_grad(set_to_none=True)
            yhat = model(xb, sb)
            loss = criterion(yhat, yb)
            loss.backward()
            optimizer.step()

            total_loss += float(loss.item()) * xb.shape[0]
            total_n += xb.shape[0]

    return total_loss / max(total_n, 1)

# Training config
T = 24
EPOCHS = 10
PATIENCE = 2
LR = 1e-3
WEIGHT_DECAY = 1e-5
BATCH_SIZE = 1024

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)

BEST_PATH = SEQ_DIR / f"cnn_lstm_best_T{T}.pt"

mean_t = torch.from_numpy(mean).to(torch.float32).view(1, 1, -1).to(DEVICE)
std_t  = torch.from_numpy(std).to(torch.float32).view(1, 1, -1).to(DEVICE)

best_val = float("inf")
pat = 0

# Train loop
for epoch in range(1, EPOCHS + 1):
    t0 = time.time()

    train_mse = train_one_epoch_over_chunks(
        model, SEQ_DIR, "train", T,
        optimizer, criterion,
        mean_t, std_t,
        batch_size=BATCH_SIZE,
        max_chunks=None
    )

    # Validation capped for speed; increase later if needed
    val_metrics = eval_over_chunks(
        model, SEQ_DIR, "val", T,
        mean_t, std_t,
        batch_size=2048,
        max_batches=200
    )

    dt = time.time() - t0
    val_rmse = val_metrics["rmse"]

    print(
        f"Epoch {epoch:02d} | train MSE: {train_mse:.4f} | "
        f"val RMSE: {val_rmse:.4f} | val MAE: {val_metrics['mae']:.4f} | val MAPE: {val_metrics['mape']:.4f} | "
        f"time: {dt:.1f}s"
    )

    if val_rmse < best_val - 1e-4:
        best_val = val_rmse
        pat = 0
        torch.save(model.state_dict(), BEST_PATH)
        print(f"  ✔ saved best model to {BEST_PATH} (val RMSE={best_val:.4f})")
    else:
        pat += 1
        print(f"  no improvement (patience {pat}/{PATIENCE})")
        if pat >= PATIENCE:
            print("Early stopping.")
            break

Epoch 01 | train MSE: 4.8679 | val RMSE: 1.9260 | val MAE: 0.8224 | val MAPE: 0.3387 | time: 41.5s
  ✔ saved best model to /Users/zoltanjelovich/Documents/ISEG/MFW/data/modeling/station/cnn_sequences_v2/cnn_lstm_best_T24.pt (val RMSE=1.9260)
Epoch 02 | train MSE: 3.1213 | val RMSE: 1.6476 | val MAE: 0.7274 | val MAPE: 0.3326 | time: 43.6s
  ✔ saved best model to /Users/zoltanjelovich/Documents/ISEG/MFW/data/modeling/station/cnn_sequences_v2/cnn_lstm_best_T24.pt (val RMSE=1.6476)
Epoch 03 | train MSE: 2.5728 | val RMSE: 1.5625 | val MAE: 0.7052 | val MAPE: 0.3376 | time: 46.1s
  ✔ saved best model to /Users/zoltanjelovich/Documents/ISEG/MFW/data/modeling/station/cnn_sequences_v2/cnn_lstm_best_T24.pt (val RMSE=1.5625)
Epoch 04 | train MSE: 2.3871 | val RMSE: 1.5532 | val MAE: 0.7045 | val MAPE: 0.3404 | time: 44.8s
  ✔ saved best model to /Users/zoltanjelovich/Documents/ISEG/MFW/data/modeling/station/cnn_sequences_v2/cnn_lstm_best_T24.pt (val RMSE=1.5532)
Epoch 05 | train MSE: 2.2114 | v

In [79]:
# Load best model
model.load_state_dict(torch.load(BEST_PATH, map_location=DEVICE))
model.to(DEVICE)

# Mean/std tensors for broadcasting
mean_t = torch.from_numpy(mean).to(torch.float32).view(1, 1, -1).to(DEVICE)
std_t  = torch.from_numpy(std).to(torch.float32).view(1, 1, -1).to(DEVICE)

# Fast evaluation over NPZ chunks (no DataLoader)
val_metrics = eval_over_chunks(
    model, SEQ_DIR, "val", T,
    mean_t, std_t,
    batch_size=2048,
    max_batches=None
)

test_metrics = eval_over_chunks(
    model, SEQ_DIR, "test", T,
    mean_t, std_t,
    batch_size=2048,
    max_batches=None
)

print("\nBEST CHECKPOINT METRICS (chunk-stream)")
print("VAL  | RMSE:", val_metrics["rmse"], "| MAE:", val_metrics["mae"], "| MAPE:", val_metrics["mape"])
print("TEST | RMSE:", test_metrics["rmse"], "| MAE:", test_metrics["mae"], "| MAPE:", test_metrics["mape"])


BEST CHECKPOINT METRICS (chunk-stream)
VAL  | RMSE: 1.5351917743682861 | MAE: 0.7008955478668213 | MAPE: 0.34363049268722534
TEST | RMSE: 1.9284042119979858 | MAE: 0.9354224801063538 | MAPE: 0.4100418984889984


In [80]:
# Macro-average station metrics on TEST (from NPZ chunks)

T = 24
SPLIT = "test"
BATCH_SIZE = 2048

# Ensure model + norm stats are loaded
model.eval()

def safe_mape_np(y_true, y_pred, eps=1e-6):
    denom = np.maximum(np.abs(y_true), eps)
    return float(np.mean(np.abs(y_true - y_pred) / denom))

# Accumulators per station
sum_sq_err = defaultdict(float)
sum_abs_err = defaultdict(float)
sum_abs_pct_err = defaultdict(float)
sum_y = defaultdict(float)
count = defaultdict(int)

chunk_files = list_chunks(SEQ_DIR, SPLIT, T)
print(f"Chunks: {len(chunk_files)} | split={SPLIT}")

with torch.no_grad():
    for fp in chunk_files:
        data = np.load(fp, allow_pickle=False)
        X = torch.from_numpy(data["X"]).to(torch.float32).to(DEVICE)
        S = torch.from_numpy(data["S"]).to(torch.int64).to(DEVICE)
        y = torch.from_numpy(data["y"]).to(torch.float32).to(DEVICE)

        # normalize
        X = (X - mean_t) / std_t
        X = torch.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        n = X.shape[0]
        for i in range(0, n, BATCH_SIZE):
            xb = X[i:i+BATCH_SIZE]
            sb = S[i:i+BATCH_SIZE]
            yb = y[i:i+BATCH_SIZE]

            yhat = model(xb, sb)

            # Move to numpy for cheap aggregation
            sb_np = sb.detach().cpu().numpy()
            y_np = yb.detach().cpu().numpy()
            yhat_np = yhat.detach().cpu().numpy()

            err = y_np - yhat_np
            abs_err = np.abs(err)
            abs_pct_err = abs_err / np.maximum(np.abs(y_np), 1e-6)

            for sid, e, ae, ape, yy in zip(sb_np, err, abs_err, abs_pct_err, y_np):
                sum_sq_err[int(sid)] += float(e * e)
                sum_abs_err[int(sid)] += float(ae)
                sum_abs_pct_err[int(sid)] += float(ape)
                sum_y[int(sid)] += float(yy)
                count[int(sid)] += 1

# Build per-station table
station_ids = sorted(count.keys())
per_station = []
for sid in station_ids:
    n = count[sid]
    rmse = sqrt(sum_sq_err[sid] / n)
    mae = sum_abs_err[sid] / n
    mape = sum_abs_pct_err[sid] / n
    vol = sum_y[sid]
    per_station.append((sid, n, rmse, mae, mape, vol))

# Macro averages
macro_rmse = float(np.mean([r for _,_,r,_,_,_ in per_station]))
macro_mae  = float(np.mean([m for _,_,_,m,_,_ in per_station]))
macro_mape = float(np.mean([p for _,_,_,_,p,_ in per_station]))

# Volume-weighted averages
vols = np.array([v for *_, v in per_station], dtype=float)
w = vols / np.maximum(vols.sum(), 1e-9)

w_rmse = float(np.sum(w * np.array([r for _,_,r,_,_,_ in per_station])))
w_mae  = float(np.sum(w * np.array([m for _,_,_,m,_,_ in per_station])))
w_mape = float(np.sum(w * np.array([p for _,_,_,_,p,_ in per_station])))

print("\nPER-STATION METRICS (TEST) — summary")
print("Stations evaluated:", len(per_station))
print(f"Macro RMSE: {macro_rmse:.4f} | Macro MAE: {macro_mae:.4f} | Macro MAPE: {macro_mape:.4f}")
print(f"VolW  RMSE: {w_rmse:.4f} | VolW  MAE: {w_mae:.4f} | VolW  MAPE: {w_mape:.4f}")

# Inspect worst stations by RMSE (top 10)
worst = sorted(per_station, key=lambda x: x[2], reverse=True)[:10]
print("\nWorst 10 stations by RMSE (station_idx, n, rmse, mae, mape, volume):")
for row in worst:
    print(row)

Chunks: 3 | split=test

PER-STATION METRICS (TEST) — summary
Stations evaluated: 150
Macro RMSE: 1.5267 | Macro MAE: 0.9354 | Macro MAPE: 89703.1261
VolW  RMSE: 2.8461 | VolW  MAE: 1.8544 | VolW  MAPE: 131512.6515

Worst 10 stations by RMSE (station_idx, n, rmse, mae, mape, volume):
(1714, 1000, 6.255439021916436, 4.234659579992294, 103978.18137231923, 9875.0)
(1510, 1000, 5.45482706787819, 3.168477591127157, 103789.68942926753, 5263.0)
(1770, 1000, 4.87198933382591, 3.4219066592901943, 53292.52412310849, 8016.0)
(1751, 1000, 4.773034124895573, 3.3179468105882406, 23641.816429834787, 5713.0)
(1889, 1000, 4.548125371589033, 2.8835475091338156, 249490.4265081759, 4208.0)
(1994, 1000, 4.30056075186176, 2.516526435859501, 153311.42828978968, 4891.0)
(2114, 1000, 3.7248775045255487, 2.1847705082669853, 231406.28333466296, 3400.0)
(1914, 1000, 3.711627642937558, 1.8489375579059124, 182562.04636686284, 2764.0)
(2049, 1000, 3.688380854087593, 2.3307803369015456, 110070.29661660783, 4159.0)
(20