# Forecasting: LSTM

Open in [Colab](https://colab.google/)

### Import Libraries and Load Data

In [None]:
import warnings
# Suppress all Python warnings for the rest of the kernel session.
# Note that this also hides warnings that might be worth reading.
warnings.filterwarnings("ignore")

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
from plotly.offline import init_notebook_mode
init_notebook_mode(connected=True)
from plotly.subplots import make_subplots

from sklearn.preprocessing import StandardScaler

import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, Dataset

import plotly.io as pio
pio.renderers.default = 'colab' 

In [None]:
import yfinance as yf

# Download window and instrument used throughout the notebook.
start_date = '2023-07-12'
end_date = '2025-06-11'

ticker = 'AAPL'
data = yf.download(ticker, start=start_date, end=end_date, interval='1h')

# The downloaded frame may carry a two-level column index (the second level
# is dropped here). Flatten only when that second level really exists:
# droplevel(1) raises on a flat Index.
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.droplevel(1)
data.columns.name = None
data

Check missing values

In [None]:
# Column dtypes and non-null counts for every column.
data.info()

In [None]:
# True if at least one column contains a missing value.
data.isnull().sum().any()

In [None]:
# Summary statistics, transposed so that each column becomes a row.
data.describe().T

### General Observations
1. The dataset contains 3351 observations.
2.  The attributes are all numeric except for a date column that is used as the index.
3.  There are no missing values in the dataset.
4.  The variable Close can be used as a target for the models.

Note: If we had categorical columns, we would need to identify them and either encode them or extract features from them.

## Volume-Price Plot

In [None]:
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Collapse the hourly bars into one OHLCV bar per calendar day. Days without
# any bars end up with NaN prices and are dropped.
ohlcv_rules = {
    'Open': 'first',
    'High': 'max',
    'Low': 'min',
    'Close': 'last',
    'Volume': 'sum'
}
daily_data = data.resample('1D').agg(ohlcv_rules).dropna()

# Build both traces first, then place them on a two-row figure.
candles = go.Candlestick(
    x=daily_data.index,
    open=daily_data["Open"], high=daily_data["High"],
    low=daily_data["Low"], close=daily_data["Close"],
    name="Apple")
volume_bars = go.Bar(
    x=daily_data.index,
    y=daily_data['Volume'],
    showlegend=False)

fig = make_subplots(rows=2, cols=1, shared_xaxes=True,
                    vertical_spacing=0.03, subplot_titles=('Apple', 'Volume'),
                    row_width=[0.2, 0.7])
fig.add_trace(candles, row=1, col=1)
fig.add_trace(volume_bars, row=2, col=1)

fig.update_layout(
    yaxis_title='Apple',
    shapes=[dict(
        x0=start_date, x1=end_date, y0=0, y1=1, xref='x', yref='paper',
        line_width=2
    )],
    xaxis_rangeslider_visible=False
)

fig.show()

## Creating features early on

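We're going to create three extra features: simple moving averages of the Close price over 5, 9 and 17 rows. Because the data is hourly, each window covers that many hourly bars, even though the column names use a `d` suffix.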

In [None]:
# Rolling means of Close over 5, 9 and 17 rows. The first (window - 1) rows of
# each column are NaN until the window is full.
for sma_window in (5, 9, 17):
    data[f"{sma_window}d_sma"] = data["Close"].rolling(window=sma_window).mean()

In [None]:
# Display the frame, now including the SMA columns.
data

To fill the NaN values at the start of the SMA columns, we simply use the Close value of the same row.

In [None]:
# Replace the warm-up NaNs of each SMA column with the Close value of the same row.
for sma_col in ('5d_sma', '9d_sma', '17d_sma'):
    data[sma_col] = data[sma_col].fillna(data['Close'])
data

## Plot Closing Price vs Moving Averages

In [None]:
import plotly.graph_objects as go

# Recompute the SMAs on the daily bars so the plot is easier to read.
# The warm-up NaNs are filled from the *daily* Close. fillna aligns on the
# index, and the hourly frame's timestamps generally do not coincide with the
# daily resample labels, so filling from the hourly frame leaves the NaNs in place.
for sma_window in (5, 9, 17):
    daily_data[f'{sma_window}d_sma'] = (
        daily_data['Close'].rolling(window=sma_window).mean().fillna(daily_data['Close'])
    )

fig = go.Figure()

# (column, legend label, line style) for each series, in drawing order.
line_specs = [
    ('Close', 'Close', dict(width=2)),
    ('5d_sma', '5d SMA', dict(dash='dot')),
    ('9d_sma', '9d SMA', dict(dash='dash')),
    ('17d_sma', '17d SMA', dict(dash='dashdot')),
]
for column, label, line_style in line_specs:
    fig.add_trace(go.Scatter(
        x=daily_data.index, y=daily_data[column],
        mode='lines',
        name=label,
        line=line_style
    ))

# Layout
fig.update_layout(
    title='Apple Close Price with Daily SMAs',
    xaxis_title='Date',
    yaxis_title='Price',
    legend=dict(x=0, y=1.1, orientation='h'),
    template='plotly_white'
)

fig.show()

## Bollinger Bands

**Bollinger Bands** are a popular technical analysis tool developed by John Bollinger. They consist of three lines: a simple moving average (SMA) in the middle, and two bands (upper and lower) plotted at a specified number of standard deviations above and below the SMA. 

These bands expand and contract based on market volatility. Traders use Bollinger Bands to identify overbought or oversold conditions, potential breakout opportunities, and to assess price volatility. When the price moves close to the upper band, the asset may be considered overbought; when it approaches the lower band, it may be considered oversold.

In [None]:
# Standard Bollinger Bands parameters
window = 20
num_std = 2

# A single rolling view over Close provides both the centre line and the spread.
close_rolling = data['Close'].rolling(window=window)
rolling_std = close_rolling.std()

# Central band (SMA)
data['bb_mid'] = close_rolling.mean()

# Upper and lower bands sit num_std rolling standard deviations from the centre.
band_offset = num_std * rolling_std
data['bb_upper'] = data['bb_mid'] + band_offset
data['bb_lower'] = data['bb_mid'] - band_offset

In [None]:
import plotly.graph_objects as go
import pandas as pd

fig = go.Figure()

# Daily bars for the candlestick. Averaging Open/High/Low/Close over a day does
# not give a valid candle, so those four use first/max/min/last. Every other
# column (including the Bollinger bands) keeps the daily mean.
daily_rules = {column: 'mean' for column in data.columns}
daily_rules.update({'Open': 'first', 'High': 'max', 'Low': 'min', 'Close': 'last'})

# Days without any bars are dropped rather than back-filled, so no candles are
# invented from the following session's values.
daily_data = (
    data.resample('1D')
    .agg(daily_rules)
    .dropna(subset=['Open', 'High', 'Low', 'Close'])
)

# Candlestick chart
fig.add_trace(go.Candlestick(
    x=daily_data.index,
    open=daily_data['Open'],
    high=daily_data['High'],
    low=daily_data['Low'],
    close=daily_data['Close'],
    name='Price'
))

# Bollinger Bands (lines only)
fig.add_trace(go.Scatter(
    x=daily_data.index, y=daily_data['bb_upper'],
    name='Upper Band', line=dict(color='rgba(173,216,230,0.75)', dash='dot')
))

fig.add_trace(go.Scatter(
    x=daily_data.index, y=daily_data['bb_mid'],
    name='Middle Band', line=dict(color='blue')
))

fig.add_trace(go.Scatter(
    x=daily_data.index, y=daily_data['bb_lower'],
    name='Lower Band', line=dict(color='rgba(173,216,230,0.75)', dash='dot')
))

# Final layout
fig.update_layout(
    title='Bollinger Bands on Daily Price',
    xaxis_title='Date',
    yaxis_title='Price',
    template='plotly_white',
    autosize=False,
    width=1200,
    height=600,
    xaxis_rangeslider_visible=False
)

fig.show()

## Model Training
### Preparing Dataloader and Model classes

This class defines the architecture of a custom LSTM-based neural network used for time series prediction, specifically for forecasting stock closing prices.

**What happens in the __init__ method?**
This method sets up the layers of the model.
- input_size: How many features are given as input at each time step (e.g., Open, Volume, etc.).
- hidden_size: The number of “memory units” in each LSTM layer — more units can help the model learn more complex patterns.
- layers: How many LSTM layers are stacked on top of each other.
- output_size: The size of the model’s final output — for predicting one value, this is usually 1.


**Layers defined**:
- ```self.lstm```: A multi-layer LSTM with the given parameters. It processes sequential input data.
- ```self.fc1```: A fully connected (linear) layer that maps the output from the last time step of the LSTM to the final prediction.

> The architecture could be extended into a deeper feedforward structure if needed.

**What happens in the forward method?**
The forward method defines how the data flows through the model.
First the **hidden state (h0)** and **cell state (c0)** are initialized to zeros. They act as the memory of the LSTM. The input data is then passed through the LSTM, which returns:

- out: all LSTM outputs for each time step
- h_out, c_out: the final hidden and cell states (not used here)

The last step goes through a fully connected layer to produce the predicted value.
The squeeze(1) removes any extra dimensions so the output is clean and ready.


In [None]:
class LstmNet(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: number of features per time step.
        hidden_size: number of hidden units in each LSTM layer.
        layers: number of stacked LSTM layers.
        output_size: number of values produced by the final linear layer.
    """
    def __init__(self,input_size,hidden_size,layers,output_size):
        super().__init__()
        self.layers = layers
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=layers, batch_first=True)

        self.fc1 = nn.Linear(hidden_size, output_size)

    def forward(self,x):
        """Run a batch-first input of shape (batch, seq_len, input_size).

        Returns the linear layer's output for the last time step, with
        dimension 1 squeezed out when it has size 1 (i.e. output_size == 1).
        """
        batch = x.size(0)
        # Zero initial states created from the input itself, so they always
        # share its device and dtype and no global `device` is required.
        h0 = x.new_zeros(self.layers, batch, self.hidden_size)
        c0 = x.new_zeros(self.layers, batch, self.hidden_size)
        # The final hidden/cell states are not needed, only the per-step outputs.
        out, _ = self.lstm(x,(h0,c0))
        out = self.fc1(out[:,-1,:])
        return out.squeeze(1)

Let's check if we can use GPU acceleration

In [None]:
# Use the first CUDA GPU when PyTorch reports one as available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

### Data Preparation for LSTM

Before training an LSTM model, we need to **transform the data into sequences** that the model can learn from. These helper functions and classes prepare the dataset accordingly.

**prepare_dataset(data, length)**:

This function creates input sequences and targets from a time series.
- It slides a window of size length over the data. For each window:
    - The input (x) is the sequence of values inside the window.
    - The target (y) is the value that comes right after the window.

**train_test_split(X, Y, percent)**:
splits the dataset into training and testing sets, based on a percentage. Importantly, this split keeps the time order of the data 


**DataPrep Class**:
this is a custom PyTorch Dataset class that wraps the inputs and targets so they can be loaded by a DataLoader. This class makes it easy to feed data into the LSTM in batches while training.

In [None]:
def prepare_dataset(data,length):
    """Slice a series into sliding windows and their next-step targets.

    Args:
        data: indexable sequence (e.g. a NumPy array) ordered in time.
        length: number of consecutive elements in each input window.

    Returns:
        (x, y) as NumPy arrays: x[i] is data[i:i+length] and y[i] is
        data[i+length]. Both are empty when data has no more than
        `length` elements.
    """
    # The last valid window starts at len(data) - length - 1, so there are
    # len(data) - length windows in total. A negative count yields an empty range.
    n_windows = len(data) - length
    x = [data[i:i+length] for i in range(n_windows)]
    y = [data[i+length] for i in range(n_windows)]
    return np.array(x),np.array(y)

def train_test_split(X,Y,percent):
    """Split X and Y in order: the leading `percent`% trains, the rest tests.

    Args:
        X: input samples.
        Y: targets, positionally matched with X.
        percent: share of samples (0-100) assigned to the training part.

    Returns:
        (xtrain, ytrain, xtest, ytest), each built with torch.Tensor.
    """
    # The cut position is derived from the number of input samples.
    cut = int(len(X) * (percent/100))
    head_x, tail_x = X[:cut], X[cut:]
    head_y, tail_y = Y[:cut], Y[cut:]
    return torch.Tensor(head_x), torch.Tensor(head_y), torch.Tensor(tail_x), torch.Tensor(tail_y)

class DataPrep(Dataset):
  """Dataset that pairs inputs and targets by position."""
  def __init__(self, inputs, targets):
      self.inputs, self.targets = inputs, targets

  def __len__(self):
      # The size is taken from the inputs alone.
      return len(self.inputs)

  def __getitem__(self, index):
      # Return the (input, target) pair stored at the same position.
      return self.inputs[index], self.targets[index]

Before feeding data into a neural network, especially an LSTM, it’s important to **normalize the values**. This is due to the fact that neural networks are sensitive to the scale of input data. If the values are too large (e.g., stock prices in the hundreds) or vary a lot, the training process can become unstable or slow.

By scaling the Close prices, the model can focus on learning patterns rather than dealing with raw magnitudes.

In [None]:
# Fit the scaler on the leading (training) part of the series only, then apply
# it to the whole series. Fitting on everything would leak the mean and spread
# of the test period into training.
close_values = data['Close'].values.reshape(-1,1)
scaler_fit_rows = int(len(close_values) * 0.8)  # keep in sync with the 80% train split
scaler = StandardScaler()
scaler.fit(close_values[:scaler_fit_rows])
values = scaler.transform(close_values)

Let's apply the helper functions

In [None]:
seq_len = 40  # number of consecutive time steps in each input window
# Slice the scaled series into (window, next value) pairs.
data_inp, data_tar = prepare_dataset(values, seq_len)
# The first 80% of the windows, in original order, train; the remainder tests.
xtrain, ytrain, xtest, ytest = train_test_split(data_inp, data_tar, 80)

After preparing and splitting the data, we wrap it in PyTorch DataLoaders to handle **batching** and **shuffling** during training. shuffle=True ensures the training data is mixed before each epoch — this helps the model generalize better.

In [None]:
traindata = DataPrep(xtrain,ytrain)
testdata = DataPrep(xtest,ytest)
batch_size = 32
# Shuffle only the training windows. The evaluation loader keeps a fixed order:
# the test loss is an average of per-batch means, and shuffling would change
# which samples fall into the short final batch, making that average vary
# from epoch to epoch for no reason.
trainset = DataLoader(traindata,batch_size = batch_size,shuffle = True)
testset = DataLoader(testdata,batch_size = batch_size,shuffle = False)
# Sanity check: print the shapes of the first training batch.
for xbatch,ybatch in trainset:
    print(xbatch.shape,ybatch.shape)
    break

### Training the model

In [None]:
from tqdm import trange

# Per-epoch loss histories; the training loop appends one value to each per epoch.
train_losses = []
test_losses = []
input_sz = 1     # features per time step passed to the model
hidden_sz = 200  # hidden units in each LSTM layer
output_sz = 1    # values produced by the final linear layer
layers = 2       # stacked LSTM layers

These blocks of code train the LSTM model using Mean Squared Error as the loss function and Stochastic Gradient Descent (SGD) as the optimizer.

In [None]:
# Build the network on the selected device, with mean squared error as the loss
# and plain SGD (learning rate 1e-5) as the optimizer.
model = LstmNet(input_sz,hidden_sz,layers,output_sz).to(device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.00001)

Inside each epoch:

**1. Training Phase.** For every batch of data:
- Move the batch to the selected device (CPU/GPU).
- Pass the inputs through the model to get predictions.
- Compute the loss between predictions and actual values.
- Call .backward() to calculate gradients.
- Call .step() to update model weights.
- Accumulate the batch losses.
  
**2. Testing Phase (Evaluation):**
- No gradients are needed during evaluation, so torch.no_grad() is used.
- The model is tested on unseen data (the test set).
- Loss is computed but the model weights are not updated.

In [None]:
epochs = 500

for epoch in trange(epochs, desc="Training Epochs"):
    # ---- Training pass ----
    model.train()
    batch_loss = 0
    for xbatch,ybatch in trainset:
        xbatch,ybatch = xbatch.to(device),ybatch.to(device)
        # backward() adds to the existing .grad buffers, so they must be cleared
        # before each step. Otherwise every update uses the sum of the gradients
        # of all previous batches.
        optimizer.zero_grad()
        out = model(xbatch)
        loss = criterion(out, ybatch.squeeze(1))
        loss.backward()
        optimizer.step()
        batch_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    train_loss = batch_loss/len(trainset)

    # ---- Evaluation pass (no gradient tracking, no weight updates) ----
    model.eval()
    batch_loss = 0
    with torch.no_grad():
        for xbatch,ybatch in testset:
            xbatch,ybatch = xbatch.to(device),ybatch.to(device)
            out = model(xbatch)
            loss = criterion(out, ybatch.squeeze(1))
            batch_loss += loss.item()
    test_loss = batch_loss/len(testset)
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    # Report every 10th epoch.
    if(epoch%10==9):
        print("\nEpoch: ", epoch+1, "|", "Train Loss : ", "{:.6f}".format(train_loss), "|", "Test Loss : ", "{:.6f}".format(test_loss))

## Making Predictions with the Trained Model

After training the LSTM model, we use it to make predictions on both the **training** and **testing** datasets to evaluate how well it has learned.

In [None]:
# Inference only: switch to eval mode and disable autograd so that no
# computation graph is built for the full training and test tensors.
model.eval()
with torch.no_grad():
    train_pred = model(xtrain.to(device)).cpu().numpy()
    test_pred = model(xtest.to(device)).cpu().numpy()
train_actual = ytrain.numpy()
test_actual = ytest.numpy()

# Training part first, then the test part, to form one continuous series.
pred = np.concatenate((train_pred,test_pred))
actual = np.concatenate((train_actual,test_actual)).squeeze()

print(pred.shape)
print(actual.shape)

### Visualize results

In [None]:
# 1-based step numbers for the x axis of each series.
steps_pred = list(range(1, len(pred) + 1))
steps_actual = list(range(1, len(actual) + 1))

trace1 = go.Scatter(x=steps_pred, y=pred, name='Predicted Data')
trace2 = go.Scatter(x=steps_actual, y=actual, name='Actual data')
datas = [trace1,trace2]

layout = go.Layout(title='Prediction for 80:20 split Simple LSTM')
fig = go.Figure(data=datas, layout=layout)
# Dashed red line at the boundary between the training and test predictions.
fig.add_vline(x=len(train_pred), line_width=1, line_dash="dash", line_color="red")

fig.show()