## Classification using self written Long Short Term Memory (LSTM) model

In [1]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.datasets as dsets
from torch.autograd import Variable
from itertools import chain

In [4]:
'''
STEP 1: LOADING DATASET
'''
train_dataset = dsets.MNIST(root='./data', 
                            train=True, 
                            transform=transforms.ToTensor(),
                            download=True)
 
test_dataset = dsets.MNIST(root='./data', 
                           train=False, 
                           transform=transforms.ToTensor())
 
'''
STEP 2: MAKING DATASET ITERABLE
'''
 
batch_size = 100
n_iters = 6000
num_epochs = n_iters / (len(train_dataset) / batch_size)
num_epochs = int(num_epochs)
 
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True)
 
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, 
                                          batch_size=batch_size, 
                                          shuffle=False)
 
'''
STEP 3: CREATE MODEL CLASS: LSTM
'''
 
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(LSTMModel, self).__init__()
        # Hidden dimensions
        self.hidden_dim = hidden_dim
         
        # Number of hidden layers
        self.layer_dim = layer_dim
        
        # A dict to store all the models used in LSTM
        self.model_dict={}
        # each list stores sub-models in different layers 
        self.model_dict['wb_ii']=[] # w: weights; b: biases; ii: from input calculate input gate
        self.model_dict['wb_hi']=[] # hi: from hidden layer calculate input gate
        self.model_dict['wb_if']=[] # if: from input calculate forget gate
        self.model_dict['wb_hf']=[]
        self.model_dict['wb_io']=[] # io: from input calculate output gate
        self.model_dict['wb_ho']=[]
        self.model_dict['wb_ig']=[] # ig: from input calculate original output of hidden layer
        self.model_dict['wb_hg']=[]
        
        self.sigmoid=nn.Sigmoid() # Sigmoid used in calculating gates
        self.tanh=nn.Tanh() # tanh used in calculating g and output
        
        # Building LSTM
        input_dim_list=[input_dim] # All layers have the same input dimension as hidden dimension, except the first layer
        for layer in range(layer_dim-1):
            input_dim_list.append(hidden_dim)
        for layer in range(layer_dim):
            # Input gate
            self.model_dict['wb_ii'].append(nn.Linear(input_dim_list[layer],hidden_dim))
            self.model_dict['wb_hi'].append(nn.Linear(hidden_dim,hidden_dim))
            # Forget gate
            self.model_dict['wb_if'].append(nn.Linear(input_dim_list[layer],hidden_dim))
            self.model_dict['wb_hf'].append(nn.Linear(hidden_dim,hidden_dim))
            # Output gate
            self.model_dict['wb_io'].append(nn.Linear(input_dim_list[layer],hidden_dim))
            self.model_dict['wb_ho'].append(nn.Linear(hidden_dim,hidden_dim))
            # Cell gate: produce new candidate values
            self.model_dict['wb_ig'].append(nn.Linear(input_dim_list[layer],hidden_dim))
            self.model_dict['wb_hg'].append(nn.Linear(hidden_dim,hidden_dim))

        # Readout layer: 100-->10
        self.fc = nn.Linear(hidden_dim, output_dim)
    
    def parameters(self): # A generator of all parameters of all sub-models
        parameter_generator=None # Initialize as None
        for group_key in self.model_dict:
            for sub_model in self.model_dict[group_key]: # sub_model is a single model in a certain layer (such as nn.Linear in layer 2)
                if parameter_generator is None:
                    parameter_generator=sub_model.parameters()
                else:
                    parameter_generator=chain(parameter_generator,sub_model.parameters())
        parameter_generator=chain(parameter_generator,self.fc.parameters())
        return parameter_generator
    
    def cuda(self):
        for group_key in self.model_dict:
            for sub_model in self.model_dict[group_key]: # sub_model is a single model in a certain layer (such as nn.Linear in layer 2)
                # print(sub_model)
                sub_model.cuda()
        self.fc.cuda()
    
    def forward(self, x): # Tensor shape of x: batch_size*seq_dim*input_dim
        
        # Initialize hidden state with zeros
        if torch.cuda.is_available():
            h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).cuda()) # x.size(0): batch_size
        else:
            h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
        # A list of all hidden states of each time point
        h_list=[h0]
        """Use list instead of a big Variable to store everything! 
            In order to make sure a certain Variable is only updated once. 
            Because the updated value will be needed for computating gradients in backpropagation!
            Otherwise there will be error: 
            one of the variables needed for gradient computation has been modified by an inplace operation"""
        
        # Initialize cell state
        if torch.cuda.is_available():
            c0 = Variable(torch.ones(self.layer_dim, x.size(0), self.hidden_dim).cuda()) # x.size(0): batch_size
        else:
            c0 = Variable(torch.ones(self.layer_dim, x.size(0), self.hidden_dim))
        # A list of all cell states of each time point
        c_list=[c0]
            
        time_steps = x.size(1) # number of time steps
        for step in range(time_steps):
            # One time step
            # Initialize current hidden state and cell state
            if torch.cuda.is_available():
                curr_h = [Variable(torch.zeros(x.size(0), self.hidden_dim).cuda()) for i in range(self.layer_dim)]
                curr_c = [Variable(torch.ones(x.size(0), self.hidden_dim).cuda()) for i in range(self.layer_dim)]
            else:
                curr_h = [Variable(torch.zeros(x.size(0), self.hidden_dim)) for i in range(self.layer_dim)]
                curr_c = [Variable(torch.ones(x.size(0), self.hidden_dim)) for i in range(self.layer_dim)]

            for layer in range(self.layer_dim):
                if layer==0: # The input to the first layer is the raw input: x[:, step, :]
                    input_gate=self.sigmoid(self.model_dict['wb_ii'][layer](x[:, step, :])+self.model_dict['wb_hi'][layer](h_list[-1][layer])) # h_list[-1] is the hidden state of last time step
                    forget_gate=self.sigmoid(self.model_dict['wb_if'][layer](x[:, step, :])+self.model_dict['wb_hf'][layer](h_list[-1][layer]))
                    output_gate=self.sigmoid(self.model_dict['wb_io'][layer](x[:, step, :])+self.model_dict['wb_ho'][layer](h_list[-1][layer]))
                    new_values=self.tanh(self.model_dict['wb_ig'][layer](x[:, step, :])+self.model_dict['wb_hg'][layer](h_list[-1][0]))

                else: # The input to the other layers are input processed by the previous layers: curr_h[layer-1]
                    input_gate=self.sigmoid(self.model_dict['wb_ii'][layer](curr_h[layer-1])+self.model_dict['wb_hi'][layer](h_list[-1][layer]))
                    forget_gate=self.sigmoid(self.model_dict['wb_if'][layer](curr_h[layer-1])+self.model_dict['wb_hf'][layer](h_list[-1][layer]))
                    output_gate=self.sigmoid(self.model_dict['wb_io'][layer](curr_h[layer-1])+self.model_dict['wb_ho'][layer](h_list[-1][layer]))
                    new_values=self.tanh(self.model_dict['wb_ig'][layer](curr_h[layer-1])+self.model_dict['wb_hg'][layer](h_list[-1][0]))
                
                curr_c[layer] = forget_gate * c_list[-1][layer] + input_gate * new_values # update current layer of current cell state
                curr_h[layer] = output_gate * self.tanh(curr_c[layer]) # update current layer of current hidden state
            
            c_list.append(curr_c) # add current cell state to the list
            h_list.append(curr_h)
        
        # Classification
        out = self.fc(h_list[time_steps][self.layer_dim-1]) # Output of the last time step
        # out.size() --> 100, 10
        return out
 
'''
STEP 4: INSTANTIATE MODEL CLASS
'''
input_dim = 28
hidden_dim = 100
layer_dim = 3  # ONLY CHANGE IS HERE FROM ONE LAYER TO TWO LAYER
output_dim = 10
 
model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim)

if torch.cuda.is_available():
    model.cuda()
     
'''
STEP 5: INSTANTIATE LOSS CLASS
'''
criterion = nn.CrossEntropyLoss()
 
'''
STEP 6: INSTANTIATE OPTIMIZER CLASS
'''
learning_rate = 0.1
 
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)  
 
'''
STEP 7: TRAIN THE MODEL
'''
 
# Number of steps to unroll
seq_dim = 28 
 
iter = 0
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        # Load images as Variable
        if torch.cuda.is_available():
            images = Variable(images.view(-1, seq_dim, input_dim).cuda())
            labels = Variable(labels.cuda())
        else:
            images = Variable(images.view(-1, seq_dim, input_dim))
            labels = Variable(labels)
             
        # Clear gradients w.r.t. parameters
        optimizer.zero_grad()
         
        # Forward pass to get output/logits
        # outputs.size() --> 100, 10
        outputs = model(images)
         
        # Calculate Loss: softmax --> cross entropy loss
        loss = criterion(outputs, labels)
         
        # Getting gradients w.r.t. parameters
        loss.backward()
         
        # Updating parameters
        optimizer.step()
         
        iter += 1
         
        if iter % 300 == 0:
            # Calculate Accuracy         
            correct = 0
            total = 0
            # Iterate through test dataset
            for images, labels in test_loader:

                if torch.cuda.is_available():
                    images = Variable(images.view(-1, seq_dim, input_dim).cuda())
                else:
                    images = Variable(images.view(-1, seq_dim, input_dim))
                 
                # Forward pass only to get logits/output
                outputs = model(images)
                 
                # Get predictions from the maximum value
                _, predicted = torch.max(outputs.data, 1)
                 
                # Total number of labels
                total += labels.size(0)
                 
                # Total correct predictions
                if torch.cuda.is_available():
                    correct += (predicted.cpu() == labels.cpu()).sum()
                else:
                    correct += (predicted == labels).sum()
             
            accuracy = 100 * correct / total
             
            # Print Loss
            print('Iteration: {}. Loss: {}. Accuracy: {}'.format(iter, loss.data[0], accuracy))
            
            

Iteration: 300. Loss: 2.283118963241577. Accuracy: 15.32
Iteration: 600. Loss: 1.96955144405365. Accuracy: 27.37
Iteration: 900. Loss: 0.99946129322052. Accuracy: 61.33
Iteration: 1200. Loss: 0.6285029053688049. Accuracy: 79.55
Iteration: 1500. Loss: 0.3197811245918274. Accuracy: 89.67
Iteration: 1800. Loss: 0.2982783019542694. Accuracy: 92.77
Iteration: 2100. Loss: 0.19164474308490753. Accuracy: 92.1
Iteration: 2400. Loss: 0.06307867169380188. Accuracy: 95.84
Iteration: 2700. Loss: 0.20054252445697784. Accuracy: 94.11
Iteration: 3000. Loss: 0.11319737136363983. Accuracy: 95.64
Iteration: 3300. Loss: 0.13868208229541779. Accuracy: 97.22
Iteration: 3600. Loss: 0.03167583793401718. Accuracy: 97.24
Iteration: 3900. Loss: 0.11802885681390762. Accuracy: 97.24
Iteration: 4200. Loss: 0.08874500542879105. Accuracy: 97.53
Iteration: 4500. Loss: 0.15819133818149567. Accuracy: 97.62
Iteration: 4800. Loss: 0.19451479613780975. Accuracy: 97.22
Iteration: 5100. Loss: 0.051464639604091644. Accuracy: 