Skip to content

Commit

Permalink
Merge bec6aec into 1af4115
Browse files Browse the repository at this point in the history
  • Loading branch information
liampauling committed Nov 16, 2020
2 parents 1af4115 + bec6aec commit 07383e5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
28 changes: 13 additions & 15 deletions betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

logger = logging.getLogger(__name__)

MAX_CACHE_AGE = 60 * 60 * 8
MAX_CACHE_AGE = 60 * 60 * 8 # 8hrs


class BaseStream:
Expand Down Expand Up @@ -91,14 +91,18 @@ def snap(self, market_ids: list = None) -> list:
if market_ids is None or cache.market_id in market_ids
]

def on_process(self, output: list) -> None:
def on_process(self, caches: list) -> None:
if self.output_queue:
output = [
cache.create_resource(self.unique_id, self._lightweight)
for cache in caches
]
self.output_queue.put(output)

def _on_creation(self) -> None:
logger.info('[%s: %s]: "%s" created' % (self, self.unique_id, self))

def _process(self, data: dict, publish_time: int) -> bool:
def _process(self, data: list, publish_time: int) -> bool:
# Return True if new img within data
pass

Expand Down Expand Up @@ -146,7 +150,7 @@ class MarketStream(BaseStream):
_name = "MarketStream"

def _process(self, data: list, publish_time: int) -> bool:
output_market_book, img = [], False
caches, img = [], False
for market_book in data:
market_id = market_book["id"]
full_image = market_book.get("img", False)
Expand All @@ -173,12 +177,9 @@ def _process(self, data: list, publish_time: int) -> bool:
)

market_book_cache.update_cache(market_book, publish_time)
caches.append(market_book_cache)
self._updates_processed += 1

output_market_book.append(
market_book_cache.create_resource(self.unique_id, self._lightweight)
)
self.on_process(output_market_book)
self.on_process(caches)
return img


Expand All @@ -188,7 +189,7 @@ class OrderStream(BaseStream):
_name = "OrderStream"

def _process(self, data: list, publish_time: int) -> bool:
output_order_book, img = [], False
caches, img = [], False
for order_book in data:
market_id = order_book["id"]
full_image = order_book.get("fullImage", False)
Expand All @@ -206,10 +207,7 @@ def _process(self, data: list, publish_time: int) -> bool:
)

order_book_cache.update_cache(order_book, publish_time)
caches.append(order_book_cache)
self._updates_processed += 1

output_order_book.append(
order_book_cache.create_resource(self.unique_id, self._lightweight)
)
self.on_process(output_order_book)
self.on_process(caches)
return img
1 change: 1 addition & 0 deletions tests/resources/streaming_ocm_EMPTY_IMAGE.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"op":"ocm","id":2,"clk":"AAgAAAAAAAAAAA==","pt":1604418055629,"oc":[{"accountId":1234567,"fullImage":true,"id":"1.161613698"}]}
20 changes: 18 additions & 2 deletions tests/unit/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ def test_process(self):
self.stream._process(None, None)

def test_on_process(self):
self.stream.on_process([1, 2])
self.stream.output_queue.put.assert_called_with([1, 2])
mock_cache_one = mock.Mock()
mock_cache_two = mock.Mock()
self.stream.on_process([mock_cache_one, mock_cache_two])
self.stream.output_queue.put.assert_called_with(
[mock_cache_one.create_resource(), mock_cache_two.create_resource()]
)

def test_update_clk(self):
self.stream._update_clk({"initialClk": 1234})
Expand Down Expand Up @@ -228,6 +232,7 @@ def test_process(self, mock_on_process, mock_cache):
data = sub_image.json()["mc"]
self.assertTrue(self.stream._process(data, 123))
self.assertEqual(len(self.stream), len(data))
mock_on_process.assert_called_with([mock_cache()])

@mock.patch("betfairlightweight.streaming.stream.MarketBookCache")
@mock.patch("betfairlightweight.streaming.stream.MarketStream.on_process")
Expand Down Expand Up @@ -284,6 +289,7 @@ def test_process(self, mock_on_process, mock_cache):
self.assertTrue(self.stream._process(data, 123))
self.assertEqual(len(self.stream), len(data))
self.assertFalse(self.stream._process(data, 123))
mock_on_process.assert_called_with([mock_cache()])

@mock.patch("betfairlightweight.streaming.stream.OrderBookCache")
@mock.patch("betfairlightweight.streaming.stream.OrderStream.on_process")
Expand All @@ -297,6 +303,16 @@ def test_process_new_image(self, mock_on_process, mock_cache):
self.assertEqual(len(self.stream), len(data))
self.assertTrue(self.stream._process(data, 123))

@mock.patch("betfairlightweight.streaming.stream.OrderBookCache")
@mock.patch("betfairlightweight.streaming.stream.OrderStream.on_process")
def test_process_empty_image(self, mock_on_process, mock_cache):
self.stream._caches = {"1.161613698": mock.Mock()}
sub_image = create_mock_json("tests/resources/streaming_ocm_EMPTY_IMAGE.json")
data = sub_image.json()["oc"]
self.assertTrue(self.stream._process(data, 123))
self.assertEqual(len(self.stream), len(data))
self.assertTrue(self.stream._process(data, 123))

def test_str(self):
assert str(self.stream) == "OrderStream"

Expand Down

0 comments on commit 07383e5

Please sign in to comment.