# Recurrent Neural Network

In [1]:
# import packages.
import torch
import torchvision

In [2]:
# Device configuration.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# Hyper-parameters
sequence_length = 28
input_size = 28
hidden_size = 128
num_layers = 2
num_classes = 10
batch_size = 100
num_epochs = 2
learning_rate = 0.01

In [4]:
# Load downloaded dataset.
import numpy as np
import gzip
import os
class MNISTDataset(torch.utils.data.Dataset):
    def __init__(self, root, train=True, transform=None):
        self.file_pre = 'train' if train == True else 't10k'
        self.transform = transform
        self.label_path = os.path.join(root, '%s-labels-idx1-ubyte.gz' % self.file_pre)
        self.image_path = os.path.join(root, '%s-images-idx3-ubyte.gz' % self.file_pre)
        self.images, self.labels = self.__read_data__(self.image_path, self.label_path)
    
    def __read_data__(self, image_path, label_path):
        # Read dataset.
        with gzip.open(label_path, 'rb') as lbpath:
            labels = np.frombuffer(lbpath.read(), np.uint8, offset=8)
        with gzip.open(image_path, 'rb') as imgpath:
            images = np.frombuffer(imgpath.read(), np.uint8, offset=16).reshape(len(labels), 28, 28)
        return images, labels
    
    def __getitem__(self, index):
        image, label = self.images[index], int(self.labels[index])
        if self.transform is not None:
            image = self.transform(np.array(image))
        return image, label
    
    def __len__(self):
        return len(self.labels)

In [5]:
# MNIST dataset
train_dataset = MNISTDataset('../data/MNIST/', transform=torchvision.transforms.ToTensor())
test_dataset = MNISTDataset('../data/MNIST/', train=False, transform=torchvision.transforms.ToTensor())

In [6]:
# Data loader.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=False)

In [7]:
# Recurrent neural network (many-to-one)
class RNN(torch.nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = torch.nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = torch.nn.Linear(hidden_size, num_classes)
        
    def forward(self, x):
        # Set initial hidden and cell states.
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        # Forward propagate LSTM
        out, _ = self.lstm(x, (h0, c0)) # out: tensor of shape (batch_size, seq_length, hidden_size)
        
        # Decode the hidden state of the last time step.
        out = self.fc(out[:, -1, :])
        return out

In [8]:
# Make model.
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)

In [9]:
# Loss and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [10]:
# Train the model
import gc
total_step = len(train_loader)
for epoch in range(num_epochs):
    gc.collect()
    torch.cuda.empty_cache()
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Print
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, i+1, total_step, loss.item()))

Epoch [1/2], Step [100/600], Loss: 0.5874
Epoch [1/2], Step [200/600], Loss: 0.2861
Epoch [1/2], Step [300/600], Loss: 0.1178
Epoch [1/2], Step [400/600], Loss: 0.0953
Epoch [1/2], Step [500/600], Loss: 0.0616
Epoch [1/2], Step [600/600], Loss: 0.0445
Epoch [2/2], Step [100/600], Loss: 0.1617
Epoch [2/2], Step [200/600], Loss: 0.1722
Epoch [2/2], Step [300/600], Loss: 0.0585
Epoch [2/2], Step [400/600], Loss: 0.1982
Epoch [2/2], Step [500/600], Loss: 0.0853
Epoch [2/2], Step [600/600], Loss: 0.0803


In [11]:
# Test the model.
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        
    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))

Test Accuracy of the model on the 10000 test images: 98.20666666666666 %


In [12]:
# Save the model checkpoint
torch.save(model.state_dict(), 'rnn.ckpt')