In [None]:
import os
from tqdm import tqdm
import numpy as np

import torch
import torchvision
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.hub import load_state_dict_from_url
from torch.utils.data.sampler import SubsetRandomSampler

In [None]:
def get_default_device():
    """Pick the torch device to run on.

    Preference order is CUDA, then Apple MPS, then CPU; the MPS check is
    only performed when CUDA is unavailable.

    Returns:
        torch.device: the selected device.
    """
    if torch.cuda.is_available():
        backend = "cuda"
    elif torch.backends.mps.is_available():
        backend = "mps"
    else:
        backend = "cpu"
    return torch.device(backend)

In [None]:
# Device every model and batch is moved to.
DEVICE = get_default_device()

# Optimisation hyper-parameters shared by both training runs.
BATCH_SIZE = 32
LEARNING_RATE = 5e-3
NUM_EPOCHS = 20

# Size of the replacement classification head.
NUM_CLASSES = 10

# Human-readable class names (presumably in CIFAR-10 label-index order — confirm).
CLASSES = (
    "plane", "car", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
)

In [None]:
def data_loader(
    data_dir,
    batch_size,
    image_size=(224, 224),
    random_seed=47,
    valid_size=0.1,
    shuffle=True,
    test=False,
    download=True,
):
    """Build CIFAR-10 ``DataLoader`` objects.

    Args:
        data_dir: Root directory the dataset is stored in / downloaded to.
        batch_size: Number of samples per batch.
        image_size: Target size passed to ``transforms.Resize``.
        random_seed: Seed for the train/validation index permutation.
        valid_size: Fraction (0..1) of the training set held out for
            validation. Ignored when ``test`` is true.
        shuffle: In test mode, forwarded to the ``DataLoader``; otherwise
            controls whether indices are permuted before the split.
        test: When true, return a single loader over the test split.
        download: Forwarded to ``datasets.CIFAR10``.

    Returns:
        A single ``DataLoader`` when ``test`` is true, otherwise a
        ``(train_dataloader, valid_dataloader)`` tuple.

    Raises:
        ValueError: if ``valid_size`` is outside ``[0, 1]`` when building the
            train/validation loaders (a fraction above 1 would leave the
            training split empty and a negative one would silently swap the
            two slices).
    """
    # Validate before any dataset I/O so bad arguments fail fast.
    if not test and not 0.0 <= valid_size <= 1.0:
        raise ValueError(
            f"valid_size must be in the range [0, 1], got {valid_size!r}"
        )

    # NOTE(review): these look like CIFAR-10 channel statistics, while the
    # notebook's VGG weights were presumably trained with ImageNet
    # normalisation — confirm this is intended.
    data_transforms = transforms.Compose(
        [
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010]
            ),
        ]
    )

    if test:
        test_dataset = datasets.CIFAR10(
            root=data_dir, train=False, transform=data_transforms, download=download
        )
        return DataLoader(
            dataset=test_dataset,
            batch_size=batch_size,
            shuffle=shuffle,
        )

    # One dataset instance backs both loaders: the train and validation
    # subsets use the same transform and are separated only by their samplers,
    # so a second copy would just duplicate the data in memory.
    train_dataset = datasets.CIFAR10(
        root=data_dir, train=True, transform=data_transforms, download=download
    )

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        # A private RandomState yields the same permutation as seeding the
        # global NumPy RNG, without altering global random state.
        np.random.RandomState(random_seed).shuffle(indices)

    train_indices, valid_indices = indices[split:], indices[:split]
    train_sampler = SubsetRandomSampler(indices=train_indices)
    valid_sampler = SubsetRandomSampler(indices=valid_indices)

    train_dataloader = DataLoader(
        dataset=train_dataset, batch_size=batch_size, sampler=train_sampler
    )

    valid_dataloader = DataLoader(
        dataset=train_dataset, batch_size=batch_size, sampler=valid_sampler
    )

    return (train_dataloader, valid_dataloader)

In [None]:
# Train/validation loaders come back as a pair from the default (non-test) mode.
(train_dataloader, valid_dataloader) = data_loader(data_dir="./data", batch_size=BATCH_SIZE)

# Test mode returns a single loader over the held-out test split.
test_dataloader = data_loader(data_dir="./data", batch_size=BATCH_SIZE, test=True)

In [None]:
# # Reference VGG implementation, written to follow the PyTorch (torchvision) source code.

# class VGG(nn.Module):
#     def __init__(self, cfgs, num_classes=1000):
#         super(VGG, self).__init__()
#         self.features = self.make_layers(cfgs)
#         self.num_classes = num_classes
#         self.avgpool = nn.AdaptiveAvgPool2d(output_size=(7, 7))
#         self.classifier = nn.Sequential(
#             nn.Linear(in_features=7 * 7 * 512, out_features=4096),
#             nn.ReLU(inplace=True),
#             nn.Dropout(p=0.5),
#             nn.Linear(in_features=4096, out_features=4096),
#             nn.ReLU(inplace=True),
#             nn.Dropout(p=0.5),
#             nn.Linear(in_features=4096, out_features=self.num_classes),
#         )

#     def make_layers(self, cfg, batch_norm=False):
#         layers = []
#         in_channels = 3
#         for conf in cfg:
#             if conf == "M":
#                 layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
#             else:
#                 conv2d = nn.Conv2d(
#                     in_channels=in_channels,
#                     out_channels=conf,
#                     kernel_size=3,
#                     padding=1,
#                     stride=1,
#                 )
#                 if batch_norm:
#                     layers += [
#                         conv2d,
#                         nn.BatchNorm2d(num_features=conf),
#                         nn.ReLU(inplace=True),
#                     ]
#                 else:
#                     layers += [conv2d, nn.ReLU(inplace=True)]
#                 in_channels = conf
#         return nn.Sequential(*layers)

#     def forward(self, x):
#         x = self.features(x)
#         x = self.avgpool(x)
#         x = torch.flatten(input=x, start_dim=1)
#         x = self.classifier(x)
#         return x
    
# model_confs = {
#     'VGG16_url': 'https://download.pytorch.org/models/vgg16-397923af.pth',
#     'VGG16_confs': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M']
# }

# model = VGG(cfgs=model_confs['VGG16_confs']).to(DEVICE)
# print(model)
# model_state_dict = load_state_dict_from_url(url=model_confs['VGG16_url'], progress=True)
# model.load_state_dict(model_state_dict)

In [None]:
# Load VGG-16 with torchvision's default pretrained weights, then display it.
model = torchvision.models.vgg16(
    weights=torchvision.models.VGG16_Weights.DEFAULT,
)
model

In [None]:
# `fine_tune_model` is an alias of `model` (no copy is made); swap the final
# classifier layer for a fresh NUM_CLASSES-way linear head.
fine_tune_model = model
last_layer_in_features = fine_tune_model.classifier[6].in_features
fine_tune_model.classifier[6] = nn.Linear(last_layer_in_features, NUM_CLASSES)
fine_tune_model.to(DEVICE)

In [None]:
# Loss and optimizer for the fine-tuning run: SGD over every model parameter.
loss_fn = nn.CrossEntropyLoss()
fine_tune_optimizer = torch.optim.SGD(
    params=fine_tune_model.parameters(),
    lr=LEARNING_RATE,
    momentum=0.9,
    weight_decay=5e-3,
)

In [None]:
def fit_model(num_epochs, model, criterion, optimizer):
    """Train ``model`` and print validation accuracy after every epoch.

    Batches are read from the module-level ``train_dataloader`` and
    ``valid_dataloader`` and moved to the module-level ``DEVICE``.

    Args:
        num_epochs: Number of passes over the training loader.
        model: ``nn.Module`` to optimise; updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.

    Side effects:
        Prints the mean training loss and the validation accuracy for each
        epoch. After at least one epoch the model is left in evaluation mode.
    """
    for epoch in range(num_epochs):
        # Training mode enables train-only layers such as Dropout.
        model.train()
        running_loss = 0.0
        num_batches = 0
        for images, labels in tqdm(train_dataloader):
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
        # Report the epoch mean rather than whatever the last batch produced;
        # the guard also covers an empty training loader.
        epoch_loss = running_loss / num_batches if num_batches else float("nan")
        print(f"Epoch: {epoch+1}/{num_epochs} | Loss: {epoch_loss}")

        # Evaluation mode disables Dropout so validation accuracy is measured
        # deterministically on the full network.
        model.eval()
        correct = 0
        total = 0
        print('\nPerforming Validation...\n')
        with torch.no_grad():
            for images, labels in tqdm(valid_dataloader):
                images = images.to(DEVICE)
                labels = labels.to(DEVICE)
                outputs = model(images)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        # Avoid ZeroDivisionError when the validation loader is empty.
        accuracy = (correct / total) * 100 if total else 0.0
        print(f"Validation Accuracy: {accuracy}")
        print("%" * 30)

In [None]:
# this is fine-tuning, i.e. training every layer of the model starting from the pretrained weights

# Run the training loop on the model whose parameters are all trainable.
fit_model(NUM_EPOCHS, fine_tune_model, loss_fn, fine_tune_optimizer)

In [None]:
# Rebind `model` to a freshly loaded pretrained VGG-16 (the earlier
# `fine_tune_model` alias keeps pointing at the previous instance).
model = torchvision.models.vgg16(
    weights=torchvision.models.VGG16_Weights.DEFAULT,
)
model

In [None]:
# Freeze every existing parameter so no gradients are computed for them.
for parameter in model.parameters():
    parameter.requires_grad_(False)

In [None]:
# `feature_extraction_model` aliases `model`; the replacement head is a newly
# constructed layer, installed here as the final classifier layer.
feature_extraction_model = model
last_layer_in_features = feature_extraction_model.classifier[6].in_features
feature_extraction_model.classifier[6] = nn.Linear(last_layer_in_features, NUM_CLASSES)
feature_extraction_model.to(DEVICE)

In [None]:
# Collect (and list by name) the parameters that still require gradients;
# only these are handed to the optimizer.
params_to_update = []
for name, parameter in feature_extraction_model.named_parameters():
    if not parameter.requires_grad:
        continue
    print(name)
    params_to_update.append(parameter)
print()
print(params_to_update)

In [None]:
# Loss and optimizer for the feature-extraction run: SGD over the
# trainable subset collected in `params_to_update`.
loss_fn = nn.CrossEntropyLoss()
feature_extraction_optimizer = torch.optim.SGD(
    params=params_to_update,
    lr=LEARNING_RATE,
    momentum=0.9,
    weight_decay=5e-3,
)

In [None]:
# this is feature extraction, i.e. training only the last layer of the model while keeping the pretrained weights frozen

# Run the training loop with the optimizer built from `params_to_update`.
fit_model(NUM_EPOCHS, feature_extraction_model, loss_fn, feature_extraction_optimizer)

In [None]:
def test_model(model, criterion=None, optimizer=None):
    """Print the classification accuracy of ``model`` on the test set.

    Batches are read from the module-level ``test_dataloader`` and moved to
    the module-level ``DEVICE``.

    Args:
        model: ``nn.Module`` to evaluate.
        criterion: Unused; accepted so existing callers keep working.
        optimizer: Unused; accepted so existing callers keep working.

    Side effects:
        Prints ``Testing Accuracy``. The model's train/eval mode is restored
        to whatever it was on entry.
    """
    # Evaluation mode disables Dropout so predictions are deterministic;
    # remember the previous mode so it can be restored afterwards.
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for images, labels in tqdm(test_dataloader):
                images = images.to(DEVICE)
                labels = labels.to(DEVICE)
                outputs = model(images)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    finally:
        model.train(was_training)
    # Avoid ZeroDivisionError when the test loader is empty.
    accuracy = (correct / total) * 100 if total else 0.0
    print(f"Testing Accuracy: {accuracy}")

In [None]:
# Report test-set accuracy for the feature-extraction model.
test_model(feature_extraction_model, loss_fn, feature_extraction_optimizer)