Skip to content

Commit

Permalink
Merge pull request #87 from liampauling/listenerstream_refactor
Browse files Browse the repository at this point in the history
Listenerstream refactor
  • Loading branch information
liampauling committed May 27, 2017
2 parents e82070a + 981dc9c commit c4aaa67
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 89 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Lightweight, super fast (uses c libraries) pythonic wrapper for [Betfair API-NG]

[Documentation](https://github.com/liampauling/betfairlightweight/wiki)

[Join slack group](https://betfairlightweight.herokuapp.com)

Currently tested on Python 2.7, 3.4, 3.5 and 3.6.

# installation
Expand Down Expand Up @@ -43,9 +45,7 @@ The library can then be used as follows:

# streaming

Currently two listeners are available; the example below runs the base listener, which prints anything it receives.
Stream listener is able to hold an order stream and a market stream, although it is recommended to have one socket per
stream. The listener can hold a cache and push market_books/order_books out via a queue.
Currently two listeners are available; the example below runs the base listener, which prints anything it receives. The stream listener is able to hold an order stream or a market stream (one per listener). The listener can hold a cache and push market_books/order_books out via a queue.

[Exchange Stream API](http://docs.developer.betfair.com/docs/display/1smk3cen4v3lu3yomq5qye0ni/Exchange+Stream+API)

Expand All @@ -56,7 +56,7 @@ from betfairlightweight.filters import (
)

betfair_socket = trading.streaming.create_stream(
unique_id=2,
unique_id=0,
description='Test Market Socket',
)

Expand All @@ -71,7 +71,6 @@ market_data_filter = streaming_market_data_filter(
)

betfair_socket.subscribe_to_markets(
unique_id=12345,
market_filter=market_filter,
market_data_filter=market_data_filter,
)
Expand Down
2 changes: 1 addition & 1 deletion betfairlightweight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from . import filters

__title__ = 'betfairlightweight'
__version__ = '1.1.9'
__version__ = '1.2.0'
__author__ = 'Liam Pauling'

# Set default logging handler to avoid "No handler found" warnings.
Expand Down
5 changes: 2 additions & 3 deletions betfairlightweight/endpoints/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ def __init__(self, parent):
"""
self.client = parent

def create_stream(self, unique_id, listener=None, timeout=11, buffer_size=1024, description='BetfairSocket'):
def create_stream(self, unique_id=0, listener=None, timeout=11, buffer_size=1024, description='BetfairSocket'):
"""
Creates BetfairStream.
:param dict unique_id: Unique id of the stream
:param dict unique_id: Id used to start unique id's of the stream (+1 before every request)
:param resources.Listener listener: Listener class to use
:param float timeout: Socket timeout
:param int buffer_size: Socket buffer size
Expand All @@ -36,5 +36,4 @@ def create_stream(self, unique_id, listener=None, timeout=11, buffer_size=1024,
timeout=timeout,
buffer_size=buffer_size,
description=description,
# todo lightweight
)
2 changes: 0 additions & 2 deletions betfairlightweight/resources/bettingresources.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ def __init__(self, **kwargs):
self.total_available = kwargs.get('totalAvailable')
self.total_matched = kwargs.get('totalMatched')
self.version = kwargs.get('version')
self.publish_time = self.strip_datetime(kwargs.get('publishTime'))
self.runners = [RunnerBook(**i) for i in kwargs.get('runners')]


Expand Down Expand Up @@ -499,7 +498,6 @@ def __init__(self, **kwargs):
self.streaming_unique_id = kwargs.pop('streaming_unique_id', None)
self.streaming_update = kwargs.pop('streaming_update', None)
self.publish_time = kwargs.pop('publish_time', None)
self.market_definition = kwargs.pop('market_definition', None)
super(CurrentOrders, self).__init__(**kwargs)
self.more_available = kwargs.get('moreAvailable')
self.orders = [CurrentOrder(**i) for i in kwargs.get('currentOrders')]
Expand Down
5 changes: 4 additions & 1 deletion betfairlightweight/resources/streamingresources.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ def create_market_book(self, unique_id, streaming_update, lightweight):
elapsed_time=(datetime.datetime.utcnow()-self._datetime_updated).total_seconds(),
streaming_unique_id=unique_id,
streaming_update=streaming_update,
publish_time=self.publish_time,
market_definition=self.market_definition,
**self.serialise
)
Expand Down Expand Up @@ -380,7 +381,6 @@ def serialise(self):
'runners': [
runner.serialise(self.market_definition_dict.get(runner.selection_id)) for runner in self.runners
],
'publishTime': self.publish_time,
}


Expand Down Expand Up @@ -482,12 +482,14 @@ class OrderBookCache(BaseResource):

def __init__(self, **kwargs):
super(OrderBookCache, self).__init__(**kwargs)
self.publish_time = kwargs.get('publish_time')
self.market_id = kwargs.get('id')
self.closed = kwargs.get('closed')
self.runners = [OrderBookRunner(**i) for i in kwargs.get('orc', [])]

def update_cache(self, order_book, publish_time):
self._datetime_updated = self.strip_datetime(publish_time)
self.publish_time = publish_time

for order_changes in order_book.get('orc', []):
selection_id = order_changes['id']
Expand All @@ -507,6 +509,7 @@ def create_order_book(self, unique_id, streaming_update, lightweight):
elapsed_time=(datetime.datetime.utcnow()-self._datetime_updated).total_seconds(),
streaming_unique_id=unique_id,
streaming_update=streaming_update,
publish_time=self.publish_time,
**self.serialise
)

Expand Down
53 changes: 37 additions & 16 deletions betfairlightweight/streaming/betfairstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class BetfairStream(object):
__encoding = 'utf-8'

def __init__(self, unique_id, listener, app_key, session_token, timeout, buffer_size, description):
self.unique_id = unique_id
self._unique_id = unique_id
self.listener = listener
self.app_key = app_key
self.session_token = session_token
Expand Down Expand Up @@ -59,70 +59,91 @@ def stop(self):
except OSError:
pass

def authenticate(self, unique_id=None):
def authenticate(self):
"""Authentication request.
:param unique_id: self.unique_id used if not supplied.
"""
unique_id = self.new_unique_id()
message = {
'op': 'authentication',
'id': unique_id or self.unique_id,
'id': unique_id,
'appKey': self.app_key,
'session': self.session_token
'session': self.session_token,
}
self.listener.register_stream(unique_id or self.unique_id, 'authentication')
self._send(message)
return unique_id

def heartbeat(self, unique_id=None):
def heartbeat(self):
"""Heartbeat request to keep session alive.
:param unique_id: self.unique_id used if not supplied.
"""
unique_id = self.new_unique_id()
message = {
'op': 'heartbeat',
'id': unique_id or self.unique_id
'id': unique_id,
}
self._send(message)
return unique_id

def subscribe_to_markets(self, unique_id, market_filter, market_data_filter, initial_clk=None, clk=None):
def subscribe_to_markets(self, market_filter, market_data_filter, initial_clk=None, clk=None,
conflate_ms=None, heartbeat_ms=None, segmentation_enabled=True):
"""
Market subscription request.
:param dict market_filter: Market filter
:param dict market_data_filter: Market data filter
:param int unique_id: Unique id of stream
:param str initial_clk: Sequence token for reconnect
:param str clk: Sequence token for reconnect
:param int conflate_ms: conflation rate (bounds are 0 to 120000)
:param int heartbeat_ms: heartbeat rate (500 to 5000)
:param bool segmentation_enabled: allow the server to send large sets of data
in segments, instead of a single block
"""
unique_id = self.new_unique_id()
message = {
'op': 'marketSubscription',
'id': unique_id,
'marketFilter': market_filter,
'marketDataFilter': market_data_filter,
'initialClk': initial_clk,
'clk': clk,
'conflateMs': conflate_ms,
'heartbeatMs': heartbeat_ms,
'segmentationEnabled': segmentation_enabled,
}
self.listener.register_stream(unique_id, 'marketSubscription')
self._send(message)
return unique_id

def subscribe_to_orders(self, unique_id, order_filter=None, initial_clk=None, clk=None):
def subscribe_to_orders(self, order_filter=None, initial_clk=None, clk=None, conflate_ms=None,
heartbeat_ms=None, segmentation_enabled=True):
"""
Order subscription request.
:param int unique_id: Unique id of stream
:param dict order_filter: Order filter to be applied
:param str initial_clk: Sequence token for reconnect
:param str clk: Sequence token for reconnect
:param int conflate_ms: conflation rate (bounds are 0 to 120000)
:param int heartbeat_ms: heartbeat rate (500 to 5000)
:param bool segmentation_enabled: allow the server to send large sets of data
in segments, instead of a single block
"""
unique_id = self.new_unique_id()
message = {
'op': 'orderSubscription',
'id': unique_id,
'orderFilter': order_filter,
'initialClk': initial_clk,
'clk': clk,
'conflateMs': conflate_ms,
'heartbeatMs': heartbeat_ms,
'segmentationEnabled': segmentation_enabled,
}
self.listener.register_stream(unique_id, 'orderSubscription')
self._send(message)
return unique_id

def new_unique_id(self):
self._unique_id += 1
return self._unique_id

def _connect(self):
"""Creates socket and sets running to True.
Expand Down Expand Up @@ -156,7 +177,7 @@ def _read_loop(self):
self._data(received_data)
except (socket.timeout, socket.error) as e:
self.stop()
raise SocketError('[Connect: %s]: Socket %s' % (self.unique_id, e))
raise SocketError('[Connect: %s]: Socket %s' % (self._unique_id, e))

def _receive_all(self):
"""Whilst socket is running receives data from socket,
Expand Down
46 changes: 19 additions & 27 deletions betfairlightweight/streaming/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,17 @@ class BaseListener(object):

def __init__(self, max_latency=0.5):
self.max_latency = max_latency
self.market_stream = None
self.order_stream = None

def register_stream(self, unique_id, operation):
if operation == 'authentication':
logger.info('[Listener: %s]: %s' % (unique_id, operation))

elif operation == 'marketSubscription':
if self.market_stream is not None:
logger.warning('[Listener: %s]: marketSubscription stream already registered, replacing data' %
unique_id)
self.market_stream = self._add_stream(unique_id, operation)
self.stream = None
self.stream_type = None # marketSubscription/orderSubscription
self.stream_unique_id = None

elif operation == 'orderSubscription':
if self.order_stream is not None:
logger.warning('[Listener: %s]: orderSubscription stream already registered, replacing data' %
unique_id)
self.order_stream = self._add_stream(unique_id, operation)
def register_stream(self, unique_id, operation):
if self.stream is not None:
logger.warning('[Listener: %s]: stream already registered, replacing data' % unique_id)
self.stream = self._add_stream(unique_id, operation)
self.stream_type = operation
self.stream_unique_id = unique_id

def on_data(self, raw_data):
print(raw_data)
Expand All @@ -49,7 +42,7 @@ def __repr__(self):

class StreamListener(BaseListener):
"""Stream listener, processes results from socket,
holds a market and order stream which hold
holds a market or order stream which hold
market_book caches
"""

Expand All @@ -71,6 +64,7 @@ def on_data(self, raw_data):
except ValueError:
logger.error('value error: %s' % raw_data)
return

unique_id = data.get('id')

if self._error_handler(data, unique_id):
Expand All @@ -82,6 +76,10 @@ def on_data(self, raw_data):
elif operation == 'status':
self._on_status(data, unique_id)
elif operation in ['mcm', 'ocm']:
if unique_id != self.stream_unique_id:
logging.warning('Unwanted data received from uniqueId: %s, expecting: %s' %
(unique_id, self.stream_unique_id))
return
self._on_change_message(data, unique_id)

def _on_connection(self, data, unique_id):
Expand All @@ -103,23 +101,17 @@ def _on_status(data, unique_id):

def _on_change_message(self, data, unique_id):
change_type = data.get('ct', 'UPDATE')
operation = data.get('op')

if operation == 'mcm':
stream = self.market_stream
else:
stream = self.order_stream

logger.debug('[Subscription: %s]: %s: %s' % (unique_id, change_type, data))

if change_type == 'SUB_IMAGE':
stream.on_subscribe(data)
self.stream.on_subscribe(data)
elif change_type == 'RESUB_DELTA':
stream.on_resubscribe(data)
self.stream.on_resubscribe(data)
elif change_type == 'HEARTBEAT':
stream.on_heartbeat(data)
self.stream.on_heartbeat(data)
elif change_type == 'UPDATE':
stream.on_update(data)
self.stream.on_update(data)

def _add_stream(self, unique_id, stream_type):
if stream_type == 'marketSubscription':
Expand Down
2 changes: 1 addition & 1 deletion betfairlightweight/streaming/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def _process(self, order_books, publish_time):
if order_book_cache:
order_book_cache.update_cache(order_book, publish_time)
else:
self._caches[market_id] = OrderBookCache(**order_book)
self._caches[market_id] = OrderBookCache(publish_time=publish_time, **order_book)
logger.info('[OrderStream: %s] %s added' % (self.unique_id, market_id))
self._updates_processed += 1

Expand Down

0 comments on commit c4aaa67

Please sign in to comment.