# Notebook to train a model to diagnose thoracic pathology from chest X-rays

The purpose of this Jupyter notebook is to demonstrate how we can build a AI-based Radiologist system using Apache Spark and Analytics Zoo to detect pneumonia and other diseases from chest x-ray images. The X-rays are made available by the United States’ National Institutes of Health (NIH). The dataset contains over 120,000 images of frontal chest x-rays, each potentially labeled with one or more of fourteen different thoracic pathologies. We show how to build a multi-label image classification model in a distributed Apache Spark infrastructure, and demonstrate how to build complex image transformations and deep learning pipelines using Analytics Zoo with scalability and ease of use.

For instructions on prerequisites for this notebook, refer to the GitHub readme.

## Import the required packages
The following modules are for this notebook.

In [15]:
import warnings
# Ignoring the warnings to improve readability of the notebook
warnings.filterwarnings("ignore", message="numpy.dtype size changed")

import random
import time
import numpy as np
from math import ceil
#from bigdl.dllib.optim.optimizer import SGD, SequentialSchedule, Warmup, Poly,\Plateau, EveryEpoch, 
#TrainSummary,\ValidationSummary, SeveralIteration, Step
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.storagelevel import StorageLevel
from bigdl.dllib.nncontext import *
from bigdl.dllib.feature.image.imagePreprocessing import *
from bigdl.dllib.feature.common import ChainedPreprocessing
from bigdl.dllib.keras.layers import Input, Flatten, Dense, GlobalAveragePooling2D, Dropout
from bigdl.dllib.keras.metrics import AUC
from bigdl.dllib.keras.optimizers import Adam
from bigdl.dllib.keras.models import Model
from bigdl.dllib.net.net_load import Net
from bigdl.dllib.nnframes import NNEstimator, NNImageReader
from bigdl.dllib.keras.objectives import BinaryCrossEntropy
from pyspark.sql.types import StringType, ArrayType
import matplotlib.pyplot as plt

## Transfer learning and loading pre-trained models
We use transfer learning for training the model. In the following cell, we show how to load a pre-trained Inception, ResNet-50, VGG, and a DenseNet model. These models are pre-trained with ImageNet dataset and are available [here](https://analytics-zoo.github.io/0.4.0/#ProgrammingGuide/image-classification/). Only one of the models is used in the actual training. You can switch between the different models below by calling the appropritate function to  see how they perform.

*get_resent_model* function below is used to load an __ResNet-50__ Model. The function accepts two parameters:
- *model_path* - This is the path in your HDFS where the model pretrained model is located
- *label_length* - This is the number of labels for a given task. For this exercise, the Xrays can have 14 diseases. *label-length* is always 14.

The function does the following:
-  *Net.load_bigdl()* - loads a BigDL model. _Net_ package can be used to load models from other frameworks like Caffe, Torch and TensorFlow. This returns a _Model_.
- *new_graph()* removes layers after "pool5"
- *Input()* creates a new layer for the Xray images. The images are resized to 224x224 and have three channels
- The input layer is added to the model using *to_keras*
- We then flatten the neural network, add dropout and apply regularization

    

In [16]:
from bigdl.dllib.keras.layers import *

# Function to load a ResNet50 model
def build_model(label_length):
    model = Sequential()
    model.add(Conv2D(32, 3, 3, input_shape=(3, 224, 224)))
    model.add(Activation("relu"))
    model.add(MaxPooling2D(pool_size=[2, 2]))

    model.add(Conv2D(32, 3, 3))
    model.add(Activation("relu"))
    model.add(MaxPooling2D(pool_size=[2, 2]))

    model.add(Conv2D(64, 3, 3))
    model.add(Activation("relu"))
    model.add(MaxPooling2D(pool_size=[2, 2]))

    model.add(Flatten())
    model.add(Dense(64))
    model.add(Activation("relu"))
    model.add(Dropout(0.5))
    model.add(Dense(label_length, activation="sigmoid"))
    #model.add(Activation("sigmoid"))

    return model


## Calculate the AUC-ROC for a disease

The following function calculates the ROC for disease *k*. We use ML Pipeline *BinaryClassificationEvaluator* for this.

In [17]:
def get_auc_for_kth_class(k, df, label_col="label", prediction_col="prediction"):
    get_Kth = udf(lambda a: a[k], DoubleType())
    extracted_df = df.withColumn("kth_label", get_Kth(col(label_col))) \
        .withColumn("kth_prediction", get_Kth(col(prediction_col))) \
        .select('kth_label', 'kth_prediction')
    roc_score = BinaryClassificationEvaluator(rawPredictionCol='kth_prediction',
                                              labelCol='kth_label', metricName="areaUnderROC").evaluate(extracted_df)
    return roc_score

## Evaluating the model and plot AUC 

In [18]:
def evaluate_and_plot(testDF):
    predictionDF = nnModel.transform(testDF).persist(storageLevel=StorageLevel.DISK_ONLY)
    label_texts= ["Atelectasis", "Cardiomegaly", "Effusion", "Infiltration", "Mass", "Nodule", "Pneumonia",
                   "Pneumothorax", "Consolidation", "Edema", "Emphysema", "Fibrosis", "Pleural_Thickening", "Hernia"]
    label_map = {k: v for v, k in enumerate(label_texts)}
    chexnet_order = ["Atelectasis", "Cardiomegaly", "Effusion", "Infiltration", "Mass", "Nodule", "Pneumonia", "Pneumothorax", "Consolidation",
     "Edema", "Emphysema", "Fibrosis", "Pleural_Thickening", "Hernia"]
    total_auc = 0.0
    roc_auc_label =dict()
    for i in chexnet_order:
        roc_score = get_auc_for_kth_class(label_map[i], predictionDF)
        total_auc += roc_score
        print('{:>12} {:>25} {:>5} {:<20}'.format('ROC score for ', i, ' is: ', roc_score))
        roc_auc_label[i]=(roc_score)
    print("Average AUC: ", total_auc / float(label_length))

## Main program


In [19]:
random.seed(1234)
batch_size = 12 #1024 
num_epoch = 15
# 
#    model_path - Path for the pre-trained model file, data and the location to save the model after training. 
#                 The model path must match the function you are calling (ResNet-50, VGG or DenseNet)
#    image_path - Path to all images
#    label_path - Path to the label file (Data_Entry_2017.csv) available from NIH
#    save_path = Path to save the model and intermediate results 
image_path = "/opt/application/data/output"
label_path = "/opt/application/data"
model_path = "/opt/application/data/model" 

# Get Spark Context
sparkConf = create_spark_conf().setAppName("Chest X-ray Training")
sc = init_nncontext(sparkConf)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Make sure the batchsize is a multiple of (Number of executors * Number of cores)
numexecutors = len(sc._jsc.sc().statusTracker().getExecutorInfos()) - 1
numcores = int(sc.getConf().get('spark.executor.cores','1'))

print("Number of Executors = " +str(numexecutors))
print("Number of Cores = " + str(numcores))
print("Batch Size = " + str(batch_size))
#if batch_size%(numexecutors*numcores)==0:
#    print("Batchsize is a multiple of (Number of Executors * Number of cores. Good to proceed")
#else:
#    print("Batchsize is NOT a multiple of (Number of Executors * Number of cores). Do not proceed !")

2022-04-08 10:13:33,945 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-04-08 10:13:33,946 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-04-08 10:13:33,948 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-04-08 10:13:33,948 Thread-4 WARN The bufferSize is set to 4000 but bufferedIo is false: false
22-04-08 10:13:33 [Thread-4] INFO  Engine$:121 - Auto detect executor number and executor cores number
22-04-08 10:13:33 [Thread-4] INFO  Engine$:123 - Executor number is 1 and executor cores number is 6
22-04-08 10:13:33 [Thread-4] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 15
2022-04-08 10:13:33 WARN  SparkContext:69 - Using an existing SparkContext; some configuration may not take effect.
22-04-08 10:13:33 [Thread-4] INFO  Engine$:446 - Find existing spark context. Checking the spark conf...
Number of Executors = 0
Number of Cores = 1
Batch Size = 12


## Load the data
We then load the dataset. NIH has __[released](https://www.nih.gov/news-events/news-releases/nih-clinical-center-provides-one-largest-publicly-available-chest-x-ray-datasets-scientific-community)__ the chest xray has two sets (training and test). We have created a [notebook](ConvertXray-ConvertImages.ipynb) to read the Xray images from NIH an save them as training and test datasets (in two folders /trainingDF and /testDF). In the below code, we read these dataframes and combine them to a single Spark dataframe. We then sploy them to the actual training and validation Dataframes for our model. We use *ramdomSplit* to split the data.



In [20]:
label_length = 14
label_texts = ["Atelectasis", "Cardiomegaly", "Effusion", "Infiltration", "Mass", "Nodule", "Pneumonia", "Pneumothorax",
               "Consolidation", "Edema", "Emphysema", "Fibrosis", "Pleural_Thickening", "Hernia"]
label_map = {k: v for v, k in enumerate(label_texts)}

def text_to_label(text):
    arr = [0.0] * len(label_texts)
    for l in text.split("|"):
        if l != "No Finding":
            arr[label_map[l]] = 1.0
    return arr

getLabel = udf(lambda x: text_to_label(x), ArrayType(DoubleType()))
getName = udf(lambda row: os.path.basename(row[0]), StringType())
imageDF = NNImageReader.readImages(image_path, sc, resizeH=256, resizeW=256, image_codec=1) \
    .withColumn("Image_Index", getName(col('image')))

labelDF = spark.read.load(label_path + "/Data_Entry_2017_v2020.csv", format="csv", sep=",", inferSchema="true", header="true") \
        .select("Image_Index", "Finding_Labels") \
        .withColumn("label", getLabel(col('Finding_Labels'))) \
        #.withColumnRenamed('Image Index', 'Image_Index')

totalDF = imageDF.join(labelDF, on="Image_Index", how="inner")
#.withColumnRenamed("Finding Labels", "Finding_Labels")

#(trainingDF, validationDF) = totalDF.randomSplit([0.8, 0.2])
trainingDF=totalDF
validationDF=totalDF
print("Number of training images: ", trainingDF.count())
print("Number of validation images: ", validationDF.count())

Number of training images:  12
Number of validation images:  12


## Load the pre-trained model and optimiser
We first load the pre-trained model. We use ResNet in the below example. It can be changed to any of the above defined models. We then load the optimiser

In [21]:
# Load the pretrained model
#xray_model = get_resnet_model(model_path, label_length)
xray_model = build_model(label_length)


creating: createZooKerasSequential
creating: createZooKerasConvolution2D
creating: createZooKerasActivation
creating: createZooKerasMaxPooling2D
creating: createZooKerasConvolution2D
creating: createZooKerasActivation
creating: createZooKerasMaxPooling2D
creating: createZooKerasConvolution2D
creating: createZooKerasActivation
creating: createZooKerasMaxPooling2D
creating: createZooKerasFlatten
creating: createZooKerasDense
creating: createZooKerasActivation
creating: createZooKerasDropout
creating: createZooKerasDense


## Image pre-processing

We build *ChainedPreprocessing* to combine the following preprocessing.
- *RowToImageFeature* - converts a Spark row to a BigDL ImageFeature
- *ImageCenterCrop* - resizes the image to 224 x 224
- *ImageHFlip* - Randomly flips 50% of the image horizontally
- *ImageBrightness* - Randomly adjust the brigthness of 50% of the images
- *ImageChannelNormalize* - Normalize the images by subtracting the mean value of the ImageNet images

In [22]:
transformer = ChainedPreprocessing(
            [RowToImageFeature(), ImageCenterCrop(224, 224), ImageRandomPreprocessing(ImageHFlip(), 0.5),
             ImageRandomPreprocessing(ImageBrightness(0.0, 32.0), 0.5),
             ImageChannelNormalize(123.68, 116.779, 103.939), ImageMatToTensor(), ImageFeatureToTensor()])

creating: createRowToImageFeature
creating: createImageCenterCrop
creating: createImageHFlip
creating: createImageRandomPreprocessing
creating: createImageBrightness
creating: createImageRandomPreprocessing
creating: createImageChannelNormalize
creating: createImageMatToTensor
creating: createImageFeatureToTensor
creating: createChainedPreprocessing


### Define the Classifier

In [23]:
classifier = NNEstimator(xray_model, BinaryCrossEntropy(), transformer) \
            .setBatchSize(batch_size) \
            .setMaxEpoch(num_epoch) \
            .setFeaturesCol("image") \
            .setCachingSample(False) \
            .setValidation(EveryEpoch(), validationDF, [AUC()], batch_size).setOptimMethod(Adam())


creating: createZooKerasBinaryCrossEntropy
creating: createSeqToTensor
creating: createFeatureLabelPreprocessing
creating: createNNEstimator
creating: createEveryEpoch
creating: createAUC
creating: createAdam


### Train the model

In [24]:
%%time
nnModel = classifier.fit(trainingDF)

22-04-08 10:14:11 [Thread-4] INFO  InternalDistriOptimizer$:987 - Sequential[c6293aa0] isTorch is false
22-04-08 10:14:11 [Thread-4] INFO  DistriOptimizer$:826 - caching training rdd ...
22-04-08 10:14:12 [Thread-4] INFO  DistriOptimizer$:652 - Cache thread models...
22-04-08 10:14:12 [Executor task launch worker for task 0.0 in stage 145.0 (TID 97)] WARN  DistriOptimizer$:609 - Partitions of the training data are not evenlydistributed across the executors in the Spark cluster; are there sufficient trainingdata to be distributed?
22-04-08 10:14:12 [Executor task launch worker for task 0.0 in stage 145.0 (TID 97)] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 768
22-04-08 10:14:12 [Executor task launch worker for task 0.0 in stage 145.0 (TID 97)] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 768
22-04-08 10:14:12 [Executor task launch worker for task 0.0 in stage 145.0 (TID 97)] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 768
22-04-08 10:14:12 [Executor task la

                                                                                

22-04-08 10:14:13 [Thread-4] INFO  DistriOptimizer$:433 - [Epoch 1 12/12][Iteration 1][Wall Clock 0.7064477s] Trained 12.0 records in 0.7064477 seconds. Throughput is 16.986395 records/second. Loss is 8.839904. 
22-04-08 10:14:13 [Thread-4] INFO  DistriOptimizer$:475 - [Epoch 1 12/12][Iteration 1][Wall Clock 0.7064477s] Epoch finished. Wall clock time is 708.4581 ms
22-04-08 10:14:13 [Thread-4] INFO  DistriOptimizer$:433 - [Epoch 2 12/12][Iteration 2][Wall Clock 1.2465411s] Trained 12.0 records in 0.538083 seconds. Throughput is 22.301392 records/second. Loss is 6.2162. 
22-04-08 10:14:13 [Thread-4] INFO  DistriOptimizer$:475 - [Epoch 2 12/12][Iteration 2][Wall Clock 1.2465411s] Epoch finished. Wall clock time is 1249.7118 ms
22-04-08 10:14:13 [Thread-4] INFO  DistriOptimizer$:112 - [Epoch 2 12/12][Iteration 2][Wall Clock 1.2465411s] Validate model...
22-04-08 10:14:14 [Thread-4] INFO  DistriOptimizer$:178 - [Epoch 2 12/12][Iteration 2][Wall Clock 1.2465411s] validate model throughput 

                                                                                

22-04-08 10:14:16 [Thread-4] INFO  DistriOptimizer$:433 - [Epoch 5 12/12][Iteration 5][Wall Clock 3.4271841s] Trained 12.0 records in 0.7187224 seconds. Throughput is 16.696293 records/second. Loss is 5.3179245. 
22-04-08 10:14:16 [Thread-4] INFO  DistriOptimizer$:475 - [Epoch 5 12/12][Iteration 5][Wall Clock 3.4271841s] Epoch finished. Wall clock time is 3542.6618 ms
22-04-08 10:14:16 [Thread-4] INFO  DistriOptimizer$:112 - [Epoch 5 12/12][Iteration 5][Wall Clock 3.4271841s] Validate model...
22-04-08 10:14:16 [Thread-4] INFO  DistriOptimizer$:178 - [Epoch 5 12/12][Iteration 5][Wall Clock 3.4271841s] validate model throughput is 104.78263 records/second
22-04-08 10:14:16 [Thread-4] INFO  DistriOptimizer$:181 - [Epoch 5 12/12][Iteration 5][Wall Clock 3.4271841s] AucScore is (Average score: 0.82142866, count: 12)
score for each class is:
0.99999994 
0.5000001 
0.5000004 
0.5000004 
0.99999994 
0.99999994 
0.99999994 
0.99999994 
0.99999994 
0.99999994 
0.5000004 
0.99999994 
0.99999994 

### Evaluate the model and plot AUC accuracy for Validation Data

In [25]:
print("Evaluating the model on validation data:")
evaluate_and_plot(validationDF)

Evaluating the model on validation data:
22-04-08 10:14:34 [Thread-4] INFO  NNModel:730 - Batch per thread: 2; Total number of cores: 6; Global batch size: 12
22-04-08 10:14:35 [Executor task launch worker for task 0.0 in stage 257.0 (TID 149)] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 768


                                                                                

ROC score for                Atelectasis  is:  0.0                 
ROC score for               Cardiomegaly  is:  0.5                 
ROC score for                   Effusion  is:  0.5                 
ROC score for               Infiltration  is:  0.5                 
ROC score for                       Mass  is:  0.0                 
ROC score for                     Nodule  is:  0.0                 
ROC score for                  Pneumonia  is:  0.0                 
ROC score for               Pneumothorax  is:  0.0                 
ROC score for              Consolidation  is:  0.0                 
ROC score for                      Edema  is:  0.0                 
ROC score for                  Emphysema  is:  0.5                 
ROC score for                   Fibrosis  is:  0.0                 
ROC score for         Pleural_Thickening  is:  0.0                 
ROC score for                     Hernia  is:  0.5                 
Average AUC:  0.17857142857142858


### Save the model for inference

In [27]:
save_path = model_path + '/xray_model_classif'
nnModel.model.saveModel(save_path + ".bigdl", save_path + ".bin", True)
print('Model saved at: ', save_path)

Model saved at:  /opt/application/data/model/xray_model_classif
