From bde13f74b9b7915719eaefdf969a0d81f9be7314 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Wed, 10 Jun 2020 12:41:04 +0200 Subject: [PATCH 01/32] create_l2_order_book removed --- ssc2ce/deribit/l2_book.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ssc2ce/deribit/l2_book.py b/ssc2ce/deribit/l2_book.py index 428a4ac..ba9d959 100644 --- a/ssc2ce/deribit/l2_book.py +++ b/ssc2ce/deribit/l2_book.py @@ -61,7 +61,3 @@ def handle_update(self, message: dict) -> None: self._asks.delete(change[1]) else: self._asks.update(price=change[1], size=change[2]) - - -def create_l2_order_book(instrument: str) -> AbstractL2Book: - return L2Book(instrument) From 734fa125c0d094873895f00f93cd213f29d29dbd Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Wed, 10 Jun 2020 12:44:29 +0200 Subject: [PATCH 02/32] Deribit is divided into Parser and Controller --- ssc2ce/deribit/deribit.py | 91 +++++++++++++++++------------------ ssc2ce/deribit/icontroller.py | 15 ++++++ ssc2ce/deribit/iparser.py | 7 +++ ssc2ce/deribit/parser.py | 32 ++++++++++++ 4 files changed, 98 insertions(+), 47 deletions(-) create mode 100644 ssc2ce/deribit/icontroller.py create mode 100644 ssc2ce/deribit/iparser.py create mode 100644 ssc2ce/deribit/parser.py diff --git a/ssc2ce/deribit/deribit.py b/ssc2ce/deribit/deribit.py index 99e2e7e..e0ca6f8 100644 --- a/ssc2ce/deribit/deribit.py +++ b/ssc2ce/deribit/deribit.py @@ -1,4 +1,5 @@ import logging + from time import time import aiohttp @@ -7,9 +8,10 @@ from ssc2ce.common import AuthType from ssc2ce.common.session import SessionWrapper from ssc2ce.common.utils import resolve_route, hide_secret, IntId +from ssc2ce.deribit.parser import DeribitParser, IDeribitController -class Deribit(SessionWrapper): +class Deribit(SessionWrapper, IDeribitController): """ Handlers: - on_connect_ws - Called after the connection is established. @@ -73,6 +75,8 @@ def __init__(self, ("", self.empty_handler), ] + self.parser = DeribitParser(self) + async def run_receiver(self): """ Establish a connection and start the receiver loop. @@ -98,8 +102,12 @@ async def run_receiver(self): self.logger.debug(f"Connection closing {repr(message)}") continue - if self.on_message: - await self.on_message(message) + if message.type == aiohttp.WSMsgType.TEXT: + await self.on_message(message.data) + else: + self.logger.warning(f"Unknown type of message {repr(message)}") + + # if self.on_message: def close(self): super()._close() @@ -343,48 +351,13 @@ async def get_instruments(self, currency: str, kind: str = None, expired: bool = return await self.send_public(request=request, callback=callback) - async def handle_message(self, message: aiohttp.WSMessage) -> None: + async def handle_message(self, message: str) -> None: """ :param message: :return: """ - if message.type == aiohttp.WSMsgType.TEXT: - data = message.json() - self.logger.debug(f"handling:{repr(hide_secret(data))}") - - if "method" in data: - await self.handle_method_message(data) - else: - if "id" in data: - if "error" in data: - if self.on_response_error: - await self.on_response_error(data) - else: - self.logger.error(f"Receive error {repr(data)}") - else: - request_id = data["id"] - request = self.requests.get(request_id) - if request: - if "callback" in request: - callback = request["callback"] - await callback(data) - else: - await self.handle_response(request=request, response=data) - - del self.requests[request_id] - else: - if self.on_handle_response: - await self.on_handle_response(data) - else: - self.logger.warning( - f"Unknown id:{request_id}, the on_handle_response event must be defined." - f" Unhandled message {data}") - - else: - self.logger.warning(f"Unsupported message {message.data}") - else: - self.logger.warning(f"Unknown type of message {repr(message)}") + await self.parser.handle_message(message) async def empty_handler(self, **kwargs) -> None: """ @@ -394,20 +367,38 @@ async def empty_handler(self, **kwargs) -> None: """ self.logger.debug(f"{repr(kwargs)}") - async def handle_response(self, request, response) -> None: + async def handle_response(self, request_id: int, response: dict) -> None: """ - :param request: + :param request_id: :param response: :return: """ - method = request["method"] - handler = resolve_route(method, self.response_routes) - if handler: - return await handler(request=request, response=response) + request = self.requests.get(request_id) + if request: + if "callback" in request: + callback = request["callback"] + await callback(response) + else: + method = request["method"] + handler = resolve_route(method, self.response_routes) + if handler: + return await handler(request=request, response=response) + + self.logger.warning(f"Unhandled method:{method} response:{repr(response)} " + f"to request:{repr(request)}.") + + del self.requests[request_id] + else: + if self.on_handle_response: + return await self.on_handle_response(response) + else: + self.logger.warning( + f"Unknown id:{request_id}, the on_handle_response event must be defined." + f" Unhandled message {response}") - self.logger.warning(f"Unhandled method:{method} response:{repr(response)} to request:{repr(request)}.") + # self.logger.warning(f"Unhandled method:{method} response:{repr(response)} to request:{repr(request)}.") async def handle_method_message(self, data) -> None: """ @@ -454,3 +445,9 @@ async def handle_auth(self, request, response) -> None: pass else: self.logger.error(f"Unknown grant_type {repr(hide_secret(request))} : {repr(hide_secret(response))}") + + async def handle_error(self, message): + if self.on_response_error: + self.on_response_error(message) + else: + self.logger.error(message) \ No newline at end of file diff --git a/ssc2ce/deribit/icontroller.py b/ssc2ce/deribit/icontroller.py new file mode 100644 index 0000000..62e2046 --- /dev/null +++ b/ssc2ce/deribit/icontroller.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class IDeribitController(ABC): + @abstractmethod + async def handle_method_message(self, message: dict) -> None: + pass + + @abstractmethod + async def handle_error(self, message: dict) -> None: + pass + + @abstractmethod + async def handle_response(self, request_id: int, data: dict): + pass diff --git a/ssc2ce/deribit/iparser.py b/ssc2ce/deribit/iparser.py new file mode 100644 index 0000000..4f29a6f --- /dev/null +++ b/ssc2ce/deribit/iparser.py @@ -0,0 +1,7 @@ +from abc import ABC, abstractmethod + + +class IDeribitParser(ABC): + @abstractmethod + async def handle_message(self, message: str) -> None: + pass diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py new file mode 100644 index 0000000..b5be469 --- /dev/null +++ b/ssc2ce/deribit/parser.py @@ -0,0 +1,32 @@ +import json +import logging + +from ssc2ce.common.utils import hide_secret +from ssc2ce.deribit.icontroller import IDeribitController +from ssc2ce.deribit.iparser import IDeribitParser + + +class DeribitParser(IDeribitParser): + """ + + """ + def __init__(self, controller: IDeribitController): + self.logger = logging.getLogger(__name__) + self.controller = controller + + async def handle_message(self, message): + data = json.loads(message) + self.logger.info(f"handling:{repr(hide_secret(data))}") + + if "method" in data: + await self.controller.handle_method_message(data) + else: + if "id" in data: + if "error" in data: + await self.controller.handle_error(data) + else: + request_id = data["id"] + if request_id: + await self.controller.handle_response(request_id, data) + else: + self.logger.warning(f"Unsupported message {message}") \ No newline at end of file From d318566fbb7ef4bcdb038870823008153510c3b1 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Wed, 10 Jun 2020 14:06:54 +0200 Subject: [PATCH 03/32] the deribit_writer example added --- examples/deribit_writer.py | 79 ++++++++++++++++++++++++++++++++++++++ ssc2ce/deribit/deribit.py | 4 +- ssc2ce/deribit/parser.py | 10 +++-- 3 files changed, 88 insertions(+), 5 deletions(-) create mode 100644 examples/deribit_writer.py diff --git a/examples/deribit_writer.py b/examples/deribit_writer.py new file mode 100644 index 0000000..a022dbe --- /dev/null +++ b/examples/deribit_writer.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +import asyncio +import json +import logging +from ssc2ce import Deribit +from ssc2ce.deribit.l2_book import L2Book + +conn = Deribit() + +pending = {} + +logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger("deribit-book") + +instruments = [] + + +async def handle_instruments(data: dict): + global instruments + request_id = data["id"] + del pending[request_id] + print(json.dumps(data)) + instruments += list(set([x["instrument_name"][:11] if not x["instrument_name"].endswith("-PERPETUAL") else x["instrument_name"] for x in data["result"]])) + + # print(", ".join()) + if not pending: + print(instruments) + await subscribe_books(instruments) + + +async def handle_book_summary_by_currency(data: dict): + request_id = data["id"] + del pending[request_id] + print(json.dumps(data)) + + +async def handle_currencies(data: dict): + for currency in data["result"]: + symbol = currency["currency"] + print(json.dumps(data)) + request_id = await conn.get_instruments(symbol, callback=handle_instruments) + pending[request_id] = symbol + request_id = await conn.send_public( + request={"method": "public/get_book_summary_by_currency", "params": {"currency": symbol}}, + callback=handle_instruments) + pending[request_id] = symbol + + +async def get_currencies(): + await conn.get_currencies(handle_currencies) + + +async def subscribe_books(instruments: list): + await conn.send_public(request={ + "method": "public/subscribe", + "params": { + "channels": [f"book.{i}.raw" for i in instruments] + } + }) + + +output = open("dump.txt", "w") + + +def dump(msg: str): + output.write(msg) + output.write('\n') + + +conn.parser.on_before_handling = dump + +conn.on_connect_ws = get_currencies + +loop = asyncio.get_event_loop() + +try: + loop.run_until_complete(conn.run_receiver()) +except KeyboardInterrupt: + print("Application closed by KeyboardInterrupt.") diff --git a/ssc2ce/deribit/deribit.py b/ssc2ce/deribit/deribit.py index e0ca6f8..86deee8 100644 --- a/ssc2ce/deribit/deribit.py +++ b/ssc2ce/deribit/deribit.py @@ -411,8 +411,8 @@ async def handle_method_message(self, data) -> None: if handler: return await handler(data) - - self.logger.warning(f"Unhandled message:{repr(data)}.") + elif self.parser.on_before_handling is None: + self.logger.warning(f"Unhandled message:{repr(data)}.") async def handle_heartbeat(self, data): """ diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py index b5be469..db5c0a3 100644 --- a/ssc2ce/deribit/parser.py +++ b/ssc2ce/deribit/parser.py @@ -10,13 +10,17 @@ class DeribitParser(IDeribitParser): """ """ + def __init__(self, controller: IDeribitController): self.logger = logging.getLogger(__name__) self.controller = controller + self.on_before_handling = None async def handle_message(self, message): + if self.on_before_handling: + self.on_before_handling(message) + data = json.loads(message) - self.logger.info(f"handling:{repr(hide_secret(data))}") if "method" in data: await self.controller.handle_method_message(data) @@ -28,5 +32,5 @@ async def handle_message(self, message): request_id = data["id"] if request_id: await self.controller.handle_response(request_id, data) - else: - self.logger.warning(f"Unsupported message {message}") \ No newline at end of file + elif self.on_before_handling is not None: + self.logger.warning(f"Unsupported message {message}") From b2472b160cc8bd510f3063c0cdc46b21db8fe680 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Sat, 13 Jun 2020 08:48:37 +0200 Subject: [PATCH 04/32] conn.on_before_handling = dump --- examples/deribit_writer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/deribit_writer.py b/examples/deribit_writer.py index a022dbe..855ff3d 100644 --- a/examples/deribit_writer.py +++ b/examples/deribit_writer.py @@ -10,7 +10,7 @@ pending = {} logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) -logger = logging.getLogger("deribit-book") +logger = logging.getLogger("deribit-writer") instruments = [] @@ -67,7 +67,7 @@ def dump(msg: str): output.write('\n') -conn.parser.on_before_handling = dump +conn.on_before_handling = dump conn.on_connect_ws = get_currencies From 85cbd046e6dc9aafee69212378b191fc6595c2fa Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Sat, 13 Jun 2020 09:41:36 +0200 Subject: [PATCH 05/32] parser keeps books --- examples/bitfinex_book.py | 2 +- examples/book_watcher.py | 20 ++++++++++++++ examples/deribit_book.py | 55 +++++++++++--------------------------- examples/deribit_parser.py | 39 +++++++++++++++++++++++++++ ssc2ce/deribit/deribit.py | 30 ++++++++++++++------- ssc2ce/deribit/parser.py | 47 +++++++++++++++++++++++++++----- 6 files changed, 137 insertions(+), 56 deletions(-) create mode 100644 examples/book_watcher.py create mode 100644 examples/deribit_parser.py diff --git a/examples/bitfinex_book.py b/examples/bitfinex_book.py index f850710..c901404 100644 --- a/examples/bitfinex_book.py +++ b/examples/bitfinex_book.py @@ -32,7 +32,7 @@ async def handle_book(self, message, connector): if self.top_ask[0] != self.book.asks[0][0] or self.top_ask[0] != self.book.asks[0][0]: self.top_ask = self.book.asks[0].copy() self.top_bid = self.book.bids[0].copy() - print(f"{self.book.instrument} bid:{self.top_bid[0]} ask:{self.top_ask[0]}") + logger.info(f"{self.book.instrument} bid:{self.top_bid[0]} ask:{self.top_ask[0]}") btc = BookMaintainer("BTC-USD") diff --git a/examples/book_watcher.py b/examples/book_watcher.py new file mode 100644 index 0000000..65a0699 --- /dev/null +++ b/examples/book_watcher.py @@ -0,0 +1,20 @@ +from ssc2ce.deribit.l2_book import L2Book + + +class BookWatcher: + def __init__(self, parser): + self.betters = {} + parser.on_book_setup = self.handle_book_setup + parser.on_book_update = self.handle_book_update + + async def handle_book_setup(self, book: L2Book): + top = [book.bids[0][0], book.asks[0][0]] + self.betters[book.instrument] = top + print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") + + async def handle_book_update(self, book: L2Book): + top = self.betters[book.instrument] + if top[1] != book.asks[0][0] or top[0] != book.bids[0][0]: + top[1] = book.asks[0][0] + top[0] = book.bids[0][0] + print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") diff --git a/examples/deribit_book.py b/examples/deribit_book.py index d98c838..cf28852 100644 --- a/examples/deribit_book.py +++ b/examples/deribit_book.py @@ -2,13 +2,19 @@ import asyncio import json import logging + +from examples.book_watcher import BookWatcher from ssc2ce import Deribit -from ssc2ce.deribit.l2_book import L2Book + +from ssc2ce.deribit.parser import DeribitParser conn = Deribit() +parser = DeribitParser(conn) +conn.on_message = parser.handle_message +watcher = BookWatcher(parser) pending = {} -books = {} +instruments = [] logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger("deribit-book") @@ -19,17 +25,18 @@ async def handle_instruments(data: dict): del pending[request_id] print(json.dumps(data)) if not pending: - await subscribe_books(list(books.keys())) + await conn.send_public(request={ + "method": "public/subscribe", + "params": { + "channels": [f"book.{i}.raw" for i in instruments] + } + }) async def handle_currencies(data: dict): for currency in data["result"]: symbol = currency["currency"] - instrument = symbol+"-PERPETUAL" - book = L2Book(instrument) - book.top_bid = [0., 0.] - book.top_ask = [0., 0.] - books[instrument] = book + instruments.append(symbol + "-PERPETUAL") request_id = await conn.get_instruments(symbol, kind="future", callback=handle_instruments) pending[request_id] = symbol @@ -38,39 +45,7 @@ async def get_currencies(): await conn.get_currencies(handle_currencies) -async def subscribe_books(instruments: list): - await conn.send_public(request={ - "method": "public/subscribe", - "params": { - "channels": [f"book.{i}.raw" for i in instruments] - } - }) - - -async def handle_subscription(data): - method = data.get("method") - if method and method == "subscription": - params = data["params"] - channel = params["channel"] - if channel.startswith("book."): - params_data = params["data"] - instrument = params_data["instrument_name"] - book: L2Book = books[instrument] - if "prev_change_id" in params_data: - book.handle_update(params_data) - else: - book.handle_snapshot(params_data) - - if book.top_ask[0] != book.asks[0][0] or book.top_bid[0] != book.bids[0][0]: - book.top_ask = book.asks[0].copy() - book.top_bid = book.bids[0].copy() - print(f"{instrument} bid:{book.top_bid[0]} ask:{book.top_ask[0]}") - else: - print("Unknown channel", json.dumps(data)) - - conn.on_connect_ws = get_currencies -conn.method_routes += [("subscription", handle_subscription)] loop = asyncio.get_event_loop() diff --git a/examples/deribit_parser.py b/examples/deribit_parser.py new file mode 100644 index 0000000..a7943c5 --- /dev/null +++ b/examples/deribit_parser.py @@ -0,0 +1,39 @@ +import asyncio +import json +import logging + +from examples.book_watcher import BookWatcher +from ssc2ce.deribit.icontroller import IDeribitController +from ssc2ce.deribit.parser import DeribitParser + +logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger("deribit-parser") + + +class FileController(IDeribitController): + def __init__(self): + self.parser = DeribitParser(self) + self.counter = 0 + self.watcher = BookWatcher(self.parser) + + async def run(self, filename: str): + with open(filename) as f: + for line in f: + await self.parser.handle_message(line) + + logger.info(f"{self.counter}") + + async def handle_method_message(self, message: dict) -> None: + + self.counter += 1 + + async def handle_error(self, message: dict) -> None: + self.counter += 1 + logger.info(message) + + async def handle_response(self, request_id: int, data: dict): + self.counter += 1 + # logger.info(json.dumps(data)) + + +asyncio.run(FileController().run("dump.txt")) diff --git a/ssc2ce/deribit/deribit.py b/ssc2ce/deribit/deribit.py index 86deee8..b6e92fd 100644 --- a/ssc2ce/deribit/deribit.py +++ b/ssc2ce/deribit/deribit.py @@ -1,3 +1,4 @@ +import json import logging from time import time @@ -8,7 +9,7 @@ from ssc2ce.common import AuthType from ssc2ce.common.session import SessionWrapper from ssc2ce.common.utils import resolve_route, hide_secret, IntId -from ssc2ce.deribit.parser import DeribitParser, IDeribitController +from ssc2ce.deribit.icontroller import IDeribitController class Deribit(SessionWrapper, IDeribitController): @@ -49,7 +50,7 @@ def __init__(self, self.on_message = self.handle_message self.on_authenticated = None self.on_token = None - # self.on_subscription = None + self.on_before_handling = None self.on_response_error = None self.on_handle_response = None @@ -75,8 +76,6 @@ def __init__(self, ("", self.empty_handler), ] - self.parser = DeribitParser(self) - async def run_receiver(self): """ Establish a connection and start the receiver loop. @@ -103,12 +102,13 @@ async def run_receiver(self): continue if message.type == aiohttp.WSMsgType.TEXT: + if self.on_before_handling: + self.on_before_handling(message.data) + await self.on_message(message.data) else: self.logger.warning(f"Unknown type of message {repr(message)}") - # if self.on_message: - def close(self): super()._close() @@ -357,7 +357,19 @@ async def handle_message(self, message: str) -> None: :param message: :return: """ - await self.parser.handle_message(message) + data = json.loads(message) + if "method" in data: + await self.handle_method_message(data) + else: + if "id" in data: + if "error" in data: + await self.handle_error(data) + else: + request_id = data["id"] + if request_id: + await self.handle_response(request_id, data) + else: + await self.handle_method_message(data) async def empty_handler(self, **kwargs) -> None: """ @@ -411,7 +423,7 @@ async def handle_method_message(self, data) -> None: if handler: return await handler(data) - elif self.parser.on_before_handling is None: + elif not self.on_before_handling: self.logger.warning(f"Unhandled message:{repr(data)}.") async def handle_heartbeat(self, data): @@ -450,4 +462,4 @@ async def handle_error(self, message): if self.on_response_error: self.on_response_error(message) else: - self.logger.error(message) \ No newline at end of file + self.logger.error(message) diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py index db5c0a3..95365a2 100644 --- a/ssc2ce/deribit/parser.py +++ b/ssc2ce/deribit/parser.py @@ -4,6 +4,7 @@ from ssc2ce.common.utils import hide_secret from ssc2ce.deribit.icontroller import IDeribitController from ssc2ce.deribit.iparser import IDeribitParser +from ssc2ce.deribit.l2_book import L2Book class DeribitParser(IDeribitParser): @@ -12,18 +13,18 @@ class DeribitParser(IDeribitParser): """ def __init__(self, controller: IDeribitController): + self.on_book_setup = None + self.on_book_update = None self.logger = logging.getLogger(__name__) self.controller = controller - self.on_before_handling = None - async def handle_message(self, message): - if self.on_before_handling: - self.on_before_handling(message) + self.books = {} + async def handle_message(self, message): data = json.loads(message) if "method" in data: - await self.controller.handle_method_message(data) + await self.handle_method_message(data) else: if "id" in data: if "error" in data: @@ -32,5 +33,39 @@ async def handle_message(self, message): request_id = data["id"] if request_id: await self.controller.handle_response(request_id, data) - elif self.on_before_handling is not None: + elif self.controller.handle_method_message(data) is not None: self.logger.warning(f"Unsupported message {message}") + + async def handle_method_message(self, data: dict): + method = data.get("method") + if method is None: + return + + if method == "subscription": + params = data["params"] + channel = params["channel"] + if channel.startswith("book."): + params_data = params["data"] + instrument = params_data["instrument_name"] + book = self.get_book(instrument) + + if "prev_change_id" in params_data: + book.handle_update(params_data) + if self.on_book_update: + await self.on_book_update(book) + else: + book.handle_snapshot(params_data) + if self.on_book_setup: + await self.on_book_setup(book) + else: + await self.controller.handle_method_message(data) + + return + + def get_book(self, instrument: str) -> L2Book: + book: L2Book = self.books.get(instrument) + if book is None: + book = L2Book(instrument) + self.books[instrument] = book + + return book From d3ee919f1127db82c90d9d2a9fad0ea0a29dd9a9 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 11:58:34 +0200 Subject: [PATCH 06/32] removed asyn for some methods --- ssc2ce/deribit/deribit.py | 52 +++++++++++++++---- ssc2ce/deribit/parser.py | 104 ++++++++++++++++++++++++++++++-------- 2 files changed, 124 insertions(+), 32 deletions(-) diff --git a/ssc2ce/deribit/deribit.py b/ssc2ce/deribit/deribit.py index b6e92fd..a8f1b72 100644 --- a/ssc2ce/deribit/deribit.py +++ b/ssc2ce/deribit/deribit.py @@ -1,3 +1,4 @@ +import asyncio import json import logging @@ -47,6 +48,10 @@ def __init__(self, self.on_connect_ws = self.auth_login if auth_type != AuthType.NONE else None self.on_close_ws = None + + self._on_message_is_routine = False + self._on_message = None + self.on_message = self.handle_message self.on_authenticated = None self.on_token = None @@ -76,6 +81,22 @@ def __init__(self, ("", self.empty_handler), ] + @property + def on_message(self): + return self._on_message + + @on_message.setter + def on_message(self, value): + self._on_message_is_routine = asyncio.iscoroutinefunction(value) + self._on_message = value + + async def _call_on_message(self, message: str): + if self._on_message: + if self._on_message_is_routine: + await self._on_message(message) + else: + self._on_message(message) + async def run_receiver(self): """ Establish a connection and start the receiver loop. @@ -105,7 +126,7 @@ async def run_receiver(self): if self.on_before_handling: self.on_before_handling(message.data) - await self.on_message(message.data) + await self._call_on_message(message.data) else: self.logger.warning(f"Unknown type of message {repr(message)}") @@ -119,7 +140,7 @@ async def stop(self): """ await self.ws.close() - async def send_public(self, request: dict, callback=None) -> int: + async def send_public(self, request: dict, callback=None, logging_it: bool = True) -> int: """ Send a public request @@ -133,7 +154,8 @@ async def send_public(self, request: dict, callback=None) -> int: "id": request_id, **request } - self.logger.info(f"sending:{repr(hide_secret(request))}") + if logging_it: + self.logger.info(f"sending:{repr(hide_secret(request))}") await self.ws.send_json(request) if callback: @@ -367,7 +389,7 @@ async def handle_message(self, message: str) -> None: else: request_id = data["id"] if request_id: - await self.handle_response(request_id, data) + self.handle_response(request_id, data) else: await self.handle_method_message(data) @@ -379,7 +401,7 @@ async def empty_handler(self, **kwargs) -> None: """ self.logger.debug(f"{repr(kwargs)}") - async def handle_response(self, request_id: int, response: dict) -> None: + def handle_response(self, request_id: int, response: dict) -> None: """ :param request_id: @@ -391,12 +413,18 @@ async def handle_response(self, request_id: int, response: dict) -> None: if request: if "callback" in request: callback = request["callback"] - await callback(response) + if asyncio.iscoroutinefunction(callback): + asyncio.ensure_future(callback(response)) + else: + callback(response) else: method = request["method"] handler = resolve_route(method, self.response_routes) if handler: - return await handler(request=request, response=response) + if asyncio.iscoroutinefunction(handler): + asyncio.ensure_future(handler(request=request, response=response)) + else: + handler(request=request, response=response) self.logger.warning(f"Unhandled method:{method} response:{repr(response)} " f"to request:{repr(request)}.") @@ -404,14 +432,16 @@ async def handle_response(self, request_id: int, response: dict) -> None: del self.requests[request_id] else: if self.on_handle_response: - return await self.on_handle_response(response) + if asyncio.iscoroutinefunction(self.on_handle_response): + asyncio.ensure_future(self.on_handle_response(response)) + else: + self.on_handle_response(response) + return else: self.logger.warning( f"Unknown id:{request_id}, the on_handle_response event must be defined." f" Unhandled message {response}") - # self.logger.warning(f"Unhandled method:{method} response:{repr(response)} to request:{repr(request)}.") - async def handle_method_message(self, data) -> None: """ @@ -433,7 +463,7 @@ async def handle_heartbeat(self, data): :return: """ if data["params"]["type"] == "test_request": - await self.send_public({"method": "public/test", "params": {}}) + await self.send_public({"method": "public/test", "params": {}}, logging_it=False) async def handle_auth(self, request, response) -> None: """ diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py index 95365a2..d632096 100644 --- a/ssc2ce/deribit/parser.py +++ b/ssc2ce/deribit/parser.py @@ -1,42 +1,55 @@ +import asyncio import json import logging -from ssc2ce.common.utils import hide_secret from ssc2ce.deribit.icontroller import IDeribitController -from ssc2ce.deribit.iparser import IDeribitParser -from ssc2ce.deribit.l2_book import L2Book +from ssc2ce.deribit.l2_book import DeribitL2Book, L2Book +from ssc2ce.common.exceptions import BrokenOrderBook -class DeribitParser(IDeribitParser): + +class DeribitParser: """ """ - def __init__(self, controller: IDeribitController): - self.on_book_setup = None - self.on_book_update = None + def __init__(self, controller: IDeribitController = None): + self._on_book_setup = None + self._on_book_update = None self.logger = logging.getLogger(__name__) self.controller = controller - + self.deprecated_already_warn = False self.books = {} + def set_on_book_setup(self, handler) -> None: + self._on_book_setup = handler + + def set_on_book_update(self, handler) -> None: + self._on_book_update = handler + async def handle_message(self, message): + if not self.deprecated_already_warn: + self.deprecated_already_warn = True + self.logger.warning("The handle_message method of DeribitParser is deprecated, please use parse instead") + self.parse(message) + + def parse(self, message) -> None: data = json.loads(message) if "method" in data: - await self.handle_method_message(data) + self.handle_method_message(data) else: if "id" in data: if "error" in data: - await self.controller.handle_error(data) + if self.controller: self.controller.handle_error(data) else: request_id = data["id"] - if request_id: - await self.controller.handle_response(request_id, data) - elif self.controller.handle_method_message(data) is not None: + if request_id and self.controller: + self.controller.handle_response(request_id, data) + elif self.controller and self.controller.handle_method_message(data) is not None: self.logger.warning(f"Unsupported message {message}") - async def handle_method_message(self, data: dict): + def handle_method_message(self, data: dict): method = data.get("method") if method is None: return @@ -50,15 +63,16 @@ async def handle_method_message(self, data: dict): book = self.get_book(instrument) if "prev_change_id" in params_data: - book.handle_update(params_data) - if self.on_book_update: - await self.on_book_update(book) + self.handle_update(book, params_data) + if self._on_book_update: + self._on_book_update(book) else: - book.handle_snapshot(params_data) - if self.on_book_setup: - await self.on_book_setup(book) + self.handle_snapshot(book, params_data) + if self._on_book_setup: + self._on_book_setup(book) else: - await self.controller.handle_method_message(data) + if self.controller: + self.controller.handle_method_message(data) return @@ -69,3 +83,51 @@ def get_book(self, instrument: str) -> L2Book: self.books[instrument] = book return book + + @staticmethod + def handle_snapshot(book: DeribitL2Book, message: dict) -> None: + """ + + :param book: + :param message: + :return: + """ + book.clear() + + book.change_id = message["change_id"] + book.timestamp = message["timestamp"] + for i in message['bids']: + book.bids.add(i[1], i[2]) + + for i in message['asks']: + book.asks.add(i[1], i[2]) + + @staticmethod + def handle_update(book: DeribitL2Book, message: dict) -> None: + """ + + :param book: + :param message: + :return: + """ + prev_change_id = message["prev_change_id"] + if prev_change_id != book.change_id: + raise BrokenOrderBook(book.instrument, prev_change_id, book.change_id) + + book.change_id = message["change_id"] + book.timestamp = message["timestamp"] + for change in message['bids']: + if change[0] == 'new': + book.bids.add(price=change[1], size=change[2]) + elif change[0] == 'delete': + book.bids.delete(change[1]) + else: + book.bids.update(price=change[1], size=change[2]) + + for change in message['asks']: + if change[0] == 'new': + book.asks.add(price=change[1], size=change[2]) + elif change[0] == 'delete': + book.asks.delete(change[1]) + else: + book.asks.update(price=change[1], size=change[2]) From 7521cc428168e0789f6c8cdb95818f56949125a2 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 11:59:38 +0200 Subject: [PATCH 07/32] handle_message replaced to parse --- examples/deribit_book.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/deribit_book.py b/examples/deribit_book.py index cf28852..7e355b9 100644 --- a/examples/deribit_book.py +++ b/examples/deribit_book.py @@ -10,13 +10,13 @@ conn = Deribit() parser = DeribitParser(conn) -conn.on_message = parser.handle_message +conn.on_message = parser.parse watcher = BookWatcher(parser) pending = {} instruments = [] -logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.DEBUG) logger = logging.getLogger("deribit-book") From d53f94d68d2c3f6bdeb3ceb446817d46c8272310 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 12:00:25 +0200 Subject: [PATCH 08/32] minor changes of book --- ssc2ce/bitfinex/l2_book.py | 4 +-- ssc2ce/common/l2_book.py | 34 ++++++++++++++++++ ssc2ce/common/l2_book_side.py | 13 +------ ssc2ce/deribit/l2_book.py | 65 +++-------------------------------- 4 files changed, 42 insertions(+), 74 deletions(-) create mode 100644 ssc2ce/common/l2_book.py diff --git a/ssc2ce/bitfinex/l2_book.py b/ssc2ce/bitfinex/l2_book.py index 71d1599..c5f7077 100644 --- a/ssc2ce/bitfinex/l2_book.py +++ b/ssc2ce/bitfinex/l2_book.py @@ -1,11 +1,11 @@ import logging -from ssc2ce.common.abstract_l2_book import AbstractL2Book +from ssc2ce.common.l2_book import L2Book from collections import deque from ssc2ce.common.exceptions import BrokenOrderBook -class L2Book(AbstractL2Book): +class L2Book(L2Book): change_id = None timestamp = None logger = logging.getLogger(__name__) diff --git a/ssc2ce/common/l2_book.py b/ssc2ce/common/l2_book.py new file mode 100644 index 0000000..d9cdfde --- /dev/null +++ b/ssc2ce/common/l2_book.py @@ -0,0 +1,34 @@ +from .l2_book_side import L2BookSide + + +class L2Book: + """ + + """ + + def __init__(self, instrument: str): + """ + + :param instrument: + """ + self.instrument = instrument + self._bids = L2BookSide(is_bids=True) + self._asks = L2BookSide(is_bids=False) + + @property + def bids(self) -> L2BookSide: + return self._bids + + @property + def asks(self) -> L2BookSide: + return self._asks + + def clear(self): + self._bids.data.clear() + self._asks.data.clear() + + def top_ask_price(self): + return self._asks.data[0][0] + + def top_bid_price(self): + return self._bids.data[0][0] diff --git a/ssc2ce/common/l2_book_side.py b/ssc2ce/common/l2_book_side.py index 14874dd..581b99e 100644 --- a/ssc2ce/common/l2_book_side.py +++ b/ssc2ce/common/l2_book_side.py @@ -12,17 +12,13 @@ def __init__(self, is_bids: bool): self.data = SortedKeyList(key=lambda val: val[0]) self.is_bids = is_bids self.time = None - self.changes = list() def fill(self, source): self.data.clear() for item in source: self.add(item) - def add(self, item): - price = float(item[0]) - size = float(item[1]) - self.changes.append([price, size, size]) + def add(self, price: float, size: float): self.data.add([price, size]) def update(self, price: float, size: float): @@ -33,30 +29,23 @@ def update(self, price: float, size: float): value = self.data[i] else: if size <= VERY_SMALL_NUMBER: - self.changes.append([price, size, 0.0]) return False self.data.add([price, size]) - self.changes.append([price, size, size]) return True if size <= VERY_SMALL_NUMBER: if value[0] == price: old_size = self.data[i][1] self.data.discard(value) - self.changes.append([price, size, -old_size]) return True else: - self.changes.append([price, size, 0.0]) return False if value[0] == price: - old_size = self.data[i][1] self.data[i][1] = size - self.changes.append([price, size, size - old_size]) else: self.data.add([price, size]) - self.changes.append([price, size, size]) return True def delete(self, price: float): diff --git a/ssc2ce/deribit/l2_book.py b/ssc2ce/deribit/l2_book.py index ba9d959..e9801ae 100644 --- a/ssc2ce/deribit/l2_book.py +++ b/ssc2ce/deribit/l2_book.py @@ -1,63 +1,8 @@ -import logging -from ssc2ce.common.abstract_l2_book import AbstractL2Book -from collections import deque +from ssc2ce.common.l2_book import L2Book -from ssc2ce.common.exceptions import BrokenOrderBook - - -class L2Book(AbstractL2Book): - change_id = None - timestamp = None - logger = logging.getLogger(__name__) +class DeribitL2Book(L2Book): def __init__(self, instrument: str): - """ - - :param instrument: - """ - AbstractL2Book.__init__(self, instrument) - - def handle_snapshot(self, message: dict) -> None: - """ - - :param message: - :return: - """ - self.asks.clear() - self.bids.clear() - - self.change_id = message["change_id"] - self.timestamp = message["timestamp"] - for i in message['bids']: - self.bids.add([i[1], i[2]]) - - for i in message['asks']: - self.asks.add([i[1], i[2]]) - - def handle_update(self, message: dict) -> None: - """ - - :param message: - :return: - """ - prev_change_id = message["prev_change_id"] - if prev_change_id != self.change_id: - raise BrokenOrderBook(self.instrument, prev_change_id, self.change_id) - - self.change_id = message["change_id"] - self.timestamp = message["timestamp"] - for change in message['bids']: - if change[0] == 'new': - self._bids.add(change[1:]) - elif change[0] == 'delete': - self._bids.delete(change[1]) - else: - self._bids.update(price=change[1], size=change[2]) - - for change in message['asks']: - if change[0] == 'new': - self._asks.add(change[1:]) - elif change[0] == 'delete': - self._asks.delete(change[1]) - else: - self._asks.update(price=change[1], size=change[2]) + L2Book.__init__(self, instrument) + self.change_id = None + self.timestamp = None From 57a9761c65b3d1bf821e6a7ed185e4f2d852b49e Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 12:00:43 +0200 Subject: [PATCH 09/32] public/set_heartbeat added --- examples/deribit_writer.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/examples/deribit_writer.py b/examples/deribit_writer.py index 855ff3d..169052c 100644 --- a/examples/deribit_writer.py +++ b/examples/deribit_writer.py @@ -3,7 +3,6 @@ import json import logging from ssc2ce import Deribit -from ssc2ce.deribit.l2_book import L2Book conn = Deribit() @@ -20,9 +19,10 @@ async def handle_instruments(data: dict): request_id = data["id"] del pending[request_id] print(json.dumps(data)) - instruments += list(set([x["instrument_name"][:11] if not x["instrument_name"].endswith("-PERPETUAL") else x["instrument_name"] for x in data["result"]])) + instruments += list(set( + [x["instrument_name"][:11] if not x["instrument_name"].endswith("-PERPETUAL") else x["instrument_name"] for x in + data["result"]])) - # print(", ".join()) if not pending: print(instruments) await subscribe_books(instruments) @@ -58,6 +58,13 @@ async def subscribe_books(instruments: list): } }) + await conn.send_public(request={ + "method": "public/set_heartbeat", + "params": { + "interval": 15 + } + }) + output = open("dump.txt", "w") @@ -67,11 +74,16 @@ def dump(msg: str): output.write('\n') +def stop(): + asyncio.ensure_future(conn.ws.close()) + + conn.on_before_handling = dump conn.on_connect_ws = get_currencies loop = asyncio.get_event_loop() +loop.call_later(3600, stop) try: loop.run_until_complete(conn.run_receiver()) From 5afde828865c47aa321ce533bd79024da44ad3af Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 12:06:48 +0200 Subject: [PATCH 10/32] ssc2ce_cpp support --- examples/book_watcher.py | 25 ++++++++++++++----------- examples/deribit_parser.py | 26 +++++++++++++++++++------- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/examples/book_watcher.py b/examples/book_watcher.py index 65a0699..216ea07 100644 --- a/examples/book_watcher.py +++ b/examples/book_watcher.py @@ -2,19 +2,22 @@ class BookWatcher: - def __init__(self, parser): + def __init__(self, parser, print_it: bool = True): self.betters = {} - parser.on_book_setup = self.handle_book_setup - parser.on_book_update = self.handle_book_update + self.print_it = print_it + parser.set_on_book_setup(self.handle_book_setup) + parser.set_on_book_update(self.handle_book_update) - async def handle_book_setup(self, book: L2Book): - top = [book.bids[0][0], book.asks[0][0]] + def handle_book_setup(self, book: L2Book): + top = [book.top_bid_price(), book.top_ask_price()] self.betters[book.instrument] = top - print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") + if self.print_it: + print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") - async def handle_book_update(self, book: L2Book): + def handle_book_update(self, book: L2Book): top = self.betters[book.instrument] - if top[1] != book.asks[0][0] or top[0] != book.bids[0][0]: - top[1] = book.asks[0][0] - top[0] = book.bids[0][0] - print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") + if top[1] != book.top_ask_price() or top[0] != book.top_bid_price(): + top[1] = book.top_ask_price() + top[0] = book.top_bid_price() + if self.print_it: + print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") diff --git a/examples/deribit_parser.py b/examples/deribit_parser.py index a7943c5..6a47ddc 100644 --- a/examples/deribit_parser.py +++ b/examples/deribit_parser.py @@ -1,10 +1,22 @@ import asyncio -import json import logging +from time import time from examples.book_watcher import BookWatcher from ssc2ce.deribit.icontroller import IDeribitController -from ssc2ce.deribit.parser import DeribitParser + +import sys + +if len(sys.argv) > 1 and "cpp" in sys.argv: + from importlib import util + ssc2ce_cpp_spec = util.find_spec("ssc2ce_cpp") + if ssc2ce_cpp_spec: + from ssc2ce_cpp import DeribitParser + else: + print("You must install the ssc2ce_cpp module to use its features.\n pip install ssc2ce_cpp") + exit(1) +else: + from ssc2ce.deribit.parser import DeribitParser logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger("deribit-parser") @@ -12,19 +24,19 @@ class FileController(IDeribitController): def __init__(self): - self.parser = DeribitParser(self) + self.parser = DeribitParser() self.counter = 0 - self.watcher = BookWatcher(self.parser) + self.watcher = BookWatcher(self.parser, False) async def run(self, filename: str): + start = time() with open(filename) as f: for line in f: - await self.parser.handle_message(line) + self.parser.parse(line) - logger.info(f"{self.counter}") + logger.info(f"{self.counter} in {time()-start} sec.") async def handle_method_message(self, message: dict) -> None: - self.counter += 1 async def handle_error(self, message: dict) -> None: From d2c970524718ae264c6da39bdfbb1d88b6deedf7 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:28:21 +0200 Subject: [PATCH 11/32] controller removed from parser --- ssc2ce/deribit/parser.py | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py index d632096..3fe4780 100644 --- a/ssc2ce/deribit/parser.py +++ b/ssc2ce/deribit/parser.py @@ -13,11 +13,10 @@ class DeribitParser: """ - def __init__(self, controller: IDeribitController = None): + def __init__(self): self._on_book_setup = None self._on_book_update = None self.logger = logging.getLogger(__name__) - self.controller = controller self.deprecated_already_warn = False self.books = {} @@ -33,28 +32,15 @@ async def handle_message(self, message): self.logger.warning("The handle_message method of DeribitParser is deprecated, please use parse instead") self.parse(message) - def parse(self, message) -> None: + def parse(self, message) -> bool: data = json.loads(message) - if "method" in data: - self.handle_method_message(data) - else: - if "id" in data: - if "error" in data: - if self.controller: self.controller.handle_error(data) - else: - request_id = data["id"] - if request_id and self.controller: - self.controller.handle_response(request_id, data) - elif self.controller and self.controller.handle_method_message(data) is not None: - self.logger.warning(f"Unsupported message {message}") + return self.handle_method_message(data) - def handle_method_message(self, data: dict): + def handle_method_message(self, data: dict) -> bool: method = data.get("method") - if method is None: - return - - if method == "subscription": + processed = False + if method and method == "subscription": params = data["params"] channel = params["channel"] if channel.startswith("book."): @@ -70,11 +56,10 @@ def handle_method_message(self, data: dict): self.handle_snapshot(book, params_data) if self._on_book_setup: self._on_book_setup(book) - else: - if self.controller: - self.controller.handle_method_message(data) - return + processed = True + + return processed def get_book(self, instrument: str) -> L2Book: book: L2Book = self.books.get(instrument) From 0692a6bbdb42bd7a56e6e8403e63a835d64228d8 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:28:35 +0200 Subject: [PATCH 12/32] def instrument(self) -> str --- ssc2ce/common/l2_book.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ssc2ce/common/l2_book.py b/ssc2ce/common/l2_book.py index d9cdfde..ca41995 100644 --- a/ssc2ce/common/l2_book.py +++ b/ssc2ce/common/l2_book.py @@ -11,10 +11,13 @@ def __init__(self, instrument: str): :param instrument: """ - self.instrument = instrument + self._instrument = instrument self._bids = L2BookSide(is_bids=True) self._asks = L2BookSide(is_bids=False) + def instrument(self) -> str: + return self._instrument + @property def bids(self) -> L2BookSide: return self._bids From 6313ffba6a4209f5bf4d6d6ddb0ebba4781d6149 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:31:21 +0200 Subject: [PATCH 13/32] IDeribitController removed --- ssc2ce/deribit/icontroller.py | 15 --------------- ssc2ce/deribit/parser.py | 2 -- 2 files changed, 17 deletions(-) delete mode 100644 ssc2ce/deribit/icontroller.py diff --git a/ssc2ce/deribit/icontroller.py b/ssc2ce/deribit/icontroller.py deleted file mode 100644 index 62e2046..0000000 --- a/ssc2ce/deribit/icontroller.py +++ /dev/null @@ -1,15 +0,0 @@ -from abc import ABC, abstractmethod - - -class IDeribitController(ABC): - @abstractmethod - async def handle_method_message(self, message: dict) -> None: - pass - - @abstractmethod - async def handle_error(self, message: dict) -> None: - pass - - @abstractmethod - async def handle_response(self, request_id: int, data: dict): - pass diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py index 3fe4780..a2a6ff5 100644 --- a/ssc2ce/deribit/parser.py +++ b/ssc2ce/deribit/parser.py @@ -2,8 +2,6 @@ import json import logging -from ssc2ce.deribit.icontroller import IDeribitController - from ssc2ce.deribit.l2_book import DeribitL2Book, L2Book from ssc2ce.common.exceptions import BrokenOrderBook From 30bd2f5a7e4dbffb475bdd16b40d4384d79431e9 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:31:30 +0200 Subject: [PATCH 14/32] IDeribitParser removed --- ssc2ce/deribit/iparser.py | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 ssc2ce/deribit/iparser.py diff --git a/ssc2ce/deribit/iparser.py b/ssc2ce/deribit/iparser.py deleted file mode 100644 index 4f29a6f..0000000 --- a/ssc2ce/deribit/iparser.py +++ /dev/null @@ -1,7 +0,0 @@ -from abc import ABC, abstractmethod - - -class IDeribitParser(ABC): - @abstractmethod - async def handle_message(self, message: str) -> None: - pass From 02aceed8847b515cc004213ad17c5553857a6c1c Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:31:50 +0200 Subject: [PATCH 15/32] minor changes --- ssc2ce/VERSION.py | 1 - 1 file changed, 1 deletion(-) delete mode 100644 ssc2ce/VERSION.py diff --git a/ssc2ce/VERSION.py b/ssc2ce/VERSION.py deleted file mode 100644 index 8088f75..0000000 --- a/ssc2ce/VERSION.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "0.8.1" From 8dc81812e21fc5ede0acf3062a5c969a276a128b Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:32:24 +0200 Subject: [PATCH 16/32] async removed from some methods --- examples/deribit_private.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/examples/deribit_private.py b/examples/deribit_private.py index f561c33..3df705b 100644 --- a/examples/deribit_private.py +++ b/examples/deribit_private.py @@ -31,6 +31,7 @@ def __init__(self): auth_type=AuthType.CREDENTIALS, scope=None, get_id=lambda: str(uuid4())) + self.deribit.on_handle_response = self.on_handle_response self.deribit.on_authenticated = self.after_login self.deribit.on_token = self.on_token @@ -99,57 +100,57 @@ def resolve_route(value, routes): if key is not None and key == "" and handler: return handler - async def handle_subscription(self, data: dict): + def handle_subscription(self, data: dict): channel = data["params"]["channel"] handler = self.resolve_route(channel, self.subscription_route) if handler: - return await handler(data) + return handler(data) - async def after_login(self): + def after_login(self): asyncio.ensure_future(self.do_something_after_login()) async def setup_refresh(self, refresh_interval): await asyncio.sleep(refresh_interval) await self.deribit.auth_refresh_token() - async def on_token(self, params): + def on_token(self, params): refresh_interval = min(600, params["expires_in"]) asyncio.ensure_future(self.setup_refresh(refresh_interval)) - async def on_handle_response(self, data): + def on_handle_response(self, data): request_id = data["id"] if request_id in self.direct_requests: self.logger.info(f"Caught response {repr(data)} to direct request {self.direct_requests[request_id]}") else: self.logger.error(f"Can't find request with id:{request_id} for response:{repr(data)}") - async def handle_order_book_change(self, message): + def handle_order_book_change(self, message): data = message["params"]["data"] self.logger.debug(f"{repr(data)}") - async def handle_user_trades(self, message): + def handle_user_trades(self, message): data = message["params"]["data"] self.logger.info(f"{repr(data)}") - async def handle_user_orders(self, message): + def handle_user_orders(self, message): data = message["params"]["data"] self.logger.info(f"{repr(data)}") - async def handle_price_index(self, message): + def handle_price_index(self, message): data = message["params"]["data"] index_name = data['index_name'] self.logger.debug(f"{index_name}: {repr(data)}") - async def handle_trades(self, message): + def handle_trades(self, message): data = message["params"]["data"] self.logger.debug(f"{repr(data)}") - async def handle_ticker(self, message): + def handle_ticker(self, message): data = message["params"]["data"] instrument_name = data['instrument_name'] self.logger.debug(f"{instrument_name}: {repr(data)}") - async def on_response_error(self, data): + def on_response_error(self, data): self.logger.error(f"Receive error {repr(data)}") asyncio.ensure_future(self.deribit.stop()) From 4f2085aa200a1530b90935de831bb627c4ff385e Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:32:45 +0200 Subject: [PATCH 17/32] IDeribitController removed --- examples/deribit_parser.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/examples/deribit_parser.py b/examples/deribit_parser.py index 6a47ddc..61d53ed 100644 --- a/examples/deribit_parser.py +++ b/examples/deribit_parser.py @@ -3,12 +3,12 @@ from time import time from examples.book_watcher import BookWatcher -from ssc2ce.deribit.icontroller import IDeribitController import sys if len(sys.argv) > 1 and "cpp" in sys.argv: from importlib import util + ssc2ce_cpp_spec = util.find_spec("ssc2ce_cpp") if ssc2ce_cpp_spec: from ssc2ce_cpp import DeribitParser @@ -22,30 +22,26 @@ logger = logging.getLogger("deribit-parser") -class FileController(IDeribitController): +class FileController: def __init__(self): self.parser = DeribitParser() self.counter = 0 self.watcher = BookWatcher(self.parser, False) - async def run(self, filename: str): + def run(self, filename: str): start = time() + i = 0 with open(filename) as f: for line in f: - self.parser.parse(line) - - logger.info(f"{self.counter} in {time()-start} sec.") + i += 1 + if not self.parser.parse(line): + self.handle_message(line) - async def handle_method_message(self, message: dict) -> None: - self.counter += 1 - - async def handle_error(self, message: dict) -> None: - self.counter += 1 - logger.info(message) + logger.info(f"{i} in {time() - start} sec. {self.counter}") - async def handle_response(self, request_id: int, data: dict): + def handle_message(self, message: str) -> None: self.counter += 1 - # logger.info(json.dumps(data)) + # logger.info(message[:-1]) -asyncio.run(FileController().run("dump.txt")) +FileController().run("dump.txt") From 8d9d1560e588ff9c9d4cd8b1051e6b1a8f712e21 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:33:06 +0200 Subject: [PATCH 18/32] using ssc2ce_cpp_spec added --- examples/deribit_book.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/examples/deribit_book.py b/examples/deribit_book.py index 7e355b9..84b6f2d 100644 --- a/examples/deribit_book.py +++ b/examples/deribit_book.py @@ -2,17 +2,30 @@ import asyncio import json import logging +import sys from examples.book_watcher import BookWatcher from ssc2ce import Deribit -from ssc2ce.deribit.parser import DeribitParser +if len(sys.argv) > 1 and "cpp" in sys.argv: + from importlib import util + + ssc2ce_cpp_spec = util.find_spec("ssc2ce_cpp") + if ssc2ce_cpp_spec: + from ssc2ce_cpp import DeribitParser + else: + print("You must install the ssc2ce_cpp module to use its features.\n pip install ssc2ce_cpp") + exit(1) +else: + from ssc2ce.deribit.parser import DeribitParser conn = Deribit() -parser = DeribitParser(conn) -conn.on_message = parser.parse +parser = DeribitParser() watcher = BookWatcher(parser) +# parser.set_on_unprocessed_message(conn.handle_message) +conn.on_message = parser.parse + pending = {} instruments = [] From 1426477ddb09e0def1373acc7b5ebf75307e403d Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:33:24 +0200 Subject: [PATCH 19/32] async removed from some methods --- examples/deribit_basic_example.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/examples/deribit_basic_example.py b/examples/deribit_basic_example.py index e0675b2..8626554 100644 --- a/examples/deribit_basic_example.py +++ b/examples/deribit_basic_example.py @@ -8,18 +8,21 @@ pending = {} -async def handle_instruments(data: dict): +def handle_instruments(data: dict): del pending[data["id"]] - print(json.dumps(data)) + print("handle_instruments", json.dumps(data)) if not pending: - await subscribe() + asyncio.ensure_future(subscribe()) -async def handle_currencies(data: dict): +async def get_instruments(symbol): + request_id = await conn.get_instruments(symbol, callback=handle_instruments) + pending[request_id] = symbol + + +def handle_currencies(data: dict): for currency in data["result"]: - symbol = currency["currency"] - id = await conn.get_instruments(symbol, callback=handle_instruments) - pending[id] = symbol + asyncio.ensure_future(get_instruments(currency["currency"])) async def get_currencies(): @@ -30,12 +33,13 @@ async def subscribe(): await conn.send_public(request={ "method": "public/subscribe", "params": { - "channels": ["deribit_price_index.btc_usd"] + "channels": ["deribit_price_index.btc_usd", + "deribit_price_index.eth_usd"] } }) -async def handle_subscription(data): +def handle_subscription(data): method = data.get("method") if method and method == "subscription": if data["params"]["channel"].startswith("deribit_price_index"): From 0b0622d85d18aa6e3c9b43886b7d6af37a3ed21e Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:33:35 +0200 Subject: [PATCH 20/32] IDeribitController removed --- ssc2ce/deribit/deribit.py | 60 ++++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/ssc2ce/deribit/deribit.py b/ssc2ce/deribit/deribit.py index a8f1b72..a803d5b 100644 --- a/ssc2ce/deribit/deribit.py +++ b/ssc2ce/deribit/deribit.py @@ -10,10 +10,9 @@ from ssc2ce.common import AuthType from ssc2ce.common.session import SessionWrapper from ssc2ce.common.utils import resolve_route, hide_secret, IntId -from ssc2ce.deribit.icontroller import IDeribitController -class Deribit(SessionWrapper, IDeribitController): +class Deribit(SessionWrapper): """ Handlers: - on_connect_ws - Called after the connection is established. @@ -52,7 +51,6 @@ def __init__(self, self._on_message_is_routine = False self._on_message = None - self.on_message = self.handle_message self.on_authenticated = None self.on_token = None self.on_before_handling = None @@ -90,13 +88,6 @@ def on_message(self, value): self._on_message_is_routine = asyncio.iscoroutinefunction(value) self._on_message = value - async def _call_on_message(self, message: str): - if self._on_message: - if self._on_message_is_routine: - await self._on_message(message) - else: - self._on_message(message) - async def run_receiver(self): """ Establish a connection and start the receiver loop. @@ -104,7 +95,10 @@ async def run_receiver(self): """ self.ws = await self._session.ws_connect(self.ws_api) if self.on_connect_ws: - await self.on_connect_ws() + if asyncio.iscoroutinefunction(self.on_connect_ws): + await self.on_connect_ws() + else: + self.on_connect_ws() # A receiver loop while self.ws and not self.ws.closed: @@ -126,7 +120,15 @@ async def run_receiver(self): if self.on_before_handling: self.on_before_handling(message.data) - await self._call_on_message(message.data) + processed = False + if self._on_message: + if self._on_message_is_routine: + processed = await self._on_message(message.data) + else: + processed = self._on_message(message.data) + + if not processed: + self.handle_message(message.data) else: self.logger.warning(f"Unknown type of message {repr(message)}") @@ -373,7 +375,7 @@ async def get_instruments(self, currency: str, kind: str = None, expired: bool = return await self.send_public(request=request, callback=callback) - async def handle_message(self, message: str) -> None: + def handle_message(self, message: str) -> None: """ :param message: @@ -381,19 +383,19 @@ async def handle_message(self, message: str) -> None: """ data = json.loads(message) if "method" in data: - await self.handle_method_message(data) + self.handle_method_message(data) else: if "id" in data: if "error" in data: - await self.handle_error(data) + self.handle_error(data) else: request_id = data["id"] if request_id: self.handle_response(request_id, data) else: - await self.handle_method_message(data) + self.handle_method_message(data) - async def empty_handler(self, **kwargs) -> None: + def empty_handler(self, **kwargs) -> None: """ A default handler :param kwargs: @@ -425,9 +427,9 @@ def handle_response(self, request_id: int, response: dict) -> None: asyncio.ensure_future(handler(request=request, response=response)) else: handler(request=request, response=response) - - self.logger.warning(f"Unhandled method:{method} response:{repr(response)} " - f"to request:{repr(request)}.") + else: + self.logger.warning(f"Unhandled method:{method} response:{repr(response)} " + f"to request:{repr(request)}.") del self.requests[request_id] else: @@ -442,7 +444,7 @@ def handle_response(self, request_id: int, response: dict) -> None: f"Unknown id:{request_id}, the on_handle_response event must be defined." f" Unhandled message {response}") - async def handle_method_message(self, data) -> None: + def handle_method_message(self, data) -> None: """ :param data: @@ -452,20 +454,20 @@ async def handle_method_message(self, data) -> None: handler = resolve_route(method, self.method_routes) if handler: - return await handler(data) + handler(data) elif not self.on_before_handling: self.logger.warning(f"Unhandled message:{repr(data)}.") - async def handle_heartbeat(self, data): + def handle_heartbeat(self, data): """ :param data: :return: """ if data["params"]["type"] == "test_request": - await self.send_public({"method": "public/test", "params": {}}, logging_it=False) + asyncio.ensure_future(self.send_public({"method": "public/test", "params": {}}, logging_it=False)) - async def handle_auth(self, request, response) -> None: + def handle_auth(self, request, response) -> None: """ :param request: @@ -476,19 +478,19 @@ async def handle_auth(self, request, response) -> None: grant_type = request["params"]["grant_type"] if grant_type == "": if self.on_token: - await self.on_token(response["result"]) + self.on_token(response["result"]) elif grant_type in ("client_credentials", "password"): # TODO - Why we use two handlers? if self.on_authenticated: - await self.on_authenticated() + self.on_authenticated() if self.on_token: - await self.on_token(response["result"]) + self.on_token(response["result"]) elif grant_type == "client_signature": pass else: self.logger.error(f"Unknown grant_type {repr(hide_secret(request))} : {repr(hide_secret(response))}") - async def handle_error(self, message): + def handle_error(self, message): if self.on_response_error: self.on_response_error(message) else: From 1397c98d8527a506809892a5affd02abc9b108cd Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:34:29 +0200 Subject: [PATCH 21/32] book.instrument becomes a method --- examples/book_watcher.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/book_watcher.py b/examples/book_watcher.py index 216ea07..2a11135 100644 --- a/examples/book_watcher.py +++ b/examples/book_watcher.py @@ -10,14 +10,14 @@ def __init__(self, parser, print_it: bool = True): def handle_book_setup(self, book: L2Book): top = [book.top_bid_price(), book.top_ask_price()] - self.betters[book.instrument] = top + self.betters[book.instrument()] = top if self.print_it: - print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") + print(f"{book.instrument()} bid:{top[0]} ask:{top[1]}") def handle_book_update(self, book: L2Book): - top = self.betters[book.instrument] + top = self.betters[book.instrument()] if top[1] != book.top_ask_price() or top[0] != book.top_bid_price(): top[1] = book.top_ask_price() top[0] = book.top_bid_price() if self.print_it: - print(f"{book.instrument} bid:{top[0]} ask:{top[1]}") + print(f"{book.instrument()} bid:{top[0]} ask:{top[1]}") From 41f8b45ecaca8a57d3d144fadc3f4638e221f92a Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 14:34:46 +0200 Subject: [PATCH 22/32] AbstractL2Book removed --- ssc2ce/common/abstract_l2_book.py | 44 ------------------------------- 1 file changed, 44 deletions(-) delete mode 100644 ssc2ce/common/abstract_l2_book.py diff --git a/ssc2ce/common/abstract_l2_book.py b/ssc2ce/common/abstract_l2_book.py deleted file mode 100644 index 7535508..0000000 --- a/ssc2ce/common/abstract_l2_book.py +++ /dev/null @@ -1,44 +0,0 @@ -from sortedcontainers import SortedKeyList -from abc import abstractmethod, ABC -from .l2_book_side import L2BookSide, SortedKeyList - - -class AbstractL2Book(ABC): - """ - - """ - - def __init__(self, instrument: str): - """ - - :param instrument: - """ - self.instrument = instrument - self._bids = L2BookSide(is_bids=True) - self._asks = L2BookSide(is_bids=False) - - @abstractmethod - def handle_snapshot(self, message: dict) -> None: - """ - - :param message: - :return: - """ - pass - - @abstractmethod - def handle_update(self, message: dict) -> None: - """ - - :param message: - :return: - """ - pass - - @property - def bids(self): - return self._bids.data - - @property - def asks(self): - return self._asks.data From 005ed30c42a48f8e474dddbef342a497005d2561 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 15:08:03 +0200 Subject: [PATCH 23/32] README.md changed --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 2640d8d..a0e58b6 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,15 @@ A Set of Simple Connectors for access To Cryptocurrency Exchanges via websocket based on [aiohttp](https://aiohttp.readthedocs.io) . +This is more of a pilot project, if you have any wishes for adding exchanges or expanding functionality, please register issues + ## Installation Install ssc2ce with: ```bash $ pip install ssc2ce ``` +You can run some examples with ## Bitfinex ### Description API description look at [Websocket API v2](https://docs.bitfinex.com/v2/docs/ws-general) From 62bf93e076028bbcdd261cf132140950ab1c5a6e Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 15:50:23 +0200 Subject: [PATCH 24/32] minor changes --- .gitignore | 2 ++ requirements.txt | 7 ++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 894a44c..04e1286 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.idea/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/requirements.txt b/requirements.txt index cd2dd54..1f3b5da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ -aiohttp +aiohttp==3.6.2 +sortedcontainers==2.1.0 + +ssc2ce_cpp python-dotenv twine -wheel -sortedcontainers From 1125c9cd8ef1572d3e029b2ec6428530a6b9f27a Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 15:50:48 +0200 Subject: [PATCH 25/32] version="0.12.0" --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index cc01795..b813a89 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name='ssc2ce', - version="0.11.0", + version="0.12.0", author='Oleg Nedbaylo', author_email='olned64@gmail.com', description='A Set of Simple Connectors for access To Cryptocurrency Exchanges', @@ -15,7 +15,7 @@ long_description_content_type="text/markdown", url='https://github.com/olned/ssc2ce-python', packages=setuptools.find_packages(), - install_requires=['aiohttp'], + install_requires=['aiohttp', 'sortedcontainers'], classifiers=[ "Programming Language :: Python :: 3.6", "License :: OSI Approved :: MIT License", From cb673d423b4a4888db5d5c25121a9b444d1562ec Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo <22174154+olned@users.noreply.github.com> Date: Tue, 16 Jun 2020 16:06:24 +0200 Subject: [PATCH 26/32] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a0e58b6..1088d96 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ async def subscribe(): }) -async def handle_subscription(data): +def handle_subscription(data): method = data.get("method") if method and method == "subscription": if data["params"]["channel"].startswith("deribit_price_index"): From e352b0cda8a505e94b37467ac6eab0e04becb077 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 16:42:51 +0200 Subject: [PATCH 27/32] bitfinex book fixed --- ssc2ce/bitfinex/__init__.py | 2 +- ssc2ce/bitfinex/l2_book.py | 13 ++++++------- ssc2ce/common/__init__.py | 3 ++- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/ssc2ce/bitfinex/__init__.py b/ssc2ce/bitfinex/__init__.py index 5c1f100..1f2dac7 100644 --- a/ssc2ce/bitfinex/__init__.py +++ b/ssc2ce/bitfinex/__init__.py @@ -1,2 +1,2 @@ from .bitfinex import Bitfinex -from .l2_book import L2Book +from .l2_book import BitfinexL2Book diff --git a/ssc2ce/bitfinex/l2_book.py b/ssc2ce/bitfinex/l2_book.py index c5f7077..1cd89b4 100644 --- a/ssc2ce/bitfinex/l2_book.py +++ b/ssc2ce/bitfinex/l2_book.py @@ -1,11 +1,11 @@ import logging -from ssc2ce.common.l2_book import L2Book +from ssc2ce.common import L2Book from collections import deque from ssc2ce.common.exceptions import BrokenOrderBook -class L2Book(L2Book): +class BitfinexL2Book(L2Book): change_id = None timestamp = None logger = logging.getLogger(__name__) @@ -15,7 +15,7 @@ def __init__(self, instrument: str): :param instrument: """ - AbstractL2Book.__init__(self, instrument) + L2Book.__init__(self, instrument) def handle_snapshot(self, message: dict) -> None: """ @@ -23,15 +23,14 @@ def handle_snapshot(self, message: dict) -> None: :param message: :return: """ - self.asks.clear() - self.bids.clear() + self.clear() for item in message[1]: price = item[2] if price < 0: - self.asks.add([item[0], -price]) + self.asks.add(item[0], -price) else: - self.bids.add([item[0], price]) + self.bids.add(item[0], price) def handle_update(self, message: dict) -> None: """ diff --git a/ssc2ce/common/__init__.py b/ssc2ce/common/__init__.py index ae5724c..3f4442b 100644 --- a/ssc2ce/common/__init__.py +++ b/ssc2ce/common/__init__.py @@ -1 +1,2 @@ -from .auth_type import AuthType \ No newline at end of file +from .auth_type import AuthType +from .l2_book import L2Book, L2BookSide From 575938815dd50a4e7b7acc8d04e423163b22caa6 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 16:45:30 +0200 Subject: [PATCH 28/32] some examples changed --- examples/bitfinex_book.py | 21 ++++++++++++--------- examples/deribit_private.py | 34 ++++++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/examples/bitfinex_book.py b/examples/bitfinex_book.py index c901404..fb9fdf0 100644 --- a/examples/bitfinex_book.py +++ b/examples/bitfinex_book.py @@ -2,9 +2,11 @@ import asyncio import logging -from ssc2ce.bitfinex import Bitfinex, L2Book +from ssc2ce.bitfinex import Bitfinex, BitfinexL2Book -logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig( + format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', + level=logging.INFO) logger = logging.getLogger("bitfinex-basic-example") conn = Bitfinex() @@ -12,11 +14,11 @@ class BookMaintainer: def __init__(self, instrument): - self.book = L2Book(instrument) + self.book = BitfinexL2Book(instrument) self.check_sum = None self.active = False - self.top_bid = [0, 0] - self.top_ask = [0, 0] + self.top_bid = 0. + self.top_ask = 0. async def handle_book(self, message, connector): if type(message[1]) is str: @@ -29,10 +31,11 @@ async def handle_book(self, message, connector): self.book.handle_snapshot(message) self.book.time = message[-1] - if self.top_ask[0] != self.book.asks[0][0] or self.top_ask[0] != self.book.asks[0][0]: - self.top_ask = self.book.asks[0].copy() - self.top_bid = self.book.bids[0].copy() - logger.info(f"{self.book.instrument} bid:{self.top_bid[0]} ask:{self.top_ask[0]}") + if self.top_ask != self.book.top_ask_price() or self.top_bid != self.book.top_bid_price(): + self.top_ask = self.book.top_ask_price() + self.top_bid = self.book.top_bid_price() + logger.info( + f"{self.book.instrument()} bid:{self.top_bid} ask:{self.top_ask}") btc = BookMaintainer("BTC-USD") diff --git a/examples/deribit_private.py b/examples/deribit_private.py index 3df705b..4f35e82 100644 --- a/examples/deribit_private.py +++ b/examples/deribit_private.py @@ -12,7 +12,8 @@ class MyApp: def __init__(self): - logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) + logging.basicConfig( + format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) self.logger = logging.getLogger("deribit-private") self.direct_requests = {} @@ -23,7 +24,8 @@ def __init__(self): client_secret = os.environ.get('DERIBIT_CLIENT_SECRET') if client_id is None or client_secret is None: - self.logger.error("Please setup environment variables DERIBIT_CLIENT_ID and DERIBIT_CLIENT_SECRET") + self.logger.error( + "Please setup environment variables DERIBIT_CLIENT_ID and DERIBIT_CLIENT_SECRET") exit(0) self.deribit = Deribit(client_id=client_id, @@ -76,17 +78,27 @@ async def do_something_after_login(self): await self.deribit.send_public(request={ "method": "private/subscribe", "params": { - "channels": ["book.BTC-PERPETUAL.raw", - "trades.BTC-PERPETUAL.raw", - "user.orders.BTC-PERPETUAL.raw", - "user.trades.BTC-PERPETUAL.raw"] + "channels": [ + "deribit_price_index.btc_usd", + "book.BTC-PERPETUAL.raw", + "trades.BTC-PERPETUAL.raw", + "user.orders.BTC-PERPETUAL.raw", + "user.trades.BTC-PERPETUAL.raw" + ] + } + }) + + await self.deribit.send_public(request={ + "method": "public/set_heartbeat", + "params": { + "interval": 15 } }) async def printer(self, **kwargs): self.logger.info(f"{repr(kwargs)}") - @staticmethod + @ staticmethod def resolve_route(value, routes): key, handler = None, None for key, handler in routes: @@ -120,9 +132,11 @@ def on_token(self, params): def on_handle_response(self, data): request_id = data["id"] if request_id in self.direct_requests: - self.logger.info(f"Caught response {repr(data)} to direct request {self.direct_requests[request_id]}") + self.logger.info( + f"Caught response {repr(data)} to direct request {self.direct_requests[request_id]}") else: - self.logger.error(f"Can't find request with id:{request_id} for response:{repr(data)}") + self.logger.error( + f"Can't find request with id:{request_id} for response:{repr(data)}") def handle_order_book_change(self, message): data = message["params"]["data"] @@ -139,7 +153,7 @@ def handle_user_orders(self, message): def handle_price_index(self, message): data = message["params"]["data"] index_name = data['index_name'] - self.logger.debug(f"{index_name}: {repr(data)}") + self.logger.info(f"{index_name}: {repr(data)}") def handle_trades(self, message): data = message["params"]["data"] From 88aa9503862dbfd2aa8f4a855e808b4b9b30ce55 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 16:48:46 +0200 Subject: [PATCH 29/32] minor changes --- .gitignore | 1 + requirements.txt | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 04e1286..99ced0e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea/ +dump.txt # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/requirements.txt b/requirements.txt index 1f3b5da..39b72bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ -aiohttp==3.6.2 -sortedcontainers==2.1.0 +aiohttp>=3.6 +sortedcontainers>=2.2 ssc2ce_cpp python-dotenv twine +pylint +autopep8 \ No newline at end of file From a053d84f5669189cdba72d4ae3c9e1f4cc425a32 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo Date: Tue, 16 Jun 2020 16:49:19 +0200 Subject: [PATCH 30/32] Run examples from a clone - updated --- README.md | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 1088d96..d76ef1b 100644 --- a/README.md +++ b/README.md @@ -91,21 +91,19 @@ except KeyboardInterrupt: ## Run examples from a clone If you clone repository you can run examples from the root directory. + ```bash -$ PYTHONPATH=.:$PYTHONPATH python examples/basic_example.py +$ PYTHONPATH=.:$PYTHONPATH python examples/bitfinex_basic_example.py ``` -The deribit_private.py example uses [python-dotenv](https://github.com/theskumar/python-dotenv), you must either install it if you want the example to work right out of the box, +To run some examples, you may need additional modules, you can install them from the '_requirements.txt_' file. + ```bash -$ pip install python-dotenv -``` -or make the corresponding changes, removed followed code. -```python -from dotenv import load_dotenv -dotenv_path = os.path.join(os.path.dirname(__file__), '.env') -load_dotenv(dotenv_path) +$ pip install -r requirements.txt ``` + To run the private.py example, you must either fill in the .env file or set the environment variables DERIBIT_CLIENT_ID and DERIBIT_CLIENT_SECRET. Look at .env_default. + ```bash -$ PYTHONPATH=.:$PYTHONPATH DERIBIT_CLIENT_ID=YOU_ACCESS_KEY DERIBIT_CLIENT_SECRET=YOU_ACCESS_SECRET python examples/private.py +$ PYTHONPATH=.:$PYTHONPATH DERIBIT_CLIENT_ID=YOU_ACCESS_KEY DERIBIT_CLIENT_SECRET=YOU_ACCESS_SECRET python examples/deribit_private.py ``` From 94652ddaf3e67c023928d962c86d29a097e46988 Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo <22174154+olned@users.noreply.github.com> Date: Tue, 16 Jun 2020 16:50:33 +0200 Subject: [PATCH 31/32] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d76ef1b..8fab786 100644 --- a/README.md +++ b/README.md @@ -96,7 +96,7 @@ If you clone repository you can run examples from the root directory. $ PYTHONPATH=.:$PYTHONPATH python examples/bitfinex_basic_example.py ``` -To run some examples, you may need additional modules, you can install them from the '_requirements.txt_' file. +To run some examples, you may need additional modules, you can install them from the `requirements.txt` file. ```bash $ pip install -r requirements.txt From 509ebd5b27c78d1c051ab50172ed6641ecacc6cc Mon Sep 17 00:00:00 2001 From: Oleg Nedbaylo <22174154+olned@users.noreply.github.com> Date: Tue, 16 Jun 2020 16:53:18 +0200 Subject: [PATCH 32/32] version="0.12.1" --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index b813a89..3c74d27 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name='ssc2ce', - version="0.12.0", + version="0.12.1", author='Oleg Nedbaylo', author_email='olned64@gmail.com', description='A Set of Simple Connectors for access To Cryptocurrency Exchanges',