diff --git a/.gitignore b/.gitignore index 894a44c..99ced0e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +.idea/ +dump.txt + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/README.md b/README.md index 2640d8d..8fab786 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,15 @@ A Set of Simple Connectors for access To Cryptocurrency Exchanges via websocket based on [aiohttp](https://aiohttp.readthedocs.io) . +This is more of a pilot project, if you have any wishes for adding exchanges or expanding functionality, please register issues + ## Installation Install ssc2ce with: ```bash $ pip install ssc2ce ``` +You can run some examples with ## Bitfinex ### Description API description look at [Websocket API v2](https://docs.bitfinex.com/v2/docs/ws-general) @@ -64,7 +67,7 @@ async def subscribe(): }) -async def handle_subscription(data): +def handle_subscription(data): method = data.get("method") if method and method == "subscription": if data["params"]["channel"].startswith("deribit_price_index"): @@ -88,21 +91,19 @@ except KeyboardInterrupt: ## Run examples from a clone If you clone repository you can run examples from the root directory. + ```bash -$ PYTHONPATH=.:$PYTHONPATH python examples/basic_example.py +$ PYTHONPATH=.:$PYTHONPATH python examples/bitfinex_basic_example.py ``` -The deribit_private.py example uses [python-dotenv](https://github.com/theskumar/python-dotenv), you must either install it if you want the example to work right out of the box, +To run some examples, you may need additional modules, you can install them from the `requirements.txt` file. + ```bash -$ pip install python-dotenv -``` -or make the corresponding changes, removed followed code. -```python -from dotenv import load_dotenv -dotenv_path = os.path.join(os.path.dirname(__file__), '.env') -load_dotenv(dotenv_path) +$ pip install -r requirements.txt ``` + To run the private.py example, you must either fill in the .env file or set the environment variables DERIBIT_CLIENT_ID and DERIBIT_CLIENT_SECRET. Look at .env_default. + ```bash -$ PYTHONPATH=.:$PYTHONPATH DERIBIT_CLIENT_ID=YOU_ACCESS_KEY DERIBIT_CLIENT_SECRET=YOU_ACCESS_SECRET python examples/private.py +$ PYTHONPATH=.:$PYTHONPATH DERIBIT_CLIENT_ID=YOU_ACCESS_KEY DERIBIT_CLIENT_SECRET=YOU_ACCESS_SECRET python examples/deribit_private.py ``` diff --git a/examples/bitfinex_book.py b/examples/bitfinex_book.py index f850710..fb9fdf0 100644 --- a/examples/bitfinex_book.py +++ b/examples/bitfinex_book.py @@ -2,9 +2,11 @@ import asyncio import logging -from ssc2ce.bitfinex import Bitfinex, L2Book +from ssc2ce.bitfinex import Bitfinex, BitfinexL2Book -logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig( + format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', + level=logging.INFO) logger = logging.getLogger("bitfinex-basic-example") conn = Bitfinex() @@ -12,11 +14,11 @@ class BookMaintainer: def __init__(self, instrument): - self.book = L2Book(instrument) + self.book = BitfinexL2Book(instrument) self.check_sum = None self.active = False - self.top_bid = [0, 0] - self.top_ask = [0, 0] + self.top_bid = 0. + self.top_ask = 0. async def handle_book(self, message, connector): if type(message[1]) is str: @@ -29,10 +31,11 @@ async def handle_book(self, message, connector): self.book.handle_snapshot(message) self.book.time = message[-1] - if self.top_ask[0] != self.book.asks[0][0] or self.top_ask[0] != self.book.asks[0][0]: - self.top_ask = self.book.asks[0].copy() - self.top_bid = self.book.bids[0].copy() - print(f"{self.book.instrument} bid:{self.top_bid[0]} ask:{self.top_ask[0]}") + if self.top_ask != self.book.top_ask_price() or self.top_bid != self.book.top_bid_price(): + self.top_ask = self.book.top_ask_price() + self.top_bid = self.book.top_bid_price() + logger.info( + f"{self.book.instrument()} bid:{self.top_bid} ask:{self.top_ask}") btc = BookMaintainer("BTC-USD") diff --git a/examples/book_watcher.py b/examples/book_watcher.py new file mode 100644 index 0000000..2a11135 --- /dev/null +++ b/examples/book_watcher.py @@ -0,0 +1,23 @@ +from ssc2ce.deribit.l2_book import L2Book + + +class BookWatcher: + def __init__(self, parser, print_it: bool = True): + self.betters = {} + self.print_it = print_it + parser.set_on_book_setup(self.handle_book_setup) + parser.set_on_book_update(self.handle_book_update) + + def handle_book_setup(self, book: L2Book): + top = [book.top_bid_price(), book.top_ask_price()] + self.betters[book.instrument()] = top + if self.print_it: + print(f"{book.instrument()} bid:{top[0]} ask:{top[1]}") + + def handle_book_update(self, book: L2Book): + top = self.betters[book.instrument()] + if top[1] != book.top_ask_price() or top[0] != book.top_bid_price(): + top[1] = book.top_ask_price() + top[0] = book.top_bid_price() + if self.print_it: + print(f"{book.instrument()} bid:{top[0]} ask:{top[1]}") diff --git a/examples/deribit_basic_example.py b/examples/deribit_basic_example.py index e0675b2..8626554 100644 --- a/examples/deribit_basic_example.py +++ b/examples/deribit_basic_example.py @@ -8,18 +8,21 @@ pending = {} -async def handle_instruments(data: dict): +def handle_instruments(data: dict): del pending[data["id"]] - print(json.dumps(data)) + print("handle_instruments", json.dumps(data)) if not pending: - await subscribe() + asyncio.ensure_future(subscribe()) -async def handle_currencies(data: dict): +async def get_instruments(symbol): + request_id = await conn.get_instruments(symbol, callback=handle_instruments) + pending[request_id] = symbol + + +def handle_currencies(data: dict): for currency in data["result"]: - symbol = currency["currency"] - id = await conn.get_instruments(symbol, callback=handle_instruments) - pending[id] = symbol + asyncio.ensure_future(get_instruments(currency["currency"])) async def get_currencies(): @@ -30,12 +33,13 @@ async def subscribe(): await conn.send_public(request={ "method": "public/subscribe", "params": { - "channels": ["deribit_price_index.btc_usd"] + "channels": ["deribit_price_index.btc_usd", + "deribit_price_index.eth_usd"] } }) -async def handle_subscription(data): +def handle_subscription(data): method = data.get("method") if method and method == "subscription": if data["params"]["channel"].startswith("deribit_price_index"): diff --git a/examples/deribit_book.py b/examples/deribit_book.py index d98c838..84b6f2d 100644 --- a/examples/deribit_book.py +++ b/examples/deribit_book.py @@ -2,15 +2,34 @@ import asyncio import json import logging +import sys + +from examples.book_watcher import BookWatcher from ssc2ce import Deribit -from ssc2ce.deribit.l2_book import L2Book + +if len(sys.argv) > 1 and "cpp" in sys.argv: + from importlib import util + + ssc2ce_cpp_spec = util.find_spec("ssc2ce_cpp") + if ssc2ce_cpp_spec: + from ssc2ce_cpp import DeribitParser + else: + print("You must install the ssc2ce_cpp module to use its features.\n pip install ssc2ce_cpp") + exit(1) +else: + from ssc2ce.deribit.parser import DeribitParser conn = Deribit() +parser = DeribitParser() +watcher = BookWatcher(parser) + +# parser.set_on_unprocessed_message(conn.handle_message) +conn.on_message = parser.parse pending = {} -books = {} +instruments = [] -logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.DEBUG) logger = logging.getLogger("deribit-book") @@ -19,17 +38,18 @@ async def handle_instruments(data: dict): del pending[request_id] print(json.dumps(data)) if not pending: - await subscribe_books(list(books.keys())) + await conn.send_public(request={ + "method": "public/subscribe", + "params": { + "channels": [f"book.{i}.raw" for i in instruments] + } + }) async def handle_currencies(data: dict): for currency in data["result"]: symbol = currency["currency"] - instrument = symbol+"-PERPETUAL" - book = L2Book(instrument) - book.top_bid = [0., 0.] - book.top_ask = [0., 0.] - books[instrument] = book + instruments.append(symbol + "-PERPETUAL") request_id = await conn.get_instruments(symbol, kind="future", callback=handle_instruments) pending[request_id] = symbol @@ -38,39 +58,7 @@ async def get_currencies(): await conn.get_currencies(handle_currencies) -async def subscribe_books(instruments: list): - await conn.send_public(request={ - "method": "public/subscribe", - "params": { - "channels": [f"book.{i}.raw" for i in instruments] - } - }) - - -async def handle_subscription(data): - method = data.get("method") - if method and method == "subscription": - params = data["params"] - channel = params["channel"] - if channel.startswith("book."): - params_data = params["data"] - instrument = params_data["instrument_name"] - book: L2Book = books[instrument] - if "prev_change_id" in params_data: - book.handle_update(params_data) - else: - book.handle_snapshot(params_data) - - if book.top_ask[0] != book.asks[0][0] or book.top_bid[0] != book.bids[0][0]: - book.top_ask = book.asks[0].copy() - book.top_bid = book.bids[0].copy() - print(f"{instrument} bid:{book.top_bid[0]} ask:{book.top_ask[0]}") - else: - print("Unknown channel", json.dumps(data)) - - conn.on_connect_ws = get_currencies -conn.method_routes += [("subscription", handle_subscription)] loop = asyncio.get_event_loop() diff --git a/examples/deribit_parser.py b/examples/deribit_parser.py new file mode 100644 index 0000000..61d53ed --- /dev/null +++ b/examples/deribit_parser.py @@ -0,0 +1,47 @@ +import asyncio +import logging +from time import time + +from examples.book_watcher import BookWatcher + +import sys + +if len(sys.argv) > 1 and "cpp" in sys.argv: + from importlib import util + + ssc2ce_cpp_spec = util.find_spec("ssc2ce_cpp") + if ssc2ce_cpp_spec: + from ssc2ce_cpp import DeribitParser + else: + print("You must install the ssc2ce_cpp module to use its features.\n pip install ssc2ce_cpp") + exit(1) +else: + from ssc2ce.deribit.parser import DeribitParser + +logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger("deribit-parser") + + +class FileController: + def __init__(self): + self.parser = DeribitParser() + self.counter = 0 + self.watcher = BookWatcher(self.parser, False) + + def run(self, filename: str): + start = time() + i = 0 + with open(filename) as f: + for line in f: + i += 1 + if not self.parser.parse(line): + self.handle_message(line) + + logger.info(f"{i} in {time() - start} sec. {self.counter}") + + def handle_message(self, message: str) -> None: + self.counter += 1 + # logger.info(message[:-1]) + + +FileController().run("dump.txt") diff --git a/examples/deribit_private.py b/examples/deribit_private.py index f561c33..4f35e82 100644 --- a/examples/deribit_private.py +++ b/examples/deribit_private.py @@ -12,7 +12,8 @@ class MyApp: def __init__(self): - logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) + logging.basicConfig( + format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) self.logger = logging.getLogger("deribit-private") self.direct_requests = {} @@ -23,7 +24,8 @@ def __init__(self): client_secret = os.environ.get('DERIBIT_CLIENT_SECRET') if client_id is None or client_secret is None: - self.logger.error("Please setup environment variables DERIBIT_CLIENT_ID and DERIBIT_CLIENT_SECRET") + self.logger.error( + "Please setup environment variables DERIBIT_CLIENT_ID and DERIBIT_CLIENT_SECRET") exit(0) self.deribit = Deribit(client_id=client_id, @@ -31,6 +33,7 @@ def __init__(self): auth_type=AuthType.CREDENTIALS, scope=None, get_id=lambda: str(uuid4())) + self.deribit.on_handle_response = self.on_handle_response self.deribit.on_authenticated = self.after_login self.deribit.on_token = self.on_token @@ -75,17 +78,27 @@ async def do_something_after_login(self): await self.deribit.send_public(request={ "method": "private/subscribe", "params": { - "channels": ["book.BTC-PERPETUAL.raw", - "trades.BTC-PERPETUAL.raw", - "user.orders.BTC-PERPETUAL.raw", - "user.trades.BTC-PERPETUAL.raw"] + "channels": [ + "deribit_price_index.btc_usd", + "book.BTC-PERPETUAL.raw", + "trades.BTC-PERPETUAL.raw", + "user.orders.BTC-PERPETUAL.raw", + "user.trades.BTC-PERPETUAL.raw" + ] + } + }) + + await self.deribit.send_public(request={ + "method": "public/set_heartbeat", + "params": { + "interval": 15 } }) async def printer(self, **kwargs): self.logger.info(f"{repr(kwargs)}") - @staticmethod + @ staticmethod def resolve_route(value, routes): key, handler = None, None for key, handler in routes: @@ -99,57 +112,59 @@ def resolve_route(value, routes): if key is not None and key == "" and handler: return handler - async def handle_subscription(self, data: dict): + def handle_subscription(self, data: dict): channel = data["params"]["channel"] handler = self.resolve_route(channel, self.subscription_route) if handler: - return await handler(data) + return handler(data) - async def after_login(self): + def after_login(self): asyncio.ensure_future(self.do_something_after_login()) async def setup_refresh(self, refresh_interval): await asyncio.sleep(refresh_interval) await self.deribit.auth_refresh_token() - async def on_token(self, params): + def on_token(self, params): refresh_interval = min(600, params["expires_in"]) asyncio.ensure_future(self.setup_refresh(refresh_interval)) - async def on_handle_response(self, data): + def on_handle_response(self, data): request_id = data["id"] if request_id in self.direct_requests: - self.logger.info(f"Caught response {repr(data)} to direct request {self.direct_requests[request_id]}") + self.logger.info( + f"Caught response {repr(data)} to direct request {self.direct_requests[request_id]}") else: - self.logger.error(f"Can't find request with id:{request_id} for response:{repr(data)}") + self.logger.error( + f"Can't find request with id:{request_id} for response:{repr(data)}") - async def handle_order_book_change(self, message): + def handle_order_book_change(self, message): data = message["params"]["data"] self.logger.debug(f"{repr(data)}") - async def handle_user_trades(self, message): + def handle_user_trades(self, message): data = message["params"]["data"] self.logger.info(f"{repr(data)}") - async def handle_user_orders(self, message): + def handle_user_orders(self, message): data = message["params"]["data"] self.logger.info(f"{repr(data)}") - async def handle_price_index(self, message): + def handle_price_index(self, message): data = message["params"]["data"] index_name = data['index_name'] - self.logger.debug(f"{index_name}: {repr(data)}") + self.logger.info(f"{index_name}: {repr(data)}") - async def handle_trades(self, message): + def handle_trades(self, message): data = message["params"]["data"] self.logger.debug(f"{repr(data)}") - async def handle_ticker(self, message): + def handle_ticker(self, message): data = message["params"]["data"] instrument_name = data['instrument_name'] self.logger.debug(f"{instrument_name}: {repr(data)}") - async def on_response_error(self, data): + def on_response_error(self, data): self.logger.error(f"Receive error {repr(data)}") asyncio.ensure_future(self.deribit.stop()) diff --git a/examples/deribit_writer.py b/examples/deribit_writer.py new file mode 100644 index 0000000..169052c --- /dev/null +++ b/examples/deribit_writer.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +import asyncio +import json +import logging +from ssc2ce import Deribit + +conn = Deribit() + +pending = {} + +logging.basicConfig(format='%(asctime)s %(name)s %(funcName)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger("deribit-writer") + +instruments = [] + + +async def handle_instruments(data: dict): + global instruments + request_id = data["id"] + del pending[request_id] + print(json.dumps(data)) + instruments += list(set( + [x["instrument_name"][:11] if not x["instrument_name"].endswith("-PERPETUAL") else x["instrument_name"] for x in + data["result"]])) + + if not pending: + print(instruments) + await subscribe_books(instruments) + + +async def handle_book_summary_by_currency(data: dict): + request_id = data["id"] + del pending[request_id] + print(json.dumps(data)) + + +async def handle_currencies(data: dict): + for currency in data["result"]: + symbol = currency["currency"] + print(json.dumps(data)) + request_id = await conn.get_instruments(symbol, callback=handle_instruments) + pending[request_id] = symbol + request_id = await conn.send_public( + request={"method": "public/get_book_summary_by_currency", "params": {"currency": symbol}}, + callback=handle_instruments) + pending[request_id] = symbol + + +async def get_currencies(): + await conn.get_currencies(handle_currencies) + + +async def subscribe_books(instruments: list): + await conn.send_public(request={ + "method": "public/subscribe", + "params": { + "channels": [f"book.{i}.raw" for i in instruments] + } + }) + + await conn.send_public(request={ + "method": "public/set_heartbeat", + "params": { + "interval": 15 + } + }) + + +output = open("dump.txt", "w") + + +def dump(msg: str): + output.write(msg) + output.write('\n') + + +def stop(): + asyncio.ensure_future(conn.ws.close()) + + +conn.on_before_handling = dump + +conn.on_connect_ws = get_currencies + +loop = asyncio.get_event_loop() +loop.call_later(3600, stop) + +try: + loop.run_until_complete(conn.run_receiver()) +except KeyboardInterrupt: + print("Application closed by KeyboardInterrupt.") diff --git a/requirements.txt b/requirements.txt index cd2dd54..39b72bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,8 @@ -aiohttp +aiohttp>=3.6 +sortedcontainers>=2.2 + +ssc2ce_cpp python-dotenv twine -wheel -sortedcontainers +pylint +autopep8 \ No newline at end of file diff --git a/setup.py b/setup.py index cc01795..3c74d27 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setuptools.setup( name='ssc2ce', - version="0.11.0", + version="0.12.1", author='Oleg Nedbaylo', author_email='olned64@gmail.com', description='A Set of Simple Connectors for access To Cryptocurrency Exchanges', @@ -15,7 +15,7 @@ long_description_content_type="text/markdown", url='https://github.com/olned/ssc2ce-python', packages=setuptools.find_packages(), - install_requires=['aiohttp'], + install_requires=['aiohttp', 'sortedcontainers'], classifiers=[ "Programming Language :: Python :: 3.6", "License :: OSI Approved :: MIT License", diff --git a/ssc2ce/VERSION.py b/ssc2ce/VERSION.py deleted file mode 100644 index 8088f75..0000000 --- a/ssc2ce/VERSION.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = "0.8.1" diff --git a/ssc2ce/bitfinex/__init__.py b/ssc2ce/bitfinex/__init__.py index 5c1f100..1f2dac7 100644 --- a/ssc2ce/bitfinex/__init__.py +++ b/ssc2ce/bitfinex/__init__.py @@ -1,2 +1,2 @@ from .bitfinex import Bitfinex -from .l2_book import L2Book +from .l2_book import BitfinexL2Book diff --git a/ssc2ce/bitfinex/l2_book.py b/ssc2ce/bitfinex/l2_book.py index 71d1599..1cd89b4 100644 --- a/ssc2ce/bitfinex/l2_book.py +++ b/ssc2ce/bitfinex/l2_book.py @@ -1,11 +1,11 @@ import logging -from ssc2ce.common.abstract_l2_book import AbstractL2Book +from ssc2ce.common import L2Book from collections import deque from ssc2ce.common.exceptions import BrokenOrderBook -class L2Book(AbstractL2Book): +class BitfinexL2Book(L2Book): change_id = None timestamp = None logger = logging.getLogger(__name__) @@ -15,7 +15,7 @@ def __init__(self, instrument: str): :param instrument: """ - AbstractL2Book.__init__(self, instrument) + L2Book.__init__(self, instrument) def handle_snapshot(self, message: dict) -> None: """ @@ -23,15 +23,14 @@ def handle_snapshot(self, message: dict) -> None: :param message: :return: """ - self.asks.clear() - self.bids.clear() + self.clear() for item in message[1]: price = item[2] if price < 0: - self.asks.add([item[0], -price]) + self.asks.add(item[0], -price) else: - self.bids.add([item[0], price]) + self.bids.add(item[0], price) def handle_update(self, message: dict) -> None: """ diff --git a/ssc2ce/common/__init__.py b/ssc2ce/common/__init__.py index ae5724c..3f4442b 100644 --- a/ssc2ce/common/__init__.py +++ b/ssc2ce/common/__init__.py @@ -1 +1,2 @@ -from .auth_type import AuthType \ No newline at end of file +from .auth_type import AuthType +from .l2_book import L2Book, L2BookSide diff --git a/ssc2ce/common/abstract_l2_book.py b/ssc2ce/common/abstract_l2_book.py deleted file mode 100644 index 7535508..0000000 --- a/ssc2ce/common/abstract_l2_book.py +++ /dev/null @@ -1,44 +0,0 @@ -from sortedcontainers import SortedKeyList -from abc import abstractmethod, ABC -from .l2_book_side import L2BookSide, SortedKeyList - - -class AbstractL2Book(ABC): - """ - - """ - - def __init__(self, instrument: str): - """ - - :param instrument: - """ - self.instrument = instrument - self._bids = L2BookSide(is_bids=True) - self._asks = L2BookSide(is_bids=False) - - @abstractmethod - def handle_snapshot(self, message: dict) -> None: - """ - - :param message: - :return: - """ - pass - - @abstractmethod - def handle_update(self, message: dict) -> None: - """ - - :param message: - :return: - """ - pass - - @property - def bids(self): - return self._bids.data - - @property - def asks(self): - return self._asks.data diff --git a/ssc2ce/common/l2_book.py b/ssc2ce/common/l2_book.py new file mode 100644 index 0000000..ca41995 --- /dev/null +++ b/ssc2ce/common/l2_book.py @@ -0,0 +1,37 @@ +from .l2_book_side import L2BookSide + + +class L2Book: + """ + + """ + + def __init__(self, instrument: str): + """ + + :param instrument: + """ + self._instrument = instrument + self._bids = L2BookSide(is_bids=True) + self._asks = L2BookSide(is_bids=False) + + def instrument(self) -> str: + return self._instrument + + @property + def bids(self) -> L2BookSide: + return self._bids + + @property + def asks(self) -> L2BookSide: + return self._asks + + def clear(self): + self._bids.data.clear() + self._asks.data.clear() + + def top_ask_price(self): + return self._asks.data[0][0] + + def top_bid_price(self): + return self._bids.data[0][0] diff --git a/ssc2ce/common/l2_book_side.py b/ssc2ce/common/l2_book_side.py index 14874dd..581b99e 100644 --- a/ssc2ce/common/l2_book_side.py +++ b/ssc2ce/common/l2_book_side.py @@ -12,17 +12,13 @@ def __init__(self, is_bids: bool): self.data = SortedKeyList(key=lambda val: val[0]) self.is_bids = is_bids self.time = None - self.changes = list() def fill(self, source): self.data.clear() for item in source: self.add(item) - def add(self, item): - price = float(item[0]) - size = float(item[1]) - self.changes.append([price, size, size]) + def add(self, price: float, size: float): self.data.add([price, size]) def update(self, price: float, size: float): @@ -33,30 +29,23 @@ def update(self, price: float, size: float): value = self.data[i] else: if size <= VERY_SMALL_NUMBER: - self.changes.append([price, size, 0.0]) return False self.data.add([price, size]) - self.changes.append([price, size, size]) return True if size <= VERY_SMALL_NUMBER: if value[0] == price: old_size = self.data[i][1] self.data.discard(value) - self.changes.append([price, size, -old_size]) return True else: - self.changes.append([price, size, 0.0]) return False if value[0] == price: - old_size = self.data[i][1] self.data[i][1] = size - self.changes.append([price, size, size - old_size]) else: self.data.add([price, size]) - self.changes.append([price, size, size]) return True def delete(self, price: float): diff --git a/ssc2ce/deribit/deribit.py b/ssc2ce/deribit/deribit.py index 99e2e7e..a803d5b 100644 --- a/ssc2ce/deribit/deribit.py +++ b/ssc2ce/deribit/deribit.py @@ -1,4 +1,7 @@ +import asyncio +import json import logging + from time import time import aiohttp @@ -44,10 +47,13 @@ def __init__(self, self.on_connect_ws = self.auth_login if auth_type != AuthType.NONE else None self.on_close_ws = None - self.on_message = self.handle_message + + self._on_message_is_routine = False + self._on_message = None + self.on_authenticated = None self.on_token = None - # self.on_subscription = None + self.on_before_handling = None self.on_response_error = None self.on_handle_response = None @@ -73,6 +79,15 @@ def __init__(self, ("", self.empty_handler), ] + @property + def on_message(self): + return self._on_message + + @on_message.setter + def on_message(self, value): + self._on_message_is_routine = asyncio.iscoroutinefunction(value) + self._on_message = value + async def run_receiver(self): """ Establish a connection and start the receiver loop. @@ -80,7 +95,10 @@ async def run_receiver(self): """ self.ws = await self._session.ws_connect(self.ws_api) if self.on_connect_ws: - await self.on_connect_ws() + if asyncio.iscoroutinefunction(self.on_connect_ws): + await self.on_connect_ws() + else: + self.on_connect_ws() # A receiver loop while self.ws and not self.ws.closed: @@ -98,8 +116,21 @@ async def run_receiver(self): self.logger.debug(f"Connection closing {repr(message)}") continue - if self.on_message: - await self.on_message(message) + if message.type == aiohttp.WSMsgType.TEXT: + if self.on_before_handling: + self.on_before_handling(message.data) + + processed = False + if self._on_message: + if self._on_message_is_routine: + processed = await self._on_message(message.data) + else: + processed = self._on_message(message.data) + + if not processed: + self.handle_message(message.data) + else: + self.logger.warning(f"Unknown type of message {repr(message)}") def close(self): super()._close() @@ -111,7 +142,7 @@ async def stop(self): """ await self.ws.close() - async def send_public(self, request: dict, callback=None) -> int: + async def send_public(self, request: dict, callback=None, logging_it: bool = True) -> int: """ Send a public request @@ -125,7 +156,8 @@ async def send_public(self, request: dict, callback=None) -> int: "id": request_id, **request } - self.logger.info(f"sending:{repr(hide_secret(request))}") + if logging_it: + self.logger.info(f"sending:{repr(hide_secret(request))}") await self.ws.send_json(request) if callback: @@ -343,50 +375,27 @@ async def get_instruments(self, currency: str, kind: str = None, expired: bool = return await self.send_public(request=request, callback=callback) - async def handle_message(self, message: aiohttp.WSMessage) -> None: + def handle_message(self, message: str) -> None: """ :param message: :return: """ - if message.type == aiohttp.WSMsgType.TEXT: - data = message.json() - self.logger.debug(f"handling:{repr(hide_secret(data))}") - - if "method" in data: - await self.handle_method_message(data) - else: - if "id" in data: - if "error" in data: - if self.on_response_error: - await self.on_response_error(data) - else: - self.logger.error(f"Receive error {repr(data)}") - else: - request_id = data["id"] - request = self.requests.get(request_id) - if request: - if "callback" in request: - callback = request["callback"] - await callback(data) - else: - await self.handle_response(request=request, response=data) - - del self.requests[request_id] - else: - if self.on_handle_response: - await self.on_handle_response(data) - else: - self.logger.warning( - f"Unknown id:{request_id}, the on_handle_response event must be defined." - f" Unhandled message {data}") - - else: - self.logger.warning(f"Unsupported message {message.data}") + data = json.loads(message) + if "method" in data: + self.handle_method_message(data) else: - self.logger.warning(f"Unknown type of message {repr(message)}") + if "id" in data: + if "error" in data: + self.handle_error(data) + else: + request_id = data["id"] + if request_id: + self.handle_response(request_id, data) + else: + self.handle_method_message(data) - async def empty_handler(self, **kwargs) -> None: + def empty_handler(self, **kwargs) -> None: """ A default handler :param kwargs: @@ -394,22 +403,48 @@ async def empty_handler(self, **kwargs) -> None: """ self.logger.debug(f"{repr(kwargs)}") - async def handle_response(self, request, response) -> None: + def handle_response(self, request_id: int, response: dict) -> None: """ - :param request: + :param request_id: :param response: :return: """ - method = request["method"] - handler = resolve_route(method, self.response_routes) - if handler: - return await handler(request=request, response=response) + request = self.requests.get(request_id) + if request: + if "callback" in request: + callback = request["callback"] + if asyncio.iscoroutinefunction(callback): + asyncio.ensure_future(callback(response)) + else: + callback(response) + else: + method = request["method"] + handler = resolve_route(method, self.response_routes) + if handler: + if asyncio.iscoroutinefunction(handler): + asyncio.ensure_future(handler(request=request, response=response)) + else: + handler(request=request, response=response) + else: + self.logger.warning(f"Unhandled method:{method} response:{repr(response)} " + f"to request:{repr(request)}.") - self.logger.warning(f"Unhandled method:{method} response:{repr(response)} to request:{repr(request)}.") + del self.requests[request_id] + else: + if self.on_handle_response: + if asyncio.iscoroutinefunction(self.on_handle_response): + asyncio.ensure_future(self.on_handle_response(response)) + else: + self.on_handle_response(response) + return + else: + self.logger.warning( + f"Unknown id:{request_id}, the on_handle_response event must be defined." + f" Unhandled message {response}") - async def handle_method_message(self, data) -> None: + def handle_method_message(self, data) -> None: """ :param data: @@ -419,20 +454,20 @@ async def handle_method_message(self, data) -> None: handler = resolve_route(method, self.method_routes) if handler: - return await handler(data) + handler(data) + elif not self.on_before_handling: + self.logger.warning(f"Unhandled message:{repr(data)}.") - self.logger.warning(f"Unhandled message:{repr(data)}.") - - async def handle_heartbeat(self, data): + def handle_heartbeat(self, data): """ :param data: :return: """ if data["params"]["type"] == "test_request": - await self.send_public({"method": "public/test", "params": {}}) + asyncio.ensure_future(self.send_public({"method": "public/test", "params": {}}, logging_it=False)) - async def handle_auth(self, request, response) -> None: + def handle_auth(self, request, response) -> None: """ :param request: @@ -443,14 +478,20 @@ async def handle_auth(self, request, response) -> None: grant_type = request["params"]["grant_type"] if grant_type == "": if self.on_token: - await self.on_token(response["result"]) + self.on_token(response["result"]) elif grant_type in ("client_credentials", "password"): # TODO - Why we use two handlers? if self.on_authenticated: - await self.on_authenticated() + self.on_authenticated() if self.on_token: - await self.on_token(response["result"]) + self.on_token(response["result"]) elif grant_type == "client_signature": pass else: self.logger.error(f"Unknown grant_type {repr(hide_secret(request))} : {repr(hide_secret(response))}") + + def handle_error(self, message): + if self.on_response_error: + self.on_response_error(message) + else: + self.logger.error(message) diff --git a/ssc2ce/deribit/l2_book.py b/ssc2ce/deribit/l2_book.py index 428a4ac..e9801ae 100644 --- a/ssc2ce/deribit/l2_book.py +++ b/ssc2ce/deribit/l2_book.py @@ -1,67 +1,8 @@ -import logging -from ssc2ce.common.abstract_l2_book import AbstractL2Book -from collections import deque +from ssc2ce.common.l2_book import L2Book -from ssc2ce.common.exceptions import BrokenOrderBook - - -class L2Book(AbstractL2Book): - change_id = None - timestamp = None - logger = logging.getLogger(__name__) +class DeribitL2Book(L2Book): def __init__(self, instrument: str): - """ - - :param instrument: - """ - AbstractL2Book.__init__(self, instrument) - - def handle_snapshot(self, message: dict) -> None: - """ - - :param message: - :return: - """ - self.asks.clear() - self.bids.clear() - - self.change_id = message["change_id"] - self.timestamp = message["timestamp"] - for i in message['bids']: - self.bids.add([i[1], i[2]]) - - for i in message['asks']: - self.asks.add([i[1], i[2]]) - - def handle_update(self, message: dict) -> None: - """ - - :param message: - :return: - """ - prev_change_id = message["prev_change_id"] - if prev_change_id != self.change_id: - raise BrokenOrderBook(self.instrument, prev_change_id, self.change_id) - - self.change_id = message["change_id"] - self.timestamp = message["timestamp"] - for change in message['bids']: - if change[0] == 'new': - self._bids.add(change[1:]) - elif change[0] == 'delete': - self._bids.delete(change[1]) - else: - self._bids.update(price=change[1], size=change[2]) - - for change in message['asks']: - if change[0] == 'new': - self._asks.add(change[1:]) - elif change[0] == 'delete': - self._asks.delete(change[1]) - else: - self._asks.update(price=change[1], size=change[2]) - - -def create_l2_order_book(instrument: str) -> AbstractL2Book: - return L2Book(instrument) + L2Book.__init__(self, instrument) + self.change_id = None + self.timestamp = None diff --git a/ssc2ce/deribit/parser.py b/ssc2ce/deribit/parser.py new file mode 100644 index 0000000..a2a6ff5 --- /dev/null +++ b/ssc2ce/deribit/parser.py @@ -0,0 +1,116 @@ +import asyncio +import json +import logging + +from ssc2ce.deribit.l2_book import DeribitL2Book, L2Book +from ssc2ce.common.exceptions import BrokenOrderBook + + +class DeribitParser: + """ + + """ + + def __init__(self): + self._on_book_setup = None + self._on_book_update = None + self.logger = logging.getLogger(__name__) + self.deprecated_already_warn = False + self.books = {} + + def set_on_book_setup(self, handler) -> None: + self._on_book_setup = handler + + def set_on_book_update(self, handler) -> None: + self._on_book_update = handler + + async def handle_message(self, message): + if not self.deprecated_already_warn: + self.deprecated_already_warn = True + self.logger.warning("The handle_message method of DeribitParser is deprecated, please use parse instead") + self.parse(message) + + def parse(self, message) -> bool: + data = json.loads(message) + + return self.handle_method_message(data) + + def handle_method_message(self, data: dict) -> bool: + method = data.get("method") + processed = False + if method and method == "subscription": + params = data["params"] + channel = params["channel"] + if channel.startswith("book."): + params_data = params["data"] + instrument = params_data["instrument_name"] + book = self.get_book(instrument) + + if "prev_change_id" in params_data: + self.handle_update(book, params_data) + if self._on_book_update: + self._on_book_update(book) + else: + self.handle_snapshot(book, params_data) + if self._on_book_setup: + self._on_book_setup(book) + + processed = True + + return processed + + def get_book(self, instrument: str) -> L2Book: + book: L2Book = self.books.get(instrument) + if book is None: + book = L2Book(instrument) + self.books[instrument] = book + + return book + + @staticmethod + def handle_snapshot(book: DeribitL2Book, message: dict) -> None: + """ + + :param book: + :param message: + :return: + """ + book.clear() + + book.change_id = message["change_id"] + book.timestamp = message["timestamp"] + for i in message['bids']: + book.bids.add(i[1], i[2]) + + for i in message['asks']: + book.asks.add(i[1], i[2]) + + @staticmethod + def handle_update(book: DeribitL2Book, message: dict) -> None: + """ + + :param book: + :param message: + :return: + """ + prev_change_id = message["prev_change_id"] + if prev_change_id != book.change_id: + raise BrokenOrderBook(book.instrument, prev_change_id, book.change_id) + + book.change_id = message["change_id"] + book.timestamp = message["timestamp"] + for change in message['bids']: + if change[0] == 'new': + book.bids.add(price=change[1], size=change[2]) + elif change[0] == 'delete': + book.bids.delete(change[1]) + else: + book.bids.update(price=change[1], size=change[2]) + + for change in message['asks']: + if change[0] == 'new': + book.asks.add(price=change[1], size=change[2]) + elif change[0] == 'delete': + book.asks.delete(change[1]) + else: + book.asks.update(price=change[1], size=change[2])