In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('heart failure').getOrCreate()
spark

In [2]:
data = spark.read.csv("C:\\Users\\HAI\\Downloads\\heart_failure_clinical_records_dataset.csv",header = True,inferSchema = True)
data.show()

+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
| age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|
+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
|75.0|      0|                     582|       0|               20|                  1| 265000.0|             1.9|         130|  1|      0|   4|          1|
|55.0|      0|                    7861|       0|               38|                  0|263358.03|             1.1|         136|  1|      0|   6|          1|
|65.0|      0|                     146|       0|               20|                  0| 162000.0|             1.3|         129|  1|      1|   7|          1|
|50.0|      1|                     111|       0|               2

In [3]:
 data.printSchema()

root
 |-- age: double (nullable = true)
 |-- anaemia: integer (nullable = true)
 |-- creatinine_phosphokinase: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- ejection_fraction: integer (nullable = true)
 |-- high_blood_pressure: integer (nullable = true)
 |-- platelets: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- serum_sodium: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- smoking: integer (nullable = true)
 |-- time: integer (nullable = true)
 |-- DEATH_EVENT: integer (nullable = true)



In [4]:
from pyspark.sql.functions import count,col,when,isnan
data.select([count(when(col(c).isNull(),c)).alias(c) for c in data.columns]).show()

+---+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
|age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|
+---+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
|  0|      0|                       0|       0|                0|                  0|        0|               0|           0|  0|      0|   0|          0|
+---+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+



In [5]:
#example 
#Imputer - remove null values
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols = ['anaemia','smoking'],
                 outputCols = ['{}_imputed'.format(c) for c in ['anaemia','smoking'] ]).setStrategy('mean')
data = imputer.fit(data).transform(data)
data.show(5)

+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+---------------+---------------+
| age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|anaemia_imputed|smoking_imputed|
+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+---------------+---------------+
|75.0|      0|                     582|       0|               20|                  1| 265000.0|             1.9|         130|  1|      0|   4|          1|              0|              0|
|55.0|      0|                    7861|       0|               38|                  0|263358.03|             1.1|         136|  1|      0|   6|          1|              0|              0|
|65.0|      0|                     146|       0|            

In [6]:
#check the correltaion
data.toPandas().corr()

Unnamed: 0,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,DEATH_EVENT,anaemia_imputed,smoking_imputed
age,1.0,0.088006,-0.081584,-0.101012,0.060098,0.093289,-0.052354,0.159187,-0.045966,0.06543,0.018668,-0.224068,0.253729,0.088006,0.018668
anaemia,0.088006,1.0,-0.190741,-0.012729,0.031557,0.038182,-0.043786,0.052174,0.041882,-0.094769,-0.10729,-0.141414,0.06627,1.0,-0.10729
creatinine_phosphokinase,-0.081584,-0.190741,1.0,-0.009639,-0.04408,-0.07059,0.024463,-0.016408,0.05955,0.079791,0.002421,-0.009346,0.062728,-0.190741,0.002421
diabetes,-0.101012,-0.012729,-0.009639,1.0,-0.00485,-0.012732,0.092193,-0.046975,-0.089551,-0.15773,-0.147173,0.033726,-0.001943,-0.012729,-0.147173
ejection_fraction,0.060098,0.031557,-0.04408,-0.00485,1.0,0.024445,0.072177,-0.011302,0.175902,-0.148386,-0.067315,0.041729,-0.268603,0.031557,-0.067315
high_blood_pressure,0.093289,0.038182,-0.07059,-0.012732,0.024445,1.0,0.049963,-0.004935,0.037109,-0.104615,-0.055711,-0.196439,0.079351,0.038182,-0.055711
platelets,-0.052354,-0.043786,0.024463,0.092193,0.072177,0.049963,1.0,-0.041198,0.062125,-0.12512,0.028234,0.010514,-0.049139,-0.043786,0.028234
serum_creatinine,0.159187,0.052174,-0.016408,-0.046975,-0.011302,-0.004935,-0.041198,1.0,-0.189095,0.00697,-0.027414,-0.149315,0.294278,0.052174,-0.027414
serum_sodium,-0.045966,0.041882,0.05955,-0.089551,0.175902,0.037109,0.062125,-0.189095,1.0,-0.027566,0.004813,0.08764,-0.195204,0.041882,0.004813
sex,0.06543,-0.094769,0.079791,-0.15773,-0.148386,-0.104615,-0.12512,0.00697,-0.027566,1.0,0.445892,-0.015608,-0.004316,-0.094769,0.445892


In [7]:
data.groupBy('DEATH_EVENT').count().show()

+-----------+-----+
|DEATH_EVENT|count|
+-----------+-----+
|          1|   96|
|          0|  203|
+-----------+-----+



In [8]:
new_data = data.drop('DEATH_EVENT')
new_data.toPandas()

Unnamed: 0,age,anaemia,creatinine_phosphokinase,diabetes,ejection_fraction,high_blood_pressure,platelets,serum_creatinine,serum_sodium,sex,smoking,time,anaemia_imputed,smoking_imputed
0,75.0,0,582,0,20,1,265000.00,1.9,130,1,0,4,0,0
1,55.0,0,7861,0,38,0,263358.03,1.1,136,1,0,6,0,0
2,65.0,0,146,0,20,0,162000.00,1.3,129,1,1,7,0,1
3,50.0,1,111,0,20,0,210000.00,1.9,137,1,0,7,1,0
4,65.0,1,160,1,20,0,327000.00,2.7,116,0,0,8,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
294,62.0,0,61,1,38,1,155000.00,1.1,143,1,1,270,0,1
295,55.0,0,1820,0,38,0,270000.00,1.2,139,0,0,271,0,0
296,45.0,0,2060,1,60,0,742000.00,0.8,138,0,0,278,0,0
297,45.0,0,2413,0,38,0,140000.00,1.4,140,1,1,280,0,1


In [9]:
cols = new_data.columns
cols

['age',
 'anaemia',
 'creatinine_phosphokinase',
 'diabetes',
 'ejection_fraction',
 'high_blood_pressure',
 'platelets',
 'serum_creatinine',
 'serum_sodium',
 'sex',
 'smoking',
 'time',
 'anaemia_imputed',
 'smoking_imputed']

In [10]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = cols,outputCol = 'Independent Features')
data = assembler.transform(data)
data.show(10)

+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+---------------+---------------+--------------------+
| age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|anaemia_imputed|smoking_imputed|Independent Features|
+----+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+---------------+---------------+--------------------+
|75.0|      0|                     582|       0|               20|                  1| 265000.0|             1.9|         130|  1|      0|   4|          1|              0|              0|[75.0,0.0,582.0,0...|
|55.0|      0|                    7861|       0|               38|                  0|263358.03|             1.1|         136|  1|      0|   6|          1|         

In [11]:
finilized_data = data.select('Independent Features','DEATH_EVENT')
finilized_data.show(10)

+--------------------+-----------+
|Independent Features|DEATH_EVENT|
+--------------------+-----------+
|[75.0,0.0,582.0,0...|          1|
|(14,[0,2,4,6,7,8,...|          1|
|[65.0,0.0,146.0,0...|          1|
|[50.0,1.0,111.0,0...|          1|
|[65.0,1.0,160.0,1...|          1|
|[90.0,1.0,47.0,0....|          1|
|[75.0,1.0,246.0,0...|          1|
|[60.0,1.0,315.0,1...|          1|
|(14,[0,2,4,6,7,8,...|          1|
|[80.0,1.0,123.0,0...|          1|
+--------------------+-----------+
only showing top 10 rows



In [29]:
#split train & test data

train_data,test_data = finilized_data.randomSplit([0.80,0.20])
train_data.count(),test_data.count()

(230, 69)

# Logistic Regression

In [30]:
from pyspark.ml.classification import LogisticRegression
LR = LogisticRegression(labelCol = 'DEATH_EVENT',featuresCol = 'Independent Features')
LR = LR.fit(train_data)

In [31]:
pred_results = LR.transform(test_data)
pred_results.show()

+--------------------+-----------+--------------------+--------------------+----------+
|Independent Features|DEATH_EVENT|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|(14,[0,2,3,4,6,7,...|          0|[4.89568447348046...|[0.99257672863682...|       0.0|
|(14,[0,2,3,4,6,7,...|          0|[4.44963930148836...|[0.98845213104256...|       0.0|
|(14,[0,2,3,4,6,7,...|          0|[1.92285724435646...|[0.87245671488944...|       0.0|
|(14,[0,2,4,5,6,7,...|          0|[4.80763503019085...|[0.99189901007196...|       0.0|
|(14,[0,2,4,5,6,7,...|          1|[0.02781479078888...|[0.50695324941392...|       0.0|
|(14,[0,2,4,5,6,7,...|          0|[2.27577099766974...|[0.90685042562973...|       0.0|
|(14,[0,2,4,5,6,7,...|          0|[1.79140996970861...|[0.85710005594126...|       0.0|
|(14,[0,2,4,5,6,7,...|          1|[0.68132417600435...|[0.66403417566026...|       0.0|
|(14,[0,2,4,6,7,8,...|          

In [32]:
pred_results.select('DEATH_EVENT','prediction').show(10)

+-----------+----------+
|DEATH_EVENT|prediction|
+-----------+----------+
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          1|       0.0|
|          0|       0.0|
|          0|       0.0|
|          1|       0.0|
|          0|       0.0|
|          1|       0.0|
+-----------+----------+
only showing top 10 rows



In [33]:
lr_summary = LR.summary

In [34]:
print(f"""Logistic Reression Pred Results
========================================================
over_acc:\t {lr_summary.accuracy*100}
precision:\t {lr_summary.precisionByLabel}
recall:\t {lr_summary.recallByLabel}
========================================================""")

Logistic Reression Pred Results
over_acc:	 84.34782608695653
precision:	 [0.875, 0.7714285714285715]
recall:	 [0.8974358974358975, 0.7297297297297297]


In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator =  MulticlassClassificationEvaluator(labelCol = 'DEATH_EVENT',predictionCol = 'prediction',metricName='accuracy')
pred = evaluator.evaluate(pred_results)
pred*100


79.71014492753623

# RandomForestClassifier

In [36]:
from pyspark.ml.classification import RandomForestClassifier
RF = RandomForestClassifier(labelCol = 'DEATH_EVENT',featuresCol = 'Independent Features')
RF = RF.fit(train_data)

In [37]:
pred_results = RF.transform(test_data)
pred_results.show()

+--------------------+-----------+--------------------+--------------------+----------+
|Independent Features|DEATH_EVENT|       rawPrediction|         probability|prediction|
+--------------------+-----------+--------------------+--------------------+----------+
|(14,[0,2,3,4,6,7,...|          0|[19.1719137701653...|[0.95859568850826...|       0.0|
|(14,[0,2,3,4,6,7,...|          0|[19.1513793781789...|[0.95756896890894...|       0.0|
|(14,[0,2,3,4,6,7,...|          0|[19.0258915159028...|[0.95129457579514...|       0.0|
|(14,[0,2,4,5,6,7,...|          0|[19.2031401496060...|[0.96015700748030...|       0.0|
|(14,[0,2,4,5,6,7,...|          1|[7.14141261624005...|[0.35707063081200...|       1.0|
|(14,[0,2,4,5,6,7,...|          0|[18.3536577612899...|[0.91768288806449...|       0.0|
|(14,[0,2,4,5,6,7,...|          0|[13.6065061583153...|[0.68032530791576...|       0.0|
|(14,[0,2,4,5,6,7,...|          1|[9.40793088490456...|[0.47039654424522...|       1.0|
|(14,[0,2,4,6,7,8,...|          

In [38]:
pred_results.select('DEATH_EVENT','prediction').show(10)

+-----------+----------+
|DEATH_EVENT|prediction|
+-----------+----------+
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          0|       0.0|
|          1|       1.0|
|          0|       0.0|
|          0|       0.0|
|          1|       1.0|
|          0|       0.0|
|          1|       1.0|
+-----------+----------+
only showing top 10 rows



In [39]:
rf_summary = RF.summary

In [40]:
print(f"""RandomForestClassifier Pred Results
===========================================================
over_acc:\t {rf_summary.accuracy*100}
precision:\t {rf_summary.precisionByLabel}
recall:\t {rf_summary.recallByLabel}
============================================================""")

RandomForestClassifier Pred Results
over_acc:	 94.78260869565217
precision:	 [0.9390243902439024, 0.9696969696969697]
recall:	 [0.9871794871794872, 0.8648648648648649]


In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator =  MulticlassClassificationEvaluator(labelCol = 'DEATH_EVENT',predictionCol = 'prediction',metricName='accuracy')
pred = evaluator.evaluate(pred_results)
pred*100

79.71014492753623