Skip to content
Permalink
Browse files

feat: add reconstruct_market method to tardis_client - currently impl…

…emented only for bitmex.

Provides full tick by tick market reconstruction - full order book state at every tick and trades
  • Loading branch information...
Tardis-Thad committed Aug 27, 2019
1 parent 118a3b1 commit 0fdc0bbb068313fb7fc3cf0b2d9df8207e12fc79
@@ -61,6 +61,14 @@ optional = false
python-versions = ">=3.4.1"
version = "4.5.2"

[[package]]
category = "main"
description = "Sorted Containers -- Sorted List, Sorted Dict, Sorted Set"
name = "sortedcontainers"
optional = false
python-versions = "*"
version = "2.1.0"

[[package]]
category = "main"
description = "Yet another URL library"
@@ -74,7 +82,7 @@ idna = ">=2.0"
multidict = ">=4.0"

[metadata]
content-hash = "f48962027316600cbc23f7b5056925c24522d2af72ad15e2ee1c71b6b1114608"
content-hash = "b4c5d285dc3bacbb5704964fe85b6675eed06c8d3af22e8a0b3715eed0f59d52"
python-versions = ">=3.7"

[metadata.hashes]
@@ -85,4 +93,5 @@ attrs = ["69c0dbf2ed392de1cb5ec704444b08a5ef81680a61cb899dc08127123af36a79", "f0
chardet = ["84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae", "fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"]
idna = ["c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", "ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c"]
multidict = ["024b8129695a952ebd93373e45b5d341dbb87c17ce49637b34000093f243dd4f", "041e9442b11409be5e4fc8b6a97e4bcead758ab1e11768d1e69160bdde18acc3", "045b4dd0e5f6121e6f314d81759abd2c257db4634260abcfe0d3f7083c4908ef", "047c0a04e382ef8bd74b0de01407e8d8632d7d1b4db6f2561106af812a68741b", "068167c2d7bbeebd359665ac4fff756be5ffac9cda02375b5c5a7c4777038e73", "148ff60e0fffa2f5fad2eb25aae7bef23d8f3b8bdaf947a65cdbe84a978092bc", "1d1c77013a259971a72ddaa83b9f42c80a93ff12df6a4723be99d858fa30bee3", "1d48bc124a6b7a55006d97917f695effa9725d05abe8ee78fd60d6588b8344cd", "31dfa2fc323097f8ad7acd41aa38d7c614dd1960ac6681745b6da124093dc351", "34f82db7f80c49f38b032c5abb605c458bac997a6c3142e0d6c130be6fb2b941", "3d5dd8e5998fb4ace04789d1d008e2bb532de501218519d70bb672c4c5a2fc5d", "4a6ae52bd3ee41ee0f3acf4c60ceb3f44e0e3bc52ab7da1c2b2aa6703363a3d1", "4b02a3b2a2f01d0490dd39321c74273fed0568568ea0e7ea23e02bd1fb10a10b", "4b843f8e1dd6a3195679d9838eb4670222e8b8d01bc36c9894d6c3538316fa0a", "5de53a28f40ef3c4fd57aeab6b590c2c663de87a5af76136ced519923d3efbb3", "61b2b33ede821b94fa99ce0b09c9ece049c7067a33b279f343adfe35108a4ea7", "6a3a9b0f45fd75dc05d8e93dc21b18fc1670135ec9544d1ad4acbcf6b86781d0", "76ad8e4c69dadbb31bad17c16baee61c0d1a4a73bed2590b741b2e1a46d3edd0", "7ba19b777dc00194d1b473180d4ca89a054dd18de27d0ee2e42a103ec9b7d014", "7c1b7eab7a49aa96f3db1f716f0113a8a2e93c7375dd3d5d21c4941f1405c9c5", "7fc0eee3046041387cbace9314926aa48b681202f8897f8bff3809967a049036", "8ccd1c5fff1aa1427100ce188557fc31f1e0a383ad8ec42c559aabd4ff08802d", "8e08dd76de80539d613654915a2f5196dbccc67448df291e69a88712ea21e24a", "c18498c50c59263841862ea0501da9f2b3659c00db54abfbf823a80787fde8ce", "c49db89d602c24928e68c0d510f4fcf8989d77defd01c973d6cbe27e684833b1", "ce20044d0317649ddbb4e54dab3c1bcc7483c78c27d3f58ab3d0c7e6bc60d26a", "d1071414dd06ca2eafa90c85a079169bfeb0e5f57fd0b45d44c092546fcd6fd9", "d3be11ac43ab1a3e979dac80843b42226d5d3cccd3986f2e03152720a4297cd7", "db603a1c235d110c860d5f39988ebc8218ee028f07a7cbc056ba6424372ca31b"]
sortedcontainers = ["974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a", "d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60"]
yarl = ["024ecdc12bc02b321bc66b41327f930d1c2c543fa9a561b39861da9388ba7aa9", "2f3010703295fbe1aec51023740871e64bb9664c789cba5a6bdf404e93f7568f", "3890ab952d508523ef4881457c4099056546593fa05e93da84c7250516e632eb", "3e2724eb9af5dc41648e5bb304fcf4891adc33258c6e14e2a7414ea32541e320", "5badb97dd0abf26623a9982cd448ff12cb39b8e4c94032ccdedf22ce01a64842", "73f447d11b530d860ca1e6b582f947688286ad16ca42256413083d13f260b7a0", "7ab825726f2940c16d92aaec7d204cfc34ac26c0040da727cf8ba87255a33829", "b25de84a8c20540531526dfbb0e2d2b648c13fd5dd126728c496d7c3fea33310", "c6e341f5a6562af74ba55205dbd56d248daf1b5748ec48a0200ba227bb9e33f4", "c9bb7c249c4432cd47e75af3864bc02d26c9594f49c82e2a28624417f0ae63b8", "e060906c0c585565c718d1c3841747b61c5439af2211e185f6739a9412dfbde1"]
@@ -23,6 +23,7 @@ keywords = [
python = ">=3.7"
aiohttp = "^3.5"
aiofiles = "^0.4.0"
sortedcontainers = "^2.1"

[build-system]
requires = ["poetry>=0.12"]
@@ -1,3 +1,4 @@
__version__ = "1.1.0"
from tardis_client.tardis_client import TardisClient
from tardis_client.channel import Channel
from tardis_client.reconstructors.market_reconstructor import MESSAGE_TYPE, BOOK_UPDATE_TYPE
@@ -109,3 +109,7 @@
"bitfinex": BITFINEX_CHANNELS,
"ftx": FTX_CHANNELS,
}


ASK = "ask"
BID = "bid"
@@ -0,0 +1,9 @@
from typing import List
from tardis_client.reconstructors.bitmex import BitmexMarketReconstructor
from tardis_client.reconstructors.market_reconstructor import MarketReconstructor

reconstructors = {"bitmex": BitmexMarketReconstructor}


def get_market_reconstructor(exchange: str, symbols: List[str]) -> MarketReconstructor:
return reconstructors[exchange](symbols)
@@ -0,0 +1,106 @@
from typing import Optional, Any
from datetime import datetime

from tardis_client.consts import ASK, BID
from tardis_client.channel import Channel

from tardis_client.reconstructors.market_reconstructor import (
MarketReconstructor,
MarketResponse,
MESSAGE_TYPE,
Trade,
BookUpdate,
BOOK_UPDATE_TYPE,
)


class BitmexMarketReconstructor(MarketReconstructor):
def __init__(self, symbols):
super().__init__(symbols)
self._id_to_price_map = {}
self._action_to_update_type_map = {
"partial": BOOK_UPDATE_TYPE.NEW,
"insert": BOOK_UPDATE_TYPE.NEW,
"delete": BOOK_UPDATE_TYPE.DELETE,
"update": BOOK_UPDATE_TYPE.CHANGE,
}

def get_filters(self):
return [Channel("orderBookL2", self._symbols), Channel("trade", self._symbols)]

def reconstruct(self, local_timestamp, message) -> Optional[MarketResponse]:
table = message["table"]
action = message["action"]
is_trade = table == "trade"
is_order_book_delta = table == "orderBookL2"
is_partial = action == "partial"

if is_trade == False and is_order_book_delta == False:
return
# ignore trade partials, we could end up with duplicated trades otherwise
if is_trade and is_partial:
return

message_type = MESSAGE_TYPE.TRADES if is_trade else MESSAGE_TYPE.BOOK_DELTA
items = []

for item in message["data"]:
symbol = item["symbol"]
# ignore data items for symbols we're not requested for
# that could happen for example for partial messages that contained multiple symbols in single message
if symbol not in self._symbols:
continue

if is_trade:
items.append(self._map_trade(item))
else:

if action == "partial" or action == "insert":
# bitmex update messages do not contain price only id, so we need to keep state about mapping from id to price level locally
self._id_to_price_map[item["id"]] = item["price"]

price_level = item["price"] if "price" in item else self._id_to_price_map[item["id"]]
# ignore book update when we don't know price level for it, in theory it could happen
# when there was WS reconnection and after reconnection there was an immediate book update before partial message with book snapshot
# and that update didn't have matching price level, as we could miss insert message durring reconnection
# slim chances but in theory possible

if price_level is None:
continue

book_update = self._map_order_book_update(item, price_level, self._action_to_update_type_map[action])
items.append(book_update)

self._apply_book_update_to_order_book(book_update)

# return order book state for symbol that is in first normalized item - BitMEX messages are uniform (same symbol for all items in the message)
symbol = items[0].symbol
order_book_state = self._books_views[symbol]

return MarketResponse(
local_timestamp=local_timestamp, message_type=message_type, message=items, order_book_state=order_book_state
)

def _map_trade(self, item: Any) -> Trade:
return Trade(
symbol=item["symbol"],
side="buy" if item["side"] == "Buy" else "sell",
amount=item["size"],
price=item["price"],
timestamp=datetime.fromisoformat(item["timestamp"][:-1]),
)

def _map_order_book_update(self, item: Any, price_level: float, update_type: BOOK_UPDATE_TYPE) -> BookUpdate:
side = BID if item["side"] == "Buy" else ASK
amount = 0 if update_type == BOOK_UPDATE_TYPE.DELETE else item["size"]

return BookUpdate(symbol=item["symbol"], side=side, update_type=update_type, price_level=price_level, amount=amount)

def _apply_book_update_to_order_book(self, book_update: BookUpdate):
matching_book = self._books[book_update.symbol]

if book_update.update_type == BOOK_UPDATE_TYPE.DELETE:
del matching_book[book_update.side][book_update.price_level]
else:
matching_book[book_update.side][book_update.price_level] = book_update.amount

@@ -0,0 +1,59 @@
from typing import List, NamedTuple, Any, Optional, Union
from sortedcontainers import SortedDict
from enum import Enum
from decimal import Decimal
from datetime import datetime
from tardis_client.channel import Channel
from tardis_client.consts import ASK, BID


class BOOK_UPDATE_TYPE(Enum):
NEW = 1
CHANGE = 2
DELETE = 3


class MESSAGE_TYPE(Enum):
BOOK_DELTA = 1
TRADES = 2


class Trade(NamedTuple):
symbol: str
side: str
amount: float
price: float
timestamp: datetime


class BookUpdate(NamedTuple):
symbol: str
side: str
update_type: BOOK_UPDATE_TYPE
price_level: float
amount: float


class MarketResponse(NamedTuple):
local_timestamp: datetime
message_type: MESSAGE_TYPE
message: Union[List[Trade], List[BookUpdate]]
order_book_state: Any


class MarketReconstructor:
def __init__(self, symbols):
self._books = {}
self._books_views = {}
self._symbols = symbols

for symbol in symbols:
self._books[symbol] = {ASK: SortedDict(), BID: SortedDict()}

self._books_views[symbol] = {ASK: self._books[symbol][ASK].items(), BID: self._books[symbol][BID].items()}

def get_filters(self) -> List[Channel]:
raise NotImplementedError

def reconstruct(local_timestamp, message) -> Optional[MarketResponse]:
raise NotImplementedError
@@ -7,14 +7,16 @@
import shutil

from time import time
from typing import List
from typing import List, AsyncIterable
from collections import namedtuple
from datetime import datetime, timedelta

from tardis_client.consts import EXCHANGES, EXCHANGE_CHANNELS_INFO
from tardis_client.handy import get_slice_cache_path
from tardis_client.channel import Channel
from tardis_client.data_downloader import fetch_data_to_replay
from tardis_client.reconstructors import get_market_reconstructor
from tardis_client.reconstructors.market_reconstructor import MarketResponse

Response = namedtuple("Response", ["local_timestamp", "message"])

@@ -130,6 +132,17 @@ def __init__(self, endpoint="https://tardis.dev/api", cache_dir=DEFAULT_CACHE_DI
end_time - start_time,
)

async def reconstruct_market(self, exchange: str, from_date: str, to_date: str, symbols: List[str])-> AsyncIterable[MarketResponse]:
market_reconstructor = get_market_reconstructor(exchange, symbols)
filters = market_reconstructor.get_filters()

self._validate_payload(exchange, from_date, to_date, filters)

async for local_timestamp, message in self.replay(exchange, from_date, to_date, filters):
market_response = market_reconstructor.reconstruct(local_timestamp, message)
if market_response is not None:
yield market_response

def clear_cache(self):
shutil.rmtree(self.cache_dir)

0 comments on commit 0fdc0bb

Please sign in to comment.
You can’t perform that action at this time.