✅ Parte 1 – Carregar a Bronze

In [0]:
from pyspark.sql import functions as F

df_bronze = spark.table("workspace.mvp.bronze_dca")

display(df_bronze.limit(10))
df_bronze.printSchema()


✅ Parte 2 – Construir as métricas 

Aqui é traduzido get_padroes_conta + groupby para um groupBy do Spark com sum(when(...)).

Observação: agora usando as colunas uf, Codigo_UF e Ano da Bronze.

Nesta parte do código existe uma função que lida com a mudança de estrutura da classificação de contas orçamentárias que ocorreu ao longo dos anos. As contas do setor público (e os códigos que as representam) podem mudar de um ano para o outro devido a novas legislações, atualizações de plano de contas, etc.

Assim se o ano for igual ou anterior a 2017 a função retorna um dicionário com os padrões de contas (strings que representam o início do código da conta e sua descrição) que eram válidos para aquele período.

Se o ano for posterior a 2017, ele retorna um conjunto diferente de padrões, refletindo as novas classificações.
Isso garante que está sendo usado os códigos de conta corretos para filtrar e somar valores, independente do ano, o que torna a análise mais precisa e confiável ao longo do tempo.

In [0]:
df = df_bronze

# Garantia extra: 'valor' já é double, mas deixo explícito
df = df.withColumn("valor", F.col("valor").cast("double"))

# Condições para cada tipo de conta, respeitando o corte ANO <= 2017 / > 2017

# Despesas Correntes
cond_desp_corrente = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("3.0.00.00.00.00 - Despesas Correntes")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("3.0.00.00.00 - Despesas Correntes")
    )
) & F.col("coluna").contains("Despesas Liquidadas")

# Despesas de Capital
cond_desp_capital = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("4.0.00.00.00.00 - Despesas de Capital")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("4.0.00.00.00 - Despesas de Capital")
    )
) & F.col("coluna").contains("Despesas Liquidadas")

# Receitas Correntes (exceto intra)
cond_rec_corrente = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("1.0.0.0.00.00.00 - Receitas Correntes")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("1.0.0.0.00.0.0 - Receitas Correntes")
    )
) & F.col("coluna").contains("Realizadas")

# Receitas de Capital (exceto intra)
cond_rec_capital = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("2.0.0.0.00.00.00 - Receitas de Capital")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("2.0.0.0.00.0.0 - Receitas de Capital")
    )
) & F.col("coluna").contains("Realizadas")

# Receitas Correntes Intra
cond_rec_corrente_intra = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("7.0.0.0.00.00.00 - Receitas Correntes")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("7.0.0.0.00.0.0 - Receitas Correntes")
    )
) & F.col("coluna").contains("Realizadas")

# Receitas de Capital Intra
cond_rec_capital_intra = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("8.0.0.0.00.00.00 - Receitas de Capital")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("8.0.0.0.00.0.0 - Receitas de Capital")
    )
) & F.col("coluna").contains("Realizadas")

# Investimentos
cond_investimentos = (
    (
        (F.col("Ano") <= 2017) &
        F.col("conta").contains("4.4.00.00.00.00 - Investimentos")
    )
    |
    (
        (F.col("Ano") > 2017) &
        F.col("conta").contains("4.4.00.00.00 - Investimentos")
    )
) & F.col("coluna").contains("Despesas Liquidadas")


In [0]:
df_silver = (
    df
    .groupBy("uf", "Codigo_UF", "Ano")
    .agg(
        # Receitas
        F.sum(F.when(cond_rec_corrente, F.col("valor")).otherwise(0)).alias("Receita_Corrente"),
        F.sum(F.when(cond_rec_capital, F.col("valor")).otherwise(0)).alias("Receita_Capital"),
        F.sum(F.when(cond_rec_corrente_intra, F.col("valor")).otherwise(0)).alias("Receita_Intra_Corrente"),
        F.sum(F.when(cond_rec_capital_intra, F.col("valor")).otherwise(0)).alias("Receita_Intra_Capital"),

        # Despesas
        F.sum(F.when(cond_desp_corrente, F.col("valor")).otherwise(0)).alias("Despesa_Corrente"),
        F.sum(F.when(cond_desp_capital, F.col("valor")).otherwise(0)).alias("Despesa_Capital"),

        # Investimentos
        F.sum(F.when(cond_investimentos, F.col("valor")).otherwise(0)).alias("Investimento"),

        # População (um valor por UF/Ano)
        F.max("populacao").alias("Populacao")
    )
)

# Replicação dos cálculos finais

df_silver = (
    df_silver
    .withColumn("Receita_Exceto_Intra", F.col("Receita_Corrente") + F.col("Receita_Capital"))
    .withColumn("Receita_Intra", F.col("Receita_Intra_Corrente") + F.col("Receita_Intra_Capital"))
    .withColumn("Receita_Total", F.col("Receita_Exceto_Intra") + F.col("Receita_Intra"))
    .withColumn("Despesa_Total", F.col("Despesa_Corrente") + F.col("Despesa_Capital"))
)

display(df_silver.orderBy("uf", "Ano").limit(20))
df_silver.printSchema()


✅ Parte 3 – Salvar como tabela SILVER

In [0]:
df_silver.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.mvp.silver_dca")

print("✅ Tabela SILVER criada: workspace.mvp.silver_dca")
