In [2]:
!pip install pyspark



### *Iniciando uma sessão*

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

### *Criando um DF apartir de um arquivo csv*

In [4]:
df = spark.read.csv('stroke_data.csv', inferSchema=True, header=True)

In [5]:
df.show(5)

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [6]:
print('Total de registros: ', df.count())

Total de registros:  67135


In [7]:
# Verificando total de colunas e sua tipagem
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [8]:
# quantos pacientes sofreram e não sofreram derrame (stroke), respectivamente
df.groupby('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|40287|
|     0|26848|
+------+-----+



In [9]:
#A partir do dataframe, crie uma tabela temporária usando df.createOrReplaceTempView('table') e a seguir 
#use spark.sql para escrever uma consulta SQL que obtenha quantos pacientes tiveram derrame 
#por tipo de trabalho (work_type). 
#Quantos pacientes sofreram derrame e trabalhavam respectivamente, no setor privado, de forma independente, 
#no governo e quantas são crianças? 

df.createOrReplaceTempView('temp')
print('Total de pacientes que tiveram derrame por tipo de trabalho (work_type): ')
spark.sql('select work_type, count(*) as '"total"' from temp where stroke = 1 group by work_type order by total desc').show()

print('Total de crianças: ')
spark.sql('select work_type, count(*) as '"total"' from temp where stroke = 1 and work_type="children" group by work_type').show()


Total de pacientes que tiveram derrame por tipo de trabalho (work_type): 
+-------------+-----+
|    work_type|total|
+-------------+-----+
|      Private|23711|
|Self-employed|10807|
|     Govt_job| 5164|
|     children|  520|
| Never_worked|   85|
+-------------+-----+

Total de crianças: 
+---------+-----+
|work_type|total|
+---------+-----+
| children|  520|
+---------+-----+



In [10]:
#  Escreva uma consulta com spark.sql para determinar a proporção, por gênero, de participantes do estudo. 
# A maioria dos participantes é: 

print("Total por gênero: ")
spark.sql('select gender, count(*) as total from temp group by gender order by total desc').show()

print("Proporção por gênero(%): ")
spark.sql('select round((Male/Total),2)*100 as '"Male"',\
                  round((Female/Total),2)*100 as '"Female"',\
                  round((Other/Total),2)*100 as '"Other"'\
           From\
                  (select SUM(CASE WHEN gender = "Male" THEN 1 ELSE 0 END) as '"Male"', \
                          SUM(CASE WHEN gender = "Female" THEN 1 ELSE 0 END) as '"Female"', \
                          SUM(CASE WHEN gender = "Other" THEN 1 ELSE 0 END) as '"Other"', \
                          COUNT(*) AS '"Total"' from temp)').show()
         

Total por gênero: 
+------+-----+
|gender|total|
+------+-----+
|Female|39530|
|  Male|27594|
| Other|   11|
+------+-----+

Proporção por gênero(%): 
+----+------+-----+
|Male|Female|Other|
+----+------+-----+
|41.0|  59.0|  0.0|
+----+------+-----+



In [11]:
#Escreva uma consulta com spark.sql para determinar quem tem mais probabilidade de sofrer derrame: hipertensos ou não-hipertensos. 
#Você pode escrever uma consulta para cada grupo. A partir das probabilidades que você obteve, você conclui que:
hyper_no=spark.sql('select * from temp where hypertension = 0')
hyper_yes=spark.sql('select * from temp where hypertension = 1')

print('Proporção de pacientes SEM hipertensão: ')
spark.sql('select hypertension,stroke, count(*) as total from temp group by stroke, hypertension order by hypertension').show()


print('Proporção de pacientes COM hipertensão: ')



Proporção de pacientes SEM hipertensão: 
+------------+------+-----+
|hypertension|stroke|total|
+------------+------+-----+
|           0|     1|31470|
|           0|     0|24648|
|           1|     1| 8817|
|           1|     0| 2200|
+------------+------+-----+

Proporção de pacientes COM hipertensão: 


In [12]:
# Escreva uma consulta com spark.sql que determine o número de pessoas que sofreram derrame por idade.
# Com qual idade o maior número de pessoas do conjunto de dados sofreu derrame?

spark.sql('select age, count(*) as total from temp where stroke = 1 group by age order by total desc').show(5)

+----+-----+
| age|total|
+----+-----+
|79.0| 2916|
|78.0| 2279|
|80.0| 1858|
|81.0| 1738|
|82.0| 1427|
+----+-----+
only showing top 5 rows



In [13]:
# Usando a API de dataframes, determine quantas pessoas sofreram derrames após os 50 anos.
df.filter((df.age > 50.0) & (df.stroke == 1)).count()

28938

In [14]:
# Usando spark.sql, determine qual o nível médio de glicose para pessoas que, respectivamente, 
# sofreram e não sofreram derrame.
spark.sql('select * from (select round(avg(avg_glucose_level),2) sofreram from temp where stroke = 1),\
                         (select round(avg(avg_glucose_level),2) Nsofreram from temp where stroke = 0)').show()


+--------+---------+
|sofreram|Nsofreram|
+--------+---------+
|  119.95|    103.6|
+--------+---------+



In [15]:
# Qual é o BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame?

spark.sql('select * from (select round(avg(bmi),2) sofreram from temp where stroke = 1),\
                         (select round(avg(bmi),2) Nsofreram from temp where stroke = 0)').show()

+--------+---------+
|sofreram|Nsofreram|
+--------+---------+
|   29.94|    27.99|
+--------+---------+



In [16]:
# Crie um modelo de árvore de decisão que prevê a chance de derrame (stroke) a partir das variáveis contínuas/categóricas: idade, BMI, hipertensão, doença do coração, nível médio de glicose. 
# Use o conteúdo da segunda aula interativa para criar e avaliar o modelo.

from pyspark.ml.feature import VectorAssembler
# usando colunas numericas/categóricas
numericCols=['age','bmi','hypertension','heart_disease','avg_glucose_level']
vecAssembler=VectorAssembler(inputCols=numericCols, outputCol='features')

#Executar o algoritmo Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, dtc])

# Separar os dados em treinamento e teste
train_data, test_data = df.randomSplit([0.7,0.3])

# recebe o DF e produz o modelo
pipelineModel = pipeline.fit(train_data) 

# aplica o modelo aos dados de teste
predictionsDF = pipelineModel.transform(test_data)

# verificar a qualidade do modelo
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='stroke') 
print(f'Acurácia: {evaluator.evaluate(predictionsDF)}')

Acurácia: 0.6927229108356657


In [17]:
# Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas.  
# A acurácia (qualidade) do modelo aumentou para:

categoricalCols = ['gender','smoking_status']
# converter as colunas string para inteiros
from pyspark.ml.feature import StringIndexer, OneHotEncoder

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + 'Index' for x in categoricalCols])
oneHotEncoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + 'OHE' for x in categoricalCols])

allCols = [c + 'OHE' for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=allCols, outputCol='features')





In [18]:
# Ao construirmos e avaliarmos o novo modelo, veja que a qualidade das previsões melhorou um pouco, após adicionarmos os atributoscategóricos:
pipeline = Pipeline(stages=[stringIndexer,oneHotEncoder,vecAssembler, dtc])

# Separar os dados em treinamento e teste
train_data, test_data = df.randomSplit([0.7,0.3])

# recebe o DF e produz o modelo
pipelineModel = pipeline.fit(train_data) 

# aplica o modelo aos dados de teste
predictionsDF = pipelineModel.transform(test_data)

# verificar a qualidade do modelo
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='stroke') 
print(f'Acurácia: {evaluator.evaluate(predictionsDF)}')

Acurácia: 0.8322339273407502
