# Volatility Forecasting in FX Markets with Shrinkage Estimators
**Core Class Implementation for Data Engineering, Model Estimation and Evaluation**


---

**Author:** David Harrieder
**Supervisor:** Prof. Dr. Daniel Rösch 
**University:** University of Regensburg
**Date:** July 2025  



#### Imports & Data

In [1]:
import math
import re
import warnings
from dataclasses import dataclass, field
from itertools import cycle
from typing import Any, Dict, List, Literal, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.stats.stattools import durbin_watson, jarque_bera
from sklearn.linear_model import (
    Lasso,
    LassoCV,
    LassoLars,
    LinearRegression,
    lars_path,
)
from sklearn.metrics import (
    mean_absolute_error,
    mean_absolute_percentage_error,
    mean_squared_error,
    r2_score,
)
from sklearn.model_selection import TimeSeriesSplit
from sklearn.multioutput import MultiOutputRegressor
from sklearn.preprocessing import RobustScaler, StandardScaler


# Main Class

In [4]:
class UnifiedVolForecast:
    def __init__(self, df, target_col="RV", model_type="HAR", flex_threshold=100, lasso_cv=10, lasso_max_iter=100000):
        self.df = df.copy()
        self.target_col = target_col
        self.model_type = model_type.upper()
        self.flex_threshold = flex_threshold
        self.lasso_cv = lasso_cv
        self.lasso_max_iter = lasso_max_iter
        #
        self.forecasts = {}
        self.results = {}
        self.lasso_nonzero_coefs = {} 

    # HELPER FUNCTIONS
    
    def _mincer_zarnowitz_r2(self, y_true, y_pred):
        mask = np.isfinite(y_true) & np.isfinite(y_pred)
        y_true = y_true[mask]
        y_pred = y_pred[mask]
        if len(y_true) == 0:
            return np.nan  # Keine gültigen Werte vorhanden
        y_true = y_true.reshape(-1, 1)
        y_pred_const = sm.add_constant(y_pred)
        model = sm.OLS(y_true, y_pred_const).fit()
        return model.rsquared

    def _tewma(self, series, span, kappa=0.94):
        raw_weights = [(1 - kappa) * (kappa ** i) for i in range(span)]
        raw_weights = np.array(raw_weights)
        normalization = 1 - kappa ** span
        weights = raw_weights / normalization
        weights = weights[::-1]
        return series.rolling(window=span).apply(lambda x: np.dot(weights, x), raw=True)

    # FEATURE GENERATION        

    def _generate_features(self, df):
        if self.model_type == "HAR":
            df["lag1"] = df[self.target_col].shift(1)
            df["lag5"] = df[self.target_col].rolling(5).mean().shift(1)
            df["lag22"] = df[self.target_col].rolling(22).mean().shift(1)
            return df.dropna(), ["lag1", "lag5", "lag22"]

        elif self.model_type == "HEXP":
            df["tewma_1"] = self._tewma(df[self.target_col].shift(1), 1)
            df["tewma_5"] = self._tewma(df[self.target_col].shift(1), 5)
            df["tewma_22"] = self._tewma(df[self.target_col].shift(1), 22)
            return df.dropna(), ["tewma_1", "tewma_5", "tewma_22"]

        elif self.model_type == "LASSO-HAR":
            rolling_means = pd.concat({
                f"mean_{w}": df[self.target_col].rolling(w).mean().shift(1)
                for w in range(1, self.flex_threshold + 1)
            }, axis=1)
            df_ext = pd.concat([df, rolling_means], axis=1).dropna()
            return df_ext, rolling_means.columns.tolist()

        elif self.model_type == "LASSO-HEXP":
            tewmas = pd.concat({
                f"tewma_{s}": self._tewma(df[self.target_col].shift(1), s)
                for s in range(1, self.flex_threshold + 1)
            }, axis=1)

            df_ext = pd.concat([df, tewmas], axis=1).dropna()
            return df_ext, tewmas.columns.tolist()

    # LASSO MODEL ESTIMATION GETTER
    
    def _get_model(self):
        if self.model_type.startswith("LASSO"):
            return LassoCV(
                cv=TimeSeriesSplit(n_splits=self.lasso_cv),
                n_jobs=-1,
                fit_intercept=True,
                positive=True,
                max_iter=self.lasso_max_iter
            )
        else:
            return LinearRegression(fit_intercept=True, positive=True)


    # IN-SAMPLE EVALUATION
            
    def in_sample_fit(self):
        df_feat, feature_cols = self._generate_features(self.df)
        if not self.model_type.startswith("LASSO"):
            offset = max(self.flex_threshold - 22, 0)
            df_feat = df_feat.iloc[offset:]
            print(f"Offset for In-Sample at {self.model_type}: Trim {offset} rows")
            
        X = df_feat[feature_cols]
        y = df_feat[self.target_col].values
        scaler = StandardScaler().fit(X)
        X_scaled = scaler.transform(X)
        model = self._get_model()
        model.fit(X_scaled, y)
        y_pred = model.predict(X_scaled)

        

        print(f"In-Sample period: {df_feat.index[0].date()} bis {df_feat.index[-1].date()}")

        from statsmodels.api import OLS, add_constant

        if self.model_type.startswith("LASSO"):
            coefs = model.coef_
            
            non_zero_mask = coefs != 0
            surviving_vars = [feature_cols[j] for j, nz in enumerate(non_zero_mask) if nz]
            surviving_X = X_scaled[:, non_zero_mask]
            X_ols = add_constant(surviving_X)
            ols_model = OLS(y, X_ols).fit()
            ols_coefs = ols_model.params[1:]
            ols_pvals = ols_model.pvalues[1:]
            adj_r2 = ols_model.rsquared_adj
            signif = ols_pvals < 0.05
            self.in_sample_model_details = pd.DataFrame({
                "Variable": ["Intercept"] + surviving_vars,
                "Coefficient": [f"{ols_model.params[0]:.4e}"] + [f"{val:.4e}" for val in ols_coefs],
                "P-Value": [ols_model.pvalues[0]] + list(ols_pvals),
                "Significant_5pct": [ols_model.pvalues[0] < 0.05] + list(signif)
            })

            self.lasso_nonzero_coefs = {
                "in_sample": dict(zip(surviving_vars, coefs[non_zero_mask]))
            }
        else:
            X_ols = add_constant(X_scaled)
            ols_model = OLS(y, X_ols).fit()
            coefs = ols_model.params
            pvals = ols_model.pvalues
            adj_r2 = ols_model.rsquared_adj
            signif = pvals < 0.05
            self.in_sample_model_details = pd.DataFrame({
                "Variable": ["Intercept"] + feature_cols,
                "Coefficient": [f"{coefs[0]:.4e}"] + [f"{val:.4e}" for val in coefs[1:]],
                "P-Value": [pvals[0]] + list(pvals[1:]),
                "Significant_5pct": [pvals[0] < 0.05] + list(pvals[1:] < 0.05)
            })


        df_insample = pd.DataFrame({
            "realized": y,
            "forecast": y_pred
        }, index=df_feat.index)

        self.forecasts["in_sample"] = df_insample
        eval_metrics = self.evaluate_forecasts()
        eval_metrics["in_sample"]["Adjusted_R2"] = adj_r2
        return eval_metrics

    # ROLLING FORECAST FRAMEWORK        

    def rolling_forecast(self, window_size=1000, horizon=1, step_size=1):
        df_feat, feature_cols = self._generate_features(self.df)

        offset = 0
        if not self.model_type.startswith("LASSO"):
            offset = max(self.flex_threshold - 22, 0)
            df_feat = df_feat.iloc[offset:]
            print(f"Offset at {self.model_type}: shorten by {offset} (flex_threshold={self.flex_threshold})")
        else:
            print(f"No offset applied at {self.model_type} (LASSO-Modell)")

        print(f"Remaining length: {len(df_feat)}")

        forecasts = []
        reals = []
        valid_dates = []

        if self.model_type.startswith("LASSO"):
            coef_dict = {}

        first_valid_index = None

        for i in range(window_size, len(df_feat) - horizon + 1, step_size):
            train = df_feat.iloc[i - window_size:i]
            test = df_feat.iloc[i:i + horizon]
            X_train = train[feature_cols]
            y_train = train[self.target_col].values
            scaler = StandardScaler().fit(X_train)
            X_train_scaled = scaler.transform(X_train)
            model = self._get_model()
            model.fit(X_train_scaled, y_train)

            if self.model_type.startswith("LASSO"):
                coefs = model.coef_
                non_zero = {feature_cols[j]: coefs[j] for j in range(len(coefs)) if coefs[j] != 0}
                date_key = df_feat.index[i].strftime("%Y-%m-%d")
                coef_dict[date_key] = non_zero

            X_test = test[feature_cols].iloc[:horizon]
            y_test = test[self.target_col].iloc[:horizon]
            if X_test.isnull().values.any() or y_test.isnull().any():
                continue
            X_test_scaled = scaler.transform(X_test)
            y_pred = model.predict(X_test_scaled)
            forecasts.append(y_pred.sum())
            reals.append(y_test.sum())
            valid_dates.append(df_feat.index[i])

            if first_valid_index is None:
                print(f"First Rolling-Window starts: {train.index[0].date()} bis {train.index[-1].date()}")
                print(f"First Forecast for: {df_feat.index[i].date()}")
                first_valid_index = i

        if len(forecasts) > 0:
            df_h = pd.DataFrame({
                f"realized_{horizon}d": reals,
                f"forecast_{horizon}d": forecasts
            }, index=valid_dates)
            self.forecasts[f"{horizon}d"] = df_h
        else:
            print(f"No valid forecasts for horizon: {horizon}")
            df_h = pd.DataFrame()

        if self.model_type.startswith("LASSO"):
            self.lasso_nonzero_coefs = coef_dict

        df_h[f"residual_{horizon}d"] = df_h[f"realized_{horizon}d"] - df_h[f"forecast_{horizon}d"]
        return df_h

    # OUT-OF-SAMPLE EVALUATION
    
    def evaluate_forecasts(self):
        results = {}
        for horizon, df in self.forecasts.items():
            rv = df.iloc[:, 0].values
            f = df.iloc[:, 1].values
            mask = (rv > 0) & (f > 0)
            rv, f = rv[mask], f[mask]
            mse = mean_squared_error(rv, f)
            mae = mean_absolute_error(rv, f)
            if np.any(rv <= 0) or np.any(f <= 0):
                print("WARNING: At least one value in 'rv' is ≤ 0. QLIKE might be biased")
            ratio = np.where((rv > 0) & (f > 0), rv / f, np.nan)
            log_ratio = np.log(ratio, out=np.full_like(ratio, np.nan), where=(ratio > 0))
            qlike_val = np.nanmean(ratio - log_ratio - 1)
            r2 = r2_score(rv, f)
            mz_r2 = self._mincer_zarnowitz_r2(rv, f)
            dw = durbin_watson(rv - f)
            results[horizon] = {
                "MSE": mse,
                "RMSE": np.sqrt(mse),
                "MAE": mae,
                "QLIKE": qlike_val,
                "R2": r2,
                "R2_MincerZarnowitz": mz_r2,
                "Durbin_Watson": dw
            }
        self.results = results
        return results

    # PLOTS
    
    def plot_forecast(self, horizon):
        df = self.forecasts.get(horizon)
        if df is None:
            print(f"No forecasting horizon found for horizon: {horizon}")
            return
        fig, ax = plt.subplots(figsize=(14, 3), dpi=600)
        ax.plot(df.index, df.iloc[:, 0], label="Realized", color="black", lw=1, )
        ax.plot(df.index, df.iloc[:, 1], label="Forecast", color="red", lw=1)
        ax.set_ylabel("RV")
        ax.set_title(f"{self.model_type} - {horizon}")
        ax.ticklabel_format(axis='y', style='sci', scilimits=(0,0))
        plt.show()
    
    
    def plot_in_sample(self):
        df = self.forecasts.get("in_sample")
        if df is None:
            print("No available In-sample fit.")
            return
        fig, ax = plt.subplots(figsize=(14, 4), dpi=600)
        ax.plot(df.index, df.iloc[:, 0], label="Realized", color="black", lw=1)
        ax.plot(df.index, df.iloc[:, 1], label="Forecast", color="red", lw=1)
        ax.set_ylabel("RV")
        ax.set_title(f"{self.model_type} - 1d")
        ax.ticklabel_format(axis='y', style='sci', scilimits=(0,0)) 
        plt.show()
