# Setup
Utilizaremos a biblioteca `boto3` para nos comunicarmos com a AWS.

Utilizaremos as bibliotecas `pandas` e `pyspark` para lidarmos com nossos dados.

Utilizaremos as bibliotecas `matplotlib` e `seaborn` para a análise final.

In [1]:
import pandas as pd
import boto3
import os

import matplotlib.pyplot as plt
import seaborn as sns

import findspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark import SparkContext

from sqlalchemy import create_engine, text
import pyarrow.parquet

In [2]:
BUCKET = '896309849144'
LAYERS_PREFIX = f'layers'# f's3://{BUCKET}/layers'
GOLD = f'{LAYERS_PREFIX}/gold'
SILVER = f'{LAYERS_PREFIX}/silver'
BRONZE = f'{LAYERS_PREFIX}/bronze'

LOCAL_BRONZE = './local/bronze'
LOCAL_SILVER = './local/silver'
LOCAL_GOLD = './local/gold'

files = [
    f'{LOCAL_BRONZE}/PNAD_COVID_092020.csv',
    f'{LOCAL_BRONZE}/PNAD_COVID_102020.csv',
    f'{LOCAL_BRONZE}/PNAD_COVID_112020.csv']


In [3]:
s3_client = boto3.client('s3')

def upload_to_s3(
        file_path : str,
        bucket : str,
        bucket_path : str):
    '''
    Uploads file_path to the given bucket, as bucket_path (include the name of the file in bucket_path)
    
    Parameters
    -
    file_path : str
    bucket : str
    bucket_path : str
    '''
    s3_client.upload_file(file_path, bucket, bucket_path)

# 0. Ingestion
Faremos o upload dos dados brutos extraídos manualmente do IBGE, a um bucket AWS S3 via a biblioteca `boto3`.

Conforme o pedido, utilizaremos os últimos 3 meses disponíveis.

In [26]:
for file in files:
    print(file.split('/')[-1])

PNAD_COVID_092020.csv
PNAD_COVID_102020.csv
PNAD_COVID_112020.csv


# 1. Bronze
Subindo nossos dados para a AWS

In [None]:
for file in files:
    upload_to_s3(
        file,
        BUCKET,
        f'{BRONZE}/{file.split("/")[-1]}')

O processo de re-estruturamento dos dados é feito na AWS via `AWS Glue`.

# 2. Silver
Pegaremos esses dados re-estruturados *silver* e faremos nossas queries via `PySpark`.

## Puxando nossos dados do S3

In [4]:
to_download = s3_client.list_objects_v2(
    Bucket = BUCKET,
    Prefix = SILVER
)['Contents'][1:]

count = 1

for file in to_download:
    s3_client.download_file(BUCKET, file['Key'], f'{LOCAL_SILVER}/silver_{count}.parquet')
    count+=1

## Checando integridade dos dados

Comparemos o tamanho da camade bronze original com a camada silver atual para nos certificarmos que o processo de merge ocorreu corretamente

In [5]:
df_silver = pd.read_parquet(f'{LOCAL_SILVER}')
df_silver.shape[0]

1149197

In [6]:
row_count = 0
for f in files[:]:
    _ = pd.read_csv(f)
    row_count += _.shape[0]

    print(_.shape[0])
print(f'TOTAL = {row_count}')

387298
380461
381438
TOTAL = 1149197


## Carregando nosso Spark DataFrame

In [7]:
findspark.init()

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('PySpark Dataframe').getOrCreate()

In [8]:
# import os
# print(os.environ['HADOOP_HOME']) 
# print(os.environ['JAVA_HOME']) 

In [9]:
df_silver = spark.read.parquet(LOCAL_SILVER)

In [10]:
df_silver.show(5)

+----+---+-------+-------+-----+-----+-----+-----+-------+---------+-----+-----+------+-----+-----+------+----+-----+------+------+------+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+----+----+-----+-----+-----+----+----+-----+-----+-----+-----+-----+------+------+-----+----+----+-----+----+-----+------+------+-----+------+------+-----+-----+-----+------+-------+-------+------+-------+-------+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+-----+-----+-----+-----+----+-----+-----+-----+-----+----+-----+-----+------+------+------+------+------+-----+----+---+
| ano| uf|capital|rm_ride|v1008|v1012|v1013|v1016|estrato|      upa|v1022|v1023| v1030|v1031|v1032|posest|a001|a001a|a

# 3. Gold

## Manipulação

### Selecionamento das perguntas
Foi requesitado a utilização de no máximo 20 perguntas da pesquisa. Seguem as selecionadas abaixo.
- `UF` Unidade da Federação
 - `V1022` Situação de Domicílio
 - `V1030` Projeção da população 
 - `A002` Qual sua idade?
 - `A004` Cor ou raça:
 - `A007` Na semana passada, foram disponibilizadas teve atividades escolares para realizar em casa (aulas online, deveres, estudo dirigido etc.)? 
 - `A005` Qual sua escolaridade?
 - `B002` Por causa (de sintomas), foi a algum estabelecimento de saúde?
 - `B007` O(A) Sr(a) tem algum plano de saúde médico, seja particular, de empresa ou de órgão público?
 - `B009B` (SWAB) Qual o resultado?
 - `B009D` (Furo no dedo) Qual o resultado?
 - `B009F` (Veia no braço) Qual o resultado?
 - `B010` Algum médico já lhe deu o diagnóstico de alguma dessas doenças?
 - `B011` Na semana passada, devido à pandemia do Coronavírus, em que medida o(a) Sr(a) restringiu o contato com as pessoas?
 - `C001` Na semana passada, por pelo menos uma hora, o(a) Sr(a) trabalhou ou fez algum bico?
 - `C003` Qual o principal motivo deste afastamento temporário?
 - `C007` No trabalho (único ou principal) que tinha nessa semana, o(a) Sr(a) era:
 - `C010` Número de faixa do rendimento / retirada em dinheiro (NORMALMENTE)
 - `C011` Número de faixa do rendimento / retirada em dinheiro (NO MES ATUAL)
 - `C013` Na semana passada, o(a) Sr(a) estava em trabalho remoto (home office ou teletrabalho)? (C013)

In [8]:
selected_cols = {
    'mes':'mes',
    'uf':'uf',
    'v1022':'situacao_domicilio',
    'v1030':'proj_populacao',
    'a002':'idade',
    'a004':'cor',
    'a007':'aulas_online',
    'a005':'escolaridade',
    'b002':'foi_a_estabelecimento_saude',
    'b007':'plano_saude',
    'b009b':'teste_resultado_swab',
    'b009d':'teste_resultado_furo',
    'b009f':'teste_resultado_braco',
    'b0101':'diagnostico_diabetes',
    'b0102':'diagnostico_hipertensao',
    'b0103':'diagnostico_respiratorias',
    'b0104':'diagnostico_cardiacas',
    'b0105':'diagnostico_depressao',
    'b0106':'diagnostico_cancer',
    'b011':'contato_reduzido',
    'c001':'trabalhou',
    'c003':'afastamento_motivo',
    'c007':'trabalho_funcao',
    'c01011':'rendimento_regular',
    'c011a11':'rendimento_real',
    'c013':'home_office'
}

In [9]:
df_gold = df_silver.select([c for c in selected_cols.keys()])
df_gold.show(5)

+---+---+-----+------+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+------+-------+----+
|mes| uf|v1022| v1030|a002|a004|a007|a005|b002|b007|b009b|b009d|b009f|b0101|b0102|b0103|b0104|b0105|b0106|b011|c001|c003|c007|c01011|c011a11|c013|
+---+---+-----+------+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+----+----+----+----+------+-------+----+
| 10| 11|    1|152790|  36|   4|NULL|   5|NULL|   1| NULL| NULL| NULL|    2|    2|    2|    2|    2|    2|   2|   1|NULL|   4|     4|      4|NULL|
| 10| 11|    1|150739|  30|   4|NULL|   7|NULL|   2| NULL| NULL| NULL|    2|    2|    2|    2|    2|    2|   2|   1|NULL|   7|     4|      4|NULL|
| 10| 11|    1|144993|  13|   4|   1|   2|NULL|   1| NULL| NULL| NULL|    2|    2|    2|    2|    2|    2|   4|NULL|NULL|NULL|  NULL|   NULL|NULL|
| 10| 11|    1|144993|  11|   4|   1|   2|NULL|   1| NULL| NULL| NULL|    2|    2|    2|    2|    2|    2|   4|NULL|NU

#### Aplicando dicionários a colunas existentes

In [10]:
for col in selected_cols.keys():
    df_gold = df_gold.withColumnRenamed(col, selected_cols[col])
df_gold.show(5)

+---+---+------------------+--------------+-----+---+------------+------------+---------------------------+-----------+--------------------+--------------------+---------------------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+----------------+---------+------------------+---------------+------------------+---------------+-----------+
|mes| uf|situacao_domicilio|proj_populacao|idade|cor|aulas_online|escolaridade|foi_a_estabelecimento_saude|plano_saude|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|contato_reduzido|trabalhou|afastamento_motivo|trabalho_funcao|rendimento_regular|rendimento_real|home_office|
+---+---+------------------+--------------+-----+---+------------+------------+---------------------------+-----------+--------------------+------

In [11]:
def convert_columns(
        df,
        column_to_convert : str,
        dict_of_conversion : dict,
):
    map_col = F.create_map(
        [F.lit(x) for i in dict_of_conversion.items() for x in i]
    )

    df_new = df.withColumn(
        column_to_convert,
        map_col[F.col(column_to_convert)]
    )

    return df_new

In [12]:
#uf
current_col = 'uf'
current_dict = dict_uf = {
    11:'Rondônia',
    12:'Acre',
    13:'Amazonas',
    14:'Roraima',
    15:'Pará',
    16:'Amapá',
    17:'Tocantins',
    21:'Maranhão',
    22:'Piauí',
    23:'Ceará',
    24:'Rio Grande do Norte',
    25:'Paraíba',
    26:'Pernambuco',
    27:'Alagoas',
    28:'Sergipe',
    29:'Bahia',
    31:'Minas Gerais',
    32:'Espírito Santo',
    33:'Rio de Janeiro',
    35:'São Paulo',
    41:'Paraná',
    42:'Santa Catarina',
    43:'Rio Grande do Sul',
    50:'Mato Grosso do Sul',
    51:'Mato Grosso',
    52:'Goiás',
    53:'Distrito Federal'
}
df_gold = convert_columns(df_gold, current_col, current_dict)

#v1022
current_col = 'situacao_domicilio'
current_dict = {
    1: 'Urbana', 2: 'Rural'
}
df_gold = convert_columns(df_gold, current_col, current_dict)

#a004
current_col = 'cor'
current_dict = {
    1: 'Branca', 2: 'Preta', 3: 'Amarela', 4: 'Parda', 5: 'Indígena', 9: None
}
df_gold = convert_columns(df_gold, current_col, current_dict)

#a005
current_col = 'escolaridade'
current_dict = {
    1: 'Sem instrução', 2: 'Fundamental incompleto', 3: 'Fundamental completo', 4: 'Médio incompleto', 5: 'Médio completo', 6: 'Superior incompleto', 7: 'Superior completo', 8: 'Pós-graduação, mestrado ou doutorado'
}
df_gold = convert_columns(df_gold, current_col, current_dict)

#a007
current_col = 'aulas_online'
current_dict = {
    1: 'Sim, e realizou pelo menos parte', 2: 'Sim, mas não realizou', 3: 'Não', 4: 'Não (férias)'
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b002
current_col = 'foi_a_estabelecimento_saude'
current_dict = {
    1: True, 2: False
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b007
current_col = 'plano_saude'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b009b
current_col = 'teste_resultado_swab'
current_dict = {
    1: 'Positivo', 2: 'Negativo', 3: 'Inconclusivo', 4: 'Não recebeu', 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b009d
current_col = 'teste_resultado_furo'
current_dict = {
    1: 'Positivo', 2: 'Negativo', 3: 'Inconclusivo', 4: 'Não recebeu', 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b009d
current_col = 'teste_resultado_braco'
current_dict = {
    1: 'Positivo', 2: 'Negativo', 3: 'Inconclusivo', 4: 'Não recebeu', 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b0101
current_col = 'diagnostico_diabetes'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b0102
current_col = 'diagnostico_hipertensao'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b0103
current_col = 'diagnostico_respiratorias'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b0104
current_col = 'diagnostico_cardiacas'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b0105
current_col = 'diagnostico_depressao'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b0106
current_col = 'diagnostico_cancer'
current_dict = {
    1: True, 2: False, 9: None
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# b011
current_col = 'contato_reduzido'
current_dict = {
    1: 'Sem restrição', 2: 'Reduziu mas continuou saindo', 3: 'Só saiu em caso de necessidade', 4: 'Ficou rigorosamente em casa'
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# c001
current_col = 'trabalhou'
current_dict = {
    1: True, 2: False
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# c003
current_col = 'afastamento_motivo'
current_dict = {
    1: 'Quarentena, isolamente ou férias coletivas', 2: 'Férias, folga ou jornada variável', 3: 'Licença maternidade ou paternidade',
    4:'Licença remunerada por motivo de saúde ou acidente', 5: 'Outro tipo de licença remunerada',
    6: 'Afastamento pela empresa por motivo de gestação, saúde, etc, sem remuneração por instituto de previdência', 7: 'Fatores ocasionais',
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# c007c
current_col = 'trabalho_funcao'
current_dict = {
    1:'Empregado doméstico, diarista, cozinheiro (em domicílios particulares), ',
    2:'Faxineiro, auxiliar de limpeza etc. (em empresa pública ou privada), ',
    3:'Auxiliar de escritório, escriturário',
    4:'Secretária, recepcionista',
    5:'Operador de Telemarketing',
    6:'Comerciante (dono do bar, da loja etc.) ',
    7:'Balconista, vendedor de loja',
    8:'Vendedor a domicílio, representante de vendas, vendedor de catálogo (Avon, Natura etc.)  ',
    9:'Vendedor ambulante (feirante, camelô, comerciante de rua, quiosque)',
    10:'Cozinheiro e garçom (de restaurantes, empresas)',
    11:'Padeiro, açougueiro e doceiro ',
    12:'Agricultor, criador de animais, pescador, silvicultor e jardineiro',
    13:'Auxiliar da agropecuária (colhedor de frutas, boia fria, etc.)',
    14:'Motorista (de aplicativo, de taxi, de van, de mototáxi, de ônibus)',
    15:'Motorista de caminhão (caminhoneiro), ',
    16:'Motoboy, ',
    17:'Entregador de mercadorias (de restaurante, de farmácia, de loja, Uber Eats, IFood, Rappy etc.) ',
    18:'Pedreiro, servente de pedreiro, pintor, eletricista, marceneiro',
    19:'Mecânico de veículos, máquinas industriais etc.',
    20:'Artesão, costureiro e sapateiro',
    21:'Cabeleireiro, manicure e afins',
    22:'Operador de máquinas, montador na indústria; ',
    23:'Auxiliar de produção, de carga e descarga;',
    24:'Professor da educação infantil, de ensino fundamental, médio ou superior,',
    25:'Pedagogo, professor de idiomas, música, arte e reforço escolar',
    26:'Médico, enfermeiro, profissionais de saúde de nível superior',
    27:'Técnico, profissional da saúde de nível médio ',
    28:'Cuidador de crianças, doentes ou idosos',
    29:'Segurança, vigilante, outro trabalhador dos serviços de proteção',
    30:'Policial civil',
    31:'Porteiro, zelador',
    32:'Artista, religioso (padre, pastor etc.)',
    33:'Diretor, gerente, cargo político ou comissionado',
    34:'Outra profissão de nível superior (advogado, engenheiro, contador, jornalista etc.)',
    35:'Outro técnico ou profissional de nível médio',
    36:'Outros'
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# c01011
current_col = 'rendimento_regular'
current_dict = {
    0 : '0 - 100',
    1 : '101 - 300',
    2 : '301 - 600',
    3 : '601 - 800',
    4 : '801 - 1.600',
    5 : '1.601 - 3.000',
    6 : '3.001 - 10.000',
    7 : '10.001 - 50.000',
    8 : '50.001 - 100.000',
    9 : 'Mais de 100.000'
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# c011a11
current_col = 'rendimento_real'
current_dict = {
    0 : '0 - 100',
    1 : '101 - 300',
    2 : '301 - 600',
    3 : '601 - 800',
    4 : '801 - 1.600',
    5 : '1.601 - 3.000',
    6 : '3.001 - 10.000',
    7 : '10.001 - 50.000',
    8 : '50.001 - 100.000',
    9 : 'Mais de 100.000'
    }
df_gold = convert_columns(df_gold, current_col, current_dict)

# c0013
current_col = 'home_office'
df_gold.na.fill({current_col : False})
current_dict = {
    1: True, 2: False, 'NULL': False
    }

df_gold = convert_columns(df_gold, current_col, current_dict)


In [13]:
df_gold.show(5)

+---+--------+------------------+--------------+-----+------+--------------------+--------------------+---------------------------+-----------+--------------------+--------------------+---------------------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|   cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|
+---+--------+------------------+--------------+-----+------+-------------------

### Criando colunas novas

In [14]:
# Dicionários que serão utilizados para NOVAS COLUNAS

dict_regiao = {
    1: 'Norte',
    2: 'Nordeste',
    3: 'Sudeste',
    4: 'Sul',
    5: 'Centro-Oeste'
}

dict_faixa_populacional = {
    1: '5000 ou menos',
    2: '5000 - 10000',
    3: '10000 - 20000',
    4: '20000 - 50000',
    5: '50000 - 100000',
    6: '100000 - 500000'
}

In [15]:
df_gold = df_gold.withColumn(
    'regiao',
    F.when(F.col('uf') < 21, dict_regiao[1])\
    .when(F.col('uf') < 31, dict_regiao[2])\
    .when(F.col('uf') < 41, dict_regiao[3])\
    .when(F.col('uf') < 50, dict_regiao[4])\
    .otherwise(dict_regiao[5])
    )

In [16]:
df_gold = df_gold.withColumn(
    'faixa_populacional',
    F.when(F.col('proj_populacao') < 5_000, dict_faixa_populacional[1])\
    .when(F.col('proj_populacao') < 10_000, dict_faixa_populacional[2])\
    .when(F.col('proj_populacao') < 20_000, dict_faixa_populacional[3])\
    .when(F.col('proj_populacao') < 50_000, dict_faixa_populacional[4])\
    .when(F.col('proj_populacao') < 100_000, dict_faixa_populacional[5])\
    .when(F.col('proj_populacao') < 500_000, dict_faixa_populacional[6])\
    .otherwise('mais de 500000')
    )

df_gold.show(3)

+---+--------+------------------+--------------+-----+-----+--------------------+--------------------+---------------------------+-----------+--------------------+--------------------+---------------------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|  cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|
+---+--------+----

In [17]:
df_gold = df_gold.withColumn(
    'teve_sintomas',
    F.when(F.col('foi_a_estabelecimento_saude') == True, True)\
    .when(F.col('foi_a_estabelecimento_saude') == False, True)\
    .otherwise(False)
)

df_gold.show(3)

+---+--------+------------------+--------------+-----+-----+--------------------+--------------------+---------------------------+-----------+--------------------+--------------------+---------------------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|  cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|teve_

In [18]:
df_gold.createOrReplaceTempView('gold')

spark.sql('''
          SELECT foi_a_estabelecimento_saude, teve_sintomas from gold
          WHERE foi_a_estabelecimento_saude IS true OR foi_a_estabelecimento_saude IS false
          LIMIT 5
          ''').show()

+---------------------------+-------------+
|foi_a_estabelecimento_saude|teve_sintomas|
+---------------------------+-------------+
|                       true|         true|
|                       true|         true|
|                       true|         true|
|                      false|         true|
|                      false|         true|
+---------------------------+-------------+



In [19]:
df_gold = df_gold.withColumn(
    'teste_feito',
    F.when(F.col('teste_resultado_swab').isNull(), False)\
    .when(F.col('teste_resultado_furo').isNull(), False)\
    .when(F.col('teste_resultado_braco').isNull(), False)\
    .otherwise(True)
)

df_gold.show(3)

+---+--------+------------------+--------------+-----+-----+--------------------+--------------------+---------------------------+-----------+--------------------+--------------------+---------------------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|  cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_popula

In [20]:
df_gold.createOrReplaceTempView('gold')

spark.sql('''
          SELECT teste_resultado_swab, teste_resultado_furo, teste_resultado_braco, teste_feito FROM gold
          WHERE teste_feito = True
          LIMIT 10
          ''').show(3)

+--------------------+--------------------+---------------------+-----------+
|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|teste_feito|
+--------------------+--------------------+---------------------+-----------+
|            Positivo|            Negativo|             Positivo|       true|
|            Positivo|            Positivo|             Positivo|       true|
|            Negativo|            Negativo|             Negativo|       true|
+--------------------+--------------------+---------------------+-----------+
only showing top 3 rows



In [21]:
df_gold = df_gold.withColumn(
    'teste_positivo',
    F.when(F.col('teste_resultado_swab') == 'Positivo', True)\
    .when(F.col('teste_resultado_furo') == 'Positivo', True)\
    .when(F.col('teste_resultado_braco') == 'Positivo',True)\
    .otherwise(False)
    )

df_gold.show(3)

+---+--------+------------------+--------------+-----+-----+--------------------+--------------------+---------------------------+-----------+--------------------+--------------------+---------------------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+--------------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|  cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|teste_resultado_swab|teste_resultado_furo|teste_resultado_braco|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regi

In [22]:
df_gold.createOrReplaceTempView('gold')

spark.sql('''
          SELECT teste_positivo, teste_feito FROM gold
          WHERE teste_resultado_swab = 'Positivo' OR teste_resultado_furo = 'Positivo' OR teste_resultado_braco = 'Positivo'
          LIMIT 10
          ''').show()

+--------------+-----------+
|teste_positivo|teste_feito|
+--------------+-----------+
|          true|       true|
|          true|      false|
|          true|      false|
|          true|      false|
|          true|      false|
|          true|      false|
|          true|       true|
|          true|      false|
|          true|      false|
|          true|      false|
+--------------+-----------+



In [23]:
df_gold = df_gold.drop(
    *['teste_resultado_swab', 'teste_resultado_furo', 'teste_resultado_braco']
)

df_gold.show(3)

+---+--------+------------------+--------------+-----+-----+--------------------+--------------------+---------------------------+-----------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+--------------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|  cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|teve_sintomas|teste_feito|teste_positivo|
+---+--------+------------------+--------------+-----+-----+----

In [24]:
df_gold = df_gold.withColumn(
    'afastamento_trabalho',
    F.when(F.col('afastamento_motivo').isNull(), False).otherwise(True)
)

df_gold.show(3)

+---+--------+------------------+--------------+-----+-----+--------------------+--------------------+---------------------------+-----------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+--------------+--------------------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|  cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|teve_sintomas|teste_feito|teste_positivo|afastamento_trabalho|
+---+--------+--------

In [25]:
df_gold = df_gold.withColumn(
    'acima_14',
    F.when(F.col('idade') >= 14, True).otherwise(False)
)

df_gold.show(5)

+---+--------+------------------+--------------+-----+------+--------------------+--------------------+---------------------------+-----------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+--------------+--------------------+--------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|   cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|teve_sintomas|teste_feito|teste_positivo|afastamento_trabalho|acima_14|
+-

In [26]:
df_gold = df_gold.withColumn(
    'rendimento_mudou',
    F.when(F.col('rendimento_regular') == F.col('rendimento_real'), False)\
    .when(F.col('rendimento_regular').isNull(), False)\
    .when(F.col('rendimento_real').isNull(), False).otherwise(True)
)

df_gold.show()

+---+--------+------------------+--------------+-----+-------+--------------------+--------------------+---------------------------+-----------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+--------------+--------------------+--------+----------------+
|mes|      uf|situacao_domicilio|proj_populacao|idade|    cor|        aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|teve_sintomas|teste_feito|teste_positivo|afastamento_tr

In [27]:
spark.sql('''
    SELECT uf, proj_populacao, faixa_populacional FROM gold
    WHERE uf = 'Goiás'
''').show()

+-----+--------------+------------------+
|   uf|proj_populacao|faixa_populacional|
+-----+--------------+------------------+
|Goiás|        587542|    mais de 500000|
|Goiás|        599475|    mais de 500000|
|Goiás|        239508|   100000 - 500000|
|Goiás|        528915|    mais de 500000|
|Goiás|        517112|    mais de 500000|
|Goiás|        507076|    mais de 500000|
|Goiás|        528915|    mais de 500000|
|Goiás|        202436|   100000 - 500000|
|Goiás|        202436|   100000 - 500000|
|Goiás|        384624|   100000 - 500000|
|Goiás|        579484|    mais de 500000|
|Goiás|        587542|    mais de 500000|
|Goiás|        411955|   100000 - 500000|
|Goiás|        587542|    mais de 500000|
|Goiás|        528915|    mais de 500000|
|Goiás|        579484|    mais de 500000|
|Goiás|        587542|    mais de 500000|
|Goiás|        517112|    mais de 500000|
|Goiás|        528898|    mais de 500000|
|Goiás|        602264|    mais de 500000|
+-----+--------------+------------

In [28]:
df_gold.printSchema()

root
 |-- mes: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- situacao_domicilio: string (nullable = true)
 |-- proj_populacao: integer (nullable = true)
 |-- idade: integer (nullable = true)
 |-- cor: string (nullable = true)
 |-- aulas_online: string (nullable = true)
 |-- escolaridade: string (nullable = true)
 |-- foi_a_estabelecimento_saude: boolean (nullable = true)
 |-- plano_saude: boolean (nullable = true)
 |-- diagnostico_diabetes: boolean (nullable = true)
 |-- diagnostico_hipertensao: boolean (nullable = true)
 |-- diagnostico_respiratorias: boolean (nullable = true)
 |-- diagnostico_cardiacas: boolean (nullable = true)
 |-- diagnostico_depressao: boolean (nullable = true)
 |-- diagnostico_cancer: boolean (nullable = true)
 |-- contato_reduzido: string (nullable = true)
 |-- trabalhou: boolean (nullable = true)
 |-- afastamento_motivo: string (nullable = true)
 |-- trabalho_funcao: string (nullable = true)
 |-- rendimento_regular: string (nullable = true)


In [16]:
df_gold = spark.read.parquet(LOCAL_GOLD)

print(df_gold.count())
df_gold.show(3)

1149197
+------------+------------------+--------------+-----+------+------------+--------------------+---------------------------+-----------+--------------------+-----------------------+-------------------------+---------------------+---------------------+------------------+--------------------+---------+------------------+--------------------+------------------+---------------+-----------+------------+------------------+-------------+-----------+--------------+--------------------+--------+----------------+---+
|          uf|situacao_domicilio|proj_populacao|idade|   cor|aulas_online|        escolaridade|foi_a_estabelecimento_saude|plano_saude|diagnostico_diabetes|diagnostico_hipertensao|diagnostico_respiratorias|diagnostico_cardiacas|diagnostico_depressao|diagnostico_cancer|    contato_reduzido|trabalhou|afastamento_motivo|     trabalho_funcao|rendimento_regular|rendimento_real|home_office|      regiao|faixa_populacional|teve_sintomas|teste_feito|teste_positivo|afastamento_trabalho

## Exportando nossa camada Gold e a jogando na nuvem

In [29]:
df_gold.write.partitionBy('mes').parquet('local/gold/gold.parquet')

In [None]:
for root, folders, files in os.walk(f'{LOCAL_GOLD}'):
    for file in files:
        if file.endswith('.parquet'):
            file_path = os.path.join(root, file)
            print(f'Attempting to upload {file_path}')
            try:
                upload_to_s3(
                    file_path,
                    BUCKET,
                    f'{GOLD}/{file}')
            except Exception as e:
                print(f'Failed to upload | {e}')

## Conectando com o PostgreSQL

In [11]:
usuario = 'postgres'
senha = ''
with open('./ignore/postgresql-pw') as f:
    senha = f.read()
host = 'bd-relacional.circagi8eu0o.us-east-1.rds.amazonaws.com'
porta = 5432
banco = 'postgres'

engine = create_engine(f"postgresql+psycopg2://{usuario}:{senha}@{host}:{porta}/{banco}")


In [12]:
def test_connection(engine):
    try:
        with engine.connect() as connection:
            # Testa a versão do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("✅ Conectado com sucesso:", versao[0])

            # Lista as tabelas no schema público
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public';
            """))
            tabelas = result.fetchall()
            print("📄 Tabelas no banco:")
            for tabela in tabelas:
                print("  -", tabela[0])

    except Exception as e:
        print("❌ Erro ao executar comandos:", e)


In [13]:
test_connection(engine)

✅ Conectado com sucesso: PostgreSQL 17.4 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit
📄 Tabelas no banco:
  - pnad


### Inserindo nossos dados

In [19]:
df = pd.read_parquet(LOCAL_GOLD)

df.shape

(1149197, 31)

In [None]:
df.to_sql(
    name = 'pnad',
    con = engine,
    if_exists = 'append',
    index = False,
    method = 'multi',
    chunksize = 100_000
)

PendingRollbackError: Can't reconnect until invalid transaction is rolled back.  Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)