# Import and Explore Data

In [87]:
path = "s3://ms-syntheamass-1m-mldata/syntheamassCSV/covid/10k_synthea_covid19_csv/"
bucket_file = "indexed_data_2.csv"

indexed = spark.read.format("csv").option("inferSchema", True)\
.option("header", True).load(path + bucket_file)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [88]:
# cols = indexed.columns
# cols.remove("D_dimer")
# cols.remove("Serum_Ferritin")
# cols.remove("High_Sensitivity_Cardiac_Troponin_I")
# cols.remove("IL_6")
# cols.remove("Lymphocytes")
# cols.remove("Lactate_dehydrogenase")
# # cols.remove("AGE")
# cols.remove("icu_admitted_index")

# indexed = indexed.select(cols)
# indexed.columns
indexed = indexed.select("Id", "BMI", "AGE", "DECEASED_index")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [89]:
indexed.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8820

In [90]:
indexed.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+----+--------------+
|                  Id|      BMI| AGE|DECEASED_index|
+--------------------+---------+----+--------------+
|00ebc091-9748-42c...|25.989397|45.0|           0.0|
|01277eee-8cd9-4f7...|     27.0|45.0|           0.0|
|0143c325-d2e1-4c0...|     30.0|64.0|           1.0|
|01936601-631d-4f3...|     17.0| 3.0|           0.0|
|023ebd7f-12e5-486...|     28.0|44.0|           0.0|
|03a0d001-96be-4da...|     23.0|22.0|           0.0|
|03fa0ea8-f4d1-49e...|     23.0|22.0|           0.0|
|04bebb25-d12a-445...|     27.0|64.0|           0.0|
|06fe1c85-e24d-412...|     25.0|19.0|           0.0|
|06fea317-8464-4ef...|     17.0| 6.0|           0.0|
|08223dc4-d9c0-481...|     30.0|41.0|           0.0|
|08653edb-39d8-464...|     30.0|58.0|           0.0|
|08c42d2d-7f70-467...|     28.0|69.0|           0.0|
|08ddcbc1-2fe6-4b5...|25.989397|49.0|           0.0|
|09595671-900f-4ba...|     27.0|66.0|           0.0|
|09b1aaf2-46c0-4e7...|     28.0|65.0|         

# Create Features Column


In [91]:
cols = indexed.columns
cols.remove("DECEASED_index")
cols.remove("Id")

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols, outputCol='features')

indexed = assembler.transform(indexed)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Scale Features Column

In [92]:
from pyspark.ml.feature import StandardScaler
standardscaler = StandardScaler().setInputCol("features").setOutputCol("scaled_features")

scaled = standardscaler.fit(indexed).transform(indexed)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Split Data into Train and Test sets

In [93]:
train, test = scaled.randomSplit([0.8, 0.2], seed=12345)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
# Analyzing distributions

percent_pos_total = float(scaled.where("DECEASED_index == 1").count()) \
/ scaled.count() * 100

percent_pos_train = float(train.where("DECEASED_index == 1").count()) \
/ train.count() * 100

percent_pos_test = float(test.where("DECEASED_index == 1").count()) \
/ test.count() * 100

print("percent deceased for total covid patients:", percent_pos_total)

print("percent deceased for train:", percent_pos_train)

print("percent deceased for test:", percent_pos_test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

percent deceased for total covid patients: 4.0476190476190474
percent deceased for train: 4.122831959056014
percent deceased for test: 3.7513997760358344

# Handle the imbalance between alive and deceased patients


In [95]:

num_negs = float(train.where("DECEASED_index == 0").count())
                 
balancing_ratio = num_negs/train.count()

print("Balancing ratio:", balancing_ratio)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Balancing ratio: 0.9587716804094398

In [96]:
from pyspark.sql import functions

train = train.withColumn("class_weights", functions.when(train.DECEASED_index == 1, balancing_ratio)
                        .otherwise(1 - balancing_ratio))
train.select("class_weights").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+
|      class_weights|
+-------------------+
|0.04122831959056017|
|0.04122831959056017|
|0.04122831959056017|
| 0.9587716804094398|
|0.04122831959056017|
+-------------------+
only showing top 5 rows

# Feature Selection using Chi Square Selector

In [97]:
from pyspark.ml.feature import ChiSqSelector

css = ChiSqSelector(featuresCol='scaled_features', outputCol = 'Aspect', 
                   labelCol='DECEASED_index', fpr=0.05)

train = css.fit(train).transform(train)

test = css.fit(test).transform(test)

test.select("Aspect").show(1, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------------+
|Aspect                                |
+--------------------------------------+
|[8.133034958981286,2.8612532063993914]|
+--------------------------------------+
only showing top 1 row

# Building classification model using logistic regression

In [98]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="DECEASED_index", 
                       featuresCol="Aspect", weightCol = "class_weights", maxIter = 5)
model = lr.fit(train)
predict_train = model.transform(train)

predict_test = model.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Model Evaluation

In [99]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="DECEASED_index")

predict_test.select("DECEASED_index", "rawPrediction", "prediction", "probability").show(5)

print("The area under ROC for train set is:", evaluator.evaluate(predict_train))
print("The area under ROC for test set is:", evaluator.evaluate(predict_test))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+--------------------+----------+--------------------+
|DECEASED_index|       rawPrediction|prediction|         probability|
+--------------+--------------------+----------+--------------------+
|           0.0|[0.60818864900375...|       0.0|[0.64752749778933...|
|           0.0|[1.20187748893561...|       0.0|[0.76885860994560...|
|           0.0|[1.16885451797093...|       0.0|[0.76293790205834...|
|           0.0|[0.14262118189657...|       0.0|[0.53559498005019...|
|           0.0|[0.98482784898506...|       0.0|[0.72806511606148...|
+--------------+--------------------+----------+--------------------+
only showing top 5 rows

The area under ROC for train set is: 0.8358571603877791
The area under ROC for test set is: 0.8637571305774796

In [100]:
sc.install_pypi_package("matplotlib")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package already installed for current Spark context!
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1110, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [101]:
sc.install_pypi_package("pandas")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package already installed for current Spark context!
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1110, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [102]:
sc.install_pypi_package("handyspark")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package already installed for current Spark context!
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1110, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



# Thresholds and Metrics

In [103]:
from handyspark import BinaryClassificationMetrics

bcm = BinaryClassificationMetrics(predict_test, scoreCol='probability', labelCol='DECEASED_index')

bcm.getMetricsByThreshold().filter('fpr between 0.19 and 0.21').toPandas()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   threshold       fpr    recall  precision
0   0.444753  0.190227  0.746269   0.132626
1   0.442310  0.191390  0.746269   0.131926
2   0.439870  0.191972  0.746269   0.131579
3   0.437432  0.193717  0.746269   0.130548
4   0.436614  0.199535  0.746269   0.127226
5   0.434181  0.202443  0.746269   0.125628
6   0.429323  0.204188  0.746269   0.124688

# Confusion Matrix

In [104]:
bcm.print_confusion_matrix(.37)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

         Predicted       
                 0      1
Actual 0    1166.0  553.0
       1       7.0   60.0

In [105]:
# from pyspark.ml.classification import LogisticRegression
# import matplotlib.pyplot as plt
# import numpy as np

# beta = np.sort(model.coefficients)

# plt.plot(beta)
# plt.ylabel('Beta Coefficients')
# plt.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [106]:
# trainingSummary = model.summary
# roc = trainingSummary.roc.toPandas()
# plt.plot(roc['FPR'],roc['TPR'])
# plt.ylabel('False Positive Rate')
# plt.xlabel('True Positive Rate')
# plt.title('ROC Curve')
# plt.show()
# print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…