Skip to content

Commit

Permalink
order resubscribe fixed
Browse files Browse the repository at this point in the history
listener/stream bug on unique id not updating
listener/stream clean up
  • Loading branch information
liampauling committed Aug 6, 2017
1 parent 87cc876 commit 0362d3b
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 37 deletions.
2 changes: 1 addition & 1 deletion betfairlightweight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from . import filters

__title__ = 'betfairlightweight'
__version__ = '1.4.1'
__version__ = '1.4.2'
__author__ = 'Liam Pauling'

# Set default logging handler to avoid "No handler found" warnings.
Expand Down
6 changes: 5 additions & 1 deletion betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ def subscribe_to_orders(self, order_filter=None, initial_clk=None, clk=None, con
'heartbeatMs': heartbeat_ms,
'segmentationEnabled': segmentation_enabled,
}
self.listener.register_stream(unique_id, 'orderSubscription')
if initial_clk and clk:
# if resubscribe only update unique_id
self.listener.stream_unique_id = unique_id
else:
self.listener.register_stream(unique_id, 'orderSubscription')
self._send(message)
return unique_id

Expand Down
12 changes: 4 additions & 8 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ def snap(self, market_ids=None):

@property
def initial_clk(self):
if self.stream:
if self.stream is not None:
return self.stream._initial_clk

@property
def clk(self):
if self.stream:
if self.stream is not None:
return self.stream._clk

def _add_stream(self, unique_id, operation):
Expand Down Expand Up @@ -131,13 +131,9 @@ def _on_change_message(self, data, unique_id):

def _add_stream(self, unique_id, stream_type):
if stream_type == 'marketSubscription':
return MarketStream(
unique_id, self.output_queue, self.max_latency, self.lightweight
)
return MarketStream(self)
elif stream_type == 'orderSubscription':
return OrderStream(
unique_id, self.output_queue, self.max_latency, self.lightweight
)
return OrderStream(self)

@staticmethod
def _error_handler(data, unique_id):
Expand Down
31 changes: 22 additions & 9 deletions betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,8 @@ class BaseStream(object):

_lookup = 'mc'

def __init__(self, unique_id, output_queue, max_latency, lightweight):
self.unique_id = unique_id
self.output_queue = output_queue
self._max_latency = max_latency
self._lightweight = lightweight
def __init__(self, listener):
self._listener = listener

self._initial_clk = None
self._clk = None
Expand All @@ -35,9 +32,8 @@ def on_subscribe(self, data):
self._update_clk(data)
publish_time = data.get('pt')

book_data = data.get(self._lookup)
if book_data:
self._process(book_data, publish_time)
if self._lookup in data:
self._process(data[self._lookup], publish_time)
logger.info('[Stream: %s]: %s %s added' % (self.unique_id, len(self._caches), self._lookup))

def on_heartbeat(self, data):
Expand All @@ -55,7 +51,8 @@ def on_update(self, data):
if latency > self._max_latency:
logger.warning('[Stream: %s]: Latency high: %s' % (self.unique_id, latency))

self._process(data[self._lookup], publish_time)
if self._lookup in data:
self._process(data[self._lookup], publish_time)

def clear_cache(self):
self._caches.clear()
Expand All @@ -81,6 +78,22 @@ def _update_clk(self, data):
self._clk = clk
self.time_updated = datetime.datetime.utcnow()

@property
def unique_id(self):
return self._listener.stream_unique_id

@property
def output_queue(self):
return self._listener.output_queue

@property
def _max_latency(self):
return self._listener.max_latency

@property
def _lightweight(self):
return self._listener.lightweight

@staticmethod
def _calc_latency(publish_time):
return time.time() - publish_time / 1e3
Expand Down
4 changes: 2 additions & 2 deletions tests/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ def test_on_change_message(self):
def test_add_stream(self, mock_market_stream, mock_order_stream):
new_stream = self.stream_listener._add_stream(1, 'marketSubscription')
assert new_stream == 123
mock_market_stream.assert_called_with(1, self.output_queue, self.max_latency, False)
mock_market_stream.assert_called_with(self.stream_listener)

new_stream = self.stream_listener._add_stream(1, 'orderSubscription')
assert new_stream == 456
mock_order_stream.assert_called_with(1, self.output_queue, self.max_latency, False)
mock_order_stream.assert_called_with(self.stream_listener)

def test_error_handler(self):
mock_response = create_mock_json('tests/resources/streaming_connection.json')
Expand Down
37 changes: 21 additions & 16 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@
class BaseStreamTest(unittest.TestCase):

def setUp(self):
self.output_queue = mock.Mock()
self.unique_id = 1
self.max_latency = 1.5
self.stream = BaseStream(self.unique_id, self.output_queue, self.max_latency, False)
self.listener = mock.Mock()
self.listener.max_latency = 0.5
self.stream = BaseStream(self.listener)

def test_init(self):
assert self.stream.unique_id == self.unique_id
assert self.stream.output_queue == self.output_queue
assert self.stream._max_latency == self.max_latency
assert self.stream._listener == self.listener

assert self.stream._initial_clk is None
assert self.stream._clk is None
Expand Down Expand Up @@ -72,7 +69,7 @@ def test_process(self):

def test_on_process(self):
self.stream.on_process([1, 2])
self.output_queue.put.assert_called_with([1, 2])
self.stream.output_queue.put.assert_called_with([1, 2])

def test_update_clk(self):
self.stream._update_clk({'initialClk': 1234})
Expand All @@ -81,6 +78,18 @@ def test_update_clk(self):
self.stream._update_clk({'clk': 123})
assert self.stream._clk == 123

def test_unique_id(self):
assert self.stream.unique_id == self.listener.stream_unique_id

def test_output_queue(self):
assert self.stream.output_queue == self.listener.output_queue

def test_max_latency(self):
assert self.stream._max_latency == self.listener.max_latency

def test_lightweight(self):
assert self.stream._lightweight == self.listener.lightweight

@mock.patch('time.time', return_value=1485554805.107185)
def test_calc_latency(self, mock_time):
pt = 1485554796455
Expand All @@ -100,10 +109,8 @@ def test_repr(self):
class MarketStreamTest(unittest.TestCase):

def setUp(self):
self.output_queue = mock.Mock()
self.unique_id = 1
self.max_latency = 1.5
self.stream = MarketStream(self.unique_id, self.output_queue, self.max_latency, False)
self.listener = mock.Mock()
self.stream = MarketStream(self.listener)

@mock.patch('betfairlightweight.streaming.stream.MarketStream._process')
@mock.patch('betfairlightweight.streaming.stream.MarketStream._update_clk')
Expand Down Expand Up @@ -131,10 +138,8 @@ def test_repr(self):
class OrderStreamTest(unittest.TestCase):

def setUp(self):
self.output_queue = mock.Mock()
self.unique_id = 1
self.max_latency = 1.5
self.stream = OrderStream(self.unique_id, self.output_queue, self.max_latency, False)
self.listener = mock.Mock()
self.stream = OrderStream(self.listener)

def test_process(self):
pass
Expand Down

0 comments on commit 0362d3b

Please sign in to comment.