# Homework 3, exercise 2 - Residual Neural Network on CIFAR10

In this exercise we implement a (slightly modified) ResNet as introduced in [this paper](https://arxiv.org/pdf/1512.03385.pdf).

We will use the CIFAR-10 dataset to implement and train a ResNet that classifies the input images.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import time

Using a GPU is recommended for this exercise! The ResNet stacks many conv2d layers (combined with skip connections), which makes training very slow on a CPU.

In [None]:
use_cuda = True

if use_cuda and torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')

device

device(type='cuda')

### Load the CIFAR10 dataset

The CIFAR10 dataset is composed of 32x32x3 (height x width x channel) labeled images belonging to 10 different classes ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck').

In [None]:
import torchvision
import torchvision.transforms as transforms

transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

trainset = torchvision.datasets.CIFAR10(root='./data_cifar', train=True,
                                        download=True, transform=transform_train)

testset = torchvision.datasets.CIFAR10(root='./data_cifar', train=False,
                                       download=True, transform=transform_test)

batch_size = 128

c, w, h = 3, 32, 32

trainloader = torch.utils.data.DataLoader(trainset,
                                          batch_size=batch_size,
                                          shuffle=True)

testloader = torch.utils.data.DataLoader(testset,
                                         batch_size=batch_size,
                                         shuffle=True)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data_cifar/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:02<00:00, 60408394.96it/s]


Extracting ./data_cifar/cifar-10-python.tar.gz to ./data_cifar
Files already downloaded and verified


## Exercise 1 - Implement a Residual Block

Residual neural networks mainly consist of components called Residual Blocks. One residual block can be expressed as **y** = *f*(**x**) + **x** (see Equation (11.5)), where **x** and **y** are the input and output of the block, respectively. So the input **x** is added to the result of *f*(**x**) using a *skip connection*.

In this exercise, *f* consists of:
1. a 2d convolutional layer with input channels=`in_channels`, output channels=`hidden_channels`, a kernel size of (3, 3), a stride of 1, padding of 1 and no bias parameter.
2. a batch normalisation layer
3. ReLU activation
4. a 2d convolutional layer with input channels=`hidden_channels`, output channels=`out_channels`, a kernel size of (3, 3), a stride of 1, padding of 1 and no bias parameter.
5. a batch normalisation layer

After this, the `skip_connection` is applied. If the dimensions of *f*(**x**) and **x** do not match, an extra linear projection is applied to **x** so that the dimensions match. This has already been implemented for you; you only need to call it at the right place.
Finally, a ReLU activation is applied to the output **y**.
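
To see why the projection is needed, the shapes can be checked on a random tensor. This is only an illustrative sketch with single convolutions standing in for *f* (the tensor sizes and the names `f_same`, `f_wide` and `project` are made up for the example), not the block you are asked to implement:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

x = torch.randn(2, 64, 8, 8)  # (batch, channels, height, width), arbitrary example sizes

# A 3x3 convolution with stride 1 and padding 1 keeps height and width unchanged.
f_same = nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1, bias=False)
y_same = f_same(x) + x  # shapes match, so the skip connection is a plain addition
same_shape = tuple(y_same.shape)

# When f changes the number of channels, x has to be projected before the addition.
f_wide = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, bias=False)
project = nn.Conv2d(64, 128, kernel_size=1, stride=1, bias=False)  # 1x1 projection
y_wide = f_wide(x) + project(x)
wide_shape = tuple(y_wide.shape)

print(same_shape, wide_shape)  # (2, 64, 8, 8) (2, 128, 8, 8)
```

Without `project`, the sum `f_wide(x) + x` would fail because 128 and 64 channels cannot be added elementwise.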


In [None]:
class ResidualBlock(nn.Module):

  def __init__(self, in_channels, hidden_channels, out_channels):
    super().__init__()

    # TODO: Define the layers

    self.conv1 = # TODO
    self.batch1 = # TODO
    self.relu = # TODO
    self.conv2 = # TODO
    self.batch2 = # TODO
    ###############################################################

    if in_channels != out_channels:  # f(x) and x dimensions do not match! Define a projection for input x
      self.skip_connection = nn.Sequential(
          nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
          nn.BatchNorm2d(out_channels)
      )
    else:
      self.skip_connection = lambda x: x  # The dimensions already match! No need to do a projection on x

  def forward(self, x):
    # TODO: Implement the forward pass
    skip = # TODO
    x = # TODO
    x = # TODO
    x = # TODO
    x = # TODO
    x = # TODO
    x = # TODO
    return x
    ###############################################################



## Exercise 2 - Implement a Residual Neural Network
Now you can use the previously defined Residual Block to create your ResNet.

The network consists of:
1. a convolutional layer with input channels=`in_channels`, output channels=64, a stride of 1, padding of 1 and no bias parameter,
2. a batch normalisation layer
3. ReLU activation
4. a max pooling layer with kernel size (3, 3), a stride of 2 and padding of 1,
5. eight residual blocks, with (64, 64, 128, 128, 256, 256, 512, 512) channels, respectively (see code below)
6. an average pooling layer over all feature maps (already present)
7. a dense layer to form the output distribution (already present)
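
The effect of the pooling layers on the tensor shapes can be checked in isolation. For a pooling window of size k, stride s and padding p, an input of size n gives an output of size floor((n + 2p - k) / s) + 1, so 32 becomes floor(31 / 2) + 1 = 16 here. The sketch below uses random stand-in tensors (their sizes are assumptions for illustration, not outputs of the actual network):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# Stand-in for the feature maps after the first convolution (assumed 64 x 32 x 32)
x = torch.randn(4, 64, 32, 32)

pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
pooled = pool(x)
print(pooled.shape)  # torch.Size([4, 64, 16, 16])

# Stand-in for the output of the last residual block (assumed 512 x 16 x 16)
features = torch.randn(4, 512, 16, 16)

# Average over the full spatial extent: one value per feature map
averaged = F.avg_pool2d(features, features.shape[2:])
flat = averaged.view(averaged.size(0), -1)
print(flat.shape)  # torch.Size([4, 512])
```

The flattened tensor has 512 features per image, which matches the input size of the dense layer `nn.Linear(512, out_size)`.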

In [None]:
class ResNet(nn.Module):

  def __init__(self, in_channels, out_size):
    super().__init__()

    # TODO: Define the layers

    self.conv1 = # TODO
    self.batch1 = # TODO
    self.relu = # TODO
    self.pool1 = # TODO

    ###############################################################

    self.res_blocks = nn.ModuleList(
        [
         ResidualBlock(64, 64, 64),
         ResidualBlock(64, 64, 64),

         ResidualBlock(64, 128, 128),
         ResidualBlock(128, 128, 128),

         ResidualBlock(128, 256, 256),
         ResidualBlock(256, 256, 256),

         ResidualBlock(256, 512, 512),
         ResidualBlock(512, 512, 512),
        ]
    )

    self.dense_layer = nn.Linear(512, out_size)

    for module in self.modules():
      if isinstance(module, nn.Conv2d):
          nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')

  def forward(self, x):

    # TODO: Implement the forward pass (add everything that needs to be done before the average pooling)

    x = # TODO
    x = # TODO
    x = # TODO
    x = # TODO
    for block in self.res_blocks:
      x = # TODO

    #################################################################

    x = F.avg_pool2d(x, x.shape[2:])

    x = x.view(x.size(0), -1)
    x = self.dense_layer(x)

    return x



### Initialize the network, Loss function and Optimizer

In [None]:
net = ResNet(c, len(classes)).to(device)

criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
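
`nn.CrossEntropyLoss` expects raw, unnormalised scores (logits) and integer class labels, so the network does not need a softmax at its output. A small sketch with made-up logits, checking that the loss matches log-softmax followed by the negative log-likelihood:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

# Made-up logits for 2 samples and 3 classes, with their true labels
logits = torch.tensor([[2.0, 0.5, -1.0],
                       [0.1, 0.2, 3.0]])
labels = torch.tensor([0, 2])

criterion = nn.CrossEntropyLoss()
loss = criterion(logits, labels)

# Equivalent computation: log-softmax over the classes, then negative log-likelihood
manual = F.nll_loss(F.log_softmax(logits, dim=1), labels)

print(torch.allclose(loss, manual))  # True
```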

## Exercise 3 - Train/evaluate the network
Train the network you built using the code below. First check your code by training for a single epoch, which should already give you around 50% train accuracy. Then run it for 100-200 epochs until training converges.

Add the following answers in your report:
1. What test accuracy were you able to get?
2. How many layers does your network have? (counting only convolutional and dense layers)
3. Why do the skip connections help for training deep neural networks?
4. What options do you have to improve the test accuracy? Give 3 options and explain why you think each would improve accuracy. (you do not need to implement/code them)
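
For question 2, one possible way to count layers is to iterate over `model.modules()`. The helper `count_layers` and the toy model below are hypothetical illustrations, not part of the exercise code. Note that on your ResNet this approach would also count the 1x1 convolutions in the skip connections, so decide whether you want to include those.

```python
import torch.nn as nn

def count_layers(model):
    """Count the convolutional and dense (linear) layers of a model."""
    return sum(isinstance(m, (nn.Conv2d, nn.Linear)) for m in model.modules())

# Hypothetical toy model, only used to demonstrate the helper
toy = nn.Sequential(
    nn.Conv2d(3, 8, kernel_size=3, padding=1),
    nn.BatchNorm2d(8),   # not counted
    nn.ReLU(),           # not counted
    nn.Flatten(),        # not counted
    nn.Linear(8 * 32 * 32, 10),
)

n_layers = count_layers(toy)
print(n_layers)  # 2
```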

In [None]:
start=time.time()

for epoch in range(0,200):

  net.train()  # Put the network in train mode
  for i, (x_batch, y_batch) in enumerate(trainloader):
    x_batch, y_batch = x_batch.to(device), y_batch.to(device)  # Move the data to the device that is used

    optimizer.zero_grad()  # Set all currently stored gradients to zero

    y_pred = net(x_batch)

    loss = criterion(y_pred, y_batch)

    loss.backward()

    optimizer.step()

    # Compute relevant metrics

    y_pred_max = torch.argmax(y_pred, dim=1)  # Get the labels with the highest predicted score

    correct = torch.sum(torch.eq(y_pred_max, y_batch)).item()  # Count how many are equal to the true labels

    elapsed = time.time() - start  # Keep track of how much time has elapsed

    # Show progress every 20 batches
    if not i % 20:
      # Divide by the actual batch size: the last batch of an epoch can be smaller than batch_size
      print(f'epoch: {epoch}, time: {elapsed:.3f}s, loss: {loss.item():.3f}, train accuracy: {correct / y_batch.size(0):.3f}')

  correct_total = 0  # Reset the counter once per epoch, before evaluating

  net.eval()  # Put the network in eval mode
  with torch.no_grad():  # Gradients are not needed for evaluation
    for i, (x_batch, y_batch) in enumerate(testloader):
      x_batch, y_batch = x_batch.to(device), y_batch.to(device)  # Move the data to the device that is used

      y_pred = net(x_batch)
      y_pred_max = torch.argmax(y_pred, dim=1)

      correct_total += torch.sum(torch.eq(y_pred_max, y_batch)).item()

  print(f'Accuracy on the test set: {correct_total / len(testset):.3f}')




In [None]:
correct_total = 0

net.eval()  # Make sure the network is in eval mode
with torch.no_grad():  # Gradients are not needed for evaluation
  for i, (x_batch, y_batch) in enumerate(testloader):
    x_batch, y_batch = x_batch.to(device), y_batch.to(device)  # Move the data to the device that is used

    y_pred = net(x_batch)
    y_pred_max = torch.argmax(y_pred, dim=1)

    correct_total += torch.sum(torch.eq(y_pred_max, y_batch)).item()

print(f'Accuracy on the test set: {correct_total / len(testset):.3f}')