Skip to content

Commit

Permalink
Merge pull request #363 from liampauling/release/2.11.0
Browse files Browse the repository at this point in the history
Release/2.11.0
  • Loading branch information
liampauling committed Dec 7, 2020
2 parents 71b3853 + 4141973 commit 4b4127a
Show file tree
Hide file tree
Showing 18 changed files with 463 additions and 10 deletions.
11 changes: 11 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
Release History
---------------

2.11.0 (2020-12-07)
+++++++++++++++++++

**Improvements**

- Stream updates

**Dependencies**

- orjson upgraded to 3.4.5

2.10.2 (2020-11-28)
+++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion betfairlightweight/__version__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "betfairlightweight"
__description__ = "Lightweight python wrapper for Betfair API-NG"
__url__ = "https://github.com/liampauling/betfair"
__version__ = "2.10.2"
__version__ = "2.11.0"
__author__ = "Liam Pauling"
__license__ = "MIT"
2 changes: 1 addition & 1 deletion betfairlightweight/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@

from .inplayserviceresources import EventTimeline, Scores

from .streamingresources import MarketDefinition, MarketDefinitionRunner
from .streamingresources import MarketDefinition, MarketDefinitionRunner, Race
74 changes: 74 additions & 0 deletions betfairlightweight/resources/streamingresources.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,77 @@ def __init__(

self.name = name # historic data only
self.event_name = eventName # historic data only


class Race(BaseResource):
"""
:type market_id: unicode
:type race_id: unicode
:type rpm: dict
:type rcm: dict
"""

def __init__(self, **kwargs):
self.streaming_unique_id = kwargs.pop("streaming_unique_id", None)
self.streaming_update = kwargs.pop("streaming_update", None)
self.streaming_snap = kwargs.pop("streaming_snap", False)
self.publish_time_epoch = kwargs.get("pt")
self.publish_time = self.strip_datetime(kwargs.get("pt"))
super(Race, self).__init__(**kwargs)
self.market_id = kwargs.get("mid")
self.race_id = kwargs.get("id")
self.race_progress = (
RaceProgress(**kwargs["rpc"]) if kwargs.get("rpc") else None
)
self.race_runners = [RaceChange(**runner) for runner in kwargs.get("rrc") or []]


class RaceProgress(BaseResource):
"""
:type publish_time: int
:type feed_time: int
:type race_id: unicode
:type gate: unicode
:type sectional_time: float
:type running_time: float
:type speed: float
:type progress: float
:type order: list
"""

def __init__(self, **kwargs):
super(RaceProgress, self).__init__(**kwargs)
self.feed_time_epoch = kwargs.get("ft")
self.feed_time = self.strip_datetime(kwargs.get("ft"))
self.gate_name = kwargs.get("g")
self.sectional_time = kwargs.get("st")
self.running_time = kwargs.get("rt")
self.speed = kwargs.get("spd")
self.progress = kwargs.get("prg")
self.order = kwargs.get("ord")
self.jumps = kwargs.get("J")


class RaceChange(BaseResource):
"""
:type publish_time: int
:type feed_time: int
:type race_id: unicode
:type selection_id: int
:type lat: float
:type long: float
:type speed: float
:type progress: float
:type stride_frequency: float
"""

def __init__(self, **kwargs):
super(RaceChange, self).__init__(**kwargs)
self.feed_time_epoch = kwargs.get("ft")
self.feed_time = self.strip_datetime(kwargs.get("ft"))
self.selection_id = kwargs.get("id")
self.lat = kwargs.get("lat")
self.long = kwargs.get("long")
self.speed = kwargs.get("spd")
self.progress = kwargs.get("prg")
self.stride_frequency = kwargs.get("sfq") # in Hz
9 changes: 9 additions & 0 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class BetfairStream:
HOSTS = collections.defaultdict(
lambda: "stream-api.betfair.com",
integration="stream-api-integration.betfair.com",
race="sports-data-stream-api.betfair.com",
)

def __init__(
Expand Down Expand Up @@ -171,6 +172,14 @@ def subscribe_to_orders(
self._send(message)
return unique_id

def subscribe_to_races(self) -> int:
"""Race subscription request."""
unique_id = self.new_unique_id()
message = {"op": "raceSubscription", "id": unique_id}
self.listener.register_stream(unique_id, "raceSubscription")
self._send(message)
return unique_id

def new_unique_id(self) -> int:
self._unique_id += 1
return self._unique_id
Expand Down
63 changes: 62 additions & 1 deletion betfairlightweight/streaming/cache.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from typing import Union

from ..resources import BaseResource, MarketBook, CurrentOrders, MarketDefinition
from ..resources import BaseResource, MarketBook, CurrentOrders, MarketDefinition, Race
from ..enums import (
StreamingOrderType,
StreamingPersistenceType,
Expand Down Expand Up @@ -469,3 +469,64 @@ def serialise(self) -> dict:
for runner in runners:
orders.extend(runner.serialise_orders(self.market_id))
return {"currentOrders": orders, "moreAvailable": False}


class RunnerChange:
def __init__(self, change: dict):
self.change = change


class RaceCache(BaseResource):
def __init__(self, **kwargs):
super(RaceCache, self).__init__(**kwargs)
self.publish_time = kwargs.get("publish_time")
self.market_id = kwargs.get("mid")
self.race_id = kwargs.get("id")
self.rpc = kwargs.get("rpc") # RaceProgressChange
self.rrc = [RunnerChange(i) for i in kwargs.get("rrc", [])] # RaceRunnerChange
self.streaming_update = None

def update_cache(self, update: dict, publish_time: int) -> None:
self._datetime_updated = self.strip_datetime(publish_time)
self.publish_time = publish_time
self.streaming_update = update

if "rpc" in update:
self.rpc = update["rpc"]

if "rrc" in update:
runner_dict = {runner.change["id"]: runner for runner in self.rrc}

for runner_update in update["rrc"]:
runner = runner_dict.get(runner_update["id"])
if runner:
runner.change = runner_update
else:
self.rrc.append(RunnerChange(runner_update))

def create_resource(
self, unique_id: int, lightweight: bool, snap: bool = False
) -> Union[dict, Race]:
data = self.serialise
data["streaming_unique_id"] = unique_id
data["streaming_update"] = self.streaming_update
data["streaming_snap"] = snap
if lightweight:
return data
else:
return Race(
elapsed_time=(
datetime.datetime.utcnow() - self._datetime_updated
).total_seconds(),
**data
)

@property
def serialise(self) -> dict:
return {
"pt": self.publish_time,
"mid": self.market_id,
"id": self.race_id,
"rpc": self.rpc,
"rrc": [runner.change for runner in self.rrc],
}
8 changes: 5 additions & 3 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import queue
from typing import Optional

from .stream import BaseStream, MarketStream, OrderStream
from .stream import BaseStream, MarketStream, OrderStream, RaceStream
from ..compat import json

logger = logging.getLogger(__name__)
Expand All @@ -15,7 +15,7 @@ def __init__(self, max_latency: Optional[float] = 0.5):
self.connection_id = None
self.status = None
self.stream = None
self.stream_type = None # marketSubscription/orderSubscription
self.stream_type = None # marketSubscription/orderSubscription/raceSubscription
self.stream_unique_id = None
self.connections_available = None # connection throttling

Expand Down Expand Up @@ -64,6 +64,8 @@ def _add_stream(self, unique_id: int, operation: str) -> BaseStream:
return MarketStream(self)
elif operation == "orderSubscription":
return OrderStream(self)
elif operation == "raceSubscription":
return RaceStream(self)

def __str__(self) -> str:
return "{0}".format(self.__class__.__name__)
Expand Down Expand Up @@ -118,7 +120,7 @@ def on_data(self, raw_data: str) -> Optional[bool]:
self._on_connection(data, unique_id)
elif operation == "status":
self._on_status(data, unique_id)
elif operation in ["mcm", "ocm"]:
elif operation in ["mcm", "ocm", "rcm"]:
# historic data does not contain unique_id
if self.stream_unique_id not in [unique_id, 0]:
logger.warning(
Expand Down
40 changes: 39 additions & 1 deletion betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import time
import queue

from .cache import MarketBookCache, OrderBookCache
from .cache import MarketBookCache, OrderBookCache, RaceCache

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -211,3 +211,41 @@ def _process(self, data: list, publish_time: int) -> bool:
self._updates_processed += 1
self.on_process(caches)
return img


class RaceStream(BaseStream):

"""
Cache contains latest update:
marketId: RaceCache
"""

_lookup = "rc"
_name = "RaceStream"

def on_subscribe(self, data: dict) -> None:
"""The initial message returned after
a subscribe - This will currently not
contain any Race Changes (rc) but may
do in the future"""
pass

def _process(self, race_updates: list, publish_time: int) -> bool:
caches, img = [], False # todo cache.closed / img=True
for update in race_updates:
market_id = update["mid"]

race_cache = self._caches.get(market_id)
if race_cache is None:
race_cache = RaceCache(publish_time=publish_time, **update)
self._caches[market_id] = race_cache
logger.info(
"[%s: %s]: %s added, %s markets in cache"
% (self, self.unique_id, market_id, len(self._caches))
)

race_cache.update_cache(update, publish_time)
caches.append(race_cache)
self._updates_processed += 1
self.on_process(caches)
return img
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ python3 setup.py sdist bdist_wheel

twine upload dist/*

mkdocs gh-deploy
# mkdocs gh-deploy
2 changes: 1 addition & 1 deletion requirements-speed.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
ciso8601==2.1.3
orjson==3.4.4
orjson==3.4.5
28 changes: 28 additions & 0 deletions tests/integration/test_historicalstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,31 @@ def test_historical_stream(self):
market = stream.listener.stream._caches.get("1.132153978")
assert len(market.runners) == 14
assert stream._running is False


class HistoricalRaceStreamTest(unittest.TestCase):
def test_historical_stream(self):
trading = betfairlightweight.APIClient("username", "password", app_key="appKey")
stream = trading.streaming.create_historical_stream(
file_path="tests/resources/historicaldata/RACE-1.140075353",
listener=StreamListener(),
operation="raceSubscription",
)
stream.start()

for cache in stream.listener.stream._caches.values():
cache.create_resource(1, False)

assert stream.listener.stream_type == "raceSubscription"
assert stream.listener.stream_unique_id == 0

assert stream.listener.stream._updates_processed == 4
assert len(stream.listener.stream._caches) == 2

market = stream.listener.stream._caches.get("1.1234567")
assert len(market.rrc) == 2

market = stream.listener.stream._caches.get("1.173853449")
assert len(market.rrc) == 4

assert stream._running is False
4 changes: 4 additions & 0 deletions tests/resources/historicaldata/RACE-1.140075353
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"op":"rcm","id":2,"clk":12,"pt":1518626764,"rc": [{"id": "28587288.1650","mid": "1.1234567","rrc": [{"ft":1518626674,"id":7390417,"lat":51.4189543,"long":-0.4058491,"spd":17.8}],"rpc": {"ft":1518626674,"g":"1f","st":10.6,"rt":46.7,"spd":17.8,"prg":87.5,"ord": [7390417,5600338,11527189,6395118,8706072]}}]}
{"op":"rcm","id":2,"clk":12,"pt":1518626764,"rc": [{"id": "28587288.1650","mid": "1.1234567","rrc": [{"ft":1518626674,"id":5600338,"lat":51.4189543,"long":-0.4058491,"spd":17.8}],"rpc": {"ft":1518626674,"g":"1f","st":10.6,"rt":46.7,"spd":17.8,"prg":87.5,"ord": [7390417,5600338,11527189,6395118,8706072]}}]}
{"clk": "453814994750", "rc": [{"rpc": {"rt": 157, "prg": 0, "ft": 1602006840100, "g": "Finish", "J": [], "st": 13.09, "ord": [], "spd": 7.9}, "rrc": [{"prg": 0, "ft": 1602006840100, "long": -0.8996722, "sfq": 1.72, "lat": 53.0681664, "spd": 8.45, "id": 26601175}, {"prg": 0, "ft": 1602006840100, "long": -0.9025625, "sfq": 2.56, "lat": 53.0669335, "spd": 2.68, "id": 14569149}, {"prg": 0, "ft": 1602006840100, "long": -0.9026623, "sfq": 2.59, "lat": 53.066871, "spd": 1.89, "id": 21664718}], "id": "30050924.1750", "mid": "1.173853449"}], "pt": 1602006840130, "op": "rcm"}
{"clk": "453814994750", "rc": [{"rrc": [{"prg": 0, "ft": 1602006840200, "long": -0.8996551, "sfq": 1.8, "lat": 53.0682874, "spd": 7.89, "id": 19516830}], "id": "30050924.1750", "mid": "1.173853449"}], "pt": 1602006840220, "op": "rcm"}
42 changes: 42 additions & 0 deletions tests/resources/streaming_rcm.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"op":"rcm",
"id":2,
"clk":12,
"pt":1518626764,
"rc": [
{
"id": "28587288.1650",
"mid": "1.1234567",
"rrc": [
{
"ft":1518626674,
"id":7390417,
"lat":51.4189543,
"long":-0.4058491,
"spd":17.8,
"prg":2051,
"sfq":2.07
}
],
"rpc": {
"ft":1518626674,
"g":"1f",
"st":10.6,
"rt":46.7,
"spd":17.8,
"prg":87.5,
"ord": [
7390417,
5600338,
11527189,
6395118,
8706072
],
"J": [
{"J":2,"L":370.1},
{"J":1,"L":203.8}
]
}
}
]
}

0 comments on commit 4b4127a

Please sign in to comment.