From 080cbc228c620138448002e3fbe5fe4e2ac1ffa2 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Thu, 16 Oct 2025 21:05:44 +0200 Subject: [PATCH 01/14] Trying some stuff --- async_substrate_interface/async_substrate.py | 57 +++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index d7df725..b612e77 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -571,6 +571,8 @@ def __init__( self._log_raw_websockets = _log_raw_websockets self._in_use_ids = set() self._max_retries = max_retries + self._last_activity = asyncio.Event() + self._last_activity.set() @property def state(self): @@ -588,6 +590,50 @@ async def __aenter__(self): async def loop_time() -> float: return asyncio.get_running_loop().time() + async def _reset_activity_timer(self): + """Reset the shared activity timeout""" + self._last_activity.set() + self._last_activity.clear() + + async def _wait_with_activity_timeout(self, coro, timeout: float): + """ + Wait for a coroutine with a shared activity timeout. + Returns the result or raises TimeoutError if no activity for timeout seconds. + """ + activity_task = asyncio.create_task(self._last_activity.wait()) + + # Handle both coroutines and tasks + if isinstance(coro, asyncio.Task): + main_task = coro + else: + main_task = asyncio.create_task(coro) + + try: + done, pending = await asyncio.wait( + [main_task, activity_task], + timeout=timeout, + return_when=asyncio.FIRST_COMPLETED + ) + + if not done: # Timeout occurred + for task in pending: + task.cancel() + raise asyncio.TimeoutError() + + # Check which completed + if main_task in done: + activity_task.cancel() + return main_task.result() + else: # activity_task completed (activity occurred elsewhere) + # Recursively wait again with fresh timeout + # main_task is already a Task, so pass it directly + return await self._wait_with_activity_timeout(main_task, timeout) + + except asyncio.CancelledError: + main_task.cancel() + activity_task.cancel() + raise + async def _cancel(self): try: self._send_recv_task.cancel() @@ -657,6 +703,8 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: await self._sending.put(to_send) if is_retry: # Otherwise the connection was just closed due to no activity, which should not count against retries + if self._attempts >= self._max_retries: + return TimeoutError("Max retries exceeded.") logger.info( f"Timeout occurred. Reconnecting. Attempt {self._attempts} of {self._max_retries}" ) @@ -728,13 +776,16 @@ async def _recv(self, recd: bytes) -> None: async def _start_receiving(self, ws: ClientConnection) -> Exception: try: while True: - recd = await asyncio.wait_for( - ws.recv(decode=False), timeout=self.retry_timeout + recd = await self._wait_with_activity_timeout( + ws.recv(decode=False), + self.retry_timeout ) + await self._reset_activity_timer() # reset the counter once we successfully receive something back self._attempts = 0 await self._recv(recd) except Exception as e: + logger.exception("Maybe timeout? 738", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed if not isinstance( @@ -764,7 +815,9 @@ async def _start_sending(self, ws) -> Exception: if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") await ws.send(to_send) + await self._reset_activity_timer() except Exception as e: + logger.exception("Maybe timeout? 769", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed if not isinstance( From f805aaaa4ba999a8a8600c9d494abe705321a536 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Fri, 17 Oct 2025 09:50:51 +0200 Subject: [PATCH 02/14] Debug --- async_substrate_interface/async_substrate.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index b612e77..f9c492e 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -656,15 +656,19 @@ async def connect(self, force=False): self._sending = asyncio.Queue() if self._exit_task: self._exit_task.cancel() + logger.debug(f"self.state={self.state}") if self.state not in (State.OPEN, State.CONNECTING) or force: try: await asyncio.wait_for(self._cancel(), timeout=10.0) except asyncio.TimeoutError: logger.debug(f"Timed out waiting for cancellation") pass - self.ws = await asyncio.wait_for( + logger.debug("Attempting connection") + connection = await asyncio.wait_for( connect(self.ws_url, **self._options), timeout=10.0 ) + logger.debug("Connection established") + self.ws = connection if self._send_recv_task is None or self._send_recv_task.done(): self._send_recv_task = asyncio.get_running_loop().create_task( self._handler(self.ws) @@ -700,6 +704,7 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: for original_id, payload in list(self._inflight.items()): self._received[original_id] = loop.create_future() to_send = json.loads(payload) + logger.debug(f"Resubmitting {to_send}") await self._sending.put(to_send) if is_retry: # Otherwise the connection was just closed due to no activity, which should not count against retries @@ -710,6 +715,7 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: ) await self.connect(True) await self._handler(ws=self.ws) + logger.debug(f"Current send queue size: {self._sending.qsize()}") return None elif isinstance(e := recv_task.result(), Exception): return e From 60923e1d2f76f34196d379888477150093b1131e Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 20 Oct 2025 21:54:11 +0200 Subject: [PATCH 03/14] Seems to work --- async_substrate_interface/async_substrate.py | 171 +++++++++++++------ 1 file changed, 119 insertions(+), 52 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index f9c492e..f80a9e5 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -8,6 +8,7 @@ import inspect import logging import os +import socket import ssl import warnings from contextlib import suppress @@ -34,6 +35,7 @@ ss58_encode, MultiAccountId, ) +from websockets import CloseCode from websockets.asyncio.client import connect, ClientConnection from websockets.exceptions import ( ConnectionClosed, @@ -592,8 +594,11 @@ async def loop_time() -> float: async def _reset_activity_timer(self): """Reset the shared activity timeout""" - self._last_activity.set() - self._last_activity.clear() + # Create a NEW event instead of reusing the same one + old_event = self._last_activity + self._last_activity = asyncio.Event() + self._last_activity.clear() # Start fresh + old_event.set() # Wake up anyone waiting on the old event async def _wait_with_activity_timeout(self, coro, timeout: float): """ @@ -612,21 +617,28 @@ async def _wait_with_activity_timeout(self, coro, timeout: float): done, pending = await asyncio.wait( [main_task, activity_task], timeout=timeout, - return_when=asyncio.FIRST_COMPLETED + return_when=asyncio.FIRST_COMPLETED, ) if not done: # Timeout occurred + logger.debug(f"Activity timeout after {timeout}s, no activity detected") for task in pending: task.cancel() - raise asyncio.TimeoutError() + raise TimeoutError() # Check which completed if main_task in done: activity_task.cancel() - return main_task.result() + + # Check if the task raised an exception + exc = main_task.exception() + if exc is not None: + raise exc + else: + return main_task.result() else: # activity_task completed (activity occurred elsewhere) + logger.debug("Activity detected, resetting timeout") # Recursively wait again with fresh timeout - # main_task is already a Task, so pass it directly return await self._wait_with_activity_timeout(main_task, timeout) except asyncio.CancelledError: @@ -636,46 +648,72 @@ async def _wait_with_activity_timeout(self, coro, timeout: float): async def _cancel(self): try: - self._send_recv_task.cancel() - await self.ws.close() - except ( - AttributeError, - asyncio.CancelledError, - WebSocketException, - ): + logger.debug("Cancelling send/recv tasks") + if self._send_recv_task is not None: + self._send_recv_task.cancel() + except asyncio.CancelledError: pass except Exception as e: logger.warning( f"{e} encountered while trying to close websocket connection." ) + try: + logger.debug("Closing websocket connection") + if self.ws is not None: + await self.ws.close() + except Exception as e: + logger.warning( + f"{e} encountered while trying to close websocket connection." + ) async def connect(self, force=False): - async with self._lock: - logger.debug(f"Websocket connecting to {self.ws_url}") - if self._sending is None or self._sending.empty(): - self._sending = asyncio.Queue() - if self._exit_task: - self._exit_task.cancel() - logger.debug(f"self.state={self.state}") - if self.state not in (State.OPEN, State.CONNECTING) or force: + if not force: + await self._lock.acquire() + else: + logger.debug("Proceeding without acquiring lock.") + logger.debug(f"Websocket connecting to {self.ws_url}") + if self._sending is None or self._sending.empty(): + self._sending = asyncio.Queue() + if self._exit_task: + self._exit_task.cancel() + logger.debug(f"self.state={self.state}") + if force and self.state == State.OPEN: + logger.debug(f"Attempting to reconnect while already connected.") + if self.ws is not None: + self.ws.protocol.fail(CloseCode.SERVICE_RESTART) + logger.debug(f"Open connection cancelled.") + await asyncio.sleep(1) + if self.state not in (State.OPEN, State.CONNECTING) or force: + if not force: try: + logger.debug("Attempting cancellation") await asyncio.wait_for(self._cancel(), timeout=10.0) except asyncio.TimeoutError: logger.debug(f"Timed out waiting for cancellation") pass - logger.debug("Attempting connection") + logger.debug("Attempting connection") + try: connection = await asyncio.wait_for( connect(self.ws_url, **self._options), timeout=10.0 ) - logger.debug("Connection established") - self.ws = connection - if self._send_recv_task is None or self._send_recv_task.done(): - self._send_recv_task = asyncio.get_running_loop().create_task( - self._handler(self.ws) - ) - logger.debug("Websocket handler attached.") + except socket.gaierror: + logger.debug(f"Hostname not known (this is just for testing") + await asyncio.sleep(10) + if self._lock.locked(): + self._lock.release() + return await self.connect(force=force) + logger.debug("Connection established") + self.ws = connection + if self._send_recv_task is None or self._send_recv_task.done(): + self._send_recv_task = asyncio.get_running_loop().create_task( + self._handler(self.ws) + ) + if self._lock.locked(): + self._lock.release() + return None async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: + logger.debug("WS handler attached") recv_task = asyncio.create_task(self._start_receiving(ws)) send_task = asyncio.create_task(self._start_sending(ws)) done, pending = await asyncio.wait( @@ -685,38 +723,54 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: loop = asyncio.get_running_loop() should_reconnect = False is_retry = False + for task in pending: task.cancel() + for task in done: task_res = task.result() - if isinstance( - task_res, (asyncio.TimeoutError, ConnectionClosed, TimeoutError) - ): + + # If ConnectionClosedOK, graceful shutdown - don't reconnect + if isinstance(task_res, websockets.exceptions.ConnectionClosedOK): + logger.debug("Graceful shutdown detected, not reconnecting") + return None # Clean exit + + # Check for timeout/connection errors that should trigger reconnect + if isinstance(task_res, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)): should_reconnect = True + logger.debug(f"Reconnection triggered by: {type(task_res).__name__}") + if isinstance(task_res, (asyncio.TimeoutError, TimeoutError)): self._attempts += 1 is_retry = True + if should_reconnect is True: if len(self._received_subscriptions) > 0: return SubstrateRequestException( f"Unable to reconnect because there are currently open subscriptions." ) - for original_id, payload in list(self._inflight.items()): - self._received[original_id] = loop.create_future() - to_send = json.loads(payload) - logger.debug(f"Resubmitting {to_send}") - await self._sending.put(to_send) + if is_retry: - # Otherwise the connection was just closed due to no activity, which should not count against retries if self._attempts >= self._max_retries: + logger.error("Max retries exceeded.") return TimeoutError("Max retries exceeded.") logger.info( f"Timeout occurred. Reconnecting. Attempt {self._attempts} of {self._max_retries}" ) + + async with self._lock: + for original_id in list(self._inflight.keys()): + payload = self._inflight.pop(original_id) + self._received[original_id] = loop.create_future() + to_send = json.loads(payload) + logger.debug(f"Resubmitting {to_send['id']}") + await self._sending.put(to_send) + + logger.debug("Attempting reconnection...") await self.connect(True) - await self._handler(ws=self.ws) - logger.debug(f"Current send queue size: {self._sending.qsize()}") - return None + logger.debug(f"Reconnected. Send queue size: {self._sending.qsize()}") + # Recursively call handler + return await self._handler(self.ws) elif isinstance(e := recv_task.result(), Exception): return e elif isinstance(e := send_task.result(), Exception): @@ -753,6 +807,7 @@ async def _exit_with_timer(self): pass async def shutdown(self): + logger.debug("Shutdown requested") try: await asyncio.wait_for(self._cancel(), timeout=10.0) except asyncio.TimeoutError: @@ -766,11 +821,16 @@ async def _recv(self, recd: bytes) -> None: response = json.loads(recd) if "id" in response: async with self._lock: - self._inflight.pop(response["id"]) - with suppress(KeyError): - # These would be subscriptions that were unsubscribed + inflight_item = self._inflight.pop(response["id"], None) + if inflight_item is not None: + logger.debug(f"Popped {response['id']} from inflight") + else: + logger.debug( + f"Received response for {response['id']} which is no longer inflight (likely reconnection)" + ) + if self._received.get(response["id"]) is not None: self._received[response["id"]].set_result(response) - self._in_use_ids.remove(response["id"]) + self._in_use_ids.discard(response["id"]) elif "params" in response: sub_id = response["params"]["subscription"] if sub_id not in self._received_subscriptions: @@ -780,39 +840,43 @@ async def _recv(self, recd: bytes) -> None: raise KeyError(response) async def _start_receiving(self, ws: ClientConnection) -> Exception: + logger.debug("Starting receiving task") try: while True: recd = await self._wait_with_activity_timeout( - ws.recv(decode=False), - self.retry_timeout + ws.recv(decode=False), self.retry_timeout ) await self._reset_activity_timer() # reset the counter once we successfully receive something back self._attempts = 0 await self._recv(recd) + except websockets.exceptions.ConnectionClosedOK as e: + logger.debug("ConnectionClosedOK") + return e except Exception as e: - logger.exception("Maybe timeout? 738", exc_info=e) + logger.exception("Receiving exception", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed if not isinstance( - e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) + e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) ): logger.exception("Websocket receiving exception", exc_info=e) for fut in self._received.values(): if not fut.done(): fut.set_exception(e) fut.cancel() - elif isinstance(e, websockets.exceptions.ConnectionClosedOK): - logger.debug("Websocket connection closed.") else: - logger.debug(f"Timeout occurred.") + logger.debug(f"Timeout/ConnectionClosed occurred.") return e async def _start_sending(self, ws) -> Exception: + logger.debug("Starting sending task") to_send = None try: while True: + logger.debug(f"_sending, {self._sending.qsize()}") to_send_ = await self._sending.get() + logger.debug("Retrieved item from sending queue") self._sending.task_done() send_id = to_send_["id"] to_send = json.dumps(to_send_) @@ -821,6 +885,7 @@ async def _start_sending(self, ws) -> Exception: if self._log_raw_websockets: raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}") await ws.send(to_send) + logger.debug("Sent to websocket") await self._reset_activity_timer() except Exception as e: logger.exception("Maybe timeout? 769", exc_info=e) @@ -2529,6 +2594,8 @@ async def _make_rpc_request( if request_manager.is_complete: break + else: + await asyncio.sleep(0.2) return request_manager.get_results() From 3bfd3ac8b1b63b05e10850a612173cae98b31c39 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 20 Oct 2025 21:54:28 +0200 Subject: [PATCH 04/14] Added ProxyServer test fixture --- tests/helpers/proxy_server.py | 51 +++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 tests/helpers/proxy_server.py diff --git a/tests/helpers/proxy_server.py b/tests/helpers/proxy_server.py new file mode 100644 index 0000000..e1584d2 --- /dev/null +++ b/tests/helpers/proxy_server.py @@ -0,0 +1,51 @@ +import asyncio + +from websockets.asyncio.server import serve, ServerConnection +from websockets.asyncio.client import connect + + +class ProxyServer: + def __init__(self, upstream: str, time_til_pause: float, time_til_resume: float): + self.upstream_server = upstream + self.time_til_pause = time_til_pause + self.time_til_resume = time_til_resume + self.upstream_connection = None + self.connection_time = 0 + self.shutdown_time = 0 + self.resume_time = 0 + + async def connect(self): + self.upstream_connection = await connect(self.upstream_server) + self.connection_time = asyncio.get_running_loop().time() + self.shutdown_time = self.connection_time + self.time_til_pause + self.resume_time = self.shutdown_time + self.time_til_resume + + async def close(self): + if self.upstream_connection: + await self.upstream_connection.close() + + async def proxy_request(self, websocket: ServerConnection): + async for message in websocket: + print(websocket) + await self.upstream_connection.send(message) + recd = await self.upstream_connection.recv() + current_time = asyncio.get_running_loop().time() + if self.shutdown_time < current_time < self.resume_time: + print("Pausing") + await asyncio.sleep(self.time_til_resume) + await websocket.send(recd) + # await websocket.send(message) + + async def serve(self): + async with serve(self.proxy_request, "localhost", 8080) as server: + await server.serve_forever() + + +async def main(): + proxy = ProxyServer("wss://archive.sub.latent.to", 20, 30) + await proxy.connect() + await proxy.serve() + + +if __name__ == "__main__": + asyncio.run(main()) From b2a14687cc22817758d87ab1d1facaa87c37f3f9 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 20 Oct 2025 22:00:15 +0200 Subject: [PATCH 05/14] Ruff --- async_substrate_interface/async_substrate.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index f80a9e5..a9f3d5f 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -736,7 +736,9 @@ async def _handler(self, ws: ClientConnection) -> Union[None, Exception]: return None # Clean exit # Check for timeout/connection errors that should trigger reconnect - if isinstance(task_res, (asyncio.TimeoutError, TimeoutError, ConnectionClosed)): + if isinstance( + task_res, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) + ): should_reconnect = True logger.debug(f"Reconnection triggered by: {type(task_res).__name__}") @@ -858,7 +860,7 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: if isinstance(e, ssl.SSLError): e = ConnectionClosed if not isinstance( - e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) + e, (asyncio.TimeoutError, TimeoutError, ConnectionClosed) ): logger.exception("Websocket receiving exception", exc_info=e) for fut in self._received.values(): From 4b0cb39865f531403f087dde8277cadc5d132851 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 20 Oct 2025 22:53:19 +0200 Subject: [PATCH 06/14] Tests --- async_substrate_interface/async_substrate.py | 11 +- tests/helpers/proxy_server.py | 60 +++++---- .../test_async_substrate_interface.py | 123 +++++++++++++++++- 3 files changed, 157 insertions(+), 37 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index a9f3d5f..b6626c3 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -607,7 +607,6 @@ async def _wait_with_activity_timeout(self, coro, timeout: float): """ activity_task = asyncio.create_task(self._last_activity.wait()) - # Handle both coroutines and tasks if isinstance(coro, asyncio.Task): main_task = coro else: @@ -620,25 +619,22 @@ async def _wait_with_activity_timeout(self, coro, timeout: float): return_when=asyncio.FIRST_COMPLETED, ) - if not done: # Timeout occurred + if not done: logger.debug(f"Activity timeout after {timeout}s, no activity detected") for task in pending: task.cancel() raise TimeoutError() - # Check which completed if main_task in done: activity_task.cancel() - # Check if the task raised an exception exc = main_task.exception() if exc is not None: raise exc else: return main_task.result() - else: # activity_task completed (activity occurred elsewhere) + else: logger.debug("Activity detected, resetting timeout") - # Recursively wait again with fresh timeout return await self._wait_with_activity_timeout(main_task, timeout) except asyncio.CancelledError: @@ -849,14 +845,12 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception: ws.recv(decode=False), self.retry_timeout ) await self._reset_activity_timer() - # reset the counter once we successfully receive something back self._attempts = 0 await self._recv(recd) except websockets.exceptions.ConnectionClosedOK as e: logger.debug("ConnectionClosedOK") return e except Exception as e: - logger.exception("Receiving exception", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed if not isinstance( @@ -890,7 +884,6 @@ async def _start_sending(self, ws) -> Exception: logger.debug("Sent to websocket") await self._reset_activity_timer() except Exception as e: - logger.exception("Maybe timeout? 769", exc_info=e) if isinstance(e, ssl.SSLError): e = ConnectionClosed if not isinstance( diff --git a/tests/helpers/proxy_server.py b/tests/helpers/proxy_server.py index e1584d2..c561289 100644 --- a/tests/helpers/proxy_server.py +++ b/tests/helpers/proxy_server.py @@ -1,7 +1,10 @@ -import asyncio +import logging +import time -from websockets.asyncio.server import serve, ServerConnection -from websockets.asyncio.client import connect +from websockets.sync.server import serve, ServerConnection +from websockets.sync.client import connect + +logger = logging.getLogger("websockets.proxy") class ProxyServer: @@ -14,38 +17,41 @@ def __init__(self, upstream: str, time_til_pause: float, time_til_resume: float) self.shutdown_time = 0 self.resume_time = 0 - async def connect(self): - self.upstream_connection = await connect(self.upstream_server) - self.connection_time = asyncio.get_running_loop().time() + def connect(self): + self.upstream_connection = connect(self.upstream_server) + self.connection_time = time.time() self.shutdown_time = self.connection_time + self.time_til_pause self.resume_time = self.shutdown_time + self.time_til_resume - async def close(self): + def close(self): if self.upstream_connection: - await self.upstream_connection.close() - - async def proxy_request(self, websocket: ServerConnection): - async for message in websocket: - print(websocket) - await self.upstream_connection.send(message) - recd = await self.upstream_connection.recv() - current_time = asyncio.get_running_loop().time() + self.upstream_connection.close() + self.server.shutdown() + + def proxy_request(self, websocket: ServerConnection): + for message in websocket: + self.upstream_connection.send(message) + recd = self.upstream_connection.recv() + current_time = time.time() if self.shutdown_time < current_time < self.resume_time: - print("Pausing") - await asyncio.sleep(self.time_til_resume) - await websocket.send(recd) - # await websocket.send(message) + logger.info("Pausing") + time.sleep(self.time_til_resume) + websocket.send(recd) + + def serve(self): + with serve(self.proxy_request, "localhost", 8080) as self.server: + self.server.serve_forever() - async def serve(self): - async with serve(self.proxy_request, "localhost", 8080) as server: - await server.serve_forever() + def connect_and_serve(self): + self.connect() + self.serve() -async def main(): - proxy = ProxyServer("wss://archive.sub.latent.to", 20, 30) - await proxy.connect() - await proxy.serve() +def run_proxy_server(time_til_pause: float = 20.0, time_til_resume: float = 30.0): + proxy = ProxyServer("wss://archive.sub.latent.to", time_til_pause, time_til_resume) + proxy.connect() + proxy.serve() if __name__ == "__main__": - asyncio.run(main()) + run_proxy_server() diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py index 8dab260..16dfc38 100644 --- a/tests/integration_tests/test_async_substrate_interface.py +++ b/tests/integration_tests/test_async_substrate_interface.py @@ -1,12 +1,16 @@ import asyncio +import logging +import os.path import time +import threading import pytest from scalecodec import ss58_encode -from async_substrate_interface.async_substrate import AsyncSubstrateInterface +from async_substrate_interface.async_substrate import AsyncSubstrateInterface, logger from async_substrate_interface.types import ScaleObj from tests.helpers.settings import ARCHIVE_ENTRYPOINT, LATENT_LITE_ENTRYPOINT +from tests.helpers.proxy_server import ProxyServer @pytest.mark.asyncio @@ -174,3 +178,120 @@ async def test_query_map_with_odd_number_of_params(): first_record = qm.records[0] assert len(first_record) == 2 assert len(first_record[0]) == 4 + + +@pytest.mark.asyncio +async def test_improved_reconnection_(): + ws_logger_path = "/tmp/websockets-proxy-test" + ws_logger = logging.getLogger("websockets.proxy") + if os.path.exists(ws_logger_path): + os.remove(ws_logger_path) + ws_logger.setLevel(logging.INFO) + ws_logger.addHandler(logging.FileHandler(ws_logger_path)) + + asi_logger_path = "/tmp/async-substrate-interface-test" + if os.path.exists(asi_logger_path): + os.remove(asi_logger_path) + logger.setLevel(logging.DEBUG) + logger.addHandler(logging.FileHandler(asi_logger_path)) + + proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20) + proxy.connect() + server_thread = threading.Thread(target=proxy.serve) + server_thread.daemon = True + server_thread.start() + + await asyncio.sleep(3) # give the server start up time + + try: + async with AsyncSubstrateInterface( + "ws://localhost:8080", + ss58_format=42, + chain_name="Bittensor", + retry_timeout=10.0, + ws_shutdown_timer=None, + ) as substrate: + blocks_to_check = [ + 5215000, + 5215001, + 5215002, + 5215003, + 5215004, + 5215005, + 5215006, + ] + tasks = [] + for block in blocks_to_check: + block_hash = await substrate.get_block_hash(block_id=block) + tasks.append( + substrate.query_map( + "SubtensorModule", "TotalHotkeyShares", block_hash=block_hash + ) + ) + records = await asyncio.gather(*tasks) + assert len(records) == len(blocks_to_check) + await substrate.close() + with open(ws_logger_path, "r") as f: + assert "Pausing" in f.read() + with open(asi_logger_path, "r") as f: + assert "Timeout/ConnectionClosed occurred." in f.read() + finally: + proxy.stop() + server_thread.join(timeout=5) + + +@pytest.mark.asyncio +async def test_improved_reconnection(): + ws_logger_path = "/tmp/websockets-proxy-test" + ws_logger = logging.getLogger("websockets.proxy") + if os.path.exists(ws_logger_path): + os.remove(ws_logger_path) + ws_logger.setLevel(logging.INFO) + ws_logger.addHandler(logging.FileHandler(ws_logger_path)) + + asi_logger_path = "/tmp/async-substrate-interface-test" + if os.path.exists(asi_logger_path): + os.remove(asi_logger_path) + logger.setLevel(logging.DEBUG) + logger.addHandler(logging.FileHandler(asi_logger_path)) + + proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20) + + server_thread = threading.Thread(target=proxy.connect_and_serve) + server_thread.start() + await asyncio.sleep(3) # give the server start up time + async with AsyncSubstrateInterface( + "ws://localhost:8080", + ss58_format=42, + chain_name="Bittensor", + retry_timeout=10.0, + ws_shutdown_timer=None, + ) as substrate: + blocks_to_check = [ + 5215000, + 5215001, + 5215002, + 5215003, + 5215004, + 5215005, + 5215006, + ] + tasks = [] + for block in blocks_to_check: + block_hash = await substrate.get_block_hash(block_id=block) + tasks.append( + substrate.query_map( + "SubtensorModule", "TotalHotkeyShares", block_hash=block_hash + ) + ) + records = await asyncio.gather(*tasks) + assert len(records) == len(blocks_to_check) + await substrate.close() + with open(ws_logger_path, "r") as f: + assert "Pausing" in f.read() + with open(asi_logger_path, "r") as f: + assert "Timeout/ConnectionClosed occurred." in f.read() + shutdown_thread = threading.Thread(target=proxy.close) + shutdown_thread.start() + shutdown_thread.join(timeout=5) + server_thread.join(timeout=5) From 1602285f47674bd6818f842568dc7e495ac5bd30 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 20 Oct 2025 23:00:10 +0200 Subject: [PATCH 07/14] Removed old incomplete test --- .../test_async_substrate_interface.py | 60 ------------------- 1 file changed, 60 deletions(-) diff --git a/tests/integration_tests/test_async_substrate_interface.py b/tests/integration_tests/test_async_substrate_interface.py index 16dfc38..8957314 100644 --- a/tests/integration_tests/test_async_substrate_interface.py +++ b/tests/integration_tests/test_async_substrate_interface.py @@ -180,66 +180,6 @@ async def test_query_map_with_odd_number_of_params(): assert len(first_record[0]) == 4 -@pytest.mark.asyncio -async def test_improved_reconnection_(): - ws_logger_path = "/tmp/websockets-proxy-test" - ws_logger = logging.getLogger("websockets.proxy") - if os.path.exists(ws_logger_path): - os.remove(ws_logger_path) - ws_logger.setLevel(logging.INFO) - ws_logger.addHandler(logging.FileHandler(ws_logger_path)) - - asi_logger_path = "/tmp/async-substrate-interface-test" - if os.path.exists(asi_logger_path): - os.remove(asi_logger_path) - logger.setLevel(logging.DEBUG) - logger.addHandler(logging.FileHandler(asi_logger_path)) - - proxy = ProxyServer("wss://archive.sub.latent.to", 10, 20) - proxy.connect() - server_thread = threading.Thread(target=proxy.serve) - server_thread.daemon = True - server_thread.start() - - await asyncio.sleep(3) # give the server start up time - - try: - async with AsyncSubstrateInterface( - "ws://localhost:8080", - ss58_format=42, - chain_name="Bittensor", - retry_timeout=10.0, - ws_shutdown_timer=None, - ) as substrate: - blocks_to_check = [ - 5215000, - 5215001, - 5215002, - 5215003, - 5215004, - 5215005, - 5215006, - ] - tasks = [] - for block in blocks_to_check: - block_hash = await substrate.get_block_hash(block_id=block) - tasks.append( - substrate.query_map( - "SubtensorModule", "TotalHotkeyShares", block_hash=block_hash - ) - ) - records = await asyncio.gather(*tasks) - assert len(records) == len(blocks_to_check) - await substrate.close() - with open(ws_logger_path, "r") as f: - assert "Pausing" in f.read() - with open(asi_logger_path, "r") as f: - assert "Timeout/ConnectionClosed occurred." in f.read() - finally: - proxy.stop() - server_thread.join(timeout=5) - - @pytest.mark.asyncio async def test_improved_reconnection(): ws_logger_path = "/tmp/websockets-proxy-test" From 94b1d119bfa74738d693683a215079b67d9e52fc Mon Sep 17 00:00:00 2001 From: bdhimes Date: Mon, 20 Oct 2025 23:29:57 +0200 Subject: [PATCH 08/14] Unload from JSON --- async_substrate_interface/async_substrate.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index b6626c3..7fba760 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -891,8 +891,9 @@ async def _start_sending(self, ws) -> Exception: ): logger.exception("Websocket sending exception", exc_info=e) if to_send is not None: - self._received[to_send["id"]].set_exception(e) - self._received[to_send["id"]].cancel() + to_send_ = json.loads(to_send) + self._received[to_send_["id"]].set_exception(e) + self._received[to_send_["id"]].cancel() else: for i in self._received.keys(): self._received[i].set_exception(e) From f2c5691d7c1779ac0ba105a8d77fd13870cf74ff Mon Sep 17 00:00:00 2001 From: Arthurdw Date: Tue, 21 Oct 2025 16:23:55 +0200 Subject: [PATCH 09/14] fix: resolve issue with conflicting retry param --- async_substrate_interface/substrate_addons.py | 8 ++--- tests/e2e_tests/test_substrate_addons.py | 33 +++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 3b0f0ba..5c548ce 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -171,8 +171,8 @@ def __init__( for method in retry_methods: setattr(self, method, partial(self._retry, method)) - def _retry(self, method, *args, **kwargs): - method_ = self._original_methods[method] + def _retry(self, method_name, *args, **kwargs): + method_ = self._original_methods[method_name] try: return method_(*args, **kwargs) except ( @@ -341,8 +341,8 @@ async def _reinstantiate_substrate( self._initializing = False await self.initialize() - async def _retry(self, method, *args, **kwargs): - method_ = self._original_methods[method] + async def _retry(self, method_name, *args, **kwargs): + method_ = self._original_methods[method_name] try: return await method_(*args, **kwargs) except ( diff --git a/tests/e2e_tests/test_substrate_addons.py b/tests/e2e_tests/test_substrate_addons.py index c776506..bcf8750 100644 --- a/tests/e2e_tests/test_substrate_addons.py +++ b/tests/e2e_tests/test_substrate_addons.py @@ -105,3 +105,36 @@ def test_retry_sync_subtensor_archive_node(): LATENT_LITE_ENTRYPOINT, archive_nodes=[ARCHIVE_ENTRYPOINT] ) as substrate: assert isinstance((substrate.get_block(block_number=old_block)), dict) + + +@pytest.mark.asyncio +async def test_retry_async_substrate_runtime_call_with_keyword_args(): + """Test that runtime_call works with keyword arguments (parameter name conflict fix).""" + async with RetryAsyncSubstrate( + LATENT_LITE_ENTRYPOINT, retry_forever=True + ) as substrate: + # This should not raise TypeError due to parameter name conflict + # The 'method' kwarg should not conflict with _retry's parameter + result = await substrate.runtime_call( + api="SwapRuntimeApi", + method="current_alpha_price", + params=[1], + block_hash=None, + ) + assert result is not None + + +def test_retry_sync_substrate_runtime_call_with_keyword_args(): + """Test that runtime_call works with keyword arguments (parameter name conflict fix).""" + with RetrySyncSubstrate( + LATENT_LITE_ENTRYPOINT, retry_forever=True + ) as substrate: + # This should not raise TypeError due to parameter name conflict + # The 'method' kwarg should not conflict with _retry's parameter + result = substrate.runtime_call( + api="SwapRuntimeApi", + method="current_alpha_price", + params=[1], + block_hash=None, + ) + assert result is not None From 03e258ebf4ff162607be51f27aedc59b38c8f076 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 21 Oct 2025 19:01:05 +0200 Subject: [PATCH 10/14] Debug, lack of sleeping so much. --- async_substrate_interface/async_substrate.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 7fba760..29b23bf 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -132,6 +132,16 @@ def __init__( self.__weight = None self.__total_fee_amount = None + def __str__(self): + return ( + f"AsyncExtrinsicReceipt({self.extrinsic_hash}), " + f"block_hash={self.block_hash}, block_number={self.block_number}), " + f"finalized={self.finalized})" + ) + + def __repr__(self): + return self.__str__() + async def get_extrinsic_identifier(self) -> str: """ Returns the on-chain identifier for this extrinsic in format "[block_number]-[extrinsic_idx]" e.g. 134324-2 @@ -983,7 +993,7 @@ async def retrieve(self, item_id: str) -> Optional[dict]: elif isinstance((e := self._send_recv_task.result()), Exception): logger.exception(f"Websocket sending exception: {e}") raise e - await asyncio.sleep(0.1) + await asyncio.sleep(0.01) return None @@ -1550,6 +1560,7 @@ async def retrieve_pending_extrinsics(self) -> list: result_data = await self.rpc_request("author_pendingExtrinsics", []) if "error" in result_data: + logger.error(f"Error in retrieving pending extrinsics: {result_data['error']}") raise SubstrateRequestException(result_data["error"]["message"]) extrinsics = [] @@ -2591,7 +2602,7 @@ async def _make_rpc_request( if request_manager.is_complete: break else: - await asyncio.sleep(0.2) + await asyncio.sleep(0.01) return request_manager.get_results() @@ -2675,10 +2686,12 @@ async def rpc_request( bh = err_msg.split("State already discarded for ")[1].strip() raise StateDiscardedError(bh) else: + logger.error(f"Substrate Request Exception: {result[payload_id]}") raise SubstrateRequestException(err_msg) if "result" in result[payload_id][0]: return result[payload_id][0] else: + logger.error(f"Substrate Request Exception: {result[payload_id]}") raise SubstrateRequestException(result[payload_id][0]) @cached_fetcher(max_size=SUBSTRATE_CACHE_METHOD_SIZE) From 635662b3bc6f5712f75d25bd2758819b719ab956 Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 21 Oct 2025 19:49:31 +0200 Subject: [PATCH 11/14] Update subscribe/unsubscribe logic to catch edge cases --- async_substrate_interface/async_substrate.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 29b23bf..5fce7b8 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -985,6 +985,12 @@ async def retrieve(self, item_id: str) -> Optional[dict]: return subscription except asyncio.QueueEmpty: pass + except KeyError: + logger.debug( + f"Received item {item_id} not in received subscriptions. " + f"This indicates the response of the subscription was inflight when sending " + f"the unsubscribe request." + ) if self._send_recv_task is not None and self._send_recv_task.done(): if not self._send_recv_task.cancelled(): if isinstance((e := self._send_recv_task.exception()), Exception): @@ -1071,7 +1077,7 @@ def __init__( "strict_scale_decode": True, } self.initialized = False - self._forgettable_task = None + self._forgettable_tasks = set() self.type_registry = type_registry self.type_registry_preset = type_registry_preset self.runtime_cache = RuntimeCache() @@ -1531,11 +1537,13 @@ async def result_handler( if subscription_result is not None: # Handler returned end result: unsubscribe from further updates - self._forgettable_task = asyncio.create_task( + unsub_task = asyncio.create_task( self.rpc_request( "state_unsubscribeStorage", [subscription_id] ) ) + self._forgettable_tasks.add(unsub_task) + unsub_task.add_done_callback(self._forgettable_tasks.discard) return result_found, subscription_result @@ -1560,7 +1568,9 @@ async def retrieve_pending_extrinsics(self) -> list: result_data = await self.rpc_request("author_pendingExtrinsics", []) if "error" in result_data: - logger.error(f"Error in retrieving pending extrinsics: {result_data['error']}") + logger.error( + f"Error in retrieving pending extrinsics: {result_data['error']}" + ) raise SubstrateRequestException(result_data["error"]["message"]) extrinsics = [] From 59399ee86a2438e229932dc8a4b0aed0f7640ace Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 21 Oct 2025 20:01:56 +0200 Subject: [PATCH 12/14] Rename --- async_substrate_interface/substrate_addons.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py index 3b0f0ba..b8cfd75 100644 --- a/async_substrate_interface/substrate_addons.py +++ b/async_substrate_interface/substrate_addons.py @@ -321,11 +321,11 @@ async def _reinstantiate_substrate( await self.ws.shutdown() except AttributeError: pass - if self._forgettable_task is not None: - self._forgettable_task: asyncio.Task - self._forgettable_task.cancel() + _forgettable_task: asyncio.Task + for _forgettable_task in self._forgettable_tasks: + _forgettable_task.cancel() try: - await self._forgettable_task + await _forgettable_task except asyncio.CancelledError: pass self.chain_endpoint = next_network From c3e15a22c67ca40bf46703191a77077dc18eddef Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 21 Oct 2025 21:31:36 +0200 Subject: [PATCH 13/14] Use uv for test dependencies --- .github/workflows/check-btcli-tests.yml | 14 +++++++------- .github/workflows/check-sdk-tests.yml | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/workflows/check-btcli-tests.yml b/.github/workflows/check-btcli-tests.yml index 18b93a2..89552d7 100644 --- a/.github/workflows/check-btcli-tests.yml +++ b/.github/workflows/check-btcli-tests.yml @@ -166,9 +166,9 @@ jobs: source ${{ github.workspace }}/venv/bin/activate git checkout staging git fetch origin staging - python3 -m pip install --upgrade pip - python3 -m pip install '.[dev]' - python3 -m pip install pytest + python3 -m pip install --upgrade pip uv + uv pip install '.[dev]' + uv pip install pytest - name: Clone async-substrate-interface repo run: git clone https://github.com/opentensor/async-substrate-interface.git @@ -185,7 +185,7 @@ jobs: run: | source ${{ github.workspace }}/venv/bin/activate python3 -m pip uninstall async-substrate-interface -y - python3 -m pip install . + uv pip install . - name: Download Cached Docker Image uses: actions/download-artifact@v4 @@ -229,8 +229,8 @@ jobs: source ${{ github.workspace }}/venv/bin/activate git checkout staging git fetch origin staging - python3 -m pip install --upgrade pip - python3 -m pip install '.[dev]' + python3 -m pip install --upgrade pip uv + uv pip install '.[dev]' - name: Clone async-substrate-interface repo run: git clone https://github.com/opentensor/async-substrate-interface.git @@ -247,7 +247,7 @@ jobs: run: | source ${{ github.workspace }}/venv/bin/activate pip uninstall async-substrate-interface -y - pip install . + uv pip install . - name: Run SDK unit tests run: | diff --git a/.github/workflows/check-sdk-tests.yml b/.github/workflows/check-sdk-tests.yml index 9e40b61..7529ea3 100644 --- a/.github/workflows/check-sdk-tests.yml +++ b/.github/workflows/check-sdk-tests.yml @@ -194,8 +194,8 @@ jobs: fi git checkout FETCH_HEAD echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH" - python3 -m pip install --upgrade pip - python3 -m pip install '.[dev]' + python3 -m pip install --upgrade pip uv + uv pip install '.[dev]' - name: Clone Bittensor async-substrate-interface repo run: git clone https://github.com/opentensor/async-substrate-interface.git @@ -212,7 +212,7 @@ jobs: run: | source ${{ github.workspace }}/venv/bin/activate python3 -m pip uninstall async-substrate-interface -y - python3 -m pip install . + uv pip install . - name: Download Cached Docker Image uses: actions/download-artifact@v4 @@ -260,8 +260,8 @@ jobs: fi git checkout FETCH_HEAD echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH" - python3 -m pip install --upgrade pip - python3 -m pip install '.[dev]' + python3 -m pip install --upgrade pip uv + uv pip install '.[dev]' - name: Clone async-substrate-interface repo run: git clone https://github.com/opentensor/async-substrate-interface.git @@ -278,7 +278,7 @@ jobs: run: | source ${{ github.workspace }}/venv/bin/activate pip uninstall async-substrate-interface -y - pip install . + uv pip install . - name: Run SDK integration tests run: | From af41c6f80f9eed9db6b269f6ea228fc6a1fbef3d Mon Sep 17 00:00:00 2001 From: bdhimes Date: Tue, 21 Oct 2025 22:13:51 +0200 Subject: [PATCH 14/14] Bump changelog + version --- CHANGELOG.md | 10 ++++++++++ pyproject.toml | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bd1e14..eaf50b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,14 @@ # Changelog +## 1.5.8 /2025-10-21 +* Fix parameter name conflict in retry substrate _retry() methods by @Arthurdw in https://github.com/opentensor/async-substrate-interface/pull/218 +* Use uv for test dependencies by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/219 +* Reconnection/Resubmission Logic Improved by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/217 + +## New Contributors +* @Arthurdw made their first contribution in https://github.com/opentensor/async-substrate-interface/pull/218 + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.7...v1.5.8 + ## 1.5.7 /2025-10-15 * Updates the type hint on ws_shutdown_timer in RetryAsyncSubstrate by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/203 * correct type hint by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/204 diff --git a/pyproject.toml b/pyproject.toml index f194af7..ddb2b89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.5.7" +version = "1.5.8" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }