In [3]:
import os
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torchvision
import torchvision.transforms as transforms

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import tools
import tests

In [4]:
data_dir = tools.select_data_dir()
device = torch.device('cpu')

The data directory is ../data


In [5]:
transform = transforms.Compose([
    transforms.ToTensor(),  # Transform to tensor
    transforms.Normalize((0.5,), (0.5,))  # Scale images to [-1, 1]
])

trainset = torchvision.datasets.FashionMNIST(root=data_dir, train=True, download=True, transform=transform)
testset = torchvision.datasets.FashionMNIST(root=data_dir, train=False, download=True, transform=transform)

classes = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal',
           'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=5, shuffle=False)

### ResNet block
Our ResNet consists of blocks with two convolutional layers and a skip connection.

In the most general case, our implementation should have:


* Two convolutional layers with:
    * 3x3 kernel
    * no bias terms
    * padding with one pixel on both sides
    * 2d batch normalization after each convolutional layer.

* **The first convolutional layer also (optionally) has:**
    * different number of input channels and output channels
    * change of the resolution with stride.

* The skip connection:
    * simply copies the input if the resolution and the number of channels do not change.
    * if either the resolution or the number of channels change, the skip connection should have one convolutional layer with:
        * 1x1 convolution **without bias**
        * change of the resolution with stride (optional)
        * different number of input channels and output channels (optional)
    * if either the resolution or the number of channels change, the 1x1 convolutional layer is followed by 2d batch normalization.

* The ReLU nonlinearity is applied after the first convolutional layer and at the end of the block.


In [86]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        """
        Args:
          in_channels (int):  Number of input channels.
          out_channels (int): Number of output channels.
          stride (int):       Controls the stride.
        """
        super(Block, self).__init__()
        # YOUR CODE HERE
        
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=(1,1), bias=False) 
        self.bn1 = nn.BatchNorm2d(num_features=out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,stride=stride, padding=(1,1), bias=False)
        self.bn2 = nn.BatchNorm2d(num_features=out_channels)
        
        self.conv3 = nn.Conv2d(out_channels, out_channels, kernel_size=1,bias=False)
        self.bn3 = nn.BatchNorm2d(num_features=out_channels)
        
        self.relu=nn.ReLU()
        

       
    def forward(self, x):
        # YOUR CODE HERE
        
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        x = self.conv2(x)
        x = self.bn2(x)
        
        
        if(x.stride !=1):
            x = self.conv3(x)
            x = self.bn3(x)
        print(x.stride)
    
        x = self.relu(x)
        
        
        return x

In [87]:
def test_Block_shapes():

    # The number of channels and resolution do not change
    batch_size = 20
    x = torch.zeros(batch_size, 16, 28, 28)
    block = Block(in_channels=16, out_channels=16)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 16, 28, 28]), "Bad shape of y: y.shape={}".format(y.shape)

    # Increase the number of channels
    block = Block(in_channels=16, out_channels=32)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 32, 28, 28]), "Bad shape of y: y.shape={}".format(y.shape)

    # Decrease the resolution
    block = Block(in_channels=16, out_channels=16, stride=2)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 16, 14, 14]), "Bad shape of y: y.shape={}".format(y.shape)

    # Increase the number of channels and decrease the resolution
    block = Block(in_channels=16, out_channels=32, stride=2)
    y = block(x)
    assert y.shape == torch.Size([batch_size, 32, 14, 14]), "Bad shape of y: y.shape={}".format(y.shape)

    print('Success')

test_Block_shapes()

<built-in method stride of Tensor object at 0x7fe25125ff80>
<built-in method stride of Tensor object at 0x7fe25125fe40>
<built-in method stride of Tensor object at 0x7fe25125ff40>
<built-in method stride of Tensor object at 0x7fe25125f840>
Success


In [83]:
tests.test_Block(Block)
tests.test_Block_relu(Block)
tests.test_Block_batch_norm(Block)

<built-in method stride of Tensor object at 0x7fe241a78dc0>


AssertionError: 
Not equal to tolerance rtol=1e-07, atol=0.001
y does not match expected value.
Mismatched elements: 9 / 9 (100%)
Max absolute difference: 1.00073242
Max relative difference: 0.03847606
 x: array([[[[24.999622, 34.999474, 24.999622],
         [34.999474, 48.999268, 34.999474],
         [24.999622, 34.999474, 24.999622]]]], dtype=float32)
 y: array([[[[26, 36, 26],
         [36, 50, 36],
         [26, 36, 26]]]])

In [27]:
class GroupOfBlocks(nn.Module):
    def __init__(self, in_channels, out_channels, n_blocks, stride=1):
        super(GroupOfBlocks, self).__init__()

        first_block = Block(in_channels, out_channels, stride)
        other_blocks = [Block(out_channels, out_channels) for _ in range(1, n_blocks)]
        self.group = nn.Sequential(first_block, *other_blocks)

    def forward(self, x):
        return self.group(x)

In [28]:
# Let's print a block
group = GroupOfBlocks(in_channels=10, out_channels=20, n_blocks=3)
print(group)

GroupOfBlocks(
  (group): Sequential(
    (0): Block(
      (conv1): Conv2d(10, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(20, 20, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (1): Block(
      (conv1): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(20, 20, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(20, eps=1e-05, momentum=0.1, affine=True, track_running_s

In [29]:
class ResNet(nn.Module):
    def __init__(self, n_blocks, n_channels=64, num_classes=10):
        """
        Args:
          n_blocks (list):   A list with three elements which contains the number of blocks in 
                             each of the three groups of blocks in ResNet.
                             For instance, n_blocks = [2, 4, 6] means that the first group has two blocks,
                             the second group has four blocks and the third one has six blocks.
          n_channels (int):  Number of channels in the first group of blocks.
          num_classes (int): Number of classes.
        """
        assert len(n_blocks) == 3, "The number of groups should be three."
        super(ResNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=n_channels, kernel_size=5, stride=1, padding=2, bias=False)
        self.bn1 = nn.BatchNorm2d(n_channels)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.group1 = GroupOfBlocks(n_channels, n_channels, n_blocks[0])
        self.group2 = GroupOfBlocks(n_channels, 2*n_channels, n_blocks[1], stride=2)
        self.group3 = GroupOfBlocks(2*n_channels, 4*n_channels, n_blocks[2], stride=2)

        self.avgpool = nn.AvgPool2d(kernel_size=4, stride=1)
        self.fc = nn.Linear(4*n_channels, num_classes)

        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, np.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def forward(self, x, verbose=False):
        """
        Args:
          x of shape (batch_size, 1, 28, 28): Input images.
          verbose: True if you want to print the shapes of the intermediate variables.
        
        Returns:
          y of shape (batch_size, 10): Outputs of the network.
        """
        if verbose: print(x.shape)
        x = self.conv1(x)
        if verbose: print('conv1:  ', x.shape)
        x = self.bn1(x)
        if verbose: print('bn1:    ', x.shape)
        x = self.relu(x)
        if verbose: print('relu:   ', x.shape)
        x = self.maxpool(x)
        if verbose: print('maxpool:', x.shape)

        x = self.group1(x)
        if verbose: print('group1: ', x.shape)
        x = self.group2(x)
        if verbose: print('group2: ', x.shape)
        x = self.group3(x)
        if verbose: print('group3: ', x.shape)

        x = self.avgpool(x)
        if verbose: print('avgpool:', x.shape)

        x = x.view(-1, self.fc.in_features)
        if verbose: print('x.view: ', x.shape)
        x = self.fc(x)
        if verbose: print('out:    ', x.shape)

        return x

In [30]:
def test_ResNet_shapes():
    # Create a network with 2 block in each of the three groups
    n_blocks = [2, 2, 2]  # number of blocks in the three groups
    net = ResNet(n_blocks, n_channels=10)
    net.to(device)

    # Feed a batch of images from the training data to test the network
    with torch.no_grad():
        images, labels = iter(trainloader).next()
        images = images.to(device)
        print('Shape of the input tensor:', images.shape)

        y = net.forward(images, verbose=True)
        print(y.shape)
        assert y.shape == torch.Size([trainloader.batch_size, 10]), "Bad shape of y: y.shape={}".format(y.shape)

    print('Success')

test_ResNet_shapes()

Shape of the input tensor: torch.Size([32, 1, 28, 28])
torch.Size([32, 1, 28, 28])
conv1:   torch.Size([32, 10, 28, 28])
bn1:     torch.Size([32, 10, 28, 28])
relu:    torch.Size([32, 10, 28, 28])
maxpool: torch.Size([32, 10, 14, 14])
group1:  torch.Size([32, 10, 14, 14])
group2:  torch.Size([32, 20, 7, 7])
group3:  torch.Size([32, 40, 4, 4])
avgpool: torch.Size([32, 40, 1, 1])
x.view:  torch.Size([32, 40])
out:     torch.Size([32, 10])
torch.Size([32, 10])
Success


In [31]:
# This function computes the accuracy on the test dataset
def compute_accuracy(net, testloader):
    net.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

In [32]:
# Create the network
n_blocks = [2, 2, 2]  # number of blocks in the three groups
net = ResNet(n_blocks, n_channels=16)
net.to(device)

ResNet(
  (conv1): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (group1): GroupOfBlocks(
    (group): Sequential(
      (0): Block(
        (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU()
      )
      (1): Block(
        (conv1): Conv2d(1

In [33]:
optimizer = optim.Adam(net.parameters(),lr=0.01)
loss_function = nn.CrossEntropyLoss()

EPOCHS = 10
BATCH_SIZE = 32

In [34]:
# Implement the training loop in this cell
for epoch in range(EPOCHS):
    for images, labels in trainloader:
        optimizer.zero_grad()
        output = net(images)
        loss = loss_function(output, labels)
        loss.backward()
        optimizer.step()
    print(f"Epoch: {epoch}. Loss: {loss}")

Epoch: 0. Loss: 0.706305742263794
Epoch: 1. Loss: 0.28587406873703003
Epoch: 2. Loss: 0.18219316005706787
Epoch: 3. Loss: 0.25198814272880554
Epoch: 4. Loss: 0.2304684817790985
Epoch: 5. Loss: 0.16556903719902039


KeyboardInterrupt: 