In [None]:
import copy
import itertools
import math
import os
import pickle
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, models

In [None]:
# Random seed: a single value shared by the torch and numpy generators
seed = 2022
torch.random.manual_seed(seed)
np.random.seed(seed)

# Device config: first CUDA device when one is available, otherwise the CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Tensorboard writer: everything logged by this notebook goes under this run directory
writer = SummaryWriter("runs/multilabelsclass")

# Data pre-processing and constructing the CNN structure

# 1. Read Image & Transform

### 1.1 Loading data

In [None]:
# Data directory and save directory
data_dir = '../pascal_2007'
save_dir = './saved'

# Create whichever of the two directories does not exist yet
for directory in (data_dir, save_dir):
    if not os.path.exists(directory):
        os.mkdir(directory)

# Image directories
train_img_dir = os.path.join(data_dir, 'train')
test_img_dir = os.path.join(data_dir, 'test')

# Annotation files
train_anno_file = os.path.join(data_dir, 'train.csv')
test_anno_file = os.path.join(data_dir, 'test.csv')

### 1.2 Train/Val/Test Split 

In [None]:
# Annotation table for the training images
train_label = pd.read_csv(train_anno_file)

# Per-row `is_valid` flags; both split selections below are derived from this one column
is_valid_flags = train_label["is_valid"].values
train_image_label_value = is_valid_flags
valid_image_label_value = is_valid_flags

In [None]:
# Row positions of each split.
# The `is_valid` column is read as "this row belongs to the validation split",
# so training rows are the ones where the flag is False and validation rows
# the ones where it is True (the selection was previously the other way round).
train_image_label_index = [i for i, is_valid in enumerate(train_image_label_value) if not is_valid]
valid_image_label_index = [i for i, is_valid in enumerate(valid_image_label_value) if is_valid]
print("Number of training image: ", len(train_image_label_index))
print("Number of valid image: ", len(valid_image_label_index))

In [None]:
# Preview only the first few validation row positions instead of dumping the whole list
valid_image_label_index[:10]

In [None]:
## File names of every row in the training annotation table (`fname` column)
image_file_name = train_label["fname"].values
image_file_name

In [None]:
# Read every annotated image into memory, keeping the annotation row order
total_train_image = [
    plt.imread(os.path.join(train_img_dir, file_name))
    for file_name in image_file_name
]

len(total_train_image)

In [None]:
# Pick each split's images out of the full list by row position
train_image = list(map(total_train_image.__getitem__, train_image_label_index))
val_image = list(map(total_train_image.__getitem__, valid_image_label_index))

In [None]:
# Visual check: show the first image that was read from the training directory
plt.imshow(total_train_image[0])
plt.show()

In [None]:
## Read all labels for the train and validation data

# One label entry per annotation row, in the row order of `train_label`
labels = train_label["labels"].values

# Select each split's labels with the row positions computed for that split
training_labels = labels[train_image_label_index]
validation_labels = labels[valid_image_label_index]

In [None]:
# Inspect the training-split labels
training_labels

In [None]:
## Load the test annotation table and collect its image file names (`fname` column)
test_label = pd.read_csv(test_anno_file)
test_image_file_name = test_label["fname"].values
test_image_file_name

In [None]:
## Test data

# Labels are read from the test annotation file (`labels` column)
test_anno_label = pd.read_csv(test_anno_file)
test_labels = test_anno_label["labels"].values

# Read every test image into memory, keeping the annotation row order
total_test_image = [
    plt.imread(os.path.join(test_img_dir, file_name))
    for file_name in test_image_file_name
]

In [None]:
# Visual check: show the first image that was read from the test directory
plt.imshow(total_test_image[0])
plt.show()

In [None]:
# Inspect the test labels
test_labels

In [None]:
def convert_label_to_array(labels):
    """Split whitespace-separated label strings into lists of class names.

    The input is never modified: a new container is returned. Entries that
    are already sequences of names (for example when the calling cell is run
    a second time) are copied as-is, so the conversion is idempotent.

    Args:
        labels: Sequence (list or numpy array) whose items are either label
            strings such as ``"car person"`` or sequences of class names.

    Returns:
        A numpy object array when ``labels`` is a numpy array, otherwise a
        list; item ``i`` is the list of class names for ``labels[i]``.
    """
    split_labels = [
        label.split() if isinstance(label, str) else list(label)
        for label in labels
    ]

    if isinstance(labels, np.ndarray):
        # Fill element by element: assigning the nested list in one go would
        # let numpy try to build a 2-D array out of equal-length rows.
        result = np.empty(len(split_labels), dtype=object)
        for ind, names in enumerate(split_labels):
            result[ind] = names
        return result

    return split_labels

In [None]:
# Convert each split's labels with convert_label_to_array (one list of class names per image)
training_labels = convert_label_to_array(training_labels)
validation_labels = convert_label_to_array(validation_labels)
test_labels = convert_label_to_array(test_labels)

In [None]:
# Inspect the converted test labels
test_labels

### 1.3 Dataset and DataLoader

In [None]:
# Get all labels: the sorted set of every class name that occurs in the training split
labels = sorted({name for names in training_labels for name in names})

print('Number of labels: ', len(labels))

In [None]:
# Inspect the sorted class-name vocabulary
labels

In [None]:
## Create custom dataset:

class ImageDataset(Dataset):
    """In-memory multi-label image dataset.

    Each sample is an image paired with a multi-hot target vector whose
    length is ``len(labellist)``; position ``k`` is 1.0 when class
    ``labellist[k]`` is among the sample's annotations and 0.0 otherwise.

    Args:
        annotations: Sequence where item ``i`` is the sequence of class names
            attached to image ``i``.
        images: Sequence of images, aligned with ``annotations``.
        labellist: Ordered list of all class names; defines the target layout.
        transforms: Optional callable applied to an image before it is
            returned.
    """

    def __init__(self, annotations, images, labellist, transforms=None):
        super().__init__()
        self.img = images
        self.annotations = annotations
        self.labellist = labellist
        self.transforms = transforms

    def __len__(self):
        """Return the number of samples (one per annotation entry)."""
        return len(self.annotations)

    def text_to_index(self, labellist, annotations):
        """Map class names to their positions in ``labellist``.

        Args:
            labellist: Ordered list of class names to look the names up in.
            annotations: Class names of one sample.

        Returns:
            List of integer positions, one per entry of ``annotations``.

        Raises:
            ValueError: If a name is not present in ``labellist``.
        """
        return [labellist.index(item) for item in annotations]

    def __getitem__(self, idx):
        """Return ``(image, target)`` for sample ``idx``.

        ``target`` is a float32 tensor of shape ``(len(labellist),)``.
        """
        image = self.img[idx]
        if self.transforms is not None:
            image = self.transforms(image)

        # Build the multi-hot target by index assignment rather than summing
        # one-hot rows: this stays binary when a class name is repeated and
        # yields an all-zero vector when a sample has no annotations.
        labels_index = self.text_to_index(self.labellist, self.annotations[idx])
        onehot_label = torch.zeros(len(self.labellist), dtype=torch.float32)
        if labels_index:
            onehot_label[torch.tensor(labels_index, dtype=torch.long)] = 1.0

        return image, onehot_label

In [None]:
# Demo of the encoding: one-hot rows for class indices 2 and 6 (20 classes) summed into one float vector
F.one_hot(torch.tensor([2, 6]), num_classes=20).sum(dim=0).float()

In [None]:
# Show the first ten training images, printing each image's label list alongside it
preview_count = 10
for sample_idx in range(preview_count):
    plt.imshow(train_image[sample_idx])
    print(training_labels[sample_idx])
    plt.show()

### 1.4 Transform image data

In [None]:
# Data transforms and augmentation (data preprocessing)

# Per-channel mean and standard deviation passed to Normalize for every split
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])


def _eval_transform():
    """Build the deterministic pipeline shared by 'val' and 'test'."""
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.Normalize(mean, std),
    ])


# Training adds a random resized crop and a random horizontal flip
_train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.Normalize(mean, std),
])

data_transforms = {
    'train': _train_transform,
    'val': _eval_transform(),
    'test': _eval_transform(),
}

### 1.5 Final Image Dataset

In [None]:
# Final data: one ImageDataset per split, each using that split's transform pipeline
images_pascal = {
    split: ImageDataset(annotations, images, labels, data_transforms[split])
    for split, annotations, images in (
        ('train', training_labels, train_image),
        ('val', validation_labels, val_image),
        ('test', test_labels, total_test_image),
    )
}


In [None]:
# Plot one transformed training sample
image, label = (images_pascal['train'])[15]

# Channels-first tensor -> channels-last array for matplotlib, then undo
# Normalize(mean, std) so the pixel values are back in [0, 1]; showing the
# normalized tensor directly makes imshow clip it and distorts the colours.
image = image.permute(1, 2, 0).numpy() * std + mean
image = np.clip(image, 0.0, 1.0)

plt.figure(figsize=(5, 5))
plt.imshow(image)
plt.show()


In [None]:
# Target vector returned by the dataset for the sample fetched above
label

In [None]:
# DataLoader: batches of 16 for every split; only the training split is shuffled
dataloaders = {
    split: DataLoader(images_pascal[split], batch_size=16, shuffle=(split == 'train'))
    for split in ('train', 'val', 'test')
}

## 2. CNN structure

- In this part I will test three CNN architectures, including ResNet50 and ResNet18, as backbones to extract features from the input image.
- The last layer will be a fully connected layer whose number of output nodes equals the number of classes, followed by a `Sigmoid` activation.

## 2.1. ResNet18

## 2.2 ResNet50

In [None]:
## Load a pretrained model and reset the final fully connected layers

## Fine-tuning the convnet: instead of random initialization, the network starts from pretrained weights
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of `weights=`;
# switch once the installed torchvision version is confirmed.
model = models.resnet50(pretrained=True)

class NeuralNetwork(nn.Module):
    """Backbone followed by a small classification head with sigmoid outputs.

    The backbone's ``fc`` layer is replaced by a linear projection to 128
    features; a second linear layer maps those features to ``num_classes``
    scores, which are squashed to (0, 1) by a sigmoid.

    Args:
        model: Backbone module exposing an ``fc`` layer with ``in_features``.
        num_classes: Number of output scores per sample.
    """

    def __init__(self, model, num_classes):
        super().__init__()
        self.model = model
        self.num_classes = num_classes

        # Swap the backbone's own classifier for a 128-feature projection,
        # then add the per-class output layer on top of it.
        backbone_features = self.model.fc.in_features
        self.model.fc = nn.Linear(backbone_features, 128)
        self.fc_head = nn.Linear(128, self.num_classes)

    def forward(self, x):
        """Return per-class sigmoid scores for the batch ``x``."""
        hidden = F.relu(self.model(x))
        scores = self.fc_head(hidden)
        return torch.sigmoid(scores)

# Wrap the backbone with the classification head, then move the whole network to the target device
model = NeuralNetwork(model, num_classes=len(labels))
model = model.to(device)
print(model)

In [None]:
# Test DataLoader: pull a single batch from the training loader and check its shapes.
# The built-in next() is used because it works on any iterator, whereas the
# iterator's own `.next()` method is not available on current DataLoader iterators.
examples = iter(dataloaders['train'])
example_images, example_labellists = next(examples)
print('Batch image size: ', example_images.shape)
print('Batch label size: ', example_labellists.shape)

In [None]:
# Add the model graph to Tensorboard, using the example batch (moved to `device`) as input
writer.add_graph(model, example_images.to(device))

In [None]:
## Loss function and Optimizer

# The network ends in a sigmoid and the targets are multi-hot vectors, so every
# class is an independent yes/no decision. Binary cross-entropy on probabilities
# is the matching loss; CrossEntropyLoss would apply a softmax across classes on
# top of the sigmoid outputs and treat the targets as one distribution.
criterion = nn.BCELoss()
# Optimizer parameters follow the paper "Deep Residual Learning for Image Recognition"
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# Multiply the learning rate by 0.1 every 5 scheduler steps
step_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=5, gamma=0.1)

# Training and evaluating model

In [None]:
# Number of samples in the two splits used during training
dataset_sizes = {
    'train': len(images_pascal['train']),
    'val': len(images_pascal['val']),
}
dataset_sizes

In [None]:
## Scheduling the learning rate
# Saving the best model

def train_model(model, dataloaders, labels, criterion, optimizer, scheduler, num_epochs, writer):
    """Train ``model`` and restore the weights with the best validation accuracy.

    Every epoch runs a 'train' phase (gradient updates, then one scheduler
    step) followed by a 'val' phase (no gradients). Per phase, the mean loss
    per sample and the mean of the per-class accuracies (in percent) are
    printed and logged to ``writer``.

    Args:
        model: Network to train; its outputs are rounded to obtain 0/1
            predictions, so they are expected to be probabilities.
        dataloaders: Mapping with 'train' and 'val' iterables that yield
            ``(inputs, targets)`` batches.
        labels: Sequence of class names; only its length is used.
        criterion: Loss callable ``criterion(outputs, targets)``.
        optimizer: Optimizer updating ``model``'s parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        num_epochs: Number of epochs to run.
        writer: Object with ``add_scalar(tag, value, step)``.

    Returns:
        ``model``, with the best-validation-accuracy weights loaded.
    """
    since = time.time()  # start time

    # Work on the device the model already lives on, so the function does not
    # depend on a notebook-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')
    num_classes = len(labels)

    best_model_wts = copy.deepcopy(model.state_dict())  # snapshot of the best weights so far
    best_acc = 0.0

    # loop through the dataset
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            # Running statistics for this phase
            running_loss = 0.0
            running_corrects = torch.zeros(num_classes, device=run_device)
            num_samples = 0

            # Iterate over data.
            for inputs, labellist in dataloaders[phase]:
                inputs = inputs.to(run_device)
                labellist = labellist.to(run_device)
                batch_size = inputs.size(0)

                if is_train:
                    # zero the parameter gradients before the new backward pass
                    optimizer.zero_grad()

                # track history only in the training phase
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labellist)

                    # backward + optimize only if in training phase
                    if is_train:
                        loss.backward()
                        optimizer.step()

                # The criterion returns a batch mean, so weight it by the batch
                # size to be able to recover the per-sample mean afterwards.
                preds = outputs.detach().round()
                running_loss += loss.item() * batch_size
                running_corrects += torch.sum(preds == labellist, dim=0)
                num_samples += batch_size

            if is_train:
                scheduler.step()

            # Normalise by the number of samples. Dividing the loss by
            # samples * classes would under-report it by a factor of the class
            # count. max() guards against an empty loader.
            denominator = max(num_samples, 1)
            epoch_loss = running_loss / denominator
            epoch_cls_acc = running_corrects.double() / denominator * 100
            epoch_acc = epoch_cls_acc.mean().item()

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # Tensorboard
            writer.add_scalar("{}/loss".format(phase), epoch_loss, epoch)
            writer.add_scalar("{}/accuracy/average".format(phase), epoch_acc, epoch)

            # deep copy the model whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Train for 25 epochs; train_model loads the best validation weights into the model before returning it
model_ft = train_model(model, dataloaders, labels, criterion, optimizer_ft, step_lr_scheduler, 25, writer)

## Evaluate model

In [None]:
since = time.time()

# Switch the model to evaluation mode
model.eval()

# Running statistics over the test split
running_loss = 0.0
running_corrects = torch.zeros(len(labels)).to(device)
num_samples = 0

for images, labellists in dataloaders['test']:
    # Move the batch to the same device as the model
    images = images.to(device)
    labellists = labellists.to(device)

    with torch.no_grad():
        outputs = model(images)
        preds = outputs.round()
        loss = criterion(outputs, labellists)

    # The criterion returns a batch mean; weight it by the batch size so the
    # per-sample mean can be recovered after the loop.
    running_loss += loss.item() * images.size(0)
    running_corrects += torch.sum(preds == labellists, dim=0)
    num_samples += labellists.size(0)

# Normalise by the number of samples. Dividing the loss by samples * classes
# would under-report it by a factor of the class count.
loss = running_loss / max(num_samples, 1)
cls_acc = running_corrects.double() / max(num_samples, 1) * 100  # per-class accuracy in percent
cls_loss = cls_acc  # previous (misleading) name, kept so later cells that use it still work
acc = cls_acc.mean()

print('Finish testing after {:.2f}s'.format(time.time() - since))
print('Loss: {:.4f} - Acc: {:.2f}%'.format(loss, acc))