In [1]:
import ccxt.async_support as ccxt  # Async API support
import pandas as pd
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import cudf  # GPU-accelerated DataFrame (Requires RAPIDS)

# --- Configurable Parameters ---
TRADE_AMOUNT = 10  # Trade amount in USD
DEFAULT_FEE = 0.001  # Default maker/taker fee (0.1% per trade)
RESULTS_FILE = "arbitrage_results_gpu.csv"
SLIPPAGE_TRADE_SIZE = 500  # Trade size for slippage estimation

# --- API Call Tracking ---
api_calls = {"binance": 0, "bybit": 0}
start_time = time.time()

def track_api_call(exchange_name):
    """Track API calls for each exchange."""
    api_calls[exchange_name] += 1

# --- Async Function to Fetch Data ---
async def fetch_ticker(exchange, symbol):
    """Fetches latest price for a symbol asynchronously."""
    try:
        ticker = await exchange.fetch_ticker(symbol)
        track_api_call(exchange.id)
        return ticker.get("last", None)
    except Exception as e:
        print(f"⚠️ Error fetching price for {symbol} on {exchange.id}: {e}")
        return None

async def fetch_order_book(exchange, symbol):
    """Fetches order book depth asynchronously."""
    try:
        order_book = await exchange.fetch_order_book(symbol)
        track_api_call(exchange.id)

        best_bid = order_book["bids"][0] if order_book["bids"] else (0, 0)
        best_ask = order_book["asks"][0] if order_book["asks"] else (0, 0)

        return best_bid[1], best_ask[1], best_ask[0] - best_bid[0]
    except Exception as e:
        print(f"⚠️ Error fetching order book for {symbol} on {exchange.id}: {e}")
        return "N/A", "N/A", "N/A"

async def fetch_all_data(exchange, symbols):
    """Fetches all market data asynchronously."""
    tasks = [fetch_ticker(exchange, symbol) for symbol in symbols]
    return await asyncio.gather(*tasks)

async def fetch_liquidity_and_slippage(exchange, symbol):
    """Fetches liquidity & slippage asynchronously."""
    price = await fetch_ticker(exchange, symbol)
    bid_depth, ask_depth, spread = await fetch_order_book(exchange, symbol)

    # Estimate slippage
    if bid_depth != "N/A" and ask_depth != "N/A" and bid_depth > 0 and ask_depth > 0:
        estimated_slippage = (SLIPPAGE_TRADE_SIZE / max(bid_depth, ask_depth)) * spread
    else:
        estimated_slippage = "N/A"

    return price, estimated_slippage

async def compare_fees_spreads_liquidity():
    """Compares trading fees, spreads, and liquidity across exchanges asynchronously."""
    exchange1 = ccxt.binance()
    exchange2 = ccxt.bybit()

    markets1 = await exchange1.load_markets()
    markets2 = await exchange2.load_markets()
    track_api_call("binance")
    track_api_call("bybit")

    symbols1 = set([s for s in markets1.keys() if "USDT" in s])
    symbols2 = set([s for s in markets2.keys() if "USDT" in s])
    common_symbols = list(symbols1 & symbols2)

    print(f"✅ Found {len(common_symbols)} common USDT trading pairs.")

    # Fetch all prices & slippage in parallel
    tasks = []
    for symbol in common_symbols:
        tasks.append(fetch_liquidity_and_slippage(exchange1, symbol))
        tasks.append(fetch_liquidity_and_slippage(exchange2, symbol))

    results = await asyncio.gather(*tasks)

    # Close connections
    await exchange1.close()
    await exchange2.close()

    # Process Results
    final_data = []
    for i in range(0, len(results), 2):
        symbol = common_symbols[i // 2]
        price1, slippage1 = results[i]
        price2, slippage2 = results[i + 1]

        if price1 is None or price2 is None:
            continue

        spread = abs(price1 - price2) / ((price1 + price2) / 2) * 100
        potential_profit = calculate_potential_profit(price1, price2, DEFAULT_FEE, DEFAULT_FEE, TRADE_AMOUNT)

        final_data.append([symbol, price1, price2, spread, slippage1, slippage2, potential_profit])

    # Convert to GPU DataFrame (cuDF)
    df_gpu = cudf.DataFrame(final_data, columns=[
        "Symbol", "Price 1", "Price 2", "Spread (%)",
        "Slippage 1", "Slippage 2", "Potential Profit ($)"
    ])

    # Save results
    df_gpu.to_csv(RESULTS_FILE, index=False)
    print("📁 Results saved.")

    # Print API Stats
    print_api_stats()

# --- Function to Calculate Potential Profit ---
def calculate_potential_profit(price1, price2, fee1, fee2, trade_amount):
    """Calculates potential profit for a given trade amount."""
    buy_price = min(price1, price2)
    sell_price = max(price1, price2)

    buy_fee = fee1 if buy_price == price1 else fee2
    sell_fee = fee2 if sell_price == price2 else fee1

    crypto_amount = trade_amount / buy_price

    buy_fee_cost = buy_price * crypto_amount * buy_fee
    sell_fee_cost = sell_price * crypto_amount * sell_fee
    total_fees = buy_fee_cost + sell_fee_cost

    raw_profit = (sell_price - buy_price) * crypto_amount

    final_profit = raw_profit - total_fees
    return round(final_profit, 4)

# --- Print API Usage Stats ---
def print_api_stats():
    """Prints total API calls and API calls per second."""
    total_calls = sum(api_calls.values())
    elapsed_time = time.time() - start_time
    calls_per_sec = total_calls / elapsed_time if elapsed_time > 0 else 0

    print("\n📊 **API Call Summary**")
    print(f"🔹 Binance API Calls: {api_calls['binance']}")
    print(f"🔹 Bybit API Calls: {api_calls['bybit']}")
    print(f"🔹 Total API Calls: {total_calls}")
    print(f"🔹 API Calls per Second: {calls_per_sec:.2f}")

# --- Run Program (Parallel & GPU) ---
loop = asyncio.get_event_loop()
loop.run_until_complete(compare_fees_spreads_liquidity())


ModuleNotFoundError: No module named 'cudf'