# Starting Spark

In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


You should consider upgrading via the 'c:\users\panf\appdata\local\programs\python\python39\python.exe -m pip install --upgrade pip' command.


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [3]:
#Creating a SparkSession

conf = SparkConf().setAppName("mySparkApp") # Nome da aplicação Spark
sc = SparkContext(conf=conf)

In [4]:
#Creating a SparkSession

spark = SparkSession.builder.appName("mySparkApp").getOrCreate()

# Analyzing the database

In [5]:
#Extract Data

df = spark.read.csv('C:\desafio\coleta_dados_estruturados\Modulo 2 - Cientista de dados\stroke_data.csv', header=True, inferSchema=True)
df

DataFrame[0: int, gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: double, smoking_status: string, stroke: int]

In [6]:
#How many records are there in the file?

df.count()

67135

In [7]:
#How many columns are there in the file? How many are numerical?

df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



# Analyzing the database
Using API Spark

In [8]:
#In the data set, how many patients had and did not have a stroke? (stroke: 1 if the patient had a stroke or 0 if not)

df.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|40287|
|     0|26848|
+------+-----+



In [9]:
#How many stroke patients worked in the private sector, independently, in government and how many are children?

df_filter = df.filter(df["stroke"] == 1)
df_filter.groupBy('stroke', 'work_type').count().show()

+------+-------------+-----+
|stroke|    work_type|count|
+------+-------------+-----+
|     1|     children|  520|
|     1| Never_worked|   85|
|     1|Self-employed|10807|
|     1|     Govt_job| 5164|
|     1|      Private|23711|
+------+-------------+-----+



In [10]:
#Determine the proportion, by gender of study participants

df.groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|Female|39530|
| Other|   11|
|  Male|27594|
+------+-----+



In [11]:
#Who is more likely to have a stroke: hypertensive or non-hypertensive?

df.groupBy('hypertension').agg({"stroke": "avg"}).show() 

+------------+------------------+
|hypertension|       avg(stroke)|
+------------+------------------+
|           1|0.8003086139602432|
|           0|0.5607826365871913|
+------------+------------------+



In [12]:
#The number of people who have had a stroke by age

df_filter2 = df.filter(df["stroke"] == 1)
df_filter2.groupBy('age').count().orderBy('count', ascending=False).show()

+----+-----+
| age|count|
+----+-----+
|79.0| 2916|
|78.0| 2279|
|80.0| 1858|
|81.0| 1738|
|82.0| 1427|
|77.0|  994|
|74.0|  987|
|63.0|  942|
|76.0|  892|
|70.0|  881|
|66.0|  848|
|75.0|  809|
|67.0|  801|
|57.0|  775|
|73.0|  759|
|65.0|  716|
|72.0|  709|
|68.0|  688|
|69.0|  677|
|71.0|  667|
+----+-----+
only showing top 20 rows



In [13]:
#The number of people who have had a stroke by age 50

df_age50 = df[(df["age"] > 50) & (df["stroke"] == 1)]
df_age50.count()

28938

In [14]:
#The average glucose level for people who have had and have not had a stroke

df.groupBy('stroke').agg({"avg_glucose_level": "avg"}).show() 

+------+----------------------+
|stroke|avg(avg_glucose_level)|
+------+----------------------+
|     1|    119.95307046938272|
|     0|    103.60273130214506|
+------+----------------------+



In [15]:
#The average BMI (BMI = body mass index) of those who have had and have not had a stroke

df.groupBy('stroke').agg({"bmi": "avg"}).show() 

+------+------------------+
|stroke|          avg(bmi)|
+------+------------------+
|     1|29.942490629729495|
|     0|27.989678933253657|
+------+------------------+



# Analyzing the database
Using SQL Spark

In [16]:
#In the data set, how many patients had and did not have a stroke? (stroke: 1 if the patient had a stroke or 0 if not)

df.createOrReplaceTempView("stroke_table")
spark.sql("SELECT stroke, count(*) FROM stroke_table GROUP BY stroke").show()

+------+--------+
|stroke|count(1)|
+------+--------+
|     1|   40287|
|     0|   26848|
+------+--------+



In [17]:
#How many stroke patients worked in the private sector, independently, in government and how many are children?

df.createOrReplaceTempView("work_table")
spark.sql("SELECT work_type, count(*) FROM work_table WHERE stroke=1 GROUP BY work_type").show()

+-------------+--------+
|    work_type|count(1)|
+-------------+--------+
| Never_worked|      85|
|Self-employed|   10807|
|      Private|   23711|
|     children|     520|
|     Govt_job|    5164|
+-------------+--------+



In [18]:
#Determine the proportion, by gender of study participants

df.createOrReplaceTempView("gender_table")
spark.sql("SELECT gender, count(*) FROM gender_table GROUP BY gender").show()

+------+--------+
|gender|count(1)|
+------+--------+
|Female|   39530|
| Other|      11|
|  Male|   27594|
+------+--------+



In [19]:
#Who is more likely to have a stroke: hypertensive or non-hypertensive?

df.createOrReplaceTempView("stroke_hypertension_table")
spark.sql("SELECT AVG(stroke) as prob FROM stroke_hypertension_table Group BY hypertension").show()

+------------------+
|              prob|
+------------------+
|0.8003086139602432|
|0.5607826365871913|
+------------------+



In [20]:
#The number of people who have had a stroke by age
df.createOrReplaceTempView("stroke_age_table")

spark.sql("SELECT age, count(*) as total_stroke FROM stroke_age_table WHERE stroke=1 GROUP BY age ORDER BY total_stroke DESC").show()

+----+------------+
| age|total_stroke|
+----+------------+
|79.0|        2916|
|78.0|        2279|
|80.0|        1858|
|81.0|        1738|
|82.0|        1427|
|77.0|         994|
|74.0|         987|
|63.0|         942|
|76.0|         892|
|70.0|         881|
|66.0|         848|
|75.0|         809|
|67.0|         801|
|57.0|         775|
|73.0|         759|
|65.0|         716|
|72.0|         709|
|68.0|         688|
|69.0|         677|
|71.0|         667|
+----+------------+
only showing top 20 rows



In [21]:
#The number of people who have had a stroke by age 50

df.createOrReplaceTempView("stroke_age50_table")

spark.sql("SELECT sum(CASE WHEN stroke = 1 AND age > 50 THEN 1 ELSE 0 END) as total_stroke50 FROM stroke_age50_table").show()

+--------------+
|total_stroke50|
+--------------+
|         28938|
+--------------+



In [22]:
#The average glucose level for people who have had and have not had a stroke

df.createOrReplaceTempView("stroke_glucose_table")
spark.sql("SELECT AVG(avg_glucose_level) as prob FROM stroke_glucose_table GROUP BY stroke").show()

+------------------+
|              prob|
+------------------+
|119.95307046938272|
|103.60273130214506|
+------------------+



In [23]:
#The average BMI (BMI = body mass index) of those who have had and have not had a stroke

df.createOrReplaceTempView("stroke_bmi_table")
spark.sql("SELECT AVG(bmi) as prob FROM stroke_bmi_table GROUP BY stroke").show()

+------------------+
|              prob|
+------------------+
|29.942490629729495|
|27.989678933253657|
+------------------+



# Analytics with Spark ML

Create a decision tree model that predicts the chance of stroke from the continuous/categorical variables: age, BMI, hypertension, heart disease, mean glucose level.


In [24]:
#Create the vector to prepare the data to train a machine learning model in Spark. 

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level'], outputCol='features')

In [25]:
#Create the Decision Tree Classifier

from pyspark.ml.classification import DecisionTreeClassifier
classifier = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

In [26]:
#Create the Pipeline

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, classifier])

In [27]:
#Random selection of test data

train_data, test_data = df.randomSplit([0.7, 0.3])

In [28]:
predictStrokeModel = pipeline.fit(train_data)

In [29]:
mean_test = df.agg({'age':'mean', 'bmi':'mean', 'hypertension':'mean', 'heart_disease':'mean', 'avg_glucose_level':'mean'}).collect()[0]

In [30]:
#Generating the test dataframe

predictions = predictStrokeModel.transform(test_data)
predictions

DataFrame[0: int, gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: double, smoking_status: string, stroke: int, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [31]:
#Evaluating the accuracy of machine learning model responses

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = 'stroke', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy

0.684379065345742

Add to the decision tree model  the categorical variables to the model: gender and smoking status

In [32]:
#Creating index for categorical variables

from pyspark.ml.feature import StringIndexer, OneHotEncoder

gender_indexer = StringIndexer(inputCol = 'gender', outputCol = 'GenderIndexer')
gender_encoder = OneHotEncoder(inputCol = 'GenderIndexer', outputCol = 'GenderVector')

smoke_indexer = StringIndexer(inputCol = 'smoking_status', outputCol = 'SmokeIndexer')
smoke_encoder = OneHotEncoder(inputCol = 'SmokeIndexer', outputCol = 'SmokeVector')

In [33]:
#Create the vector to prepare the data to train a machine learning model in Spark. 

assembler = VectorAssembler(inputCols=['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level', 'GenderVector', 'SmokeVector'], outputCol='features')

In [34]:
pipeline = Pipeline(stages=[gender_indexer,gender_encoder, smoke_indexer, smoke_encoder, assembler, classifier])

In [35]:
classifier = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

In [36]:
train_data, test_data = df.randomSplit([0.7, 0.3])

In [37]:
predictStrokeModel = pipeline.fit(train_data)

In [38]:
mean_test = df.agg({'age':'mean', 'bmi':'mean', 'hypertension':'mean', 'heart_disease':'mean', 'avg_glucose_level':'mean','gender':'mean', 'smoking_status':'mean'}).collect()[0]

In [39]:
#Generating the new test dataframe

predictions = predictStrokeModel.transform(test_data)
predictions

DataFrame[0: int, gender: string, age: double, hypertension: int, heart_disease: int, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: double, bmi: double, smoking_status: string, stroke: int, GenderIndexer: double, GenderVector: vector, SmokeIndexer: double, SmokeVector: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [40]:
#Evaluating the accuracy of machine learning model responses

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = 'stroke', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy

0.8350505201333931

In [41]:
#Evaluating how many levels the decision tree has

decisionTreeModel = predictStrokeModel.stages[-1]
decisionTreeModel.depth

5

In [42]:
assembler.getInputCols()

['age',
 'bmi',
 'hypertension',
 'heart_disease',
 'avg_glucose_level',
 'GenderVector',
 'SmokeVector']

In [43]:
#Analyzing the importance of variables for the accuracy of the decision model

decisionTreeModel.featureImportances

SparseVector(9, {0: 0.1718, 1: 0.001, 4: 0.0066, 7: 0.4809, 8: 0.3398})

In [44]:
#Analyzing the importance of variables for the accuracy of the decision model

list(zip(assembler.getInputCols(), decisionTreeModel.featureImportances))

[('age', 0.17181653294336569),
 ('bmi', 0.0009543131655798752),
 ('hypertension', 0.0),
 ('heart_disease', 0.0),
 ('avg_glucose_level', 0.006561802302894585),
 ('GenderVector', 0.0),
 ('SmokeVector', 0.0)]