In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7") # SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
import pyspark.sql as sparksql

In [0]:
sc = spark.sparkContext

In [0]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
spark = SparkSession.builder.appName('stroke').getOrCreate()


In [0]:
#DATA LOADING.
#The files should be uploaded to '/content/'.

train = spark.read.csv('/content/train_2v.csv', inferSchema=True,header=True)
test = spark.read.csv('/content/test_2v.csv', inferSchema=True,header=True)

In [0]:

#Here are the different features of the dataset.
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [0]:

train.show(2,truncate= True)

+-----+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level| bmi|smoking_status|stroke|
+-----+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
|30669|  Male| 3.0|           0|            0|          No| children|         Rural|            95.12|18.0|          null|     0|
|30468|  Male|58.0|           1|            0|         Yes|  Private|         Urban|            87.96|39.2|  never smoked|     0|
+-----+------+----+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+
only showing top 2 rows



In [0]:
#dropping id because it has no effect on label
#train = train.drop('id')
#train_data=train.toPandas().head(5)

In [0]:
#test= test.drop('id')
#test_df= test.toPandas().head(5)

In [0]:
#count no of rows in each group
train.count(),test.count()

(43400, 18601)

In [0]:
#no. of columns, stroke column is not there in test data, as it is the dataset that will be used for the testing of 
#the predictor and the classifier.
len(train.columns),len(test.columns)

(12, 11)

In [0]:
#Summary statistics information of the features, for categorical values, no statistic information like mean,stddev is showing
train.describe().show(),test.describe().show()

+-------+-----------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+---------------+-------------------+
|summary|               id|gender|               age|       hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi| smoking_status|             stroke|
+-------+-----------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+---------------+-------------------+
|  count|            43400| 43400|             43400|              43400|              43400|       43400|    43400|         43400|             43400|             41938|          30108|              43400|
|   mean|36326.14235023042|  null| 42.21789400921646|0.09357142857142857|0.04751152073732719|        null|     null|          null|104.48274999999916|28.605038390004545|       

(None, None)

In [0]:
train.groupBy('stroke').count().show() #This is an Imbalanced dataset, where the number of observations belonging to 
#one class is significantly lower than those belonging to the other classes. 
#In this case, the predictive model could be biased and inaccurate

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



In [0]:
train.createOrReplaceTempView('table')

VISUALIZATION

In [0]:
# sql query to find the number of people in specific work_type who have had stroke and not
spark.sql("SELECT work_type, COUNT(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type ORDER BY COUNT(work_type) DESC").show()
spark.sql("SELECT work_type, COUNT(work_type) as work_type_count FROM table WHERE stroke == 0 GROUP BY work_type ORDER BY COUNT(work_type) DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|          24393|
|Self-employed|           6542|
|     children|           6154|
|     Govt_job|           5351|
| Never_worked|            177|
+-------------+---------------+



In [0]:
#Here we check if it is related to gender.
spark.sql("SELECT gender, COUNT(gender) as gender_count, COUNT(gender)*100/(SELECT COUNT(gender) FROM table WHERE gender == 'Male') as percentage FROM table WHERE stroke== 1 AND gender = 'Male' GROUP BY gender").show()
spark.sql("SELECT gender, COUNT(gender) as gender_count, COUNT(gender)*100/(SELECT COUNT(gender) FROM table WHERE gender == 'Female') as percentage FROM table WHERE stroke== 1 AND gender = 'Female' GROUP BY gender").show()

#We obtain that of all the data; 1.98% of males had a stroke and 1.68% of females had a stroke

+------+------------+------------------+
|gender|gender_count|        percentage|
+------+------------+------------------+
|  Male|         352|1.9860076732114647|
+------+------------+------------------+

+------+------------+------------------+
|gender|gender_count|        percentage|
+------+------------+------------------+
|Female|         431|1.6793298266121177|
+------+------------+------------------+



In [0]:
#We check how many people that had a stroke are over 50 years old
spark.sql("SELECT COUNT(age)*100/(SELECT COUNT(age) FROM table WHERE stroke ==1) as percentage FROM table WHERE stroke == 1 AND age>=50").show()


+-----------------+
|       percentage|
+-----------------+
|91.57088122605364|
+-----------------+



Cleaning up training data

In [0]:
train.describe().show()

+-------+-----------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+---------------+-------------------+
|summary|               id|gender|               age|       hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi| smoking_status|             stroke|
+-------+-----------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+---------------+-------------------+
|  count|            43400| 43400|             43400|              43400|              43400|       43400|    43400|         43400|             43400|             41938|          30108|              43400|
|   mean|36326.14235023042|  null| 42.21789400921646|0.09357142857142857|0.04751152073732719|        null|     null|          null|104.48274999999916|28.605038390004545|       

IMPUTATION

Here we see that there are few missing values in smoking_status and bmi column
Also there are few categorical data (gender, ever_married, work_type, Residence_type, smoking_status which we need to covert into one hot encoding

In [0]:
# fill in missing values for smoking status
# As this is categorical data, we will add one data type "No Info" for the missing one
train_f = train.na.fill('No Info', subset=['smoking_status'])
test_f = test.na.fill('No Info', subset=['smoking_status'])

In [0]:
# fill in miss values for bmi 
# as this is numecial data , we will simple fill the missing values with mean
from pyspark.sql.functions import mean
mean = train_f.select(mean(train_f['bmi'])).collect()
mean_bmi = mean[0][0]
train_f = train_f.na.fill(mean_bmi,['bmi'])
test_f = test_f.na.fill(mean_bmi,['bmi'])

In [0]:
train_f.describe().show()

+-------+-----------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|summary|               id|gender|               age|       hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi|smoking_status|             stroke|
+-------+-----------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|  count|            43400| 43400|             43400|              43400|              43400|       43400|    43400|         43400|             43400|             43400|         43400|              43400|
|   mean|36326.14235023042|  null| 42.21789400921646|0.09357142857142857|0.04751152073732719|        null|     null|          null|104.48274999999916|28.605038390005145|          n

In [0]:
# indexing all categorical columns in the dataset
from pyspark.ml.feature import StringIndexer
indexer1 = StringIndexer(inputCol="gender", outputCol="genderIndex")
indexer2 = StringIndexer(inputCol="ever_married", outputCol="ever_marriedIndex")
indexer3 = StringIndexer(inputCol="work_type", outputCol="work_typeIndex")
indexer4 = StringIndexer(inputCol="Residence_type", outputCol="Residence_typeIndex")
indexer5 = StringIndexer(inputCol="smoking_status", outputCol="smoking_statusIndex")

In [0]:
#Doing one hot encoding of indexed data
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(inputCols=["genderIndex","ever_marriedIndex","work_typeIndex","Residence_typeIndex","smoking_statusIndex"],
                                 outputCols=["genderVec","ever_marriedVec","work_typeVec","Residence_typeVec","smoking_statusVec"])

In [0]:
#vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['id','genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'ever_marriedVec',
 'work_typeVec',
 'Residence_typeVec',
 'avg_glucose_level',
 'bmi',
 'smoking_statusVec'],outputCol='features')


Decision tree classifier and predictor

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')

In [0]:
#importing pipelines to distribute the task
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler, dtc])

In [0]:
# splitting train data into training and validation data
train_data,val_data = train_f.randomSplit([0.7,0.3])

# training model pipeline with data
model = pipeline.fit(train_data)

In [0]:
# making prediction on model with validation data
dtc_predictions = model.transform(val_data)

# Select example rows to display.
dtc_predictions.select("prediction","probability", "stroke", "features").show(5)

+----------+--------------------+------+--------------------+
|prediction|         probability|stroke|            features|
+----------+--------------------+------+--------------------+
|       0.0|[0.86322869955156...|     0|(17,[0,1,3,5,6,8,...|
|       0.0|[0.99202921313020...|     0|(17,[0,1,3,7,11,1...|
|       0.0|[0.99202921313020...|     0|(17,[0,1,3,6,7,11...|
|       0.0|[0.99202921313020...|     0|(17,[0,1,3,6,8,11...|
|       0.0|[0.99202921313020...|     0|(17,[0,1,3,6,7,12...|
+----------+--------------------+------+--------------------+
only showing top 5 rows



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 98.03%


In [0]:
# now predicting the labels for test data
test_pred = model.transform(test_f)
test_selected = test_pred.select( "features", "prediction","probability")
test_selected.limit(5).toPandas()

Unnamed: 0,features,prediction,probability
0,"(36306.0, 0.0, 1.0, 80.0, 0.0, 0.0, 1.0, 1.0, ...",0.0,"[0.943382530849262, 0.05661746915073796]"
1,"(61829.0, 1.0, 0.0, 74.0, 0.0, 1.0, 1.0, 0.0, ...",0.0,"[0.8632286995515696, 0.1367713004484305]"
2,"(14152.0, 1.0, 0.0, 14.0, 0.0, 0.0, 0.0, 0.0, ...",0.0,"[0.9920292131302026, 0.007970786869797393]"
3,"(12997.0, 0.0, 1.0, 28.0, 0.0, 0.0, 0.0, 1.0, ...",0.0,"[0.9920292131302026, 0.007970786869797393]"
4,"(40801.0, 1.0, 0.0, 63.0, 0.0, 0.0, 1.0, 0.0, ...",0.0,"[0.9920292131302026, 0.007970786869797393]"


Without Imputation of the missing values

In [0]:
train_c = train.na.drop()
test_c = test.na.drop()

In [0]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')

In [0]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler, dtc])

In [0]:
# splitting training and validation data
train_data,val_data = train_c.randomSplit([0.7,0.3])

# training model pipeline with data
model = pipeline.fit(train_data)

In [0]:
# making prediction on model with validation data
dtc_predictions = model.transform(val_data)

# Select example rows to display.
dtc_predictions.select("prediction","probability", "stroke", "features").show(5)

+----------+--------------------+------+--------------------+
|prediction|         probability|stroke|            features|
+----------+--------------------+------+--------------------+
|       0.0|         [0.95,0.05]|     0|[43.0,1.0,0.0,77....|
|       0.0|[0.99670138888888...|     0|(16,[0,1,3,6,7,12...|
|       0.0|[0.99670138888888...|     0|(16,[0,1,3,6,7,12...|
|       0.0|[0.99670138888888...|     0|(16,[0,1,3,9,11,1...|
|       0.0|[0.98655845802688...|     0|(16,[0,1,3,6,9,12...|
+----------+--------------------+------+--------------------+
only showing top 5 rows



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 97.88%


In [0]:
# now predicting the labels for test data
test_pred = model.transform(test_c)
test_selected = test_pred.select( "features", "prediction","probability")
test_selected.limit(5).toPandas()

Unnamed: 0,features,prediction,probability
0,"(36306.0, 0.0, 1.0, 80.0, 0.0, 0.0, 1.0, 1.0, ...",0.0,"[0.9286624203821656, 0.07133757961783439]"
1,"(61829.0, 1.0, 0.0, 74.0, 0.0, 1.0, 1.0, 0.0, ...",0.0,"[0.8455598455598455, 0.15444015444015444]"
2,"(40801.0, 1.0, 0.0, 63.0, 0.0, 0.0, 1.0, 0.0, ...",0.0,"[0.9865584580268831, 0.013441541973116916]"
3,"[9348.0, 1.0, 0.0, 66.0, 1.0, 0.0, 1.0, 1.0, 0...",0.0,"[0.9661114302125215, 0.03388856978747846]"
4,"(60512.0, 0.0, 1.0, 46.0, 0.0, 0.0, 1.0, 0.0, ...",0.0,"[0.9967013888888889, 0.003298611111111111]"


Clustering

In [0]:
# CLUSTERING ...

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans(k=2,seed=1)
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler, kmeans])
Model = pipeline.fit(train_data)
predictionResult = Model.transform(train_data)
predictionResult.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+-----------+-----------------+--------------+-------------------+-------------------+---------------+-------------+-----------------+-------------+-----------------+--------------------+----------+
| id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|genderIndex|ever_marriedIndex|work_typeIndex|Residence_typeIndex|smoking_statusIndex|ever_marriedVec|    genderVec|smoking_statusVec| work_typeVec|Residence_typeVec|            features|prediction|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+-----------+-----------------+--------------+-------------------+-------------------+---------------+-------------+-----------------+-------------+-----------------+--------------------+----------+
|  1|Femal