In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

from PIL import Image
import random

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

In [None]:
# Setting seed for reproducibility
seed = 140

random.seed(seed)

torch.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

if torch.cuda.is_available():
    device = torch.device("cuda")
    torch.cuda.manual_seed(seed)

In [None]:
# Hyperparameters

batch_size = 64
learning_rate = 0.05
epochs = 15

## Data Preparation

In [None]:
import os
os.environ['KAGGLE_USERNAME'] = 'neutrino140'
os.environ['KAGGLE_KEY'] = '0f80ddf25e1b7c23b63bd004df23b86a'

In [None]:
!pip install kaggle

!kaggle datasets download sachinkumar413/covid-pneumonia-normal-chest-xray-images
!unzip -o covid-pneumonia-normal-chest-xray-images.zip -d Dataset
!rm -rf covid-pneumonia-normal-chest-xray-images.zip

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: Dataset/COVID/COVID_1203.png  
  inflating: Dataset/COVID/COVID_1204.png  
  inflating: Dataset/COVID/COVID_1205.png  
  inflating: Dataset/COVID/COVID_1206.png  
  inflating: Dataset/COVID/COVID_1207.png  
  inflating: Dataset/COVID/COVID_1208.png  
  inflating: Dataset/COVID/COVID_1209.png  
  inflating: Dataset/COVID/COVID_121.png  
  inflating: Dataset/COVID/COVID_1210.png  
  inflating: Dataset/COVID/COVID_1211.png  
  inflating: Dataset/COVID/COVID_1212.png  
  inflating: Dataset/COVID/COVID_1213.png  
  inflating: Dataset/COVID/COVID_1214.png  
  inflating: Dataset/COVID/COVID_1215.png  
  inflating: Dataset/COVID/COVID_1216.png  
  inflating: Dataset/COVID/COVID_1217.png  
  inflating: Dataset/COVID/COVID_1218.png  
  inflating: Dataset/COVID/COVID_1219.png  
  inflating: Dataset/COVID/COVID_122.png  
  inflating: Dataset/COVID/COVID_1220.png  
  inflating: Dataset/COVID/COVID_1221.png  
  inflating: 

In [None]:
# Transforming images to tensors and shuffling

img2tensor = transforms.ToTensor()
resize_img = transforms.Resize((256, 256))

dataset = list()

label_map = {
    0: 'Normal',
    1: 'Covid',
    2: 'Pneumonia'
}

dir = '/content/Dataset/NORMAL/'
for file in os.listdir(dir):
  img = img2tensor(resize_img(Image.open(dir + file).convert("RGB"))) # Open image-> Convert to RGB -> Resize -> Convert to tensor
  label = torch.tensor(0)
  dataset.append((img, label))

dir = '/content/Dataset/COVID/'
for file in os.listdir(dir):
  img = img2tensor(resize_img(Image.open(dir + file).convert("RGB")))
  label = torch.tensor(1)
  dataset.append([img, label])

dir = '/content/Dataset/PNEUMONIA/'
for file in os.listdir(dir):
  img = img2tensor(resize_img(Image.open(dir + file).convert("RGB")))
  label = torch.tensor(2)
  dataset.append([img, label])

random.shuffle(dataset)
len(dataset)

5228

In [None]:
# Splitting data into Train, Val and Test set

labels = [sample[1] for sample in dataset]

train, test = train_test_split(dataset, test_size=0.2, random_state= seed, stratify = labels)
val, test = train_test_split(test, test_size=0.5, random_state= seed)

print(len(train))
print(len(val))
print(len(test))

4182
523
523


In [None]:
# Statified Sampler from https://discuss.pytorch.org/t/how-to-enable-the-dataloader-to-sample-from-each-class-with-equal-probability/911

class StratifiedSampler():
    """Stratified Sampling

    Provides equal representation of target classes in each batch
    """
    def __init__(self, class_vector, batch_size):
        """
        Arguments
        ---------
        class_vector : torch tensor
            a vector of class labels
        batch_size : integer
            batch_size
        """
        self.n_splits = int(class_vector.size(0) / batch_size)
        self.class_vector = class_vector

    def gen_sample_array(self):
        try:
            from sklearn.model_selection import StratifiedShuffleSplit
        except:
            print('Need scikit-learn for this functionality')
        import numpy as np
        np.random.seed(seed)

        s = StratifiedShuffleSplit(n_splits=self.n_splits, test_size=0.5)
        X = torch.randn(self.class_vector.size(0),2).numpy()
        y = self.class_vector.numpy()
        s.get_n_splits(X, y)

        train_index, test_index = next(s.split(X, y))
        return np.hstack([train_index, test_index])

    def __iter__(self):
        return iter(self.gen_sample_array())

    def __len__(self):
        return len(self.class_vector)

In [None]:
# Making mini batches from the dataset

train_labels = torch.tensor([label for img, label in train])
strat_sampler = StratifiedSampler(class_vector=train_labels, batch_size = batch_size)

train_loader = DataLoader(train, batch_size = batch_size, sampler = strat_sampler)
test_loader = DataLoader(test, batch_size = batch_size)
val_loader = DataLoader(val, batch_size = batch_size)

print(len(train_loader))
print(len(test_loader))
print(len(val_loader))

66
9
9


## Model Architeture

In [None]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=5, padding=2)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.bn1 = nn.BatchNorm2d(16)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=5, padding=2)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.bn2 = nn.BatchNorm2d(32)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=5, padding=2)
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(64)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.conv4 = nn.Conv2d(64, 128, kernel_size=5, padding=2)
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(128)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)


        feature_size = 256 // 2**4
        self.lstm = nn.LSTM(input_size= 128*feature_size*feature_size,
                            hidden_size= 512,
                            num_layers= 1,
                            bidirectional = True,
                            batch_first= True)


        self.fc1 = nn.Linear(2*512, 128)
        self.relu5 = nn.ReLU()
        self.dropout1 = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 3)

    def forward(self, x):
        # CNN layers
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool1(x)
        x = self.bn1(x)

        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool2(x)
        x = self.bn2(x)

        x = self.conv3(x)
        x = self.relu3(x)
        x = self.pool3(x)
        x = self.bn3(x)

        x = self.conv4(x)
        x = self.relu4(x)
        x = self.pool4(x)
        x = self.bn4(x)

        # Flattening for LSTM
        batch_size, channels, height, width = x.size()
        x = x.view(x.shape[0], -1)

        # LSTM layer
        x = x.unsqueeze(1)
        x, _ = self.lstm(x)
        x = x[:, -1, :] # Taking output from last time step

        # Classification
        x = self.fc1(x)
        x = self.relu5(x)
        x = self.dropout1(x)
        x = self.fc2(x)

        return x


## Setting up Model, Optimizer and Loss function

In [None]:
model = NeuralNetwork()
model.to(device)

NeuralNetwork(
  (conv1): Conv2d(3, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (relu1): ReLU()
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv2): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (relu2): ReLU()
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (conv3): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (relu3): ReLU()
  (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv2d(64, 128, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
  (relu4): ReLU()
  (bn4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_st

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

## Setting up Training and Testing funtion

In [None]:
def trainModel(model, loss_fn, optimizer, train_loader):
    model.train()

    train_loss = 0
    for batch in train_loader:
        images = batch[0].to(device)
        labels = batch[1].to(device)

        pred = model(images)

        # Making labels one hot encoded
        y = torch.zeros(labels.size(0), 3, dtype=torch.float32).to(device)
        y[torch.arange(labels.size(0)), labels] = 1

        loss = loss_fn(pred, y)
        train_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss = train_loss / len(train_loader)
    print(f'Training Loss: {train_loss:.4f}')

In [None]:
def testModel(model, loss_fn, test_loader, val = False):
    model.eval()

    with torch.no_grad():
      loss = 0
      correct = 0
      total = 0

      true_labels = []
      pred_labels = []
      for batch in test_loader:
          images = batch[0].to(device)
          labels = batch[1].to(device)

          pred = model(images)
          predicted_labels = torch.argmax(pred, dim = 1)

          # Making labels one hot encoded
          y = torch.zeros(labels.size(0), 3, dtype=torch.float32).to(device)
          y[torch.arange(labels.size(0)), labels] = 1
          loss += loss_fn(pred, y).item()

          pred_labels.extend(predicted_labels.tolist())
          true_labels.extend(labels.tolist())
          correct += (predicted_labels == labels).sum()
          total += labels.size(0)

    accuracy = 100 * correct / total
    loss = loss / len(test_loader)


    if val:
      print(f'Val Loss: {loss:.4f} \t Val Accuracy: {accuracy:.4f}')
      return None
    else:
      print(f'Test Loss: {loss:.4f} \t Test Accuracy: {accuracy:.4f}')
      return classification_report(true_labels, pred_labels, digits = 6)


## Training Model

In [None]:
for t in range(epochs):
    print(f"\nEpoch {t+1}\n-------------------------------")

    trainModel(model, criterion, optimizer, train_loader)
    testModel(model, criterion, val_loader, val = True) # Validation during training

print("Done!")


Epoch 1
-------------------------------
Training Loss: 0.4884
Val Loss: 0.9104 	 Val Accuracy: 60.8031

Epoch 2
-------------------------------
Training Loss: 0.1680
Val Loss: 0.2493 	 Val Accuracy: 91.3958

Epoch 3
-------------------------------
Training Loss: 0.0988
Val Loss: 0.1636 	 Val Accuracy: 94.8375

Epoch 4
-------------------------------
Training Loss: 0.0542
Val Loss: 0.2080 	 Val Accuracy: 91.7782

Epoch 5
-------------------------------
Training Loss: 0.0315
Val Loss: 0.2200 	 Val Accuracy: 91.9694

Epoch 6
-------------------------------
Training Loss: 0.0132
Val Loss: 0.0980 	 Val Accuracy: 97.5143

Epoch 7
-------------------------------
Training Loss: 0.0066
Val Loss: 0.1142 	 Val Accuracy: 96.5583

Epoch 8
-------------------------------
Training Loss: 0.0043
Val Loss: 0.1061 	 Val Accuracy: 97.1319

Epoch 9
-------------------------------
Training Loss: 0.0029
Val Loss: 0.1091 	 Val Accuracy: 97.3231

Epoch 10
-------------------------------
Training Loss: 0.0020


## Score on Test set

In [None]:
test_result = testModel(model, criterion, test_loader)

print(test_result)

Test Loss: 0.0788 	 Test Accuracy: 98.4704
              precision    recall  f1-score   support

           0   0.971429  0.988372  0.979827       172
           1   0.993548  0.987179  0.990354       156
           2   0.989637  0.979487  0.984536       195

    accuracy                       0.984704       523
   macro avg   0.984871  0.985013  0.984906       523
weighted avg   0.984816  0.984704  0.984723       523

