In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder



# Spark entry point
spark = SparkSession \
    .builder \
    .appName("DESM2") \
    .getOrCreate()

spark.version

'3.5.1'

In [3]:
# Carregando a planilha CSV
stroke_data_df = spark.read.csv('C:\\Users\\Matheus Poletto\\Desktop\\Cientista de Dados\\POS XP\\BOOTCAMP CDD\\MÓDULO 2\\DESM2\\stroke_data.csv',header='True',inferSchema='True')


In [4]:
# Detalhes dos atributos em https://www.kaggle.com/datasets/fedesoriano/stroke-prediction-dataset
stroke_data_df.printSchema()


root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [5]:
stroke_data_df = stroke_data_df.withColumnRenamed("0", "id")\
    .withColumnRenamed("Residence_type", "residence_type")
                                                  

In [6]:
stroke_data_df.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
| id|gender| age|hypertension|heart_disease|ever_married|    work_type|residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [7]:
# Pergunta 1 - Quantos registros existem no arquivo?
stroke_data_df.count()

67135

In [8]:
# Pergunta 2 - Quantas colunas existem no arquivo? Quantas são numéricas? 

columns_info = [f"coluna {name}, {dtype}" for name, dtype in stroke_data_df.dtypes]

for info in columns_info:
    print(info)

# Contando o número de colunas por tipo de dado
type_counts = {}
for name, dtype in stroke_data_df.dtypes:
    if dtype in type_counts:
        type_counts[dtype] += 1
    else:
        type_counts[dtype] = 1

# Exibindo a contagem de cada tipo de dado
for dtype, count in type_counts.items():
    print(f"{dtype}: {count}")

# Calculando e exibindo o total de colunas
total_columns = sum(type_counts.values())
print(f"Total de colunas: {total_columns}")

coluna id, int
coluna gender, string
coluna age, double
coluna hypertension, int
coluna heart_disease, int
coluna ever_married, string
coluna work_type, string
coluna residence_type, string
coluna avg_glucose_level, double
coluna bmi, double
coluna smoking_status, string
coluna stroke, int
int: 4
string: 5
double: 3
Total de colunas: 12


In [9]:
# Pergunta 3 - No conjunto de dados, quantos pacientes sofreram e não sofreram derrame (stroke), respectivamente?

count_1 = stroke_data_df.filter(stroke_data_df.stroke == 1).count()
count_2 = stroke_data_df.filter(stroke_data_df.stroke == 0).count()

print(f"Pacientes que sofreram derrame: {count_1}")
print(f"Pacientes que não sofreram derrame: {count_2}")


Pacientes que sofreram derrame: 40287
Pacientes que não sofreram derrame: 26848


In [10]:
# Pergunta 4 - A partir do dataframe, crie uma tabela temporária usando df.createOrReplaceTempView('table') e a seguir use spark.sql para escrever uma consulta SQL que obtenha
# quantos pacientes tiveram derrame por tipo de trabalho (work_type). Quantos pacientes sofreram derrame e trabalhavam respectivamente, no setor privado, de forma independente, 
# no governo e quantas são crianças?

stroke_data_df.createOrReplaceTempView("data_stroke")
spark.sql("""
SELECT work_type, COUNT(*) as num_patients
FROM data_stroke
WHERE stroke = 1
GROUP BY work_type
ORDER BY
    CASE
        WHEN work_type = 'Private' THEN 1
        WHEN work_type = 'Self-employed' THEN 2
        WHEN work_type = 'Govt_job' THEN 3
        WHEN work_type = 'children' THEN 4
        
        ELSE 5
    END
"""            
).show()

+-------------+------------+
|    work_type|num_patients|
+-------------+------------+
|      Private|       23711|
|Self-employed|       10807|
|     Govt_job|        5164|
|     children|         520|
| Never_worked|          85|
+-------------+------------+



In [11]:
# Pergunta 5 - Escreva uma consulta com spark.sql para determinar a proporção, por gênero, de participantes do estudo. A maioria dos participantes é:
spark.sql("""
          SELECT gender, COUNT(*) * 100 / SUM(COUNT(*)) OVER () AS proportion
          FROM data_stroke
          GROUP BY gender
          ORDER BY proportion DESC
          LIMIT 1
          
          """).show()

+------+------------------+
|gender|        proportion|
+------+------------------+
|Female|58.881358456840694|
+------+------------------+



In [12]:
# Pergunta 6 - Escreva uma consulta com spark.sql para determinar quem tem mais probabilidade de sofrer derrame: hipertensos ou não-hipertensos. 
# Você pode escrever uma consulta para cada grupo. A partir das probabilidades que você obteve, você conclui que:
spark.sql("""
            SELECT (COUNT(CASE WHEN stroke = 1 THEN 1 END) * 1.0 / COUNT(*)) AS prob_hypertension_stroke
            FROM data_stroke
            WHERE hypertension = 1
          
          """).show()

spark.sql("""
            SELECT (COUNT(CASE WHEN stroke = 1 THEN 1 END) * 1.0 / COUNT(*)) AS prob_no_hypertension_stroke
            FROM data_stroke
            WHERE hypertension = 0
          
          """).show()

+------------------------+
|prob_hypertension_stroke|
+------------------------+
|      0.8003086139602433|
+------------------------+

+---------------------------+
|prob_no_hypertension_stroke|
+---------------------------+
|         0.5607826365871913|
+---------------------------+



In [13]:
# Pergunta 7 - Escreva uma consulta com spark.sql que determine o número de pessoas que sofreram derrame por idade. Com qual idade o maior número de pessoas do conjunto de dados 
# sofreu derrame?

spark.sql("""
          SELECT age, COUNT(*) AS total_stroke
          FROM data_stroke
          WHERE stroke = 1
          GROUP BY age
          ORDER BY total_stroke DESC
          LIMIT 1
          """).show()

+----+------------+
| age|total_stroke|
+----+------------+
|79.0|        2916|
+----+------------+



In [14]:
# Pergunta 8 - Usando a API de dataframes, determine quantas pessoas sofreram derrames após os 50 anos.
spark.sql("""
          SELECT COUNT(*) AS total_stroke
          FROM data_stroke
          WHERE stroke = 1 AND age > 50
          
          """).show()

+------------+
|total_stroke|
+------------+
|       28938|
+------------+



In [15]:
# Pergunta 9 - Usando spark.sql, determine qual o nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame.
spark.sql("""
          SELECT stroke, ROUND(AVG(avg_glucose_level),2) AS avg_glucose
          FROM data_stroke
          GROUP BY stroke
          
          """).show()

+------+-----------+
|stroke|avg_glucose|
+------+-----------+
|     1|     119.95|
|     0|      103.6|
+------+-----------+



In [16]:
# Pergunta 10 - Qual é o BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame?
spark.sql("""
          SELECT stroke, ROUND(AVG(bmi),2) AS avg_bmi
          FROM data_stroke
          GROUP BY stroke
          
          """).show()


+------+-------+
|stroke|avg_bmi|
+------+-------+
|     1|  29.94|
|     0|  27.99|
+------+-------+



In [17]:
# Pergunta 11 - Crie um modelo de árvore de decisão que prevê a chance de derrame (stroke) a partir das variáveis contínuas/categóricas: idade, BMI, hipertensão, doença do coração, 
# nível médio de glicose. Use o conteúdo da segunda aula interativa para criar e avaliar o modelo.
# Qual a acurácia de um modelo construído?

#selecionando as colunas do modelo
decision_tree_df = stroke_data_df.select("age", "bmi", "hypertension", "heart_disease", "avg_glucose_level", "stroke")
assembler = VectorAssembler(inputCols=["age", "bmi", "hypertension", "heart_disease", "avg_glucose_level"],outputCol="features")
classifier = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")
pipeline = Pipeline(stages=[assembler, classifier])
train_data, test_data = decision_tree_df.randomSplit([0.7, 0.3])
predictSurvivedModel = pipeline.fit(train_data)
predictions = predictSurvivedModel.transform(test_data)
predictions.show(10)

+----+----+------------+-------------+-----------------+------+--------------------+-------------+--------------------+----------+
| age| bmi|hypertension|heart_disease|avg_glucose_level|stroke|            features|rawPrediction|         probability|prediction|
+----+----+------------+-------------+-----------------+------+--------------------+-------------+--------------------+----------+
|0.08|12.1|           0|            0|           125.11|     0|[0.08,12.1,0.0,0....| [2995.0,1.0]|[0.99966622162883...|       0.0|
|0.08|15.1|           0|            0|            68.58|     0|[0.08,15.1,0.0,0....| [2995.0,1.0]|[0.99966622162883...|       0.0|
|0.08|16.7|           0|            0|           108.82|     0|[0.08,16.7,0.0,0....| [2995.0,1.0]|[0.99966622162883...|       0.0|
|0.16|12.0|           0|            0|            99.25|     0|[0.16,12.0,0.0,0....| [2995.0,1.0]|[0.99966622162883...|       0.0|
|0.16|13.9|           0|            0|           109.52|     0|[0.16,13.9,0.0,0....

In [18]:
#avaliando o modelo
evaluator = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy


0.6875341852717418

In [19]:
# Pergunta 12 - Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas. 
# A acurácia (qualidade) do modelo aumentou para:

#selecionando as colunas do modelo
decision_tree_df12 = stroke_data_df.select("age", "bmi", "hypertension", "heart_disease", "avg_glucose_level", "gender", "smoking_status", "stroke")
decision_tree_df12.show(5)


+----+-----+------------+-------------+-----------------+------+---------------+------+
| age|  bmi|hypertension|heart_disease|avg_glucose_level|gender| smoking_status|stroke|
+----+-----+------------+-------------+-----------------+------+---------------+------+
|18.0|12.12|           0|            0|            94.19|Female|         smokes|     1|
|58.0| 33.7|           1|            0|           154.24|  Male|   never_smoked|     0|
|36.0| 24.7|           0|            0|            72.63|Female|         smokes|     0|
|62.0| 31.2|           0|            0|            85.52|Female|formerly smoked|     0|
|82.0| 33.2|           0|            0|            59.32|Female|         smokes|     1|
+----+-----+------------+-------------+-----------------+------+---------------+------+
only showing top 5 rows



In [20]:
#gender encoder
gender_indexer = StringIndexer(inputCol="gender", outputCol="gender_indexer")
gender_encoder = OneHotEncoder(inputCol="gender_indexer", outputCol="gender_encoder")
#smoking encoder
smoking_indexer = StringIndexer(inputCol="smoking_status", outputCol="smoking_status_indexer")
smoking_encoder = OneHotEncoder(inputCol="smoking_status_indexer", outputCol="smoking_status_encoder")

pipeline_encoder = Pipeline(stages=[gender_indexer, gender_encoder, smoking_indexer, smoking_encoder])
model_encoder = pipeline_encoder.fit(decision_tree_df12)
encoded_df = model_encoder.transform(decision_tree_df12)
encoded_df.show(5)

+----+-----+------------+-------------+-----------------+------+---------------+------+--------------+--------------+----------------------+----------------------+
| age|  bmi|hypertension|heart_disease|avg_glucose_level|gender| smoking_status|stroke|gender_indexer|gender_encoder|smoking_status_indexer|smoking_status_encoder|
+----+-----+------------+-------------+-----------------+------+---------------+------+--------------+--------------+----------------------+----------------------+
|18.0|12.12|           0|            0|            94.19|Female|         smokes|     1|           0.0| (2,[0],[1.0])|                   0.0|         (2,[0],[1.0])|
|58.0| 33.7|           1|            0|           154.24|  Male|   never_smoked|     0|           1.0| (2,[1],[1.0])|                   2.0|             (2,[],[])|
|36.0| 24.7|           0|            0|            72.63|Female|         smokes|     0|           0.0| (2,[0],[1.0])|                   0.0|         (2,[0],[1.0])|
|62.0| 31.2|    

In [21]:
encoded_df.select("age", "bmi", "hypertension", "heart_disease", "avg_glucose_level", "smoking_status_encoder", "gender_encoder").show(5)

+----+-----+------------+-------------+-----------------+----------------------+--------------+
| age|  bmi|hypertension|heart_disease|avg_glucose_level|smoking_status_encoder|gender_encoder|
+----+-----+------------+-------------+-----------------+----------------------+--------------+
|18.0|12.12|           0|            0|            94.19|         (2,[0],[1.0])| (2,[0],[1.0])|
|58.0| 33.7|           1|            0|           154.24|             (2,[],[])| (2,[1],[1.0])|
|36.0| 24.7|           0|            0|            72.63|         (2,[0],[1.0])| (2,[0],[1.0])|
|62.0| 31.2|           0|            0|            85.52|         (2,[1],[1.0])| (2,[0],[1.0])|
|82.0| 33.2|           0|            0|            59.32|         (2,[0],[1.0])| (2,[0],[1.0])|
+----+-----+------------+-------------+-----------------+----------------------+--------------+
only showing top 5 rows



In [22]:
#Reaplicando ao modelo

assembler = VectorAssembler(inputCols=["age", "bmi", "hypertension", "heart_disease", "avg_glucose_level", "smoking_status_encoder", "gender_encoder"],outputCol="features")
classifier = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")
pipeline = Pipeline(stages=[assembler, classifier])
train_data, test_data = encoded_df.randomSplit([0.7, 0.3])
predictSurvivedModel = pipeline.fit(train_data)
predictions = predictSurvivedModel.transform(test_data)
predictions.show(10)

+----+----+------------+-------------+-----------------+------+--------------+------+--------------+--------------+----------------------+----------------------+--------------------+--------------+--------------------+----------+
| age| bmi|hypertension|heart_disease|avg_glucose_level|gender|smoking_status|stroke|gender_indexer|gender_encoder|smoking_status_indexer|smoking_status_encoder|            features| rawPrediction|         probability|prediction|
+----+----+------------+-------------+-----------------+------+--------------+------+--------------+--------------+----------------------+----------------------+--------------------+--------------+--------------------+----------+
|0.08|12.2|           0|            0|           111.09|Female|  never_smoked|     0|           0.0| (2,[0],[1.0])|                   2.0|             (2,[],[])|(9,[0,1,4,7],[0.0...|[9250.0,102.0]|[0.98909324208725...|       0.0|
|0.08|14.1|           0|            0|           139.67|Female|  never_smoked|  

In [23]:
#re-avaliando o modelo
evaluator = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy

0.8314634507982009

In [24]:
# Pergunta 13 - Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas. 
# Qual dessas variáveis é mais importante no modelo de árvore de decisão que você construiu?
decisionTreeModel = predictSurvivedModel.stages[-1]


In [25]:
assembler.getInputCols()


['age',
 'bmi',
 'hypertension',
 'heart_disease',
 'avg_glucose_level',
 'smoking_status_encoder',
 'gender_encoder']

In [26]:
decisionTreeModel.featureImportances

SparseVector(9, {0: 0.1704, 1: 0.0012, 4: 0.0066, 5: 0.4905, 6: 0.3311, 7: 0.0003})

In [27]:
list(zip(assembler.getInputCols(), decisionTreeModel.featureImportances))

[('age', 0.17043091574871366),
 ('bmi', 0.0012164102265446739),
 ('hypertension', 0.0),
 ('heart_disease', 0.0),
 ('avg_glucose_level', 0.0065685103017649165),
 ('smoking_status_encoder', 0.4904525656122635),
 ('gender_encoder', 0.331073062426722)]

In [28]:
# Pergunta 14 - Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas.
# Qual a profundidade da árvore de decisão? 
decisionTreeModel.depth

5

In [29]:
# Pergunta 15 - Quantos nodos a árvore de decisão possui?

tree = decisionTreeModel.toDebugString

import re

int(re.search(r'numNodes=(\d+)', tree).group(1))

print(f"Number of nodes in the decision tree: {int(re.search(r'numNodes=(\d+)', tree).group(1))}")


Number of nodes in the decision tree: 25


In [31]:
stroke_data_df.toPandas()

Unnamed: 0,id,gender,age,hypertension,heart_disease,ever_married,work_type,residence_type,avg_glucose_level,bmi,smoking_status,stroke
0,1,Female,18.0,0,0,No,Private,Urban,94.19,12.12,smokes,1
1,2,Male,58.0,1,0,Yes,Private,Rural,154.24,33.70,never_smoked,0
2,3,Female,36.0,0,0,Yes,Govt_job,Urban,72.63,24.70,smokes,0
3,4,Female,62.0,0,0,Yes,Self-employed,Rural,85.52,31.20,formerly smoked,0
4,5,Female,82.0,0,0,Yes,Private,Rural,59.32,33.20,smokes,1
...,...,...,...,...,...,...,...,...,...,...,...,...
67130,67131,Male,18.0,0,0,No,Private,Urban,109.06,22.10,smokes,1
67131,67132,Female,29.0,0,0,Yes,Private,Urban,73.02,32.10,never_smoked,0
67132,67133,Female,24.0,0,0,No,Private,Urban,141.18,23.30,smokes,1
67133,67134,Female,77.0,0,0,Yes,Self-employed,Urban,199.71,36.20,formerly smoked,1


In [None]:
stroke_data_df.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
| id|gender| age|hypertension|heart_disease|ever_married|    work_type|residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     