<a href="https://colab.research.google.com/github/sensharma/adlproject/blob/main/adlproj_transf_lrn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
!ls /content/gdrive/MyDrive/data

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).
apple_strawberry.jpg	dogs.png		   LibriSpeech
basket			FashionMNIST		   MNIST
cifar-10-batches-py	hymenoptera_data	   text_dataset_test
cifar-10-python.tar.gz	imagenet_class_index.json


In [None]:
import os
import time
from copy import deepcopy

import numpy as np
import matplotlib.pyplot as plt

import torch
from torch import nn, optim
from torch.utils.data import DataLoader

from torchvision import datasets, models, transforms

In [None]:
# All locations live under the Drive folder, resolved relative to the current
# working directory of the runtime.
drive_root = os.path.join(os.getcwd(), 'gdrive', 'MyDrive')
project_root = os.path.join(drive_root, 'colabdrive', 'adlproject')

data_path = os.path.join(drive_root, 'data')                # datasets
models_path = os.path.join(project_root, 'saved_models')    # checkpoints
plots_path = os.path.join(project_root, 'plots', 'MNIST')   # figures

# Use the GPU whenever one is visible to torch, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Show the layer structure of the stock ResNet18 that the notebook adapts.
# A freshly constructed (randomly initialised) instance is enough to display
# the architecture, and it does not rely on `ten_resnet_model`, which is only
# created in a later cell and would raise NameError on a fresh kernel.
print(models.resnet18())

### Creating a ResNet18 for training on 10-class FashionMNIST
The structure of ResNet is printed below. The transfer learning setup can be done a few ways. Approach here:
- Change input Conv2d layer to accept 1-channel input for FashionMNIST
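- Final layer adapted to 10 outputs (# classes needed for FashionMNIST) instead of the 1000 ImageNet classes the network was trained on; in the code below the `fc` layer is replaced rather than an extra layer being stacked on top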
- There is no softmax in the model, because it is trained with `nn.CrossEntropyLoss`, which already includes the softmax (equivalent to `LogSoftmax` + `NLLLoss`). The `CrossEntropyLoss` approach is used here.
- Data needs resizing because of the input shape ResNet takes - done in the dataloader with `Resize`, which uses interpolation

In [None]:
# Inspect the two parts of the stock ImageNet ResNet18 that get adapted:
# the width of the classifier output and the first convolution.
pret_resnet = models.resnet18(pretrained=True)
last_out_features = pret_resnet.fc.out_features
# Read conv1 from the network loaded above. `ten_resnet_model` is only created
# in a later cell, and it keeps its backbone under `.model`, not `.pretrained`.
conv_struct = pret_resnet.conv1
print(last_out_features, '\n', conv_struct)

1000 
 Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)


In [None]:
class TenResNet(nn.Module):
    """torchvision ResNet18 with its first and last layers swapped out.

    The first convolution is replaced so the network accepts `in_channels`
    input channels, and the final fully connected layer is replaced so it
    emits `num_classes` scores. Both replacement layers are freshly
    initialised; every other layer comes from `models.resnet18`.

    Args:
        in_channels: number of channels of the input images.
        num_classes: number of output scores produced by the final layer.
        pretrained: forwarded to `models.resnet18`; pass False to skip loading
            the ImageNet weights (e.g. when a checkpoint is loaded right after).
    """

    def __init__(self, in_channels=1, num_classes=10, pretrained=True):
        super().__init__()
        self.model = models.resnet18(pretrained=pretrained)
        # Stock first layer, for reference:
        # nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.model.conv1 = nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        # The new classifier consumes what the old one consumed (its in_features).
        fc_in_features = self.model.fc.in_features
        self.model.fc = nn.Linear(fc_in_features, num_classes)

    def forward(self, x):
        """Return the raw class scores for `x` (no softmax is applied)."""
        return self.model(x)

ten_resnet_model = TenResNet(in_channels=1)

Freezing weight update for all layers, except the two layers to learn
- first input 1-channel convolutional layer
- final output layer (10 classes)

In [None]:
# Start from a fully frozen network ...
ten_resnet_model.requires_grad_(False)
# ... then switch gradients back on only for the two replaced layers:
# the first convolution and the final fully connected layer.
for trainable_layer in (ten_resnet_model.model.conv1, ten_resnet_model.model.fc):
    trainable_layer.requires_grad_(True)

Training and Eval functions

In [None]:
def train(model, device, train_loader, optimizer, epoch):
    """Run one training epoch and return the loss of the last batch.

    Args:
        model: network to optimise; it is put into training mode.
        device: device each batch is moved to before the forward pass.
        train_loader: iterable of `(data, target)` batches. When it yields
            batches it must also support `len()` and expose `.dataset`,
            which are used for the progress message.
        optimizer: optimizer that updates the trainable parameters of `model`.
        epoch: epoch number, used only in the progress message.

    Returns:
        The cross-entropy loss of the final batch as a tensor detached from
        the autograd graph, or None if `train_loader` yielded no batches.
    """
    model.train()
    # The criterion does not change between batches, so build it once.
    criterion = nn.CrossEntropyLoss().to(device)
    loss = None
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
    # Detach so that callers who keep the value (e.g. inside a checkpoint)
    # do not keep the autograd graph of the last batch alive with it.
    return None if loss is None else loss.detach()

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            criterion = nn.CrossEntropyLoss().to(device)
            test_loss += criterion(output, target).item() # sum up batch loss
            pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
    100. * correct / len(test_loader.dataset)))
    return correct

Dataloaders - with transforms to match ResNet size

In [None]:
# Raw training images as floats, used only to compute the normalisation
# statistics. The values are divided by 255 below to bring the statistics onto
# the scale of the tensors that `ToTensor` produces.
fm_train_dataset = datasets.FashionMNIST(root=data_path,
                                         train=True,
                                         download=True,
                                         ).data.float()

# Plain Python floats wrapped in 1-tuples (one entry per channel). Bare
# parentheses around a tensor do not make a tuple.
fm_mean = (fm_train_dataset.mean() / 255).item()
fm_std = (fm_train_dataset.std() / 255).item()

data_transform = transforms.Compose([transforms.Resize((224, 224)),
                                     transforms.ToTensor(),
                                     transforms.Normalize((fm_mean,), (fm_std,)),
                                     ])

f_mnist_train_loader = DataLoader(
    dataset=datasets.FashionMNIST(root=data_path,
                                  train=True,
                                  download=True,
                                  transform=data_transform,
                                  ),
    batch_size=128,
    # Reshuffle every epoch so the optimizer does not see the batches in the
    # same fixed order each time.
    shuffle=True,
    )

f_mnist_test_loader = DataLoader(
    dataset=datasets.FashionMNIST(root=data_path,
                                  train=False,
                                  download=True,
                                  transform=data_transform,
                                  ),
    batch_size=128,
    # Evaluation does not depend on sample order; keep it deterministic.
    shuffle=False,
    )

### Train and save best model

The optimiser is given only the parameters that require a gradient update (`requires_grad=True`), so it does not track the frozen weights.

In [None]:
num_epochs = 20
# Only the parameters left trainable are handed to the optimizer.
optimizer = optim.Adam(filter(lambda p: p.requires_grad, ten_resnet_model.parameters()), lr=3e-4)  #3e-4
# optimizer = optim.Adadelta(filter(lambda p: p.requires_grad, ten_resnet_model.parameters()))
# optimizer = optim.SGD(filter(lambda p: p.requires_grad, ten_resnet_model.parameters()), lr=0.01, momentum=0.8)

model = ten_resnet_model.to(device)

# torch.save does not create missing directories, so make sure the target
# folder exists before the first checkpoint is written.
os.makedirs(models_path, exist_ok=True)
checkpoint_file = os.path.join(models_path, 'f_mnist_res_cpt_conv_add')

best = 0
# Train and keep the checkpoint with the highest number of correct predictions.
# NOTE(review): the selection uses the test loader, so the best score reported
# here is not an unbiased estimate; a separate validation split would be needed.
for epoch in range(1, num_epochs + 1):
    loss = train(model, device, f_mnist_train_loader, optimizer, epoch)
    correct = test(model, device, f_mnist_test_loader)
    if correct > best:
        best = correct
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'loss': loss,
            }, checkpoint_file)


Test set: Average loss: 0.0101, Accuracy: 5681/10000 (57%)


Test set: Average loss: 0.0088, Accuracy: 6242/10000 (62%)


Test set: Average loss: 0.0078, Accuracy: 6637/10000 (66%)


Test set: Average loss: 0.0076, Accuracy: 6766/10000 (68%)


Test set: Average loss: 0.0073, Accuracy: 6867/10000 (69%)


Test set: Average loss: 0.0072, Accuracy: 6899/10000 (69%)


Test set: Average loss: 0.0072, Accuracy: 6892/10000 (69%)


Test set: Average loss: 0.0070, Accuracy: 6985/10000 (70%)


Test set: Average loss: 0.0069, Accuracy: 7003/10000 (70%)


Test set: Average loss: 0.0069, Accuracy: 7077/10000 (71%)


Test set: Average loss: 0.0067, Accuracy: 7095/10000 (71%)


Test set: Average loss: 0.0067, Accuracy: 7097/10000 (71%)


Test set: Average loss: 0.0065, Accuracy: 7167/10000 (72%)


Test set: Average loss: 0.0065, Accuracy: 7106/10000 (71%)


Test set: Average loss: 0.0065, Accuracy: 7152/10000 (72%)


Test set: Average loss: 0.0066, Accuracy: 7157/10000 (72%)


Test set: Average loss:

Predicting using one of the saved models

In [None]:
from torch.nn import functional as F

In [None]:
# NOTE(review): this loads `f_mnist_res_cpt_conv_chng`, whereas the training
# cell in this notebook writes `f_mnist_res_cpt_conv_add`; confirm that the
# file exists in `models_path` before running.
pt_model_file = f'{models_path}/f_mnist_res_cpt_conv_chng'
cpt = torch.load(pt_model_file, map_location=device)
print(cpt['epoch'])
pt_model = TenResNet(in_channels=1)
pt_model.load_state_dict(cpt['model_state_dict'])
pt_model.to(device)

# Take one batch from the test loader explicitly; `batch` was not defined
# anywhere, so this cell only worked with leftover kernel state.
batch = next(iter(f_mnist_test_loader))

pt_model.eval()
with torch.no_grad():
    # batch[0] holds the images; the 0:1 slice keeps a batch dimension of one.
    pred = pt_model(batch[0][0:1].to(device))
    # The model emits raw scores, so apply softmax to obtain probabilities.
    probs = F.softmax(pred, dim=1)
print(probs, torch.sum(probs))

18
tensor([[7.2196e-05, 4.3660e-07, 9.2756e-06, 3.4555e-04, 4.1377e-05, 3.9823e-03,
         2.5928e-05, 1.7179e-02, 3.0979e-03, 9.7525e-01]], device='cuda:0') tensor(1., device='cuda:0')
