In this notebook I have created ResNet based on https://arxiv.org/pdf/1603.05027.pdf

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as f
from torch.optim import Adam
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.autograd import Variable
import numpy as np

In [2]:
def accuracy(preds, y_true):
    '''
    Use this function to check accuracy of a model trained.
    
    :param: preds - predictions generated by neural network
    :param: y_true - true/real labels for each sample in the dataset
    '''
    correct = 0 
    assert len(preds) == len(y_true)
    
    for i in range(len(preds)):
        if np.argmax(preds[i]) == y_true[i]:
            correct += 1
    return correct / len(preds)

In [3]:
#Hyperparams
batch_size = 128
epochs = 10
learning_rate = 0.001
hidden_units = 256
number_of_res_blocks = 10
between_strides = number_of_res_blocks/3
int(between_strides)

3

#### Load and preprocess MNIST dataset

In [4]:
train_dataset = MNIST(root='./data/', 
                      train=True, 
                      transform=transforms.ToTensor(), 
                      download=True)

In [5]:
test_dataset = MNIST(root='./data/', 
                      train=False, 
                      transform=transforms.ToTensor(), 
                      download=False)

In [6]:
train_loader = torch.utils.data.DataLoader(train_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=True, 
                                           num_workers=2)

In [7]:
test_loader = torch.utils.data.DataLoader(test_dataset, 
                                           batch_size=batch_size, 
                                           shuffle=False, 
                                           num_workers=2)

#### Create Residual block class

In [8]:
class ResBlock(nn.Module):
    
    def __init__(self,):
        super(ResBlock, self).__init__()
        #We will set default 32 to be number of filters/units in ResBlock
        self.b_1 = nn.BatchNorm2d(32)
        self.conv_1 = nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1)
        self.b_2 = nn.BatchNorm2d(32)
        self.conv_2 = nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1)
        
    def forward(self, X):
        out = self.b_1(X)
        out = f.relu(out)
        out = self.conv_1(out)
        out = self.b_2(out)
        out = f.relu(out)
        out = self.conv_2(out)
        return X + out

#### RESIDUAL NETWORK

In [9]:
class ResNet(nn.Module):
    
    def __init__(self, between_strides, number_of_res_blocks, number_of_classes):
        super(ResNet, self).__init__()
        self.input_layer = nn.Conv2d(1, 32, kernel_size=3)
        self.res_layers = []
        for i in range(int(number_of_res_blocks/between_strides)):
            for j in range(int(between_strides)):
                self.res_layers.append(ResBlock())
            #Make strided layer
            self.res_layers.append(nn.Conv2d(32, 32, kernel_size=3, stride=2))
            self.res_layers.append(nn.BatchNorm2d(32))
        
        self.output_layer = nn.Linear(32*2*2, number_of_classes)
        
    def forward(self, X):
        out = self.input_layer(X)
        for i in range(len(self.res_layers)):
            out = self.res_layers[i](out)
            
        out = out.view(-1, 32*2*2)
        out = self.output_layer(out)
        return out

In [10]:
res_net = ResNet(between_strides, number_of_res_blocks, 10)

In [11]:
res_net

ResNet(
  (input_layer): Conv2d (1, 32, kernel_size=(3, 3), stride=(1, 1))
  (output_layer): Linear(in_features=128, out_features=10)
)

In [12]:
res_net.res_layers

[ResBlock(
   (b_1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True)
   (conv_1): Conv2d (32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
   (b_2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True)
   (conv_2): Conv2d (32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
 ), ResBlock(
   (b_1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True)
   (conv_1): Conv2d (32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
   (b_2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True)
   (conv_2): Conv2d (32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
 ), ResBlock(
   (b_1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True)
   (conv_1): Conv2d (32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
   (b_2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True)
   (conv_2): Conv2d (32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
 ), Conv2d (32, 32, kernel_size=(3, 3), stride=(2, 2)), BatchNorm2d(32, eps=1e-05, momentum=0.1, 

#### Define loss function

In [16]:
criterion = nn.CrossEntropyLoss()

#### Define an optimizer

In [17]:
optimizer = Adam(res_net.parameters(), lr=learning_rate)

#### Training ResNet

In [20]:
for epoch in range(epochs):
    epoch_accuracy = []
    epoch_loss = []
    counter = 0
    for images, labels in train_loader:
        
        X_batch = Variable(images)
        y_batch = Variable(labels)
        
        optimizer.zero_grad()
        preds = res_net(X_batch)
        epoch_accuracy.append(accuracy(preds.cpu().data.numpy(), y_batch.cpu().data.numpy()))
        loss = criterion(preds, y_batch)
        epoch_loss.append(loss.cpu().data.numpy())
        loss.backward()
        optimizer.step()
        counter += 1
        if counter % 100 == 0:
            print("Epoch: {}/{}".format(epoch+1, epochs), 
                  " | Epoch loss: {}".format(np.mean(epoch_loss)), 
                  " | Epoch accuracy: {}".format(np.mean(epoch_accuracy)))

Epoch: 1/10  | Epoch loss: 0.5831528902053833  | Epoch accuracy: 0.8496875


KeyboardInterrupt: 