From bce5623e1c01a56c07bec818b6ba478a66b7773c Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 15 Jan 2025 23:02:54 +0200 Subject: [PATCH 01/10] Adds an event loop manager for executing coroutines easier. Removes initialisation of AsyncSubstrateInterface at object instantiation. Creates a factory function for creating an initialised AsyncSubstrateInterface. --- .../substrate_interface.py | 68 ++++++++++++------- async_substrate_interface/utils/__init__.py | 64 ++++++++++------- 2 files changed, 84 insertions(+), 48 deletions(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index 66876ff..9ef6d14 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -41,7 +41,7 @@ ExtrinsicNotFound, BlockNotFound, ) -from async_substrate_interface.utils import execute_coroutine, hex_to_bytes +from async_substrate_interface.utils import hex_to_bytes, EventLoopManager from async_substrate_interface.utils.storage import StorageKey if TYPE_CHECKING: @@ -978,6 +978,7 @@ async def send(self, payload: dict) -> int: self.id += 1 # self._open_subscriptions += 1 try: + print(self.ws) await self.ws.send(json.dumps({**payload, **{"id": original_id}})) return original_id except (ConnectionClosed, ssl.SSLError, EOFError): @@ -1026,9 +1027,7 @@ def __init__( sync_calls: bool = False, max_retries: int = 5, retry_timeout: float = 60.0, - event_loop: Optional[asyncio.BaseEventLoop] = None, _mock: bool = False, - pre_initialize: bool = True, ): """ The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to @@ -1080,16 +1079,10 @@ def __init__( ) self.__metadata_cache = {} self.metadata_version_hex = "0x0f000000" # v15 - self.event_loop = event_loop or asyncio.get_event_loop() - self.sync_calls = sync_calls self.extrinsic_receipt_cls = ( - AsyncExtrinsicReceipt if self.sync_calls is False else ExtrinsicReceipt + AsyncExtrinsicReceipt if sync_calls is False else ExtrinsicReceipt ) - if pre_initialize: - if not _mock: - self.event_loop.create_task(self.initialize()) - else: - self.reload_type_registry() + self.reload_type_registry() async def __aenter__(self): await self.initialize() @@ -1104,7 +1097,6 @@ async def initialize(self): if not self.__chain: chain = await self.rpc_request("system_chain", []) self.__chain = chain.get("result") - self.reload_type_registry() await asyncio.gather(self.load_registry(), self._init_init_runtime()) self.initialized = True @@ -3999,12 +3991,12 @@ async def _handler(block_data: dict[str, Any]): class SyncWebsocket: - def __init__(self, websocket: "Websocket", event_loop: asyncio.AbstractEventLoop): + def __init__(self, websocket: "Websocket", event_loop_manager: EventLoopManager): self._ws = websocket - self._event_loop = event_loop + self._event_loop_mgr = event_loop_manager def close(self): - execute_coroutine(self._ws.shutdown(), event_loop=self._event_loop) + self._event_loop_mgr.run(self._ws.shutdown()) class SubstrateInterface: @@ -4013,7 +4005,7 @@ class SubstrateInterface: """ url: str - event_loop: asyncio.AbstractEventLoop + event_loop_mgr: EventLoopManager websocket: "SyncWebsocket" def __init__( @@ -4024,11 +4016,10 @@ def __init__( ss58_format: Optional[int] = None, type_registry: Optional[dict] = None, chain_name: Optional[str] = None, - event_loop: Optional[asyncio.AbstractEventLoop] = None, + event_loop_manager: Optional[EventLoopManager] = None, _mock: bool = False, substrate: Optional["AsyncSubstrateInterface"] = None, ): - event_loop = substrate.event_loop if substrate else event_loop self.url = url self._async_instance = ( AsyncSubstrateInterface( @@ -4038,15 +4029,13 @@ def __init__( ss58_format=ss58_format, type_registry=type_registry, chain_name=chain_name, - sync_calls=True, - event_loop=event_loop, _mock=_mock, ) if not substrate else substrate ) - self.event_loop = event_loop or asyncio.get_event_loop() - self.websocket = SyncWebsocket(self._async_instance.ws, self.event_loop) + self.event_loop_mgr = event_loop_manager or EventLoopManager() + self.websocket = SyncWebsocket(self._async_instance.ws, self.event_loop_mgr) @property def last_block_hash(self): @@ -4057,10 +4046,10 @@ def metadata(self): return self._async_instance.metadata def __del__(self): - execute_coroutine(self._async_instance.close()) + self.event_loop_mgr.run(self._async_instance.close()) def _run(self, coroutine): - return execute_coroutine(coroutine, self.event_loop) + return self.event_loop_mgr.run(coroutine) def __getattr__(self, name): attr = getattr(self._async_instance, name) @@ -4258,3 +4247,34 @@ def create_storage_key( pallet, storage_function, params, block_hash ) ) + + +async def async_substrate_interface( + url: str, + use_remote_preset: bool = False, + auto_discover: bool = True, + ss58_format: Optional[int] = None, + type_registry: Optional[dict] = None, + chain_name: Optional[str] = None, + sync_calls: bool = False, + max_retries: int = 5, + retry_timeout: float = 60.0, + _mock: bool = False, +) -> "AsyncSubstrateInterface": + """ + Factory function for creating an initialized AsyncSubstrateInterface + """ + substrate = AsyncSubstrateInterface( + url, + use_remote_preset, + auto_discover, + ss58_format, + type_registry, + chain_name, + sync_calls, + max_retries, + retry_timeout, + _mock, + ) + await substrate.initialize() + return substrate diff --git a/async_substrate_interface/utils/__init__.py b/async_substrate_interface/utils/__init__.py index 48ef568..4fe53e1 100644 --- a/async_substrate_interface/utils/__init__.py +++ b/async_substrate_interface/utils/__init__.py @@ -1,9 +1,46 @@ import asyncio -from typing import Optional, TYPE_CHECKING +import threading +from typing import Optional -if TYPE_CHECKING: - from typing import Coroutine +class EventLoopManager: + """Singleton class to manage a living asyncio event loop.""" + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._init_event_loop() + return cls._instance + + def _init_event_loop(self): + self.loop = asyncio.new_event_loop() + self.thread = threading.Thread(target=self._start_loop, daemon=True) + self.thread.start() + + def _start_loop(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.loop.run_forever() + + def run(self, coroutine): + while self.loop is None: + pass + future = asyncio.run_coroutine_threadsafe(coroutine, self.loop) + return future.result() # Blocks until coroutine completes + + def stop(self): + """Stop the event loop.""" + self.loop.call_soon_threadsafe(self.loop.stop) + self.thread.join() + + @classmethod + def get_event_loop(cls) -> asyncio.AbstractEventLoop: + return cls().loop def hex_to_bytes(hex_str: str) -> bytes: @@ -38,24 +75,3 @@ def get_event_loop() -> asyncio.AbstractEventLoop: event_loop = asyncio.get_event_loop() asyncio.set_event_loop(event_loop) return event_loop - - -def execute_coroutine( - coroutine: "Coroutine", event_loop: asyncio.AbstractEventLoop = None -): - """ - Helper function to run an asyncio coroutine synchronously. - - Args: - coroutine (Coroutine): The coroutine to run. - event_loop (AbstractEventLoop): The event loop to use. If `None`, attempts to fetch the already-running - loop. If one is not running, a new loop is created. - - Returns: - The result of the coroutine execution. - """ - if event_loop: - event_loop = event_loop - else: - event_loop = get_event_loop() - return event_loop.run_until_complete(asyncio.wait_for(coroutine, timeout=None)) From 737d25e8ce8fa332fe8ea991df44c835e105ade5 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 15 Jan 2025 23:10:24 +0200 Subject: [PATCH 02/10] Remove print --- async_substrate_interface/substrate_interface.py | 1 - 1 file changed, 1 deletion(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index 9ef6d14..b9393f1 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -978,7 +978,6 @@ async def send(self, payload: dict) -> int: self.id += 1 # self._open_subscriptions += 1 try: - print(self.ws) await self.ws.send(json.dumps({**payload, **{"id": original_id}})) return original_id except (ConnectionClosed, ssl.SSLError, EOFError): From 5f4a6418c6781a7ae30797554dd324836112c63a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Wed, 15 Jan 2025 23:51:22 +0200 Subject: [PATCH 03/10] Rename --- async_substrate_interface/substrate_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index b9393f1..ea27198 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -4248,7 +4248,7 @@ def create_storage_key( ) -async def async_substrate_interface( +async def get_async_substrate_interface( url: str, use_remote_preset: bool = False, auto_discover: bool = True, From 6a0d527e709e0134c9fa012a418983b8b32a7ea4 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 00:41:20 +0200 Subject: [PATCH 04/10] Add event loop manager to Extrinsic Receipt --- .../substrate_interface.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index ea27198..cc2826b 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -15,6 +15,7 @@ from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime +from functools import partial from hashlib import blake2b from typing import ( Optional, @@ -178,7 +179,7 @@ def __init__( block_hash: Optional[str] = None, block_number: Optional[int] = None, extrinsic_idx: Optional[int] = None, - finalized=None, + finalized: bool = False, ): """ Object containing information of submitted extrinsic. Block hash where extrinsic is included is required @@ -512,7 +513,8 @@ def __init__( block_hash: Optional[str] = None, block_number: Optional[int] = None, extrinsic_idx: Optional[int] = None, - finalized=None, + finalized: bool = None, + event_loop_mgr: EventLoopManager = None, ): self._async_instance = AsyncExtrinsicReceipt( substrate, @@ -522,7 +524,7 @@ def __init__( extrinsic_idx, finalized, ) - self.event_loop = asyncio.get_event_loop() + self.event_loop_mgr = event_loop_mgr or EventLoopManager() def __getattr__(self, name): attr = getattr(self._async_instance, name) @@ -530,12 +532,12 @@ def __getattr__(self, name): if asyncio.iscoroutinefunction(attr): def sync_method(*args, **kwargs): - return self.event_loop.run_until_complete(attr(*args, **kwargs)) + return self.event_loop_mgr.run(attr(*args, **kwargs)) return sync_method elif asyncio.iscoroutine(attr): # indicates this is an async_property - return self.event_loop.run_until_complete(attr) + return self.event_loop_mgr.run(attr) else: return attr @@ -1024,6 +1026,7 @@ def __init__( type_registry: Optional[dict] = None, chain_name: Optional[str] = None, sync_calls: bool = False, + event_loop_mgr: Optional[EventLoopManager] = None, max_retries: int = 5, retry_timeout: float = 60.0, _mock: bool = False, @@ -1043,10 +1046,8 @@ def __init__( sync_calls: whether this instance is going to be called through a sync wrapper or plain max_retries: number of times to retry RPC requests before giving up retry_timeout: how to long wait since the last ping to retry the RPC request - event_loop: the event loop to use + event_loop_mgr: an EventLoopManager instance, only used in the case where `sync_calls` is `True` _mock: whether to use mock version of the subtensor interface - pre_initialize: whether to initialise the network connections at initialisation of the - AsyncSubstrateInterface object """ self.max_retries = max_retries @@ -1079,7 +1080,9 @@ def __init__( self.__metadata_cache = {} self.metadata_version_hex = "0x0f000000" # v15 self.extrinsic_receipt_cls = ( - AsyncExtrinsicReceipt if sync_calls is False else ExtrinsicReceipt + AsyncExtrinsicReceipt + if sync_calls is False + else partial(ExtrinsicReceipt, event_loop_mgr=event_loop_mgr) ) self.reload_type_registry() @@ -4020,6 +4023,7 @@ def __init__( substrate: Optional["AsyncSubstrateInterface"] = None, ): self.url = url + self.event_loop_mgr = event_loop_manager or EventLoopManager() self._async_instance = ( AsyncSubstrateInterface( url=url, @@ -4027,13 +4031,13 @@ def __init__( auto_discover=auto_discover, ss58_format=ss58_format, type_registry=type_registry, + event_loop_manager=self.event_loop_mgr, chain_name=chain_name, _mock=_mock, ) if not substrate else substrate ) - self.event_loop_mgr = event_loop_manager or EventLoopManager() self.websocket = SyncWebsocket(self._async_instance.ws, self.event_loop_mgr) @property From fdc6a3959c7fbde3107715ffa1c7731988fe5c08 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 00:51:29 +0200 Subject: [PATCH 05/10] Add event loop manager to QueryMapResult --- .../substrate_interface.py | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index cc2826b..df86f8e 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -556,6 +556,7 @@ def __init__( last_key: Optional[str] = None, max_results: Optional[int] = None, ignore_decoding_errors: bool = False, + event_loop_mgr: Optional[EventLoopManager] = None, ): self.records = records self.page_size = page_size @@ -569,6 +570,7 @@ def __init__( self.ignore_decoding_errors = ignore_decoding_errors self.loading_complete = False self._buffer = iter(self.records) # Initialize the buffer with initial records + self.event_loop_mgr = event_loop_mgr async def retrieve_next_page(self, start_key) -> list: result = await self.substrate.query_map( @@ -626,19 +628,17 @@ async def __anext__(self): def __next__(self): try: - return self.substrate.event_loop.run_until_complete(self.__anext__()) + return self.event_loop_mgr.run(self.__anext__()) except StopAsyncIteration: raise StopIteration + except AttributeError: + raise AttributeError( + "This item is an async iterator. You need to iterate over it with `async for`." + ) def __getitem__(self, item): return self.records[item] - def load_all(self): - async def _load_all(): - return [item async for item in self] - - return asyncio.get_event_loop().run_until_complete(_load_all()) - @dataclass class Preprocessed: @@ -1079,11 +1079,16 @@ def __init__( ) self.__metadata_cache = {} self.metadata_version_hex = "0x0f000000" # v15 - self.extrinsic_receipt_cls = ( - AsyncExtrinsicReceipt - if sync_calls is False - else partial(ExtrinsicReceipt, event_loop_mgr=event_loop_mgr) - ) + if sync_calls is True: + self.query_map_result_cls = partial( + QueryMapResult, event_loop_mgr=event_loop_mgr + ) + self.extrinsic_receipt_cls = partial( + ExtrinsicReceipt, event_loop_mgr=event_loop_mgr + ) + else: + self.query_map_result_cls = QueryMapResult + self.extrinsic_receipt_cls = AsyncExtrinsicReceipt self.reload_type_registry() async def __aenter__(self): @@ -3767,7 +3772,7 @@ def concat_hash_len(key_hasher: str) -> int: raise item_value = None result.append([item_key, item_value]) - return QueryMapResult( + return self.query_map_result_cls( records=result, page_size=page_size, module=module, From d5e3764b352b2f0925b70819489f604e9b11f5a2 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 00:52:26 +0200 Subject: [PATCH 06/10] type --- async_substrate_interface/substrate_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index df86f8e..0844051 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -513,7 +513,7 @@ def __init__( block_hash: Optional[str] = None, block_number: Optional[int] = None, extrinsic_idx: Optional[int] = None, - finalized: bool = None, + finalized: bool = False, event_loop_mgr: EventLoopManager = None, ): self._async_instance = AsyncExtrinsicReceipt( From d6726160308b7e9bcaa548c1d0ce01692942693b Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 00:57:39 +0200 Subject: [PATCH 07/10] Type registry cannot be reloaded if the chain is not yet set. --- async_substrate_interface/substrate_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index 0844051..ef1f719 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -1089,7 +1089,6 @@ def __init__( else: self.query_map_result_cls = QueryMapResult self.extrinsic_receipt_cls = AsyncExtrinsicReceipt - self.reload_type_registry() async def __aenter__(self): await self.initialize() @@ -1104,6 +1103,7 @@ async def initialize(self): if not self.__chain: chain = await self.rpc_request("system_chain", []) self.__chain = chain.get("result") + self.reload_type_registry() await asyncio.gather(self.load_registry(), self._init_init_runtime()) self.initialized = True From 5740d5ed495aac83c64495180bcc288701ec4375 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 00:59:19 +0200 Subject: [PATCH 08/10] test --- tests/unit_tests/test_substrate_interface.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/test_substrate_interface.py b/tests/unit_tests/test_substrate_interface.py index 616152d..917d220 100644 --- a/tests/unit_tests/test_substrate_interface.py +++ b/tests/unit_tests/test_substrate_interface.py @@ -4,6 +4,7 @@ from async_substrate_interface.substrate_interface import ( AsyncSubstrateInterface, + get_async_substrate_interface, ScaleObj, ) @@ -12,8 +13,7 @@ async def test_invalid_url_raises_exception(): """Test that invalid URI raises an InvalidURI exception.""" with pytest.raises(InvalidURI): - async with AsyncSubstrateInterface("non_existent_entry_point"): - pass + await get_async_substrate_interface("non_existent_entry_point") def test_scale_object(): From 3521d92312b1349dc08c6b2bfb260fbb0e03991d Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 01:01:24 +0200 Subject: [PATCH 09/10] Ensure initialization. --- async_substrate_interface/substrate_interface.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index ef1f719..bd34820 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -4036,13 +4036,14 @@ def __init__( auto_discover=auto_discover, ss58_format=ss58_format, type_registry=type_registry, - event_loop_manager=self.event_loop_mgr, + event_loop_mgr=self.event_loop_mgr, chain_name=chain_name, _mock=_mock, ) if not substrate else substrate ) + self.event_loop_mgr.run(self._async_instance.initialize()) self.websocket = SyncWebsocket(self._async_instance.ws, self.event_loop_mgr) @property From 834baf238b7fcfcb2ceef2487b835fd6f739cbe8 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 16 Jan 2025 01:07:41 +0200 Subject: [PATCH 10/10] Type --- async_substrate_interface/substrate_interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_substrate_interface/substrate_interface.py b/async_substrate_interface/substrate_interface.py index bd34820..4661a87 100644 --- a/async_substrate_interface/substrate_interface.py +++ b/async_substrate_interface/substrate_interface.py @@ -1024,7 +1024,7 @@ def __init__( auto_discover: bool = True, ss58_format: Optional[int] = None, type_registry: Optional[dict] = None, - chain_name: Optional[str] = None, + chain_name: str = "", sync_calls: bool = False, event_loop_mgr: Optional[EventLoopManager] = None, max_retries: int = 5,