In [None]:
import os
import numpy as np
from tqdm import tqdm

import torch
import torchvision
import torch.nn as nn
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SubsetRandomSampler

In [None]:
def get_default_device():
    """Pick the torch device to run on.

    CUDA is preferred when it is available, then the MPS backend; the CPU is
    the fallback when neither is reported as available.

    Returns:
        torch.device: A ``cuda``, ``mps`` or ``cpu`` device.
    """
    if torch.cuda.is_available():
        backend = 'cuda'
    elif torch.backends.mps.is_available():
        backend = 'mps'
    else:
        backend = 'cpu'
    return torch.device(backend)

In [None]:
# Device that models and batches are moved to.
DEVICE = get_default_device()

# Training hyper-parameters.
BATCH_SIZE = 32
LEARNING_RATE = 5e-3
NUM_EPOCHS = 20

# Label names; presumably position i is the name of CIFAR-10 label id i --
# TODO confirm against the dataset's own class list.
CLASSES = (
    "plane", "car", "bird", "cat", "deer",
    "dog", "frog", "horse", "ship", "truck",
)
# Size of the classifier output layer, kept in sync with CLASSES.
NUM_CLASSES = len(CLASSES)

In [None]:
def data_loader(
    data_dir,
    batch_size,
    image_size=(224, 224),
    random_seed=47,
    valid_size=0.1,
    shuffle=True,
    test=False,
    download=True,
):
    """Build CIFAR-10 ``DataLoader`` objects.

    Every image is resized to ``image_size``, converted to a tensor and
    normalised with fixed per-channel mean/std values.

    Args:
        data_dir: Root directory handed to ``datasets.CIFAR10``.
        batch_size: Batch size for every loader that is returned.
        image_size: Target size passed to ``transforms.Resize``.
        random_seed: Seed used to permute the indices before the
            train/validation split (only used when ``shuffle`` is true).
        valid_size: Fraction of the training split held out for validation;
            must lie in ``[0, 1]``. Ignored when ``test`` is true.
        shuffle: In test mode, forwarded to ``DataLoader(shuffle=...)``.
            Otherwise decides whether the indices are permuted before being
            split into train and validation subsets.
        test: When true, return a single loader over the ``train=False`` split.
        download: Forwarded to ``datasets.CIFAR10``.

    Returns:
        A single ``DataLoader`` when ``test`` is true, otherwise a
        ``(train_dataloader, valid_dataloader)`` tuple. Both loaders of the
        tuple read from the same dataset object through disjoint
        ``SubsetRandomSampler`` index sets.

    Raises:
        ValueError: If ``valid_size`` is outside ``[0, 1]`` in train mode.
    """
    # Fail early: an out-of-range fraction would otherwise silently produce a
    # nonsensical split through list slicing.
    if not test and not 0 <= valid_size <= 1:
        raise ValueError(f"valid_size must be within [0, 1], got {valid_size!r}")

    data_transforms = transforms.Compose(
        [
            transforms.Resize(image_size),
            transforms.ToTensor(),
            # NOTE(review): these look like the commonly quoted CIFAR-10
            # channel statistics -- confirm they are the intended ones for
            # the model being trained.
            transforms.Normalize(
                mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010]
            ),
        ]
    )

    if test:
        test_dataset = datasets.CIFAR10(
            root=data_dir, train=False, transform=data_transforms, download=download
        )
        return DataLoader(
            dataset=test_dataset,
            batch_size=batch_size,
            shuffle=shuffle,
        )

    # Train and validation use the same split and the same transform, so one
    # dataset object is shared instead of constructing it twice.
    train_dataset = datasets.CIFAR10(
        root=data_dir, train=True, transform=data_transforms, download=download
    )

    num_train = len(train_dataset)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))

    if shuffle:
        # Reseeds NumPy's global RNG so the split is reproducible.
        np.random.seed(random_seed)
        np.random.shuffle(indices)

    # The first `split` indices are held out for validation.
    train_indices, valid_indices = indices[split:], indices[:split]

    train_dataloader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        sampler=SubsetRandomSampler(indices=train_indices),
    )
    valid_dataloader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        sampler=SubsetRandomSampler(indices=valid_indices),
    )

    return (train_dataloader, valid_dataloader)

In [None]:
# Train/validation loaders are both carved out of the CIFAR-10 training split.
train_dataloader, valid_dataloader = data_loader(data_dir="./data", batch_size=BATCH_SIZE)

# test=True selects the separate train=False split instead.
test_dataloader = data_loader(data_dir="./data", batch_size=BATCH_SIZE, test=True)

In [None]:
# # implementation from scratch

# class ResidualBlock(nn.Module):
#     def __init__(
#         self,
#         in_channels,
#         out_channels,
#         stride=1,
#         kernel_size=(3, 3),
#         padding=1,
#         downsample=None,
#     ):
#         super(ResidualBlock, self).__init__()
#         self.conv1 = nn.Conv2d(
#             in_channels=in_channels,
#             out_channels=out_channels,
#             kernel_size=kernel_size,
#             stride=stride,
#             padding=padding,
#         )
#         self.bn1 = nn.BatchNorm2d(num_features=out_channels)
#         self.relu = nn.ReLU(inplace=True)
#         self.conv2 = nn.Conv2d(
#             in_channels=out_channels,
#             out_channels=out_channels,
#             kernel_size=kernel_size,
#             stride=1,
#             padding=padding,
#         )
#         self.bn2 = nn.BatchNorm2d(num_features=out_channels)
#         self.downsample = downsample
#         self.stride = stride

#     def forward(self, x):
#         residual = x
#         out = self.relu(self.bn1(self.conv1(x)))
#         out = self.bn2(self.conv2(out))
#         if self.downsample:
#             residual = self.downsample(x)
#         out += residual
#         out = self.relu(out)
#         return out
    

# class ResNet(nn.Module):
#     def __init__(self, block_obj, layers, in_channels=3, num_classes=1000):
#         super(ResNet, self).__init__()
#         self.inplanes = 64
#         self.conv1 = nn.Conv2d(
#             in_channels=in_channels,
#             out_channels=64,
#             kernel_size=7,
#             stride=2,
#             padding=3,
#             bias=False,
#         )
#         self.bn = nn.BatchNorm2d(num_features=64)
#         self.relu = nn.ReLU(inplace=True)
#         self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

#         self.layer0 = self.make_layers(
#             block_obj=block_obj, planes=64, num_blocks=layers[0]
#         )
#         self.layer1 = self.make_layers(
#             block_obj=block_obj,
#             planes=128,
#             num_blocks=layers[1],
#             stride=2,
#         )
#         self.layer2 = self.make_layers(
#             block_obj=block_obj,
#             planes=256,
#             num_blocks=layers[2],
#             stride=2,
#         )
#         self.layer3 = self.make_layers(
#             block_obj=block_obj,
#             planes=512,
#             num_blocks=layers[3],
#             stride=2,
#         )

#         self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
#         self.fc = nn.Linear(in_features=512, out_features=num_classes)

#     def make_layers(self, block_obj, planes, num_blocks, stride=1):
#         downsample = None
#         if self.inplanes != planes or stride != 1:
#             downsample = nn.Sequential(
#                 nn.Conv2d(
#                     in_channels=self.inplanes,
#                     out_channels=planes,
#                     kernel_size=1,
#                     stride=stride,
#                     bias=False,
#                 ),
#                 nn.BatchNorm2d(num_features=planes),
#             )
#         layers = []
#         layers.append(block_obj(self.inplanes, planes, stride, downsample=downsample))
#         self.inplanes = planes
#         for _ in range(1, num_blocks):
#             layers.append(block_obj(self.inplanes, planes))
#         return nn.Sequential(*layers)

#     def forward(self, x):
#         x = self.maxpool(self.relu(self.bn(self.conv1(x))))
#         x = self.layer0(x)
#         x = self.layer1(x)
#         x = self.layer2(x)
#         x = self.layer3(x)
#         x = self.avgpool(x)
#         x = torch.flatten(input=x, start_dim=1)
#         x = self.fc(x)
#         return x
    
# # resnet34 implementation
# layers = [3, 4, 6, 3]
# model = ResNet(ResidualBlock, layers)
# model

In [None]:
# ResNet-34 initialised with torchvision's default pretrained weights.
model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT, progress=True)
model

In [None]:
# Same object as `model` (an alias, not a copy); its final fully connected
# layer is replaced by a new one with NUM_CLASSES outputs.
fine_tune_model = model
last_layer_in_features = fine_tune_model.fc.in_features
fine_tune_model.fc = nn.Linear(last_layer_in_features, NUM_CLASSES)
# Moves the module to DEVICE; the returned module is displayed as cell output.
fine_tune_model.to(DEVICE)

In [None]:
# Classification loss plus SGD over every parameter of the fine-tuned model.
loss_fn = nn.CrossEntropyLoss()
fine_tune_optimizer = torch.optim.SGD(
    params=fine_tune_model.parameters(),
    lr=LEARNING_RATE,
    momentum=0.9,
    weight_decay=5e-3,
)

In [None]:
def fit_model(
    num_epochs,
    model,
    criterion,
    optimizer,
    train_loader=None,
    valid_loader=None,
    device=None,
):
    """Train ``model`` and report validation accuracy after every epoch.

    Args:
        num_epochs: Number of passes over the training loader.
        model: ``nn.Module`` to optimise (updated in place).
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer whose ``zero_grad``/``step`` drive the updates.
        train_loader: Iterable of ``(images, labels)`` batches. Defaults to
            the notebook-level ``train_dataloader``.
        valid_loader: Iterable of ``(images, labels)`` batches. Defaults to
            the notebook-level ``valid_dataloader``.
        device: Device the batches are moved to. Defaults to the
            notebook-level ``DEVICE``.

    Returns:
        None. Progress is printed. The model is left in eval mode because
        validation is the last step of every epoch.
    """
    # Fall back to the notebook globals only when no override was supplied.
    train_loader = train_dataloader if train_loader is None else train_loader
    valid_loader = valid_dataloader if valid_loader is None else valid_loader
    device = DEVICE if device is None else device

    for epoch in range(num_epochs):
        # Re-enable training behaviour (dropout, batch-norm batch statistics);
        # the validation pass of the previous epoch switched it off.
        model.train()
        loss_total = 0.0
        num_batches = 0
        for images, labels in tqdm(train_loader):
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate on-device; converted to a Python float once per epoch.
            loss_total = loss_total + loss.detach()
            num_batches += 1
        # Mean over the epoch's batches rather than only the final batch.
        mean_loss = float(loss_total) / num_batches if num_batches else float("nan")
        print(f"Epoch: {epoch+1}/{num_epochs} | Loss: {mean_loss}")

        # Evaluation mode: batch-norm uses (and does not update) its running
        # statistics and dropout is disabled while measuring accuracy.
        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            print('\nPerforming Validation...\n')
            for images, labels in tqdm(valid_loader):
                images = images.to(device)
                labels = labels.to(device)
                predicted = model(images).argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
            # An empty validation loader yields nan instead of ZeroDivisionError.
            accuracy = (correct / total) * 100 if total else float("nan")
            print(f"Validation Accuracy: {accuracy}")
            print("%" * 30)

In [None]:
# Fine-tuning: every layer starts from the pretrained weights and all of them
# are updated during training.
fit_model(NUM_EPOCHS, fine_tune_model, loss_fn, fine_tune_optimizer)

In [None]:
# Fresh pretrained ResNet-34; rebinding `model` leaves the fine-tuned network
# (still referenced by `fine_tune_model`) untouched.
model = models.resnet34(weights=models.ResNet34_Weights.DEFAULT, progress=True)
model

In [None]:
# Freeze every existing weight so gradients are not computed for them.
for parameter in model.parameters():
    parameter.requires_grad_(False)

In [None]:
# Alias of the frozen `model`; the replacement layer is newly constructed, so
# its parameters have requires_grad=True and remain trainable.
feature_extraction_model = model
last_layer_in_features = feature_extraction_model.fc.in_features
feature_extraction_model.fc = nn.Linear(last_layer_in_features, NUM_CLASSES)
# Moves the module to DEVICE; the returned module is displayed as cell output.
feature_extraction_model.to(DEVICE)

In [None]:
# Collect (and list by name) only the parameters that still require gradients.
params_to_update = []
for name, parameter in feature_extraction_model.named_parameters():
    if not parameter.requires_grad:
        continue
    params_to_update.append(parameter)
    print(name)
print()
print(params_to_update)

In [None]:
# Same loss as before; the optimizer only receives the trainable parameters.
loss_fn = nn.CrossEntropyLoss()
feature_extraction_optimizer = torch.optim.SGD(
    params=params_to_update,
    lr=LEARNING_RATE,
    momentum=0.9,
    weight_decay=5e-3,
)

In [None]:
# Feature extraction: the pretrained weights stay frozen and only the newly
# added final layer is trained.
fit_model(
    NUM_EPOCHS, feature_extraction_model, loss_fn, feature_extraction_optimizer
)

In [None]:
def test_model(model, criterion, optimizer):
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in tqdm(test_dataloader):
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            del images, labels, outputs
        print(f"Testing Accuracy: {(correct/total)*100}")

In [None]:
# Report held-out test accuracy for the feature-extraction model.
test_model(feature_extraction_model, loss_fn, feature_extraction_optimizer)