In [26]:
# An implementation of https://arxiv.org/pdf/1512.03385.pdf
# code was referenced from below
# https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py


import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

In [27]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


In [28]:
# Training hyperparameters; the cells below read these names.
num_epochs = 4          # number of full passes over the training set
batch_size = 100        # samples per mini-batch (used by both data loaders)
learning_rate = 0.001   # initial learning rate handed to the optimizer

# Seed torch's global RNG so that weight initialisation and data shuffling
# are repeatable after "Restart Kernel -> Run All".
# NOTE(review): some CUDA kernels are non-deterministic, so GPU runs may still
# differ slightly between executions.
seed = 42
torch.manual_seed(seed);

In [29]:
# Training-time preprocessing: pad by 4 pixels, flip horizontally at random,
# take a random 32x32 crop, then convert the image to a tensor.
transform = transforms.Compose(
    [
        transforms.Pad(padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(size=32),
        transforms.ToTensor(),
    ]
)

In [30]:
# CIFAR-10 splits: the training split is downloaded on demand and passed
# through `transform`; the test split is only converted to tensors.
train_dataset = torchvision.datasets.CIFAR10(
    root='../../data/', train=True, transform=transform, download=True)
test_dataset = torchvision.datasets.CIFAR10(
    root='../../data/', train=False, transform=transforms.ToTensor())

# Mini-batch iterators: training batches are shuffled, test batches keep the
# dataset order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, batch_size=batch_size, shuffle=False)

Files already downloaded and verified


The residual block

First we pass the inputs through a 3x3 conv layer, then apply batch norm and a ReLU, and then pass the result through a second 3x3 conv layer followed by another batch norm.

Deep neural networks (DNNs) are hard to train.
The depth of the network can be an obstacle to training.

What happens when we pass data into such a deep network?

We initialize a neural network; all of its layers start with randomly chosen weights.

Then we pass the inputs through each layer of the network; at every layer the activations are multiplied by a random weight matrix.

After doing it many times, the data gets to the output layer

Now it's time to calculate the loss after the forward pass.

When we propagate gradients back through that many layers, they can shrink (or blow up) until they no longer carry a meaningful signal for the early layers.
After a few layers of the forward pass the activations may hold important information, but that information can be lost after many more layers. A residual network therefore groups every few layers into a residual block and carries each block's input (the "residue", so to speak) forward alongside the block's output.

Within each block, the layers will pass their data normally and between each block there is a new type of connection. This connection is called skip connection.

This connection works by combining the input of the block with the output of the block.

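In a ResNet this connection is made by adding the input and output tensors element-wise (this is what `out += residual` does in the code below); concatenating the tensors instead is what DenseNet-style networks do.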
These skip connections accelerate the training.

This helps the network keep meaningful gradient values during backpropagation, so that the loss can be driven close to zero.






In [31]:
# 3x3 convolution
def conv3x3(in_channels, out_channels, stride=1):
    """Build a bias-free 3x3 ``nn.Conv2d`` with padding 1.

    Args:
        in_channels: number of input channels.
        out_channels: number of output channels.
        stride: convolution stride (defaults to 1).

    Returns:
        The freshly constructed ``nn.Conv2d`` layer.
    """
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_channels, out_channels, **conv_options)

In [32]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers wrapped by a shortcut connection.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by both convolutions.
        stride: stride of the first convolution (the second uses the default).
        downsample: optional module applied to the block input before it is
            added to the convolution output; ``None`` adds the input as is.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(ResidualBlock, self).__init__()
        self.conv1 = conv3x3(in_channels, out_channels, stride)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(out_channels, out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + shortcut(x))``.

        ``shortcut`` is ``self.downsample`` when one was supplied and the
        identity otherwise.
        """
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Compare against None instead of relying on truthiness: container
        # modules define __len__, so a zero-length one would evaluate as False
        # and the shortcut would be skipped silently.
        if self.downsample is not None:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

In [33]:
class ResNet(nn.Module):
    """Small ResNet: a 3x3 stem convolution followed by three residual stages.

    Args:
        block: callable building one residual block; invoked as
            ``block(in_channels, out_channels, stride, downsample)`` for the
            first block of a stage and ``block(out_channels, out_channels)``
            for the remaining ones.
        layers: sequence whose first three entries give the number of blocks
            in stage 1, 2 and 3.
        num_classes: number of outputs of the final linear layer.
    """

    def __init__(self, block, layers, num_classes=10):
        super(ResNet, self).__init__()
        # Channel count fed to the next stage; updated by make_layer.
        self.in_channels = 16
        self.conv = conv3x3(3, 16)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self.make_layer(block, 16, layers[0])
        self.layer2 = self.make_layer(block, 32, layers[1], 2)
        self.layer3 = self.make_layer(block, 64, layers[2], 2)
        # Global average pooling down to 1x1. For 32x32 inputs (8x8 feature
        # maps after two stride-2 stages) this equals the former AvgPool2d(8),
        # and it keeps the 64-feature input of `fc` valid for other sizes too.
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64, num_classes)

    def make_layer(self, block, out_channels, blocks, stride=1):
        """Stack ``blocks`` residual blocks producing ``out_channels`` channels.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a conv + batch-norm shortcut so the residual
        can be added to the block output. Updates ``self.in_channels``.
        """
        downsample = None
        if (stride != 1) or (self.in_channels != out_channels):
            downsample = nn.Sequential(
                conv3x3(self.in_channels, out_channels, stride=stride),
                nn.BatchNorm2d(out_channels))
        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of 3-channel images to ``num_classes`` scores per image."""
        out = self.conv(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)
        # Flatten everything except the batch dimension for the linear layer.
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

# Three stages of two residual blocks each, placed on the selected device.
model = ResNet(block=ResidualBlock, layers=[2, 2, 2])
model = model.to(device)

explanation:


conv3x3 is a function that defines a 3x3 convolutional layer. In convolutional neural networks (CNNs), 3x3 convolutions are commonly used to capture spatial patterns in the input data.

It takes input channels, output channels, and an optional stride as parameters.
A stride of 1 means the kernel moves one pixel at a time.

Adds a batch normalization layer (bn1) after the first convolution.
Batch normalization helps stabilize and speed up training.


Applies Rectified Linear Unit (ReLU) activation to introduce non-linearity. The inplace=True option modifies the input tensor directly, saving memory.
self.conv2 = conv3x3(out_channels, out_channels)

Creates the second 3x3 convolutional layer (conv2) with the same output channels. This convolutional layer further transforms the data.

Adds a batch normalization layer (bn2) after the second convolution.

The downsample parameter is assigned to the downsample attribute. This parameter is optional and provides a way to downsample the input if needed.

forward:

Defines the forward pass method for the ResidualBlock. This method specifies how data should flow through the block during forward propagation.

Saves the original input (x) as the residual. This will be added to the transformed output later.

Applies the first convolutional layer to the input.

Applies batch normalization to the output of the first convolution.

Applies ReLU activation.

Applies the second convolutional layer.

Applies batch normalization to the output of the second convolution.

Checks if a downsample function is provided.

If a downsample function is provided, applies it to the original input (x) to match dimensions.

Adds the residual (the original input, or its downsampled version when a downsample function is provided) to the transformed output. This creates the shortcut connection.

Applies ReLU activation to the final output.

Returns the final output of the residual block after the forward pass.

In [34]:
# Cross-entropy loss on the class scores, optimised with Adam starting from
# the configured learning rate.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [35]:
# For updating learning rate
def update_lr(optimizer, lr):
    """Set the learning rate of every parameter group of ``optimizer`` to ``lr``.

    Args:
        optimizer: object exposing ``param_groups``, an iterable of dicts.
        lr: new value stored under each group's ``'lr'`` key.
    """
    for group in optimizer.param_groups:
        group.update(lr=lr)

In [36]:
# Train the model for `num_epochs` passes over `train_loader`, logging the
# loss of every 100th mini-batch.
total_step = len(train_loader)
curr_lr = learning_rate

# Switch to training mode explicitly. The evaluation cell calls model.eval(),
# so re-running this cell afterwards would otherwise train with the BatchNorm
# layers still in inference mode.
model.train()

for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 100 == 0:
            print ("Epoch [{}/{}], Step [{}/{}] Loss: {:.4f}"
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))

    # Decay learning rate: divide by 3 after every 20th epoch.
    # NOTE: with num_epochs < 20 this branch never runs, so the learning rate
    # stays at its initial value for the whole run.
    if (epoch+1) % 20 == 0:
        curr_lr /= 3
        update_lr(optimizer, curr_lr)

Epoch [1/4], Step [100/500] Loss: 1.7602
Epoch [1/4], Step [200/500] Loss: 1.4141
Epoch [1/4], Step [300/500] Loss: 1.2637
Epoch [1/4], Step [400/500] Loss: 1.3798
Epoch [1/4], Step [500/500] Loss: 1.0047
Epoch [2/4], Step [100/500] Loss: 1.0096
Epoch [2/4], Step [200/500] Loss: 1.0822
Epoch [2/4], Step [300/500] Loss: 1.2242
Epoch [2/4], Step [400/500] Loss: 0.8563
Epoch [2/4], Step [500/500] Loss: 0.8088
Epoch [3/4], Step [100/500] Loss: 0.7238
Epoch [3/4], Step [200/500] Loss: 0.7976
Epoch [3/4], Step [300/500] Loss: 0.7426
Epoch [3/4], Step [400/500] Loss: 0.9967
Epoch [3/4], Step [500/500] Loss: 0.8524
Epoch [4/4], Step [100/500] Loss: 0.7132
Epoch [4/4], Step [200/500] Loss: 0.8299
Epoch [4/4], Step [300/500] Loss: 0.9029
Epoch [4/4], Step [400/500] Loss: 0.6775
Epoch [4/4], Step [500/500] Loss: 0.8872


In [37]:
# Evaluate on the test split. eval() makes BatchNorm use its running
# statistics, and no_grad() skips building the autograd graph.
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        # Index of the highest score per sample. Replaces the legacy
        # torch.max(outputs.data, 1) form; `.data` is unnecessary under no_grad.
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Accuracy of the model on the test images: {} %'.format(100 * correct / total))

# Save the model checkpoint (parameters and buffers only, via state_dict)
torch.save(model.state_dict(), 'resnet.ckpt')

Accuracy of the model on the test images: 72.37 %
