__Forecasting__

- Build sequences (input windows).
- Train baselines (naive, EWMA).
- Train LSTM/GRU
- Evaluate per horizon and by regime; rolling origin backtest.
- Refine with regime‑aware heads or Seq2Seq if needed.
- Stability: Try walk‑forward validation to ensure the improvement holds across different time periods/regimes.

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
from tensorflow.keras import layers, models
import keras_tuner as kt

2025-09-28 08:58:52.861656: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


__Load parquet with aggregated features and regimes__

In [2]:
regimes_df = pd.read_parquet("daily_features_with_regimes.parquet") 
regimes_df.head()

Unnamed: 0,date,ATM_IV,Skew,Curvature,treasury_2y,ticker,year,Regime_K
0,2023-01-03,0.282223,0.047618,-0.000384,4.4,QQQ,2023,2
1,2023-01-04,0.274456,0.061536,-0.001318,4.36,QQQ,2023,2
2,2023-01-05,0.28238,0.038932,-0.0062,4.45,QQQ,2023,2
3,2023-01-06,0.258931,0.052242,0.000721,4.24,QQQ,2023,2
4,2023-01-09,0.267719,0.045165,0.00132,4.19,QQQ,2023,2


Normalizing the features so that ATM_IV, Skew, Curvature, and dgs2 are on comparable scales. This reduces bias toward large‑magnitude variables and stabilizes training.

In [3]:
# Preserve original ATM_IV for target scaling
regimes_df['ATM_IV_orig'] = regimes_df['ATM_IV']

In [4]:
# 1. Feature scaler
feature_scaler_cols = ['ATM_IV','Skew','Curvature','treasury_2y']

feature_scaler = StandardScaler()
regimes_df[feature_scaler_cols] = feature_scaler.fit_transform(regimes_df[feature_scaler_cols])

# 2. Target scaler (ATM_IV only)
target_scaler = StandardScaler()
regimes_df['ATM_IV_target'] = target_scaler.fit_transform(
    regimes_df[['ATM_IV_orig']]
)

In [5]:
regimes_df.head()

Unnamed: 0,date,ATM_IV,Skew,Curvature,treasury_2y,ticker,year,Regime_K,ATM_IV_orig,ATM_IV_target
0,2023-01-03,2.424622,-0.225656,-1.16973,-0.134857,QQQ,2023,2,0.282223,2.424622
1,2023-01-04,2.236534,0.833951,-1.341004,-0.241592,QQQ,2023,2,0.274456,2.236534
2,2023-01-05,2.428424,-0.886941,-2.236254,-0.001438,QQQ,2023,2,0.28238,2.428424
3,2023-01-06,1.860575,0.126379,-0.967097,-0.561798,QQQ,2023,2,0.258931,1.860575
4,2023-01-09,2.073388,-0.412409,-0.857346,-0.695216,QQQ,2023,2,0.267719,2.073388


Since Regime_K is a categorical state label with no natural ordering, it’s better to one‑hot encode it before feeding into the model, rather than keeping it as a raw integer.

In [6]:
# One-hot encode Regime_K
regimes_df = pd.get_dummies(regimes_df, columns=['Regime_K'], prefix='Regime')
regimes_df[['Regime_0','Regime_1','Regime_2']] = \
    regimes_df[['Regime_0','Regime_1','Regime_2']].astype(int)

In [7]:
regimes_df.head()

Unnamed: 0,date,ATM_IV,Skew,Curvature,treasury_2y,ticker,year,ATM_IV_orig,ATM_IV_target,Regime_0,Regime_1,Regime_2
0,2023-01-03,2.424622,-0.225656,-1.16973,-0.134857,QQQ,2023,0.282223,2.424622,0,0,1
1,2023-01-04,2.236534,0.833951,-1.341004,-0.241592,QQQ,2023,0.274456,2.236534,0,0,1
2,2023-01-05,2.428424,-0.886941,-2.236254,-0.001438,QQQ,2023,0.28238,2.428424,0,0,1
3,2023-01-06,1.860575,0.126379,-0.967097,-0.561798,QQQ,2023,0.258931,1.860575,0,0,1
4,2023-01-09,2.073388,-0.412409,-0.857346,-0.695216,QQQ,2023,0.267719,2.073388,0,0,1


In [8]:
def build_sequences(df, feature_cols, target_col='ATM_IV_target', W=60, H=3):
    """
    Build rolling input/output sequences for multi-step forecasting.
    
    Args:
        df : DataFrame with features and target
        feature_cols : list of feature column names
        target_col : str, column to forecast
        W : int, input window length
        H : int, forecast horizon (number of steps ahead)
    
    Returns:
        X : np.array, shape (N, W, F)
        y : np.array, shape (N, H)
        meta : pd.DataFrame, metadata with ticker and end_date
    """
    Xs, ys, meta = [], [], []
    
    for ticker, g in df.groupby('ticker'):
        g = g.sort_values('date').reset_index(drop=True)
        X_mat = g[feature_cols].values
        y_mat = g[target_col].values
        dates = g['date'].values
        
        for i in range(W-1, len(g)-H):
            x_win = X_mat[i-W+1:i+1]          # shape (W, F)
            y_target = y_mat[i+1:i+H+1]       # shape (H,)
            
            if np.isnan(y_target).any() or np.isnan(x_win).any():
                continue
            
            Xs.append(x_win)
            ys.append(y_target)
            meta.append({'ticker': ticker, 'end_date': dates[i]})
    
    X = np.array(Xs)
    y = np.array(ys)
    meta = pd.DataFrame(meta)
    return X, y, meta


The input is past IV, Skew, Curvature and 2 year Treasury, while the target is future IV.

In [9]:
feature_cols = ['ATM_IV','Skew','Curvature','treasury_2y'] + \
               [c for c in regimes_df.columns if c.startswith('Regime_')]

# Forecast next H days
XH, yH, metaH = build_sequences(regimes_df, feature_cols, 'ATM_IV_target', W=90, H=14)
print(XH.shape, yH.shape) 

(128, 90, 7) (128, 14)


  for ticker, g in df.groupby('ticker'):


__Forecast using Exponential Weighted Moving Average of the past W days as baseline__

In [10]:
def ewma_forecast(X, y, meta, alpha=0.3):
    """
    EWMA baseline: forecast next H days as the EWMA of past ATM_IV values.
    """
    H = y.shape[1]
    preds = []
    for x in X:
        iv_series = x[:, 0]  # ATM_IV is first feature
        # compute EWMA
        ewma_val = iv_series[0]
        for val in iv_series[1:]:
            ewma_val = alpha * val + (1 - alpha) * ewma_val
        preds.append(np.repeat(ewma_val, H))
    return np.array(preds)

In [None]:
y_pred_ewma = ewma_forecast(XH, yH, metaH, alpha=0.3)
print(y_pred_ewma.shape) 

In [None]:
# Invert scaling for both true and predicted values
y_true_orig = target_scaler.inverse_transform(yH)
y_pred_ewma_orig  = target_scaler.inverse_transform(y_pred_ewma)

In [None]:
def evaluate_baseline(y_true, y_pred, name="baseline"):
    rmse = np.sqrt(mean_squared_error(y_true.flatten(), y_pred.flatten()))
    mae = mean_absolute_error(y_true.flatten(), y_pred.flatten())
    print(f"{name} -> RMSE: {rmse:.4f}, MAE: {mae:.4f}")

In [None]:
# Use RMSE & MAE metrics to evaluate EWMA performance between predictions and true targets:
evaluate_baseline(y_true_orig, y_pred_ewma_orig, "EWMA")

Based on the above, EWMA baseline is, on average, about 1–1.5 percentage points of implied volatility away from reality. This sets the benchmark that our model needs to improve upon.

__Split time series to train, validation, and test__

In [None]:
def time_series_split(X, y, meta, train_size=0.6, val_size=0.2):
    """
    Split sequences into train/val/test by time (no shuffling).
    
    Args:
        X, y, meta : outputs from build_sequences
        train_size : fraction of data for training
        val_size   : fraction of data for validation (rest goes to test)
    
    Returns:
        (X_train, y_train, meta_train),
        (X_val, y_val, meta_val),
        (X_test, y_test, meta_test)
    """
    N = len(X)
    train_end = int(N * train_size)
    val_end   = int(N * (train_size + val_size))
    
    X_train, y_train, meta_train = X[:train_end], y[:train_end], meta.iloc[:train_end]
    X_val,   y_val,   meta_val   = X[train_end:val_end], y[train_end:val_end], meta.iloc[train_end:val_end]
    X_test,  y_test,  meta_test  = X[val_end:], y[val_end:], meta.iloc[val_end:]
    
    return (X_train, y_train, meta_train), (X_val, y_val, meta_val), (X_test, y_test, meta_test)

In [None]:
# Split into train/val/test
(X_train, y_train, meta_train), (X_val, y_val, meta_val), (X_test, y_test, meta_test) = \
    time_series_split(XH, yH, metaH)

print(X_train.shape, X_val.shape, X_test.shape)

In [None]:
def build_lstm(input_shape, H, units=64, depth=1, dropout=0.2):
    model = models.Sequential()
    # First LSTM layer
    model.add(layers.LSTM(units, return_sequences=(depth > 1), 
                          input_shape=input_shape))
    model.add(layers.Dropout(dropout))
    
    # Additional LSTM layers if depth > 1
    for _ in range(depth-1):
        model.add(layers.LSTM(units, return_sequences=False))
        model.add(layers.Dropout(dropout))
    
    # Dense output layer with H outputs (multi‑step forecast)
    model.add(layers.Dense(H))
    
    model.compile(optimizer='adam', loss='mse')
    return model

In [None]:
input_shape = (XH.shape[1], XH.shape[2])  # (W, features)
H = yH.shape[1]
lstm_model = build_lstm(input_shape, H, units=64, depth=2, dropout=0.3)

history = lstm_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=32,
    callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)]
)

In [None]:
y_pred_scaled = lstm_model.predict(X_test)
y_pred_orig = target_scaler.inverse_transform(y_pred_scaled)
y_true_orig = target_scaler.inverse_transform(y_test)

evaluate_baseline(y_true_orig, y_pred_orig, "LSTM (orig units)")

In [None]:
def build_gru(input_shape, H, units=64, depth=1, dropout=0.2):
    model = models.Sequential()
    # First GRU layer
    model.add(layers.GRU(units, return_sequences=(depth > 1), 
                         input_shape=input_shape))
    model.add(layers.Dropout(dropout))
    
    # Additional GRU layers if depth > 1
    for _ in range(depth-1):
        model.add(layers.GRU(units, return_sequences=False))
        model.add(layers.Dropout(dropout))
    
    # Dense output layer with H outputs (multi‑step forecast)
    model.add(layers.Dense(H))
    
    model.compile(optimizer='adam', loss='mse')
    return model

In [None]:
input_shape = (XH.shape[1], XH.shape[2])  # (W, features)
H = yH.shape[1]
gru_model = build_gru(input_shape, H, units=64, depth=2, dropout=0.3)

history = lstm_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=32,
    callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)]
)

In [None]:
y_pred_scaled = gru_model.predict(X_test)
y_pred_orig = target_scaler.inverse_transform(y_pred_scaled)
y_true_orig = target_scaler.inverse_transform(y_test)

evaluate_baseline(y_true_orig, y_pred_orig, "GRU (orig units)")

__Hyperparameter Tuning__

In [None]:
def model_builder(hp_or_dict, H=3, input_shape=None):
    
    # If it's a dict, just read values directly
    if isinstance(hp_or_dict, dict):
        units   = hp_or_dict['units']
        depth   = hp_or_dict['depth']
        dropout = hp_or_dict['dropout']
        cell    = hp_or_dict['cell']
    else:
        # Otherwise assume it's a KerasTuner HyperParameters object
        units   = hp_or_dict.Int('units', min_value=32, max_value=128, step=32)
        depth   = hp_or_dict.Int('depth', 1, 3)
        dropout = hp_or_dict.Float('dropout', 0.1, 0.5, step=0.1)
        cell    = hp_or_dict.Choice('cell', ['LSTM', 'GRU'])
        
    model = models.Sequential()
    for i in range(depth):
        return_seq = (i < depth - 1)
        if cell == 'LSTM':
            model.add(layers.LSTM(units, return_sequences=return_seq,
                                  input_shape=input_shape if i == 0 else None))
        else:
            model.add(layers.GRU(units, return_sequences=return_seq,
                                 input_shape=input_shape if i == 0 else None))
        model.add(layers.Dropout(dropout))
    
    model.add(layers.Dense(H))  # multi‑step output
    model.compile(optimizer='adam', loss='mse')
    return model

In [None]:
tuner_results = []

for W in [90]:
    for H in [14]:
        X, y, meta = build_sequences(regimes_df, feature_cols, 'ATM_IV_target', W=W, H=H)
        (X_train, y_train, meta_train), (X_val, y_val, meta_val), (X_test, y_test, meta_test) = time_series_split(X, y, meta)

        tuner = kt.RandomSearch(
            lambda hp: model_builder(hp, H=H),
            objective='val_loss',
            max_trials=10,
            directory='tuner_dir',
            project_name=f'iv_forecast_W{W}_H{H}',
            overwrite=True
        )
        tuner.search(X_train, y_train,
                     validation_data=(X_val, y_val),
                     epochs=30,
                     callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)])
        
        best_hp = tuner.get_best_hyperparameters(1)[0]
        print(f"W={W}, H={H}, best config:", best_hp.values)
        best_trial = tuner.oracle.get_best_trials(num_trials=1)[0]
        best_val_loss = best_trial.metrics.get_last_value("val_loss")
        
        tuner_results.append({
            "W": W,
            "H": H,
            "best_hp": best_hp.values,
            "val_loss": best_val_loss
        })

In [None]:
tuner_df = pd.DataFrame(tuner_results)
print(tuner_df.sort_values("val_loss"))

In [None]:
best_row = tuner_df.sort_values("val_loss").iloc[0]
print(best_row)

In [None]:
W_best = best_row["W"]
H_best = best_row["H"]

X, y, meta = build_sequences(regimes_df, feature_cols, 'ATM_IV_target', W=W_best, H=H_best)
(X_train, y_train, meta_train), (X_val, y_val, meta_val), (X_test, y_test, meta_test) = time_series_split(X, y, meta)

In [None]:
best_hp_values = best_row["best_hp"]  
best_model = model_builder(best_hp_values, H_best, input_shape=X_train.shape[1:])

In [None]:
history = best_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)]
)

In [None]:
# Predict on test set
y_pred_scaled = best_model.predict(X_test)

# Invert scaling
y_pred_orig = target_scaler.inverse_transform(y_pred_scaled)
y_true_orig = target_scaler.inverse_transform(y_test)

# Evaluate
evaluate_baseline(y_true_orig, y_pred_orig, "Best tuned model")

In [None]:
def seq2seq_builder(hp_or_dict, H=3, input_shape=None):
    # Handle dict vs HyperParameters
    if isinstance(hp_or_dict, dict):
        units   = hp_or_dict['units']
        dropout = hp_or_dict['dropout']
        cell    = hp_or_dict['cell']
    else:
        units   = hp_or_dict.Int('units', min_value=32, max_value=128, step=32)
        dropout = hp_or_dict.Float('dropout', 0.1, 0.5, step=0.1)
        cell    = hp_or_dict.Choice('cell', ['LSTM', 'GRU'])

    # Encoder: produce context vector
    encoder_inputs = tf.keras.Input(shape=input_shape, name="encoder_inputs")
    if cell == 'LSTM':
        context = tf.keras.layers.LSTM(units, dropout=dropout, name="encoder_lstm")(encoder_inputs)
    else:
        context = tf.keras.layers.GRU(units, dropout=dropout, name="encoder_gru")(encoder_inputs)

    # Repeat context H times to form decoder input sequence
    repeated = tf.keras.layers.RepeatVector(int(H), name="repeat_vector")(context)

    # Decoder: generate H-step sequence
    if cell == 'LSTM':
        decoder_seq = tf.keras.layers.LSTM(units, return_sequences=True, dropout=dropout, name="decoder_lstm")(repeated)
    else:
        decoder_seq = tf.keras.layers.GRU(units, return_sequences=True, dropout=dropout, name="decoder_gru")(repeated)

    outputs = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1), name="decoder_dense")(decoder_seq)

    model = tf.keras.Model(encoder_inputs, outputs)
    model.compile(optimizer='adam', loss='mse')
    return model

In [None]:
seq_tuner_results = []

for W in [90]:
    for H in [14]:
        X, y, meta = build_sequences(regimes_df, feature_cols, 'ATM_IV_target', W=W, H=H)
        (X_train, y_train, meta_train), (X_val, y_val, meta_val), (X_test, y_test, meta_test) = time_series_split(X, y, meta)

        input_shape = X_train.shape[1:]  # (W, num_features)

        tuner = kt.RandomSearch(
            lambda hp: seq2seq_builder(hp, H=H, input_shape=input_shape),
            objective='val_loss',
            max_trials=10,
            directory='tuner_dir',
            project_name=f'seq2seq_W{W}_H{H}',
            overwrite=True
        )

        # IMPORTANT: Seq2Seq expects [encoder_inputs, decoder_inputs]
        # For a baseline, you can feed zeros as decoder_inputs during training
        decoder_train = np.zeros((X_train.shape[0], H, 1))
        decoder_val   = np.zeros((X_val.shape[0], H, 1))
        
        tuner.search(X_train, y_train,
             validation_data=(X_val, y_val),
             epochs=30,
             callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)])

#         tuner.search([X_train, decoder_train], y_train,
#                      validation_data=([X_val, decoder_val], y_val),
#                      epochs=30,
#                      callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)])

        best_hp = tuner.get_best_hyperparameters(1)[0]
        best_trial = tuner.oracle.get_best_trials(1)[0]
        best_val_loss = best_trial.metrics.get_last_value("val_loss")

        print(f"W={W}, H={H}, best config:", best_hp.values)

        seq_tuner_results.append({
            "W": W,
            "H": H,
            "best_hp": best_hp.values,
            "val_loss": best_val_loss
        })


In [None]:
seq_tuner_df = pd.DataFrame(seq_tuner_results)
print(seq_tuner_df.sort_values("val_loss"))

In [None]:
best_row = seq_tuner_df.sort_values("val_loss").iloc[0]
print(best_row)

In [None]:
W_best = best_row["W"]
H_best = best_row["H"]

X, y, meta = build_sequences(regimes_df, feature_cols, 'ATM_IV_target', W=W_best, H=H_best)
(X_train, y_train, meta_train), (X_val, y_val, meta_val), (X_test, y_test, meta_test) = time_series_split(X, y, meta)

In [None]:
best_hp_values = best_row["best_hp"] 

input_shape = X_train.shape[1:]       # (W, num_features)
best_model = seq2seq_builder(best_hp_values, H_best, input_shape=input_shape)

In [None]:
history = best_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=30,
    callbacks=[tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True)]
)

In [None]:
# Predict on test set
# Flatten last two dims into (num_samples, H)
y_pred_scaled_2d = y_pred_scaled.reshape(y_pred_scaled.shape[0], -1)
y_test_2d        = y_test.reshape(y_test.shape[0], -1)

# Inverse transform
y_pred_orig_2d = target_scaler.inverse_transform(y_pred_scaled_2d)
y_true_orig_2d = target_scaler.inverse_transform(y_test_2d)

# Optionally reshape back to (num_samples, H, 1)
y_pred_orig = y_pred_orig_2d.reshape(y_pred_scaled.shape)
y_true_orig = y_true_orig_2d.reshape(y_test.shape)

# Evaluate
evaluate_baseline(y_true_orig, y_pred_orig, "Best tuned model")