> Best Bid\Ask, Spead and Imblance

___

# Imports

In [3]:
import asyncio
import websockets
import threading
import json
import time
import requests
from collections import OrderedDict

import ssl
import certifi

# For dev/debug — disables SSL verification. DO NOT use in prod.
ssl_context = ssl._create_unverified_context()
# Use this for proper SSL:
# ssl_context = ssl.create_default_context(cafile=certifi.where())

___

# Local Order Book

In [4]:
class LocalOrderBook:
    def __init__(self):
        self.bids = OrderedDict()
        self.asks = OrderedDict()

    def update(self, updates, side):
        book = self.bids if side == "b" else self.asks
        for update in updates:
            price, volume = float(update[0]), float(update[1])
            if volume == 0:
                book.pop(price, None)
            else:
                book[price] = volume

        if side == "b":
            self.bids = OrderedDict(sorted(book.items(), reverse=True))
        else:
            self.asks = OrderedDict(sorted(book.items()))

    def patch_from_snapshot(self, snapshot_bids, snapshot_asks):
        self._patch_side(snapshot_bids, self.bids, side="b")
        self._patch_side(snapshot_asks, self.asks, side="a")

    def _patch_side(self, snapshot, book, side):
        if len(book) >= 100:
            return

        for update in snapshot:
            price, volume = float(update[0]), float(update[1])
            if price not in book and len(book) < 100:
                book[price] = volume

        if side == "b":
            self.bids = OrderedDict(sorted(book.items(), reverse=True))
        else:
            self.asks = OrderedDict(sorted(book.items()))

    def top_of_book(self):
        best_bid = next(iter(self.bids.items()), (None, None))
        best_ask = next(iter(self.asks.items()), (None, None))
        return best_bid, best_ask

    def spread(self):
        bid, ask = self.top_of_book()
        if bid[0] is not None and ask[0] is not None:
            return round(ask[0] - bid[0], 2)
        return None

    def order_book_imbalance(self, depth=10):
        top_bids = list(self.bids.items())[:depth]
        top_asks = list(self.asks.items())[:depth]

        bid_vol = sum(vol for _, vol in top_bids)
        ask_vol = sum(vol for _, vol in top_asks)

        if bid_vol + ask_vol == 0:
            return None

        imbalance = (bid_vol - ask_vol) / (bid_vol + ask_vol)
        return round(imbalance, 4)

    def needs_patch(self):
        return len(self.bids) < 100 or len(self.asks) < 100

___

# Kraken Client

In [5]:
class KrakenL2Client:
    def __init__(self, pairs=["XBT/USD"], depth=100):
        self.pairs = pairs
        self.depth = depth
        self.uri = "wss://ws.kraken.com"
        self.loop = None
        self.thread = None
        self.stop_event = threading.Event()
        self.order_books = {pair: LocalOrderBook() for pair in self.pairs}
        self.last_patch_time = time.time()

    async def connect(self):
        while not self.stop_event.is_set():
            try:
                async with websockets.connect(self.uri, ssl=ssl_context) as websocket:
                    await websocket.send(json.dumps({
                        "event": "subscribe",
                        "pair": self.pairs,
                        "subscription": {"name": "book", "depth": self.depth}
                    }))
                    print(f"📡 Subscribed to: {', '.join(self.pairs)}")

                    while not self.stop_event.is_set():
                        try:
                            message = await websocket.recv()
                            self.handle_message(json.loads(message))
                            self.maybe_patch_books()
                        except Exception as e:
                            print(f"⚠️ Message handling error: {e}")
                            break
            except Exception as e:
                print(f"💥 WebSocket connection failed, retrying in 5s: {e}")
                await asyncio.sleep(5)

    def maybe_patch_books(self):
        if time.time() - self.last_patch_time < 5:
            return

        for pair in self.pairs:
            book = self.order_books[pair]
            if book.needs_patch():
                print(f"🔧 Patching {pair}...")
                bids, asks = self.fetch_snapshot(pair)
                if bids and asks:
                    book.patch_from_snapshot(bids, asks)
        self.last_patch_time = time.time()

    def fetch_snapshot(self, pair):
        symbol = pair.replace("/", "")
        url = f"https://api.kraken.com/0/public/Depth?pair={symbol}&count=100"
        try:
            resp = requests.get(url, timeout=5)
            data = resp.json()
            key = list(data["result"].keys())[0]
            return data["result"][key]["bids"], data["result"][key]["asks"]
        except Exception as e:
            print(f"❌ Snapshot error: {e}")
            return None, None

    def handle_message(self, message):
        if isinstance(message, dict):
            if message.get("event") in ["heartbeat", "systemStatus", "subscriptionStatus"]:
                return

        if isinstance(message, list) and len(message) > 1:
            data = message[1]
            pair = message[-1]
            book = self.order_books.get(pair)
            if not book:
                return

            if 'a' in data:
                book.update(data['a'], 'a')
            if 'b' in data:
                book.update(data['b'], 'b')

            bid, ask = book.top_of_book()
            spread = book.spread()
            imbalance = book.order_book_imbalance()

            print(f"📊 {pair} → Bid: {bid[0]} | Ask: {ask[0]} | Spread: {spread} | Imbalance: {imbalance}")

    def start(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.connect())

    def run_in_thread(self):
        self.thread = threading.Thread(target=self.start)
        self.thread.start()

    def stop(self):
        self.stop_event.set()
        if self.loop:
            self.loop.call_soon_threadsafe(self.loop.stop)
        if self.thread:
            self.thread.join()


___

# Run

In [6]:
# ---------------------- USAGE ---------------------- #
if __name__ == "__main__":
    client = KrakenL2Client(pairs=["XBT/USD", "ETH/USD"])
    client.run_in_thread()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("🛑 Stopping client...")
        client.stop()


📡 Subscribed to: XBT/USD, ETH/USD
📊 XBT/USD → Bid: None | Ask: None | Spread: None | Imbalance: None
📊 ETH/USD → Bid: None | Ask: None | Spread: None | Imbalance: None
📊 XBT/USD → Bid: None | Ask: 103743.2 | Spread: None | Imbalance: -1.0
📊 XBT/USD → Bid: None | Ask: 103740.8 | Spread: None | Imbalance: -1.0
📊 ETH/USD → Bid: 2398.17 | Ask: None | Spread: None | Imbalance: 1.0
📊 ETH/USD → Bid: 2398.17 | Ask: 2408.04 | Spread: 9.87 | Imbalance: -0.8124
📊 XBT/USD → Bid: None | Ask: 103740.8 | Spread: None | Imbalance: -1.0
📊 XBT/USD → Bid: None | Ask: 103700.8 | Spread: None | Imbalance: -1.0
📊 ETH/USD → Bid: 2406.56 | Ask: 2408.04 | Spread: 1.48 | Imbalance: -0.809
📊 XBT/USD → Bid: None | Ask: 103700.8 | Spread: None | Imbalance: -1.0
📊 ETH/USD → Bid: 2406.57 | Ask: 2408.04 | Spread: 1.47 | Imbalance: -0.8071
📊 ETH/USD → Bid: 2406.57 | Ask: 2408.04 | Spread: 1.47 | Imbalance: -0.5991
📊 XBT/USD → Bid: None | Ask: 103700.8 | Spread: None | Imbalance: -1.0
📊 XBT/USD → Bid: 103679.9 | Ask: 1

Exception in thread Thread-5 (start):
Traceback (most recent call last):
  File "C:\Users\trrallele\AppData\Local\anaconda3\Lib\threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "c:\Users\trrallele\Momentum Metropolitan\REALEARN\CRYPTO STRATEGIES IN PYTHON\myenv2.0\Lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Users\trrallele\AppData\Local\anaconda3\Lib\threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\trrallele\AppData\Local\Temp\ipykernel_34672\1525992026.py", line 86, in start
  File "C:\Users\trrallele\AppData\Local\anaconda3\Lib\asyncio\base_events.py", line 651, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.


📊 ETH/USD → Bid: 2407.57 | Ask: 2407.58 | Spread: 0.01 | Imbalance: 0.1569
📊 ETH/USD → Bid: 2407.57 | Ask: 2407.58 | Spread: 0.01 | Imbalance: 0.1569
📊 ETH/USD → Bid: 2407.57 | Ask: 2407.58 | Spread: 0.01 | Imbalance: 0.1569
📊 ETH/USD → Bid: 2407.57 | Ask: 2407.58 | Spread: 0.01 | Imbalance: 0.1569
🛑 Stopping client...
