Skip to content

Commit

Permalink
Merge cf83220 into 53c58d3
Browse files Browse the repository at this point in the history
  • Loading branch information
liampauling committed May 26, 2019
2 parents 53c58d3 + cf83220 commit f6999ca
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 15 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,17 @@ In development so breaking changes likely.

>>> stream.start(async_=False)
```

or use the stream generator:

```python

>>> stream = trading.streaming.create_historical_generator_stream(
directory='horse-racing-pro-sample',
)

>>> g = stream.get_generator()

>>> for i in g:
>>> print(i)
```
17 changes: 17 additions & 0 deletions betfairlightweight/endpoints/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
BaseListener,
BetfairStream,
HistoricalStream,
HistoricalGeneratorStream
)


Expand Down Expand Up @@ -57,3 +58,19 @@ def create_historical_stream(directory, listener=None):
listener = listener if listener else BaseListener()
listener.register_stream('HISTORICAL', 'marketSubscription')
return HistoricalStream(directory, listener)

@staticmethod
def create_historical_generator_stream(directory, listener=None):
"""
Uses generator listener/cache to parse betfair
historical data:
https://historicdata.betfair.com/#/home
:param str directory: Directory of betfair data
:param BaseListener listener: Listener object
:rtype: HistoricalGeneratorStream
"""
listener = listener if listener else BaseListener()
listener.register_stream('HISTORICAL', 'marketSubscription')
return HistoricalGeneratorStream(directory, listener)
1 change: 1 addition & 0 deletions betfairlightweight/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .betfairstream import (
BetfairStream,
HistoricalStream,
HistoricalGeneratorStream,
)
from .listener import (
BaseListener,
Expand Down
25 changes: 25 additions & 0 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,3 +297,28 @@ def _read_loop(self):
else:
# if f has finished, also stop the stream
self.stop()


class HistoricalGeneratorStream(HistoricalStream):
"""Copy of 'Betfair Stream' for parsing
historical data (no threads).
"""

def get_generator(self):
return self._read_loop

def _read_loop(self):
self._running = True
with open(self.directory, "r") as f:
for update in f:
if self.listener.on_data(update) is False:
# if on_data returns an error stop the stream and raise error
self.stop()
raise ListenerError("HISTORICAL", update)
if not self._running:
break
else:
yield self.listener.snap()
else:
# if f has finished, also stop the stream
self.stop()
39 changes: 36 additions & 3 deletions tests/unit/test_betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from betfairlightweight.streaming.betfairstream import (
BetfairStream,
HistoricalStream,
HistoricalGeneratorStream,
)
from betfairlightweight.exceptions import (
SocketError,
Expand Down Expand Up @@ -281,7 +282,7 @@ def test_str(self):
class HistoricalStreamTest(unittest.TestCase):

def setUp(self):
self.directory = 'test'
self.directory = 'tests/resources/historicaldata/BASIC-1.132153978'
self.listener = mock.Mock()
self.stream = HistoricalStream(self.directory, self.listener)

Expand All @@ -308,5 +309,37 @@ def test_stop(self):
self.stream.stop()
assert self.stream._running is False

# def test_read_loop(self):
# pass
@mock.patch('betfairlightweight.streaming.betfairstream.HistoricalStream.stop')
def test__read_loop(self, mock_stop):
self.stream._running = True
self.stream._read_loop()
self.assertEqual(self.listener.on_data.call_count, 480)
self.listener.on_data.snap()
mock_stop.assert_called_with()
self.assertTrue(self.stream._running)


class HistoricalGeneratorStreamTest(unittest.TestCase):

def setUp(self):
self.directory = 'tests/resources/historicaldata/BASIC-1.132153978'
self.listener = mock.Mock()
self.stream = HistoricalGeneratorStream(self.directory, self.listener)

def test_init(self):
assert self.stream.directory == self.directory
assert self.stream.listener == self.listener
assert self.stream._running is False

@mock.patch('betfairlightweight.streaming.betfairstream.HistoricalGeneratorStream._read_loop')
def test_get_generator(self, mock_read_loop):
self.assertEqual(self.stream.get_generator(), mock_read_loop)

@mock.patch('betfairlightweight.streaming.betfairstream.HistoricalGeneratorStream.stop')
def test__read_loop(self, mock_stop):
data = [i for i in self.stream._read_loop()]
self.assertEqual(len(data), 480)
self.assertEqual(self.listener.on_data.call_count, 480)
self.listener.on_data.snap()
mock_stop.assert_called_with()
self.assertTrue(self.stream._running)
28 changes: 16 additions & 12 deletions tests/unit/test_streamingendpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@
from betfairlightweight.endpoints import Streaming


class StreamingInit(unittest.TestCase):

def test_base_endpoint_init(self):
client = APIClient('username', 'password', 'app_key')
streaming = Streaming(client)
assert streaming.client == client


class StreamingTest(unittest.TestCase):

def setUp(self):
client = APIClient('username', 'password', 'app_key', 'UK')
self.streaming = Streaming(client)
self.client = APIClient('username', 'password', 'app_key', 'UK')
self.streaming = Streaming(self.client)

def test_init(self):
self.assertEqual(self.streaming.client, self.client)

@mock.patch('betfairlightweight.endpoints.streaming.BetfairStream')
def test_list_race_details(self, mock_betfair_stream):
def test_create_stream(self, mock_betfair_stream):
response = self.streaming.create_stream(1, 2, 6, 1024, 'TestSocket')

assert mock_betfair_stream.call_count == 1
Expand All @@ -30,10 +25,19 @@ def test_list_race_details(self, mock_betfair_stream):
assert response == mock_betfair_stream()

@mock.patch('betfairlightweight.endpoints.streaming.HistoricalStream')
def test_create_stream(self, mock_stream):
def test_create_historical_stream(self, mock_stream):
dir = 'test'
listener = mock.Mock()
self.streaming.create_historical_stream(dir, listener)

listener.register_stream.assert_called_with('HISTORICAL', 'marketSubscription')
mock_stream.assert_called_with(dir, listener)

@mock.patch('betfairlightweight.endpoints.streaming.HistoricalGeneratorStream')
def test_create_historical_generator_stream(self, mock_stream):
dir = 'test'
listener = mock.Mock()
self.streaming.create_historical_generator_stream(dir, listener)

listener.register_stream.assert_called_with('HISTORICAL', 'marketSubscription')
mock_stream.assert_called_with(dir, listener)

0 comments on commit f6999ca

Please sign in to comment.