### LSTM Implementation 

In [1]:
#  LSTMCell

In [2]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dataset
from torch.autograd import Variable
from torch.nn import Parameter
from torch import Tensor
import torch.nn.functional as F
from torch.utils.data import DataLoader
import math

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
cuda = True if torch.cuda.is_available() else False

Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor

torch.manual_seed(125)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(125)

In [3]:
import torchvision.transforms as transforms

mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (1.0,))
])

In [4]:
from torchvision.datasets import MNIST

download_root = 'MNIST_DATASET/'

train_dataset = MNIST(download_root, transform=mnist_transform, train=True, download=True)
valid_dataset = MNIST(download_root, transform=mnist_transform, train=False, download=True)
test_dataset = MNIST(download_root, transform=mnist_transform, train=False, download=True)

100%|██████████| 9.91M/9.91M [00:02<00:00, 4.81MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 137kB/s]
100%|██████████| 1.65M/1.65M [00:01<00:00, 1.29MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 2.24MB/s]


In [5]:
batch_size = 64
train_loader = DataLoader(dataset=train_dataset,
                         batch_size=batch_size,
                         shuffle=True)
valid_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=True)
test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=True)

In [6]:
batch_size = 100
n_iters = 6000
num_epochs = n_iters / (len(train_dataset) / batch_size)
num_epochs = int(num_epochs)

- The core ides of LSTM is to transmit information over long distances through cell states.
- Gates control how much information is remembered and discarded:
    - Forget Gate : Determines how much previous information to **ERASE**
    - Input Gate : Determines how much new information to **INCORPORATE**
    - Output Gate : Determines how much to send to the output

In [7]:
class LSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size, bias=True):
        super(LSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        # Perform linear transformations on both the input and hidden states, outputting a vector of size : 4 * hidden_size
        # We linearly transform the input and previous hidden states, add them, and use them in the gate computation.
        self.x2h = nn.Linear(input_size, 4 * hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, 4 * hidden_size, bias=bias)
        self.reset_parameters()

    def reset_parameters(self):             # Initialize parameters with Xavier-like method
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)      # set uniform distribution and ensure initial stability

    def forward(self, x, hidden):           # x : input vector of current state | hidden : tuple of (previous hidden state hx, cell state cx)
        hx, cx = hidden
        x = x.view(-1, x.size(1))           # x : (batch_size, input_size)

        gates = self.x2h(x) + self.h2h(hx)  # (batch_size, 4*hidden_size)
        gates = gates.squeeze()             # Remove dimension with size 1
        # Split one vector into 4 gates / shape of each gate : (batch_size, hidden_size)
        ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)

        ingate = F.sigmoid(ingate)
        forgetgate = F.sigmoid(forgetgate)
        cellgate = F.tanh(cellgate)
        outgate = F.sigmoid(outgate)

        # update cell state : cx*forgetgate for remembering + ingate*cellgate(new) for adding new infos
        cy = torch.mul(cx, forgetgate) +  torch.mul(ingate, cellgate) 
        hy = torch.mul(outgate, F.tanh(cy))  # update new hidden state
        return (hy, cy)

- This class is a manually implemented LSTM model that sequentially processes sequence data (e.g. sentenced, image matrices, etc.) using our own LSTMCell and produces a prediction as the final output.

In [8]:
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, bias=True):
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim

        self.layer_dim = layer_dim
        self.lstm = LSTMCell(input_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x): # input x : (batch_size, seq_len, input_dim)
        # Initial hidden state
        if torch.cuda.is_available():
            h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
        else:
            h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
        # Initial cell state 
        if torch.cuda.is_available():
            c0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda())
        else:
            c0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))

        # The initial hidden and cell states for each batch are stored in hn and cn.
        outs = [] # stores hidden state of each time step
        cn = c0[0,:,:]
        hn = h0[0,:,:]

        '''
        - x[:, seq, :] : The seqth time step data of the sequence (shape : (batch_size, input_dim))
        - self.lstm(...) : Executes the previously defined LSTMCell one time step at a time.
        - The returned hn is stored in outs[] at each time step.
        => This iteration below processes the sequence one time step at a time, similar to the actual RNN architecture
        '''
        for seq in range(x.size(1)):
            hn, cn = self.lstm(x[:,seq,:], (hn,cn)) 
            outs.append(hn)

        out = outs[-1].squeeze() # outs[-1] : hidden state of the last point
        out = self.fc(out)       # final classification 
        return out

- The following setup treats MNIST input as a sequence
- As MNIST image is 28*28, we divide it into 28 time points (sequence length), each of which is treated as a 28-dimensional vector.
- In other words, each "row" is treated as a single time point.

In [9]:
input_dim = 28    # Size of the input vector at one time point (number of pixels)
hidden_dim = 128  # Size of the hidden state remembered by the LSTM
layer_dim = 1     # Number of LSTM layers (only 1 layer is used)
output_dim = 10   # Number of classification classes (digits 0-9, MNIST)

model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim)
if torch.cuda.is_available():
    model.cuda()
criterion = nn.CrossEntropyLoss()  # Loss function for Multi class classification
learning_rate = 0.1
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [10]:
seq_dim = 28
loss_list = [] # list to record the training loss
iter = 0
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        if torch.cuda.is_available():
            images = Variable(images.view(-1, seq_dim, input_dim).cuda()) # (batch_size, 1, 28, 28) -> (batch_size, 28, 28)
            labels = Variable(labels.cuda())
        else:
            images = Variable(images.view(-1, seq_dim, input_dim))
            labels = Variable(labels)

        optimizer.zero_grad()             # initialize gradient from the previous step
        outputs = model(images)           # calculate predictions using the LSTM model
        loss = criterion(outputs, labels) # calculate loss by comparing the predicted results to the correct answer

        if torch.cuda.is_available():
            loss.cuda()

        loss.backward()                # calculate gradient by backpropagation
        optimizer.step()               # update parameter with the gradient
        loss_list.append(loss.item())
        iter += 1

        # Validation
        if iter % 500 == 0:
            correct = 0
            total = 0
            for images, labels in valid_loader:
                if torch.cuda.is_available():
                    images = Variable(images.view(-1, seq_dim, input_dim).cuda())
                else:
                    images = Variable(images.view(-1 , seq_dim, input_dim))

                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1) # The index with the highest predicted probability is considered as the class prediction

                total += labels.size(0)
                if torch.cuda.is_available():
                    correct += (predicted.cpu() == labels.cpu()).sum()
                else:
                    correct += (predicted == labels).sum()

            accuracy = 100 * correct / total
            print('Iteration: {}. Loss: {}. Accuracy: {}'.format(iter, loss.item(), accuracy))

Iteration: 500. Loss: 2.237457275390625. Accuracy: 21.420000076293945
Iteration: 1000. Loss: 0.9853159785270691. Accuracy: 69.56999969482422
Iteration: 1500. Loss: 0.41114744544029236. Accuracy: 88.91999816894531
Iteration: 2000. Loss: 0.23155280947685242. Accuracy: 93.62000274658203
Iteration: 2500. Loss: 0.09778112918138504. Accuracy: 94.66000366210938
Iteration: 3000. Loss: 0.07660431414842606. Accuracy: 95.93000030517578
Iteration: 3500. Loss: 0.12175733596086502. Accuracy: 96.41999816894531
Iteration: 4000. Loss: 0.02374931238591671. Accuracy: 97.05999755859375
Iteration: 4500. Loss: 0.0556069053709507. Accuracy: 96.91000366210938
Iteration: 5000. Loss: 0.08004338294267654. Accuracy: 97.11000061035156
Iteration: 5500. Loss: 0.16317909955978394. Accuracy: 96.5
Iteration: 6000. Loss: 0.02393285557627678. Accuracy: 97.80000305175781
Iteration: 6500. Loss: 0.016652461141347885. Accuracy: 97.77999877929688
Iteration: 7000. Loss: 0.021453820168972015. Accuracy: 97.7300033569336
Iteratio

In [None]:
def evaluate(model, val_iter):
    corrects, total, total_loss = 0, 0, 0
    model.eval() # MUST!! for the evaluation mode
    for images, labels in val_iter:
        if torch.cuda.is_available():
            images = Variable(images.view(-1, seq_dim, input_dim).cuda())
        else:
            images = Variable(images.view(-1 , seq_dim, input_dim)).to(device)
        labels = labels.cuda()
        logit = model(images).cuda()
        loss = F.cross_entropy(logit, labels, reduction = "sum")
        _, predicted = torch.max(logit.data, 1) # The index with the highest predicted probability is considered as the class prediction
        total += labels.size(0)
        total_loss += loss.item()
        corrects += (predicted == labels).sum()

    avg_loss = total_loss / len(val_iter.dataset)
    avg_accuracy = corrects / total
    return avg_loss, avg_accuracy

In [12]:
test_loss, test_acc = evaluate(model,test_loader)
print("Test Loss: %5.2f | Test Accuracy: %5.2f" % (test_loss, test_acc))

Test Loss:  0.06 | Test Accuracy:  0.98
