# Camada Gold — Data Warehouse (Esquema Estrela)

Nesta etapa (Gold), os dados tratados na camada Silver são consolidados em um modelo analítico, estruturado como um Data Warehouse em esquema estrela. A granularidade adotada é uma linha por internação hospitalar, permitindo o cálculo e a análise de indicadores hospitalares consolidados.

O pipeline desta camada executa as seguintes etapas:

- leitura das tabelas da camada Silver (patients_silver e encounters_silver);
- construção das dimensões analíticas:
  - dim_paciente, contendo atributos demográficos e datas vitais do paciente;
  - dim_diagnostico, representando o motivo principal da internação;
  - dim_tempo, correspondendo ao calendário derivado das datas de admissão e alta;
- construção da tabela fato fato_internacao, contendo:
  - chaves substitutas para as dimensões;
  - métricas derivadas, incluindo tempo de permanência (LOS), óbito durante a internação, reinternação em até 30 dias, idade e faixa etária na admissão;
- persistência das tabelas finais como Delta Tables na camada Gold;
- checagem da qualidade dos dados das tabelas da camada Gold;
- comentários de colunas para compor um dicionário de dados e apresentaçao do catálogo de dados.

## Modelagem do Data Warehouse

### Abordagem de modelagem

A modelagem dos dados foi construída utilizando a abordagem de esquema estrela, adequada para cenários analíticos e de Business Intelligence. A granularidade da tabela fato é uma linha por internação hospitalar, permitindo análises agregadas e temporais a partir das dimensões clínicas, demográficas e temporais.


### Esquema estrela adotado

Tabela fato
- fato_internacao

Dimensões
- dim_paciente
- dim_diagnostico
- dim_tempo

Relacionamentos (FKs na fato)
- fato_internacao.paciente_sk → dim_paciente.paciente_sk
- fato_internacao.diagnostico_sk → dim_diagnostico.diagnostico_sk
- fato_internacao.data_admissao_sk → dim_tempo.tempo_sk
- fato_internacao.data_alta_sk → dim_tempo.

O diagrama lógico do esquema estrela é apresentado a seguir:

![Diagrama do esquema estrela](diagrama_esquema_estrela.png)


### Descrição das tabelas

#### dim_paciente

Representa o paciente e seus atributos demográficos principais, utilizados para segmentação dos indicadores. :contentReference[oaicite:2]{index=2}

Colunas
- paciente_sk (PK, int): chave substituta do paciente no DW
- patient_id (string): identificador original do paciente no Synthea
- sexo (string): sexo registrado
- data_nascimento (date): data de nascimento
- data_obito (date): data de óbito (quando disponível)
- raca (string): raça informada
- etnia (string): etnia informada

#### dim_diagnostico

Representa o diagnóstico principal associado à internação, baseado no par reasoncode e reasondescription originado do próprio encontro clínico. 

Colunas
- diagnostico_sk (PK, int): chave substituta do diagnóstico no DW
- reasoncode (string): código do motivo/diagnóstico do encontro
- reasondescription (string): descrição do motivo/diagnóstico do encontro

#### dim_tempo

Dimensão de calendário para análise temporal e padronização de agregações por dia, mês, ano, semana e trimestre. É construída a partir das datas derivadas de start e stop das internações. 

Colunas
- tempo_sk (PK, int): chave substituta da data no DW
- data (date): data calendário
- ano (int)
- mes (int)
- dia (int)
- dia_semana_num (int)
- dia_semana_nome (string)
- semana_ano (int)
- mes_nome (string)
- trimestre (int)

#### fato_internacao

Tabela fato que concentra os eventos de internação (inpatient), com chaves para as dimensões e métricas derivadas para análise.

Granularidade
- 1 linha por internação (encounter inpatient)

Chaves e atributos
- internacao_sk (PK, int): chave substituta da internação no DW
- encounter_id (string): identificador original do encontro/internação no Synthea
- patient_id (string): identificador original do paciente no Synthea
- paciente_sk (FK, int): referência à dim_paciente
- diagnostico_sk (FK, int): referência à dim_diagnostico
- data_admissao_sk (FK, int): referência à dim_tempo para a data de admissão
- data_alta_sk (FK, int): referência à dim_tempo para a data de alta
- data_admissao (date): data de admissão
- data_alta (date): data de alta

Métricas derivadas e flags
- los_dias (int): tempo de permanência em dias
- obito_internacao (boolean): indicador de óbito durante a internação
- readmissao_30d (boolean): indicador de reinternação em até 30 dias
- idade_admissao (int): idade na admissão
- faixa_etaria_admissao (string): faixa etária na admissão


## Implementação

In [0]:
%sql
USE CATALOG mvp_engenharia_de_dados;
USE SCHEMA gold;


In [0]:

# Imports globais 

from pyspark.sql import functions as F
from pyspark.sql.window import Window


### Contexto e leitura das tabelas Silver

Nesta seção, carrego as tabelas tratadas na Silver que servem como fonte para o DW.


In [0]:
# Leitura das tabelas da camada Silver (fonte da Gold)
df_patients_silver = spark.table("mvp_engenharia_de_dados.silver.patients_silver")
df_enc_silver      = spark.table("mvp_engenharia_de_dados.silver.encounters_silver")

# Verificação rápida (opcional)
df_patients_silver.printSchema()
df_enc_silver.printSchema()


root
 |-- patient_id: string (nullable = true)
 |-- BIRTHDATE: date (nullable = true)
 |-- DEATHDATE: date (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- RACE: string (nullable = true)
 |-- ETHNICITY: string (nullable = true)

root
 |-- patient_id: string (nullable = true)
 |-- encounter_id: string (nullable = true)
 |-- START: timestamp (nullable = true)
 |-- STOP: timestamp (nullable = true)
 |-- REASONCODE: string (nullable = true)
 |-- REASONDESCRIPTION: string (nullable = true)



### Dimensão paciente DIM_PACIENTE

Dimensão com atributos demográficos e datas vitais do paciente.
- **Chave primária:** `paciente_sk`
- **Chave natural:** `patient_id`


In [0]:
# Construção da DIM_PACIENTE

dim_paciente = (
    df_patients_silver
    .select(
        F.col("patient_id"),
        F.col("BIRTHDATE").alias("data_nascimento"),
        F.col("DEATHDATE").alias("data_obito"),
        F.col("GENDER").alias("sexo"),
        F.col("RACE").alias("raca"),
        F.col("ETHNICITY").alias("etnia")
    )
    # surrogate key 
    .withColumn("paciente_sk", F.monotonically_increasing_id())
)

# Reordenar colunas
dim_paciente = dim_paciente.select(
    "paciente_sk",
    "patient_id",
    "sexo",
    "data_nascimento",
    "data_obito",
    "raca",
    "etnia"
)

dim_paciente.printSchema()
dim_paciente.show(10)



root
 |-- paciente_sk: long (nullable = false)
 |-- patient_id: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- data_nascimento: date (nullable = true)
 |-- data_obito: date (nullable = true)
 |-- raca: string (nullable = true)
 |-- etnia: string (nullable = true)

+-----------+--------------------+----+---------------+----------+-----+-----------+
|paciente_sk|          patient_id|sexo|data_nascimento|data_obito| raca|      etnia|
+-----------+--------------------+----+---------------+----------+-----+-----------+
|          0|814a01d3-1b39-eb5...|   M|     1977-09-26|      NULL|black|nonhispanic|
|          1|4ed407c7-aa6d-9d4...|   M|     1977-07-04|      NULL|asian|nonhispanic|
|          2|53c5c008-0665-192...|   M|     1994-07-16|      NULL|white|   hispanic|
|          3|3da529f1-5400-f04...|   F|     1973-08-26|      NULL|black|nonhispanic|
|          4|34a6f45e-0ece-d37...|   F|     2009-04-24|      NULL|black|nonhispanic|
|          5|741f2f01-bd96-171...|  

In [0]:
# Persistência da DIM_PACIENTE na camada Gold (Delta Table)
(
    dim_paciente
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("mvp_engenharia_de_dados.gold.dim_paciente")
)


### Dimensão diagnóstico (DIM_DIAGNOSTICO)

Dimensão do diagnóstico principal da internação, baseada em:
- `REASONCODE`
- `REASONDESCRIPTION`

A deduplicação garante um único registro por par (código, descrição).

In [0]:
# Construção da DIM_DIAGNOSTICO
dim_diagnostico = (
    df_enc_silver
    .select(
        F.col("REASONCODE").alias("reasoncode"),
        F.col("REASONDESCRIPTION").alias("reasondescription"),
    )
    .dropDuplicates()
    .withColumn("diagnostico_sk", F.monotonically_increasing_id())
    .select(
        "diagnostico_sk",
        "reasoncode",
        "reasondescription",
    )
)

dim_diagnostico.show(5)


+--------------+--------------+--------------------+
|diagnostico_sk|    reasoncode|   reasondescription|
+--------------+--------------+--------------------+
|             0|     183996000|Sterilization req...|
|             1|67811000119102|Primary small cel...|
|             2|     424132000|Non-small cell ca...|
|             3|     399261000|History of corona...|
|             4|       6525002|Dependent drug ab...|
+--------------+--------------+--------------------+
only showing top 5 rows


In [0]:
# Persistência da DIM_DIAGNOSTICO na camada Gold (Delta Table)
(
    dim_diagnostico.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("mvp_engenharia_de_dados.gold.dim_diagnostico")
)


### Dimensão tempo (DIM_TEMPO)

Dimensão de calendário construída a partir das datas de admissão e alta das internações.
- Datas são convertidas para `date` (sem hora), para análises temporais e agregações.



In [0]:
# Base de datas (admissão e alta) derivadas de encounters_silver
df_datas = (
    df_enc_silver
    .select(F.to_date("START").alias("data"))
    .union(df_enc_silver.select(F.to_date("STOP").alias("data")))
    .dropna()
    .dropDuplicates()
)

# Construção da DIM_TEMPO
dim_tempo = (
    df_datas
    .withColumn("ano", F.year("data"))
    .withColumn("mes", F.month("data"))
    .withColumn("dia", F.dayofmonth("data"))
    .withColumn("dia_semana_num", F.dayofweek("data"))          # 1=Domingo ... 7=Sábado
    .withColumn("dia_semana_nome", F.date_format("data", "EEEE"))
    .withColumn("semana_ano", F.weekofyear("data"))
    .withColumn("mes_nome", F.date_format("data", "MMMM"))
    .withColumn("trimestre", F.quarter("data"))
    .withColumn("tempo_sk", F.monotonically_increasing_id())
    .select(
        "tempo_sk",
        "data",
        "ano",
        "mes",
        "dia",
        "dia_semana_num",
        "dia_semana_nome",
        "semana_ano",
        "mes_nome",
        "trimestre",
    )
)

dim_tempo.show(5)


+--------+----------+----+---+---+--------------+---------------+----------+--------+---------+
|tempo_sk|      data| ano|mes|dia|dia_semana_num|dia_semana_nome|semana_ano|mes_nome|trimestre|
+--------+----------+----+---+---+--------------+---------------+----------+--------+---------+
|       0|2004-02-03|2004|  2|  3|             3|        Tuesday|         6|February|        1|
|       1|2022-10-28|2022| 10| 28|             6|         Friday|        43| October|        4|
|       2|2016-12-14|2016| 12| 14|             4|      Wednesday|        50|December|        4|
|       3|2004-07-05|2004|  7|  5|             2|         Monday|        28|    July|        3|
|       4|1995-07-28|1995|  7| 28|             6|         Friday|        30|    July|        3|
+--------+----------+----+---+---+--------------+---------------+----------+--------+---------+
only showing top 5 rows


In [0]:
# Persistência da DIM_TEMPO na camada Gold (Delta Table)
(
    dim_tempo.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("mvp_engenharia_de_dados.gold.dim_tempo")
)


### FATO_INTERNACAO

Tabela fato no grão de uma internação por linha.

Inclui:
- FKs para `dim_paciente`, `dim_diagnostico`, `dim_tempo` (admissão e alta)
- Métricas derivadas:
  - `los_dias` (length of stay)
  - `obito_internacao` (óbito entre admissão e alta)
  - `readmissao_30d` (nova internação até 30 dias após alta)
  - `idade_admissao` e `faixa_etaria_admissao`


In [0]:
# Leitura das dimensões Gold para construção da fato
enc          = spark.table("mvp_engenharia_de_dados.silver.encounters_silver")
dim_paciente = spark.table("mvp_engenharia_de_dados.gold.dim_paciente")
dim_diag     = spark.table("mvp_engenharia_de_dados.gold.dim_diagnostico")
dim_tempo    = spark.table("mvp_engenharia_de_dados.gold.dim_tempo")


In [0]:
# Preparar base da fato: datas (date)
enc_base = (
    enc
    .withColumn("data_admissao", F.to_date("START"))
    .withColumn("data_alta", F.to_date("STOP"))
)


#### Auditoria temporal (qualidade) — datas de admissão/alta

Auditoria de plausibilidade temporal para identificar valores anômalos (identificados na etapa Silver) no dado sintético e quantificar o impacto do filtro aplicado na Gold.

In [0]:
# Auditoria temporal (antes do filtro)
DATA_MIN_ADM = F.lit("2000-01-01")

auditoria_temporal = (
    enc_base
    .agg(
        F.min("data_admissao").alias("min_data_admissao"),
        F.max("data_admissao").alias("max_data_admissao"),
        F.sum(F.when(F.col("data_admissao") < DATA_MIN_ADM, 1).otherwise(0)).alias("n_adm_antes_2000"),
        F.sum(F.when(F.col("data_admissao") > F.current_date(), 1).otherwise(0)).alias("n_adm_no_futuro"),
        F.sum(F.when(F.col("data_alta") < F.col("data_admissao"), 1).otherwise(0)).alias("n_alta_antes_adm"),
        F.count("*").alias("n_total_internacoes")
    )
)

display(auditoria_temporal)


min_data_admissao,max_data_admissao,n_adm_antes_2000,n_adm_no_futuro,n_alta_antes_adm,n_total_internacoes
1919-09-19,2025-12-01,2013,0,0,8352


In [0]:
# Auditoria temporal: distribuição de internações por ano de admissão
auditoria_por_ano = (
    enc_base
    .withColumn("ano_admissao", F.year("data_admissao"))
    .groupBy("ano_admissao")
    .count()
    .orderBy("ano_admissao")
)

display(auditoria_por_ano)


ano_admissao,count
1919,1
1924,1
1931,2
1932,1
1938,2
1940,1
1942,1
1943,1
1944,2
1945,3


#### Regra de plausibilidade temporal (ano de admissão)

A distribuição das internações por ano revela registros esparsos entre 1919 e o final da década de 1990, com volumes incompatíveis com a operação hospitalar, caracterizando artefatos da geração sintética do Synthea. Para assegurar consistência temporal e confiabilidade dos indicadores na camada Gold, os  registros com ano de admissão anterior a 2000 foram excluídos.

In [0]:
# Filtro temporal 
DATA_MIN_ADM = F.lit("2000-01-01")

enc_base_filtrado = (
    enc_base
    .filter(F.col("data_admissao").isNotNull())
    .filter(F.col("data_alta").isNotNull())
    .filter(F.col("data_admissao") >= DATA_MIN_ADM)
    .filter(F.col("data_admissao") <= F.current_date())
    .filter(F.col("data_alta") >= F.col("data_admissao"))
)

# Auditoria pós-filtro (impacto)
auditoria_pos_filtro = (
    enc_base_filtrado
    .agg(
        F.min("data_admissao").alias("min_data_admissao_pos_filtro"),
        F.max("data_admissao").alias("max_data_admissao_pos_filtro"),
        F.count("*").alias("n_internacoes_pos_filtro")
    )
)

display(auditoria_pos_filtro)


min_data_admissao_pos_filtro,max_data_admissao_pos_filtro,n_internacoes_pos_filtro
2000-01-01,2025-12-01,6339


#### Integração com dimensões e cálculo de métricas

Após a aplicação das regras de qualidade temporal, as internações filtradas foram integradas às dimensões do Data Warehouse, associando pacientes, diagnósticos e tempo por meio de chaves substitutas. A partir dessa integração, foram derivadas as métricas da tabela fato de tempo de permanência (LOS), óbito na internação, readmissão em até 30 dias, idade e faixa etária na admissão, que fundamentam os indicadores hospitalares analisados.


In [0]:

# Join com DIM_PACIENTE (traz paciente_sk, data_nascimento, data_obito)
enc_pac = (
    enc_base_filtrado.alias("e")
    .join(
        dim_paciente.alias("p"),
        F.col("e.patient_id") == F.col("p.patient_id"),
        "left",
    )
    .select(
        "e.*",
        "p.paciente_sk",
        "p.data_nascimento",
        "p.data_obito",
    )
)

# Join com DIM_DIAGNOSTICO (traz diagnostico_sk)
enc_pac_diag = (
    enc_pac.alias("e")
    .join(
        dim_diag.alias("d"),
        (F.col("e.REASONCODE") == F.col("d.reasoncode")) &
        (F.col("e.REASONDESCRIPTION") == F.col("d.reasondescription")),
        "left",
    )
)

# Join com DIM_TEMPO (FKs de admissão e alta)
dt = dim_tempo.select("tempo_sk", "data")

fato_temp = (
    enc_pac_diag
    .join(dt.alias("dt_adm"),  F.col("data_admissao") == F.col("dt_adm.data"), "left")
    .join(dt.alias("dt_alta"), F.col("data_alta")     == F.col("dt_alta.data"), "left")
    .withColumn("data_admissao_sk", F.col("dt_adm.tempo_sk"))
    .withColumn("data_alta_sk",     F.col("dt_alta.tempo_sk"))
)


In [0]:
# Métricas derivadas: LOS e óbito durante internação
fato_temp = (
    fato_temp
    .withColumn("los_dias", F.datediff(F.col("data_alta"), F.col("data_admissao")))
    .withColumn(
        "obito_internacao",
        F.when(
            (F.col("data_obito").isNotNull()) &
            (F.col("data_obito") >= F.col("data_admissao")) &
            (F.col("data_obito") <= F.col("data_alta")),
            F.lit(True),
        ).otherwise(F.lit(False))
    )
)


In [0]:
# Readmissão em 30 dias: próxima admissão do mesmo paciente até 30 dias após alta
w_paciente = Window.partitionBy("patient_id").orderBy("data_admissao")

fato_lead = (
    fato_temp
    .withColumn("prox_data_admissao", F.lead("data_admissao").over(w_paciente))
    .withColumn(
        "readmissao_30d",
        F.when(
            (F.col("prox_data_admissao").isNotNull()) &
            (F.col("prox_data_admissao") > F.col("data_alta")) &
            (F.col("prox_data_admissao") <= F.date_add(F.col("data_alta"), 30)),
            F.lit(True),
        ).otherwise(F.lit(False))
    )
)


In [0]:
# Idade e faixa etária na admissão (atributo do evento de internação)
fato_temp2 = fato_lead.withColumn(
    "idade_admissao",
    F.floor(F.datediff(F.col("data_admissao"), F.col("data_nascimento")) / 365.25)
)

fato_temp3 = (
    fato_temp2
    .withColumn(
        "faixa_etaria_admissao",
        F.when(F.col("idade_admissao") < 18, "0-17")
         .when((F.col("idade_admissao") >= 18) & (F.col("idade_admissao") <= 39), "18-39")
         .when((F.col("idade_admissao") >= 40) & (F.col("idade_admissao") <= 64), "40-64")
         .when((F.col("idade_admissao") >= 65) & (F.col("idade_admissao") <= 79), "65-79")
         .otherwise("80+")
    )
)


In [0]:
# Seleção final da tabela fato (colunas do DW)
fato_internacao = (
    fato_temp3
    .withColumn("internacao_sk", F.monotonically_increasing_id())
    .select(
        "internacao_sk",
        "encounter_id",
        "patient_id",
        "paciente_sk",
        "diagnostico_sk",
        "data_admissao_sk",
        "data_alta_sk",
        "data_admissao",
        "data_alta",
        "idade_admissao",
        "faixa_etaria_admissao",
        "los_dias",
        "obito_internacao",
        "readmissao_30d",
    )
)

display(fato_internacao.limit(10))


internacao_sk,encounter_id,patient_id,paciente_sk,diagnostico_sk,data_admissao_sk,data_alta_sk,data_admissao,data_alta,idade_admissao,faixa_etaria_admissao,los_dias,obito_internacao,readmissao_30d
0,000083c5-4d9b-d8aa-2fda-50268a3460ad,000083c5-4d9b-d8aa-232f-761f8c91ea3f,6039,11,1394,4136,2016-03-05,2016-03-06,23,18-39,1,False,False
1,000083c5-4d9b-d8aa-3446-ced86b510c82,000083c5-4d9b-d8aa-232f-761f8c91ea3f,6039,11,119,2674,2019-11-02,2019-11-21,27,18-39,19,False,False
2,000554a6-5ac9-e011-bd3f-ccc467ac4f91,000554a6-5ac9-e011-28b7-a560f48e755d,4158,6,5737,9625,2002-04-30,2002-05-01,35,18-39,1,False,False
3,000554a6-5ac9-e011-8810-5d0120863f35,000554a6-5ac9-e011-28b7-a560f48e755d,4158,12,1027,4433,2022-10-22,2022-10-24,56,40-64,2,False,False
4,000cac0f-a4e3-dc6f-0dc2-672388c9e3b4,000cac0f-a4e3-dc6f-2d55-624dd93a665e,88,11,4511,8273,2018-04-26,2018-05-04,36,18-39,8,False,False
5,000cac0f-a4e3-dc6f-c492-25c59775100b,000cac0f-a4e3-dc6f-2d55-624dd93a665e,88,11,3680,4445,2020-08-19,2020-09-11,38,18-39,23,False,False
6,000cac0f-a4e3-dc6f-ceb5-3189672cccec,000cac0f-a4e3-dc6f-2d55-624dd93a665e,88,23,5544,5068,2023-05-20,2023-05-22,41,40-64,2,False,False
7,000cac0f-a4e3-dc6f-1603-006a7db503b4,000cac0f-a4e3-dc6f-2d55-624dd93a665e,88,18,5544,3563,2023-05-20,2023-05-21,41,40-64,1,False,True
8,000cac0f-a4e3-dc6f-24e7-b127eb2b52ef,000cac0f-a4e3-dc6f-2d55-624dd93a665e,88,23,5068,59,2023-05-22,2023-05-23,41,40-64,1,False,True
9,000cac0f-a4e3-dc6f-6bff-1953c2e49496,000cac0f-a4e3-dc6f-2d55-624dd93a665e,88,11,1181,2761,2023-06-08,2023-06-10,41,40-64,2,False,False


In [0]:
# Persistência da FATO_INTERNACAO na camada Gold (Delta Table)
(
    fato_internacao.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("mvp_engenharia_de_dados.gold.fato_internacao")
)


### Checagem de Qualidade dos Dados — Camada Gold

Antes da análise dos indicadores, foram realizadas verificações de qualidade na camada Gold para garantir a consistência do modelo analítico e a validade dos domínios definidos no catálogo de dados.

As verificações incluíram:
- validação do schema e das contagens das tabelas fato e dimensões;
- verificação de unicidade e não nulidade das chaves substitutas;
- checagem de integridade referencial lógica entre fato e dimensões;
- validação de consistência temporal (datas de admissão e alta) e de domínios numéricos (LOS e idade);
- inspeção dos domínios de variáveis categóricas relevantes.

Foi identificado um único registro de internação sem diagnóstico principal informado no dado de origem. Esse registro foi mantido no Data Warehouse, devidamente documentado, por permanecer válido para análises que não dependem da estratificação por diagnóstico clínico.



In [0]:
from pyspark.sql.functions import (
    col, count as spark_count, countDistinct, min as spark_min, max as spark_max,
    when, lit
)

print("====================================================")
print("      QUALITY CHECK — Gold (DW)")
print("====================================================")

# Tabelas Gold
fato = spark.table("fato_internacao")
dim_pac = spark.table("dim_paciente")
dim_diag = spark.table("dim_diagnostico")
dim_tempo = spark.table("dim_tempo")

# -----------------------------------------------------
# 0) SCHEMAS
# -----------------------------------------------------
print("\n>> 0. Schemas (Gold):")
print("\n- fato_internacao")
fato.printSchema()
print("\n- dim_paciente")
dim_pac.printSchema()
print("\n- dim_diagnostico")
dim_diag.printSchema()
print("\n- dim_tempo")
dim_tempo.printSchema()


# -----------------------------------------------------
# 1) CONTAGENS BÁSICAS
# -----------------------------------------------------
print("\n>> 1. Contagens básicas:")
print(f"fato_internacao:  {fato.count()}")
print(f"dim_paciente:     {dim_pac.count()}")
print(f"dim_diagnostico:  {dim_diag.count()}")
print(f"dim_tempo:        {dim_tempo.count()}")


# -----------------------------------------------------
# 2) UNICIDADE DE CHAVES SUBSTITUTAS (PKs)
# -----------------------------------------------------
print("\n>> 2. Unicidade de chaves substitutas:")

def check_pk_uniqueness(df, pk_col, name):
    total = df.count()
    distinct_pk = df.select(pk_col).dropna().distinct().count()
    null_pk = df.filter(col(pk_col).isNull()).count()
    print(f"{name}.{pk_col} -> total={total}, distinct={distinct_pk}, nulls={null_pk}, dup_est={total - distinct_pk}")

check_pk_uniqueness(fato, "internacao_sk", "fato_internacao")
check_pk_uniqueness(dim_pac, "paciente_sk", "dim_paciente")
check_pk_uniqueness(dim_diag, "diagnostico_sk", "dim_diagnostico")
check_pk_uniqueness(dim_tempo, "tempo_sk", "dim_tempo")


# -----------------------------------------------------
# 3) NULOS EM FKs NA FATO
# -----------------------------------------------------
print("\n>> 3. Nulos nas FKs da fato_internacao:")

fk_cols = ["paciente_sk", "diagnostico_sk", "data_admissao_sk", "data_alta_sk"]
for c in fk_cols:
    n = fato.filter(col(c).isNull()).count()
    print(f"{c} nulos: {n}")


# -----------------------------------------------------
# 4) INTEGRIDADE REFERENCIAL LÓGICA (FATO -> DIM)
# -----------------------------------------------------
print("\n>> 4. Integridade referencial lógica (órfãos):")

# paciente_sk órfão
orph_pac = (
    fato.select("paciente_sk").dropna().distinct()
    .join(dim_pac.select("paciente_sk").distinct(), "paciente_sk", "left_anti")
    .count()
)

# diagnostico_sk órfão
orph_diag = (
    fato.select("diagnostico_sk").dropna().distinct()
    .join(dim_diag.select("diagnostico_sk").distinct(), "diagnostico_sk", "left_anti")
    .count()
)

# tempo_sk órfão (admissão)
orph_tempo_adm = (
    fato.select(col("data_admissao_sk").alias("tempo_sk")).dropna().distinct()
    .join(dim_tempo.select("tempo_sk").distinct(), "tempo_sk", "left_anti")
    .count()
)

# tempo_sk órfão (alta)
orph_tempo_alta = (
    fato.select(col("data_alta_sk").alias("tempo_sk")).dropna().distinct()
    .join(dim_tempo.select("tempo_sk").distinct(), "tempo_sk", "left_anti")
    .count()
)

print(f"paciente_sk órfãos:       {orph_pac}")
print(f"diagnostico_sk órfãos:    {orph_diag}")
print(f"data_admissao_sk órfãos:  {orph_tempo_adm}")
print(f"data_alta_sk órfãos:      {orph_tempo_alta}")


# -----------------------------------------------------
# 5) CONSISTÊNCIA TEMPORAL E DOMÍNIOS NA FATO
# -----------------------------------------------------
print("\n>> 5. Consistência temporal e domínios (fato_internacao):")

# datas invertidas
alta_antes_adm = fato.filter(col("data_alta") < col("data_admissao")).count()
print(f"Internações com data_alta < data_admissao: {alta_antes_adm}")

# LOS negativo
los_neg = fato.filter(col("los_dias") < 0).count()
print(f"Internações com los_dias < 0: {los_neg}")

# idade improvável
idade_neg = fato.filter(col("idade_admissao") < 0).count()
idade_alta = fato.filter(col("idade_admissao") > 120).count()
print(f"idade_admissao < 0:   {idade_neg}")
print(f"idade_admissao > 120: {idade_alta}")

# flags booleanas nulas
bool_cols = ["obito_internacao", "readmissao_30d"]
for c in bool_cols:
    n = fato.filter(col(c).isNull()).count()
    print(f"{c} nulos: {n}")

# intervalo das datas
fato.agg(
    spark_min("data_admissao").alias("min_data_admissao"),
    spark_max("data_admissao").alias("max_data_admissao"),
    spark_min("data_alta").alias("min_data_alta"),
    spark_max("data_alta").alias("max_data_alta")
).show()


# -----------------------------------------------------
# 6) DOMÍNIOS CATEGÓRICOS (AMOSTRA + TOP VALUES)
# -----------------------------------------------------
print("\n>> 6. Domínios categóricos (top valores):")

# faixa etária (top)
if "faixa_etaria_admissao" in fato.columns:
    (
        fato.groupBy("faixa_etaria_admissao")
        .agg(spark_count(lit(1)).alias("count"))
        .orderBy(col("count").desc())
        .show(50, truncate=False)
    )

# sexo/raca/etnia (dim_paciente)
for c in ["sexo", "raca", "etnia"]:
    if c in dim_pac.columns:
        print(f"\n-- dim_paciente.{c} (top valores)")
        (
            dim_pac.groupBy(c)
            .agg(spark_count(lit(1)).alias("count"))
            .orderBy(col("count").desc())
            .show(50, truncate=False)
        )


print("\n### QUALITY CHECK — Gold (DW) COMPLETO ###")


      QUALITY CHECK — Gold (DW)

>> 0. Schemas (Gold):

- fato_internacao
root
 |-- internacao_sk: long (nullable = true)
 |-- encounter_id: string (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- paciente_sk: long (nullable = true)
 |-- diagnostico_sk: long (nullable = true)
 |-- data_admissao_sk: long (nullable = true)
 |-- data_alta_sk: long (nullable = true)
 |-- data_admissao: date (nullable = true)
 |-- data_alta: date (nullable = true)
 |-- idade_admissao: long (nullable = true)
 |-- faixa_etaria_admissao: string (nullable = true)
 |-- los_dias: integer (nullable = true)
 |-- obito_internacao: boolean (nullable = true)
 |-- readmissao_30d: boolean (nullable = true)


- dim_paciente
root
 |-- paciente_sk: long (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- data_nascimento: date (nullable = true)
 |-- data_obito: date (nullable = true)
 |-- raca: string (nullable = true)
 |-- etnia: string (nullable = true)


- 

### Comentários de colunas (Data Dictionary)

Nesta seção, adicionamos comentários técnicos às colunas das tabelas Gold para suportar governança e entendimento do modelo.


In [0]:
# ------------------------------
# DIM_PACIENTE
# ------------------------------
columns_comments_dim_paciente = {
    "paciente_sk": "Chave substituta do paciente.",
    "patient_id": "Identificador original do paciente.",
    "sexo": "Sexo do paciente.",
    "data_nascimento": "Data de nascimento.",
    "data_obito": "Data de óbito, quando existente.",
    "raca": "Raça registrada.",
    "etnia": "Etnia registrada.",
}

for c, comment in columns_comments_dim_paciente.items():
    spark.sql(f"COMMENT ON COLUMN mvp_engenharia_de_dados.gold.dim_paciente.{c} IS '{comment}'")


In [0]:
# ------------------------------
# DIM_DIAGNOSTICO
# ------------------------------
columns_comments_dim_diagnostico = {
    "diagnostico_sk": "Chave substituta do diagnóstico.",
    "reasoncode": "Código do diagnóstico principal.",
    "reasondescription": "Descrição do diagnóstico principal.",
}

for c, comment in columns_comments_dim_diagnostico.items():
    spark.sql(f"COMMENT ON COLUMN mvp_engenharia_de_dados.gold.dim_diagnostico.{c} IS '{comment}'")


In [0]:
# ------------------------------
# DIM_TEMPO
# ------------------------------
columns_comments_dim_tempo = {
    "tempo_sk": "Chave substituta da data.",
    "data": "Data do evento.",
    "ano": "Ano da data.",
    "mes": "Mês da data.",
    "dia": "Dia do mês.",
    "dia_semana_num": "Dia da semana (1–7).",
    "dia_semana_nome": "Nome do dia da semana.",
    "semana_ano": "Semana do ano.",
    "mes_nome": "Nome do mês.",
    "trimestre": "Trimestre do ano.",
}

for c, comment in columns_comments_dim_tempo.items():
    spark.sql(f"COMMENT ON COLUMN mvp_engenharia_de_dados.gold.dim_tempo.{c} IS '{comment}'")


In [0]:
# ------------------------------
# FATO_INTERNACAO
# ------------------------------
columns_comments_fato_internacao = {
    "internacao_sk": "Chave substituta da internação.",
    "encounter_id": "Identificador original do encontro.",
    "patient_id": "Identificador original do paciente.",
    "paciente_sk": "FK para a dimensão paciente.",
    "diagnostico_sk": "FK para a dimensão diagnóstico.",
    "data_admissao_sk": "FK para a data de admissão.",
    "data_alta_sk": "FK para a data de alta.",
    "data_admissao": "Data da admissão.",
    "data_alta": "Data da alta.",
    "idade_admissao": "Idade na admissão.",
    "faixa_etaria_admissao": "Faixa etária na admissão.",
    "los_dias": "Tempo de permanência (dias).",
    "obito_internacao": "Óbito durante a internação.",
    "readmissao_30d": "Reinternação em até 30 dias.",
}

for c, comment in columns_comments_fato_internacao.items():
    spark.sql(f"COMMENT ON COLUMN mvp_engenharia_de_dados.gold.fato_internacao.{c} IS '{comment}'")


In [0]:
%sql
-- Metadados da DIM_PACIENTE (Gold)
DESCRIBE TABLE EXTENDED gold.dim_paciente;


col_name,data_type,comment
paciente_sk,bigint,Chave substituta do paciente.
patient_id,string,Identificador original do paciente.
sexo,string,Sexo do paciente.
data_nascimento,date,Data de nascimento.
data_obito,date,"Data de óbito, quando existente."
raca,string,Raça registrada.
etnia,string,Etnia registrada.
,,
# Delta Statistics Columns,,
Column Names,"etnia, raca, data_nascimento, sexo, paciente_sk, patient_id, data_obito",


In [0]:
%sql
-- Metadados da DIM_DIAGNOSTICO (Gold)
DESCRIBE TABLE EXTENDED gold.dim_diagnostico;


col_name,data_type,comment
diagnostico_sk,bigint,Chave substituta do diagnóstico.
reasoncode,string,Código do diagnóstico principal.
reasondescription,string,Descrição do diagnóstico principal.
,,
# Delta Statistics Columns,,
Column Names,"diagnostico_sk, reasoncode, reasondescription",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,mvp_engenharia_de_dados,


In [0]:
%sql
-- Metadados da DIM_TEMPO (Gold)
DESCRIBE TABLE EXTENDED gold.dim_tempo;


col_name,data_type,comment
tempo_sk,bigint,Chave substituta da data.
data,date,Data do evento.
ano,int,Ano da data.
mes,int,Mês da data.
dia,int,Dia do mês.
dia_semana_num,int,Dia da semana (1–7).
dia_semana_nome,string,Nome do dia da semana.
semana_ano,int,Semana do ano.
mes_nome,string,Nome do mês.
trimestre,int,Trimestre do ano.


In [0]:
%sql
-- Metadados da FATO_INTERNACAO (Gold)
DESCRIBE TABLE EXTENDED gold.fato_internacao;


col_name,data_type,comment
internacao_sk,bigint,Chave substituta da internação.
encounter_id,string,Identificador original do encontro.
patient_id,string,Identificador original do paciente.
paciente_sk,bigint,FK para a dimensão paciente.
diagnostico_sk,bigint,FK para a dimensão diagnóstico.
data_admissao_sk,bigint,FK para a data de admissão.
data_alta_sk,bigint,FK para a data de alta.
data_admissao,date,Data da admissão.
data_alta,date,Data da alta.
idade_admissao,bigint,Idade na admissão.


## Catálogo de Dados (Domínios Esperados)

Esta seção complementa o Data Dictionary (comentários de colunas) com domínios esperados para atributos numéricos e categóricos, com o objetivo de apoiar governança, validação e entendimento do modelo analítico.

### Convenções gerais

- Chaves substitutas (_sk): inteiros positivos, não nulos.
- Identificadores operacionais (patient_id, encounter_id): strings não nulas, originadas do Synthea.
- Datas (data_*): tipo date, não nulas quando aplicável, dentro de um intervalo plausível (ex.: 1900-01-01 a 2100-12-31).
- Indicadores booleanos: valores possíveis true/false.

---

### DIM_PACIENTE (Gold)

Tabela de dimensão com atributos demográficos e datas vitais do paciente.

| Coluna | Tipo | Descrição | Domínio esperado |
|---|---|---|---|
| paciente_sk | int | Chave substituta do paciente no DW | mínimo 1; inteiro positivo |
| patient_id | string | ID original do paciente no Synthea | string não vazia |
| sexo | string | Sexo registrado | categorias esperadas: M, F (podem existir outras conforme geração do Synthea) |
| data_nascimento | date | Data de nascimento | entre 1900-01-01 e 2100-12-31 |
| data_obito | date | Data de óbito (se aplicável) | nulo ou entre 1900-01-01 e 2100-12-31; quando não nulo, data_obito >= data_nascimento |
| raca | string | Raça | categorias esperadas conforme Synthea (ex.: White, Black, Asian, Native, Other) |
| etnia | string | Etnia | categorias esperadas conforme Synthea (ex.: Hispanic, Non-Hispanic) |

---

### DIM_DIAGNOSTICO (Gold)

Dimensão do diagnóstico/motivo principal da internação, baseada em REASONCODE e REASONDESCRIPTION do encontro.

| Coluna | Tipo | Descrição | Domínio esperado |
|---|---|---|---|
| diagnostico_sk | int | Chave substituta do diagnóstico no DW | mínimo 1; inteiro positivo |
| reasoncode | string | Código do motivo/diagnóstico do encontro | string (pode ter formato SNOMED/terminologia do Synthea); pode ser nulo em poucos casos |
| reasondescription | string | Descrição do motivo/diagnóstico do encontro | string; pode ser nulo em poucos casos |

Observação: para internações com motivo ausente no dado de origem, reasoncode e/ou reasondescription podem estar nulos.

---

### DIM_TEMPO (Gold)

Dimensão de calendário derivada das datas utilizadas no DW (admissão/alta).

| Coluna | Tipo | Descrição | Domínio esperado |
|---|---|---|---|
| tempo_sk | int | Chave substituta da data | mínimo 1; inteiro positivo |
| data | date | Data calendário | entre 1900-01-01 e 2100-12-31 |
| ano | int | Ano | 1900 a 2100 |
| mes | int | Mês | 1 a 12 |
| dia | int | Dia do mês | 1 a 31 (consistente com a data) |
| dia_semana_num | int | Dia da semana (numérico) | 1 a 7 |
| dia_semana_nome | string | Dia da semana (nome) | categorias esperadas: Monday..Sunday (ou equivalente) |
| semana_ano | int | Semana do ano | 1 a 53 |
| mes_nome | string | Nome do mês | categorias esperadas: January..December (ou equivalente) |
| trimestre | int | Trimestre | 1 a 4 |

---

### FATO_INTERNACAO (Gold)

Tabela fato no grão de 1 internação por linha, contendo chaves para dimensões e métricas derivadas.

| Coluna | Tipo | Descrição | Domínio esperado |
|---|---|---|---|
| internacao_sk | int | Chave substituta da internação no DW | mínimo 1; inteiro positivo |
| encounter_id | string | ID original do encontro/internação | string não vazia |
| patient_id | string | ID original do paciente (do encontro) | string não vazia |
| paciente_sk | int | FK para DIM_PACIENTE | inteiro positivo; deve existir em DIM_PACIENTE |
| diagnostico_sk | int | FK para DIM_DIAGNOSTICO | inteiro positivo; deve existir em DIM_DIAGNOSTICO |
| data_admissao_sk | int | FK para DIM_TEMPO (admissão) | inteiro positivo; deve existir em DIM_TEMPO |
| data_alta_sk | int | FK para DIM_TEMPO (alta) | inteiro positivo; deve existir em DIM_TEMPO |
| data_admissao | date | Data de admissão | >= 2000-01-01 |
| data_alta     | date | Data de alta     | >= 2000-01-01; data_alta >= data_admissao |
| idade_admissao | int | Idade na admissão (anos) | 0 a 120 |
| faixa_etaria_admissao | string | Faixa etária na admissão | categorias definidas no pipeline (ex.: 0–17, 18–39, 40–59, 60–79, 80+) |
| los_dias | int | Tempo de permanência em dias | mínimo 0; tipicamente 0 a 365 (dependendo dos filtros aplicados) |
| obito_internacao | boolean | Óbito durante a internação | true/false |
| readmissao_30d | boolean | Reinternação em até 30 dias | true/false |  

Observação: o domínio temporal da tabela fato (fato_internacao) reflete o filtro aplicado no pipeline do MVP, restringindo as análises a internações ocorridas a partir do ano 2000. As dimensões, em especial a dimensão de tempo, podem conter um intervalo mais amplo, por se tratarem de tabelas de referência.  

Regras de consistência recomendadas:
- los_dias deve ser consistente com data_admissao e data_alta.
- obito_internacao = true implica que o paciente possui data_obito não nula na DIM_PACIENTE e compatível com o período da internação (conforme regra do pipeline).
- readmissao_30d = true implica existência de uma internação subsequente para o mesmo paciente dentro de 30 dias após a alta.

---

### Linhagem dos dados (resumo)

- Fonte: dados sintéticos gerados pelo Synthea, exportados em CSV.
- Bronze: ingestão e persistência dos CSVs em Delta Tables (estrutura bruta preservada).
- Silver: padronização de tipos, filtros de internação (inpatient), validações temporais e conciliação entre tabelas.
- Gold: construção do esquema estrela (dimensões e fato) e derivação das métricas analíticas (LOS, óbito na internação, reinternação em 30 dias, idade e faixa etária).
