# Master's Thesis Walkthrough Notebook

**Title:** Evaluating the ARIMA-GARCH Model's Accuracy Across Diverse Cryptocurrencies  
**Author:** Markus Öffel  
**Assets:** BTC, ETH, DOGE, SOL  
**Period:** 2020-05-11 to 2024-04-20

This notebook is a structured, step-by-step reproduction of your thesis workflow in small bites of theory and code.

## Goal of This Notebook

We build the workflow in clear blocks:

1. Fetch and check the data  
2. Compute log returns and create the splits  
3. Diagnostics (ADF/KPSS, Ljung-Box, ARCH-LM)  
4. Fit ARIMA and the GARCH family  
5. Rolling backtests and multi-horizon evaluation  
6. DM tests and VaR backtests (Kupiec/Christoffersen)  
7. Interpret and export the results

## Relation to the Original Code

This notebook is based on the original script:

- `ARIMA GARCH FINAL.py` (v28.5)

Carried over in substance, among other things:

- Configuration logic (assets, period, horizons, rolling window)  
- ARIMA-GARCH model families  
- EWMA benchmark  
- Diebold-Mariano tests  
- Parametric VaR/ES plus Kupiec/Christoffersen backtests

In [None]:
# Core configuration from the thesis (slightly reduced to fit the notebook flow)
THESIS_CONFIG = {
    "symbols": {
        "bitcoin": "BTC-USD",
        "ethereum": "ETH-USD",
        "dogecoin": "DOGE-USD",
        "solana": "SOL-USD",
    },
    "start_date": "2020-05-11",
    "end_date": "2024-04-20",
    "split_ratios": (0.70, 0.15, 0.15),
    "horizons": [1, 3, 7, 14, 30],
    "rolling_window": 60,
    "robustness_window": 365,
    "refit_interval": 1,
    "ewma_lambda": 0.94,
    "dm_alpha": 0.05,
    "var_alpha": 0.05,
}

THESIS_CONFIG

## Packages and Reproducibility

If individual packages are missing, install them in your notebook environment (e.g. via `pip install ...`).

In [None]:
import warnings
from dataclasses import dataclass
from pathlib import Path

import numpy as np
import pandas as pd
import yfinance as yf

from scipy.stats import norm, t as student_t, chi2
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch
from arch import arch_model

warnings.filterwarnings("ignore")

np.random.seed(42)

OUTPUT_DIR = Path("thesis_notebook_outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

print("Output directory:", OUTPUT_DIR.resolve())

## Data Collection from Yahoo Finance

The thesis uses daily closing prices (`Close`) and then works with log returns.

In [None]:
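def fetch_price_history(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    # Note: yfinance treats `end` as exclusive, so the last row is the day before end_date.
    history = yf.download(symbol, start=start_date, end=end_date, auto_adjust=True, progress=False)
    if history.empty:
        raise ValueError(f"No data returned for {symbol}")

    # Newer yfinance versions return MultiIndex columns (field, ticker) even for one symbol;
    # flatten them so that frame["close"] is a Series rather than a one-column DataFrame.
    if isinstance(history.columns, pd.MultiIndex):
        history.columns = history.columns.get_level_values(0)

    frame = history[["Close"]].copy()
    frame = frame.rename(columns={"Close": "close"})
    frame.index = pd.to_datetime(frame.index)
    frame = frame.sort_index()
    return frame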

In [None]:
raw_price_data: dict[str, pd.DataFrame] = {}

for asset_name, ticker_symbol in THESIS_CONFIG["symbols"].items():
    raw_price_data[asset_name] = fetch_price_history(
        symbol=ticker_symbol,
        start_date=THESIS_CONFIG["start_date"],
        end_date=THESIS_CONFIG["end_date"],
    )

{asset_name: frame.shape for asset_name, frame in raw_price_data.items()}

## Data Quality Check

A small plausibility check before modeling.

In [None]:
def profile_price_frame(price_frame: pd.DataFrame) -> dict:
    return {
        "start": price_frame.index.min(),
        "end": price_frame.index.max(),
        "rows": len(price_frame),
        "missing_close": int(price_frame["close"].isna().sum()),
        "min_close": float(price_frame["close"].min()),
        "max_close": float(price_frame["close"].max()),
    }

quality_report = pd.DataFrame(
    {asset_name: profile_price_frame(price_frame) for asset_name, price_frame in raw_price_data.items()}
).T

quality_report

## Log Returns and Data Splits

We use:

- `log_return_t = ln(P_t / P_{t-1})`
- a chronological 70/15/15 split into train/validation/test
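
As a quick sanity check of both definitions, here is a minimal standard-library sketch. The prices, the sample size of 1439 and the helper name `split_sizes` are made up for illustration and are not thesis data. The split follows the same convention as the cell below it: train and validation sizes are rounded down, and the test set absorbs the remainder.

```python
import math

# Hypothetical closing prices, for illustration only (not thesis data).
prices = [100.0, 110.0, 99.0, 99.0]

# log_return_t = ln(P_t / P_{t-1}); the first price has no predecessor.
log_returns = [math.log(curr / prev) for prev, curr in zip(prices, prices[1:])]


def split_sizes(n_total: int, ratios=(0.70, 0.15, 0.15)) -> tuple[int, int, int]:
    """Chronological split sizes; the test set absorbs the rounding remainder."""
    n_train = int(n_total * ratios[0])
    n_validation = int(n_total * ratios[1])
    return n_train, n_validation, n_total - n_train - n_validation


print([round(r, 4) for r in log_returns])
print(split_sizes(1439))
```

Note that a 10% gain followed by a 10% loss does not cancel in log terms, and an unchanged price gives a log return of exactly zero.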

In [None]:
def add_log_returns(price_frame: pd.DataFrame) -> pd.DataFrame:
    frame = price_frame.copy()
    frame["log_return"] = np.log(frame["close"] / frame["close"].shift(1))
    frame["squared_return"] = frame["log_return"] ** 2
    return frame.dropna().copy()


def split_by_ratio(series_frame: pd.DataFrame, split_ratios: tuple[float, float, float]):
    train_ratio, validation_ratio, test_ratio = split_ratios
    assert abs((train_ratio + validation_ratio + test_ratio) - 1.0) < 1e-9

    n_total = len(series_frame)
    train_end = int(n_total * train_ratio)
    validation_end = train_end + int(n_total * validation_ratio)

    train_frame = series_frame.iloc[:train_end].copy()
    validation_frame = series_frame.iloc[train_end:validation_end].copy()
    test_frame = series_frame.iloc[validation_end:].copy()
    return train_frame, validation_frame, test_frame

In [None]:
asset_data: dict[str, dict[str, pd.DataFrame]] = {}

for asset_name, price_frame in raw_price_data.items():
    returns_frame = add_log_returns(price_frame)
    train_frame, validation_frame, test_frame = split_by_ratio(
        returns_frame,
        THESIS_CONFIG["split_ratios"],
    )
    asset_data[asset_name] = {
        "full": returns_frame,
        "train": train_frame,
        "validation": validation_frame,
        "test": test_frame,
    }

split_overview = pd.DataFrame(
    {
        asset_name: {
            "full": len(parts["full"]),
            "train": len(parts["train"]),
            "validation": len(parts["validation"]),
            "test": len(parts["test"]),
        }
        for asset_name, parts in asset_data.items()
    }
).T

split_overview

## Stationarity: ADF + KPSS

**Theory in brief:**

- ADF null hypothesis: the series has a unit root (is non-stationary)  
- KPSS null hypothesis: the series is stationary  

Because the null hypotheses are opposed, the two tests together give a more robust picture than either alone.
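
One way to read the two p-values together is a small four-way decision rule. The sketch below is only an illustration: the function name `combine_adf_kpss` and the verdict labels are assumptions, not part of the original script.

```python
def combine_adf_kpss(adf_p_value: float, kpss_p_value: float, alpha: float = 0.05) -> str:
    """Map the two test outcomes to one of four verdicts (labels are illustrative)."""
    adf_rejects_unit_root = adf_p_value < alpha       # evidence for stationarity
    kpss_rejects_stationarity = kpss_p_value < alpha  # evidence against stationarity

    if adf_rejects_unit_root and not kpss_rejects_stationarity:
        return "stationary"
    if not adf_rejects_unit_root and kpss_rejects_stationarity:
        return "non-stationary"
    if adf_rejects_unit_root and kpss_rejects_stationarity:
        return "conflicting"
    return "inconclusive"


# Hypothetical p-values, chosen only to exercise each branch.
print(combine_adf_kpss(0.001, 0.10))
print(combine_adf_kpss(0.40, 0.01))
```

For log returns you would typically hope for the first verdict; the last two are a signal to look at the series more closely rather than a result in themselves.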

In [None]:
def run_adf_kpss(series: pd.Series, adf_alpha: float = 0.05, kpss_alpha: float = 0.05) -> dict:
    clean_series = series.dropna().astype(float)

    adf_statistic, adf_p_value, *_ = adfuller(clean_series, autolag="AIC")
    kpss_statistic, kpss_p_value, *_ = kpss(clean_series, regression="c", nlags="auto")

    return {
        "adf_stat": adf_statistic,
        "adf_p_value": adf_p_value,
        "adf_stationary": bool(adf_p_value < adf_alpha),
        "kpss_stat": kpss_statistic,
        "kpss_p_value": kpss_p_value,
        "kpss_stationary": bool(kpss_p_value >= kpss_alpha),
    }

In [None]:
stationarity_results = pd.DataFrame(
    {
        asset_name: run_adf_kpss(parts["train"]["log_return"])
        for asset_name, parts in asset_data.items()
    }
).T

stationarity_results

## Residual Diagnostics: Ljung-Box + ARCH-LM

**Theory in brief:**

- Ljung-Box tests for remaining autocorrelation in the residuals  
- ARCH-LM tests for remaining conditional heteroskedasticity (ARCH effects)
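
To see what the Ljung-Box statistic measures, it can be computed by hand as `Q = n (n + 2) * sum_k rho_k^2 / (n - k)`, where `rho_k` is the sample autocorrelation at lag `k`. The following is a minimal standard-library sketch on a toy series; the helper name `ljung_box_q` is an assumption, and in the notebook itself `acorr_ljungbox` does this work.

```python
def ljung_box_q(values: list[float], max_lag: int) -> float:
    """Ljung-Box Q statistic: n (n + 2) * sum_k rho_k^2 / (n - k)."""
    n_obs = len(values)
    mean_value = sum(values) / n_obs
    centered = [value - mean_value for value in values]
    total_sum_squares = sum(c * c for c in centered)

    weighted_sum = 0.0
    for lag in range(1, max_lag + 1):
        autocovariance = sum(centered[t] * centered[t - lag] for t in range(lag, n_obs))
        rho = autocovariance / total_sum_squares
        weighted_sum += rho * rho / (n_obs - lag)
    return n_obs * (n_obs + 2) * weighted_sum


# Toy series that flips sign every step, i.e. strongly negatively autocorrelated.
alternating = [1.0 if i % 2 == 0 else -1.0 for i in range(20)]
q_lag_1 = ljung_box_q(alternating, max_lag=1)
print(round(q_lag_1, 2))
```

Under the null of no autocorrelation, Q is approximately chi-squared distributed; with one lag the 5% critical value is about 3.84, so the toy series is rejected clearly. ARCH-LM follows the same logic for squared residuals, using an auxiliary regression on their own lags.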

In [None]:
def run_residual_diagnostics(residual_series: pd.Series, lb_lag: int = 20, arch_lag: int = 12) -> dict:
    clean_series = residual_series.dropna().astype(float)

    ljung_box_frame = acorr_ljungbox(clean_series, lags=[lb_lag], return_df=True)
    arch_stat, arch_p_value, *_ = het_arch(clean_series, nlags=arch_lag)

    return {
        "ljung_box_p": float(ljung_box_frame["lb_pvalue"].iloc[0]),
        "ljung_box_white_noise": bool(ljung_box_frame["lb_pvalue"].iloc[0] >= 0.05),
        "arch_lm_p": float(arch_p_value),
        "arch_effect_present": bool(arch_p_value < 0.05),
    }

## Baselines for the Comparison

We use the same reference models as in the thesis:

- **Price/return:** naive forecast  
- **Variance:** EWMA with `lambda=0.94`
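
The EWMA recursion is `variance_t = lambda * variance_{t-1} + (1 - lambda) * r_t^2`. The sketch below walks through three steps with made-up returns and a made-up starting variance, and also computes the half-life implied by `lambda=0.94`. The helper name `ewma_variance_path` and all numbers are illustrative assumptions.

```python
import math


def ewma_variance_path(returns, lambda_value=0.94, initial_variance=0.0):
    """Return the EWMA variance after each observation."""
    variance = initial_variance
    path = []
    for observed_return in returns:
        variance = lambda_value * variance + (1 - lambda_value) * observed_return ** 2
        path.append(variance)
    return path


# Hypothetical daily returns and starting variance (2% daily volatility squared).
toy_returns = [0.01, -0.02, 0.03]
variance_path = ewma_variance_path(toy_returns, initial_variance=0.0004)

# Number of days after which an observation's weight has halved.
half_life_days = math.log(0.5) / math.log(0.94)

print([round(v, 8) for v in variance_path])
print(round(half_life_days, 1))
```

The half-life of roughly eleven days shows why this benchmark reacts quickly to volatility bursts but carries little long-run memory.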

In [None]:
def naive_return_forecast(last_observed_return: float, horizon: int) -> np.ndarray:
    return np.repeat(last_observed_return, horizon)


def ewma_variance(history_returns: pd.Series, lambda_value: float = 0.94) -> float:
    clean_history = history_returns.dropna().astype(float)
    if clean_history.empty:
        return np.nan

    variance_estimate = float(clean_history.var())
    for observed_return in clean_history:
        variance_estimate = lambda_value * variance_estimate + (1 - lambda_value) * (observed_return ** 2)
    return variance_estimate

## ARIMA Order Tuning (Validation)

We fix `d=0` for the return series and search over `(p,q)` using the AIC criterion.
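
AIC trades fit against complexity: `AIC = 2k - 2 ln L`, and the lowest value wins. The sketch below uses invented log-likelihoods and assumes `k = p + q + 2` (AR and MA coefficients plus a constant and the innovation variance); both are assumptions for illustration only, not results from the thesis.

```python
def aic(log_likelihood: float, n_params: int) -> float:
    """Akaike information criterion: 2k - 2 ln L (lower is better)."""
    return 2 * n_params - 2 * log_likelihood


# Invented log-likelihoods per (p, d, q) order, purely illustrative.
hypothetical_log_likelihoods = {
    (0, 0, 0): 2500.0,
    (1, 0, 0): 2500.8,
    (1, 0, 1): 2503.5,
    (2, 0, 2): 2504.0,
}

aic_scores = {
    order: aic(log_likelihood, n_params=order[0] + order[2] + 2)
    for order, log_likelihood in hypothetical_log_likelihoods.items()
}
best_order = min(aic_scores, key=aic_scores.get)
print(best_order, aic_scores[best_order])
```

In this toy grid the largest model has the highest likelihood but does not win, because its four extra parameters cost more than the fit they buy.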

In [None]:
def tune_arima_order(return_series: pd.Series, max_p: int = 3, max_q: int = 3) -> tuple[tuple[int, int, int], float]:
    clean_series = return_series.dropna().astype(float)

    best_order = (1, 0, 1)
    best_aic = np.inf

    for ar_order in range(max_p + 1):
        for ma_order in range(max_q + 1):
            try:
                model = ARIMA(clean_series, order=(ar_order, 0, ma_order), enforce_stationarity=False, enforce_invertibility=False)
                fit_result = model.fit()
                if fit_result.aic < best_aic:
                    best_aic = float(fit_result.aic)
                    best_order = (ar_order, 0, ma_order)
            except Exception:
                continue

    return best_order, best_aic

## GARCH Family Tuning (Validation)

We test the same volatility families as in the script:

- GARCH  
- GJR  
- FIGARCH  
- EGARCH
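
The difference between the first two families is easiest to see in the variance recursion itself. GARCH(1,1) uses `variance_t = omega + alpha * eps_{t-1}^2 + beta * variance_{t-1}`; GJR adds `gamma * eps_{t-1}^2` only after negative shocks. The sketch below is a hand-rolled illustration with made-up parameters on a percent scale; it is not how `arch` estimates the models, and FIGARCH and EGARCH are left out because their recursions are more involved.

```python
def conditional_variance_path(residuals, omega, alpha, beta, gamma=0.0):
    """GARCH(1,1) recursion; gamma > 0 adds the GJR term for negative shocks."""
    # Start from the unconditional variance (assumes symmetric innovations).
    persistence = alpha + beta + gamma / 2
    variance = omega / (1 - persistence)
    path = [variance]
    for shock in residuals:
        leverage_term = gamma * shock ** 2 if shock < 0 else 0.0
        variance = omega + alpha * shock ** 2 + leverage_term + beta * variance
        path.append(variance)
    return path


# Hypothetical parameters; same shock size with opposite sign.
params = dict(omega=0.1, alpha=0.1, beta=0.8, gamma=0.1)
after_positive = conditional_variance_path([2.0], **params)[-1]
after_negative = conditional_variance_path([-2.0], **params)[-1]
print(round(after_positive, 4), round(after_negative, 4))
```

With `gamma=0.0` both shocks would give the same next-day variance, which is exactly the symmetry that GJR relaxes.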

In [None]:
def fit_garch_model(residual_series: pd.Series, garch_type: str, distribution: str = "t"):
    clean_residuals = residual_series.dropna().astype(float)
    scaled_residuals = clean_residuals * 100.0

    if garch_type == "FIGARCH":
        model = arch_model(scaled_residuals, vol="FIGARCH", p=1, q=1, dist=distribution, mean="Zero", rescale=False)
    elif garch_type == "EGARCH":
        model = arch_model(scaled_residuals, vol="EGARCH", p=1, o=1, q=1, dist=distribution, mean="Zero", rescale=False)
    elif garch_type == "GJR":
        model = arch_model(scaled_residuals, vol="GARCH", p=1, o=1, q=1, dist=distribution, mean="Zero", rescale=False)
    else:
        model = arch_model(scaled_residuals, vol="GARCH", p=1, q=1, dist=distribution, mean="Zero", rescale=False)

    return model.fit(disp="off", show_warning=False)


def tune_garch_family(residual_series: pd.Series) -> tuple[str, str, float]:
    candidate_models = ["GARCH", "GJR", "FIGARCH", "EGARCH"]
    candidate_distributions = ["normal", "t"]

    best_combo = ("GARCH", "t", np.inf)

    for candidate_model in candidate_models:
        for candidate_distribution in candidate_distributions:
            try:
                fit_result = fit_garch_model(residual_series, candidate_model, distribution=candidate_distribution)
                if fit_result.aic < best_combo[2]:
                    best_combo = (candidate_model, candidate_distribution, float(fit_result.aic))
            except Exception:
                continue

    return best_combo

## ARIMA + GARCH Pipeline Fit

First the mean model (ARIMA), then the volatility model on the ARIMA residuals.

In [None]:
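from typing import Any


@dataclass
class ArimaGarchFit:
    arima_order: tuple[int, int, int]
    arima_result: Any  # fitted statsmodels ARIMA results object
    garch_type: str
    garch_distribution: str
    garch_result: Any  # fitted arch model results object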


def fit_arima_garch_pipeline(train_returns: pd.Series, validation_returns: pd.Series) -> ArimaGarchFit:
    tuning_series = pd.concat([train_returns, validation_returns]).dropna().astype(float)

    best_arima_order, _ = tune_arima_order(tuning_series)
    arima_model = ARIMA(tuning_series, order=best_arima_order, enforce_stationarity=False, enforce_invertibility=False)
    arima_result = arima_model.fit()

    best_garch_type, best_garch_distribution, _ = tune_garch_family(arima_result.resid)
    garch_result = fit_garch_model(arima_result.resid, best_garch_type, distribution=best_garch_distribution)

    return ArimaGarchFit(
        arima_order=best_arima_order,
        arima_result=arima_result,
        garch_type=best_garch_type,
        garch_distribution=best_garch_distribution,
        garch_result=garch_result,
    )

## One-Step Forecast Helper

In [None]:
def forecast_one_step(arima_garch_fit: ArimaGarchFit) -> tuple[float, float]:
    mean_forecast = float(arima_garch_fit.arima_result.get_forecast(steps=1).predicted_mean.iloc[0])

    garch_forecast = arima_garch_fit.garch_result.forecast(horizon=1, reindex=False)
    variance_forecast_scaled = float(garch_forecast.variance.iloc[0, 0])
    variance_forecast = max(variance_forecast_scaled / (100.0 ** 2), 0.0)

    return mean_forecast, variance_forecast

## Rolling Backtest (60 Days)

Backtest principle as in the thesis:

- Rolling fit window (60)  
- Refit at every step (`refit_interval=1`)  
- Comparison against naive + EWMA
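
The index bookkeeping of a rolling backtest is a common source of off-by-one errors, so it helps to look at it without any model attached. In the sketch below (the generator name `rolling_origins` is an assumption), each window covers positions `[start, target)` and the one-step-ahead forecast is scored against position `target`.

```python
def rolling_origins(n_obs: int, window: int, refit_interval: int = 1):
    """Yield (window_start, target_position, refit_flag) for each backtest step."""
    for target in range(window, n_obs):
        start = target - window
        # Refit on the first step and then every refit_interval steps.
        refit = start % refit_interval == 0
        yield start, target, refit


# Six observations, window of three, refit every second step.
for start, target, refit in rolling_origins(n_obs=6, window=3, refit_interval=2):
    print(f"fit on [{start}, {target}) -> score position {target}, refit={refit}")
```

With `refit_interval=1`, as configured for the thesis, every step refits and the flag is always true.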

In [None]:
def run_rolling_backtest(return_series: pd.Series, fitting_window: int, refit_interval: int = 1) -> pd.DataFrame:
    clean_series = return_series.dropna().astype(float)
    records: list[dict] = []

    # The window covers positions [current_index - fitting_window, current_index),
    # so the one-step-ahead forecast targets position current_index itself.
    for current_index in range(fitting_window, len(clean_series)):
        window_series = clean_series.iloc[current_index - fitting_window:current_index]
        actual_next_return = float(clean_series.iloc[current_index])

        # With refit_interval > 1 the ARIMA-GARCH forecast comes from the last fitted
        # window and is not updated with newer observations until the next refit.
        if (current_index - fitting_window) % refit_interval == 0:
            fit_bundle = fit_arima_garch_pipeline(window_series, pd.Series(dtype=float))

        mean_forecast, variance_forecast = forecast_one_step(fit_bundle)
        naive_forecast = float(window_series.iloc[-1])
        ewma_variance_forecast = ewma_variance(window_series, lambda_value=THESIS_CONFIG["ewma_lambda"])

        records.append(
            {
                "date": clean_series.index[current_index],
                "actual_return": actual_next_return,
                "forecast_return_arima_garch": mean_forecast,
                "forecast_return_naive": naive_forecast,
                "forecast_variance_arima_garch": variance_forecast,
                "forecast_variance_ewma": ewma_variance_forecast,
            }
        )

    return pd.DataFrame.from_records(records)

## Multi-Horizon Evaluation (1/3/7/14/30)

For each forecast origin in the test set, several horizons are evaluated.
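
Which observation each forecast is scored against can be listed explicitly. The sketch below (helper name `horizon_targets` is an assumption) enumerates `(origin, horizon, target)` position triples; origins stop early enough that the longest horizon still has a target inside the sample.

```python
def horizon_targets(n_obs: int, horizons: list[int]) -> list[tuple[int, int, int]]:
    """List (origin, horizon, target) positions with target = origin + horizon."""
    max_horizon = max(horizons)
    return [
        (origin, horizon, origin + horizon)
        for origin in range(n_obs - max_horizon)
        for horizon in horizons
    ]


# Five observations and horizons 1 and 3 leave room for two origins.
print(horizon_targets(n_obs=5, horizons=[1, 3]))
```

With the thesis horizons up to 30 days, the last 30 test observations can serve as targets but not as origins.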

In [None]:
def evaluate_multi_horizon_static(arima_garch_fit: ArimaGarchFit, test_returns: pd.Series, horizons: list[int]) -> pd.DataFrame:
    clean_series = test_returns.dropna().astype(float)
    rows: list[dict] = []

    max_horizon = max(horizons)

    # "Static" evaluation: the forecast path is computed once from the end of the fitting
    # sample and reused for every origin; it is not re-conditioned on test observations.
    mean_forecast_series = arima_garch_fit.arima_result.get_forecast(steps=max_horizon).predicted_mean.values
    variance_forecast_series = arima_garch_fit.garch_result.forecast(horizon=max_horizon, reindex=False).variance.iloc[0].values / (100.0 ** 2)

    for origin_position in range(0, len(clean_series) - max_horizon):
        origin_date = clean_series.index[origin_position]
        base_return = float(clean_series.iloc[origin_position])

        for horizon in horizons:
            target_position = origin_position + horizon
            rows.append(
                {
                    "origin_date": origin_date,
                    "horizon": horizon,
                    "actual_return": float(clean_series.iloc[target_position]),
                    "arima_garch_return": float(mean_forecast_series[horizon - 1]),
                    "naive_return": base_return,
                    "arima_garch_variance": float(max(variance_forecast_series[horizon - 1], 0.0)),
                }
            )

    return pd.DataFrame(rows)

## Diebold-Mariano Tests

We test the difference in forecast accuracy in each case:

- Price/return: ARIMA-GARCH vs. naive  
- Variance: ARIMA-GARCH vs. EWMA
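
At its core, the DM test is a t-type test on the loss differential `d_t = loss(model) - loss(benchmark)`. The sketch below covers only the simplest case: squared-error loss, one-step horizon, no small-sample correction, and a normal approximation. The function name `dm_statistic_h1`, the toy numbers and the one-sided convention (small p-value means the first forecast is more accurate) are all assumptions; it is not a replacement for the `dieboldmariano` package.

```python
import math
from statistics import NormalDist


def dm_statistic_h1(actual, forecast_a, forecast_b):
    """DM statistic for h=1 with squared-error loss; negative favors forecast_a."""
    differentials = [
        (y - fa) ** 2 - (y - fb) ** 2
        for y, fa, fb in zip(actual, forecast_a, forecast_b)
    ]
    n_obs = len(differentials)
    mean_diff = sum(differentials) / n_obs
    variance_diff = sum((d - mean_diff) ** 2 for d in differentials) / n_obs
    statistic = mean_diff / math.sqrt(variance_diff / n_obs)
    # One-sided p-value for the alternative "forecast_a has lower expected loss".
    p_value = NormalDist().cdf(statistic)
    return statistic, p_value


# Toy data: forecast_a misses by small amounts, forecast_b by larger ones.
actual = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
forecast_a = [1.1, 1.9, 3.2, 3.8, 5.1, 5.9]
forecast_b = [1.5, 1.6, 3.6, 3.5, 5.4, 5.4]
dm_stat, dm_p_value = dm_statistic_h1(actual, forecast_a, forecast_b)
print(round(dm_stat, 2), dm_p_value < 0.05)
```

For horizons above one, the variance in the denominator must also include autocovariances of the differential, which is why the notebook passes `h=horizon` to the library function.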

In [None]:
try:
    from dieboldmariano import dm_test
    DIEBOLDMARIANO_AVAILABLE = True
except Exception:
    dm_test = None
    DIEBOLDMARIANO_AVAILABLE = False


def run_dm_test(actual_values: pd.Series, model_forecast: pd.Series, benchmark_forecast: pd.Series, horizon: int):
    aligned = pd.concat(
        [actual_values, model_forecast, benchmark_forecast],
        axis=1,
        keys=["actual", "model", "benchmark"],
    ).dropna()

    if len(aligned) < 15:
        return {"error": "Too few observations"}

    if not DIEBOLDMARIANO_AVAILABLE:
        return {"error": "dieboldmariano package not installed"}

    statistic, p_value = dm_test(
        aligned["actual"].values,
        aligned["model"].values,
        aligned["benchmark"].values,
        h=horizon,
        one_sided=True,
    )

    return {
        "dm_stat": float(statistic),
        "p_value": float(p_value),
        "significant": bool(p_value < THESIS_CONFIG["dm_alpha"]),
    }

## VaR and ES (Parametric)

The thesis used the 5% VaR for the model comparison.
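
For the normal case, the 5% VaR is the 5% quantile of the forecast return distribution and the ES is the mean return conditional on falling below it. The sketch below uses only the standard library; the function name `normal_var_es` and the 4% daily volatility are illustrative assumptions, not thesis estimates.

```python
from statistics import NormalDist


def normal_var_es(mean_forecast: float, sigma_forecast: float, alpha: float = 0.05):
    """Return (VaR threshold, expected shortfall) as returns, both typically negative."""
    standard_normal = NormalDist()
    quantile = standard_normal.inv_cdf(alpha)
    var_threshold = mean_forecast + quantile * sigma_forecast
    expected_shortfall = mean_forecast - sigma_forecast * standard_normal.pdf(quantile) / alpha
    return var_threshold, expected_shortfall


# Hypothetical forecast: zero mean, 4% daily volatility.
var_5pct, es_5pct = normal_var_es(mean_forecast=0.0, sigma_forecast=0.04)
print(round(var_5pct, 4), round(es_5pct, 4))
```

The ES is always further in the tail than the VaR; for the normal distribution at 5% the multipliers on sigma are about 1.645 and 2.063.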

In [None]:
def parametric_var_es(mean_forecast: float, variance_forecast: float, alpha: float = 0.05, distribution: str = "normal", dof: float | None = None):
    sigma_forecast = np.sqrt(max(variance_forecast, 1e-12))

    if distribution == "t" and dof is not None and dof > 2:
        # student_t.ppf is the quantile of a t variable with variance dof / (dof - 2).
        # Rescale so that sigma_forecast is the standard deviation of the return.
        t_scale = sigma_forecast * np.sqrt((dof - 2) / dof)
        quantile = student_t.ppf(alpha, dof)
        var_threshold = mean_forecast + quantile * t_scale
        expected_shortfall = mean_forecast - t_scale * (student_t.pdf(quantile, dof) / alpha) * ((dof + quantile**2) / (dof - 1))
    else:
        quantile = norm.ppf(alpha)
        var_threshold = mean_forecast + quantile * sigma_forecast
        expected_shortfall = mean_forecast - sigma_forecast * norm.pdf(quantile) / alpha

    return float(var_threshold), float(expected_shortfall)

## VaR Backtests: Kupiec + Christoffersen

- **Kupiec** tests whether the observed VaR violation rate matches the nominal level  
- **Christoffersen** adds a test for the independence of the violation series
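
A worked Kupiec example makes the likelihood-ratio logic concrete. The sketch below uses invented violation counts and the standard library only; the chi-squared p-value with one degree of freedom is obtained from the identity `P(Z^2 > x) = erfc(sqrt(x / 2))`. The function name `kupiec_pof` is an assumption, and the sketch requires at least one violation and one non-violation so that the logarithms are defined.

```python
import math


def kupiec_pof(n_obs: int, n_violations: int, alpha: float = 0.05):
    """Kupiec proportion-of-failures LR statistic and p-value (chi-squared, 1 df)."""
    observed_rate = n_violations / n_obs

    def log_likelihood(rate: float) -> float:
        return (n_obs - n_violations) * math.log(1 - rate) + n_violations * math.log(rate)

    likelihood_ratio = -2 * (log_likelihood(alpha) - log_likelihood(observed_rate))
    likelihood_ratio = max(likelihood_ratio, 0.0)  # guard against rounding below zero
    p_value = math.erfc(math.sqrt(likelihood_ratio / 2))
    return likelihood_ratio, p_value


# Hypothetical backtests over 250 days: 12 violations (4.8%) vs. 20 violations (8%).
lr_ok, p_ok = kupiec_pof(250, 12)
lr_bad, p_bad = kupiec_pof(250, 20)
print(round(p_ok, 3), round(p_bad, 3))
```

A violation rate close to 5% passes comfortably, while 8% is rejected at the 5% level. The Christoffersen test then asks the separate question of whether violations cluster in time.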

In [None]:
def kupiec_test(violations: np.ndarray, alpha: float = 0.05):
    violation_count = int(violations.sum())
    sample_size = len(violations)

    if sample_size == 0:
        return {"error": "No observations"}

    observed_rate = violation_count / sample_size
    observed_rate = np.clip(observed_rate, 1e-8, 1 - 1e-8)

    log_likelihood_null = (sample_size - violation_count) * np.log(1 - alpha) + violation_count * np.log(alpha)
    log_likelihood_alt = (sample_size - violation_count) * np.log(1 - observed_rate) + violation_count * np.log(observed_rate)

    likelihood_ratio = -2 * (log_likelihood_null - log_likelihood_alt)
    p_value = 1 - chi2.cdf(likelihood_ratio, df=1)

    return {"lr": float(likelihood_ratio), "p_value": float(p_value), "pass": bool(p_value >= 0.05)}


def christoffersen_independence_test(violations: np.ndarray):
    if len(violations) < 2:
        return {"error": "Too few observations"}

    lagged = violations[:-1]
    current = violations[1:]

    n00 = int(((lagged == 0) & (current == 0)).sum())
    n01 = int(((lagged == 0) & (current == 1)).sum())
    n10 = int(((lagged == 1) & (current == 0)).sum())
    n11 = int(((lagged == 1) & (current == 1)).sum())

    pi0 = n01 / max(n00 + n01, 1)
    pi1 = n11 / max(n10 + n11, 1)
    pi = (n01 + n11) / max(n00 + n01 + n10 + n11, 1)

    def safe_log(value: float) -> float:
        return np.log(np.clip(value, 1e-12, 1 - 1e-12))

    log_likelihood_independent = (n00 + n10) * safe_log(1 - pi) + (n01 + n11) * safe_log(pi)
    log_likelihood_markov = n00 * safe_log(1 - pi0) + n01 * safe_log(pi0) + n10 * safe_log(1 - pi1) + n11 * safe_log(pi1)

    likelihood_ratio = -2 * (log_likelihood_independent - log_likelihood_markov)
    p_value = 1 - chi2.cdf(likelihood_ratio, df=1)

    return {"lr": float(likelihood_ratio), "p_value": float(p_value), "pass": bool(p_value >= 0.05)}

## End-to-End Runner per Asset

This function orchestrates the steps in the order used in the thesis.

In [None]:
def run_asset_pipeline(asset_name: str, data_splits: dict[str, pd.DataFrame]):
    train_returns = data_splits["train"]["log_return"]
    validation_returns = data_splits["validation"]["log_return"]
    test_returns = data_splits["test"]["log_return"]

    fit_bundle = fit_arima_garch_pipeline(train_returns, validation_returns)

    rolling_input = pd.concat([train_returns, validation_returns, test_returns])
    rolling_results = run_rolling_backtest(
        return_series=rolling_input,
        fitting_window=THESIS_CONFIG["rolling_window"],
        refit_interval=THESIS_CONFIG["refit_interval"],
    )

    horizon_results = evaluate_multi_horizon_static(
        arima_garch_fit=fit_bundle,
        test_returns=test_returns,
        horizons=THESIS_CONFIG["horizons"],
    )

    return {
        "fit_bundle": fit_bundle,
        "rolling": rolling_results,
        "horizon": horizon_results,
    }

## Optional: Full Run for All Assets

> The run is computationally intensive. To work quickly, start with a single asset first (e.g. `bitcoin`).

In [None]:
RUN_FULL_EXPERIMENT = False
ASSETS_TO_RUN = ["bitcoin"] if not RUN_FULL_EXPERIMENT else list(asset_data.keys())

experiment_results = {}
for asset_name in ASSETS_TO_RUN:
    print(f"Running pipeline for {asset_name}...")
    experiment_results[asset_name] = run_asset_pipeline(asset_name, asset_data[asset_name])

list(experiment_results.keys())

## Robustness Check (365-Day Window)

This cell mirrors the robustness part of the thesis (window comparison 60 vs. 365).

In [None]:
def run_robustness_check(asset_name: str, data_splits: dict[str, pd.DataFrame]):
    combined_returns = pd.concat(
        [
            data_splits["train"]["log_return"],
            data_splits["validation"]["log_return"],
            data_splits["test"]["log_return"],
        ]
    )

    backtest_short_window = run_rolling_backtest(
        return_series=combined_returns,
        fitting_window=THESIS_CONFIG["rolling_window"],
        refit_interval=THESIS_CONFIG["refit_interval"],
    )

    backtest_long_window = run_rolling_backtest(
        return_series=combined_returns,
        fitting_window=THESIS_CONFIG["robustness_window"],
        refit_interval=THESIS_CONFIG["refit_interval"],
    )

    return backtest_short_window, backtest_long_window

## Exporting Result Tables

In [None]:
def export_result_tables(asset_name: str, result_bundle: dict):
    rolling_path = OUTPUT_DIR / f"{asset_name}_rolling_results.csv"
    horizon_path = OUTPUT_DIR / f"{asset_name}_horizon_results.csv"

    result_bundle["rolling"].to_csv(rolling_path, index=False)
    result_bundle["horizon"].to_csv(horizon_path, index=False)

    return {"rolling_csv": str(rolling_path), "horizon_csv": str(horizon_path)}

## Interpretation (Close to the Thesis)

These guiding questions help when writing the results chapter:

1. Where is return predictability statistically significant?  
2. Does ARIMA-GARCH robustly improve the variance forecast compared with EWMA?  
3. Does the model pass the 5% VaR backtest per asset and per horizon?  
4. Do the conclusions change in the robustness check (365 instead of 60 days)?

In [None]:
def build_interpretation_summary(asset_name: str, result_bundle: dict) -> pd.DataFrame:
    horizon_frame = result_bundle["horizon"].copy()

    summary_rows = []
    for horizon in sorted(horizon_frame["horizon"].dropna().unique()):
        horizon_slice = horizon_frame[horizon_frame["horizon"] == horizon]

        row = {
            "asset": asset_name,
            "horizon": int(horizon),
            "n_obs": len(horizon_slice),
            "mae_return_model": float((horizon_slice["actual_return"] - horizon_slice["arima_garch_return"]).abs().mean()),
            "mae_return_naive": float((horizon_slice["actual_return"] - horizon_slice["naive_return"]).abs().mean()),
        }
        summary_rows.append(row)

    return pd.DataFrame(summary_rows)

## Next Extensions

- Reintegrate skewed distributions (`skewt`, `skewnorm`) from the large script  
- Full parameter tables (ARIMA + GARCH) as PNG/LaTeX export  
- Multi-asset comparison plots (heatmaps) for DM and VaR test results  
- Split the notebook into chapter slices (`01_data.ipynb`, `02_models.ipynb`, ...)

## Mapping Notebook ↔ Original Script

| Notebook Block | Original Script (v28.5) |
|---|---|
| Config and setup | `CONFIG`, `CRYPTO_SYMBOLS` |
| Data fetch | `fetch_data_yahoo` |
| Preprocessing | `preprocess_data`, `train_val_test_split` |
| ARIMA/GARCH fit | `fit_arima_garch` |
| Forecasting | `forecast_arima_garch` |
| DM test | `diebold_mariano_test` |
| VaR/ES + backtests | `calculate_parametric_var_es`, `kupiec_test`, `christoffersen_test` |
| Backtest/horizon loop | Main flow `backtest` and `horizon_evaluation` |

This gives you a traceable, thesis-consistent learning and demo version of your largest project.