In [None]:
import os
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision

from torch.utils.data import Dataset, DataLoader, BatchSampler, random_split
from torchvision import transforms
from PIL import Image

In [None]:
# Train only subclass classification for a given superclass.
# `superclass_idx` selects which rows of the training annotations are kept and is
# embedded in the name of the predictions CSV written by Trainer.test.
superclass_idx = 0 # 0 for dog, 1 for bird, 2 for reptile
# Used as the image directory of the test dataset; it is an empty placeholder here
# and has to be set before the test dataset can be read.
superclass_test_img_dir = '' # fill in with path to directory containing only test images predicted to belong to superclass corresponding to superclass_idx

In [None]:
# Load a ResNet-18 with the IMAGENET1K_V1 weights so its architecture can be inspected.
# NOTE: `backbone` is rebound to a freshly loaded ResNet-18 in the
# "Init model and trainer" cell before the model is built.
from torchvision import models
backbone = models.resnet18(weights='IMAGENET1K_V1')

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 56.2MB/s]


In [None]:
# Show the backbone's module tree (bare last expression -> displayed as the cell output).
backbone

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [None]:
from google.colab import drive
drive.mount('/content/drive')
%cd drive/MyDrive/NNDL_Project

Mounted at /content/drive
/content/drive/MyDrive/NNDL_Project


In [None]:
%%capture
# Unpack train_shuffle.zip into the working directory; -o overwrites existing files
# without prompting, and %%capture hides unzip's per-file log.
! unzip -o train_shuffle.zip

In [None]:
%%capture
# Unpack test_shuffle.zip into the working directory; -o overwrites existing files
# without prompting, and %%capture hides unzip's per-file log.
! unzip -o test_shuffle.zip

In [None]:
# Dataset for single-label subclass classification (one subclass label per image)
class MultiClassImageDataset(Dataset):
    """Labelled image dataset driven by an annotation DataFrame.

    Args:
        ann_df: DataFrame with an 'image' column (file name relative to
            ``img_dir``) and a 'subclass_index' column (integer label).
        sub_map_df: DataFrame whose 'class' column is looked up by
            ``subclass_index`` to obtain the human-readable label.
        img_dir: Directory containing the image files.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, ann_df, sub_map_df, img_dir, transform=None):
        self.ann_df = ann_df
        self.sub_map_df = sub_map_df
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Number of annotated images."""
        return len(self.ann_df)

    def __getitem__(self, idx):
        """Return ``(image, subclass_index, subclass_label)`` for row position ``idx``."""
        # Positional access: `idx` is a sample position, so this keeps working even
        # when ann_df was filtered and its index is no longer 0..n-1.
        img_name = self.ann_df['image'].iloc[idx]
        img_path = os.path.join(self.img_dir, img_name)
        # Context manager closes the underlying file as soon as the pixels are
        # converted, instead of leaving the handle to the garbage collector.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        sub_idx = self.ann_df['subclass_index'].iloc[idx]
        # Label-based lookup on purpose: sub_idx is matched against sub_map_df's index.
        sub_label = self.sub_map_df['class'][sub_idx]

        if self.transform:
            image = self.transform(image)

        return image, sub_idx, sub_label

class MultiClassImageTestDataset(Dataset):
    """Unlabelled image dataset over the image files found in ``img_dir``.

    Samples are the image files actually present in the directory, ordered by
    their numeric stem ('3.jpg' before '10.jpg'); files with non-numeric stems
    come last in alphabetical order. A directory holding exactly
    '0.jpg' .. 'n-1.jpg' therefore yields the same order as indexing by
    ``str(idx) + '.jpg'``, while a directory holding only a subset of ids
    (e.g. images filtered by predicted superclass) works as well.

    Args:
        sub_map_df: Subclass mapping DataFrame (stored, not used by this class).
        img_dir: Directory containing the test images.
        transform: Optional callable applied to the RGB PIL image.
    """

    # Only files with these (case-insensitive) extensions count as samples.
    IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')

    def __init__(self, sub_map_df, img_dir, transform=None):
        self.sub_map_df = sub_map_df
        self.img_dir = img_dir
        self.transform = transform
        # Filled lazily so that constructing the dataset never touches the disk.
        self._img_names = None

    @staticmethod
    def _sort_key(fname):
        """Order numeric stems numerically; everything else afterwards by name."""
        stem = os.path.splitext(fname)[0]
        if stem.isdecimal():
            return (0, int(stem), fname)
        return (1, 0, fname)

    def _image_names(self):
        """Return the cached, sorted list of image file names in ``img_dir``."""
        if self._img_names is None:
            names = [fname for fname in os.listdir(self.img_dir)
                     if fname.lower().endswith(self.IMAGE_EXTENSIONS)]
            self._img_names = sorted(names, key=self._sort_key)
        return self._img_names

    def __len__(self): # Count image files in img_dir
        return len(self._image_names())

    def __getitem__(self, idx):
        """Return ``(image, img_name)`` for the ``idx``-th image file."""
        img_name = self._image_names()[idx]
        img_path = os.path.join(self.img_dir, img_name)
        # Close the file handle as soon as the image has been decoded.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, img_name

In [None]:
# Keep only the training annotations of the selected superclass and renumber the
# rows 0..n-1.
train_ann_df = pd.read_csv('train_data.csv')
train_ann_df = train_ann_df[train_ann_df['superclass_index'] == superclass_idx]
train_ann_df = train_ann_df.reset_index(drop=True)

super_map_df = pd.read_csv('superclass_mapping.csv')
sub_map_df = pd.read_csv('subclass_mapping.csv')

train_img_dir = 'train_shuffle'
test_img_dir = 'test_shuffle'

# The backbone is loaded with ImageNet weights, so inputs are normalised with the
# ImageNet channel statistics those weights were trained with (a mean-0 / std-1
# Normalize would be a no-op and feed the network un-normalised pixels).
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)
image_preprocessing = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Create train and val split (seeded so the split is the same on every run)
SPLIT_SEED = 0
train_dataset = MultiClassImageDataset(train_ann_df, sub_map_df, train_img_dir, transform=image_preprocessing)
train_dataset, val_dataset = random_split(
    train_dataset, [0.9, 0.1],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Create test dataset
test_dataset = MultiClassImageTestDataset(sub_map_df, superclass_test_img_dir, transform=image_preprocessing)

# Create dataloaders
batch_size = 64
train_loader = DataLoader(train_dataset,
                          batch_size=batch_size,
                          shuffle=True, drop_last = True)

# Validation must see every held-out sample exactly once: no shuffling and no
# dropped final batch (drop_last=True silently discarded part of the split).
val_loader = DataLoader(val_dataset,
                        batch_size=batch_size,
                        shuffle=False, drop_last = False)

test_loader = DataLoader(test_dataset,
                         batch_size=1,
                         shuffle=False)

In [None]:
# Simple CNN
class CNN(nn.Module):
    """Transfer-learning classifier: a backbone whose ``fc`` head is replaced.

    Args:
        backbone: Module exposing an ``fc`` linear layer (e.g. a torchvision
            ResNet). It is modified in place: parameters are (un)frozen and
            ``fc`` is replaced by a new ``nn.Linear``.
        finetune: If False, every backbone parameter is frozen and only the new
            head is trained. If True, the first ``NUM_FROZEN_CHILDREN`` child
            modules of the backbone are frozen and the remaining ones are
            trainable.
        num_classes: Number of output logits of the new head (default 88).
    """

    # Number of leading backbone children kept frozen when fine-tuning.
    NUM_FROZEN_CHILDREN = 6

    def __init__(self, backbone, finetune=False, num_classes=88):
        super().__init__()

        self.backbone = backbone
        if not finetune:
            for param in self.backbone.parameters():
                param.requires_grad = False
        else:
            for i, layer in enumerate(self.backbone.children()):
                # Children with index >= NUM_FROZEN_CHILDREN stay trainable.
                trainable = i >= self.NUM_FROZEN_CHILDREN
                for param in layer.parameters():
                    param.requires_grad = trainable

        # Replace the classification head; a freshly created nn.Linear is
        # trainable regardless of the freezing above.
        num_ftrs = self.backbone.fc.in_features
        self.backbone.fc = nn.Linear(num_ftrs, num_classes)

    def forward(self, x):
        """Return the subclass logits produced by the backbone for batch ``x``."""
        sub_out = self.backbone(x)
        return sub_out

class Trainer():
    """Bundles a model with its loss, optimizer and data loaders.

    Args:
        model: Network mapping an input batch to subclass logits.
        criterion: Loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer stepping the model's parameters.
        train_loader: Iterable of batches whose first two items are
            ``(inputs, subclass_labels)``.
        val_loader: Iterable with the same batch layout as ``train_loader``.
        test_loader: Optional iterable of ``(inputs, image_names)`` batches.
        device: Device the batches are moved to before the forward pass.
    """

    def __init__(self, model, criterion, optimizer, train_loader, val_loader, test_loader=None, device='cuda'):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader
        self.device = device

    def train_epoch(self):
        """Run one optimisation pass over ``train_loader``.

        Prints and returns the mean batch loss (NaN if the loader is empty).
        """
        self.model.train()  # validate_epoch/test switch to eval mode; switch back
        running_loss = 0.0
        num_batches = 0
        device = self.device
        for data in self.train_loader:
            inputs, sub_labels = data[0].to(device), data[1].to(device)

            self.optimizer.zero_grad()
            sub_outputs = self.model(inputs)
            loss = self.criterion(sub_outputs, sub_labels)
            loss.backward()
            # Step the trainer's own optimizer, not a notebook-level global.
            self.optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        # Average over the number of batches (not the last batch index).
        avg_loss = running_loss / num_batches if num_batches else float('nan')
        print(f'Training loss: {avg_loss:.3f}')
        return avg_loss

    def validate_epoch(self):
        """Evaluate on ``val_loader`` without gradient tracking.

        Prints and returns ``(mean batch loss, subclass accuracy in percent)``;
        both are NaN if the loader is empty.
        """
        self.model.eval()  # use running BatchNorm statistics during evaluation
        sub_correct = 0
        total = 0
        running_loss = 0.0
        num_batches = 0
        device = self.device
        with torch.no_grad():
            for data in self.val_loader:
                inputs, sub_labels = data[0].to(device), data[1].to(device)

                sub_outputs = self.model(inputs)
                loss = self.criterion(sub_outputs, sub_labels)
                sub_predicted = sub_outputs.argmax(dim=1)

                total += sub_labels.size(0)
                sub_correct += (sub_predicted == sub_labels).sum().item()
                running_loss += loss.item()
                num_batches += 1

        avg_loss = running_loss / num_batches if num_batches else float('nan')
        accuracy = 100 * sub_correct / total if total else float('nan')
        print(f'Validation loss: {avg_loss:.3f}')
        print(f'Validation subclass acc: {accuracy:.2f} %')
        return avg_loss, accuracy

    def test(self, save_to_csv=False, return_predictions=False, superclass_index=None):
        """Predict a subclass for every image in ``test_loader``.

        Args:
            save_to_csv: Write the predictions to
                ``test_predictions_transferlearning_subclass_superclass<k>.csv``.
            return_predictions: Return the predictions DataFrame.
            superclass_index: Value written to the 'superclass_index' column and
                used as ``<k>`` in the file name. Defaults to the notebook-level
                ``superclass_idx`` when that is defined, else None.

        Returns:
            DataFrame with columns 'image', 'superclass_index', 'subclass_index'
            if ``return_predictions`` is True, otherwise None.

        Raises:
            NotImplementedError: If no test loader was given.
        """
        # `is None` rather than truthiness: an empty loader is still a valid loader.
        if self.test_loader is None:
            raise NotImplementedError('test_loader not specified')

        if superclass_index is None:
            superclass_index = globals().get('superclass_idx')

        self.model.eval()
        device = self.device

        # Evaluate on test set, in this simple demo no special care is taken for novel/unseen classes
        image_names = []
        subclass_predictions = []
        with torch.no_grad():
            for data in self.test_loader:
                inputs, img_names = data[0].to(device), data[1]

                sub_outputs = self.model(inputs)
                sub_predicted = sub_outputs.argmax(dim=1)

                # extend() handles any batch size, not just batch_size=1.
                image_names.extend(img_names)
                subclass_predictions.extend(sub_predicted.tolist())

        # All columns have one entry per image; an unfilled 'superclass_index'
        # list would make the DataFrame constructor fail on unequal lengths.
        test_predictions = pd.DataFrame(data={
            'image': image_names,
            'superclass_index': [superclass_index] * len(image_names),
            'subclass_index': subclass_predictions,
        })

        if save_to_csv:
            test_predictions.to_csv(f'test_predictions_transferlearning_subclass_superclass{superclass_index}.csv', index=False)

        if return_predictions:
            return test_predictions

In [None]:
def fine_tune(backbone, num_frozen_children=6):
    """Freeze the leading child modules of ``backbone`` and unfreeze the rest.

    Sets ``requires_grad`` in place on every parameter reachable through
    ``backbone.children()``.

    Args:
        backbone: Module whose direct children are (un)frozen.
        num_frozen_children: Children with index below this value are frozen;
            all later children become trainable. Defaults to 6.

    Returns:
        None.
    """
    for i, layer in enumerate(backbone.children()):
        trainable = i >= num_frozen_children
        for param in layer.parameters():
            param.requires_grad = trainable

In [None]:
# Init model and trainer
# Fall back to the CPU when no GPU runtime is attached instead of crashing in .to('cuda').
device = 'cuda' if torch.cuda.is_available() else 'cpu'
backbone = models.resnet18(weights='IMAGENET1K_V1')
finetune = False
model = CNN(backbone, finetune=finetune).to(device)
criterion = nn.CrossEntropyLoss()
# All parameters are registered with Adam, including the currently frozen ones:
# frozen parameters receive no gradient and are skipped, and they start being
# updated once fine_tune() unfreezes them later in the training loop.
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# Pass the device explicitly so the trainer moves batches to where the model lives.
trainer = Trainer(model, criterion, optimizer, train_loader, val_loader, test_loader, device=device)

In [None]:
# Training loop
NUM_EPOCHS = 25
# After this many completed epochs of head-only training the later backbone
# layers are unfrozen (fine_tune) for the remaining epochs.
UNFREEZE_AFTER_EPOCH = 11


def print_parameter_counts(model):
    """Print the trainable and total parameter counts of ``model``."""
    print('Trainable parameters: ', sum(p.numel() for p in model.parameters() if p.requires_grad))
    print('Total parameters: ', sum(p.numel() for p in model.parameters()))
    print('----------------------')
    print('')


print_parameter_counts(trainer.model)
for epoch in range(NUM_EPOCHS):
    print(f'Epoch {epoch+1}')
    trainer.train_epoch()
    trainer.validate_epoch()
    print('')
    if epoch + 1 == UNFREEZE_AFTER_EPOCH:
        # The trainer already holds this model and an optimizer that was built
        # over all parameters, so unfreezing in place is enough; no new Trainer
        # is needed.
        fine_tune(trainer.model.backbone)
        print('')
        print_parameter_counts(trainer.model)


print('Finished Training')

Trainable parameters:  45144
Total parameters:  11221656
Epoch 1
Training loss: 3.609
Validation loss: 6.007
Validation subclass acc: 21.09 %

Epoch 2
Training loss: 2.711
Validation loss: 5.155
Validation subclass acc: 29.69 %

Epoch 3
Training loss: 2.318
Validation loss: 4.814
Validation subclass acc: 29.69 %

Epoch 4
Training loss: 2.056
Validation loss: 4.421
Validation subclass acc: 39.06 %

Epoch 5
Training loss: 1.874
Validation loss: 4.424
Validation subclass acc: 32.81 %

Epoch 6
Training loss: 1.747
Validation loss: 4.117
Validation subclass acc: 38.28 %

Epoch 7
Training loss: 1.640
Validation loss: 4.167
Validation subclass acc: 39.84 %

Epoch 8
Training loss: 1.538
Validation loss: 4.012
Validation subclass acc: 39.84 %

Epoch 9
Training loss: 1.503
Validation loss: 3.997
Validation subclass acc: 45.31 %

Epoch 10
Training loss: 1.438
Validation loss: 4.204
Validation subclass acc: 35.16 %

Epoch 11
Training loss: 1.380
Validation loss: 4.044
Validation subclass acc: 40.6

In [None]:
# Switch to inference mode and predict a subclass for every test image.
trainer.model.eval()
test_predictions = trainer.test(save_to_csv=False, return_predictions=True)

# Note kept from the original cell.
# NOTE(review): it also reports superclass accuracy, which this subclass-only
# notebook does not compute -- presumably inherited from an earlier baseline; confirm.
#
# This simple baseline scores the following test accuracy
#
# Superclass Accuracy
# Overall: 43.83 %
# Seen: 61.11 %
# Unseen: 0.00 %
#
# Subclass Accuracy
# Overall: 2.03 %
# Seen: 9.56 %
# Unseen: 0.00 %

# Keep the predictions as the last expression so the cell displays them
# (previously a bare string literal was the last expression and the returned
# DataFrame was discarded).
test_predictions.head()