Skip to content

Commit

Permalink
runner_dict parameter added
Browse files Browse the repository at this point in the history
market_def_dict parameter added
filters added for market and streaming
streaming can take a BaseFilter instance
  • Loading branch information
liampauling committed Jan 15, 2017
1 parent 7f54c0c commit 4a88984
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 23 deletions.
20 changes: 11 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,19 @@ stream. The listener can hold a cache and push market_books/order_books out via
[Exchange Stream API](http://docs.developer.betfair.com/docs/display/1smk3cen4v3lu3yomq5qye0ni/Exchange+Stream+API)

```python
from betfairlightweight import StreamingMarketFilter, StreamingMarketDataFilter

betfair_socket = trading.streaming.create_stream(unique_id=2, description='Test Market Socket')

market_filter = {
'eventTypeIds': ['7'],
'countryCodes': ['GB', 'IE'],
'marketTypes': ['WIN']
}
market_data_filter = {
'fields': ['EX_BEST_OFFERS', 'EX_MARKET_DEF'],
'ladderLevels': 1
}
market_filter = StreamingMarketFilter(
event_type_ids=['7'],
country_codes=['IE'],
market_types=['WIN'],
)
market_data_filter = StreamingMarketDataFilter(
fields=['EX_ALL_OFFERS', 'EX_MARKET_DEF'],
ladder_levels=3
)

betfair_socket.subscribe_to_markets(unique_id=12345,
market_filter=market_filter,
Expand Down
1 change: 1 addition & 0 deletions betfairlightweight/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .apiclient import APIClient
from .exceptions import BetfairError
from .streaming import StreamListener
from .filters import MarketFilter, StreamingMarketFilter, StreamingMarketDataFilter


__title__ = 'betfairlightweight'
Expand Down
92 changes: 92 additions & 0 deletions betfairlightweight/filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@


class BaseFilter:
"""Base Filter"""


class MarketFilter(BaseFilter):

def __init__(self, market_ids=None, bsp_only=None, market_betting_types=None, event_type_ids=None, event_ids=None,
turn_in_play_enabled=None, market_type_codes=None, venues=None, market_countries=None, text_query=None,
competition_ids=None, in_play_only=None, market_start_time=None, with_orders=None):
self.text_query = text_query
self.event_type_ids = event_type_ids or []
self.event_ids = event_ids or []
self.competition_ids = competition_ids or []
self.market_ids = market_ids or []
self.venues = venues or []
self.bsp_only = bsp_only
self.turn_in_play_enabled = turn_in_play_enabled
self.in_play_only = in_play_only
self.market_betting_types = market_betting_types or []
self.market_type_codes = market_type_codes or []
self.market_countries = market_countries or []
self.market_start_time = market_start_time
self.with_orders = with_orders or []

@property
def serialise(self):
return {
'marketIds': self.market_ids,
'textQuery': self.text_query,
'marketBettingTypes': self.market_betting_types,
'eventTypeIds': self.event_type_ids,
'eventIds': self.event_ids,
'turnInPlayEnabled': self.turn_in_play_enabled,
'inPlayOnly': self.in_play_only,
'marketTypeCodes': self.market_type_codes,
'venues': self.venues,
'marketCountries': self.market_countries,
'bspOnly': self.bsp_only,
'competitionIds': self.competition_ids,
'marketStartTime': self.market_start_time,
'withOrders': self.with_orders,
}


class StreamingMarketFilter(BaseFilter):

def __init__(self, market_ids=None, bsp_market=None, betting_types=None, event_type_ids=None, event_ids=None,
turn_in_play_enabled=None, market_types=None, venues=None, country_codes=None):
self.market_ids = market_ids or []
self.bsp_market = bsp_market
self.betting_types = betting_types or []
self.event_type_ids = event_type_ids or []
self.event_ids = event_ids or []
self.turn_in_play_enabled = turn_in_play_enabled
self.market_types = market_types or []
self.venues = venues or []
self.country_codes = country_codes or []

@property
def serialise(self):
return {
'marketIds': self.market_ids,
'bspMarket': self.bsp_market,
'bettingTypes': self.betting_types,
'eventTypeIds': self.event_type_ids,
'eventIds': self.event_ids,
'turnInPlayEnabled': self.turn_in_play_enabled,
'marketTypes': self.market_types,
'venues': self.venues,
'countryCodes': self.country_codes,
}


class StreamingMarketDataFilter(BaseFilter):

def __init__(self, fields=None, ladder_levels=None):
"""
fields: EX_BEST_OFFERS_DISP, EX_BEST_OFFERS, EX_ALL_OFFERS, EX_TRADED,
EX_TRADED_VOL, EX_LTP, EX_MARKET_DEF, SP_TRADED, SP_PROJECTED
ladder_levels: 1->10
"""
self.fields = fields or []
self.ladder_levels = ladder_levels

@property
def serialise(self):
return {
'fields': self.fields,
'ladderLevels': self.ladder_levels
}
25 changes: 17 additions & 8 deletions betfairlightweight/resources/streamingresources.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,10 @@ def update_cache(self, market_change, publish_time):
self.total_matched = traded_volume

runner_change = market_change.get('rc')
runner_dict = {runner.selection_id: runner for runner in self.runners}
if runner_change:
for new_data in runner_change:
selection_id = new_data.get('id')
runner = runner_dict.get(selection_id)
runner = self.runner_dict.get(selection_id)
if runner:
if new_data.get('ltp'):
runner.last_price_traded = new_data.get('ltp')
Expand Down Expand Up @@ -270,16 +269,23 @@ def update_cache(self, market_change, publish_time):
runner.update_starting_price_back(new_data.get('spl'))
else:
self.runners.append(RunnerBook(**new_data))
runner_dict = {runner.selection_id: runner for runner in self.runners}

def create_market_book(self, unique_id):
return MarketBook(date_time_sent=self._datetime_updated, streaming_unique_id=unique_id, **self.serialise)

@property
def runner_dict(self):
return {runner.selection_id: runner for runner in self.runners}

@property
def market_definition_dict(self):
return {runner.id: runner for runner in self.market_definition.runners}

@property
def serialise(self):
"""Creates standard market book json response
"""Creates standard market book json response,
will error if EX_MARKET_DEF not incl.
"""
market_definition_dict = {runner.id: runner for runner in self.market_definition.runners}
return {
'marketId': self.market_id,
'totalAvailable': None,
Expand All @@ -297,7 +303,7 @@ def serialise(self):
'inplay': self.market_definition.in_play,
'numberOfWinners': self.market_definition.number_of_winners,
'numberOfActiveRunners': self.market_definition.number_of_active_runners,
'runners': [runner.serialise(market_definition_dict.get(runner.selection_id).status)
'runners': [runner.serialise(self.market_definition_dict.get(runner.selection_id).status)
for runner in self.runners],
'publishTime': self.publish_time,
}
Expand Down Expand Up @@ -421,11 +427,10 @@ class Meta(BaseResource.Meta):

def update_cache(self, order_book, publish_time):
self._datetime_updated = self.strip_datetime(publish_time)
runner_dict = {runner.selection_id: runner for runner in self.runners}

for order_changes in order_book.get('orc', []):
selection_id = order_changes.get('id')
runner = runner_dict.get(selection_id)
runner = self.runner_dict.get(selection_id)
if runner:
runner.update_matched_lays(order_changes.get('ml', []))
runner.update_matched_backs(order_changes.get('mb', []))
Expand All @@ -436,6 +441,10 @@ def update_cache(self, order_book, publish_time):
def create_order_book(self, unique_id):
return CurrentOrders(date_time_sent=self._datetime_updated, streaming_unique_id=unique_id, **self.serialise)

@property
def runner_dict(self):
return {runner.selection_id: runner for runner in self.runners}

@property
def serialise(self):
orders = []
Expand Down
6 changes: 4 additions & 2 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import ssl
import datetime

from ..filters import BaseFilter
from ..exceptions import SocketError
from ..compat import is_py3

Expand Down Expand Up @@ -90,8 +91,9 @@ def subscribe_to_markets(self, unique_id, market_filter, market_data_filter, ini
message = {
'op': 'marketSubscription',
'id': unique_id,
'marketFilter': market_filter,
'marketDataFilter': market_data_filter,
'marketFilter': market_filter.serialise if isinstance(market_filter, BaseFilter) else market_filter,
'marketDataFilter': market_data_filter.serialise if isinstance(
market_data_filter, BaseFilter) else market_data_filter,
'initialClk': initial_clk,
'clk': clk,
}
Expand Down
3 changes: 3 additions & 0 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,7 @@ def _error_handler(data, unique_id):
return True

def __str__(self):
return 'StreamListener'

def __repr__(self):
return '<StreamListener>'
8 changes: 7 additions & 1 deletion betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def clear_cache(self):
self._caches.clear()

def _on_creation(self):
logging.info('[Stream: %s]: "%s" stream created' % (self.unique_id, str(self)))
logging.info('[Stream: %s]: "%s" created' % (self.unique_id, str(self)))

def _process(self, book_data, publish_time):
pass
Expand Down Expand Up @@ -105,6 +105,9 @@ def _process(self, market_books, publish_time):
self.output_queue.put(output_market_book)

def __str__(self):
return 'MarketStream'

def __repr__(self):
return '<MarketStream [%s]>' % len(self._caches)


Expand All @@ -131,4 +134,7 @@ def _process(self, order_books, publish_time):
self.output_queue.put(output_order_book)

def __str__(self):
return 'OrderStream'

def __repr__(self):
return '<OrderStream [%s]>' % len(self._caches)
92 changes: 92 additions & 0 deletions tests/test_filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import unittest
from unittest import mock

from betfairlightweight.filters import MarketFilter, StreamingMarketDataFilter, StreamingMarketFilter


class MarketFilterTest(unittest.TestCase):

def setUp(self):
self.market_filter = MarketFilter()

def test_init(self):
assert self.market_filter.text_query is None
assert self.market_filter.event_type_ids == []
assert self.market_filter.event_ids == []
assert self.market_filter.competition_ids == []
assert self.market_filter.market_ids == []
assert self.market_filter.venues == []
assert self.market_filter.bsp_only is None
assert self.market_filter.turn_in_play_enabled is None
assert self.market_filter.in_play_only is None
assert self.market_filter.market_betting_types == []
assert self.market_filter.market_type_codes == []
assert self.market_filter.market_countries == []
assert self.market_filter.market_start_time is None
assert self.market_filter.with_orders == []

def test_serialise(self):
assert self.market_filter.serialise == {
'marketIds': [],
'textQuery': None,
'marketBettingTypes': [],
'eventTypeIds': [],
'eventIds': [],
'turnInPlayEnabled': None,
'inPlayOnly': None,
'marketTypeCodes': [],
'venues': [],
'marketCountries': [],
'bspOnly': None,
'competitionIds': [],
'marketStartTime': None,
'withOrders': [],
}


class StreamingMarketFilterTest(unittest.TestCase):

def setUp(self):
self.market_filter = StreamingMarketFilter()

def test_init(self):
assert self.market_filter.market_ids == []
assert self.market_filter.bsp_market is None
assert self.market_filter.betting_types == []
assert self.market_filter.event_type_ids == []
assert self.market_filter.event_ids == []
assert self.market_filter.turn_in_play_enabled is None
assert self.market_filter.market_types == []
assert self.market_filter.venues == []
assert self.market_filter.country_codes == []

def test_serialise(self):
assert self.market_filter.serialise == {
'marketIds': [],
'bspMarket': None,
'bettingTypes': [],
'eventTypeIds': [],
'eventIds': [],
'turnInPlayEnabled': None,
'marketTypes': [],
'venues': [],
'countryCodes': [],
}


class StreamingMarketDataFilterTest(unittest.TestCase):

def setUp(self):
self.fields = [1, 2, 3]
self.ladder_levels = 69
self.market_filter = StreamingMarketDataFilter(self.fields, self.ladder_levels)

def test_init(self):
assert self.market_filter.fields == self.fields
assert self.market_filter.ladder_levels == self.ladder_levels

def test_serialise(self):
assert self.market_filter.serialise == {
'fields': self.fields,
'ladderLevels': self.ladder_levels
}
2 changes: 1 addition & 1 deletion tests/test_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def test_error_handler(self):
assert return_value is None

def test_str(self):
assert str(self.stream_listener) == '<StreamListener>'
assert str(self.stream_listener) == 'StreamListener'

def test_repr(self):
assert repr(self.stream_listener) == '<StreamListener>'
10 changes: 8 additions & 2 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ def test_process(self, mock_market_book_cache):
# self.stream._process(market_books, now)

def test_str(self):
assert str(self.stream) == '<MarketStream [0]>'
assert str(self.stream) == 'MarketStream'

def test_repr(self):
assert repr(self.stream) == '<MarketStream [0]>'


class OrderStreamTest(unittest.TestCase):
Expand All @@ -127,4 +130,7 @@ def test_process(self):
pass

def test_str(self):
assert str(self.stream) == '<OrderStream [0]>'
assert str(self.stream) == 'OrderStream'

def test_repr(self):
assert repr(self.stream) == '<OrderStream [0]>'
Loading

0 comments on commit 4a88984

Please sign in to comment.