# LSTM From Scratch in PyTorch
From <a href="https://towardsdatascience.com/building-a-lstm-by-hand-on-pytorch-59c02a4ec091">this</a> blog post.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import copy

import torch.nn as nn
import torch.optim as optim
import torch
from torch.utils.data import Dataset, DataLoader

In [45]:
# Generate data
t = np.linspace(0, 50, 500)
signal = np.sin(t)**3 + np.random.normal(0, 0.3, 500)
signal = np.random.normal(0, 1, (500, 5))
#plt.plot(signal)

In [51]:
class GenericDataset(Dataset):

    def __init__(self, X, lags, forecast_horizon):
        self.X = copy.deepcopy(X)
        self.lags = lags
        self.forecast_horizon = forecast_horizon

    def __len__(self):
        
        length = len(self.X)-50
        return length

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        x = np.array(self.X[idx:idx+self.lags]).reshape(self.lags, -1)
        y = np.array(self.X[idx+self.lags:idx+self.lags+self.forecast_horizon]).reshape(self.forecast_horizon, -1)

        return x, y

In [52]:
BATCH_SIZE = 1

torch_dataset = GenericDataset(signal, 5, 1)

train_loader = DataLoader(dataset=torch_dataset, 
                            batch_size=BATCH_SIZE, 
                            shuffle=True, num_workers=0)

In [53]:
class NaiveCustomLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        
        # i_t
        self.U_i = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.V_i = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.Tensor(hidden_size))
        
        # f_t
        self.U_f = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.V_f = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_f = nn.Parameter(torch.Tensor(hidden_size))
        
        # c_t
        self.U_c = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.V_c = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_c = nn.Parameter(torch.Tensor(hidden_size))
        
        # o_t
        self.U_o = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.V_o = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_o = nn.Parameter(torch.Tensor(hidden_size))
        
        self.init_weights()
        
    def init_weights(self):
        
        std_dv = 1.0/np.sqrt(self.hidden_size)
        
        for weight in self.parameters():
            weight.data.uniform_(-std_dv, std_dv).double()
            
        return
    
    def forward(self, x, init_states=None):
        
        """Assumes x is shaped (batch_size, sequence_length, num_features).
        """
        
        batch_size, sequence_length, _ = x.size()
        hidden_seq = []
        
        if init_states is None:
            h_t, c_t = (torch.zeros(batch_size, self.hidden_size).to(x.device).double(),
                       torch.zeros(batch_size, self.hidden_size).to(x.device).double())
        else:
            h_t, c_t = init_states
            
        for t in range(sequence_length):
            x_t = x[:,t,:]
            
            i_t = torch.sigmoid(x_t @ self.U_i + h_t @ self.V_i + self.b_i)
            f_t = torch.sigmoid(x_t @ self.U_f + h_t @ self.V_f + self.b_f)
            g_t = torch.tanh(x_t @ self.U_c + h_t @ self.V_c + self.b_c)
            o_t = torch.sigmoid(x_t @ self.U_o + h_t @ self.V_o + self.b_o)
            c_t = f_t * c_t + i_t * g_t
            h_t = o_t * torch.tanh(c_t)
            
            hidden_seq.append(h_t.unsqueeze(0))
            
        # reshape hidden sequence and return array
        hidden_seq = torch.cat(hidden_seq, dim=0)
        hidden_seq = hidden_seq.transpose(0,1).contiguous()
        return hidden_seq, h_t, c_t

In [54]:
N_HIDDEN = 1

net = NaiveCustomLSTM(1, N_HIDDEN)
net = net.double()

optimizer = torch.optim.Adam(net.parameters(),lr=0.0001)
criterion = nn.MSELoss()

In [55]:
NUM_EPOCHS = 10

for epoch in range(NUM_EPOCHS):
    for inputs, targets in train_loader:

        # Forward pass
        inputs = inputs.double()
        targets = targets.double()
        _, outputs, _ = net.forward(inputs)
        loss = criterion(outputs, targets[:,:,0])
    
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    if (epoch+1) % 1 == 0:
        print ('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, NUM_EPOCHS, loss.item()))

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x5 and 1x1)

In [32]:
mat1 = np.abs(np.random.normal(0, 1, (3, 3)))
mat2 = np.abs(np.random.normal(0, 1, (3, 3)))

mat2[0,0] = -20

out = np.matmul(mat1, mat2)

out

array([[-30.97580461,   1.59536369,   1.49911774],
       [-28.57306915,   1.45511333,   1.44806284],
       [ -0.92456094,   1.88355468,   1.29246385]])