In [21]:
import torch
import torch.nn as nn
import pandas as pd
import os
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchinfo import summary

In [None]:
def directional_accuracy(gt, pred):
    """Calculates directional accuracy of given ground truth and prediction series.
    From Kaeley et al.
    
    inputs:
        gt: ground truth prices
        pred: predicted prices
        
    returns:
        acc: directional accuracy of predicted values
    """
    acc = []
    for i in range(1, len(gt)):
        if gt[i] >= gt[i-1] and pred[i] >= gt[i-1]:
            acc.append(1)
        elif gt[i] < gt[i-1] and pred[i] < gt[i-1]:
            acc.append(1)
        else:
            acc.append(0)

    return np.array(acc).mean()

In [22]:
def z_norm(df, col_exclude=None):
    """Performs z-score normalization on all columns of df except col_exclude
    
    inputs:
        df: stock data
        col_exclude: columns to be excluded from normalization
        
    returns:
        df_std: normalized z-score data
    """

    df_std = df.copy()
    cols = list(df.columns)
    cols.remove(col_exclude)
    for c in cols:
        df_std[c] = (df_std[c] - df_std[c].mean()) / df_std[c].std()

    return df_std

In [23]:
def to_sequences(seq_size: int, obs: np.array):
    """Splits a table of data into sequences of given length"""

    x = []
    y = []
    for i in range(len(obs) - seq_size):
        window = obs[i:(i + seq_size), :]
        after_window = obs[i + seq_size, :]
        x.append(window)
        y.append(after_window)
    return x, y

In [24]:
# loading data
data_path = 'data'
interval = '1d'
companies = ['AAPL'] #os.listdir(data_path)
df_list = []
test_start_date = pd.to_datetime("2022-03-01")
for co in companies:
    files = os.listdir(os.path.join(data_path, co))
    for f in files:
        if interval in f:
            file = f

    df = pd.read_csv(os.path.join(data_path, co, file))
    df = df.drop(columns=['Unnamed: 0'])

    df = z_norm(df, 'date')
    df['date'] = pd.to_datetime(df['date'])
    # could also add ticker label column

    df_list.append(df)

In [25]:
x_train = []
x_test = []
y_train = []
y_test = []
seq_size = 30 # 30th day will be the prediction
for df in df_list:
    # split each df into train and test timeframes
    df_train = df[df['date'] < test_start_date]
    df_test = df[df['date'] >= test_start_date]

    # drop unnecessary columns
    df_train = df.drop(columns=['date'])
    df_test = df.drop(columns=['date'])

    # convert to 2D numpy arrays of shape (-1, num_cols)
    train = df_train.to_numpy()
    test = df_test.to_numpy()

    # convert to sequences and append to respective training and testing lists
    x, y = to_sequences(seq_size, train)
    for i in range(len(x)):
        x_train.append(x[i])
        y_train.append(y[i])
    x, y = to_sequences(seq_size, test)
    for j in range(len(x)):
        x_test.append(x[i])
        y_test.append(y[i])

ndims = y_train[0].shape[0]

x_train = np.vstack(x_train).reshape(-1, seq_size, ndims)
y_train = np.vstack(y_train).reshape(-1, ndims)
x_test = np.vstack(x_test).reshape(-1, seq_size, ndims)
y_test = np.vstack(y_test).reshape(-1, ndims)

x_train = torch.tensor(x_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
x_test = torch.tensor(x_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

test_dataset = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

Using basic transformer from https://github.com/jeffheaton/app_deep_learning/blob/main/t81_558_class_10_3_transformer_timeseries.ipynb

In [26]:
# Positional Encoding for Transformer
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [27]:
# Model definition using Transformer
class TransformerModel(nn.Module):
    def __init__(self, input_dim=1, output_dim=1, d_model=64, nhead=4, num_layers=2, dropout=0.1):
        super(TransformerModel, self).__init__()

        self.encoder = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.decoder = nn.Linear(d_model, output_dim)
        self.init_weights()

    def init_weights(self):
        initrange = 0.1    
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, x):
        x = self.encoder(x)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = self.decoder(x[:, -1, :])
        return x

model = TransformerModel(input_dim=ndims, output_dim=ndims)



In [28]:
# Train the model
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=0.5, patience=10, verbose=True)

epochs = 1000
early_stop_count = 0
min_val_loss = float('inf')

for epoch in range(epochs):
    model.train()
    train_losses = []
    for batch in train_loader:
        x_batch, y_batch = batch
        # x_batch, y_batch = x_batch, y_batch # can do .to(device) here

        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        train_losses.append(loss.item())
        optimizer.step()

    # Validation
    model.eval()
    val_losses = []
    with torch.no_grad():
        for batch in test_loader:
            x_batch, y_batch = batch
            # x_batch, y_batch = x_batch, y_batch # can do .to(device) here
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            val_losses.append(loss.item())

    val_loss = np.mean(val_losses)
    train_loss = np.mean(train_losses)
    # scheduler.step(val_loss)

    # if val_loss < min_val_loss:
    #     min_val_loss = val_loss
    #     early_stop_count = 0
    # else:
    #     early_stop_count += 1

    # if early_stop_count >= 5:
    #     print("Early stopping!")
    #     break
    print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

Epoch 1/1000, Train Loss: 0.2802, Validation Loss: 0.2395
Epoch 2/1000, Train Loss: 0.1829, Validation Loss: 0.3161
Epoch 3/1000, Train Loss: 0.1717, Validation Loss: 0.2636
Epoch 4/1000, Train Loss: 0.1635, Validation Loss: 0.2400
Epoch 5/1000, Train Loss: 0.1559, Validation Loss: 0.2976
Epoch 6/1000, Train Loss: 0.1545, Validation Loss: 0.3116
Epoch 7/1000, Train Loss: 0.1491, Validation Loss: 0.2914
Epoch 8/1000, Train Loss: 0.1464, Validation Loss: 0.2463
Epoch 9/1000, Train Loss: 0.1437, Validation Loss: 0.3192
Epoch 10/1000, Train Loss: 0.1461, Validation Loss: 0.4474
Epoch 11/1000, Train Loss: 0.1441, Validation Loss: 0.2681
Epoch 12/1000, Train Loss: 0.1449, Validation Loss: 0.2294
Epoch 13/1000, Train Loss: 0.1399, Validation Loss: 0.2553
Epoch 14/1000, Train Loss: 0.1391, Validation Loss: 0.3444
Epoch 15/1000, Train Loss: 0.1379, Validation Loss: 0.2400
Epoch 16/1000, Train Loss: 0.1373, Validation Loss: 0.3199
Epoch 17/1000, Train Loss: 0.1380, Validation Loss: 0.3384
Epoch 

KeyboardInterrupt: 

In [29]:
saved_weights_path = 'saved_models'
fn = 'AAPL_model_20240209.pt'
torch.save(model.state_dict(), os.path.join(saved_weights_path, fn))

In [None]:
# testing with metrics
model = TransformerModel(input_dim=ndims, output_dim=ndims)
model.load_state_dict(torch.load(os.path.join(saved_weights_path, fn)))
with torch.no_grad():
    for batch in test_loader:
        x_batch, y_batch = batch
        # x_batch, y_batch = x_batch, y_batch # can do .to(device) here
        outputs = model(x_batch)
