# ARMA & ARMAX — Δlog(volume) **og** Δvolume (TEST)
*Generated:* 2025-10-23 08:23

Denne notatboken kjører rutenett av **ARMA(p,q)** og **ARMAX(p,q)** på to mål:
1) **Δlog(volume)** (vekstrater) — klassisk for stasjonaritet.
2) **Δvolume** (nivåendringer) — når du vil modellere vanlig volum direkte.

For begge målene rekonstrueres **nivåprognoser**, slik at de kan sammenlignes med faktisk volum.
Vi skriver kun ut tabeller og grafer (ingenting lagres til disk).


In [None]:
# --- Imports
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller


In [None]:
# --- Helper functions
def residual_diagnostics(resid, lags=24):
    """Summarise a residual series.

    NaNs are dropped first. Returns a dict with the residual mean
    (``eps_mean``), the sample standard deviation with ``ddof=1``
    (``eps_std``) and the Ljung-Box p-value evaluated at ``lags``
    (key ``LjungBox_p(lag<lags>)``).
    """
    clean = pd.Series(resid).dropna()
    ljung_box = acorr_ljungbox(clean, lags=[lags], return_df=True)
    # Only one lag was requested, so the last row is the row for `lags`.
    p_value = ljung_box["lb_pvalue"].iloc[-1]

    summary = {}
    summary["eps_mean"] = float(clean.mean())
    summary["eps_std"] = float(clean.std(ddof=1))
    summary[f"LjungBox_p(lag{lags})"] = float(p_value)
    return summary

def mae(y, yhat):
    """Return the mean absolute error between ``y`` and ``yhat`` as a float.

    Both inputs are converted to NumPy arrays first, so the comparison is
    positional (element i against element i).
    """
    actual = np.asarray(y)
    predicted = np.asarray(yhat)
    abs_err = np.abs(actual - predicted)
    return float(np.mean(abs_err))

def mape(y, yhat):
    """Return the mean absolute percentage error (in percent) as a float.

    Inputs are converted to NumPy arrays and compared positionally.
    Observations whose actual value is exactly zero are replaced by NaN in
    the denominator and therefore ignored by the NaN-aware mean.
    """
    actual = np.asarray(y)
    predicted = np.asarray(yhat)
    # Zero actuals would divide by zero; mask them out instead.
    safe_actual = np.where(actual == 0, np.nan, actual)
    rel_err = np.abs((actual - predicted) / safe_actual)
    return float(np.nanmean(rel_err) * 100.0)

def adf_p(x):
    """Run ``adfuller`` on ``x`` (NaNs dropped, ``autolag="AIC"``).

    Returns element 1 of the ``adfuller`` result tuple, which this notebook
    reports as the ADF p-value.
    """
    series = pd.Series(x).dropna()
    adf_result = adfuller(series, autolag="AIC")
    return adf_result[1]


## Data

In [None]:
# --- Load train/test (expects columns: date, volume_all, + optional exogenous vars)
# Both files are indexed by their parsed `date` column and sorted chronologically.
train = pd.read_csv("train_data.csv", parse_dates=["date"]).set_index("date").sort_index()
test  = pd.read_csv("test_data.csv",  parse_dates=["date"]).set_index("date").sort_index()

print(f"Train: {train.index.min().date()} → {train.index.max().date()}  rows={len(train)}")
print(f"Test:  {test.index.min().date()}  → {test.index.max().date()}   rows={len(test)}")

# Targets / transforms: log level, first difference of the log, first difference of the level.
train['log_volume'] = np.log(train['volume_all'])
test['log_volume']  = np.log(test['volume_all'])

train['dlog'] = train['log_volume'].diff()
test['dlog']  = test['log_volume'].diff()

train['dlevel'] = train['volume_all'].diff()
test['dlevel']  = test['volume_all'].diff()

# diff() leaves a NaN in the first row of each frame; drop it.
dlog_tr = train['dlog'].dropna()
dlog_te = test['dlog'].dropna()

dlev_tr = train['dlevel'].dropna()
dlev_te = test['dlevel'].dropna()

print("ADF p (Δlog train):", adf_p(dlog_tr))
print("ADF p (Δlevel train):", adf_p(dlev_tr))

# Keep last training values to reconstruct paths
last_log = train['log_volume'].iloc[-1]
last_level = train['volume_all'].iloc[-1]

# Exogenous (optional): numeric columns in both sets, excluding targets
exclude = {'volume_all','log_volume','dlog','dlevel'}
common_cols = [c for c in train.columns.intersection(test.columns) if c not in exclude]
exog_cols = [c for c in common_cols if np.issubdtype(train[c].dtype, np.number)]
print("Exogenous candidates:", exog_cols)

# Training exog: align to the differenced training series (its first row is dropped).
Xtr_exog = train[exog_cols].iloc[1:].loc[dlog_tr.index] if exog_cols else None

# Forecast exog: the models forecast h = len(dlog_te) steps from the end of the
# training sample, and the reconstruct_* functions compare step k with test row
# k-1 (they use test[...].iloc[:h]). The regressors for the forecast must
# therefore be the FIRST h test rows; taking rows 1..h would feed each step the
# regressors of the following period.
# NOTE(review): assumes the test sample starts right after the training sample — confirm.
Xte_exog = test[exog_cols].iloc[:len(dlog_te)] if exog_cols else None


## Funksjoner for rutenett (ARMA/ARMAX)

In [None]:
def run_arma_grid(d_tr, d_te, reconstruct_func, family_label="ARMA", exog_tr=None, exog_te=None,
                  p_list=(1, 2, 3, 4), q_list=(0, 1, 2, 3, 4)):
    """Fit a grid of ARMA(p, q) / ARMAX(p, q) models and score their level forecasts.

    Each (p, q) pair is fitted with ``sm.tsa.SARIMAX`` (order ``(p, 0, q)``,
    constant trend, stationarity and invertibility enforced) on ``d_tr``,
    forecast ``len(d_te)`` steps ahead, and converted to levels with
    ``reconstruct_func``.

    Parameters
    ----------
    d_tr : training series of the differenced target.
    d_te : test series of the differenced target; only its length is used
        (as the forecast horizon).
    reconstruct_func : callable ``(d_pred, h) -> (level_pred, y_te_level)``
        turning the forecast differences into predicted and actual levels.
    family_label : label stored in the ``family`` / ``model`` columns.
    exog_tr, exog_te : optional exogenous regressors for fitting / forecasting.
    p_list, q_list : AR and MA orders to try; the defaults reproduce the
        original hard-coded grid.

    Returns
    -------
    (res, models, forecasts_level)
        ``res`` is a DataFrame sorted by AIC with one row per successfully
        fitted model (empty, but with all columns present, if every fit
        failed); ``models`` maps ``(p, q)`` to the fitted results object;
        ``forecasts_level`` maps ``(p, q)`` to ``(level_pred, y_te_level)``.

    Notes
    -----
    A (p, q) combination that raises during fitting, forecasting or scoring is
    skipped, and the reason is printed instead of being silently discarded.
    """
    columns = [
        "family", "p", "q", "model", "AIC",
        "MSE(level)", "MAE(level)", "MAPE(level)", "LB_p(lag24)",
    ]
    rows = []
    models = {}
    forecasts_level = {}
    h = len(d_te)  # forecast horizon; invariant across the grid

    for p in p_list:
        for q in q_list:
            try:
                mdl = sm.tsa.SARIMAX(
                    d_tr, order=(p, 0, q), trend='c',
                    exog=exog_tr,
                    enforce_stationarity=True, enforce_invertibility=True
                ).fit(disp=False)
                d_pred = mdl.get_forecast(steps=h, exog=exog_te).predicted_mean
                level_pred, y_te_level = reconstruct_func(d_pred, h)

                # Score positionally. The two objects returned by
                # reconstruct_func may carry different indexes (forecast index
                # vs. test index); subtracting them as pandas Series would
                # align on labels and can yield all-NaN errors.
                actual = np.asarray(y_te_level, dtype=float)
                predicted = np.asarray(level_pred, dtype=float)
                mse = float(np.mean((actual - predicted) ** 2))
                mae_ = mae(actual, predicted)
                # mape() ignores observations whose actual value is zero.
                mape_ = mape(actual, predicted)

                diag = residual_diagnostics(mdl.resid, lags=24)
            except Exception as exc:
                # Best-effort grid: keep going, but report what was skipped and why.
                print(f"[{family_label}] skipped (p={p}, q={q}): {type(exc).__name__}: {exc}")
                continue

            rows.append({
                "family": family_label,
                "p": p, "q": q, "model": f"{family_label}({p},{q})",
                "AIC": float(mdl.aic),
                "MSE(level)": mse, "MAE(level)": mae_, "MAPE(level)": mape_,
                "LB_p(lag24)": diag["LjungBox_p(lag24)"]
            })
            models[(p, q)] = mdl
            forecasts_level[(p, q)] = (level_pred, y_te_level)

    # Explicit columns keep sort_values("AIC") valid even when no model succeeded.
    res = pd.DataFrame(rows, columns=columns).sort_values("AIC").reset_index(drop=True)
    return res, models, forecasts_level


## Rekonstruksjon til nivå

In [None]:
# Δlog → level
def reconstruct_from_dlog(dlog_pred, h):
    """Turn forecast log-differences into a level path.

    The cumulative sum of ``dlog_pred`` is added to the module-level
    ``last_log`` (last training log volume) and exponentiated.

    Returns ``(level_pred, y_te_level)`` where ``y_te_level`` is the
    exponential of the first ``h`` values of ``test['log_volume']``.
    """
    actual_level = np.exp(test['log_volume'].iloc[:h])
    predicted_level = np.exp(last_log + dlog_pred.cumsum())
    return predicted_level, actual_level

# Δlevel → level
def reconstruct_from_dlevel(dlevel_pred, h):
    """Turn forecast level-differences into a level path.

    The cumulative sum of ``dlevel_pred`` is added to the module-level
    ``last_level`` (last training volume).

    Returns ``(level_path, y_te_level)`` where ``y_te_level`` is the first
    ``h`` values of ``test['volume_all']``.
    """
    actual_level = test['volume_all'].iloc[:h]
    predicted_level = last_level + dlevel_pred.cumsum()
    return predicted_level, actual_level


## ARMA på Δlog(volume) og Δlevel

In [None]:
# Grid-search pure ARMA models (no exogenous regressors) on the Δlog target ...
arma_dlog_results, arma_dlog_models, arma_dlog_fc = run_arma_grid(
    d_tr=dlog_tr,
    d_te=dlog_te,
    reconstruct_func=reconstruct_from_dlog,
    family_label="ARMA_dlog",
)
# ... and on the Δlevel target.
arma_dlev_results, arma_dlev_models, arma_dlev_fc = run_arma_grid(
    d_tr=dlev_tr,
    d_te=dlev_te,
    reconstruct_func=reconstruct_from_dlevel,
    family_label="ARMA_dlevel",
)

# The result tables come back sorted by AIC, so head() shows the lowest-AIC rows.
display(arma_dlog_results.head())
display(arma_dlev_results.head())


## ARMAX (med eksogene) på Δlog og Δlevel

In [None]:
# ARMAX grids: the same search as ARMA, but with the exogenous regressors attached.
if Xtr_exog is None or Xtr_exog.shape[1] == 0:
    print("Ingen eksogene variabler funnet — hopper over ARMAX.")
    # Define every ARMAX name on the skip path as well, so later cells can
    # reference the models / forecast dicts without hitting a NameError.
    armax_dlog_results, armax_dlog_models, armax_dlog_fc = pd.DataFrame(), {}, {}
    armax_dlev_results, armax_dlev_models, armax_dlev_fc = pd.DataFrame(), {}, {}
else:
    armax_dlog_results, armax_dlog_models, armax_dlog_fc = run_arma_grid(
        dlog_tr, dlog_te, reconstruct_from_dlog, family_label="ARMAX_dlog",
        exog_tr=Xtr_exog, exog_te=Xte_exog
    )
    armax_dlev_results, armax_dlev_models, armax_dlev_fc = run_arma_grid(
        dlev_tr, dlev_te, reconstruct_from_dlevel, family_label="ARMAX_dlevel",
        exog_tr=Xtr_exog, exog_te=Xte_exog
    )

# Show the top rows of each table, or a dash placeholder when there is nothing to show.
display(armax_dlog_results.head() if not armax_dlog_results.empty else "—")
display(armax_dlev_results.head() if not armax_dlev_results.empty else "—")


## Samlet tabell

In [None]:
# Always include the ARMA tables; add the ARMAX tables only when the ARMAX
# cell has run and produced a non-empty Δlog result.
tables = [arma_dlog_results, arma_dlev_results]
if 'armax_dlog_results' in globals() and not armax_dlog_results.empty:
    tables.extend([armax_dlog_results, armax_dlev_results])

# One combined table, ordered by family and then by AIC within each family.
combined = pd.concat(tables, ignore_index=True)
summary = combined.sort_values(["family", "AIC"]).reset_index(drop=True)
summary


## Plot: beste modeller for hver familie

In [None]:
def plot_best(res_table, fc_dict, title):
    """Plot actual vs. forecast levels for the lowest-AIC model in ``res_table``.

    ``fc_dict`` maps ``(p, q)`` to ``(level_pred, y_te_level)``. The x-axis
    uses the first ``len(level_pred)`` entries of the module-level
    ``test`` index. If ``res_table`` is empty, a message is printed and
    nothing is plotted.
    """
    if res_table.empty:
        print(f"{title}: (ingen resultater)")
        return

    winner = res_table['AIC'].idxmin()
    order = (int(res_table.loc[winner, 'p']), int(res_table.loc[winner, 'q']))
    forecast, actual = fc_dict[order]
    dates = test.index[:len(forecast)]
    forecast_label = f"{res_table.loc[winner,'model']} forecast"

    plt.figure(figsize=(11,5))
    plt.plot(dates, actual.values, label="Actual (level)")
    plt.plot(dates, forecast.values, label=forecast_label, linestyle="--")
    plt.title(title)
    plt.ylabel("Volume")
    plt.legend()
    plt.tight_layout()
    plt.show()

# ARMA plots: these tables and forecast dicts are created by the ARMA cell.
plot_best(res_table=arma_dlog_results, fc_dict=arma_dlog_fc, title="Best ARMA on Δlog(volume)")
plot_best(res_table=arma_dlev_results, fc_dict=arma_dlev_fc, title="Best ARMA on Δlevel")

# ARMAX plots: the forecast dicts may not exist if ARMAX was skipped, in which
# case the resulting NameError is swallowed and nothing is plotted.
try:
    plot_best(res_table=armax_dlog_results, fc_dict=armax_dlog_fc, title="Best ARMAX on Δlog(volume)")
    plot_best(res_table=armax_dlev_results, fc_dict=armax_dlev_fc, title="Best ARMAX on Δlevel")
except NameError:
    pass
