In [1]:
import torch
from torch.optim import Adam, SGD
import matplotlib.pyplot as plt
from torch import nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader,TensorDataset
from sklearn.metrics import classification_report, accuracy_score
import copy
import numpy as np

In [2]:
# Colab-only step: mount Google Drive at /content/drive so the Kaggle API
# credentials stored there can be copied into place by a later cell.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)  # force_remount=True asks Colab for a fresh mount

Mounted at /content/drive


In [3]:
# Create the Kaggle config directory. -p makes this cell safe to re-run:
# plain mkdir exits with an error when ~/.kaggle already exists.
!mkdir -p ~/.kaggle

In [4]:
# Copy the Kaggle API token from the mounted Drive folder into ~/.kaggle.
!cp /content/drive/MyDrive/KAGGLE_API_CREDENTIALS/kaggle.json ~/.kaggle/kaggle.json

In [5]:
# Restrict the token file to owner read/write only (mode 600).
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the COVID-19 chest X-ray dataset archive with the Kaggle CLI.
!kaggle datasets download -d khoongweihao/covid19-xray-dataset-train-test-sets

# Dataset page: https://www.kaggle.com/datasets/khoongweihao/covid19-xray-dataset-train-test-sets

In [None]:
# Extract the archive into /content, which is where the dataset paths below expect
# to find xray_dataset_covid19/.
#   -o  overwrite existing files without prompting, so the cell can be re-run
#   -q  quiet: keep the per-file listing out of the notebook output
#   -d  extract into an explicit directory instead of the current working directory
!unzip -o -q '/content/covid19-xray-dataset-train-test-sets.zip' -d /content

In [8]:
# Device-agnostic setup: use the GPU when CUDA is available, otherwise fall back to the CPU.
# The resulting string is passed to .to(device) for the model and for every batch.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [9]:
# Preprocessing constants shared by the training and evaluation pipelines.
IMAGE_SIZE = (512, 512)                # every image is resized to this (height, width)
IMAGENET_MEAN = [0.485, 0.456, 0.406]  # per-channel ImageNet mean
IMAGENET_STD = [0.229, 0.224, 0.225]   # per-channel ImageNet standard deviation

# Training pipeline: resize, light augmentation (random horizontal flip and a
# rotation of up to 20 degrees), then tensor conversion and normalisation.
train_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Evaluation pipeline: same resize and normalisation, but no random augmentation.
test_transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])


In [10]:
# Folder created by extracting the Kaggle archive; it holds the train/ and test/ splits.
dataset_root = '/content/xray_dataset_covid19'
train_dataset_path = f'{dataset_root}/train'
test_dataset_path = f'{dataset_root}/test'

# Each split is read as an image folder with its own transform pipeline:
# augmentation for training, deterministic preprocessing for testing.
train_dataset = datasets.ImageFolder(root=train_dataset_path, transform=train_transform)
test_dataset = datasets.ImageFolder(root=test_dataset_path, transform=test_transform)


In [11]:
# NOTE(review): RandomSampler is imported here but is not used in any visible cell;
# shuffling is requested through shuffle=True instead.
from torch.utils.data.dataloader import RandomSampler
BATCH_SIZE =32  # number of images per batch for both loaders
# Training batches are drawn in shuffled order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# The test loader is built without shuffle, so it iterates in dataset order.
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [21]:
def train_model(model, train_loader, loss_fn, optimizer,device):
    """Train `model` for one pass over `train_loader`.

    Args:
        model: network returning one raw logit per sample.
        train_loader: iterable of (inputs, labels) batches; must expose `.dataset`.
        loss_fn: loss called as loss_fn(logits, labels.float()).
        optimizer: optimizer stepped once per batch.
        device: device the batches are moved to before the forward pass.

    Returns:
        (epoch_loss, epoch_acc): the sample-weighted mean loss over the dataset and
        the accuracy of the 0.5-thresholded sigmoid predictions.
    """
    model.train()
    loss_sum = 0.0  # sum of per-batch mean losses, each weighted by its batch size
    batch_preds, batch_labels = [], []

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Forward pass. Labels arrive as integers, so cast them to float for the loss.
        logits = model(inputs)
        batch_loss = loss_fn(logits, targets.float())

        # Backward pass and parameter update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Hard 0/1 predictions: sigmoid probability above 0.5 counts as class 1.
        hard_preds = torch.sigmoid(logits).gt(0.5).float()

        # Collect predictions and labels on the CPU as numpy arrays.
        batch_preds.append(hard_preds.detach().cpu().numpy())
        batch_labels.append(targets.cpu().numpy())

        loss_sum += batch_loss.item() * inputs.size(0)

    # Stitch the per-batch arrays together and compare them as integers.
    all_preds = np.concatenate(batch_preds, axis=0).astype(int)
    all_labels = np.concatenate(batch_labels, axis=0).astype(int)

    mean_loss = loss_sum / len(train_loader.dataset)
    accuracy = accuracy_score(all_labels, all_preds)
    return mean_loss, accuracy


In [22]:
def test_model(model, test_loader, loss_fn, device):
    model.eval()
    running_loss = 0.0
    preds_list = []
    labels_list = []

    with torch.no_grad():
        for X, y in test_loader:
            X = X.to(device)
            y = y.to(device)

            # Forward pass
            outputs = model(X)
            loss = loss_fn(outputs, y.float())

            # Get predictions
            probs = torch.sigmoid(outputs)  # Apply sigmoid activation to get probabilities
            preds = (probs > 0.5).float()  # Use threshold of 0.5 for classification

            # Move predictions and labels to CPU and convert to numpy arrays, then append to the lists
            preds_list.append(preds.detach().cpu().numpy())
            labels_list.append(y.cpu().numpy())

            running_loss += loss.item() * X.size(0)

    # Concatenate all the numpy arrays into a single numpy array
    all_preds = np.concatenate(preds_list, axis=0).astype(int)
    all_labels = np.concatenate(labels_list, axis=0).astype(int)

    epoch_loss = running_loss / len(test_loader.dataset)
    epoch_acc = accuracy_score(all_labels, all_preds)  # Use scikit-learn's accuracy_score

    classification_results = classification_report(all_labels, all_preds)

    return epoch_loss, epoch_acc, classification_results


In [23]:
def train_and_validate(model, train_loader, test_loader, loss_fn, optimizer, epochs, device, model_save_path, eval_every=5):
    """Train for `epochs` epochs, evaluating periodically and checkpointing the best model.

    The model is evaluated on `test_loader` on epoch 1 and then every `eval_every`
    epochs. Whenever an evaluation beats the best test accuracy so far (the first
    evaluation always counts), the weights are saved to `model_save_path` and its
    classification report is remembered.

    Args:
        model: network to train.
        train_loader: loader passed to train_model.
        test_loader: loader passed to test_model.
        loss_fn: loss function shared by training and evaluation.
        optimizer: optimizer passed to train_model.
        epochs: number of training epochs.
        device: device the batches are moved to.
        model_save_path: file the best state_dict is written to.
        eval_every: evaluate when `epoch % eval_every == 0` (default 5).

    Returns:
        (best_class_report, best_acc). best_class_report is None when no evaluation
        ran (epochs == 0).

    Raises:
        ValueError: if eval_every is smaller than 1.
    """
    if eval_every < 1:
        raise ValueError("eval_every must be a positive integer")

    best_acc = 0.0
    best_class_report = None  # stays None until the first evaluation has run

    for epoch in range(epochs):
        train_loss, train_acc = train_model(model, train_loader, loss_fn, optimizer, device)
        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')

        if epoch % eval_every == 0:
            test_loss, test_acc, class_report = test_model(model, test_loader, loss_fn, device)
            print(f'Epoch {epoch+1}/{epochs},test Loss: {test_loss:.4f}, test Acc: {test_acc:.4f}')

            # Only compare right after a fresh evaluation, so stale metrics from an
            # earlier epoch are never re-used. The first evaluation always produces
            # a checkpoint, even if its accuracy is 0.0.
            if best_class_report is None or test_acc > best_acc:
                best_acc = test_acc
                torch.save(model.state_dict(), model_save_path)
                best_class_report = class_report

    # After all epochs, print the best classification report.
    print("Best Classification Report : ")
    if best_class_report is not None:
        print(best_class_report)
    return best_class_report, best_acc

In [24]:
# Scratch cell: push one training sample through an untrained copy of the three
# conv blocks to see the feature-map shape the classifier layer will receive.

def _probe_conv_block(in_channels, out_channels):
    """Two 3x3 'same'-padded conv + ReLU layers followed by a 2x2 max-pool."""
    return nn.Sequential(
        nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2),
    )

img,_ = train_dataset[59]

resize_transform = transforms.Resize((512, 512))
resized_img = resize_transform(img)

conv_block_1 = _probe_conv_block(3, 6)
conv_block_2 = _probe_conv_block(6, 12)
conv_block_3 = _probe_conv_block(12, 18)

X = conv_block_3(conv_block_2(conv_block_1(resized_img)))
X.shape





torch.Size([18, 64, 64])

In [25]:
class COVID19_XRAY_CNNV2(nn.Module):
  """Small CNN: three conv blocks followed by a two-layer classifier head.

  Each conv block is two 3x3 'same'-padded convolutions with ReLU followed by a
  2x2 max-pool, so the spatial size is halved three times. The classifier's first
  Linear layer is sized for 18 x 64 x 64 features, i.e. for 512x512 inputs.

  Args:
    input_shape: number of channels in the input images.
    output_shape: number of output logits. With 1, forward returns a tensor of
      shape (batch,); with more, the shape is (batch, output_shape).
  """
  def __init__(self,input_shape,output_shape):
    super().__init__()
    self.conv_block_1 = nn.Sequential(
        nn.Conv2d(in_channels=input_shape,out_channels = 6, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.Conv2d(in_channels=6,out_channels = 6,kernel_size=3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2)
    )

    self.conv_block_2 = nn.Sequential(
        nn.Conv2d(in_channels=6,out_channels = 12,kernel_size=3, padding=1),
        nn.ReLU(),
        nn.Conv2d(in_channels=12,out_channels = 12,kernel_size=3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2)
    )

    self.conv_block_3 = nn.Sequential(
        nn.Conv2d(in_channels=12,out_channels = 18,kernel_size=3, padding=1),
        nn.ReLU(),
        nn.Conv2d(in_channels=18,out_channels = 18,kernel_size=3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=2, stride=2)
    )

    self.classifier_layer = nn.Sequential(
        nn.Flatten(),
        # 18 channels x 64 x 64 spatial positions = 73728 flattened features.
        nn.Linear(in_features=18 * 64 * 64, out_features=64),
        nn.BatchNorm1d(64),
        nn.ReLU(),
        # Honour the requested number of outputs instead of hard-coding 1.
        nn.Linear(in_features=64, out_features=output_shape)
    )


  def forward(self, X):
    """Return raw logits (no sigmoid applied) for a batch of images."""
    X = self.conv_block_1(X)
    X = self.conv_block_2(X)
    X = self.conv_block_3(X)
    X = self.classifier_layer(X)
    # Drops the trailing dimension when there is a single logit; a no-op otherwise.
    return X.squeeze(dim=1)


In [26]:
# Build the network for 3-channel inputs (matching the 3-channel mean/std used in
# Normalize) and move its parameters to the selected device.
model = COVID19_XRAY_CNNV2(input_shape=3,output_shape=1).to(device)

In [27]:
# Binary cross-entropy computed on raw logits: the model's forward pass applies no
# sigmoid, and the train/test loops apply it themselves only to derive predictions.
loss_fn = nn.BCEWithLogitsLoss()
# Adam over all model parameters, with learning rate 0.003 and weight decay 0.007.
optimizer = Adam(params=model.parameters(), lr=0.003, weight_decay=0.007)

In [None]:
# Total number of training epochs.
epochs = 100

# Perform training and validation. The best weights are written to
# 'COVID19_XRAY_CNN_weights.pth'; the returned (best report, best accuracy) tuple is
# the last expression, so the notebook displays it as the cell output.
train_and_validate(model, train_loader, test_loader, loss_fn, optimizer, epochs, device, 'COVID19_XRAY_CNN_weights.pth')