**Import Python modules**

In [None]:
import os
import zipfile
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# Unpack the dataset archive that was uploaded to the Google Colab runtime.
local_zip = '/content/Data.zip'
# The context manager guarantees the archive handle is closed even when
# extraction fails part-way through.
with zipfile.ZipFile(local_zip, 'r') as zip_ref:
    zip_ref.extractall('/content')
# Relative dataset root used by the cells below.
# NOTE(review): assumes the archive contains a top-level "Data" folder and that
# the working directory is /content -- confirm.
DATA_PATH = "Data"

**Data pre-processing (Augmentation + DataLoader)**

In [None]:
def get_count_metrics(folder, data_path=DATA_PATH):
    """Count the directory entries in each class sub-folder of one dataset split.

    Args:
        folder: Name of the split directory under ``data_path`` (the notebook
            calls this with 'train' and 'test').
        data_path: Root directory that contains the split folders.

    Returns:
        A tuple ``(num_normal, num_pneumonia, num_covid19)`` holding the number
        of entries found in the NORMAL, PNEUMONIA and COVID19 sub-folders.
    """
    split_dir = os.path.join(data_path, folder)
    # Folders are listed in the order PNEUMONIA, NORMAL, COVID19; the returned
    # tuple is ordered NORMAL, PNEUMONIA, COVID19.
    entries_per_class = {
        class_name: len(os.listdir(os.path.join(split_dir, class_name)))
        for class_name in ('PNEUMONIA', 'NORMAL', 'COVID19')
    }
    return (
        entries_per_class['NORMAL'],
        entries_per_class['PNEUMONIA'],
        entries_per_class['COVID19'],
    )

def load_data(data_path=DATA_PATH):
    """Build the train and test ``DataLoader`` objects from the image folders.

    Images are read with ``torchvision.datasets.ImageFolder`` from
    ``<data_path>/train`` and ``<data_path>/test`` in batches of 32.

    * Training images are augmented (random resized crop to 224 x 224,
      horizontal and vertical flips, a rotation of up to 10 degrees and
      Gaussian blur) and the loader is shuffled.
    * Test images are only resized and centre-cropped to 224 x 224. No random
      transform is applied, so evaluating the same model twice gives the same
      metrics. The loader is not shuffled.

    Args:
        data_path: Root directory containing the ``train`` and ``test`` folders.

    Returns:
        ``(train_loader, test_loader)``.
    """
    import torchvision.datasets as datasets
    import torchvision.transforms as transforms

    batch_size = 32
    image_size = 224

    # NOTE(review): neither pipeline applies transforms.Normalize. The
    # ImageNet-pretrained ResNet18 used later in the notebook is normally fed
    # ImageNet-normalised inputs -- consider adding it to BOTH pipelines.
    transform_dict = {
        'train': transforms.Compose([
            transforms.Resize(image_size),
            transforms.RandomResizedCrop(image_size),
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomRotation(degrees=(-10, 10)),
            transforms.RandomVerticalFlip(p=0.5),
            transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 2.0)),
            transforms.ToTensor(),
        ]),
        # Deterministic pre-processing only: random augmentation at evaluation
        # time would make the reported accuracy vary between runs.
        'test': transforms.Compose([
            transforms.Resize(image_size),
            transforms.CenterCrop(image_size),
            transforms.ToTensor(),
        ]),
    }

    train_data = datasets.ImageFolder(
        root=os.path.join(data_path, 'train'), transform=transform_dict['train'])
    # Shows which integer label ImageFolder assigned to each class folder.
    print(train_data.class_to_idx)
    train_loader = torch.utils.data.DataLoader(
        train_data, batch_size=batch_size, shuffle=True)
    print('train_loader - len', len(train_loader))
    print('train_loader - type', type(train_loader))

    test_data = datasets.ImageFolder(
        root=os.path.join(data_path, 'test'), transform=transform_dict['test'])
    test_loader = torch.utils.data.DataLoader(
        test_data, batch_size=batch_size, shuffle=False)
    print('test_loader - len', len(test_loader))
    print('test_loader - type', type(test_loader))

    return train_loader, test_loader



In [None]:
# First make sure both splits return a plain tuple, then print the counts.
for split_name in ('train', 'test'):
    assert type(get_count_metrics(split_name)) is tuple
for split_name in ('train', 'test'):
    print(get_count_metrics(split_name))

In [None]:
# load_data() returns (train_loader, test_loader); the loader built from the
# 'test' folder is used as the validation set from here on.
train_loader, val_loader = load_data()

**Explore processed images**

In [None]:
import torchvision
import matplotlib.pyplot as plt

#def imshow(img, title):
    #npimg = img.numpy()
    #plt.figure(figsize=(15, 7))
    #plt.axis('off')
    #plt.imshow(np.transpose(npimg, (1, 2, 0)))
    #plt.title(title)
    #plt.show()

def show_batch_images(dataloader):
    """Plot the first batch yielded by ``dataloader`` as a single image grid.

    The integer labels of the batch are printed, and the figure title lists one
    entry per image: "COVID19" for label 0 and "non-COVID19" for any other label.
    """
    batch_images, batch_labels = next(iter(dataloader))

    # Tile the batch into one image, then move the channel axis last, which is
    # the layout plt.imshow is given here.
    tiled = torchvision.utils.make_grid(batch_images, padding=20)
    tiled_hwc = np.transpose(tiled.cpu().numpy(), (1, 2, 0))

    plt.figure(figsize=(30, 15))
    plt.imshow(tiled_hwc, interpolation='nearest')
    print(batch_labels)

    captions = []
    for label in batch_labels:
        if label == 0:
            captions.append("COVID19")
        else:
            captions.append("non-COVID19")
    plt.title(label=captions)
    plt.show()

    #imshow(img, title=["COVID19" if x==0  else ("NORMAL" if x == 1 else "PNEUMONIA") for x in labels])
  
# Preview one batch from the training loader (the loop body runs exactly once).
for i in range(1):
    show_batch_images(train_loader)

**Baseline ResNet18 model**

In [None]:
import torchvision
from torchvision import models
    
# Size of the replacement classification head.
num_classes = 3

# Load ResNet18 with pretrained weights and freeze every existing parameter so
# that only the new head created below is trainable.
model_conv = models.resnet18(pretrained=True)
for param in model_conv.parameters():
    param.requires_grad_(False)

# Swap the final fully-connected layer for a fresh one; parameters of newly
# constructed modules have requires_grad=True by default.
num_ftrs = model_conv.fc.in_features
model_conv.fc = nn.Linear(in_features=num_ftrs, out_features=num_classes)

criterion = nn.CrossEntropyLoss()

# The optimizer is handed the head's parameters only.
optimizer_conv = torch.optim.SGD(params=model_conv.fc.parameters(), lr=1e-4)


In [None]:
#Load pretrained model and train on multiple epochs
n_epochs = 20

def train_model(model, train_dataloader, n_epoch=n_epochs, optimizer=optimizer_conv, criterion=criterion):
    """Train ``model`` for ``n_epoch`` passes over ``train_dataloader``.

    After every epoch the elapsed time since the start of training and the mean
    batch loss of that epoch are printed.

    Args:
        model: Network to train; it is switched to training mode.
        train_dataloader: Iterable yielding ``(data, target)`` batches.
        n_epoch: Number of epochs to run.
        optimizer: Optimizer used for the parameter updates. The default,
            ``optimizer_conv``, only holds the parameters of ``model_conv.fc``,
            so pass a matching optimizer when training any other model.
        criterion: Loss called as ``criterion(outputs, target)``.

    Returns:
        The same ``model`` object, trained in place.
    """
    # Imported locally so the function does not depend on a later notebook cell
    # having executed `import time` before it is called.
    import time

    model.train()  # prep model for training

    start_time = time.time()

    for epoch in range(n_epoch):
        print(f"Epoch {epoch} starts")
        batch_losses = []
        for data, target in train_dataloader:
            # forward
            outputs = model(data)
            loss = criterion(outputs, target)

            # backward + optimize; stale gradients are cleared first
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # .item() extracts the scalar loss as a plain Python float.
            batch_losses.append(loss.item())
        print("Total train time = {:.2f} seconds".format(time.time() - start_time))
        print(f"Epoch {epoch}: curr_epoch_loss={np.mean(batch_losses)}")
    return model

In [None]:
import time

# Rebuild the train / validation loaders.
train_loader, val_loader = load_data()

# Seed the Python, NumPy and PyTorch random number generators before training.
seed = 24
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# train_model returns the object it was given, so `model` aliases `model_conv`.
model = train_model(model_conv, train_loader)

In [None]:
# Collect the predicted and true class indices for every validation batch.
model.eval()
predictions, actuals = list(), list()
# Inference only: no_grad() skips autograd bookkeeping, so no graph is kept
# alive per batch and the outputs can be converted to NumPy directly.
with torch.no_grad():
    for data, target in val_loader:
        Y_pred_orig = model(data)
        # Predicted class = index of the largest output in each row.
        _, Y_pred_tag = torch.max(Y_pred_orig, dim=1)
        Y_pred_tag = Y_pred_tag.numpy()
        # Store both as (batch, 1) columns so they can be concatenated later.
        predictions.append(Y_pred_tag.reshape(len(Y_pred_tag), 1))
        batch_targets = target.numpy()
        actuals.append(batch_targets.reshape(len(batch_targets), 1))


In [None]:
# Join the per-batch (batch, 1) columns into one array each for predictions
# and ground-truth labels.
Y_pred, Y_test = (
    np.concatenate(batch_chunks, axis=0) for batch_chunks in (predictions, actuals)
)

In [None]:
#accuracy
from sklearn.metrics import accuracy_score

# Print the raw predicted class indices, then score them against the labels.
print(Y_pred)
acc = accuracy_score(y_true=Y_test, y_pred=Y_pred)
print("Validation Accuracy of ResNet18: " + str(acc))

**SIMPLE CNN MODEL**

In [None]:
class SimpleCNN_SAM(nn.Module):
    """Small convolutional classifier producing three output scores per image.

    Two identical stages (3x3 convolution with padding 1, ReLU, 4x4 max-pool
    with stride 4) are followed by a 128-unit hidden layer with ReLU and
    dropout (p=0.5) and a final 3-way linear layer. The first linear layer
    expects 14 * 14 * 32 flattened features, which is what a 3 x 224 x 224
    input yields after the two pooling steps.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 3 -> 32 channels, spatial size divided by 4.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=4, stride=4)
        # Stage 2: 32 -> 32 channels, spatial size divided by 4 again.
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=4, stride=4)
        # Classifier head.
        self.linear1 = nn.Linear(14 * 14 * 32, 128)
        self.relu3 = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)
        self.linear2 = nn.Linear(128, 3)

    def forward(self, x):
        """Return the raw class scores of shape (batch, 3) for the batch ``x``."""
        features = self.pool1(self.relu1(self.conv1(x)))
        features = self.pool2(self.relu2(self.conv2(features)))
        # Flatten everything except the batch dimension.
        flat = features.reshape(features.size(0), -1)
        hidden = self.dropout(self.relu3(self.linear1(flat)))
        return self.linear2(hidden)


In [None]:
#Load simple CNN model and train on multiple epochs
model_CNN = SimpleCNN_SAM()
n_epochs = 5
def train_CNN_model(model, train_dataloader, n_epoch=n_epochs, optimizer=optimizer_conv, criterion=criterion):
    """Train ``model`` with a fresh SGD optimizer and cross-entropy loss.

    After every epoch the elapsed time since the start of training and the mean
    batch loss of that epoch are printed.

    Args:
        model: Network to train; it is switched to training mode.
        train_dataloader: Iterable yielding ``(data, target)`` batches.
        n_epoch: Number of epochs to run.
        optimizer: Ignored. A new ``torch.optim.SGD`` over ``model.parameters()``
            (lr=1e-4) is always created, because the default ``optimizer_conv``
            only holds the parameters of ``model_conv.fc``.
        criterion: Ignored. A new ``nn.CrossEntropyLoss`` is always used.

    Returns:
        The same ``model`` object, trained in place.
    """
    # Imported locally so the function does not depend on another notebook cell
    # having executed `import time` before it is called.
    import time

    # The incoming arguments are deliberately replaced (see docstring).
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-4)

    model.train()  # prep model for training

    start_time = time.time()

    for epoch in range(n_epoch):
        print(f"Epoch {epoch} starts")
        batch_losses = []
        for data, target in train_dataloader:
            # forward
            outputs = model(data)
            loss = criterion(outputs, target)

            # backward + optimize; stale gradients are cleared first
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # .item() extracts the scalar loss as a plain Python float.
            batch_losses.append(loss.item())
        print("Total train time = {:.2f} seconds".format(time.time() - start_time))
        print(f"Epoch {epoch}: curr_epoch_loss={np.mean(batch_losses)}")
    return model

In [None]:
#print(model)
# Train the simple CNN on the training loader; train_CNN_model returns the
# same model object it was given.
model_CNN = train_CNN_model(model_CNN, train_loader)

In [None]:
# Collect the predicted and true class indices for every validation batch.
# This must evaluate `model_CNN`: `model` is the ResNet18 trained earlier, and
# using it here would report the ResNet's accuracy under the Simple CNN label.
model_CNN.eval()
predictions, actuals = list(), list()
# Inference only: no_grad() skips autograd bookkeeping, so no graph is kept
# alive per batch and the outputs can be converted to NumPy directly.
with torch.no_grad():
    for data, target in val_loader:
        Y_pred_orig = model_CNN(data)
        # Predicted class = index of the largest output in each row.
        _, Y_pred_tag = torch.max(Y_pred_orig, dim=1)
        Y_pred_tag = Y_pred_tag.numpy()
        # Store both as (batch, 1) columns so they can be concatenated later.
        predictions.append(Y_pred_tag.reshape(len(Y_pred_tag), 1))
        batch_targets = target.numpy()
        actuals.append(batch_targets.reshape(len(batch_targets), 1))

In [None]:
# Join the per-batch (batch, 1) columns into one array each for predictions
# and ground-truth labels.
Y_pred, Y_test = (
    np.concatenate(batch_chunks, axis=0) for batch_chunks in (predictions, actuals)
)

In [None]:
from sklearn.metrics import accuracy_score

# Print the raw predicted class indices, then score them against the labels.
print(Y_pred)
acc = accuracy_score(y_true=Y_test, y_pred=Y_pred)
print("Validation Accuracy of Simple CNN: " + str(acc))