In [1]:
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import torchvision.models as models
import torch.nn as nn
import os
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
from glob import glob
from PIL import Image

In [2]:
!pip install torchsummary
from torchsummary import summary

# Declaring the paths of training, validation and test data

In [3]:
# Collect image paths and integer labels (0 = NORMAL, 1 = PNEUMONIA) for the
# train, validation and test splits.

# File extensions treated as images; anything else in a class folder
# (e.g. hidden metadata files) is skipped so it never reaches cv2.imread.
IMAGE_EXTENSIONS = (".jpeg", ".jpg", ".png", ".bmp", ".tif", ".tiff")


def list_split(normal_dir, pneumonia_dir):
    """Return ``(image_paths, labels)`` for one dataset split.

    Args:
        normal_dir: Directory holding the NORMAL images (label 0).
        pneumonia_dir: Directory holding the PNEUMONIA images (label 1).

    Returns:
        A pair of equally long lists: file paths (NORMAL files first, then
        PNEUMONIA files) and the matching 0/1 labels.

    Raises:
        FileNotFoundError: If either directory does not exist.
    """
    image_paths, labels = [], []
    for label, class_dir in enumerate((normal_dir, pneumonia_dir)):
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # resulting lists identical from run to run.
        names = sorted(
            name for name in os.listdir(class_dir)
            if name.lower().endswith(IMAGE_EXTENSIONS)
        )
        image_paths.extend(os.path.join(class_dir, name) for name in names)
        labels.extend([label] * len(names))
    return image_paths, labels


# Train
normal_train_paths = r"../input/chest-xray-pneumonia/chest_xray/train/NORMAL/"
pneumonia_train_paths = r"../input/chest-xray-pneumonia/chest_xray/train/PNEUMONIA/"
train_images, train_labels = list_split(normal_train_paths, pneumonia_train_paths)

# Number of NORMAL / PNEUMONIA images (not the length of the path strings).
print(train_labels.count(0), train_labels.count(1))

# Val
normal_val_paths = r"../input/chest-xray-pneumonia/chest_xray/val/NORMAL/"
pneumonia_val_paths = r"../input/chest-xray-pneumonia/chest_xray/val/PNEUMONIA/"
val_images, val_labels = list_split(normal_val_paths, pneumonia_val_paths)

print(val_labels.count(0), val_labels.count(1))

# Test
normal_test_paths = r"../input/chest-xray-pneumonia/chest_xray/test/NORMAL/"
pneumonia_test_paths = r"../input/chest-xray-pneumonia/chest_xray/test/PNEUMONIA/"
test_images, test_labels = list_split(normal_test_paths, pneumonia_test_paths)

print(test_labels.count(0), test_labels.count(1))

In [4]:
# Total number of samples in the train, validation and test splits,
# printed one per line in that order.
for split_paths in (train_images, val_images, test_images):
    print(len(split_paths))

# Choosing device

In [5]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

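# Create a transformation pipeline
- Convert each grayscale image array to a PIL image and resize it to 224×224.
- For training only, apply a light colour jitter as augmentation.
- Convert the result to a tensor. Normalization is applied later, in the dataset's `collate_fn`.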

In [6]:
from torchvision.transforms.transforms import ColorJitter

# Side length (in pixels) of the square network input, and samples per batch.
new_size = 224
batch_size = 16

# Single-channel normalisation, (x - 0.5) / 0.5; used by the dataset's collate_fn.
normalize = transforms.Normalize(mean=[0.5], std=[0.5])

# Range of the random brightness / contrast / saturation factors (+-5 %).
_jitter_range = (0.95, 1.05)


def _make_pipeline(*augmentations):
    """Compose ToPILImage -> Resize -> ``augmentations`` -> ToTensor."""
    return transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((new_size, new_size)),
        *augmentations,
        transforms.ToTensor(),
    ])


# Training pipeline: resize plus a mild photometric jitter.
train_transform = _make_pipeline(
    transforms.ColorJitter(
        brightness=_jitter_range,
        contrast=_jitter_range,
        saturation=_jitter_range,
    )
)

# Validation / test pipeline: resize only, no augmentation.
val_transform = _make_pipeline()

In [7]:
class ChestXRayDataset(Dataset):
    """Chest X-ray images loaded from disk as single-channel arrays.

    ``__getitem__`` returns the raw grayscale image and its label; resizing,
    tensor conversion and normalisation happen per batch in ``collate_fn``,
    which must be passed to the ``DataLoader``.

    Args:
        images_paths: Sequence of image file paths.
        labels: Sequence of numeric class labels, aligned with ``images_paths``.
        transforms: Optional callable mapping one grayscale array to a
            ``(1, H, W)`` tensor. When omitted, a plain ``ToTensor`` is used.
    """

    def __init__(self, images_paths, labels, transforms=None):
        super().__init__()
        self.images_paths = images_paths # path of all files
        self.labels = labels
        self.transforms = transforms

    def __len__(self):
        """Return the number of samples."""
        return len(self.images_paths)

    def __getitem__(self, ix):
        """Return ``(image, label)`` for sample ``ix``.

        Returns:
            image: 2-D grayscale array as decoded by OpenCV.
            label: float tensor of shape ``(1,)``, kept on the CPU; the batch
                is moved to ``device`` once, in ``collate_fn``.

        Raises:
            FileNotFoundError: If OpenCV cannot read the file.
        """
        img_path = self.images_paths[ix]

        # Decode directly to one channel. cv2.imread returns None (it does not
        # raise) when the file is missing or is not a readable image.
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")

        cls = torch.tensor([self.labels[ix]]).float()

        return img, cls

    def collate_fn(self, batch):
        """Turn a list of ``(image, label)`` samples into batched tensors.

        Returns:
            ``(images, classes)`` on ``device``: ``images`` has shape
            ``(B, 1, H, W)`` and ``classes`` has shape ``(B, 1)``, where ``B``
            is ``len(batch)`` (so a smaller final batch is handled).
        """
        images, classes = [], []

        for img, cls in batch:
            if self.transforms:
                img = self.transforms(img)
            else:
                # ToTensor is a class: instantiate it, then call the instance.
                img = transforms.ToTensor()(img)

            # ToTensor has already scaled uint8 pixels to [0, 1], so the
            # tensor is normalised as-is (no second division by 255).
            images.append(normalize(img))
            classes.append(cls)

        images_tensor = torch.stack(images)
        classes_tensor = torch.stack(classes).view(-1, 1)

        return images_tensor.to(device), classes_tensor.to(device)

In [8]:
# One dataset per split: the training split uses train_transform, the
# validation and test splits use val_transform.
train_ds = ChestXRayDataset(train_images, train_labels, train_transform)
val_ds = ChestXRayDataset(val_images, val_labels, val_transform)
test_ds = ChestXRayDataset(test_images, test_labels, val_transform)

# Only the training loader shuffles. Evaluation loaders keep a fixed order so
# per-batch validation/test results are reproducible from run to run.
train_dl = DataLoader(train_ds, batch_size=batch_size, collate_fn=train_ds.collate_fn, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=batch_size, collate_fn=val_ds.collate_fn, shuffle=False)
test_dl = DataLoader(test_ds, batch_size=batch_size, collate_fn=test_ds.collate_fn, shuffle=False)

# Defining the model: frozen VGG16 backbone with a new binary classification head

In [9]:
class ChestXModel(nn.Module):
  """VGG16-based binary classifier for single-channel images.

  The pretrained VGG16 feature extractor is frozen; its first convolution is
  replaced by a 1-input-channel convolution built from the pretrained RGB
  kernels, and its classifier is replaced by a small trainable MLP ending in
  a Sigmoid, so ``forward`` returns one probability per sample.
  """

  def __init__(self):
    super().__init__()

    self.vgg_backbone = models.vgg16(pretrained=True)
    # The backbone is frozen below, so the new first layer must start from
    # meaningful weights: a randomly initialised layer would never be trained.
    self.vgg_backbone.features[0] = self._to_single_channel(self.vgg_backbone.features[0])

    # Removes the inplace operation
    for layer in self.vgg_backbone.modules():
      if isinstance(layer, torch.nn.ReLU):
        layer.inplace = False

    # Freeze everything currently in the backbone. The classifier assigned
    # below is created afterwards and therefore stays trainable.
    for param in self.vgg_backbone.parameters():
      param.requires_grad = False

    features = 25088  # flattened VGG16 feature size: 512 * 7 * 7

    self.vgg_backbone.classifier = nn.Sequential(
        nn.Linear(features, 128, bias=True),
        nn.ReLU(),
        nn.Dropout(p=0.3, inplace=False),
        nn.Linear(128, 64, bias=True),
        nn.ReLU(),
        nn.Dropout(p=0.3, inplace=False),
        nn.Linear(64, 32, bias=True),
        nn.ReLU(),
        nn.Dropout(p=0.3, inplace=False),
        nn.Linear(32, 1, bias=True),
        nn.Sigmoid()
    )

  @staticmethod
  def _to_single_channel(rgb_conv):
    """Return a 1-input-channel copy of ``rgb_conv`` that reuses its weights.

    The kernels are summed over the input-channel axis, which gives the same
    output as feeding ``rgb_conv`` the grayscale image repeated on every
    input channel. The bias, if any, is copied unchanged.
    """
    gray_conv = nn.Conv2d(
        1,
        rgb_conv.out_channels,
        kernel_size=rgb_conv.kernel_size,
        stride=rgb_conv.stride,
        padding=rgb_conv.padding,
        bias=rgb_conv.bias is not None,
    )
    with torch.no_grad():
      gray_conv.weight.copy_(rgb_conv.weight.sum(dim=1, keepdim=True))
      if rgb_conv.bias is not None:
        gray_conv.bias.copy_(rgb_conv.bias)
    return gray_conv

  def forward(self, x):
    """Return the backbone's output for the image batch ``x``."""
    return self.vgg_backbone(x)

In [10]:
# TODO: See how to calculate accuracy https://github.com/PacktPublishing/Modern-Computer-Vision-with-PyTorch/blob/master/Chapter06/Road_sign_detection.ipynb
def train_batch(data, model, optimizer, criterion):
  """Run one optimisation step on a single batch and return its loss.

  Args:
    data: ``(inputs, targets)`` pair for one batch.
    model: Network to train; switched to training mode and moved to ``device``.
    optimizer: Optimizer that updates the model's parameters.
    criterion: Loss callable taking ``(predictions, targets)``.

  Returns:
    The batch loss as a 0-d NumPy array.
  """
  inputs, targets = data

  model.train()
  model.to(device)

  # Clearing stale gradients ahead of the forward pass is equivalent to
  # clearing them right before backward(): the forward pass never reads .grad.
  optimizer.zero_grad()
  batch_loss = criterion(model(inputs), targets)
  batch_loss.backward()
  optimizer.step()

  return batch_loss.detach().cpu().numpy()

In [11]:
@torch.no_grad()
def val_batch(data, model, criterion):
    """Compute the loss of one batch without updating the model.

    Args:
        data: ``(inputs, targets)`` pair for one batch.
        model: Network to evaluate; switched to eval mode and moved to ``device``.
        criterion: Loss callable taking ``(predictions, targets)``.

    Returns:
        The batch loss as a 0-d NumPy array.
    """
    inputs, targets = data

    model.eval()
    model.to(device)

    # The decorator already disables gradient tracking for the whole call.
    batch_loss = criterion(model(inputs), targets)
    return batch_loss.detach().cpu().numpy()

In [12]:
@torch.no_grad()
def accuracy(data, model):
    """Return the percentage of correct predictions for one batch.

    A prediction counts as positive when the model output exceeds 0.5.

    Args:
        data: ``(inputs, targets)`` pair for one batch.
        model: Network to evaluate; switched to eval mode and moved to ``device``.

    Returns:
        A 0-d CPU tensor holding the accuracy in percent (0-100).
    """
    inputs, targets = data

    model.eval()
    model.to(device)

    # Threshold the outputs, then compare element-wise with the targets.
    hits = (model(inputs) > 0.5) == targets
    percent_correct = hits.float().sum() / len(targets) * 100

    return percent_correct.cpu()

In [13]:
# Release cached GPU memory that PyTorch's allocator holds but is not using.
torch.cuda.empty_cache()

In [14]:
# Debugging aid: make autograd report the forward operation behind a failing backward pass.
# NOTE(review): anomaly detection slows training; consider turning it off once debugging is done.
torch.autograd.set_detect_anomaly(True)

In [15]:
# Create the checkpoint directory. exist_ok=True keeps this cell re-runnable;
# a plain "mkdir" errors once the directory already exists.
os.makedirs("saved_models", exist_ok=True)

In [16]:
n_epochs = 20
model = ChestXModel().to(device)

# The network ends in a Sigmoid and the targets are 0/1 floats, so binary
# cross-entropy is the matching criterion for this classifier.
criterion = nn.BCELoss()

# Only pass the optimizer the parameters that are actually trainable.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=1e-3)

# Make sure the checkpoint directory exists even when this cell is run alone.
os.makedirs("./saved_models", exist_ok=True)

# NOTE: despite the "_per_batch" suffix (kept because the plotting cell reads
# these names), each list receives ONE value per epoch: the mean over batches.
loss_per_batch, acc_per_batch = [], []
val_loss_per_batch, val_acc_per_batch = [], []

for ex in range(n_epochs):
    torch.cuda.empty_cache()
    print(f"Epoch {ex+1}")

    # Training Loss and Accuracy
    train_loss, train_acc = [], []

    for data in train_dl:
        train_loss.append(float(train_batch(data, model, optimizer, criterion)))
        # Accuracy is measured after the update, with the model in eval mode.
        train_acc.append(float(accuracy(data, model)))

    loss_per_batch.append(np.mean(train_loss))
    acc_per_batch.append(np.mean(train_acc))

    # Save model (file names use the 0-based epoch index)
    torch.save(model.state_dict(), f"./saved_models/model_epoch{ex}.pt")

    # Validation Loss and Accuracy
    val_loss, val_acc = [], []

    for data in val_dl:
        val_loss.append(float(val_batch(data, model, criterion)))
        val_acc.append(float(accuracy(data, model)))

    val_loss_per_batch.append(np.mean(val_loss))
    val_acc_per_batch.append(np.mean(val_acc))

    print(
        f"  train loss {loss_per_batch[-1]:.4f} | train acc {acc_per_batch[-1]:.2f}% | "
        f"val loss {val_loss_per_batch[-1]:.4f} | val acc {val_acc_per_batch[-1]:.2f}%"
    )

In [17]:
# Each list holds one mean value per epoch, so the panels are titled
# "per epoch". Order matches the 2x2 grid, row by row.
panels = [
    (loss_per_batch, "Training loss per epoch", "Loss", "Loss"),
    (acc_per_batch, "Training accuracy per epoch", "Accuracy", "Acc"),
    (val_loss_per_batch, "Validation loss per epoch", "Loss", "Loss"),
    (val_acc_per_batch, "Validation accuracy per epoch", "Accuracy", "Acc"),
]

fig, axes = plt.subplots(2, 2, figsize=(9, 9), sharex=True)

for ax, (values, title, ylabel, legend_label) in zip(axes.flat, panels):
    # Derive the x-axis from the number of recorded epochs (1-based, matching
    # the "Epoch N" log lines) so the plot also works after an interrupted run.
    ax.plot(range(1, len(values) + 1), values, label=legend_label)
    ax.set_title(title)
    ax.set(xlabel="Epochs", ylabel=ylabel)
    ax.grid()
    ax.legend()

plt.show()

# Testing

In [18]:
# Accuracy (in percent) of every batch from the test loader, in iteration order.
test_acc = [accuracy(batch, model) for batch in test_dl]

In [19]:
# Convert the per-batch accuracy values into a list of plain Python floats.
test_acc = torch.tensor(test_acc).tolist()

In [20]:
# Set the figure size when the figure is created: assigning
# rcParams["figure.figsize"] after plotting does not resize that figure.
fig, ax = plt.subplots(figsize=(9, 9))

ax.plot(test_acc)
ax.set_title('Testing accuracy')
# Each point is the accuracy of one test batch, not of an epoch.
ax.set(xlabel="Batch", ylabel="Accuracy")
ax.grid()

plt.show()