In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=8d603e52a3e2fe77a566b2b1fcdb8ba72490a0e8c62dd2a2fbc22834e58a342d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) the Spark session for this notebook.
sp = SparkSession.builder.appName('heart_disease').getOrCreate()

# Dataset location.
path = '/content/Heart_Disease_Prediction.csv'

# Read the CSV using the first row as the header and letting Spark infer column types.
csv_reader = sp.read.format('csv').option('header', True).option('inferSchema', True)
df = csv_reader.load(path)

# Print the leading rows as a quick sanity check of the parsed columns.
df.show()

+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|index|Age|Sex|Chest pain type| BP|Cholesterol|FBS over 120|EKG results|Max HR|Exercise angina|ST depression|Slope of ST|Number of vessels fluro|Thallium|Heart Disease|
+-----+---+---+---------------+---+-----------+------------+-----------+------+---------------+-------------+-----------+-----------------------+--------+-------------+
|    0| 70|  1|              4|130|        322|           0|          2|   109|              0|          2.4|          2|                      3|       3|     Presence|
|    1| 67|  0|              3|115|        564|           0|          2|   160|              0|          1.6|          2|                      0|       7|      Absence|
|    2| 57|  1|              2|124|        261|           0|          0|   141|              0|          0.3|          1|                      0|       7| 

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml import Pipeline

# Obtain a Spark session through the builder.
spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

# Load the CSV with a header row. No schema inference is requested here,
# so the columns used as features are cast explicitly below.
df = spark.read.csv("/content/Heart_Disease_Prediction.csv", header=True)

# Cast every listed column to double in a single pass.
numeric_columns = ["Age", "Chest pain type", "BP", "Cholesterol"]
df = df.withColumns({name: df[name].cast("double") for name in numeric_columns})



In [None]:
# Columns combined into the single feature vector consumed by the classifier.
feature_columns = ["Age", "Chest pain type", "BP", "Cholesterol"]

# 80/20 train/test split with a fixed seed.
training_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Pipeline stages, in order: assemble features -> index the string label -> decision tree.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
label_indexer = StringIndexer(inputCol="Heart Disease", outputCol="label")
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, label_indexer, dt])

In [None]:
# Fit all pipeline stages on the training split.
model = pipeline.fit(training_data)

# Score the held-out rows with the fitted pipeline.
predictions = model.transform(test_data)

# Show the first five (label, prediction) pairs.
label_vs_prediction = predictions.select("label", "prediction")
label_vs_prediction.show(5)

+-----+----------+
|label|prediction|
+-----+----------+
|  0.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
+-----+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the decision-tree predictions on the test split.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)

# Report as a percentage with two decimals.
print("Accuracy: {:.2f}%".format(accuracy * 100))

Accuracy: 66.67%


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Hyperparameter grid for the decision tree:
# 3 depths x 3 minimum-instances-per-node values = 9 candidate settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [3, 5, 7])
    .addGrid(dt.minInstancesPerNode, [1, 3, 5])
    .build()
)

In [None]:
# One accuracy evaluator is used both to pick hyperparameters during
# cross-validation and to score the final model on the test split.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# 5-fold cross-validation over the decision-tree grid.
# The explicit seed fixes the fold assignment so the selected model (and the
# accuracy printed below) is reproducible after a kernel restart.
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=42)

cvModel = crossval.fit(training_data)

# Pipeline refit with the best-scoring parameter combination.
best_model = cvModel.bestModel

# Score the tuned model on the held-out rows.
predictions = best_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

print(f"Test Accuracy: {accuracy:.2f}")

Test Accuracy: 0.60


Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:
# Every column that the random-forest model uses as a feature.
numeric_columns = [
    "Age", "Chest pain type", "BP", "Cholesterol",
    "FBS over 120", "EKG results", "Max HR", "Exercise angina",
    "ST depression", "Slope of ST", "Number of vessels fluro", "Thallium",
]

# Cast all of them to double in one pass.
df = df.withColumns({name: df[name].cast("double") for name in numeric_columns})

In [None]:
# Full feature set for the random forest (all twelve predictor columns).
feature_columns2 = ["Age", "Chest pain type", "BP", "Cholesterol",
                   "FBS over 120","EKG results","Max HR","Exercise angina",
                   "ST depression","Slope of ST","Number of vessels fluro","Thallium"]

# Assemble the predictors into one vector column; a separate output name keeps
# it distinct from the decision-tree "features" column.
assembler = VectorAssembler(inputCols=feature_columns2, outputCol="features2")

# Encode the string target as a numeric label column.
label_indexer = StringIndexer(inputCol="Heart Disease", outputCol="label2")

# Random-forest training is stochastic, so the seed is set explicitly to make
# the fitted model reproducible across kernel restarts.
rf = RandomForestClassifier(featuresCol="features2", labelCol="label2", seed=42)

# Stages run in order: assemble features -> index label -> random forest.
pipeline2 = Pipeline(stages=[assembler, label_indexer, rf])

# 80/20 train/test split with a fixed seed.
(training_data2, test_data2) = df.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit the random-forest pipeline on the training split.
model2 = pipeline2.fit(training_data2)

# Score the held-out rows with the fitted pipeline.
predictions2 = model2.transform(test_data2)

# Show the first five (label2, prediction) pairs.
label_vs_prediction2 = predictions2.select("label2", "prediction")
label_vs_prediction2.show(5)

+------+----------+
|label2|prediction|
+------+----------+
|   0.0|       1.0|
|   1.0|       1.0|
|   1.0|       1.0|
|   0.0|       1.0|
|   0.0|       0.0|
+------+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Hyperparameter grid for the random forest:
# 3 tree counts x 3 depths x 3 minimum-instances-per-node values = 27 candidates.
param_grid2 = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 20, 30])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.minInstancesPerNode, [1, 5, 10])
    .build()
)

In [None]:
# One accuracy evaluator is used both to pick hyperparameters during
# cross-validation and to score the final model on the test split.
evaluator2 = MulticlassClassificationEvaluator(labelCol="label2", predictionCol="prediction", metricName="accuracy")

# 5-fold cross-validation over the random-forest grid.
# The grid is the `param_grid2` built earlier in this notebook (the previous
# `paramGrid2` spelling was never defined and raised NameError on a fresh kernel).
# The explicit seed fixes the fold assignment so the run is reproducible.
crossval2 = CrossValidator(estimator=pipeline2, estimatorParamMaps=param_grid2,
                           evaluator=evaluator2,
                           numFolds=5,
                           seed=42)

cvModel2 = crossval2.fit(training_data2)

# Pipeline refit with the best-scoring parameter combination.
best_model2 = cvModel2.bestModel

# Score the tuned model on the held-out rows.
predictions2 = best_model2.transform(test_data2)
accuracy2 = evaluator2.evaluate(predictions2)

print(f"Test Accuracy: {accuracy2:.2f}")

Test Accuracy: 0.69
