In [None]:
#######################################################################################
#######################################################################################
###                                                                                 ###
### YOU SHOULD COPY THIS SCRIPT INSIDE THE FIRST FOLDER "CHEST_XRAY" WHICH          ###
### YOU WILL GET BY UNZIPPING THE FILE "CHEST-XRAY-PNEUMONIA.ZIP"                   ###
### AVAILABLE FROM https://www.kaggle.com/paultimothymooney/chest-xray-pneumonia    ###
###                                                                                 ###
### IF YOU DO THIS RIGHT, YOU SHOULD HAVE A SECOND "CHEST_XRAY" FOLDER              ###
### IN THE SAME FOLDER AS THIS SCRIPT                                               ###
###                                                                                 ###
### IF YOU DON'T DO THIS, YOU WILL HAVE TO MANUALLY CHANGE THE PATH                 ###
### FOR LOADING THE IMAGES.                                                         ###
###                                                                                 ###
#######################################################################################
#######################################################################################

In [None]:
### INSERT SPARKDL INTO NOTEBOOK ###
# Publish the submit arguments through the PYSPARK_SUBMIT_ARGS environment variable.
# They request the spark-deep-learning package and must be in place before the
# Spark session below is created.
import os
SUBMIT_ARGS = "--packages databricks:spark-deep-learning:1.2.0-spark2.3-s_2.11 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

# findspark is initialized before pyspark is imported
# (presumably so that the local Spark installation is importable -- confirm for your setup)
import findspark
findspark.init()

# Create the Spark session used by the rest of the notebook, or reuse a running one
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Identifying pneumonia images using pyspark")
    .getOrCreate()
)

In [None]:
### IMPORTING NEEDED PACKAGES TO LOAD IMAGES ###
# ImageSchema to fill the dataframes
from pyspark.ml.image import ImageSchema

# lit to add labels to our images
from pyspark.sql.functions import lit

In [None]:
### LOAD THE TRAIN IMAGES ###
# Read each class folder into its own dataframe and tag every row with its class label:
# 0 for the NORMAL folder, 1 for the PNEUMONIA folder
normal_df_train = ImageSchema.readImages("chest_xray/train/NORMAL").withColumn("label", lit(0))
pneumonia_df_train = ImageSchema.readImages("chest_xray/train/PNEUMONIA").withColumn("label", lit(1))

# Merge both classes into a single training dataframe and spread it over 100 partitions
# to better manage the PC's memory.
# union() replaces unionAll(), which is only a deprecated alias for it in Spark 2.x;
# the result is the same (columns matched by position, duplicate rows kept).
train_df = normal_df_train.union(pneumonia_df_train).repartition(100)

In [None]:
### LOAD THE TEST IMAGES ###
# Same procedure as for the training images, applied to the test folders.
# Read each class folder into its own dataframe and tag every row with its class label:
# 0 for the NORMAL folder, 1 for the PNEUMONIA folder
normal_df_test = ImageSchema.readImages("chest_xray/test/NORMAL").withColumn("label", lit(0))
pneumonia_df_test = ImageSchema.readImages("chest_xray/test/PNEUMONIA").withColumn("label", lit(1))

# Merge both classes into a single test dataframe and spread it over 100 partitions
# to better manage the PC's memory.
# union() replaces unionAll(), which is only a deprecated alias for it in Spark 2.x;
# the result is the same (columns matched by position, duplicate rows kept).
test_df = normal_df_test.union(pneumonia_df_test).repartition(100)

In [None]:
### TRAINING OUR MODEL ###
# Everything needed to build and fit the training pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

# Stage 1: turn the "image" column into a "features" column using the "InceptionV3" model
featurizer = DeepImageFeaturizer(
    inputCol="image",
    outputCol="features",
    modelName="InceptionV3",
)

# Stage 2: a logistic regression over those features, chosen because it is a
# pretty simple and fast algorithm
lr = LogisticRegression(
    maxIter=10,
    regParam=0.05,
    elasticNetParam=0.3,
    labelCol="label",
)

# Chain the featurizer and the classifier into a single pipeline...
p = Pipeline(stages=[featurizer, lr])

# ...and fit the whole pipeline on the training dataframe
p_model = p.fit(train_df)

In [None]:
### EVALUATING OUR MODEL ###
# We will import the evaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The evaluator compares the "prediction" column against the "label" column.
# No metric is fixed here: each evaluate() call below selects its own metric.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

# And we input our test dataframe, to check how good our model is
tested_df = p_model.transform(test_df)

# tested_df is lazy and every evaluate() call is a separate Spark job, so evaluating it
# directly would reload the images and recompute the features once per metric.
# The evaluator only needs these two columns, so we keep just them and cache the result:
# the expensive scoring then runs once and is reused for all the metrics.
scored_df = tested_df.select("prediction", "label").cache()

# We want to see some metrics, in order to evaluate it by ourselves
for metric_label, metric_name in [
    ("accuracy", "accuracy"),
    ("precision", "weightedPrecision"),
    ("recall", "weightedRecall"),
    ("F1-score", "f1"),
]:
    print('Test set {}: '.format(metric_label),
          evaluator.evaluate(scored_df, {evaluator.metricName: metric_name}))

# Release the cached predictions now that all the metrics have been computed
scored_df.unpersist()