In [0]:
%pip install yfinance

In [0]:
import yfinance as yf
import pandas as pd
import requests
from pyspark.sql.functions import current_timestamp, lit, col
from pyspark.sql.types import *
from datetime import datetime, timedelta


# Catalog and schema that receive every bronze table written by this notebook
# (tables are addressed as "<catalog>.<schema>.<table>").
TARGET_CATALOG = "inflacao"
TARGET_SCHEMA = "bronze"

# Ingestion window: the last 5 * 365 days (leap days are not counted), computed
# once from naive local time when this cell runs and used by the yfinance downloads.
LOOKBACK_DAYS = 5 * 365
end_date = datetime.now()
start_date = end_date - timedelta(days=LOOKBACK_DAYS)

print("--- INICIANDO PIPELINE BRONZE ---")
print(f"Janela: {start_date.date()} até {end_date.date()}")

def infer_currency(ticker):
    """Return the tag stored in the ``original_currency`` column for *ticker*.

    Pipeline heuristic:
    - Tickers containing ``"USD"`` (e.g. ``BTC-USD``) are tagged ``"USD"``.
    - Tickers containing ``"=F"`` (e.g. ``GC=F``) are tagged ``"USD"``.
    - Everything else (e.g. ``BRL=X``) is tagged ``"BRL"``.
    """
    return "USD" if "USD" in ticker or "=F" in ticker else "BRL"


def normalize_yfinance_frame(pdf):
    """Turn a yfinance download into a flat, snake_case pandas DataFrame.

    Steps:
    1. The index (the date) is moved into a regular column via ``reset_index``.
    2. If the columns are tuples (MultiIndex, e.g. ``("Close", "BTC-USD")``),
       only the first level is kept.
    3. Names are stripped, spaces become underscores and everything is
       lower-cased (``"Adj Close"`` -> ``"adj_close"``).

    Returns a new DataFrame; *pdf* itself is not modified.
    """
    out = pdf.reset_index()
    flat = [c[0] if isinstance(c, tuple) else c for c in out.columns]
    out.columns = [str(c).strip().replace(" ", "_").lower() for c in flat]
    return out


def ingest_yfinance_data(ticker, table_name):
    """Download price history for *ticker* from Yahoo Finance and overwrite a Delta table.

    The module-level ``start_date``/``end_date`` window is used. Output goes to
    ``{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}``. If yfinance returns no
    rows, the table is left untouched. This way an empty download never
    replaces existing data or hits a schema-inference error.

    Args:
        ticker: Yahoo Finance symbol, e.g. ``"BTC-USD"``, ``"GC=F"``, ``"BRL=X"``.
        table_name: Target table name inside the bronze schema.

    Returns:
        True if the table was written. False if the download was empty or any
        step failed. Failures are printed rather than raised, so one bad
        ticker does not stop the remaining ingestions.
    """
    full_table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}"

    try:
        print(f"\n--> Processando: {ticker}")

        pdf = yf.download(ticker, start=start_date, end=end_date, progress=False, auto_adjust=True)

        if pdf.empty:
            print(f"ALERTA: Nenhum dado retornado para {ticker}. Tabela {table_name} NÃO foi atualizada.")
            return False

        pdf = normalize_yfinance_frame(pdf)

        df_final = (
            spark.createDataFrame(pdf)
            .withColumn("ingestion_date", current_timestamp())
            .withColumn("source_ticker", lit(ticker))
            .withColumn("original_currency", lit(infer_currency(ticker)))
        )

        # This is a full refresh, so use overwriteSchema rather than mergeSchema.
        # The table schema then matches this download exactly. A renamed or
        # removed column no longer lingers as an all-null column, and a retyped
        # column no longer makes the overwrite fail.
        (
            df_final.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(full_table_name)
        )

        print(f"SUCESSO: Tabela salva em {full_table_name}")
        return True

    except Exception as e:
        # Best-effort boundary: report the failure and let the next source run.
        print(f"ERRO CRÍTICO em {ticker}: {str(e)}")
        return False


def ingest_bcb_poupanca(table_name, timeout=30):
    """Download BCB SGS series 196 (savings yield, "poupança") and overwrite a Delta table.

    Output goes to ``{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}`` with
    ``overwriteSchema`` enabled, so the table schema always matches the latest
    payload. If the API returns no records, the table is left untouched
    instead of being replaced by an empty (and schema-less) frame.

    Args:
        table_name: Target table name inside the bronze schema.
        timeout: Seconds to wait for the BCB API, passed to ``requests.get``.
            Without a timeout, a stalled connection would block the job forever.

    Returns:
        True if the table was written, False otherwise. Errors are printed,
        not raised.
    """
    full_table_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{table_name}"
    url = "https://api.bcb.gov.br/dados/serie/bcdata.sgs.196/dados?formato=json"

    try:
        print("\n--> Processando: Poupança (BCB API)")

        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # NOTE(review): expected to be a JSON list of {"data": ..., "valor": ...}
        # records. Values are stored exactly as delivered (no casting at bronze).
        data = response.json()

        pdf = pd.DataFrame(data)

        # An empty frame cannot have its schema inferred by Spark, and it
        # would wipe the existing table, so skip the write instead.
        if pdf.empty:
            print(f"ALERTA: Nenhum dado retornado pela API do BCB. Tabela {table_name} NÃO foi atualizada.")
            return False

        df_final = (
            spark.createDataFrame(pdf)
            .withColumn("ingestion_date", current_timestamp())
            .withColumn("source", lit("BCB_SGS_196"))
            .withColumn("description", lit("Rendimento mensal poupanca (%)"))
        )

        # overwriteSchema lets the table schema change between runs.
        (
            df_final.write.format("delta")
            .mode("overwrite")
            .option("overwriteSchema", "true")
            .saveAsTable(full_table_name)
        )

        print(f"SUCESSO: Tabela salva em {full_table_name}")
        return True

    except Exception as e:
        # Best-effort boundary: report the failure without aborting the notebook.
        print(f"ERRO CRÍTICO na Poupança: {str(e)}")
        return False



# Yahoo Finance sources, ingested in this order: (ticker, destination table).
YFINANCE_SOURCES = (
    ("BTC-USD", "bronze_bitcoin_usd"),
    ("GC=F", "bronze_gold_usd"),
    ("BRL=X", "bronze_usd_brl_rate"),
)

for yf_ticker, yf_table in YFINANCE_SOURCES:
    ingest_yfinance_data(yf_ticker, yf_table)

# The savings-account (poupança) series comes from the BCB API, not Yahoo Finance.
ingest_bcb_poupanca("bronze_poupanca_taxas")