clear stale cache logic moved to function
img var added so stale cache logic is only called when a new market is added
liampauling committed Oct 12, 2020
1 parent 32a5f6c commit 3b2bf77
Showing 2 changed files with 85 additions and 49 deletions.
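In short, _process now returns a bool "img" flag indicating whether the update carried a new market image, and on_update only sweeps closed, stale caches (via the new clear_stale_cache method) when that flag is set and the stream is live (unique_id != 0). Below is a minimal, self-contained sketch of that flow, not the library's actual implementation: the cache objects are simplified stand-ins and the MAX_CACHE_AGE value here is only a placeholder for the module-level constant in stream.py.

import logging

logger = logging.getLogger(__name__)

MAX_CACHE_AGE = 60 * 60 * 8  # seconds; placeholder for the module constant


class StreamSketch:
    """Simplified stand-in for BaseStream showing the new cache-clearing flow."""

    _lookup = "mc"

    def __init__(self, unique_id: int = 1):
        self.unique_id = unique_id
        self._caches = {}  # market_id -> cache with .market_id/.publish_time/.closed

    def on_update(self, data: dict) -> None:
        publish_time = data.get("pt")
        if self._lookup in data:
            # _process now reports whether a new market image (img) was added
            img = self._process(data[self._lookup], publish_time)
            # stale caches are only swept on a new img, and only on live streams
            if img and self.unique_id != 0:
                self.clear_stale_cache(publish_time)

    def clear_stale_cache(self, publish_time: int) -> None:
        # drop closed markets older than MAX_CACHE_AGE (publish_time is in ms)
        _to_remove = [
            cache.market_id
            for cache in self._caches.values()
            if cache.closed
            and (publish_time - cache.publish_time) / 1e3 > MAX_CACHE_AGE
        ]
        for market_id in _to_remove:
            del self._caches[market_id]
            logger.info(
                "%s removed, %s markets in cache", market_id, len(self._caches)
            )

    def _process(self, data: list, publish_time: int) -> bool:
        # subclasses (MarketStream / OrderStream) return True when a new
        # market image was added to the cache
        raise NotImplementedError
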
50 changes: 29 additions & 21 deletions betfairlightweight/streaming/stream.py
@@ -60,27 +60,30 @@ def on_update(self, data: dict) -> None:
)

if self._lookup in data:
self._process(data[self._lookup], publish_time)
img = self._process(data[self._lookup], publish_time)

# remove stale cache data to prevent memory leaks (only live)
if self.unique_id != 0:
_to_remove = []
for cache in self._caches.values():
if (
cache.closed
and (publish_time - cache.publish_time) / 1e3 > MAX_CACHE_AGE
):
_to_remove.append(cache.market_id)
for market_id in _to_remove:
del self._caches[market_id]
logger.info(
"[%s: %s]: %s removed, %s markets in cache"
% (self, self.unique_id, market_id, len(self._caches))
)
# remove stale cache data on any new img to prevent memory leaks (only live)
if img and self.unique_id != 0:
self.clear_stale_cache(publish_time)

def clear_cache(self) -> None:
self._caches.clear()

def clear_stale_cache(self, publish_time: int) -> None:
_to_remove = []
for cache in self._caches.values():
if (
cache.closed
and (publish_time - cache.publish_time) / 1e3 > MAX_CACHE_AGE
):
_to_remove.append(cache.market_id)
for market_id in _to_remove:
del self._caches[market_id]
logger.info(
"[%s: %s]: %s removed, %s markets in cache"
% (self, self.unique_id, market_id, len(self._caches))
)

def snap(self, market_ids: list = None) -> list:
return [
cache.create_resource(self.unique_id, self._lightweight, snap=True)
@@ -95,7 +98,8 @@ def on_process(self, output: list) -> None:
def _on_creation(self) -> None:
logger.info('[%s: %s]: "%s" created' % (self, self.unique_id, self))

def _process(self, data: dict, publish_time: int) -> None:
def _process(self, data: dict, publish_time: int) -> bool:
# Return True if new img within data
pass

def _update_clk(self, data: dict) -> None:
@@ -141,15 +145,16 @@ class MarketStream(BaseStream):
_lookup = "mc"
_name = "MarketStream"

def _process(self, data: list, publish_time: int) -> None:
output_market_book = []
def _process(self, data: list, publish_time: int) -> bool:
output_market_book, img = [], False
for market_book in data:
market_id = market_book["id"]
market_book_cache = self._caches.get(market_id)

if (
market_book.get("img") or market_book_cache is None
): # historic data does not contain img
img = True
if "marketDefinition" not in market_book:
logger.error(
"[%s: %s]: Unable to add %s to cache due to marketDefinition "
@@ -173,20 +178,22 @@ def _process(self, data: list, publish_time: int) -> None:
market_book_cache.create_resource(self.unique_id, self._lightweight)
)
self.on_process(output_market_book)
return img


class OrderStream(BaseStream):

_lookup = "oc"
_name = "OrderStream"

def _process(self, data: list, publish_time: int) -> None:
output_order_book = []
def _process(self, data: list, publish_time: int) -> bool:
output_order_book, img = [], False
for order_book in data:
market_id = order_book["id"]
order_book_cache = self._caches.get(market_id)

if order_book_cache is None:
img = True
order_book_cache = OrderBookCache(
publish_time=publish_time, **order_book
)
@@ -203,3 +210,4 @@ def _process(self, data: list, publish_time: int) -> None:
order_book_cache.create_resource(self.unique_id, self._lightweight)
)
self.on_process(output_order_book)
return img
84 changes: 56 additions & 28 deletions tests/unit/test_stream.py
@@ -47,12 +47,17 @@ def test_on_resubscribe(self, mock_on_update):
self.stream.on_resubscribe({})
mock_on_update.assert_called_once_with({})

@mock.patch("betfairlightweight.streaming.stream.BaseStream._process")
@mock.patch("betfairlightweight.streaming.stream.BaseStream.clear_stale_cache")
@mock.patch(
"betfairlightweight.streaming.stream.BaseStream._process", return_value=False
)
@mock.patch(
"betfairlightweight.streaming.stream.BaseStream._calc_latency", return_value=0.1
)
@mock.patch("betfairlightweight.streaming.stream.BaseStream._update_clk")
def test_on_update(self, mock_update_clk, mock_calc_latency, mock_process):
def test_on_update(
self, mock_update_clk, mock_calc_latency, mock_process, mock_clear_stale_cache
):
mock_response = create_mock_json("tests/resources/streaming_mcm_update.json")
self.stream.on_update(mock_response.json())

@@ -64,6 +69,31 @@ def test_on_update(self, mock_update_clk, mock_calc_latency, mock_process):

mock_calc_latency.return_value = 10
self.stream.on_update(mock_response.json())
mock_clear_stale_cache.assert_not_called()

@mock.patch("betfairlightweight.streaming.stream.BaseStream.clear_stale_cache")
@mock.patch(
"betfairlightweight.streaming.stream.BaseStream._process", return_value=True
)
@mock.patch(
"betfairlightweight.streaming.stream.BaseStream._calc_latency", return_value=0.1
)
@mock.patch("betfairlightweight.streaming.stream.BaseStream._update_clk")
def test_on_update_clear_cache(
self, mock_update_clk, mock_calc_latency, mock_process, mock_clear_stale_cache
):
mock_response = create_mock_json("tests/resources/streaming_mcm_update.json")
self.stream.on_update(mock_response.json())

mock_update_clk.assert_called_with(mock_response.json())
mock_calc_latency.assert_called_with(mock_response.json().get("pt"))
mock_process.assert_called_with(
mock_response.json().get("mc"), mock_response.json().get("pt")
)

mock_calc_latency.return_value = 10
self.stream.on_update(mock_response.json())
mock_clear_stale_cache.assert_called_with(mock_response.json().get("pt"))

@mock.patch("betfairlightweight.streaming.stream.BaseStream._process")
@mock.patch(
Expand All @@ -81,32 +111,22 @@ def test_on_update_no_latency(
mock_calc_latency.assert_called_with(data.get("pt"))
mock_process.assert_called_with(data.get("mc"), data.get("pt"))

@mock.patch("betfairlightweight.streaming.stream.BaseStream._process")
@mock.patch(
"betfairlightweight.streaming.stream.BaseStream._calc_latency", return_value=0.1
)
@mock.patch("betfairlightweight.streaming.stream.BaseStream._update_clk")
def test_on_update_stale_cache(
self, mock_update_clk, mock_calc_latency, mock_process
):
def test_clear_cache(self):
self.stream._caches = {1: "abc"}
self.stream.clear_cache()

assert self.stream._caches == {}

def test_clear_stale_cache(self):
market_a = mock.Mock(market_id="1.23", publish_time=123, closed=False)
market_b = mock.Mock(market_id="4.56", publish_time=123, closed=True)
self.stream._caches = {
"1.23": market_a,
"4.56": market_b,
}
mock_response = {"pt": 123456789}
self.stream.on_update(mock_response)
mock_update_clk.assert_called_with(mock_response)
mock_calc_latency.assert_called_with(mock_response.get("pt"))
self.stream.clear_stale_cache(123456789)
self.assertEqual(self.stream._caches, {"1.23": market_a})

def test_clear_cache(self):
self.stream._caches = {1: "abc"}
self.stream.clear_cache()

assert self.stream._caches == {}

def test_snap(self):
market_books = self.stream.snap()
assert market_books == []
@@ -198,7 +218,6 @@ def test_init(self):
def test_on_subscribe(self, mock_update_clk, mock_process):
self.stream.on_subscribe({})
mock_update_clk.assert_called_once_with({})

self.stream.on_subscribe({"mc": {123}})
mock_process.assert_called_once_with({123}, None)

@@ -207,8 +226,7 @@ def test_on_subscribe(self, mock_update_clk, mock_process):
def test_process(self, mock_on_process, mock_cache):
sub_image = create_mock_json("tests/resources/streaming_mcm_SUB_IMAGE.json")
data = sub_image.json()["mc"]
self.stream._process(data, 123)

self.assertTrue(self.stream._process(data, 123))
self.assertEqual(len(self.stream), len(data))

@mock.patch("betfairlightweight.streaming.stream.MarketBookCache")
@@ -218,11 +236,22 @@ def test_process_no_market_definition(self, mock_on_process, mock_cache):
"tests/resources/streaming_mcm_SUB_IMAGE_no_market_def.json"
)
data = sub_image_error.json()["mc"]
self.stream._process(data, 123)

self.assertTrue(self.stream._process(data, 123))
self.assertEqual(len(data), 137)
self.assertEqual(len(self.stream), 135) # two markets missing marketDef

@mock.patch("betfairlightweight.streaming.stream.MarketBookCache")
@mock.patch("betfairlightweight.streaming.stream.MarketStream.on_process")
def test_process_no_img(self, mock_on_process, mock_cache):
sub_image = create_mock_json("tests/resources/streaming_mcm_SUB_IMAGE.json")
data = sub_image.json()["mc"]
data[0]["img"] = False
mock_market_cache = mock_cache()
self.stream._caches = {data[0]["id"]: mock_market_cache}
self.assertFalse(self.stream._process(data, 123))
self.assertEqual(len(self.stream), len(data))
mock_market_cache.update_cache.assert_called_with(data[0], 123)

def test_str(self):
assert str(self.stream) == "MarketStream"

@@ -244,7 +273,6 @@ def test_init(self):
def test_on_subscribe(self, mock_update_clk, mock_process):
self.stream.on_subscribe({})
mock_update_clk.assert_called_once_with({})

self.stream.on_subscribe({"oc": {123}})
mock_process.assert_called_once_with({123}, None)

@@ -253,9 +281,9 @@ def test_process(self, mock_on_process, mock_cache):
def test_process(self, mock_on_process, mock_cache):
sub_image = create_mock_json("tests/resources/streaming_ocm_FULL_IMAGE.json")
data = sub_image.json()["oc"]
self.stream._process(data, 123)

self.assertTrue(self.stream._process(data, 123))
self.assertEqual(len(self.stream), len(data))
self.assertFalse(self.stream._process(data, 123))

def test_str(self):
assert str(self.stream) == "OrderStream"