In [None]:
# Shell escape: run nvidia-smi to show which GPU(s), if any, this runtime exposes.
!nvidia-smi

In [None]:
import torch 
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision 
from torchvision import datasets, transforms 
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
from pathlib import Path
import os

In [None]:
# device agnostic code: use the GPU when torch can see one, otherwise fall back to the CPU

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

device

In [None]:
# getting dataset

# !wget http://cs231n.stanford.edu/tiny-imagenet-200.zip

# wget did not work in this Kaggle notebook, so I downloaded the dataset manually and uploaded it to Kaggle.

In [None]:
# loading train data

TRAINING_PATH = "tiny-imagenet-200/train"

# every image is resized to 224 x 224 and then converted to a tensor
transform = transforms.Compose(
    [transforms.Resize(size = (224, 224)), ToTensor()]
)

# no target_transform is applied (same as passing target_transform = None)
training_data = datasets.ImageFolder(TRAINING_PATH, transform = transform)

In [None]:
# loading validation data
#
# The class of each validation image is read from val_annotations.txt
# (file name -> WordNet id). The folder is first loaded with ImageFolder and the
# labels it assigned are then replaced with the class indices of the training set.

VAL_PATH = "tiny-imagenet-200/val"
VAL_ANNOTATIONS_PATH = os.path.join(VAL_PATH, "val_annotations.txt")

# file name -> WordNet id, taken from the first two tab separated columns
val_dict = {}
with open(VAL_ANNOTATIONS_PATH) as f:
    for line in f:
        parts = line.strip().split('\t')
        if len(parts) < 2:
            # blank or truncated line: there is nothing to map
            continue
        val_dict[parts[0]] = parts[1]

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])


val_data = datasets.ImageFolder(root = VAL_PATH, transform = transform, target_transform = None)

# class_to_idx is a dict, so every lookup is O(1) instead of scanning
# training_data.classes with .index() once per validation image
val_samples = [
    (img_path, training_data.class_to_idx[val_dict[os.path.basename(img_path)]])
    for img_path, _ in val_data.imgs
]

# update the existing list in place so any other reference to it sees the new labels,
# then make the remaining label related attributes agree with it
val_data.imgs[:] = val_samples
val_data.samples = val_data.imgs
val_data.targets = [target for _, target in val_samples]
val_data.classes = training_data.classes
val_data.class_to_idx = training_data.class_to_idx

In [None]:
# data shapes (dataset length and the shape of the first image of each split)

first_train_image = training_data[0][0]
print(f"Length of training data = {len(training_data)}, Shape of Image = {first_train_image.shape}")

first_val_image = val_data[0][0]
print(f"Length of validation data = {len(val_data)}, Shape of Image = {first_val_image.shape}")

In [None]:
# label mapping file
with open("tiny-imagenet-200/words.txt", 'r') as f:
    class_names = f.readlines()

# mapping between WordNet IDs to class names (first two tab separated columns)
class_dict = {
    fields[0]: fields[1].strip()
    for fields in (entry.split('\t') for entry in class_names)
}


# visualise a 3 x 3 grid of randomly drawn training images
torch.manual_seed(1234)
rows, cols = 3, 3
fig = plt.figure(figsize=(12, 12))
for position in range(1, rows * cols + 1):
    rand_idx = torch.randint(0, len(training_data), size = [1]).item()
    image, target = training_data[rand_idx]
    ax = fig.add_subplot(rows, cols, position)
    # move the first tensor axis to the end before handing the image to imshow
    ax.imshow(image.permute(1, 2, 0))
    ax.set_title(class_dict[training_data.classes[target]])
    ax.axis(False)

In [None]:
# dataloader

BATCH_SIZE = 16

# training batches are reshuffled on every pass over the data
training_dataloader = DataLoader(dataset = training_data, batch_size = BATCH_SIZE, shuffle = True)
# validation metrics do not depend on sample order, so the validation set is read
# in a fixed order; this also keeps evaluation from drawing on the global RNG
val_dataloader = DataLoader(dataset = val_data, batch_size = BATCH_SIZE, shuffle = False)

# sanity check: pull one batch and look at its shapes
training_images, training_targets = next(iter(training_dataloader))

print(f"Training images batch shape = {training_images.shape}")
print(f"Training targets batch shape = {training_targets.shape}")

In [None]:
# InceptionNet model based on Inception V1 module
# also known as the GoogLeNet


class GoogLeNet(nn.Module):
    """GoogLeNet (Inception V1) classifier with two auxiliary classification heads.

    Args:
        num_classes: number of outputs of the main and of both auxiliary classifiers.

    ``forward`` returns ``(aux1, aux2, out)`` while the module is in training mode
    and only ``out`` otherwise.

    The final linear layer takes 1024 features and is preceded by a 7 x 7 average
    pooling, so the feature map has to be 7 x 7 at that point (presumably 224 x 224
    inputs, as produced by the notebook's transforms).
    """

    def __init__(self, num_classes):
        super().__init__()

        # stem: two convolutions, each followed by a stride 2 max pooling
        self.conv_1 = conv_block(3, 64, kernel_size = 7, stride = 2, padding = 3)
        self.maxpool_1 = nn.MaxPool2d(3, stride = 2, padding = 1)

        self.conv_2 = conv_block(64, 192, kernel_size = 3, stride = 1, padding = 1)
        self.maxpool_2 = nn.MaxPool2d(3, stride = 2, padding = 1)

        # inception_module arguments, in order:
        # in_channels, out_1x1, in_3x3, out_3x3, in_5x5, out_5x5, out_maxpool
        self.inception_3a = inception_module(192, 64, 96, 128, 16, 32, 32)
        self.inception_3b = inception_module(256, 128, 128, 192, 32, 96, 64)
        self.maxpool_3 = nn.MaxPool2d(3, stride = 2, padding = 1)

        self.inception_4a = inception_module(480, 192, 96, 208, 16, 48, 64)
        self.inception_4b = inception_module(512, 160, 112, 224, 24, 64, 64)
        self.inception_4c = inception_module(512, 128, 128, 256, 24, 64, 64)
        self.inception_4d = inception_module(512, 112, 144, 288, 32, 64, 64)
        self.inception_4e = inception_module(528, 256, 160, 320, 32, 128, 128)
        self.maxpool_4 = nn.MaxPool2d(3, stride = 2, padding = 1)

        self.inception_5a = inception_module(832, 256, 160, 320, 32, 128, 128)
        self.inception_5b = inception_module(832, 384, 192, 384, 48, 128, 128)
        self.avgpool_5 = nn.AvgPool2d(7, stride = 1)

        # main classification head
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(0.4)
        self.fc = nn.Linear(1024, num_classes)

        # auxiliary heads, fed with the outputs of inception_4a and inception_4d
        self.aux_1 = auxiliary_classifier(512, num_classes)
        self.aux_2 = auxiliary_classifier(528, num_classes)

    def forward(self, x):
        # the auxiliary heads are only evaluated in training mode
        with_aux = self.training

        # stem
        x = self.maxpool_1(self.conv_1(x))
        x = self.maxpool_2(self.conv_2(x))

        # stage 3
        x = self.maxpool_3(self.inception_3b(self.inception_3a(x)))

        # stage 4
        x = self.inception_4a(x)
        if with_aux:
            aux1 = self.aux_1(x)

        x = self.inception_4d(self.inception_4c(self.inception_4b(x)))
        if with_aux:
            aux2 = self.aux_2(x)

        x = self.maxpool_4(self.inception_4e(x))

        # stage 5 and main head
        x = self.avgpool_5(self.inception_5b(self.inception_5a(x)))
        logits = self.fc(self.dropout(self.flatten(x)))

        if not with_aux:
            return logits
        return aux1, aux2, logits



class inception_module(nn.Module):
    """Inception block: four parallel branches concatenated along dimension 1.

    Args:
        in_channels: channels of the input feature map.
        out_1x1: output channels of the 1x1 convolution branch.
        in_3x3: channels after the 1x1 convolution that precedes the 3x3 convolution.
        out_3x3: output channels of the 3x3 convolution branch.
        in_5x5: channels after the 1x1 convolution that precedes the 5x5 convolution.
        out_5x5: output channels of the 5x5 convolution branch.
        out_maxpool: output channels of the 1x1 convolution that follows the max pooling.
    """

    def __init__(self, in_channels, out_1x1, in_3x3, out_3x3, in_5x5, out_5x5, out_maxpool):
        super().__init__()

        def pointwise(out_channels):
            # 1x1 conv_block reading the block input
            return conv_block(in_channels, out_channels, kernel_size = 1, stride = 1, padding = 0)

        self.branch_1 = pointwise(out_1x1)

        self.branch_2 = nn.Sequential(
            pointwise(in_3x3),
            conv_block(in_3x3, out_3x3, kernel_size = 3, stride = 1, padding = 1),
        )

        self.branch_3 = nn.Sequential(
            pointwise(in_5x5),
            conv_block(in_5x5, out_5x5, kernel_size = 5, stride = 1, padding = 2),
        )

        self.branch_4 = nn.Sequential(
            nn.MaxPool2d(3, stride = 1, padding = 1),
            pointwise(out_maxpool),
        )

    def forward(self, x):
        branches = (self.branch_1, self.branch_2, self.branch_3, self.branch_4)
        return torch.cat([branch(x) for branch in branches], dim = 1)



class auxiliary_classifier(nn.Module):
    """Auxiliary classification head.

    Pipeline: average pooling (kernel 5, stride 3) -> 1x1 conv_block to 128 channels
    -> flatten -> linear -> ReLU -> dropout (p = 0.7) -> linear to ``num_classes``.

    The first linear layer takes 4 * 4 * 128 features, so the pooled feature map
    has to be 4 x 4.

    Args:
        in_channels: channels of the incoming feature map.
        num_classes: number of outputs.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        self.avgpool = nn.AvgPool2d(5, stride = 3)
        self.conv_1 = conv_block(in_channels, 128, kernel_size = 1, stride = 1, padding = 0)
        self.fc1 = nn.Linear(4 * 4 * 128, 1024)
        self.fc2 = nn.Linear(1024, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.7)
        self.flatten = nn.Flatten()

    def forward(self, x):
        features = self.flatten(self.conv_1(self.avgpool(x)))
        hidden = self.dropout(self.relu(self.fc1(features)))
        return self.fc2(hidden)



class conv_block(nn.Module):
    """2D convolution followed by a ReLU activation.

    All constructor arguments are passed on to ``nn.Conv2d`` unchanged.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super().__init__()

        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride = stride, padding = padding)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.conv(x)
        return self.relu(x)

In [None]:
# train and test metrics, one list per tracked quantity (all start out empty)

train_loss_values, val_loss_values = [], []
val_acc_values, epoch_count = [], []

In [None]:
# training loop

def model_train(epochs, model, train_dataloader, val_dataloader, loss_func, optimizer, scheduler):
    """Train ``model`` for ``epochs`` epochs and validate it after each one.

    ``model`` must return ``(aux1, aux2, out)`` in training mode and only ``out``
    in evaluation mode. The training loss is
    ``loss(out) + 0.3 * (loss(aux1) + loss(aux2))``.

    After every epoch the scheduler is stepped, the model is evaluated on
    ``val_dataloader`` and the epoch metrics are appended to the module-level
    lists ``train_loss_values``, ``val_loss_values``, ``val_acc_values`` and
    ``epoch_count``. Batches are moved to the module-level ``device``.

    Args:
        epochs: number of passes over ``train_dataloader``.
        model: network to optimise.
        train_dataloader: iterable of ``(images, classes)`` training batches.
        val_dataloader: iterable of ``(images, classes)`` validation batches.
        loss_func: callable ``loss_func(predictions, classes)`` returning a scalar tensor.
        optimizer: optimizer that updates the parameters of ``model``.
        scheduler: learning rate scheduler, stepped once per epoch.

    Note:
        The model is left in evaluation mode when the function returns.
    """

    #check training device
    print(f"Training on {device}.")

    # averages are taken per batch; max() avoids a division by zero for an empty dataloader
    num_train_batches = max(len(train_dataloader), 1)
    num_val_batches = max(len(val_dataloader), 1)

    # loop through each epoch
    for epoch in range(epochs):
        print(f"Epoch: {epoch + 1}/{epochs}\n-------------")

        # validation below switches to eval mode, so training mode has to be
        # re-enabled at the start of every epoch
        model.train()

        # loop through each batch
        train_loss, train_acc = 0.0, 0.0
        for total_steps, (images, classes) in enumerate(train_dataloader, start = 1):

            #send data to device
            images, classes = images.to(device), classes.to(device)

            # compute forward pass
            aux1, aux2, out = model(images)

            # compute loss for main output
            loss_out = loss_func(out, classes)

            # compute loss for auxiliary classifiers
            loss_aux1 = loss_func(aux1, classes)
            loss_aux2 = loss_func(aux2, classes)

            # compute total loss
            loss = loss_out + 0.3 * (loss_aux1 + loss_aux2)

            # update weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # accumulate plain floats so the running sum does not stay attached
            # to the autograd graph of every batch
            train_loss += loss.item()
            train_acc += accuracy_fn(y_true = classes, y_pred = out.argmax(dim=1))

            batch_loss = train_loss / total_steps
            batch_acc = train_acc / total_steps

            if total_steps % 10 == 0:
                print(f"Training Loss: {batch_loss:.5f} - Training Accuracy: {batch_acc:.5f}%")

        # learning rate decay
        scheduler.step()

        # performance on validation set
        # eval mode makes the model return only the main output and disables dropout
        model.eval()
        total_val_loss, val_acc = 0.0, 0.0
        # turn on inference mode
        with torch.inference_mode():
            # loop through each batch
            for val_images, val_classes in val_dataloader:
                # send data to device
                val_images, val_classes = val_images.to(device), val_classes.to(device)

                # forward pass
                y_val_pred = model(val_images)

                # compute loss
                total_val_loss += loss_func(y_val_pred, val_classes).item()
                val_acc += accuracy_fn(y_true = val_classes, y_pred = y_val_pred.argmax(dim=1))

        total_val_loss /= num_val_batches
        val_acc /= num_val_batches

        train_loss /= num_train_batches
        train_acc /= num_train_batches

        print(f"[After {epoch + 1} epochs: Train Loss: {train_loss:.5f} - Train Accuracy: {train_acc:.5f}% - Validation Loss: {total_val_loss:.5f} - Validation Accuracy: {val_acc:.5f}%]")

        # record the epoch metrics (all plain Python numbers) for the plots
        train_loss_values.append(train_loss)
        val_loss_values.append(total_val_loss)
        val_acc_values.append(val_acc)
        epoch_count.append(epoch + 1)

In [None]:
# test loop

def model_test(model, dataloader, loss_func):
    """Evaluate ``model`` on ``dataloader`` and return the mean per-batch accuracy.

    The model is switched to evaluation mode, every batch is moved to the
    module-level ``device``, and the loss and accuracy averaged over the number
    of batches are printed.

    Args:
        model: network to evaluate.
        dataloader: iterable of ``(images, classes)`` batches.
        loss_func: callable ``loss_func(predictions, classes)``.

    Returns:
        The accuracy (in percent) averaged over the batches.
    """
    # turn on test mode
    model.eval()

    loss_sum, acc_sum = 0, 0

    # turn on inference mode
    with torch.inference_mode():
        for images, classes in dataloader:
            # send data to device
            images = images.to(device)
            classes = classes.to(device)

            # forward pass
            y_pred = model(images)

            # accumulate loss and accuracy of this batch
            loss_sum += loss_func(y_pred, classes)
            acc_sum += accuracy_fn(y_true = classes, y_pred = y_pred.argmax(dim=1))

        num_batches = len(dataloader)
        mean_loss = loss_sum / num_batches
        mean_acc = acc_sum / num_batches
        print(f"Loss: {mean_loss:.5f} - Accuracy: {mean_acc:.5f}%")
        return mean_acc

In [None]:
# metric functions

def accuracy_fn(y_true, y_pred):
    """Return the percentage (0 to 100) of positions where ``y_pred`` equals ``y_true``.

    Args:
        y_true: tensor of reference labels.
        y_pred: tensor of predicted labels; its length is the denominator.
    """
    matches = (y_true == y_pred).sum().item()
    return matches / len(y_pred) * 100

In [None]:
# instantiating model

# seed before construction so the randomly initialised weights are reproducible
torch.manual_seed(1234)
googlenet = GoogLeNet(num_classes = 200)
googlenet = googlenet.to(device)

In [None]:
# loss function and optimizer

LEARNING_RATE = 0.0001

loss_func = nn.CrossEntropyLoss()

# SGD with momentum over all parameters of the model
sgd = torch.optim.SGD(googlenet.parameters(), lr = LEARNING_RATE, momentum = 0.9)

# StepLR with step_size = 8 and gamma = 0.96
learning_decay = torch.optim.lr_scheduler.StepLR(sgd, step_size = 8, gamma = 0.96)

In [None]:
# training the model

EPOCHS = 10

# reseed right before training starts
torch.manual_seed(1234)
model_train(
    epochs = EPOCHS,
    model = googlenet,
    train_dataloader = training_dataloader,
    val_dataloader = val_dataloader,
    loss_func = loss_func,
    optimizer = sgd,
    scheduler = learning_decay,
)

In [None]:
# evaluating the model on the validation dataloader

accuracy = model_test(googlenet, val_dataloader, loss_func)

In [None]:
# loss curve: training and validation loss per epoch

loss_fig, loss_ax = plt.subplots(figsize=(13, 7))
loss_ax.plot(epoch_count, train_loss_values, label = "Train loss")
loss_ax.plot(epoch_count, val_loss_values, label = "Validation loss")
loss_ax.set_title("Loss curves")
loss_ax.set_ylabel("Loss")
loss_ax.set_xlabel("Epochs")
loss_ax.legend()
plt.show()

In [None]:
# accuracy curve: validation accuracy per epoch

acc_fig, acc_ax = plt.subplots(figsize=(13, 7))
acc_ax.plot(epoch_count, val_acc_values, label = "Accuracy")
acc_ax.set_title("Accuracy curves")
acc_ax.set_ylabel("Accuracy")
acc_ax.set_xlabel("Epochs")
acc_ax.legend()
plt.show()

In [None]:
# saving the model

MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents = True, exist_ok = True)

# the accuracy returned by model_test is embedded in the file name, with "." replaced by "_"
MODEL_NAME = "GoogLeNet_" + str(accuracy).replace(".", "_") + ".pth"

MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

print(f"Saving GoogLeNet to {MODEL_SAVE_PATH}")
# save the weights of the network trained in this notebook; the previous
# `alexnet` reference is not defined anywhere here and raised a NameError
torch.save(obj = googlenet.state_dict(), f = MODEL_SAVE_PATH)