In [1]:
%load_ext autoreload
%autoreload 2

import os, sys

project_dir = os.path.abspath('..')
sys.path.insert(0, project_dir)

import pandas as pd
import numpy as np
import pandas_market_calendars as mcal
from typing import List, Any, Optional, Tuple
from enum import Enum
from functools import partial
from collections import defaultdict

from volatility_forecast.data.base import DateLike
from volatility_forecast.data.datamanager import (
    ReturnDataManager,
    LagReturnDataManager,
    OffsetReturnDataManager, 
    LagAbsReturnDataManager, 
    LagSquareReturnDataManager,
    SquareReturnDataManager,
    get_closest_next_business_day,
    get_closest_prev_business_day,
)
from volatility_forecast.model.stes_model import STESModel
from volatility_forecast.model.xgboost_stes_model import XGBoostSTESModel, DEFAULT_XGBOOST_PARAMS
from volatility_forecast.evaluation.model_evaluator import evaluate_model, compare_models, root_mean_squared_error


In [2]:
nyse = mcal.get_calendar("NYSE")
custom_bday = pd.offsets.CustomBusinessDay(calendar=nyse)
get_closest_prev_business_day(pd.Timestamp("2000-01-01"), nyse)# - custom_bday*0

Timestamp('1999-12-31 00:00:00')

In [3]:
pd.Timestamp("2000-01-01") - custom_bday * 0

Timestamp('2000-01-03 00:00:00')

In [4]:
ModelName = Enum(
    "ModelName", "ES STES_AE STES_SE STES_ESE STES_EAE STES_AESE STES_EAESE XGBoost_STES"
)

In [5]:
def equity_data_provider(tickers: Tuple[str], start_date: DateLike, end_date: DateLike, model_name: ModelName) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    returns = LagReturnDataManager().get_data(tickers, start_date, end_date) * 1e2
    realized_var = SquareReturnDataManager().get_data(tickers, start_date, end_date) * 1e4
    feature_sets = np.hstack([
        LagReturnDataManager().get_data(tickers, start_date, end_date) * 1e2,
        LagAbsReturnDataManager().get_data(tickers, start_date, end_date) * 1e2,
        LagSquareReturnDataManager().get_data(tickers, start_date, end_date) * 1e4,
    ])
    if model_name == ModelName.ES:
        return np.ones((len(returns), 1)), realized_var, returns
    elif model_name == ModelName.STES_AE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [1]], ]), realized_var, returns
    elif model_name == ModelName.STES_SE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [2]], ]), realized_var, returns
    elif model_name == ModelName.STES_EAE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [0, 1]], ]), realized_var, returns
    elif model_name == ModelName.STES_ESE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [0, 2]], ]), realized_var, returns
    elif model_name == ModelName.STES_AESE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [1, 2]], ]), realized_var, returns
    elif model_name == ModelName.STES_EAESE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets, ]), realized_var, returns
    elif model_name == ModelName.XGBoost_STES:
        return feature_sets, realized_var, returns
    else:
        raise ValueError(f"Unknown model name: {model_name}")


In [26]:
def simulate_contaminated_garch(n, mu, omega, alpha, beta, eta):
    returns = np.zeros(n)
    sigma2s = np.zeros(n)
    shocks = (np.random.uniform(0, 1, n) < 0.005).astype(float)
    for t in range(1, n):
        sigma2s[t] = omega + alpha * returns[t-1]**2 + beta * sigma2s[t-1]
        returns[t] = np.random.normal(mu, np.sqrt(sigma2s[t])) + eta * shocks[t]

    return returns, sigma2s

def simulated_garch_data_provider(model_name: ModelName) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    returns, latent_var = simulate_contaminated_garch(n=2500, mu=0, omega=0.02, alpha=0.11, beta=0.87, eta=4)
    realized_var = returns[1:] ** 2
    returns = returns[:-1]
    feature_sets = np.vstack([
        returns,
        abs(returns),
        returns**2,
    ]).T
    if model_name == ModelName.ES:
        return np.ones((len(returns), 1)), realized_var, returns
    elif model_name == ModelName.STES_AE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [1]], ]), realized_var, returns
    elif model_name == ModelName.STES_SE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [2]], ]), realized_var, returns
    elif model_name == ModelName.STES_EAE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [0, 1]], ]), realized_var, returns
    elif model_name == ModelName.STES_ESE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [0, 2]], ]), realized_var, returns
    elif model_name == ModelName.STES_AESE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets[:, [1, 2]], ]), realized_var, returns
    elif model_name == ModelName.STES_EAESE:
        return np.hstack([np.ones((len(returns), 1)), feature_sets, ]), realized_var, returns
    elif model_name == ModelName.XGBoost_STES:
        return feature_sets, realized_var, returns
    else:
        raise ValueError(f"Unknown model name: {model_name}")


In [27]:
sim_models = {m: STESModel() if m != ModelName.XGBoost_STES else XGBoostSTESModel(**DEFAULT_XGBOOST_PARAMS) for m in ModelName}
spy_models = {m: STESModel() if m != ModelName.XGBoost_STES else XGBoostSTESModel(**DEFAULT_XGBOOST_PARAMS) for m in ModelName}

In [28]:
num_runs = 1000
simulated_results = defaultdict(float)

np.random.seed(0)
for model_name in ModelName:
    rand_seeds = np.random.randint(0, 1e6, size=num_runs)
    for i in range(num_runs):
        np.random.seed(rand_seeds[i])

        model = sim_models[model_name]
        data_provider = partial(
            simulated_garch_data_provider, 
            model_name=model_name,
        )
        
        res = evaluate_model(
            data_provider, 
            model, 
            root_mean_squared_error, 
            500, 2000
        )
        simulated_results[model_name] += res

for model_name in ModelName:
    simulated_results[model_name] /= num_runs

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_boost_round" } are not used.

Parameters: { "num_b

In [29]:
simulated_results

defaultdict(float,
            {<ModelName.ES: 1>: 2.851265469332402,
             <ModelName.STES_AE: 2>: 2.8864909620674513,
             <ModelName.STES_SE: 3>: 2.821076842096175,
             <ModelName.STES_ESE: 4>: 2.938132876527599,
             <ModelName.STES_EAE: 5>: 2.9126491851780054,
             <ModelName.STES_AESE: 6>: 2.9414660242775197,
             <ModelName.STES_EAESE: 7>: 2.8094387526006788,
             <ModelName.XGBoost_STES: 8>: 2.917599319542367})

In [39]:
tickers = ("SPY", )
start_date = "2000-01-01"
end_date = "2023-12-31"
spy_results = {}
for model_name in ModelName:
    np.random.seed(0)

    model = spy_models[model_name]
    data_provider = partial(
        equity_data_provider, 
        tickers=tickers, 
        start_date=start_date, 
        end_date=end_date, 
        model_name=model_name
    )

    res = evaluate_model(
        data_provider,
        model, 
        root_mean_squared_error,
        10, 4000
    )
    spy_results[model_name] = res


Parameters: { "num_boost_round" } are not used.



In [40]:
spy_results

{<ModelName.ES: 1>: 4.639087786264323,
 <ModelName.STES_AE: 2>: 4.54443786029657,
 <ModelName.STES_SE: 3>: 4.5209885075277265,
 <ModelName.STES_ESE: 4>: 4.497560955740893,
 <ModelName.STES_EAE: 5>: 4.519543041822014,
 <ModelName.STES_AESE: 6>: 4.496005888497485,
 <ModelName.STES_EAESE: 7>: 4.488055350145789,
 <ModelName.XGBoost_STES: 8>: 4.3781566115070865}

In [None]:
spy_models[ModelName.XGBoost_STES].predict()