Skip to content

Commit

Permalink
Updated the CryptoCompare feed to perform per instrument event confla…
Browse files Browse the repository at this point in the history
…tion to streamline processing if the main loop runs much slower than the rate of price tick enqueueing from the instrument subscriptions
  • Loading branch information
orominiyi committed Oct 15, 2018
1 parent ac3ae85 commit 25c37f6
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 29 deletions.
5 changes: 4 additions & 1 deletion cosine/core/proc_workers.py
Expand Up @@ -44,16 +44,19 @@ def init_queue(self):
self._events = Queue()
self._slots = FieldSet()

def process_events(self):
def process_events(self, process_events=None):
# consume all the available events...
evts = []
if process_events is None:
process_events = lambda events, handlers: events
try:
while not self._events.empty():
evts.append( self._events.get_nowait() )
except EmptyException:
pass

# process any event handlers registered for these events...
evts = process_events(evts, self.events)
for evt in evts:
(name, data) = evt
if name in self.events:
Expand Down
5 changes: 4 additions & 1 deletion cosine/pricing/base_feed.py
Expand Up @@ -56,7 +56,7 @@ def teardown(self):

def update(self):
if self._worker:
self._worker.process_events()
self._worker.process_events(process_events=self._process_received_events)


"""Worker process run or inline run"""
Expand Down Expand Up @@ -113,6 +113,9 @@ def _run_via_worker(self):
def _setup_events(self, worker):
pass

def _process_received_events(self, events, slots):
return events


@property
def events(self):
Expand Down
76 changes: 51 additions & 25 deletions cosine/pricing/cryptocompare.py
Expand Up @@ -63,26 +63,8 @@ def new_recv_packet(self):
# MODULE CLASSES
class CryptoCompareSocketIOFeed(CosineBaseFeed):

def __init__(self, name, pool, cxt, logger=None, **kwargs):
super().__init__(name, pool, cxt, logger=logger, **kwargs)
self._socketio = None
self._ticker_map = {}
self._triangulators = {}


def _snapshot_cache(self):
# nothing to do since we'll auto-snapshot on subscription to the websockets feed...
pass


def _setup_events(self, worker):
worker.events.OnRawTick += self._on_raw_tick


def _on_raw_tick(self, msg):
# decode & cache pricing...
FIELDS = {
'TYPE': 0x0
FIELDS = {
'TYPE': 0x0
, 'MARKET': 0x0
, 'FROMSYMBOL': 0x0
, 'TOSYMBOL': 0x0
Expand All @@ -106,26 +88,70 @@ def _on_raw_tick(self, msg):
, 'HIGH24HOUR': 0x10000
, 'LOW24HOUR': 0x20000
, 'LASTMARKET': 0x40000
}
}

def __init__(self, name, pool, cxt, logger=None, **kwargs):
super().__init__(name, pool, cxt, logger=logger, **kwargs)
self._socketio = None
self._ticker_map = {}
self._triangulators = {}


def _snapshot_cache(self):
# nothing to do since we'll auto-snapshot on subscription to the websockets feed...
pass


def _setup_events(self, worker):
worker.events.OnRawTick += self._on_raw_tick

def _process_received_events(self, events, slots):
# conflate the events to just process for the latest pricing ticks per product...
evt_group = {}
icount = len(events)
for evt in events:
(name, msg) = evt
data = self._parse_msg_event(msg)
if not data: continue
tkr = str(data["FROMSYMBOL"]) + "/" + str(data["TOSYMBOL"])
if tkr not in evt_group:
evt_group[tkr] = evt

events = [(n, m) for (n, m) in evt_group.items()]
ecount = len(events)
if icount != ecount:
self.logger.info("CryptoCompareSocketIOFeed - Conflated events: {0} / {1}".format(icount, ecount))
return events


def _parse_msg_event(self, msg):
# decode msg...
fields = msg.split('~')
try:
mask = int(fields[-1], 16)
except:
self.logger.warn("Failed to decode price feed data: {0}".format(msg))
return
return None

fields = fields[:-1]
curr = 0
data = {}
for prop in FIELDS:
if FIELDS[prop] == 0:
for prop in self.FIELDS:
if self.FIELDS[prop] == 0:
data[prop] = fields[curr]
curr += 1
elif mask & FIELDS[prop]:
elif mask & self.FIELDS[prop]:
if prop == 'LASTMARKET':
data[prop] = fields[curr]
else:
data[prop] = float(fields[curr])
curr += 1
return data


def _on_raw_tick(self, msg):
# decode & cache pricing...
data = self._parse_msg_event(msg)

# if it's a triangulated pair then process it separately...
ticker_pair = str(data["FROMSYMBOL"]) + "/" + str(data["TOSYMBOL"])
Expand Down
2 changes: 1 addition & 1 deletion cosine_crypto.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: cosine-crypto
Version: 0.2.49
Version: 0.2.50
Summary: A modular open source cryptocurrency trading algo framework.
Home-page: https://github.com/oladotunr/cosine
Author: Oladotun Rominiyi
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="cosine-crypto",
version="0.2.49",
version="0.2.50",
author="Oladotun Rominiyi",
author_email="dotun@voxex.io",
description="A modular open source cryptocurrency trading algo framework.",
Expand Down

0 comments on commit 25c37f6

Please sign in to comment.