This notebook is designed to run in Google Colaboratory (Colab).

In [None]:
# Mount Google Drive inside the Colab VM so files under it can be read by path.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
!pip install SimpleITK

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt

import skimage
import torch as tc
import torchvision as tv
import SimpleITK as sitk
import sklearn.metrics as metrics

from skimage.transform import resize
from PIL import Image
from torchvision import transforms, utils
from torch.utils.data import Dataset, DataLoader

In [None]:
# Per-channel mean/std used to normalise image tensors
# (these values match the commonly used ImageNet statistics).
CHANNEL_MEAN = [0.485, 0.456, 0.406]
CHANNEL_STD = [0.229, 0.224, 0.225]
normalize = transforms.Normalize(mean=CHANNEL_MEAN, std=CHANNEL_STD)

# Training pipeline: resize, take a random 224-pixel crop covering 50-100% of the
# image area, flip horizontally at random, then convert to a normalised tensor.
_train_steps = [
    transforms.Resize(256),
    transforms.RandomResizedCrop(224, scale=(0.5, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    normalize,
]
train_transformer = transforms.Compose(_train_steps)

# Evaluation pipeline: deterministic resize and centre crop, no augmentation.
_eval_steps = [
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    normalize,
]
val_transformer = transforms.Compose(_eval_steps)

In [None]:
batchsize = 10  # mini-batch size shared by the DataLoaders in this notebook


def read_txt(txt_path):
    """Read a text file and return its non-empty lines.

    Args:
        txt_path (string): Path to a text file with one entry per line.

    Returns:
        list[str]: Each line with surrounding whitespace removed, in file
        order. Blank (or whitespace-only) lines are skipped so that a
        trailing newline or an empty row never produces an empty entry.
    """
    with open(txt_path) as f:
        stripped = (line.strip() for line in f)
        # An empty entry would later be joined into a directory path instead
        # of an image path, so drop it here.
        return [entry for entry in stripped if entry]

class CovidCTDataset(Dataset):
    """Two-class image dataset built from per-class file lists.

    Expected file structure:
    - root_dir
        - CT_COVID
            - img1.png
            - img2.png
            - ......
        - CT_NonCOVID
            - img1.png
            - img2.png
            - ......

    Samples from 'CT_COVID' get label 0 and samples from 'CT_NonCOVID' get
    label 1 (the index of the class folder in ``self.classes``).
    """

    def __init__(self, root_dir, txt_COVID, txt_NonCOVID, transform=None):
        """
        Args:
            root_dir (string): Directory that contains the class folders.
            txt_COVID (string): Path to the txt file listing the image file
                names of the 'CT_COVID' class, one per line.
            txt_NonCOVID (string): Path to the txt file listing the image
                file names of the 'CT_NonCOVID' class, one per line.
            transform (callable, optional): Optional transform to be applied
                on a sample image.
        """
        self.root_dir = root_dir
        self.txt_path = [txt_COVID, txt_NonCOVID]
        self.classes = ['CT_COVID', 'CT_NonCOVID']
        self.num_cls = len(self.classes)
        self.transform = transform

        # Each entry is [image_path, class_index]; classes are appended in order.
        self.img_list = []
        for label, (folder, list_file) in enumerate(zip(self.classes, self.txt_path)):
            for file_name in read_txt(list_file):
                self.img_list.append([os.path.join(self.root_dir, folder, file_name), label])

    def __len__(self):
        """Return the total number of listed images across both classes."""
        return len(self.img_list)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            dict: ``{'img': image, 'label': int}`` where ``image`` is the RGB
            image after ``transform`` (or the PIL image if no transform is set).
        """
        if tc.is_tensor(idx):
            idx = idx.tolist()

        img_path, label = self.img_list[idx]
        # Convert to RGB so every image has three channels.
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)

        return {'img': image, 'label': int(label)}

In [None]:
# Build the three splits. Training uses the augmenting transform; validation and
# testing share the deterministic one.
trainset = CovidCTDataset(
    root_dir='/content/drive/MyDrive/...',
    txt_COVID='/content/drive/MyDrive/...',
    txt_NonCOVID='/content/drive/MyDrive/...',
    transform=train_transformer,
)
valset = CovidCTDataset(
    root_dir='/content/drive/MyDrive/...',
    txt_COVID='/content/drive/MyDrive/...',
    txt_NonCOVID='/content/drive/MyDrive/...',
    transform=val_transformer,
)
testset = CovidCTDataset(
    root_dir='/content/drive/MyDrive/...',
    txt_COVID='/content/drive/MyDrive/...',
    txt_NonCOVID='/content/drive/MyDrive/...',
    transform=val_transformer,
)
print("Size of training dataset: " + str(len(trainset)))
print("Size of validation dataset: " + str(len(valset)))
print("Size of testing dataset: " + str(len(testset)))

# Only the training loader shuffles; all loaders keep the final partial batch.
loader_options = dict(batch_size=batchsize, drop_last=False)
train_loader = DataLoader(trainset, shuffle=True, **loader_options)
val_loader = DataLoader(valset, shuffle=False, **loader_options)
test_loader = DataLoader(testset, shuffle=False, **loader_options)

In [None]:
# Each batch is a dict with 'img' and 'label' keys (see CovidCTDataset.__getitem__).
# Tuple-unpacking the dict directly would bind the key strings, not the tensors.
first_batch = next(iter(train_loader))
image, label = first_batch['img'], first_batch['label']

In [None]:
# Preview one training image. A single batch is enough; looping over the whole
# loader would load and transform every image only to display one of them.
batch_samples = next(iter(train_loader))
data, target = batch_samples['img'], batch_samples['label']
# Show channel 1 of the first (normalised) image in the batch.
plt.imshow(data[0, 1, :, :].numpy())

In [None]:
class SimpleNetwork(tc.nn.Module):
    """Small convolutional classifier for 3-channel 224x224 images.

    Seven Conv-BatchNorm-ReLU blocks are applied in sequence. Three of them use
    stride 2, so a 224x224 input is reduced to a 128x28x28 feature map, which
    is flattened and passed to a single linear layer.

    Args:
        num_classes (int): Number of output logits. Defaults to 4 to keep the
            original output shape. NOTE(review): the dataset in this notebook
            defines only two classes, so ``num_classes=2`` looks like the
            intended value -- confirm before changing the default.
    """

    # Flattened feature size for a 224x224 input: 128 channels at 28x28.
    FLAT_FEATURES = 128 * 28 * 28

    def __init__(self, num_classes=4):
        super(SimpleNetwork, self).__init__()

        # Layers are created in the original order and with the original
        # attribute names so state_dict keys and seeded initialisation match.
        self.layer_1 = self._conv_block(3, 32, stride=1)
        self.layer_2 = self._conv_block(32, 32, stride=2)
        self.layer_3 = self._conv_block(32, 64, stride=1)
        self.layer_4 = self._conv_block(64, 64, stride=2)
        self.layer_5 = self._conv_block(64, 128, stride=1)
        self.layer_6 = self._conv_block(128, 128, stride=2)
        self.layer_7 = self._conv_block(128, 128, stride=1)
        # Kernel size 1 with stride 1 leaves the feature map unchanged; kept so
        # the module structure stays the same as before.
        self.avg_pool = tc.nn.Sequential(tc.nn.AvgPool2d(1, 1))
        self.last_layer = tc.nn.Sequential(
            tc.nn.Linear(self.FLAT_FEATURES, num_classes)
        )

    @staticmethod
    def _conv_block(in_channels, out_channels, stride):
        """Build one 3x3 Conv -> BatchNorm -> ReLU block with padding 1."""
        return tc.nn.Sequential(
            tc.nn.Conv2d(in_channels, out_channels, 3, stride=stride, padding=1),
            tc.nn.BatchNorm2d(out_channels),
            tc.nn.ReLU()
        )

    def forward(self, x):
        """Compute class logits.

        Args:
            x (Tensor): Batch of images shaped (batch, 3, 224, 224).

        Returns:
            Tensor: Logits shaped (batch, num_classes).

        Raises:
            RuntimeError: If the spatial size of ``x`` does not produce
                ``FLAT_FEATURES`` features per sample.
        """
        x = self.layer_1(x)
        x = self.layer_2(x)
        x = self.layer_3(x)
        x = self.layer_4(x)
        x = self.layer_5(x)
        x = self.layer_6(x)
        x = self.layer_7(x)
        x = self.avg_pool(x)
        # Flatten per sample. Reshaping with a free batch dimension (-1) could
        # silently merge or split samples when the input size is unexpected;
        # this way the linear layer reports the mismatch instead.
        x = x.flatten(start_dim=1)
        x = self.last_layer(x)
        return x

In [None]:
# Sanity check: push one batch through an untrained network and print the
# per-class scores. One batch is enough to confirm the shapes line up, and
# no_grad avoids building an autograd graph that is never used.
model = SimpleNetwork()
batch_samples = next(iter(train_loader))
data, target = batch_samples['img'], batch_samples['label']
with tc.no_grad():
    output = model(data)
# Softmax turns the logits into a distribution over classes.
print(f"Output: {tc.softmax(output, dim=1)}")

In [None]:
# Training hyperparameters and objects.
learning_rate = 0.003
batch_size = 10  # NOTE(review): the DataLoaders above read `batchsize`, not this name
num_epochs = 100
objective_function = tc.nn.CrossEntropyLoss()
model = SimpleNetwork()
parameters = model.parameters()
optimizer = tc.optim.Adam(parameters, lr=learning_rate)
# Use the first GPU when one is available; otherwise fall back to the CPU so
# the notebook still runs on a runtime without CUDA.
device = "cuda:0" if tc.cuda.is_available() else "cpu"


In [None]:
# Verify that a CUDA GPU is visible to PyTorch, the framework this notebook
# trains with, instead of importing a second framework just for the check.
if not tc.cuda.is_available():
    raise SystemError('GPU device not found')
device_name = tc.cuda.get_device_name(0)
print('Found GPU at: {}'.format(device_name))

In [None]:
# Display the properties of the selected CUDA device as the cell output.
gpu_properties = tc.cuda.get_device_properties(device)
gpu_properties

In [None]:
# Train the model and record the sample-weighted mean loss of every epoch.
losses = []
model = model.to(device)
model.train()
num_train_samples = len(train_loader.dataset)
print(f"Dataset size: {num_train_samples}")
for epoch in range(num_epochs):
    print(f"Current epoch: {epoch}")
    running_loss = 0.0
    for batch_samples in train_loader:
        data = batch_samples['img'].to(device)
        target = batch_samples['label'].to(device)
        output = model(data)
        loss = objective_function(output, target)
        loss.backward()
        optimizer.step()
        # Clear gradients right after the update so the next batch starts clean.
        optimizer.zero_grad()
        # The loss is a batch mean; weight it by batch size so the last,
        # possibly smaller, batch is averaged correctly.
        running_loss += loss.item() * data.size(0)
    epoch_loss = running_loss / num_train_samples
    losses.append(epoch_loss)
    print(f"Current epoch loss: {epoch_loss}")


# Plot the training loss curve.
plt.figure()
plt.plot(losses, "r-")
plt.grid(True)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

In [None]:
# Evaluate the trained model on the held-out test split.
predictions = []
targets = []
model.eval()
with tc.no_grad():
    print(f"Dataset size: {len(test_loader.dataset)}")
    # Iterate the test loader: the metrics below are reported as test results,
    # so they must not be computed on the training data.
    for batch_samples in test_loader:
        data, target = batch_samples['img'], batch_samples['label']
        data = data.to(device)
        output = model(data)
        # argmax on the raw logits; applying a sigmoid first cannot change the
        # ranking, but it can saturate and introduce ties.
        prediction = tc.argmax(output, dim=1)
        predictions.extend(prediction.cpu().tolist())
        targets.extend(target.tolist())


print("Testing confusion matrix: \n", metrics.confusion_matrix(targets, predictions))
print("Testing classification report: \n", metrics.classification_report(targets, predictions))

In [None]:
# NOTE(review): `predictions` holds hard class labels (argmax outputs), not
# continuous scores, so this AUC reflects a single operating point -- confirm
# that this is the intended metric.
auc_value = metrics.roc_auc_score(targets, predictions)
print("AUC: \n", auc_value)

In [None]:
# Fraction of samples whose predicted label equals the target label.
accuracy_value = metrics.accuracy_score(targets, predictions)
print("ACC: \n", accuracy_value)

In [None]:
# NOTE(review): f1_score is called with its defaults, which presumably treat
# label 1 as the positive class; in CovidCTDataset label 1 is 'CT_NonCOVID' --
# confirm that this is the class the F1 score should describe.
f1_value = metrics.f1_score(targets, predictions)
print("F1: \n", f1_value)

In [None]:
# Plot the ROC curve from the collected targets and predictions.
fpr, tpr, thresholds = metrics.roc_curve(targets, predictions)
roc_auc = metrics.auc(fpr, tpr)
# Named `roc_display` rather than `display` so the notebook's built-in
# display() helper is not shadowed for the rest of the session.
roc_display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc)
roc_display.plot(color='red')
plt.show()

In [None]:
# Render the confusion matrix of targets vs. predictions as a figure.
cm = metrics.confusion_matrix(targets, predictions)
metrics.ConfusionMatrixDisplay(confusion_matrix=cm).plot()
plt.show()