In [18]:
from pathlib import Path
import json
import shutil
import pickle
import numpy as np
import pandas as pd
from prophet import Prophet
from pandas.tseries.holiday import USFederalHolidayCalendar
from prophet.serialize import model_from_json

# ================== INPUTS ==================
HOURLY_PATH = Path("cluster_hourly.parquet")

SARIMAX_DIR = Path("sarimax/sarimax_best")
PROPHET_DIR = Path("prophet_folder/prophet_best")

CLUSTERS = [0, 8]

# We evaluate day-ahead forecasts for these months
VAL_MONTH = 11           # November (use Nov as validation)
TEST_MONTHS = [12]       # December (test)

HARD_END = pd.Timestamp("2018-12-31 23:00:00")

# Regressors used in training scripts
LAGS = [24, 168]  # hours
USE_US_HOLIDAYS_SARIMAX = True

# Prophet used weekend/weekday conditional seasonality + lag regressors
USE_LAGS_PROPHET = True

# ================== OUTPUTS ==================
PRED_OUT_DIR = Path("preds_ts_daily")
ART_SARIMAX_DIR = Path("artifacts_sarimax_daily")
ART_PROPHET_DIR = Path("artifacts_prophet_daily")
PRED_OUT_DIR.mkdir(parents=True, exist_ok=True)
ART_SARIMAX_DIR.mkdir(parents=True, exist_ok=True)
ART_PROPHET_DIR.mkdir(parents=True, exist_ok=True)

print("[INFO] Outputs ->", PRED_OUT_DIR.resolve())


[INFO] Outputs -> /mnt/data/Desktop/Masters/Business_Analytics/Intro_to_BA/preds_ts_daily


In [19]:
df = pd.read_parquet(HOURLY_PATH).copy()
print("[INFO] Loaded:", df.shape)
print("[INFO] Columns:", list(df.columns))

# cluster
if "cluster" not in df.columns:
    if "cluster_id" in df.columns:
        df = df.rename(columns={"cluster_id":"cluster"})
    elif "gmm20_cluster" in df.columns:
        df = df.rename(columns={"gmm20_cluster":"cluster"})
    else:
        raise ValueError("Need cluster column: 'cluster' or 'cluster_id' or 'gmm20_cluster'.")

# datetime_hour
if "datetime_hour" not in df.columns:
    if "timestamp" in df.columns:
        df = df.rename(columns={"timestamp":"datetime_hour"})
    elif "datetime" in df.columns:
        df = df.rename(columns={"datetime":"datetime_hour"})
    elif {"date","hour"}.issubset(df.columns):
        df["datetime_hour"] = pd.to_datetime(df["date"]).dt.normalize() + pd.to_timedelta(df["hour"], unit="h")
    else:
        raise ValueError("Need time column: 'datetime_hour' or 'timestamp'/'datetime' or ('date','hour').")

df["datetime_hour"] = pd.to_datetime(df["datetime_hour"], errors="coerce")
df = df.dropna(subset=["datetime_hour"]).copy()

# departures/arrivals
if "departures" not in df.columns:
    if "pickups" in df.columns:
        df = df.rename(columns={"pickups":"departures"})
    else:
        raise ValueError("Need departures column: 'departures' or 'pickups'.")

if "arrivals" not in df.columns:
    if "dropoffs" in df.columns:
        df = df.rename(columns={"dropoffs":"arrivals"})
    else:
        raise ValueError("Need arrivals column: 'arrivals' or 'dropoffs'.")

# filter
df = df[df["cluster"].isin(CLUSTERS)].copy()
df = df[df["datetime_hour"] <= HARD_END].copy()
df = df.sort_values(["cluster","datetime_hour"]).reset_index(drop=True)

print("[INFO] After filtering:", df.shape)
df.head()


[INFO] Loaded: (183960, 10)
[INFO] Columns: ['cluster', 'datetime_hour', 'departures', 'arrivals', 'was_missing_departures', 'was_missing_arrivals', 'weekday', 'hour', 'month', 'year']
[INFO] After filtering: (17520, 10)


Unnamed: 0,cluster,datetime_hour,departures,arrivals,was_missing_departures,was_missing_arrivals,weekday,hour,month,year
0,0,2018-01-01 00:00:00,3.0,1.0,False,False,0,0,1,2018
1,0,2018-01-01 01:00:00,2.0,4.0,False,False,0,1,1,2018
2,0,2018-01-01 02:00:00,3.0,2.0,False,False,0,2,1,2018
3,0,2018-01-01 03:00:00,4.0,7.0,False,False,0,3,1,2018
4,0,2018-01-01 04:00:00,0.0,1.0,False,False,0,4,1,2018


In [20]:
def make_dense_series(df_c: pd.DataFrame, col_y: str) -> pd.Series:
    tmp = df_c[["datetime_hour", col_y]].copy()
    tmp = tmp.groupby("datetime_hour", as_index=False)[col_y].sum().sort_values("datetime_hour")
    idx = pd.date_range(tmp["datetime_hour"].min(), tmp["datetime_hour"].max(), freq="h")
    out = pd.DataFrame({"ds": idx}).merge(tmp.rename(columns={"datetime_hour":"ds"}), on="ds", how="left")
    out[col_y] = out[col_y].fillna(0.0).astype(float)
    s = out.set_index("ds")[col_y].asfreq("H")
    if s.isna().any():
        raise ValueError("Dense series still has NaNs — unexpected.")
    return s

series = {}
for c in CLUSTERS:
    df_c = df[df["cluster"] == c].copy()
    series[c] = {
        "departures": make_dense_series(df_c, "departures"),
        "arrivals": make_dense_series(df_c, "arrivals"),
    }
    print(f"[INFO] cluster {c}: departures hours={len(series[c]['departures'])}, arrivals hours={len(series[c]['arrivals'])}")


[INFO] cluster 0: departures hours=8760, arrivals hours=8760
[INFO] cluster 8: departures hours=8760, arrivals hours=8760


  s = out.set_index("ds")[col_y].asfreq("H")
  s = out.set_index("ds")[col_y].asfreq("H")
  s = out.set_index("ds")[col_y].asfreq("H")
  s = out.set_index("ds")[col_y].asfreq("H")


In [21]:
# Cell 4 (LIGHTWEIGHT) — fast SARIMAX + stable Prophet (no collinear regressors)

import warnings
import pickle
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tools.sm_exceptions import ConvergenceWarning
from prophet import Prophet
from prophet.serialize import model_to_json

def us_holiday_indicator(idx: pd.DatetimeIndex) -> pd.Series:
    cal = USFederalHolidayCalendar()
    hols = cal.holidays(start=idx.min().floor("D"), end=idx.max().ceil("D"))
    return idx.floor("D").isin(hols).astype(int)

def prophet_conditions(df_future: pd.DataFrame) -> pd.DataFrame:
    dow = df_future["ds"].dt.dayofweek
    df_future["is_weekend"] = (dow >= 5)
    df_future["is_weekday"] = (dow < 5)
    return df_future

def build_lag_regressors_full(y: pd.Series, lags=LAGS) -> pd.DataFrame:
    exog = pd.DataFrame(index=y.index)
    for L in lags:
        exog[f"y_lag_{L}"] = y.shift(L).astype(float)
    return exog

def build_lag_regressors_from_history(future_idx: pd.DatetimeIndex, history: pd.Series, lags=LAGS) -> pd.DataFrame:
    exog = pd.DataFrame(index=future_idx)
    for L in lags:
        exog[f"y_lag_{L}"] = [float(history.get(t - pd.Timedelta(hours=L), np.nan)) for t in future_idx]
    return exog

ART_SARIMAX_DIR.mkdir(parents=True, exist_ok=True)
ART_PROPHET_DIR.mkdir(parents=True, exist_ok=True)

sarimax_models = {}
prophet_models = {}
sarimax_paths = {}
prophet_paths = {}
sarimax_scales = {}   # store scaling per cluster (important)
sarimax_used_spec = {}

for c in CLUSTERS:
    print(f"[INFO] Training cluster {c} SARIMAX (departures) + Prophet (arrivals)...")

    # ---------------- SARIMAX (departures) ----------------
    y_dep = series[c]["departures"].astype(float)

    exog_dep = build_lag_regressors_full(y_dep, lags=LAGS)
    if USE_US_HOLIDAYS_SARIMAX:
        exog_dep["is_us_holiday"] = us_holiday_indicator(y_dep.index).astype(float)

    exog_dep_train = exog_dep.dropna()
    y_dep_train = y_dep.loc[exog_dep_train.index]

    if len(y_dep_train) < 10:
        raise ValueError(f"Not enough data to train SARIMAX for cluster {c}")

    # simple scaling to help convergence + conditioning
    scale = float(np.nanmax(y_dep_train.values))
    if not np.isfinite(scale) or scale <= 0:
        scale = 1.0
    sarimax_scales[c] = scale

    y_s = y_dep_train / scale
    X_s = exog_dep_train / scale
    if USE_US_HOLIDAYS_SARIMAX:
        X_s["is_us_holiday"] = exog_dep_train["is_us_holiday"]  # don't scale binary

    primary_spec = dict(order=(1, 1, 2), seasonal_order=(1, 0, 1, 24))
    fallback_spec = dict(order=(0, 1, 1), seasonal_order=(1, 0, 1, 24))  # cheaper + often stable

    def _fit_one(spec):
        model = SARIMAX(
            y_s, exog=X_s,
            enforce_stationarity=True,     # helps convergence
            enforce_invertibility=True,    # helps convergence
            **spec
        )
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=ConvergenceWarning)
            res = model.fit(disp=False, method="lbfgs", maxiter=80)
        return res

    try:
        sar_res = _fit_one(primary_spec)
        converged = bool(sar_res.mle_retvals.get("converged", False))
        if not converged:
            # one cheap fallback only
            sar_res2 = _fit_one(fallback_spec)
            if bool(sar_res2.mle_retvals.get("converged", False)):
                sar_res = sar_res2
                sarimax_used_spec[c] = fallback_spec
            else:
                sarimax_used_spec[c] = primary_spec
        else:
            sarimax_used_spec[c] = primary_spec
    except Exception:
        # if primary fails hard, try fallback once
        sar_res = _fit_one(fallback_spec)
        sarimax_used_spec[c] = fallback_spec

    sarimax_models[c] = sar_res

    sar_path = ART_SARIMAX_DIR / f"sarimax_departures_model_{c}.pkl"
    with open(sar_path, "wb") as f:
        # store both model + scale (so forecasts can be unscaled later)
        pickle.dump({"res": sar_res, "scale": sarimax_scales[c], "spec": sarimax_used_spec[c]}, f)
    sarimax_paths[c] = sar_path

    conv_flag = bool(sar_res.mle_retvals.get("converged", False))
    print(f"[INFO] cluster {c}: SARIMAX converged={conv_flag}, spec={sarimax_used_spec[c]}, scale={sarimax_scales[c]:.3g}")

    # ---------------- Prophet (arrivals) ----------------
    y_arr = series[c]["arrivals"].astype(float)
    df_prop = pd.DataFrame({"ds": y_arr.index, "y": y_arr.values})
    df_prop = prophet_conditions(df_prop)

    if USE_LAGS_PROPHET:
        for L in LAGS:
            df_prop[f"y_lag_{L}"] = y_arr.shift(L).values

    df_prop_train = df_prop.dropna()
    if len(df_prop_train) < 10:
        raise ValueError(f"Not enough data to train Prophet for cluster {c}")

    m = Prophet(
        changepoint_prior_scale=0.2,
        seasonality_prior_scale=10,
        seasonality_mode="additive",
        weekly_seasonality=False,
        daily_seasonality=False,
    )

    # FIX: conditional seasonalities instead of collinear weekday/weekend regressors
    m.add_seasonality(name="daily_weekday", period=1, fourier_order=15, condition_name="is_weekday")
    m.add_seasonality(name="daily_weekend", period=1, fourier_order=15, condition_name="is_weekend")
    m.add_seasonality(name="weekly", period=7, fourier_order=9)

    if USE_LAGS_PROPHET:
        for L in LAGS:
            m.add_regressor(f"y_lag_{L}", prior_scale=10)

    m.fit(df_prop_train)
    prophet_models[c] = m

    prop_path = ART_PROPHET_DIR / f"prophet_model_{c}_prophet_additive_cps0.2_sps10_condseas.json"
    prop_path.write_text(model_to_json(m))
    prophet_paths[c] = prop_path

print("[OK] Trained and saved SARIMAX + Prophet models for all clusters.")


[INFO] Training cluster 0 SARIMAX (departures) + Prophet (arrivals)...
[INFO] cluster 0: SARIMAX converged=True, spec={'order': (1, 1, 2), 'seasonal_order': (1, 0, 1, 24)}, scale=130


23:20:56 - cmdstanpy - INFO - Chain [1] start processing
23:21:01 - cmdstanpy - INFO - Chain [1] done processing


[INFO] Training cluster 8 SARIMAX (departures) + Prophet (arrivals)...
[INFO] cluster 8: SARIMAX converged=True, spec={'order': (0, 1, 1), 'seasonal_order': (1, 0, 1, 24)}, scale=1.23e+03


23:26:17 - cmdstanpy - INFO - Chain [1] start processing
23:26:24 - cmdstanpy - INFO - Chain [1] done processing


[OK] Trained and saved SARIMAX + Prophet models for all clusters.


In [22]:
def us_holiday_indicator(idx: pd.DatetimeIndex) -> pd.Series:
    cal = USFederalHolidayCalendar()
    hols = cal.holidays(start=idx.min().floor("D"), end=idx.max().ceil("D"))
    return idx.floor("D").isin(hols).astype(int)

def prophet_conditions(df_future: pd.DataFrame) -> pd.DataFrame:
    dow = df_future["ds"].dt.dayofweek
    df_future["is_weekend"] = (dow >= 5)
    df_future["is_weekday"] = (dow < 5)
    return df_future

def build_lag_regressors_from_history(future_idx: pd.DatetimeIndex, history: pd.Series, lags=LAGS) -> pd.DataFrame:
    """
    For each future timestamp t, create y_lag_L = history[t-L].
    This is leak-free if history only contains values <= cutoff-1h and t is in next-day horizon.
    """
    exog = pd.DataFrame(index=future_idx)
    for L in lags:
        exog[f"y_lag_{L}"] = [float(history.get(t - pd.Timedelta(hours=L), np.nan)) for t in future_idx]
    return exog


In [23]:
def list_cutoffs_for_months(year=2018, months=[10,11,12]) -> list[pd.Timestamp]:
    cutoffs = []
    for m in months:
        start = pd.Timestamp(year=year, month=m, day=1, hour=0)
        if m == 12:
            end = pd.Timestamp(year=2019, month=1, day=1, hour=0)
        else:
            end = pd.Timestamp(year=year, month=m+1, day=1, hour=0)
        # these are cutoffs at midnight for each day
        cutoffs.extend(list(pd.date_range(start, end - pd.Timedelta(days=1), freq="D")))
    return cutoffs

val_cutoffs = list_cutoffs_for_months(2018, [VAL_MONTH])
test_cutoffs = list_cutoffs_for_months(2018, TEST_MONTHS)

print("[INFO] #val days:", len(val_cutoffs), "first:", val_cutoffs[0], "last:", val_cutoffs[-1])
print("[INFO] #test days:", len(test_cutoffs), "first:", test_cutoffs[0], "last:", test_cutoffs[-1])


[INFO] #val days: 30 first: 2018-11-01 00:00:00 last: 2018-11-30 00:00:00
[INFO] #test days: 31 first: 2018-12-01 00:00:00 last: 2018-12-31 00:00:00


In [24]:
def sarimax_daily_forecast(res_or_bundle, history_y: pd.Series, cutoff: pd.Timestamp) -> pd.Series:
    """
    Forecast next 24 hours starting at cutoff using SARIMAX results.
    Supports either a plain SARIMAXResults OR the dict bundle saved in Cell 4.
    """
    horizon = pd.date_range(cutoff, cutoff + pd.Timedelta(hours=23), freq="h")

    # handle bundled {res, scale}
    if isinstance(res_or_bundle, dict):
        res = res_or_bundle["res"]
        scale = float(res_or_bundle.get("scale", 1.0))
    else:
        res = res_or_bundle
        scale = float(sarimax_scales.get(getattr(res, "_cluster_id", None), 1.0)) if "sarimax_scales" in globals() else 1.0

    exog = build_lag_regressors_from_history(horizon, history_y, lags=LAGS)
    # match scaling used in training
    exog = exog / scale

    if USE_US_HOLIDAYS_SARIMAX:
        exog["is_us_holiday"] = us_holiday_indicator(horizon)

    fc = res.get_forecast(steps=len(horizon), exog=exog)
    pred = fc.predicted_mean * scale
    pred.index = horizon
    return pred


In [25]:
def prophet_daily_forecast(model, history_y: pd.Series, cutoff: pd.Timestamp) -> pd.Series:
    horizon = pd.date_range(cutoff, cutoff + pd.Timedelta(hours=23), freq="h")
    future = pd.DataFrame({"ds": horizon})
    future = prophet_conditions(future)

    if USE_LAGS_PROPHET:
        exog = build_lag_regressors_from_history(horizon, history_y, lags=LAGS)
        for col in exog.columns:
            future[col] = exog[col].values

        # If any lag is missing, cannot forecast that day safely
        if future[[f"y_lag_{L}" for L in LAGS]].isna().any().any():
            # return NaNs to be handled upstream
            return pd.Series(index=horizon, dtype=float)

    fcst = model.predict(future)
    yhat = pd.Series(fcst["yhat"].values, index=horizon)
    return yhat


In [26]:
rows = []

def add_one_split(cluster_id: int, split_name: str, cutoffs: list[pd.Timestamp]):
    y_dep = series[cluster_id]["departures"]
    y_arr = series[cluster_id]["arrivals"]

    sar_res = sarimax_models[cluster_id]
    pr_model = prophet_models[cluster_id]

    for cutoff in cutoffs:
        # history available up to cutoff-1h
        hist_end = cutoff - pd.Timedelta(hours=1)
        y_dep_hist = y_dep.loc[:hist_end]
        y_arr_hist = y_arr.loc[:hist_end]

        # horizon = next 24h of THIS day (starting at cutoff)
        horizon = pd.date_range(cutoff, cutoff + pd.Timedelta(hours=23), freq="h")

        # ground truth (allowed for evaluation)
        y_true_pickups = y_dep.loc[horizon].values
        y_true_dropoffs = y_arr.loc[horizon].values

        # predictions (leak-free)
        y_pred_pick = sarimax_daily_forecast(sar_res, y_dep_hist, cutoff).values
        y_pred_drop = prophet_daily_forecast(pr_model, y_arr_hist, cutoff).values

        for ts, hr, yt_p, yp_p, yt_d, yp_d in zip(horizon, horizon.hour, y_true_pickups, y_pred_pick, y_true_dropoffs, y_pred_drop):
            rows.append({
                "date": ts.normalize(),
                "hour": int(hr),
                "cluster_id": int(cluster_id),
                "split": split_name,
                "y_true_pickups": float(yt_p),
                "y_pred_sarimax_pickups": float(yp_p) if np.isfinite(yp_p) else np.nan,
                "y_true_dropoffs": float(yt_d),
                "y_pred_prophet_dropoffs": float(yp_d) if np.isfinite(yp_d) else np.nan,
            })

for c in CLUSTERS:
    add_one_split(c, "val", val_cutoffs)
    add_one_split(c, "test", test_cutoffs)

out = pd.DataFrame(rows)

# clip negatives (demand can't be negative)
out["y_pred_sarimax_pickups"] = out["y_pred_sarimax_pickups"].clip(lower=0)
out["y_pred_prophet_dropoffs"] = out["y_pred_prophet_dropoffs"].clip(lower=0)

# squared errors (for RMSE)
out["se_pickups_sarimax"] = (out["y_true_pickups"] - out["y_pred_sarimax_pickups"])**2
out["se_dropoffs_prophet"] = (out["y_true_dropoffs"] - out["y_pred_prophet_dropoffs"])**2

print("[INFO] out shape:", out.shape)
out.head()


[INFO] out shape: (2928, 10)


Unnamed: 0,date,hour,cluster_id,split,y_true_pickups,y_pred_sarimax_pickups,y_true_dropoffs,y_pred_prophet_dropoffs,se_pickups_sarimax,se_dropoffs_prophet
0,2018-11-01,0,0,val,10.0,5.393151,20.0,14.400663,21.223059,31.352575
1,2018-11-01,1,0,val,6.0,2.931564,16.0,7.721192,9.415297,68.538664
2,2018-11-01,2,0,val,3.0,0.964974,9.0,2.774456,4.14133,38.757403
3,2018-11-01,3,0,val,1.0,0.46989,6.0,1.944318,0.281017,16.448559
4,2018-11-01,4,0,val,1.0,0.954152,2.0,0.840037,0.002102,1.345513


In [27]:
for c in CLUSTERS:
    dfc = out[out["cluster_id"] == c].copy().sort_values(["date","hour"]).reset_index(drop=True)

    cols = [
        "date","hour","cluster_id","split",
        "y_true_pickups","y_pred_sarimax_pickups","se_pickups_sarimax",
        "y_true_dropoffs","y_pred_prophet_dropoffs","se_dropoffs_prophet",
    ]
    dfc = dfc[cols]

    csv_path = PRED_OUT_DIR / f"ts_daily_cluster_{c}_preds.csv"
    pq_path  = PRED_OUT_DIR / f"ts_daily_cluster_{c}_preds.parquet"

    dfc.to_csv(csv_path, index=False)
    dfc.to_parquet(pq_path, index=False)

    print(f"[OK] Saved cluster {c}:")
    print("  ", csv_path)
    print("  ", pq_path)


[OK] Saved cluster 0:
   preds_ts_daily/ts_daily_cluster_0_preds.csv
   preds_ts_daily/ts_daily_cluster_0_preds.parquet
[OK] Saved cluster 8:
   preds_ts_daily/ts_daily_cluster_8_preds.csv
   preds_ts_daily/ts_daily_cluster_8_preds.parquet


In [28]:
def summarize(split):
    d = out[out["split"] == split]
    # RMSE: sqrt(mean(squared_error))
    return {
        "rows": len(d),
        "rmse_pickups_sarimax": float(np.sqrt(d["se_pickups_sarimax"].mean())) if len(d) > 0 else float('nan'),
        "rmse_dropoffs_prophet": float(np.sqrt(d["se_dropoffs_prophet"].mean())) if len(d) > 0 else float('nan'),
    }

print("VAL:", summarize("val"))
print("TEST:", summarize("test"))


VAL: {'rows': 1440, 'rmse_pickups_sarimax': 92.52174598656532, 'rmse_dropoffs_prophet': 85.46599823206896}
TEST: {'rows': 1488, 'rmse_pickups_sarimax': 65.82222622225883, 'rmse_dropoffs_prophet': 66.17711050189448}


In [29]:
# Cell X: write a "general weights" json for this SARIMAX+Prophet (per-cluster) setup

from pathlib import Path
import json
import numpy as np
import pandas as pd

TS_WEIGHT_DIR = Path("artifacts_ts_daily")
TS_WEIGHT_DIR.mkdir(parents=True, exist_ok=True)

def _rmse(y_true, y_pred):
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    m = np.isfinite(y_true) & np.isfinite(y_pred)
    if m.sum() == 0:
        return float("nan")
    return float(np.sqrt(np.mean((y_true[m] - y_pred[m]) ** 2)))

# Use 'out' produced in Cell 9 (contains val/test rows & squared errors)
val_df = out[out["split"] == "val"].copy()

rmse_pickups = _rmse(val_df["y_true_pickups"], val_df["y_pred_sarimax_pickups"])
rmse_dropoffs = _rmse(val_df["y_true_dropoffs"], val_df["y_pred_prophet_dropoffs"])

# Optional: a single combined score (useful for "best model" selection if you want one number)
rmse_combined = float(np.nanmean([rmse_pickups, rmse_dropoffs]))

val_start = pd.Timestamp(year=2018, month=VAL_MONTH, day=1).strftime("%Y-%m-%d")
val_end = (pd.Timestamp(year=2018, month=VAL_MONTH, day=1) + pd.offsets.MonthBegin(1)).strftime("%Y-%m-%d")

payload = {
    "model": "TS_SARIMAX_Departures__Prophet_Arrivals",
    "clusters": list(map(int, CLUSTERS)),
    "general_rmse_val_nov_pickups_sarimax": rmse_pickups,
    "general_rmse_val_nov_dropoffs_prophet": rmse_dropoffs,
    "general_rmse_val_nov_combined": rmse_combined,

    # A "weight-like" scalar mirroring your other jsons (bigger rmse -> smaller weight)
    # (not used for training; just for comparison / selection logic)
    "general_weight_raw": float(1.0 / (rmse_combined + 1e-9)) if np.isfinite(rmse_combined) else float("nan"),

    "val_start": val_start,
    "val_end": val_end,

    # What this TS model uses
    "target_departures_model": "SARIMAX",
    "target_arrivals_model": "Prophet",
    "lags_hours": list(map(int, LAGS)),
    "use_us_holidays_sarimax": bool(USE_US_HOLIDAYS_SARIMAX),
    "prophet_conditional_daily_seasonality": True,
    "prophet_weekly_seasonality": True,

    # Where the per-cluster artifacts live
    "sarimax_dir": str(ART_SARIMAX_DIR.as_posix()),
    "prophet_dir": str(ART_PROPHET_DIR.as_posix()),

    # Exact filenames by cluster
    "dep_model_paths_by_cluster": {str(int(c)): str((ART_SARIMAX_DIR / f"sarimax_departures_model_{c}.pkl").as_posix()) for c in CLUSTERS},
    "arr_model_paths_by_cluster": {str(int(c)): str((ART_PROPHET_DIR / f"prophet_model_{c}_prophet_additive_cps0.2_sps10_condseas.json").as_posix()) for c in CLUSTERS},
}

out_path = TS_WEIGHT_DIR / "ts_sarimax_prophet_general_weight.json"
out_path.write_text(json.dumps(payload, indent=2))
print("[OK] wrote:", out_path)
print(json.dumps(payload, indent=2)[:800] + "\n...")  # preview


[OK] wrote: artifacts_ts_daily/ts_sarimax_prophet_general_weight.json
{
  "model": "TS_SARIMAX_Departures__Prophet_Arrivals",
  "clusters": [
    0,
    8
  ],
  "general_rmse_val_nov_pickups_sarimax": 92.52174598656532,
  "general_rmse_val_nov_dropoffs_prophet": 85.46599823206896,
  "general_rmse_val_nov_combined": 88.99387210931714,
  "general_weight_raw": 0.011236728735214445,
  "val_start": "2018-11-01",
  "val_end": "2018-12-01",
  "target_departures_model": "SARIMAX",
  "target_arrivals_model": "Prophet",
  "lags_hours": [
    24,
    168
  ],
  "use_us_holidays_sarimax": true,
  "prophet_conditional_daily_seasonality": true,
  "prophet_weekly_seasonality": true,
  "sarimax_dir": "artifacts_sarimax_daily",
  "prophet_dir": "artifacts_prophet_daily",
  "dep_model_paths_by_cluster": {
    "0": "artifacts_sarimax_daily/sarimax_departures_model_0.pkl",
   
...


In [30]:
# Cell X: Export per-cluster .parquet with the same "pickups/dropoffs" schema style
# Uses the 'out' dataframe produced by Cell 9 (SARIMAX pickups, Prophet dropoffs)

from pathlib import Path
import numpy as np
import pandas as pd

TS_PRED_DIR = Path("preds_ts_daily_parquet_like_other_models")
TS_PRED_DIR.mkdir(parents=True, exist_ok=True)

def export_cluster_ts(out_df: pd.DataFrame, cluster_id: int):
    d = out_df[out_df["cluster_id"] == cluster_id].copy()

    val = d[d["split"] == "val"].copy()
    test = d[d["split"] == "test"].copy()

    if len(val) == 0:
        raise ValueError(f"Cluster {cluster_id}: empty validation slice.")
    if len(test) == 0:
        print(f"[WARN] Cluster {cluster_id}: empty test slice.")

    def finalize(block: pd.DataFrame, split_name: str) -> pd.DataFrame:
        outp = pd.DataFrame({
            "date": block["date"].values,
            "hour": block["hour"].values,
            "cluster_id": block["cluster_id"].values,
            "split": split_name,

            # match your other-model schema naming
            "y_true_pickups": block["y_true_pickups"].values,
            "y_pred_ts_pickups": block["y_pred_sarimax_pickups"].values,

            "y_true_dropoffs": block["y_true_dropoffs"].values,
            "y_pred_ts_dropoffs": block["y_pred_prophet_dropoffs"].values,
        })

        # squared errors
        outp["se_pickups_ts"] = (outp["y_true_pickups"] - outp["y_pred_ts_pickups"]) ** 2
        outp["se_dropoffs_ts"] = (outp["y_true_dropoffs"] - outp["y_pred_ts_dropoffs"]) ** 2

        # mean RMSE per row (same idea as your rf/mlp schema)
        out_mse = 0.5 * (outp["se_pickups_ts"] + outp["se_dropoffs_ts"])
        outp["rmse_mean_ts"] = np.sqrt(out_mse)

        return outp

    df_val_out = finalize(val, "val")
    df_test_out = finalize(test, "test") if len(test) > 0 else pd.DataFrame(columns=df_val_out.columns)

    df_out = pd.concat([df_val_out, df_test_out], ignore_index=True)

    parquet_path = TS_PRED_DIR / f"ts_cluster_{cluster_id}_preds.parquet"
    df_out.to_parquet(parquet_path, index=False)

    # summaries like your other pipeline
    rmse_val_pickups = float(np.sqrt(df_val_out["se_pickups_ts"].mean()))
    rmse_val_dropoffs = float(np.sqrt(df_val_out["se_dropoffs_ts"].mean()))
    rmse_val_mean = float(df_val_out["rmse_mean_ts"].mean())

    return {
        "cluster_id": int(cluster_id),
        "val_rmse_pickups": rmse_val_pickups,
        "val_rmse_dropoffs": rmse_val_dropoffs,
        "val_rmse_mean": rmse_val_mean,
        "parquet_path": str(parquet_path),
    }

summaries = []
for cid in CLUSTERS:
    try:
        summaries.append(export_cluster_ts(out, cid))
        print(f"[OK] Exported TS parquet for cluster {cid}")
    except ValueError as e:
        print(f"[WARN] {e}")

df_summary = pd.DataFrame(summaries).sort_values("cluster_id").reset_index(drop=True)
df_summary


[OK] Exported TS parquet for cluster 0
[OK] Exported TS parquet for cluster 8


Unnamed: 0,cluster_id,val_rmse_pickups,val_rmse_dropoffs,val_rmse_mean,parquet_path
0,0,12.513269,10.213944,8.029918,preds_ts_daily_parquet_like_other_models/ts_cl...
1,8,130.245787,120.434833,79.69167,preds_ts_daily_parquet_like_other_models/ts_cl...
