In [0]:
import requests
import pandas as pd
from datetime import datetime

In [0]:
def extrair_dados_bitcoin():
    """Fetch the spot-price JSON from the Coinbase API.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within the timeout.
    """
    url = "https://api.coinbase.com/v2/prices/spot"
    # requests has no default timeout; without one a stalled connection
    # would block the notebook indefinitely.
    response = requests.get(url, timeout=30)
    # Fail here on an HTTP error instead of passing an error payload on
    # to the code that expects price data.
    response.raise_for_status()
    return response.json()

In [0]:
def extrair_cotacao_usd_brl():
    """Fetch the latest exchange rates from the CurrencyFreaks API.

    The API key is read from the ``api_key`` notebook widget.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within the timeout.
    """
    api_key = dbutils.widgets.get("api_key")
    url = "https://api.currencyfreaks.com/v2.0/rates/latest"
    # Let requests build the query string so the key is URL-encoded, and
    # set a timeout because requests never times out by default.
    response = requests.get(url, params={"apikey": api_key}, timeout=30)
    if not response.ok:
        # raise_for_status() would embed the full URL, including the apikey
        # query parameter, in the exception message. Build the error by hand
        # so the key does not end up in notebook output.
        raise requests.HTTPError(
            f"CurrencyFreaks request failed with HTTP status {response.status_code}",
            response=response,
        )
    return response.json()

In [0]:
def tratar_dados_bitcoin(dados_json, taxa_usd_brl):
    """Flatten a price payload into a one-record list.

    Args:
        dados_json: Mapping whose ``data`` entry holds ``amount``, ``base``
            and ``currency``.
        taxa_usd_brl: Multiplier applied to the amount to obtain the BRL value.

    Returns:
        A single-element list containing a dict with the amount as a float,
        the converted amount, the ``base`` and ``currency`` values, the rate
        that was applied, and a ``timestamp`` taken from ``datetime.now()``
        during the call.
    """
    payload = dados_json['data']
    preco_usd = float(payload['amount'])
    ativo = payload['base']
    moeda = payload['currency']

    # Key order is kept as-is: it determines the column order of any
    # DataFrame built from this record.
    registro = {
        "valor_usd": preco_usd,
        "valor_brl": preco_usd * taxa_usd_brl,
        "criptomoeda": ativo,
        "moeda_original": moeda,
        "taxa_conversao_usd_brl": taxa_usd_brl,
        "timestamp": datetime.now(),
    }
    return [registro]

In [0]:
# Fetch the two raw API payloads: the Coinbase price JSON and the
# CurrencyFreaks rates JSON.
dados_bitcoin = extrair_dados_bitcoin()
dados_cotacao = extrair_cotacao_usd_brl()

# Read the BRL entry of the `rates` mapping and coerce it to float.
# NOTE(review): presumably the rates are quoted against USD — confirm the
# API's base currency.
taxa_usd_brl = float(dados_cotacao['rates']['BRL'])

In [0]:
# Build the one-record list (USD value, BRL value, metadata, timestamp)
# from the raw price payload and the BRL rate.
dados_bitcoin_tratado = tratar_dados_bitcoin(dados_bitcoin, taxa_usd_brl)

In [0]:
%sql
-- Create the catalog that holds every object of this pipeline; a no-op
-- when it already exists.
CREATE CATALOG IF NOT EXISTS pipeline_api_bitcoin
COMMENT 'Catálogo da pipeline do projeto api bitcoin.'

In [0]:
%sql
-- Create the `lakehouse` schema inside the pipeline catalog; a no-op when
-- it already exists. The `raw_files` volume is created under this schema.
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.lakehouse
COMMENT 'Schema lakehouse para salvar dados processados.'

In [0]:
%sql
-- Create the volume that receives the exported JSON, Parquet and CSV
-- files (written under /Volumes/pipeline_api_bitcoin/lakehouse/raw_files).
CREATE VOLUME IF NOT EXISTS pipeline_api_bitcoin.lakehouse.raw_files
COMMENT 'Volume para arquivos brutos de ingestão inicial.'

In [0]:
%sql
-- Create the schema that holds the `bitcoin_data` Delta table written at
-- the end of the notebook; a no-op when it already exists.
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.bitcoin_data
COMMENT 'Schema para salvar dados processados de Bitcoin.'

In [0]:
# Turn the list of record dicts into a pandas DataFrame (one row per record).
df = pd.DataFrame(dados_bitcoin_tratado)

In [0]:
# Name the export after the record's own timestamp so the file name matches
# the data it contains.
events_ts = dados_bitcoin_tratado[0]['timestamp']
ts = format(events_ts, "%Y-%m-%d_%H-%M-%S")

json_file = f"/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.json"

# Write the frame as a JSON array of records with ISO-8601 dates.
df.to_json(
    path_or_buf=json_file,
    orient='records',
    date_format='iso',
    indent=2,
)

In [0]:
# Write the same frame as Parquet into the raw_files volume. `ts` is the
# timestamp string built in the JSON export cell, so that cell must have
# run first.
parquet_file = f"/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.parquet"
df.to_parquet(parquet_file, index=False)

In [0]:
# Write the same frame as CSV (without the index column) into the raw_files
# volume. `ts` is the timestamp string built in the JSON export cell.
csv_file = f"/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.csv"
df.to_csv(csv_file, index=False)

In [0]:
# Convert the pandas frame into a Spark DataFrame.
# NOTE(review): `spark` is not defined in this notebook — presumably the
# session object injected by the Databricks runtime; confirm.
df_spark = spark.createDataFrame(df)

In [0]:
# Append the rows to the Delta table; each run adds to the table instead of
# replacing its contents.
df_spark.write.format('delta').mode('append').saveAsTable('pipeline_api_bitcoin.bitcoin_data.bitcoin_data')