<b>Importação das bibliotecas</b>

In [0]:
# Importação do método SparkSession e functions da biblioteca pyspark
from pyspark.sql import SparkSession, functions as f

# Instalação e importação da biblioteca request para invocar a URL
import requests

# Importação da biblioteca json para leitura do formato do dataset
import json

# Importação da biblioteca pandas para manipulação dos dados
#import pandas as pd

# Importação dos métodos datetime e timedelta da biblioteca datetime para manipulação e cálculos de data
from datetime import datetime, timedelta

<b>Extração dos dados via API, criação de variáveis e da sessão Spark</b><br>
<b>Filtro: </b>IPCA dos útimos 30 anos com base na data de execução.

In [0]:
# Cria a uma sessão spark
spark = SparkSession.builder.appName("Analise_IPCA").getOrCreate()

# Cria a variável dataIncial para obter a data de execução
dataFinal = datetime.today().date()

# A partir da data de execução, identifico o mês anterior para obter os dados da API
dataFinal = datetime(dataFinal.year, dataFinal.month, 1) - timedelta(days = 1)

# Então realizo com base no primeiro período, a subtração de 30 anos
dataInicial = datetime(dataFinal.year-30, 1, 1)

# Padronização de datas para inserção nos filtros da API
dataInicial = dataInicial.strftime('%d/%m/%Y')
dataFinal = dataFinal.strftime('%d/%m/%Y')

# Requisição dos dados da API já com os filtros
url = f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.10844/dados?formato=json&dataInicial={dataInicial}&dataFinal={dataFinal}"
resp = requests.get(url)

# Validação da requisição da API
if resp.status_code == 200:
    dadosAPI = resp.json() # Atribui a resposta do requests em json na variável dadosAPI
else:
    raise Exception(f"Erro ao acessar a API. Status: {resp.status_code}") # Informa a mensagem de Erro ao acessar APi com o código do erro

<b>Transformação dos dados</b><br>
Criação de Dataframe, definição dos tipos de campo, criação de novos campos e etc..

In [0]:
# Criação do dataframe com os dados json para manipulação dos dados
df_API = spark.createDataFrame(dadosAPI)

# Por apenas haver poucos registros, utilizei este print para validar:
# a criação do dataframe e seu layout 
# importação de TODOS os dados devidamente 
df_API.show()

+----------+-----+
|      data|valor|
+----------+-----+
|01/01/1995| 5.18|
|01/02/1995| 3.09|
|01/03/1995| 3.83|
|01/04/1995| 4.29|
|01/05/1995| 6.91|
|01/06/1995| 4.38|
|01/07/1995| 3.90|
|01/08/1995| 3.31|
|01/09/1995| 2.85|
|01/10/1995| 2.47|
|01/11/1995| 2.71|
|01/12/1995| 2.28|
|01/01/1996| 2.20|
|01/02/1996| 2.32|
|01/03/1996| 1.63|
|01/04/1996| 1.40|
|01/05/1996| 2.21|
|01/06/1996| 1.54|
|01/07/1996| 1.41|
|01/08/1996| 1.43|
+----------+-----+
only showing top 20 rows



In [0]:
# Tratamento dos campos importados e seus "data types"
# Iniciando pelo campo data tipo data
df_API = df_API.withColumn("data", f.to_date("data", "dd/MM/yyyy"))

# E finalizando com o campo valor tipo float com 2 casas decimais
# E renomeando o campo valor para ipca_mensal
df_API = df_API.withColumn("valor", f.floor(f.col("valor").cast("float") * 100)/100)
df_API = df_API.withColumnRenamed("valor", "ipca_mensal")

# Cria uma nova coluna com o ano de cada período
df_API = df_API.withColumn("ano", f.year("data"))

# Cria uma nova coluna com a data de execução da consulta na API
df_API = df_API.withColumn("data_importacao", f.current_date())

# Ordenação dos campos por ano e validação das "tipagens" dos campos
df_API = df_API.orderBy("ano")
df_API.printSchema()

root
 |-- data: date (nullable = true)
 |-- ipca_mensal: double (nullable = true)
 |-- ano: integer (nullable = true)
 |-- data_importacao: date (nullable = false)



<b>Criação da regra de negócio</b><br>
Cálculo do IPCA anual para aplicar a fórmula de multiplicação dos fatores nos períodos de Janeiro à Dezembro de cada ano.<br>
Fórmula: Acumulado = ((1 + ipca_mes_01 / 100) * (1 + ipca_mes_02 / 100)...(1 + ipca_mes_n / 100) - 1) * 100

In [0]:
# Função para o cálculo do IPCA
def calc_acu(ipca):
    return (f.exp(f.sum(f.log(1 + ipca / 100))) - 1) * 100

<b>Resultado</b><br>
Como na etapa de <i>Transformação dos dados </i> já havia criado a coluna ano, agora vou apenas aplica a função.

In [0]:
# Aplica a função de cálculo do IPCA no campo ipca_mensal e utiliza como agrupador o campo ano
df_Acumulado_Anual = (df_API
                    .groupBy("data_importacao", "ano")  # Agrupar por ano
                    .agg(calc_acu(f.col("ipca_mensal")).alias("ipca_acumulado"))  # Aplicar a função calc_acu na coluna 'ipca'
                    .orderBy("ano")
                    )

# Padroniza o campo ipca_acumulado com 2 casas decimais
df_Acumulado_Anual = df_Acumulado_Anual.withColumn("ipca_acumulado", f.floor(f.col("ipca_acumulado") * 100)/100)

# Print dos dados padronizados
df_Acumulado_Anual.show()

+---------------+----+--------------+
|data_importacao| ano|ipca_acumulado|
+---------------+----+--------------+
|     2025-02-03|1995|          55.7|
|     2025-02-03|1996|         18.76|
|     2025-02-03|1997|           6.2|
|     2025-02-03|1998|          1.55|
|     2025-02-03|1999|          1.34|
|     2025-02-03|2000|          3.12|
|     2025-02-03|2001|          4.86|
|     2025-02-03|2002|          5.54|
|     2025-02-03|2003|          7.31|
|     2025-02-03|2004|          6.65|
|     2025-02-03|2005|          6.76|
|     2025-02-03|2006|          5.48|
|     2025-02-03|2007|          5.17|
|     2025-02-03|2008|          6.38|
|     2025-02-03|2009|          6.35|
|     2025-02-03|2010|          7.61|
|     2025-02-03|2011|          8.98|
|     2025-02-03|2012|          8.73|
|     2025-02-03|2013|          8.74|
|     2025-02-03|2014|          8.32|
+---------------+----+--------------+
only showing top 20 rows



<b>Carga dos dados na tabela Delta Lake</b>

In [0]:
# Carga do dataframe df_API para o Delta Lake com os dados do ipca mensal
df_API.write.format("delta").mode("overwrite").saveAsTable("IPCA_Mensal")

# Display dos dados do df_API para Downloads
display(df_API)

data,ipca_mensal,ano,data_importacao
1995-01-01,5.18,1995,2025-02-03
1995-02-01,3.09,1995,2025-02-03
1995-03-01,3.83,1995,2025-02-03
1995-04-01,4.29,1995,2025-02-03
1995-05-01,6.91,1995,2025-02-03
1995-06-01,4.38,1995,2025-02-03
1995-07-01,3.9,1995,2025-02-03
1995-08-01,3.31,1995,2025-02-03
1995-09-01,2.85,1995,2025-02-03
1995-10-01,2.47,1995,2025-02-03


In [0]:
# Carga do dataframe df_Acumulado_Anual para o Delta Lake com os dados do ipca anual acumulado
df_Acumulado_Anual.write.format("delta").mode("overwrite").saveAsTable("IPCA_Anual")

# Display dos dados do df_Acumulado_Anual para Downloads
display(df_Acumulado_Anual)

data_importacao,ano,ipca_acumulado
2025-02-03,1995,55.7
2025-02-03,1996,18.76
2025-02-03,1997,6.2
2025-02-03,1998,1.55
2025-02-03,1999,1.34
2025-02-03,2000,3.12
2025-02-03,2001,4.86
2025-02-03,2002,5.54
2025-02-03,2003,7.31
2025-02-03,2004,6.65
