In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv("heart.csv", inferSchema=True,
                   header=True)

In [4]:
df.show()

+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Gender|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|     M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|     F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|     M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|     F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|     M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|     M|          NAP|      

In [5]:
df.select("HeartDisease").show()

+------------+
|HeartDisease|
+------------+
|           0|
|           1|
|           0|
|           1|
|           0|
|           0|
|           0|
|           0|
|           1|
|           0|
|           0|
|           1|
|           0|
|           1|
|           0|
|           0|
|           1|
|           0|
|           1|
|           1|
+------------+
only showing top 20 rows



In [6]:
df.count()

918

In [7]:
len(df.columns)

12

In [8]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [9]:
df.describe().show()

+-------+------------------+------+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|summary|               Age|Gender|ChestPainType|         RestingBP|       Cholesterol|          FastingBS|RestingECG|             MaxHR|ExerciseAngina|           Oldpeak|ST_Slope|       HeartDisease|
+-------+------------------+------+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|  count|               918|   918|          918|               918|               918|                918|       918|               918|           918|               918|     918|                918|
|   mean|53.510893246187365|  null|         null|132.39651416122004| 198.7995642701525|0.23311546840958605|      null|136.80936819172112|          null|0.8873638344226581|    null| 0.5533769063180

In [10]:
df.head(5)

[Row(Age=40, Gender='M', ChestPainType='ATA', RestingBP=140, Cholesterol=289, FastingBS=0, RestingECG='Normal', MaxHR=172, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0),
 Row(Age=49, Gender='F', ChestPainType='NAP', RestingBP=160, Cholesterol=180, FastingBS=0, RestingECG='Normal', MaxHR=156, ExerciseAngina='N', Oldpeak=1.0, ST_Slope='Flat', HeartDisease=1),
 Row(Age=37, Gender='M', ChestPainType='ATA', RestingBP=130, Cholesterol=283, FastingBS=0, RestingECG='ST', MaxHR=98, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0),
 Row(Age=48, Gender='F', ChestPainType='ASY', RestingBP=138, Cholesterol=214, FastingBS=0, RestingECG='Normal', MaxHR=108, ExerciseAngina='Y', Oldpeak=1.5, ST_Slope='Flat', HeartDisease=1),
 Row(Age=54, Gender='M', ChestPainType='NAP', RestingBP=150, Cholesterol=195, FastingBS=0, RestingECG='Normal', MaxHR=122, ExerciseAngina='N', Oldpeak=0.0, ST_Slope='Up', HeartDisease=0)]

In [11]:
df.groupBy('HeartDisease').count().show()

+------------+-----+
|HeartDisease|count|
+------------+-----+
|           1|  508|
|           0|  410|
+------------+-----+



In [12]:
df.groupBy('Age').count().show()

+---+-----+
|Age|count|
+---+-----+
| 31|    2|
| 65|   21|
| 53|   33|
| 34|    7|
| 28|    1|
| 76|    2|
| 44|   19|
| 47|   19|
| 52|   36|
| 40|   13|
| 57|   38|
| 54|   51|
| 48|   31|
| 64|   22|
| 41|   24|
| 43|   24|
| 37|   11|
| 61|   31|
| 72|    4|
| 35|   11|
+---+-----+
only showing top 20 rows



In [13]:
df.groupBy('Gender').count().show()

+------+-----+
|Gender|count|
+------+-----+
|     F|  193|
|     M|  725|
+------+-----+



In [14]:
df.groupBy('Age').mean().show()

+---+--------+------------------+------------------+-------------------+------------------+-------------------+-------------------+
|Age|avg(Age)|    avg(RestingBP)|  avg(Cholesterol)|     avg(FastingBS)|        avg(MaxHR)|       avg(Oldpeak)|  avg(HeartDisease)|
+---+--------+------------------+------------------+-------------------+------------------+-------------------+-------------------+
| 31|    31.0|             110.0|             244.5|                0.0|             151.5|               0.75|                0.5|
| 65|    65.0|141.76190476190476|207.57142857142858| 0.3333333333333333| 125.9047619047619| 1.1714285714285715| 0.7142857142857143|
| 53|    53.0| 131.0909090909091|181.03030303030303| 0.2727272727272727|131.57575757575756| 0.7181818181818181|0.45454545454545453|
| 34|    34.0|124.14285714285714|163.28571428571428|0.14285714285714285|172.57142857142858|0.12857142857142856| 0.2857142857142857|
| 28|    28.0|             130.0|             132.0|                0.0|    

In [15]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [16]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [17]:
indexer=StringIndexer(inputCol= "Gender",outputCol='Gender_cat')
indexed=indexer.fit(df).transform(df)

In [18]:
indexer=StringIndexer(inputCol= "ChestPainType",outputCol='ChestPainType_cat')
indexed=indexer.fit(indexed).transform(indexed)

In [19]:
indexer=StringIndexer(inputCol= "RestingECG",outputCol='RestingECG_cat')
indexed=indexer.fit(indexed).transform(indexed)

In [20]:
indexer=StringIndexer(inputCol= "ExerciseAngina",outputCol='ExerciseAngina_cat')
indexed=indexer.fit(indexed).transform(indexed)

In [21]:
indexer=StringIndexer(inputCol= "ST_Slope",outputCol='ST_Slope_cat')
indexed=indexer.fit(indexed).transform(indexed)

In [24]:
indexed.show()

+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----------+-----------------+--------------+------------------+------------+
|Age|Gender|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|gender_cat|ChestPainType_cat|RestingECG_cat|ExerciseAngina_cat|ST_Slope_cat|
+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----------+-----------------+--------------+------------------+------------+
| 40|     M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|       0.0|              2.0|           0.0|               0.0|         1.0|
| 49|     F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|       1.0|              1.0|           0.0|               0.0|         0.0|
| 37|     M|   

In [25]:
indexed.columns

['Age',
 'Gender',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease',
 'gender_cat',
 'ChestPainType_cat',
 'RestingECG_cat',
 'ExerciseAngina_cat',
 'ST_Slope_cat']

In [29]:
assembler= VectorAssembler(inputCols=['Age',
 'RestingBP',
 'FastingBS',
 'MaxHR',
 'Oldpeak',
 'gender_cat', 'ChestPainType_cat', 'RestingECG_cat', 'ST_Slope_cat'],outputCol='features')

In [30]:
assembler

VectorAssembler_30b5d3981a68

In [31]:
output= assembler.transform(indexed)

In [32]:
output

DataFrame[Age: int, Gender: string, ChestPainType: string, RestingBP: int, Cholesterol: int, FastingBS: int, RestingECG: string, MaxHR: int, ExerciseAngina: string, Oldpeak: double, ST_Slope: string, HeartDisease: int, gender_cat: double, ChestPainType_cat: double, RestingECG_cat: double, ExerciseAngina_cat: double, ST_Slope_cat: double, features: vector]

In [35]:
output.show()

# As we can see, now we have one extra column named features, which is
#nothing but a combination of all the input features represented as a single dense
#vector.

+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----------+-----------------+--------------+------------------+------------+--------------------+
|Age|Gender|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|gender_cat|ChestPainType_cat|RestingECG_cat|ExerciseAngina_cat|ST_Slope_cat|            features|
+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+----------+-----------------+--------------+------------------+------------+--------------------+
| 40|     M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|       0.0|              2.0|           0.0|               0.0|         1.0|[40.0,140.0,0.0,1...|
| 49|     F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|       1

In [34]:
output.select('features','HeartDisease').show(5)

+--------------------+------------+
|            features|HeartDisease|
+--------------------+------------+
|[40.0,140.0,0.0,1...|           0|
|[49.0,160.0,0.0,1...|           1|
|[37.0,130.0,0.0,9...|           0|
|[48.0,138.0,0.0,1...|           1|
|[54.0,150.0,0.0,1...|           0|
+--------------------+------------+
only showing top 5 rows



In [38]:
indexed.columns

['Age',
 'Gender',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease',
 'gender_cat',
 'ChestPainType_cat',
 'RestingECG_cat',
 'ExerciseAngina_cat',
 'ST_Slope_cat']

In [39]:
model_df=output.select(['features','HeartDisease'])

In [40]:
training_df,test_df=model_df.randomSplit([0.75,0.25])

In [41]:
print(training_df.count())

700


In [42]:
print(test_df.count())

218


In [44]:
from pyspark.ml.classification import LogisticRegression

In [47]:
log_reg=LogisticRegression(labelCol='HeartDisease').fit(training_df)

In [48]:
# training results
train_results=log_reg.evaluate(training_df).predictions

In [50]:
train_results.show()

+--------------------+------------+--------------------+--------------------+----------+
|            features|HeartDisease|       rawPrediction|         probability|prediction|
+--------------------+------------+--------------------+--------------------+----------+
|(9,[0,1,2,3],[40....|           1|[-1.6717677659908...|[0.15818863204064...|       1.0|
|(9,[0,1,2,3],[51....|           1|[-2.7622383962315...|[0.05939918131315...|       1.0|
|(9,[0,1,2,3],[52....|           1|[-2.5183596203808...|[0.07458108313041...|       1.0|
|(9,[0,1,2,3],[57....|           1|[-2.8693017087646...|[0.05369212062450...|       1.0|
|(9,[0,1,3],[32.0,...|           1|[-0.7559773814412...|[0.31952025665009...|       1.0|
|(9,[0,1,3],[38.0,...|           1|[-0.3469558384591...|[0.41412081645468...|       1.0|
|(9,[0,1,3],[38.0,...|           1|[-0.2627511255505...|[0.43468754126401...|       1.0|
|(9,[0,1,3],[39.0,...|           1|[-0.5984541475180...|[0.35469744004032...|       1.0|
|(9,[0,1,3],[44.0,...

In [51]:
results=log_reg.evaluate(test_df).predictions

In [52]:
results.printSchema()

root
 |-- features: vector (nullable = true)
 |-- HeartDisease: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [53]:
results.select(['HeartDisease','prediction']).show(10,False)

+------------+----------+
|HeartDisease|prediction|
+------------+----------+
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
|1           |1.0       |
+------------+----------+
only showing top 10 rows



In [54]:
# confusion matrix
tp = results[(results.HeartDisease == 1) & (results.prediction == 1)].count()

In [58]:
tp

106

In [55]:
tn = results[(results.HeartDisease == 0) & (results.prediction ==0)].count()

In [59]:
tn

79

In [56]:
fp = results[(results.HeartDisease == 0) & (results.prediction == 1)].count()

In [60]:
fp

16

In [57]:
fn = results[(results.HeartDisease == 1) & (results.prediction ==0)].count()

In [61]:
fn

17

In [64]:
# accuracy
accuracy=float((tp+tn)/(results.count()))

In [65]:
accuracy

0.8486238532110092

In [68]:
# recall
recall = float(tn)/(tp + tn)

In [69]:
recall

0.42702702702702705