![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [1]:
#region imports
from AlgorithmImports import *
import torch
from torch import nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import joblib
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import TensorDataset, DataLoader
#endregion

In [2]:
# Reference: https://www.quantconnect.com/docs/v2/research-environment/machine-learning/pytorch

# Pull daily TSLA bars for 2019-01-01 .. 2024-01-01 from the research environment
qb = QuantBook()
symbol = qb.add_equity("TSLA", Resolution.DAILY).symbol
history = qb.history(symbol, datetime(2019, 1, 1), datetime(2024, 1, 1)).loc[symbol]

# Keep only the close and derive daily log returns from it
df = pd.DataFrame(data={'close': history['close']})
df['log_returns'] = np.log(df['close']).diff()

prediction_window_size = 21  # trading days in rolling window # TODO: adjust this window 7, 10, 21.,...
dpy = 252  # trading days per year

# Annualised realized volatility over several rolling horizons.
# `rolling(...).var()` is already normalised by the window length (ddof=1),
# so annualising only needs the `dpy` factor. Dividing by the window again
# would shrink each series by sqrt(window) and make the horizons incomparable.
for window in (10, 21, 60):
    df[f'realized_vol_{window}'] = np.sqrt(df['log_returns'].rolling(window=window).var() * dpy)

# Drop the warm-up rows that do not have a full 60-day window yet
df = df.dropna()
df

In [3]:
# Print a summary of the prepared frame (index, columns, dtypes, non-null counts)
df.info()

In [4]:
# Only chart the most recent observations
last_n_days = 1000

plt.figure(figsize=(14, 7))

# Panel 1: closing price
plt.subplot(3, 1, 1)
plt.plot(df['close'][-last_n_days:], label='TSLA Stock Close Price')
plt.title('TSLA Stock Prices')
plt.xlabel('Date')
plt.ylabel('Closing Price')
plt.grid(True)
plt.legend()  # the labels passed to plot() are only shown once a legend is drawn

# Panel 2: daily log returns
plt.subplot(3, 1, 2)
plt.plot(df['log_returns'][-last_n_days:], label='Log Returns')
plt.title('Daily Log Returns')
plt.xlabel('Date')
plt.ylabel('Log Return')
plt.grid(True)
plt.legend()

# Panel 3: realized volatility for each rolling horizon
plt.subplot(3, 1, 3)
plt.plot(df['realized_vol_10'][-last_n_days:], label='Realized 10-day Volatility', color='red')
plt.plot(df['realized_vol_21'][-last_n_days:], label='Realized 21-day Volatility', color='green')
plt.plot(df['realized_vol_60'][-last_n_days:], label='Realized 60-day Volatility', color='blue')
plt.title('Realized k-Day Volatility')
plt.xlabel('Date')
plt.ylabel('Volatility')
plt.grid(True)
plt.legend()  # needed to tell the three horizons apart

plt.tight_layout()
plt.show()

In [5]:
# Target column the network learns to predict, and its positional index in df
feature_to_predict = 'realized_vol_10'
label_idx = df.columns.get_loc(feature_to_predict)

# Number of past trading days the LSTM sees for each sample
lookback = 10

# PyTorch settings: use the GPU when one is present, default tensors to float64
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
torch.set_default_dtype(torch.float64)

# Build sliding-window samples for the LSTM
def create_dataset(df, lookback, label_idx, device):
    """Turn a 2-D time series into (window, next-step label) tensors.

    Sample ``i`` uses rows ``i`` .. ``i + lookback - 1`` as its features and
    the value in column ``label_idx`` of row ``i + lookback`` as its label.

    Args:
        df: 2-D array-like (rows are time steps, columns are features), e.g.
            the ndarray returned by a scaler or a DataFrame.
        lookback: number of consecutive rows in each feature window.
        label_idx: positional index of the column used as the label.
        device: torch device (or device string) the tensors are moved to.

    Returns:
        Tuple ``(features, labels)`` with shapes
        ``(n_samples, lookback, n_features)`` and ``(n_samples,)``, where
        ``n_samples = max(len(df) - lookback, 0)``.
    """
    data = np.asarray(df)
    num_samples = max(len(data) - lookback, 0)

    if num_samples > 0:
        features = np.stack([data[start:start + lookback] for start in range(num_samples)])
    else:
        # Not enough rows for a single window: keep the trailing dimensions so
        # downstream shape checks still work on the empty result.
        features = np.empty((0, lookback) + data.shape[1:], dtype=data.dtype)

    # The label of window `i` is the row right after it, i.e. row `i + lookback`
    labels = data[lookback:, label_idx]

    return torch.tensor(features).to(device), torch.tensor(labels).to(device)

# Chronological 80/20 split: the first 80% of rows train, the remainder tests
split_idx = int(len(df) * 0.8)
train_df = df.iloc[:split_idx]
test_df = df.iloc[split_idx:]

# The LSTM's sigmoid/tanh gates are sensitive to input scale, so rescale every
# column to [0, 1]. The scaler is fitted on the training rows only and then
# applied unchanged to the test rows.
scaler = MinMaxScaler(feature_range=(0, 1))
train_df_scaled = scaler.fit(train_df).transform(train_df)
test_df_scaled = scaler.transform(test_df)
print(train_df_scaled.shape, test_df_scaled.shape)

# Window each scaled array into input features X and output labels y
(X_train, y_train), (X_test, y_test) = (
    create_dataset(scaled, lookback, label_idx, device)
    for scaled in (train_df_scaled, test_df_scaled)
)

# Wrap the tensors so they can be served by a DataLoader
train_dataset, test_dataset = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)

In [6]:
# Number of windowed samples in the train and test datasets
len(train_dataset), len(test_dataset)

In [7]:
# Display the prepared (unscaled) frame again for reference
df

In [8]:
# Mini-batch size shared by both loaders
batch_size = 8

# Pair each feature window with its label
train_dataset, test_dataset = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)

# Serve both datasets in their original (chronological) order, without shuffling
train_loader, test_loader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=False)
    for dataset in (train_dataset, test_dataset)
)

In [9]:
class VolNet(nn.Module):
    """Two-layer LSTM followed by a linear head that emits one value per sequence.

    Args:
        num_inputs: number of features per time step.
    """

    def __init__(self, num_inputs):
        super().__init__()
        self.num_inputs = num_inputs
        self.num_layers = 2
        self.hidden_size = 32
        self.num_outputs = 1

        # Stacked LSTM; `batch_first=True` means inputs are (batch, sequence, features)
        self.lstm = nn.LSTM(
            input_size=num_inputs,
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            batch_first=True,
        )
        # Fully-connected layer mapping the last hidden output to the prediction
        self.dense = nn.Linear(in_features=self.hidden_size, out_features=self.num_outputs)

    def forward(self, x):
        """Run a batch of sequences through the network.

        Args:
            x: tensor of shape (batch, sequence, features).

        Returns:
            Tensor of shape (batch, num_outputs) computed from the LSTM output
            at the last time step.
        """
        batch_size = x.shape[0]
        # Fresh zero states for every call, created next to the input so the
        # model also works after `.to('cuda')` or a dtype change.
        h0 = self.initHidden(batch_size, device=x.device, dtype=x.dtype)
        c0 = self.initHidden(batch_size, device=x.device, dtype=x.dtype)
        lstm_out, _ = self.lstm(x, (h0, c0))
        last_time_step_out = lstm_out[:, -1, :]  # output at the last time step
        return self.dense(last_time_step_out)

    def initHidden(self, batch_size, device=None, dtype=None):
        """Return a zero initial state of shape (num_layers, batch_size, hidden_size).

        Args:
            batch_size: batch dimension of the state. A falsy value falls back
                to ``hidden_size * num_layers`` (behaviour kept from the
                original implementation).
            device: device for the state; defaults to the model's device.
            dtype: dtype for the state; defaults to the model's dtype.
        """
        if not batch_size:
            batch_size = self.hidden_size * self.num_layers
        if device is None or dtype is None:
            reference = next(self.parameters())
            device = reference.device if device is None else device
            dtype = reference.dtype if dtype is None else dtype
        # The zero state is a constant, not a trainable parameter, so it does
        # not need gradient tracking.
        return torch.zeros(self.num_layers, batch_size, self.hidden_size, device=device, dtype=dtype)
        


In [10]:
# Build the network on the available device
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
model = VolNet(num_inputs=X_train.shape[-1]).to(device)

# Mean-squared-error objective optimised with Adam
num_epochs = 20
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(num_epochs):
    model.train()  # training mode
    total_loss = 0
    for inputs, labels in train_loader:
        # Forward pass; predictions and labels are squeezed to matching shapes
        labels.squeeze_()
        outputs = model(inputs).squeeze()
        loss = criterion(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses for this epoch
    avg_train_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}')

# Persist the trained model with `joblib`.
# According to the QuantConnect docs:
# "Don't use the torch.save method to save models because the tensor data will be lost and corrupt the save."
model_key = "LSTM_model"
file_name = qb.object_store.get_file_path(model_key)
joblib.dump(model, file_name)

In [11]:
# Testing phase
# Per-batch predictions and labels are collected in these module-level lists
test_yhat, test_y = [], []


def evaluate_model(model, test_loader, criterion):
    """Score `model` on every batch of `test_loader`.

    Side effects: appends each batch's predictions to the module-level list
    `test_yhat` and its labels to `test_y` (as NumPy arrays), and prints the
    mean per-batch loss.

    Args:
        model: callable network; switched to evaluation mode here.
        test_loader: sized iterable yielding ``(inputs, labels)`` batches.
        criterion: loss function called as ``criterion(outputs, labels)``.

    Returns:
        Sum of the batch losses divided by ``len(test_loader)``.
    """
    model.eval()  # evaluation mode
    batch_losses = []
    with torch.no_grad():  # no gradients needed while scoring
        for batch_inputs, batch_labels in test_loader:
            batch_outputs = model(batch_inputs).squeeze()
            batch_labels.squeeze_()
            batch_losses.append(criterion(batch_outputs, batch_labels).item())
            test_yhat.append(batch_outputs.detach().cpu().numpy())
            test_y.append(batch_labels.detach().cpu().numpy())

    avg_loss = sum(batch_losses) / len(test_loader)
    print(f'Test Loss: {avg_loss:.4f}')
    return avg_loss


# Run the evaluation once: prints the mean test loss and fills
# `test_yhat` / `test_y` with one array per batch
test_loss = evaluate_model(model, test_loader, criterion)

# Join the per-batch arrays into one flat array each
test_yhat = np.hstack(test_yhat)
test_y = np.hstack(test_y)

In [12]:
# Line up predicted and actual realized volatility column by column
test_compare = pd.DataFrame(data=dict(Predicted_RVol=test_yhat, Actual_RVol=test_y))

# Dump the full table, then chart both series on one axis
print(test_compare.to_string())
test_compare.plot(
    figsize=(15, 10),
    title=f'Model Performance: predicted vs actual normalized {feature_to_predict}',
)
plt.show()

In [13]:
# Score the whole test set in a single forward pass.
# Evaluation mode and no_grad keep this cell independent of what ran before it
# and avoid building an autograd graph for inference.
model.eval()
with torch.no_grad():
    predict = model(X_test)

# Move to host memory before converting: `.numpy()` raises for CUDA tensors
y_predict = predict.detach().cpu().numpy()
y_test_np = y_test.detach().cpu().numpy()

df_perf = pd.DataFrame({'Real': y_test_np.flatten(), 'Predicted': y_predict.flatten()})
df_perf.plot(title=f'Model Performance: predicted vs actual normalized {feature_to_predict}', figsize=(15, 10))
plt.show()

# R^2 = 1 - SS_res / SS_tot
r2 = 1 - np.sum(np.square(y_test_np.flatten() - y_predict.flatten())) / np.sum(np.square(y_test_np.flatten() - y_test_np.mean()))
print(f"The explained variance by the model (r-square): {r2*100:.2f}%")

In [14]:
# Sanity check: the flattened labels and predictions should have the same length
y_test_np.flatten().shape, y_predict.flatten().shape

In [None]:
# To load a model, call this with whatever `model_key` it was saved under
def load_model(model_key="LSTM_model"):
    """Load a model previously dumped with joblib into the Object Store.

    Args:
        model_key: Object Store key the model was saved under.

    Returns:
        The object deserialised by ``joblib.load``.

    Raises:
        FileNotFoundError: if the Object Store has no entry for `model_key`.
    """
    if not qb.object_store.contains_key(model_key):
        raise FileNotFoundError(f"No object stored under key '{model_key}' in the Object Store")
    file_name = qb.object_store.get_file_path(model_key)
    # NOTE: joblib unpickles arbitrary objects; only load files you wrote yourself
    loaded_model = joblib.load(file_name)
    return loaded_model