In [None]:
# Importa bibliotecas do Glue e Spark
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: b744bdcf-13d8-4294-8dfc-6a7872a87a7c
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session b744bdcf-13d8-4294-8dfc-6a7872a87a7c to get into ready status...
Session b744bdcf-13d8-4294-8dfc-6a7872a87a7c has been created.



# LEVANTANDO AS BASES (BRONZE)

## DICIONÁRIO BASE

In [None]:
#Peguei o dicionário original, retirei celulas mescladas e fiz um Fill nos valores que ficaram nulos (código em outro notebook)

dict_path_csv = "s3://898876087650/data_input/tech_challenge_fase3/dicionario/Dicionario_PNAD_COVID_092020_tratado.csv"

pdf_dict = spark.read \
    .option("header", True) \
    .option("delimiter", ";") \
    .option("charset", "ISO-8859-1") \
    .csv(dict_path_csv)

pdf_dict.show()


+------------------+--------------------+----+-------------------+
|Código da variável|           Descrição|Tipo|        Descrição.1|
+------------------+--------------------+----+-------------------+
|                UF|Unidade da Federação|  11|           Rondônia|
|                UF|Unidade da Federação|  12|               Acre|
|                UF|Unidade da Federação|  13|           Amazonas|
|                UF|Unidade da Federação|  14|            Roraima|
|                UF|Unidade da Federação|  15|               Pará|
|                UF|Unidade da Federação|  16|              Amapá|
|                UF|Unidade da Federação|  17|          Tocantins|
|                UF|Unidade da Federação|  21|           Maranhão|
|                UF|Unidade da Federação|  22|              Piauí|
|                UF|Unidade da Federação|  23|              Ceará|
|                UF|Unidade da Federação|  24|Rio Grande do Norte|
|                UF|Unidade da Federação|  25|            Para

In [None]:
# Ajusta os nomes das colunas do dicionário
df_dict = pdf_dict.withColumnRenamed("Código da variável", "codigo") \
                  .withColumnRenamed("Descrição", "descricao_coluna") \
                  .withColumnRenamed("Tipo", "tipo_valor") \
                  .withColumnRenamed("Descrição.1", "descricao_valor")

col_rename_dict = {row['codigo']: row['descricao_coluna'] for row in df_dict.collect()}

df_dict.show(5)

+------+--------------------+----------+---------------+
|codigo|    descricao_coluna|tipo_valor|descricao_valor|
+------+--------------------+----------+---------------+
|    UF|Unidade da Federação|        11|       Rondônia|
|    UF|Unidade da Federação|        12|           Acre|
|    UF|Unidade da Federação|        13|       Amazonas|
|    UF|Unidade da Federação|        14|        Roraima|
|    UF|Unidade da Federação|        15|           Pará|
+------+--------------------+----------+---------------+
only showing top 5 rows


In [None]:
#Criar um dicionário de mapeamento código -> descrição
dict_mapping = pdf_dict.select("Código da variável", "Descrição").rdd.collectAsMap()

dict_mapping

{'UF': 'Unidade da Federação', 'CAPITAL': 'Capital', 'RM_RIDE': 'Região Metropolitana e Região Administrativa Integrada de Desenvolvimento', 'Luís (MA)"': None, 'de Desenvolvimento da Grande Teresina (PI)"': None, 'V1008': 'Número de seleção do domicílio', 'V1012': 'Semana no mês', 'V1013': 'Mês da pesquisa', 'V1016': 'Número da entrevista no domicílio', 'Estrato': 'Estrato', 'UPA': 'UPA', 'V1022': 'Situação do domicílio', 'V1023': 'Tipo de área', 'V1030': 'Projeção da população', 'V1031': 'Peso do domicílio e das pessoas', 'V1032': 'Peso do domicílio e das pessoas', 'posest': 'Domínios de projeção', 'A001': 'Número de ordem', 'A001A': 'Condição no domicílio', 'A001B1': 'Dia de nascimento', 'A001B2': 'Mês de nascimento', 'A001B3': 'Ano de nascimento', 'A002': 'Idade do morador ', 'A003': 'Sexo', 'A004': 'Cor ou raça', 'A005': 'Escolaridade', 'A006': 'Frequenta escola', 'A007': 'Na semana passada, _____ foram disponibilizadas atividades escolares para realizar em casa?', 'A008': 'Na sem

## BASES PNAD

In [None]:

from pyspark.sql import functions as F

df_set = spark.read.option("header", True).option("inferSchema", True).csv("s3://898876087650/data_input/tech_challenge_fase3/092020/PNAD_COVID_092020.csv")
df_out = spark.read.option("header", True).option("inferSchema", True).csv("s3://898876087650/data_input/tech_challenge_fase3/102020/PNAD_COVID_102020.csv")
df_nov = spark.read.option("header", True).option("inferSchema", True).csv("s3://898876087650/data_input/tech_challenge_fase3/112020/PNAD_COVID_112020.csv")

# df de novembro tem algumas colunas a mais, então estou tirando essas colunas a mais pra ficar os três periodos iguais
df_nov = df_nov.select(df_set.columns)

# adiciona coluna do período
df_set = df_set.withColumn("periodo", F.lit("09/2020"))
df_out = df_out.withColumn("periodo", F.lit("10/2020"))
df_nov = df_nov.withColumn("periodo", F.lit("11/2020"))

df_nov.columns


['Ano', 'UF', 'CAPITAL', 'RM_RIDE', 'V1008', 'V1012', 'V1013', 'V1016', 'Estrato', 'UPA', 'V1022', 'V1023', 'V1030', 'V1031', 'V1032', 'posest', 'A001', 'A001A', 'A001B1', 'A001B2', 'A001B3', 'A002', 'A003', 'A004', 'A005', 'A006', 'A007', 'A008', 'A009', 'B0011', 'B0012', 'B0013', 'B0014', 'B0015', 'B0016', 'B0017', 'B0018', 'B0019', 'B00110', 'B00111', 'B00112', 'B00113', 'B002', 'B0031', 'B0032', 'B0033', 'B0034', 'B0035', 'B0036', 'B0037', 'B0041', 'B0042', 'B0043', 'B0044', 'B0045', 'B0046', 'B005', 'B006', 'B007', 'B008', 'B009A', 'B009B', 'B009C', 'B009D', 'B009E', 'B009F', 'B0101', 'B0102', 'B0103', 'B0104', 'B0105', 'B0106', 'B011', 'C001', 'C002', 'C003', 'C004', 'C005', 'C0051', 'C0052', 'C0053', 'C006', 'C007', 'C007A', 'C007B', 'C007C', 'C007D', 'C007E', 'C007E1', 'C007E2', 'C007F', 'C008', 'C009', 'C009A', 'C010', 'C0101', 'C01011', 'C01012', 'C0102', 'C01021', 'C01022', 'C0103', 'C0104', 'C011A', 'C011A1', 'C011A11', 'C011A12', 'C011A2', 'C011A21', 'C011A22', 'C012', 'C0

In [None]:
df_total = df_set.unionByName(df_out).unionByName(df_nov)

df_total.count()

1149197


# RENOMEANDO AS COLUNAS COM AS COLUNAS DO DICIONÁRIO (SILVER)


In [None]:
# Função para renomear as colunas com formato "CÓDIGO - DESCRIÇÃO"
def rename_columns_with_dict(df, mapping_dict):
    # Para cada coluna no DataFrame
    for old_col in df.columns:
        # Se a coluna existe no dicionário, renomeia com formato "CÓDIGO - DESCRIÇÃO"
        if old_col in mapping_dict:
            new_col_name = f"{old_col} - {mapping_dict[old_col]}"
            df = df.withColumnRenamed(old_col, new_col_name)
    return df

df_total_renomeado = rename_columns_with_dict(df_total, dict_mapping)

# Verificar o resultado
print("Colunas originais vs renomeadas:")
for i, col in enumerate(df_total.columns[:10]):  # Mostra apenas as primeiras 10
    old_name = col
    if i < len(df_total_renomeado.columns):
        new_name = df_total_renomeado.columns[i]
        print(f"{old_name} -> {new_name}")

Colunas originais vs renomeadas:
Ano -> Ano
UF -> UF - Unidade da Federação
CAPITAL -> CAPITAL - Capital
RM_RIDE -> RM_RIDE - Região Metropolitana e Região Administrativa Integrada de Desenvolvimento
V1008 -> V1008 - Número de seleção do domicílio
V1012 -> V1012 - Semana no mês
V1013 -> V1013 - Mês da pesquisa
V1016 -> V1016 - Número da entrevista no domicílio
Estrato -> Estrato - Estrato
UPA -> UPA - UPA


In [None]:
df_total_renomeado.printSchema()

root
 |-- Ano: integer (nullable = true)
 |-- UF - Unidade da Federação: integer (nullable = true)
 |-- CAPITAL - Capital: integer (nullable = true)
 |-- RM_RIDE - Região Metropolitana e Região Administrativa Integrada de Desenvolvimento: integer (nullable = true)
 |-- V1008 - Número de seleção do domicílio: integer (nullable = true)
 |-- V1012 - Semana no mês: integer (nullable = true)
 |-- V1013 - Mês da pesquisa: integer (nullable = true)
 |-- V1016 - Número da entrevista no domicílio: integer (nullable = true)
 |-- Estrato - Estrato: integer (nullable = true)
 |-- UPA - UPA: integer (nullable = true)
 |-- V1022 - Situação do domicílio: integer (nullable = true)
 |-- V1023 - Tipo de área: integer (nullable = true)
 |-- V1030 - Projeção da população: integer (nullable = true)
 |-- V1031 - Peso do domicílio e das pessoas: double (nullable = true)
 |-- V1032 - Peso do domicílio e das pessoas: double (nullable = true)
 |-- posest - Domínios de projeção: integer (nullable = true)
 |-- A0

## Criando chave única (CODIGO COLUNA + TIPO_VALOR) no dicionario

In [None]:


from pyspark.sql.functions import broadcast, when, col, concat, lit
from functools import reduce

# 1. Preparar o dicionário com chave única: codigo + tipo_valor
df_dict_prep = df_dict.withColumn("chave_unica",
    concat(col("codigo"), lit("_"), col("tipo_valor")))

# 2. Coletar os mapeamentos
mapeamentos = df_dict_prep.select("codigo", "chave_unica", "tipo_valor", "descricao_valor").collect()

mapeamentos_dict = {row['chave_unica']: row['descricao_valor'] for row in mapeamentos}

mapeamentos_dict


{'UF_11': 'Rondônia', 'UF_12': 'Acre', 'UF_13': 'Amazonas', 'UF_14': 'Roraima', 'UF_15': 'Pará', 'UF_16': 'Amapá', 'UF_17': 'Tocantins', 'UF_21': 'Maranhão', 'UF_22': 'Piauí', 'UF_23': 'Ceará', 'UF_24': 'Rio Grande do Norte', 'UF_25': 'Paraíba', 'UF_26': 'Pernambuco', 'UF_27': 'Alagoas', 'UF_28': 'Sergipe', 'UF_29': 'Bahia', 'UF_31': 'Minas Gerais', 'UF_32': 'Espírito Santo', 'UF_33': 'Rio de Janeiro', 'UF_35': 'São Paulo', 'UF_41': 'Paraná', 'UF_42': 'Santa Catarina', 'UF_43': 'Rio Grande do Sul', 'UF_50': 'Mato Grosso do Sul', 'UF_51': 'Mato Grosso', 'UF_52': 'Goiás', 'UF_53': 'Distrito Federal', 'CAPITAL_11': 'Município de Porto Velho (RO)', 'CAPITAL_12': 'Município de Rio Branco (AC)', 'CAPITAL_13': 'Município de Manaus (AM)', 'CAPITAL_14': 'Município de Boa Vista (RR)', 'CAPITAL_15': 'Município de Belém (PA)', 'CAPITAL_16': 'Município de Macapá (AP)', 'CAPITAL_17': 'Município de Palmas (TO)', 'CAPITAL_21': 'Município de São Luís (MA)', 'CAPITAL_22': 'Município de Teresina (PI)', '

# SELEÇÃO APENAS DAS PERGUNTAS QUE USAREMOS, TRADUZINDO OS VALORES DAS COLUNAS E EXPORTANDO CSV (GOLD)

In [None]:
# lista de colunas que você quer manter
colunas = [
    # Características Clínicas dos Sintomas
    "B0011 - Na semana passada teve febre?",
    "B0012 - Na semana passada teve tosse?",
    "B0014 - Na semana passada teve dificuldade para respirar?",
    "B00111 - Na semana passada teve perda de cheiro ou sabor?",
    "B002 - Por causa disso, foi a algum estabelecimento de saúde?",
    "B008 - O(A) Sr(a) fez algum teste para saber se estava infectado(a) pelo coronavírus? ",

    "B009A - Fez o exame coletado com cotonete na boca e/ou nariz (SWAB)? ",
    "B009B - Qual o resultado?",
    "B009C - Fez o exame de coleta de sangue através de furo no dedo?",
    "B009D - Qual o resultado?",
    "B009E - Fez o exame de coleta de sangue através da veia da braço?",
    "B009F - Qual o resultado?",
    # Características da População
    "UF - Unidade da Federação",
    "A002 - Idade do morador ",
    "A003 - Sexo",
    "A004 - Cor ou raça",
    "A005 - Escolaridade",
    "B0102 - Algum médico já lhe deu o diagnóstico de hipertensão?",
    "B011 - Qual foi o resultado do teste?  Na semana passada, devido à pandemia do Coronavírus, em que medida o(a) Sr(a) restringiu o contato com as pessoas? ",

    # Características Econômicas
    "C001 - Na semana passada, por pelo menos uma hora, trabalhou ou fez algum bico?",
    "C013 - Na semana passada, o(a) Sr(a) estava em trabalho remoto (home office ou teletrabalho)?",
    "C01012 - Valor em dinheiro",
    "D0051 - Auxílios emergenciais relacionados ao coronavirus",
    "D0053 - Somatório dos valores recebidos",
    "E001 - Durante o período da pandemia alguém deste domicílio solicitou algum empréstimo?  ",
    "periodo"
]

# filtrar apenas essas colunas
df_filtrado = df_total_renomeado[colunas]





In [None]:
#Transformar os valores tudo em string

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Converte TODAS as colunas do DataFrame para string
df_filtrado = df_filtrado.select(
    [F.col(c).cast(StringType()).alias(c) for c in df_filtrado.columns]
)

df_resultado = df_filtrado





In [None]:
from pyspark.sql import functions as F

colunas_excecao = [
    "C01012 - Valor em dinheiro",
    "D0053 - Somatório dos valores recebidos"
]

# Criar dicionário (se ainda não existir)
if 'mapeamentos_dict' not in locals():
    mapeamentos = df_dict_prep.select("codigo", "chave_unica", "descricao_valor").collect()
    mapeamentos_dict = {
        row['chave_unica']: row['descricao_valor']
        for row in mapeamentos if row['chave_unica'] is not None
    }

df_com_descricoes = df_resultado

for col in df_com_descricoes.columns:
    if col in colunas_excecao:
        print(f"Pulando coluna {col}")
        continue

    codigo = col.split(" - ")[0] if " - " in col else col

    # Pegar só os mapeamentos daquele código
    chaves = {
        k: v
        for k, v in mapeamentos_dict.items()
        if isinstance(k, str) and k.startswith(codigo + "_")
    }

    if not chaves:
        continue

    # Normalizar: transformar valor cru "1" em "B0011_1"
    col_norm = F.when(F.col(col).isNotNull(), F.concat(F.lit(codigo + "_"), F.col(col))).otherwise(None)

    # Substituir pelo dicionário
    case_expr = col_norm
    for chave, desc in chaves.items():
        case_expr = F.when(col_norm == chave, desc).otherwise(case_expr)

    df_com_descricoes = df_com_descricoes.withColumn(col, case_expr)

print("Substituições aplicadas (com prefixo código_valor)")


Pulando coluna C01012 - Valor em dinheiro
Pulando coluna D0053 - Somatório dos valores recebidos
Substituições aplicadas (com prefixo código_valor)


In [None]:
df_com_descricoes.select("B0011 - Na semana passada teve febre?") \
    .distinct() \
    .show(truncate=False)


+-------------------------------------+
|B0011 - Na semana passada teve febre?|
+-------------------------------------+
|Não                                  |
|Não sabe                             |
|Sim                                  |
|Ignorado                             |
+-------------------------------------+


In [None]:
#Retirando o codigo da pergunta nos valores dessa coluna

coluna = "A002 - Idade do morador "

df_com_descricoes = df_com_descricoes.withColumn(
    coluna,
    F.when(F.col(coluna).contains("_"), F.split(F.col(coluna), "_").getItem(1))
     .otherwise(F.col(coluna))
)




In [None]:
# Exportar relatório
df_com_descricoes.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .option("sep", ",") \
    .option("encoding", "UTF-8") \
    .option("quote", '"') \
    .option("escape", '"') \
    .csv('s3://898876087650/data_output/CSV/')

print("Relatório de processamento exportado!")

✅ Relatório de processamento exportado!
