In [0]:
# ------------------------------------------------------------
# 03. Gold | Consumo analítico (agregações) - IBGE SIDRA T6579
# ------------------------------------------------------------

from pyspark.sql import functions as F

# 1. Tabelas (origem e destino)
silver_tbl = "main.medallion.silver_sidra_t6579"
gold_tbl   = "main.medallion.gold_sidra_t6579"

# 2. Leitura da Silver
df_silver = spark.table(silver_tbl)

# 3. Seleção e normalização do que interessa para consumo
#    - d2n: ano
#    - d1n: UF
#    - v  : valor (população)
df_base = (
    df_silver
    .select(
        F.col("d2n").cast("int").alias("ano"),
        F.col("d1n").cast("string").alias("uf"),
        F.col("v").cast("double").alias("populacao")
    )
    .filter(F.col("ano").isNotNull())
    .filter(F.col("uf").isNotNull())
)

# 4. Agregação (grão: ano + UF)
df_gold = (
    df_base
    .groupBy("ano", "uf")
    .agg(
        F.sum("populacao").alias("populacao_total")
    )
    .withColumn("gold_ts", F.current_timestamp())
    .orderBy("ano", "uf")
)

# 5. Escrita da Gold (reprocessável)
spark.sql(f"DROP TABLE IF EXISTS {gold_tbl}")

(
    df_gold
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(gold_tbl)
)

# 6. Evidências (prints para README)
print(f"Gold gravado com sucesso: {gold_tbl}")
display(df_gold.limit(20))
print("Total de linhas (Gold):", df_gold.count())
