
Commit

Update with bflw 2.12.0b0 for testing
2x speed improvement!
liampauling committed Jan 11, 2021
1 parent 707a819 commit dde4f86
Showing 5 changed files with 47 additions and 43 deletions.
examples/backtest.py (2 changes: 1 addition, 1 deletion)
@@ -19,7 +19,7 @@

framework = FlumineBacktest(client=client)

_market = "/Users/liampauling/Downloads/1.169399847"
_market = "tests/resources/PRO-1.170258213"

strategy = LowestLayer(
market_filter={"markets": [_market]},
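
For context, a minimal sketch of how the updated example runs end to end (LowestLayer is defined earlier in examples/backtest.py and is not shown in this diff; the top-level imports are assumed from the rest of the example):

from flumine import FlumineBacktest, clients

client = clients.BacktestClient()
framework = FlumineBacktest(client=client)

# the example now points at the PRO recording bundled with the test resources
# instead of a locally downloaded file
_market = "tests/resources/PRO-1.170258213"

strategy = LowestLayer(  # strategy class defined earlier in examples/backtest.py
    market_filter={"markets": [_market]},
)
framework.add_strategy(strategy)
framework.run()
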
flumine/streams/historicalstream.py (16 changes: 9 additions, 7 deletions)
@@ -36,7 +36,7 @@ def _process(self, data: list, publish_time: int) -> bool:
)
continue
market_book_cache = MarketBookCache(
publish_time=publish_time, **market_book
market_id, publish_time, self._lightweight
)
self._caches[market_id] = market_book_cache
logger.info(
@@ -69,9 +69,7 @@ def snap(self, market_ids: list = None) -> list:
if self._listener.inplay is False:
if cache.market_definition["inPlay"]:
continue
market_books.append(
cache.create_resource(self.unique_id, self._lightweight, snap=True)
)
market_books.append(cache.create_resource(self.unique_id, snap=True))
return market_books


@@ -87,7 +85,10 @@ def _process(self, race_updates: list, publish_time: int) -> bool:
market_id = update["mid"]
race_cache = self._caches.get(market_id)
if race_cache is None:
race_cache = RaceCache(publish_time=publish_time, **update)
race_id = update.get("id")
race_cache = RaceCache(
market_id, publish_time, race_id, self._lightweight
)
self._caches[market_id] = race_cache
logger.info(
"[%s: %s]: %s added, %s markets in cache"
@@ -111,9 +112,9 @@ def __init__(self, inplay: bool = None, seconds_to_start: float = None, **kwargs

def _add_stream(self, unique_id, stream_type):
if stream_type == "marketSubscription":
return FlumineMarketStream(self)
return FlumineMarketStream(self, unique_id)
elif stream_type == "raceSubscription":
return FlumineRaceStream(self)
return FlumineRaceStream(self, unique_id)


class HistoricalStream(BaseStream):
@@ -132,5 +133,6 @@ def create_generator(self):
file_path=self.market_filter,
listener=self._listener,
operation=self.operation,
unique_id=0,
)
return stream.get_generator()
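
The shape of the betfairlightweight 2.12.0b0 change these edits adapt to, as exercised by the updated tests in tests/test_streams.py later in this diff: caches are now constructed from the market id, publish time and lightweight flag, and the raw update is applied afterwards via update_cache. A minimal sketch only; the helper name is illustrative and the import path is an assumption, not part of this diff:

from betfairlightweight.streaming.cache import MarketBookCache


def process_market_update(caches: dict, update: dict, publish_time: int, lightweight: bool) -> None:
    # illustrative helper, not flumine's actual method
    market_id = update["id"]
    cache = caches.get(market_id)
    if cache is None:
        # 2.12.0b0: the cache is created empty rather than populated via **market_book
        cache = MarketBookCache(market_id, publish_time, lightweight)
        caches[market_id] = cache
    # the raw update is then applied through update_cache, as the tests assert
    cache.update_cache(update, publish_time)
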
requirements.txt (2 changes: 1 addition, 1 deletion)
@@ -1,4 +1,4 @@
betfairlightweight==2.11.1
betfairlightweight==2.12.0b0
tenacity==5.0.3
python-json-logger==2.0.1
requests
tests/test_integration.py (31 changes: 16 additions, 15 deletions)
@@ -41,7 +41,7 @@ def check_market_book(self, market, market_book):

def process_market_book(self, market, market_book):
for runner in market_book.runners:
if runner.last_price_traded < 2:
if runner.status == "ACTIVE" and runner.last_price_traded < 2:
lay = get_price(runner.ex.available_to_lay, 0)
trade = Trade(
market_book.market_id,
@@ -68,21 +68,22 @@ def check_market_book(self, market, market_book):

def process_market_book(self, market, market_book):
for runner in market_book.runners:
runner_context = self.get_runner_context(
market.market_id, runner.selection_id
)
if runner_context.trade_count == 0:
trade = Trade(
market_book.market_id,
runner.selection_id,
runner.handicap,
self,
if runner.status == "ACTIVE":
runner_context = self.get_runner_context(
market.market_id, runner.selection_id
)
order = trade.create_order(
side="LAY",
order_type=MarketOnCloseOrder(100.00),
)
self.place_order(market, order)
if runner_context.trade_count == 0:
trade = Trade(
market_book.market_id,
runner.selection_id,
runner.handicap,
self,
)
order = trade.create_order(
side="LAY",
order_type=MarketOnCloseOrder(100.00),
)
self.place_order(market, order)

client = clients.BacktestClient()
framework = FlumineBacktest(client=client)
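
Both test strategies now check runner status before reading prices or runner contexts. A minimal stand-alone sketch of that guard; the class name, check_market_book body and imports are assumptions, while the order placement mirrors the test code above:

from flumine import BaseStrategy
from flumine.order.trade import Trade
from flumine.order.ordertype import MarketOnCloseOrder


class ExampleLayStrategy(BaseStrategy):
    def check_market_book(self, market, market_book) -> bool:
        return market_book.status != "CLOSED"  # assumed filter, not shown in this diff

    def process_market_book(self, market, market_book) -> None:
        for runner in market_book.runners:
            if runner.status != "ACTIVE":
                continue  # skip removed/inactive runners before touching prices or contexts
            runner_context = self.get_runner_context(
                market.market_id, runner.selection_id
            )
            if runner_context.trade_count == 0:
                trade = Trade(
                    market_book.market_id,
                    runner.selection_id,
                    runner.handicap,
                    self,
                )
                order = trade.create_order(
                    side="LAY", order_type=MarketOnCloseOrder(100.00)
                )
                self.place_order(market, order)
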
tests/test_streams.py (39 changes: 20 additions, 19 deletions)
@@ -352,20 +352,20 @@ def test_flumine_listener(self, mock_market_stream, mock_race_stream):

def test_flumine_stream(self):
mock_listener = mock.Mock()
stream = datastream.FlumineStream(mock_listener)
stream = datastream.FlumineStream(mock_listener, 0)
self.assertEqual(str(stream), "FlumineStream")
self.assertEqual(repr(stream), "<FlumineStream [0]>")

def test_flumine_stream_on_process(self):
mock_listener = mock.Mock()
stream = datastream.FlumineStream(mock_listener)
stream = datastream.FlumineStream(mock_listener, 0)
stream.on_process([1, 2, 3])
mock_listener.output_queue.put.called_with(datastream.RawDataEvent([1, 2, 3]))

@mock.patch("flumine.streams.datastream.FlumineMarketStream.on_process")
def test_flumine_market_stream(self, mock_on_process):
mock_listener = mock.Mock()
stream = datastream.FlumineMarketStream(mock_listener)
mock_listener = mock.Mock(stream_unique_id=0)
stream = datastream.FlumineMarketStream(mock_listener, 0)
market_books = [{"id": "1.123"}, {"id": "1.456"}, {"id": "1.123"}]
stream._process(market_books, 123)

@@ -377,8 +377,8 @@ def test_flumine_market_stream(self, mock_on_process):

@mock.patch("flumine.streams.datastream.FlumineMarketStream.on_process")
def test_flumine_market_stream_market_closed(self, mock_on_process):
mock_listener = mock.Mock()
stream = datastream.FlumineMarketStream(mock_listener)
mock_listener = mock.Mock(stream_unique_id=0)
stream = datastream.FlumineMarketStream(mock_listener, 0)
stream._caches = {"1.123": object}
market_books = [{"id": "1.123", "marketDefinition": {"status": "CLOSED"}}]
stream._process(market_books, 123)
@@ -392,8 +392,8 @@ def test_flumine_market_stream_market_closed(self, mock_on_process):

@mock.patch("flumine.streams.datastream.FlumineRaceStream.on_process")
def test_flumine_race_stream(self, mock_on_process):
mock_listener = mock.Mock()
stream = datastream.FlumineRaceStream(mock_listener)
mock_listener = mock.Mock(stream_unique_id=0)
stream = datastream.FlumineRaceStream(mock_listener, 0)
race_updates = [{"mid": "1.123"}, {"mid": "1.456"}, {"mid": "1.123"}]
stream._process(race_updates, 123)

@@ -444,14 +444,15 @@ def test_create_generator(self, mock_generator):
file_path={"test": "me"},
listener=self.stream._listener,
operation="marketSubscription",
unique_id=0,
)
self.assertEqual(generator, mock_generator().get_generator())


class TestFlumineMarketStream(unittest.TestCase):
def setUp(self) -> None:
self.listener = mock.Mock()
self.stream = historicalstream.FlumineMarketStream(self.listener)
self.stream = historicalstream.FlumineMarketStream(self.listener, 0)

def test_init(self):
self.assertEqual(self.stream._listener, self.listener)
@@ -467,17 +468,15 @@ def test__process(self, mock_cache):
)
self.assertEqual(len(self.stream._caches), 1)
self.assertEqual(self.stream._updates_processed, 1)
mock_cache.assert_called_with(
publish_time=12345, id="1.23", img={1: 2}, marketDefinition={"runners": []}
)
mock_cache.assert_called_with("1.23", 12345, self.stream._listener.lightweight)
mock_cache().update_cache.assert_called_with(
{"id": "1.23", "img": {1: 2}, "marketDefinition": {"runners": []}}, 12345
)

def test_snap_inplay(self):
# inPlay
self.stream = historicalstream.FlumineMarketStream(
mock.Mock(inplay=True, seconds_to_start=None)
mock.Mock(inplay=True, seconds_to_start=None), 0
)
self.stream._caches = {
"1.123": mock.Mock(market_definition={"status": "OPEN", "inPlay": False})
@@ -493,7 +492,7 @@ def test_snap_inplay(self):
self.assertEqual(len(self.stream.snap()), 1)

self.stream = historicalstream.FlumineMarketStream(
mock.Mock(inplay=False, seconds_to_start=None)
mock.Mock(inplay=False, seconds_to_start=None), 0
)
self.stream._caches = {
"1.123": mock.Mock(market_definition={"status": "OPEN", "inPlay": False})
@@ -507,7 +506,7 @@ def test_snap_seconds_to_start(self):
def test_snap_seconds_to_start(self):
# secondsToStart
self.stream = historicalstream.FlumineMarketStream(
mock.Mock(inplay=None, seconds_to_start=600)
mock.Mock(inplay=None, seconds_to_start=600), 0
)
self.stream._caches = {
"1.123": mock.Mock(
@@ -536,21 +535,23 @@ def test_snap_seconds_to_start(self):
class TestFlumineRaceStream(unittest.TestCase):
def setUp(self) -> None:
self.listener = mock.Mock()
self.stream = historicalstream.FlumineRaceStream(self.listener)
self.stream = historicalstream.FlumineRaceStream(self.listener, 0)

@mock.patch("flumine.streams.historicalstream.RaceCache")
def test__process(self, mock_cache):
self.assertFalse(
self.stream._process(
[{"mid": "1.23", "img": {1: 2}}],
[{"mid": "1.23", "id": 1, "img": {1: 2}}],
12345,
)
)
self.assertEqual(len(self.stream._caches), 1)
self.assertEqual(self.stream._updates_processed, 1)
mock_cache.assert_called_with(publish_time=12345, mid="1.23", img={1: 2})
mock_cache.assert_called_with(
"1.23", 12345, 1, self.stream._listener.lightweight
)
mock_cache().update_cache.assert_called_with(
{"mid": "1.23", "img": {1: 2}}, 12345
{"mid": "1.23", "id": 1, "img": {1: 2}}, 12345
)


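
The recurring change in these tests is the extra unique id passed alongside the listener when a stream is constructed. A short sketch of the new call shape (module paths and the id value 0 are taken from the tests themselves):

from unittest import mock

from flumine.streams import datastream, historicalstream

listener = mock.Mock(stream_unique_id=0)

# previously FlumineMarketStream(listener); the subscription's unique id is now required
raw_market_stream = datastream.FlumineMarketStream(listener, 0)
historical_market_stream = historicalstream.FlumineMarketStream(listener, 0)
historical_race_stream = historicalstream.FlumineRaceStream(listener, 0)
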
