In [0]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Inicializar Spark
spark = SparkSession.builder.appName("Bitcoin Price Integration").getOrCreate()

# Função para obter o preço atual do Bitcoin
def fetch_bitcoin_price():
    """Obtém o preço atual do Bitcoin na Coinbase."""
    url = 'https://api.coinbase.com/v2/prices/spot?currency=USD'
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()['data']
        return {
            "amount": float(data['amount']),
            "base": data['base'],
            "currency": data['currency']
        }
    else:
        raise Exception(f"Erro ao consultar API: {response.status_code}")

# Função para salvar os dados no Delta Lake
def save_to_delta(data):
    """Salva os dados em uma Delta Table."""
    # Estrutura do schema
    schema = StructType([
        StructField("amount", FloatType(), True),
        StructField("base", StringType(), True),
        StructField("currency", StringType(), True)
    ])

    # Converter para DataFrame Spark
    data_df = spark.createDataFrame([data], schema=schema)

    # Escrever em Delta Table
    delta_path = "/mnt/delta/bitcoin_price"
    data_df.write.format("delta").mode("append").save(delta_path)
    print(f"Dados salvos em Delta Table no caminho: {delta_path}")

# Pipeline principal
if __name__ == "__main__":
    print("Obtendo o preço do Bitcoin...")
    bitcoin_price = fetch_bitcoin_price()
    print(f"Preço do Bitcoin obtido: {bitcoin_price['amount']} {bitcoin_price['currency']}")
    
    print("Salvando os dados no Delta Lake...")
    save_to_delta(bitcoin_price)
    print("Pipeline concluído com sucesso.")
