In [1]:
import pandas as pd
import math
import numpy as np
!pip install arch
from arch import arch_model
import matplotlib.pyplot as plt
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# === Data Preparation and Cleaning Pipeline ===

# Step 1: Load and organize multi-row column headers from the raw CSV.
file_path = '/content/drive/My Drive/Comparing risk models/Book1.csv'
columns = pd.read_csv(
    file_path,
    header=None,
    nrows=2,
    skiprows=3
)

# Step 2: Construct the final list of column names.
#   - First column is always "Date"
#   - Other columns use the asset or index name
index_names = columns.iloc[0].ffill()
real_names = columns.iloc[1]
final_columns = []

for idx, val in enumerate(real_names):
    if idx == 0:
        final_columns.append("Date")
    else:
        final_columns.append(str(index_names[idx]).strip())

# Step 3: Load the main data and convert 'Date' to datetime index
df = pd.read_csv(file_path, header=None, skiprows=6)
df.columns = final_columns
df = df.set_index('Date')
df.index = pd.to_datetime(df.index, errors="coerce")
df.head()

# Step 4: Filter data to a target analysis window
#   - Here, we focus on a 25-year window ending at 2025-06-12.
start_date = pd.Timestamp('2000-06-12')
end_date = pd.Timestamp('2025-06-12')
df_window = df.loc[(df.index >= start_date) & (df.index <= end_date)]

# Step 5: Filter for assets with sufficient non-missing data
#   - Only keep columns with more than 95% valid observations in the window.
threshold = 0.95
valid_cols = [col for col in df_window.columns if df_window[col].notna().mean() > threshold]
filtered_df = df_window[valid_cols]

# Step 6: Exclude columns with long consecutive missing stretches
#   - Remove assets with any gap > 10 days.
def max_consecutive_nans(series):
    isna = series.isna().astype(int)
    return (isna.groupby((isna != isna.shift()).cumsum()).cumsum() * isna).max()
eligible_cols = [col for col in filtered_df.columns if max_consecutive_nans(filtered_df[col]) <= 10]
filtered_df = filtered_df[eligible_cols]

print("Remaining columns:", filtered_df.columns.tolist())

# Step 7: Choose a single asset for further analysis (e.g., the first one in list)
selected_index = filtered_df.columns[0]
series = filtered_df[selected_index].dropna()
print("Selected asset for demonstration:", selected_index)
print("Date range available:", series.index.min().date(), "to", series.index.max().date())
series

Remaining columns: ['LD12TRUU INDEX', 'SPGCCITR INDEX', 'NDUEEGF INDEX', 'NDDUEAFE INDEX', 'LUABTRUU INDEX', 'LF98TRUU INDEX', 'LUACTRUU INDEX', 'GDDUUS INDEX', 'IYR US EQUITY', 'RMS G INDEX', 'RMSG INDEX', 'LUMSTRUU INDEX', 'GCUDUS INDEX', 'IWM US EQUITY', 'LBUTTRUU INDEX']
Selected asset for demonstration: LD12TRUU INDEX
Date range available: 2000-06-12 to 2025-06-11


Unnamed: 0_level_0,LD12TRUU INDEX
Date,Unnamed: 1_level_1
2025-06-11,232.99
2025-06-10,232.97
2025-06-09,232.94
2025-06-06,232.91
2025-06-05,232.83
...,...
2000-06-16,148.45
2000-06-15,148.43
2000-06-14,148.40
2000-06-13,148.38


In [3]:
# === Risk Models: Historical Volatility and GARCH Model ===

# -- Historical Volatility Forecasting (Daily) --
def historical_vol_forecast(
    prices_df: pd.DataFrame,
    rolling_window: int = 20,
    freq: str = 'D',
    annualize: bool = False,
) -> pd.DataFrame:
    """
    Compute historical volatility forecasts for one or multiple assets.

    Args:
        prices_df (pd.DataFrame): Each column is an asset, index is date.
        rolling_window (int): Rolling window size (number of periods).
        freq (str): Forecast frequency.
        annualize (bool): Whether to annualize the volatility.

    Notes:
        - NaN values will appear at the start due to rolling calculation.
        - All calculations use log returns.
    """
    prices_df = prices_df.sort_index()
    freq_map = {'D': 252, 'W': 52, 'M': 12, 'Q': 4, 'A': 1}
    if freq not in freq_map:
        raise ValueError(f"freq must be one of {list(freq_map.keys())}")

    def compute_vol(s):
        log_ret = np.log(s).diff()
        if freq == 'D':
            vol = log_ret.rolling(rolling_window).std()
        else:
            log_ret = log_ret.resample(freq).sum()
            vol = log_ret.rolling(rolling_window).std()
        if annualize:
            vol = vol * np.sqrt(freq_map[freq])
        return vol

    return prices_df.apply(compute_vol)

# Compute historical volatility for the selected asset(s)
#   - rolling window = 20
filtered_df = filtered_df.sort_index()
hv_vol_d = historical_vol_forecast(
    filtered_df[['LD12TRUU INDEX']],
    rolling_window=20,
    freq='D',
    annualize=False
)
display(hv_vol_d.tail(10))

# -- GARCH(1,1) Volatility Forecasting (Daily) --

# 1. Extract and clean the price series for the selected index
LD12TRUU_INDEX_series = filtered_df['LD12TRUU INDEX'].dropna()   # Keep only non-missing values

# 2. Calculate daily log returns
LD12TRUU_INDEX_log_ret = np.log(LD12TRUU_INDEX_series).diff().dropna()

# 3. Fit the GARCH(1,1) model and get predicted daily volatility
model = arch_model(LD12TRUU_INDEX_log_ret, vol='Garch', p=1, q=1)
model_fit = model.fit(disp="off")
garch_vol_d = model_fit.conditional_volatility

display(garch_vol_d.head(10))

Unnamed: 0_level_0,LD12TRUU INDEX
Date,Unnamed: 1_level_1
2025-05-30,0.000116
2025-06-02,0.000116
2025-06-03,0.000116
2025-06-04,0.000115
2025-06-05,0.000115
2025-06-06,0.000115
2025-06-09,0.000114
2025-06-10,0.000114
2025-06-11,0.000115
2025-06-12,


estimating the model parameters. The scale of y is 1.423e-08. Parameter
estimation work better when this value is between 1 and 1000. The recommended
rescaling is 1e+04 * y.

model or by setting rescale=False.

Inequality constraints incompatible
See scipy.optimize.fmin_slsqp for code meaning.



Unnamed: 0_level_0,cond_vol
Date,Unnamed: 1_level_1
2000-06-13,0.000197
2000-06-14,0.000196
2000-06-15,0.000186
2000-06-16,0.00018
2000-06-19,0.000171
2000-06-20,0.000205
2000-06-21,0.000197
2000-06-22,0.000187
2000-06-23,0.000177
2000-06-26,0.000168


In [4]:
# === Compute z-scores and evaluation statistics (bias-statistics and Q-statistics)===

epsilon = 1e-8  # Small value to prevent division by zero

# 1. Compute z-scores for both models

  # Historical Volatility
hv_vol_series = hv_vol_d['LD12TRUU INDEX']
hv_vol_safe = hv_vol_series.shift(1) + epsilon  # Lagged forecast + epsilon
z_score_hv_vol = LD12TRUU_INDEX_log_ret / hv_vol_safe
z_score_hv_vol = z_score_hv_vol.replace([np.inf, -np.inf], np.nan).dropna()  # Remove inf/nan
z_score_hv_vol = z_score_hv_vol[z_score_hv_vol != 0]  # Exclude zeros to avoid log(0)

  # GARCH(1,1)
garch_vol_safe = garch_vol_d.shift(1) + epsilon
z_score_garch_vol = LD12TRUU_INDEX_log_ret / garch_vol_safe
z_score_garch_vol = z_score_garch_vol.replace([np.inf, -np.inf], np.nan).dropna()
z_score_garch_vol = z_score_garch_vol[z_score_garch_vol != 0]

# 2. Bias Statistic (standard deviation of z-scores)
bias_stat_hv = z_score_hv_vol.std()
print(f"Bias Statistic (Historical Volatility): {bias_stat_hv:.4f}")

bias_stat_garch = z_score_garch_vol.std()
print(f"Bias Statistic (GARCH): {bias_stat_garch:.4f}")

# 3. Q-statistic (mean of z^2 - log(z^2)), after filtering out any problematic values
Q_hv = z_score_hv_vol ** 2 - np.log(z_score_hv_vol ** 2)
Q_hv = Q_hv.replace([np.inf, -np.inf], np.nan).dropna()
Q_hv_mean = Q_hv.mean()
print(f"Q-statistic Mean (Historical Volatility): {Q_hv_mean:.4f}")

Q_garch = z_score_garch_vol ** 2 - np.log(z_score_garch_vol ** 2)
Q_garch = Q_garch.replace([np.inf, -np.inf], np.nan).dropna()
Q_garch_mean = Q_garch.mean()
print(f"Q-statistic Mean (GARCH): {Q_garch_mean:.4f}")

Bias Statistic (Historical Volatility): 384.6325
Bias Statistic (GARCH): 0.9910
Q-statistic Mean (Historical Volatility): 148304.1546
Q-statistic Mean (GARCH): 2.1759
