Skip to content

Commit

Permalink
Merge dc36c33 into 9fab5c4
Browse files Browse the repository at this point in the history
  • Loading branch information
liampauling committed Mar 2, 2020
2 parents 9fab5c4 + dc36c33 commit 00009f3
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 19 deletions.
13 changes: 13 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
Release History
---------------

2.1.0 (2020-03-02)
+++++++++++++++++++

**Improvements**

- datetime handling added to time_range filter (@trigvi)
- connectionsAvailable handling added

**Bug Fixes**

- #273 error handling added for markets without marketDefinition
- #233 sendall used instead of send so that all data is sent (bug present since 2016!)

2.0.1 (2020-02-17)
+++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion betfairlightweight/__version__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "betfairlightweight"
__description__ = "Lightweight python wrapper for Betfair API-NG"
__url__ = "https://github.com/liampauling/betfair"
__version__ = "2.0.1"
__version__ = "2.1.0"
__author__ = "Liam Pauling"
__license__ = "MIT"
24 changes: 21 additions & 3 deletions betfairlightweight/filters.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime
from typing import Union
from .utils import to_camel_case

from .resources import bettingresources
Expand Down Expand Up @@ -65,13 +67,29 @@ def streaming_order_filter(
return {to_camel_case(k): v for k, v in args.items() if v is not None}


def time_range(from_: str = None, to: str = None) -> dict: # todo datetime conversion
def time_range(
from_: Union[str, datetime.datetime] = None,
to: Union[str, datetime.datetime] = None,
) -> dict:
"""
:param str from_:
:param str to:
:param Union[str, datetime.datetime] from_:
:param Union[str, datetime.datetime] to:
:return: dict
"""

if from_ != None:
if isinstance(from_, datetime.datetime):
from_ = from_.isoformat()
elif not isinstance(from_, str):
raise TypeError("The 'from_' value must be string or datetime (not date)")

if to != None:
if isinstance(to, datetime.datetime):
to = to.isoformat()
elif not isinstance(to, str):
raise TypeError("The 'to' value must be string or datetime (not date)")

args = locals().copy()
return {k.replace("_", ""): v for k, v in args.items()}

Expand Down
2 changes: 1 addition & 1 deletion betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def _send(self, message: dict) -> None:
"[Subscription: %s] Sending: %s" % (self._unique_id, repr(message_dumped))
)
try:
self._socket.send(message_dumped.encode())
self._socket.sendall(message_dumped.encode())
except (socket.timeout, socket.error) as e:
self.stop()
raise SocketError("[Connect: %s]: Socket %s" % (self._unique_id, e))
Expand Down
12 changes: 9 additions & 3 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self, max_latency: float = 0.5):
self.stream = None
self.stream_type = None # marketSubscription/orderSubscription
self.stream_unique_id = None
self.connections_available = None # connection throttling

def register_stream(self, unique_id: int, operation: str) -> None:
logger.info("Register: %s %s" % (operation, unique_id))
Expand Down Expand Up @@ -137,14 +138,19 @@ def _on_connection(self, data: dict, unique_id: int) -> None:
"[Connect: %s]: connection_id: %s" % (unique_id, self.connection_id)
)

@staticmethod
def _on_status(data: dict, unique_id: int) -> None:
def _on_status(self, data: dict, unique_id: int) -> None:
"""Called on status operation
:param data: Received data
"""
status_code = data.get("statusCode")
logger.info("[Subscription: %s]: %s" % (unique_id, status_code))
connections_available = data.get("connectionsAvailable")
if connections_available:
self.connections_available = data.get("connectionsAvailable")
logger.info(
"[Subscription: %s]: %s (%s connections available)"
% (unique_id, status_code, self.connections_available)
)

def _on_change_message(self, data: dict, unique_id: int) -> None:
change_type = data.get("ct", "UPDATE")
Expand Down
7 changes: 7 additions & 0 deletions betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ def _process(self, data: list, publish_time: int) -> None:
if (
market_book.get("img") or market_book_cache is None
): # historic data does not contain img
if "marketDefinition" not in market_book:
logger.error(
"[MarketStream: %s] Unable to add %s to cache due to marketDefinition "
"not being present (make sure EX_MARKET_DEF is requested)"
% (self.unique_id, market_id)
)
continue
market_book_cache = MarketBookCache(
publish_time=publish_time, **market_book
)
Expand Down
1 change: 1 addition & 0 deletions tests/resources/streaming_mcm_SUB_IMAGE_no_market_def.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions tests/unit/test_betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,14 @@ def test_send(self, mock_connect, mock_authenticate):
self.betfair_stream._send(message)
assert mock_connect.call_count == 1
assert mock_authenticate.call_count == 1
assert mock_socket.send.call_count == 1
assert mock_socket.sendall.call_count == 1

@mock.patch("betfairlightweight.streaming.betfairstream.BetfairStream.stop")
def test_send_timeout(self, mock_stop):
self.betfair_stream._running = True
mock_socket = mock.Mock()
self.betfair_stream._socket = mock_socket
mock_socket.send.side_effect = socket.timeout()
mock_socket.sendall.side_effect = socket.timeout()
message = {"message": 1}

with self.assertRaises(SocketError):
Expand All @@ -326,7 +326,7 @@ def test_send_error(self, mock_stop):
self.betfair_stream._running = True
mock_socket = mock.Mock()
self.betfair_stream._socket = mock_socket
mock_socket.send.side_effect = socket.error()
mock_socket.sendall.side_effect = socket.error()
message = {"message": 1}

with self.assertRaises(SocketError):
Expand Down
15 changes: 15 additions & 0 deletions tests/unit/test_filters.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import unittest

from betfairlightweight.filters import (
Expand Down Expand Up @@ -42,9 +43,23 @@ def test_streaming_order_filter(self):
assert response == {"includeOverallPosition": True}

def test_time_range(self):
dt1 = datetime.datetime.now()
dt2 = datetime.datetime.now() + datetime.timedelta(days=1)

cases = ((dt1.date(), None), (None, dt1.date()), (123, None), (None, 456))

for case in cases:
from_ = case[0]
to = case[1]
with self.assertRaises(TypeError):
time_range(from_=from_, to=to)

response = time_range()
assert response == {"from": None, "to": None}

response = time_range(from_=dt1, to=dt2)
assert response == {"from": dt1.isoformat(), "to": dt2.isoformat()}

response = time_range(from_="123", to="456")
assert response == {"from": "123", "to": "456"}

Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def test_init(self):
assert self.base_listener.stream_unique_id is None
assert self.base_listener.stream_type is None
assert self.base_listener.max_latency == 0.5
assert self.base_listener.connections_available is None

@mock.patch(
"betfairlightweight.streaming.listener.BaseListener._add_stream",
Expand Down Expand Up @@ -156,6 +157,9 @@ def test_on_connection(self):

def test_on_status(self):
self.stream_listener._on_status({}, 1)
self.assertIsNone(self.stream_listener.connections_available)
self.stream_listener._on_status({"connectionsAvailable": 69}, 1)
self.assertEqual(self.stream_listener.connections_available, 69)

def test_on_change_message(self):
stream = mock.Mock()
Expand Down
36 changes: 28 additions & 8 deletions tests/unit/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,26 @@ def test_on_subscribe(self, mock_update_clk, mock_process):
self.stream.on_subscribe({"mc": {123}})
mock_process.assert_called_once_with({123}, None)

# @mock.patch('betfairlightweight.streaming.stream.MarketBookCache')
# def test_process(self, mock_market_book_cache):
# now = mock.Mock()
# market_books = [mock.Mock()]
# self.stream._caches = mock.Mock()
# # self.stream._process(market_books, now)
@mock.patch("betfairlightweight.streaming.stream.MarketBookCache")
@mock.patch("betfairlightweight.streaming.stream.MarketStream.on_process")
def test_process(self, mock_on_process, mock_cache):
sub_image = create_mock_json("tests/resources/streaming_mcm_SUB_IMAGE.json")
data = sub_image.json()["mc"]
self.stream._process(data, 123)

self.assertEqual(len(self.stream), len(data))

@mock.patch("betfairlightweight.streaming.stream.MarketBookCache")
@mock.patch("betfairlightweight.streaming.stream.MarketStream.on_process")
def test_process_no_market_definition(self, mock_on_process, mock_cache):
sub_image_error = create_mock_json(
"tests/resources/streaming_mcm_SUB_IMAGE_no_market_def.json"
)
data = sub_image_error.json()["mc"]
self.stream._process(data, 123)

self.assertEqual(len(data), 137)
self.assertEqual(len(self.stream), 135) # two markets missing marketDef

def test_str(self):
assert str(self.stream) == "MarketStream"
Expand All @@ -189,8 +203,14 @@ def test_on_subscribe(self, mock_update_clk, mock_process):
self.stream.on_subscribe({"oc": {123}})
mock_process.assert_called_once_with({123}, None)

# def test_process(self):
# pass
@mock.patch("betfairlightweight.streaming.stream.OrderBookCache")
@mock.patch("betfairlightweight.streaming.stream.OrderStream.on_process")
def test_process(self, mock_on_process, mock_cache):
sub_image = create_mock_json("tests/resources/streaming_ocm_FULL_IMAGE.json")
data = sub_image.json()["oc"]
self.stream._process(data, 123)

self.assertEqual(len(self.stream), len(data))

def test_str(self):
assert str(self.stream) == "OrderStream"
Expand Down

0 comments on commit 00009f3

Please sign in to comment.