In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Mapeamento dos indicadores (nome curto usado no Bronze)
indicators_map = {
    "med_ntg": {"code": "SCSUD_MEDNTG", "name": "Medicamento em diretrizes nacionais"},
    "rehab": {"code": "SCSUD_REHAB", "name": "Programas de reabilita√ß√£o"},
    "prison_overdose": {"code": "PRISON_D3_DEATHS_DRUG_MRATE", "name": "Mortalidade por overdose em pris√µes"},
    "gov_progress": {"code": "SCSUD_GOVPROGRESS", "name": "Progresso em governan√ßa"},
    "prison_adverse_events": {"code": "PRISON_B15_REPDRUGEVENTS", "name": "Eventos adversos em pris√µes"},
    "treatment_coverage": {"code": "SUD_TREATMENTSERVICES_COVERAGE", "name": "Cobertura de tratamento"},
    "med_reg": {"code": "SCSUD_MEDREG", "name": "Medicamento registrado"},
    "financing_exp": {"code": "SCSUD_FINANCINGEXP", "name": "Gastos governamentais"},
    "capacity_index": {"code": "SUD_SERVICECAPACITYINDEX", "name": "√çndice de capacidade"},
    "med_public": {"code": "SCSUD_MEDPUBLIC", "name": "Medicamento p√∫blico gratuito"},
    "rehab_progress": {"code": "SCSUD_REHABPROGRESS", "name": "Progresso em reabilita√ß√£o"},
    "services_nss": {"code": "SCSUD_NSS", "name": "Servi√ßos n√£o estruturados"},
    "hcv_treated_pwid": {"code": "HEPATITIS_HCV_TREATMENT_PWID_PERCENT", "name": "Tratamento HCV em PWID"},
    "pharm": {"code": "SCSUD_PHARM", "name": "Servi√ßos farmacol√≥gicos"},
    "med_formulary": {"code": "SCSUD_MEDDF", "name": "Medicamento em formul√°rio"},
    "pwid_pop": {"code": "PWID_PSE_NUM", "name": "Popula√ß√£o PWID estimada"},
    "prison_sud_diag": {"code": "PRISON_D4_SUD_DIAG_TOT", "name": "Diagn√≥sticos SUD em pris√µes"},
    "med_access_progress": {"code": "SCSUD_MEDACCESSPROGRESS", "name": "Progresso acesso medicamentos"},
    "pharm_progress": {"code": "SCSUD_PHARMPROGRESS", "name": "Progresso farmacol√≥gico"},
    "hcv_test_pwid": {"code": "HEPATITIS_HCV_TESTING_PWID_PERCENT", "name": "Teste HCV em PWID"}
}


In [0]:
dfs_unified = []
stats = {"success": 0, "failed": 0, "empty": 0}

for short_name, info in indicators_map.items():
    table_name = f"bronze.data_suicide.drug_{short_name}"
    
    try:
        # Verificar se a tabela existe
        df = spark.table(table_name)
        row_count = df.count()
        
        if row_count == 0:
            print(f"‚ö†Ô∏è  {short_name}: Tabela vazia")
            stats["empty"] += 1
            continue
        
        # Normalizar estrutura baseado nas colunas dispon√≠veis
        # Adaptar para a estrutura real da API WHO
        
        # Selecionar colunas base que devem existir
        base_cols = []
        
        # Pa√≠s
        if "SpatialDim" in df.columns:
            base_cols.append(F.col("SpatialDim").alias("country_code"))
        else:
            print(f"‚ö†Ô∏è  {short_name}: Sem coluna SpatialDim")
            continue
        
        # Ano
        if "TimeDim" in df.columns:
            base_cols.append(F.col("TimeDim").cast("int").alias("year"))
        elif "TimeDimensionBegin" in df.columns:
            base_cols.append(F.year(F.col("TimeDimensionBegin")).alias("year"))
        else:
            print(f"‚ö†Ô∏è  {short_name}: Sem coluna de tempo")
            continue
        
        # Indicador
        base_cols.append(F.lit(short_name).alias("indicator_code"))
        base_cols.append(F.lit(info["name"]).alias("indicator_name"))
        
        # Valor - tentar NumericValue primeiro, depois Value
        if "NumericValue" in df.columns:
            base_cols.append(
                F.when(F.col("NumericValue").isNotNull(), 
                       F.col("NumericValue").cast("double"))
                 .otherwise(None)
                 .alias("numeric_value")
            )
        else:
            base_cols.append(F.lit(None).cast("double").alias("numeric_value"))
        
        if "Value" in df.columns:
            base_cols.append(F.col("Value").alias("value_text"))
        else:
            base_cols.append(F.lit(None).cast("string").alias("value_text"))
        
        # Criar DataFrame normalizado
        df_clean = (
            df.select(*base_cols)
            .filter(F.col("country_code").isNotNull())
            .filter(F.col("year").isNotNull())
            .filter(
                F.col("numeric_value").isNotNull() | 
                F.col("value_text").isNotNull()
            )
        )
        
        clean_count = df_clean.count()
        
        if clean_count > 0:
            dfs_unified.append(df_clean)
            print(f"‚úì {short_name:20s}: {clean_count:>6,} registros")
            stats["success"] += 1
        else:
            print(f"‚ö†Ô∏è  {short_name:20s}: Sem dados v√°lidos ap√≥s limpeza")
            stats["empty"] += 1
            
    except Exception as e:
        print(f"‚úó {short_name:20s}: ERRO - {str(e)[:60]}...")
        stats["failed"] += 1

print("\n" + "=" * 80)
print("üìä RESUMO DO PROCESSAMENTO")
print("=" * 80)
print(f"‚úì Sucesso: {stats['success']} tabelas")
print(f"‚ö†Ô∏è  Vazias: {stats['empty']} tabelas")
print(f"‚úó Erros: {stats['failed']} tabelas")

In [0]:
if len(dfs_unified) == 0:
    print("\n‚ùå ERRO: Nenhuma tabela foi processada com sucesso!")
    print("Verifique se as tabelas Bronze foram criadas corretamente.")
    print("\nExecute o notebook de diagn√≥stico primeiro:")
    print("  ‚Üí Diagn√≥stico - Estrutura das Tabelas de Drogas")
    raise Exception("Nenhum dado dispon√≠vel para consolidar")

# ============================================================================
# CRIAR TABELA SILVER UNIFICADA (FORMATO LONG)
# ============================================================================

print("\n" + "=" * 80)
print("üì¶ CRIANDO TABELA SILVER UNIFICADA")
print("=" * 80)

In [0]:
# Uni√£o de todos os indicadores
df_unified = dfs_unified[0]
for df in dfs_unified[1:]:
    df_unified = df_unified.union(df)

# Adicionar metadados
df_unified = (
    df_unified
    .withColumn("value_type", 
                F.when(F.col("numeric_value").isNotNull(), "numeric")
                 .otherwise("categorical"))
    .withColumn("ingestion_date", F.current_date())
    .withColumn("ingestion_timestamp", F.current_timestamp())
)

# Salvar formato LONG (uma linha por indicador/pa√≠s/ano)
silver_long_table = "silver.data_suicide.drug_indicators"

print(f"\nSalvando formato LONG em: {silver_long_table}")
(
    df_unified.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .partitionBy("indicator_code")
    .saveAsTable(silver_long_table)
)

total_records = df_unified.count()
print(f"‚úÖ Tabela LONG criada: {total_records:,} registros")

In [0]:
print("\n" + "=" * 80)
print("üì¶ CRIANDO TABELA SILVER WIDE")
print("=" * 80)

# Pivot apenas para valores num√©ricos
df_numeric = df_unified.filter(F.col("numeric_value").isNotNull())

if df_numeric.count() > 0:
    df_wide = (
        df_numeric
        .groupBy("country_code", "year")
        .pivot("indicator_code")
        .agg(F.first("numeric_value"))
    )
    
    # Adicionar metadados
    df_wide = (
        df_wide
        .withColumn("ingestion_date", F.current_date())
        .withColumn("num_indicators", 
                    F.size(F.array([F.col(c) for c in df_wide.columns 
                                    if c not in ['country_code', 'year']])))
    )
    
    silver_wide_table = "silver.data_suicide.drug_indicators_wide"
    
    print(f"\nSalvando formato WIDE em: {silver_wide_table}")
    (
        df_wide.write
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable(silver_wide_table)
    )
    
    wide_records = df_wide.count()
    print(f"‚úÖ Tabela WIDE criada: {wide_records:,} registros")
    print(f"   Colunas de indicadores: {len([c for c in df_wide.columns if c.startswith('drug_')])}")
else:
    print("‚ö†Ô∏è  Tabela WIDE n√£o criada - n√£o h√° dados num√©ricos suficientes")

In [0]:
print("\n" + "=" * 80)
print("üìù ADICIONANDO DOCUMENTA√á√ÉO")
print("=" * 80)

spark.sql(f"""
COMMENT ON TABLE {silver_long_table} IS
'Indicadores de drogas WHO consolidados (formato LONG): cada linha representa um indicador/pa√≠s/ano. 
Fonte: WHO GHO API. Processado de bronze.data_suicide.drug_*'
""")

spark.sql(f"""
COMMENT ON COLUMN {silver_long_table}.country_code IS
'C√≥digo ISO3 do pa√≠s (SpatialDim da WHO)'
""")

spark.sql(f"""
COMMENT ON COLUMN {silver_long_table}.year IS
'Ano de refer√™ncia do dado'
""")

spark.sql(f"""
COMMENT ON COLUMN {silver_long_table}.indicator_code IS
'C√≥digo curto do indicador (chave para join/filtro)'
""")

spark.sql(f"""
COMMENT ON COLUMN {silver_long_table}.indicator_name IS
'Nome descritivo do indicador'
""")

spark.sql(f"""
COMMENT ON COLUMN {silver_long_table}.numeric_value IS
'Valor num√©rico do indicador (quando aplic√°vel)'
""")

spark.sql(f"""
COMMENT ON COLUMN {silver_long_table}.value_text IS
'Valor categ√≥rico/textual do indicador'
""")

if df_numeric.count() > 0:
    spark.sql(f"""
    COMMENT ON TABLE {silver_wide_table} IS
    'Indicadores de drogas WHO (formato WIDE): cada coluna representa um indicador. 
    Facilita an√°lises e correla√ß√µes. Apenas valores num√©ricos.'
    """)

print("‚úì Documenta√ß√£o adicionada")


In [0]:
print("\n" + "=" * 80)
print("üîç AN√ÅLISE EXPLORAT√ìRIA")
print("=" * 80)

# Cobertura por indicador
print("\n1Ô∏è‚É£ COBERTURA POR INDICADOR:")
coverage = (
    df_unified
    .groupBy("indicator_code", "indicator_name")
    .agg(
        F.countDistinct("country_code").alias("num_countries"),
        F.count("*").alias("total_records"),
        F.min("year").alias("first_year"),
        F.max("year").alias("last_year")
    )
    .orderBy(F.desc("num_countries"))
)
display(coverage)

# Cobertura por pa√≠s
print("\n2Ô∏è‚É£ TOP 10 PA√çSES COM MAIS DADOS:")
country_coverage = (
    df_unified
    .groupBy("country_code")
    .agg(
        F.countDistinct("indicator_code").alias("num_indicators"),
        F.count("*").alias("total_records")
    )
    .orderBy(F.desc("num_indicators"))
    .limit(10)
)
display(country_coverage)

# Anos com mais dados
print("\n3Ô∏è‚É£ COBERTURA POR ANO:")
year_coverage = (
    df_unified
    .groupBy("year")
    .agg(
        F.countDistinct("country_code").alias("num_countries"),
        F.countDistinct("indicator_code").alias("num_indicators"),
        F.count("*").alias("total_records")
    )
    .orderBy("year")
)
display(year_coverage)