<a href="https://colab.research.google.com/github/willychangx/covid-ml/blob/main/eg.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!unzip "/content/drive/MyDrive/Files/Others.zip" -d "/content/drive/MyDrive/Files"
!unzip "/content/drive/MyDrive/Files/Covid.zip" -d "/content/drive/MyDrive/Files"
!unzip "/content/drive/MyDrive/Files/Healthy.zip" -d "/content/drive/MyDrive/Files"

In [None]:
# Work from the data folder: later cells build their paths from os.getcwd()
# and from relative names such as './Dataset/Train'.
%cd /content/drive/MyDrive/Files/

In [10]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm # Displays a progress bar

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torch.utils.data import Dataset, Subset, DataLoader, random_split

import os

In [59]:
# Hyper-parameters and preprocessing constants shared by the cells below.
params = dict(
    # Per-channel statistics handed to transforms.Normalize
    mean=[0.5, 0.5, 0.5],
    std=[0.5, 0.5, 0.5],
    # Mini-batch size used by both DataLoaders
    batch=30,
    # ResNet children with an index below this are frozen entirely; range [0, 10)
    child=3,
    # Number of leading sub-modules frozen inside the child at that index
    childchild=1,
    # Width of the hidden layer between fc1 and fc2
    num_out=16,
    # Whether fc1 / fc2 carry a bias term
    fc1True=True,
    fc2True=True,
    # Optimizer learning rate and weight decay (L2 strength)
    lr=1e-2,
    wd=0,
    # Number of passes over the training set
    epochs=30,
)

In [14]:
import cv2
import shutil

# Class folders expected in the working directory. combine_images() is run on
# them in this order, which also fixes the order of newFolderPath below.
types = [
    'Covid',
    'Others',
    'Healthy',
]
# Filled by combine_images() with the path of every 'new_<type>' folder it creates.
newFolderPath = list()

def combine_images(patientType):
    """Flatten, resize and augment every PNG of one patient class.

    Every ``*.png`` found one level below ``<cwd>/<patientType>`` is resized
    to 200x200 and written to ``<cwd>/new_<patientType>`` together with its
    three successive 90-degree clockwise rotations. Output files are numbered
    ``0.png``, ``1.png``, ... in write order. An existing output folder is
    wiped first, and the output folder is recorded once in the module-level
    ``newFolderPath`` list.

    Args:
        patientType: Name of the class folder inside the current directory.
    """
    origDir = os.getcwd()
    currDir = os.path.join(origDir, patientType)
    newFolder = os.path.join(origDir, 'new_' + patientType)

    # Re-running the cell must not register the same folder twice: the dataset
    # builder enumerates this list and would treat a duplicate as a new class.
    if newFolder not in newFolderPath:
        newFolderPath.append(newFolder)
    if os.path.isdir(newFolder):
        shutil.rmtree(newFolder)
    os.mkdir(newFolder)
    index = 0

    # os.listdir order is arbitrary; sorting keeps the numbering (and therefore
    # any index-based split made later) reproducible between runs.
    for folder in sorted(os.listdir(currDir)):
        folderPath = os.path.join(currDir, folder)
        if not os.path.isdir(folderPath):
            continue
        for filename in sorted(os.listdir(folderPath)):
            if not filename.endswith(".png"):
                continue
            imgPath = os.path.join(folderPath, filename)
            img = cv2.imread(imgPath)
            if img is None:
                # cv2.imread reports an unreadable file by returning None,
                # which would otherwise crash cv2.resize below.
                print(f"Skipping unreadable image: {imgPath}")
                continue
            img = cv2.resize(img, (200, 200), interpolation=cv2.INTER_CUBIC)
            # Pass 0 writes the resized original; passes 1-3 each rotate the
            # previous result by a further 90 degrees clockwise.
            for rotation in range(4):
                if rotation:
                    img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
                cv2.imwrite(os.path.join(newFolder, str(index) + '.png'), img)
                index += 1
    print(f"Folder {patientType}: {index} files")
    
# Build the flattened, augmented folder for every class, in the order listed in `types`.
for patient_type in types:
    combine_images(patient_type)

Folder Covid: 8668 files
Folder Others: 4988 files
Folder Healthy: 3028 files


In [None]:
# Start from a clean slate: drop any Dataset tree left behind by an earlier run.
DATASET_DIR = 'Dataset'
if os.path.isdir(DATASET_DIR):
    shutil.rmtree(DATASET_DIR)

In [15]:
def read_images(path) -> list:
    """Load every file in ``path`` with ``cv2.imread``, in a stable order.

    ``os.listdir`` returns entries in arbitrary order, so the listing is
    sorted: names whose stem is a plain number (``0.png``, ``1.png``, ...)
    come first in numeric order, all other names follow alphabetically. A
    second directory whose files share the same stems and is ordered by the
    same rule therefore lines up entry for entry.

    Args:
        path: Directory containing the image files.

    Returns:
        One entry per file, exactly as returned by ``cv2.imread`` (``None``
        for a file OpenCV cannot decode).
    """
    def _order(name):
        stem = os.path.splitext(name)[0]
        return (0, int(stem), name) if stem.isdecimal() else (1, 0, name)

    return [cv2.imread(os.path.join(path, name))
            for name in sorted(os.listdir(path), key=_order)]

def read_labels(path):
    """Read the full text of every file in ``path``, in a stable order.

    ``os.listdir`` returns entries in arbitrary order, so the listing is
    sorted: names whose stem is a plain number (``0.txt``, ``1.txt``, ...)
    come first in numeric order, all other names follow alphabetically. A
    second directory whose files share the same stems and is ordered by the
    same rule therefore lines up entry for entry.

    Args:
        path: Directory containing the label files.

    Returns:
        A list with the raw string content of each file.
    """
    def _order(name):
        stem = os.path.splitext(name)[0]
        return (0, int(stem), name) if stem.isdecimal() else (1, 0, name)

    labels = []
    for lbl in sorted(os.listdir(path), key=_order):
        # A context manager closes each handle instead of leaking it
        with open(os.path.join(path, lbl), "r") as handle:
            labels.append(handle.read())
    return labels

class DataSet(Dataset):
    """Write an 80/20 train or test split to disk and load it into memory.

    For every folder in the module-level ``newFolderPath`` list, files
    ``0.png`` ... ``N-1.png`` are split by index: the first 80% go to the
    training split, the rest to the test split. The chosen files are copied to
    ``<root>/image/<k>.png`` and the position of their source folder in
    ``newFolderPath`` is written to ``<root>/label/<k>.txt``.

    Attributes:
        ROOT: The ``root`` argument, unchanged.
        images: Result of ``read_images(root + "/image")``.
        labels: Result of ``read_labels(root + "/label")``.

    NOTE(review): ``images[i]`` matches ``labels[i]`` only if ``read_images``
    and ``read_labels`` list their directories in the same order.
    """

    def __init__(self, root):
        """Build the split under ``root`` and load it.

        Args:
            root: Destination directory such as ``'./Dataset/Train'``. The
                training split is produced when any ``/``-separated component
                of the path is exactly ``'Train'``; otherwise the test split.
        """
        train = 'Train' in root.split('/')

        image_dir = os.path.join(root, 'image')
        label_dir = os.path.join(root, 'label')
        for target in (image_dir, label_dir):
            # Rebuild from scratch so a re-run neither fails on an existing
            # folder nor mixes in files from an earlier run. No chdir is used,
            # so the working directory is untouched even if copying fails.
            if os.path.isdir(target):
                shutil.rmtree(target)
            os.makedirs(target)

        idx = 0  # running file number across all classes
        for num, folder in enumerate(newFolderPath):
            total = len(os.listdir(folder))
            amt = int(total * 0.80)
            arr = range(amt) if train else range(amt, total)
            for x in arr:
                shutil.copy2(os.path.join(folder, str(x) + '.png'),
                             os.path.join(image_dir, str(idx) + '.png'))
                with open(os.path.join(label_dir, str(idx) + '.txt'), "w") as file:
                    file.write(str(num))
                idx += 1

        self.ROOT = root
        self.images = read_images(root + "/image")
        self.labels = read_labels(root + "/label")

    def __len__(self):
        """Return the number of loaded images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``.

        The PyTorch DataLoader uses this method to build its batches.
        """
        # Read from the instance; the bare names `images`/`labels` do not exist
        img = self.images[idx]
        label = self.labels[idx]

        return img, label

In [16]:
# Build the train and test splits on disk and load them
print("Loading datasets...")

# Despite the "_path" suffix these are DataSet objects; the folder is in their .ROOT
DATA_train_path = DataSet(root='./Dataset/Train')
DATA_test_path = DataSet(root='./Dataset/Test')

Loading datasets...


In [52]:
# Preprocessing applied to every image before it reaches the network
_preprocess_steps = [
    # Convert to grayscale, replicated over three output channels
    transforms.Grayscale(num_output_channels=3),
    # [0,255] uint8 -> [0,1] float tensor
    transforms.ToTensor(),
    # TODO: pick normalization statistics that give zero mean and unit variance
    transforms.Normalize(mean=params['mean'], std=params['std']),
]
MyTransform = transforms.Compose(_preprocess_steps)

In [53]:
class _IndexedImageDataset(Dataset):
    """Pair ``<root>/image/<stem>.png`` with the class index in ``<root>/label/<stem>.txt``.

    ``datasets.ImageFolder`` cannot be used on this layout: it derives one
    class per sub-folder, which here would be 'image' and 'label', so the real
    class index stored in the label files would never be read.
    """

    def __init__(self, root, transform=None):
        image_dir = os.path.join(root, 'image')
        label_dir = os.path.join(root, 'label')
        self.transform = transform
        self.samples = []  # (image path, integer class index)
        for name in sorted(os.listdir(image_dir)):
            stem, ext = os.path.splitext(name)
            if ext.lower() != '.png':
                continue
            # Matching by file stem keeps image and label paired regardless of listing order
            with open(os.path.join(label_dir, stem + '.txt')) as handle:
                target = int(handle.read().strip())
            self.samples.append((os.path.join(image_dir, name), target))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        path, target = self.samples[idx]
        # Same loader ImageFolder uses by default
        image = datasets.folder.default_loader(path)
        if self.transform is not None:
            image = self.transform(image)
        return image, target


DATA_train = _IndexedImageDataset(root=DATA_train_path.ROOT, transform=MyTransform)
DATA_test = _IndexedImageDataset(root=DATA_test_path.ROOT, transform=MyTransform)

print("Done!")

Done!


In [54]:
# Create dataloaders; both share one batch size and reshuffle on every pass
# TODO: Experiment with different batch sizes
loader_options = dict(batch_size=params['batch'], shuffle=True)
trainloader = DataLoader(DATA_train, **loader_options)
testloader = DataLoader(DATA_test, **loader_options)

In [61]:
import torchvision.models as models

class Network(nn.Module):
    """Pretrained ResNet-50 feature extractor followed by a two-layer classifier.

    Transfer-learning setup: the first layers of the convolutional neural
    network (CNN) feature extractor are frozen, the remaining ones are refined
    together with a small fully-connected classifier that outputs 3 class
    scores. How much is frozen and the classifier width come from the
    module-level ``params`` dict ('child', 'childchild', 'num_out', 'fc1True',
    'fc2True').
    """

    def __init__(self):
        super().__init__()

        # Load pretrained ResNet-50
        self.model_resnet = models.resnet50(pretrained=True)

        # TODO: Determine how many first layers of ResNet-50 to freeze.
        # Iterating self.model_resnet.children() and printing each child shows
        # the architecture the two indices refer to.
        self._freeze_prefix(self.model_resnet, params['child'], params['childchild'])

        # Replace ResNet-50's own classifier with an identity mapping so the
        # backbone returns the num_fc_in features that used to feed it
        num_fc_in = self.model_resnet.fc.in_features
        self.model_resnet.fc = nn.Identity()

        # TODO: Design your own FCN
        self.fc1 = nn.Linear(num_fc_in, params['num_out'], bias = params['fc1True']) # backbone features -> hidden layer
        self.fc2 = nn.Linear(params['num_out'], 3, bias = params['fc2True']) # hidden layer -> 3 class scores
        print(num_fc_in)

    @staticmethod
    def _freeze_prefix(backbone, num_children, num_grandchildren):
        """Disable gradients for the leading part of ``backbone``.

        Children with index below ``num_children`` are frozen completely.
        Inside the child at index ``num_children`` only the first
        ``num_grandchildren`` sub-modules are frozen. Later children stay
        trainable and are reported on stdout.
        """
        for child_counter, child in enumerate(backbone.children()):
            if child_counter < num_children:
                for param in child.parameters():
                    param.requires_grad = False
            elif child_counter == num_children:
                # enumerate advances the index on every sub-module; counting
                # only in an else-branch would leave it stuck at 0 and freeze
                # the whole child.
                for sub_counter, sub_module in enumerate(child.children()):
                    if sub_counter < num_grandchildren:
                        for param in sub_module.parameters():
                            param.requires_grad = False
            else:
                print("child ",child_counter," was not frozen")

    def forward(self,x):
        """Return the 3 raw class scores for a batch of images ``x``.

        The loss layer is applied outside this class.
        """
        # No torch.no_grad() here: it would cut the graph and keep the
        # unfrozen backbone layers from ever receiving gradients. Frozen
        # parameters are already excluded through requires_grad=False.
        features = self.model_resnet(x)

        x = F.relu(self.fc1(features))
        x = self.fc2(x)
        return x

In [62]:
# Configure device: use the GPU when one is visible
device = "cuda" if torch.cuda.is_available() else "cpu"
model = Network().to(device)
# Loss layer; CrossEntropyLoss already includes LogSoftMax()
criterion = nn.CrossEntropyLoss()
# TODO: experiment with different optimizers and parameters (such as learning rate)
# Only parameters that are still trainable are handed to the optimizer;
# weight_decay is the L2 regularization strength
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(trainable_params, lr=params['lr'], weight_decay=params['wd'])
# TODO: Choose an appropriate number of training epochs
num_epochs = params['epochs']

 child 0 is -
Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
 child 1 is -
BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
 child 2 is -
ReLU(inplace=True)
 child 3 is -
MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
 child 4 is -
Sequential(
  (0): Bottleneck(
    (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (downsample): Sequential(
      (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=Fals

In [63]:
def train(model, loader, num_epoch = num_epochs): # Train the model
    """Run the optimisation loop and print the mean loss of every epoch.

    Uses the module-level ``device``, ``criterion`` and ``optimizer``.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable yielding ``(batch, label)`` pairs.
        num_epoch: Number of passes over ``loader``. The default is the value
            ``num_epochs`` had when this function was defined.
    """
    print("Start training...")
    model.train()
    for epoch in range(1, num_epoch + 1):
        epoch_losses = []
        for inputs, targets in tqdm(loader):
            inputs, targets = inputs.to(device), targets.to(device)
            # Gradients accumulate by default, so clear the previous step's first
            optimizer.zero_grad()
            loss = criterion(model(inputs), targets)  # calls Network.forward()
            epoch_losses.append(loss.item())
            loss.backward()
            optimizer.step()
        print("Epoch {} loss:{}".format(epoch, np.mean(epoch_losses)))
    print("Done!")

def evaluate(model, loader): # Evaluate accuracy on validation / test set
    """Compute and print classification accuracy over ``loader``.

    Uses the module-level ``device``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        loader: Iterable yielding ``(batch, label)`` pairs.

    Returns:
        Fraction of correctly classified samples among those actually
        iterated, or ``0.0`` when ``loader`` yields nothing.
    """
    model.eval() # Set the model to evaluation mode
    correct = 0
    seen = 0
    with torch.no_grad(): # Do not calculate gradients, to speed up computation
        for batch, label in tqdm(loader):
            batch = batch.to(device)
            label = label.to(device)
            pred = model(batch)
            correct += (torch.argmax(pred,dim=1)==label).sum().item()
            # Count what was really evaluated: len(loader.dataset) is wrong when
            # a sampler or drop_last makes the loader skip samples
            seen += label.size(0)
    acc = correct/seen if seen else 0.0
    print("Evaluation accuracy: {}".format(acc))
    return acc

In [None]:
# Fit on the training split, then report accuracy on the test split
train(model=model, loader=trainloader, num_epoch=num_epochs)
print("Evaluate on test set")
evaluate(model=model, loader=testloader)

  0%|          | 0/445 [00:00<?, ?it/s]

Start training...


 12%|█▏        | 52/445 [05:02<37:55,  5.79s/it]