In [None]:
#Installing pyspark
!pip install pyspark

In [2]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


# **Creating Model**

Heart data https://www.kaggle.com/datasets/fedesoriano/heart-failure-prediction

In [1]:
# Import library
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Create a SparkSession
spark = SparkSession.builder.appName("HeartDiseasePrediction").getOrCreate()

# Load the data
data = spark.read.csv("drive/MyDrive/Data For Colab/heart.csv", header=True, inferSchema=True)

# Drop missing values
data = data.dropna()
data.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
+---+---+-------------+---------+-----------+---------+-

In [2]:
# Convert categorical columns to numeric
indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c, t in data.dtypes if t.startswith("string") ]

# One-hot encode indexed columns
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + "_ohe") for indexer in indexers]

# Select columns features to assemble
selected_cols = ["Age", "Sex_index", "RestingBP", "Cholesterol", "MaxHR", "Oldpeak"]
encoded_cols = [encoder.getOutputCol() for encoder in encoders]

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=selected_cols + encoded_cols, outputCol="features")

# Create the pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

data = pipeline.fit(data).transform(data)
data.show(5)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+---------+-------------------+----------------+--------------------+--------------+-------------+-----------------------+--------------------+------------------------+------------------+--------------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|Sex_index|ChestPainType_index|RestingECG_index|ExerciseAngina_index|ST_Slope_index|Sex_index_ohe|ChestPainType_index_ohe|RestingECG_index_ohe|ExerciseAngina_index_ohe|ST_Slope_index_ohe|            features|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+---------+-------------------+----------------+--------------------+--------------+-------------+-----------------------+--------------------+------------------------+------------------+--------------------+
| 40|  M|          ATA|  

***String indexer***

is used to convert categorical string variables to numerical variables. The output of a string indexer is a numerical representation of a categorical variable, but it should not be interpreted as an actual numerical value. The numbers assigned to each category are simply used as a unique identifier for each category, and have no inherent meaning or relation to each other.

For example, we have two categories on sex "M", "F", first category encountered in the data will be assigned the value 0, the second category will be assigned the value 1, and so on. The number assigned to a category does not indicate any inherent property or relation between the categories, it's just a way of identifying them.

---


***One-hot encoder***

The important thing is the one-hot encoding process, it takes the output of the indexer and creates a binary vector representation of the categorical variable. The vector has a length equal to the number of possible categories and each element of the vector corresponds to a specific category. The value of the element is set to 1 if the category is present in the original data, and 0 otherwise, regardless of the indexing order.

In [3]:
from pyspark.sql.functions import count
# Selecting and renaming column
data = data.select("features","HeartDisease")
data = data.withColumnRenamed("HeartDisease","label")
data.show(5)

# See the scale of the data
data.groupBy("label").agg(count("*").alias("count")).show()
print("The data should be roughly around 50:50 or at the same scale")

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[40.0,0.0,140.0,2...|    0|
|[49.0,1.0,160.0,1...|    1|
|(15,[0,2,3,4,6,9,...|    0|
|[48.0,1.0,138.0,2...|    1|
|[54.0,0.0,150.0,1...|    0|
+--------------------+-----+
only showing top 5 rows

+-----+-----+
|label|count|
+-----+-----+
|    1|  508|
|    0|  410|
+-----+-----+

The data should be roughly around 50:50 or at the same scale


# **Random Forest Classifier**

This method uses the feature importance measure provided by the Random Forest algorithm, so you don't have to use any other feature selection techniques.

In [4]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Split the data into training and test sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Random Forest Classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train the model
model = rf.fit(trainingData)

# Make predictions on the test set
predictions = model.transform(testData)

# create a BinaryClassificationEvaluator object
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")

# calculate the evaluation metrics
accuracy = evaluator.evaluate(predictions)

# print prediction and evaluation metrics
predictions.show()
print("Accuracy: ", accuracy)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(15,[0,1,2,3,4,5,...|    1|[3.83547879593311...|[0.19177393979665...|       1.0|
|(15,[0,1,2,3,4,5,...|    1|[4.84949396026114...|[0.24247469801305...|       1.0|
|(15,[0,1,2,3,4,5,...|    1|[5.17130091270719...|[0.25856504563535...|       1.0|
|(15,[0,1,2,3,4,5,...|    0|[10.8611386483128...|[0.54305693241564...|       0.0|
|(15,[0,1,2,3,4,7,...|    0|[16.7573178103634...|[0.83786589051817...|       0.0|
|(15,[0,1,2,3,4,8,...|    0|[18.6298105856310...|[0.93149052928155...|       0.0|
|(15,[0,1,2,3,4,8,...|    0|[18.7553024772519...|[0.93776512386259...|       0.0|
|(15,[0,1,2,3,4,8,...|    0|[18.7178460312589...|[0.93589230156294...|       0.0|
|(15,[0,1,2,3,4,9,...|    0|[19.4730238823709...|[0.97365119411854...|       0.0|
|(15,[0,1,2,3,4,