Merge pull request #323 from liampauling/release/2.7.0
Release/2.7.0
liampauling committed Jul 27, 2020
2 parents 737e17f + c39d91e commit cb47fcd
Showing 9 changed files with 42 additions and 64 deletions.
12 changes: 12 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,18 @@
Release History
---------------

2.7.0 (2020-07-27)
+++++++++++++++++++

**Improvements**

- #308 remove directory warnings / handling (breaking change)
- #318 include streaming_update in generator

**Bug Fixes**

- #320 generator reuse fix

2.6.0 (2020-07-09)
+++++++++++++++++++

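The two improvement notes above are both user-visible. A minimal sketch of how the 2.7.0 API reads from user code, assuming a self-recorded market file; the credentials and file path below are illustrative and not part of this commit:

    import betfairlightweight
    from betfairlightweight import StreamListener

    trading = betfairlightweight.APIClient("username", "password", app_key="app_key")

    # #308 (breaking): pass file_path=; the deprecated directory= keyword is gone
    stream = trading.streaming.create_historical_generator_stream(
        file_path="/tmp/1.170810551",  # illustrative path to a recorded market file
        listener=StreamListener(max_latency=None),
    )

    # #318: each parsed market book now carries the raw update that produced it
    for market_books in stream.get_generator()():
        for market_book in market_books:
            print(market_book.market_id, market_book.streaming_update)
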
2 changes: 1 addition & 1 deletion betfairlightweight/__version__.py
@@ -1,6 +1,6 @@
__title__ = "betfairlightweight"
__description__ = "Lightweight python wrapper for Betfair API-NG"
__url__ = "https://github.com/liampauling/betfair"
__version__ = "2.6.0"
__version__ = "2.7.0"
__author__ = "Liam Pauling"
__license__ = "MIT"
18 changes: 2 additions & 16 deletions betfairlightweight/endpoints/streaming.py
@@ -53,7 +53,7 @@ def create_stream(

@staticmethod
def create_historical_stream(
file_path: str = None, listener: BaseListener = None, **kwargs
file_path: str = None, listener: BaseListener = None
) -> HistoricalStream:
"""
Uses streaming listener/cache to parse betfair
@@ -65,19 +65,12 @@ def create_historical_stream(
:rtype: HistoricalStream
"""
if file_path is None and kwargs.get("directory"):
warnings.warn(
"directory is deprecated; use file_path", DeprecationWarning,
)
file_path = kwargs.get("directory")

listener = listener if listener else BaseListener()
listener.register_stream(0, "marketSubscription")
return HistoricalStream(file_path, listener)

@staticmethod
def create_historical_generator_stream(
file_path: str = None, listener: BaseListener = None, **kwargs
file_path: str = None, listener: BaseListener = None
) -> HistoricalGeneratorStream:
"""
Uses generator listener/cache to parse betfair
@@ -89,12 +82,5 @@ def create_historical_generator_stream(
:rtype: HistoricalGeneratorStream
"""
if file_path is None and kwargs.get("directory"):
warnings.warn(
"directory is deprecated; use file_path", DeprecationWarning,
)
file_path = kwargs.get("directory")

listener = listener if listener else StreamListener()
listener.register_stream(0, "marketSubscription")
return HistoricalGeneratorStream(file_path, listener)
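
With the **kwargs catch-all removed from both factory methods, the old directory= keyword now fails immediately rather than emitting a DeprecationWarning. A short sketch of the behaviour change, using an illustrative client and path:

    import betfairlightweight

    trading = betfairlightweight.APIClient("username", "password", app_key="app_key")

    # 2.6.x mapped directory= onto file_path after a DeprecationWarning;
    # 2.7.0 no longer accepts the keyword at all.
    try:
        trading.streaming.create_historical_stream(directory="/tmp/1.170810551")
    except TypeError as err:
        print(err)  # unexpected keyword argument 'directory'

Note also that listener.register_stream is no longer called here; that call moves into the streams' _read_loop in the next file.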
2 changes: 2 additions & 0 deletions betfairlightweight/streaming/betfairstream.py
@@ -298,6 +298,7 @@ def stop(self) -> None:
self._running = False

def _read_loop(self) -> None:
self.listener.register_stream(0, "marketSubscription")
with open(self.file_path, "r") as f:
for update in f:
if self.listener.on_data(update) is False:
@@ -321,6 +322,7 @@ def get_generator(self):

def _read_loop(self):
self._running = True
self.listener.register_stream(0, "marketSubscription")
with open(self.file_path, "r") as f:
for update in f:
if self.listener.on_data(update) is False:
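Registering the market subscription inside _read_loop means it happens on every pass over the file rather than once at stream creation, which is what lets the same HistoricalGeneratorStream be consumed more than once (the generator reuse fix, #320). A sketch of that reuse, again with an illustrative file path:

    import betfairlightweight
    from betfairlightweight import StreamListener

    trading = betfairlightweight.APIClient("username", "password", app_key="app_key")
    stream = trading.streaming.create_historical_generator_stream(
        file_path="/tmp/1.170810551",  # illustrative recorded market file
        listener=StreamListener(max_latency=None),
    )

    # Each call re-runs _read_loop, which now re-registers the listener's
    # marketSubscription stream before reading the file again.
    first_pass = sum(len(books) for books in stream.get_generator()())
    second_pass = sum(len(books) for books in stream.get_generator()())
    print(first_pass, second_pass)  # both passes should yield the same updates
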
12 changes: 8 additions & 4 deletions betfairlightweight/streaming/cache.py
@@ -159,6 +159,7 @@ def __init__(self, **kwargs):
raise CacheError('"EX_MARKET_DEF" must be requested to use cache')
self.market_definition = kwargs["marketDefinition"]

self.streaming_update = None
self.runners = []
self.runner_dict = {}
self.market_definition_runner_dict = {}
@@ -170,6 +171,7 @@ def update_cache(self, market_change: dict, publish_time: int) -> None:
self.strip_datetime(publish_time) or self._datetime_updated
)
self.publish_time = publish_time
self.streaming_update = market_change

if "marketDefinition" in market_change:
self.market_definition = market_change["marketDefinition"]
@@ -213,11 +215,11 @@ def update_cache(self, market_change: dict, publish_time: int) -> None:
self._update_runner_dict()

def create_resource(
self, unique_id: int, streaming_update: dict, lightweight: bool
self, unique_id: int, lightweight: bool
) -> Union[dict, MarketBook]:
data = self.serialise
data["streaming_unique_id"] = unique_id
data["streaming_update"] = streaming_update
data["streaming_update"] = self.streaming_update
if lightweight:
return data
else:
@@ -408,11 +410,13 @@ def __init__(self, **kwargs):
self.publish_time = kwargs.get("publish_time")
self.market_id = kwargs.get("id")
self.closed = kwargs.get("closed")
self.streaming_update = None
self.runners = []

def update_cache(self, order_book: dict, publish_time: int) -> None:
self._datetime_updated = self.strip_datetime(publish_time)
self.publish_time = publish_time
self.streaming_update = order_book
if "closed" in order_book:
self.closed = order_book["closed"]

@@ -431,11 +435,11 @@ def update_cache(self, order_book: dict, publish_time: int) -> None:
self.runners.append(OrderBookRunner(**order_changes))

def create_resource(
self, unique_id: int, streaming_update: dict, lightweight: bool
self, unique_id: int, lightweight: bool
) -> Union[dict, CurrentOrders]:
data = self.serialise
data["streaming_unique_id"] = unique_id
data["streaming_update"] = streaming_update
data["streaming_update"] = self.streaming_update
if lightweight:
return data
else:
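The cache now remembers the last raw change it was fed, and create_resource reads it from there instead of taking it as an argument. A small sketch against OrderBookCache, mirroring the unit tests further down; the constructor keywords and the empty "orc" list are minimal assumptions for illustration:

    from betfairlightweight.streaming.cache import OrderBookCache

    cache = OrderBookCache(id="1.170810551", closed=False, publish_time=0)

    update = {"id": "1.170810551", "closed": False, "orc": []}  # no runner changes
    cache.update_cache(update, 1595851200000)

    # The update is stored on the cache and serialised into the resource;
    # it is no longer passed to create_resource itself.
    current_orders = cache.create_resource(123, True)  # lightweight
    print(current_orders["streaming_update"] is update)  # True
    print(current_orders["streaming_unique_id"])  # 123
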
10 changes: 3 additions & 7 deletions betfairlightweight/streaming/stream.py
@@ -63,7 +63,7 @@ def clear_cache(self) -> None:

def snap(self, market_ids: list = None) -> list:
return [
cache.create_resource(self.unique_id, None, self._lightweight)
cache.create_resource(self.unique_id, self._lightweight)
for cache in list(self._caches.values())
if market_ids is None or cache.market_id in market_ids
]
@@ -149,9 +149,7 @@ def _process(self, data: list, publish_time: int) -> None:
self._updates_processed += 1

output_market_book.append(
market_book_cache.create_resource(
self.unique_id, market_book, self._lightweight
)
market_book_cache.create_resource(self.unique_id, self._lightweight)
)
self.on_process(output_market_book)

@@ -180,8 +178,6 @@ def _process(self, data: list, publish_time: int) -> None:
self._updates_processed += 1

output_order_book.append(
order_book_cache.create_resource(
self.unique_id, order_book, self._lightweight
)
order_book_cache.create_resource(self.unique_id, self._lightweight)
)
self.on_process(output_order_book)
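
Because create_resource now pulls the update from the cache, snap() (which previously passed None) returns snapshots that include the most recent raw update as well. A sketch of snapping after driving the historical generator; reaching snap() through listener.stream is an assumption about the wiring, while snap() itself is the method shown above:

    import betfairlightweight
    from betfairlightweight import StreamListener

    trading = betfairlightweight.APIClient("username", "password", app_key="app_key")
    listener = StreamListener(max_latency=None)
    stream = trading.streaming.create_historical_generator_stream(
        file_path="/tmp/1.170810551",  # illustrative recorded market file
        listener=listener,
    )

    for _ in stream.get_generator()():
        pass  # drive the listener so its market cache is populated

    # snap() used to call create_resource(unique_id, None, lightweight);
    # snapshots now carry the cache's stored streaming_update instead of None.
    for market_book in listener.stream.snap():
        print(market_book.market_id, market_book.streaming_update)
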
2 changes: 2 additions & 0 deletions tests/unit/test_betfairstream.py
@@ -372,6 +372,7 @@ def test__read_loop(self, mock_stop):
self.listener.on_data.snap()
mock_stop.assert_called_with()
self.assertTrue(self.stream._running)
self.listener.register_stream.assert_called_with(0, "marketSubscription")


class HistoricalGeneratorStreamTest(unittest.TestCase):
@@ -401,3 +402,4 @@ def test__read_loop(self, mock_stop):
self.listener.on_data.snap()
mock_stop.assert_called_with()
self.assertTrue(self.stream._running)
self.listener.register_stream.assert_called_with(0, "marketSubscription")
18 changes: 12 additions & 6 deletions tests/unit/test_cache.py
@@ -142,6 +142,7 @@ def test_update_cache_md(self, mock_strip_datetime):
assert self.market_book_cache.market_definition == book.get(
"marketDefinition"
)
self.assertEqual(self.market_book_cache.streaming_update, book)

@mock.patch("betfairlightweight.streaming.cache.MarketBookCache.strip_datetime")
def test_update_cache_tv(self, mock_strip_datetime):
@@ -153,6 +154,7 @@ def test_update_cache_tv(self, mock_strip_datetime):
self.market_book_cache.update_cache(book, publish_time)
mock_strip_datetime.assert_called_with(publish_time)
assert self.market_book_cache.total_matched == book.get("tv")
self.assertEqual(self.market_book_cache.streaming_update, book)

# @mock.patch('betfairlightweight.resources.streamingresources.MarketBookCache.strip_datetime')
# def test_update_cache_rc(self, mock_strip_datetime):
@@ -177,14 +179,14 @@ def test_create_resource(
self, mock_market_book, mock_market_definition, mock_serialise
):
# lightweight
market_book = self.market_book_cache.create_resource(1234, {"test"}, True)
market_book = self.market_book_cache.create_resource(1234, True)
assert market_book == {
"streaming_update": {"test"},
"streaming_update": self.market_book_cache.streaming_update,
"streaming_unique_id": 1234,
}
assert market_book == mock_serialise()
# not lightweight
market_book = self.market_book_cache.create_resource(1234, {}, False)
market_book = self.market_book_cache.create_resource(1234, False)
assert market_book == mock_market_book()

def test_update_runner_dict(self):
@@ -317,6 +319,7 @@ def test_full_image(self):
)
for order_book in mock_response.json().get("oc"):
self.order_book_cache.update_cache(order_book, 1234)
self.assertEqual(self.order_book_cache.streaming_update, order_book)

self.assertEqual(len(self.order_book_cache.runners), 5)
self.assertEqual(len(self.order_book_cache.runner_dict), 5)
@@ -330,6 +333,7 @@ def test_update_cache(self):
mock_response = create_mock_json("tests/resources/streaming_ocm_UPDATE.json")
for order_book in mock_response.json().get("oc"):
self.order_book_cache.update_cache(order_book, 1234)
self.assertEqual(self.order_book_cache.streaming_update, order_book)

for order_changes in order_book.get("orc"):
# self.runner.matched_lays.update.assert_called_with(order_changes.get('ml', []))
@@ -344,6 +348,7 @@ def test_update_cache_new(self, mock_order_book_runner):
mock_response = create_mock_json("tests/resources/streaming_ocm_UPDATE.json")
for order_book in mock_response.json().get("oc"):
self.order_book_cache.update_cache(order_book, 1234)
self.assertEqual(self.order_book_cache.streaming_update, order_book)

for order_changes in order_book.get("orc"):
mock_order_book_runner.assert_called_with(**order_changes)
@@ -352,6 +357,7 @@ def test_update_cache_closed(self):
mock_response = create_mock_json("tests/resources/streaming_ocm_SUB_IMAGE.json")
for order_book in mock_response.json().get("oc"):
self.order_book_cache.update_cache(order_book, 1234)
self.assertEqual(self.order_book_cache.streaming_update, order_book)
self.assertTrue(self.order_book_cache.closed)

@mock.patch(
@@ -362,14 +368,14 @@ def test_update_cache_closed(self):
@mock.patch("betfairlightweight.streaming.cache.CurrentOrders")
def test_create_resource(self, mock_current_orders, mock_serialise):
# lightweight
current_orders = self.order_book_cache.create_resource(123, {"test"}, True)
current_orders = self.order_book_cache.create_resource(123, True)
assert current_orders == mock_serialise()
assert current_orders == {
"streaming_update": {"test"},
"streaming_update": self.order_book_cache.streaming_update,
"streaming_unique_id": 123,
}
# not lightweight
current_orders = self.order_book_cache.create_resource(123, {}, False)
current_orders = self.order_book_cache.create_resource(123, False)
assert current_orders == mock_current_orders()

def test_runner_dict(self):
30 changes: 0 additions & 30 deletions tests/unit/test_streamingendpoint.py
@@ -34,8 +34,6 @@ def test_create_historical_stream(self, mock_stream):
file_path = "test"
listener = mock.Mock()
self.streaming.create_historical_stream(file_path=file_path, listener=listener)

listener.register_stream.assert_called_with(0, "marketSubscription")
mock_stream.assert_called_with(file_path, listener)

@mock.patch("betfairlightweight.endpoints.streaming.HistoricalGeneratorStream")
@@ -45,32 +43,4 @@ def test_create_historical_generator_stream(self, mock_stream):
self.streaming.create_historical_generator_stream(
file_path=file_path, listener=listener
)

listener.register_stream.assert_called_with(0, "marketSubscription")
mock_stream.assert_called_with(file_path, listener)

@mock.patch("betfairlightweight.endpoints.streaming.HistoricalStream")
def test_create_historical_stream_with_directory(self, mock_stream):
file_path = "test"
listener = mock.Mock()

with self.assertWarns(DeprecationWarning):
self.streaming.create_historical_stream(
directory=file_path, listener=listener
)

listener.register_stream.assert_called_with(0, "marketSubscription")
mock_stream.assert_called_with(file_path, listener)

@mock.patch("betfairlightweight.endpoints.streaming.HistoricalGeneratorStream")
def test_create_historical_generator_stream_with_directory(self, mock_stream):
file_path = "test"
listener = mock.Mock()

with self.assertWarns(DeprecationWarning):
self.streaming.create_historical_generator_stream(
directory=file_path, listener=listener
)

listener.register_stream.assert_called_with(0, "marketSubscription")
mock_stream.assert_called_with(file_path, listener)
