In [1]:
import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_absolute_error

url = "https://raw.githubusercontent.com/jensmorten/onesixtynine/main/data/pollofpolls_master.csv"
df_full = pd.read_csv(url, index_col="Mnd", parse_dates=True)
df_full = df_full.sort_index()
df_full.index = df_full.index.to_period("M").to_timestamp("M")

kolonne_map = {
    "Hoyre": "Høgre",
    "Rodt": "Raudt",
    "SP": "Sp",
}
df_full = df_full.rename(columns=kolonne_map)


In [2]:
elections = pd.DataFrame(
    {
        "Ap":      [27.4, 26.3, 28.0],
        "Høgre":   [25.0, 20.4, 23.8],
        "Frp":     [15.2, 11.6, 14.6],
        "SV":      [6.0, 7.6, 5.6],
        "Sp":      [10.3, 13.5, 5.6],
        "KrF":     [4.2, 3.8, 4.2],
        "Venstre": [4.4, 4.6, 3.7],
        "MDG":     [3.2, 3.9, 4.7],
        "Raudt":   [2.4, 4.7, 5.3],
        "Andre":   [1.9, 3.6, 4.5],
    },
    index=pd.to_datetime(
        ["2017-09-30", "2021-09-30", "2025-09-30"]
    ),
)
elections.index.name = "date"


In [3]:
def hybrid_var_ml_forecast(df, n_months, var_lags, lags_ML, tau, vol_window, min_alpha, max_alpha):
    """
    Hybrid VAR + ML with:
      - adaptive α per party
      - regime gating based on volatility ratio
      - horizon decay

    Regime strength = recent_vol / long_run_vol
    """
    model = VAR(df)
    var_res = model.fit(maxlags=var_lags, method="ols", trend="n")

    mean_var, lower_var, upper_var = var_res.forecast_interval(
        var_res.endog, steps=n_months
    )

    # VAR residuals (in-sample)
    fitted = var_res.fittedvalues
    true = df.iloc[var_res.k_ar:]
    resid = true.values - fitted.values

    # Build ML dataset
    X, y = [], []
    for i in range(lags_ML, len(df)):
        if i - var_res.k_ar < 0:
            continue
        X.append(df.iloc[i-lags_ML:i].values.flatten())
        y.append(resid[i - var_res.k_ar])

    X = np.asarray(X)
    y = np.asarray(y)

    n_parties = df.shape[1]
    ml_resid_forecast = np.zeros((n_months, n_parties))

    if len(X) < 100:
        return mean_var, lower_var, upper_var

    # Pre-compute volatility regime indicators
    ddf = df.diff()

    recent_vol = (
        ddf.iloc[-vol_window:]
        .abs()
        .mean(axis=0)
        .values
    )

    long_run_vol = (
        ddf.abs()
        .mean(axis=0)
        .values
        + 1e-8
    )

    regime_strength = recent_vol / long_run_vol  # per party

    for j in range(n_parties):
        yj = y[:, j]

        # --- ML model ---
        model_ml = LGBMRegressor(
            n_estimators=500,
            num_leaves=16,
            learning_rate=0.01,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=123,
            verbose=-1
        )
        model_ml.fit(X, yj)

        # --- adaptive α (in-sample calibration) ---
        y_hat_train = model_ml.predict(X)

        num = np.dot(yj, y_hat_train)
        den = np.dot(y_hat_train, y_hat_train) + 1e-8
        alpha_j = num / den
        alpha_j = float(np.clip(alpha_j, min_alpha, max_alpha))

        # --- regime-gated weight ---
        rs = regime_strength[j]

        if rs < 0.8:
            regime_weight = 0.0         # calm regime → VAR only
        elif rs < 1.2:
            regime_weight = (rs - 0.8) / (1.2 - 0.8)  # linear ramp [0,1]
        else:
            regime_weight = 1.0         # regime change

        # --- forecasting ---
        win = df.values[-lags_ML:].copy()

        for t in range(n_months):
            r_raw = model_ml.predict(win.reshape(1, -1))[0]

            if tau is not None:
                time_decay = np.exp(-t / tau)
            else:
                time_decay = 1.0

            r = alpha_j * regime_weight * time_decay * r_raw

            ml_resid_forecast[t, j] = r

            # IMPORTANT: no feedback of corrected forecast
            win = np.vstack([win[1:], mean_var[t]])

    forecast = mean_var + ml_resid_forecast
    lower = lower_var + ml_resid_forecast
    upper = upper_var + ml_resid_forecast

    return forecast, lower, upper

In [4]:
forecast, forecast_lower, forecast_upper = hybrid_var_ml_forecast(
    df=df_full,
    n_months=6,
    var_lags=4,
    lags_ML=12,
    tau=4,
    vol_window=6,
    min_alpha=0.0,
    max_alpha=1.0,
)



In [5]:
def evaluate_start_year(
    df_full,
    elections,
    start_year,
    var_lags=4,
    lags_ML=12,
    tau=4,
    vol_window=6,
    min_alpha=0.0,
    max_alpha=1.0,
    months_before_election=3,
):
    """
    Evaluate a given training start year based on
    average MAE over elections 2017+.
    """

    df = df_full[df_full.index >= pd.Timestamp(f"{start_year}-01-01")].copy()

    errors = []

    for election_date, row in elections.iterrows():
        # skip elections before we have data
        if election_date not in df.index:
            continue

        target_loc = df.index.get_loc(election_date)
        train_end = target_loc - months_before_election
        if train_end <= max(var_lags, lags_ML):
            continue

        train = df.iloc[:train_end]
        n_months = target_loc - train_end + 1  # up to election month

        fc, _, _ = hybrid_var_ml_forecast(
            train,
            n_months=n_months,
            var_lags=var_lags,
            lags_ML=lags_ML,
            tau=tau,
            vol_window=vol_window,
            min_alpha=min_alpha,
            max_alpha=max_alpha,
        )

        y_pred = fc[-1]
        y_true = row[df.columns].values

        err = mean_absolute_error(y_true, y_pred)
        errors.append(err)

    return np.mean(errors) if errors else np.nan


In [6]:
elections_recent = elections.loc[
    elections.index >= pd.Timestamp("2017-01-01")
]

In [7]:
elections_recent

Unnamed: 0_level_0,Ap,Høgre,Frp,SV,Sp,KrF,Venstre,MDG,Raudt,Andre
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2017-09-30,27.4,25.0,15.2,6.0,10.3,4.2,4.4,3.2,2.4,1.9
2021-09-30,26.3,20.4,11.6,7.6,13.5,3.8,4.6,3.9,4.7,3.6
2025-09-30,28.0,23.8,14.6,5.6,5.6,4.2,3.7,4.7,5.3,4.5


In [None]:
results = []

for start_year in range(2008, 2019):
    score = evaluate_start_year(
        df_full,
        elections_recent,
        start_year=start_year,
        var_lags=4,
        lags_ML=12,
        tau=4,
        vol_window=3,
        min_alpha=0.0,
        max_alpha=1.0,
        months_before_election=3,
    )
    results.append({"start_year": start_year, "MAE": score})

start_tuning = (
    pd.DataFrame(results)
    .dropna()
    .sort_values("MAE")
)


  sigma = np.sqrt(self._forecast_vars(steps))
  sigma = np.sqrt(self._forecast_vars(steps))


In [None]:
start_tuning