In [1]:
import json

from bytewax.dataflow import Dataflow
from bytewax.inputs import DynamicInput, StatelessSource
from bytewax.connectors.stdio import StdOutput
from websocket import create_connection  # pip install websocket-client


class CoinbaseSource(StatelessSource):
    def __init__(self, product_ids):
        self.product_ids = product_ids

        self.ws = create_connection("wss://ws-feed.pro.coinbase.com")
        self.ws.send(
            json.dumps(
                {
                    "type": "subscribe",
                    "product_ids": product_ids,
                    "channels": ["level2_batch"],
                }
            )
        )
        # The first msg is just a confirmation that we have subscribed.
        print(self.ws.recv())

    def next(self):
        return self.ws.recv()


class CoinbaseInput(DynamicInput):
    PRODUCT_IDS = ["BTC-USD", "ETH-USD", "SOL-USD"]

    def build(self, worker_index, worker_count):
        prods_per_worker = int(len(self.PRODUCT_IDS) / worker_count)
        product_ids = self.PRODUCT_IDS[
                      int(worker_index * prods_per_worker) : int(
                          worker_index * prods_per_worker + prods_per_worker
                      )
                      ]
        return CoinbaseSource(product_ids)


flow = Dataflow()
flow.input("input", CoinbaseInput())

flow.map(json.loads)
# {'type': 'l2update', 'product_id': 'BTC-USD', 'changes': [['buy', '36905.39', '0.00334873']], 'time': '2022-05-05T17:25:09.072519Z'}


def key_on_product(data):
    return (data["product_id"], data)


flow.map(key_on_product)
# ('BTC-USD', {'type': 'l2update', 'product_id': 'BTC-USD', 'changes': [['buy', '36905.39', '0.00334873']], 'time': '2022-05-05T17:25:09.072519Z'})


class OrderBook:
    def __init__(self):
        self.bids = {}
        self.asks = {}

    def update(self, data):
        if self.bids == {}:
            self.bids = {float(price): float(size) for price, size in data["bids"]}
            # The bid_price is the highest priced buy limit order.
            # since the bids are in order, the first item of our newly constructed bids
            # will have our bid price, so we can track the best bid
            self.bid_price = next(iter(self.bids))
        if self.asks == {}:
            self.asks = {float(price): float(size) for price, size in data["asks"]}
            # The ask price is the lowest priced sell limit order.
            # since the asks are in order, the first item of our newly constructed
            # asks will be our ask price, so we can track the best ask
            self.ask_price = next(iter(self.asks))
        else:
            # We receive a list of lists here, normally it is only one change,
            # but could be more than one.
            for update in data["changes"]:
                price = float(update[1])
                size = float(update[2])
            if update[0] == "sell":
                # first check if the size is zero and needs to be removed
                if size == 0.0:
                    try:
                        del self.asks[price]
                        # if it was the ask price removed,
                        # update with new ask price
                        if price <= self.ask_price:
                            self.ask_price = min(self.asks.keys())
                    except KeyError:
                        # don't need to add price with size zero
                        pass
                else:
                    self.asks[price] = size
                    if price < self.ask_price:
                        self.ask_price = price
            if update[0] == "buy":
                # first check if the size is zero and needs to be removed
                if size == 0.0:
                    try:
                        del self.bids[price]
                        # if it was the bid price removed,
                        # update with new bid price
                        if price >= self.bid_price:
                            self.bid_price = max(self.bids.keys())
                    except KeyError:
                        # don't need to add price with size zero
                        pass
                else:
                    self.bids[price] = size
                    if price > self.bid_price:
                        self.bid_price = price
        return {
            "bid": self.bid_price,
            "bid_size": self.bids[self.bid_price],
            "ask": self.ask_price,
            "ask_price": self.asks[self.ask_price],
            "spread": self.ask_price - self.bid_price,
        }


def update_orderbook(orderbook, new_order):
    spread = orderbook.update(new_order)
    return orderbook, spread


flow.stateful_map("order_book", lambda: OrderBook(), update_orderbook)
# ('BTC-USD', (36905.39, 0.00334873, 36905.4, 1.6e-05, 0.010000000002037268))
flow.filter(
    lambda x: x[-1]["spread"] / x[-1]["ask"] > 0.0001
)  # filter on 0.1% spread as a per

flow.output("out", StdOutput())


In [2]:
# run dataflow
from bytewax.testing import run_main
run_main(flow)

{"type":"subscriptions","channels":[{"name":"level2_50","product_ids":["BTC-USD","ETH-USD","SOL-USD"]}]}
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 121.829, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 121.829, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 121.829, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 89.041, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 89.041, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 89.041, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 89.041, 'spread': 0.020000000000003126})
('SOL-USD', {'bid': 61.61, 'bid_size': 55.898, 'ask': 61.63, 'ask_price': 176.172, 'spr

thread '<unnamed>' panicked at 'Box<dyn Any>', src/inputs.rs:474:31
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace



KeyboardInterrupt: (src/run.rs:145:17) interrupt signal received, all processes have been shut down