In [1]:
!pip install torchinfo

Collecting torchinfo
  Downloading torchinfo-1.8.0-py3-none-any.whl.metadata (21 kB)
Downloading torchinfo-1.8.0-py3-none-any.whl (23 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.8.0


In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchinfo import summary

In [3]:
from dataclasses import dataclass

@dataclass
class ModelArgs:
  device = 'cpu'
  input_size = 1
  no_of_neurons = 128
  block_size = 32
  batch_size = 32
  dropout = 0.1
  epoch = 50
  max_lr = 1e-4

In [4]:
torch.set_default_device(ModelArgs.device)

In [5]:
from torch.utils.data import Dataset, DataLoader

torch.manual_seed(0)

<torch._C.Generator at 0x79ae19675710>

In [6]:
num_samples = 1000
seq_length = ModelArgs.block_size
device = ModelArgs.device

In [7]:
t = torch.linspace(0, 100, num_samples + seq_length, device=device)

In [8]:
data = t
data

tensor([0.0000e+00, 9.6993e-02, 1.9399e-01,  ..., 9.9806e+01, 9.9903e+01,
        1.0000e+02])

In [85]:
# normalize the data
mean = data.mean()
std = data.std()

data = (data - mean) / std

In [86]:
X_tensor = torch.stack([data[i:i+seq_length] for i in range(num_samples)])
X_tensor # each sequence + target value is 31 in length

tensor([[-1.7295, -1.7262, -1.7228,  ..., -1.6322, -1.6289, -1.6255],
        [-1.7262, -1.7228, -1.7195,  ..., -1.6289, -1.6255, -1.6222],
        [-1.7228, -1.7195, -1.7161,  ..., -1.6255, -1.6222, -1.6188],
        ...,
        [ 1.6155,  1.6188,  1.6222,  ...,  1.7128,  1.7161,  1.7195],
        [ 1.6188,  1.6222,  1.6255,  ...,  1.7161,  1.7195,  1.7228],
        [ 1.6222,  1.6255,  1.6289,  ...,  1.7195,  1.7228,  1.7262]])

In [87]:
# target value with index corresponding to the target values of X_tensor
y_tensor = data[seq_length:]

In [88]:
# 1000 training sample inputs each a vector 32 in length
X_tensor.shape

torch.Size([1000, 32])

In [89]:
# 1000 training sample outputs each a target value
y_tensor.shape

torch.Size([1000])

In [90]:
train_size = int(0.8 * num_samples)

In [91]:
train_size

800

In [92]:
X_train, y_train = X_tensor[:train_size], y_tensor[:train_size]
X_val, y_val = X_tensor[train_size:], y_tensor[train_size:]

This gets our original time series data and makes train and validation datasets.

Each sample would look like this:
```
X = [4.91, 4.92, 4.93, ... , 5.10]
y = 5.11
```

X is a sequence and y is the value right after that sequence.

The goal of the model is to use the X sequence to predict what comes next (y).

In [93]:
class TimeSeriesDataset(Dataset):
  def __init__(self, X, y):
    self.X = X
    self.y = y

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    return self.X[idx], self.y[idx]

In [94]:
train_dataset = TimeSeriesDataset(X_train, y_train)
val_dataset = TimeSeriesDataset(X_val, y_val)

In [95]:
generator = torch.Generator(device=device)

In [96]:
train_loader = DataLoader(
    train_dataset,
    batch_size=ModelArgs.batch_size,
    shuffle=True,
    generator=generator,
    drop_last=True,
)

In [97]:
val_loader = DataLoader(
    val_dataset,
    batch_size=ModelArgs.batch_size,
    shuffle=True,
    generator=generator,
    drop_last=True,
)

```
BATCH = [
  (X₁, y₁),
  (X₂, y₂),
  (X₃, y₃),
  ...
  (Xₙ, yₙ)
]
```

Where `n = batch_size`

Each sample is a tuple:

```
(sample) = (X, y)
```

And the sample shapes are

```
X = [ x₁, x₂, x₃, ... x_T ]   # T = seq_length
y = x_(T+1)                   # next value
```

## States of LSTM

- Cell State ($c_t$): Stores long term memory, updated at each step to carry information from entire previous time steps
- Hidden State ($h_t$): Internal representation of the current time step, short-term memory
- Input State ($x_t$): Input state at each time step, new information from the training data
- Candidate Memory/Cell State ($g_t$ or $\hat{c_t}$): New memory proposed cell state (long-term memory)



## The three gates of an LSTM model


### The Forget Gate
Decides which parts of the old memory are no longer useful. Decides the portion (as a percentage) of previous long-term memory are useful and should be written into the new cell state (long-term memory) at the current time step.

$$
f_t​ = \sigma(U_f​x_t​ + V_f​h_{t−1​} + b_f)
$$

### Input Gate
Decides what new information should be written into the long-term memory (cell-state) by creating a **candidate state** from the current input and previous hidden state, and using the input gate to control how much of that candidate is added to the cell state.

Create candidate cell:
$$
g_t = \hat{c_t} = \tanh(U_cx_t + V_ch_{t-1} + b_c)
$$

Decide input of candidate cell:
$$
i_t = \sigma(U_ix_t + V_ih_{t-1} + b_i)
$$

### Output Gate
Decides how much of the internal long-term memory should influence the **current output**.

In [98]:
class ForgetGate(nn.Module):
  def __init__(self, input_size: int, hidden_size: int):
    super().__init__()
    self.linear = nn.Linear(input_size + hidden_size, hidden_size)
    nn.init.constant_(self.linear.bias, 1.0)

  def forward(self, x_t, h_prev):
    """
    x_t: (B, input_size)
    h_prev: (B, hidden_size)

    returns:
    f_t: (B, hidden_size) values in range [0, 1]
    """
    # combine current input and previous hidden state
    z = torch.cat([x_t, h_prev], dim=1)
    # apply W_f * z + b_f where W_f = [ U_f V_f ]
    scores = self.linear(z)
    # use sigmoid function to squash to [0,1] so each value is a keep percentage of the prev hidden state
    f_t = torch.sigmoid(scores)
    return f_t

In [99]:
class InputGate(nn.Module):
  def __init__(self, input_size: int, hidden_size: int):
    super().__init__()
    # write strength (input gate) of candidate cell
    self.linear_i = nn.Linear(input_size + hidden_size, hidden_size)

    # candidate cell state (g_t)
    self.linear_g = nn.Linear(input_size + hidden_size, hidden_size)

  def forward(self, x_t, h_prev):
    """
    x_t:    (B, input_size)
    h_prev: (B, hidden_size)

    returns:
    i_t: (B, hidden_size)  values in [0,1]
    g_t: (B, hidden_size)  values in [-1,1]
    """
    z = torch.cat([x_t, h_prev], dim=1)

    # calculate input gate
    i_t = torch.sigmoid(self.linear_i(z))

    # calculate candidate memory state
    g_t = torch.tanh(self.linear_g(z))

    return i_t, g_t

In [100]:
class OutputGate(nn.Module):
  def __init__(self, input_size: int, hidden_size: int):
    super().__init__()
    self.linear = nn.Linear(input_size + hidden_size, hidden_size)

  def forward(self, x_t, h_prev, c_t):
    """
    x_t:    (B, input_size)
    h_prev: (B, hidden_size)
    c_t:    (B, hidden_size)

    returns:
    h_t:    (B, hidden_size)
    """
    z = torch.cat([x_t, h_prev], dim=1)

    # get output gate from the new input and hidden state
    o_t = torch.sigmoid(self.linear(z))

    # use output gate to calculate new hidden state
    h_t = o_t * torch.tanh(c_t)

    return h_t

## Build the LSTM Block

Now that we have all three gates constructed we can create a block of the model itself by combining each gate.

See [video](https://www.youtube.com/watch?v=P_TZN8kRObQ) for good visual and explanation.

In [101]:
class LSTMBlock(nn.Module):
  def __init__(self, input_size: int , hidden_size: int):
    super().__init__()
    self.forget_gate = ForgetGate(input_size, hidden_size)
    self.input_gate = InputGate(input_size, hidden_size)
    self.output_gate = OutputGate(input_size, hidden_size)

  def forward(self, x_t, h_prev, c_prev):
    # calculate forget gate (portion of previous cell state to carry forward)
    f_t = self.forget_gate(x_t, h_prev)

    # calcuate input gate and candidate cell
    i_t, g_t = self.input_gate(x_t, h_prev)

    # add gated previous cell state to gated candidate cell state to get the new cell state (long-term memory)
    c_t = f_t * c_prev + i_t * g_t

    # calculate new hidden state from output gate
    h_t = self.output_gate(x_t, h_prev, c_t)

    # return new hidden (short-term memory) and cell state (long-term memory) states
    return h_t, c_t

## Build the LSTM Model

Use a bunch of blocks to train on our time series data and predict the next time step value.

In [102]:
class LSTM(nn.Module):
  def __init__(self, input_size: int, hidden_size: int):
    super().__init__()

    self.input_size = input_size
    self.hidden_size = hidden_size
    self.block = LSTMBlock(input_size, hidden_size)

    # classifier head to project hidden size -> a prediction
    self.output_layer = nn.Linear(hidden_size, 1)

  def forward(self, x):
    """
    x: (B, T)  where T = seq_length

    returns:
    y_hat: (B, 1)
    """

    B, T = x.shape

    # initialize hidden and cell (long-term memory) states
    h_t = torch.zeros(B, self.hidden_size, device=x.device)
    c_t = torch.zeros(B, self.hidden_size, device=x.device)

    # loop over each time step
    for t in range(T):
      x_t = x[:, t].unsqueeze(1) # (B,) -> (B, 1) because theres one input feature per output
      h_t, c_t = self.block(x_t, h_t, c_t) # update the hidden and cell states from this time step's info

    # make a prediction for the next time step using the classifier head
    y_hat = self.output_layer(h_t)

    return y_hat

## Training and validation

Train on train mode using Adam optimizer and a learning rate scheduler `OneCycleLR`. Make sure to unnormalize the data to get the final prediction for each sample in each batch.

In [103]:
model = LSTM(
    input_size = ModelArgs.input_size,
    hidden_size = ModelArgs.no_of_neurons,
)
model.train()

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=ModelArgs.max_lr)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=ModelArgs.max_lr,
    steps_per_epoch=len(train_loader),
    epochs=ModelArgs.epoch,
)

for epoch in range(ModelArgs.epoch):
  model.train()
  total_loss = 0.0

  for X_batch, y_batch in train_loader:
    optimizer.zero_grad()

    # fwd pass
    y_hat = model(X_batch) # (B, 1)
    y_hat = y_hat * std + mean # unnormalize
    y_pred = y_hat.squeeze(1) # (B,)

    # compute loss
    loss = criterion(y_pred, y_batch)

    # backprop
    loss.backward()
    optimizer.step()
    scheduler.step()

    total_loss += loss.item()

  avg_loss = total_loss / len(train_loader)
  print(f"Epoch {epoch+1}: train loss = {avg_loss:.6f}")

Epoch 1: train loss = 2397.902324
Epoch 2: train loss = 2378.868936
Epoch 3: train loss = 2349.022568
Epoch 4: train loss = 2300.263096
Epoch 5: train loss = 2222.255635
Epoch 6: train loss = 2091.929160
Epoch 7: train loss = 1813.954336
Epoch 8: train loss = 939.904421
Epoch 9: train loss = 34.085828
Epoch 10: train loss = 2.403552
Epoch 11: train loss = 0.697988
Epoch 12: train loss = 0.401623
Epoch 13: train loss = 0.284706
Epoch 14: train loss = 0.221260
Epoch 15: train loss = 0.189299
Epoch 16: train loss = 0.168996
Epoch 17: train loss = 0.157685
Epoch 18: train loss = 0.150368
Epoch 19: train loss = 0.145399
Epoch 20: train loss = 0.139878
Epoch 21: train loss = 0.136679
Epoch 22: train loss = 0.132295
Epoch 23: train loss = 0.128215
Epoch 24: train loss = 0.124786
Epoch 25: train loss = 0.121103
Epoch 26: train loss = 0.118269
Epoch 27: train loss = 0.115027
Epoch 28: train loss = 0.112713
Epoch 29: train loss = 0.109381
Epoch 30: train loss = 0.106874
Epoch 31: train loss = 0.

## Validation Loop

Same thing as training except freeze all gradients so nothing can be updated during training. No back propagation.

In [105]:
model.eval()

with torch.no_grad():
  validation_loss = 0.0
  for X_batch, y_batch in val_loader:
    y_hat = model(X_batch).squeeze(1)
    y_pred = y_hat * std + mean

    loss = criterion(y_pred, y_batch)

    validation_loss += loss.item()

  validation_loss /= len(val_loader)
  print(f"Validation loss = {validation_loss:.6f}")

Validation loss = 3.540914


## Conclusion

In general you should see the training and validation pretty low because the LSTM is rather overkill for predicitng a linear relationship made using `torch.linspace`.