In [1]:
import json
import queue
import threading
from datetime import datetime
from threading import Thread

import numpy as np
import requests
import snoop
from websocket import create_connection

* Open a stream to [wss://stream.binance.com:9443/ws/bnbbtc@depth](http://).
* Buffer the events you receive from the stream.
* Get a depth snapshot from [https://api.binance.com/api/v3/depth?symbol=BNBBTC&limit=1000](http://) .
* Drop any event where u is <= lastUpdateId in the snapshot.
* The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1.
* While listening to the stream, each new event's U should be equal to the previous event's u+1.
* The data in each event is the absolute quantity for a price level.
* If the quantity is 0, remove the price level.
* Receiving an event that removes a price level that is not in your local order book can happen and is normal.

In [2]:
# class Cache:
#     def __init__(self):
#         self.contents = np.array([])

#     def read(self, stream):
#         read_proc = threading.currentThread()
#         while getattr(read_proc, "do_run", True):
#             self.contents = np.append(self.contents, Message(stream.recieve()))
#             if (
#                 len(self.contents) > 2
#                 and self.contents[-1].U != self.contents[-2].u + 1
#             ):
#                 print("?")
#                 stream.close()
#                 break

#     def clear_previous(self, book):
#         id_start = 0
#         for num, msg in enumerate(self.contents):
#             if book.find_beg(msg):
#                 id_start = num
#         inds = list(range(id_start))
#         np.delete(self.contents, inds)

#     def clear(self):
#         self.contents = np.array([])

#     def length(self):
#         return print(len(self.contents))

#     def give(self):
#         block = self.contents[0]
#         np.delete(self.contents, 0)
#         return block

In [2]:
cacheQueue = queue.Queue()
snaps = queue.Queue()

In [41]:
obj_lock = threading.Lock()
def producer(cacheQueue, stream):
    t1 = threading.currentThread()
    while getattr(t1, "do_run", True):
        cacheQueue.put(stream.recieve())


def consumer(cacheQueue, book):
    t2 = threading.currentThread()
    while getattr(t2, "do_run", True):
        try:
            message = cacheQueue.get(block=False)
        except queue.Empty:
            pass
        else:
            with obj_lock:
                if book.find_beg(message):
                    book.update_bids(message)
                    book.update_asks(message)
                    snaps.append(book)
                else:
                    pass

In [42]:
t1 = Thread(target=producer, args=(cacheQueue, s))

In [None]:
t1.start()

In [45]:
t2 = Thread(target=consumer, args=(cacheQueue, a))

In [46]:
t2.start()

In [4]:
class Stream:
    def __init__(self, symbol):
        self.endpoint = "wss://stream.binance.com:9443/ws/" + symbol.lower() + "@depth"
        self.ws = create_connection(self.endpoint)

    def close(self):
        self.ws.close()

    def recieve(self):
        return Message(json.loads(self.ws.recv()))

In [5]:
class Message:
    def __init__(self, message):
        self.u, self.U = message["u"], message["U"]
        self.bids = np.array(message["b"])[:, 0].astype("float")
        self.bq = np.array(message["b"])[:, 1].astype("float")
        self.asks, self.aq = np.array(message["a"])[:, 0].astype("float"), np.array(
            message["a"]
        )[:, 1].astype("float")
        self.date = datetime.fromtimestamp(message["E"] / 1000).strftime(
            "%A, %B %d, %Y %I:%M:%S"
        )

In [6]:
class OrderBook:
    def __init__(self, symbol):
        self.endpoint = (
            "https://api.binance.com/api/v3/depth?symbol="
            + symbol.upper()
            + "&limit=1000"
        )
        self.snapshot = requests.get(self.endpoint).json()
        self.lastUpdateId = self.snapshot["lastUpdateId"]
        self.bids = np.array(self.snapshot["bids"])[:, 0].astype("float")
        self.bq = np.array(self.snapshot["bids"])[:, 1].astype("float")
        self.asks, self.aq = np.array(self.snapshot["asks"])[:, 0].astype(
            "float"
        ), np.array(self.snapshot["aq"])[:, 1].astype("float")

    def find_beg(self, message):
        if message.u <= self.lastUpdateId:
            return False
        elif message.U <= self.lastUpdateId + 1 and message.u >= self.lastUpdateId + 1:
            return True

    def update_bids(self, message):
        for num, i in enumerate(message.bids):
            ind = np.where(self.bids == i)
            if ind[0].any():
                if i == 0:
                    np.delete(self.bids, ind)
                else:
                    self.bq[ind] += message.bq[num]
            else:
                np.searchsorted(self.bids, i)

    def update_asks(self, message):
        for num, i in enumerate(message.asks):
            ind = np.where(self.asks == i)
            if ind[0].any():
                if i == 0:
                    np.delete(self.asks, ind)
                else:
                    self.aq[ind] += message.aq[num]
            else:
                np.searchsorted(self.asks, i)

In [7]:
s = Stream("BTCUSDT");

In [44]:
a = OrderBook("btcusdt")

In [66]:
sum(a.aq)

131.84359999999955

In [56]:
sorted(a.bq)

[1e-05,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00018,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.00019,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.0002,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00021,
 0.00022,
 0.00022,
 0.00022,
 0.00022,
 0.00022,
 0.00022,
 0.00022,
 0.00022,
 0.00022,
 0.00023,
 0.00023,
 0.00023,
 0.00023,
 0.00024,
 0.00024,
 0.00024,
 0.00024,
 0.00024,
 0.00024,
 0.00025,
 0.00025,
 0.00025,
 0.00025,
 0.00025,
 0.00025,
 0.00025,
 0.00025,
 0.00025,
 0.00026,
 0.00026,
 0.00027,
 0.00027,
 0.00027,
 0.00028,
 0.00028,
 0.00028,
 0.00029,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003,
 0.0003

In [58]:
s.close()

Exception in thread Thread-12:
Traceback (most recent call last):
  File "/Users/glebsokolov/opt/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/Users/glebsokolov/opt/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/t0/fw3jvl196_v78v94hzhxhpgw0000gn/T/ipykernel_44593/2257816734.py", line 7, in producer
Exception in thread Thread-9:
Traceback (most recent call last):
  File "/Users/glebsokolov/opt/anaconda3/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()  File "/var/folders/t0/fw3jvl196_v78v94hzhxhpgw0000gn/T/ipykernel_44593/2063050034.py", line 8, in recieve

  File "/Users/glebsokolov/opt/anaconda3/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/t0/fw3jvl196_v78v94hzhxhpgw0000gn/T/ipykernel_44593/4275462483.py", line 5, in producer
  File "/var/folders/t0/fw3jvl196_v78v94hzhxhpg