<pre>
<b>
COMP-6721 | 2021-Winter
Project | Part-1

Pravesh Gupta | 40152506
Vikramjeet Singh | 
Manjot Kaur Dherdi | 
</b>
</pre>

# 1. Data Loading

## 1.1. Installing required python modules

In [None]:
pip install torch

In [None]:
pip install torchvision

In [None]:
pip install pandas

In [None]:
pip install torchvision

In [None]:
pip install scikit-image

In [None]:
pip install numpy

In [None]:
pip install sklearn

In [None]:
pip install pillow

### 1.2. Dataset Description


Required Dataset Folder Heirarchy:
<pre>
{dataset_folder}
--images
----mask
-------{mask images}
----no_mask
-------{no mask images}
----not_person
-------{not person images}
</pre>

### 1.3. Defining Pytorch dataset Representation and data transaformers

In [None]:
from torchvision.datasets import VisionDataset
import pandas as pd
import os
from skimage import io as sk_io, transform as sk_transform
import numpy as np
from sklearn.utils import shuffle
from PIL import Image
    
class Rescale(object):
    def __init__(self, output_size, debug=False, export_path=None):
        assert isinstance(output_size, (int))
        self.output_size = output_size

    def __call__(self, img_data):
        img_arr = np.array(img_data)
        h, w, c = img_arr.shape
        isAlreadyScaled = (h==self.output_size and w==self.output_size)
        
        if not isAlreadyScaled:
            scale_factor = float(self.output_size)/img_arr.shape[0]
            img_arr = (sk_transform.rescale(img_arr, (scale_factor, scale_factor, 1))*255).astype(np.uint8)

            new_w = img_arr.shape[1]

            # Clipping or filling
            if new_w>self.output_size:
                mid = new_w//2
                new_w_start = mid-self.output_size//2
                new_w_end = mid+self.output_size//2

                if (new_w_end-new_w_start)<self.output_size:
                    new_w_end += (self.output_size-(new_w_end-new_w_start))
                elif (new_w_end-new_w_start)>self.output_size:
                    new_w_end -= ((new_w_end-new_w_start)-self.output_size)
                img_arr = img_arr[:, new_w_start:new_w_end]
            elif new_w<self.output_size:
                mid = new_w//2
                new_w_start = self.output_size//2-mid
                new_w_end = new_w_start+new_w
                filled_img_arr = np.zeros((self.output_size, self.output_size, img_arr.shape[2]), dtype=np.uint8)
                filled_img_arr[:, new_w_start:new_w_end] = img_arr[:, :]
                img_arr = filled_img_arr
        return Image.fromarray(img_arr)

### 1.4. Creating dataset and data loaders for train and test

In [None]:
def get_train_test_indices(dataset_targets):
    test_indices_map = {}
    for class_name in dataset.class_to_idx:
        label = dataset.class_to_idx[class_name]
        test_indices_map[label] = {'indices': np.array([], dtype=np.int32), 'count': 0}

    train_indices_map = {}
    for class_name in dataset.class_to_idx:
        label = dataset.class_to_idx[class_name]
        train_indices_map[label] = {'indices': np.array([], dtype=np.int32), 'count': 0}

    targets = np.array(dataset.targets, dtype=np.int32)
    target_indices = np.where(targets!=None)[0]
    np.random.shuffle(target_indices)

    for i in target_indices:
        label = dataset.targets[i]
        if test_indices_map[label]['count']<100:
            test_indices_map[label]['indices'] = np.append(test_indices_map[label]['indices'], i)
            test_indices_map[label]['count']+=1
        elif train_indices_map[label]['count']<350:
            train_indices_map[label]['indices'] = np.append(train_indices_map[label]['indices'], i)
            train_indices_map[label]['count']+=1

    test_indices = np.array([], dtype=np.int32)
    train_indices = np.array([], dtype=np.int32)

    for class_name in dataset.class_to_idx:
        label = dataset.class_to_idx[class_name]
        label_test_indices = test_indices_map[label]['indices']
        test_indices = np.append(test_indices, label_test_indices)

        label_train_indices = train_indices_map[label]['indices']
        train_indices = np.append(train_indices, label_train_indices)
    return train_indices, test_indices

In [None]:
from torchvision.transforms import ToTensor, Compose, Normalize
import torch
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import ImageFolder

torch.manual_seed(6721)

rescaled_size=512

data_dir = './data/'

data_transform = Compose([
    Rescale(rescaled_size)
    , ToTensor()
])

dataset = ImageFolder(
    root=data_dir
    ,transform=data_transform
)
print(f'Dataset: size: {len(dataset)}, class labels: {dataset.class_to_idx}')

train_indices, test_indices = get_train_test_indices(dataset.targets)
selected_indices = np.concatenate((train_indices, test_indices))

selected_dataset = Subset(dataset, selected_indices)
print(f'Selected dataset: size: {len(selected_dataset)}')
selected_dataloader = DataLoader(selected_dataset, batch_size=8, shuffle=False)

means = torch.tensor([])
stds = torch.tensor([])
for i, (data, labels) in enumerate(selected_dataloader):
    batch_mean = torch.mean(data, axis=(0, 2, 3))
    batch_std = torch.std(data, axis=(0, 2, 3))
    means = torch.cat((means, batch_mean.unsqueeze(0)))
    stds = torch.cat((stds, batch_std.unsqueeze(0)))
mean = torch.mean(means, axis=0)
std = torch.mean(stds, axis=0)
print(f'Mean: {mean}')
print(f'Std: {std}')

In [None]:
torch.manual_seed(6721)

from torchvision.transforms import ToTensor, Compose, Normalize
import torch
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import ImageFolder

data_transform = Compose([
    Rescale(rescaled_size)
    , ToTensor()
    , Normalize(mean=mean, std=std)
])

dataset = ImageFolder(
    root=data_dir
    ,transform=data_transform
)

train_dataset = Subset(dataset, train_indices)
print(f'Train dataset: size: {len(train_dataset)}')

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
for data, labels in train_dataloader:
    print(f'Train X | y batch shapes : {data.shape, labels.shape}')
    break

test_dataset = Subset(dataset, test_indices)
print(f'Test dataset: size: {len(test_dataset)}')

test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)
for data, labels in test_dataloader:
    print(f'Test X | y batch shapes : {data.shape, labels.shape}')
    break

# 2. Data Modelling

In [None]:
from torch.nn import Module, Conv2d, MaxPool2d, Linear, ReLU

class ProjectModel(Module):
    def __init__(self, num_classes=3):
        super(ProjectModel, self).__init__()
        self.module = torch.nn.Sequential(
            torch.nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=2)
            , torch.nn.BatchNorm2d(32)
            , torch.nn.LeakyReLU(inplace=True)
            
            , torch.nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=0)
            , torch.nn.BatchNorm2d(32)
            , torch.nn.LeakyReLU(inplace=True)
            
            , torch.nn.MaxPool2d(kernel_size=2, stride=2)
            
            , torch.nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0)
            , torch.nn.BatchNorm2d(64)
            , torch.nn.LeakyReLU(inplace=True)
            
            , torch.nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=0)
            , torch.nn.BatchNorm2d(64)
            , torch.nn.LeakyReLU(inplace=True)
            
            , torch.nn.MaxPool2d(kernel_size=2, stride=2)
            
            , torch.nn.Flatten()
            , torch.nn.Linear(64*126*126, num_classes)
        )
        
    def forward(self, X):
        return self.module(X)

### 2.1. Integrity Test

In [None]:
torch.manual_seed(6721)

from sklearn import metrics

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

net = ProjectModel()

# net = net.to(device)

# Model Unit test
total_labels = 0
correct_labels = 0
for data, labels in train_dataloader:
#     torch.cuda.empty_cache()
#     data, labels = data.to(device), labels.to(device)
    outputs = net(data)
    print(outputs)
    y_pred = torch.argmax(outputs, dim=1)
    print(y_pred)
    total_labels+=labels.size(0)
    correct_labels += (y_pred==labels).sum().item()
    break

print(f'Accuracy: {(correct_labels/total_labels)*100}')
print('Model test passed.')

### 2.2. Training

In [None]:
torch.manual_seed(6721)

from torch.optim import SGD
from torch.nn import CrossEntropyLoss
import time

num_epoch = 5
lr = 0.001
momentum = 0.5
net = ProjectModel()

loss_evaluater = CrossEntropyLoss()
optimizer = SGD(net.parameters(), lr=lr, momentum=momentum)

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = torch.device('cpu')
# net = net.to(device)

net.train()

t = time.time()
for i in range(num_epoch):
    print(f'Epoch: {i+1}')
    t_epoch = time.time()
    total_labels = 0
    correctly_pred = 0
    t_batch = time.time()
    for j, (data, labels) in enumerate(train_dataloader):
#         torch.cuda.empty_cache()
#         data, labels = data.to(device), labels.to(device)
        X, y = data, labels
        optimizer.zero_grad()
        output = net(X)
        loss = loss_evaluater(output, y)
        loss.backward()
        optimizer.step()
        y_pred = torch.argmax(output, dim=1)
        total_labels += y.size(0)
        correctly_pred += (y_pred==y).sum().item()
        current_acc = (correctly_pred/total_labels)
        if j%5==0:
            seconds_passed = time.time()-t_batch
            print(f'After {j} batches: [time:{seconds_passed//60}m {seconds_passed%60}s, Accuracy: {current_acc*100}]')
    epoch_acc = (correctly_pred/total_labels)
    seconds_passed = time.time()-t_epoch
    print(f'Epoch {i+1}: [time:{seconds_passed//60}m {seconds_passed%60}s, Accuracy: {epoch_acc*100}]')
    print()
seconds_passed = time.time()-t
print(f'Training time: {seconds_passed//60}m {seconds_passed%60}s')

### 2.3. Training Evaluation

In [None]:
torch.manual_seed(6721)

t = time.time()
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')

with torch.no_grad():
    total_labels = 0
    correctly_pred = 0
    t_batch = time.time()
    for i, (data, labels) in train_dataloader:
#         data,labels = data.to(device), labels.to(device)
        X, y = data, labels
        output = net(X)
        y_pred = torch.argmax(output, dim=1)
        total_labels += y.size(0)
        correctly_pred += (y_pred==y).sum().item()
        current_acc = (correctly_pred/total_labels)
        if i%5==0:
            seconds_passed = time.time()-t_batch
            print(f'After {j} batches: [time:{seconds_passed//60}m {seconds_passed%60}s, Accuracy: {current_acc*100}]')
    train_acc = (correctly_pred/total_labels)
    print(f'Training accuracy: {train_acc*100}')
seconds_passed = time.time()-t

print(f'Trainig evaluation time: {seconds_passed//60}m {seconds_passed%60}s')

### 2.4. Testing

In [None]:
torch.manual_seed(6721)

net.eval()

t = time.time()
# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')

total_labels = 0
correctly_pred = 0
t_batch = time.time()
for i, (data, labels) in enumerate(test_dataloader):
#     torch.cuda.empty_cache()
#     data, labels = data.to(device), labels.to(device)
    X, y = data, labels
    output = net(X)
    y_pred = torch.argmax(output, dim=1)
    total_labels += y.size(0)
    correctly_pred += (y_pred==y).sum().item()
    current_acc = (correctly_pred/total_labels)
    if i%5==0:
        seconds_passed = time.time()-t_batch
        print(f'After {j} batches: [time:{seconds_passed//60}m {seconds_passed%60}s, Accuracy: {current_acc*100}]')

test_acc = (correctly_pred/total_labels)
print(f'Test accuracy: {test_acc*100}')
seconds_passed = time.time()-t
print(f'Test time: {seconds_passed//60}m {seconds_passed%60}s')

### 2.5. Saving pretrained Model

In [None]:
torch.manual_seed(6721)

file_path = './net.pt'

torch.save(net.state_dict(), file_path)

state = {
    'epoch': num_epoch,
    'state_dict': model.state_dict(),
    'optimizer': optimizer.state_dict(),
    'lr': lr,
    'momentum': momentum
}

### 2.6. Loading Pretrained Model

In [None]:
torch.manual_seed(6721)

state = torch.load(file_path)

net = ProjectModel()
loss_evaluater = CrossEntropyLoss()
optimizer = SGD(net.parameters(), lr=lr, momentum=momentum)

num_epoch = state['epoch']
lr = state['lr']
momentum = state['momentum']
net.load_state_dict(state['state_dict'])
optimizer.load_state_dict(state['optimizer'])

# 3. Model Evaluation

### 3.1. Accuracy

In [None]:
print(f'Accuracy for the model is {test_acc*100} %')

### 3.2 Precision

In [None]:
print(f'Precision for the model is {test_precision*100} %')

### 3.3 Recall

In [None]:
print(f'Recall for the model is {test_recall*100} %')

### 3.4 F1 Score

In [None]:
print(f'F1 score for the model is {test_f1*100} %')