Skip to content

Commit

Permalink
Merge b838611 into 6d209a8
Browse files Browse the repository at this point in the history
  • Loading branch information
liampauling committed Sep 9, 2019
2 parents 6d209a8 + b838611 commit a923901
Show file tree
Hide file tree
Showing 18 changed files with 491 additions and 319 deletions.
31 changes: 31 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
*.py[cod]
*.sw[op]
.cache/

# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
lib64

# Installer logs
pip-log.txt

# Unit test / coverage reports
.coverage
.tox
nosetests.xml

.DS_Store

# IDE metafile ignores
.idea
20 changes: 20 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
.. :changelog:
Release History
---------------

0.8.0 (2019-09-09)
+++++++++++++++++++

**Improvements**

- black fmt
- _async renamed to async_ to match bflw

**Bug Fixes**

-

**Libraries**

- betfairlightweight upgraded to 1.10.2
37 changes: 18 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,31 @@ $ pip install flumine
The framework can be used as follows:

```python
>>> import flumine
from flumine.resources import StreamRecorder
from flumine.storage import storageengine
from flumine import Flumine
from flumine.resources import MarketRecorder
from flumine.storage import storageengine

>>> market_filter = {"marketIds": ["1.132452335"]}
market_filter = {"marketIds": ["1.132452335"]}

storage_engine = flumine.storageengine.S3('flumine')
storage_engine = storageengine.S3('flumine')

flumine = Flumine(
recorder=StreamRecorder(
storage_engine=storage_engine,
market_filter=market_filter,
),
settings={'certificate_login': False}
)
flumine = Flumine(
recorder=MarketRecorder(
storage_engine=storage_engine,
market_filter=market_filter,
),
settings={'certificate_login': False}
)

>>> flumine.start()
flumine.start(async_=True)

>>> flumine
<Flumine [running]>
flumine
# <Flumine [running]>

>>> flumine.stop()

>>> flumine
<Flumine [not running]>
flumine.stop()

flumine
# <Flumine [not running]>
```

## docker
Expand Down
6 changes: 3 additions & 3 deletions flumine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
from .exceptions import FlumineException


__title__ = 'flumine'
__version__ = '0.7.0'
__author__ = 'Liam Pauling'
__title__ = "flumine"
__version__ = "0.8.0"
__author__ = "Liam Pauling"


# Set default logging handler to avoid "No handler found" warnings.
Expand Down
3 changes: 1 addition & 2 deletions flumine/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@


class FlumineException(Exception):
"""Base class for Flumine Errors"""

pass


Expand Down
87 changes: 43 additions & 44 deletions flumine/flumine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,22 @@
import logging
import threading
from tenacity import retry, wait_exponential
from betfairlightweight import (
APIClient,
BetfairError,
)
from betfairlightweight import APIClient, BetfairError

from .listener import FlumineListener
from .exceptions import (
RunError,
StreamError,
)
from .exceptions import RunError, StreamError

logger = logging.getLogger(__name__)

# dir used to store all data
FLUMINE_DATA = '/data'
FLUMINE_DATA = "/data"


class Flumine:

def __init__(self, recorder, settings=None, unique_id=1e3):
self._certificate_login = settings.pop('certificate_login', True) if settings else True
self._certificate_login = (
settings.pop("certificate_login", True) if settings else True
)
self.trading = self._create_client(settings)
self.recorder = recorder
self.unique_id = unique_id
Expand All @@ -31,19 +26,25 @@ def __init__(self, recorder, settings=None, unique_id=1e3):
self._socket = None
self.listener = FlumineListener(recorder) # output queue hack

def start(self, heartbeat_ms=None, conflate_ms=None, segmentation_enabled=None, _async=True):
def start(
self,
heartbeat_ms=None,
conflate_ms=None,
segmentation_enabled=None,
async_=True,
):
"""Checks trading is logged in, creates socket,
subscribes to markets, sets running to True and
starts handler/run threads.
"""
logger.info('Starting stream: %s' % self.unique_id)
logger.info("Starting stream: %s" % self.unique_id)
if self._running:
raise RunError('Flumine is already running, call .stop() first')
if _async:
raise RunError("Flumine is already running, call .stop() first")
if async_:
threading.Thread(
target=self._run,
args=(conflate_ms, heartbeat_ms, segmentation_enabled),
daemon=True
daemon=True,
).start()
else:
self._run(conflate_ms, heartbeat_ms, segmentation_enabled)
Expand All @@ -52,9 +53,9 @@ def stop(self):
"""Stops socket, sets running to false
and socket to None
"""
logger.info('Stopping stream: %s' % self.unique_id)
logger.info("Stopping stream: %s" % self.unique_id)
if not self._running:
raise RunError('Flumine is not running')
raise RunError("Flumine is not running")
if self._socket:
self._socket.stop()
self._running = False
Expand All @@ -63,22 +64,18 @@ def stop(self):
def stream_status(self):
"""Checks sockets status
"""
return str(self._socket) if self._socket else 'Socket not created'
return str(self._socket) if self._socket else "Socket not created"

@staticmethod
def _create_client(settings):
"""Returns APIClient based on settings
or looks for username in os.environ
"""
if settings:
return APIClient(
**settings.get('betfairlightweight')
)
return APIClient(**settings.get("betfairlightweight"))
else:
username = os.environ.get('username')
return APIClient(
username=username
)
username = os.environ.get("username")
return APIClient(username=username)

@retry(wait=wait_exponential(multiplier=1, min=2, max=20))
def _run(self, conflate_ms, heartbeat_ms, segmentation_enabled):
Expand All @@ -90,8 +87,8 @@ def _run(self, conflate_ms, heartbeat_ms, segmentation_enabled):

self._create_socket()

if self.recorder.STREAM_TYPE == 'market':
logger.info('Subscribing to markets')
if self.recorder.STREAM_TYPE == "market":
logger.info("Subscribing to markets")
try:
self.unique_id = self._socket.subscribe_to_markets(
market_filter=self.recorder.market_filter,
Expand All @@ -103,27 +100,29 @@ def _run(self, conflate_ms, heartbeat_ms, segmentation_enabled):
clk=self.listener.clk,
)
except BetfairError as e:
logger.error('Betfair subscribe_to_markets error: %s' % e)
logger.error("Betfair subscribe_to_markets error: %s" % e)
raise
elif self.recorder.STREAM_TYPE == 'race':
logger.info('Subscribing to races')
elif self.recorder.STREAM_TYPE == "race":
logger.info("Subscribing to races")
try:
self.unique_id = self._socket.subscribe_to_races()
except BetfairError as e:
logger.error('Betfair subscribe_to_races error: %s' % e)
logger.error("Betfair subscribe_to_races error: %s" % e)
raise
else:
raise StreamError('%s is not a valid stream type' % self.recorder.STREAM_TYPE)
raise StreamError(
"%s is not a valid stream type" % self.recorder.STREAM_TYPE
)

self._running = True
try:
logger.info('Starting socket..')
self._socket.start(_async=False)
logger.info("Starting socket..")
self._socket.start(async_=False)
except BetfairError as e:
logger.error('Betfair error: %s' % e)
logger.error("Betfair error: %s" % e)
raise
except Exception as e:
logger.error('Unknown error: %s' % e)
logger.error("Unknown error: %s" % e)
raise

def _check_login(self, force=False):
Expand All @@ -134,22 +133,22 @@ def _check_login(self, force=False):
try:
self.trading.login()
except BetfairError as e:
logger.error('Betfair login error: %s' % e)
logger.error("Betfair login error: %s" % e)
raise
else:
try:
self.trading.login_interactive()
except BetfairError as e:
logger.error('Betfair login_interactive error: %s' % e)
logger.error("Betfair login_interactive error: %s" % e)
raise

def _create_socket(self):
"""Creates stream
"""
logger.info('Creating socket')
logger.info("Creating socket")
self._socket = self.trading.streaming.create_stream(
unique_id=self.unique_id,
description='Flumine Socket',
description="Flumine Socket",
listener=self.listener,
host=self.recorder.HOST,
)
Expand All @@ -160,10 +159,10 @@ def running(self):

@property
def status(self):
return 'running' if self._running else 'not running'
return "running" if self._running else "not running"

def __repr__(self):
return '<Flumine>'
return "<Flumine>"

def __str__(self):
return '<Flumine [%s]>' % self.status
return "<Flumine [%s]>" % self.status

0 comments on commit a923901

Please sign in to comment.