In [3]:
import asyncio
from datetime import datetime, timedelta, timezone
import ccxt.async_support as ccxt
import ccxt.pro as ccxtpro
from loguru import logger
import redis.asyncio as redis
import pandas as pd

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

In [25]:
def transform_candles(rawdata):
    # structure of watchOHLCV response: [[milliseconds_since_unix_epoch, Open, High, Low, Close, Volume]]
    ohlcv_keys = ["timestamp", "open", "high", "low", "close", "volume"]
    candles = [dict(zip(ohlcv_keys, candle)) for candle in rawdata]
    return candles

async def streamer(ex, symbol):
    while True:
        ohlcv = await ex.watchOHLCV(symbol, '5m')
        candles = transform_candles(ohlcv)
        tasks = [asyncio.create_task(redis_client.xadd(f"ohlcv:{symbol}", c)) for c in candles]
        await asyncio.gather(*tasks)

In [None]:
import asyncio
import redis.asyncio as redis

redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
ohlcv_queue = asyncio.Queue(maxsize=1000)

def transform_candles(rawdata):
    ohlcv_keys = ["timestamp", "open", "high", "low", "close", "volume"]
    return [dict(zip(ohlcv_keys, candle)) for candle in rawdata]

async def streamer(ex, symbol):
    while True:
        try:
            ohlcv = await ex.watchOHLCV(symbol, '5m')
            candles = transform_candles(ohlcv)
            for candle in candles:
                await ohlcv_queue.put((symbol, candle))
        except Exception as e:
            print(f"[{symbol}] Error: {e}")
        await asyncio.sleep(0.1)

async def redis_publisher():
    while True:
        try:
            items = []
            while len(items) < 50:
                try:
                    items.append(await asyncio.wait_for(ohlcv_queue.get(), timeout=1))
                except asyncio.TimeoutError:
                    break
            if not items: continue
            async with redis_client.pipeline(transaction=False) as pipe:
                for symbol, candle in items: pipe.xadd(f"ohlcv:{symbol}", candle)
                await pipe.execute()
                for _ in items: ohlcv_queue.task_done()
        except Exception as e:
            print(f"[Redis Publisher] Error: {e}")

async def main(ex, symbols):
    tasks = [asyncio.create_task(streamer(ex, symbol)) for symbol in symbols]
    tasks.append(asyncio.create_task(redis_publisher()))
    await asyncio.gather(*tasks)

In [None]:
ex = ccxtpro.binance({'newUpdates': True})
await ex.load_markets()
df = pd.DataFrame(ex.markets).T
symbols = df.loc[(df['type']=='spot') & (df['active']) & (df['quote']=='USDT')]['symbol'].to_list()

In [None]:
tasks = [asyncio.create_task(streamer(ex, s)) for s in symbols]
await asyncio.gather(*tasks)