Import libraries

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torchvision import transforms
from torch.utils.data.sampler import SubsetRandomSampler

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

DataLoader and Dataset

In [4]:
def data_loader(data_dir,batch_size,random_seed=42,valid_size=0.1,shuffle=True,test=False):
    """creates and returns training and validation data loaders for the CIFAR-10 dataset.
    input parameters:
      data_dir: The directory where the CIFAR-10 dataset is stored or will be downloaded.
      batch_size: The batch size to use for the data loaders.
      random_seed: The random seed used for shuffling the dataset indices.
      valid_size: The proportion of the training dataset to use for validation (default is 0.1, representing 10%).
      shuffle: A boolean flag indicating whether to shuffle the dataset indices before splitting into training and validation sets.

  """
    #define the normalization transformation using the mean and standard deviation values.
    normalize = transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],std=[0.2023, 0.1994, 0.2010])

    transform = transforms.Compose([transforms.Resize((224,224)),transforms.ToTensor(),normalize])

    if test:
        dataset = datasets.CIFAR10(root=data_dir, train=False,download=True, transform=transform)

        data_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

        return data_loader

    # load the dataset
    train_dataset = datasets.CIFAR10(root=data_dir, train=True,download=True, transform=transform)

    valid_dataset = datasets.CIFAR10(root=data_dir, train=True,download=True, transform=transform)

    num_train = len(train_dataset)
    indices = list(range(num_train))#create a list of indices for shuffling
    split = int(np.floor(valid_size * num_train))#determine the split point for creating the validation dataset

    if shuffle:   #checks shuffling
        np.random.seed(42)
        np.random.shuffle(indices)

    train_idx, valid_idx = indices[split:], indices[:split]
    #create instances of SubsetRandomSampler using the training and validation indices
    train_sampler = SubsetRandomSampler(train_idx)
    valid_sampler = SubsetRandomSampler(valid_idx)

    #data loaders for the training and validation datasets using the respective samplers
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
    valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, sampler=valid_sampler)

    return (train_loader, valid_loader)

train_loader, valid_loader = data_loader(data_dir='./data',batch_size=64)
test_loader = data_loader(data_dir='./data',batch_size=64,test=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:12<00:00, 13171678.88it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified
Files already downloaded and verified


Architecture

In [5]:
class ResidualBlock(nn.Module): #the ResidualBlock class implements a residual block, which consists of two convolutional layers with batch normalization and a residual connection.
                                #This block is used to create deeper residual networks that can better capture complex patterns in images.

    def __init__(self, in_channels, out_channels, stride = 1, downsample = None):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1),
                        nn.BatchNorm2d(out_channels),
                        nn.ReLU())   #First convolutional layer followed by batch normalization and ReLU activation.
        self.conv2 = nn.Sequential(nn.Conv2d(out_channels, out_channels, kernel_size = 3, stride = 1, padding = 1),
                        nn.BatchNorm2d(out_channels)) #Second convolutional layer followed by batch normalization.
        self.downsample = downsample #Downsample function to be applied on the input if needed.
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.conv2(out)
        if self.downsample:
            residual = self.downsample(x)
        out += residual
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes = 10):
        super(ResNet, self).__init__()
        self.inplanes = 64 #Number of input channels for the first convolutional layer.
        self.conv1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3),
                        nn.BatchNorm2d(64),
                        nn.ReLU()) #First convolutional layer followed by batch normalization and ReLU activation.
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        self.layer0 = self._make_layer(block, 64, layers[0], stride = 1)
        self.layer1 = self._make_layer(block, 128, layers[1], stride = 2)
        self.layer2 = self._make_layer(block, 256, layers[2], stride = 2)
        self.layer3 = self._make_layer(block, 512, layers[3], stride = 2)
        self.avgpool = nn.AvgPool2d(7, stride=1)
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        """helper function used in the ResNet class to create a layer with multiple residual blocks.


        block: The residual block class used in the layer.
        planes: Number of output channels for the residual blocks in the layer.
        blocks: Number of residual blocks in the layer.
        stride: Stride value for the first residual block in the layer. Default is 1.
        """
        downsample = None
        if stride != 1 or self.inplanes != planes:

            downsample = nn.Sequential(nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                         nn.BatchNorm2d(planes))

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample)) #adds the first residual block to the layers list.
                                                                        #It uses the provided block class and passes the appropriate arguments,
                                                                        #including the downsample module if downsampling is required.
        self.inplanes = planes
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)


    def forward(self, x):
        x = self.conv1(x)
        x = self.maxpool(x)
        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [7]:
num_classes = 10
num_epochs = 2
batch_size = 16
learning_rate = 0.01

model = ResNet(ResidualBlock, [3, 4, 6, 3]).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = 0.001, momentum = 0.9)

# Train the model
total_step = len(train_loader)

In [8]:
import gc
total_step = len(train_loader)

for epoch in range(num_epochs): #iterates over the specified number of epochs.
    for i, (images, labels) in enumerate(train_loader):# iterates over the mini-batches of the training dataset. It retrieves images and labels from the train_loader.
        # Move tensors to the configured device
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        del images, labels, outputs
        torch.cuda.empty_cache()
        gc.collect()

    print ('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, loss.item()))

    # Validation
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images, labels, outputs

        print('Accuracy of the network on the {} validation images: {} %'.format(5000, 100 * correct / total))

Epoch [1/2], Loss: 1.5059
Accuracy of the network on the 5000 validation images: 59.92 %
Epoch [2/2], Loss: 0.5057
Accuracy of the network on the 5000 validation images: 73.84 %


In [9]:
with torch.no_grad(): # disable gradient computation
    correct = 0
    total = 0
    for images, labels in test_loader:
      images = images.to(device)
      labels = labels.to(device)
      outputs = model(images)
      _, predicted = torch.max(outputs.data, 1)
      total+=labels.size(0)
      correct += (predicted == labels).sum().item()
      del images, labels, outputs

    print('Accuracy of the network on {} test images:{} %'.format(10000,100*correct/total))



Accuracy of the network on 10000 test images:73.76 %
