In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
import pandas as pd
import os

In [None]:
# set path for data import
notebook_dir = os.path.dirname(os.path.abspath("__file__"))
data_path = os.path.join(notebook_dir, '..', '..', 'confidential_data')

In [None]:
data = pd.read_csv(f'{data_path}/merged_data.csv', index_col=0, parse_dates=True)
data.dropna(inplace=True)
# delete rows where volume is zero
data = data[data['sp500_volume'] != 0]

In [None]:
print(data.head())
print(data.shape)

In [None]:
# Create variable Log Returns and further
data["DailyReturn"] = np.log(data["sp500_close"]).diff()
data['Sign_1d'] = (data['DailyReturn'] > 0).astype(int)
data['Volume_change'] = np.log(data['sp500_volume']).diff().shift(1)
#data['Trading_range'] = np.log(data['sp500_high'] / data['sp500_low']).shift(1)

data = data.iloc[1:]

In [None]:
data["DailyReturn"].describe()

In [None]:
plt.figure(figsize=(12, 4))
plt.plot(data["DailyReturn"], label='Daily Log Returns', alpha=0.7)
plt.legend()

In [None]:
# plot the distribution of daily returns
plt.figure(figsize=(10, 6))
plt.hist(data["DailyReturn"], bins=100, density=True, alpha=0.6, color='g')
plt.title('Distribution of Daily Log Returns')
plt.xlabel('Daily Log Return')
plt.ylabel('Density')
plt.grid()
plt.show()

NOTE: If I use log returns, then there is a zero asset return (mean=0.0002). This would make the use of volatility as predictor for sign return described in Christofferson & Diebold (2006) not applicable to my project.

In [None]:
# Check for normality of returns
jb_stat, jb_p = stats.jarque_bera(data["DailyReturn"].dropna())
print(f"Jarque-Bera test statistic: {np.round(jb_stat, 2)}, p-value: {np.round(jb_p, 4)}")

Hence, the Daily Returns do not follow a normal distribution and sign forecastibility with zero mean asset return but asymmetric distribution from [Christofferson et al.](https://economics.sas.upenn.edu/pier/working-paper/2006/direction-change-forecasts-based-conditional-variance-skewness-and-kurtosis) (2007) could be applied. By the descriptive statistics, it already becomes apparent that there is excess kurtosis, since the minimum return is roughly 22 st.dev. away from the mean (when returns go back to 1980). The probability for this to happen is so small, that this return would be even unlikely if we get a return every nanosecond from the start of the universe. Since Pr(Z>22)=2.44x10^-107 for a st.normal distr. Whereas, the universe exists for roughly 4,35x10^26=13,800,000,000x365x24x3600x10^9 nanoseconds.

In [None]:
# get annualized realized volatility
data['Realized_Vol_20d'] = data['DailyReturn'].rolling(window=20).std() * np.sqrt(252)

In [None]:
# print nan values per column
print(data.isna().sum())

TODO: Download Bond data from bloomberg

In [None]:
# Add lagged returns as features
data['Return_Lag1'] = data['DailyReturn'].shift(1)
data['oil_price_return'] = np.log(data['oil_close']).diff()
data['vix_return'] = np.log(data['vix']).diff()
data['sign_lag1'] = data['Sign_1d'].shift(1)

data = data.dropna()

# Feature matrix
features = np.column_stack([
    data['Realized_Vol_20d'].values,
    data['Return_Lag1'].values,
    data['Volume_change'].values,
    #data['Trading_range'].values,
    data['fed_fund_rate'].values,
    data['treasury_3mo'].values,
    data['treasury_10yr'].values,
    data['treasury_30yr'].values,
    data['corp_bond_rate_aaa'].values,
    data['oil_price_return'].values,
    data['vix'].values,
    data['vix_return'].values,
    data['sign_lag1'].values
])

input_variables = features.shape[1]

# Deep Learning for Sign Classification

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import random

torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

## Create loss function that maximizes classification accuracy

In [None]:
class MaximumUtilityLoss(nn.Module):
    """
    Differentiable approximation of the Elliott-Lieli (2013) Maximum Utility estimator.
    Optimizes correct classification rather than probability calibration.
    """
    def __init__(self, c=0.0, sharpness=10.0, w_pos=1.0, w_neg=1.0):
        super().__init__()
        self.c = c
        self.sharpness = sharpness
        self.w_pos = w_pos
        self.w_neg = w_neg

    def forward(self, logits, y):
        y_sign = 2 * y - 1 # -1 or 1
        pred = torch.tanh(self.sharpness * (logits - self.c))  # get a sign approximation
        b = torch.where(y > 0.5, self.w_pos, self.w_neg)
        loss = 1-torch.mean(b * y_sign * pred)
        return loss

In [None]:
NUMBER_OF_LAGS = 100  # number of lagged observations
HIDDEN_DIM = 64
NUM_LAYERS = 4
DROPOUT = 0.0
NUMBER_OF_EPOCHS = 50
CLASSIFICATION_THRESHOLD = 0.5
LR = 3e-4
LR_STEP_SIZE = 10       # after how many epochs to decrease LR
LR_GAMMA = 0.5          # factor to decrease LR

In [None]:
# Create lagged feature matrix for LSTM
def create_lagged_matrix_multifeature(X, window):
    out = []
    for i in range(window, len(X)):
        out.append(X[i-window:i, :])
    return np.array(out)

X_lagged = create_lagged_matrix_multifeature(features, NUMBER_OF_LAGS)
y_target = data['Sign_1d'].values[NUMBER_OF_LAGS:]

## Define LSTM Model

In [None]:
class SimpleLSTM(nn.Module):
    def __init__(self, input_dim=1, hidden_dim=16, num_layers=1, dropout=0.0):
        super(SimpleLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.0)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()
    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]
        out = self.fc(out)
        return out

In [None]:
dates_lagged = data.index[NUMBER_OF_LAGS:] 
years = dates_lagged.year
unique_years = np.unique(years).astype(int)
min_year = int(unique_years.min())
max_year = int(unique_years.max())

# require at least this many trading observations for a year to be considered "full"
MIN_DAYS_PER_YEAR = 200

# find first test_year such that test_year and previous 3 years each have >= MIN_DAYS_PER_YEAR samples
first_valid_test_year = None
for cand in range(min_year + 3, max_year + 1):
    train_years = [cand - 3, cand - 2, cand - 1]
    counts = {y: np.sum(years == y) for y in train_years + [cand]}
    if all(counts[y] >= MIN_DAYS_PER_YEAR for y in train_years + [cand]):
        first_valid_test_year = cand
        print("First valid test year:", first_valid_test_year)
        break

if first_valid_test_year is None:
    raise RuntimeError("No calendar-aligned test year found with sufficient data. Lower MIN_DAYS_PER_YEAR or check your date range.")


In [None]:
from tqdm import tqdm

window_results = []
window_no = 0

for test_year in tqdm(range(first_valid_test_year, max_year+1), desc="yearly rolling window"):
    window_no += 1
    train_years = [test_year - 3, test_year - 2, test_year - 1]
    train_idx = np.where(np.isin(years, train_years))[0]
    test_idx  = np.where(years == test_year)[0]

    if len(train_idx) < 700 or len(test_idx) < 230:
        raise ValueError("The data length suggests there is insufficient data in window {}.".format(window_no))

    # prepare split
    X_tr = X_lagged[train_idx]
    y_tr = y_target[train_idx]
    X_te = X_lagged[test_idx]
    y_te = y_target[test_idx]

    # tensors
    X_tr_t = torch.tensor(X_tr, dtype=torch.float32)
    y_tr_t = torch.tensor(y_tr.reshape(-1,1), dtype=torch.float32)
    X_te_t = torch.tensor(X_te, dtype=torch.float32)

    # save train and test date range
    train_start_date = dates_lagged[train_idx[0]].date()
    train_end_date   = dates_lagged[train_idx[-1]].date()
    test_start_date  = dates_lagged[test_idx[0]].date()
    test_end_date    = dates_lagged[test_idx[-1]].date()

    # new model per window
    model_window = SimpleLSTM(input_dim=input_variables, hidden_dim=HIDDEN_DIM, num_layers=NUM_LAYERS, dropout=DROPOUT)
    optimizer = optim.Adam(model_window.parameters(), lr=LR)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=LR_STEP_SIZE, gamma=LR_GAMMA)
    criterion = nn.BCEWithLogitsLoss()

    ds = TensorDataset(X_tr_t, y_tr_t)
    data_loader = DataLoader(ds, batch_size=32, shuffle=True)  # batch size is no hyperparameter

    # train
    model_window.train()
    for epoch in range(NUMBER_OF_EPOCHS):
        epoch_loss = 0.0
        for X_batch, y_batch in data_loader:
            optimizer.zero_grad()
            logits = model_window(X_batch)
            loss = criterion(logits, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item() * X_batch.size(0)
        # decrease learning rate
        scheduler.step()

    # predict for the whole test year
    model_window.eval()
    with torch.no_grad():
        logits_test = model_window(X_te_t).squeeze()         # shape (TEST_DAYS,)
        probs = torch.sigmoid(logits_test).cpu().numpy()
        preds = (probs > CLASSIFICATION_THRESHOLD).astype(int)

    window_df = pd.DataFrame({
        'Actual_Sign': y_te,
        'Predicted_Sign': preds,
        'Pos_probability': probs,
        'Train_Start': train_start_date,
        'Train_End': train_end_date,
        'Test_Start': test_start_date,
        'Test_End': test_end_date
    }, index=dates_lagged[test_idx])

    window_results.append(window_df)

# combine windows
if window_results:
    rolling_results = pd.concat(window_results).sort_index()
    overall_acc = (rolling_results['Actual_Sign'] == rolling_results['Predicted_Sign']).mean()
    print(f"Rolling annual OOS accuracy: {overall_acc:.4f}")
    rolling_results.to_csv("LSTM_rolling_annual_results.csv")
else:
    print("No windows produced - check TRAIN_DAYS/TEST_DAYS and data length.")

In [None]:
# count parameters
sum(p.numel() for p in model_window.parameters() if p.requires_grad)

#parameters: 
- 1st hidden layer: 4x(#features x hidden_size + hidden_size x hidden_size) weights + hidden_size biases
- subsequent hidden layers: 4x(hidden_size x hidden_size + hidden_size x hidden_size) + hidden_size biases
- last layer (fully connceted layer): hidden_size + 1

In [None]:
rolling_results["Pos_probability"].plot(figsize=(12,6))
plt.title("Predicted Probability of Positive Return")

### Out of sample Prediction

In [None]:
from sklearn.metrics import classification_report, accuracy_score

# Classification report
print("Classification Report:")
print(classification_report(rolling_results['Actual_Sign'], rolling_results['Predicted_Sign']))
print("Accuracy:", accuracy_score(rolling_results['Actual_Sign'], rolling_results['Predicted_Sign']))

## Evaluate results for each Year individually

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report

# rolling_results should exist from previous cell
if 'rolling_results' not in globals():
    raise RuntimeError("rolling_results not found — run the rolling evaluation cell first.")

# determine grouping year (prefer explicit Test_Year if present)
if 'Test_Year' in rolling_results.columns:
    years_idx = rolling_results['Test_Year']
else:
    years_idx = rolling_results.index.year

summary_rows = []
per_year_reports = {}

for y in sorted(years_idx.unique()):
    mask = (years_idx == y)
    y_true = rolling_results.loc[mask, 'Actual_Sign'].values
    y_pred = rolling_results.loc[mask, 'Predicted_Sign'].values

    if len(y_true) == 0:
        continue

    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    rec = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    cm = confusion_matrix(y_true, y_pred)

    summary_rows.append({
        'Year': int(y),
        'N': len(y_true),
        'Accuracy': acc,
        'Precision 1': prec,
        'Recall 1': rec,
        'F1': f1,
        'TN': int(cm[0,0]) if cm.shape == (2,2) else 0,
        'FP': int(cm[0,1]) if cm.shape == (2,2) else 0,
        'FN': int(cm[1,0]) if cm.shape == (2,2) else 0,
        'TP': int(cm[1,1]) if cm.shape == (2,2) else 0
    })

    # store full sklearn report per year
    per_year_reports[y] = classification_report(y_true, y_pred, zero_division=0, output_dict=False)

summary_df = pd.DataFrame(summary_rows).sort_values('Year').set_index('Year')
print("Per-year summary:")
print(summary_df)

In [None]:
ax = summary_df['Accuracy'].plot(kind='bar', figsize=(10,4), title='Out of Sample Accuracy by Year')
ax.set_ylabel('Accuracy')
plt.tight_layout()