# Reading health data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import seaborn as sns
import matplotlib.pyplot as plt  



spark = SparkSession.builder\
        .master("local[*]")\
        .appName('HealthcarePrediction')\
        .getOrCreate()

sqlContext = SQLContext(spark.sparkContext)

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.AssertionError: assertion failed: Expected hostname or IPv6 IP enclosed in [] but got 2405:204:220b:386c:44f9:7bbf:599d:2d81
	at scala.Predef$.assert(Predef.scala:223)
	at org.apache.spark.util.Utils$.checkHost(Utils.scala:1072)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:89)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:579)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)


In [None]:
# Reading CSV file
csv_file = 'HealthData.csv'
df_health = spark.read.csv(csv_file, inferSchema=True, header=True, mode='DROPMALFORMED')

# Exploring the dataset

In [None]:
#Spark schema is the structure of the DataFrame or Dataset, we can define it using StructType
df_health.printSchema()

In [None]:
df_health.describe().show()

In [None]:
# Returns the schema of the data frame.
df_health.dtypes

In [None]:
# Top 10 data
df_health.head(5)

In [None]:
df_health.toPandas().head(5)

# Looking at target distribution

In [None]:
df_health.groupBy('stroke').count().show()

# Data visualization

In [None]:
pdata = df_health.toPandas()

fig, ax = plt.subplots()
fig.set_size_inches(10, 7)

sns.heatmap(pdata.corr(),annot=True ,cmap='Reds').set_title('Correlation Factors Heat Map', color='red', size='20')

# Feature Analysis

In [None]:
df_health.createOrReplaceTempView('table')

## Work Type

In [None]:
# sql query to find the number of people in specific work_type who have had stroke and not

spark.sql("SELECT work_type, COUNT(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type ORDER BY COUNT(work_type) DESC").show()

In [None]:
spark.sql("SELECT work_type, COUNT(work_type) as work_type_count FROM table WHERE stroke == 0 GROUP BY work_type ORDER BY COUNT(work_type) DESC").show()

## Gender

In [None]:
spark.sql("SELECT gender, COUNT(gender) as gender_count, COUNT(gender)*100/(SELECT COUNT(gender) FROM table WHERE gender == 'Male') as percentage FROM table WHERE stroke== 1 AND gender = 'Male' GROUP BY gender").show()

In [None]:
spark.sql("SELECT gender, COUNT(gender) as gender_count, COUNT(gender)*100/(SELECT COUNT(gender) FROM table WHERE gender == 'Female') as percentage FROM table WHERE stroke== 1 AND gender = 'Female' GROUP BY gender").show()

## Age

In [None]:
spark.sql("SELECT COUNT(age)*100/(SELECT COUNT(age) FROM table WHERE stroke ==1) as percentage FROM table WHERE stroke == 1 AND age>=50").show()

# Cleaning data

In [None]:
df_health.describe().show()

1. Few missing values in smoking_status(30108) and bmi(41938) column.
2. Also there are few categorical data (gender, ever_married, work_type, Residence_type, smoking_status which we need to covert into one hot encoding


In [None]:
df_health = df_health.na.fill('No Info', subset=['smoking_status'])

In [None]:
from pyspark.sql.functions import mean

mean = df_health.select(mean(df_health['bmi'])).collect()
mean_bmi = mean[0][0]
df_health = df_health.na.fill(mean_bmi,['bmi'])

In [None]:
df_health.describe().show()

StringIndexer -> OneHotEncoder -> VectorAssembler

In [None]:
# indexing all categorical columns in the dataset
from pyspark.ml.feature import StringIndexer
indexer1 = StringIndexer(inputCol="gender", outputCol="genderIndex")
indexer2 = StringIndexer(inputCol="ever_married", outputCol="ever_marriedIndex")
indexer3 = StringIndexer(inputCol="work_type", outputCol="work_typeIndex")
indexer4 = StringIndexer(inputCol="Residence_type", outputCol="Residence_typeIndex")
indexer5 = StringIndexer(inputCol="smoking_status", outputCol="smoking_statusIndex")


In [None]:
# Doing one hot encoding of indexed data
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=["genderIndex","ever_marriedIndex","work_typeIndex","Residence_typeIndex","smoking_statusIndex"],
                                 outputCols=["genderVec","ever_marriedVec","work_typeVec","Residence_typeVec","smoking_statusVec"])

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'ever_marriedVec',
 'work_typeVec',
 'Residence_typeVec',
 'avg_glucose_level',
 'bmi',
 'smoking_statusVec'],outputCol='features')


# Decision tree classifier

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler, dtc])

In [None]:
train_data,test_data = df_health.randomSplit([0.7,0.3])

In [None]:
# training model pipeline with data
model = pipeline.fit(train_data)

Now we will evaluate the model with testing data

In [None]:
dtc_predictions = model.transform(test_data)

# Select example rows to display.
dtc_predictions.select("prediction","probability", "stroke", "features").show(5)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))