In [1]:
from pathlib import Path


# Project root = parent of the current working directory (this assumes the
# notebook is launched from a sub-folder such as /notebooks).
# If the notebook runs from the project root itself, use instead:
# PROJECT_ROOT = Path().resolve()
PROJECT_ROOT = Path().resolve().parents[0]

# Layer roots of the lakehouse.
BRONZE_ROOT = (PROJECT_ROOT / "lakehouse" / "bronze").resolve()
SILVER_ROOT = (PROJECT_ROOT / "lakehouse" / "silver").resolve()

# Bronze (input) table locations.
IPCA_PATH = (BRONZE_ROOT / "ipca").resolve()
BOI_PATH = (BRONZE_ROOT / "boi_gordo").resolve()

# Silver (output) table locations.
IPCA_SILVER_PATH = (SILVER_ROOT / "ipca").resolve()
BOI_SILVER_PATH = (SILVER_ROOT / "boi_gordo").resolve()

# Echo every resolved location so a wrong working directory is spotted early.
for _label, _location in (
    ("IPCA:", IPCA_PATH),
    ("BOI :", BOI_PATH),
    ("SILVER:", SILVER_ROOT),
    ("PROJECT_ROOT:", PROJECT_ROOT),
    ("IPCA BRONZE  :", IPCA_PATH),
    ("BOI  BRONZE  :", BOI_PATH),
    ("IPCA SILVER  :", IPCA_SILVER_PATH),
    ("BOI  SILVER  :", BOI_SILVER_PATH),
):
    print(_label, _location)

IPCA: C:\Users\fdani\project_ipca_boi\lakehouse\bronze\ipca
BOI : C:\Users\fdani\project_ipca_boi\lakehouse\bronze\boi_gordo
SILVER: C:\Users\fdani\project_ipca_boi\lakehouse\silver
PROJECT_ROOT: C:\Users\fdani\project_ipca_boi
IPCA BRONZE  : C:\Users\fdani\project_ipca_boi\lakehouse\bronze\ipca
BOI  BRONZE  : C:\Users\fdani\project_ipca_boi\lakehouse\bronze\boi_gordo
IPCA SILVER  : C:\Users\fdani\project_ipca_boi\lakehouse\silver\ipca
BOI  SILVER  : C:\Users\fdani\project_ipca_boi\lakehouse\silver\boi_gordo


In [None]:
# Abertura de sessão Spark: Configuração

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
import os, sys


# --------------------------------------------
# Spark Session (igual aos pipelines Bronze)
# --------------------------------------------

def get_spark(app_name: str = "transform_silver") -> SparkSession:
    """Build a Delta-enabled SparkSession, or return the one already running.

    Args:
        app_name: Name given to the Spark application.

    Returns:
        The session produced by ``getOrCreate()`` on the builder after it has
        been passed through ``configure_spark_with_delta_pip``.
    """
    # Point PySpark (driver and workers) at the interpreter running this kernel.
    for env_var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
        os.environ[env_var] = sys.executable

    # Session settings, applied in this order.
    spark_conf = {
        "spark.sql.session.timeZone": "UTC",
        "spark.sql.sources.partitionOverwriteMode": "dynamic",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.sql.shuffle.partitions": "8",  # small value, suited to local runs
    }

    session_builder = SparkSession.builder.appName(app_name)
    for conf_key, conf_value in spark_conf.items():
        session_builder = session_builder.config(conf_key, conf_value)

    return configure_spark_with_delta_pip(session_builder).getOrCreate()





In [3]:
# Start the Spark session shared by the cells that follow:

spark = get_spark("transform_silver")

In [4]:
# Load the Bronze layer (Delta tables) for both sources:

df_ipca_bronze = spark.read.format("delta").load(IPCA_PATH.as_posix())
df_boi_bronze  = spark.read.format("delta").load(BOI_PATH.as_posix())

# Inspect the IPCA Bronze schema and a 5-row sample (untruncated columns).
df_ipca_bronze.printSchema()
df_ipca_bronze.show(5, truncate=False)



root
 |-- data_ref: date (nullable = true)
 |-- valor: double (nullable = true)
 |-- source: string (nullable = true)
 |-- series_id: integer (nullable = true)
 |-- data_inicial: string (nullable = true)
 |-- data_final: string (nullable = true)
 |-- request_url: string (nullable = true)
 |-- payload_hash: string (nullable = true)
 |-- ingest_run_id: string (nullable = true)
 |-- ingested_at_utc: timestamp (nullable = true)
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)

+----------+-----+-------+---------+------------+----------+----------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------+------------------------------------+--------------------------+----+---+
|data_ref  |valor|source |series_id|data_inicial|data_final|request_url                                                                                                     |payload_hash  

In [5]:
# Inspect the BOI GORDO Bronze schema and a 5-row sample (untruncated columns).
df_boi_bronze.printSchema()
df_boi_bronze.show(5, truncate=False)

root
 |-- csv_file: string (nullable = true)
 |-- mes_ano: string (nullable = true)
 |-- valor: double (nullable = true)
 |-- source_file: string (nullable = true)
 |-- file_hash: string (nullable = true)
 |-- converted_at_utc: timestamp (nullable = true)
 |-- ingest_run_id: string (nullable = true)
 |-- ingested_at_utc: timestamp (nullable = true)
 |-- mes: integer (nullable = true)
 |-- ano: integer (nullable = true)

+---------------------------------+-------+------+---------------------------------+----------------------------------------------------------------+-------------------+------------------------------------+--------------------------+---+----+
|csv_file                         |mes_ano|valor |source_file                      |file_hash                                                       |converted_at_utc   |ingest_run_id                       |ingested_at_utc           |mes|ano |
+---------------------------------+-------+------+---------------------------------+------

In [7]:
# Silver transformation — IPCA:
#
# - data_ref (date) is kept and renamed to `data`
# - ano / mes are kept (cast to int)
# - valor is cast to double and renamed to `ipca_mensal`
# - series_id is cast to int
# - light audit columns are kept: source, payload_hash, ingest_run_id, ingested_at_utc
# - request-level fields (request_url, data_inicial, data_final) are dropped:
#   they describe the API call and belong to Bronze
# - exactly one row per (series_id, data): the most recently ingested one

from pyspark.sql import Window
from pyspark.sql import functions as F

# dropDuplicates() keeps an arbitrary row per key, so a month ingested more
# than once could surface a stale value. Rank the candidates explicitly instead:
# newest ingestion first, ingest_run_id as a deterministic tie-breaker.
ipca_latest_first = (
    Window
    .partitionBy("series_id", "data")
    .orderBy(
        F.col("ingested_at_utc").desc_nulls_last(),
        F.col("ingest_run_id").desc_nulls_last(),
    )
)

df_ipca_silver = (
    df_ipca_bronze
    .select(
        F.col("data_ref").alias("data"),
        F.col("ano").cast("int").alias("ano"),
        F.col("mes").cast("int").alias("mes"),
        F.col("valor").cast("double").alias("ipca_mensal"),
        F.col("series_id").cast("int").alias("series_id"),
        F.col("source").alias("source"),
        F.col("payload_hash").alias("payload_hash"),
        F.col("ingest_run_id").alias("ingest_run_id"),
        F.col("ingested_at_utc").alias("ingested_at_utc"),
    )
    # Rows without a reference date cannot be keyed by month.
    .filter(F.col("data").isNotNull())
    # Keep only the top-ranked row of each (series_id, data) group.
    .withColumn("_row_rank", F.row_number().over(ipca_latest_first))
    .filter(F.col("_row_rank") == 1)
    .drop("_row_rank")
)


In [8]:
# Quick check of the IPCA Silver DataFrame (5-row sample, then schema):

df_ipca_silver.show(5, truncate=False)
df_ipca_silver.printSchema()


+----------+----+---+-----------+---------+-------+----------------------------------------------------------------+------------------------------------+--------------------------+
|data      |ano |mes|ipca_mensal|series_id|source |payload_hash                                                    |ingest_run_id                       |ingested_at_utc           |
+----------+----+---+-----------+---------+-------+----------------------------------------------------------------+------------------------------------+--------------------------+
|2025-01-01|2025|1  |0.16       |433      |BCB_SGS|edb41f451ba121bbee8af8d90493a237d05968ca8a654da4568d523603050761|1796618f-301f-47ab-82a3-caf46c8e0b96|2026-02-06 01:52:55.954725|
|2025-02-01|2025|2  |1.31       |433      |BCB_SGS|edb41f451ba121bbee8af8d90493a237d05968ca8a654da4568d523603050761|1796618f-301f-47ab-82a3-caf46c8e0b96|2026-02-06 01:52:55.954725|
|2025-03-01|2025|3  |0.56       |433      |BCB_SGS|edb41f451ba121bbee8af8d90493a237d05968ca8a65

In [9]:
# Silver transformation — BOI GORDO:
#
# - build a canonical monthly date (first day of the month) from ano/mes
# - keep valor as `preco_boi_gordo`
# - keep audit columns: source_file, file_hash, ingest_run_id, ingested_at_utc,
#   converted_at_utc
# - drop csv_file and mes_ano (redundant in Silver)
# - exactly one row per month: the most recently ingested one

from pyspark.sql import Window
from pyspark.sql import functions as F

# dropDuplicates() keeps an arbitrary row per key, so a month ingested more
# than once could surface a stale price. Rank the candidates explicitly instead:
# newest ingestion first, ingest_run_id as a deterministic tie-breaker.
boi_latest_first = (
    Window
    .partitionBy("data")
    .orderBy(
        F.col("ingested_at_utc").desc_nulls_last(),
        F.col("ingest_run_id").desc_nulls_last(),
    )
)

df_boi_silver = (
    df_boi_bronze
    .withColumn(
        "data",
        # make_date() returns NULL when ano or mes is NULL. The previous
        # concat_ws("-", ano, mes, "01") skipped NULLs, so a missing month
        # produced "<ano>-01", which casts to 1 January of that year.
        F.expr("make_date(ano, mes, 1)")
    )
    .select(
        F.col("data"),
        F.col("ano").cast("int").alias("ano"),
        F.col("mes").cast("int").alias("mes"),
        F.col("valor").cast("double").alias("preco_boi_gordo"),
        F.col("source_file").alias("source_file"),
        F.col("file_hash").alias("file_hash"),
        F.col("ingest_run_id").alias("ingest_run_id"),
        F.col("ingested_at_utc").alias("ingested_at_utc"),
        F.col("converted_at_utc").alias("converted_at_utc"),
    )
    # Rows whose month could not be built are unusable as a monthly key.
    .filter(F.col("data").isNotNull())
    # Keep only the top-ranked row of each month.
    .withColumn("_row_rank", F.row_number().over(boi_latest_first))
    .filter(F.col("_row_rank") == 1)
    .drop("_row_rank")
)


In [10]:
# Quick check of the BOI GORDO Silver DataFrame (5-row sample, then schema):

df_boi_silver.show(5, truncate=False)
df_boi_silver.printSchema()


+----------+----+---+---------------+---------------------------------+----------------------------------------------------------------+------------------------------------+--------------------------+-------------------+
|data      |ano |mes|preco_boi_gordo|source_file                      |file_hash                                                       |ingest_run_id                       |ingested_at_utc           |converted_at_utc   |
+----------+----+---+---------------+---------------------------------+----------------------------------------------------------------+------------------------------------+--------------------------+-------------------+
|2025-01-01|2025|1  |324.95         |cepea-consulta-20260131203157.xls|3f0d5e276f235d8d2e7a49218ea4fbc11d219de76083e99f75ef2a935bca1d09|5092bde9-7049-4356-96b7-eaa42cac86d0|2026-02-06 00:45:53.029236|2026-02-05 23:20:02|
|2025-02-01|2025|2  |319.21         |cepea-consulta-20260131203157.xls|3f0d5e276f235d8d2e7a49218ea4fbc11d219de76083e

In [None]:
# Write both Silver DataFrames to the Delta Lake (overwrite mode):

def _overwrite_delta(frame, target_path):
    """Save ``frame`` in Delta format at ``target_path`` using overwrite mode."""
    delta_writer = frame.write.format("delta").mode("overwrite")
    delta_writer.save(target_path.as_posix())


_overwrite_delta(df_ipca_silver, IPCA_SILVER_PATH)
_overwrite_delta(df_boi_silver, BOI_SILVER_PATH)


In [12]:
# Register the Silver database and its Delta tables in the catalog:

silver_location = SILVER_ROOT.as_posix()
spark.sql(f"CREATE DATABASE IF NOT EXISTS silver LOCATION '{silver_location}'")


def _register_silver_table(table_name, table_path):
    """Create ``silver.<table_name>`` over the Delta files at ``table_path`` if it is missing."""
    table_location = table_path.as_posix()
    spark.sql(f"""
CREATE TABLE IF NOT EXISTS silver.{table_name}
USING DELTA
LOCATION '{table_location}'
""")


_register_silver_table("ipca", IPCA_SILVER_PATH)
_register_silver_table("boi_gordo", BOI_SILVER_PATH)

# List what is now registered under the silver database.
spark.sql("SHOW TABLES IN silver").show(truncate=False)


+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|silver   |boi_gordo|false      |
|silver   |ipca     |false      |
+---------+---------+-----------+



In [13]:
# Final read through the catalog (shows the Silver tables are readable by name):

spark.table("silver.ipca").show(5, truncate=False)
spark.table("silver.boi_gordo").show(5, truncate=False)

+----------+----+---+-----------+---------+-------+----------------------------------------------------------------+------------------------------------+--------------------------+
|data      |ano |mes|ipca_mensal|series_id|source |payload_hash                                                    |ingest_run_id                       |ingested_at_utc           |
+----------+----+---+-----------+---------+-------+----------------------------------------------------------------+------------------------------------+--------------------------+
|2025-01-01|2025|1  |0.16       |433      |BCB_SGS|edb41f451ba121bbee8af8d90493a237d05968ca8a654da4568d523603050761|1796618f-301f-47ab-82a3-caf46c8e0b96|2026-02-06 01:52:55.954725|
|2025-02-01|2025|2  |1.31       |433      |BCB_SGS|edb41f451ba121bbee8af8d90493a237d05968ca8a654da4568d523603050761|1796618f-301f-47ab-82a3-caf46c8e0b96|2026-02-06 01:52:55.954725|
|2025-03-01|2025|3  |0.56       |433      |BCB_SGS|edb41f451ba121bbee8af8d90493a237d05968ca8a65

In [14]:
from pyspark.sql import functions as F

# ======================================
# GOLD PREP: join Silver IPCA + BOI
# ======================================

# 1) Read the Silver tables
df_boi  = spark.table("silver.boi_gordo")
df_ipca = spark.table("silver.ipca")

# 2) Enforce the monthly key (data = first day of the month) on both sides.
#    make_date() returns NULL when ano or mes is NULL; the previous
#    concat_ws("-", ano, mes, "01") skipped NULLs, so a missing month produced
#    "<ano>-01", which casts to 1 January and could join to the wrong month.
monthly_key = F.expr("make_date(ano, mes, 1)")

df_boi_k = (
    df_boi
    .withColumn("data", monthly_key)
    .select(
        "data",
        F.col("ano").cast("int").alias("ano"),
        F.col("mes").cast("int").alias("mes"),
        F.col("preco_boi_gordo").cast("double").alias("preco_boi_gordo")
    )
    .dropDuplicates(["data"])
)

df_ipca_k = (
    df_ipca
    .withColumn("data", monthly_key)
    .select(
        "data",
        F.col("ano").cast("int").alias("ano"),
        F.col("mes").cast("int").alias("mes"),
        F.col("ipca_mensal").cast("double").alias("ipca_mensal"),
        F.col("series_id").cast("int").alias("series_id")
    )
    .dropDuplicates(["series_id", "data"])
)

# 3) If the IPCA table holds more than one series, keep the most frequent one
#    (automatic fallback; a no-op when there is a single series).
series_ids = [r[0] for r in df_ipca_k.select("series_id").distinct().collect()]
if len(series_ids) > 1:
    top_series = (
        df_ipca_k.groupBy("series_id")
        .count()
        # series_id as secondary key: equal counts must not be resolved arbitrarily.
        .orderBy(F.col("count").desc(), F.col("series_id").asc())
        .first()["series_id"]
    )
    df_ipca_k = df_ipca_k.filter(F.col("series_id") == F.lit(top_series))

# 4) JOIN (default: inner; switch to "left" to keep every boi month)
df_gold_prep = (
    df_boi_k.alias("b")
    .join(df_ipca_k.alias("i"), on="data", how="inner")
    .select(
        F.col("data"),
        # coalesce only matters if the join is switched to an outer variant.
        F.coalesce(F.col("b.ano"), F.col("i.ano")).alias("ano"),
        F.coalesce(F.col("b.mes"), F.col("i.mes")).alias("mes"),
        F.col("b.preco_boi_gordo"),
        F.col("i.ipca_mensal")
    )
    .orderBy("data")
)

# 5) Quick checks: row count, duplicated monthly keys (expected: none), sample
print("Linhas gold_prep:", df_gold_prep.count())
(df_gold_prep.groupBy("data").count().filter("count > 1")).show(20, truncate=False)
df_gold_prep.show(12, truncate=False)


Linhas gold_prep: 12
+----+-----+
|data|count|
+----+-----+
+----+-----+

+----------+----+---+---------------+-----------+
|data      |ano |mes|preco_boi_gordo|ipca_mensal|
+----------+----+---+---------------+-----------+
|2025-01-01|2025|1  |324.95         |0.16       |
|2025-02-01|2025|2  |319.21         |1.31       |
|2025-03-01|2025|3  |312.47         |0.56       |
|2025-04-01|2025|4  |323.96         |0.43       |
|2025-05-01|2025|5  |308.15         |0.26       |
|2025-06-01|2025|6  |313.51         |0.24       |
|2025-07-01|2025|7  |299.97         |0.26       |
|2025-08-01|2025|8  |307.25         |-0.11      |
|2025-09-01|2025|9  |307.87         |0.48       |
|2025-10-01|2025|10 |310.51         |0.09       |
|2025-11-01|2025|11 |322.08         |0.18       |
|2025-12-01|2025|12 |320.28         |0.33       |
+----------+----+---+---------------+-----------+



In [None]:
# 6) (Optional) write and register the GOLD layer

from pathlib import Path

# Locations of the GOLD layer root and of the joined boi/IPCA table:
GOLD_ROOT = (PROJECT_ROOT / "lakehouse" / "gold").resolve()
GOLD_PATH = (GOLD_ROOT / "boi_ipca").resolve()

# Save the joined DataFrame in Delta format (overwrite mode) ...
gold_writer = df_gold_prep.write.format("delta").mode("overwrite")
gold_writer.save(GOLD_PATH.as_posix())

# ... then register the database and the table over those files.
spark.sql(f"CREATE DATABASE IF NOT EXISTS gold LOCATION '{GOLD_ROOT.as_posix()}'")
spark.sql(f"CREATE TABLE IF NOT EXISTS gold.boi_ipca USING DELTA LOCATION '{GOLD_PATH.as_posix()}'")

DataFrame[]

In [16]:
# Stop the Spark session at the end of the notebook.
spark.stop()