
### Assignment 2: Heart Disease Classification using PySpark MLlib

In [0]:
##  Pre-processing

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline

# Start Spark session
spark = SparkSession.builder.appName("HeartDiseaseClassification").getOrCreate()

# Load data
df = spark.read.csv("/FileStore/tables/heart.csv", header=True, inferSchema=True)

# Check schema
df.printSchema()

# Handle missing values (if any)
df = df.dropna()

# Convert target column to numeric if needed
label_indexer = StringIndexer(inputCol="target", outputCol="label")


root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



In [0]:
##  Exploratory Data Analysis

# Show basic statistics
df.describe().show()

# Count classes
df.groupBy("target").count().show()

# Display sample
df.show(5)

+-------+-----------------+------------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+
|summary|              age|               sex|                cp|          trestbps|             chol|                fbs|           restecg|           thalach|              exang|           oldpeak|             slope|                ca|              thal|            target|
+-------+-----------------+------------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+
|  count|             1025|              1025|              1025|              1025|             1025|               1025|              1025|              1025|            

In [0]:
##  Model Building

# Features
feature_cols = [col for col in df.columns if col != 'target']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# ML model
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")

# Pipeline
pipeline = Pipeline(stages=[label_indexer, assembler, lr])

# Split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Fit model
model = pipeline.fit(train_data)


In [0]:
## Performance Evaluation

# Predictions
predictions = model.transform(test_data)

# Evaluate
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Accuracy
binary_evaluator = BinaryClassificationEvaluator()
accuracy = binary_evaluator.evaluate(predictions)

# Precision
precision_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="precisionByLabel")
precision = precision_evaluator.evaluate(predictions)

# Recall
recall_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="recallByLabel")
recall = recall_evaluator.evaluate(predictions)

# F1 Score
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(predictions)

# AUC
auc_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = auc_eval.evaluate(predictions)

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC: {auc:.4f}")


Accuracy: 0.9214
Precision: 0.7717
Recall: 0.9103
F1 Score: 0.8342
AUC: 0.9214


In [0]:
from pyspark.sql.functions import col

# Group by label and prediction
conf_matrix = predictions.groupBy("label", "prediction").count().orderBy("label", "prediction")
conf_matrix.show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|   71|
|  0.0|       1.0|    7|
|  1.0|       0.0|   21|
|  1.0|       1.0|   70|
+-----+----------+-----+



#### Conclusions

Model used: Logistic Regression

Accuracy, Precision, Recall, F1 Score obtained