In [None]:
# ZAR OIS Curve Prediction from o/n rate and FX spot using Deep Learning (AutoML Enhanced)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
from scipy.interpolate import interp1d

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import optuna

# STEP 1: Load and Inspect Data
# Read the raw history, index it by observation date and drop every row that
# has at least one missing value.
file_path = "ZAR_OIS_curve_data.csv"
df = pd.read_csv(file_path)
df['Dates'] = pd.to_datetime(df['Dates'])
df.set_index('Dates', inplace=True)
df.dropna(inplace=True)
# The rolling windows and the 80/20 split built further down are purely
# positional, so the rows must be in ascending date order no matter how the
# CSV was exported. A stable sort keeps the file order of duplicate dates.
df.sort_index(kind='mergesort', inplace=True)

# STEP 2: Generate Features
# Tenor label -> maturity in years (months are approximated as 30/365 each).
tenor_map = {"1m": 30/365, "2m": 60/365, "3m": 90/365, "4m": 120/365, "5m": 150/365, "6m": 180/365,
             "7m": 210/365, "8m": 240/365, "9m": 270/365, "1y": 365/365, "2y": 2, "3y": 3}

def calc_1d_forward(df_row, tenors=tenor_map):
    """Compute one-day forward rates at the 1m-6m points of one curve row.

    A cubic interpolator (extrapolation enabled) is fitted through the row's
    rates at every maturity in ``tenors``. For each of the tenors 1m..6m the
    interpolated rate at the tenor (``r1``) and one day later (``r2``) are
    combined as ``((1 + r2)**(t + d) / (1 + r1)**t)**(1 / d) - 1`` with
    ``d = 1/365``.

    NOTE(review): the formula treats rates as annually compounded decimals
    (e.g. 0.07 rather than 7.0) -- confirm against the units in the CSV.

    Args:
        df_row: Row that can be indexed by every key of ``tenors``.
        tenors: Tenor label -> maturity in years; defaults to ``tenor_map``.

    Returns:
        pd.Series with one ``fwd_1d_after_<tenor>`` entry for 1m through 6m.
    """
    day = 1 / 365
    labels = list(tenors)
    curve = interp1d(
        [tenors[label] for label in labels],
        [df_row[label] for label in labels],
        kind='cubic',
        fill_value='extrapolate',
    )

    def _forward_from(start):
        # Rates bracketing the one-day period that begins at ``start``.
        end = start + day
        near = curve(start)
        far = curve(end)
        return ((1 + far)**end / (1 + near)**start)**(1 / day) - 1

    short_end = ("1m", "2m", "3m", "4m", "5m", "6m")
    return pd.Series({f"fwd_1d_after_{label}": _forward_from(tenors[label]) for label in short_end})

# Feature matrix: the one-day forwards plus FX spot and the overnight rate.
# Targets: the rate at every tenor, restricted to the dates that survive in X.
fwd_features = df.apply(calc_1d_forward, axis=1)
X = pd.concat([fwd_features, df[['USDZAR', 'o/n interest rate']]], axis=1).dropna()
y = df.loc[X.index, list(tenor_map)]

# Each sample pairs the SEQUENCE_LENGTH rows that precede position `end`
# with the curve observed at `end`, so the target row is never in its window.
SEQUENCE_LENGTH = 7
window_ends = range(SEQUENCE_LENGTH, len(X))
X_seq = np.array([X.iloc[end - SEQUENCE_LENGTH:end].values for end in window_ends])
y_seq = np.array([y.iloc[end].values for end in window_ends])
dates_seq = np.array([X.index[end] for end in window_ends])

# Positional 80/20 split: the first 80% of windows train, the rest test.
train_size = int(0.8 * len(X_seq))
X_train = X_seq[:train_size]
X_test = X_seq[train_size:]
y_train = y_seq[:train_size]
y_test = y_seq[train_size:]
dates_test = dates_seq[train_size:]

# Both scalers are fitted on the training split only and then reused as-is
# on the test split.
feature_scaler = StandardScaler()
target_scaler = MinMaxScaler()

# The feature scaler works on 2-D input, so every window is flattened to
# (rows, n_features) for scaling and then restored to its 3-D shape.
n_features = X_train.shape[-1]
X_train_scaled = feature_scaler.fit_transform(X_train.reshape(-1, n_features)).reshape(X_train.shape)
X_test_scaled = feature_scaler.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)

y_train_scaled = target_scaler.fit_transform(y_train)
y_test_scaled = target_scaler.transform(y_test)

# STEP 3: PyTorch LSTM Model and Optuna Tuning
class LSTMModel(nn.Module):
    """Single LSTM encoder followed by a small fully connected head.

    The final hidden state of the last LSTM layer is passed through
    Linear -> ReLU -> Dropout -> Linear to give ``output_dim`` values per
    input sequence.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Width of the LSTM state and of the first linear layer.
        output_dim: Number of values predicted per sequence.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, seq_len, input_dim).
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.drop = nn.Dropout(dropout)
        self.out = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return predictions of shape (batch, output_dim) for batch ``x``."""
        _, (hidden, _) = self.lstm(x)
        # hidden[-1] is the final hidden state of the last LSTM layer.
        last_layer_state = hidden[-1]
        activated = torch.relu(self.fc1(last_layer_state))
        return self.out(self.drop(activated))

def objective(trial):
    """Optuna objective: train a candidate LSTM and score it on held-out data.

    Hyper-parameters are scored on the most recent 20% of the *training*
    windows rather than on the test split. The test split is what the final
    metrics are reported on, so using it for model selection would make
    those metrics optimistically biased.

    Args:
        trial: Optuna trial used to sample ``hidden_dim``, ``dropout``, ``lr``.

    Returns:
        Negative R2 (in original target units) on the validation windows,
        because the study minimises its objective.
    """
    hidden_dim = trial.suggest_int("hidden_dim", 64, 256)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)

    # Chronological hold-out inside the training split: fit on the first
    # 80% of training windows, validate on the remaining, later ones.
    fit_size = int(0.8 * len(X_train_scaled))
    X_fit, X_val = X_train_scaled[:fit_size], X_train_scaled[fit_size:]
    y_fit = y_train_scaled[:fit_size]
    y_val = y_train[fit_size:]  # unscaled, compared after inverse_transform

    model = LSTMModel(X_train_scaled.shape[2], hidden_dim, y_train.shape[1], dropout).to('cpu')
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    loader = DataLoader(
        TensorDataset(
            torch.tensor(X_fit, dtype=torch.float32),
            torch.tensor(y_fit, dtype=torch.float32),
        ),
        batch_size=32,
        shuffle=True,
    )

    model.train()
    for _ in range(20):
        for xb, yb in loader:
            pred = model(xb)
            loss = criterion(pred, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Validation: eval mode switches dropout off for deterministic scoring.
    model.eval()
    with torch.no_grad():
        val_preds = model(torch.tensor(X_val, dtype=torch.float32)).numpy()
    val_preds = target_scaler.inverse_transform(val_preds)
    return -r2_score(y_val, val_preds)  # Optuna minimizes

# Run the hyper-parameter search; the objective returns negative R2, hence
# the 'minimize' direction.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=25)

best = study.best_trial.params
print("Best trial:", best)

# Retrain with best params on the whole training split.
final_model = LSTMModel(
    X_train_scaled.shape[2],
    best['hidden_dim'],
    y_train.shape[1],
    best['dropout'],
)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(final_model.parameters(), lr=best['lr'])

final_model.train()
train_inputs = torch.tensor(X_train_scaled, dtype=torch.float32)
train_targets = torch.tensor(y_train_scaled, dtype=torch.float32)
train_loader = DataLoader(TensorDataset(train_inputs, train_targets), batch_size=32, shuffle=True)
for epoch in range(40):
    for xb, yb in train_loader:
        # Clear stale gradients, then do one forward/backward/update step.
        optimizer.zero_grad()
        loss = criterion(final_model(xb), yb)
        loss.backward()
        optimizer.step()

# Evaluate on full test set
final_model.eval()
test_inputs = torch.tensor(X_test_scaled, dtype=torch.float32)
with torch.no_grad():
    pred_scaled = final_model(test_inputs).numpy()
# Undo the target scaling so the metrics are in the curve's original units.
preds = target_scaler.inverse_transform(pred_scaled)

# Scores
mse, r2 = mean_squared_error(y_test, preds), r2_score(y_test, preds)
print(f"Optuna-LSTM Test MSE: {mse:.4f}, R2: {r2:.4f}")

# STEP 4: Visualization - Actual vs Predicted for Test Set
# One solid (actual) and one dashed (predicted) line per tenor, all drawn on
# a single set of axes.
tenors = list(tenor_map)
fig, ax = plt.subplots(figsize=(22, 6))
for tenor, actual, predicted in zip(tenors, y_test.T, preds.T):
    ax.plot(dates_test, actual, label=f"Actual {tenor}", alpha=0.8)
    ax.plot(dates_test, predicted, linestyle='--', label=f"Predicted {tenor}", alpha=0.7)
ax.legend(ncol=6)
ax.set_title("Test Set: Actual vs Predicted OIS Curve")
ax.grid(True)
fig.tight_layout()
plt.show()

# STEP 5: Evaluate Model on Entire Dataset
# Rebuild the windows over every available row. This covers the training
# period as well, so these scores are not out-of-sample.
all_ends = range(SEQUENCE_LENGTH, len(X))
X_all_seq = np.array([X.iloc[end - SEQUENCE_LENGTH:end].values for end in all_ends])
y_all_seq = np.array([y.iloc[end].values for end in all_ends])
dates_all = [X.index[end] for end in all_ends]

# Scale with the already-fitted feature scaler (flatten -> scale -> restore).
flat_all = X_all_seq.reshape(-1, X_all_seq.shape[-1])
X_all_scaled = feature_scaler.transform(flat_all).reshape(X_all_seq.shape)

final_model.eval()
all_inputs = torch.tensor(X_all_scaled, dtype=torch.float32)
with torch.no_grad():
    preds_all_scaled = final_model(all_inputs).numpy()
preds_all = target_scaler.inverse_transform(preds_all_scaled)

# Full Dataset R2 and MSE
full_mse = mean_squared_error(y_all_seq, preds_all)
full_r2 = r2_score(y_all_seq, preds_all)
print(f"Full Dataset MSE: {full_mse:.4f}, R2: {full_r2:.4f}")

# STEP 6: Interactive Prediction
def predict_yield_curve(custom_input):
    """Predict the full curve from a single snapshot of feature values.

    The snapshot is scaled with the fitted ``feature_scaler``, repeated
    ``SEQUENCE_LENGTH`` times to form a constant input window, run through
    ``final_model`` in eval mode and mapped back to original units with
    ``target_scaler``.

    Args:
        custom_input: Mapping with one value for every column of ``X``.
            Key order does not matter; keys that are not feature columns
            are ignored.

    Returns:
        1-D array with one predicted rate per tenor, in ``tenor_map`` order.

    Raises:
        ValueError: If a feature column is missing from ``custom_input``.
    """
    # The scaler was fitted on plain arrays and therefore matches features
    # by position only. Select the columns in training order so a dict with
    # differently ordered keys is not silently mis-scaled.
    feature_order = list(X.columns)
    input_df = pd.DataFrame([custom_input])
    missing = [col for col in feature_order if col not in input_df.columns]
    if missing:
        raise ValueError(f"custom_input is missing features: {missing}")
    input_df = input_df[feature_order]

    input_scaled = feature_scaler.transform(input_df.values)
    # Repeat the single observation to fill the model's expected window.
    input_seq = np.tile(input_scaled, (SEQUENCE_LENGTH, 1))
    input_tensor = torch.tensor(input_seq[np.newaxis, :, :], dtype=torch.float32)
    final_model.eval()
    with torch.no_grad():
        pred_scaled = final_model(input_tensor).numpy()
    pred = target_scaler.inverse_transform(pred_scaled)
    return pred.flatten()

# Example usage
# One hand-written snapshot: six one-day forwards, FX spot, overnight rate.
example_features = [
    'fwd_1d_after_1m', 'fwd_1d_after_2m', 'fwd_1d_after_3m',
    'fwd_1d_after_4m', 'fwd_1d_after_5m', 'fwd_1d_after_6m',
    'USDZAR', 'o/n interest rate',
]
example_values = [0.072, 0.071, 0.070, 0.069, 0.068, 0.067, 18.2, 0.071]
example_input = dict(zip(example_features, example_values))
predicted_curve = predict_yield_curve(example_input)

fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(tenors, predicted_curve, marker='o')
ax.set_title("Predicted ZAR OIS Curve from Custom Inputs")
ax.set_xlabel("Tenor")
ax.set_ylabel("Rate")
ax.grid(True)
plt.show()


In [None]:
# ZAR OIS Curve Prediction from o/n rate and FX spot using Deep Learning (AutoML Enhanced)

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
from scipy.interpolate import interp1d

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import optuna

# STEP 1: Load and Inspect Data
# Read the raw history, index it by observation date and drop every row that
# has at least one missing value.
file_path = "ZAR_OIS_curve_data.csv"
df = pd.read_csv(file_path)
df['Dates'] = pd.to_datetime(df['Dates'])
df.set_index('Dates', inplace=True)
df.dropna(inplace=True)
# The rolling windows and the 80/20 split built further down are purely
# positional, so the rows must be in ascending date order no matter how the
# CSV was exported. A stable sort keeps the file order of duplicate dates.
df.sort_index(kind='mergesort', inplace=True)

# STEP 2: Generate Features
# Tenor label -> maturity in years (months are approximated as 30/365 each).
tenor_map = {"1m": 30/365, "2m": 60/365, "3m": 90/365, "4m": 120/365, "5m": 150/365, "6m": 180/365,
             "7m": 210/365, "8m": 240/365, "9m": 270/365, "1y": 365/365, "2y": 2, "3y": 3}

def calc_1d_forward(df_row, tenors=tenor_map):
    """Compute one-day forward rates at the 1m-6m points of one curve row.

    A cubic interpolator (extrapolation enabled) is fitted through the row's
    rates at every maturity in ``tenors``. For each of the tenors 1m..6m the
    interpolated rate at the tenor (``r1``) and one day later (``r2``) are
    combined as ``((1 + r2)**(t + d) / (1 + r1)**t)**(1 / d) - 1`` with
    ``d = 1/365``.

    NOTE(review): the formula treats rates as annually compounded decimals
    (e.g. 0.07 rather than 7.0) -- confirm against the units in the CSV.

    Args:
        df_row: Row that can be indexed by every key of ``tenors``.
        tenors: Tenor label -> maturity in years; defaults to ``tenor_map``.

    Returns:
        pd.Series with one ``fwd_1d_after_<tenor>`` entry for 1m through 6m.
    """
    day = 1 / 365
    labels = list(tenors)
    curve = interp1d(
        [tenors[label] for label in labels],
        [df_row[label] for label in labels],
        kind='cubic',
        fill_value='extrapolate',
    )

    def _forward_from(start):
        # Rates bracketing the one-day period that begins at ``start``.
        end = start + day
        near = curve(start)
        far = curve(end)
        return ((1 + far)**end / (1 + near)**start)**(1 / day) - 1

    short_end = ("1m", "2m", "3m", "4m", "5m", "6m")
    return pd.Series({f"fwd_1d_after_{label}": _forward_from(tenors[label]) for label in short_end})

# Feature matrix: the one-day forwards plus FX spot and the overnight rate.
# Targets: the rate at every tenor, restricted to the dates that survive in X.
fwd_features = df.apply(calc_1d_forward, axis=1)
X = pd.concat([fwd_features, df[['USDZAR', 'o/n interest rate']]], axis=1).dropna()
y = df.loc[X.index, list(tenor_map)]

# Each sample pairs the SEQUENCE_LENGTH rows that precede position `end`
# with the curve observed at `end`, so the target row is never in its window.
SEQUENCE_LENGTH = 7
window_ends = range(SEQUENCE_LENGTH, len(X))
X_seq = np.array([X.iloc[end - SEQUENCE_LENGTH:end].values for end in window_ends])
y_seq = np.array([y.iloc[end].values for end in window_ends])
dates_seq = np.array([X.index[end] for end in window_ends])

# Positional 80/20 split: the first 80% of windows train, the rest test.
train_size = int(0.8 * len(X_seq))
X_train = X_seq[:train_size]
X_test = X_seq[train_size:]
y_train = y_seq[:train_size]
y_test = y_seq[train_size:]
dates_test = dates_seq[train_size:]

# Both scalers are fitted on the training split only and then reused as-is
# on the test split.
feature_scaler = StandardScaler()
target_scaler = MinMaxScaler()

# The feature scaler works on 2-D input, so every window is flattened to
# (rows, n_features) for scaling and then restored to its 3-D shape.
n_features = X_train.shape[-1]
X_train_scaled = feature_scaler.fit_transform(X_train.reshape(-1, n_features)).reshape(X_train.shape)
X_test_scaled = feature_scaler.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)

y_train_scaled = target_scaler.fit_transform(y_train)
y_test_scaled = target_scaler.transform(y_test)

# STEP 3: PyTorch LSTM Model and Optuna Tuning
class LSTMModel(nn.Module):
    """Single LSTM encoder followed by a small fully connected head.

    The final hidden state of the last LSTM layer is passed through
    Linear -> ReLU -> Dropout -> Linear to give ``output_dim`` values per
    input sequence.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Width of the LSTM state and of the first linear layer.
        output_dim: Number of values predicted per sequence.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, dropout):
        super().__init__()
        # batch_first=True: inputs are laid out as (batch, seq_len, input_dim).
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.drop = nn.Dropout(dropout)
        self.out = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return predictions of shape (batch, output_dim) for batch ``x``."""
        _, (hidden, _) = self.lstm(x)
        # hidden[-1] is the final hidden state of the last LSTM layer.
        last_layer_state = hidden[-1]
        activated = torch.relu(self.fc1(last_layer_state))
        return self.out(self.drop(activated))

def objective(trial):
    """Optuna objective: train a candidate LSTM and score it on held-out data.

    Hyper-parameters are scored on the most recent 20% of the *training*
    windows rather than on the test split. The test split is what the final
    metrics are reported on, so using it for model selection would make
    those metrics optimistically biased.

    Args:
        trial: Optuna trial used to sample ``hidden_dim``, ``dropout``, ``lr``.

    Returns:
        Negative R2 (in original target units) on the validation windows,
        because the study minimises its objective.
    """
    hidden_dim = trial.suggest_int("hidden_dim", 64, 256)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)

    # Chronological hold-out inside the training split: fit on the first
    # 80% of training windows, validate on the remaining, later ones.
    fit_size = int(0.8 * len(X_train_scaled))
    X_fit, X_val = X_train_scaled[:fit_size], X_train_scaled[fit_size:]
    y_fit = y_train_scaled[:fit_size]
    y_val = y_train[fit_size:]  # unscaled, compared after inverse_transform

    model = LSTMModel(X_train_scaled.shape[2], hidden_dim, y_train.shape[1], dropout).to('cpu')
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    loader = DataLoader(
        TensorDataset(
            torch.tensor(X_fit, dtype=torch.float32),
            torch.tensor(y_fit, dtype=torch.float32),
        ),
        batch_size=32,
        shuffle=True,
    )

    model.train()
    for _ in range(20):
        for xb, yb in loader:
            pred = model(xb)
            loss = criterion(pred, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Validation: eval mode switches dropout off for deterministic scoring.
    model.eval()
    with torch.no_grad():
        val_preds = model(torch.tensor(X_val, dtype=torch.float32)).numpy()
    val_preds = target_scaler.inverse_transform(val_preds)
    return -r2_score(y_val, val_preds)  # Optuna minimizes

# Run the hyper-parameter search; the objective returns negative R2, hence
# the 'minimize' direction.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=25)

best = study.best_trial.params
print("Best trial:", best)

# Retrain with best params on the whole training split.
final_model = LSTMModel(
    X_train_scaled.shape[2],
    best['hidden_dim'],
    y_train.shape[1],
    best['dropout'],
)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(final_model.parameters(), lr=best['lr'])

final_model.train()
train_inputs = torch.tensor(X_train_scaled, dtype=torch.float32)
train_targets = torch.tensor(y_train_scaled, dtype=torch.float32)
train_loader = DataLoader(TensorDataset(train_inputs, train_targets), batch_size=32, shuffle=True)
for epoch in range(40):
    for xb, yb in train_loader:
        # Clear stale gradients, then do one forward/backward/update step.
        optimizer.zero_grad()
        loss = criterion(final_model(xb), yb)
        loss.backward()
        optimizer.step()

# Evaluate on full test set
final_model.eval()
test_inputs = torch.tensor(X_test_scaled, dtype=torch.float32)
with torch.no_grad():
    pred_scaled = final_model(test_inputs).numpy()
# Undo the target scaling so the metrics are in the curve's original units.
preds = target_scaler.inverse_transform(pred_scaled)

# Scores
mse, r2 = mean_squared_error(y_test, preds), r2_score(y_test, preds)
print(f"Optuna-LSTM Test MSE: {mse:.4f}, R2: {r2:.4f}")

# STEP 4: Visualization - Actual vs Predicted for Test Set
# One solid (actual) and one dashed (predicted) line per tenor, all drawn on
# a single set of axes.
tenors = list(tenor_map)
fig, ax = plt.subplots(figsize=(22, 6))
for tenor, actual, predicted in zip(tenors, y_test.T, preds.T):
    ax.plot(dates_test, actual, label=f"Actual {tenor}", alpha=0.8)
    ax.plot(dates_test, predicted, linestyle='--', label=f"Predicted {tenor}", alpha=0.7)
ax.legend(ncol=6)
ax.set_title("Test Set: Actual vs Predicted OIS Curve")
ax.grid(True)
fig.tight_layout()
plt.show()

# STEP 5: Evaluate Model on Entire Dataset
# Rebuild the windows over every available row. This covers the training
# period as well, so these scores are not out-of-sample.
all_ends = range(SEQUENCE_LENGTH, len(X))
X_all_seq = np.array([X.iloc[end - SEQUENCE_LENGTH:end].values for end in all_ends])
y_all_seq = np.array([y.iloc[end].values for end in all_ends])
dates_all = [X.index[end] for end in all_ends]

# Scale with the already-fitted feature scaler (flatten -> scale -> restore).
flat_all = X_all_seq.reshape(-1, X_all_seq.shape[-1])
X_all_scaled = feature_scaler.transform(flat_all).reshape(X_all_seq.shape)

final_model.eval()
all_inputs = torch.tensor(X_all_scaled, dtype=torch.float32)
with torch.no_grad():
    preds_all_scaled = final_model(all_inputs).numpy()
preds_all = target_scaler.inverse_transform(preds_all_scaled)

# Full Dataset R2 and MSE
full_mse = mean_squared_error(y_all_seq, preds_all)
full_r2 = r2_score(y_all_seq, preds_all)
print(f"Full Dataset MSE: {full_mse:.4f}, R2: {full_r2:.4f}")

# STEP 6: Interactive Prediction with Confidence Interval
def predict_yield_curve_with_ci(custom_input, n_simulations=50):
    """Predict the curve from one feature snapshot with a Monte Carlo dropout band.

    The snapshot is scaled, repeated ``SEQUENCE_LENGTH`` times to form a
    constant input window and passed through ``final_model``
    ``n_simulations`` times with its dropout layers active, so each pass
    uses a different random dropout mask. With the whole model in eval mode
    every pass would return the same values and the band would collapse to
    zero width.

    NOTE(review): the band only reflects variability induced by dropout; it
    is not a calibrated statistical confidence interval.

    Args:
        custom_input: Mapping with one value for every column of ``X``.
            Key order does not matter; keys that are not feature columns
            are ignored.
        n_simulations: Number of stochastic forward passes (at least 1).

    Returns:
        Tuple ``(mean_pred, lower_bound, upper_bound)`` of 1-D arrays with
        one value per tenor: the mean and the 2.5th / 97.5th percentiles
        across the passes.

    Raises:
        ValueError: If ``n_simulations`` is below 1 or a feature column is
            missing from ``custom_input``.
    """
    if n_simulations < 1:
        raise ValueError("n_simulations must be at least 1")

    # The scaler was fitted on plain arrays and therefore matches features
    # by position only. Select the columns in training order so a dict with
    # differently ordered keys is not silently mis-scaled.
    feature_order = list(X.columns)
    input_df = pd.DataFrame([custom_input])
    missing = [col for col in feature_order if col not in input_df.columns]
    if missing:
        raise ValueError(f"custom_input is missing features: {missing}")
    input_df = input_df[feature_order]

    input_scaled = feature_scaler.transform(input_df.values)
    input_seq = np.tile(input_scaled, (SEQUENCE_LENGTH, 1))
    input_tensor = torch.tensor(input_seq[np.newaxis, :, :], dtype=torch.float32)

    # Keep the model in eval mode overall, but switch only the dropout
    # layers back to training mode so they sample a fresh mask per pass.
    final_model.eval()
    dropout_layers = [m for m in final_model.modules() if isinstance(m, nn.Dropout)]
    for layer in dropout_layers:
        layer.train()
    try:
        with torch.no_grad():
            draws_scaled = np.vstack(
                [final_model(input_tensor).numpy() for _ in range(n_simulations)]
            )
    finally:
        # Always leave the shared model deterministic for later callers.
        final_model.eval()

    preds = target_scaler.inverse_transform(draws_scaled)
    mean_pred = preds.mean(axis=0)
    lower_bound = np.percentile(preds, 2.5, axis=0)
    upper_bound = np.percentile(preds, 97.5, axis=0)

    return mean_pred, lower_bound, upper_bound

# Example usage
# One hand-written snapshot: six one-day forwards, FX spot, overnight rate.
example_features = [
    'fwd_1d_after_1m', 'fwd_1d_after_2m', 'fwd_1d_after_3m',
    'fwd_1d_after_4m', 'fwd_1d_after_5m', 'fwd_1d_after_6m',
    'USDZAR', 'o/n interest rate',
]
example_values = [0.072, 0.071, 0.070, 0.069, 0.068, 0.067, 18.2, 0.071]
example_input = dict(zip(example_features, example_values))
mean_pred, lower, upper = predict_yield_curve_with_ci(example_input)

# Mean curve with the percentile band shaded behind it.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(tenors, mean_pred, marker='o', label='Mean Prediction')
ax.fill_between(tenors, lower, upper, color='gray', alpha=0.3, label='95% Confidence Interval')
ax.set_title("Predicted ZAR OIS Curve with 95% Confidence Interval")
ax.set_xlabel("Tenor")
ax.set_ylabel("Rate")
ax.legend()
ax.grid(True)
plt.show()
