Skip to content

Commit

Permalink
Merge pull request #104 from liampauling/streaming_refactor
Browse files Browse the repository at this point in the history
Streaming refactor
  • Loading branch information
liampauling committed Jul 1, 2017
2 parents 5d43a25 + 1c04cce commit 80639f4
Show file tree
Hide file tree
Showing 13 changed files with 410 additions and 556 deletions.
2 changes: 1 addition & 1 deletion betfairlightweight/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from . import filters

__title__ = 'betfairlightweight'
__version__ = '1.3.2'
__version__ = '1.4.0'
__author__ = 'Liam Pauling'

# Set default logging handler to avoid "No handler found" warnings.
Expand Down
5 changes: 4 additions & 1 deletion betfairlightweight/endpoints/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def __init__(self, parent):
"""
self.client = parent

def create_stream(self, unique_id=0, listener=None, timeout=11, buffer_size=1024, description='BetfairSocket'):
def create_stream(self, unique_id=0, listener=None, timeout=11, buffer_size=1024, description='BetfairSocket',
host=None):
"""
Creates BetfairStream.
Expand All @@ -24,6 +25,7 @@ def create_stream(self, unique_id=0, listener=None, timeout=11, buffer_size=1024
:param float timeout: Socket timeout
:param int buffer_size: Socket buffer size
:param str description: Betfair stream description
:param str host: Host endpoint (prod (default) or integration)
:rtype: resources.BetfairStream
"""
Expand All @@ -36,4 +38,5 @@ def create_stream(self, unique_id=0, listener=None, timeout=11, buffer_size=1024
timeout=timeout,
buffer_size=buffer_size,
description=description,
host=host,
)
264 changes: 103 additions & 161 deletions betfairlightweight/resources/streamingresources.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import datetime
from operator import itemgetter

from ..utils import update_available
from .baseresource import BaseResource
from .bettingresources import (
MarketBook,
Expand Down Expand Up @@ -110,9 +108,60 @@ def __init__(self, betDelay, bettingType, bspMarket, bspReconciled, complete, cr
self.line_max_unit = lineMaxUnit
self.line_min_unit = lineMinUnit
self.line_interval = lineInterval
self.runners = [MarketDefinitionRunner(**i) for i in runners]
self.runners = [
MarketDefinitionRunner(**i) for i in runners
]
self.name = name # historic data only
self.event_name = eventName # historic data only
self.runners_dict = {
(runner.selection_id, runner.handicap): runner for runner in self.runners
}


class Available(object):
"""
Data structure to hold prices/traded amount,
designed to be as quick as possible.
"""

def __init__(self, prices, deletion_select, reverse=False):
"""
:param list prices: Current prices
:param int deletion_select: Used to decide if update should delete cache
:param bool reverse: Used for sorting
"""
self.prices = prices or []
self.deletion_select = deletion_select
self.reverse = reverse

self.serialise = []
self.sort()

def sort(self):
self.prices.sort(reverse=self.reverse)
self.serialise = [
{'price': volume[self.deletion_select-1], 'size': volume[self.deletion_select]} for volume in self.prices
]

def clear(self):
self.prices = []
self.sort()

def update(self, book_update):
for book in book_update:
for (count, trade) in enumerate(self.prices):
if trade[0] == book[0]:
if book[self.deletion_select] == 0:
del self.prices[count]
break
else:
self.prices[count] = book
break
else:
if book[self.deletion_select] != 0:
# handles betfair bug, http://forum.bdp.betfair.com/showthread.php?t=3351
self.prices.append(book)
self.sort()


class RunnerBook(object):
Expand All @@ -122,156 +171,59 @@ def __init__(self, id, ltp=None, tv=None, trd=None, atb=None, batb=None, bdatb=N
self.selection_id = id
self.last_price_traded = ltp
self.total_matched = tv
self.traded = trd
self.available_to_back = atb
self.best_available_to_back = batb
self.best_display_available_to_back = bdatb
self.available_to_lay = atl
self.best_available_to_lay = batl
self.best_display_available_to_lay = bdatl
self.traded = Available(trd, 1)
self.available_to_back = Available(atb, 1, True)
self.best_available_to_back = Available(batb, 2)
self.best_display_available_to_back = Available(bdatb, 2)
self.available_to_lay = Available(atl, 1)
self.best_available_to_lay = Available(batl, 2)
self.best_display_available_to_lay = Available(bdatl, 2)
self.starting_price_back = Available(spb, 1)
self.starting_price_lay = Available(spl, 1)
self.starting_price_near = spn
self.starting_price_far = spf
self.starting_price_back = spb
self.starting_price_lay = spl
self.handicap = hc

def update_traded(self, traded_update):
""":param traded_update: [price, size]
"""
if not traded_update:
self.traded = traded_update
elif self.traded is None:
self.traded = traded_update
else:
update_available(self.traded, traded_update, 1)

def update_available_to_back(self, book_update):
""":param book_update: [price, size]
"""
if self.available_to_back is None:
self.available_to_back = book_update
else:
update_available(self.available_to_back, book_update, 1)

def update_available_to_lay(self, book_update):
""":param book_update: [price, size]
"""
if self.available_to_lay is None:
self.available_to_lay = book_update
else:
update_available(self.available_to_lay, book_update, 1)

def update_best_available_to_back(self, book_update):
""":param book_update: [level, price, size]
"""
if self.best_available_to_back is None:
self.best_available_to_back = book_update
else:
update_available(self.best_available_to_back, book_update, 2)

def update_best_available_to_lay(self, book_update):
""":param book_update: [level, price, size]
"""
if self.best_available_to_lay is None:
self.best_available_to_lay = book_update
else:
update_available(self.best_available_to_lay, book_update, 2)

def update_best_display_available_to_back(self, book_update):
""":param book_update: [level, price, size]
"""
if self.best_display_available_to_back is None:
self.best_display_available_to_back = book_update
else:
update_available(self.best_display_available_to_back, book_update, 2)

def update_best_display_available_to_lay(self, book_update):
""":param book_update: [level, price, size]
"""
if self.best_display_available_to_lay is None:
self.best_display_available_to_lay = book_update
else:
update_available(self.best_display_available_to_lay, book_update, 2)

def update_starting_price_back(self, book_update):
""":param book_update: [price, size]
"""
if self.starting_price_back is None:
self.starting_price_back = book_update
self.traded.clear()
else:
update_available(self.starting_price_back, book_update, 1)

def update_starting_price_lay(self, book_update):
""":param book_update: [price, size]
"""
if self.starting_price_lay is None:
self.starting_price_lay = book_update
else:
update_available(self.starting_price_lay, book_update, 1)

@property
def serialise_traded_volume(self):
if self.traded:
return [
{'price': volume[0], 'size': volume[1]} for volume in sorted(self.traded, key=itemgetter(0))
]
else:
return []
self.traded.update(traded_update)

@property
def serialise_available_to_back(self):
if self.available_to_back:
return [{'price': volume[0], 'size': volume[1]}
for volume in sorted(self.available_to_back, key=itemgetter(0), reverse=True)]
elif self.best_display_available_to_back:
return [{'price': volume[1], 'size': volume[2]}
for volume in sorted(self.best_display_available_to_back, key=itemgetter(0))]
elif self.best_available_to_back:
return [{'price': volume[1], 'size': volume[2]}
for volume in sorted(self.best_available_to_back, key=itemgetter(0))]
if self.available_to_back.prices:
return self.available_to_back.serialise
elif self.best_display_available_to_back.prices:
return self.best_display_available_to_back.serialise
elif self.best_available_to_back.prices:
return self.best_available_to_back.serialise
else:
return []

@property
def serialise_available_to_lay(self):
if self.available_to_lay:
return [{'price': volume[0], 'size': volume[1]}
for volume in sorted(self.available_to_lay, key=itemgetter(0))]
elif self.best_display_available_to_lay:
return [{'price': volume[1], 'size': volume[2]}
for volume in sorted(self.best_display_available_to_lay, key=itemgetter(0))]
elif self.best_available_to_lay:
return [{'price': volume[1], 'size': volume[2]}
for volume in sorted(self.best_available_to_lay, key=itemgetter(0))]
return []

@property
def serialise_starting_price_back(self):
if self.starting_price_back:
return [{'price': volume[0], 'size': volume[1]}
for volume in sorted(self.starting_price_back, key=itemgetter(0))]
return []

@property
def serialise_starting_price_lay(self):
if self.starting_price_lay:
return [{'price': volume[0], 'size': volume[1]}
for volume in sorted(self.starting_price_lay, key=itemgetter(0))]
if self.available_to_lay.prices:
return self.available_to_lay.serialise
elif self.best_display_available_to_lay.prices:
return self.best_display_available_to_lay.serialise
elif self.best_available_to_lay.prices:
return self.best_available_to_lay.serialise
return []

def serialise(self, runner_definition):
return {
'status': runner_definition.status,
'ex': {
'tradedVolume': self.serialise_traded_volume,
'availableToBack': self.serialise_available_to_back,
'availableToLay': self.serialise_available_to_lay
'tradedVolume': self.traded.serialise,
'availableToBack': self.serialise_available_to_back(),
'availableToLay': self.serialise_available_to_lay()
},
'sp': {
'nearPrice': self.starting_price_near,
'farPrice': self.starting_price_far,
'backStakeTaken': self.serialise_starting_price_back,
'layLiabilityTaken': self.serialise_starting_price_lay,
'backStakeTaken': self.starting_price_back.serialise,
'layLiabilityTaken': self.starting_price_lay.serialise,
'actualSP': runner_definition.bsp
},
'adjustmentFactor': runner_definition.adjustment_factor,
Expand Down Expand Up @@ -307,8 +259,9 @@ def update_cache(self, market_change, publish_time):

if 'rc' in market_change:
for new_data in market_change['rc']:
selection_id = new_data['id']
runner = self.runner_dict.get(selection_id)
runner = self.runner_dict.get(
(new_data['id'], new_data.get('hc'))
)
if runner:
if 'ltp' in new_data:
runner.last_price_traded = new_data['ltp']
Expand All @@ -321,21 +274,21 @@ def update_cache(self, market_change, publish_time):
if 'trd' in new_data:
runner.update_traded(new_data['trd'])
if 'atb' in new_data:
runner.update_available_to_back(new_data['atb'])
runner.available_to_back.update(new_data['atb'])
if 'atl' in new_data:
runner.update_available_to_lay(new_data['atl'])
runner.available_to_lay.update(new_data['atl'])
if 'batb' in new_data:
runner.update_best_available_to_back(new_data['batb'])
runner.best_available_to_back.update(new_data['batb'])
if 'batl' in new_data:
runner.update_best_available_to_lay(new_data['batl'])
runner.best_available_to_lay.update(new_data['batl'])
if 'bdatb' in new_data:
runner.update_best_display_available_to_back(new_data['bdatb'])
runner.best_display_available_to_back.update(new_data['bdatb'])
if 'bdatl' in new_data:
runner.update_best_display_available_to_lay(new_data['bdatl'])
runner.best_display_available_to_lay.update(new_data['bdatl'])
if 'spb' in new_data:
runner.update_starting_price_back(new_data['spb'])
runner.starting_price_back.update(new_data['spb'])
if 'spl' in new_data:
runner.update_starting_price_lay(new_data['spl'])
runner.starting_price_lay.update(new_data['spl'])
else:
self.runners.append(RunnerBook(**new_data))

Expand All @@ -353,11 +306,7 @@ def create_market_book(self, unique_id, streaming_update, lightweight):

@property
def runner_dict(self):
return {runner.selection_id: runner for runner in self.runners}

@property
def market_definition_dict(self):
return {runner.selection_id: runner for runner in self.market_definition.runners}
return {(runner.selection_id, runner.handicap): runner for runner in self.runners}

@property
def serialise(self):
Expand All @@ -382,7 +331,9 @@ def serialise(self):
'numberOfRunners': len(self.market_definition.runners),
'numberOfActiveRunners': self.market_definition.number_of_active_runners,
'runners': [
runner.serialise(self.market_definition_dict.get(runner.selection_id)) for runner in self.runners
runner.serialise(
self.market_definition.runners_dict[(runner.selection_id, runner.handicap)]
) for runner in self.runners
],
'publishTime': self.publish_time,
}
Expand Down Expand Up @@ -449,24 +400,12 @@ class OrderBookRunner(object):
def __init__(self, id, fullImage=None, ml=None, mb=None, uo=None, hc=None, smc=None):
self.selection_id = id
self.full_image = fullImage
self.matched_lays = ml
self.matched_backs = mb
self.matched_lays = Available(ml, 1)
self.matched_backs = Available(mb, 1)
self.unmatched_orders = [UnmatchedOrder(**i) for i in uo] if uo else []
self.handicap = hc
self.strategy_matches = smc

def update_matched_backs(self, matched_backs):
if not self.matched_backs:
self.matched_backs = [matched_back for matched_back in matched_backs]
else:
update_available(self.matched_backs, matched_backs, 1)

def update_matched_lays(self, matched_lays):
if not self.matched_lays:
self.matched_lays = [matched_lay for matched_lay in matched_lays]
else:
update_available(self.matched_lays, matched_lays, 1)

def update_unmatched(self, unmatched_orders):
order_dict = {order.bet_id: order for order in self.unmatched_orders}
for unmatched_order in unmatched_orders:
Expand Down Expand Up @@ -501,9 +440,12 @@ def update_cache(self, order_book, publish_time):
selection_id = order_changes['id']
runner = self.runner_dict.get(selection_id)
if runner:
runner.update_matched_lays(order_changes.get('ml', []))
runner.update_matched_backs(order_changes.get('mb', []))
runner.update_unmatched(order_changes.get('uo', []))
if 'ml' in order_changes:
runner.matched_lays.update(order_changes['ml'])
if 'mb' in order_changes:
runner.matched_backs.update(order_changes['mb'])
if 'uo' in order_changes:
runner.update_unmatched(order_changes['uo'])
else:
self.runners.append(OrderBookRunner(**order_changes))

Expand Down

0 comments on commit 80639f4

Please sign in to comment.