In [1]:
#importing required packages
from pyspark.sql import *
from pyspark.sql.types import *
import six
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import GBTRegressor

In [2]:
#creating spark session
spark = SparkSession.builder.master("local").appName("CAT2").getOrCreate()

#creating schema
schema = StructType([
    StructField('age', IntegerType(), True),
    StructField('anaemia', IntegerType(), True),
    StructField('creatinine_phosphokinase', IntegerType(), True),
    StructField('diabetes', IntegerType(), True),
    StructField('ejection_fraction', IntegerType(), True),
    StructField('high_blood_pressure', IntegerType(), True),
    StructField('platelets', DoubleType(), True),
    StructField('serum_creatinine', DoubleType(), True),
    StructField('serum_sodium', IntegerType(), True),
    StructField('sex', IntegerType(), True),
    StructField('smoking', IntegerType(), True),
    StructField('time', IntegerType(), True),
    StructField('DEATH_EVENT', IntegerType(), True)])

In [3]:
#importing dataset as rdd dataframe
heart_df = spark.read.format('csv').option('header',True).schema(schema).load('Downloads/heart.csv')

In [4]:
#printing schema
print("Schema")
print(heart_df.printSchema())

Schema
root
 |-- age: integer (nullable = true)
 |-- anaemia: integer (nullable = true)
 |-- creatinine_phosphokinase: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- ejection_fraction: integer (nullable = true)
 |-- high_blood_pressure: integer (nullable = true)
 |-- platelets: double (nullable = true)
 |-- serum_creatinine: double (nullable = true)
 |-- serum_sodium: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- smoking: integer (nullable = true)
 |-- time: integer (nullable = true)
 |-- DEATH_EVENT: integer (nullable = true)

None


In [5]:
#printing dataframe
print("Printing Dataframe")
heart_df.show()

Printing Dataframe
+---+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
|age|anaemia|creatinine_phosphokinase|diabetes|ejection_fraction|high_blood_pressure|platelets|serum_creatinine|serum_sodium|sex|smoking|time|DEATH_EVENT|
+---+-------+------------------------+--------+-----------------+-------------------+---------+----------------+------------+---+-------+----+-----------+
| 75|      0|                     582|       0|               20|                  1| 265000.0|             1.9|         130|  1|      0|   4|          1|
| 55|      0|                    7861|       0|               38|                  0|263358.03|             1.1|         136|  1|      0|   6|          1|
| 65|      0|                     146|       0|               20|                  0| 162000.0|             1.3|         129|  1|      1|   7|          1|
| 50|      1|                     111|       0|    

In [6]:
#Summary statistics
print("Summary statistics")
heart_df.describe().toPandas().transpose()

Summary statistics


Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,23652,67.50769490952139,12.575222228396107,23,99
anaemia,23696,0.4826975016880486,0.4997110781908752,0,1
creatinine_phosphokinase,23696,1077.9044142471303,661.610899691948,23,7861
diabetes,23696,1.0531313301823093,6.795316671236788,0,90
ejection_fraction,23696,37.79274983119514,12.238839987436206,0,80
high_blood_pressure,23696,11.69112930452397,125.0911693481484,0,1998
platelets,23696,261868.2184199851,94125.82543797953,25100.0,850000.0
serum_creatinine,23696,1.6750303848751371,6.4535472106958665,0.5,143.0
serum_sodium,23696,136.31195138419986,7.565578962212129,0,148


In [7]:
print("Correlation for dataframe with DEATH EVENT")
for i in heart_df.columns:
    if not( isinstance(heart_df.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to DEATH_EVENT for ", i, heart_df.stat.corr('DEATH_EVENT',i))

Correlation for dataframe with DEATH EVENT
Correlation to DEATH_EVENT for  age 0.023002997846622325
Correlation to DEATH_EVENT for  anaemia 0.01623841010415318
Correlation to DEATH_EVENT for  creatinine_phosphokinase 0.010057939008859717
Correlation to DEATH_EVENT for  diabetes 0.008139721849027285
Correlation to DEATH_EVENT for  ejection_fraction -0.20959416630615788
Correlation to DEATH_EVENT for  high_blood_pressure 0.008354778793895266
Correlation to DEATH_EVENT for  platelets -0.05338207026430099
Correlation to DEATH_EVENT for  serum_creatinine 0.014214069167659478
Correlation to DEATH_EVENT for  serum_sodium -0.10469320114375177
Correlation to DEATH_EVENT for  sex 0.010903750755395217
Correlation to DEATH_EVENT for  smoking -0.03254317481342146
Correlation to DEATH_EVENT for  time -0.5668730347474669
Correlation to DEATH_EVENT for  DEATH_EVENT 1.0


In [8]:
#combining datas for fitting models
vectorAssembler = VectorAssembler(inputCols = ['age', 'creatinine_phosphokinase', 'ejection_fraction', 'platelets', 'serum_creatinine', 'serum_sodium'], outputCol = 'features')
vheart_df = vectorAssembler.setHandleInvalid("skip").transform(heart_df)
vheart_df = vheart_df.select(['features', 'DEATH_EVENT'])
vheart_df.show()

+--------------------+-----------+
|            features|DEATH_EVENT|
+--------------------+-----------+
|[75.0,582.0,20.0,...|          1|
|[55.0,7861.0,38.0...|          1|
|[65.0,146.0,20.0,...|          1|
|[50.0,111.0,20.0,...|          1|
|[65.0,160.0,20.0,...|          1|
|[90.0,47.0,40.0,2...|          1|
|[75.0,246.0,15.0,...|          1|
|[60.0,315.0,60.0,...|          1|
|[65.0,157.0,65.0,...|          1|
|[80.0,123.0,35.0,...|          1|
|[75.0,81.0,38.0,3...|          1|
|[62.0,231.0,25.0,...|          1|
|[45.0,981.0,30.0,...|          1|
|[50.0,168.0,38.0,...|          1|
|[49.0,80.0,30.0,4...|          0|
|[82.0,379.0,50.0,...|          1|
|[87.0,149.0,38.0,...|          1|
|[45.0,582.0,14.0,...|          1|
|[70.0,125.0,25.0,...|          1|
|[48.0,582.0,55.0,...|          1|
+--------------------+-----------+
only showing top 20 rows



In [9]:
#splitting training and test data
splits = vheart_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [21]:
#linear regression for training data
lr = LinearRegression(featuresCol = 'features', labelCol='DEATH_EVENT')
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)


Coefficients: [0.0008619177548666024,6.080738605538797e-06,-0.007817605965938543,-1.9157740949796373e-07,-0.013189934669389536,-0.014401674914041509]
Intercept: 2.611339399245204
RMSE: 0.459031


In [11]:
#printing description
print(train_df.describe().show())
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data (LinearRegression)= %g" % test_result.rootMeanSquaredError)

+-------+-------------------+
|summary|        DEATH_EVENT|
+-------+-------------------+
|  count|              16593|
|   mean| 0.3463508708491533|
| stddev|0.47582096397269236|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

None
Root Mean Squared Error (RMSE) on test data (LinearRegression)= 0.461956


In [12]:
print("NumIterations: %d" % trainingSummary.totalIterations)
print("ObjectiveHistory: %s" % str(trainingSummary.objectiveHistory))
print(trainingSummary.residuals.show())

NumIterations: 1
ObjectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
|-0.4349869685966703|
|-0.4349869685966703|
|-0.4349869685966703|
|-0.4349869685966703|
|-0.4349869685966703|
|-0.4349869685966703|
|-0.4349869685966703|
|-0.4349869685966703|
| 0.6831050786310082|
| 0.6831050786310082|
| 0.6831050786310082|
| 0.6831050786310082|
| 0.6831050786310082|
|-0.3548720252856463|
|-0.3548720252856463|
|-0.3548720252856463|
|-0.3548720252856463|
|-0.3548720252856463|
|-0.3548720252856463|
|-0.3548720252856463|
+-------------------+
only showing top 20 rows

None


In [13]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","DEATH_EVENT","features").show()

+-------------------+-----------+--------------------+
|         prediction|DEATH_EVENT|            features|
+-------------------+-----------+--------------------+
| 0.4349869685966703|          0|[23.0,66.0,25.0,2...|
| 0.4349869685966703|          0|[23.0,66.0,25.0,2...|
| 0.4349869685966703|          0|[23.0,66.0,25.0,2...|
| 0.4349869685966703|          0|[23.0,66.0,25.0,2...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3168949213689918|          1|[40.0,60.0,38.0,1...|
| 0.3548720252856463|          0|[40.0,90.0,35.0,2...|
| 0.3548720252856463|          0|[40.0,90.0,35.0,2...|
| 0.3548720252856463|          0|[40.0,90.0,35.0,2...|
| 0.3548720252856463|          0|[40.0,90.0,35.0,2...|
|0.2940693

In [14]:
#Decision tree model
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'DEATH_EVENT')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="DEATH_EVENT", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data (DecisionTreeRegressor)= %g" % rmse)

Root Mean Squared Error (RMSE) on test data (DecisionTreeRegressor)= 0.380694


In [15]:
print(dt_model.featureImportances)
print(heart_df.take(1))

(6,[2,3,4,5],[0.18234413310627734,0.09172934364178946,0.5392443343419457,0.18668218890998756])
[Row(age=75, anaemia=0, creatinine_phosphokinase=582, diabetes=0, ejection_fraction=20, high_blood_pressure=1, platelets=265000.0, serum_creatinine=1.9, serum_sodium=130, sex=1, smoking=0, time=4, DEATH_EVENT=1)]


In [16]:
#GBT regressor model
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'DEATH_EVENT', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'DEATH_EVENT', 'features').show(5)
gbt_evaluator = RegressionEvaluator(labelCol="DEATH_EVENT", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data (GBTRegressor)= %g" % rmse)

+--------------------+-----------+--------------------+
|          prediction|DEATH_EVENT|            features|
+--------------------+-----------+--------------------+
|0.026057054132389637|          0|[23.0,66.0,25.0,2...|
|0.026057054132389637|          0|[23.0,66.0,25.0,2...|
|0.026057054132389637|          0|[23.0,66.0,25.0,2...|
|0.026057054132389637|          0|[23.0,66.0,25.0,2...|
|   0.991600058457055|          1|[40.0,60.0,38.0,1...|
+--------------------+-----------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data (GBTRegressor)= 0.313114
