In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').appName('teste').getOrCreate()

In [2]:
caminho = 'D:/jean_/OneDrive/Cursos/MBA Data Science & Machine Learning/Cientista de Dados/DESM2/Documents/stroke_data.csv'

df = spark.read.csv(caminho, header = True, inferSchema = True)

In [3]:
df.show(5)

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [4]:
from pyspark.sql.functions import col,isnan, when, count

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|  0|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level|bmi|smoking_status|stroke|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+
|  0|     0|  0|           0|            0|           0|        0|             0|                0|  0|             0|     0|
+---+------+---+------------+-------------+------------+---------+--------------+-----------------+---+--------------+------+



### Pergunta 1 - Quantos registros existem no arquivo?

In [5]:
df.count()

67135

### Pergunta 2 - Quantas colunas existem no arquivo? Quantas são numéricas? Ao ler o arquivo com spark.read.csv, habilite inferSchema=True. Use a função printSchema() da API de Dataframes.

In [6]:
#quantidade colunas
len(df.columns)

12

In [7]:
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



### Pergunta 3 - No conjunto de dados, quantos pacientes sofreram e não sofreram derrame (stroke), respectivamente?

In [8]:
# 1 - sofreram derrame
# 0 - não sofreram derrame

df.select('stroke').distinct().show()

+------+
|stroke|
+------+
|     1|
|     0|
+------+



In [9]:
df.filter(df['stroke'] == 1).count()

40287

In [10]:
df.filter(df['stroke'] == 0).count()

26848

### Pergunta 4 - A partir do dataframe, crie uma tabela temporária usando df.createOrReplaceTempView('table') e a seguir use spark.sql para escrever uma consulta SQL que obtenha quantos pacientes tiveram derrame por tipo de trabalho (work_type). Quantos pacientes sofreram derrame e trabalhavam respectivamente, no setor privado, de forma independente, no governo e quantas são crianças?

In [11]:
df.createOrReplaceTempView('table')

In [12]:
spark.sql('SELECT work_type, COUNT(work_type) AS Quantidade_derrame FROM table WHERE stroke = 1 GROUP BY work_type ORDER BY Quantidade_derrame DESC').show()

+-------------+------------------+
|    work_type|Quantidade_derrame|
+-------------+------------------+
|      Private|             23711|
|Self-employed|             10807|
|     Govt_job|              5164|
|     children|               520|
| Never_worked|                85|
+-------------+------------------+



### Pergunta 5 - Escreva uma consulta com spark.sql para determinar a proporção, por gênero, de participantes do estudo. A maioria dos participantes é:

In [13]:
spark.sql('SELECT gender, COUNT(gender) FROM table GROUP BY gender ORDER BY count(gender) DESC').show()

+------+-------------+
|gender|count(gender)|
+------+-------------+
|Female|        39530|
|  Male|        27594|
| Other|           11|
+------+-------------+



### Pergunta 6 - Escreva uma consulta com spark.sql para determinar quem tem mais probabilidade de sofrer derrame: hipertensos ou não-hipertensos. Você pode escrever uma consulta para cada grupo. A partir das probabilidades que você obteve, você conclui que:

In [14]:
#probabilidade de ter derrame sem ser hipertenso
spark.sql('SELECT COUNT(CASE WHEN stroke = 1 THEN 1 ELSE NULL END) / COUNT(*) AS propabilidade FROM table WHERE hypertension = 0').show()

+------------------+
|     propabilidade|
+------------------+
|0.5607826365871913|
+------------------+



In [15]:
#probabilidade de ter derrame sendo hipertenso
spark.sql('SELECT ROUND(COUNT(CASE WHEN stroke = 1 THEN 1 ELSE NULL END) / COUNT(*), 2) AS propabilidade FROM table WHERE hypertension = 1').show()

+-------------+
|propabilidade|
+-------------+
|          0.8|
+-------------+



### Pergunta 7 - Escreva uma consulta com spark.sql que determine o número de pessoas que sofreram derrame por idade. Com qual idade o maior número de pessoas do conjunto de dados sofreu derrame?

In [16]:
spark.sql('SELECT age, COUNT(age) AS qtd_derrame FROM table WHERE stroke = 1 GROUP BY age ORDER BY qtd_derrame DESC').show(5)

+----+-----------+
| age|qtd_derrame|
+----+-----------+
|79.0|       2916|
|78.0|       2279|
|80.0|       1858|
|81.0|       1738|
|82.0|       1427|
+----+-----------+
only showing top 5 rows



### Pergunta 8 - Usando a API de dataframes, determine quantas pessoas sofreram derrames após os 50 anos.

In [17]:
df.filter((df['age'] > 50) & (df['stroke'] == 1)).count()

28938

### Pergunta 9 - Usando spark.sql, determine qual o nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame.

In [18]:
spark.sql('SELECT stroke, ROUND(AVG(avg_glucose_level), 2) FROM table GROUP BY stroke ORDER BY stroke DESC').show(5)

+------+--------------------------------+
|stroke|round(avg(avg_glucose_level), 2)|
+------+--------------------------------+
|     1|                          119.95|
|     0|                           103.6|
+------+--------------------------------+



### Pergunta 10 - Qual é o BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame?

In [19]:
spark.sql('SELECT stroke, ROUND(AVG(bmi), 2) FROM table GROUP BY stroke ORDER BY stroke DESC').show(5)


+------+------------------+
|stroke|round(avg(bmi), 2)|
+------+------------------+
|     1|             29.94|
|     0|             27.99|
+------+------------------+



### Pergunta 11 - Crie um modelo de árvore de decisão que prevê a chance de derrame (stroke) a partir das variáveis contínuas/categóricas: idade, BMI, hipertensão, doença do coração, nível médio de glicose. Use o conteúdo da segunda aula interativa para criar e avaliar o modelo. Qual a acurácia de um modelo construído?

In [20]:
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [21]:
from pyspark.ml.feature import VectorAssembler

colunas_numericas = ['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level']

montagem_vetor = VectorAssembler(inputCols = colunas_numericas, outputCol = 'features')

In [22]:
from pyspark.ml.classification import DecisionTreeClassifier

decision_tree_classifier = DecisionTreeClassifier(labelCol = 'stroke', featuresCol = 'features')

In [23]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [montagem_vetor, decision_tree_classifier])

train_data, test_data = df.randomSplit([.7, .3])

pipeline_model = pipeline.fit(train_data)

prediction_df = pipeline_model.transform(test_data)

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(metricName = 'accuracy', labelCol = 'stroke')
print(f'Acurácia: {evaluator.evaluate(prediction_df)}')

Acurácia: 0.6882275583264332


### Pergunta 12 - Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas.  A acurácia (qualidade) do modelo aumentou para:

In [25]:
df.select('smoking_status').distinct().show()

+---------------+
| smoking_status|
+---------------+
|         smokes|
|   never_smoked|
|formerly smoked|
+---------------+



In [26]:
df = df.withColumn('smoking_status', when(col('smoking_status') == 'never_smoked', 0).when(col('smoking_status') == 'formerly smoked', 1).otherwise(2))

In [27]:
df.show(10)

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+--------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi|smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+--------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|             2|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|             0|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|             2|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|             1|     0|
|  5|Female|82.0|           0|            0|         Yes|      Privat

In [44]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Normalizer

colunas_categoricas = ['gender']

string_indexer = StringIndexer(inputCols = colunas_categoricas, outputCols = [x + '_Index' for x in colunas_categoricas])
one_hot_encoder = OneHotEncoder(inputCols = string_indexer.getOutputCols(), outputCols = [x + '_OHE' for x in colunas_categoricas])

In [45]:
colunas_numericas = ['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level', 'smoking_status']

todas_colunas = colunas_numericas + one_hot_encoder.getOutputCols()

montagem_vetor = VectorAssembler(inputCols = todas_colunas, outputCol = 'features')

normalizer = Normalizer(inputCol = 'features', outputCol = 'features_norm', p = 1.0)

decision_tree_classifier = DecisionTreeClassifier(labelCol = 'stroke', featuresCol = 'features_norm')

In [46]:
pipeline = Pipeline(stages = [string_indexer, one_hot_encoder, montagem_vetor, normalizer, decision_tree_classifier])

train_data, test_data = df.randomSplit([.75, .25])

df_treino = pipeline.fit(train_data)

df_predito = df_treino.transform(test_data)

In [47]:
evaluator = MulticlassClassificationEvaluator(metricName = 'accuracy', labelCol = 'stroke')
print(f'Acurácia: {evaluator.evaluate(df_predito)}')

Acurácia: 0.8271275016331137


### Pergunta 13 - Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas. Qual dessas variáveis é mais importante no modelo de árvore de decisão que você construiu?

In [48]:
decision_tree_model = df_treino.stages[-1]

decision_tree_model.featureImportances

SparseVector(8, {0: 0.039, 1: 0.0382, 2: 0.0001, 3: 0.003, 4: 0.0002, 5: 0.9196})

In [49]:
for feature in zip(montagem_vetor.getInputCols(), decision_tree_model.featureImportances):
    print(f'Variável: {feature[0].upper()}\tContribuição: {feature[1]:.4f}')

Variável: AGE	Contribuição: 0.0390
Variável: BMI	Contribuição: 0.0382
Variável: HYPERTENSION	Contribuição: 0.0001
Variável: HEART_DISEASE	Contribuição: 0.0030
Variável: AVG_GLUCOSE_LEVEL	Contribuição: 0.0002
Variável: SMOKING_STATUS	Contribuição: 0.9196
Variável: GENDER_OHE	Contribuição: 0.0000


### Pergunta 14 - Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas. Qual a profundidade da árvore de decisão? 

In [34]:
decision_tree_model.depth

5

### Pergunta 15 - Quantos nodos a árvore de decisão possui?

In [35]:
decision_tree_model.numNodes

17