Skip to content

Commit

Permalink
Add handle_revised_bars option
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Jan 13, 2023
1 parent bd86ae6 commit 860993c
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 2 deletions.
3 changes: 3 additions & 0 deletions nautilus_trader/config/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,14 @@ class DataEngineConfig(NautilusConfig):
----------
build_time_bars_with_no_updates : bool, default True
If time bar aggregators will build and emit bars with no new market updates.
handle_revised_bars : bool, default True
If revised bars older than the latest should be handled by the engine.
debug : bool, default False
If debug mode is active (will provide extra debug logging).
"""

build_time_bars_with_no_updates: bool = True
handle_revised_bars: bool = True
debug: bool = False


Expand Down
1 change: 1 addition & 0 deletions nautilus_trader/data/engine.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ cdef class DataEngine(Component):
cdef dict _order_book_intervals
cdef dict _bar_aggregators
cdef bint _build_time_bars_with_no_updates
cdef bint _handle_revised_bars

cdef readonly bint debug
"""If debug mode is active (will provide extra debug logging).\n\n:returns: `bool`"""
Expand Down
11 changes: 10 additions & 1 deletion nautilus_trader/data/engine.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ cdef class DataEngine(Component):
# Settings
self.debug = config.debug
self._build_time_bars_with_no_updates = config.build_time_bars_with_no_updates
self._handle_revised_bars = config.handle_revised_bars

# Counters
self.command_count = 0
Expand Down Expand Up @@ -1140,9 +1141,17 @@ cdef class DataEngine(Component):
)

cdef void _handle_bar(self, Bar bar) except *:
cdef BarType bar_type = bar.bar_type

cdef Bar last_bar = None
if not self._handle_revised_bars:
last_bar = self._cache.bar(bar_type)
if last_bar is not None and (bar.ts_event < last_bar.ts_event or bar.ts_init <= last_bar.ts_init):
return

self._cache.add_bar(bar)

self._msgbus.publish_c(topic=f"data.bars.{bar.bar_type}", msg=bar)
self._msgbus.publish_c(topic=f"data.bars.{bar_type}", msg=bar)

cdef void _handle_status_update(self, StatusUpdate data) except *:
self._msgbus.publish_c(topic=f"data.venue.status", msg=data)
Expand Down
68 changes: 67 additions & 1 deletion tests/unit_tests/data/test_data_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def setup(self):
logger=self.logger,
)

config = DataEngineConfig(debug=True)
config = DataEngineConfig(
handle_revised_bars=False,
debug=True,
)
self.data_engine = DataEngine(
msgbus=self.msgbus,
cache=self.cache,
Expand Down Expand Up @@ -1637,6 +1640,69 @@ def test_process_bar_when_subscribers_then_sends_to_registered_handlers(self):
assert handler1 == [bar]
assert handler2 == [bar]

def test_process_bar_when_revised_with_older_timestamp_does_not_cache_or_publish(self):
# Arrange
self.data_engine.register_client(self.binance_client)
self.binance_client.start()

bar_spec = BarSpecification(1000, BarAggregation.TICK, PriceType.MID)
bar_type = BarType(ETHUSDT_BINANCE.id, bar_spec)

handler = []
self.msgbus.subscribe(topic=f"data.bars.{bar_type}", handler=handler.append)

subscribe = Subscribe(
client_id=ClientId(BINANCE.value),
venue=BINANCE,
data_type=DataType(Bar, metadata={"bar_type": bar_type}),
command_id=UUID4(),
ts_init=self.clock.timestamp_ns(),
)

self.data_engine.execute(subscribe)

bar1 = Bar(
bar_type,
Price.from_str("1051.00000"),
Price.from_str("1055.00000"),
Price.from_str("1050.00000"),
Price.from_str("1052.00000"),
Quantity.from_int(100),
1,
1,
)

bar2 = Bar(
bar_type,
Price.from_str("1051.00000"),
Price.from_str("1055.00000"),
Price.from_str("1050.00000"),
Price.from_str("1051.00000"),
Quantity.from_int(100),
0,
1,
)

bar3 = Bar(
bar_type,
Price.from_str("1051.00000"),
Price.from_str("1055.00000"),
Price.from_str("1050.00000"),
Price.from_str("1050.50000"),
Quantity.from_int(100),
0,
0,
)

# Act
self.data_engine.process(bar1)
self.data_engine.process(bar2)
self.data_engine.process(bar3)

# Assert
assert handler == [bar1]
assert self.cache.bar(bar_type) == bar1

def test_request_instrument_reaches_client(self):
# Arrange
self.data_engine.register_client(self.binance_client)
Expand Down

0 comments on commit 860993c

Please sign in to comment.