In [1]:
import io
from io import StringIO
import os
import sys
import cv2
from pyspark import SparkContext
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import Row, SparkSession 
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, FloatType, StringType, IntegerType
import pyspark.sql.functions as F
# for visualization
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline 

In [2]:
# Obtain the Spark session: reuse the active one if present, otherwise create it.
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [3]:
# Load the metadata CSV; the first row is the header and column types are inferred.
data_path = "./coronahack-chest-xraydataset/Chest_xray_Corona_Metadata.csv"
meta_data = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load(data_path)
)

In [4]:
# Preview the first five metadata rows.
meta_data.show(n=5)

+---+-----------------+------+------------+----------------------+----------------------+
|_c0| X_ray_image_name| Label|Dataset_type|Label_2_Virus_category|Label_1_Virus_category|
+---+-----------------+------+------------+----------------------+----------------------+
|  0|IM-0128-0001.jpeg|Normal|       TRAIN|                  null|                  null|
|  1|IM-0127-0001.jpeg|Normal|       TRAIN|                  null|                  null|
|  2|IM-0125-0001.jpeg|Normal|       TRAIN|                  null|                  null|
|  3|IM-0122-0001.jpeg|Normal|       TRAIN|                  null|                  null|
|  4|IM-0119-0001.jpeg|Normal|       TRAIN|                  null|                  null|
+---+-----------------+------+------------+----------------------+----------------------+
only showing top 5 rows



In [5]:
# Binary target column: 0 when Label equals "Normal", 1 for every other row.
is_normal = F.col("Label") == "Normal"
meta_data_with_labels = meta_data.withColumn('new_label', F.when(is_normal, 0).otherwise(1))

In [7]:
# Partition the labelled metadata by Dataset_type and report the size of each part.
number_of_samples = meta_data_with_labels.count()
train_set = meta_data_with_labels.filter(F.col("Dataset_type") == "TRAIN")
test_set = meta_data_with_labels.filter(F.col("Dataset_type") == "TEST")
num_train = train_set.count()
num_test = test_set.count()

for caption, value in (
    ("Number of samples: ", number_of_samples),
    ("Train samples: ", num_train),
    ("Test samples: ", num_test),
):
    print(caption, value)

Number of samples:  5910
Train samples:  5286
Test samples:  624


In [8]:
def _make_path_udf(base_dir):
    """Return a Spark UDF that prepends ``base_dir`` to an image file name.

    Args:
        base_dir: Directory prefix placed in front of each file name. It is
            concatenated as-is, so it should end with a path separator.

    Returns:
        A string-typed UDF mapping a file-name column to ``base_dir + name``.
    """
    @udf(returnType=StringType())
    def _to_absolute_path(img_file):
        return base_dir + str(img_file)
    return _to_absolute_path


# Image folders of the unpacked dataset, resolved against the working directory.
train_path = os.getcwd() +'/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/train/'
test_path = os.getcwd() +'/coronahack-chest-xraydataset/Coronahack-Chest-XRay-Dataset/Coronahack-Chest-XRay-Dataset/test/'

# One UDF per split; both are built by the same helper and differ only in prefix.
get_absolute_path = _make_path_udf(train_path)
get_test_absolute_path = _make_path_udf(test_path)

# Add the full image path to every metadata row of each split.
train = train_set.withColumn("image_path", get_absolute_path(col("X_ray_image_name")))
test = test_set.withColumn("image_path", get_test_absolute_path(col("X_ray_image_name")))

In [9]:
# Keep only the image path and the binary label for each split, then print row counts.
kept_columns = ["image_path", "new_label"]
select_train = train.select(*kept_columns)
select_test = test.select(*kept_columns)
for split_df in (select_train, select_test):
    print(split_df.count())


5286
624


# Convert images to flattened descriptors

In [10]:
def fetch_descriptor(img):
    """Load an image and flatten it into a list of pixel intensities.

    The image is read with OpenCV, resized to 50x50 and flattened. With the
    3-channel arrays cv2.imread returns by default this yields 7500 values.

    Args:
        img: Path of the image file to read.

    Returns:
        A flat list of ints, or ``None`` when OpenCV cannot read the file, so
        that unreadable images surface as null descriptors.
    """
    image = cv2.imread(img)
    # cv2.imread reports failure by returning None instead of raising, so the
    # check has to happen before the array is handed to cv2.resize.
    if image is None:
        return None
    resized = cv2.resize(image, (50, 50))
    return resized.flatten().tolist()

# Wrap the descriptor extractor as a UDF producing an array of integers.
udf_image = udf(fetch_descriptor, ArrayType(IntegerType()))

# Attach a descriptor to every image of both splits and drop rows whose
# descriptor is null.
train_descriptor = (
    select_train
    .withColumn("descriptors", udf_image("image_path"))
    .filter(F.col("descriptors").isNotNull())
)
test_descriptor = (
    select_test
    .withColumn("descriptors", udf_image("image_path"))
    .filter(F.col("descriptors").isNotNull())
)

# Preview four rows of each split, then print the remaining row counts.
train_descriptor.show(4)
test_descriptor.show(4)
print(train_descriptor.count())
print(test_descriptor.count())



+--------------------+---------+--------------------+
|          image_path|new_label|         descriptors|
+--------------------+---------+--------------------+
|/Users/mma525/Doc...|        0|[0, 0, 0, 14, 14,...|
|/Users/mma525/Doc...|        0|[61, 61, 61, 89, ...|
|/Users/mma525/Doc...|        0|[79, 79, 79, 67, ...|
|/Users/mma525/Doc...|        0|[46, 46, 46, 57, ...|
+--------------------+---------+--------------------+
only showing top 4 rows

+--------------------+---------+--------------------+
|          image_path|new_label|         descriptors|
+--------------------+---------+--------------------+
|/Users/mma525/Doc...|        0|[0, 0, 0, 0, 0, 0...|
|/Users/mma525/Doc...|        0|[25, 25, 25, 24, ...|
|/Users/mma525/Doc...|        0|[18, 18, 18, 23, ...|
|/Users/mma525/Doc...|        0|[17, 17, 17, 15, ...|
+--------------------+---------+--------------------+
only showing top 4 rows

5286
624


In [11]:
from pyspark.sql.functions import size

# Keep only rows whose descriptor array holds exactly 7500 values.
has_full_descriptor = size('descriptors') == 7500
train_filtered = train_descriptor.where(has_full_descriptor)
test_filtered = test_descriptor.where(has_full_descriptor)

print("Total train examples: ", train_filtered.count())
print("Total test examples: ", test_filtered.count())

Total train examples:  5286
Total test examples:  624


In [12]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf


@udf(returnType=VectorUDT())
def list_to_vector_udf(values):
    """Convert a list of numbers into a dense ML vector."""
    return Vectors.dense(values)


# Add the vector form of the descriptors, which the classifier reads as features.
train_with_vec = train_filtered.withColumn("vec_descriptors", list_to_vector_udf("descriptors"))
test_with_vec = test_filtered.withColumn("vec_descriptors", list_to_vector_udf("descriptors"))

print("Total train examples: ", train_with_vec.count())
print("Total test examples: ", test_with_vec.count())


Total train examples:  5286
Total test examples:  624


Split the training data 90/10 into training and validation sets.

In [13]:
# Fixed seed so the 90/10 split, and every metric computed from it, can be
# reproduced; without a seed randomSplit draws a different split on each run.
SPLIT_SEED = 42
train, val = train_with_vec.randomSplit([0.9, 0.1], seed=SPLIT_SEED)

# Preview both parts (default number of rows) and report their sizes.
train.show()
val.show()
print("Total train split: ",train.count())
print("Total validation split: ",val.count())

+--------------------+---------+--------------------+--------------------+
|          image_path|new_label|         descriptors|     vec_descriptors|
+--------------------+---------+--------------------+--------------------+
|/Users/mma525/Doc...|        1|[58, 58, 58, 80, ...|[58.0,58.0,58.0,8...|
|/Users/mma525/Doc...|        1|[61, 59, 59, 69, ...|[61.0,59.0,59.0,6...|
|/Users/mma525/Doc...|        1|[8, 8, 8, 254, 25...|[8.0,8.0,8.0,254....|
|/Users/mma525/Doc...|        1|[254, 254, 254, 0...|[254.0,254.0,254....|
|/Users/mma525/Doc...|        1|[0, 0, 0, 253, 25...|[0.0,0.0,0.0,253....|
|/Users/mma525/Doc...|        1|[0, 0, 0, 0, 0, 0...|[0.0,0.0,0.0,0.0,...|
|/Users/mma525/Doc...|        1|[100, 100, 100, 1...|[100.0,100.0,100....|
|/Users/mma525/Doc...|        1|[21, 21, 21, 23, ...|[21.0,21.0,21.0,2...|
|/Users/mma525/Doc...|        1|[8, 8, 8, 8, 8, 8...|[8.0,8.0,8.0,8.0,...|
|/Users/mma525/Doc...|        1|[97, 97, 97, 105,...|[97.0,97.0,97.0,1...|
|/Users/mma525/Doc...|   

# Logistic Regression Classifier

### Training

Define our model and train on the training data

In [14]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression over the dense pixel vectors, capped at 100 iterations.
logistic_regression = LogisticRegression(
    maxIter=100,
    labelCol='new_label',
    featuresCol='vec_descriptors',
)
logistic_regression_model = logistic_regression.fit(train)

### Evaluating on training data

In [15]:
# Score the training split with the fitted model and preview four predictions.
predictions_train = logistic_regression_model.transform(train)
(
    predictions_train
    .select("image_path", "new_label", "probability", "prediction")
    .show(4)
)

+--------------------+---------+--------------------+----------+
|          image_path|new_label|         probability|prediction|
+--------------------+---------+--------------------+----------+
|/Users/mma525/Doc...|        1|[4.47243294660984...|       1.0|
|/Users/mma525/Doc...|        1|[1.29001351237413...|       1.0|
|/Users/mma525/Doc...|        1|[4.09797201580553...|       1.0|
|/Users/mma525/Doc...|        1|[2.89160032613314...|       1.0|
+--------------------+---------+--------------------+----------+
only showing top 4 rows



##### ROC evaluator

In [16]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the training split, scored from the probability column.
evaluator = BinaryClassificationEvaluator(
    labelCol='new_label',
    rawPredictionCol='probability',
    metricName='areaUnderROC',
)
train_auc = evaluator.evaluate(predictions_train)
print('Train Area Under ROC: ', train_auc)


Train Area Under ROC:  0.9999677422199572


##### Classifier evaluator

In [17]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy on the training split, computed by the multiclass evaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol='new_label',
    predictionCol='prediction',
    metricName='accuracy',
)
train_accuracy = evaluator.evaluate(predictions_train)
print('Train Accuracy: ', train_accuracy)




Train Accuracy:  0.9976982632349864


### Evaluating on validation data

In [18]:
# Score the validation split with the fitted model and preview four predictions.
predictions_val = logistic_regression_model.transform(val)
(
    predictions_val
    .select("image_path", "new_label", "probability", "prediction")
    .show(4)
)

+--------------------+---------+--------------------+----------+
|          image_path|new_label|         probability|prediction|
+--------------------+---------+--------------------+----------+
|/Users/mma525/Doc...|        1|[0.99999999997986...|       0.0|
|/Users/mma525/Doc...|        1|[2.76759369931467...|       1.0|
|/Users/mma525/Doc...|        0|[0.99999993458172...|       0.0|
|/Users/mma525/Doc...|        0|[1.0,4.6933065702...|       0.0|
+--------------------+---------+--------------------+----------+
only showing top 4 rows



##### ROC evaluator

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the validation split, scored from the probability column.
evaluator = BinaryClassificationEvaluator(
    labelCol='new_label',
    rawPredictionCol='probability',
    metricName='areaUnderROC',
)
val_auc = evaluator.evaluate(predictions_val)
print('Val Area Under ROC: ', val_auc)


Val Area Under ROC:  0.9816870144284132


##### Classifier evaluator

In [20]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy on the validation split, computed by the multiclass evaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol='new_label',
    predictionCol='prediction',
    metricName='accuracy',
)
val_accuracy = evaluator.evaluate(predictions_val)
print('Val Accuracy: ', val_accuracy)




Val Accuracy:  0.9664694280078896


### Evaluating on test data

We evaluate on the test data

In [21]:
# Score the test split with the fitted model and preview four predictions.
test = test_with_vec
predictions_test = logistic_regression_model.transform(test)
(
    predictions_test
    .select("image_path", "new_label", "probability", "prediction")
    .show(4)
)

+--------------------+---------+--------------------+----------+
|          image_path|new_label|         probability|prediction|
+--------------------+---------+--------------------+----------+
|/Users/mma525/Doc...|        0|[4.09483116771980...|       1.0|
|/Users/mma525/Doc...|        0|[1.56064516561494...|       1.0|
|/Users/mma525/Doc...|        0|[7.51723548251625...|       1.0|
|/Users/mma525/Doc...|        0|[1.19315269408144...|       1.0|
+--------------------+---------+--------------------+----------+
only showing top 4 rows



##### ROC evaluator

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the test split, scored from the probability column.
evaluator = BinaryClassificationEvaluator(
    labelCol='new_label',
    rawPredictionCol='probability',
    metricName='areaUnderROC',
)
test_auc = evaluator.evaluate(predictions_test)
print('Test Area Under ROC: ', test_auc)

Test Area Under ROC:  0.8497644093797941


##### Classifier evaluator

In [23]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy on the test split, computed by the multiclass evaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol='new_label',
    predictionCol='prediction',
    metricName='accuracy',
)
test_accuracy = evaluator.evaluate(predictions_test)
print('Test Accuracy: ', test_accuracy)

Test Accuracy:  0.7339743589743589
