In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder.appName('HeartDisease').getOrCreate()

In [3]:
spark

### HEART DISEASE DATASET

##### THIS DATA INCLUDES VARIABLES THAT CAN BE USED TO PREDICT IF  A PERSON IS LIKELY TO HAVE A HEART DISEASE OR NOT

In [4]:
df = spark.read.csv(r"C:\Users\Prateek\Downloads\heart.csv", inferSchema = True, header = True)
df.show()

+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Gender|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+------+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|     M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|     F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|     M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|     F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|     M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|     M|          NAP|      

#### TARGET VARIABLE: HEART DISEASE

In [5]:
df.select('HeartDisease').show()

+------------+
|HeartDisease|
+------------+
|           0|
|           1|
|           0|
|           1|
|           0|
|           0|
|           0|
|           0|
|           1|
|           0|
|           0|
|           1|
|           0|
|           1|
|           0|
|           0|
|           1|
|           0|
|           1|
|           1|
+------------+
only showing top 20 rows



In [6]:
# Number of rows
df.count()

918

In [7]:
# Number of columns
len(df.columns)

12

#### CHECKING THE DATATYPE IN EACH COLUMN

In [8]:
df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



#### NUMBER OF 0s AND 1s IN THE DATASET

In [9]:
df.groupby('HeartDisease').count().show()

+------------+-----+
|HeartDisease|count|
+------------+-----+
|           1|  508|
|           0|  410|
+------------+-----+



#### AGE-WISE COUNT OF PATIENTS

In [10]:
df.groupby('Age').count().sort('count', ascending = False).show()

+---+-----+
|Age|count|
+---+-----+
| 54|   51|
| 58|   42|
| 55|   41|
| 57|   38|
| 56|   38|
| 52|   36|
| 51|   35|
| 62|   35|
| 59|   35|
| 53|   33|
| 60|   32|
| 61|   31|
| 48|   31|
| 63|   30|
| 50|   25|
| 43|   24|
| 46|   24|
| 41|   24|
| 64|   22|
| 65|   21|
+---+-----+
only showing top 20 rows



#### NUMBER OF FEMALES AND MALES IN THE DATASET

In [11]:
df.groupby('Gender').count().show()

#The number of males exceeds the number of females by a high margin, so the dataset is imbalanced in this sense

+------+-----+
|Gender|count|
+------+-----+
|     F|  193|
|     M|  725|
+------+-----+



#### CONVERTING STRING VARIABLES INTO NUMERIC 

In [12]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [13]:
df.printSchema()

# We can see that the columns Gender, Chest_Pain_Type, Resting_ECG, Exercise_Angina, and ST_slope are of string datatype

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [14]:
indexer = StringIndexer(inputCols = ('Gender', 'ChestPainType', 'RestingECG', 'ExerciseAngina', 'ST_Slope'), outputCols = ('Gender_cat', 'ChestPainType_cat', 'RestingECG_cat', 'ExerciseAngina_cat', 'ST_Slope_cat'))
indexed = indexer.fit(df).transform(df)

In [15]:
indexed.columns

['Age',
 'Gender',
 'ChestPainType',
 'RestingBP',
 'Cholesterol',
 'FastingBS',
 'RestingECG',
 'MaxHR',
 'ExerciseAngina',
 'Oldpeak',
 'ST_Slope',
 'HeartDisease',
 'Gender_cat',
 'ChestPainType_cat',
 'RestingECG_cat',
 'ExerciseAngina_cat',
 'ST_Slope_cat']

#### CREATING A VECTOR ASSEMBLER

<b>VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and those generated by different feature transformers into a single feature vector.</b>

In [16]:
assembler = VectorAssembler(inputCols = ['Gender_cat', 'ChestPainType_cat', 'FastingBS', 'RestingECG_cat',
                                         'ExerciseAngina_cat', 'Oldpeak', 'ST_Slope_cat'],
                           outputCol = 'features')

In [17]:
output = assembler.transform(indexed)

In [21]:
output.select('features', 'HeartDisease').show(truncate = False)

+-----------------------------+------------+
|features                     |HeartDisease|
+-----------------------------+------------+
|(7,[1,6],[2.0,1.0])          |0           |
|(7,[0,1,5],[1.0,1.0,1.0])    |1           |
|(7,[1,3,6],[2.0,2.0,1.0])    |0           |
|(7,[0,4,5],[1.0,1.0,1.5])    |1           |
|(7,[1,6],[1.0,1.0])          |0           |
|(7,[1,6],[1.0,1.0])          |0           |
|(7,[0,1,6],[1.0,2.0,1.0])    |0           |
|(7,[1,6],[2.0,1.0])          |0           |
|(7,[4,5],[1.0,1.5])          |1           |
|(7,[0,1,6],[1.0,2.0,1.0])    |0           |
|(7,[0,1,6],[1.0,1.0,1.0])    |0           |
|[0.0,2.0,0.0,2.0,1.0,2.0,0.0]|1           |
|(7,[1,6],[2.0,1.0])          |0           |
|(7,[4,5],[1.0,1.0])          |1           |
|[1.0,1.0,0.0,2.0,0.0,0.0,1.0]|0           |
|(7,[0,1,5],[1.0,2.0,1.5])    |0           |
|(7,[],[])                    |1           |
|(7,[0,1,6],[1.0,2.0,1.0])    |0           |
|(7,[5],[1.0])                |1           |
|(7,[1,5],

In [23]:
model_df = output.select('features', 'HeartDisease')

In [24]:
model_df.show()

+--------------------+------------+
|            features|HeartDisease|
+--------------------+------------+
| (7,[1,6],[2.0,1.0])|           0|
|(7,[0,1,5],[1.0,1...|           1|
|(7,[1,3,6],[2.0,2...|           0|
|(7,[0,4,5],[1.0,1...|           1|
| (7,[1,6],[1.0,1.0])|           0|
| (7,[1,6],[1.0,1.0])|           0|
|(7,[0,1,6],[1.0,2...|           0|
| (7,[1,6],[2.0,1.0])|           0|
| (7,[4,5],[1.0,1.5])|           1|
|(7,[0,1,6],[1.0,2...|           0|
|(7,[0,1,6],[1.0,1...|           0|
|[0.0,2.0,0.0,2.0,...|           1|
| (7,[1,6],[2.0,1.0])|           0|
| (7,[4,5],[1.0,1.0])|           1|
|[1.0,1.0,0.0,2.0,...|           0|
|(7,[0,1,5],[1.0,2...|           0|
|           (7,[],[])|           1|
|(7,[0,1,6],[1.0,2...|           0|
|       (7,[5],[1.0])|           1|
| (7,[1,5],[2.0,3.0])|           1|
+--------------------+------------+
only showing top 20 rows



#### TRAIN TEST SPLIT

In [47]:
train_df, test_df = model_df.randomSplit([0.8, 0.2], seed = 789)

In [48]:
print(train_df.count())
print(test_df.count())

743
175


In [49]:
from pyspark.ml.classification import LogisticRegression

In [50]:
log_reg = LogisticRegression(labelCol = 'HeartDisease').fit(train_df)

In [51]:
log_reg

LogisticRegressionModel: uid=LogisticRegression_0d74b0beff45, numClasses=2, numFeatures=7

In [52]:
train_results = log_reg.evaluate(train_df).predictions

In [53]:
train_results.show()

+-------------------+------------+--------------------+--------------------+----------+
|           features|HeartDisease|       rawPrediction|         probability|prediction|
+-------------------+------------+--------------------+--------------------+----------+
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|          (7,[],[])|           

In [54]:
results = log_reg.evaluate(test_df).predictions

In [55]:
results.show()

+--------------------+------------+--------------------+--------------------+----------+
|            features|HeartDisease|       rawPrediction|         probability|prediction|
+--------------------+------------+--------------------+--------------------+----------+
|           (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|           (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|           (7,[],[])|           1|[-0.6759856420972...|[0.33715785379091...|       1.0|
|       (7,[0],[1.0])|           1|[0.93148363650734...|[0.71737618621218...|       0.0|
| (7,[0,1],[1.0,1.0])|           0|[1.70243091937994...|[0.84585196034276...|       0.0|
|(7,[0,1,3],[1.0,1...|           0|[1.50821594597613...|[0.81879665962130...|       0.0|
|(7,[0,1,5],[1.0,1...|           0|[1.50488286453893...|[0.81830160912534...|       0.0|
|(7,[0,1,5],[1.0,2...|           0|[1.81488468611584...|[0.85995119710538...|       0.0|
|(7,[0,1,6],[1.0,1...

### CONFUSION MATRIX

In [57]:
tp = results[(results.HeartDisease == 1) & (results.prediction == 1)].count()
tn = results[(results.HeartDisease == 0) & (results.prediction == 0)].count()
fp = results[(results.HeartDisease == 0) & (results.prediction == 1)].count()
fn = results[(results.HeartDisease == 1) & (results.prediction == 0)].count()


print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)
print("Total", df.count())

r = float(tp)/(tp + fn)
print("recall", r)

p = float(tp) / (tp + fp)
print("precision", p)

accuracy = float((tp + tn) / (results.count()))
print(accuracy)

True Positives: 79
True Negatives: 74
False Positives: 10
False Negatives: 12
Total 918
recall 0.8681318681318682
precision 0.8876404494382022
0.8742857142857143


#### Notes:

+ stringindexer after train_test_split
+ scaling should have been done
+ try gridsearch_CV