In [37]:
import sys
sys.path.append('..')

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np

from utils.timescale_connector import TimescaleConnector


In [16]:
df = TimescaleConnector.query_ohlcv_daily()

In [17]:
# Prepare for a time series dataset
# First test only with "close"
data = df["close"].values

In [18]:
# Scale the data to between 0 and 1
scaler = MinMaxScaler(feature_range=(-1, 1))
data = scaler.fit_transform(data.reshape(-1, 1))

In [19]:
# Split to train set and test set
train_data, test_data = train_test_split(data, test_size=0.2, shuffle=False)

In [20]:
# Convert the data to Torch tensor
train_data = torch.FloatTensor(train_data).view(-1)

In [21]:
# Lookback period
lookback = 10

In [22]:
# Prepare the dataset
def create_inout_sequences(input_data: pd.DataFrame, tw: int) -> list:
    inout_seq = []
    L = len(input_data)
    for i in range(L-tw):
        train_seq = input_data[i:i+tw]
        train_label = input_data[i+tw:i+tw+1]
        inout_seq.append((train_seq, train_label))
    return inout_seq

In [23]:
# Create the sequence
inout_seq = create_inout_sequences(train_data, lookback)

In [24]:
len(inout_seq)

125576

In [31]:
# Define the LSTM model
from dataclasses import dataclass


# @dataclass
# class LSTM(nn.Module):
#     def __post_init__(self):
#         super().__init__()

#     input_size = 1
#     hidden_layer_size = 100
#     output_size = 1
#     lstm = nn.LSTM(input_size, hidden_layer_size)
#     linear = nn.Linear(hidden_layer_size, output_size)
#     hidden_cell = (
#         (torch.zeros(1, 1, hidden_layer_size)),
#         torch.zeros(1, 1, hidden_layer_size),
#     )

#     def forward(self, input_seq: list):
#         lstm_out, self.hidden_cell = self.lstm(
#             input_seq.view(len(input_seq), 1, -1), self.hidden_cell
#         )
#         prediction = self.linear(lstm_out.view(len(input_seq), -1))
#         return prediction[-1]

class LSTM(nn.Module):
    def __init__(self, input_size=10, hidden_layer_size=100, output_size=1):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.hidden_cell = (torch.zeros(1,1,self.hidden_layer_size),
                            torch.zeros(1,1,self.hidden_layer_size))

    def forward(self, input_seq):
        lstm_out, self.hidden_cell = self.lstm(input_seq.view(len(input_seq) ,1, -1), self.hidden_cell)
        predictions = self.linear(lstm_out.view(len(input_seq), -1))
        return predictions

**Training**

In [32]:
# Init the model, define loss function and optimzier
model = LSTM()
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [33]:
# Batch size with data loader
dataloader = DataLoader(inout_seq, batch_size=100, shuffle=False)

In [34]:
# Check for GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [36]:
# Train
epochs = 10

for i in range(epochs):
    for seq, labels in dataloader:
        # Move to GPU
        seq = seq.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        model.hidden_cell = (
            torch.zeros(1, 1, model.hidden_layer_size).to(device),
            torch.zeros(1, 1, model.hidden_layer_size).to(device),
        )
        
        y_pred = model(seq)
        
        single_loss = loss_function(y_pred, labels)
        single_loss.backward()
        optimizer.step()
        
    if i%25 == 1:
        print(f"Epoch {i:3} loss: {single_loss.item():10.8f}")
        
print(f'epoch: {i:3} loss: {single_loss.item():10.10f}')

Epoch   1 loss: 0.00005602
epoch:   9 loss: 0.0000415553


**Validation**

In [43]:
# Validation
# Set the model to evaluation mode
model.eval()

# Prepare the test data
test_inputs = train_data[-lookback:].tolist()
test_outputs = test_data.tolist()

# Make predictions on the test data
predictions = []
for i in range(len(test_data)):
    seq = torch.FloatTensor(test_inputs[-lookback:])
    with torch.no_grad():
        model.hidden_cell = (torch.zeros(1, 1, model.hidden_layer_size),
                        torch.zeros(1, 1, model.hidden_layer_size))
        predictions.append(model(seq).item())

    test_inputs.append(predictions[-1])

# Calculate the mean squared error (MSE) between the predictions and the actual values
mse = ((np.array(predictions) - np.array(test_outputs)) ** 2).mean()
print(f"Mean Squared Error: {mse:.4f}")

# Plot the predictions and the actual values
import matplotlib.pyplot as plt

plt.plot(test_outputs, label="Actual")
plt.plot(predictions, label="Predicted")
plt.legend()
plt.show()


RuntimeError: input.size(-1) must be equal to input_size. Expected 10, got 1

In [None]:
# Scale the data back
inverse_transformed_predictions = scaler.inverse_transform(predictions)
