In [1]:
import time

import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torch.nn.functional as F

from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader

In [2]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if not torch.cuda.is_available():
    print("WARNING: CUDA not available. Running on CPU.")

First, let's preprocess the data and apply some transforms so that our model has an easier time
with training and testing.

Training set:
- ToTensor:
    - converts to PyTorch tensor
- Normalize(mean, std):
    - Normalises each channel of the tensor image. Using the standard mean and std for CIFAR10 dataset.
- RandomHorizontalFlip:
    - Added as a form of data augmentation to make the model more robust to different spatial orientations.
- RandomCrop(size, padding, padding_mode):
    - Randomly crops image to size (size, size) and adds a padding of 4 pixels to all sides of the image.
    - padding_mode=reflect just means that the padding uses reflection of the input array to fill the new pixels. e.g. {a, b, c} => {a, b, c, b, a} for padding=2. Helps model generalise better.

Testing set:
- Only need to convert to tensor and normalise, as augmentation outside of this shouldn't be done on testing sets.

In [3]:
# Normalise data
transform_train = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4, padding_mode='reflect')
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

In [4]:
# Batch size
batch_size_train = 128
batch_size_test = 100

# Create train and test data
train_data = CIFAR10(root='C:/Users/Jacqu/Downloads/data/cifar10', train=True, download=True, transform=transform_train)
test_data = CIFAR10(root='C:/Users/Jacqu/Downloads/data/cifar10', train=False, download=True, transform=transform_test)

# Create dataloaders
train_loader = DataLoader(train_data, batch_size=batch_size_train, shuffle=True, num_workers=2)
test_loader = DataLoader(test_data, batch_size=batch_size_test, shuffle=False, num_workers=2)

# Report split sizes
print(f'Training set has {len(train_loader)} instances')
print(f'Testing set has {len(test_loader)} instances')

Files already downloaded and verified
Files already downloaded and verified
Training set has 391 instances
Testing set has 100 instances


## Creating the ResNet architecture

### Overview
For complex tasks like image classification, one needs a deep CNN to get a model that performs well. The issue with this is that, as you add more layers to the NN, it becomes difficult to train and the accuracy starts to saturate and degrade. This is called the Vanishing Gradient problem, where the gradients that are used to update the network become extremely small (vanish) as they are backpropogated from the output layers to the earlier layers.

Enter Residual Networks (ResNet)!

![Alt text](image-5.png)

ResNet solves the Vanishing Gradient problem by utilising skip connections, which allows alternate shortcut pats for the gradient to flow through. Another benefit of these connections is that they allow the model to learn the identity functions which ensure that a higher layer will perform at least as good as a lower layer (and not worse).

### Residual Block
A residual block (as shown below) is a stack of layers set in such a way that the output of a layer is taken and added to another layer deeper in the block. The non-linearity is then applied after adding it together with the corresponding layer in the main path.

![Alt text](image-4.png)

So, a residual network is just a stack of residual blocks.

In [5]:
# Model
class ResidualBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(ResidualBlock, self).__init__()
        
        # 1st Convolutional Layer
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        # Perform Batch Normalisation for stabilising training and improving generalisation
        self.bn1 = nn.BatchNorm2d(planes)
        
        # 2nd Convolutional Layer
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        # 2nd Batch Normalisation
        self.bn2 = nn.BatchNorm2d(planes)

        # Shortcut connection to downsample residual
        self.shortcut = nn.Sequential()

        # If the input shape is different from the output shape
        # a 1x1 convolution followed by batch normalisation is added to the shortcut
        # to match the dimensions.
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(
                    in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False
                ),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        # First layer of conv., batch norm. and relu activation
        out = F.relu(self.bn1(self.conv1(x)))

        # Second layer of conv. and batch norm.
        out = self.bn2(self.conv2(out))

        # Add shortcut to output (residual connection)
        out += self.shortcut(x)

        # Apply relu activation
        out = F.relu(out)
        return out

In [6]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        # Pytorch has downsampling
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        # For every one of two blocks in a layer, first will downsample by a factor of 2,
        # the second one will compute the convolutional layer
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

def ResNet18():
    return ResNet(ResidualBlock, [2, 2, 2, 2])

def ResNet34():
    return ResNet(ResidualBlock, [3, 4, 6, 3])

In [7]:
# Hyper-parameters
num_epochs = 40
learning_rate = 0.1
num_classes = 10

In [8]:
model = ResNet18().to(device)

In [9]:
# Model info
print("Model No. of Parameters:", sum([param.nelement() for param in model.parameters()]))
print(model)

Model No. of Parameters: 11173962
ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNor

In [10]:
criterion = nn.CrossEntropyLoss()

# SGD does not change learning rate
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=5e-4)

# Piecewise learning rate scheduler
total_step = len(train_loader)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer, max_lr=learning_rate, steps_per_epoch=total_step, epochs=num_epochs
)

In [12]:
# Construct scaler for mixed precision training
scaler = torch.cuda.amp.GradScaler()

# Train the model
model.train()
print("> Training")
start = time.time()
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        with torch.autocast(device_type='cuda', dtype=torch.float16):
            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)
        
        # Scale loss. Calls backward on scaled loss to create scaled gradients
        scaler.scale(loss).backward()

        # First unscales the gradients of the optimiser's assigned parameters. If these gradients do not contain infs or NaNs,
        # optimizer.step() is then called, otherwise, optimizer.step() is skipped.
        scaler.step(optimizer)

        # Updates the scale for next iteration
        scaler.update()

        # Backwards and optimise
        optimizer.zero_grad()

        if (i+1) == 390:
            print("Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}".format(
                epoch+1, num_epochs, i+1, total_step, loss.item()
            ))

        scheduler.step()
end = time.time()
elapsed = end - start
print("Training time: {:.2f} sec or {:.2f} min".format(elapsed, elapsed / 60))

> Training
Epoch [1/40], Step [390/391], Loss: 1.1184
Epoch [2/40], Step [390/391], Loss: 0.8407
Epoch [3/40], Step [390/391], Loss: 0.8414
Epoch [4/40], Step [390/391], Loss: 0.7977
Epoch [5/40], Step [390/391], Loss: 0.5678
Epoch [6/40], Step [390/391], Loss: 0.4835
Epoch [7/40], Step [390/391], Loss: 0.4660
Epoch [8/40], Step [390/391], Loss: 0.4158
Epoch [9/40], Step [390/391], Loss: 0.5166
Epoch [10/40], Step [390/391], Loss: 0.4298
Epoch [11/40], Step [390/391], Loss: 0.4324
Epoch [12/40], Step [390/391], Loss: 0.3191
Epoch [13/40], Step [390/391], Loss: 0.4344
Epoch [14/40], Step [390/391], Loss: 0.2941
Epoch [15/40], Step [390/391], Loss: 0.3774
Epoch [16/40], Step [390/391], Loss: 0.4038
Epoch [17/40], Step [390/391], Loss: 0.4323
Epoch [18/40], Step [390/391], Loss: 0.2430
Epoch [19/40], Step [390/391], Loss: 0.3853
Epoch [20/40], Step [390/391], Loss: 0.2560
Epoch [21/40], Step [390/391], Loss: 0.2880
Epoch [22/40], Step [390/391], Loss: 0.3388
Epoch [23/40], Step [390/391],

In [13]:
# Test the model
model.eval()
print("> Testing")
start = time.time()
with torch.no_grad():
    correct= 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)

        # Count number of correct predictions
        correct += (predicted == labels).sum().item()
    print("Test Accuracy: {} %".format(100 * correct / total))

end = time.time()
elapsed = end - start
print("Testing time: {:.2f} sec or {:.2f} min".format(elapsed, elapsed / 60))

> Testing
Test Accuracy: 94.54 %
Testing time: 7.24 sec or 0.12 min
