In [1]:
!pip install torchsummary

In [2]:
import os
import random
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import cv2
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F

from tqdm import tqdm
from torchsummary import summary
from torchvision import datasets, transforms
from torch.utils.data import  DataLoader, Dataset
from sklearn.model_selection import train_test_split

# Setting Up Device

In [3]:
# Pick the GPU when PyTorch can see one, otherwise run on the CPU.
# The value stays a plain string because later cells compare it with 'cuda'.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

# Data Preprocessing

In [4]:
# Directories of the three dataset splits, relative to the notebook's working directory.
_xray_root = '../input/chest-xray-pneumonia/chest_xray'
train_folder, val_folder, test_folder = (
    f'{_xray_root}/{split}/' for split in ('train', 'val', 'test')
)

# Preparing Datasets

### Augmentations

In [5]:
# Per-channel mean / std handed to Normalize by both pipelines.
# NOTE(review): these look like the usual ImageNet statistics - confirm they are
# the intended choice for X-ray images.
_norm_mean = [0.485, 0.456, 0.406]
_norm_std = [0.229, 0.224, 0.225]


def _tensor_and_normalize():
    """Return the ToTensor + Normalize steps that end both pipelines."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(list(_norm_mean), list(_norm_std)),
    ]


data_transforms = {
    # Training: random crop resized to 224 plus a random horizontal flip.
    'train': transforms.Compose(
        [transforms.RandomResizedCrop(224), transforms.RandomHorizontalFlip()]
        + _tensor_and_normalize()
    ),
    # Evaluation: deterministic resize to 256 followed by a 224 centre crop.
    'test': transforms.Compose(
        [transforms.Resize(256), transforms.CenterCrop(224)]
        + _tensor_and_normalize()
    ),
}

## Creating Datasets

In [6]:
# The training folder is opened twice so each part of the split can use its own
# transform pipeline while both views point at the same files on disk.
_train_augmented = datasets.ImageFolder(train_folder, data_transforms['train'])
_train_plain = datasets.ImageFolder(train_folder, data_transforms['test'])

# Split *indices*, not the dataset object. Handing an ImageFolder directly to
# train_test_split makes scikit-learn index every element, which decodes and
# augments every image once, keeps all tensors in memory, and freezes the
# "random" augmentation for the rest of the run.
train_idx, val_idx = train_test_split(
    np.arange(len(_train_augmented)),
    test_size=0.3,
    random_state=33,
    stratify=_train_augmented.targets,  # keep the class ratio equal in both parts
)

Datasets = {
    # Lazy subset: images are loaded and re-augmented on every access.
    'train': torch.utils.data.Subset(_train_augmented, train_idx.tolist()),
    'test' : datasets.ImageFolder(test_folder, data_transforms['test']),
    # Held-out 30% of the training folder, evaluated without augmentation.
    # val_folder is not loaded here; validation uses this split instead.
    'val'  : torch.utils.data.Subset(_train_plain, val_idx.tolist()),
}
# Kept so the split datasets remain reachable under these names.
train_ds, val_ds = Datasets['train'], Datasets['val']

## Creating Dataloaders

In [7]:
# Mini-batch size and worker count shared by every loader.
_BATCH_SIZE = 32
_NUM_WORKERS = 2

Dataloaders = {
    # Only the training stream is reshuffled each epoch.
    'train': DataLoader(Datasets['train'], batch_size=_BATCH_SIZE, shuffle=True, num_workers=_NUM_WORKERS),
    # Evaluation loaders keep a fixed order: shuffling does not change the
    # metrics and a stable order makes per-batch results reproducible.
    'test': DataLoader(Datasets['test'], batch_size=_BATCH_SIZE, shuffle=False, num_workers=_NUM_WORKERS),
    'val': DataLoader(Datasets['val'], batch_size=_BATCH_SIZE, shuffle=False, num_workers=_NUM_WORKERS),
}

## Dataset Statistics

In [8]:
# Build two parallel lists: the path of every training image and its class name.
# NORMAL entries come first, followed by PNEUMONIA entries.
files, categories = [], []
for category in ('NORMAL', 'PNEUMONIA'):
    filenames = os.listdir(os.path.join(train_folder, category))
    files.extend(os.path.join(train_folder, category, name) for name in filenames)
    categories.extend([category] * len(filenames))

In [9]:
# Display a 3x3 grid of randomly chosen training images, each titled with its class.
rows, cols = 3, 3
random_file_index = random.sample(range(len(files)), 9)
random_fig = plt.figure(figsize = (12, 12))
for slot, file_idx in enumerate(random_file_index, start=1):
    panel = random_fig.add_subplot(rows, cols, slot)
    panel.imshow(cv2.imread(files[file_idx]))
    panel.set_title(categories[file_idx])
    panel.axis('off')
plt.show()

In [10]:
def _count_pneumonia(ds):
    """Return how many samples in ``ds`` carry label 1.

    Labels are read from metadata when the dataset exposes it (a ``targets``
    sequence, or a ``dataset``/``indices`` pair wrapping a dataset that has
    one), so no image needs to be decoded. Any other dataset is iterated as
    ``(sample, label)`` pairs.
    """
    targets = getattr(ds, 'targets', None)
    if targets is not None:
        return sum(1 for label in targets if label == 1)

    base_targets = getattr(getattr(ds, 'dataset', None), 'targets', None)
    indices = getattr(ds, 'indices', None)
    if base_targets is not None and indices is not None:
        return sum(1 for idx in indices if base_targets[idx] == 1)

    # Generic fallback: works for plain lists of (sample, label) tuples.
    return sum(1 for _, label in ds if label == 1)


# Label 1 is counted as PNEUMONIA; everything else in a split is counted as NORMAL.
Tr_PNEUMONIA = _count_pneumonia(Datasets['train'])
Tr_NORMAL = len(Datasets['train']) - Tr_PNEUMONIA
V_PNEUMONIA = _count_pneumonia(Datasets['val'])
V_NORMAL = len(Datasets['val']) - V_PNEUMONIA
Te_PNEUMONIA = _count_pneumonia(Datasets['test'])
Te_NORMAL = len(Datasets['test']) - Te_PNEUMONIA

# Per-class counts in the order train, validation, test.
Pn = [Tr_PNEUMONIA, V_PNEUMONIA, Te_PNEUMONIA]
No = [Tr_NORMAL, V_NORMAL, Te_NORMAL]

In [11]:
# plt.subplots returns a (figure, axes) pair; unpack it so `fig` is the figure.
fig, ax = plt.subplots(figsize=(4, 4))

bar_width = 0.25
br1 = np.arange(len(Pn))                # left bar of each group (Pneumonia)
br2 = [x + bar_width for x in br1]      # right bar of each group (Normal)

ax.bar(br1, Pn, color='r', width=bar_width, label='Pneumonia')
ax.bar(br2, No, color='b', width=bar_width, label='Normal')

ax.set_ylabel('Count')
ax.set_title('Class counts per split')
# Put each tick midway between the two bars of its group.
ax.set_xticks([r + bar_width / 2 for r in range(len(Pn))])
ax.set_xticklabels(['Train', 'Validation', 'Test'])
ax.legend()
plt.show()

# Defining the model

In [12]:
# The data has two classes (NORMAL and PNEUMONIA), so build the network with a
# two-way output layer instead of ResNet's default 1000-way head.
NUM_CLASSES = 2
model = torchvision.models.resnet18(pretrained=False, num_classes=NUM_CLASSES)
model = model.to(device)
# Print a per-layer summary for one 3x224x224 input.
summary(model, input_size = (3, 224, 224))

# Training

In [13]:
# Training configuration: cross-entropy objective, Adam starting at `alpha`,
# and a step schedule that multiplies the learning rate by 0.1 every 3 scheduler steps.
loss_fn = nn.CrossEntropyLoss()
epochs, alpha = 30, 0.001
optimizer = torch.optim.Adam(params=model.parameters(), lr=alpha)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=3, gamma=0.1)

In [14]:
def trainer_loop(model, trainloader, loss_fn, optimizer, scheduler = None, t_gpu = False):
    """Run one training epoch and return ``(mean_loss, accuracy_percent)``.

    Args:
        model: network to optimise; it is switched to training mode here.
        trainloader: iterable yielding ``(images, labels)`` batches.
        loss_fn: callable ``loss_fn(output, label)`` returning a scalar loss tensor.
        optimizer: optimiser; ``zero_grad`` and ``step`` are called once per batch.
        scheduler: optional LR scheduler, stepped once after the last batch.
        t_gpu: when true, every batch is moved with ``.cuda()`` before the forward pass.

    Returns:
        Two Python floats: the loss averaged per sample and the accuracy in
        percent, both over the samples actually seen. ``(0.0, 0.0)`` when the
        loader yields no batches.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    for img, label in tqdm(trainloader):
        if t_gpu:
            img, label = img.cuda(), label.cuda()
        optimizer.zero_grad()
        output = model(img)
        loss = loss_fn(output, label)
        loss.backward()
        optimizer.step()

        batch_size = label.size(0)
        # Assumes loss_fn averages over the batch (the nn.CrossEntropyLoss default):
        # weight by batch size so that dividing by the sample count gives a true
        # per-sample mean, matching how test_loop accumulates its loss.
        loss_sum += loss.item() * batch_size
        # .item() keeps the counter a plain int instead of a (possibly CUDA) tensor.
        n_correct += (output.detach().argmax(dim=1) == label).sum().item()
        n_seen += batch_size

    if scheduler is not None:
        scheduler.step()
    if n_seen == 0:
        return 0.0, 0.0
    return loss_sum / n_seen, 100.0 * n_correct / n_seen

In [15]:
def val_loop(model, val_loader, loss_fn, t_gpu=False):
    """Evaluate ``model`` on ``val_loader`` and return ``(mean_loss, accuracy_percent)``.

    Args:
        model: network to evaluate; it is switched to evaluation mode here.
        val_loader: iterable yielding ``(images, labels)`` batches.
        loss_fn: callable ``loss_fn(output, label)`` returning a scalar loss tensor.
        t_gpu: when true, every batch is moved with ``.cuda()`` before the forward pass.

    Returns:
        Two Python floats: the loss averaged per sample and the accuracy in
        percent, both over the samples actually seen. ``(0.0, 0.0)`` when the
        loader yields no batches.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    with torch.no_grad():
        for img, label in tqdm(val_loader):
            if t_gpu:
                img, label = img.cuda(), label.cuda()
            output = model(img)
            loss = loss_fn(output, label)

            batch_size = label.size(0)
            # Assumes loss_fn averages over the batch (the nn.CrossEntropyLoss
            # default): weight by batch size to obtain a per-sample mean.
            loss_sum += loss.item() * batch_size
            # .item() keeps the counter a plain int instead of a (possibly CUDA) tensor.
            n_correct += (output.argmax(dim=1) == label).sum().item()
            n_seen += batch_size

    if n_seen == 0:
        return 0.0, 0.0
    return loss_sum / n_seen, 100.0 * n_correct / n_seen

In [16]:
def train_model(epochs, model, trainloader, valloader, loss_fn, optimizer, scheduler = None, t_gpu = False):
    """Train for ``epochs`` epochs, validating after each, and return the history.

    Args:
        epochs: number of train + validate rounds to run.
        model: network handed to ``trainer_loop`` and ``val_loop``.
        trainloader: batches used for training.
        valloader: batches used for validation.
        loss_fn: loss callable forwarded to both loops.
        optimizer: optimiser forwarded to ``trainer_loop``; its first parameter
            group supplies the logged learning rate.
        scheduler: optional LR scheduler forwarded to ``trainer_loop``.
        t_gpu: forwarded to both loops.

    Returns:
        Dict with keys ``learning_rate``, ``train_loss``, ``train_acc``,
        ``val_loss`` and ``val_acc``, each a list with one entry per epoch.
    """
    stat_dict = {
        key: []
        for key in ('learning_rate', 'train_loss', 'train_acc', 'val_loss', 'val_acc')
    }
    print('*'*5+'Training Started'+'*'*5)
    for ep in range(epochs):
        print(f'Training epoch: {ep+1}')
        # Read the learning rate before training: trainer_loop steps the scheduler
        # at the end of the epoch, so reading it afterwards would log the rate
        # of the *next* epoch rather than the one just used.
        current_lr = optimizer.param_groups[0]["lr"]
        t_loss, t_acc = trainer_loop(
            model, trainloader, loss_fn, optimizer, scheduler, t_gpu
        )
        v_loss, v_acc = val_loop(
            model, valloader, loss_fn, t_gpu
        )
        print(f'Learning Rate: {current_lr}')
        print(f'Training   : Loss: {t_loss}    Accuracy: {t_acc}%')
        print(f'Validation : Loss: {v_loss}    Accuracy: {v_acc}%')
        stat_dict['learning_rate'].append(current_lr)
        stat_dict['train_loss'].append(t_loss)
        stat_dict['val_loss'].append(v_loss)
        stat_dict['train_acc'].append(t_acc)
        stat_dict['val_acc'].append(v_acc)
    print('Finished Training')
    return stat_dict

In [17]:
# Batches are moved to the GPU only when the selected device is CUDA.
use_gpu = device == 'cuda'
hist = train_model(
    epochs, model,
    Dataloaders['train'], Dataloaders['val'],
    loss_fn, optimizer, scheduler, use_gpu,
)

## Training Statistics
### Loss

In [18]:
# Training and validation loss per epoch.
fig, ax = plt.subplots(figsize=(8,5))

epoch_axis = np.linspace(1, epochs, epochs)
LT = ax.plot(epoch_axis, hist['train_loss'], 'b-', label='Train Loss')
LV = ax.plot(epoch_axis, hist['val_loss'], 'r-', label='Val Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')

n_recorded = len(hist['val_loss'])
ax.set_xlim([1, n_recorded])
# Use sparser ticks for longer runs so the labels stay readable.
if n_recorded >= 30:
    tick_step = 5
elif n_recorded >= 20:
    tick_step = 2
else:
    tick_step = 1
ax.set_xticks(range(1, n_recorded + 1, tick_step))

lns = LT + LV
labs = [curve.get_label() for curve in lns]
ax.legend(lns, labs, loc='upper right')
ax.grid('on')

### Accuracy

In [19]:
# Training and validation accuracy per epoch.
fig, ax = plt.subplots(figsize=(8,5))

# History entries may be 0-dim tensors (possibly on the GPU), which matplotlib
# cannot convert to arrays; float() accepts both tensors and plain numbers.
train_acc_curve = [float(value) for value in hist['train_acc']]
val_acc_curve = [float(value) for value in hist['val_acc']]

# Take the x-axis from the recorded history so it always matches the data length.
AT = ax.plot(np.arange(1, len(train_acc_curve) + 1), train_acc_curve, 'g-', label='Train Acc')
AV = ax.plot(np.arange(1, len(val_acc_curve) + 1), val_acc_curve, 'y-', label='Val Acc')

ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')

n_recorded = len(val_acc_curve)
ax.set_xlim([1, n_recorded])
# Use sparser ticks for longer runs so the labels stay readable.
if n_recorded >= 30:
    tick_step = 5
elif n_recorded >= 20:
    tick_step = 2
else:
    tick_step = 1
ax.set_xticks(range(1, n_recorded + 1, tick_step))

lns = AT + AV
labs = [curve.get_label() for curve in lns]
ax.legend(lns, labs, loc='upper right')
ax.grid('on')

### Learning Rate

In [20]:
# Learning rate recorded for each epoch.
fig, ax = plt.subplots(figsize=(8,5))

epoch_axis = np.linspace(1, epochs, epochs)
LR = ax.plot(epoch_axis, hist['learning_rate'], 'g-', label='Learning Rate')

ax.set_ylabel('Learning Rate')
ax.set_xlabel('Epochs')

labs = [curve.get_label() for curve in LR]
ax.legend(LR, labs, loc='upper right')
ax.grid('on')

# Prediction on the Test Set

In [21]:
def test_loop(model, testdata, loss_fn, t_gpu):
    print('*'*5+'Testing Started'+'*'*5)
    model.train(False)
    model.eval()

    TestLoss, TestAcc = 0.0, 0.0
    for data, target in testdata:
        if t_gpu:
            data, target = data.cuda(), target.cuda()

        output = model(data)
        loss = loss_fn(output, target)

        _, pred = torch.max(output.data, 1)
        TestLoss += loss.item() * data.size(0)
        TestAcc += torch.sum(pred == target.data)
        torch.cuda.empty_cache()

    TestLoss = TestLoss / len(testdata.dataset)
    TestAcc = TestAcc / len(testdata.dataset)
    print(f'Loss: {TestLoss} Accuracy: {TestAcc}%')

In [22]:
# Move batches to the GPU only when CUDA was selected; a hard-coded True would
# fail on a CPU-only machine. The result is stored so the cell shows no stray output.
test_metrics = test_loop(model, Dataloaders['test'], loss_fn, device == 'cuda')

# Saving the Model

In [23]:
# Write the model's state dict (not the whole module object) to disk.
weights_path = 'pneumonia_xray.pth'
trained_state = model.state_dict()
torch.save(trained_state, weights_path)

## Converting to ONNX

In [24]:
model.eval()
# One real test batch serves as the example input for the export.
dummy_input = next(iter(Dataloaders['test']))[0]
# Place the example on whatever device the model's weights live on; a
# hard-coded .cuda() would fail on a CPU-only machine.
export_device = next(model.parameters()).device
torch.onnx.export(model,
         dummy_input.to(export_device),
         "Pneumonia_Res.onnx",
         export_params=True,
         opset_version=10,
         do_constant_folding=True,
         input_names = ['modelInput'],
         output_names = ['modelOutput'],
         # Mark dimension 0 of the input and output as a variable batch size.
         dynamic_axes={'modelInput' : {0 : 'batch_size'},
                                'modelOutput' : {0 : 'batch_size'}})
print(" ")
print('Model has been converted to ONNX')