In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
# Start the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('stroke')
    .getOrCreate()
)

In [None]:
# Load the stroke CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
train = spark.read.csv(
    '/content/healthcare-dataset-stroke-data (2).csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Print the column names and the types Spark inferred while reading the CSV.
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [None]:
# `bmi` was inferred as a string column, so convert it to a numeric type.
# Cast to double rather than integer: an integer cast would truncate any
# fractional BMI value. With non-ANSI cast semantics, strings that are not
# numbers become null; nulls in this column are imputed in a later cell.
train = train.withColumn("bmi", train["bmi"].cast('double'))
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: integer (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [None]:
# describe() only builds a summary DataFrame; call show() so the statistics
# themselves are printed instead of just the DataFrame's repr.
train.describe().show()

DataFrame[summary: string, id: string, gender: string, age: string, hypertension: string, heart_disease: string, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: string, bmi: string, smoking_status: string, stroke: string]

In [None]:
# List (column name, type) pairs to confirm the column types after the bmi cast.
train.dtypes

[('id', 'int'),
 ('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'double'),
 ('bmi', 'int'),
 ('smoking_status', 'string'),
 ('stroke', 'int')]

In [None]:
# Class balance of the target: number of rows for each value of `stroke`.
stroke_counts = train.groupBy('stroke').count()
stroke_counts.show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  249|
|     0| 4861|
+------+-----+



In [None]:
# Register `train` as a temporary view named 'table' (replacing any existing
# view of that name) so the following cells can query it through spark.sql.
train.createOrReplaceTempView('table')

In [None]:
# Stroke cases per work type, most frequent first.
work_type_query = "SELECT work_type, count(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type ORDER BY work_type_count DESC"
work_type_counts = spark.sql(work_type_query)
work_type_counts.show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            149|
|Self-employed|             65|
|     Govt_job|             33|
|     children|              2|
+-------------+---------------+



In [None]:
# Row count per gender and each gender's share (in percent) of all rows.
gender_share_query = "SELECT gender, count(gender) as count_gender, count(gender)*100/sum(count(gender)) over() as percent FROM table GROUP BY gender"
gender_share = spark.sql(gender_share_query)
gender_share.show()

+------+------------+--------------------+
|gender|count_gender|             percent|
+------+------------+--------------------+
|Female|        2994|  58.590998043052835|
| Other|           1|0.019569471624266144|
|  Male|        2115|    41.3894324853229|
+------+------------+--------------------+



In [None]:
# Number of male stroke cases and their percentage of all male rows.
male_stroke_query = "SELECT gender, count(gender), (COUNT(gender) * 100.0) /(SELECT count(gender) FROM table WHERE gender == 'Male') as percentage FROM table WHERE stroke = '1' and gender = 'Male' GROUP BY gender"
male_stroke_rate = spark.sql(male_stroke_query)
male_stroke_rate.show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|  Male|          108|5.10638297872340|
+------+-------------+----------------+



In [None]:
# Number of female stroke cases and their percentage of all female rows.
female_stroke_query = "SELECT gender, count(gender), (COUNT(gender) * 100.0) /(SELECT count(gender) FROM table WHERE gender == 'Female') as percentage FROM table WHERE stroke = '1' and gender = 'Female' GROUP BY gender"
female_stroke_rate = spark.sql(female_stroke_query)
female_stroke_rate.show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|Female|          141|4.70941883767535|
+------+-------------+----------------+



In [None]:
# Stroke cases per age, most frequent ages first (show() prints the top 20 rows).
age_query = "SELECT age, count(age) as age_count FROM table WHERE stroke == 1 GROUP BY age ORDER BY age_count DESC"
age_counts = spark.sql(age_query)
age_counts.show()

+----+---------+
| age|age_count|
+----+---------+
|78.0|       21|
|79.0|       17|
|80.0|       17|
|81.0|       14|
|57.0|       11|
|76.0|       10|
|63.0|        9|
|68.0|        9|
|74.0|        9|
|82.0|        9|
|59.0|        8|
|77.0|        8|
|71.0|        7|
|58.0|        7|
|70.0|        6|
|75.0|        6|
|69.0|        6|
|72.0|        6|
|54.0|        6|
|61.0|        6|
+----+---------+
only showing top 20 rows



In [None]:
# Count stroke patients older than 50. `age` is a double column, so compare
# it with a numeric literal instead of the string '50', which only worked
# through an implicit string-to-number cast.
train.filter((train['stroke'] == 1) & (train['age'] > 50)).count()

226

In [None]:
# Fill in missing values: null smoking_status entries become an explicit
# 'No Info' category.
train_f = train.na.fill('No Info', subset=['smoking_status'])
# Fill in missing bmi values with the column mean.
from pyspark.sql.functions import mean
# Store the aggregate under its own name: assigning the collected rows to
# `mean` would shadow the imported `mean` function for the rest of the notebook.
mean_bmi = train_f.select(mean(train_f['bmi'])).first()[0]
train_f = train_f.na.fill(mean_bmi, ['bmi'])

In [None]:
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,
                                StringIndexer)

In [None]:
def _index_and_encode(column):
    """Build the StringIndexer / OneHotEncoder pair for one categorical column.

    The indexer writes to ``<column>Index`` and the encoder reads that column
    and writes to ``<column>Vec``.

    ``handleInvalid='keep'`` makes the indexer map labels it did not see while
    fitting to an extra index instead of raising. Without it, a rare category
    that falls only into the test split makes ``transform`` fail.

    Args:
        column: Name of the string column to encode.

    Returns:
        A ``(indexer, encoder)`` tuple of unfitted pipeline stages.
    """
    index_col = column + 'Index'
    indexer = StringIndexer(inputCol=column, outputCol=index_col, handleInvalid='keep')
    encoder = OneHotEncoder(inputCol=index_col, outputCol=column + 'Vec')
    return indexer, encoder


# One indexer/encoder pair per categorical column; the variable names are the
# ones the pipeline cell refers to.
gender_indexer, gender_encoder = _index_and_encode('gender')

ever_married_indexer, ever_married_encoder = _index_and_encode('ever_married')

work_type_indexer, work_type_encoder = _index_and_encode('work_type')

Residence_type_indexer, Residence_type_encoder = _index_and_encode('Residence_type')

smoking_status_indexer, smoking_status_encoder = _index_and_encode('smoking_status')

In [None]:
# Columns merged into the single 'features' vector, in this order: the
# one-hot encoded categoricals alongside the numeric columns.
feature_columns = [
    'genderVec',
    'age',
    'hypertension',
    'heart_disease',
    'ever_marriedVec',
    'work_typeVec',
    'Residence_typeVec',
    'avg_glucose_level',
    'bmi',
    'smoking_statusVec',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree that predicts the 'stroke' label from the assembled 'features' vector.
dtc = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='stroke',
)

In [None]:
from pyspark.ml import Pipeline
# Stage order: all indexers, then all encoders, then the feature assembler,
# and finally the classifier.
indexer_stages = [
    gender_indexer,
    ever_married_indexer,
    work_type_indexer,
    Residence_type_indexer,
    smoking_status_indexer,
]
encoder_stages = [
    gender_encoder,
    ever_married_encoder,
    work_type_encoder,
    Residence_type_encoder,
    smoking_status_encoder,
]
pipeline = Pipeline(stages=indexer_stages + encoder_stages + [assembler, dtc])

In [None]:
# 70/30 train/test split. A fixed seed keeps the split, and therefore the
# reported accuracy, reproducible across runs.
train_data, test_data = train_f.randomSplit([0.7, 0.3], seed=42)

In [None]:
# Fit every pipeline stage (indexers, encoders, assembler, decision tree) on the training split.
model = pipeline.fit(train_data)

In [None]:
# Apply the fitted pipeline to the held-out test split to obtain predictions.
dtc_predictions = model.transform(test_data)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compare the 'prediction' column with the true 'stroke' label on the test
# set and report accuracy (the fraction of correct predictions).
# NOTE(review): the label counts shown earlier are 249 positives vs 4861
# negatives, so always predicting "no stroke" would already score about 95%;
# accuracy alone says little about how well strokes are detected.
acc_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="stroke",
    predictionCol="prediction",
)
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
accuracy_pct = dtc_acc*100
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(accuracy_pct))

A Decision Tree algorithm had an accuracy of: 95.08%
