# PneumonAI
This notebook shares functions used to create and evaluate PneumonAI. This AI is tasked to classify Chest X-Ray in two categories : PNEUMONIA or NORMAL and was trained on the [Chest X-Ray Images (Pneumonia)](https://www.kaggle.com/datasets/paultimothymooney/chest-xray-pneumonia). A secondary dataset has been used for experiments: [NIH Chest X-rays](https://www.kaggle.com/datasets/nih-chest-xrays/data). Both datasets are available on Kaggle.
## Author
PneumonAI was created as Laureline Willem's master's thesis. The project sets to offer an Open Source interpretable tool for pneumonia detection.
## License
The following work is made available under MIT license.

Copyright © 2024, Laureline Willem

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

## Import packages and loading the datasets
Note that the preprocessing has been reworked beforehand.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import os

# Loads and returns child & adult datasets
# Input : 
#   - path : path to primary dataset (pediatric X-rays)
#   - pathsecondary : path to secondary dataset (adult X-rays)
# Output : raw sets in the following order : raining, validation and testing sets.
def load_both_datasets(path, pathsecondary) :
    # Fix sizes
    BATCH_SIZE = 32
    IMG_SIZE = (512, 512)

    # Child dataset
    train = keras.utils.image_dataset_from_directory(path+"/train", labels='inferred', color_mode = 'rgb', batch_size=BATCH_SIZE, image_size=IMG_SIZE, pad_to_aspect_ratio = True)
    val = keras.utils.image_dataset_from_directory(path+"/val", labels='inferred', color_mode = 'rgb', batch_size=BATCH_SIZE, image_size=IMG_SIZE, pad_to_aspect_ratio = True)
    test = keras.utils.image_dataset_from_directory(path+"/test", labels='inferred', color_mode = 'rgb', batch_size=BATCH_SIZE, image_size=IMG_SIZE, pad_to_aspect_ratio = True)
    
    # Adult dataset
    trainadult = keras.utils.image_dataset_from_directory(pathsecondary+"/train", labels='inferred', color_mode = 'rgb', batch_size=BATCH_SIZE, image_size=IMG_SIZE, pad_to_aspect_ratio = True)
    valadult = keras.utils.image_dataset_from_directory(pathsecondary+"/val", labels='inferred', color_mode = 'rgb', batch_size=BATCH_SIZE, image_size=IMG_SIZE, pad_to_aspect_ratio = True)
    testadult = keras.utils.image_dataset_from_directory(pathsecondary+"/test", labels='inferred', color_mode = 'rgb', batch_size=BATCH_SIZE, image_size=IMG_SIZE, pad_to_aspect_ratio = True)

    return train, val, test, trainadult, valadult, testadult

# Returns the dataset preprocessed with data augmentation applied on the training set
# Input : train, val and test : unpreprocessed tf.data.Dataset containing respectively the training, validation and testing sets.
# Output : preprocessed sets in the following order : raining, validation and testing sets.
def preprocessing_and_data_augmentation(train, val, test) :
    AUTOTUNE = tf.data.AUTOTUNE
    seed = (1, 2)

    # Preprocess and apply data augmenation to train set
    train_dataset = train.map(lambda img, label: (keras.applications.mobilenet_v2.preprocess_input(img), label), num_parallel_calls = AUTOTUNE)
    train_dataset = train_dataset.map(lambda x, label:(tf.image.stateless_random_contrast(x, 0.2, 0.5, seed), label), num_parallel_calls = AUTOTUNE)
    train_dataset = train_dataset.map(lambda x, label:(tf.image.stateless_random_saturation(x, 0.5, 1.0, seed), label), num_parallel_calls = AUTOTUNE)

    # Preprocess validation and testing
    validation_dataset = val.map(lambda img, label: (keras.applications.mobilenet_v2.preprocess_input(img), label), num_parallel_calls = AUTOTUNE)
    test_dataset = test.map(lambda img, label: (keras.applications.mobilenet_v2.preprocess_input(img), label), num_parallel_calls = AUTOTUNE)

    # Prefectching allows for better performance
    train_dataset = train_dataset.prefetch(buffer_size=AUTOTUNE)
    val_dataset = validation_dataset.prefetch(buffer_size=AUTOTUNE)
    test_dataset = test_dataset.prefetch(buffer_size=AUTOTUNE)

    return train_dataset, val_dataset, test_dataset


In [None]:
train, val, test, trainadult, valadult, testadult = load_both_datasets()
train_dataset, val_dataset, test_dataset = preprocessing_and_data_augmentation(train, val, test)
trainadult_dataset, valadult_dataset, testadult_dataset = preprocessing_and_data_augmentation(trainadult, valadult, testadult

## Model Creation

In [3]:
# Creates and returns a model using transfer learning from the argument base_model.
# Input : base_model, a pretrained model from keras.applications to use as backbone
# Output : the new model with backbone parameters frozen
def create_model(base_model) :
    inputs = base_model.inputs
    x = base_model.outputs[0]
    base_model.trainable=False
    global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
    x = global_average_layer(x)
    x = tf.keras.layers.Dropout(0.2)(x)
    prediction_layer = tf.keras.layers.Dense(1)
    outputs = prediction_layer(x)
    model = tf.keras.Model(inputs, outputs)
    base_learning_rate = 0.0001
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=base_learning_rate),
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=['accuracy'])
    return model

## Model Training

In [4]:
# Trains the model
# Inputs :
#   - model : the model to train
#   - train_dataset : the training set
#   - validation_dataset : the validation set
#   - initial_epochs : the maximal number of epochs
# Output : history of the training
def train_model(model, train_dataset, validation_dataset, initial_epochs = 100) :
    callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min',patience=3, verbose=False)
    history = model.fit(train_dataset,
                    epochs=initial_epochs,
                    validation_data=validation_dataset, callbacks=[callback], verbose=False)
    return history

## Model Fine-Tuning

In [5]:
# Unfreeze the backbone for fine tuning.
# Input : base_model : the backbone.
def unfreeze_model(base_model) :
    base_model.trainable=True

# Trains the model
# Inputs :
#   - model : the model to fine tune
#   - train_dataset : the training set
#   - validation_dataset : the validation set
#   - initial_epochs : the maximal number of epochs
# Output : history of the training
def fine_tune_model(model, train_dataset, validation_dataset, max_epochs = 100, min_lr=0.000001) :
    unfreeze_model(base_model)
    callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min',patience=10, verbose=False)
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=min_lr)
    history = model.fit(train_dataset,
                    epochs=max_epochs,
                    validation_data=validation_dataset, callbacks=[callback, reduce_lr], verbose=False)
    return history

## GradCam
From [Grad-CAM class activation visualization](https://keras.io/examples/vision/grad_cam/), with a few modifications to fit our needs.

In [None]:
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # Create a sub-model that outputs the feature maps and final prediction
    grad_model = tf.keras.models.Model(
        model.inputs, [model.get_layer(last_conv_layer_name).output, model.output]
    )

    # Use GradientTape to record gradients
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)

        # If pred_index is not specified, use the predicted class index
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    # Calculate gradients
    grads = tape.gradient(class_channel, last_conv_layer_output)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))

    # Compute the heatmap
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)

    # Normalise the heatmap
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)

    return heatmap.numpy()

# Display
from IPython.display import Image, display
import matplotlib as mpl
def display_gradcam(img, heatmap, cam_path="cam.jpg", alpha=0.001):

    # Rescale heatmap to a range 0-255
    heatmap = np.uint8(255 * heatmap)

    # Use jet colormap to colorize heatmap
    jet = mpl.colormaps["jet"]

    # Use RGB values of the colormap
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]

    # Create an image with RGB colorized heatmap
    jet_heatmap = keras.utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = keras.utils.img_to_array(jet_heatmap)

    # Superimpose the heatmap on original image
    superimposed_img = img + jet_heatmap * alpha 
    superimposed_img = keras.utils.array_to_img(superimposed_img)

    # Save the superimposed image
    superimposed_img.save(cam_path)

    # Display Grad CAM
    display(Image(cam_path))

### Extract Bounding Boxes
We accept having multiple boxes, as pneumonias can be multifocal (i.e. be visible in multiple unrelated areas of the chest). The algorithm is fairly naive and clusters together neighboring positive pixels.

In [None]:
# Extract Bounding Boxes from the GradCAM Heatmap
# Inputs :
#   - heatmap : GradCAM Heatmap
# Output : a set containing coordinates from the top left and bottom right corners of the bounding boxe(s)
def ExtractBB(heatmap, threshold=0.5) :
    # Binarize the set
    binarization = 1*(heatmap>=threshold)
    positive = set()
    for i in range(len(binarization)) :
        for j in range(len(binarization[i])) :
            if binarization[i][j] == 1 :
                positive.add((i, j))
    # cluster zones together
    clusters = set()
    while len(positive)>0 :
        cluster = set()
        # Get a coordinate
        coord = list(positive)[0]
        # Add the initial pixel to the new cluster
        cluster.add(coord)
        # Remove it from positive to avoid treating it twice
        positive.remove(coord)
        # Start Growing the region
        get_next_to(heatmap, positive, coord, cluster)
        # Create the bounding box containing all the needed pixels
        mini = len(heatmap)
        minj = len(heatmap[0])
        maxi = 0
        maxj = 0
        for elem in cluster :
            mini = min(mini, elem[0])
            minj = min(minj, elem[1])
            maxi = max(maxi, elem[0])
            maxj = max(maxj, elem[1])
        # resize bounding box from heatmap size (16x16) to image size (512x512)
        clusters.append([mini*32, minj*32, maxi*32+32, maxj*32+32])
        # add the cluster to the set
        clusters.add((mini, minj, maxi, maxj))
    return clusters

# Checks Neighbooring pixels recursively until the cluster is bordered by negative pixels. Updated positive and cluster in the process.
# Inputs :
#   - heatmap : GradCAM Heatmap
#   - positive : set of unchecked coordinates of positive pixels
#   - coord : Coordinates whose neightboor must be checked
#   - cluster : Set containing coordinates of pixels in the cluster
# Output : /
def get_next_to(heatmap, positive, coord, cluster) :
    if coord[0] > 0 :
        newcoord = (coord[0]-1, coord[1])
        update(heatmap, positive, newcoord, cluster)
        if coord[1] > 0 :
            newcoord = (coord[0]-1, coord[1]-1)
            update(heatmap, positive, newcoord, cluster)
        if coord[1] < len(heatmap[coord[0]])-1 :
            newcoord = (coord[0]-1, coord[1]+1)
            update(heatmap, positive, newcoord, cluster)
    if coord[0] < len(heatmap)-1 :
        newcoord = (coord[0]+1, coord[1])
        update(heatmap, positive, newcoord, cluster)
        if coord[1] > 0 :
            newcoord = (coord[0]+1, coord[1]-1)
            update(heatmap, positive, newcoord, cluster)
        if coord[1] < len(heatmap[coord[0]])-1 :
            newcoord = (coord[0]+1, coord[1]+1)
            update(heatmap, positive, newcoord, cluster)
    if coord[1] > 0 :
        newcoord = (coord[0], coord[1]-1)
        update(heatmap, positive, newcoord, cluster)
    if coord[1] < len(heatmap[coord[0]])-1 :
        newcoord = (coord[0], coord[1]+1)
        update(heatmap, positive, newcoord, cluster)

# Updates positive and clusters if newcoords contains the coordinates of a positive pixel.
# Inputs :
#   - heatmap : GradCAM Heatmap.
#   - positive : set of unchecked coordinates of positive pixels.
#   - coord : Coordinates whose neightboor must be checked.
#   - cluster : Set containing coordinates of pixels in the cluster.
# Output : /
def update(heatmap, positive, newcoord, cluster) :
    if newcoord in positive :
        # Add positive neighboor to the cluster
        cluster.add(newcoord)
        # Remove it from positive to avoid treating it twice
        positive.remove(newcoord)
        # Check its own neighboor
        get_next_to(heatmap, positive, newcoord, cluster)

## Compute IoU (Intersection over Union)
From [Intersection over Union (IoU) for object detection](https://pyimagesearch.com/2016/11/07/intersection-over-union-iou-for-object-detection/), adapted for multiple bounding boxes.

In [None]:
# Computes IoU between boxesA and boxesB.
# Inputs :
#   - boxesA : first set of boxes.
#   - boxesB : second set of boxes.
# Output : IoU between boxesA and boxesB.
def bb_intersection_over_union(boxesA, boxesB):
    interArea = 0
    boxesAArea = 0
    boxesBArea = 0
    for boxA in boxesA :
        boxesAArea += (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
        for boxB in boxesB :
            # determine the (x, y)-coordinates of the intersection rectangle
            xA = max(boxA[0], boxB[0])
            yA = max(boxA[1], boxB[1])
            xB = min(boxA[2], boxB[2])
            yB = min(boxA[3], boxB[3])
            # compute the area of intersection rectangle
            interArea += max(0, xB - xA + 1) * max(0, yB - yA + 1)
            # compute the area of both the prediction and ground-truth
            # rectangles
    for boxB in boxesB :
        boxesBArea += (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)
    # compute the intersection over union by taking the intersection
    # area and dividing it by the sum of prediction + ground-truth
    # areas - the interesection area
    iou = interArea / float(boxesAArea + boxesBArea - interArea)
    # return the intersection over union value
    return iou

IoU computation between annotations and Grad-CAM result

In [None]:
import pandas as pd
import numpy as np
from tf_image.core.bboxes.resize import resize
# Computes IoU between given annotations and Grad-CAM results for all images given at path.
# Inputs :
#   - path : path to the csv containing annotations.
#   - threshold : threshold for gradcam heatmap binarisation.
# Output : an array with an entry for each image reporting IoU between the annotations and bounding box extracted with Grad-CAM.
def IoUOverAnnotation(path,threshold=0.5) :
    df = pd.read_csv(path)
    IoU = []
    # Iterate on all the patient analysed
    while df.shape[0] > 0 :
        imgname = df.iloc[0]["image_name"]
        # Get all boxes for this image
        allimgbox = df.loc[df['image_name'] == imgname]
        df = df.loc[df['image_name'] != imgname]
        img = tf.image.decode_image(tf.io.read_file('chest_xray/test/PNEUMONIA/' + imgname), channels=3)
        boxes = []
        # format boxes
        while allimgbox.shape[0] > 0  :
            bbox = allimgbox.iloc[0]
            allimgbox = allimgbox.loc[allimgbox['bbox_y'] != bbox['bbox_y']]
            miny = bbox["bbox_y"]
            minx = bbox["bbox_x"]
            maxy = bbox["bbox_y"] + bbox["bbox_width"]
            maxx = bbox["bbox_x"] + bbox["bbox_height"]
            boxes.append([miny, minx, maxy, maxx])
        # Apply equivalent preprocessing to annotated boxes
        boxes = tf.constant(boxes)
        img, boxes = resize(img, boxes, 512, 512, keep_aspect_ratio=True, random_method=False)
        img = tf.cast(img, tf.float32)
        img = keras.applications.mobilenet_v2.preprocess_input(img)
        boxes = boxes.numpy()
        boxesInOrder = []
        # tf_image declares y before x, while we to x before y, so we switch the elements.
        for i in boxes :
            boxesInOrder.append([i[1], i[0], i[3], i[2]])
        boxes = boxesInOrder
        # Compute Grad-CAM heatmap and extract bounding boxes
        heatmap = make_gradcam_heatmap(np.expand_dims(img, axis=0), model, "Conv_1")
        gradcamBB = ExtractBB(heatmap,threshold=threshold)
        # Compute IoU and append it to all computed IoU
        IoU.append(bb_intersection_over_union(boxes, gradcamBB))
    return IoU

IoU between annotations

In [None]:
import pandas as pd
import numpy as np
# Computes IoU between given annotations for all common images between path1 and path2.
# Inputs :
#   - path1 : path to the first csv containing annotations.
#   - path2 : path to the second csv containing annotations.
# Output : an array with an entry for each image reporting IoU between the annotations.
def IoUBetweenAnnotation(path1, path2) :
    df1= pd.read_csv(path1)
    df2= pd.read_csv(path2)
    IoU = []
    # Iterate on all the patient analysed
    while df1.shape[0] > 0 :
        imgname = df1.iloc[0]["image_name"]
        # Get all boxes for this image
        allimgbox1 = df1.loc[df1['image_name'] == imgname]
        allimgbox2 = df2.loc[df2['image_name'] == imgname]
        df1 = df1.loc[df1['image_name'] != imgname]
        df2 = df2.loc[df2['image_name'] != imgname]
        if allimgbox2.shape[0] > 0 :
            boxes1 = []
            # format boxes
            while allimgbox1.shape[0] > 0  :
                bbox1 = allimgbox1.iloc[0]
                allimgbox1 = allimgbox1.loc[allimgbox1['bbox_y'] != bbox1['bbox_y']]
                miny1 = bbox1["bbox_y"]
                minx1 = bbox1["bbox_x"]
                maxy1 = bbox1["bbox_y"] + bbox1["bbox_width"]
                maxx1 = bbox1["bbox_x"] + bbox1["bbox_height"]
                boxes1.append([miny1, minx1, maxy1, maxx1])
            boxes2 = []
            while allimgbox2.shape[0] > 0  :
                bbox2 = allimgbox2.iloc[0]
                allimgbox2 = allimgbox2.loc[allimgbox2['bbox_y'] != bbox2['bbox_y']]
                miny2 = bbox2["bbox_y"]
                minx2 = bbox2["bbox_x"]
                maxy2 = bbox2["bbox_y"] + bbox2["bbox_width"]
                maxx2 = bbox2["bbox_x"] + bbox2["bbox_height"]
                boxes2.append([miny2, minx2, maxy2, maxx2])
            boxes1 = tf.constant(boxes1)
            boxes2 = tf.constant(boxes2)
            boxes1 = boxes1.numpy()
            boxes2 = boxes2.numpy()
            boxesInOrder1 = []
            # tf_image declares y before x, while we to x before y, so we switch the elements.
            for i in boxes1 :
                boxesInOrder1.append([i[1], i[0], i[3], i[2]])
            boxesInOrder2 = []
            # tf_image declares y before x, while we to x before y, so we switch the elements.
            for i in boxes2 :
                boxesInOrder2.append([i[1], i[0], i[3], i[2]])
            boxes1 = boxesInOrder1
            boxes2 = boxesInOrder2
            # Compute IoU and append it to all computed IoU
            IoU.append(bb_intersection_over_union(boxes1, boxes2))
    return IoU