#### Importing Libraries and Dataset

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql as pysparksql
from pyspark.sql.functions import mean
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("stroke")
    .getOrCreate()
)

# Read the CSV file: the first row supplies column names and column types are inferred.
train = spark.read.csv('train_2v.csv', header=True, inferSchema=True)

#### Initial Data Analysis

In [2]:
# Print the column names, types and nullability of the loaded DataFrame.
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [3]:
# Preview the first five rows.
train.show(n=5)

+-----+------+----+------------+-------------+------------+------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|   work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+------------+--------------+-----------------+----+---------------+------+
|30669|  Male| 3.0|           0|            0|          No|    children|         Rural|            95.12|18.0|           null|     0|
|30468|  Male|58.0|           1|            0|         Yes|     Private|         Urban|            87.96|39.2|   never smoked|     0|
|16523|Female| 8.0|           0|            0|          No|     Private|         Urban|           110.89|17.6|           null|     0|
|56543|Female|70.0|           0|            0|         Yes|     Private|         Rural|            69.04|35.9|formerly smoked|     0|
|46136|  Male|14.0|           0|            0|          No|Nev

In [4]:
# Row count per value of the target column `stroke`.
(
    train
    .groupBy('stroke')
    .count()
    .show()
)

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



In [5]:
# Register the DataFrame as a temporary view so it can be queried with spark.sql as `table`.
train.createOrReplaceTempView(name='table')

In [6]:
# Stroke cases per work type, most frequent first.
spark.sql(
    "SELECT work_type, count(work_type) as work_type_count "
    "FROM table WHERE stroke == 1 "
    "GROUP BY work_type ORDER BY work_type_count DESC"
).show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+



In [7]:
# Row count per gender and each gender's percentage of all rows
# (the window sum over the grouped counts is the overall total).
spark.sql(
    'SELECT gender, count(gender) as count_gender, '
    'count(gender)*100 /sum(count(gender)) over() as percent '
    'FROM table  GROUP BY gender'
).show()

+------+------------+-------------------+
|gender|count_gender|            percent|
+------+------------+-------------------+
|Female|       25665|  59.13594470046083|
| Other|          11|0.02534562211981567|
|  Male|       17724|  40.83870967741935|
+------+------------+-------------------+



In [8]:
# Percentage of male patients who had a stroke: male stroke cases divided by
# all male rows. `stroke` is an integer column, so it is compared with the
# integer literal 1 instead of the string '1' (no implicit cast needed).
spark.sql(
    "SELECT gender, count(gender), "
    "(COUNT(gender)*100.0)/(SELECT count(gender) FROM table WHERE gender == 'Male') as percentage "
    "FROM table WHERE stroke == 1 and gender = 'Male' GROUP BY gender"
).show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|  Male|          352|1.98600767321146|
+------+-------------+----------------+



In [9]:
# Percentage of female patients who had a stroke: female stroke cases divided
# by all female rows. `stroke` is an integer column, so it is compared with the
# integer literal 1 instead of the string '1' (no implicit cast needed).
spark.sql(
    "SELECT gender, count(gender), "
    "(COUNT(gender)*100.0)/(SELECT count(gender) FROM table WHERE gender == 'Female') as percentage "
    "FROM table WHERE stroke == 1 and gender = 'Female' GROUP BY gender"
).show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|Female|          431|1.67932982661212|
+------+-------------+----------------+



In [10]:
# Number of stroke cases per age, most frequent ages first. `stroke` is an
# integer column, so it is compared with the integer literal 1 instead of '1'.
spark.sql(
    "SELECT age, count(age) as count "
    "FROM table WHERE stroke == 1 "
    "GROUP BY age ORDER BY count DESC"
).show()

+----+-----+
| age|count|
+----+-----+
|79.0|   70|
|78.0|   57|
|80.0|   49|
|81.0|   43|
|82.0|   36|
|70.0|   25|
|76.0|   24|
|74.0|   24|
|77.0|   24|
|67.0|   23|
|75.0|   23|
|72.0|   21|
|68.0|   20|
|69.0|   20|
|59.0|   20|
|71.0|   19|
|57.0|   19|
|63.0|   18|
|65.0|   18|
|66.0|   17|
+----+-----+
only showing top 20 rows



In [11]:
# Number of stroke cases among patients aged 50 or older.
(
    train
    .where((train.stroke == 1) & (train.age >= 50))
    .count()
)

717

In [12]:
# Handling missing values

# Missing smoking_status values become their own explicit category.
train_f = train.na.fill('No Info', subset=['smoking_status'])

# Fill missing bmi values with the column mean. The aggregate is read straight
# into `mean_bmi`; the imported `mean` function is deliberately NOT rebound to
# the collected rows, otherwise re-running this cell would fail because `mean`
# would no longer be callable.
mean_bmi = train_f.select(mean(train_f['bmi'])).collect()[0][0]
train_f = train_f.na.fill(mean_bmi, subset=['bmi'])

### String Indexing and One-Hot Encoding

In [13]:
# gender: index the string categories, then one-hot encode the index column.
gender_indexer = StringIndexer(
    inputCol='gender',
    outputCol='genderIndex',
)
gender_encoder = OneHotEncoder(
    inputCol='genderIndex',
    outputCol='genderVec',
)

In [14]:
# ever_married: index the string categories, then one-hot encode the index column.
ever_married_indexer = StringIndexer(
    inputCol="ever_married",
    outputCol='ever_marriedIndex',
)
ever_married_encoder = OneHotEncoder(
    inputCol='ever_marriedIndex',
    outputCol='ever_marriedVec',
)

In [15]:
# work_type: index the string categories, then one-hot encode the index column.
work_type_indexer = StringIndexer(
    inputCol='work_type',
    outputCol='work_typeIndex',
)
work_type_encoder = OneHotEncoder(
    inputCol='work_typeIndex',
    outputCol='work_typeVec',
)

In [16]:
# Residence_type: index the string categories, then one-hot encode the index column.
Residence_type_indexer = StringIndexer(
    inputCol='Residence_type',
    outputCol='Residence_typeIndex',
)
Residence_type_encoder = OneHotEncoder(
    inputCol='Residence_typeIndex',
    outputCol='Residence_typeVec',
)

In [17]:
# smoking_status: index the string categories, then one-hot encode the index column.
smoking_status_indexer = StringIndexer(
    inputCol='smoking_status',
    outputCol='smoking_statusIndex',
)
smoking_status_encoder = OneHotEncoder(
    inputCol='smoking_statusIndex',
    outputCol='smoking_statusVec',
)

In [18]:
# Combine the numeric columns and the one-hot vectors into a single
# `features` vector column; the input order below is preserved.
assembler = VectorAssembler(
    inputCols=[
        'genderVec',
        'age',
        'hypertension',
        'heart_disease',
        'ever_marriedVec',
        'work_typeVec',
        'Residence_typeVec',
        'avg_glucose_level',
        'bmi',
        'smoking_statusVec',
    ],
    outputCol='features',
)

### Decision Tree Classifier

In [19]:
# Decision tree that predicts `stroke` from the assembled `features` column.
dt_classifier = DecisionTreeClassifier(
    labelCol='stroke',
    featuresCol='features',
)

#### Pipeline to align stages

In [20]:
# Stages run in list order: every indexer comes before its encoder, the
# assembler follows all encoders, and the classifier runs last.
pipeline = Pipeline(
    stages=[
        # string category -> index
        gender_indexer,
        ever_married_indexer,
        work_type_indexer,
        Residence_type_indexer,
        smoking_status_indexer,
        # index -> one-hot vector
        gender_encoder,
        ever_married_encoder,
        work_type_encoder,
        Residence_type_encoder,
        smoking_status_encoder,
        # feature vector, then the model
        assembler,
        dt_classifier,
    ]
)

In [1]:
# Split the cleaned data 80/20 into train and test sets. A fixed seed makes the
# split repeatable, so the fitted model and reported accuracy can be reproduced.
train_data, test_data = train_f.randomSplit([0.8, 0.2], seed=42)

NameError: name 'train_f' is not defined

In [22]:
# Fit every pipeline stage (indexers, encoders, assembler, classifier) on the training split.
model = pipeline.fit(dataset=train_data)

In [23]:
# Apply the fitted pipeline to the held-out test split.
preds = model.transform(dataset=test_data)

#### Evaluating the Accuracy of the Decision Tree Classifier

In [24]:
# Overall accuracy of the predictions on the test split.
acc_evaluator = MulticlassClassificationEvaluator(labelCol= 'stroke', predictionCol= 'prediction', metricName= 'accuracy')
dt_acc = acc_evaluator.evaluate(preds)
print("The Decision Tree accuracy: {0:2.2f}".format(dt_acc*100))

# Accuracy alone is misleading for this target: the full data set has 783
# stroke rows against 42617 non-stroke rows, so always predicting 0 would
# already score about 98.2%. Show the label-vs-prediction counts so that
# missed stroke cases (stroke = 1, prediction = 0) are visible.
preds.groupBy('stroke', 'prediction').count().orderBy('stroke', 'prediction').show()

The Decision Tree accuracy: 98.03
