# Explanation analysis using pneumonia XRay pictures with FoXAI


Double check if you need to reinstall linux libaries related to OpenCV. If errors appears you might consider runing it with ``sudo`` access. 

In [None]:
!apt-get update && apt-get install ffmpeg libsm6 libxext6  -y

Installing most needed libaries 

In [None]:
!pip install scikit-learn foxai tqdm ipywidgets

## Imports
Adding all the most important imports from torch, matplotlib, sklearn, PIL, opencv. All these will be used in later stages of the notebook for either file management, training the models or visualizations. 

In [None]:
import numpy as np
import pandas as pd

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data.sampler import SubsetRandomSampler

import matplotlib.pyplot as plt
import time
import copy
from random import shuffle

import tqdm.notebook as tqdm

import sklearn
from sklearn.metrics import accuracy_score, cohen_kappa_score
from sklearn.metrics import classification_report
from PIL import Image
import cv2

import os
import shutil

## Train Dataset Exploration and Creation

For the purpose of this notebook we create dataset from 3 sources: 
1. [Open dataset of Covid-19 cases with chest X-ray](https://github.com/ieee8023/covid-chestxray-dataset)
2. [Paul Mooney's pneumonia dataset ](https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia)
3. [Kaggle's Covid-19 radiography database](https://www.kaggle.com/datasets/tawsifurrahman/covid19-radiography-database)
It is assumed that you download them in this directory. 

#### How to download them?

1. **Open dataset of covid-10 cases with chest X-ray**: this you can download via command: 

```
git clone https://github.com/ieee8023/covid-chestxray-dataset.git
```

2. **Paul Mooney's pneumonia dataset**: this dataset can be downloaded via Kaggle API: 

```
kaggle datasets download -d paultimothymooney/chest-xray-pneumonia
```

3. **Kaggle's Covid-19 radiography database**: this dataset can be downloaded via Kaggle API: 

```
kaggle datasets download -d tawsifurrahman/covid19-radiography-database
```

First let's explore a bit our dataset No 1

In [None]:
df = pd.read_csv('./covid-chestxray-dataset/metadata.csv')
selected_df = df[df.finding=="Pneumonia/Viral/COVID-19"]
selected_df = selected_df[(selected_df.view == "AP") | (selected_df.view == "PA")]
selected_df.head(2)

In [None]:
images = selected_df.filename.values.tolist()

now we want to create our dataset directory. As said before "Our dataset" will be combination of few datasets. 

In [None]:
from pathlib import Path
import shutil

COVID_PATH = './COVID19-DATASET/train/covid19'
NORMAL_PATH = './COVID19-DATASET/train/normal'

covid_train_path = Path(COVID_PATH)
if covid_train_path.exists() and covid_train_path.is_dir(): 
    shutil.rmtree(covid_train_path)
os.makedirs(covid_train_path)

normal_train_path = Path(NORMAL_PATH)
if normal_train_path.exists() and normal_train_path.is_dir(): 
    shutil.rmtree(normal_train_path)
os.makedirs(normal_train_path)

and copy pictures of covid from dataset No 1

In [None]:
for image in images:
    shutil.copy(os.path.join('./covid-chestxray-dataset/images', image), os.path.join(COVID_PATH, image))

**Bonus:** you can add extra pictures (+4300 pictures) of covid from dataset No 2 (from train and test directories), but be aware that they might be of lower labeling quality. 

In [None]:
# for image in os.listdir('chest_xray/train/PNEUMONIA'):
#     shutil.copy(os.path.join('chest_xray/train/PNEUMONIA', image), os.path.join(COVID_PATH, image))
    
# for image in os.listdir('chest_xray/test/PNEUMONIA'):
#     shutil.copy(os.path.join('chest_xray/test/PNEUMONIA', image), os.path.join(COVID_PATH, image))

Add pictures of normal lungs from dataset No 2

In [None]:
for image in os.listdir('chest_xray/train/NORMAL')[:300]:
    shutil.copy(os.path.join('chest_xray/train/NORMAL', image), os.path.join(NORMAL_PATH, image))

Add examples from dataset No 3

In [None]:
COVID_TEST = 'COVID-19_Radiography_Dataset/COVID/images'
NORMAL_TEST = 'COVID-19_Radiography_Dataset/Normal/images'

for image in os.listdir(COVID_TEST)[:300]:
    shutil.copy(os.path.join(COVID_TEST, image), os.path.join('./COVID19-DATASET/train/covid19', image))
for image in os.listdir(NORMAL_TEST)[:300]:
    shutil.copy(os.path.join(NORMAL_TEST, image), os.path.join('./COVID19-DATASET/train/normal', image))

Visualize proportions in train dataset

In [None]:
DATA_PATH = './COVID19-DATASET/train'

In [None]:
class_names = os.listdir(DATA_PATH)
image_count = {}
for i in class_names:
    count = len(os.listdir(os.path.join(DATA_PATH,i)))
    lab = f"normal ({count})"
    if i == "covid19":
        lab = f"pneumonia ({count})"
    image_count[lab] = count

#Plotting Distribution of Each Classes
fig1, ax1 = plt.subplots()
ax1.pie(image_count.values(),
        labels = image_count.keys(),
        shadow=True,
        autopct = '%1.1f%%',
        startangle=90)
plt.show()

Now we want to explore some examples in the dataset. Let's start with pneumonia!

In [None]:
fig = plt.figure(figsize=(16,5))
fig.suptitle("Pneumonia", size=22)
img_paths = os.listdir(COVID_PATH)
shuffle(img_paths)

for i,image in enumerate(img_paths[:4]):
    img = cv2.imread(os.path.join(COVID_PATH, image))
    plt.subplot(1,4, i+1, frameon=False)
    plt.imshow(img)
fig.show()

And now the healthy examples

In [None]:
fig = plt.figure(figsize=(16,5))
fig.suptitle("Pneumonia Negative - Healthy", size=22)
img_paths = os.listdir(NORMAL_PATH)
shuffle(img_paths)

for i,image in enumerate(img_paths[:4]):
    img = cv2.imread(os.path.join(NORMAL_PATH, image))
    plt.subplot(1,4, i+1, frameon=False)
    plt.imshow(img)
fig.show()

### Training of the model

#### Transforms
Here we define transforms for our training set. We will resize images to 150x150, perform random rotation from -10 to 10 degrees, random horizontal flip). For validation set we will use also resizing and center cropping. 

In [None]:
#Statistics Based on ImageNet Data for Normalisation
mean_nums = [0.485, 0.456, 0.406]
std_nums = [0.229, 0.224, 0.225]

data_transforms = {"train":transforms.Compose([
                                transforms.Resize((150,150)), 
                                transforms.RandomRotation(10), 
                                transforms.RandomHorizontalFlip(p=0.4), 
                                transforms.ToTensor(), 
                                transforms.Normalize(mean = mean_nums, std=std_nums)]), 
                    "val": transforms.Compose([
                                transforms.Resize((150,150)),
                                transforms.CenterCrop(150), 
                                transforms.ToTensor(),
                                transforms.Normalize(mean=mean_nums, std = std_nums)
                    ])}

## Train and Validation Data Split


Let's define our train and validation split. 

In [None]:
def load_split_train_val_test(datadir, train_size=0.8):
    train_data = datasets.ImageFolder(datadir,       
                    transform=data_transforms['train']) #Picks up Image Paths from its respective folders and label them
    val_data = datasets.ImageFolder(datadir,
                    transform=data_transforms['val'])
    num_train = len(train_data)
    indices = list(range(num_train))
    train_split = int(np.floor(train_size * num_train))
    np.random.shuffle(indices)
    
    train_idx, val_idx =  indices[:train_split], indices[train_split:]
    
    dataset_size = {"train":len(train_idx), "val": len(val_idx)}
    
    train_sampler = SubsetRandomSampler(train_idx) # Sampler for splitting train and val images
    val_sampler = SubsetRandomSampler(val_idx)
    trainloader = torch.utils.data.DataLoader(train_data,
                   sampler=train_sampler, batch_size=8) # DataLoader provides data from traininng and validation in batches
    
    valloader = torch.utils.data.DataLoader(val_data,
                   sampler=val_sampler, batch_size=8)
    
    return trainloader, valloader, dataset_size

trainloader, valloader, dataset_size = load_split_train_val_test(DATA_PATH, .2)
dataloaders = {"train" :trainloader, "val": valloader}
data_sizes = {x: len(dataloaders[x].sampler) for x in ['train','val']}
class_names = trainloader.dataset.classes
print(class_names)

Lets visualize one training batch

In [None]:
def imshow(inp, size =(30,30), title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = mean_nums
    std = std_nums
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.figure(figsize=size)
    plt.imshow(inp)
    if title is not None:
        plt.title(title, size=30)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])

### Model Definition

Here we some further configuration and define model's architecture, optimzer and learning rate scheduler.

In [None]:
torch.cuda.empty_cache()

In [None]:
if torch.cuda.is_available():
    device=torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
def CNN_Model(pretrained=True):
    model = models.resnet18(pretrained=pretrained) # Returns Defined Densenet model with weights trained on ImageNet
    model.fc = nn.Linear(512, len(class_names)) # Overwrites the Classifier layer with custom defined layer for transfer learning
    model = model.to(device) # Transfer the Model to GPU if available
    return model

model = CNN_Model(pretrained=True)

# specify loss function (categorical cross-entropy loss)
criterion = nn.CrossEntropyLoss() 

# Specify optimizer which performs Gradient Descent
optimizer = optim.Adam(model.parameters(), lr=1e-4)
exp_lr_scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=len(trainloader)*40) 


### Training

Now lets define train method and run it. 

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_loss = np.inf

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            current_loss = 0.0
            current_corrects = 0
            current_kappa = 0

            for inputs, labels in tqdm.tqdm(dataloaders[phase], desc=phase, leave=False):
                inputs = inputs.to(device)
                labels = labels.to(device)

                # We need to zero the gradients in the Cache.
                optimizer.zero_grad()

                # Time to carry out the forward training poss
                # We only need to log the loss stats if we are in training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                if phase == 'train':
                    scheduler.step()

                # We want variables to hold the loss statistics
                current_loss += loss.item() * inputs.size(0)
                current_corrects += torch.sum(preds == labels.data)
            epoch_loss = current_loss / data_sizes[phase]
            epoch_acc = current_corrects.double() / data_sizes[phase]
            if phase == 'val':
                print('{} Loss: {:.4f} | {} Accuracy: {:.4f}'.format(
                    phase, epoch_loss, phase, epoch_acc))
            else:
                print('{} Loss: {:.4f} | {} Accuracy: {:.4f}'.format(
                    phase, epoch_loss, phase, epoch_acc))

            # EARLY STOPPING
            if phase == 'val' and epoch_loss < best_loss:
                print('Val loss Decreased from {:.4f} to {:.4f} \nSaving Weights... '.format(best_loss, epoch_loss))
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_since = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_since // 60, time_since % 60))
    print('Best val loss: {:.4f}'.format(best_loss))

    # Now we'll load in the best model weights and return it
    model.load_state_dict(best_model_wts)
    return model

In [None]:
base_model = train_model(model, criterion, optimizer, exp_lr_scheduler, num_epochs=20)


### Results analysis

Here we perform analysis of our results. 

First we get results from labels and prediction

In [None]:
y_pred_list = []
y_true_list = []
with torch.no_grad():
    for x_batch, y_batch in tqdm.tqdm(valloader, leave=False):
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        y_test_pred = base_model(x_batch)
        y_test_pred = torch.log_softmax(y_test_pred, dim=1)
        _, y_pred_tag = torch.max(y_test_pred, dim = 1)
        y_pred_list.append(y_pred_tag.cpu().numpy())
        y_true_list.append(y_batch.cpu().numpy())

In [None]:
y_pred_list = [i[0] for i in y_pred_list]
y_true_list = [i[0] for i in y_true_list]

Now we define custom function to visualize confusion matrix.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import itertools
def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    accuracy = np.trace(cm) / float(np.sum(cm))
    misclass = 1 - accuracy

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]


    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        if normalize:
            plt.text(j, i, "{:0.4f}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")
        else:
            plt.text(j, i, "{:,}".format(cm[i, j]),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")


    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.show()

Let's visualize the confusion matrix

In [None]:
from sklearn.metrics import confusion_matrix

cm =  confusion_matrix(y_true_list, y_pred_list)

plot_confusion_matrix(cm = cm, 
                      normalize    = False,
                      target_names = ['pneumonia','normal'],
                      title        = "Confusion Matrix")

**Testset results:**
- VGG19 93% test
- VGG11: 93,5%
- Resnet34: 93%
- Resnet18: 94%
- Resnet50: 94,25%
- Mobilenetv2: 92,25%

## Model Evaluation


### Testset Creation

We aim to create a testset from different examples than the trainset, consisting of examples from dataset No2 and Dataset No 3

In [None]:
covid_test_path = Path('./COVID19-DATASET/test/covid19')
if covid_test_path.exists() and covid_test_path.is_dir():
    shutil.rmtree(covid_test_path)
os.makedirs(covid_test_path)

normal_test_path = Path('./COVID19-DATASET/test/normal')
if normal_test_path.exists() and normal_test_path.is_dir():
    shutil.rmtree(normal_test_path)
os.makedirs('./COVID19-DATASET/test/normal')

In [None]:
!ls COVID-19_Radiography_Dataset/COVID

For the purpose of the testset i take 200 last examples from dataset (before we took only first 300 examples while whole dataset is 1200 examples per class. 

In [None]:
COVID_TEST = 'COVID-19_Radiography_Dataset/COVID/images'
NORMAL_TEST = 'COVID-19_Radiography_Dataset/Normal/images'

for image in os.listdir(COVID_TEST)[-200:]:
    shutil.copy(os.path.join(COVID_TEST, image), os.path.join('./COVID19-DATASET/test/covid19', image))
for image in os.listdir(NORMAL_TEST)[-200:]:
    shutil.copy(os.path.join(NORMAL_TEST, image), os.path.join('./COVID19-DATASET/test/normal', image))

### Loading testdataloader

In [None]:
TEST_DATA_PATH = './COVID19-DATASET/test/'

test_transforms = transforms.Compose([
                                      transforms.Resize((150,150)),
                                      transforms.ToTensor(),
                                      transforms.Normalize(mean=mean_nums, std=std_nums)
])


test_image = datasets.ImageFolder(TEST_DATA_PATH, transform=test_transforms)

testloader = torch.utils.data.DataLoader(test_image, batch_size=1)

### Confusion matrix on the testset

Here same as before for validation set and trainset we firstly calculate the predictions and labels from testset and then visualize the confusion matrix

In [None]:
y_pred_list = []
y_true_list = []
with torch.no_grad():
    for x_batch, y_batch in tqdm.tqdm(testloader, leave=False):
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        y_test_pred = base_model(x_batch)
        y_test_pred = torch.log_softmax(y_test_pred, dim=1)
        _, y_pred_tag = torch.max(y_test_pred, dim = 1)
        y_pred_list.append(y_pred_tag.cpu().numpy())
        y_true_list.append(y_batch.cpu().numpy())

In [None]:
y_pred_list = [i[0] for i in y_pred_list]
y_true_list = [i[0] for i in y_true_list]

In [None]:
from sklearn.metrics import confusion_matrix

cm =  confusion_matrix(y_true_list, y_pred_list)

plot_confusion_matrix(cm = cm, 
                      normalize    = False,
                      target_names = ['pneumonia','normal'],
                      title        = "Confusion Matrix")

In [None]:
torch.save(base_model.state_dict(), './best_model.pth')

### Analyze Explanations

We will analyze here how the model works form perspective of good explanations (where model and label agree) and where it does not 

#### Helpful function

In [None]:
import matplotlib.pyplot as plt

# function to enable displaying matplotlib Figures in notebooks
def show_figure(fig): 
    dummy = plt.figure()
    new_manager = dummy.canvas.manager
    new_manager.canvas.figure = fig
    new_manager.set_window_title("Test")
    fig.set_canvas(new_manager.canvas)
    return dummy

### Foxai imports

In [None]:
from foxai.context_manager import FoXaiExplainer, ExplainerWithParams, CVClassificationExplainers
from foxai.visualizer import mean_channels_visualization

#### Explaner configs

We want to focus on GradCam results here as to our knowledge this method gives the most stable results

In [None]:
categories = ["pneumonia", "normal"]
layer = [module for module in model.modules() if isinstance(module, torch.nn.Conv2d)][-1]
explainer_list = [
    ExplainerWithParams(explainer_name=CVClassificationExplainers.CV_LAYER_GRADCAM_EXPLAINER, layer=layer),
]

In [None]:
%matplotlib inline

#### Check where model agrees

In [None]:
sample_counter = 0
max_samples_explained: int = 40

# iterate over dataloader
for sample_batch in testloader:
    sample_list, label_list = sample_batch
    # iterate over all samples in batch
    for sample, label in zip(sample_list, label_list):
        # add batch size dimension to the data sample
        input_data = sample.reshape(1, sample.shape[0], sample.shape[1], sample.shape[2]).to(device)
        category_name = categories[label.item()]
        with FoXaiExplainer(
            model=model,
            explainers=[ExplainerWithParams(explainer_name=CVClassificationExplainers.CV_LAYER_GRADCAM_EXPLAINER, layer=layer),],
            target=label,
        ) as xai_model:
            # calculate attributes for every explainer
            probs, attributes_dict = xai_model(input_data)
        if categories[torch.argmax(_).detach().cpu()] == category_name:
            for key, value in attributes_dict.items():

                # create figure from attributes and original image           
                figure = mean_channels_visualization(attributions=value[0], transformed_img=sample, title= f"Mean of channels ({key})")
                # save figure to artifact directory
                title = categories[torch.argmax(_).detach().cpu()] + ", (" + category_name +")"
                imshow(sample, (8,8), title=title)
                show_figure(figure)

            sample_counter += 1
        # if we processed desired number of samples break the loop
        if sample_counter > max_samples_explained:
            break

    # if we processed desired number of samples break the loop
    if sample_counter > max_samples_explained:
        break

#### Check where model disagrees

In [None]:
sample_counter = 0
max_samples_explained: int = 50

# iterate over dataloader
for sample_batch in testloader:
    sample_list, label_list = sample_batch
    # iterate over all samples in batch
    for sample, label in zip(sample_list, label_list):
        # add batch size dimension to the data sample
        input_data = sample.reshape(1, sample.shape[0], sample.shape[1], sample.shape[2]).to(device)
        category_name = categories[label.item()]
        with FoXaiExplainer(
            model=model,
            explainers=explainer_list,
            target=label,
        ) as xai_model:
            # calculate attributes for every explainer
            probs, attributes_dict = xai_model(input_data)
        if categories[torch.argmax(_).detach().cpu()] != category_name:
            for key, value in attributes_dict.items():

                # create figure from attributes and original image           
                figure = mean_channels_visualization(attributions=value[0], transformed_img=sample, title= f"Mean of channels ({key})")
                # save figure to artifact directory
                title = categories[torch.argmax(_).detach().cpu()] + ", (" + category_name +")"
                imshow(sample, (8,8), title=title)
                show_figure(figure)

            sample_counter += 1
        # if we processed desired number of samples break the loop
        if sample_counter > max_samples_explained:
            break

    # if we processed desired number of samples break the loop
    if sample_counter > max_samples_explained:
        break

## Congratulations 
You have successfully trained Pneumonia detecting model, and perform some analysis o the model. 