diff --git a/.github/workflows/check-sdk-tests.yml b/.github/workflows/check-sdk-tests.yml index 1e2e295..9e40b61 100644 --- a/.github/workflows/check-sdk-tests.yml +++ b/.github/workflows/check-sdk-tests.yml @@ -15,9 +15,17 @@ on: - staging types: [opened, synchronize, reopened, labeled, unlabeled] + workflow_dispatch: + inputs: + bittensor_branch: + description: "Optional: Bittensor branch to use instead of staging" + required: false + default: "staging" + env: CARGO_TERM_COLOR: always VERBOSE: ${{ github.event.inputs.verbose }} + BITTENSOR_BRANCH: ${{ github.event.inputs.bittensor_branch || 'staging' }} jobs: apply-label-to-new-pr: @@ -54,8 +62,16 @@ jobs: - name: Check out repository uses: actions/checkout@v4 - - name: Get labels from PR + - name: Skip label check for manual runs id: get-labels + if: ${{ github.event_name == 'workflow_dispatch' }} + run: | + echo "Manual workflow dispatch detected, skipping PR label check." + echo "run-sdk-tests=true" >> $GITHUB_OUTPUT + + - name: Get labels from PR + id: get-labels-pr + if: ${{ github.event_name == 'pull_request' }} run: | sleep 5 LABELS=$(gh api repos/${{ github.repository }}/issues/${{ github.event.pull_request.number }}/labels --jq '.[].name') @@ -79,9 +95,15 @@ jobs: working-directory: ${{ github.workspace }} run: git clone https://github.com/opentensor/bittensor.git - - name: Checkout + - name: Verify and checkout Bittensor branch working-directory: ${{ github.workspace }}/bittensor - run: git checkout staging + run: | + if ! git fetch origin $BITTENSOR_BRANCH; then + echo "❌ Error: Branch '$BITTENSOR_BRANCH' does not exist in opentensor/bittensor." + exit 1 + fi + git checkout FETCH_HEAD + echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH" - name: Install dependencies run: sudo apt-get install -y jq @@ -90,11 +112,14 @@ jobs: id: get-tests run: | test_files=$(find ${{ github.workspace }}/bittensor/tests/e2e_tests -name "test*.py" | jq -R -s -c 'split("\n") | map(select(. != ""))') - echo "::set-output name=test-files::$test_files" + echo "test-files=$test_files" >> $GITHUB_OUTPUT shell: bash pull-docker-image: - needs: check-labels + needs: + - check-labels + - find-e2e-tests + runs-on: ubuntu-latest if: always() && needs.check-labels.outputs.run-sdk-tests == 'true' steps: @@ -163,8 +188,12 @@ jobs: working-directory: ${{ github.workspace }}/bittensor run: | source ${{ github.workspace }}/venv/bin/activate - git checkout staging - git fetch origin staging + if ! git fetch origin $BITTENSOR_BRANCH; then + echo "❌ Error: Branch '$BITTENSOR_BRANCH' does not exist in opentensor/bittensor." + exit 1 + fi + git checkout FETCH_HEAD + echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH" python3 -m pip install --upgrade pip python3 -m pip install '.[dev]' @@ -225,8 +254,12 @@ jobs: working-directory: ${{ github.workspace }}/bittensor run: | source ${{ github.workspace }}/venv/bin/activate - git checkout staging - git fetch origin staging + if ! git fetch origin $BITTENSOR_BRANCH; then + echo "❌ Error: Branch '$BITTENSOR_BRANCH' does not exist in opentensor/bittensor." 
+          exit 1
+        fi
+        git checkout FETCH_HEAD
+        echo "✅ Using Bittensor branch: $BITTENSOR_BRANCH"
         python3 -m pip install --upgrade pip
         python3 -m pip install '.[dev]'
diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml
index f87dff4..4bd819b 100644
--- a/.github/workflows/e2e-tests.yml
+++ b/.github/workflows/e2e-tests.yml
@@ -1,4 +1,4 @@
-name: E2E Tests
+name: ASI E2E Tests
 
 concurrency:
   group: e2e-${{ github.event.pull_request.number || github.ref }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bab699d..4bd1e14 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,17 @@
 # Changelog
+## 1.5.7 /2025-10-15
+* Updates the type hint on ws_shutdown_timer in RetryAsyncSubstrate by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/203
+* correct type hint by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/204
+* Clear asyncio.Queue after retrieval by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/206
+* Add the option to manually specify the Bittensor branch when running with `workflow_dispatch` by @basfroman in https://github.com/opentensor/async-substrate-interface/pull/208
+* Subscription Exception Handling by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/207
+* more efficient query map by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/211
+* Unique keys in request manager by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/212
+* Adds type annotations for Runtime by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/214
+* Edge case ss58 decoding in decode_query_map by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/213
+
+**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.5.6...v1.5.7
+
 ## 1.5.6 /2025-10-08
 * Clean Up Error Handling by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/193
 * Avoids ID of 'None' in queries by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/196
diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py
index a93af0c..d7df725 100644
--- a/async_substrate_interface/async_substrate.py
+++ b/async_substrate_interface/async_substrate.py
@@ -55,6 +55,7 @@
     RuntimeCache,
     SubstrateMixin,
     Preprocessed,
+    RequestResults,
 )
 from async_substrate_interface.utils import (
     hex_to_bytes,
@@ -561,7 +562,7 @@ def __init__(
         self._received: dict[str, asyncio.Future] = {}
         self._received_subscriptions: dict[str, asyncio.Queue] = {}
         self._sending: Optional[asyncio.Queue] = None
-        self._send_recv_task = None
+        self._send_recv_task: Optional[asyncio.Task] = None
         self._inflight: dict[str, str] = {}
         self._attempts = 0
         self._lock = asyncio.Lock()
@@ -747,7 +748,7 @@ async def _start_receiving(self, ws: ClientConnection) -> Exception:
         elif isinstance(e, websockets.exceptions.ConnectionClosedOK):
             logger.debug("Websocket connection closed.")
         else:
-            logger.debug(f"Timeout occurred. Reconnecting.")
+            logger.debug("Timeout occurred.")
         return e
 
     async def _start_sending(self, ws) -> Exception:
@@ -755,6 +756,7 @@ async def _start_sending(self, ws) -> Exception:
         try:
             while True:
                 to_send_ = await self._sending.get()
+                self._sending.task_done()
                 send_id = to_send_["id"]
                 to_send = json.dumps(to_send_)
                 async with self._lock:
@@ -779,7 +781,7 @@ async def _start_sending(self, ws) -> Exception:
         elif isinstance(e, websockets.exceptions.ConnectionClosedOK):
             logger.debug("Websocket connection closed.")
         else:
-            logger.debug("Timeout occurred. Reconnecting.")
+            logger.debug("Timeout occurred.")
         return e
 
     async def send(self, payload: dict) -> str:
@@ -848,7 +850,9 @@ async def retrieve(self, item_id: str) -> Optional[dict]:
             return res
         else:
             try:
-                return self._received_subscriptions[item_id].get_nowait()
+                subscription = self._received_subscriptions[item_id].get_nowait()
+                self._received_subscriptions[item_id].task_done()
+                return subscription
             except asyncio.QueueEmpty:
                 pass
             if self._send_recv_task is not None and self._send_recv_task.done():
@@ -856,6 +860,9 @@
                 if isinstance((e := self._send_recv_task.exception()), Exception):
                     logger.exception(f"Websocket sending exception: {e}")
                     raise e
+                elif isinstance((e := self._send_recv_task.result()), Exception):
+                    logger.exception(f"Websocket sending exception: {e}")
+                    raise e
             await asyncio.sleep(0.1)
         return None
@@ -874,7 +881,7 @@ def __init__(
         retry_timeout: float = 60.0,
         _mock: bool = False,
         _log_raw_websockets: bool = False,
-        ws_shutdown_timer: float = 5.0,
+        ws_shutdown_timer: Optional[float] = 5.0,
         decode_ss58: bool = False,
     ):
         """
@@ -2385,9 +2392,12 @@ async def _make_rpc_request(
         attempt: int = 1,
         runtime: Optional[Runtime] = None,
         force_legacy_decode: bool = False,
-    ) -> RequestManager.RequestResults:
+    ) -> RequestResults:
         request_manager = RequestManager(payloads)
 
+        if len(set(x["id"] for x in payloads)) != len(payloads):
+            raise ValueError("Payloads must have unique ids")
+
         subscription_added = False
 
         async with self.ws as ws:
@@ -3663,34 +3673,41 @@ async def query_map(
                     self.decode_ss58,
                 )
         else:
-            all_responses = []
+            # storage item and value scale types are not included here because this is batch-decoded in Rust
             page_batches = [
                 result_keys[i : i + page_size]
                 for i in range(0, len(result_keys), page_size)
             ]
             changes = []
-            for batch_group in [
-                # run five concurrent batch pulls; could go higher, but it's good to be a good citizens
-                # of the ecosystem
-                page_batches[i : i + 5]
-                for i in range(0, len(page_batches), 5)
-            ]:
-                all_responses.extend(
-                    await asyncio.gather(
-                        *[
-                            self.rpc_request(
-                                method="state_queryStorageAt",
-                                params=[batch_keys, block_hash],
-                                runtime=runtime,
-                            )
-                            for batch_keys in batch_group
-                        ]
-                    )
-                )
-            for response in all_responses:
-                for result_group in response["result"]:
-                    changes.extend(result_group["changes"])
-
+            payloads = []
+            for idx, page_batch in enumerate(page_batches):
+                payloads.append(
+                    self.make_payload(
+                        str(idx), "state_queryStorageAt", [page_batch, block_hash]
+                    )
+                )
+            results: RequestResults = await self._make_rpc_request(
+                payloads, runtime=runtime
+            )
+            for result in results.values():
+                res = result[0]
+                if "error" in res:
+                    err_msg = res["error"]["message"]
+                    if (
+                        "Client error: Api called for an unknown Block: State already discarded"
+                        in err_msg
+                    ):
+                        bh = err_msg.split("State already discarded for ")[
+                            1
+                        ].strip()
+                        raise StateDiscardedError(bh)
+                    else:
+                        raise SubstrateRequestException(err_msg)
+                elif "result" not in res:
+                    raise SubstrateRequestException(res)
+                else:
+                    for result_group in res["result"]:
+                        changes.extend(result_group["changes"])
             result = decode_query_map(
                 changes,
                 prefix,
diff --git a/async_substrate_interface/substrate_addons.py b/async_substrate_interface/substrate_addons.py
index 4f2412f..3b0f0ba 100644
--- a/async_substrate_interface/substrate_addons.py
+++ b/async_substrate_interface/substrate_addons.py
@@ -264,7 +264,7 @@ def __init__(
         _mock: bool = False,
         _log_raw_websockets: bool = False,
         archive_nodes: Optional[list[str]] = None,
-        ws_shutdown_timer: float = 5.0,
+        ws_shutdown_timer: Optional[float] = 5.0,
     ):
         fallback_chains = fallback_chains or []
         archive_nodes = archive_nodes or []
diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py
index 1575a8e..5349b31 100644
--- a/async_substrate_interface/sync_substrate.py
+++ b/async_substrate_interface/sync_substrate.py
@@ -34,6 +34,7 @@
     RequestManager,
     Preprocessed,
     ScaleObj,
+    RequestResults,
 )
 from async_substrate_interface.utils import (
     hex_to_bytes,
@@ -1892,9 +1893,13 @@ def _make_rpc_request(
         result_handler: Optional[ResultHandler] = None,
         attempt: int = 1,
         force_legacy_decode: bool = False,
-    ) -> RequestManager.RequestResults:
+    ) -> RequestResults:
         request_manager = RequestManager(payloads)
         _received = {}
+
+        if len(set(x["id"] for x in payloads)) != len(payloads):
+            raise ValueError("Payloads must have unique ids")
+
         subscription_added = False
 
         ws = self.connect(init=False if attempt == 1 else True)
diff --git a/async_substrate_interface/types.py b/async_substrate_interface/types.py
index a5cac00..f1b4917 100644
--- a/async_substrate_interface/types.py
+++ b/async_substrate_interface/types.py
@@ -6,7 +6,9 @@
 from datetime import datetime
 from typing import Optional, Union, Any
 
+import scalecodec.types
 from bt_decode import PortableRegistry, encode as encode_by_type_string
+from bt_decode.bt_decode import MetadataV15
 from scalecodec import ss58_encode, ss58_decode, is_valid_ss58_address
 from scalecodec.base import RuntimeConfigurationObject, ScaleBytes
 from scalecodec.type_registry import load_type_registry_preset
@@ -121,13 +123,13 @@ class Runtime:
     def __init__(
         self,
         chain: str,
-        metadata,
-        type_registry,
+        metadata: scalecodec.types.GenericMetadataVersioned,
+        type_registry: dict,
         runtime_config: Optional[RuntimeConfigurationObject] = None,
-        metadata_v15=None,
-        runtime_info=None,
-        registry=None,
-        ss58_format=SS58_FORMAT,
+        metadata_v15: Optional[MetadataV15] = None,
+        runtime_info: Optional[dict] = None,
+        registry: Optional[PortableRegistry] = None,
+        ss58_format: int = SS58_FORMAT,
     ):
         self.ss58_format = ss58_format
         self.config = {}
@@ -369,9 +371,10 @@ def resolve_type_definition(type_id_):
         self.type_id_to_name = type_id_to_name
 
 
-class RequestManager:
-    RequestResults = dict[Union[str, int], list[Union[ScaleType, dict]]]
+RequestResults = dict[Union[str, int], list[Union[ScaleType, dict]]]
 
+
+class RequestManager:
     def __init__(self, payloads):
         self.response_map = {}
         self.responses = defaultdict(
diff --git a/async_substrate_interface/utils/decoding.py b/async_substrate_interface/utils/decoding.py
index 9557159..8b191b3 100644
--- a/async_substrate_interface/utils/decoding.py
+++ b/async_substrate_interface/utils/decoding.py
@@ -113,7 +113,9 @@ def concat_hash_len(key_hasher: str) -> int:
 
     for item in result_group_changes:
         pre_decoded_keys.append(bytes.fromhex(item[0][len(prefix) :]))
-        pre_decoded_values.append(hex_to_bytes_(item[1]))
+        pre_decoded_values.append(
+            hex_to_bytes_(item[1]) if item[1] is not None else b""
+        )
     all_decoded = _decode_scale_list_with_runtime(
         pre_decoded_key_types + pre_decoded_value_types,
         pre_decoded_keys + pre_decoded_values,
@@ -133,7 +135,10 @@ def concat_hash_len(key_hasher: str) -> int:
                 if len(param_types) - len(params) == 1:
                     item_key = dk[1]
                     if decode_ss58:
-                        if kts[kts.index(", ") + 2 : kts.index(")")] == "scale_info::0":
+                        if (
+                            isinstance(item_key[0], (tuple, list))
+                            and kts[kts.index(", ") + 2 : kts.index(")")] == "scale_info::0"
+                        ):
                             item_key = ss58_encode(bytes(item_key[0]), runtime.ss58_format)
                         else:
                             try:
diff --git a/pyproject.toml b/pyproject.toml
index 2fd3b5b..f194af7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "async-substrate-interface"
-version = "1.5.6"
+version = "1.5.7"
 description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface"
 readme = "README.md"
 license = { file = "LICENSE" }
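---
Usage note (a sketch, not part of the patch itself): with the new workflow_dispatch
trigger in check-sdk-tests.yml, the SDK test suite can be run manually against an
arbitrary Bittensor branch. The workflow file name and the bittensor_branch input
come from the diff above; the branch value below is a placeholder:

    # trigger a manual run against a hypothetical feature branch
    gh workflow run check-sdk-tests.yml -f bittensor_branch=feat/my-change

Omitting the input falls back to the default of "staging"; if the named branch does
not exist in opentensor/bittensor, the "Verify and checkout Bittensor branch" step
exits with an error rather than silently testing against staging.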