# O PROBLEMA

Imagine agora que você foi contratado(a) como Expert em Data Analytics por um grande hospital para entender como foi o comportamento da população na época da pandemia da COVID-19 e quais indicadores seriam importantes para o planejamento, caso haja um novo surto da doença.

Apesar de ser contratado(a) agora, a sua área observou que a utilização do estudo do PNAD-COVID 19 do IBGE seria uma ótima base para termos boas respostas ao problema proposto, pois são dados confiáveis.Porém, não será necessário utilizar todas as perguntas realizadas na pesquisa para enxergar todas as oportunidades ali postas.

É sempre bom ressaltar que há dados triviais que precisam estar no projeto, pois auxiliam muito na análise dos dados:

## PNAD-COVID-19 do IBGE

O Head de Dados pediu para que você entrasse na base de dados do PNAD-COVID-19 do IBGE e organizasse esta base para análise, utilizando Banco de Dados em Nuvem e trazendo as seguintes características:

- a. Utilização de no máximo 20 questionamentos realizados na pesquisa;
- b. Utilizar 3 meses para construção da solução;
- c. Caracterização dos sintomas clínicos da população;
- d. Comportamento da população na época da COVID-19;
- e. Características econômicas da Sociedade;

Seu objetivo será trazer uma breve análise dessas informações, como foi a organização do banco, as perguntas selecionadas para a resposta do problema e quais seriam as principais ações que o hospital deverá tomar em caso de um novo surto de COVID-19.

- Dica: Leiam com atenção a base de dados e toda a documentação que o site o PNAD – Covid19 traz, principalmente os dicionários, que ajudam e muito no entendimento da Base de Dados.
- Dica 2: Utilizem o que já foi ensinado e consolidado nas outras fases para apresentar a resolução do projeto.

Lembre-se de que você poderá apresentar o desenvolvimento do seu projeto durante as lives com docentes. Essa é uma boa oportunidade para discutir sobre as dificuldades encontradas e pegar dicas valiosas com especialistas e colegas de turma.

Link para a base: https://www.ibge.gov.br/estatisticas/investigacoes-experimentais/estatisticas-experimentais/27946-divulgacao-semanal-pnadcovid1?t=downloads&utm_source=covid19&utm_medium=hotsite&utm_campaign=covid_19

### Importando bibliotecas

In [3]:
import warnings 
warnings.filterwarnings(action = 'ignore')

In [4]:
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from pyspark.sql import SparkSession
import findspark
import os
import pandas as pd
import plotly.express as px

### Sobre o banco de dados

Um banco de dados no MySQL foi desenvolvido com o objetivo de centralizar as informações disponíveis na pesquisa PNAD-COVID 19 do IBGE. Optou-se por incluir todos os dados da pesquisa no banco de dados e, desses, extrair para análise somente os dados referentes aos meses de setembro, outubro e novembro de 2020. O esquema foi desenhado de modo que fosse permitido a inclusão de novos dados, para o caso em que haja continuação da pesquisa. 

#### Estrutura do banco de dados

O banco de dados é composto por uma tabela principal (dados_covid) que armazena todos os dados da pesquisa e tabelas auxiliares, que guardam as respostas obtidas para os questionamentos da pesquisa. As tabelas auxiliares abaixo guardam multiplos relacionamentos, pois muitos dos questionamentos da pesquisa têm como resposta um mesmo conjunto de opções.
    
- depara_respostas: guarda o conjunto de respostas "Sim", "Não", "Não sabe", “Ignorado” e "Não aplicável".
- depara_resultado_covid: guarda o conjunto de respostas "Positivo", "Negativo", "Inconclusivo", "Ainda não recebeu o resultado" e "Ignorado".

A tabela dados_covid possui chaves estrangeiras que relacionam os campos que possuem um mesmo conjunto de respostas às tabelas auxiliares acima. Houve a inserção de índices únicos (UNIQUE KEYS) nas tabelas para evitar duplicidade de dados, permitindo assim a inserção de novos dados. Cada linha da tabela dados_covid representa o conjunto de respostas de um entrevistado.

#### Estrutura das tabelas

dados_covid - armazena os dados da pesquisa:

- id - chave primária;
- ano – ano da pesquisa;
- uf - sigla da unidade da federação;
- ...demais campos da pesquisa.

depara_respostas - armazena as respostas padronizadas:

- id - chave primária;
- respostas_id - valores;
- name - "Sim", "Não", "Não sabe", "Ignorado", "Não aplicável".

depara_resultado_covid - armazena os sintomas investigados:

- id - chave primária;
- respostas_id – valores;
- name - "Positivo", "Negativo", "Inconclusivo", "Ainda não recebeu o resultado" e "Ignorado".

#### View

Uma view (pnad_covid_view) foi criada com o objetivo de facilitar consultas aos dados. Essa traz somente os dados os meses de setembro, outubro e novembro de campos consideradas mais relevantes para a análise, assim é possível manipular os dados de forma ágil, sem a necessidade de consultar a estrutura integral do banco.

Em síntese, o banco de dados foi modelado visando integridade e flexibilidade para as consultas. As tabelas auxiliares e a view agregam valor ao permitir padronização e consulta orientada para análise. A estrutura foi pensada para receber dados de pesquisas futuras, garantindo a continuidade do uso do banco.

### Conectando no MySQL

In [5]:
def mysql_connection(host, user, passwd, database=None):
    engine = create_engine(f'mysql+pymysql://{user}:{passwd}@{host}/{database}')
    return engine.connect()

mysql = mysql_connection('127.0.0.1', 'root', 'admin', 'pnad_covid')

In [6]:
'''statement = text('SELECT * FROM pnad_covid_view LIMIT 5')
response = mysql.execute(statement)
for row in response:
        print(row)'''

"statement = text('SELECT * FROM pnad_covid_view LIMIT 5')\nresponse = mysql.execute(statement)\nfor row in response:\n        print(row)"

### Criando sessão Spark

In [7]:
findspark.init()

In [8]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

23/10/31 15:23:49 WARN Utils: Your hostname, platero-Lenovo-IdeaPad-S145-15IWL resolves to a loopback address: 127.0.1.1; using 192.168.15.109 instead (on interface wlp2s0)
23/10/31 15:23:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/31 15:23:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/31 15:23:51 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/10/31 15:23:51 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [9]:
spark

### Importando dados da pesquisa

In [10]:
'''df_05 = spark.read.csv('dados/dados_importados/PNAD_COVID_052020.csv', sep=',', inferSchema=True, header=True)
df_06 = spark.read.csv('dados/dados_importados/PNAD_COVID_062020.csv', sep=',', inferSchema=True, header=True)
df_07 = spark.read.csv('dados/dados_importados/PNAD_COVID_072020.csv', sep=',', inferSchema=True, header=True)
df_08 = spark.read.csv('dados/dados_importados/PNAD_COVID_082020.csv', sep=',', inferSchema=True, header=True)
df_09 = spark.read.csv('dados/dados_importados/PNAD_COVID_092020.csv', sep=',', inferSchema=True, header=True)
df_10 = spark.read.csv('dados/dados_importados/PNAD_COVID_102020.csv', sep=',', inferSchema=True, header=True)
df_11 = spark.read.csv('dados/dados_importados/PNAD_COVID_112020.csv', sep=',', inferSchema=True, header=True)
'''

"df_05 = spark.read.csv('dados/dados_importados/PNAD_COVID_052020.csv', sep=',', inferSchema=True, header=True)\ndf_06 = spark.read.csv('dados/dados_importados/PNAD_COVID_062020.csv', sep=',', inferSchema=True, header=True)\ndf_07 = spark.read.csv('dados/dados_importados/PNAD_COVID_072020.csv', sep=',', inferSchema=True, header=True)\ndf_08 = spark.read.csv('dados/dados_importados/PNAD_COVID_082020.csv', sep=',', inferSchema=True, header=True)\ndf_09 = spark.read.csv('dados/dados_importados/PNAD_COVID_092020.csv', sep=',', inferSchema=True, header=True)\ndf_10 = spark.read.csv('dados/dados_importados/PNAD_COVID_102020.csv', sep=',', inferSchema=True, header=True)\ndf_11 = spark.read.csv('dados/dados_importados/PNAD_COVID_112020.csv', sep=',', inferSchema=True, header=True)\n"

### Tratando dados da pesquisa

In [11]:
#igualando número de colunas, pois a quantidade de questões divergem entre os meses da pesquisa
'''from pyspark.sql.functions import lit

def add_missing_cols(df):
    cols = df_11.columns
    new_cols = []

    for col_name in cols: #selecionando colunas faltantes
        if col_name not in df.columns:
            new_cols.append(col_name)

    for new_col_name in new_cols: #adicionando colunas faltantes
        df = df.withColumn(new_col_name, lit(None))

    return df.select(cols) #organizando colunas

df_05 = add_missing_cols(df_05)
df_06 = add_missing_cols(df_06)
df_07 = add_missing_cols(df_07)
df_08 = add_missing_cols(df_08)
df_09 = add_missing_cols(df_09)
df_10 = add_missing_cols(df_10)'''

'from pyspark.sql.functions import lit\n\ndef add_missing_cols(df):\n    cols = df_11.columns\n    new_cols = []\n\n    for col_name in cols: #selecionando colunas faltantes\n        if col_name not in df.columns:\n            new_cols.append(col_name)\n\n    for new_col_name in new_cols: #adicionando colunas faltantes\n        df = df.withColumn(new_col_name, lit(None))\n\n    return df.select(cols) #organizando colunas\n\ndf_05 = add_missing_cols(df_05)\ndf_06 = add_missing_cols(df_06)\ndf_07 = add_missing_cols(df_07)\ndf_08 = add_missing_cols(df_08)\ndf_09 = add_missing_cols(df_09)\ndf_10 = add_missing_cols(df_10)'

In [12]:
#concatenando DataFrames
'''
df = df_05.union(df_06)
df = df.union(df_07)
df = df.union(df_08)
df = df.union(df_09)
df = df.union(df_10)
df = df.union(df_11)'''

'\ndf = df_05.union(df_06)\ndf = df.union(df_07)\ndf = df.union(df_08)\ndf = df.union(df_09)\ndf = df.union(df_10)\ndf = df.union(df_11)'

In [13]:
'''df.select('C007C').where(df['V1013'] == '06').show(5)'''

"df.select('C007C').where(df['V1013'] == '06').show(5)"

### Exportando dados tratados da pesquisa para CSV

In [14]:
#exportando arquivo csv único
'''df.coalesce(1).write.csv(
    path='dados/dados_exportados',
    mode='overwrite',
    sep=',',
    header=True
)'''

"df.coalesce(1).write.csv(\n    path='dados/dados_exportados',\n    mode='overwrite',\n    sep=',',\n    header=True\n)"

### Exportando seguimentos dos dados tratados da pesquisa

A seguimentação torna processo de inserção dos dados no banco de dados mais ágil

In [15]:
#exportando seguimentos do DataFrame
'''n_splits = 5
n_rows = df.count() // n_splits
copy_df = df

for i in range(1, n_splits + 1):
    temp_df = copy_df.limit(n_rows)
    copy_df = copy_df.subtract(temp_df) #subtraindo conteúdo seguimentado
    
    #exportando seguimento
    temp_df.coalesce(1).write.csv(
    path=f'dados/dados_exportados/dados_seguimentados/seguimento_{i}',
    mode='overwrite',
    sep=',',
    header=True
    )'''

"n_splits = 5\nn_rows = df.count() // n_splits\ncopy_df = df\n\nfor i in range(1, n_splits + 1):\n    temp_df = copy_df.limit(n_rows)\n    copy_df = copy_df.subtract(temp_df) #subtraindo conteúdo seguimentado\n    \n    #exportando seguimento\n    temp_df.coalesce(1).write.csv(\n    path=f'dados/dados_exportados/dados_seguimentados/seguimento_{i}',\n    mode='overwrite',\n    sep=',',\n    header=True\n    )"

In [16]:
'''df_1 = spark.read.csv('dados/dados_exportados/dados_seguimentados/seguimento_1/*.csv', sep=',', inferSchema=True, header=True)
df_2 = spark.read.csv('dados/dados_exportados/dados_seguimentados/seguimento_2/*.csv', sep=',', inferSchema=True, header=True)'''

"df_1 = spark.read.csv('dados/dados_exportados/dados_seguimentados/seguimento_1/*.csv', sep=',', inferSchema=True, header=True)\ndf_2 = spark.read.csv('dados/dados_exportados/dados_seguimentados/seguimento_2/*.csv', sep=',', inferSchema=True, header=True)"

### Exportando dados sequimentados para o MySQL

In [17]:
'''
df = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_1.csv')
df.to_sql('dados_covid', mysql, if_exists='append', index=False)
mysql.execute(text('COMMIT'))

df = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_2.csv')
df.to_sql('dados_covid', mysql, if_exists='append', index=False)
mysql.execute(text('COMMIT'))

df = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_3.csv')
df.to_sql('dados_covid', mysql, if_exists='append', index=False)
mysql.execute(text('COMMIT'))

df = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_4.csv')
df.to_sql('dados_covid', mysql, if_exists='append', index=False)
mysql.execute(text('COMMIT'))

df = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_5.csv')
df.to_sql('dados_covid', mysql, if_exists='append', index=False)
mysql.execute(text('COMMIT'))
'''

"\ndf = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_1.csv')\ndf.to_sql('dados_covid', mysql, if_exists='append', index=False)\nmysql.execute(text('COMMIT'))\n\ndf = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_2.csv')\ndf.to_sql('dados_covid', mysql, if_exists='append', index=False)\nmysql.execute(text('COMMIT'))\n\ndf = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_3.csv')\ndf.to_sql('dados_covid', mysql, if_exists='append', index=False)\nmysql.execute(text('COMMIT'))\n\ndf = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/seguimento_4.csv')\ndf.to_sql('dados_covid', mysql, if_exists='append', index=False)\nmysql.execute(text('COMMIT'))\n\ndf = pd.read_csv('/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/dados_seguimentados/s

### Importando view do MySQL

In [18]:
'''res = mysql.execute(text('SELECT * FROM pnad_covid_view LIMIT 1149196'))
res = res.fetchall()
df = pd.DataFrame(res)'''

"res = mysql.execute(text('SELECT * FROM pnad_covid_view LIMIT 1149196'))\nres = res.fetchall()\ndf = pd.DataFrame(res)"

In [19]:
'''df.info()'''

'df.info()'

### Exportando view para arquivo CSV

Os dados serão analisados a partir de arquivo CSV dada limitação de hardware, haja visto que a base de dados no MySQL contempla todos os dados disponibilizados pela pesquisa PNAD-COVID 19 do IBGE

In [20]:
'''df.to_csv('dados/dados_exportados/2023-10-27_pnad_covid_view.csv', index=False)'''

"df.to_csv('dados/dados_exportados/2023-10-27_pnad_covid_view.csv', index=False)"

### Importando view do arquivo CSV

In [21]:
df = spark.read.csv('dados/dados_exportados/2023-10-31_pnad_covid_view.csv', sep=',', inferSchema=True, header=True)
df.show(5, truncate=False)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/platero/postech_techchallenge_fase_3/dados/dados_exportados/2023-10-27_pnad_covid_view.csv.

### Tratando view do arquivo CSV

#### Dicionários

In [None]:
estados = {
    'Acre': 'AC',
    'Alagoas': 'AL',
    'Amapá': 'AP',
    'Amazonas': 'AM',
    'Bahia': 'BA',
    'Ceará': 'CE', 
    'Distrito Federal': 'DF',
    'Espírito Santo': 'ES',
    'Goiás': 'GO',
    'Maranhão': 'MA',
    'Mato Grosso': 'MT',
    'Mato Grosso do Sul': 'MS',
    'Minas Gerais': 'MG',
    'Pará': 'PA',
    'Paraíba': 'PB',
    'Paraná': 'PR',
    'Pernambuco': 'PE',
    'Piauí': 'PI', 
    'Rio de Janeiro': 'RJ',
    'Rio Grande do Norte': 'RN',
    'Rio Grande do Sul': 'RS',
    'Rondônia': 'RO',
    'Roraima': 'RR',
    'Santa Catarina': 'SC',
    'São Paulo': 'SP',
    'Sergipe': 'SE',
    'Tocantins': 'TO'
}

regioes = {
    'AC': 'Norte',
    'AL': 'Nordeste',
    'AP': 'Norte',
    'AM': 'Norte',
    'BA': 'Nordeste',
    'CE': 'Nordeste',
    'DF': 'Centro-Oeste',
    'ES': 'Sudeste',
    'GO': 'Centro-Oeste',
    'MA': 'Nordeste',
    'MT': 'Centro-Oeste',
    'MS': 'Centro-Oeste', 
    'MG': 'Sudeste',
    'PA': 'Norte',
    'PB': 'Nordeste',
    'PR': 'Sul',
    'PE': 'Nordeste',
    'PI': 'Nordeste',
    'RJ': 'Sudeste',
    'RN': 'Nordeste',
    'RS': 'Sul',
    'RO': 'Norte',
    'RR': 'Norte',
    'SC': 'Sul',
    'SP': 'Sudeste',
    'SE': 'Nordeste',
    'TO': 'Norte'
}

#### Adicionando colunas

In [None]:
df = df.withColumn('sigla', df.uf).replace(to_replace=estados, subset=['sigla']) #adicionando sigla dos estados
df = df.withColumn('regiao', df.sigla).replace(to_replace=regioes, subset=['regiao']) #adicionando regioes dos estados
df.show(5, truncate=False)

+----------+--------------+------------------+-----+------+--------+------------------------------------+-----------------------------+-----------------------+-----------------------------+---------------------------------+--------------------+------------------------+------------------+---------------------------------+--------------------------+---------------------------+-------------------------------+----------------+-------------------------------+------------------------+---------------------+-----------------------+-----------------+-----------------------+--------------------+-------------+-----------------------+-----------+---------------------------+-------------+-----------------+---------------+-----+--------+
|data      |uf            |situacao_domicilio|idade|sexo  |cor_raca|escolaridade                        |questao_estabelecimento_saude|questao_permaneceu_casa|questao_remedio_conta_propria|questao_remedio_orientacao_medica|questao_hospital_SUS|questao_hospital_priva

### Exportando dados tratados da view para arquivo CSV

In [None]:
#exportando arquivo csv único
df.coalesce(1).write.csv(
    path='dados/dados_exportados/dados_uteis',
    mode='overwrite',
    sep=',',
    header=True
)

                                                                                

### Importanto dados tratados da view do arquivo CSV

In [None]:
df = spark.read.csv('dados/dados_exportados/dados_uteis/2023-10-31_pnad_covid_view.csv', sep=',', inferSchema=True, header=True)
df.show(5, truncate=False)

                                                                                

+----------+--------------+------------------+-----+------+--------+------------------------------------+-----------------------------+-----------------------+-----------------------------+---------------------------------+--------------------+------------------------+------------------+---------------------------------+--------------------------+---------------------------+-------------------------------+----------------+-------------------------------+------------------------+---------------------+-----------------------+-----------------+-----------------------+--------------------+-------------+-----------------------+-----------+---------------------------+-------------+-----------------+---------------+-----+--------+
|data      |uf            |situacao_domicilio|idade|sexo  |cor_raca|escolaridade                        |questao_estabelecimento_saude|questao_permaneceu_casa|questao_remedio_conta_propria|questao_remedio_orientacao_medica|questao_hospital_SUS|questao_hospital_priva

### Análise exploratória das caracteristicas econômicas

In [None]:
df_temp = df.createOrReplaceTempView('df_temp')