In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
# Reuse the running SparkSession if there is one, otherwise start a new one
# registered under the application name 'stroke'.
spark = (
    SparkSession.builder
    .appName('stroke')
    .getOrCreate()
)
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,
                               StringIndexer)
from pyspark.ml import Pipeline

In [2]:
#Creating the spark session
#spark = SparkSession.builder.master("local[2]").appName("myApp").getOrCreate()

In [3]:
# Display the active SparkSession object to confirm the session was created.
spark

In [4]:
# Access the Spark context from the session (disabled)
#sc = spark.sparkContext

In [5]:
# Load the raw CSV into a DataFrame. No explicit schema is supplied: the first
# row provides the column names and Spark infers each column's type.
f = spark.read.csv(
    "train_2v.csv",
    header=True,
    inferSchema=True,
)

In [6]:
# Display the DataFrame repr, which lists every column together with its inferred type.
f

DataFrame[id: int, gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: double, smoking_status: string, stroke: int]

In [7]:
# Preview the first few rows. `f` is a DataFrame (not an RDD); take(5) returns a
# short list of Row objects, whereas collect() would pull every row of the
# dataset onto the driver and dump it all into the notebook output.
f.take(5)

[Row(id=30669, gender='Male', age=3.0, hypertension=0, heart_disease=0, ever_married='No', work_type='children', Residence_type='Rural', avg_glucose_level=95.12, bmi=18.0, smoking_status=None, stroke=0),
 Row(id=30468, gender='Male', age=58.0, hypertension=1, heart_disease=0, ever_married='Yes', work_type='Private', Residence_type='Urban', avg_glucose_level=87.96, bmi=39.2, smoking_status='never smoked', stroke=0),
 Row(id=16523, gender='Female', age=8.0, hypertension=0, heart_disease=0, ever_married='No', work_type='Private', Residence_type='Urban', avg_glucose_level=110.89, bmi=17.6, smoking_status=None, stroke=0),
 Row(id=56543, gender='Female', age=70.0, hypertension=0, heart_disease=0, ever_married='Yes', work_type='Private', Residence_type='Rural', avg_glucose_level=69.04, bmi=35.9, smoking_status='formerly smoked', stroke=0),
 Row(id=46136, gender='Male', age=14.0, hypertension=0, heart_disease=0, ever_married='No', work_type='Never_worked', Residence_type='Rural', avg_glucose_l

In [8]:
# Explicit schema for train_2v.csv: one StructField per CSV column, in file order.
# The types mirror what inferSchema reports for this file:
#   - age, avg_glucose_level and bmi hold fractional values (e.g. 95.12, 18.0),
#     which IntegerType cannot represent, so they are DoubleType;
#   - work_type sits between ever_married and Residence_type and must be listed,
#     otherwise the remaining fields do not line up with the CSV columns.
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("gender", StringType(), nullable=True),
    StructField("age", DoubleType(), nullable=True),
    StructField("hypertension", IntegerType(), nullable=True),
    StructField("heart_disease", IntegerType(), nullable=True),
    StructField("ever_married", StringType(), nullable=True),
    StructField("work_type", StringType(), nullable=True),
    StructField("Residence_type", StringType(), nullable=True),
    StructField("avg_glucose_level", DoubleType(), nullable=True),
    StructField("bmi", DoubleType(), nullable=True),
    StructField("smoking_status", StringType(), nullable=True),
    StructField("stroke", IntegerType(), nullable=True)
   ]
)

In [9]:
# Replace missing smoking_status values with the explicit category 'No Info'
# before the column is string-indexed; all other columns are left untouched.
updated_f = f.na.fill(value='No Info', subset=['smoking_status'])

In [10]:
# Preview the first five rows to confirm that null smoking_status values now read 'No Info'.
updated_f.take(5)

[Row(id=30669, gender='Male', age=3.0, hypertension=0, heart_disease=0, ever_married='No', work_type='children', Residence_type='Rural', avg_glucose_level=95.12, bmi=18.0, smoking_status='No Info', stroke=0),
 Row(id=30468, gender='Male', age=58.0, hypertension=1, heart_disease=0, ever_married='Yes', work_type='Private', Residence_type='Urban', avg_glucose_level=87.96, bmi=39.2, smoking_status='never smoked', stroke=0),
 Row(id=16523, gender='Female', age=8.0, hypertension=0, heart_disease=0, ever_married='No', work_type='Private', Residence_type='Urban', avg_glucose_level=110.89, bmi=17.6, smoking_status='No Info', stroke=0),
 Row(id=56543, gender='Female', age=70.0, hypertension=0, heart_disease=0, ever_married='Yes', work_type='Private', Residence_type='Rural', avg_glucose_level=69.04, bmi=35.9, smoking_status='formerly smoked', stroke=0),
 Row(id=46136, gender='Male', age=14.0, hypertension=0, heart_disease=0, ever_married='No', work_type='Never_worked', Residence_type='Rural', avg

In [28]:
# One-hot encoding for the categorical fields: gender, ever_married,
# Residence_type and smoking_status. This cell handles gender.
# StringIndexer maps each label to a numeric index; OneHotEncoder turns that
# index into an indicator vector.
# handleInvalid="keep" assigns labels that were not seen while fitting to an
# extra index instead of raising, so a category that only occurs in the test
# split cannot make pipeline.transform() fail.
gender_indexer = StringIndexer(inputCol="gender", outputCol="genderIndex",
                               handleInvalid="keep")
gender_encoder = OneHotEncoder(inputCol="genderIndex", outputCol="genderVec")

In [35]:
# Index and one-hot encode Residence_type.
# handleInvalid="keep" assigns labels that were not seen while fitting to an
# extra index instead of raising, so a category that only occurs in the test
# split cannot make pipeline.transform() fail.
residence_type_indexer = StringIndexer(inputCol="Residence_type",
                                       outputCol="Residence_typeIndex",
                                       handleInvalid="keep")
residence_type_encoder = OneHotEncoder(inputCol="Residence_typeIndex",
                                       outputCol="Residence_typeVec")

In [41]:
# Index and one-hot encode smoking_status.
# handleInvalid="keep" assigns labels that were not seen while fitting to an
# extra index instead of raising, so a category that only occurs in the test
# split cannot make pipeline.transform() fail.
smoking_status_indexer = StringIndexer(inputCol="smoking_status",
                                       outputCol="smoking_statusIndex",
                                       handleInvalid="keep")
smoking_status_encoder = OneHotEncoder(inputCol="smoking_statusIndex",
                                       outputCol="smoking_statusVec")

In [42]:
# Index and one-hot encode ever_married.
# handleInvalid="keep" assigns labels that were not seen while fitting to an
# extra index instead of raising, so a category that only occurs in the test
# split cannot make pipeline.transform() fail.
ever_married_indexer = StringIndexer(inputCol="ever_married",
                                     outputCol="ever_marriedIndex",
                                     handleInvalid="keep")
ever_married_encoder = OneHotEncoder(inputCol="ever_marriedIndex",
                                     outputCol="ever_marriedVec")

In [43]:
# Merge the numeric columns and the one-hot vectors into one 'features' vector
# column, which the classifier reads.
# NOTE(review): 'bmi' is intentionally not listed (it was commented out in the
# original cell) - presumably because the column contains nulls; confirm before
# adding it back.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'genderVec',
        'age',
        'hypertension',
        'heart_disease',
        'ever_marriedVec',
        'Residence_typeVec',
        'avg_glucose_level',
        'smoking_statusVec',
    ],
)

In [44]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision tree that predicts the 'stroke' label from the assembled 'features' vector.
dtc = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='stroke',
)

In [45]:
from pyspark.ml import Pipeline
# Stage order matters: each indexer runs before the encoder that consumes its
# output column, the assembler runs after every encoder, and the classifier is last.
pipeline = Pipeline(stages=[
    # string label -> numeric index
    gender_indexer,
    ever_married_indexer,
    residence_type_indexer,
    smoking_status_indexer,
    # numeric index -> one-hot vector
    gender_encoder,
    ever_married_encoder,
    residence_type_encoder,
    smoking_status_encoder,
    # feature vector assembly, then the model
    assembler,
    dtc,
])

In [46]:
# Hold out roughly 30% of the rows for evaluation. A fixed seed makes the random
# split - and therefore the reported accuracy - repeatable across runs on the
# same input; without it every run trains and tests on different rows.
train_data, test_data = updated_f.randomSplit([0.7, 0.3], seed=42)

In [47]:
# Fit every pipeline stage (indexers, encoders, assembler, decision tree) on the training split only.
model = pipeline.fit(train_data)

In [48]:
# Run the fitted pipeline on the held-out test split to produce predictions.
dtc_predictions = model.transform(test_data)

In [49]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compare the 'prediction' column with the true 'stroke' label on the test
# predictions and report accuracy (the share of correctly classified rows).
# NOTE(review): if stroke cases are rare, accuracy alone can look high even for
# a model that never predicts a stroke - consider checking recall/F1 as well.
acc_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="stroke",
    predictionCol="prediction",
)
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 98.14%
