In [1]:
%load_ext autoreload
%autoreload 2

import ccxt
import time

In [2]:
import mlflow
mlflow.set_tracking_uri('http://mlflow:8890')
mlflow.set_experiment('crypto_portfolio_bot')

<Experiment: artifact_location='ftp://wannabebotter:wannabebotter@ftp_server/artifacts/1', experiment_id='1', lifecycle_stage='active', name='crypto_portfolio_bot', tags={}>

In [3]:
import os
from timescaledb_util import *

# TimescaleDB用のユーティリティライブラリの設定
pg_config = {
    'user': os.environ['POSTGRES_USER'],
    'password': os.environ['POSTGRES_PASSWORD'],
    'host': os.environ['POSTGRES_HOST'],
    'port': os.environ['POSTGRES_PORT'],
    'database': os.environ['POSTGRES_DATABASE_OHLCV_TRADES']
}

# TimescaleDBアクセス用のユーティリティオブジェクトを作成
dbutil = TimeScaleDBUtil(user = pg_config['user'], password = pg_config['password'], host = pg_config['host'], port = pg_config['port'], database = pg_config['database'])

In [4]:
exchange = ccxt.binanceusdm()
result = exchange.load_markets()

In [5]:
target_symbols = []
for symbol in exchange.symbols:
    if '/USDT' in symbol:
        target_symbols.append(symbol)
target_symbols

['1000BTTC/USDT',
 '1000SHIB/USDT',
 '1000XEC/USDT',
 '1INCH/USDT',
 'AAVE/USDT',
 'ADA/USDT',
 'AKRO/USDT',
 'ALGO/USDT',
 'ALICE/USDT',
 'ALPHA/USDT',
 'ANKR/USDT',
 'ANT/USDT',
 'AR/USDT',
 'ARPA/USDT',
 'ATA/USDT',
 'ATOM/USDT',
 'AUDIO/USDT',
 'AVAX/USDT',
 'AXS/USDT',
 'BAKE/USDT',
 'BAL/USDT',
 'BAND/USDT',
 'BAT/USDT',
 'BCH/USDT',
 'BEL/USDT',
 'BLZ/USDT',
 'BNB/USDT',
 'BTC/USDT',
 'BTCDOM/USDT',
 'BTS/USDT',
 'C98/USDT',
 'CELO/USDT',
 'CELR/USDT',
 'CHR/USDT',
 'CHZ/USDT',
 'COMP/USDT',
 'COTI/USDT',
 'CRV/USDT',
 'CTK/USDT',
 'CTSI/USDT',
 'CVC/USDT',
 'DASH/USDT',
 'DEFI/USDT',
 'DENT/USDT',
 'DGB/USDT',
 'DODO/USDT',
 'DOGE/USDT',
 'DOT/USDT',
 'DUSK/USDT',
 'DYDX/USDT',
 'EGLD/USDT',
 'ENJ/USDT',
 'ENS/USDT',
 'EOS/USDT',
 'ETC/USDT',
 'ETH/USDT',
 'FIL/USDT',
 'FLM/USDT',
 'FLOW/USDT',
 'FTM/USDT',
 'GALA/USDT',
 'GRT/USDT',
 'GTC/USDT',
 'HBAR/USDT',
 'HNT/USDT',
 'HOT/USDT',
 'ICP/USDT',
 'ICX/USDT',
 'IMX/USDT',
 'IOST/USDT',
 'IOTA/USDT',
 'IOTX/USDT',
 'KAVA/USDT'

In [8]:
from decimal import Decimal
from datetime import timezone, datetime, timedelta
import numpy as np
from tqdm import tqdm

for symbol in target_symbols:
    since = int(time.mktime(datetime.strptime('2019-01-01 00:00:00+0000', '%Y-%m-%d %H:%M:%S%z').timetuple()) * 1000)

    dbutil.init_timebar_table('binanceusdm', symbol, 5*60)
    table_name = dbutil.get_timebar_table_name('binanceusdm', symbol, 5*60)
    
    df = dbutil.get_latest_timebar('binanceusdm', symbol, 5*60)
    if type(df) != type(None):
        latest_datetime = df['datetime']
        latest_timestamp = int(latest_datetime.timestamp() * 1000) + 1

        if latest_timestamp > since:
            since = latest_timestamp

    till = int(datetime.now(tz = timezone.utc).timestamp() * 1000)
    origin = since

    with tqdm(total = int(till - origin), initial=0) as _pbar:
        _pbar.set_postfix_str(f'binanceusdm, {symbol}, row_counts: 0')
        while since <= till:
            time.sleep(exchange.rateLimit / 1000)

            result = exchange.fetch_ohlcv(symbol, '5m', since = int(since), limit = 1000) # 5min

            df = pd.DataFrame.from_dict(result, dtype=str)
            if len(df) == 0:
                break
            
            _to_decimal = lambda x: Decimal(x)
            
            df.columns = ['datetime', 'open', 'high', 'low', 'close', 'dollar_volume']
            df['datetime'] = df['datetime'].apply(lambda x: datetime.fromtimestamp(float(x) / 1000, tz = timezone.utc))
            df['datetime_from'] = df['datetime'] - timedelta(seconds=300) - timedelta(microseconds=1)
            df['open'] = df['open'].apply(_to_decimal)
            df['high'] = df['high'].apply(_to_decimal)
            df['low'] = df['low'].apply(_to_decimal)
            df['close'] = df['close'].apply(_to_decimal)
            df['dollar_volume'] = df['dollar_volume'].apply(_to_decimal)
            df['dollar_cumsum'] = df['dollar_volume'].cumsum()
            
            nan_columns = ['id', 'id_from', 'volume', 'dollar_buy_volume', 'dollar_sell_volume', 'dollar_liquidation_volume', 'dollar_liquidation_buy_volume', 'dollar_liquidation_sell_volume', 'dollar_buy_cumsum', 'dollar_sell_cumsum']
            for column in nan_columns:
                df[column] = Decimal(0)

            ret = dbutil.df_to_sql(df=df, schema = table_name, if_exists = 'append')

            since = int(df.iloc[-1]['datetime'].timestamp()*1000) + 1
            _pbar.n = since - origin
            _pbar.set_postfix_str(f'binanceusdm, {symbol}, row_counts: {len(df)}')
            _pbar.refresh()

  0%|          | 0/201217 [00:00<?, ?it/s, binanceusdm, 1000BTTC/USDT, row_counts: 0]
  0%|          | 0/201345 [00:00<?, ?it/s, binanceusdm, 1000SHIB/USDT, row_counts: 0]
  0%|          | 0/201586 [00:00<?, ?it/s, binanceusdm, 1000XEC/USDT, row_counts: 0]
  0%|          | 0/201874 [00:00<?, ?it/s, binanceusdm, 1INCH/USDT, row_counts: 0]
  0%|          | 0/202127 [00:00<?, ?it/s, binanceusdm, AAVE/USDT, row_counts: 0]
  0%|          | 0/202391 [00:00<?, ?it/s, binanceusdm, ADA/USDT, row_counts: 0]
  0%|          | 0/202623 [00:00<?, ?it/s, binanceusdm, AKRO/USDT, row_counts: 0]
  0%|          | 0/202881 [00:00<?, ?it/s, binanceusdm, ALGO/USDT, row_counts: 0]
  0%|          | 0/203116 [00:00<?, ?it/s, binanceusdm, ALICE/USDT, row_counts: 0]
  0%|          | 0/203396 [00:00<?, ?it/s, binanceusdm, ALPHA/USDT, row_counts: 0]
  0%|          | 0/203641 [00:00<?, ?it/s, binanceusdm, ANKR/USDT, row_counts: 0]
  0%|          | 0/203831 [00:00<?, ?it/s, binanceusdm, ANT/USDT, row_counts: 0]
  0%

In [59]:
result = []
row_counts = 0
with tqdm(total = len(target_symbols), initial=0) as _pbar:
    for symbol in target_symbols:
        table_name = dbutil.get_timebar_table_name('binanceusdm', symbol, 5*60)
        df = dbutil.read_sql_query(f'SELECT open, close, dollar_volume from "{table_name}"')
        recent_dollar_volume = (df.iloc[-50000:]['dollar_volume'] * (df.iloc[-50000:]['close'] + df.iloc[-50000:]['open']) / 2).sum()
        result.append({'symbol': symbol, 'rows': len(df), 'recent_dollar_volume': recent_dollar_volume})
        
        row_counts = row_counts + len(df)
        _pbar.n = _pbar.n + 1
        _pbar.set_postfix_str(f'{symbol}, row_counts: {row_counts:,}, recent_dollar_volume: {recent_dollar_volume:,}')
        _pbar.refresh()

df = pd.DataFrame(result)

100%|██████████| 137/137 [02:15<00:00,  1.01it/s, ZRX/USDT, row_counts: 17,728,174, recent_dollar_volume: 5,996,655,121.430565]   


In [64]:
pd.set_option("display.max_rows", 150)
pd.set_option("display.min_rows", 150)
df = df.sort_values('recent_dollar_volume', ascending=False)
df['listed_days'] = df['rows']*5/60/24
df = df.reset_index(drop=True)
df[df['listed_days'] > 250]

Unnamed: 0,symbol,rows,recent_dollar_volume,listed_days
0,BTC/USDT,258047,2597094000000.0,895.996528
1,ETH/USDT,235129,1150720000000.0,816.420139
2,SOL/USDT,151042,344041700000.0,524.451389
3,1000SHIB/USDT,82371,305924800000.0,286.010417
4,SAND/USDT,112738,188231500000.0,391.451389
5,FTM/USDT,148162,181578400000.0,514.451389
6,LUNA/USDT,111874,167810200000.0,388.451389
7,XRP/USDT,223602,166067200000.0,776.395833
8,ADA/USDT,216406,159791800000.0,751.409722
9,AVAX/USDT,148450,145471500000.0,515.451389
