# Intro to Forecasting (Practical Template)

This notebook is designed for monthly spend / execution / performer data.

It supports:
- Wide month columns (e.g., `23-Oct`, `23-Nov`, …)
- Long format time series (`group`, `date`, `value`)

## Quick start
1. Set `DATA_PATH` to your Excel/CSV
2. Set `GROUP_COL` (e.g., `Performer`)
3. Run all cells


In [None]:
# If running in a fresh environment:
# !pip install pandas numpy matplotlib openpyxl statsmodels

import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Optional model library
try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    HAS_STATSMODELS = True
except Exception:
    HAS_STATSMODELS = False

pd.set_option("display.max_columns", 200)


In [None]:
# ---- USER SETTINGS ----
DATA_PATH = "YOUR_FILE.xlsx"   # or .csv
SHEET_NAME = 0                 # change if needed
GROUP_COL = "Performer"        # e.g., "Vendor", "Performer"
DATE_COL = "Date"              # if your data is already long format
VALUE_COL = None               # set if long format; otherwise leave None for wide-month parsing

# For wide-month columns like "23-Oct"
MONTH_COL_PATTERN = r"^\d{2}-[A-Za-z]{3}$"

# Forecast settings
HORIZON = 6             # months ahead
SEASONAL_PERIOD = 12    # monthly seasonality
INITIAL_TRAIN_MONTHS = 18
BACKTEST_STEP = 1


In [None]:
def load_any_table(path: str, sheet_name=0) -> pd.DataFrame:
    if path.lower().endswith(".csv"):
        return pd.read_csv(path)
    if path.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(path, sheet_name=sheet_name)
    raise ValueError(f"Unsupported file type: {path}")

def parse_month_col(col: str) -> pd.Timestamp:
    # "23-Oct" -> 2023-10-01
    # assumes 20xx (works through 2099)
    return pd.to_datetime(col, format="%y-%b")

def wide_months_to_long(df: pd.DataFrame, group_col: str, month_col_pattern: str) -> pd.DataFrame:
    month_cols = [c for c in df.columns if re.match(month_col_pattern, str(c))]
    if not month_cols:
        raise ValueError("No month columns matched MONTH_COL_PATTERN. Update MONTH_COL_PATTERN.")
    keep_cols = [c for c in df.columns if c not in month_cols]
    # If multiple rows per group, we'll aggregate later
    long_df = df.melt(id_vars=keep_cols, value_vars=month_cols, var_name="month_str", value_name="value")
    long_df["date"] = long_df["month_str"].astype(str).apply(parse_month_col)
    long_df["group"] = long_df[group_col].astype(str)
    long_df["value"] = pd.to_numeric(long_df["value"], errors="coerce")
    return long_df[["group", "date", "value"]].dropna(subset=["date"])

def long_format(df: pd.DataFrame, group_col: str, date_col: str, value_col: str) -> pd.DataFrame:
    out = df[[group_col, date_col, value_col]].copy()
    out.columns = ["group", "date", "value"]
    out["group"] = out["group"].astype(str)
    out["date"] = pd.to_datetime(out["date"])
    out["value"] = pd.to_numeric(out["value"], errors="coerce")
    return out.dropna(subset=["date"])


In [None]:
raw = load_any_table(DATA_PATH, sheet_name=SHEET_NAME)
raw.head()


In [None]:
if VALUE_COL is not None:
    ts_long = long_format(raw, GROUP_COL, DATE_COL, VALUE_COL)
else:
    ts_long = wide_months_to_long(raw, GROUP_COL, MONTH_COL_PATTERN)

# Aggregate if multiple rows per group-date
ts = (ts_long
      .groupby(["group", "date"], as_index=False)["value"]
      .sum())
ts = ts.sort_values(["group", "date"])
ts.head()


In [None]:
# Quick sanity plot for one group
example_group = ts["group"].iloc[0]
g = ts[ts["group"] == example_group].set_index("date")["value"]

plt.figure()
g.plot()
plt.title(f"Monthly series — {example_group}")
plt.xlabel("Date")
plt.ylabel("Value")
plt.show()


In [None]:
# ---- Forecast helpers ----

def naive_forecast(train: pd.Series, horizon: int) -> np.ndarray:
    last = train.dropna().iloc[-1] if train.dropna().size else np.nan
    return np.full(horizon, last, dtype=float)

def seasonal_naive_forecast(train: pd.Series, horizon: int, seasonal_period: int = 12) -> np.ndarray:
    train = train.dropna()
    if train.size < seasonal_period:
        return naive_forecast(train, horizon)
    out = []
    for i in range(horizon):
        out.append(train.iloc[-seasonal_period + (i % seasonal_period)])
    return np.array(out, dtype=float)

def moving_average_forecast(train: pd.Series, horizon: int, window: int = 3) -> np.ndarray:
    train = train.dropna()
    if train.size == 0:
        return np.full(horizon, np.nan, dtype=float)
    w = min(window, train.size)
    m = train.iloc[-w:].mean()
    return np.full(horizon, m, dtype=float)

def smape(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    denom = (np.abs(y_true) + np.abs(y_pred))
    denom = np.where(denom == 0, 1.0, denom)
    return float(np.mean(2.0 * np.abs(y_pred - y_true) / denom))

def mae(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.mean(np.abs(y_true - y_pred)))

def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

@dataclass
class BacktestResult:
    group: str
    model: str
    mae: float
    rmse: float
    smape: float
    n_folds: int

def rolling_backtest(series: pd.Series,
                     horizon: int,
                     initial_train_months: int,
                     step: int,
                     model_fn,
                     model_name: str,
                     seasonal_period: int = 12) -> Tuple[BacktestResult, pd.DataFrame]:
    # series indexed by date, monthly frequency preferred
    s = series.dropna().copy()
    s = s.asfreq("MS")  # monthly start; may introduce NaNs if gaps exist
    # simple gap fill strategy: keep NaNs (models will drop) — adjust if you prefer interpolation
    dates = s.index

    folds = []
    errors = []
    start = initial_train_months

    while start + horizon <= len(dates):
        train = s.iloc[:start]
        test = s.iloc[start:start + horizon]

        if model_name == "seasonal_naive":
            pred = seasonal_naive_forecast(train, horizon, seasonal_period=seasonal_period)
        else:
            pred = model_fn(train, horizon)

        y_true = test.values.astype(float)
        y_pred = np.asarray(pred, dtype=float)

        # Align lengths
        if len(y_pred) != len(y_true):
            y_pred = y_pred[:len(y_true)]

        folds.append({
            "train_end": dates[start - 1],
            "test_start": dates[start],
            "test_end": dates[start + horizon - 1],
            "y_true": y_true,
            "y_pred": y_pred
        })

        # compute fold error (ignoring NaNs)
        mask = ~np.isnan(y_true) & ~np.isnan(y_pred)
        if mask.any():
            errors.append({
                "mae": mae(y_true[mask], y_pred[mask]),
                "rmse": rmse(y_true[mask], y_pred[mask]),
                "smape": smape(y_true[mask], y_pred[mask]),
            })

        start += step

    if not errors:
        bt = BacktestResult(group="(unknown)", model=model_name, mae=np.nan, rmse=np.nan, smape=np.nan, n_folds=0)
        return bt, pd.DataFrame()

    err_df = pd.DataFrame(errors)
    bt = BacktestResult(group="(unknown)", model=model_name,
                        mae=float(err_df["mae"].mean()),
                        rmse=float(err_df["rmse"].mean()),
                        smape=float(err_df["smape"].mean()),
                        n_folds=len(err_df))
    # Detailed fold table
    rows = []
    for f in folds:
        for i, d in enumerate(pd.date_range(f["test_start"], periods=len(f["y_true"]), freq="MS")):
            rows.append({
                "train_end": f["train_end"],
                "date": d,
                "y_true": f["y_true"][i],
                "y_pred": f["y_pred"][i],
            })
    detail = pd.DataFrame(rows)
    return bt, detail


In [None]:
# Optional: ETS model forecast
def ets_forecast(train: pd.Series, horizon: int, seasonal_period: int = 12) -> np.ndarray:
    train = train.dropna().asfreq("MS")
    # Simple ETS choice: additive trend + additive seasonality if enough data
    use_seasonal = train.dropna().shape[0] >= 2 * seasonal_period
    model = ExponentialSmoothing(
        train,
        trend="add",
        seasonal="add" if use_seasonal else None,
        seasonal_periods=seasonal_period if use_seasonal else None
    )
    fit = model.fit(optimized=True)
    fc = fit.forecast(horizon)
    return fc.values.astype(float)


In [None]:
# ---- Run backtests for a handful of groups (or all) ----

groups = ts["group"].unique().tolist()
# For speed during development, you can slice: groups = groups[:10]

results = []
details = []

for grp in groups:
    s = ts[ts["group"] == grp].set_index("date")["value"].sort_index()

    # Baselines
    r_naive, d_naive = rolling_backtest(
        s, HORIZON, INITIAL_TRAIN_MONTHS, BACKTEST_STEP,
        model_fn=naive_forecast, model_name="naive", seasonal_period=SEASONAL_PERIOD
    )
    r_naive.group = grp
    results.append(r_naive)
    d_naive["group"] = grp
    d_naive["model"] = "naive"
    details.append(d_naive)

    r_snaive, d_snaive = rolling_backtest(
        s, HORIZON, INITIAL_TRAIN_MONTHS, BACKTEST_STEP,
        model_fn=naive_forecast, model_name="seasonal_naive", seasonal_period=SEASONAL_PERIOD
    )
    r_snaive.group = grp
    results.append(r_snaive)
    d_snaive["group"] = grp
    d_snaive["model"] = "seasonal_naive"
    details.append(d_snaive)

    r_ma, d_ma = rolling_backtest(
        s, HORIZON, INITIAL_TRAIN_MONTHS, BACKTEST_STEP,
        model_fn=lambda tr, h: moving_average_forecast(tr, h, window=3), model_name="moving_avg_3",
        seasonal_period=SEASONAL_PERIOD
    )
    r_ma.group = grp
    results.append(r_ma)
    d_ma["group"] = grp
    d_ma["model"] = "moving_avg_3"
    details.append(d_ma)

    # ETS (optional)
    if HAS_STATSMODELS:
        r_ets, d_ets = rolling_backtest(
            s, HORIZON, INITIAL_TRAIN_MONTHS, BACKTEST_STEP,
            model_fn=lambda tr, h: ets_forecast(tr, h, seasonal_period=SEASONAL_PERIOD),
            model_name="ets",
            seasonal_period=SEASONAL_PERIOD
        )
        r_ets.group = grp
        results.append(r_ets)
        d_ets["group"] = grp
        d_ets["model"] = "ets"
        details.append(d_ets)

score = pd.DataFrame([r.__dict__ for r in results])
score.sort_values(["group", "smape"]).head(20)


In [None]:
# Winner per group (lower sMAPE)
winners = (score.sort_values(["group", "smape"])
                .groupby("group", as_index=False)
                .first())
winners.head()


In [None]:
# Pick a group and visualize backtest predictions
detail_df = pd.concat([d for d in details if len(d)], ignore_index=True)

grp = winners["group"].iloc[0]
grp_best = winners[winners["group"] == grp]["model"].iloc[0]

df_plot = detail_df[(detail_df["group"] == grp) & (detail_df["model"] == grp_best)].copy()
df_plot = df_plot.sort_values("date")

plt.figure()
plt.plot(df_plot["date"], df_plot["y_true"], label="actual")
plt.plot(df_plot["date"], df_plot["y_pred"], label=f"pred ({grp_best})")
plt.title(f"Backtest — {grp} — {grp_best}")
plt.xlabel("Date")
plt.ylabel("Value")
plt.legend()
plt.show()


In [None]:
# ---- Fit final model and forecast forward + scenarios ----

def fit_and_forecast(series: pd.Series, model_name: str, horizon: int) -> np.ndarray:
    if model_name == "naive":
        return naive_forecast(series, horizon)
    if model_name == "seasonal_naive":
        return seasonal_naive_forecast(series, horizon, seasonal_period=SEASONAL_PERIOD)
    if model_name == "moving_avg_3":
        return moving_average_forecast(series, horizon, window=3)
    if model_name == "ets":
        if not HAS_STATSMODELS:
            return seasonal_naive_forecast(series, horizon, seasonal_period=SEASONAL_PERIOD)
        return ets_forecast(series, horizon, seasonal_period=SEASONAL_PERIOD)
    # fallback
    return seasonal_naive_forecast(series, horizon, seasonal_period=SEASONAL_PERIOD)

def apply_scenario(fc: np.ndarray, uplift: float = 0.0, step_change: float = 0.0) -> np.ndarray:
    # uplift is % (e.g., +0.10), step_change is absolute
    return (fc * (1.0 + uplift)) + step_change

grp = winners["group"].iloc[0]
best_model = winners[winners["group"] == grp]["model"].iloc[0]

s = ts[ts["group"] == grp].set_index("date")["value"].sort_index().asfreq("MS")
fc = fit_and_forecast(s, best_model, HORIZON)

future_dates = pd.date_range(s.index.max() + pd.offsets.MonthBegin(1), periods=HORIZON, freq="MS")
fc_df = pd.DataFrame({
    "date": future_dates,
    "nominal": fc,
    "optimistic": apply_scenario(fc, uplift=-0.05),
    "pessimistic": apply_scenario(fc, uplift=+0.10),
})

plt.figure()
plt.plot(s.index, s.values, label="actual")
plt.plot(fc_df["date"], fc_df["nominal"], label="nominal")
plt.plot(fc_df["date"], fc_df["optimistic"], label="optimistic")
plt.plot(fc_df["date"], fc_df["pessimistic"], label="pessimistic")
plt.title(f"Forecast + scenarios — {grp} — {best_model}")
plt.xlabel("Date")
plt.ylabel("Value")
plt.legend()
plt.show()

fc_df
