**Preparing the CIFAR-10 Dataset** - Importing the Libraries

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import torch.nn as nn
import time

**Forming the Dataset**

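Resize: Resizes the images to 32x32 pixels. CIFAR-10 images are already 32x32, so this step leaves them unchanged and only guarantees the input size.

RandomHorizontalFlip: Randomly flips the images horizontally with a probability of 0.5.

RandomAffine: Applies a random affine transformation to each image: a rotation of up to ±15 degrees, a translation of up to 10% of the width and height, and scaling by a factor between 0.9 and 1.1.

ColorJitter: Randomly adjusts brightness, contrast, and saturation by a factor between 0.8 and 1.2, and shifts the hue by up to ±0.1.

ToTensor: Converts the images to PyTorch tensors with shape (channels, height, width) and scales the pixel values from [0, 255] to [0, 1].

Normalize: Subtracts the per-channel mean and divides by the per-channel standard deviation. With a mean and standard deviation of 0.5 for every channel, this maps the values from [0, 1] to [-1, 1]. The inplace=True argument modifies the tensor in place instead of allocating a new one, which can save memory.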
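As a quick check of the arithmetic described above, the sketch below mimics ToTensor and Normalize with plain NumPy on a tiny hand-made image. The helper names (to_tensor_like, normalize, denormalize) are illustrative, not torchvision functions, and the sketch assumes uint8 input, as with CIFAR-10's images.

```python
import numpy as np

# Per-channel statistics passed to transforms.Normalize in this notebook
MEAN = np.array([0.5, 0.5, 0.5])
STD = np.array([0.5, 0.5, 0.5])

def to_tensor_like(img_uint8):
    """Mimic ToTensor for an HxWxC uint8 image: channels first, values scaled to [0, 1]."""
    return img_uint8.astype(np.float32).transpose(2, 0, 1) / 255.0

def normalize(x):
    """Mimic Normalize: subtract the per-channel mean, then divide by the per-channel std."""
    return (x - MEAN[:, None, None]) / STD[:, None, None]

def denormalize(x):
    """Undo Normalize (the notebook does the same with images * 0.5 + 0.5)."""
    return x * STD[:, None, None] + MEAN[:, None, None]

# A 1x3 RGB image with a black, a mid-grey and a white pixel
img = np.array([[[0, 0, 0], [128, 128, 128], [255, 255, 255]]], dtype=np.uint8)
x = normalize(to_tensor_like(img))
print(x.shape)    # (3, 1, 3): channels, height, width
print(x[0, 0])    # red channel of the three pixels, roughly [-1, 0.0039, 1]
```

Black maps to -1, white to 1, and denormalize recovers the [0, 1] values that Matplotlib can display.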







In [None]:
# Define the data augmentation and normalization transformations for the training dataset
transform_train = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    # Normalize each channel: subtract the mean and divide by the standard deviation
    # (ToTensor has already scaled pixel values to [0, 1], so this maps them to [-1, 1])
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], inplace=True),
])

# Define the normalization transformations for the test dataset
transform_test = transforms.Compose([
    # Resize the images to 32x32 pixels
    transforms.Resize((32, 32)), 
    # Convert the images to PyTorch tensors
    transforms.ToTensor(),
    # Normalize the tensors by subtracting the mean and dividing by the standard deviation
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])

# Download and load the training dataset
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, shuffle=True, num_workers=2)

# Download and load the test dataset
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=50, shuffle=False, num_workers=2)

**Visualizing a Sample** - We visualize 10 images chosen at random from one shuffled batch of the CIFAR-10 training set. Because they come from trainloader, they already include the random augmentations described above. To display them, we first reverse the normalization and rearrange each image from (channels, height, width) to (height, width, channels), the layout Matplotlib expects. We then draw the images in a 2x5 grid of subplots, each titled with the image's class label.

In [None]:
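# Define the classes for the CIFAR-10 dataset, in the order of the label indices 0-9
# (trainset.classes lists the same order with full names, e.g. 'airplane', 'automobile')
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')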

# Get a batch of images and labels from the trainloader
dataiter = iter(trainloader)
images, labels = next(dataiter)

# Randomly select 10 images from the batch
indices = np.random.choice(range(len(images)), size=10, replace=False)
images = images[indices]
labels = labels[indices]

# De-normalize the images to reverse the normalization applied during data preprocessing
# (clamp guards against tiny floating-point overshoots outside [0, 1])
images = ((images * 0.5) + 0.5).clamp(0, 1)

# Permute the images to the format (height, width, channels) for display
images = images.permute(0, 2, 3, 1).numpy()

# Create a grid of subplots with 2 rows and 5 columns to display 10 images
fig, axes = plt.subplots(nrows=2, ncols=5, figsize=(12, 6))

# Loop through each subplot and plot the corresponding image and label
for i, ax in enumerate(axes.flat):
    # Plot the image in the current subplot
    ax.imshow(images[i])
    
    # Get the label of the image
    label = classes[labels[i]]
    
    # Set the title of the subplot as the image label
    ax.set_title(label, fontsize=12)
    # Remove axis markings
    ax.axis('off')

# Show the plot with the grid of images
plt.tight_layout()
plt.show()


**Creating the Model** - This code defines a custom neural network block called "Block" that performs adaptive convolutions: it runs k = 3 parallel 3x3 convolutional layers and combines their outputs with weights computed from the input. To obtain the weights, an adaptive average pooling layer reduces each input channel to a single value, a fully connected layer maps these values to k scores, and a softmax turns the scores into k weights that sum to 1 for each sample. The block also adds a residual connection: the input itself, or a 1x1 convolution followed by batch normalization when the number of channels changes.

The benefit is that the mix of filters adapts to each input instead of being fixed after training, which can make feature extraction more flexible. The residual connection gives gradients a direct path around the convolutions, which helps prevent vanishing gradients and makes deep stacks of blocks easier to train. The block can be used as a building block for more complex neural network architectures.

In [None]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, k=3):
        super(Block, self).__init__()
        # Adaptive average pooling to reduce the spatial dimensions to 1x1
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        # Fully connected layer mapping the pooled channel values to one score per convolution branch
        self.fc = nn.Linear(in_channels, k)
        
        # Convolutional layers for the block
        self.convs = nn.ModuleList()
        for _ in range(k):
            conv = nn.Conv2d(in_channels, out_channels, 3, padding=1)
            # Initialize the weights of the convolutional layer
            nn.init.kaiming_normal_(conv.weight, mode='fan_in', nonlinearity='relu')
            self.convs.append(conv)
            
        # Residual connection
        self.residual = nn.Identity()
        if in_channels != out_channels:
            self.residual = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0),
                nn.BatchNorm2d(out_channels)
            )
            # Initialize the weights of the residual connection
            nn.init.kaiming_normal_(self.residual[0].weight, mode='fan_in', nonlinearity='relu')

    def forward(self, x):
        b, c, _, _ = x.size()
        # Compute one weight per convolution branch for each sample (pool -> fc -> softmax over the k branches)
        a = self.fc(self.avg_pool(x).view(b, c))
        a = F.softmax(a, dim=1).view(b, -1, 1, 1)  # shape (b, k, 1, 1)

        # Mix the k branch outputs with the per-sample weights
        out = 0
        for i, conv in enumerate(self.convs):
            out += a[:, i:i + 1] * conv(x)
        
        # Add the residual connection to the output
        res = self.residual(x)
        out += res
        return out
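Because every branch is a linear convolution, mixing the k branch outputs with a sample's weights gives the same result as a single convolution whose kernel and bias are the weighted average of the k kernels and biases. This is why the block can be read as an input-dependent (adaptive) convolution, similar in spirit to dynamic convolution. The sketch below checks that equivalence for one sample with small, arbitrary sizes; it re-creates the block's pieces instead of importing Block, and sample.mean(dim=(2, 3)) stands in for the adaptive average pooling.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
k, c_in, c_out = 3, 4, 6   # arbitrary small sizes for the check

branch_convs = nn.ModuleList(nn.Conv2d(c_in, c_out, 3, padding=1) for _ in range(k))
gate_fc = nn.Linear(c_in, k)

sample = torch.randn(1, c_in, 8, 8)
# Global average pooling -> fc -> softmax gives k branch weights that sum to 1
weights = F.softmax(gate_fc(sample.mean(dim=(2, 3))), dim=1)   # shape (1, k)

with torch.no_grad():
    # (a) What Block.forward does: weight and sum the k branch outputs
    mixed_outputs = sum(weights[0, i] * conv(sample) for i, conv in enumerate(branch_convs))

    # (b) One convolution with the weighted kernels and biases
    kernel = sum(weights[0, i] * conv.weight for i, conv in enumerate(branch_convs))
    bias = sum(weights[0, i] * conv.bias for i, conv in enumerate(branch_convs))
    mixed_kernel = F.conv2d(sample, kernel, bias, padding=1)

print(torch.allclose(mixed_outputs, mixed_kernel, atol=1e-5))  # True
```

The equivalence holds only because nothing nonlinear sits between the branch convolutions and the weighted sum, which is the case in Block.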

**Model**
The code defines a neural network model called CIFAR10_NN for the CIFAR-10 dataset. Its backbone is a stack of eleven Block instances (described above), each followed by batch normalization and a ReLU. Four 2x2 max-pooling layers halve the spatial resolution from 32x32 down to 2x2, while the number of channels grows from 64 to 512. The backbone is followed by a classifier that averages each of the 512 feature maps to a single value and maps the resulting vector to the 10 class scores with a linear layer.

An alternative would have been to fine-tune a pre-trained model on CIFAR-10 instead of training from scratch. This usually converges faster, because the pre-trained network has already learned useful features on a large dataset. A popular pre-trained model for computer vision is ResNet, which achieved state-of-the-art results on ImageNet classification when it was introduced.

In [None]:
class CIFAR10_NN(nn.Module):
    def __init__(self):
        super(CIFAR10_NN, self).__init__()
        # Define the backbone of the model as a sequence of blocks and other layers
        self.backbone = nn.Sequential(
            Block(3, 64),                # Map the 3 RGB input channels to 64 feature channels
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            Block(64, 64),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            Block(64, 128),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            Block(128, 128),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            Block(128, 128),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            Block(128, 256),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            Block(256, 256),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            Block(256, 256),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            Block(256, 512),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            Block(512, 512),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            Block(512, 512),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)
        )

        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),  # Reduce the spatial dimensions to 1x1
            nn.Flatten(),                 # Flatten the tensor into a 1D vector
            nn.Linear(512, 10)            # Fully connected layer to produce the final class scores
        )

    # Define the forward pass of the model
    def forward(self, x):
        x = self.backbone(x)          # Pass the input through the backbone
        x = self.classifier(x)        # Pass the output of the backbone through the classifier
        return x
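To see how the feature maps shrink through the backbone, the sketch below rebuilds the same channel plan with a hypothetical stand_in_block (a plain 3x3 convolution, batch normalization and ReLU) in place of Block; like Block, it keeps the height and width unchanged and only maps c_in to c_out channels. It also shows the build-a-list-then-nn.Sequential(*layers) pattern as a compact way to write a long backbone.

```python
import torch
import torch.nn as nn

def stand_in_block(c_in, c_out):
    # Hypothetical stand-in for Block followed by BatchNorm2d and ReLU, with the same shapes
    return nn.Sequential(nn.Conv2d(c_in, c_out, 3, padding=1), nn.BatchNorm2d(c_out), nn.ReLU(inplace=True))

# Channel plan of CIFAR10_NN's backbone; "M" marks nn.MaxPool2d(2, 2)
plan = [64, 64, "M", 128, 128, 128, "M", 256, 256, 256, "M", 512, 512, 512, "M"]

layers, c_in = [], 3
for item in plan:
    if item == "M":
        layers.append(nn.MaxPool2d(2, 2))
    else:
        layers.append(stand_in_block(c_in, item))
        c_in = item
backbone = nn.Sequential(*layers)

feat = torch.randn(2, 3, 32, 32)
pooled_shapes = []
for layer in backbone:
    feat = layer(feat)
    if isinstance(layer, nn.MaxPool2d):
        pooled_shapes.append(tuple(feat.shape))
print(pooled_shapes)  # spatial size 16, 8, 4, then 2

# Same classifier as CIFAR10_NN: average each 2x2 map, flatten, then a linear layer
head = nn.Sequential(nn.AdaptiveAvgPool2d((1, 1)), nn.Flatten(), nn.Linear(512, 10))
print(tuple(head(feat).shape))  # (2, 10)
```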

In [None]:
# Create an instance of the CIFAR10_NN class
model = CIFAR10_NN()

**Loss Function**
This code first sets the device used for computation to the GPU (if available) or the CPU and moves the model there. It then defines a custom loss function called CustomLoss that rescales the standard cross-entropy loss CE to alpha * (1 - e^(-CE))^gamma * CE, with alpha = 1 and gamma = 2 by default.

The forward method of the CustomLoss class takes two inputs, input (the model's logits) and target. If the target tensor has as many dimensions as the input, it is treated as one-hot encoded and converted to class indices with argmax; if it has any other number of dimensions than input.ndim - 1, a ValueError is raised. The cross-entropy loss is then computed with PyTorch's built-in F.cross_entropy() using reduction='mean', and the modification above is applied to it.

This custom loss function can replace the standard cross-entropy loss when training neural networks. The factor (1 - e^(-CE))^gamma is close to 0 when CE is small and approaches 1 as CE grows, so the loss, and with it the gradient, shrinks once the model already fits the batch well. Note that CE is averaged over the batch before the factor is applied, so the weighting reacts to the batch's mean loss rather than to individual samples; unlike the per-sample focal loss, it does not single out hard-to-predict examples within a batch.
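For comparison, here is a minimal sketch of a per-sample version, which is how focal loss is usually defined: each sample's cross-entropy is weighted by its own (1 - p_t)^gamma, where p_t = e^(-CE) is the predicted probability of the true class, and the weighted losses are then averaged. Both functions are illustrative helpers, not part of the notebook; batch_level_loss reproduces CustomLoss for integer targets.

```python
import torch
import torch.nn.functional as F

def batch_level_loss(logits, target, alpha=1.0, gamma=2.0):
    """Same computation as CustomLoss for integer targets: the factor uses the batch-mean CE."""
    ce = F.cross_entropy(logits, target, reduction="mean")
    return alpha * (1 - torch.exp(-ce)) ** gamma * ce

def per_sample_focal_loss(logits, target, alpha=1.0, gamma=2.0):
    """Focal-loss-style variant: each sample's CE is weighted by its own (1 - p_t) ** gamma."""
    ce = F.cross_entropy(logits, target, reduction="none")   # shape (N,)
    p_t = torch.exp(-ce)                                      # predicted probability of the true class
    return (alpha * (1 - p_t) ** gamma * ce).mean()

# One confidently correct sample (target 0) and one confidently wrong sample (target 1)
logits = torch.tensor([[4.0, 0.0, 0.0],
                       [4.0, 0.0, 0.0]])
target = torch.tensor([0, 1])

ce = F.cross_entropy(logits, target, reduction="none")
print("per-sample weights:", (1 - torch.exp(-ce)) ** 2)   # tiny for the easy sample, close to 1 for the hard one
print("batch-level loss:", batch_level_loss(logits, target).item())
print("per-sample focal loss:", per_sample_focal_loss(logits, target).item())
```

When every sample in the batch has the same cross-entropy, the two versions agree; they diverge as soon as easy and hard samples are mixed.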

In [None]:
# Set the device to use (GPU if available, else CPU)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

class CustomLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        
    def forward(self, input, target):
        # Check if the target tensor is one-hot encoded
        if target.ndim == input.ndim:
            target = torch.argmax(target, dim=1)
        elif target.ndim != input.ndim - 1:
            raise ValueError(f"Target tensor has invalid dimensions: expected {input.ndim - 1}, got {target.ndim}")
        
        # Compute the cross-entropy loss
        ce_loss = F.cross_entropy(input, target, reduction='mean')
        
        # Apply a custom modification to the cross-entropy loss
        custom_loss = self.alpha * (1 - torch.exp(-ce_loss)) ** self.gamma * ce_loss
        
        return custom_loss


**Optimizer and learning rate scheduler** are important components in training a neural network.

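The optimizer specifies the algorithm used to update the weights of the neural network during training to minimize the loss function. In this code, the Adam optimizer is used with a learning rate of 0.001. Adam is a popular optimizer that combines the advantages of two other optimizers, AdaGrad and RMSProp: it keeps exponential moving averages of the gradient (momentum) and of the squared gradient, and divides the first by the square root of the second, which gives each parameter its own step size.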
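A small sketch of Adam's bias-corrected first step, using torch.optim.Adam with its default betas=(0.9, 0.999) and eps=1e-8: after one step each parameter moves by about lr in the direction opposite to its gradient's sign, whatever the gradient's magnitude. The toy parameter and loss are made up for illustration.

```python
import torch

# Toy parameter vector and a linear loss whose gradient is [3.0, -0.5]
param = torch.nn.Parameter(torch.tensor([1.0, -2.0]))
adam = torch.optim.Adam([param], lr=0.001)

loss = 3.0 * param[0] - 0.5 * param[1]
loss.backward()
adam.step()

# First step: m_hat = g and v_hat = g**2, so the update is lr * g / (|g| + eps), roughly lr * sign(g)
print(param.data)   # about [0.999, -1.999]
```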

The learning rate scheduler adjusts the learning rate during training. In this code, a cosine annealing scheduler is used with T_max = 50 and a minimum learning rate (eta_min) of 1e-6. CosineAnnealingLR lowers the learning rate from its initial value to eta_min along half a cosine curve over T_max scheduler steps (stepping past T_max would make it rise again). Because scheduler.step() is called once per epoch and training runs for 35 epochs, only the first 70% of the decay is used, and the learning rate is still roughly 2.3e-4 in the last epoch. The large learning rate early on makes fast progress, while the smaller rate later lets the optimizer settle into a good minimum.
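The sketch below computes the schedule in closed form, lr_t = eta_min + (lr_0 - eta_min) * (1 + cos(pi * t / T_max)) / 2, and checks it against CosineAnnealingLR stepped once per epoch for 35 epochs, as in the training loop. The dummy parameter and the gradient-free optimizer step stand in for real training.

```python
import math
import torch

lr0, eta_min, t_max, n_epochs = 0.001, 1e-6, 50, 35

def cosine_lr(t):
    """Closed-form cosine annealing: learning rate after t scheduler steps."""
    return eta_min + (lr0 - eta_min) * (1 + math.cos(math.pi * t / t_max)) / 2

# Dummy parameter; no gradients are computed, so the optimizer step changes nothing
dummy = torch.nn.Parameter(torch.zeros(1))
dummy_opt = torch.optim.Adam([dummy], lr=lr0)
dummy_sched = torch.optim.lr_scheduler.CosineAnnealingLR(dummy_opt, T_max=t_max, eta_min=eta_min)

lrs = []
for epoch in range(n_epochs):
    lrs.append(dummy_opt.param_groups[0]["lr"])   # learning rate used during this epoch
    dummy_opt.step()                               # training for the epoch would happen here
    dummy_sched.step()

print(f"first epoch: {lrs[0]:.2e}, last epoch: {lrs[-1]:.2e}, closed form: {cosine_lr(n_epochs - 1):.2e}")
```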

In [None]:
# Create an instance of the focal-style loss function
criterion = CustomLoss()

# Define the Adam optimizer for the model
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define the cosine annealing learning rate scheduler for the optimizer
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50, eta_min=1e-6)

This code defines a function train_epoch that trains a PyTorch model for one epoch (i.e., one pass through the training data).

The function takes in the following arguments:

**model:** the PyTorch model to train (CIFAR10_NN)

**dataloader:** a PyTorch DataLoader object containing the training data (images and labels)

**criterion:** the loss function to optimize during training

**optimizer:** the optimization algorithm to use during training

**device:** the device (CPU or GPU) to use for training

**accumulation_steps:** the number of mini-batches to accumulate gradients over before updating the model parameters (default 4)

The function performs the following steps:

1. Sets the model to training mode using model.train().
2. Initializes variables for keeping track of the running loss, the number of correct predictions, and the total number of samples.
3. Resets the gradients using optimizer.zero_grad().
4. Loops through the training data (images and labels).
5. Moves the images and labels to the specified device using images.to(device) and labels.to(device).
6. Performs a forward pass through the model using outputs = model(images).
7. Calculates the loss between the predicted outputs and the true labels using criterion(outputs, labels).
8. Divides the loss by accumulation_steps, so that the gradients summed over accumulation_steps mini-batches average rather than add up.
9. Performs backpropagation using loss.backward(), which adds the new gradients to those already stored.
10. Every accumulation_steps mini-batches, updates the model parameters using optimizer.step() and resets the gradients using optimizer.zero_grad().
11. Adds the unscaled batch loss to the running total using running_loss += loss.item() * accumulation_steps.
12. Gets the predicted class labels using _, predicted = outputs.max(1), updates the sample count using total += labels.size(0), and counts the correct predictions using correct += predicted.eq(labels).sum().item().
13. After the loop, performs one last optimizer.step() if the final group held fewer than accumulation_steps mini-batches.
14. Calculates the epoch loss as the average per-batch loss, running_loss / len(dataloader), and the accuracy as correct / total.
15. Returns the epoch loss and accuracy as a tuple.

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device, accumulation_steps=4):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    correct = 0
    total = 0

    optimizer.zero_grad()  # Reset gradients outside the loop

    # Loop through the training data (images and labels)
    for i, (images, labels) in enumerate(dataloader):
        images, labels = images.to(device), labels.to(device)  # Move images and labels to the device
        outputs = model(images)  # Forward pass through the model
        loss = criterion(outputs, labels)  # Calculate the loss
        loss = loss / accumulation_steps  # Scale so the accumulated gradients average over the mini-batches
        loss.backward()  # Backpropagate; gradients add up across iterations until zero_grad()

        # Update model parameters after accumulating gradients for 'accumulation_steps' mini-batches
        if (i + 1) % accumulation_steps == 0:
            optimizer.step()  # Update the model parameters
            optimizer.zero_grad()  # Reset the gradients

        running_loss += loss.item() * accumulation_steps  # Accumulate the unscaled batch loss
        _, predicted = outputs.max(1)  # Get the predicted class labels
        total += labels.size(0)  # Update the total number of samples
        correct += predicted.eq(labels).sum().item()  # Count the number of correct predictions

    # Apply the leftover gradients if the last group had fewer than 'accumulation_steps' mini-batches
    if i % accumulation_steps != accumulation_steps - 1:
        optimizer.step()
        optimizer.zero_grad()

    epoch_loss = running_loss / len(dataloader)  # Average per-batch loss, comparable to validate_epoch
    epoch_acc = correct / total  # Fraction of correctly classified training images

    return epoch_loss, epoch_acc
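The division by accumulation_steps is what makes accumulation behave like a larger batch: with mean-reduced losses and equal-sized mini-batches, summing the gradients of loss / accumulation_steps over accumulation_steps mini-batches gives the gradient of the mean loss over their union. With batch_size=64 and accumulation_steps=4, each update therefore uses gradients from 256 images. The check below uses a toy linear model and plain cross-entropy; with BatchNorm layers, as in CIFAR10_NN, the match is only approximate because batch statistics are computed per mini-batch, and the focal-style CustomLoss, which is nonlinear in the batch-mean CE, also breaks exact equality.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
toy_model = nn.Linear(5, 3)
toy_x = torch.randn(16, 5)
toy_y = torch.randint(0, 3, (16,))
steps = 4

# (a) One batch of 16 with a mean-reduced loss
toy_model.zero_grad()
F.cross_entropy(toy_model(toy_x), toy_y).backward()
big_batch_grad = toy_model.weight.grad.clone()

# (b) Four mini-batches of 4, each loss divided by 'steps' as in train_epoch
toy_model.zero_grad()
for xb, yb in zip(toy_x.chunk(steps), toy_y.chunk(steps)):
    (F.cross_entropy(toy_model(xb), yb) / steps).backward()
accumulated_grad = toy_model.weight.grad.clone()

print(torch.allclose(big_batch_grad, accumulated_grad, atol=1e-6))  # True
```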

The **validate_epoch** function evaluates the model on held-out data; in this notebook, the CIFAR-10 test split (testloader) serves as the validation set. The function sets the model to evaluation mode, computes the loss and accuracy over the given dataloader using the provided criterion, and returns the average loss per batch and the accuracy for the epoch. Gradient computation is disabled with torch.no_grad(), so the function performs no backpropagation and does not update the model parameters.

In [None]:
def validate_epoch(model, dataloader, criterion, device):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)  # Move inputs and labels to the device

        with torch.no_grad():  # Disable gradient calculation
            outputs = model(inputs)  # Forward pass through the model
            loss = criterion(outputs, labels)  # Calculate the loss

        running_loss += loss.item()  # Accumulate the loss

        _, predicted = torch.max(outputs, dim=1)  # Get the predicted class labels
        total += labels.size(0)  # Update the total number of samples
        correct += (predicted == labels).sum().item()  # Count the number of correct predictions

    epoch_loss = running_loss / len(dataloader)  # Calculate the average loss for the epoch
    epoch_acc = correct / total  # Calculate the accuracy for the epoch

    return epoch_loss, epoch_acc
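validate_epoch averages the per-batch mean losses, which equals the per-image mean only when every batch has the same size. That holds here, since the 10,000 test images split into 200 batches of 50; by contrast, the 50,000 training images with batch_size=64 leave a final batch of 16. The toy numbers below illustrate the difference; mean_of_batch_means is a hypothetical helper.

```python
import numpy as np

# Hypothetical per-sample losses: 1, 2, ..., 10
per_sample = np.arange(1.0, 11.0)

def mean_of_batch_means(losses, batch_size):
    """Average of per-batch means, as computed by validate_epoch."""
    batches = [losses[i:i + batch_size] for i in range(0, len(losses), batch_size)]
    return np.mean([b.mean() for b in batches])

print(per_sample.mean())                    # 5.5
print(mean_of_batch_means(per_sample, 5))   # equal batches (5 + 5): 5.5
print(mean_of_batch_means(per_sample, 4))   # unequal batches (4 + 4 + 2): about 6.17
```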

**Training the Model and Printing Results** - We train the model for a specified number of epochs and visualize the loss and accuracy of both training and validation.

In [None]:
# Training parameters
num_epochs = 35
print_every = 7

# Store the loss and accuracy history
train_loss_h = []
val_loss_h = []
train_acc_h = []
val_acc_h = []

start = time.time()

# Loop through each epoch
for epoch in range(num_epochs):
    # Train the model for one epoch and calculate training loss and accuracy
    train_loss, train_acc = train_epoch(model, trainloader, criterion, optimizer, device)
    # Validate the model and calculate validation loss and accuracy
    val_loss, val_acc = validate_epoch(model, testloader, criterion, device)

    # Update the learning rate scheduler at every epoch
    scheduler.step()

    # Store the calculated losses and accuracies for the current epoch
    train_loss_h.append(train_loss)
    val_loss_h.append(val_loss)
    train_acc_h.append(train_acc)
    val_acc_h.append(val_acc)

    # Print the losses and accuracies for the current epoch if it's a multiple of 'print_every'
    if (epoch + 1) % print_every == 0:
        print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.4f}, Val Loss: {:.4f}, Val Acc: {:.4f}'.format(epoch+1, num_epochs, train_loss, train_acc, val_loss, val_acc))

# Stop the timer only after the last epoch has finished
end = time.time()
print(f'Training finished in {(end - start):.2f} seconds.')



**Plotting Results**

In [None]:
fig, axs = plt.subplots(1, 2, figsize=(12, 4))

axs[0].plot(train_loss_h, label='Training Loss')
axs[0].plot(val_loss_h, label='Validation Loss')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Loss')
axs[0].set_title('Loss Curves')
axs[0].legend()

axs[1].plot(train_acc_h, label='Training Accuracy')
axs[1].plot(val_acc_h, label='Validation Accuracy')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('Accuracy')
axs[1].set_title('Accuracy Curves')
axs[1].legend()

plt.show()

In [None]:
# Evaluate the model on the test set
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Calculate the accuracy
accuracy = 100 * correct / total
print('Accuracy of the model on the test set: {:.2f}%'.format(accuracy))

**Validation Set Visualization** - In this section, we visualize a random subset of the validation set (the CIFAR-10 test split) with predicted and actual labels. The purpose of this visualization is to give an intuitive understanding of how our model performs on unseen images. Each image is titled with its predicted and actual label, in green when the prediction is correct and in red when it is wrong.

In [None]:
# Set the model to evaluation mode
model.eval()

# Create empty lists to store the predictions and actual labels
preds = []
targets = []

# Loop through the validation data and collect predictions and actual labels
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        preds.append(predicted.cpu().numpy())
        targets.append(labels.cpu().numpy())

# Concatenate the lists of predictions and actual labels into numpy arrays
preds = np.concatenate(preds)
targets = np.concatenate(targets)

# Visualize a random subset of the validation set with predicted and actual labels
fig = plt.figure(figsize=(25, 8))
fig.suptitle("Random Subset of the Validation Set with Predicted and Actual Labels", fontsize=20)

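# Pick 20 distinct images and lay them out in a 2x10 grid
for i, index in enumerate(np.random.choice(len(preds), size=20, replace=False)):
    ax = fig.add_subplot(2, 10, i + 1, xticks=[], yticks=[])
    # testset.data holds the raw uint8 images; indices match preds because testloader does not shuffle
    ax.imshow(testset.data[index])
    ax.set_title(f"Predicted: {classes[preds[index]]}\nActual: {classes[targets[index]]}", color=("green" if preds[index]==targets[index] else "red"), fontsize=12)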
    
plt.tight_layout()
plt.subplots_adjust(top=0.85)

plt.show()
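Beyond spot-checking individual images, the preds and targets arrays collected above can be summarized per class. Below is a minimal sketch using NumPy's np.add.at to build a confusion matrix, shown on made-up labels; per_class_accuracy is a hypothetical helper, not part of the notebook.

```python
import numpy as np

def per_class_accuracy(preds, targets, num_classes=10):
    """Confusion matrix (rows = actual, columns = predicted) and per-class accuracy."""
    cm = np.zeros((num_classes, num_classes), dtype=np.int64)
    np.add.at(cm, (targets, preds), 1)                    # count each (actual, predicted) pair
    acc = cm.diagonal() / np.maximum(cm.sum(axis=1), 1)   # guard against classes with no samples
    return cm, acc

# Toy example with 3 classes (synthetic labels, not model output)
toy_targets = np.array([0, 0, 1, 1, 2, 2])
toy_preds = np.array([0, 1, 1, 1, 2, 0])
cm, acc = per_class_accuracy(toy_preds, toy_targets, num_classes=3)
print(cm)
print(acc)   # per-class accuracy: 0.5, 1.0, 0.5
```

In the notebook, one could call `cm, acc = per_class_accuracy(preds, targets)` and then `for name, a in zip(classes, acc): print(f'{name}: {a:.2%}')` to see which classes the model confuses most.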

**References:**

1. How adaptive pooling works in PyTorch: https://stackoverflow.com/questions/53841509/how-does-adaptive-pooling-in-pytorch-work

2. GitHub repository with an example implementation of a custom model in PyTorch: https://github.com/ejcgt/attention-target-detection/blob/master/model.py

3. PyTorch forum discussion on adding layers after defining a model: https://discuss.pytorch.org/t/adding-layers-after-defining-the-model/141859

4. PyTorch forum discussion on the meaning of "return nn.Sequential(*layers)": https://discuss.pytorch.org/t/the-meaning-of-return-nn-sequential-layers/93070

5. Analytics Vidhya article on writing a custom loss function in TensorFlow: https://www.analyticsvidhya.com/blog/2022/09/dummies-guide-to-writing-a-custom-loss-function-in-tensorflow/

6. CNVrg article on creating custom loss functions in Keras: https://cnvrg.io/keras-custom-loss-functions/

7. PyTorch documentation on the Adam optimizer: https://pytorch.org/docs/stable/optim.html#torch.optim.Adam

8. Blog post on the Adam optimization algorithm for deep learning: https://machinelearningmastery.com/adam-optimization-algorithm-for-deep-learning/

9. PyTorch documentation on the CosineAnnealingLR learning rate scheduler: https://pytorch.org/docs/stable/optim.html#torch.optim.lr_scheduler.CosineAnnealingLR

10. Blog post on understanding learning rate schedules and adaptive learning rate methods in deep learning: https://towardsdatascience.com/understanding-learning-rate-schedules-and-adaptive-learning-rate-methods-in-deep-learning-7fc642831215

11. PyTorch CIFAR-10 tutorial (training the model): https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#training-the-model

12. Optimizers: https://towardsdatascience.com/how-to-train-neural-network-faster-with-optimizer-buffers-7f58d19fc65e

13. Improving accuracy with accumulation steps: https://medium.com/@stepanulyanin/improving-the-accuracy-of-your-deep-learning-model-using-accumulation-steps-5a3bd1f504db

14. Kaggle competition on image classification: https://www.kaggle.com/c/cifar-10
This Kaggle competition provides an image classification dataset built on CIFAR-10, the same dataset used in this notebook. You can use it to test your image classification skills and learn from the solutions shared by other participants.