In [2]:
from __future__ import annotations

from substrateinterface import SubstrateInterface

from scripts.utils.chain_model import Chain, ChainAsset
from scripts.utils.chain_model import NativeAssetType, StatemineAssetType, UnsupportedAssetType, OrmlAssetType
from utils.work_with_data import get_data_from_file

In [3]:
def dry_run_api_ext(runtime_types_prefix: str | None) -> dict | None:
    if runtime_types_prefix is None:
        return None
    
    return {
        "runtime_api": {
            "DryRunApi": {
                "methods": {
                    "dry_run_call": {
                        "description": "Dry run runtime call",
                        "params": [
                            {
                                "name": "origin_caller",
                                "type": f"{runtime_types_prefix}::OriginCaller"
                            },
                            {
                                "name": "call",
                                "type": "GenericCall"
                            }
                        ],
                        "type": "CallDryRunEffectsResult"
                    },
                    "dry_run_xcm": {
                        "description": "Dry run xcm program",
                        "params": [
                            {
                                "name": "origin_location",
                                "type": "xcm::VersionedLocation"
                            },
                            {
                                "name": "xcm",
                                "type": "xcm::VersionedXcm"
                            }
                        ],
                        "type": "XcmDryRunEffectsResult"
                    },
                },
                "types": {
                    "CallDryRunEffectsResult": {
                        "type": "enum",
                        "type_mapping": [
                            [
                                "Ok",
                                "CallDryRunEffects"
                            ],
                            [
                                "Error",
                                "DryRunEffectsResultErr"
                            ]
                        ]
                    },
                    "DryRunEffectsResultErr": {
                        "type": "enum",
                        "value_list": [
                            "Unimplemented",
                            "VersionedConversionFailed"
                        ]
                    },
                    "CallDryRunEffects": {
                        "type": "struct",
                        "type_mapping": [
                            [
                                "execution_result",
                                "CallDryRunExecutionResult" 
                            ],
                            [
                                "emitted_events",
                                f"Vec<{runtime_types_prefix}::RuntimeEvent>"
                            ],
                            [
                                "local_xcm",
                                "Option<xcm::VersionedXcm>"
                            ],
                            [
                                "forwarded_xcms",
                                "Vec<(xcm::VersionedLocation, Vec<xcm::VersionedXcm>)>"
                            ]
                        ]
                    },
                    "CallDryRunExecutionResult": {
                        "type": "enum",
                        "type_mapping": [
                            [
                                "Ok",
                                "DispatchPostInfo"
                            ],
                            [
                                "Error",
                                "DispatchErrorWithPostInfo"
                            ]
                        ]
                    },
                    "XcmDryRunEffects": {
                        "type": "struct",
                        "type_mapping": [
                            [
                                "execution_result",
                                "staging_xcm::v4::traits::Outcome"
                            ],
                            [
                                "emitted_events",
                                f"Vec<{runtime_types_prefix}::RuntimeEvent>"
                            ],
                            [
                                "forwarded_xcms",
                                "Vec<(xcm::VersionedLocation, Vec<xcm::VersionedXcm>)>"
                            ]
                        ]
                    },
                    "XcmDryRunEffectsResult": {
                        "type": "enum",
                        "type_mapping": [
                            [
                                "Ok",
                                "XcmDryRunEffects"
                            ],
                            [
                                "Error",
                                "DryRunEffectsResultErr"
                            ]
                        ]
                    },
                    "DispatchErrorWithPostInfo": {
                       "type": "struct",
                        "type_mapping": [
                            [
                                "post_info",
                                "DispatchPostInfo" 
                            ],
                            [
                                "error",
                                "sp_runtime::DispatchError"
                            ],
                        ] 
                    },
                    "DispatchPostInfo": {
                        "type": "struct",
                        "type_mapping": [
                            [
                                "actual_weight",
                                "Option<Weight>" 
                            ],
                            [
                                "pays_fee",
                                "frame_support::dispatch::Pays"
                            ],
                        ] 
                    }
                }
            },
        }
    }

In [4]:
from dataclasses import dataclass
import types
from abc import ABC, abstractmethod
from substrateinterface.utils.ss58 import ss58_decode
from typing import Union, List, Self, Callable, TypeVar

xcm_version_mode_consts = types.SimpleNamespace()
xcm_version_mode_consts.AlreadyVersioned = "AlreadyVersioned"
xcm_version_mode_consts.DefaultVersion = "DefaultVersion"


class VerionsedXcm:
    unversioned: dict
    versioned: dict | List
    version: int

    @staticmethod
    def default_xcm_version() -> int:
        return 4

    def __init__(self,
                 message: dict | List,
                 message_version: str | int = xcm_version_mode_consts.DefaultVersion,
                 ):
        match message_version:
            case int():
                self._init_from_unversioned(message, version=message_version)
            case xcm_version_mode_consts.AlreadyVersioned:
                self._init_from_versioned(message)
            case xcm_version_mode_consts.DefaultVersion:
                self._init_from_unversioned(message, version=None)
            case _:
                raise Exception(f"Unknown message version mode: {message_version}")

    def _init_from_versioned(self, message: dict):
        if type(message) is not dict:
            raise Exception(f"Already versioned xcm must be a dict with a single version key, got: {message}")

        version_key = next(iter(message))

        self.version = self._parse_version(version_key)
        self.versioned = message
        self.unversioned = message[version_key]

    def _init_from_unversioned(self, message: dict | List, version: int | None):
        if version is None:
            self.version = VerionsedXcm.default_xcm_version()
        else:
            self.version = version

        self.unversioned = message
        self.versioned = {f"V{self.version}": message}

    @staticmethod
    def _parse_version(version_key: str):
        return int(version_key.removeprefix("V"))

    def __str__(self):
        return str(self.versioned)

    def is_v4(self) -> bool:
        return self.version == 4


def multi_location(parents: int, junctions: List[dict]) -> VerionsedXcm:
    interior = "Here"

    if len(junctions) > 0:
        interior_variant = f"X{len(junctions)}"

        if VerionsedXcm.default_xcm_version() <= 3 and len(junctions) == 1:
            interior = {interior_variant: junctions[0]}
        else:
            interior = {interior_variant: junctions}

    return VerionsedXcm({"parents": parents, "interior": interior})


def asset_id(location: VerionsedXcm) -> VerionsedXcm:
    if location.is_v4():
        return location
    else:
        return VerionsedXcm({"Concrete": location.unversioned})


def asset(location: VerionsedXcm, amount: int) -> VerionsedXcm:
    return VerionsedXcm({"id": asset_id(location).unversioned, "fun": {"Fungible": amount}})


def assets(location: VerionsedXcm, amount: int) -> VerionsedXcm:
    return VerionsedXcm([asset(location, amount).unversioned])


def absolute_location(junctions: List[dict]) -> VerionsedXcm:
    return multi_location(parents=0, junctions=junctions)


def xcm_program(message: List[dict] | dict[str, List]) -> VerionsedXcm:
    # List of instructions
    if type(message) is list:
        return VerionsedXcm(fix_asymmetrical_decoding(message))
    # Versioned program
    else:
        return VerionsedXcm(message, message_version=xcm_version_mode_consts.AlreadyVersioned)
        # instructions = fix_asymmetrical_decoding(xcm_asymmetrical.unversioned)
        # return VerionsedXcm([instructions], message_version=xcm_asymmetrical.version)


_affected_by_asymmetrical_decoding = ["ReceiveTeleportedAsset", "ReserveAssetDeposited", "WithdrawAsset"]


# Tuple of size 1 decodes to just its value but upon encoding requires a Tuple
# This is causing just decoded Assets to fail when passing to encode function
def fix_asymmetrical_decoding(instructions: List[dict]) -> List[dict]:
    return instructions

    # for instruction in instructions:
    #     for affected in _affected_by_asymmetrical_decoding:
    #         if affected in instruction:
    #             affected_value = instruction[affected]
    #             if type(affected_value) is list and type(affected_value[0]) is not list:
    #                 print(f"Fixing asymmetrical_decoding for {instruction}")
    #                 instruction[affected] = [affected_value]
    # 
    # return instructions


def buy_execution(fees_asset: VerionsedXcm, weight_limit: str | dict = "Unlimited"):
    return {'BuyExecution': {'fees': fees_asset.unversioned, 'weight_limit': weight_limit}}


def deposit_asset(
        beneficiary: str,
        evm: bool
):
    return {"DepositAsset": {"assets": {'Wild': {'AllCounted': 1}},
                             "beneficiary": absolute_location(
                                 junctions=[account_junction(beneficiary, evm)]).unversioned}}


def account_junction(
        account: str,
        evm: bool,
) -> dict:
    account_id = decode_account_id(account)

    if evm:
        return {"AccountKey20": {"network": None, "key": account_id}}
    else:
        return {"AccountId32": {"network": None, "id": account_id}}


def decode_account_id(address: str) -> str:
    if address.startswith("0x"):
        return address.lower()
    else:
        return "0x" + ss58_decode(address)


def parachain_junction(parachain_id: int) -> dict:
    return {"Parachain": parachain_id}


cacheMissing = object()


class ReserveLocation:
    symbol: str
    junctions: List[dict]
    _registry: 'XcmRegistry'

    _cached_parachain_id: int | None | cacheMissing = cacheMissing

    def __init__(self, symbol: str, junctions: List[dict], registry: 'XcmRegistry'):
        self.symbol = symbol
        self.junctions = junctions
        self._registry = registry

    def parachain_id(self) -> int | None:
        if self._cached_parachain_id != cacheMissing:
            return self._cached_parachain_id

        computed_value = next(
            (junction["Parachain"] for junction in self.junctions if self.__is_parachain_junction(junction)), None)
        self._cached_parachain_id = computed_value

        return computed_value

    def reserve_chain(self) -> 'XcmChain':
        parachain_id = self.parachain_id()

        if parachain_id is not None:
            return self._registry.get_parachain(parachain_id)
        else:
            return self._registry.relay

    def local_junctions(self):
        index_of_parachain_junction = next(
            (i for i, junction in enumerate(self.junctions) if self.__is_parachain_junction(junction)), None)
        if index_of_parachain_junction is not None:
            return self.junctions[index_of_parachain_junction + 1:]
        else:
            return self.junctions

    def __is_parachain_junction(self, junction: dict) -> bool:
        return junction["Parachain"] is not None


class ReserveLocations:
    _data: dict[str, List[dict]]
    _registry: 'XcmRegistry'

    def __init__(self, registry: 'XcmRegistry'):
        self._data = self.__get_data()
        self._registry = registry

    def get_reserve(self, chain_asset: ChainAsset):
        symbol = chain_asset.unified_symbol()

        return ReserveLocation(symbol, self._data[symbol], self._registry)

    def __get_data(self):
        return {
            "DOT": [],
            "USDT": [{"Parachain": 1000}, {"PalletInstance": 50}, {"GeneralIndex": 1984}],
            "GLMR": [{"Parachain": 2004}, {"PalletInstance": 10}]
        }

@dataclass
class Parachain:
    parachain_id: int
    chain: Chain

class XcmRegistry:
    reserves: ReserveLocations
    relay: 'XcmChain'
    parachains: List['XcmChain']

    def __init__(
            self,
            relay: Chain,
            parachains: List[Parachain],
    ):
        self.reserves = ReserveLocations(self)
        self.relay = XcmChain(relay, parachain_id=None, registry=self)
        self.parachains = [XcmChain(parachain.chain, parachain_id=parachain.parachain_id, registry=self) for parachain in parachains]

    def get_parachain(self, parachain_id: int) -> 'XcmChain':
        return next((parachain for parachain in self.parachains if parachain.parachain_id == parachain_id))

    def get_chain(self, chain_id: int) -> 'XcmChain':
        return next((parachain for parachain in self.parachains if parachain.chain.chainId == chain_id))


xcm_pallet_aliases = ["PolkadotXcm", "XcmPallet"]

T = TypeVar('T')


class TransferType(ABC):

    @abstractmethod
    def check_remote_reserve(self) -> Union['XcmChain', None]:
        pass

    @abstractmethod
    def transfer_type_call_param(self) -> dict | str:
        pass


class Teleport(TransferType):

    def check_remote_reserve(self) -> Union['XcmChain', None]:
        return None

    def transfer_type_call_param(self) -> dict | str:
        return "Teleport"


class LocalReserve(TransferType):

    def check_remote_reserve(self) -> Union['XcmChain', None]:
        return None

    def transfer_type_call_param(self) -> dict | str:
        return "LocalReserve"


class DestinationReserve(TransferType):

    def check_remote_reserve(self) -> Union['XcmChain', None]:
        return None

    def transfer_type_call_param(self) -> dict | str:
        return "DestinationReserve"


class RemoteReserve(TransferType):
    _origin_chain: 'XcmChain'
    _reserve_chain: 'XcmChain'
    _registry: XcmRegistry

    def __init__(self, origin_chain: 'XcmChain', reserve_chain: 'XcmChain', registry: 'XcmRegistry'):
        self._reserve_chain = reserve_chain
        self._registry = registry
        self._origin_chain = origin_chain

    def check_remote_reserve(self) -> Union['XcmChain', None]:
        return self._reserve_chain

    def transfer_type_call_param(self) -> dict | str:
        return {"RemoteReserve": self._origin_chain.sibling_location_of(self._reserve_chain).versioned}


class XcmChain:
    chain: Chain
    parachain_id: int | None
    _registry: XcmRegistry

    def __init__(
            self,
            chain: Chain,
            parachain_id: int | None,
            registry: XcmRegistry,
    ):
        self.chain = chain
        self._registry = registry
        self.parachain_id = parachain_id

    def access_substrate(self, action: Callable[[SubstrateInterface], T]) -> T:
        return self.chain.access_substrate(action)

    def sibling_location_of(self, destination_chain: Self) -> VerionsedXcm:
        parents = 1 if self.parachain_id is not None else 0

        junctions = []

        if destination_chain.parachain_id is not None:
            junctions.append(parachain_junction(destination_chain.parachain_id))

        return multi_location(parents=parents, junctions=junctions)

    def account_location(self, account: str):
        return multi_location(parents=0, junctions=[account_junction(account, evm=self.chain.has_evm_addresses())])

    def relative_reserve_location_of(self, chainAsset: ChainAsset) -> VerionsedXcm:
        reserve = self._registry.reserves.get_reserve(chainAsset)
        reserve_parachain_id = reserve.parachain_id()

        same_chain = reserve_parachain_id == self.parachain_id

        if same_chain:
            return multi_location(parents=0, junctions=reserve.local_junctions())
        else:
            return multi_location(parents=1, junctions=reserve.junctions)

    def xcm_pallet_alias(self) -> str:
        result = next((candidate for candidate in xcm_pallet_aliases if
                       self.access_substrate(lambda s: s.get_metadata_module(candidate)) is not None), None)

        if result is None:
            raise Exception(f"No XcmPallet or its aliases has been found. Searched aliases: {xcm_pallet_aliases}")

        return result

    def is_system_parachain(self) -> bool:
        return self.parachain_id is not None and 1000 <= self.parachain_id < 2000

    def is_relay(self) -> bool:
        return self.parachain_id is None

    def transfer_type(self, destination_chain: Self, chain_asset: ChainAsset) -> TransferType:
        reserve = self._registry.reserves.get_reserve(chain_asset)
        reserve_parachain_id = reserve.parachain_id()

        if self._should_use_teleport(destination_chain):
            return Teleport()
        elif self.parachain_id == reserve_parachain_id:
            return LocalReserve()
        elif destination_chain.parachain_id == reserve_parachain_id:
            return DestinationReserve()
        else:
            reserve_chain = reserve.reserve_chain()
            return RemoteReserve(origin_chain=self, reserve_chain=reserve_chain, registry=reserve_chain)

    def _should_use_teleport(self, destination_chain: Self) -> bool:
        to_relay_teleport = self.is_system_parachain() and destination_chain.is_relay()
        from_relay_teleport = self.is_relay() and destination_chain.is_system_parachain()

        return to_relay_teleport or from_relay_teleport
    
   
            

In [10]:
polkadot_id = "91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3"
polkadot_asset_hub_id = "68d56f15f85d3136970ec16946040bc1752654e906147f7e43e9d539d7c3de2f" 
polkadot_bridge_hub_id = "dcf691b5a3fbe24adc99ddc959c0561b973e329b1aef4c4b22e7bb2ddecb4464"
moonbeam_id = "fe58ea77779b7abda7da4ec526d14db9b1e9cd40a217c34892af80a9b332b76d"
bifrost_id = "262e1b2ad728475fd6fe88e62d34c200abe6fd693931ddad144059b1eb884e5b"
astar_id = "9eb76c5184c4ab8679d2d5d819fdf90b9c001403e9e17da2e14b6d8aec4029c6"

additional_xcm_data = get_data_from_file("xcm_additional_data.json")
chains_file = get_data_from_file("../chains/v21/chains_dev.json")

relay: Chain | None = None
parachains = []

for chain_config in chains_file:
    additional_xcm_chain_data = additional_xcm_data.get(chain_config["chainId"], None)
    
    if additional_xcm_chain_data is None:
        print(f"No additional xcm data found for {chain_config["name"]}, skipping")
        continue
    
    runtime_prefix = additional_xcm_chain_data["runtimePrefix"]
    type_registry = dry_run_api_ext(runtime_prefix)
    chain = Chain(chain_config, type_registry)
    
    if chain.chainId == polkadot_id:
        relay = chain
    else:
        parachain = Parachain(additional_xcm_chain_data["parachainId"], chain)
        parachains.append(parachain)

if relay is None:
    raise Exception("Relay was not found in configuration")

registry = XcmRegistry(relay, parachains)
polkadot = registry.relay
polkadot_asset_hub = registry.get_chain(polkadot_asset_hub_id)
moonbeam = registry.get_chain(moonbeam_id)
bifrost = registry.get_chain(bifrost_id)
astar = registry.get_chain(astar_id)


No additional xcm data found for Kusama, skipping
No additional xcm data found for Westend, skipping
No additional xcm data found for Westmint, skipping
No additional xcm data found for Kusama Asset Hub, skipping
No additional xcm data found for Karura, skipping
No additional xcm data found for Moonriver, skipping
No additional xcm data found for Moonbase Alpha, skipping
No additional xcm data found for Shiden, skipping
No additional xcm data found for Bifrost Kusama, skipping
No additional xcm data found for Kintsugi, skipping
No additional xcm data found for Edgeware, skipping
No additional xcm data found for Parallel Heiko, skipping
No additional xcm data found for Basilisk, skipping
No additional xcm data found for Altair, skipping
No additional xcm data found for Khala, skipping
No additional xcm data found for KILT Peregrine, skipping
No additional xcm data found for Calamari, skipping
No additional xcm data found for QUARTZ, skipping
No additional xcm data found for Bit.Country 

In [11]:
from scalecodec import GenericCall, GenericEvent


class XcmSentEvent:
    _attributes: dict
    sent_message: VerionsedXcm

    def __init__(self, event_data: dict):
        self._attributes = event_data["attributes"]
        self.sent_message = xcm_program(self._attributes["message"])
    


def is_call_run_error(execution_result: dict):
    return "Error" in execution_result


def handle_call_run_error_execution_result(chain: XcmChain, execution_result: dict):
    error = execution_result["Error"]["error"]

    print(error)

    error_description: str

    if "Module" in error:
        module_error = error["Module"]
        error_index = int(module_error["error"][2:4], 16) - 1
        print(error_index)

        error = chain.access_substrate(
            lambda s: s.metadata.get_module_error(module_index=module_error["index"], error_index=error_index))
        error_description = str(error)
    else:
        error_description = str(error)

    raise Exception(error_description)


def find_event(events: List, event_module: str, event_name: str) -> dict | None:
    return next((event for event in events if event["module_id"] == event_module and event["event_id"] == event_name),
                None)


def find_xcm_sent_event(chain: XcmChain, events: List) -> XcmSentEvent | None:
    event_data = find_event(events, event_module=chain.xcm_pallet_alias(), event_name="Sent")
    if event_data is not None:
        return XcmSentEvent(event_data)
    else:
        return None
    
def find_forwarded_xcm(
        success_dry_run_effects: dict,
        final_destination_account: str,
) -> VerionsedXcm | None:
    forwarded_xcms = success_dry_run_effects["forwarded_xcms"]
    
    final_destination_account_id = decode_account_id(final_destination_account)
    
    for (destination, messages) in forwarded_xcms:
        for message in messages:
            message_program = xcm_program(message)
            extracted_account = extract_final_beneficiary_from_program(message_program)
            
            if extracted_account == final_destination_account_id:                
                return message_program
    
    return None
    
    
def find_sent_xcm(
        origin: XcmChain,
        success_dry_run_effects: dict,
        final_destination_account: str
) -> VerionsedXcm:    
    emitted_events = success_dry_run_effects["emitted_events"]
    xcm_sent_event = find_xcm_sent_event(origin, emitted_events)
    if xcm_sent_event is not None:
        print(f"Found sent xcm in XcmSent event")
        return xcm_sent_event.sent_message
    
    forwarded_xcm = find_forwarded_xcm(success_dry_run_effects, final_destination_account)
    if forwarded_xcm is not None:
        print(f"Found sent xcm in forwarded xcms")
        return forwarded_xcm
        
    raise Exception(f"Sent xcm was not found, got: {success_dry_run_effects}")


def extract_destination_asset(xcm_message: List[dict]) -> dict:
    first_instruction = xcm_message[0]

    if "ReceiveTeleportedAsset" in first_instruction:
        return first_instruction["ReceiveTeleportedAsset"][0]
    elif "ReserveAssetDeposited" in first_instruction:
        return first_instruction["ReserveAssetDeposited"][0]

    raise Exception("Found no destination assets")

def extract_final_beneficiary_from_program(program: VerionsedXcm) -> str | None:
    for instruction in program.unversioned:
        from_instruction = extract_final_beneficiary_from_instruction(instruction)
        if from_instruction is not None:
            return from_instruction
        
    return None

def get_single_key(instruction: dict | str) -> str | None:
    if type(instruction) is str:
        return None
    
    if len(instruction) != 1:
        raise Exception(f"Expected a single key dict, got: {instruction}")
    
    return next(iter(instruction))

def extract_final_beneficiary_from_instruction(instruction: dict) -> str | None:
    match get_single_key(instruction):
        case "DepositAsset":
            return extract_beneficiary_from_location(instruction["DepositAsset"]["beneficiary"])
        case "DepositReserveAsset" | "InitiateReserveWithdraw" | "InitiateTeleport" | "TransferReserveAsset" as key:
            nested_message = instruction[key]["xcm"]
            return extract_final_beneficiary_from_program(xcm_program(nested_message))
        case _:
            return None
    
def extract_beneficiary_from_location(location: dict) -> str | None:
    interior = location["interior"]
    x1_junction: dict
    
    match get_single_key(interior):
        # Accounts are always X1
        case "X1":
            x1_junction = interior["X1"]
            # pre-v3 x1 interior is just junction itself, otherwise it is a list of single element 
            if type(x1_junction) is list:
                x1_junction = x1_junction[0]
        case _:
            return None
    
    
    match get_single_key(x1_junction):
        case "AccountKey20":
            return x1_junction["AccountKey20"]["key"]
        case "AccountId32":
            return x1_junction["AccountId32"]["id"]
        case _:
            return None
    

def signed_origin(account: str) -> dict:
    return {"system": {"Signed": account}}


def root_origin() -> dict:
    return {"system": "Root"}


def dry_run_xcm_call(
        chain: XcmChain,
        call: GenericCall,
        origin: dict,
        final_destination_account: str
) -> VerionsedXcm:
    dry_run_result = chain.access_substrate(
        lambda substrate: substrate.runtime_call(api="DryRunApi", method="dry_run_call",
                                                 params={"origin_caller": origin, "call": call})).value
        
    dry_run_effects = dry_run_result["Ok"]    
    execution_result = dry_run_effects["execution_result"]

    if is_call_run_error(execution_result):
        handle_call_run_error_execution_result(chain, execution_result)
    else:
        return find_sent_xcm(chain, dry_run_effects, final_destination_account)
    

def is_xcm_run_error(execution_result: dict):
    return "Incomplete" in execution_result or "Error" in execution_result


def handle_xcm_run_error_execution_result(execution_result: dict):
    if "Incomplete" in execution_result:
        error = execution_result["Incomplete"]["error"]
    elif "Error" in execution_result:
        error = execution_result["Error"]["error"]
    else:
        error = execution_result

    raise Exception(str(error))


def dry_run_xcm(chain: XcmChain, xcm: VerionsedXcm, origin: VerionsedXcm) -> dict:
    dry_run_effects = chain.access_substrate(
        lambda substrate: substrate.runtime_call(api="DryRunApi", method="dry_run_xcm",
                                                 params={"origin_location": origin.versioned,
                                                         "xcm": xcm.versioned})).value["Ok"]    
    execution_result = dry_run_effects["execution_result"]

    if is_xcm_run_error(execution_result):
        handle_xcm_run_error_execution_result(execution_result)
    else:
        return dry_run_effects


def dry_run_intermediate_xcm(
        chain: XcmChain,
        xcm: VerionsedXcm,
        origin: VerionsedXcm,
        final_destination_account: str
) -> VerionsedXcm:
    dry_run_effects = dry_run_xcm(chain, xcm, origin)

    return find_sent_xcm(chain, dry_run_effects, final_destination_account)


def dry_run_final_xcm(chain: XcmChain, xcm: VerionsedXcm, origin: VerionsedXcm) -> List[GenericEvent]:
    dry_run_effects = dry_run_xcm(chain, xcm, origin)

    return dry_run_effects["emitted_events"]


def multi_address(account: str, evm: bool):
    if evm:
        return account
    else:
        return {"Id": account}


def compose_dispatch_as(
        substrate: SubstrateInterface,
        origin: dict,
        call: GenericCall
):
    return substrate.compose_call(
        call_module="Utility",
        call_function="dispatch_as",
        call_params={
            "as_origin": origin,
            "call": call
        }
    )


def compose_native_fund(
        substrate: SubstrateInterface,
        chain: XcmChain,
        account: str,
        amount_planks: int,
) -> GenericCall:
    return substrate.compose_call(
        call_module="Balances",
        call_function="force_set_balance",
        call_params={
            "who": multi_address(account, chain.chain.has_evm_addresses()),
            "new_free": amount_planks
        }
    )


def compose_assets_fund(
        substrate: SubstrateInterface,
        chain: XcmChain,
        statemine_type: StatemineAssetType,
        account: str,
        amount_planks: int,
) -> GenericCall:
    asset_info = substrate.query(module=statemine_type.pallet_name(), storage_function="Asset",
                                 params=[statemine_type.encodable_asset_id()]).value
    issuer = asset_info["issuer"]

    mint_call = substrate.compose_call(
        call_module=statemine_type.pallet_name(),
        call_function="mint",
        call_params={
            "id": statemine_type.encodable_asset_id(),
            "beneficiary": multi_address(account, chain.chain.has_evm_addresses()),
            "amount": amount_planks
        }
    )

    return compose_dispatch_as(
        substrate=substrate,
        origin=signed_origin(issuer),
        call=mint_call
    )

def compose_orml_fund(
        substrate: SubstrateInterface,
        chain: XcmChain,
        orml_type: OrmlAssetType,
        account: str,
        amount_planks: int,
) -> GenericCall:
    return substrate.compose_call(
        call_module=orml_type.pallet_name(),
        call_function="set_balance",
        call_params={
            "who": multi_address(account, chain.chain.has_evm_addresses()),
            "currency_id": orml_type.encodable_asset_id(),
            "new_free": amount_planks,
            "new_reserved": 0,
        }
    )


def compose_fund_call(
        substrate: SubstrateInterface,
        chain: XcmChain,
        chain_asset: ChainAsset,
        account: str,
        amount_planks: int,
) -> GenericCall:
    match chain_asset.type:
        case NativeAssetType():
            return compose_native_fund(substrate, chain, account, amount_planks)
        case StatemineAssetType() as statemineType:
            return compose_assets_fund(substrate, chain, statemineType, account, amount_planks)
        case OrmlAssetType() as ormlType:
            return compose_orml_fund(substrate, chain, ormlType, account, amount_planks)
            
        case UnsupportedAssetType():
            raise Exception("Unsupported asset type")


def fund_account_and_then(
        substrate: SubstrateInterface,
        chain: XcmChain,
        chain_asset: ChainAsset,
        account: str,
        amount: int,
        next_call: GenericCall
) -> GenericCall:
    planks = chain_asset.planks(amount)
    fund_account_call = compose_fund_call(substrate, chain, chain_asset, account, planks)

    wrapped_next_call = compose_dispatch_as(
        substrate=substrate,
        origin=signed_origin(account),
        call=next_call
    )

    return substrate.compose_call(
        call_module="Utility",
        call_function="batch_all",
        call_params={
            "calls": [fund_account_call, wrapped_next_call]
        }
    )


def dry_run_transfer(
        origin_chain: XcmChain,
        destination_chain: XcmChain,
        chain_asset: ChainAsset,
        sender: str,
        recipient: str,
        amount: int | float
):
    transfer_type = origin_chain.transfer_type(destination_chain, chain_asset)

    token_location_origin = origin_chain.relative_reserve_location_of(chain_asset)

    dest = origin_chain.sibling_location_of(destination_chain).versioned
    assets_param = assets(token_location_origin, amount=chain_asset.planks(amount)).versioned

    remote_reserve_chain = transfer_type.check_remote_reserve()

    beneficiary = destination_chain.account_location(recipient).versioned

    fee_asset_item = 0
    weight_limit = "Unlimited"

    print(f"{dest=}")
    print(f"{beneficiary=}")
    print(f"{assets_param=}")
    print(f"{fee_asset_item=}")
    print(f"{weight_limit=}")
    
    print("\n------------------\n")

    def transfer_assets_call(substrate: SubstrateInterface) -> GenericCall:
        return substrate.compose_call(
            call_module=origin_chain.xcm_pallet_alias(),
            call_function="transfer_assets",
            call_params={
                "dest": dest,
                "assets": assets_param,
                "beneficiary": beneficiary,
                "fee_asset_item": fee_asset_item,
                "weight_limit": weight_limit
            }
        )

    call = origin_chain.access_substrate(
        lambda s: fund_account_and_then(s, origin_chain, chain_asset, account=sender, amount=100,
                                        next_call=transfer_assets_call(s))
    )


    message_to_next_hop = dry_run_xcm_call(origin_chain, call, root_origin(), final_destination_account=recipient)
    print(f"Transfer successfully initiated on {origin_chain.chain.name}, message: {message_to_next_hop}")
    print("\n------------------\n\n")

    message_to_destination: VerionsedXcm
    origin_on_destination: VerionsedXcm

    if remote_reserve_chain is not None:
        message_to_reserve = message_to_next_hop
        origin_on_reserve = remote_reserve_chain.sibling_location_of(origin_chain)
        
        print(f"{origin_on_reserve.versioned=}")

        message_to_destination = dry_run_intermediate_xcm(chain=remote_reserve_chain, xcm=message_to_reserve,
                                                  origin=origin_on_reserve, final_destination_account=recipient)
        print(f"Transfer successfully handled by reserve chain {remote_reserve_chain.chain.name}, message: {message_to_reserve.unversioned}\n")

        origin_on_destination = destination_chain.sibling_location_of(remote_reserve_chain)
    else:
        origin_on_destination = destination_chain.sibling_location_of(origin_chain)
        message_to_destination = message_to_next_hop

    print("\n------------------\n")

    destination_events = dry_run_final_xcm(destination_chain, message_to_destination, origin_on_destination)

    print(f"Transfer successfully finished on {destination_chain.chain.name}, final events: {destination_events}")



substrate_account = "13mp1WEs72kbCBF3WKcoK6Hfhu2HHZGpQ4jsKCZbfd6FoRvH"
evm_account = "0x0c7485f4AA235347BDE0168A59f6c73C7A42ff2C"

dry_run_transfer(
    origin_chain=astar,
    destination_chain=polkadot,
    sender=substrate_account,
    recipient=substrate_account,
    chain_asset= astar.chain.get_asset("DOT"),
    amount=10
)

dest={'V4': {'parents': 1, 'interior': 'Here'}}
beneficiary={'V4': {'parents': 0, 'interior': {'X1': [{'AccountId32': {'network': None, 'id': '0x7aa58293520da680e29d7cf2da0c95a1851c9ed281d53dc70c3f7197c195d566'}}]}}}
assets_param={'V4': [{'id': {'parents': 1, 'interior': 'Here'}, 'fun': {'Fungible': 100000000000}}]}
fee_asset_item=0
weight_limit='Unlimited'

------------------

Connecting to  wss://astar.public.curie.radiumblock.co/ws
Connected to  wss://astar.public.curie.radiumblock.co/ws
Found sent xcm in XcmSent event
Transfer successfully initiated on Astar, message: {'V4': [{'WithdrawAsset': [{'id': {'parents': 0, 'interior': 'Here'}, 'fun': {'Fungible': 100000000000}}]}, 'ClearOrigin', {'BuyExecution': {'fees': {'id': {'parents': 0, 'interior': 'Here'}, 'fun': {'Fungible': 100000000000}}, 'weight_limit': 'Unlimited'}}, {'DepositAsset': {'assets': {'Wild': {'AllCounted': 1}}, 'beneficiary': {'parents': 0, 'interior': {'X1': [{'AccountId32': {'network': None, 'id': '0x7aa58293520d

In [14]:
dhrmp_channels_map = polkadot.substrate.query_map(module="Hrmp", storage_function="HrmpChannels")
hrmp_channels = [result[0].value for result in hrmp_channels_map]
hrmp_channels

AttributeError: 'XcmChain' object has no attribute 'substrate'

In [None]:
chain_id_by_parachain_id = {parachain.substrate.query("ParachainInfo", "ParachainId").value:parachain.chainId for parachain in polkadot_parachains}
chain_id_by_parachain_id

In [None]:
from typing import List, Dict, Tuple


def construct_chain_graph(
        relay: Chain,
        parachains: List[chains],
        hrmp_channels: List[Tuple[int, int]],
        chain_id_by_parachain_id: Dict[int, str]
)-> Dict[str, List[str]]:
    result: Dict[str, List[str]] = {}
    
    def add_edge_by_id(origin: str, destination: str):
        edges = result.get(origin)
        if edges is None:
            result[origin] = [destination]
        else:
            edges.append(destination)
            
    def add_edge_by_chain(origin: Chain, destination: Chain):
        add_edge_by_id(origin.chainId, destination.chainId)
   
    
    # relay is accessible from each parachain
    for parachain in parachains:
        add_edge_by_chain(relay, parachain)
        add_edge_by_chain(parachain, relay)
    
    # add all supported channels    
    for channel in hrmp_channels:
        origin = chain_id_by_parachain_id.get(channel["recipient"])
        destination = chain_id_by_parachain_id.get(channel["sender"])
        if origin is None or destination is None:
            continue
            
        add_edge_by_id(origin, destination)
        
    return result

construct_chain_graph(polkadot, polkadot_parachains, hrmp_channels, chain_id_by_parachain_id)

In [None]:
polkadot.substrate.runtime_call(api="DryRunApi", method="dry_run_call")