# Imports

In [1]:
import asyncio
import time
from datetime import datetime, timezone
from urllib.parse import quote

import aiohttp
import asyncpg
import backoff
import ccxt.async_support as ccxt
import pandas as pd
import polars as pl
import requests
from dotenv import find_dotenv, dotenv_values

# Config / Params

## Credentials

In [2]:
# Read key/value settings from the .env file located by find_dotenv().
# Both credentials are None when their key is missing from that file.
config = dotenv_values(find_dotenv())
user, password = (config.get(key) for key in ('USERNAME_PG', 'PASSWORD_PG'))

## Database Connection

In [3]:
# Connection settings used by the polars reader (`uri`) and by the asyncpg pool (`db_params`).
# The credentials are percent-encoded inside the URI so that characters such as
# '@', ':' or '/' in the user name or password cannot corrupt it. `db_params`
# keeps the raw values because they are passed as keyword arguments, not parsed
# out of a URL.
uri = (
    f"postgresql://{quote(str(user), safe='')}:{quote(str(password), safe='')}"
    "@localhost:5432/crypto_data"
)
db_params = {
    'database': 'crypto_data',
    'user': user,
    'password': password,
    'host': 'localhost'
}

In [4]:
# Network/HTTP exception types collected into a single tuple.
# NOTE(review): no cell shown here references this tuple — presumably it was meant
# for a backoff/retry decorator; confirm or remove.
aiohttp_errors = (
    aiohttp.ClientResponseError,
    aiohttp.ServerDisconnectedError,
    aiohttp.ContentTypeError,
    asyncio.TimeoutError,
    aiohttp.ClientOSError,
)

# Helper Functions

## Backoff

In [5]:
def backoff_hdlr(details):
    """Print a timestamped line describing a retry that is about to be attempted.

    Parameters:
    - details: mapping that supplies the `exception`, `target`, `wait` and
      `tries` fields referenced by the message template; other keys are ignored.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    template = (
        "{} - "
        "Encountered exception {exception} "
        "calling function {target}; "
        "Backing off {wait:0.1f} seconds after {tries} tries "
    )
    print(template.format(timestamp, **details))

# Get Data

## Get All Symbols

In [6]:
# Request settings for the Amberdata futures reference endpoint.
ad_api_key = config.get('AMBERDATA_API_KEY')
headers = {
    "accept": "application/json",
    "x-api-key": ad_api_key
}

url = "https://api.amberdata.com/market/futures/exchanges/reference"
params = {
    'exchange':'binance',
    'includeInactive':'true',
}

# Walk the paginated response. Each page nests instruments under their exchange,
# and `metadata.next` holds the URL of the following page (missing on the last page,
# which ends the loop).
list_instruments = []
while url is not None:
    # The timeout keeps a stalled connection from hanging the cell, and
    # raise_for_status() surfaces HTTP errors instead of letting an error body
    # without `payload` end the pagination silently.
    # NOTE(review): `params` is re-sent together with every `next` URL — confirm
    # the API expects that.
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    payload = response.json().get('payload', {})
    metadata = payload.get('metadata', {})
    data = payload.get('data', {})
    for exchange, data_exchange in data.items():
        list_exchange = [
            {'instrument': instrument, **data_instrument}
            for instrument, data_instrument in data_exchange.items()
        ]
        chunk_exchange = pd.DataFrame(list_exchange).assign(exchange=exchange)
        list_instruments.append(chunk_exchange)
    url = metadata.get('next', None)

# Fail with an explicit message rather than pd.concat's generic
# "No objects to concatenate" when the API returned nothing.
if not list_instruments:
    raise ValueError('Amberdata returned no instruments; check the API key and request parameters')

df_instruments = pd.concat(list_instruments, axis=0)
perps = df_instruments.query('contractType == "perpetual"')
# A boolean mask is used instead of query('exchange == @exchange') so the lookup
# does not depend on how pandas resolves local names inside a comprehension.
dict_instruments = {
    exchange: perps.loc[perps['exchange'] == exchange, 'instrument'].tolist()
    for exchange in ['binance', 'bybit']
}
list_symbols = dict_instruments.get('binance')

In [7]:
# Re-binds list_symbols to the Binance perpetual symbols; the previous cell
# already ends with this same assignment.
list_symbols = dict_instruments.get('binance')

## Get Existing Symbol Info

In [8]:
# Selects symbol and last_datetime for exchange 'binance' from perps_last_datetime.
# NOTE(review): the `_mv` suffix suggests a materialized view — confirm it is
# refreshed before this notebook runs.
query_symbol_info_mv = """
    SELECT
        symbol,
        last_datetime
    FROM perps_last_datetime
    WHERE exchange = 'binance'
"""

In [9]:
# Run the query through polars using the connection URI defined above.
df_symbol_info = pl.read_database_uri(query=query_symbol_info_mv, uri=uri)

In [10]:
# Preview the rows with the most recent last_datetime values.
df_symbol_info.sort('last_datetime', descending=True).head()

symbol,last_datetime
str,datetime[ns]
"""SFPUSDT""",2024-06-06 13:59:00
"""SKLUSDT""",2024-06-06 13:59:00
"""SCUSDT""",2024-06-06 13:37:00
"""SANDUSDT""",2024-06-06 13:15:00
"""SAGAUSDT""",2024-06-06 12:53:00


## Fetch and Store OHLC Data

### Version 1

In [11]:
# async def fetch_segment(exchange, symbol, start_time, timeframe='1m', limit=1000):
#     # segment = await exchange.fetch_ohlcv(symbol, '1m', since=start_time, limit=limit)
#     print(f'{datetime.utcnow()} - Requesting Segment')
#     segment = await exchange.fapipublic_get_klines({'symbol':symbol, 'startTime':start_time, 'limit':limit, 'interval':timeframe})
#     return segment

# async def fetch_all_klines(symbol, start_date_str):
#     # exchange = ccxt.binance({
#     #     # 'rateLimit': 1200,
#     #     'enableRateLimit': True,
#     # })
#     # ref_time = time.time()
#     start_time = exchange.parse8601(start_date_str + "T00:00:00Z")
#     end_time = int(datetime.now().timestamp() * 1000)  # Current time in milliseconds
#     limit = 1500
#     all_klines = []
    
#     # Calculate the number of segments needed based on the time range and limit
#     time_range = end_time - start_time
#     one_minute = 60000  # milliseconds
#     segments = time_range // (limit * one_minute)
    
#     tasks = []
#     for i in range(segments + 1):
#         segment_start = start_time + i * limit * one_minute
#         async with sem:
#             task = asyncio.create_task(fetch_segment(exchange, symbol, segment_start, limit=limit))
#             tasks.append(task)
    
#     # Wait for all tasks to complete and collect results
#     results = await asyncio.gather(*tasks)
#     for result in results:
#         all_klines.extend(result)
    
#     await exchange.close()
#     return all_klines

# async def main1():
#     symbol = 'BTCUSDT'
#     start_date = '2024-05-15'
#     klines = await fetch_all_klines(symbol, start_date)
#     print(f"Fetched {len(klines)} klines for {symbol} since {start_date}")
#     return klines 



### Version 2

In [12]:
async def insert_ohlc_data(conn, table_name, exchange, symbol, ohlc_data):
    """
    Asynchronously inserts OHLC data into a dynamically specified PostgreSQL table.

    Parameters:
    - conn: An open database connection exposing an async
      `executemany(query, rows)` method (e.g. an asyncpg connection).
    - table_name: The name of the table to insert data into. It is interpolated
      directly into the SQL text, so it must come from trusted code and never
      from user input.
    - exchange: The name of the exchange (e.g., 'Binance').
    - symbol: The trading pair or symbol (e.g., 'BTC/USDT').
    - ohlc_data: Iterable of kline rows. Index 0 is read as an open time in
      milliseconds; indexes 1-5, 7, 9 and 10 are passed through unchanged.

    Rows that conflict with an existing row are skipped (ON CONFLICT DO NOTHING).
    Nothing is sent to the database when `ohlc_data` is empty.
    """

    # Only the values are bound as parameters ($1..$11); the table name is plain
    # string interpolation (see the docstring).
    insert_query = f"""
    INSERT INTO {table_name} (exchange, symbol, datetime, open, high, low, close, base_volume, quote_volume, base_buy_volume, quote_buy_volume)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    ON CONFLICT DO NOTHING
    """

    rows = [
        (
            exchange,
            symbol,
            # Millisecond epoch -> naive UTC datetime. This yields the same value as
            # the deprecated datetime.utcfromtimestamp().
            datetime.fromtimestamp(int(data[0]) / 1000.0, tz=timezone.utc).replace(tzinfo=None),
            data[1],  # open
            data[2],  # high
            data[3],  # low
            data[4],  # close
            data[5],  # base_volume
            data[7],  # quote_volume (index 6 is close time, so we use index 7)
            data[9],  # base_buy_volume
            data[10]  # quote_buy_volume
        )
        for data in ohlc_data
    ]

    if not rows:
        return

    # One batched call instead of one round-trip per row.
    await conn.executemany(insert_query, rows)

async def fetch_segment(client, symbol, start_time, timeframe='1m', limit=1000):
    """Request one batch of klines through `client.fapipublic_get_klines`.

    Parameters:
    - client: object exposing the async `fapipublic_get_klines(params)` method.
    - symbol: sent as the `symbol` request field.
    - start_time: sent as the `startTime` request field.
    - timeframe: sent as the `interval` request field.
    - limit: sent as the `limit` request field.

    Returns whatever the client call returns, unmodified.
    """
    request = {
        'symbol': symbol,
        'startTime': start_time,
        'limit': limit,
        'interval': timeframe,
    }
    return await client.fapipublic_get_klines(request)

async def fetch_data_for_symbol(client, symbol, last_datetime, semaphore):
    """
    Fetch all 1-minute klines for `symbol` from `last_datetime` until now.

    The time range is split into consecutive windows of 1500 minutes and one
    `fetch_segment` request is issued per window.

    Parameters:
    - client: exchange client forwarded to `fetch_segment`.
    - symbol: symbol forwarded to `fetch_segment`.
    - last_datetime: start of the range. Its tzinfo is replaced with UTC, so the
      value is assumed to already express UTC wall-clock time.
    - semaphore: asyncio semaphore bounding how many segment requests run at once.

    Returns a flat list with the rows of every segment, in segment order.
    An empty list is returned when `last_datetime` lies in the future.
    """
    # Assuming last_datetime is already in UTC
    start_time = int(last_datetime.replace(tzinfo=timezone.utc).timestamp() * 1000)

    # Current time in UTC
    end_time = int(datetime.now(timezone.utc).timestamp() * 1000)

    limit = 1500
    one_minute = 60000  # milliseconds
    segment_span = limit * one_minute

    async def _fetch_limited(segment_start):
        # The semaphore must be held while the request is in flight. Holding it
        # only around task creation (which returns immediately) limits nothing.
        async with semaphore:
            return await fetch_segment(client=client, symbol=symbol, start_time=segment_start, limit=limit)

    # One window start every `segment_span` ms, from start_time up to end_time.
    segment_starts = range(start_time, end_time + 1, segment_span)

    # gather() returns results in the order of its arguments, keeping segments ordered.
    results = await asyncio.gather(*(_fetch_limited(segment_start) for segment_start in segment_starts))

    all_klines = []
    for result in results:
        all_klines.extend(result)

    return all_klines



async def process_symbol(pool, table_name, client, exchange_name, symbol, last_datetime, semaphore):
    """
    Download the klines of one symbol and write them to `table_name`.

    A connection is taken from `pool` before the download starts and is handed
    back when the function exits. A `ccxt.BadSymbol` error is reported with a
    printed message and the symbol is skipped; any other exception propagates.
    """
    async with pool.acquire() as conn:
        try:
            klines = await fetch_data_for_symbol(
                client=client,
                symbol=symbol,
                last_datetime=last_datetime,
                semaphore=semaphore,
            )

            print(f'Inserting {len(klines)} rows for {symbol}')
            await insert_ohlc_data(
                conn=conn,
                table_name=table_name,
                exchange=exchange_name,
                symbol=symbol,
                ohlc_data=klines,
            )
        except ccxt.BadSymbol as err:
            print(f"An error occurred: {err}. Skipping symbol {symbol}.")

async def fetch_and_store_all_symbols(client, pool, table_name, list_symbols, dict_existing_symbols, semaphore_num=100):
    """
    Start one `process_symbol` task per symbol and wait for all of them.

    Parameters:
    - client: exchange client passed on to every task.
    - pool: database pool passed on to every task.
    - table_name: destination table passed on to every task.
    - list_symbols: symbols to process; one task is created for each.
    - dict_existing_symbols: maps a symbol to the datetime to resume from.
      Symbols missing from it start at 2017-01-01.
    - semaphore_num: size of the single semaphore shared by all tasks.
    """
    shared_semaphore = asyncio.Semaphore(semaphore_num)
    default_start = datetime(2017, 1, 1)

    pending = []
    for symbol in list_symbols:
        print(f'Getting Data For {symbol}')
        resume_from = dict_existing_symbols.get(symbol, default_start)
        pending.append(
            asyncio.create_task(
                process_symbol(
                    pool=pool,
                    table_name=table_name,
                    client=client,
                    exchange_name='binance',
                    symbol=symbol,
                    last_datetime=resume_from,
                    semaphore=shared_semaphore,
                )
            )
        )

    await asyncio.gather(*pending)

async def main(list_symbols, dict_existing_symbols):
    """
    Backfill 1-minute klines for `list_symbols` into the 'ohlc_1m_swaps_test' table.

    Creates a rate-limited ccxt `binanceusdm` client and an asyncpg pool built
    from `db_params`, then runs `fetch_and_store_all_symbols`. Both the client
    and the pool are closed even when fetching fails or the run is cancelled.

    Parameters:
    - list_symbols: symbols to download.
    - dict_existing_symbols: maps a symbol to the datetime to resume from.
    """
    client = ccxt.binanceusdm({
        # 'rateLimit': 1200,  # Binance rate limit in milliseconds
        'enableRateLimit': True,
    })
    try:
        # Raise the capacity of the client's throttle so the large number of
        # requests queued at once is accepted.
        client.throttle.config['maxCapacity'] = 1e09
        table_name = 'ohlc_1m_swaps_test'

        pool = await asyncpg.create_pool(**db_params)
        try:
            await fetch_and_store_all_symbols(client, pool, table_name, list_symbols, dict_existing_symbols)
        finally:
            await pool.close()
    finally:
        # Runs on success, on error and on cancellation, so the client's
        # resources are always released.
        await client.close()

# if __name__ == "__main__":
#     asyncio.run(main())

In [13]:
dict_existing_symbols = {row['symbol']: row['last_datetime'] for row in df_symbol_info.to_dicts()}
ref_time = time.time()
await main(list_symbols[-85:], dict_existing_symbols)
print(f'Got results in {time.time() - ref_time:.2f}s')

Getting Data For SKLUSDT
Getting Data For SLPUSDT
Getting Data For SNTUSDT
Getting Data For SNXUSDT
Getting Data For SOLBUSD
Getting Data For SOLUSDC
Getting Data For SOLUSDT
Getting Data For SOLUSD_PERP
Getting Data For SPELLUSDT
Getting Data For SRMUSDT
Getting Data For SSVUSDT
Getting Data For STEEMUSDT
Getting Data For STGUSDT
Getting Data For STMXUSDT
Getting Data For STORJUSDT
Getting Data For STPTUSDT
Getting Data For STRAXUSDT
Getting Data For STRKUSDT
Getting Data For STXUSDT
Getting Data For SUIUSDC
Getting Data For SUIUSDT
Getting Data For SUPERUSDT
Getting Data For SUSHIUSDT
Getting Data For SXPUSDT
Getting Data For TAOUSDT
Getting Data For THETAUSDT
Getting Data For THETAUSD_PERP
Getting Data For TIAUSDC
Getting Data For TIAUSDT
Getting Data For TLMBUSD
Getting Data For TLMUSDT
Getting Data For TNSRUSDT
Getting Data For TOKENUSDT
Getting Data For TOMOUSDT
Getting Data For TONUSDT
Getting Data For TRBUSDT
Getting Data For TRUUSDT
Getting Data For TRXBUSD
Getting Data For TR

CancelledError: 

In [None]:
# Show the first symbol of the list_symbols[-85:] slice that was passed to main().
list_symbols[-85]

In [None]:
# Stand-alone client configured the same way as the one created inside main().
# NOTE(review): no close() call for this client appears in the cells shown —
# confirm it is released after inspection.
client = ccxt.binanceusdm({
    # 'rateLimit': 1200,  # Binance rate limit in milliseconds
    'enableRateLimit': True,
})

client.throttle.config['maxCapacity'] = 1e09

In [None]:
# Keep a handle on the client's throttle object for inspection.
throttle = client.throttle

In [None]:
# Display the throttle configuration of the stand-alone client.
throttle.config

In [None]:
# Inspect the symbol -> last_datetime mapping that was passed to main().
# The previous name, dict_last_datetime, is not defined in any cell shown and
# would raise NameError on a fresh kernel.
dict_existing_symbols