Commit bc74f6a

Merge pull request #374 from liampauling/task/373-market-cache-refactor

Task/373 market cache refactor

liampauling committed Jan 11, 2021
2 parents 7accae3 + 9762500
Showing 23 changed files with 881 additions and 425 deletions.
4 changes: 0 additions & 4 deletions .travis.yml
@@ -8,10 +8,6 @@ install:

jobs:
include:
-    - name: "Tests: python 3.5"
-      python: 3.5
-      script:
-        - coverage run --source=betfairlightweight setup.py test
- name: "Tests: python 3.6"
python: 3.6
script:
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -8,11 +8,16 @@ Release History

**Improvements**

+ - #373 Streaming refactor (2-3x+ speed improvement across the board)
- #369 Output streaming matched backs/lays
- #370 Session timeout updated to 24hrs for international exchange
- License update
- Removed build.sh

+ **Dependencies**
+
+ - py3.5 testing removed
+
2.11.1 (2020-12-26)
+++++++++++++++++++

3 changes: 0 additions & 3 deletions appveyor.yml
@@ -2,9 +2,6 @@ build: off

environment:
matrix:
-    - PYTHON: "C:\\Python35"
-      PYTHON_VERSION: "3.5.x"
-
- PYTHON: "C:\\Python36"
PYTHON_VERSION: "3.6.x"

8 changes: 6 additions & 2 deletions betfairlightweight/endpoints/streaming.py
@@ -54,6 +54,7 @@ def create_historical_stream(
file_path: str = None,
listener: BaseListener = None,
operation: str = "marketSubscription",
+ unique_id: int = 0,
) -> HistoricalStream:
"""
Uses streaming listener/cache to parse betfair
@@ -63,17 +64,19 @@
:param str file_path: Path to historic betfair file
:param BaseListener listener: Listener object
:param str operation: Operation type
+ :param int unique_id: Stream id (added to updates)
:rtype: HistoricalStream
"""
listener = listener if listener else BaseListener()
- return HistoricalStream(file_path, listener, operation)
+ return HistoricalStream(file_path, listener, operation, unique_id)

@staticmethod
def create_historical_generator_stream(
file_path: str = None,
listener: BaseListener = None,
operation: str = "marketSubscription",
+ unique_id: int = 0,
) -> HistoricalGeneratorStream:
"""
Uses generator listener/cache to parse betfair
@@ -83,8 +86,9 @@ def create_historical_generator_stream(
:param str file_path: Path to historic betfair file
:param BaseListener listener: Listener object
:param str operation: Operation type
+ :param int unique_id: Stream id (added to updates)
:rtype: HistoricalGeneratorStream
"""
listener = listener if listener else StreamListener()
- return HistoricalGeneratorStream(file_path, listener, operation)
+ return HistoricalGeneratorStream(file_path, listener, operation, unique_id)
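
For context, a minimal usage sketch of the new `unique_id` argument on the historic generator stream. The credentials, file path and id below are placeholders, and the surrounding `APIClient`/`StreamListener` setup follows the library's usual historic-stream pattern; the attribute check in the loop is an assumption about where the id surfaces.

```python
import betfairlightweight
from betfairlightweight import StreamListener

# placeholder credentials - the historic stream itself does not need a login
trading = betfairlightweight.APIClient("username", "password", app_key="app_key")

stream = trading.streaming.create_historical_generator_stream(
    file_path="/tmp/1.170212754",               # placeholder historic market file
    listener=StreamListener(max_latency=None),  # no latency warnings on old data
    unique_id=2,                                # new: stamped onto every update
)

gen = stream.get_generator()
for market_books in gen():
    for market_book in market_books:
        # the id passed above is expected to surface on the parsed resources
        print(market_book.streaming_unique_id, market_book.market_id)
```

Previously the historic streams always registered with id 0 (see the `register_stream` changes in betfairstream.py below); an explicit id makes it easier to tell several historic streams apart downstream.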
1 change: 1 addition & 0 deletions betfairlightweight/resources/__init__.py
@@ -12,6 +12,7 @@
VenueResult,
MarketCatalogue,
MarketBook,
+ RunnerBook,
Match,
CurrentOrders,
ClearedOrders,
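
With `RunnerBook` now re-exported at the package level, it can be imported and type-hinted directly from `betfairlightweight.resources`. A trivial, hypothetical helper to illustrate (the function is not part of the library):

```python
from betfairlightweight.resources import RunnerBook


def best_back_price(runner: RunnerBook, default: float = 0.0) -> float:
    """Return the best available back price on a runner, or `default` if the ladder is empty."""
    available = runner.ex.available_to_back
    return available[0].price if available else default
```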
10 changes: 4 additions & 6 deletions betfairlightweight/resources/streamingresources.py
@@ -195,7 +195,7 @@ def __init__(self, **kwargs):
self.race_runners = [RaceChange(**runner) for runner in kwargs.get("rrc") or []]


- class RaceProgress(BaseResource):
+ class RaceProgress:
"""
:type publish_time: int
:type feed_time: int
@@ -209,9 +209,8 @@ class RaceProgress(BaseResource):
"""

def __init__(self, **kwargs):
- super(RaceProgress, self).__init__(**kwargs)
self.feed_time_epoch = kwargs.get("ft")
- self.feed_time = self.strip_datetime(kwargs.get("ft"))
+ self.feed_time = BaseResource.strip_datetime(kwargs.get("ft"))
self.gate_name = kwargs.get("g")
self.sectional_time = kwargs.get("st")
self.running_time = kwargs.get("rt")
@@ -221,7 +220,7 @@ def __init__(self, **kwargs):
self.jumps = kwargs.get("J")


- class RaceChange(BaseResource):
+ class RaceChange:
"""
:type publish_time: int
:type feed_time: int
@@ -235,9 +234,8 @@ class RaceChange(BaseResource):
"""

def __init__(self, **kwargs):
- super(RaceChange, self).__init__(**kwargs)
self.feed_time_epoch = kwargs.get("ft")
- self.feed_time = self.strip_datetime(kwargs.get("ft"))
+ self.feed_time = BaseResource.strip_datetime(kwargs.get("ft"))
self.selection_id = kwargs.get("id")
self.lat = kwargs.get("lat")
self.long = kwargs.get("long")
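
The change above drops the `BaseResource` base class from `RaceProgress`/`RaceChange` and calls the datetime helper statically instead, presumably to skip the generic `BaseResource.__init__` bookkeeping on every race update (the HISTORY entry attributes a 2-3x speed improvement to this refactor). A stripped-down sketch of the resulting pattern; the class below is illustrative only, with fields abbreviated:

```python
from betfairlightweight.resources.baseresource import BaseResource


class RaceChangeSketch:
    """Illustration of the refactored plain-class style (not the library class)."""

    def __init__(self, **kwargs):
        # no super().__init__() call any more - attributes are assigned directly
        self.feed_time_epoch = kwargs.get("ft")
        # the datetime helper is reused as a plain static call
        self.feed_time = BaseResource.strip_datetime(kwargs.get("ft"))
        self.selection_id = kwargs.get("id")
        self.lat = kwargs.get("lat")
        self.long = kwargs.get("long")
```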
14 changes: 9 additions & 5 deletions betfairlightweight/streaming/betfairstream.py
@@ -127,7 +127,7 @@ def subscribe_to_markets(
}
if initial_clk and clk:
# if resubscribe only update unique_id
- self.listener.stream_unique_id = unique_id
+ self.listener.update_unique_id(unique_id)
else:
self.listener.register_stream(unique_id, "marketSubscription")
self._send(message)
@@ -166,7 +166,7 @@ def subscribe_to_orders(
}
if initial_clk and clk:
# if resubscribe only update unique_id
- self.listener.stream_unique_id = unique_id
+ self.listener.update_unique_id(unique_id)
else:
self.listener.register_stream(unique_id, "orderSubscription")
self._send(message)
@@ -295,15 +295,19 @@ class HistoricalStream:
historical data.
"""

- def __init__(self, file_path: str, listener: BaseListener, operation: str):
+ def __init__(
+     self, file_path: str, listener: BaseListener, operation: str, unique_id: int
+ ):
"""
:param str file_path: Directory of betfair data
:param BaseListener listener: Listener object
:param str operation: Operation type
+ :param int unique_id: Stream id (added to updates)
"""
self.file_path = file_path
self.listener = listener
self.operation = operation
+ self.unique_id = unique_id
self._running = False

def start(self) -> None:
@@ -314,7 +318,7 @@ def stop(self) -> None:
self._running = False

def _read_loop(self) -> None:
- self.listener.register_stream(0, self.operation)
+ self.listener.register_stream(self.unique_id, self.operation)
with open(self.file_path, "r") as f:
for update in f:
if self.listener.on_data(update) is False:
@@ -338,7 +342,7 @@ def get_generator(self):

def _read_loop(self) -> dict:
self._running = True
- self.listener.register_stream(0, self.operation)
+ self.listener.register_stream(self.unique_id, self.operation)
with open(self.file_path, "r") as f:
for update in f:
if self.listener.on_data(update) is False:
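
On the live-stream side, the resubscribe branch now goes through `listener.update_unique_id(...)` rather than writing `stream_unique_id` directly. A hedged usage sketch of that path: credentials, filters and recovery tokens are placeholders, and the exact `subscribe_to_markets` signature beyond the arguments visible in the diff is assumed.

```python
import betfairlightweight
from betfairlightweight import StreamListener

trading = betfairlightweight.APIClient("username", "password", app_key="app_key")
trading.login()

listener = StreamListener()
betfair_socket = trading.streaming.create_stream(listener=listener)

# Supplying saved initial_clk/clk recovery tokens takes the resubscribe branch:
# per the diff, the listener keeps its existing state and only has its unique
# id refreshed via update_unique_id() instead of being re-registered.
betfair_socket.subscribe_to_markets(
    market_filter={"eventTypeIds": ["7"]},                            # horse racing, example
    market_data_filter={"fields": ["EX_BEST_OFFERS"], "ladderLevels": 3},
    initial_clk="<saved initial_clk>",                                # placeholder token
    clk="<saved clk>",                                                # placeholder token
)
betfair_socket.start()  # blocking read loop
```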
