# Pipeline IPCA - BCB

**Objetivo:** Automatizar a captura e o processamento do IPCA do Banco Central
- **Fonte:** API do BCB (Série 433)
- **Saída:** Tabelas Delta na camada Gold para o Power BI
- **Arquitetura:** Medallion

## Setup e Parâmetros

Setup inicial com imports e definição dos parâmetros
Uso widgets para não deixar nada estático (caminhos, URLs), facilitando a execução em outros ambientes (dev/prod)

In [0]:
# Importação libs
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, expr, lit
import requests
import pandas as pd
from datetime import datetime

# Definição Parâmetros
dbutils.widgets.text("api_url", "https://api.bcb.gov.br/dados/serie/bcdata.sgs.433/dados?formato=json", "URL da API do BCB para IPCA")
dbutils.widgets.text("base_path", "/mnt/banco_ip2ca/ipca", "Caminho base no DBFS para as tabelas Delta")
dbutils.widgets.text("anos_historia", "30", "Anos a serem processados")

# Leitura Parâmetros
API_URL = dbutils.widgets.get("api_url")
ANOS_HISTORIA = int(dbutils.widgets.get("anos_historia"))

CATALOG = "banco_ip2ca_catalog"
SCHEMA_BRONZE = "bronze"
SCHEMA_SILVER = "silver"
SCHEMA_GOLD = "gold"

print(f"URL da API: {API_URL}")
print(f"Salvando tabelas no Catálogo: {CATALOG}")
print(f"Histórico de: {ANOS_HISTORIA} anos")

URL da API: https://api.bcb.gov.br/dados/serie/bcdata.sgs.433/dados?formato=json
Salvando tabelas no Catálogo: banco_ip2ca_catalog
Histórico de: 30 anos


## 1 - Funções Helper

Funções para buscar dados da API e salvar em Delta. Isola a lógica e evita repetição de código

In [0]:
def fetch_data_from_api(api_url: str) -> pd.DataFrame:
    # Busca os dados da API do Banco Central, realiza validações e os retorna como um DataFrame usando Pandas, a função inclui tratamento de erros para garantir a robustez do pipeline
    try:
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()
        data = response.json()
        if not data:
            raise ValueError("A API retornou resposta vazia")
        
        df = pd.DataFrame(data)
        df['data'] = pd.to_datetime(df['data'], dayfirst=True)
        df['valor'] = pd.to_numeric(df['valor'], errors='coerce').fillna(0.0) 
        return df
    except requests.exceptions.RequestException as e:
        print(f"Falha ao acessar a API em {api_url}. Motivos: {e}")
        raise
    except (ValueError, KeyError) as e:
        print(f"Falha no processamento dos dados da API. Motivos: {e}")
        raise

## 2 - Camada Bronze: Ingestão

Busca os dados da API do BCB e salva as-is na camada Bronze

In [0]:
print("Iniciando Camada Bronze")
try:
    df_raw_pd = fetch_data_from_api(API_URL)
    spark_df_bronze = spark.createDataFrame(df_raw_pd)
    spark_df_bronze = spark_df_bronze.withColumn("data_ingestao", lit(datetime.now()))
    
    table_name_bronze = f"{CATALOG}.{SCHEMA_BRONZE}.ipca_raw"
    spark_df_bronze.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name_bronze)
    print(f"Tabela Bronze criada/atualizada com sucesso: {table_name_bronze}")
except Exception as e:
    dbutils.notebook.exit(f"Falha na camada Bronze. Código falhou por causa do erro: {e}")

Iniciando Camada Bronze
Tabela Bronze criada/atualizada com sucesso: banco_ip2ca_catalog.bronze.ipca_raw


## 3 - Camada Silver: Limpeza e Preparação

Lê os dados da Bronze, faz a limpeza, tipagem e renomeia colunas visando boas praticas. Particionando por ano pra otimizar as leituras futuras na camada Gold

In [0]:
print("\nIniciando Camada Silver")
try:
    df_bronze = spark.read.table(f"{CATALOG}.{SCHEMA_BRONZE}.ipca_raw")

    current_year = datetime.now().year
    start_year = current_year - ANOS_HISTORIA

    df_silver = (
        df_bronze
        .withColumnRenamed("data", "data_referencia")
        .withColumnRenamed("valor", "ipca_percentual_mensal")
        .withColumn("ano", year(col("data_referencia")))
        .filter(col("ano") >= start_year)
        .select("data_referencia", "ano", "ipca_percentual_mensal")
    )

    table_name_silver = f"{CATALOG}.{SCHEMA_SILVER}.ipca_cleaned"
    df_silver.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("ano").saveAsTable(table_name_silver)
    print(f"Tabela Silver criada/atualizada com sucesso: {table_name_silver}")
except Exception as e:
    dbutils.notebook.exit(f"Falha na camada Silver. Código falhou por causa do erro: {e}")


Iniciando Camada Silver
Tabela Silver criada/atualizada com sucesso: banco_ip2ca_catalog.silver.ipca_cleaned


## 4 - Camada Gold: Tabela final

A partir da Silver, é criado a tabela GOLD em que o Power BI vai consumir para gerar os gráficos e criar medidas DAX

In [0]:
print("\nIniciando Camada Gold")
try:
    df_silver_gold = spark.read.table(f"{CATALOG}.{SCHEMA_SILVER}.ipca_cleaned")

    print("Criando a tabela de fatos 'ipca_mensal'...")
    df_ipca_mensal = df_silver_gold.select(
        col("data_referencia").alias("data"),
        col("ano"),
        col("ipca_percentual_mensal").alias("ipca_mensal_percentual")
    )
    
    table_name_gold_mensal = f"{CATALOG}.{SCHEMA_GOLD}.ipca_mensal"
    df_ipca_mensal.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name_gold_mensal)
    print(f"Tabela de Fatos Gold '{table_name_gold_mensal}' criada com sucesso.")

except Exception as e:
    dbutils.notebook.exit(f"Falha na camada Gold. Código falhou por causa do erro: {e}")


Iniciando Camada Gold
Criando a tabela de fatos 'ipca_mensal'...
Tabela de Fatos Gold 'banco_ip2ca_catalog.gold.ipca_mensal' criada com sucesso.


## 5 - Finalização do notebook
Etapa final para finalizar o notebook e confirmar que o Pipeline rodou com sucesso

In [0]:
dbutils.notebook.exit("Pipeline completo executado com sucesso!")

> 