Commit
fix: data util wasn't properly processing the data file for Rust implementation.
nkaz001 committed Apr 24, 2024
1 parent a6009fe commit c4cef91
Showing 2 changed files with 65 additions and 30 deletions.
46 changes: 34 additions & 12 deletions hftbacktest/data/utils/binancefutures.py
@@ -12,6 +12,7 @@
     DEPTH_CLEAR_EVENT,
     DEPTH_SNAPSHOT_EVENT,
     TRADE_EVENT,
+    COL_EVENT,
     COL_EXCH_TIMESTAMP,
     COL_LOCAL_TIMESTAMP,
     correct_local_timestamp
@@ -24,7 +25,9 @@ def convert(
         opt: Literal['', 'm', 't', 'mt'] = '',
         base_latency: float = 0,
         compress: bool = False,
-        structured_array: bool = False
+        structured_array: bool = False,
+        timestamp_unit: Literal['us', 'ns'] = 'us',
+        combined_stream: bool = True
 ) -> NDArray:
     r"""
     Converts raw Binance Futures feed stream file into a format compatible with HftBacktest.
@@ -65,19 +68,37 @@ def convert(
             See :func:`.correct_local_timestamp`.
         compress: If this is set to True, the output file will be compressed.
         structured_array: If this is set to True, the output is converted into the new format (currently only the Rust implementation).
+        timestamp_unit: The unit to which the exchange timestamp is converted. Binance provides timestamps in
+            milliseconds. Both the local timestamp and the exchange timestamp should be in the same unit.
+        combined_stream: Whether the input file was recorded from a combined stream.
+            combined stream:
+                {"stream":"solusdt@bookTicker","data":{"e":"bookTicker","u":4456408609867,"s":"SOLUSDT","b":"142.4440","B":"50","a":"142.4450","A":"3","T":1713571200009,"E":1713571200010}}
+            regular stream:
+                {"e":"bookTicker","u":4456408609867,"s":"SOLUSDT","b":"142.4440","B":"50","a":"142.4450","A":"3","T":1713571200009,"E":1713571200010}

     Returns:
         Converted data compatible with HftBacktest.
     """
+    if timestamp_unit == 'us':
+        timestamp_slice = 16
+        timestamp_mul = 1000
+    elif timestamp_unit == 'ns':
+        timestamp_slice = 19
+        timestamp_mul = 1000000
+    else:
+        raise ValueError
     rows = []
     with gzip.open(input_filename, 'r') as f:
         while True:
             line = f.readline()
             if not line:
                 break
-            local_timestamp = int(line[:16])
-            message = json.loads(line[17:])
-            data = message.get('data')
+            local_timestamp = int(line[:timestamp_slice])
+            message = json.loads(line[timestamp_slice + 1:])
+            if combined_stream:
+                data = message.get('data')
+            else:
+                data = message
             if data is not None:
                 evt = data['e']
                 if evt == 'trade':
@@ -88,14 +109,14 @@ def convert(
                     price = data['p']
                     qty = data['q']
                     side = -1 if data['m'] else 1  # trade initiator's side
-                    exch_timestamp = int(transaction_time) * 1000
+                    exch_timestamp = int(transaction_time) * timestamp_mul
                     rows.append([TRADE_EVENT, exch_timestamp, local_timestamp, side, float(price), float(qty)])
                 elif evt == 'depthUpdate':
                     # event_time = data['E']
                     transaction_time = data['T']
                     bids = data['b']
                     asks = data['a']
-                    exch_timestamp = int(transaction_time) * 1000
+                    exch_timestamp = int(transaction_time) * timestamp_mul
                     rows += [[DEPTH_EVENT, exch_timestamp, local_timestamp, 1, float(bid[0]), float(bid[1])] for bid in bids]
                     rows += [[DEPTH_EVENT, exch_timestamp, local_timestamp, -1, float(ask[0]), float(ask[1])] for ask in asks]
                 elif evt == 'markPriceUpdate' and 'm' in opt:
@@ -115,7 +136,7 @@ def convert(
                     bid_qty = data['B']
                     ask_price = data['a']
                     ask_qty = data['A']
-                    exch_timestamp = int(transaction_time) * 1000
+                    exch_timestamp = int(transaction_time) * timestamp_mul
                     rows.append([103, exch_timestamp, local_timestamp, 1, float(bid_price), float(bid_qty)])
                     rows.append([104, exch_timestamp, local_timestamp, -1, float(ask_price), float(ask_qty)])
             else:
@@ -124,7 +145,7 @@ def convert(
                 transaction_time = message['T']
                 bids = message['bids']
                 asks = message['asks']
-                exch_timestamp = int(transaction_time) * 1000
+                exch_timestamp = int(transaction_time) * timestamp_mul
                 if len(bids) > 0:
                     bid_clear_upto = float(bids[-1][0])
                     # Clears the existing market depth up to the prices in the snapshot.
@@ -151,10 +172,11 @@ def convert(

     data = correct_event_order(sorted_exch_ts, sorted_local_ts, structured_array)

-    # Validate again.
-    num_corr = validate_data(data)
-    if num_corr < 0:
-        raise ValueError
+    if not structured_array:
+        # Validate again.
+        num_corr = validate_data(data)
+        if num_corr < 0:
+            raise ValueError

     if structured_array:
         data = convert_to_struct_arr(data)
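With these changes, captures recorded with nanosecond local timestamps or from a regular (non-combined) stream can be converted directly: with timestamp_unit='ns', the first 19 characters of each line are parsed as the local timestamp (16 for 'us'), and Binance's millisecond exchange timestamps are scaled by 1,000,000 (1,000 for 'us'). The sketch below is a minimal usage example, not part of the commit; the input path is hypothetical, and it assumes input_filename is the converter's first positional argument and that the module is importable as shown.

from hftbacktest.data.utils import binancefutures

# Each raw line is '<local_timestamp><separator><json_payload>'.
data = binancefutures.convert(
    'solusdt_20240424.gz',    # hypothetical raw capture file
    structured_array=True,    # emit the new format used by the Rust implementation
    timestamp_unit='ns',      # local timestamps recorded as 19-digit nanoseconds
    combined_stream=False     # bare payloads without the {"stream": ..., "data": ...} wrapper
)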
49 changes: 31 additions & 18 deletions hftbacktest/data/utils/tardis.py
@@ -25,11 +25,16 @@ def convert(
         base_latency: float = 0,
         snapshot_mode: Literal['process', 'ignore_sod', 'ignore'] = 'process',
         compress: bool = False,
-        structured_array: bool = False
+        structured_array: bool = False,
+        timestamp_unit: Literal['us', 'ns'] = 'us'
 ) -> NDArray:
     r"""
     Converts Tardis.dev data files into a format compatible with HftBacktest.

+    For Tardis's Binance Futures feed data, the 'E' event timestamp, which represents the sending time, is used
+    rather than the 'T' transaction time, which indicates when the matching occurs. As a result, the measured
+    latency is slightly lower than the actual latency.

     Args:
         input_files: Input filenames for both incremental book and trades files,
             e.g. ['incremental_book.csv', 'trades.csv'].
@@ -48,10 +53,17 @@ def convert(
             - Otherwise, all snapshot events will be processed.
         compress: If this is set to True, the output file will be compressed.
         structured_array: If this is set to True, the output is converted into the new format (currently only the Rust implementation).
+        timestamp_unit: The unit to which timestamps are converted. Tardis provides timestamps in microseconds.

     Returns:
         Converted data compatible with HftBacktest.
     """
+    if timestamp_unit == 'us':
+        timestamp_mul = 1
+    elif timestamp_unit == 'ns':
+        timestamp_mul = 1000
+    else:
+        raise ValueError

     TRADE = 0
     DEPTH = 1

@@ -103,8 +115,8 @@ def convert(
                 # Insert TRADE_EVENT
                 tmp[row_num] = [
                     TRADE_EVENT,
-                    int(cols[2]),
-                    int(cols[3]),
+                    int(cols[2]) * timestamp_mul,
+                    int(cols[3]) * timestamp_mul,
                     1 if cols[5] == 'buy' else -1,
                     float(cols[6]),
                     float(cols[7])
@@ -124,8 +136,8 @@ def convert(
                 if cols[5] == 'bid':
                     ss_bid[ss_bid_rn] = [
                         DEPTH_SNAPSHOT_EVENT,
-                        int(cols[2]),
-                        int(cols[3]),
+                        int(cols[2]) * timestamp_mul,
+                        int(cols[3]) * timestamp_mul,
                         1,
                         float(cols[6]),
                         float(cols[7])
@@ -134,8 +146,8 @@ def convert(
                 else:
                     ss_ask[ss_ask_rn] = [
                         DEPTH_SNAPSHOT_EVENT,
-                        int(cols[2]),
-                        int(cols[3]),
+                        int(cols[2]) * timestamp_mul,
+                        int(cols[3]) * timestamp_mul,
                         -1,
                         float(cols[6]),
                         float(cols[7])
@@ -153,8 +165,8 @@ def convert(
                 # Clear the bid market depth within the snapshot bid range.
                 tmp[row_num] = [
                     DEPTH_CLEAR_EVENT,
-                    ss_bid[0, 1],
-                    ss_bid[0, 2],
+                    # ss_bid timestamps were already scaled by timestamp_mul at insertion,
+                    # so they must not be multiplied again here.
+                    ss_bid[0, 1],
+                    ss_bid[0, 2],
                     1,
                     ss_bid[-1, 4],
                     0
@@ -170,8 +182,8 @@ def convert(
                 # Clear the ask market depth within the snapshot ask range.
                 tmp[row_num] = [
                     DEPTH_CLEAR_EVENT,
-                    ss_ask[0, 1],
-                    ss_ask[0, 2],
+                    # ss_ask timestamps were already scaled by timestamp_mul at insertion,
+                    # so they must not be multiplied again here.
+                    ss_ask[0, 1],
+                    ss_ask[0, 2],
                     -1,
                     ss_ask[-1, 4],
                     0
@@ -184,8 +196,8 @@ def convert(
                 # Insert DEPTH_EVENT
                 tmp[row_num] = [
                     DEPTH_EVENT,
-                    int(cols[2]),
-                    int(cols[3]),
+                    int(cols[2]) * timestamp_mul,
+                    int(cols[3]) * timestamp_mul,
                     1 if cols[5] == 'bid' else -1,
                     float(cols[6]),
                     float(cols[7])
@@ -205,10 +217,11 @@ def convert(

     data = correct_event_order(sorted_exch_ts, sorted_local_ts, structured_array)

-    # Validate again.
-    num_corr = validate_data(data)
-    if num_corr < 0:
-        raise ValueError
+    if not structured_array:
+        # Validate again.
+        num_corr = validate_data(data)
+        if num_corr < 0:
+            raise ValueError

     if structured_array:
         data = convert_to_struct_arr(data)
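Since Tardis provides timestamps in microseconds, this converter only needs a multiplier: 'us' keeps them as-is (timestamp_mul = 1) and 'ns' scales them by 1,000. A minimal usage sketch, not part of the commit; it assumes input_files is the first positional argument, with the filenames taken from the docstring example.

from hftbacktest.data.utils import tardis

# Convert the incremental book and trades files together, emitting nanosecond timestamps.
data = tardis.convert(
    ['incremental_book.csv', 'trades.csv'],
    structured_array=True,   # emit the new format used by the Rust implementation
    timestamp_unit='ns'      # microsecond inputs are scaled by 1,000
)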
