# Bitcoin Price Monitoring - Ingestão, Transformação e Exportação

## Configurando Unity Catalog

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS pipeline_api_bitcoin
COMMENT 'Catálogo de demonstração criado para o workshop de pipeline_api_bitcoin';

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.lakehouse
COMMENT 'Schema Lakehouse para salvar dados processados';

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS pipeline_api_bitcoin.lakehouse.raw_files
COMMENT 'Volume para arquivos de ingestão inicial'

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.bitcoin_data
COMMENT 'Schema para armazenar dados de Bitcoin processados';

## Importando Bibliotecas

In [0]:
import requests
import pandas as pd
from datetime import datetime

## Extração

In [0]:
def extrair_dados_bitcoin():
    """"Extrai o JSON da API da Coinbase"""
    url = r"https://api.coinbase.com/v2/prices/spot"
    response = requests.get(url)
    return response.json()

def extrair_cotacao_usd():
    """"Extrai o JSON da API da CurrencyFreaks"""
    api_key = 'xxxxxxxxxx'
    url = f"https://api.currencyfreaks.com/v2.0/rates/latest?apikey={api_key}"
    response = requests.get(url)
    return response.json()


In [0]:
dados_btc = extrair_dados_bitcoin()
print(dados_btc)
cotacoes = extrair_cotacao_usd()
print(cotacoes)

{'data': {'amount': '88008.3', 'base': 'BTC', 'currency': 'USD'}}
{'date': '2025-12-20 00:00:00+00', 'base': 'USD', 'rates': {'AGLD': '4.012036108324975', 'FJD': '2.27972', 'LVL': '0.600192', 'SCR': '14.9248', 'BBD': '2.0', 'HNL': '26.3693', 'UGX': '3569.84', 'KUJI': '15.066869726525479', 'COSMOSDYDX': '5.868544600938967', 'NEAR': '0.6504065040650407', 'AIOZ': '9.775171065493646', 'AUDIO': '33.61344537815126', 'WLD': '1.9151584793641674', 'HNT': '0.6038647342995169', 'ETHFI': '1.3360053440213762', 'FARM': '0.0568343279340722', 'A8': '21.598272138228943', 'SDG': '600.071', 'DGB': '175.08384738054207', 'AB': '200.7011513788445', 'BCH': '0.0016019608000192', 'AI': '27.51174467943515', 'FKP': '0.747496', 'JST': '25.258703580118734', 'HOT': '2079.634869126455', 'AO': '0.19132971433847024', 'AR': '0.2903044789772597', 'B3': '1159.4202898550725', 'CHILLGUY': '59.62189272671328', 'SEI': '8.901548869503294', 'QAI': '0.011860242242022763', 'SEK': '9.26766', 'LION': '148.00286776531567', 'BB': '1

## Transformação

In [0]:
def tratar_dados_bitcoin (dados_bitcoin, cotacoes_moedas, moeda_parametro):
    """Transforma dados brutos da API, renomeia colunas, adiciona timestamp e converte para BRL."""
    valor_btc_usd = float(dados_bitcoin['data']['amount'])
    cripto = dados_bitcoin['data']['base']
    moeda_api_coinbase = dados_bitcoin['data']['currency']
    valor_usd_brl = float(cotacoes_moedas['rates'][moeda_parametro])

    valor_btc_brl = round(float(valor_btc_usd * valor_usd_brl), 3)

    timestamp = datetime.now()

    dados_tratados = [{
        "valor_btc_usd": valor_btc_usd,
        "valor_btc_brl": valor_btc_brl,
        "valor_brl": valor_usd_brl,
        "cripto": cripto,
        "moeda_original": moeda_api_coinbase,
        "taxa_conversao": cotacoes_moedas['rates'][moeda_parametro],
        "timestamp": timestamp
    }]
    return dados_tratados

In [0]:
dados_tratados = tratar_dados_bitcoin(dados_btc, cotacoes, 'BRL')

## Exportação

### Criando um DataFrame pandas

In [0]:
import pandas as pd

df_pandas = pd.DataFrame(dados_tratados)
display(df_pandas)

valor_btc_usd,valor_btc_brl,valor_brl,cripto,moeda_original,taxa_conversao,timestamp
88008.3,487830.007,5.543,BTC,USD,5.543,2025-12-20T00:13:12.015Z


### Salvando JSON

In [0]:
from datetime import datetime

agora = datetime.now()

ano = agora.year
mes = agora.month
dia = agora.day
hora = agora.hour

json_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_brl_{ano}-{mes}-{dia}-{hora}h.json'
df_pandas.to_json(json_file, orient='records', date_format='iso', indent=2)
print('Arquivo JSON salvo: ', json_file)

Arquivo JSON salvo:  /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_brl_2025-12-20-0h.json


### Salvando como Delta Table

In [0]:
# Converte DF pandas para Pystark
df_spark = spark.createDataFrame(df_pandas)

df_spark.printSchema()

root
 |-- valor_btc_usd: double (nullable = true)
 |-- valor_btc_brl: double (nullable = true)
 |-- valor_brl: double (nullable = true)
 |-- cripto: string (nullable = true)
 |-- moeda_original: string (nullable = true)
 |-- taxa_conversao: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



#### Converte Delta Table para DataFrame

In [0]:
df_spark.write.format('delta').mode('append').saveAsTable(f'pipeline_api_bitcoin.bitcoin_data.bitcoin_data')