### 1. Configuração Inicial

**Objetivo:** Carregar as bibliotecas e a tabela Bronze que será nossa fonte de dados.

In [0]:
# A importação de 'monotonically_increasing_id' foi adicionada aqui
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, when, trim, substring, concat_ws, monotonically_increasing_id
from pyspark.sql.types import IntegerType

# --- CONFIGURAÇÃO DOS CAMINHOS E NOMES ---
data_source = "sinasc"
root_path = "/lakehouse/health_insights_brasil"
bronze_table_name = "default.bronze_sinasc"
silver_path = f"{root_path}/silver/{data_source}"
# Se estiver no notebook Gold, as variáveis gold_path e fact_table_name também estarão aqui

# --- CARREGAR DADOS DA CAMADA BRONZE ---
print(f"Carregando dados da tabela: {bronze_table_name}")
df_bronze = spark.table(bronze_table_name)

print("✅ Tabela Bronze carregada e funções importadas com sucesso.")

Carregando dados da tabela: default.bronze_sinasc
✅ Tabela Bronze carregada e funções importadas com sucesso.


### 2. Coluna Data

 Corrigimos a coluna de data (DTNASC), transformando o texto DDMMYYYY em um formato de data real. Também convertemos colunas numéricas importantes, como PESO e IDADEMAE, de texto para número, permitindo análises futuras. O resultado é um DataFrame limpo e com os tipos de dados corretos

In [0]:
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, when, trim, substring, concat_ws
from pyspark.sql.types import IntegerType

print("Iniciando limpeza com a lógica de data correta para o formato DDMMYYYY...")

df_cleaned = (df_bronze
              # --- LÓGICA DE DATA CORRIGIDA PARA DDMMYYYY ---
              .withColumn("dia", substring(col("DTNASC"), 1, 2))
              .withColumn("mes", substring(col("DTNASC"), 3, 2))
              .withColumn("ano", substring(col("DTNASC"), 5, 4))
              .withColumn("DTNASC_FORMATADA", concat_ws("-", col("ano"), col("mes"), col("dia")))
              .withColumn("DTNASC_CORRIGIDA", to_date(col("DTNASC_FORMATADA")))
              
              # --- Converte as outras colunas numéricas ---
              .withColumn("IDADEMAE", col("IDADEMAE").cast(IntegerType()))
              .withColumn("PESO", col("PESO").cast(IntegerType()))
              .withColumn("APGAR1", col("APGAR1").cast(IntegerType()))
              .withColumn("APGAR5", col("APGAR5").cast(IntegerType()))
              .withColumn("CONSULTAS", col("CONSULTAS").cast(IntegerType()))
              
              # --- REMOVE AS COLUNAS INTERMEDIÁRIAS ---
              .drop("dia", "mes", "ano", "DTNASC_FORMATADA")
             )

# A verificação agora mostra apenas a coluna original e a final
print("Verificação da conversão de data:")
display(df_cleaned.select("DTNASC", "DTNASC_CORRIGIDA").limit(10))

Iniciando limpeza com a lógica de data correta para o formato DDMMYYYY...
Verificação da conversão de data:


DTNASC,DTNASC_CORRIGIDA
14022024,2024-02-14
17042024,2024-04-17
29052024,2024-05-29
27052024,2024-05-27
13052024,2024-05-13
8052024,2024-05-08
1052024,2024-05-01
5062024,2024-06-05
1012024,2024-01-01
1012024,2024-01-01


### 3. Criação da `dim_tempo`

**Objetivo:** Criar uma tabela de dimensão de tempo a partir da data de nascimento para facilitar análises temporais.

In [0]:
print("Criando a dimensão de tempo...")
dim_tempo = (df_cleaned
             .select("DTNASC_CORRIGIDA")
             .distinct()
             .na.drop()
             .withColumnRenamed("DTNASC_CORRIGIDA", "data_completa")
             .withColumn("ano", year(col("data_completa")))
             .withColumn("mes", month(col("data_completa")))
             .withColumn("dia", dayofmonth(col("data_completa")))
             # ADICIONA A SK AQUI, ANTES DE SALVAR
             .withColumn("sk_tempo", monotonically_increasing_id())
            )

# Salva a dimensão na camada Silver (agora com a SK)
dim_tempo_path = f"{silver_path}/dim_tempo"

(dim_tempo.write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true") 
 .save(dim_tempo_path)
)

spark.sql(f"DROP TABLE IF EXISTS default.dim_tempo_sinasc")
spark.sql(f"CREATE TABLE default.dim_tempo_sinasc USING DELTA LOCATION '{dim_tempo_path}'")

Criando a dimensão de tempo...
Out[12]: DataFrame[]

### 2. Criar a `dim_localidade`

Para enriquecer nossa análise geográfica, os dados brutos, que continham apenas códigos de municípios, foram cruzados com uma fonte de dados externa. Baixamos a lista oficial de municípios do Brasil (fornecida pelo IBGE) para "traduzir" esses códigos em seus nomes reais e legíveis (ex: "São Paulo", "Rio de Janeiro"). Com essa informação, criamos uma nova e valiosa tabela de dimensão, a dim_localidade, que é fundamental para a visualização e interpretação dos dados em um contexto geográfico claro.

In [0]:
%sh
# Baixa o arquivo CSV com a lista de municípios (sem alteração aqui)
wget -O "/tmp/municipios.csv" "https://www.gov.br/receitafederal/dados/municipios.csv"

--2025-08-14 21:32:18--  https://www.gov.br/receitafederal/dados/municipios.csv
Resolving www.gov.br (www.gov.br)... 161.148.164.31
Connecting to www.gov.br (www.gov.br)|161.148.164.31|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 234150 (229K) [text/csv]
Saving to: ‘/tmp/municipios.csv’

     0K .......... .......... .......... .......... .......... 21%  237K 1s
    50K .......... .......... .......... .......... .......... 43%  241K 1s
   100K .......... .......... .......... .......... .......... 65%  240K 0s
   150K .......... .......... .......... .......... .......... 87%  241K 0s
   200K .......... .......... ........                        100%  140K=1.0s

2025-08-14 21:32:19 (220 KB/s) - ‘/tmp/municipios.csv’ saved [234150/234150]



 Lê o CSV dos municípios e o processa para criar a dimensão

In [0]:

municipios_path = "file:/tmp/municipios.csv"

df_municipios_raw = (spark.read
                     .format("csv")
                     .option("header", "true")
                     .option("sep", ";")
                     .option("encoding", "ISO-8859-1") # Informa ao Spark a codificação correta
                     .load(municipios_path)
                    )

# Seleciona, renomeia e limpa as colunas que nos interessam
dim_localidade = (df_municipios_raw
                  .select(
                      col("CÓDIGO DO MUNICÍPIO - IBGE").alias("id_municipio_ibge"),
                      col("MUNICÍPIO - IBGE").alias("nome_municipio"),
                      col("UF").alias("uf")
                  )
                  .withColumn("id_municipio_ibge", substring(col("id_municipio_ibge"), 1, 6))
                 )

# Adiciona uma chave substituta
dim_localidade = dim_localidade.withColumn("sk_localidade", monotonically_increasing_id())

# Salva e registra a nova dimensão
dim_localidade_path = f"{silver_path}/dim_localidade"
(dim_localidade.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(dim_localidade_path))
spark.sql(f"DROP TABLE IF EXISTS default.dim_localidade")
spark.sql(f"CREATE TABLE default.dim_localidade USING DELTA LOCATION '{dim_localidade_path}'")
print("✅ Dimensão 'dim_localidade' criada com sucesso.")
display(spark.table("default.dim_localidade"))

✅ Dimensão 'dim_localidade' criada com sucesso.


id_municipio_ibge,nome_municipio,uf,sk_localidade
110010,Guajará-Mirim,RO,0
110037,Alto Alegre dos Parecis,RO,1
110020,Porto Velho,RO,2
110045,Buritis,RO,3
110012,Ji-Paraná,RO,4
110092,Chupinguaia,RO,5
110002,Ariquemes,RO,6
110094,Cujubim,RO,7
110004,Cacoal,RO,8
110143,Nova União,RO,9


### 4. Criação da `dim_parto_nascimento` (com Tratamento de Nulos)

Nesta etapa, "traduzimos" os códigos numéricos dos dados brutos (como SEXO='1') em textos legíveis (como "Masculino"), usando o dicionário de dados oficial como guia. Também tratamos valores nulos ou inválidos, substituindo-os por "Não Informado". Por fim, criamos a tabela dim_parto_nascimento contendo apenas as combinações únicas dessas descrições, resultando em uma dimensão limpa e pronta para análise.

In [0]:
print("Criando a dimensão de parto e nascimento (com SK e tratamento de nulos)...")

# Primeiro, decodificamos os campos como antes. 
df_parto_decoded = (df_cleaned
             .select("SEXO", "PARTO", "IDANOMAL", "RACACOR")
             
             .withColumn("sexo_desc", when(col("SEXO") == '1', "Masculino")
                                      .when(col("SEXO") == '2', "Feminino")
                                      .otherwise("Ignorado"))
             
             .withColumn("parto_desc", when(col("PARTO") == '1', "Vaginal")
                                       .when(col("PARTO") == '2', "Cesáreo")
                                       .otherwise("Ignorado"))

             .withColumn("anomalia_desc", when(col("IDANOMAL") == '1', "Sim")
                                          .when(col("IDANOMAL") == '2', "Não")
                                          .otherwise("Ignorado"))
             
             .withColumn("raca_cor_desc", when(col("RACACOR") == '1', "Branca")
                                          .when(col("RACACOR") == '2', "Preta")
                                          .when(col("RACACOR") == '3', "Amarela")
                                          .when(col("RACACOR") == '4', "Parda")
                                          .when(col("RACACOR") == '5', "Indígena")
                                          .otherwise("Ignorado"))
            )


dim_parto = (df_parto_decoded
                   .select("sexo_desc", "parto_desc", "anomalia_desc", "raca_cor_desc")
                   .na.fill(value="Não Informado")
                   .distinct()
                   .withColumn("sk_parto", monotonically_increasing_id())
                  )


# Salva e registra a tabela
dim_parto_path = f"{silver_path}/dim_parto"

(dim_parto.write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true")
 .save(dim_parto_path))

spark.sql(f"DROP TABLE IF EXISTS default.dim_parto_sinasc")
spark.sql(f"CREATE TABLE default.dim_parto_sinasc USING DELTA LOCATION '{dim_parto_path}'")
print("✅ Dimensão 'dim_parto_sinasc' criada com sucesso (com SK).")
display(spark.table("default.dim_parto_sinasc"))

Criando a dimensão de parto e nascimento (com SK e tratamento de nulos)...
✅ Dimensão 'dim_parto_sinasc' criada com sucesso (com SK).


sexo_desc,parto_desc,anomalia_desc,raca_cor_desc,sk_parto
Masculino,Vaginal,Sim,Ignorado,0
Feminino,Ignorado,Não,Parda,1
Masculino,Cesáreo,Sim,Ignorado,2
Feminino,Ignorado,Ignorado,Branca,3
Masculino,Cesáreo,Sim,Parda,4
Masculino,Cesáreo,Não,Indígena,5
Masculino,Vaginal,Sim,Indígena,6
Feminino,Vaginal,Sim,Parda,7
Masculino,Vaginal,Não,Indígena,8
Masculino,Vaginal,Ignorado,Ignorado,9


### 5. Criação da `dim_mae` (com Tratamento de Nulos)

Nesta célula, criamos a dimensão da mãe (dim_mae) para categorizar as informações sobre seu estado civil e escolaridade. O processo consiste em "traduzir" os códigos numéricos dos dados brutos (ex: ESTCIVMAE = '1') para seus significados textuais correspondentes (ex: "Solteira"), conforme o dicionário de dados oficial. Também garantimos a qualidade dos dados, substituindo quaisquer valores ausentes ou inválidos por "Não Informado". Por fim, removemos todas as linhas duplicadas para criar uma tabela de consulta limpa e eficiente, contendo apenas os perfis únicos de mães encontrados no dataset.

In [0]:
print("Criando a dimensão da mãe (com SK e tratamento de nulos)...")

# Decodifica os campos 
df_mae_decoded = (df_cleaned
           .select("ESTCIVMAE", "ESCMAE")
           .withColumn("estado_civil_desc", when(col("ESTCIVMAE") == '1', "Solteira").when(col("ESTCIVMAE") == '2', "Casada").when(col("ESTCIVMAE") == '3', "Viúva").when(col("ESTCIVMAE") == '4', "Separado judicialmente/Divorciado").when(col("ESTCIVMAE") == '5', "União consensual").otherwise("Ignorado"))
           .withColumn("escolaridade_desc", when(col("ESCMAE") == '1', "Nenhuma").when(col("ESCMAE") == '2', "1 a 3 anos").when(col("ESCMAE") == '3', "4 a 7 anos").when(col("ESCMAE") == '4', "8 a 11 anos").when(col("ESCMAE") == '5', "12 anos e mais").otherwise("Ignorado"))
          )

# Trata os nulos, remove duplicatas e ADICIONA A CHAVE SUBSTITUTA
dim_mae_final = (df_mae_decoded
                 .select("estado_civil_desc", "escolaridade_desc")
                 .na.fill(value="Não Informado")
                 .distinct()
                 .withColumn("sk_mae", monotonically_increasing_id())
                )

# Salva e registra a tabela
dim_mae_path = f"{silver_path}/dim_mae"
(dim_mae_final.write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true") 
 .save(dim_mae_path))

spark.sql(f"DROP TABLE IF EXISTS default.dim_mae_sinasc")
spark.sql(f"CREATE TABLE default.dim_mae_sinasc USING DELTA LOCATION '{dim_mae_path}'")
print("✅ Dimensão 'dim_mae_sinasc' recriada com sucesso (com SK).")
display(spark.table("default.dim_mae_sinasc"))

Criando a dimensão da mãe (com SK e tratamento de nulos)...
✅ Dimensão 'dim_mae_sinasc' recriada com sucesso (com SK).


estado_civil_desc,escolaridade_desc,sk_mae
Ignorado,8 a 11 anos,0
Viúva,12 anos e mais,1
Separado judicialmente/Divorciado,Ignorado,2
Casada,8 a 11 anos,3
Separado judicialmente/Divorciado,4 a 7 anos,4
Viúva,4 a 7 anos,5
Solteira,4 a 7 anos,6
Solteira,8 a 11 anos,7
Solteira,Ignorado,8
Ignorado,Ignorado,9


## 6. Verificação geral do resgistros

In [0]:
%sql

WITH bronze_cleaned AS (
  -- Primeiro, limpamos e decodificamos os campos da bronze para criar as chaves do JOIN
  SELECT
    *,
    -- Recria as colunas decodificadas para usar no JOIN
    to_date(concat(substring(DTNASC, 5, 4), '-', substring(DTNASC, 3, 2), '-', substring(DTNASC, 1, 2))) as data_nascimento_corr,
    
    CASE 
         WHEN ESTCIVMAE = '1' THEN 'Solteira'
         WHEN ESTCIVMAE = '2' THEN 'Casada'
         WHEN ESTCIVMAE = '3' THEN 'Viúva'
         WHEN ESTCIVMAE = '4' THEN 'Separado judicialmente/Divorciado'
         WHEN ESTCIVMAE = '5' THEN 'União consensual'
         ELSE 'Ignorado' 
    END as estado_civil_desc,
    
    CASE 
         WHEN ESCMAE = '1' THEN 'Nenhuma'
         WHEN ESCMAE = '2' THEN '1 a 3 anos'
         WHEN ESCMAE = '3' THEN '4 a 7 anos'
         WHEN ESCMAE = '4' THEN '8 a 11 anos'
         WHEN ESCMAE = '5' THEN '12 anos e mais'
         ELSE 'Ignorado'
    END as escolaridade_desc,

    CASE 
         WHEN SEXO = '1' THEN 'Masculino'
         WHEN SEXO = '2' THEN 'Feminino'
         ELSE 'Ignorado'
    END as sexo_desc,
    
    CASE 
         WHEN PARTO = '1' THEN 'Vaginal'
         WHEN PARTO = '2' THEN 'Cesáreo'
         ELSE 'Ignorado'
    END as parto_desc,

    CASE 
         WHEN IDANOMAL = '1' THEN 'Sim'
         WHEN IDANOMAL = '2' THEN 'Não'
         ELSE 'Ignorado'
    END as anomalia_desc,

    CASE 
         WHEN RACACOR = '1' THEN 'Branca'
         WHEN RACACOR = '2' THEN 'Preta'
         WHEN RACACOR = '3' THEN 'Amarela'
         WHEN RACACOR = '4' THEN 'Parda'
         WHEN RACACOR = '5' THEN 'Indígena'
         ELSE 'Ignorado'
    END as raca_cor_desc

  FROM default.bronze_sinasc
)
-- Agora, fazemos o JOIN entre a bronze limpa e as dimensões
SELECT 
  bronze.NUMERODN as id_nascimento,
  bronze.PESO,
  bronze.APGAR5,
  
  -- Colunas da dim_tempo
  tempo.ano,
  tempo.mes,
  
  -- Colunas da dim_parto
  parto.parto_desc as tipo_de_parto,
  parto.sexo_desc as sexo_recem_nascido,
  
  -- Colunas da dim_mae
  mae.escolaridade_desc as escolaridade_da_mae
  
FROM bronze_cleaned as bronze
LEFT JOIN default.dim_tempo_sinasc as tempo 
  ON bronze.data_nascimento_corr = tempo.data_completa
LEFT JOIN default.dim_mae_sinasc as mae
  ON bronze.escolaridade_desc = mae.escolaridade_desc AND bronze.estado_civil_desc = mae.estado_civil_desc
LEFT JOIN default.dim_parto_sinasc as parto
  ON bronze.sexo_desc = parto.sexo_desc 
  AND bronze.parto_desc = parto.parto_desc 
  AND bronze.anomalia_desc = parto.anomalia_desc 
  AND bronze.raca_cor_desc = parto.raca_cor_desc
LIMIT 20;

id_nascimento,PESO,APGAR5,ano,mes,tipo_de_parto,sexo_recem_nascido,escolaridade_da_mae
110001,3120,9,2024,2,Cesáreo,Masculino,4 a 7 anos
110001,3564,9,2024,4,Cesáreo,Masculino,12 anos e mais
110001,2816,9,2024,5,Cesáreo,Masculino,8 a 11 anos
110001,3126,9,2024,5,Cesáreo,Masculino,8 a 11 anos
110001,3622,9,2024,5,Cesáreo,Masculino,8 a 11 anos
110001,2935,9,2024,5,Cesáreo,Feminino,12 anos e mais
110001,3365,9,2024,5,Vaginal,Masculino,4 a 7 anos
110001,3298,9,2024,6,Cesáreo,Masculino,8 a 11 anos
110002,3240,9,2024,1,Vaginal,Masculino,8 a 11 anos
110002,3960,9,2024,1,Cesáreo,Masculino,12 anos e mais
