In [13]:
#changing the directory to locate the csv file 
import os
#using 'chdir' option to change the directory
os.chdir(r'F:\projects\Spark with Data Science_ Heath Care Data set')
#print current wworking directory:
os.getcwd()

#importing SparkSession
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
#creating an appname= 'stroke' using SparkSession
spark = SparkSession.builder.appName('stroke').getOrCreate()

#reading the csv file
train = spark.read.csv('Health_Care_dataset_train_2v.csv',
                      inferSchema=True,header=True)


In [14]:
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [15]:
#combinning the instances of Stroke column
train.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



In [17]:
#decribe the data set
train.describe()

DataFrame[summary: string, id: string, gender: string, age: string, hypertension: string, heart_disease: string, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: string, bmi: string, smoking_status: string, stroke: string]

In [19]:
#checking the type of work which has more cases of strokes
train.createOrReplaceTempView('table')

In [22]:
#creating a table work_type and work_type_count for which stroke is 1
#SQL query:
spark.sql("SELECT work_type, count(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type ORDER BY work_type_count DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+



In [24]:
#finding the persons who participated in clinic mesurement
spark.sql("SELECT gender, count(gender) as count_gender,count(gender)*100/sum(count(gender)) over() as percent FROM table GROUP BY gender").show()

+------+------------+-------------------+
|gender|count_gender|            percent|
+------+------------+-------------------+
|Female|       25665|  59.13594470046083|
| Other|          11|0.02534562211981567|
|  Male|       17724|  40.83870967741935|
+------+------------+-------------------+



In [26]:
#retriving information to know how many female/males have a stroke:
#retriving the male stroke infomation
spark.sql("SELECT gender, count(gender), (COUNT(gender)*100.0)/(SELECT count(gender) FROM table WHERE gender == 'Male')as percentage FROM table WHERE stroke ='1' and gender = 'Male' GROUP BY gender").show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|  Male|          352|1.98600767321146|
+------+-------------+----------------+



In [27]:
#retriving the female stroke information:
spark.sql("SELECT gender, count(gender), (COUNT(gender)*100.0)/ (SELECT count(gender) FROM table WHERE gender == 'Female') as percentage FROM table WHERE stroke = '1' and gender = 'Female' GROUP BY gender").show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|Female|          431|1.67932982661212|
+------+-------------+----------------+



In [28]:
#retriving information based on age if it has influence on stroke 
#finding what is the risky age:

spark.sql("SELECT age , count(age) as age_count FROM table WHERE stroke == 1 GROUP BY age ORDER BY age_count DESC").show()

+----+---------+
| age|age_count|
+----+---------+
|79.0|       70|
|78.0|       57|
|80.0|       49|
|81.0|       43|
|82.0|       36|
|70.0|       25|
|74.0|       24|
|77.0|       24|
|76.0|       24|
|67.0|       23|
|75.0|       23|
|72.0|       21|
|59.0|       20|
|69.0|       20|
|68.0|       20|
|71.0|       19|
|57.0|       19|
|63.0|       18|
|65.0|       18|
|66.0|       17|
+----+---------+
only showing top 20 rows



In [32]:
#calculating the number of strokes cases for people after 50 years:
train.filter((train['stroke'] == 1) &(train['age'] > '50')).count()

708

In [35]:
#cleaning data:
#dealing with categorical and missing values:
#there are missing values for smoking_status andd bmi paramaters
#fill the missing values:
train_f = train.na.fill('No Info', subset = ['smoking_status'])

#fill in miss values with mean
from pyspark.sql.functions import mean
mean = train_f.select(mean(train_f['bmi'])).collect()
mean_bmi = mean[0][0]
train_f = train_f.na.fill(mean_bmi,['bmi'])

In [66]:
#since ML algorithms cannot work directly with categorical data, lets encode
#algorithm to ccontinues features to use categorical features
from pyspark.ml.feature import (VectorAssembler, OneHotEncoder, StringIndexer)

gender_indexer = StringIndexer(inputCol = 'gender',outputCol ='genderIndex')
gender_encoder = OneHotEncoder(inputCol = 'genderIndex',outputCol = 'genderVec')

ever_married_indexer = StringIndexer(inputCol = 'ever_married', outputCol = 'ever_marriedIndex')
ever_married_encoder  =OneHotEncoder(inputCol = 'ever_marriedIndex',outputCol = 'ever_marriedVec')

work_type_indexer = StringIndexer(inputCol ='work_type', outputCol = 'work_typeIndex')
work_type_encoder = OneHotEncoder(inputCol = 'work_typeIndex', outputCol = 'work_typeVec')

Residence_type_indexer = StringIndexer(inputCol = 'Residence_type', outputCol = 'Residence_typeIndex')
Residence_type_encoder = OneHotEncoder(inputCol = 'Residence_typeIndex', outputCol = 'Residence_typeVec')

smoking_status_indexer = StringIndexer(inputCol = 'smoking_status',outputCol = 'smoking_statusIndex')
smoking_status_encoder = OneHotEncoder(inputCol = 'smoking_statusIndex', outputCol = 'smoking_statusVec')


Exception ignored in: <function JavaWrapper.__del__ at 0x0000027C056C9B70>
Traceback (most recent call last):
  File "C:\opt\spark\spark-2.4.2-bin-hadoop2.7\python\pyspark\ml\wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'StringIndexer' object has no attribute '_java_obj'


In [67]:
#create an assembler that combines columns into single vector columsn to train ML model.
#using the vector columns that we got after one_hot_encoding

assembler =VectorAssembler(inputCols =['genderVec','age','hypertension','heart_disease', 'ever_marriedVec', 'work_typeVec', 'Residence_typeVec', 'avg_glucose_level', 'bmi', 'smoking_statusVec'],outputCol = 'features')

In [68]:
#creating an Decision Tree object by importing Decision tree classifier;
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol = 'stroke',featuresCol = 'features')

In [69]:
#Spark ML represents such a workflow as a Pipeline 
#sequence oof PipeplineStages to run a specific order

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = [gender_indexer,ever_married_indexer,
                             work_type_indexer,Residence_type_indexer,
                             smoking_status_indexer,gender_encoder,
                             ever_married_encoder, work_type_encoder,
                             Residence_type_encoder, smoking_status_encoder,assembler,dtc])

In [70]:
#splitting dataset to train and test:
train_data,test_data = train_f.randomSplit([0.7,0.3])

In [71]:
#fitting the model using pipeline created 
model = pipeline.fit(train_data)

In [72]:
#transform the test_data
dtc_predictions = model.transform(test_data)

In [74]:
#evaluating the model
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#select(prediction,true label) and compute test error:
acc_evaluator = MulticlassClassificationEvaluator(labelCol = "stroke",
                                                 predictionCol = "prediction",metricName = "accuracy")

dtc_acc = acc_evaluator.evaluate(dtc_predictions)

print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 97.91%
