Skip to content

Commit

Permalink
Merge pull request #241 from liampauling/task/remove-async
Browse files Browse the repository at this point in the history
Task/remove async
  • Loading branch information
liampauling committed Oct 14, 2019
2 parents 413d124 + b11ac1a commit ec520f0
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 39 deletions.
2 changes: 1 addition & 1 deletion HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Release History

**Improvements**

-
- async removed from streaming (force user to handle thread)

**Bug Fixes**

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ In development so breaking changes likely.
market_data_filter=market_data_filter,
)

>>> betfair_socket.start(async_=False)
>>> betfair_socket.start()
```

# historic data
Expand Down Expand Up @@ -131,7 +131,7 @@ In development so breaking changes likely.
directory='horse-racing-pro-sample',
)

>>> stream.start(async_=False)
>>> stream.start()
```

or use the stream generator:
Expand Down
25 changes: 6 additions & 19 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import threading
import socket
import ssl
import datetime
Expand Down Expand Up @@ -40,21 +39,14 @@ def __init__(self, unique_id, listener, app_key, session_token, timeout, buffer_
self._socket = None
self._running = False

def start(self, async_=False):
"""Starts read loop, new thread if async and
connects/authenticates if not already running.
:param async_: If True new thread is started
def start(self):
"""Starts read loop, connects/authenticates
if not already running.
"""
if not self._running:
self._connect()
self.authenticate()
if async_:
t = threading.Thread(name=self.description, target=self._read_loop)
t.daemon = False
t.start()
else:
self._read_loop()
self._read_loop()

def stop(self):
"""Stops read loop and closes socket if it has been created.
Expand Down Expand Up @@ -270,14 +262,9 @@ def __init__(self, directory, listener):
self.listener = listener
self._running = False

def start(self, async_=False):
def start(self):
self._running = True
if async_:
t = threading.Thread(name='HistoricalStream', target=self._read_loop)
t.daemon = False
t.start()
else:
self._read_loop()
self._read_loop()

def stop(self):
self._running = False
Expand Down
6 changes: 4 additions & 2 deletions examples/examplestreaming.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import logging
import queue
import threading

import betfairlightweight
from betfairlightweight.filters import (
Expand Down Expand Up @@ -48,8 +49,9 @@
conflate_ms=1000, # send update every 1000ms
)

# start stream
stream.start(async_=True)
# start stream in a new thread (in production would need err handling)
t = threading.Thread(target=stream.start, daemon=True)
t.start()

"""
Data can also be accessed by using the snap function in the listener, e.g:
Expand Down
2 changes: 1 addition & 1 deletion examples/examplestreaminghistorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ def _add_stream(self, unique_id, stream_type):
)

# start stream
stream.start(async_=False)
stream.start()
2 changes: 1 addition & 1 deletion tests/integration/test_historicalstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_historical_stream(self):
directory='tests/resources/historicaldata/BASIC-1.132153978',
listener=StreamListener()
)
stream.start(async_=False)
stream.start()

assert stream.listener.stream_type == 'marketSubscription'
assert stream.listener.stream_unique_id == 'HISTORICAL'
Expand Down
15 changes: 2 additions & 13 deletions tests/unit/test_betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,14 @@ def test_host_init(self):

@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream.authenticate')
@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream._connect')
@mock.patch('betfairlightweight.streaming.betfairstream.threading')
@mock.patch('betfairlightweight.streaming.betfairstream.BetfairStream._read_loop')
def test_start(self, mock_read_loop, mock_threading, mock_connect, mock_authenticate):
def test_start(self, mock_read_loop, mock_connect, mock_authenticate):
self.betfair_stream._running = True
self.betfair_stream.start()
mock_read_loop.assert_called_with()

self.betfair_stream.start(async_=True)
mock_threading.Thread.assert_called_with(name=self.description, target=mock_read_loop)

self.betfair_stream._running = False
self.betfair_stream.start(async_=False)
self.betfair_stream.start()
mock_connect.assert_called_with()
mock_authenticate.assert_called_with()

Expand Down Expand Up @@ -297,13 +293,6 @@ def test_start(self, mock_read_loop):
mock_read_loop.assert_called_with()
assert self.stream._running is True

@mock.patch('betfairlightweight.streaming.betfairstream.HistoricalStream._read_loop')
@mock.patch('betfairlightweight.streaming.betfairstream.threading')
def test_start_thread(self, mock_threading, mock_read_loop):
self.stream.start(async_=True)
mock_threading.Thread.assert_called_with(name='HistoricalStream', target=mock_read_loop)
assert self.stream._running is True

def test_stop(self):
self.stream._running = True
self.stream.stop()
Expand Down

0 comments on commit ec520f0

Please sign in to comment.