Skip to content

Commit

Permalink
Merge 81ddf83 into 274c002
Browse files Browse the repository at this point in the history
  • Loading branch information
liampauling committed Sep 14, 2020
2 parents 274c002 + 81ddf83 commit 0a8703c
Show file tree
Hide file tree
Showing 30 changed files with 108 additions and 56 deletions.
16 changes: 16 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
Release History
---------------

1.12.0 (2020-09-14)
+++++++++++++++++++

**Improvements**

-

**Bug Fixes**

- #248 addition of weakref to try and break circular reference (@synapticarbors) + deletion of each event

**Libraries**

- betfairlightweight upgraded to 2.8.0 (orjson)
- black updated to 20.8b1

1.11.2 (2020-08-28)
+++++++++++++++++++

Expand Down
10 changes: 8 additions & 2 deletions examples/marketrecorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
strategy = MarketRecorder(
name="WIN",
market_filter=betfairlightweight.filters.streaming_market_filter(
event_type_ids=["7"], country_codes=["GB", "IE"], market_types=["WIN"],
event_type_ids=["7"],
country_codes=["GB", "IE"],
market_types=["WIN"],
),
stream_class=DataStream,
context={"local_dir": "/tmp", "force_update": False, "remove_file": True,},
context={
"local_dir": "/tmp",
"force_update": False,
"remove_file": True,
},
)

framework.add_strategy(strategy)
Expand Down
8 changes: 6 additions & 2 deletions examples/strategies/lowestlayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ def process_market_book(self, market, market_book):
# lay at current best lay price
lay = get_price(runner.ex.available_to_lay, 0)
trade = Trade(
market_book.market_id, runner.selection_id, runner.handicap, self,
market_book.market_id,
runner.selection_id,
runner.handicap,
self,
)
order = trade.create_order(
side="LAY", order_type=LimitOrder(lay, self.context["stake"]),
side="LAY",
order_type=LimitOrder(lay, self.context["stake"]),
)
self.place_order(market, order)

Expand Down
7 changes: 4 additions & 3 deletions examples/strategies/marketrecorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ def process_closed_market(self, market, data: dict) -> None:
self._loaded_markets.append(market_id)

def _zip_file(self, file_dir: str, market_id: str) -> str:
"""zips txt file into filename.zip
"""
"""zips txt file into filename.zip"""
zip_file_directory = os.path.join(
self.local_dir, self.recorder_id, "%s.zip" % market_id
)
Expand Down Expand Up @@ -187,7 +186,9 @@ def _load(self, market, zip_file_dir: str, market_definition: dict) -> None:
Body=market.market_catalogue.json(),
Bucket=self._bucket,
Key=os.path.join(
"marketdata", "marketCatalogue", market.market_id,
"marketdata",
"marketCatalogue",
market.market_id,
),
)
logger.info(
Expand Down
2 changes: 1 addition & 1 deletion flumine/__version__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = "flumine"
__description__ = "Betfair trading framework"
__url__ = "https://github.com/liampauling/flumine"
__version__ = "1.11.2"
__version__ = "1.12.0"
__author__ = "Liam Pauling"
__license__ = "MIT"
10 changes: 7 additions & 3 deletions flumine/backtest/simulated.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def place(

if runner.status == "REMOVED":
return self._create_place_response(
bet_id, status="FAILURE", error_code="RUNNER_REMOVED",
bet_id,
status="FAILURE",
error_code="RUNNER_REMOVED",
)

available_to_back = get_price(runner.ex.available_to_back, 0) or 1.01
Expand Down Expand Up @@ -136,7 +138,8 @@ def cancel(self) -> SimulatedCancelResponse:
)
else:
return SimulatedCancelResponse(
status="FAILURE", error_code="BET_ACTION_ERROR", # todo ?
status="FAILURE",
error_code="BET_ACTION_ERROR", # todo ?
)

def update(self, instruction: dict):
Expand All @@ -151,7 +154,8 @@ def update(self, instruction: dict):
return SimulatedUpdateResponse(status="SUCCESS")
else:
return SimulatedCancelResponse(
status="FAILURE", error_code="BET_ACTION_ERROR",
status="FAILURE",
error_code="BET_ACTION_ERROR",
)

def _get_runner(self, market_book: MarketBook) -> RunnerBook:
Expand Down
4 changes: 3 additions & 1 deletion flumine/backtest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def __init__(

class SimulatedUpdateResponse:
def __init__(
self, status: str = None, error_code: str = None,
self,
status: str = None,
error_code: str = None,
):
self.status = status
self.error_code = error_code
14 changes: 14 additions & 0 deletions flumine/baseflumine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import queue
import logging
import threading
Expand Down Expand Up @@ -113,6 +114,19 @@ def _add_default_workers(self) -> None:
def _process_market_books(self, event: events.MarketBookEvent) -> None:
for market_book in event.event:
market_id = market_book.market_id

# check latency
latency = time.time() - (market_book.publish_time_epoch / 1e3)
if latency > 2:
logger.warning(
"High latency between current time and MarketBook publish time",
extra={
"market_id": market_id,
"latency": latency,
"pt": market_book.publish_time,
},
)

if market_book.status == "CLOSED":
self.handler_queue.put(events.CloseMarketEvent(market_book))
continue
Expand Down
2 changes: 1 addition & 1 deletion flumine/execution/baseexecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, flumine, max_workers: int = MAX_WORKERS):
self._sessions_created = 0

def handler(self, order_package: BaseOrderPackage):
""" Handles order_package, capable of place, cancel,
"""Handles order_package, capable of place, cancel,
replace and update.
"""
http_session = self._get_http_session()
Expand Down
7 changes: 4 additions & 3 deletions flumine/execution/simulatedexecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ class SimulatedExecution(BaseExecution):
REPLACE_LATENCY = 0.280

def handler(self, order_package: BaseOrderPackage) -> None:
""" Only uses _thread_pool if paper_trade
"""
"""Only uses _thread_pool if paper_trade"""
if order_package.package_type == OrderPackageType.PLACE:
func = self.execute_place
elif order_package.package_type == OrderPackageType.CANCEL:
Expand Down Expand Up @@ -107,7 +106,9 @@ def execute_replace(
else:
order.lapsed() # todo do not carry out replace
self._order_logger(
order, cancel_instruction_report, OrderPackageType.CANCEL,
order,
cancel_instruction_report,
OrderPackageType.CANCEL,
)

# place new order
Expand Down
2 changes: 2 additions & 0 deletions flumine/flumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def run(self) -> None:
else:
logger.error("Unknown item in handler_queue: %s" % str(event))

del event

def _add_default_workers(self):
self.add_worker(
worker.BackgroundWorker(self, function=worker.keep_alive, interval=1200)
Expand Down
14 changes: 5 additions & 9 deletions flumine/markets/blotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@


class Blotter:
def __init__(self, market):
self.market = market
def __init__(self, market, market_id: str):
self.market = market # weakref
self.market_id = market_id
self._orders = {} # {Order.id: Order}
self._live_orders = [] # cached list of live orders
# pending orders
Expand All @@ -23,8 +24,7 @@ def __init__(self, market):
self.pending_replace = []

def strategy_orders(self, strategy) -> list:
"""Returns all orders related to a strategy.
"""
"""Returns all orders related to a strategy."""
return [order for order in self if order.trade.strategy == strategy]

def process_orders(self, client) -> list:
Expand Down Expand Up @@ -64,7 +64,7 @@ def _create_packages(
market_id=self.market_id,
orders=chunked_orders,
package_type=package_type,
market=self.market,
market=self.market(),
)
packages.append(order_package)
orders.clear()
Expand Down Expand Up @@ -112,10 +112,6 @@ def selection_exposure(self, strategy, lookup: tuple) -> float:
ml.append((order.average_price_matched, order.size_matched))
return calculate_exposure(mb, ml)

@property
def market_id(self):
return self.market.market_id

""" getters / setters """

def complete_order(self, order) -> None:
Expand Down
3 changes: 2 additions & 1 deletion flumine/markets/market.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import logging
import weakref
from typing import Optional
from collections import defaultdict
from betfairlightweight.resources.bettingresources import MarketBook, MarketCatalogue
Expand All @@ -25,7 +26,7 @@ def __init__(
self.market_book = market_book
self.market_catalogue = market_catalogue
self.context = {"simulated": {}} # data store (raceCard / scores etc)
self.blotter = Blotter(self)
self.blotter = Blotter(weakref.ref(self), market_id)

def __call__(self, market_book: MarketBook):
self.market_book = market_book
Expand Down
2 changes: 1 addition & 1 deletion flumine/order/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def current_order(self) -> Union[CurrentOrder, Simulated]:

@property
def complete(self) -> bool:
""" Returns False if order is
"""Returns False if order is
live or pending in the market"""
if self.status in [
OrderStatus.PENDING,
Expand Down
6 changes: 4 additions & 2 deletions flumine/order/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def process_current_orders(markets: Markets, strategies: Strategies, event):
for current_order in current_orders.orders:
strategy_name_hash, order_id = current_order.customer_order_ref.split("-")
order = markets.get_order(
market_id=current_order.market_id, order_id=order_id,
market_id=current_order.market_id,
order_id=order_id,
)
if not order:
logger.warning(
Expand All @@ -42,7 +43,8 @@ def process_current_orders(markets: Markets, strategies: Strategies, event):
continue
if order.bet_id != current_order.bet_id: # replaceOrder handling (hacky)
order = markets.get_order_from_bet_id(
market_id=current_order.market_id, bet_id=current_order.bet_id,
market_id=current_order.market_id,
bet_id=current_order.bet_id,
)

if order:
Expand Down
3 changes: 1 addition & 2 deletions flumine/streams/marketstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def run(self) -> None:
logger.info("Stopped MarketStream {0}".format(self.stream_id))

def handle_output(self) -> None:
"""Handles output from stream.
"""
"""Handles output from stream."""
while self.is_alive():
try:
market_books = self._output_queue.get(
Expand Down
3 changes: 1 addition & 2 deletions flumine/streams/orderstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ def run(self) -> None:
logger.info("Stopped OrderStream {0}".format(self.stream_id))

def handle_output(self) -> None:
"""Handles output from stream.
"""
"""Handles output from stream."""
while self.is_alive():
try:
order_books = self._output_queue.get(
Expand Down
3 changes: 1 addition & 2 deletions flumine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ def call_process_market_book(
def get_runner_book(
market_book: MarketBook, selection_id: int, handicap=0
) -> Optional[RunnerBook]:
"""Returns runner book based on selection id.
"""
"""Returns runner book based on selection id."""
for runner_book in market_book.runners:
if (
runner_book.selection_id == selection_id
Expand Down
2 changes: 1 addition & 1 deletion flumine/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def run(self) -> None:


def keep_alive(context: dict, flumine) -> None:
""" Attempt keep alive if required or
"""Attempt keep alive if required or
login if keep alive failed
"""
client = flumine.client
Expand Down
2 changes: 1 addition & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Tests & Linting
black==19.10b0
black==20.8b1
coverage

# Documentation
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
betfairlightweight==2.7.2
betfairlightweight==2.8.0
tenacity==5.0.3
python-json-logger==0.1.11
requests
2 changes: 1 addition & 1 deletion tests/test_baseflumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test__add_default_workers(self):

def test__process_market_books(self):
mock_event = mock.Mock()
mock_market_book = mock.Mock()
mock_market_book = mock.Mock(publish_time_epoch=123)
mock_market_book.runners = []
mock_event.event = [mock_market_book]
self.base_flumine._process_market_books(mock_event)
Expand Down
12 changes: 6 additions & 6 deletions tests/test_blotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
class BlotterTest(unittest.TestCase):
def setUp(self) -> None:
self.mock_market = mock.Mock()
self.blotter = Blotter(self.mock_market)
self.blotter = Blotter(self.mock_market, "1.23")

def test_init(self):
self.assertEqual(self.blotter.market, self.mock_market)
self.assertEqual(self.blotter.market_id, "1.23")
self.assertEqual(self.blotter._orders, {})
self.assertEqual(self.blotter.pending_place, [])
self.assertEqual(self.blotter.pending_cancel, [])
Expand Down Expand Up @@ -105,7 +106,8 @@ def test_selection_exposure(self):
)
self.blotter._orders = {"12345": mock_order}
self.assertEqual(
self.blotter.selection_exposure(mock_strategy, mock_order.lookup), -2,
self.blotter.selection_exposure(mock_strategy, mock_order.lookup),
-2,
)

def test_selection_exposure_no_match(self):
Expand All @@ -120,12 +122,10 @@ def test_selection_exposure_no_match(self):
)
self.blotter._orders = {"12345": mock_order}
self.assertEqual(
self.blotter.selection_exposure(mock_strategy, mock_order.lookup), 0,
self.blotter.selection_exposure(mock_strategy, mock_order.lookup),
0,
)

def test_market_id(self):
self.assertEqual(self.blotter.market_id, self.mock_market.market_id)

def test_complete_order(self):
self.blotter._live_orders = ["test"]
self.blotter.complete_order("test")
Expand Down
2 changes: 1 addition & 1 deletion tests/test_clientcontrols.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_check_hour(self, mock_set_next_hour):
self.trading_control._next_hour = datetime.datetime.utcnow()
self.trading_control._check_hour()

self.trading_control.transaction_count = 1069
self.trading_control.transaction_count = 5069
now = datetime.datetime.now()
self.trading_control._next_hour = (now + datetime.timedelta(hours=-1)).replace(
minute=0, second=0, microsecond=0
Expand Down
Loading

0 comments on commit 0a8703c

Please sign in to comment.