## Data Acquisition

In [112]:
import time
from collections import deque
from os import listdir
from os.path import isfile, join
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

import requests
from loguru import logger

import pandas as pd

In [113]:
api_v4_trades = "https://api.mercadobitcoin.net/api/v4/{symbol}/trades"

In [114]:
import json

from loguru import logger


def serialize(record):
    subset = {
        "timestamp": int(record["time"].timestamp()),
        "file.name": record["file"].path,
        "func": record["function"],
        "line.number": record["line"],
        "log.level": record["level"].name,
        "message": record["message"],
        "exception": record["exception"],
        **record["extra"],
    }
    return json.dumps(subset, default=str)


def sink(message):
    serialized = serialize(message.record)
    print(serialized)


def formatter(record):
    # Note this function returns the string to be formatted, not the actual message to be logged
    # return "{time:YYYY-MM-DD HH:mm:ss} [application_name] [{extra[correlationId]}] [{level}] - {name}:{function}:{line} - {message}\n"
    return (
        "{time} | {level: <8} | {name: ^15} | {function: ^15} | {line: >3} | {message}"
    )


logger.remove()
logger.add(sink, format=formatter)

11

In [115]:
def get_last_tid_from_file(symbol: str) -> int:
    base = symbol.split('-')[0].lower()
    datasets = [
        f for f in listdir('../datasets') if isfile(join('../datasets', f))
        and f.startswith(base) and f.endswith('.csv')
    ]

    if datasets:
        last_tid_on_file = min([int(tid.split('_')[-1].replace('.csv', '')) for tid in datasets])
        extra = {"symbol": symbol, "last_tid_on_file": last_tid_on_file}
        logger.bind(**extra).info("Recovered tid from file")

        return last_tid_on_file
    
    return 0

In [116]:
def fetch_initial_trades(symbol: str) -> int:
    """
    Fetch most recent trades and immediately after next batch of trades.
    On Mercabo Bitcoin v4, we must fetch trade data by batches of 1000 trades. To achieve that,
    we first recover the most recently trade id (tid) and subtract 1000 to get the next batch.
    
    :param str symbol: Instrument symbol in the form BASE-QUOTE(e.g. BTC-BRL)
    :return: most recently tid and next tid of trades
    :rtype: int
    """
    last_tid_from_file = get_last_tid_from_file(symbol)

    if last_tid_from_file:
        next_tid = last_tid_from_file - 1000
        extra = {"symbol": symbol, "last_tid_from_file": last_tid_from_file, "next_tid": next_tid}
        logger.bind(**extra).info("Retriving last tid from saved dataset")
    
        return last_tid_from_file, next_tid, {}
    
    resp_init_trades = requests.get(url=api_v4_trades.format(symbol=symbol))

    if resp_init_trades.status_code != 200:
        extra = {"error": resp_init_trades.text, "symbol": symbol}
        logger.bind(**extra).error("Error to fetch initial trade data")

    initial_tid = resp_init_trades.json()[0]["tid"]
    next_tid = initial_tid - 1000
    
    extra = {"initial_tid": initial_tid, "next_tid": next_tid, "symbol": symbol}
    logger.bind(**extra).info("Info on the initial tid and tid of next trades")
    
    return initial_tid, next_tid, resp_init_trades.json()

In [117]:
def fetch_trades(payload: dict, symbol: str) -> list:
    """
    Fetch trades with since param. We begin by fetching the most recently tid,
    from there we need to go back by batches of 1000 tids.
    
    :param dict payload: payload with the since info
    :param str symbol: Instrument symbol in the form BASE-QUOTE(e.g. BTC-BRL)
    :return: list of trades in dec order, from older to most recently
    :rtype: list
    """
    
    response_trades = requests.get(url=api_v4_trades.format(symbol=symbol), params=payload)

    if response_trades.status_code != 200:
        extra = {"error": response_trades.text, "symbol": symbol, "request": payload}
        logger.bind(**extra).error("Error to fetch batch trade data")
    
    # extra = {"next": response_trades.json()[0]['tid']}
    # logger.success("Next tid of the next batch of trades")

    return response_trades.json()

In [118]:
def save_dataset(trades: list, filename: str = ""):
    columns = ["tid", "date", "type", "price", "amount"]

    btc_trades_df = pd.DataFrame(trades, columns=[c for c in columns])
    btc_trades_df.set_index('tid', inplace=True)

    btc_trades_df['price'] = pd.to_numeric(btc_trades_df['price'])
    btc_trades_df['amount'] = pd.to_numeric(btc_trades_df['amount'])
    
    btc_trades_df.drop_duplicates()
    btc_trades_df.info()
    btc_trades_df.head()
    
    btc_trades_df.to_csv(f"../datasets/{filename}.csv", sep='\t', index=False)
    btc_trades_df.to_parquet(f"../datasets/{filename}.parquet", engine="fastparquet")

In [119]:
def is_different_year(one_dt: datetime, another_dt: datetime) -> bool:   
    return True if one_dt.year != another_dt.year else False

In [120]:
def fetch_all_by_year(symbol: str, by_year: bool = False):
    raise Exception("not supported")
    initial_tid, next_tid, initial_trades = fetch_initial_trades(symbol=symbol)
    trades.extend(initial_trades.json())

    while next_trades > 0:
        initial_dt = datetime.fromtimestamp(trades[0]['date'])

        with ThreadPoolExecutor() as executor:
            # time.sleep(1)
            future = executor.submit(fetch_trades, payload={"since": next_trades}, symbol=symbol)
            next_batch_trades = future.result()
            trades.extendleft(reversed(next_batch_trades))
            next_trades -= 1000
            
            # log only when we get to a new year
            date_now = datetime.fromtimestamp(trades[-1]['date'])
            date_next_batch = datetime.fromtimestamp(trades[len(trades)-1000]['date'])
            if is_different_year(date_now, date_next_batch):
                logger.info(f"Fetching from year {date_next_batch.year}")
            logger.info(f"Fetching from year {date_now.year}")
            
            # create trade symbol dataset by year
            if by_year:
                for index, trade in enumerate(next_batch_trades):
                    candidate_dt = datetime.fromtimestamp(trade['date'])
                    if is_different_year(initial_ts, candidate_ts):
                        yearly_trade = trades[:index]
                        f_name = symbol.lower().replace('-', '_') + "_" + str(candidate_dt.year)
                        save_dataset(yearly_trade, symbol, f_name)
                initial_dt = datetime.fromtimestamp(trades[0]['date'])


    logger.info("Done!")

In [121]:
def fetch_all(symbol: str):
    trades = deque()
    initial_tid, next_tid, initial_trades = fetch_initial_trades(symbol=symbol)
    trades.extend(initial_trades)
    
    try:
        while next_tid > 0:
            l_pointer = 0
            with ThreadPoolExecutor() as executor:
                date_init = datetime.fromtimestamp(trades[l_pointer]['date'] if trades else time.time())
                time.sleep(2)
                future = executor.submit(fetch_trades, payload={"since": next_tid}, symbol=symbol)
                next_batch_trades = future.result()
                trades.extendleft(reversed(next_batch_trades))
                next_tid -= 1000

                # log only when we get to a new year
                date_last = datetime.fromtimestamp(next_batch_trades[0]['date'] if next_batch_trades else time.time())

                if is_different_year(date_init, date_last):
                    logger.info(f"Fetching from year {max(date_init, date_last).year}")

                l_pointer += 1000


    except Exception as err:
        logger.bind(error=err).error("Error")
        return initial_tid, next_tid, trades

    logger.info("Done!")

    return initial_tid, next_tid, trades

In [123]:
top_10_symbol = [
    'BTC-BRL', 'ETH-BRL', 'USDT-BRL', 'SOL-BRL', 'XRP-BRL',
    'USDC-BRL', 'DOGE-BRL', 'ADA-BRL', 'AVAX-BRL', 'SHIB-BRL'
]
for symbol in top_10_symbol:
    begin = datetime.now()
    initial_tid, next_tid, trades = fetch_all(symbol)
    f_name = f"{symbol.lower().replace('-', '_')}_trades_{initial_tid}_{next_tid}"
    save_dataset(trades, f_name)
    end = datetime.now()

    logger.info(f"Took {end - begin} time to process {symbol}")


{"timestamp": 1711360140, "file.name": "/tmp/ipykernel_271436/2581772991.py", "func": "get_last_tid_from_file", "line.number": 11, "log.level": "INFO", "message": "Recovered tid from file", "exception": null, "symbol": "BTC-BRL", "last_tid_on_file": 2278752}
{"timestamp": 1711360140, "file.name": "/tmp/ipykernel_271436/2090901018.py", "func": "fetch_initial_trades", "line.number": 16, "log.level": "INFO", "message": "Retriving last tid from saved dataset", "exception": null, "symbol": "BTC-BRL", "last_tid_from_file": 2278752, "next_tid": 2277752}
{"timestamp": 1711360143, "file.name": "/tmp/ipykernel_271436/203277443.py", "func": "fetch_all", "line.number": 21, "log.level": "INFO", "message": "Fetching from year 2024", "exception": null}
{"timestamp": 1711361224, "file.name": "/tmp/ipykernel_271436/203277443.py", "func": "fetch_all", "line.number": 21, "log.level": "INFO", "message": "Fetching from year 2018", "exception": null}
{"timestamp": 1711364521, "file.name": "/tmp/ipykernel_27