Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ async def connect(self, force=False):
try:
await asyncio.wait_for(self._cancel(), timeout=10.0)
except asyncio.TimeoutError:
logger.debug(f"Timed out waiting for cancellation")
pass
self.ws = await asyncio.wait_for(
connect(self.ws_url, **self._options), timeout=10.0
Expand All @@ -618,8 +619,9 @@ async def connect(self, force=False):
self._send_recv_task = asyncio.get_running_loop().create_task(
self._handler(self.ws)
)
logger.debug("Websocket handler attached.")

async def _handler(self, ws: ClientConnection) -> None:
async def _handler(self, ws: ClientConnection) -> Union[None, Exception]:
recv_task = asyncio.create_task(self._start_receiving(ws))
send_task = asyncio.create_task(self._start_sending(ws))
done, pending = await asyncio.wait(
Expand Down Expand Up @@ -652,6 +654,7 @@ async def _handler(self, ws: ClientConnection) -> None:
)
await self.connect(True)
await self._handler(ws=self.ws)
return None
elif isinstance(e := recv_task.result(), Exception):
return e
elif isinstance(e := send_task.result(), Exception):
Expand Down Expand Up @@ -834,8 +837,10 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
except asyncio.QueueEmpty:
pass
if self._send_recv_task is not None and self._send_recv_task.done():
if isinstance(e := self._send_recv_task.result(), Exception):
raise e
if not self._send_recv_task.cancelled():
if isinstance((e := self._send_recv_task.exception()), Exception):
logger.exception(f"Websocket sending exception: {e}")
raise e
await asyncio.sleep(0.1)
return None

Expand Down Expand Up @@ -2377,8 +2382,13 @@ async def _make_rpc_request(
for payload in payloads:
item_id = await ws.send(payload["payload"])
request_manager.add_request(item_id, payload["id"])
# truncate to 2000 chars for debug logging
if len(stringified_payload := str(payload)) < 2_000:
output_payload = stringified_payload
else:
output_payload = f"{stringified_payload[:2_000]} (truncated)"
logger.debug(
f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {payload}"
f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}"
)

while True:
Expand Down Expand Up @@ -2420,6 +2430,7 @@ async def _make_rpc_request(
request_manager.add_response(
item_id, decoded_response, complete
)
# truncate to 2000 chars for debug logging
if (
len(stringified_response := str(decoded_response))
< 2_000
Expand Down
8 changes: 7 additions & 1 deletion async_substrate_interface/sync_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1904,8 +1904,13 @@ def _make_rpc_request(
raw_websocket_logger.debug(f"WEBSOCKET_SEND> {to_send}")
ws.send(to_send)
request_manager.add_request(item_id, payload["id"])
# truncate to 2000 chars for debug logging
if len(stringified_payload := str(payload)) < 2_000:
output_payload = stringified_payload
else:
output_payload = f"{stringified_payload[:2_000]} (truncated)"
logger.debug(
f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {payload}"
f"Submitted payload ID {payload['id']} with websocket ID {item_id}: {output_payload}"
)

while True:
Expand Down Expand Up @@ -1968,6 +1973,7 @@ def _make_rpc_request(
request_manager.add_response(
item_id, decoded_response, complete
)
# truncate to 2000 chars for debug logging
if len(stringified_response := str(decoded_response)) < 2_000:
output_response = stringified_response
# avoids clogging logs up needlessly (esp for Metadata stuff)
Expand Down
Loading