## CEX-DEX Arbitrage Research

Use this Notebook as the starting point for your CEX-DEX arbitrage research.

In [1]:
import os
import asyncio
import nest_asyncio
import aioprocessing
from functools import partial
from dotenv import load_dotenv

load_dotenv(override=True)

HTTP_RPC_URL = os.getenv('HTTP_RPC_URL')
WS_RPC_URL = os.getenv('WS_RPC_URL')

#### Utils

A function for handling websocket connection errors. Wrapping data streaming functions with **reconnecting_websocket_loop** will automatically reconnect everytime a connection error occurs.

In [2]:
from utils import (
    reconnecting_websocket_loop,
    calculate_next_block_base_fee,
)

#### CEX data streams

Currently implemented for Binance websocket stream of depth5 data.

In [3]:
from cex_streams import (
    stream_binance_usdm_orderbook,
    stream_okx_usdm_orderbook,
)

#### DEX data streams

In [4]:
from dex_streams import (
    stream_new_blocks,
    stream_uniswap_v2_events,
)

#### Aggregator

In [5]:
from aggregator import aggregate_cex_orderbooks

#### Event handler

#### 1. CEX only event handler example:

In [6]:
import datetime
from spread_chart import Publisher

"""
Using taker fee to make spread calculations more realistic (Tier: LV 1)

- OKX: https://www.okx.com/fees
- Binance: https://www.binance.com/en/fee/futureFee
"""
FEE = {
    'okx': 0.0005,      # 0.05%
    'binance': 0.0004,  # 0.04%
}

async def cex_event_handler(port: int, event_queue: aioprocessing.AioQueue):   
    """
    If you want to update the Finplot chart as you run this function,
    run spread_chart.py from a separate terminal.
    """
    pub = Publisher(port)

    orderbooks = {}
    
    last_updated = datetime.datetime.now()
    
    while True:
        data = await event_queue.coro_get()
        
        symbol = data['symbol']
        
        if data['source'] == 'cex':
            if symbol not in orderbooks:
                orderbooks[symbol] = {}
                
            orderbooks[symbol][data['exchange']] = data
            multi_orderbook = aggregate_cex_orderbooks(orderbooks[symbol])
            
            now = datetime.datetime.now()
            if (now - last_updated).total_seconds() > 1:
                best_bid = multi_orderbook['bids'][0][0]
                best_ask = multi_orderbook['asks'][0][0]
                bid_ask_spread = round(float((best_bid / best_ask - 1) * 100), 3)
                
                best_bid_exchange = multi_orderbook['bids'][0][2]
                best_ask_exchange = multi_orderbook['asks'][0][2]
                target_exchanges = f'{best_bid_exchange}/{best_ask_exchange}'
                
                bid_fee = FEE[best_bid_exchange] * 2 * 100  # buy, sell fee (x2)
                ask_fee = FEE[best_ask_exchange] * 2 * 100  # buy, sell fee (x2)
                bid_ask_spread_real = bid_ask_spread - (bid_fee + ask_fee)
                
                await pub.send({'spread': bid_ask_spread_real})
                
                last_updated = now
                print(f'[{now}] Spread: {bid_ask_spread_real}% ({target_exchanges})')

#### 2. DEX only event handler example:

In [7]:
import datetime
from spread_chart import Publisher

from simulator import UniswapV2Simulator

FEE = {
    'uniswap': 0.003,   # 0.3%
    'sushiswap': 0.003, # 0.3%
}

async def dex_event_handler(port: int, event_queue: aioprocessing.AioQueue):
    pub = Publisher(port)
    sim = UniswapV2Simulator()
    
    block = {}
    prices = {}
    
    while True:
        data = await event_queue.coro_get()
        
        if data['source'] == 'block':
            block = data
        
        elif data['source'] == 'dex':
            event_type = data['type']
            
            if event_type == 'pool_update':
                symbol = data['symbol']
                token = symbol.replace('USDT', '')
            
                if symbol not in prices:
                    prices[symbol] = {}
                    
                prices[symbol][data['exchange']] = data
                
                # A simple DEX aggregator with price from multiple DEX sources
                dex_aggregator = {}
                for exchange, _data in prices[symbol].items():
                    token_idx = data['token_idx']
                    decimals = list(_data['decimals'].values())
                    reserves = list(_data['reserves'].values())
                    # get the price of ETH/USDT: 1 ETH -> xxxx USDT
                    price = sim.reserves_to_price(
                        reserves[0],
                        reserves[1],
                        decimals[0],
                        decimals[1],
                        token_idx[token] == 0
                    )
                    dex_aggregator[exchange] = price
                
                best_path = None
                best_spread = -1
                
                exchanges = list(dex_aggregator.keys())
                for e_1 in exchanges:
                    for e_2 in exchanges:
                        if e_1 == e_2:
                            continue
                        
                        p_sell = dex_aggregator[e_1]
                        p_buy = dex_aggregator[e_2]
                        
                        spread = round(float((p_sell / p_buy - 1) * 100), 3)
                        
                        sell_fee = FEE[e_1] * 100
                        buy_fee = FEE[e_2] * 100
                        spread_real = spread - (sell_fee + buy_fee)
                        
                        if spread_real >= best_spread:
                            best_path = f'{e_1}/{e_2}'
                            best_spread = spread_real
                            
                if best_path:
                    await pub.send({'spread': best_spread})
                    now = datetime.datetime.now()
                    print(f'[{now}] Block #{block.get("block_number")} Spread: {best_spread}% ({best_path})')

#### 3. CEX-DEX event handler example:

In [6]:
import datetime
from spread_chart import Publisher

FEE = {
    'okx': 0.0005,      # 0.05%
    'binance': 0.0004,  # 0.04%
    'uniswap': 0.003,   # 0.3%
    'sushiswap': 0.003, # 0.3%
}

async def cex_dex_event_handler(port: int, event_queue: aioprocessing.AioQueue):
    pub = Publisher(port)

    orderbooks = {}
    
    while True:
        data = await event_queue.coro_get()
        
        if data['source'] == 'cex':
            symbol = data['symbol']
            
            if symbol not in orderbooks:
                orderbooks[symbol] = {}
                
            orderbooks[symbol][data['exchange']] = data
            multi_orderbook = aggregate_cex_orderbooks(orderbooks[symbol])
            print(multi_orderbook)
        
        elif data['source'] == 'dex':
            pass

#### Run

In [7]:
from threading import Thread

from constants import TOKENS, POOLS

def event_handler_loop(port: int, event_queue: aioprocessing.AioQueue):
    asyncio.run(cex_dex_event_handler(port, event_queue))

# define an event_queue to publish realtime data
event_queue = aioprocessing.AioQueue()

symbols = ['ETH/USDT']

# CEX related streams
binance_stream = reconnecting_websocket_loop(
    partial(stream_binance_usdm_orderbook, symbols, event_queue),
    tag='binance_stream'
)

okx_stream = reconnecting_websocket_loop(
    partial(stream_okx_usdm_orderbook, symbols, event_queue),
    tag='okx_stream'
)

# DEX related streams
new_blocks_stream = reconnecting_websocket_loop(
    partial(stream_new_blocks, WS_RPC_URL, event_queue, False),
    tag='new_blocks_stream'
)

uniswap_v2_stream = reconnecting_websocket_loop(
    partial(stream_uniswap_v2_events, HTTP_RPC_URL, WS_RPC_URL, TOKENS, POOLS, event_queue, False),
    tag='uniswap_v2_stream'
)
    
# start event_handler_loop in a new thread
chart_port = 9999

t = Thread(target=event_handler_loop, args=(chart_port, event_queue,))
t.start()

nest_asyncio.apply()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
    binance_stream,
    okx_stream,
    new_blocks_stream,
    uniswap_v2_stream,
]))

{'bids': [[Decimal('1715.17'), Decimal('22.742'), 'binance'], [Decimal('1715.16'), Decimal('0.003'), 'binance'], [Decimal('1715.15'), Decimal('12.790'), 'binance'], [Decimal('1715.14'), Decimal('0.010'), 'binance'], [Decimal('1715.13'), Decimal('0.010'), 'binance']], 'asks': [[Decimal('1715.18'), Decimal('95.287'), 'binance'], [Decimal('1715.19'), Decimal('101.882'), 'binance'], [Decimal('1715.20'), Decimal('0.018'), 'binance'], [Decimal('1715.21'), Decimal('0.010'), 'binance'], [Decimal('1715.22'), Decimal('8.808'), 'binance']]}
{'bids': [[Decimal('1715.17'), Decimal('22.742'), 'binance'], [Decimal('1715.16'), Decimal('0.003'), 'binance'], [Decimal('1715.15'), Decimal('12.790'), 'binance'], [Decimal('1715.14'), Decimal('0.010'), 'binance'], [Decimal('1715.13'), Decimal('0.010'), 'binance']], 'asks': [[Decimal('1715.18'), Decimal('95.314'), 'binance'], [Decimal('1715.19'), Decimal('101.882'), 'binance'], [Decimal('1715.20'), Decimal('0.018'), 'binance'], [Decimal('1715.21'), Decimal('0

{'bids': [[Decimal('1716.11'), Decimal('21.6'), 'okx'], [Decimal('1716.1'), Decimal('67.9'), 'okx'], [Decimal('1716.02'), Decimal('0.4'), 'okx'], [Decimal('1716.01'), Decimal('36.7'), 'okx'], [Decimal('1716'), Decimal('54.4'), 'okx'], [Decimal('1715.08'), Decimal('27.607'), 'binance'], [Decimal('1715.07'), Decimal('0.011'), 'binance'], [Decimal('1715.06'), Decimal('0.527'), 'binance'], [Decimal('1715.05'), Decimal('0.500'), 'binance'], [Decimal('1715.04'), Decimal('15.160'), 'binance']], 'asks': [[Decimal('1715.09'), Decimal('76.912'), 'binance'], [Decimal('1715.10'), Decimal('0.753'), 'binance'], [Decimal('1715.11'), Decimal('0.055'), 'binance'], [Decimal('1715.12'), Decimal('0.010'), 'binance'], [Decimal('1715.13'), Decimal('0.010'), 'binance'], [Decimal('1716.12'), Decimal('91.6'), 'okx'], [Decimal('1716.13'), Decimal('0.1'), 'okx'], [Decimal('1716.17'), Decimal('4.0'), 'okx'], [Decimal('1716.18'), Decimal('4.6'), 'okx'], [Decimal('1716.19'), Decimal('31.0'), 'okx']]}
{'bids': [[Dec