Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.0.0rc11 /2025-02-06
* Reuses the websocket for sync Substrate by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/29
* Feat/metadata v15 cache by @camfairchild in https://github.com/opentensor/async-substrate-interface/pull/30
* Backmerge main to staging rc10 by @ibraheem-opentensor in https://github.com/opentensor/async-substrate-interface/pull/31

## 1.0.0rc10 /2025-02-04
* Fixes decoding account ids for sync substrate

Expand Down
127 changes: 120 additions & 7 deletions async_substrate_interface/async_substrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@
Preprocessed,
)
from async_substrate_interface.utils import hex_to_bytes, json
from async_substrate_interface.utils.decoding import (
_determine_if_old_runtime_call,
_bt_decode_to_dict_or_list,
)
from async_substrate_interface.utils.storage import StorageKey
from async_substrate_interface.type_registry import _TYPE_REGISTRY

if TYPE_CHECKING:
from websockets.asyncio.client import ClientConnection
Expand Down Expand Up @@ -685,7 +690,7 @@ def __init__(
self.ws = Websocket(
url,
options={
"max_size": 2**32,
"max_size": self.ws_max_size,
"write_limit": 2**16,
},
)
Expand All @@ -706,6 +711,8 @@ def __init__(
ss58_format=self.ss58_format, implements_scale_info=True
)
self._metadata_cache = {}
self._metadata_v15_cache = {}
self._old_metadata_v15 = None
self._nonces = {}
self.metadata_version_hex = "0x0f000000" # v15
self.reload_type_registry()
Expand Down Expand Up @@ -800,6 +807,20 @@ async def load_registry(self):
)
self.registry = PortableRegistry.from_metadata_v15(self.metadata_v15)

async def _load_registry_at_block(self, block_hash: str) -> MetadataV15:
# Should be called for any block that fails decoding.
# Possibly the metadata was different.
metadata_rpc_result = await self.rpc_request(
"state_call",
["Metadata_metadata_at_version", self.metadata_version_hex],
block_hash=block_hash,
)
metadata_option_hex_str = metadata_rpc_result["result"]
metadata_option_bytes = bytes.fromhex(metadata_option_hex_str[2:])
old_metadata = MetadataV15.decode_from_metadata_option(metadata_option_bytes)

return old_metadata

async def _wait_for_registry(self, _attempt: int = 1, _retries: int = 3) -> None:
async def _waiter():
while self.registry is None:
Expand Down Expand Up @@ -930,7 +951,10 @@ async def get_runtime(block_hash, block_id) -> Runtime:
if (
(block_hash and block_hash == self.last_block_hash)
or (block_id and block_id == self.block_id)
) and self._metadata is not None:
) and all(
x is not None
for x in [self._metadata, self._old_metadata_v15, self.metadata_v15]
):
return Runtime(
self.chain,
self.runtime_config,
Expand Down Expand Up @@ -976,9 +1000,9 @@ async def get_runtime(block_hash, block_id) -> Runtime:
f"No runtime information for block '{block_hash}'"
)
# Check if runtime state already set to current block
if (
runtime_info.get("specVersion") == self.runtime_version
and self._metadata is not None
if runtime_info.get("specVersion") == self.runtime_version and all(
x is not None
for x in [self._metadata, self._old_metadata_v15, self.metadata_v15]
):
return Runtime(
self.chain,
Expand All @@ -1002,6 +1026,8 @@ async def get_runtime(block_hash, block_id) -> Runtime:
self.runtime_version
]
else:
# TODO when I get time, I'd like to add this and the metadata v15 as tasks with callbacks
# TODO to update the caches, but I don't have time now.
metadata = self._metadata = await self.get_block_metadata(
block_hash=runtime_block_hash, decode=True
)
Expand All @@ -1015,6 +1041,30 @@ async def get_runtime(block_hash, block_id) -> Runtime:
self._metadata_cache[self.runtime_version] = self._metadata
else:
metadata = self._metadata

if self.runtime_version in self._metadata_v15_cache:
# Get metadata v15 from cache
logging.debug(
"Retrieved metadata v15 for {} from memory".format(
self.runtime_version
)
)
metadata_v15 = self._old_metadata_v15 = self._metadata_v15_cache[
self.runtime_version
]
else:
metadata_v15 = (
self._old_metadata_v15
) = await self._load_registry_at_block(block_hash=runtime_block_hash)
logging.debug(
"Retrieved metadata v15 for {} from Substrate node".format(
self.runtime_version
)
)

# Update metadata v15 cache
self._metadata_v15_cache[self.runtime_version] = metadata_v15

# Update type registry
self.reload_type_registry(use_remote_preset=False, auto_discover=True)

Expand Down Expand Up @@ -2487,6 +2537,56 @@ async def get_chain_finalised_head(self):

return response.get("result")

async def _do_runtime_call_old(
self,
api: str,
method: str,
params: Optional[Union[list, dict]] = None,
block_hash: Optional[str] = None,
) -> ScaleType:
logging.debug(
f"Decoding old runtime call: {api}.{method} with params: {params} at block hash: {block_hash}"
)
runtime_call_def = _TYPE_REGISTRY["runtime_api"][api]["methods"][method]

# Encode params
param_data = b""

if "encoder" in runtime_call_def:
param_data = runtime_call_def["encoder"](params)
else:
for idx, param in enumerate(runtime_call_def["params"]):
param_type_string = f"{param['type']}"
if isinstance(params, list):
param_data += await self.encode_scale(
param_type_string, params[idx]
)
else:
if param["name"] not in params:
raise ValueError(
f"Runtime Call param '{param['name']}' is missing"
)

param_data += await self.encode_scale(
param_type_string, params[param["name"]]
)

# RPC request
result_data = await self.rpc_request(
"state_call", [f"{api}_{method}", param_data.hex(), block_hash]
)
result_vec_u8_bytes = hex_to_bytes(result_data["result"])
result_bytes = await self.decode_scale("Vec<u8>", result_vec_u8_bytes)

# Decode result
# Get correct type
result_decoded = runtime_call_def["decoder"](bytes(result_bytes))
as_dict = _bt_decode_to_dict_or_list(result_decoded)
logging.debug("Decoded old runtime call result: ", as_dict)
result_obj = ScaleObj(as_dict)

return result_obj

async def runtime_call(
self,
api: str,
Expand All @@ -2513,14 +2613,27 @@ async def runtime_call(
params = {}

try:
metadata_v15 = self.metadata_v15.value()
apis = {entry["name"]: entry for entry in metadata_v15["apis"]}
if block_hash:
# Use old metadata v15 from init_runtime call
metadata_v15 = self._old_metadata_v15
else:
metadata_v15 = self.metadata_v15

self.registry = PortableRegistry.from_metadata_v15(metadata_v15)
metadata_v15_value = metadata_v15.value()

apis = {entry["name"]: entry for entry in metadata_v15_value["apis"]}
api_entry = apis[api]
methods = {entry["name"]: entry for entry in api_entry["methods"]}
runtime_call_def = methods[method]
except KeyError:
raise ValueError(f"Runtime API Call '{api}.{method}' not found in registry")

if _determine_if_old_runtime_call(runtime_call_def, metadata_v15_value):
result = await self._do_runtime_call_old(api, method, params, block_hash)

return result

if isinstance(params, list) and len(params) != len(runtime_call_def["inputs"]):
raise ValueError(
f"Number of parameter provided ({len(params)}) does not "
Expand Down
Loading