Stroke Prediction - Kaggle
https://www.kaggle.com/datasets/fedesoriano/stroke-prediction-dataset

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, Imputer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors

In [None]:
# Create (or reuse an already-running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('Stroke Prediction')
    .getOrCreate()
)

In [None]:
# Load the raw CSV: the first row supplies column names and Spark infers column types.
stroke_ds = spark.read.csv(
    './dataset/healthcare-dataset-stroke-data.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Stratified sample on the `stroke` label: keep ~5.35% of label-0 rows and every label-1 row.
# NOTE(review): presumably chosen to roughly balance the classes. This runs before the
# train/test split, so the test set is undersampled too; confirm that is intended.
keep_fraction_by_label = {0: 0.0535, 1: 1}
stroke_ds = stroke_ds.sampleBy("stroke", fractions=keep_fraction_by_label, seed=1234)

In [None]:
# Peek at a single record, one field per line.
stroke_ds.show(n=1, vertical=True)

In [None]:
# Inspect the column names and the types chosen by inferSchema.
stroke_ds.printSchema()

In [None]:
# Number of sampled rows with gender 'Male'.
stroke_ds.where(stroke_ds['gender'] == 'Male').count()

In [None]:
# Number of sampled rows with gender 'Female'.
stroke_ds.where(stroke_ds['gender'] == 'Female').count()

In [None]:
# Number of sampled rows labelled stroke == 1.
stroke_ds.where(stroke_ds['stroke'] == 1).count()

In [None]:
# Number of sampled rows labelled stroke == 0.
stroke_ds.where(stroke_ds['stroke'] == 0).count()

In [None]:
# Per-column count of rows where isnan() is true.
# isnan() does not match SQL NULLs, so this alone is not a full missing-value check.
from pyspark.sql.functions import isnan, when, count, col
nan_counts = [count(when(isnan(c), c)).alias(c) for c in stroke_ds.columns]
stroke_ds.select(*nan_counts).show(vertical=True)

In [None]:
# Per-column count of rows that are either NaN or NULL.
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in stroke_ds.columns
]
stroke_ds.select(*missing_counts).show()

In [None]:
# bmi is read as text because it contains the literal placeholder 'N/A'; count those rows.
stroke_ds.where(stroke_ds['bmi'] == 'N/A').count()

In [None]:
# Blank out the 'N/A' placeholder in bmi so a later numeric cast can treat it as missing.
stroke_ds = stroke_ds.withColumn(
    'bmi',
    regexp_replace(stroke_ds['bmi'], 'N/A', ''),
)

In [None]:
# Convert bmi from text to a floating-point number.
# Casting to "int" truncated every value (e.g. 36.6 -> 36), so the mean imputation
# in the next step would have run on truncated data.
# Empty strings (the former 'N/A' placeholder) are mapped to null explicitly rather
# than relying on the cast, so this also works when ANSI mode makes invalid casts raise.
from pyspark.sql.functions import when
stroke_ds = stroke_ds.withColumn(
    "bmi",
    when(stroke_ds.bmi == '', None).otherwise(stroke_ds.bmi).cast("double"),
)

In [None]:
# Fill missing bmi values with the column mean, written to a new `bmi_imputed` column.
# NOTE(review): the imputer is fitted on the whole dataset before the train/test split,
# so test rows contribute to the mean; consider moving it into the model pipeline.
bmiImputer = Imputer(
    inputCols=["bmi"],
    outputCols=["bmi_imputed"],
    strategy='mean',
)
stroke_ds_mod = bmiImputer.fit(stroke_ds).transform(stroke_ds)

In [None]:
# `bmi` is superseded by `bmi_imputed`; `id` is presumably a row identifier, not a feature.
columns_to_drop = ['bmi', 'id']
stroke_ds_mod = stroke_ds_mod.drop(*columns_to_drop)

In [None]:
# Check the first three rows after imputation and column drops.
stroke_ds_mod.show(n=3, vertical=True)

In [None]:
# Min-max scale `age` (default output range [0, 1]).
# MinMaxScaler needs a vector input, so a one-element VectorAssembler wraps the scalar
# first; the result gains `age_v` and `age_scaled`.
# NOTE(review): decision trees are insensitive to monotonic rescaling, so this step is
# likely unnecessary for the DecisionTreeClassifier used later.
vectorAssembler_age = VectorAssembler(inputCols=['age'], outputCol='age_v')
scale_age = MinMaxScaler(inputCol='age_v', outputCol='age_scaled')
stroke_ds_mod = (
    Pipeline(stages=[vectorAssembler_age, scale_age])
    .fit(stroke_ds_mod)
    .transform(stroke_ds_mod)
)

In [None]:
# Min-max scale the imputed bmi (default output range [0, 1]).
# The scalar is first wrapped into a one-element vector; the result gains `bmi_v` and
# `bmi_scaled`.
vectorAssembler_bmi = VectorAssembler(inputCols=['bmi_imputed'], outputCol='bmi_v')
scale_bmi = MinMaxScaler(inputCol='bmi_v', outputCol='bmi_scaled')
stroke_ds_mod = (
    Pipeline(stages=[vectorAssembler_bmi, scale_bmi])
    .fit(stroke_ds_mod)
    .transform(stroke_ds_mod)
)

In [None]:
# Min-max scale avg_glucose_level (default output range [0, 1]).
# The scalar is first wrapped into a one-element vector; the result gains `glu_v` and
# `glu_scaled`.
vectorAssembler_glu = VectorAssembler(inputCols=['avg_glucose_level'], outputCol='glu_v')
scale_glu = MinMaxScaler(inputCol='glu_v', outputCol='glu_scaled')
stroke_ds_mod = (
    Pipeline(stages=[vectorAssembler_glu, scale_glu])
    .fit(stroke_ds_mod)
    .transform(stroke_ds_mod)
)

In [None]:
# Check one row with the new *_v and *_scaled columns.
stroke_ds_mod.show(n=1, vertical=True)

In [None]:
# Map each string category column to a numeric index column (source -> output name).
index_pairs = [
    ('gender', 'gender_idx'),
    ('smoking_status', 'smoking'),
    ('Residence_type', 'residence'),
    ('work_type', 'work'),
    ('ever_married', 'married'),
]
strIndexer = StringIndexer(
    inputCols=[source for source, _ in index_pairs],
    outputCols=[target for _, target in index_pairs],
)

In [None]:
# Learn the category -> index mappings and add the indexed columns.
stroke_ds_transform = (
    strIndexer
    .fit(stroke_ds_mod)
    .transform(stroke_ds_mod)
)

In [None]:
# Confirm the indexed columns were added and check their types.
stroke_ds_transform.printSchema()

In [None]:
# Check one row with the indexed category columns.
stroke_ds_transform.show(n=1, vertical=True)

In [None]:
# The target column is called `label` from here on.
stroke_ds_transform = stroke_ds_transform.withColumnRenamed(existing="stroke", new="label")

In [None]:
# Combine the model inputs into a single `features` vector.
# Order matters: it fixes the feature indices seen by VectorIndexer and the tree.
feature_columns = [
    'hypertension', 'heart_disease', 'glu_scaled', 'smoking', 'bmi_scaled',
    'age_scaled', 'gender_idx', 'work', 'residence', 'married',
]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [None]:
# Build the feature vector, then keep only what the model needs.
vector_stroke_data = vectorAssembler.transform(stroke_ds_transform)
stroke_data_final = vector_stroke_data.select("features", "label")

In [None]:
# Preview five (features, label) rows.
stroke_data_final.show(n=5, vertical=True)

In [None]:
# Prepare the label/feature indexers, split the data, and define the decision-tree pipeline.
# Both indexers are fitted on the full dataset, following the Spark ML examples, so every
# label and category value is known to the fitted models.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(stroke_data_final)
# VectorIndexer treats a feature as categorical when it has at most maxCategories distinct
# values; features with more distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(stroke_data_final)
# The split is seeded so that it, and the test error reported below, is reproducible.
# 1234 matches the seed passed to sampleBy earlier in the notebook.
(trainingData, testData) = stroke_data_final.randomSplit([0.6, 0.4], seed=1234)
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxDepth=5)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [None]:
# Train the pipeline on the training split. Both indexer stages are already fitted
# models, so only the decision tree is actually trained here.
model = pipeline.fit(trainingData) 

In [None]:
# Score the held-out split. This adds the indexed label/feature columns and the
# `prediction` column used below.
predictions = model.transform(testData)

In [None]:
# Side-by-side look at predicted vs. actual (indexed) labels for five test rows.
predictions.select(["prediction", "indexedLabel", "features"]).show(n=5)

In [None]:
# Measure accuracy of the predictions against the indexed labels and report the error.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g " % (test_error,))

In [None]:
# The fitted decision tree is the last of the three pipeline stages.
treeModel = model.stages[-1]
# Printing the model gives a short summary rather than the full tree
# (treeModel.toDebugString renders the complete set of splits).
print(treeModel)