In [1]:
import pandas as pd
import numpy as np
import phitech.helpers.sierra as pth_sierra
import matplotlib.pyplot as plt
from progiter import ProgIter
import time

In [3]:
pth_sierra.bento_zst_to_depth("mes-bento-small.mbo.dbn.zst", "bento-small.depth", 10000)

first -> MboMsg { hd: RecordHeader { length: 14, rtype: Mbo, publisher_id: GlbxMdp3Glbx, instrument_id: 7114, ts_event: 1718539204680368535 }, order_id: 0, price: UNDEF_PRICE, size: 0, flags: SNAPSHOT | BAD_TS_RECV (40), channel_id: 8, action: 'R', side: 'N', ts_recv: 1718841600000000000, ts_in_delta: 0, sequence: 0 }
instrument_id -> 7114
publisher_id -> 1
 18111/?... rate=878.11 Hz, total=0:00:14n_states reached, break
make init depth
make depth (parse states)
 9999/?... rate=40.12 Hz, total=0:04:07
shape bento_depth -> (10295, 6)
first timestamp -> 2024-06-20 00:00:00.006000
last timestamp -> 2024-06-20 00:16:23.337000
 100.00% 10295/10295... rate=8590.29 Hz, eta=0:00:00, total=0:00:01
Writing bytes to bento-small.depth
done.


In [2]:
from __future__ import annotations
import os
from collections import defaultdict
from dataclasses import dataclass, field
import databento as db
from databento_dbn import FIXED_PRICE_SCALE, UNDEF_PRICE, BidAskPair
from sortedcontainers import SortedDict

@dataclass
class Order:
    id: int
    side: str
    price: int
    size: int
    ts_event: int
    is_tob: bool = field(default=False)


@dataclass
class LevelOrders:
    price: int
    orders: list[Order] = field(default_factory=list, compare=False)

    def __bool__(self) -> bool:
        return bool(self.orders)

    @property
    def level(self) -> PriceLevel:
        return PriceLevel(
            price=self.price,
            count=sum(1 for o in self.orders if not o.is_tob),
            size=sum(o.size for o in self.orders),
        )


@dataclass
class PriceLevel:
    price: int
    size: int = 0
    count: int = 0

    def __str__(self) -> str:
        price = self.price / FIXED_PRICE_SCALE
        return f"{self.size:4} @ {price:6.2f} | {self.count:2} order(s)"


@dataclass
class Book:
    orders_by_id: dict[int, Order] = field(default_factory=dict)
    offers: SortedDict[int, LevelOrders] = field(default_factory=SortedDict)
    bids: SortedDict[int, LevelOrders] = field(default_factory=SortedDict)

    def bbo(self) -> tuple[PriceLevel | None, PriceLevel | None]:
        return self.get_bid_level(), self.get_ask_level()

    def get_bid_level(self, idx: int = 0) -> PriceLevel | None:
        if self.bids and len(self.bids) > idx:
            # Reverse for bids to get highest prices first
            return self.bids.peekitem(-(idx + 1))[1].level
        return None

    def get_ask_level(self, idx: int = 0) -> PriceLevel | None:
        if self.offers and len(self.offers) > idx:
            return self.offers.peekitem(idx)[1].level
        return None

    def get_bid_level_by_px(self, px: int) -> PriceLevel | None:
        try:
            return self._get_level(px, "B").level
        except KeyError:
            return None

    def get_ask_level_by_px(self, px: int) -> PriceLevel | None:
        try:
            return self._get_level(px, "A").level
        except KeyError:
            return None

    def get_snapshot(self, level_count: int = 1) -> list[BidAskPair]:
        snapshots = []
        for level in range(level_count):
            ba_pair = BidAskPair()
            bid = self.get_bid_level(level)
            if bid:
                ba_pair.bid_px = bid.price
                ba_pair.bid_sz = bid.size
                ba_pair.bid_ct = bid.count
            ask = self.get_ask_level(level)
            if ask:
                ba_pair.ask_px = ask.price
                ba_pair.ask_sz = ask.size
                ba_pair.ask_ct = ask.count
            snapshots.append(ba_pair)
        return snapshots

    def apply(
        self,
        ts_event: int,
        action: str,
        side: str,
        order_id: int,
        price: int,
        size: int,
        flags: db.RecordFlags,
    ) -> None:
        # Trade or Fill: no change
        if action == "T" or action == "F":
            return
        # Clear book: remove all resting orders
        if action == "R":
            self._clear()
            return
        # side=N is only valid with Trade, Fill, and Clear actions
        assert side == "A" or side == "B"
        # UNDEF_PRICE indicates the book level should be removed
        if price == UNDEF_PRICE and flags & db.RecordFlags.F_TOB:
            self._side_levels(side).clear()
            return
        # Add: insert a new order
        if action == "A":
            self._add(ts_event, side, order_id, price, size, flags)
        # Cancel: partially or fully cancel some size from a resting order
        elif action == "C":
            self._cancel(side, order_id, price, size)
        # Modify: change the price and/or size of a resting order
        elif action == "M":
            self._modify(ts_event, side, order_id, price, size, flags)
        else:
            raise ValueError(f"Unknown {action =}")

    def _clear(self):
        self.orders_by_id.clear()
        self.offers.clear()
        self.bids.clear()

    def _add(
        self,
        ts_event: int,
        side: str,
        order_id: int,
        price: int,
        size: int,
        flags: db.RecordFlags,
    ):
        order = Order(
            order_id,
            side,
            price,
            size,
            ts_event,
            is_tob=bool(flags & db.RecordFlags.F_TOB),
        )
        if order.is_tob:
            levels = self._side_levels(side)
            levels.clear()
            levels[price] = LevelOrders(price=price, orders=[order])
        else:
            level = self._get_or_insert_level(price, side)
            assert order_id not in self.orders_by_id
            self.orders_by_id[order_id] = order
            level.orders.append(order)

    def _cancel(
        self,
        side: str,
        order_id: int,
        price: int,
        size: int,
    ):
        order = self.orders_by_id[order_id]
        level = self._get_level(price, side)
        assert order.size >= size
        order.size -= size
        # If the full size is cancelled, remove the order from the book
        if order.size == 0:
            self.orders_by_id.pop(order_id)
            level.orders.remove(order)
            # If the level is now empty, remove it from the book
            if not level:
                self._remove_level(price, side)

    def _modify(
        self,
        ts_event: int,
        side: str,
        order_id: int,
        price: int,
        size: int,
        flags: db.RecordFlags,
    ):

        order = self.orders_by_id.get(order_id)
        if order is None:
            # If order not found, treat it as an add
            self._add(ts_event, side, order_id, price, size, flags)
            return
        assert order.side == side, f"Order {order} changed side to {side}"
        prev_level = self._get_level(order.price, side)
        if order.price != price:
            prev_level.orders.remove(order)
            if not prev_level:
                self._remove_level(order.price, side)
            level = self._get_or_insert_level(price, side)
            level.orders.append(order)
        else:
            level = prev_level
        # The order loses its priority if the price changes or the size increases
        if order.price != price or order.size < size:
            order.ts_event = ts_event
            level.orders.remove(order)
            level.orders.append(order)
        order.size = size
        order.price = price

    def _side_levels(self, side: str) -> SortedDict:
        if side == "A":
            return self.offers
        if side == "B":
            return self.bids
        raise ValueError(f"Invalid {side =}")

    def _get_level(self, price: int, side: str) -> LevelOrders:
        levels = self._side_levels(side)
        if price not in levels:
            raise KeyError(f"No price level found for {price =} and {side =}")
        return levels[price]

    def _get_or_insert_level(self, price: int, side: str) -> LevelOrders:
        levels = self._side_levels(side)
        if price in levels:
            return levels[price]
        level = LevelOrders(price=price)
        levels[price] = level
        return level

    def _remove_level(self, price: int, side: str):
        levels = self._side_levels(side)
        levels.pop(price)


@dataclass
class Market:
    books: defaultdict[int, defaultdict[int, Book]] = field(
        default_factory=lambda: defaultdict(lambda: defaultdict(Book)),
    )

    def get_books_by_pub(self, instrument_id: int) -> defaultdict[int, Book]:
        return self.books[instrument_id]

    def get_book(self, instrument_id: int, publisher_id: int) -> Book:
        return self.books[instrument_id][publisher_id]

    def bbo(
        self,
        instrument_id: int,
        publisher_id: int,
    ) -> tuple[PriceLevel | None, PriceLevel | None]:
        return self.books[instrument_id][publisher_id].bbo()

    def aggregated_bbo(
        self,
        instrument_id: int,
    ) -> tuple[PriceLevel | None, PriceLevel | None]:
        agg_bbo: list[PriceLevel | None] = [None, None]
        # max for bids, min for asks
        for idx, reducer in [(0, max), (1, min)]:
            all_best = [b.bbo()[idx] for b in self.books[instrument_id].values()]
            all_best = [b for b in all_best if b]
            if not all_best:
                continue
            best_price = reducer(b.price for b in all_best)
            best = [b for b in all_best if b.price == best_price]
            agg_bbo[idx] = PriceLevel(
                price=best_price,
                size=sum(b.size for b in best),
                count=sum(b.count for b in best),
            )
        return tuple(agg_bbo)

    def apply(self, mbo: db.MBOMsg):
        book = self.books[mbo.instrument_id][mbo.publisher_id]
        book.apply(
            ts_event=mbo.ts_event,
            action=mbo.action,
            side=mbo.side,
            order_id=mbo.order_id,
            price=mbo.price,
            size=mbo.size,
            flags=mbo.flags,
        )


client = db.Historical("db-bK9h5rwG4qSWNs4T4M4NCftDHDsYf")

# Next, we will request mbo data starting from the beginning of pre-market trading hours
# or load the file if we've already downloaded it.
data_path = "mes-bento-small.mbo.dbn.zst"
if os.path.exists(data_path):
    data = db.DBNStore.from_file(data_path)
else:
    data = client.timeseries.get_range(
        dataset="GLBX.MDP3",
        symbols="MESU4",
        schema="mbo",
        start="2024-06-20T00:00:00",
        end="2024-06-20T15:00:00",
        path=data_path,
    )

# Then we parse the symbology into a more usable format
#instrument_map = db.common.symbology.InstrumentMap()
#instrument_map.insert_metadata(data.metadata)

# # Finally, we iterate over each book update
# market = Market()
# for mbo in data:
#     # And apply it
#     market.apply(mbo)
#     # If it's the last update in an event, print the state of the aggregated book
#     if mbo.flags & db.RecordFlags.F_LAST:
#         symbol = (
#             instrument_map.resolve(mbo.instrument_id, mbo.pretty_ts_recv.date())
#             or ""
#         )
#         print(f"{symbol} Aggregated BBO | {mbo.pretty_ts_recv}")
#         best_bid, best_offer = market.aggregated_bbo(mbo.instrument_id)
#         print(f"    {best_offer}")
#         print(f"    {best_bid}")

In [35]:
bento = data.to_df()
bento.to_csv('./temp.csv', index=False)

In [88]:
print('make ticks')
bento = pd.read_csv('temp.csv')
primary = pth_sierra.bento_to_primary(bento)
ticks = pth_sierra.primary_to_ticks(primary)
pth_sierra.ticks_to_scid(ticks, './bento-small.scid')

make ticks
 100.00% 144287/144287... rate=3817.49 Hz, eta=0:00:00, total=0:00:38
write bytes to ./bento-small.scid
done.


In [91]:
init = True
init_state = None

In [136]:
for idx, first in enumerate(data):
    if idx == 100:
        break
first

MboMsg { hd: RecordHeader { length: 14, rtype: Mbo, publisher_id: GlbxMdp3Glbx, instrument_id: 7114, ts_event: 1718539204680368535 }, order_id: 6852953636532, price: 5385.750000000, size: 1, flags: SNAPSHOT | BAD_TS_RECV (40), channel_id: 8, action: 'A', side: 'B', ts_recv: 1718841600000000000, ts_in_delta: 0, sequence: 2510 }

In [92]:
price_resolution = 1000000000

snapshot_size = 100
init = True
init_state = None

counter, size = 0, 10000

t0 = time.time()

market = Market()
prev_timestamp = 0
book_states = []

instrument_id = bento.instrument_id.unique()[0]
publisher_id = bento.publisher_id.unique()[0]

for mbo in ProgIter(data):
    market.apply(mbo)
    if mbo.flags & db.RecordFlags.F_LAST:
        diff = mbo.ts_event - prev_timestamp
        prev_timestamp = mbo.ts_event
        # check if within range of Sierra timestamps (if less than 1000 we need to aggregate the state)
        if diff < 1000:
            continue

        book = market.get_book(instrument_id, publisher_id).get_snapshot(snapshot_size)
        rows = [] if not init else [(0, 0, 0.0, "R")]
        for b in book:
            rows.append((b.bid_ct, b.bid_sz, b.bid_px/price_resolution, "B"))
            rows.append((b.ask_ct, b.ask_sz, b.ask_px/price_resolution, "A"))

        book_state = pd.DataFrame(rows)
        book_state.columns = ['orders', 'quantity', 'price', 'side']
        book_state = book_state.sort_values('price', ascending=False).reset_index().drop(columns=['index'])
        sierra_timestamp = (pth_sierra.convert_to_sierra_timestamp(str(pd.to_datetime(mbo.ts_event, unit='ns')))//1000)*1000
        book_state['timestamp'] = sierra_timestamp
        book_state = book_state[['timestamp', 'orders', 'quantity', 'price', 'side']]
        #book_state['price'] = book_state.price.astype(float)
        book_state['command'] = ""

        if init:
            init_state = book_state
            init = False
            continue
        
        book_states.append(book_state)
        # time.sleep(.01)

        counter += 1
        if counter >= size:
            break

 18064/?... rate=832.21 Hz, total=0:00:14time took -> 14.725379943847656


In [93]:
pth_sierra.convert_sierra_timestamp_to_datetime(book_states[9999].timestamp.unique()[0])

'2024-06-20 00:16:23.337000'

In [94]:
init_depth = [
    (r.timestamp, 'R', 0, 0, 0.0, 0)
    if r.side == 'R'
    else (r.timestamp, f'A{r.side}', 0, r.orders, r.price, r.quantity)
    for r in init_state.itertuples()
]
init_depth

[(3928003199616000, 'AA', 0, 4, 5587.75, 41),
 (3928003199616000, 'AA', 0, 3, 5587.5, 39),
 (3928003199616000, 'AA', 0, 3, 5587.25, 40),
 (3928003199616000, 'AA', 0, 7, 5587.0, 60),
 (3928003199616000, 'AA', 0, 4, 5586.75, 45),
 (3928003199616000, 'AA', 0, 4, 5586.5, 41),
 (3928003199616000, 'AA', 0, 5, 5586.25, 47),
 (3928003199616000, 'AA', 0, 6, 5586.0, 42),
 (3928003199616000, 'AA', 0, 4, 5585.75, 40),
 (3928003199616000, 'AA', 0, 5, 5585.5, 41),
 (3928003199616000, 'AA', 0, 4, 5585.25, 41),
 (3928003199616000, 'AA', 0, 19, 5585.0, 83),
 (3928003199616000, 'AA', 0, 5, 5584.75, 43),
 (3928003199616000, 'AA', 0, 4, 5584.5, 40),
 (3928003199616000, 'AA', 0, 4, 5584.25, 49),
 (3928003199616000, 'AA', 0, 13, 5584.0, 60),
 (3928003199616000, 'AA', 0, 8, 5583.75, 52),
 (3928003199616000, 'AA', 0, 5, 5583.5, 44),
 (3928003199616000, 'AA', 0, 6, 5583.25, 51),
 (3928003199616000, 'AA', 0, 18, 5583.0, 91),
 (3928003199616000, 'AA', 0, 25, 5582.75, 77),
 (3928003199616000, 'AA', 0, 10, 5582.5,

In [95]:
bento_depth = [] + init_depth
max_states = 10000 # TODO: change me

for prev, current in ProgIter(zip(book_states[:max_states], book_states[1:max_states])):
    prev = prev[prev.side != 'R']
    current_depth = []
    current_timestamp = current.timestamp.unique()[0]
    prev_prices, current_prices = prev.price.unique(), current.price.unique()
    for r in prev.itertuples():
        if r.price not in current_prices:
            current_depth.append((current_timestamp, f'C{r.side}', 1, 0, r.price, 0))
    for r in current.itertuples():
        if r.price not in prev_prices:
            current_depth.append((current_timestamp, f'A{r.side}', 1, r.orders, r.price, r.quantity))
        else:
            prev_match = prev[prev.price == r.price].iloc[0]
            if prev_match.quantity == r.quantity and prev_match.orders == r.orders:
                continue
            if prev_match.side != r.side:
                current_depth.append((current_timestamp, f'C{prev_match.side}', 1, 0, r.price, 0))
            current_depth.append((current_timestamp, f'M{r.side}', 1, r.orders, r.price, r.quantity))
    bento_depth += current_depth

print(len(bento_depth))

 9999/?... rate=40.05 Hz, total=0:04:11
10295
done.


In [96]:
bento_depth = pd.DataFrame(bento_depth)
bento_depth.columns = ['timestamp', 'command', 'flag', 'orders', 'price', 'quantity']
bento_depth['command'] = bento_depth.command.apply(lambda c: pth_sierra.bento_to_sierra_command_mapping[c])
bento_depth['timestamp'] = bento_depth.timestamp.astype(int)
bento_depth

Unnamed: 0,timestamp,command,flag,orders,price,quantity
0,3928003199616000,3,0,4,5587.75,41
1,3928003199616000,3,0,3,5587.50,39
2,3928003199616000,3,0,3,5587.25,40
3,3928003199616000,3,0,7,5587.00,60
4,3928003199616000,3,0,4,5586.75,45
...,...,...,...,...,...,...
10290,3928004183133000,5,1,4,5564.25,8
10291,3928004183133000,5,1,3,5564.25,7
10292,3928004183236000,5,1,2,5564.25,5
10293,3928004183337000,5,1,3,5564.25,7


In [97]:
pth_sierra.convert_sierra_timestamp_to_datetime(bento_depth.timestamp.unique()[1]), pth_sierra.convert_sierra_timestamp_to_datetime(int(bento_depth.iloc[bento_depth.shape[0] - 1].timestamp))

('2024-06-20 00:00:00.006000', '2024-06-20 00:16:23.337000')

In [98]:
pth_sierra.depth_to_depth_file_for_sierra(bento_depth, 'bento-small.depth')

 100.00% 10295/10295... rate=8554.91 Hz, eta=0:00:00, total=0:00:01
Writing bytes to bento-small.depth
done.
