In [None]:
import pandas as pd
import numpy as np
from scipy.signal import savgol_filter
from scipy.ndimage import gaussian_filter
import time
import matplotlib.pyplot as plt
from pathlib import Path
from tqdm import tqdm
from sklearn.decomposition import PCA
import sklearn
from glob import glob
import cv2
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import os
%matplotlib inline

In [None]:
# Nominal image size; adjust to suit the dataset.
IMAGE_SIZE = [224, 224]

# Training configuration.
epochs, batch_size = 500, 32

# Directories holding the two image classes.
covid_path = '../data/chest/Chest_COVID'
noncovid_path = '../data/chest/Chest_NonCOVID'

# Collect every entry found directly inside each class directory.
covid_files, noncovid_files = (
    glob(f'{class_dir}/*') for class_dir in (covid_path, noncovid_path)
)

In [None]:
# Load both image classes together with their string labels.
import cv2


def _load_class_images(paths, label, size=(224, 224)):
    """Read every readable image in ``paths`` and pair it with ``label``.

    Each image is converted from BGR to RGB, resized to ``size`` and
    rearranged from (height, width, channel) to (channel, height, width),
    the same layout ``CustomDataset`` produces with ``permute(2, 0, 1)``.

    Parameters
    ----------
    paths : iterable of str
        Image file paths.
    label : str
        Label appended once per successfully loaded image.
    size : tuple of int
        Target size handed to ``cv2.resize``.

    Returns
    -------
    (list, list)
        The loaded image arrays and a label list of the same length.
    """
    images, labels = [], []
    for path in paths:
        image = cv2.imread(path)
        if image is None:
            # cv2.imread returns None for files it cannot decode; skip them
            # instead of crashing inside cvtColor.
            print(f'Skipping unreadable image: {path}')
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, size)
        # Channel-first without swapping rows and columns; (2, 1, 0) would
        # additionally transpose the picture.
        images.append(np.transpose(image, (2, 0, 1)))
        labels.append(label)
    return images, labels


covid_images, covid_labels = _load_class_images(covid_files, 'CT_COVID')
noncovid_images, noncovid_labels = _load_class_images(noncovid_files, 'CT_NonCOVID')

In [None]:
# Normalize to the interval [0, 1].
# Build the stacks as float32 up front: true division would otherwise promote
# them to float64, and every consumer casts back to float32 with `.float()`.
covid_images = np.asarray(covid_images, dtype=np.float32) / 255
noncovid_images = np.asarray(noncovid_images, dtype=np.float32) / 255

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from tensorflow.keras.utils import to_categorical

# Fixed seed so the train/test partition is reproducible across kernel restarts.
SPLIT_SEED = 42

# Split each class separately (80/20) so both classes are represented in the
# training and the testing portion.
covid_x_train, covid_x_test, covid_y_train, covid_y_test = train_test_split(
    covid_images, covid_labels, test_size=0.2, random_state=SPLIT_SEED)
noncovid_x_train, noncovid_x_test, noncovid_y_train, noncovid_y_test = train_test_split(
    noncovid_images, noncovid_labels, test_size=0.2, random_state=SPLIT_SEED)


X_train = np.concatenate((noncovid_x_train, covid_x_train), axis=0)
X_test = np.concatenate((noncovid_x_test, covid_x_test), axis=0)
y_train = np.concatenate((noncovid_y_train, covid_y_train), axis=0)
y_test = np.concatenate((noncovid_y_test, covid_y_test), axis=0)

# Make labels into categories - either 0 or 1 - then one-hot encode them.
# The encoder is fitted once on the training labels and reused for the test
# labels so both sets share the same label -> index mapping. num_classes is
# fixed so the one-hot width does not depend on which labels are present.
label_encoder = LabelBinarizer()
y_train = to_categorical(label_encoder.fit_transform(y_train), num_classes=2)
y_test = to_categorical(label_encoder.transform(y_test), num_classes=2)

In [None]:
# Display the shape of the concatenated training images as a quick sanity check.
X_train.shape

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.optim as optim


class Net(nn.Module):
    """Small convolutional classifier with two output classes.

    Three 3x3 convolutions (3 -> 8 -> 16 -> 32 channels, padding 1), each
    followed by ReLU and 2x2 max-pooling, feed a two-layer fully connected
    head. ``forward`` returns raw, unnormalized logits of shape (batch, 2),
    which is what ``nn.CrossEntropyLoss`` expects.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 8, 3, padding=1)
        self.conv2 = nn.Conv2d(8, 16, 3, padding=1)
        self.conv3 = nn.Conv2d(16, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        # 25088 = 32 channels * 28 * 28: the three poolings halve a 224x224
        # input three times (224 -> 112 -> 56 -> 28).
        self.fc1 = nn.Linear(25088, 512)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, x):
        """Map a (batch, 3, 224, 224) float tensor to (batch, 2) logits."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        # No activation on the output layer: a ReLU here would clamp negative
        # logits to zero before the loss sees them.
        return self.fc2(x)

# Prefer the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = Net()
net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.0001)

In [None]:
# Wrap the NumPy training images and labels as torch tensors.
Xt_train, yt_train = map(torch.from_numpy, (X_train, y_train))

In [None]:
# Convert the test images and labels to float tensors on the training device.
Xt_test = torch.from_numpy(X_test).float().to(device)
yt_test = torch.from_numpy(y_test).float().to(device)

In [None]:
# Full-batch training: every step runs the whole training set through the
# network once and records loss and accuracy for the plotting cells below.
loss_epoch_arr = []   # training loss per epoch
train_acc_arr = []    # training accuracy per epoch, in percent
test_acc_arr = []     # test accuracy per epoch, in percent

# Loop-invariant conversions and transfers are done once, not every epoch.
Xt_train = Xt_train.float().to(device)
yt_train = yt_train.to(device)
# Labels are one-hot rows; the arg-max recovers the class index that
# CrossEntropyLoss expects.
train_targets = torch.max(yt_train.long(), 1)[1]
test_targets = torch.max(yt_test.long(), 1)[1]

for epoch in range(150):
    step = epoch + 1
    optimizer.zero_grad()
    pred = net(Xt_train)
    loss = criterion(pred, train_targets)
    loss.backward()
    optimizer.step()

    # Training accuracy uses the predictions made before this step's update.
    train_hits = (pred.argmax(dim=1) == train_targets).float().mean().item()
    with torch.no_grad():
        test_hits = (net(Xt_test).argmax(dim=1) == test_targets).float().mean().item()

    loss_epoch_arr.append(loss.item())
    train_acc_arr.append(100 * train_hits)
    test_acc_arr.append(100 * test_hits)

    print(f'Step {step} ==> loss {loss.item() :.4f} ')

In [None]:
# train_acc_arr / test_acc_arr are expected to hold one accuracy value per
# epoch; they are not defined in this cell.
train_accuracy = torch.tensor(train_acc_arr, device = 'cpu')
test_accuracy = torch.tensor(test_acc_arr, device = 'cpu')

# Derive the x-axis from the recorded history rather than a fixed epoch count,
# so the plot cannot fail with mismatched x / y lengths.
epoch_rate = list(range(len(train_accuracy)))

import matplotlib.pyplot as plt
plt.plot(epoch_rate, train_accuracy, label='train')
plt.plot(epoch_rate, test_accuracy, label='test')
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
plt.show()

In [None]:
# Draw the recorded loss values against their index.
import matplotlib.pyplot as plt
loss_fig, loss_ax = plt.subplots()
loss_ax.plot(loss_epoch_arr)
plt.show()

In [None]:
# Accuracy on the training set, in percent.
# The network outputs one score per class and the labels are one-hot rows, so
# both are reduced to class indices with arg-max before comparing. Comparing
# raw scores with one-hot values element-wise does not measure accuracy.
with torch.no_grad():  # inference only, no autograd graph needed
    y_pred = net(Xt_train.float().to(device))
train_acc = torch.sum(y_pred.argmax(dim=1) == yt_train.to(device).argmax(dim=1))
final_train_acc = train_acc/len(Xt_train)
print(final_train_acc.to("cpu").numpy() * 100)

In [None]:
# Accuracy on the test set, in percent. The variable names are kept from the
# training cell, but the values here refer to the test data.
# y_pred stays as the raw (N, 2) network output; class indices are derived
# with arg-max only for the comparison against the one-hot labels.
with torch.no_grad():  # inference only, no autograd graph needed
    y_pred = net(Xt_test)
train_acc = torch.sum(y_pred.argmax(dim=1) == yt_test.argmax(dim=1))
final_train_acc = train_acc/len(Xt_test)
print(final_train_acc.to("cpu").numpy()*100)

In [None]:
# Number of correctly classified test samples: compare predicted and true
# class indices (arg-max of the network output and of the one-hot labels).
# Original note: 188 (presumably the test-set size; TODO confirm).
torch.sum(y_pred.argmax(dim=1) == yt_test.argmax(dim=1))

In [None]:
class CustomDataset(Dataset):
    """Image dataset with one sub-directory per class.

    Every entry directly inside ``imgs_path`` is treated as a class
    directory, and every entry inside it as an image of that class. Items
    are returned as ``(image, class_id)``: ``image`` is a (3, H, W) tensor
    scaled to [0, 1] and ``class_id`` a scalar tensor taken from
    ``class_map``.

    Parameters
    ----------
    imgs_path : str
        Root directory containing the class sub-directories.
    img_dim : tuple of int
        Target size handed to ``cv2.resize``.
    """

    def __init__(self, imgs_path="../data/chest/", img_dim=(224, 224)):
        self.imgs_path = imgs_path
        file_list = glob(os.path.join(self.imgs_path, "*"))
        print(file_list)
        self.data = []
        for class_path in file_list:
            # The last path component is the class name. os.path handles the
            # platform's separator; splitting on "\\" only works on Windows.
            class_name = os.path.basename(os.path.normpath(class_path))
            for img_path in glob(class_path + "/*"):
                self.data.append([img_path, class_name])
        print(len(self.data))
        self.class_map = {"Chest_COVID" : 0, "Chest_NonCOVID": 1}
        self.img_dim = img_dim

    def __len__(self):
        """Return the number of (image path, class name) pairs found."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load, preprocess and return the ``idx``-th (image, class_id) pair.

        Raises
        ------
        RuntimeError
            If the image file cannot be decoded.
        KeyError
            If the directory name is not a key of ``class_map``.
        """
        img_path, class_name = self.data[idx]
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None for files it cannot decode; fail with
            # the offending path instead of an opaque cvtColor error.
            raise RuntimeError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, self.img_dim)
        img = np.array(img) / 255
        class_id = self.class_map[class_name]
        img_tensor = torch.from_numpy(img)
        # (height, width, channel) -> (channel, height, width)
        img_tensor = img_tensor.permute(2, 0, 1)
        class_id = torch.tensor(class_id)
        return img_tensor, class_id
    
# Build the dataset and a shuffled loader over all of it when executed as the
# main module (a notebook kernel runs cells with __name__ == "__main__").
if __name__ == "__main__":
    dataset = CustomDataset()
    data_loader = DataLoader(dataset, batch_size=128, shuffle=True)

In [None]:
# 80/20 train/test split sized from the dataset itself. Hard-coded lengths
# make random_split fail whenever the number of images changes (for 940 images
# this yields the same 752 / 188 split).
n_test = len(dataset) // 5
n_train = len(dataset) - n_test
# Seeded generator so the partition is reproducible between runs.
train_set, test_set = torch.utils.data.random_split(
    dataset, [n_train, n_test], generator=torch.Generator().manual_seed(42))
train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
test_loader = DataLoader(test_set, batch_size=128, shuffle=True)

In [None]:
def evaluation(dataloader):
    """Return the accuracy, in percent, of the global ``net`` on ``dataloader``.

    Parameters
    ----------
    dataloader : iterable
        Yields ``(inputs, labels)`` batches; ``labels`` hold class indices.

    Returns
    -------
    float
        ``100 * correct / total``, or ``0.0`` when the loader yields no samples.
    """
    total, correct = 0, 0
    # Evaluate in eval mode and restore the previous mode afterwards so a
    # surrounding training loop is unaffected.
    was_training = net.training
    net.eval()
    try:
        # No gradients are needed for scoring; skipping them avoids building
        # an autograd graph for every batch.
        with torch.no_grad():
            for inputs, labels in dataloader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = net(inputs.float())
                _, pred = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (pred == labels).sum().item()
    finally:
        net.train(was_training)
    if total == 0:
        return 0.0
    return 100 * correct / total

In [None]:
max_epochs = 32
train_arr = []   # training accuracy per epoch, in percent
test_arr=[]      # test accuracy per epoch, in percent

# Start from freshly initialised weights. The network trained earlier in the
# notebook was fitted on a different partition of the same images, so reusing
# it would let samples from this test split leak into training and inflate the
# reported test accuracy.
net = Net().to(device)
optimizer = optim.Adam(net.parameters(), lr=0.0001)

for epoch in range(max_epochs):

    # One pass over the training loader, one optimizer step per mini-batch.
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()

        outputs = net(inputs.float())
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Score the model on both loaders after each epoch.
    train_eval = evaluation(train_loader)
    test_eval = evaluation(test_loader)
    train_arr.append(train_eval)
    test_arr.append(test_eval)
    print('Epoch: %d/%d  Train_Acc: %.2f Test_Acc: %.2f' % (epoch+1, max_epochs, train_eval, test_eval))

In [None]:
# Derive the x-axis from the recorded history rather than a fixed epoch count,
# so the plot keeps working when the number of epochs changes.
epoch_rate = list(range(len(train_arr)))
plt.plot(epoch_rate, train_arr, label='train')
plt.plot(epoch_rate, test_arr, label='test')
plt.xlabel('epoch')
plt.ylabel('accuracy (%)')
plt.legend()
plt.show()

In [None]:
# Final accuracies in percent: test loader first, then training loader.
final_scores = (evaluation(test_loader), evaluation(train_loader))
print('Test acc: %0.2f, Train acc: %0.2f' % final_scores)