<a href="https://colab.research.google.com/github/pyagoubi/Stuff/blob/main/Untitled15.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive (Colab only) so files stored on Drive can be reached by path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
%%capture
!pip install backtesting

import backtesting as bt
from backtesting import Backtest, Strategy

In [8]:
%matplotlib inline
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import datetime as dt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Name of the compute device as a plain string: 'cuda' when torch can see a GPU, otherwise 'cpu'.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

Create an LSTM model for 1-day-ahead forecasting

In [23]:
class cfg_LSTM1d:
  """Settings for the one-day-ahead LSTM forecaster.

  All values are plain class attributes, so they are read as
  ``cfg_LSTM1d.<name>`` without creating an instance.
  """

  # --- data -------------------------------------------------------------
  path = '/content/drive/MyDrive/stock predict/technical/1D_technical.csv'
  save_path = '/content/drive/MyDrive/stock predict/pred_close1d.pt'
  features = ['close']   # columns selected from the CSV for modelling
  target = 'close'       # column scaled with the target scaler
  split_fraction = 0.85  # leading share of rows used for training

  # --- windowing --------------------------------------------------------
  time_steps = 5         # number of predictor timesteps
  horizon = 1            # number of timesteps to be predicted
  sequence_length = time_steps + horizon  # rows per input window

  # --- network ----------------------------------------------------------
  input_size = len(features)
  hidden_size = 50
  numlayers = 2
  output_size = 1

  # --- optimisation -----------------------------------------------------
  learning_rate = 0.01
  num_epochs = 2000


def load_1ddata(path = cfg_LSTM1d.path, features = cfg_LSTM1d.features):
  """Read the CSV and return the modelling columns plus the full table.

  Spaces are stripped from the column names, the ``time`` column is parsed
  and reduced to plain dates, and that column becomes the index.

  Args:
    path: location of the CSV file.
    features: list of column names to keep for modelling.

  Returns:
    ``(df_1d, data_1draw)``: an independent copy containing only
    ``features``, and the complete date-indexed frame.
  """
  raw = pd.read_csv(path)
  raw.columns = raw.columns.str.replace(' ', '')
  # Parse the timestamps, then keep only the calendar date for the index.
  raw['time'] = pd.to_datetime(raw['time']).dt.date
  indexed = raw.set_index('time')
  selected = indexed[features].copy()
  return selected, indexed


def trainvalidsplit(df_1d, split_fraction = cfg_LSTM1d.split_fraction):
  """Split a frame by row position into a training and a validation part.

  Args:
    df_1d: DataFrame to split; row order is preserved.
    split_fraction: share of rows, counted from the top, assigned to training.

  Returns:
    ``(train_1d, valid_1d)`` as independent copies: the first
    ``int(n_rows * split_fraction)`` rows and the remaining rows.
  """
  cut = int(len(df_1d) * split_fraction)
  head, tail = df_1d.iloc[:cut], df_1d.iloc[cut:]
  return head.copy(), tail.copy()

def scale1d(train_1d, valid_1d, f_scaler1d,
            t_scaler1d, input_size = cfg_LSTM1d.input_size, target = cfg_LSTM1d.target):
  """Scale feature and target columns using training-split statistics only.

  Both scalers are fitted on the training frame and then merely applied to
  the validation frame, so no validation information leaks into the scaling
  and both scalers keep their training fit for later inverse transforms.

  Args:
    train_1d: training DataFrame (left unmodified).
    valid_1d: validation DataFrame (left unmodified).
    f_scaler1d: scaler object for the non-target columns; fitted in place.
    t_scaler1d: scaler object for the target column; fitted in place.
    input_size: number of model input columns. The non-target columns are
      scaled only when this is at least 2 (i.e. there is something besides
      the target to scale).
    target: name of the target column.

  Returns:
    ``(train_1d_scaled, valid_1d_scaled)`` - scaled copies of the inputs.
  """
  train_1d_scaled = train_1d.copy()
  valid_1d_scaled = valid_1d.copy()

  if input_size >= 2:
    train_feature_cols = train_1d.columns != target
    valid_feature_cols = valid_1d.columns != target
    # Fit on the training features only ...
    train_1d_scaled.loc[:, train_feature_cols] = f_scaler1d.fit_transform(
        train_1d.loc[:, train_feature_cols].values)
    # ... and reuse that fit for validation. Calling fit_transform here would
    # refit the scaler on validation data and put the two splits on
    # different scales.
    valid_1d_scaled.loc[:, valid_feature_cols] = f_scaler1d.transform(
        valid_1d.loc[:, valid_feature_cols].values)

  # The target always gets its own scaler so predictions can be inverted
  # independently of the features.
  train_1d_scaled[target] = t_scaler1d.fit_transform(train_1d[target].values.reshape(-1,1))
  valid_1d_scaled[target] = t_scaler1d.transform(valid_1d[target].values.reshape(-1,1))
  return train_1d_scaled, valid_1d_scaled

def create_sequences(df, seq_length = cfg_LSTM1d.sequence_length):
    """Turn a frame into sliding input windows and next-row targets.

    Window ``i`` holds rows ``i`` to ``i + seq_length - 1`` (all columns);
    its target is the value of the last column in row ``i + seq_length``.

    Args:
        df: DataFrame whose last column supplies the target values.
        seq_length: number of consecutive rows in each input window.

    Returns:
        ``(X, y)`` as float64 torch tensors of shape
        ``(n_rows - seq_length, seq_length, n_columns)`` and
        ``(n_rows - seq_length, 1)``.
    """
    values = df.values
    n_rows, n_cols = values.shape[0], values.shape[1]
    n_windows = n_rows - seq_length

    windows = np.zeros((n_windows, seq_length, n_cols))
    targets = np.zeros((n_windows, 1))

    # `stop` is both the exclusive end of the window and the target's row.
    for start, stop in zip(range(n_windows), range(seq_length, n_rows)):
        windows[start] = values[start:stop]
        targets[start, 0] = values[stop, -1]

    return torch.from_numpy(windows), torch.from_numpy(targets)

class LSTM(nn.Module):
    """Stacked LSTM followed by a linear head with tanh output.

    The prediction is taken from the LSTM output at the last timestep of
    each sequence. Because of the final tanh, outputs lie in (-1, 1), so
    targets are expected to be scaled into that range.

    Args:
        input_size: number of features per timestep.
        hidden_size: width of each LSTM layer.
        num_layers: number of stacked LSTM layers.
        output_size: number of values predicted per sequence.
    """

    def __init__(self, input_size = cfg_LSTM1d.input_size, hidden_size = cfg_LSTM1d.hidden_size,
                 num_layers = cfg_LSTM1d.numlayers, output_size=cfg_LSTM1d.output_size):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # nn.LSTM applies dropout only between stacked layers; with a single
        # layer it has no effect and merely emits a warning, so disable it.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True,
                            dropout=0.2 if num_layers > 1 else 0.0)
        self.fc1 = nn.Linear(hidden_size, output_size)
        self.tanh = nn.Tanh()

    def forward(self, x):
        """Map a batch ``x`` of shape (batch, seq_len, input_size) to (batch, output_size)."""
        # Zero initial hidden/cell state, created directly on the input's
        # device and dtype. Fresh zeros carry no gradient history, so no
        # requires_grad/detach handling is needed.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)

        out, _ = self.lstm(x, (h0, c0))
        # Keep only the last timestep, project it, and squash with tanh.
        out = self.tanh(self.fc1(out[:, -1, :]))
        return out


def train1d(model, train, train_target, valid, valid_target,
          learning_rate = cfg_LSTM1d.learning_rate, num_epochs = cfg_LSTM1d.num_epochs, save_path = cfg_LSTM1d.save_path):
  """Train ``model`` full-batch with Adam/MSE and checkpoint the best epoch.

  Each epoch performs one optimisation step on the whole training tensor,
  then evaluates on the whole validation tensor. Whenever the validation
  loss improves on the best value seen so far, the model is written to
  ``save_path``.

  Args:
    model: the network to train; updated in place.
    train: training input tensor.
    train_target: training target tensor.
    valid: validation input tensor.
    valid_target: validation target tensor.
    learning_rate: Adam learning rate.
    num_epochs: number of full-batch epochs.
    save_path: file the best model is saved to.
  """
  criterion = torch.nn.MSELoss(reduction='mean')
  optimiser = torch.optim.Adam(model.parameters(), lr=learning_rate)

  # Put the data wherever the model's weights live instead of relying on a
  # notebook-global device variable.
  model_device = next(model.parameters()).device
  train = train.float().to(model_device)
  train_target = train_target.float().to(model_device)
  valid = valid.float().to(model_device)
  valid_target = valid_target.float().to(model_device)

  best_loss = float('inf')

  for epoch in range(num_epochs):

    model.train()
    optimiser.zero_grad()
    y_train_pred = model(train)
    train_loss = criterion(y_train_pred, train_target)
    train_loss.backward()
    optimiser.step()

    # Validation needs no gradients: skip graph construction and keep the
    # loss as a plain float so no autograd graph is retained across epochs.
    model.eval()
    with torch.no_grad():
      val_loss = criterion(model(valid), valid_target).item()

    print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train Loss: {train_loss.item():.4f}, '
              f'Validation Loss: {val_loss:.4f}')

    if val_loss < best_loss:
      best_loss = val_loss
      # NOTE: this pickles the whole module (not just its state_dict), so
      # loading the file later requires the model's class to be importable.
      torch.save(model, save_path)

In [None]:
# Seed torch's RNG so weight initialisation and dropout masks are repeatable
# across runs (some GPU kernels may still be non-deterministic).
torch.manual_seed(0)

# Separate scalers for features and target, both mapping into [-1, 1].
f_scaler1d = MinMaxScaler(feature_range=(-1, 1))
t_scaler1d = MinMaxScaler(feature_range=(-1, 1))

# Load -> split -> scale -> window.
df_1d, data_1draw = load_1ddata()
train_1d, valid_1d = trainvalidsplit(df_1d)
train_1d_scaled, valid_1d_scaled = scale1d(train_1d, valid_1d,f_scaler1d, t_scaler1d)
train_sequences1d, train_target1d = create_sequences(train_1d_scaled)
valid_sequences1d, valid_target1d = create_sequences(valid_1d_scaled)

# Build the network on the selected device and train it.
model1d = LSTM()
model1d = model1d.to(device)

train1d(model1d, train_sequences1d, train_target1d, valid_sequences1d, valid_target1d)
