### Mercado Bitcoin

Será utilizada como exemplo a API da plataforma Mercado Bitcoin.

url: https://api.mercadobitcoin.net/api/v4/docs


In [68]:
import logging
import requests
from abc import ABC, abstractmethod
import datetime
from typing import List
import json


logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [69]:
class DataTypeNotSupportedException(Exception):
    """Signal that a payload has a type the ingestion code cannot handle.

    Attributes:
        data: The offending object, kept for inspection by the caller.
        message: Human-readable description that names the object's type.
    """

    def __init__(self, data) -> None:
        # Build the text once so the attribute and the exception args agree.
        description = f'Data type {type(data)} is not supported for ingestion'
        super().__init__(description)
        self.message = description
        self.data = data

In [70]:
class DateWriter:
    """Append records to a file as JSON Lines (one JSON document per line).

    Args:
        filename: Path of the output file. It is opened in append mode, so
            content already present in the file is preserved.
    """

    def __init__(self, filename: str) -> None:
        self.filename = filename

    def _write_row(self, row: str) -> None:
        """Append already formatted text to the output file."""
        with open(self.filename, 'a', encoding='utf-8') as f:
            f.write(row)

    def _serialize(self, data) -> list:
        """Return the JSON lines for ``data``, flattening nested lists.

        Args:
            data: A dict (one record) or a list whose elements are dicts or
                further lists.

        Returns:
            A list of newline-terminated JSON strings, in input order.

        Raises:
            DataTypeNotSupportedException: If ``data`` or any nested element
                is neither a dict nor a list.
        """
        if isinstance(data, dict):
            return [json.dumps(data) + '\n']
        if isinstance(data, list):
            rows = []
            for element in data:
                rows.extend(self._serialize(element))
            return rows
        raise DataTypeNotSupportedException(data)

    def write(self, data):
        """Serialise ``data`` and append it to the output file.

        The whole payload is validated and serialised before the file is
        touched, so an unsupported element never leaves a partially written
        batch behind, and the file is opened once per call instead of once
        per record.

        Args:
            data: A dict, or a (possibly nested) list of dicts.

        Raises:
            DataTypeNotSupportedException: If ``data`` contains anything
                other than dicts and lists.
        """
        rows = self._serialize(data)
        # An empty payload must not create or touch the file.
        if rows:
            self._write_row(''.join(rows))

In [71]:
class MercadoBitcoinApi(ABC):
    """Base class for clients of the Mercado Bitcoin v4 HTTP API.

    Subclasses provide the URL to query by implementing ``_get_endpoint``;
    ``get_data`` performs the request and decodes the JSON response.

    Args:
        coin: Identifier interpolated into the endpoint URL by subclasses.
    """

    # Seconds to wait for the server. requests applies no timeout unless one
    # is passed, so without this a stalled connection would block forever.
    request_timeout = 10

    def __init__(self, coin: str):
        self.coin = coin
        self.base = 'https://api.mercadobitcoin.net/api/v4'

    @abstractmethod
    def _get_endpoint(self, **kwargs) -> str:
        """Return the full URL to request.

        Receives the keyword arguments given to ``get_data``; subclasses may
        declare the specific keywords they support.
        """

    def get_data(self, **kwargs) -> dict:
        """Request the endpoint and return the decoded JSON body.

        Args:
            **kwargs: Forwarded unchanged to ``_get_endpoint``.

        Returns:
            Whatever ``response.json()`` decodes the body to. The annotation
            says ``dict``, but the actual type depends on the endpoint.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status,
                so that error payloads are not ingested as data.
            requests.Timeout: If the server does not answer within
                ``request_timeout`` seconds.
        """
        endpoint = self._get_endpoint(**kwargs)
        logger.info('Getting data from endpoint: %s', endpoint)
        response = requests.get(endpoint, timeout=self.request_timeout)
        response.raise_for_status()
        return response.json()

class TickersApi(MercadoBitcoinApi):
    """Client for the ``tickers`` resource, selected by a ``symbols`` query."""

    # Path segment of the resource this client queries.
    type = 'tickers'

    def _get_endpoint(self) -> str:
        """Return the tickers URL for the coin given at construction."""
        resource_url = f'{self.base}/{self.type}'
        return f'{resource_url}/?symbols={self.coin}'


class TradesApi(MercadoBitcoinApi):
    """Client for the ``trades`` resource of a single coin.

    The result can optionally be restricted to a time window through the
    ``from`` and ``to`` query parameters, both sent as Unix timestamps.
    """

    # Path segment of the resource this client queries.
    type = 'trades'

    def _get_unix_epoch(self, date: datetime.datetime) -> int:
        """Convert ``date`` to whole seconds since the Unix epoch.

        A naive datetime is interpreted in the machine's local timezone by
        ``datetime.timestamp()``; pass an aware datetime for a result that
        does not depend on where the code runs.
        """
        return int(date.timestamp())

    def _get_endpoint(self, date_from: datetime.datetime = None, date_to: datetime.datetime = None) -> str:
        """Return the trades URL, with the optional time window applied.

        Args:
            date_from: Lower bound of the window; omitted from the URL when
                ``None``.
            date_to: Upper bound of the window; omitted from the URL when
                ``None``. It is honoured even without ``date_from``.

        Returns:
            ``{base}/{coin}/trades`` when no bound is given, otherwise the
            same URL followed by ``/?`` and the bounds joined with ``&``.
        """
        params = []
        if date_from is not None:
            params.append(f'from={self._get_unix_epoch(date_from)}')
        if date_to is not None:
            params.append(f'to={self._get_unix_epoch(date_to)}')

        endpoint = f'{self.base}/{self.coin}/{self.type}'
        if params:
            # Query parameters are separated by '&'; a ',' would be read as
            # part of the "from" value and the "to" bound would be lost.
            query = '&'.join(params)
            endpoint = f'{endpoint}/?{query}'

        return endpoint

In [74]:
# One output file per resource; constructing a writer only stores the path.
writer_tickers = DateWriter('tickers.json')
writer_trades = DateWriter('trades.json')

# Fetch both payloads for BTC-BRL before anything is written to disk.
tickers = TickersApi('BTC-BRL').get_data()
trades = TradesApi('BTC-BRL').get_data(date_from=datetime.datetime(2024, 1, 26))

# Append each payload to its own file.
writer_tickers.write(tickers)
writer_trades.write(trades)


INFO:__main__:Getting data from endpoint: https://api.mercadobitcoin.net/api/v4/tickers/?symbols=BTC-BRL
INFO:__main__:Getting data from endpoint: https://api.mercadobitcoin.net/api/v4/BTC-BRL/trades/?from=1706238000


DataTypeNotSupportedException: Data type <class 'int'> is not supported for ingestion