diff --git a/Cargo.lock b/Cargo.lock
index c755be63042b..d91317ed75c3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -697,7 +697,6 @@ version = "1.0.0"
dependencies = [
"assert_matches",
"asset-hub-kusama-runtime",
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"frame-support",
@@ -706,6 +705,7 @@ dependencies = [
"pallet-asset-conversion",
"pallet-assets",
"pallet-balances",
+ "pallet-message-queue",
"pallet-xcm",
"parachains-common",
"parity-scale-codec",
@@ -752,6 +752,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-nft-fractionalization",
"pallet-nfts",
@@ -798,7 +799,6 @@ dependencies = [
name = "asset-hub-polkadot-integration-tests"
version = "1.0.0"
dependencies = [
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"frame-support",
@@ -806,6 +806,7 @@ dependencies = [
"integration-tests-common",
"pallet-assets",
"pallet-balances",
+ "pallet-message-queue",
"pallet-xcm",
"parachains-common",
"parity-scale-codec",
@@ -850,6 +851,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-nfts",
"pallet-nfts-runtime-api",
@@ -896,7 +898,6 @@ version = "1.0.0"
dependencies = [
"assert_matches",
"asset-hub-westend-runtime",
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"frame-support",
@@ -905,6 +906,7 @@ dependencies = [
"pallet-asset-conversion",
"pallet-assets",
"pallet-balances",
+ "pallet-message-queue",
"pallet-xcm",
"parachains-common",
"parity-scale-codec",
@@ -950,6 +952,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-nft-fractionalization",
"pallet-nfts",
@@ -997,7 +1000,6 @@ name = "asset-test-utils"
version = "1.0.0"
dependencies = [
"assets-common",
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"cumulus-primitives-core",
@@ -1853,6 +1855,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-session",
"pallet-timestamp",
@@ -1915,6 +1918,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-session",
"pallet-timestamp",
@@ -1965,6 +1969,7 @@ dependencies = [
"pallet-assets",
"pallet-balances",
"pallet-bridge-messages",
+ "pallet-message-queue",
"pallet-xcm",
"parachains-common",
"parity-scale-codec",
@@ -2021,6 +2026,7 @@ dependencies = [
"pallet-bridge-parachains",
"pallet-bridge-relayers",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-session",
"pallet-timestamp",
@@ -2076,7 +2082,6 @@ dependencies = [
"bp-runtime",
"bp-test-utils",
"bridge-runtime-common",
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"frame-benchmarking",
@@ -2592,6 +2597,7 @@ dependencies = [
"pallet-assets",
"pallet-balances",
"pallet-core-fellowship",
+ "pallet-message-queue",
"pallet-salary",
"pallet-xcm",
"parachains-common",
@@ -2636,6 +2642,7 @@ dependencies = [
"pallet-collator-selection",
"pallet-collective",
"pallet-core-fellowship",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-preimage",
"pallet-proxy",
@@ -2852,6 +2859,7 @@ dependencies = [
"pallet-contracts",
"pallet-contracts-primitives",
"pallet-insecure-randomness-collective-flip",
+ "pallet-message-queue",
"pallet-multisig",
"pallet-session",
"pallet-sudo",
@@ -3570,14 +3578,17 @@ dependencies = [
"cumulus-test-client",
"cumulus-test-relay-sproof-builder",
"environmental",
+ "frame-benchmarking",
"frame-support",
"frame-system",
"hex-literal 0.4.1",
"impl-trait-for-tuples",
"lazy_static",
"log",
+ "pallet-message-queue",
"parity-scale-codec",
"polkadot-parachain",
+ "rand 0.8.5",
"sc-client-api",
"scale-info",
"sp-core",
@@ -3652,6 +3663,7 @@ dependencies = [
name = "cumulus-pallet-xcmp-queue"
version = "0.1.0"
dependencies = [
+ "bounded-collections",
"cumulus-pallet-parachain-system",
"cumulus-primitives-core",
"frame-benchmarking",
@@ -3659,6 +3671,7 @@ dependencies = [
"frame-system",
"log",
"pallet-balances",
+ "pallet-message-queue",
"parity-scale-codec",
"polkadot-runtime-common",
"rand_chacha 0.3.1",
@@ -3947,6 +3960,7 @@ dependencies = [
"frame-system-rpc-runtime-api",
"pallet-balances",
"pallet-glutton",
+ "pallet-message-queue",
"pallet-sudo",
"pallet-timestamp",
"pallet-transaction-payment",
@@ -5876,6 +5890,7 @@ dependencies = [
"frame-system-rpc-runtime-api",
"frame-try-runtime",
"pallet-glutton",
+ "pallet-message-queue",
"pallet-sudo",
"parachain-info",
"parachains-common",
@@ -6441,7 +6456,6 @@ dependencies = [
"bridge-hub-rococo-runtime",
"bridge-runtime-common",
"collectives-polkadot-runtime",
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"cumulus-primitives-core",
@@ -11097,6 +11111,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-parachain-template",
"pallet-session",
"pallet-sudo",
@@ -11105,6 +11120,7 @@ dependencies = [
"pallet-transaction-payment-rpc-runtime-api",
"pallet-xcm",
"parachain-info",
+ "parachains-common",
"parity-scale-codec",
"polkadot-parachain",
"polkadot-runtime-common",
@@ -11142,6 +11158,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"parity-scale-codec",
"polkadot-primitives",
"scale-info",
@@ -11150,6 +11167,7 @@ dependencies = [
"sp-io",
"sp-runtime",
"sp-std",
+ "sp-weights",
"substrate-wasm-builder",
"xcm",
"xcm-builder",
@@ -11161,7 +11179,6 @@ name = "parachains-runtimes-test-utils"
version = "1.0.0"
dependencies = [
"assets-common",
- "cumulus-pallet-dmp-queue",
"cumulus-pallet-parachain-system",
"cumulus-pallet-xcmp-queue",
"cumulus-primitives-core",
@@ -11422,6 +11439,7 @@ dependencies = [
"pallet-authorship",
"pallet-balances",
"pallet-collator-selection",
+ "pallet-message-queue",
"pallet-session",
"pallet-sudo",
"pallet-timestamp",
@@ -14248,6 +14266,7 @@ dependencies = [
"pallet-assets",
"pallet-aura",
"pallet-balances",
+ "pallet-message-queue",
"pallet-sudo",
"pallet-timestamp",
"pallet-transaction-payment",
diff --git a/cumulus/.gitattributes b/cumulus/.gitattributes
index 2ea1ab2d6b9c..b6c581f3542c 100644
--- a/cumulus/.gitattributes
+++ b/cumulus/.gitattributes
@@ -1,2 +1,4 @@
/.gitlab-ci.yml filter=ci-prettier
/scripts/ci/gitlab/pipeline/*.yml filter=ci-prettier
+runtimes/**/src/weights/*.rs linguist-generated=true
+parachains/runtimes/**/src/weights/*.rs linguist-generated=true
diff --git a/cumulus/pallets/dmp-queue/Cargo.toml b/cumulus/pallets/dmp-queue/Cargo.toml
index 3777383349fc..2ff471fcab06 100644
--- a/cumulus/pallets/dmp-queue/Cargo.toml
+++ b/cumulus/pallets/dmp-queue/Cargo.toml
@@ -40,4 +40,5 @@ std = [
"sp-std/std",
"xcm/std",
]
-try-runtime = [ "frame-support/try-runtime" ]
+try-runtime = ["frame-support/try-runtime"]
+runtime-benchmarks = ["frame-support/runtime-benchmarks"]
diff --git a/cumulus/pallets/dmp-queue/src/lib.rs b/cumulus/pallets/dmp-queue/src/lib.rs
index aca9025d9e33..2f093f188969 100644
--- a/cumulus/pallets/dmp-queue/src/lib.rs
+++ b/cumulus/pallets/dmp-queue/src/lib.rs
@@ -1,12 +1,12 @@
-// Copyright 2020-2021 Parity Technologies (UK) Ltd.
+// Copyright Parity Technologies (UK) Ltd.
// This file is part of Cumulus.
-// Cumulus is free software: you can redistribute it and/or modify
+// Cumulus is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
-// Cumulus is distributed in the hope that it will be useful,
+// Cumulus is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
@@ -14,57 +14,51 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
-//! Pallet implementing a message queue for downward messages from the relay-chain.
-//! Executes downward messages if there is enough weight available and schedules the rest for later
-//! execution (by `on_idle` or another `handle_dmp_messages` call). Individual overweight messages
-//! are scheduled into a separate queue that is only serviced by explicit extrinsic calls.
+//! Migrates the storage from the previously deleted DMP pallet.
#![cfg_attr(not(feature = "std"), no_std)]
-pub mod migration;
+mod tests;
-use codec::{Decode, DecodeLimit, Encode};
-use cumulus_primitives_core::{relay_chain::BlockNumber as RelayBlockNumber, DmpMessageHandler};
+use cumulus_primitives_core::relay_chain::BlockNumber as RelayBlockNumber;
use frame_support::{
- traits::EnsureOrigin,
- weights::{constants::WEIGHT_REF_TIME_PER_MILLIS, Weight},
+ pallet_prelude::*,
+ storage_alias,
+ traits::{HandleMessage, OnRuntimeUpgrade},
+ weights::Weight,
};
-pub use pallet::*;
-use scale_info::TypeInfo;
-use sp_runtime::RuntimeDebug;
-use sp_std::{convert::TryFrom, prelude::*};
-use xcm::{latest::prelude::*, VersionedXcm, MAX_XCM_DECODE_DEPTH};
-
-const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KB
-
-// Maximum amount of messages to process per block. This is a temporary measure until we properly
-// account for proof size weights.
-const MAX_MESSAGES_PER_BLOCK: u8 = 10;
-// Maximum amount of messages that can exist in the overweight queue at any given time.
-const MAX_OVERWEIGHT_MESSAGES: u32 = 1000;
-
-#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo)]
-pub struct ConfigData {
- /// The maximum amount of weight any individual message may consume. Messages above this weight
- /// go into the overweight queue and may only be serviced explicitly by the
- /// `ExecuteOverweightOrigin`.
- max_individual: Weight,
+use sp_runtime::Saturating;
+use sp_std::vec::Vec;
+
+const LOG: &str = "dmp-queue-undeploy-migration";
+
+/// Undeploy the DMP queue pallet.
+///
+/// Moves all storage from the pallet to a new Queue handler. Afterwards the storage of the DMP
+/// should be purged with [DeleteDmpQueue].
+pub struct UndeployDmpQueue<T: MigrationConfig>(PhantomData<T>);
+
+/// Delete the DMP pallet. Should only be used once the DMP pallet is removed from the runtime and
+/// after [UndeployDmpQueue].
+pub type DeleteDmpQueue<T> = frame_support::migrations::RemovePallet<
+ <T as MigrationConfig>::PalletName,
+ <T as MigrationConfig>::DbWeight,
+>;
+
+/// Subset of the DMP queue config required for [UndeployDmpQueue].
+pub trait MigrationConfig {
+ /// Name of the previously deployed DMP queue pallet.
+ type PalletName: Get<&'static str>;
+
+ /// New handler for the messages.
+ type DmpHandler: HandleMessage;
+
+ /// The weight info for the runtime.
+ type DbWeight: Get<RuntimeDbWeight>;
}
-impl Default for ConfigData {
- fn default() -> Self {
- Self {
- max_individual: Weight::from_parts(
- 10u64 * WEIGHT_REF_TIME_PER_MILLIS, // 10 ms of execution time maximum by default
- DEFAULT_POV_SIZE, // 64 KB of proof size by default
- ),
- }
- }
-}
-
-/// Information concerning our message pages.
#[derive(Copy, Clone, Eq, PartialEq, Default, Encode, Decode, RuntimeDebug, TypeInfo)]
-pub struct PageIndexData {
+struct PageIndexData {
/// The lowest used page index.
begin_used: PageCounter,
/// The lowest unused page index.
@@ -73,843 +67,116 @@ pub struct PageIndexData {
overweight_count: OverweightIndex,
}
-/// Simple type used to identify messages for the purpose of reporting events. Secure if and only
-/// if the message content is unique.
-pub type MessageId = XcmHash;
-
-/// Index used to identify overweight messages.
-pub type OverweightIndex = u64;
-
-/// Index used to identify normal pages.
-pub type PageCounter = u32;
-
-#[frame_support::pallet]
-pub mod pallet {
- use super::*;
- use frame_support::pallet_prelude::*;
- use frame_system::pallet_prelude::*;
-
- #[pallet::pallet]
- #[pallet::storage_version(migration::STORAGE_VERSION)]
- #[pallet::without_storage_info]
- pub struct Pallet(_);
-
- /// The module configuration trait.
- #[pallet::config]
- pub trait Config: frame_system::Config {
- /// The overarching event type.
- type RuntimeEvent: From> + IsType<::RuntimeEvent>;
-
- type XcmExecutor: ExecuteXcm;
-
- /// Origin which is allowed to execute overweight messages.
- type ExecuteOverweightOrigin: EnsureOrigin;
- }
-
- /// The configuration.
- #[pallet::storage]
- pub(super) type Configuration = StorageValue<_, ConfigData, ValueQuery>;
-
- /// The page index.
- #[pallet::storage]
- pub(super) type PageIndex = StorageValue<_, PageIndexData, ValueQuery>;
-
- /// The queue pages.
- #[pallet::storage]
- pub(super) type Pages =
- StorageMap<_, Blake2_128Concat, PageCounter, Vec<(RelayBlockNumber, Vec)>, ValueQuery>;
-
- /// The overweight messages.
- #[pallet::storage]
- pub(super) type Overweight = CountedStorageMap<
- _,
- Blake2_128Concat,
- OverweightIndex,
- (RelayBlockNumber, Vec),
- OptionQuery,
- >;
-
- #[pallet::error]
- pub enum Error {
- /// The message index given is unknown.
- Unknown,
- /// The amount of weight given is possibly not enough for executing the message.
- OverLimit,
- }
-
- #[pallet::hooks]
- impl Hooks> for Pallet {
- fn on_idle(_now: BlockNumberFor, max_weight: Weight) -> Weight {
- // on_idle processes additional messages with any remaining block weight.
- Self::service_queue(max_weight)
+type OverweightIndex = u64;
+type PageCounter = u32;
+
+#[storage_alias(dynamic)]
+type PageIndex<T: MigrationConfig> =
+ StorageValue<<T as MigrationConfig>::PalletName, PageIndexData, ValueQuery>;
+
+#[storage_alias(dynamic)]
+type Pages<T: MigrationConfig> = StorageMap<
+ <T as MigrationConfig>::PalletName,
+ Blake2_128Concat,
+ PageCounter,
+ Vec<(RelayBlockNumber, Vec<u8>)>,
+ ValueQuery,
+>;
+
+#[storage_alias(dynamic)]
+type Overweight<T: MigrationConfig> = CountedStorageMap<
+ <T as MigrationConfig>::PalletName,
+ Blake2_128Concat,
+ OverweightIndex,
+ (RelayBlockNumber, Vec<u8>),
+ OptionQuery,
+>;
+
+impl<T: MigrationConfig> OnRuntimeUpgrade for UndeployDmpQueue<T> {
+ #[cfg(feature = "try-runtime")]
+ fn pre_upgrade() -> Result<Vec<u8>, sp_runtime::DispatchError> {
+ let index = PageIndex::::get();
+
+ // Check that all pages are present.
+ ensure!(index.begin_used <= index.end_used, "Invalid page index");
+ for p in index.begin_used..index.end_used {
+ ensure!(Pages::::contains_key(p), "Missing page");
+ ensure!(Pages::::get(p).len() > 0, "Empty page");
}
- }
- #[pallet::call]
- impl Pallet {
- /// Service a single overweight message.
- #[pallet::call_index(0)]
- #[pallet::weight(weight_limit.saturating_add(Weight::from_parts(1_000_000, 0)))]
- pub fn service_overweight(
- origin: OriginFor,
- index: OverweightIndex,
- weight_limit: Weight,
- ) -> DispatchResultWithPostInfo {
- T::ExecuteOverweightOrigin::ensure_origin(origin)?;
-
- let (sent_at, data) = Overweight::::get(index).ok_or(Error::::Unknown)?;
- let weight_used = Self::try_service_message(weight_limit, sent_at, &data[..])
- .map_err(|_| Error::::OverLimit)?;
- Overweight::::remove(index);
- Self::deposit_event(Event::OverweightServiced { overweight_index: index, weight_used });
- Ok(Some(weight_used.saturating_add(Weight::from_parts(1_000_000, 0))).into())
+ // Check that all overweight messages are present.
+ for i in 0..index.overweight_count {
+ ensure!(Overweight::::contains_key(i), "Missing overweight message");
}
- }
- #[pallet::event]
- #[pallet::generate_deposit(pub(super) fn deposit_event)]
- pub enum Event {
- /// Downward message is invalid XCM.
- InvalidFormat { message_hash: XcmHash },
- /// Downward message is unsupported version of XCM.
- UnsupportedVersion { message_hash: XcmHash },
- /// Downward message executed with the given outcome.
- ExecutedDownward { message_hash: XcmHash, message_id: XcmHash, outcome: Outcome },
- /// The weight limit for handling downward messages was reached.
- WeightExhausted {
- message_hash: XcmHash,
- message_id: XcmHash,
- remaining_weight: Weight,
- required_weight: Weight,
- },
- /// Downward message is overweight and was placed in the overweight queue.
- OverweightEnqueued {
- message_hash: XcmHash,
- message_id: XcmHash,
- overweight_index: OverweightIndex,
- required_weight: Weight,
- },
- /// Downward message from the overweight queue was executed.
- OverweightServiced { overweight_index: OverweightIndex, weight_used: Weight },
- /// The maximum number of downward messages was reached.
- MaxMessagesExhausted { message_hash: XcmHash },
+ Ok(Default::default())
}
- /// Error type when a message was failed to be serviced.
- pub(crate) struct ServiceMessageError {
- /// The message's hash.
- message_hash: XcmHash,
- /// The message's ID (which could also be its hash if nothing overrides it).
- message_id: XcmHash,
- /// Weight required for the message to be executed.
- required_weight: Weight,
- }
+ #[cfg(feature = "try-runtime")]
+ fn post_upgrade(_: Vec<u8>) -> Result<(), sp_runtime::DispatchError> {
+ let index = PageIndex::::get();
- impl Pallet {
- /// Service the message queue up to some given weight `limit`.
- ///
- /// Returns the weight consumed by executing messages in the queue.
- fn service_queue(limit: Weight) -> Weight {
- let mut messages_processed = 0;
- PageIndex::::mutate(|page_index| {
- Self::do_service_queue(limit, page_index, &mut messages_processed)
- })
+ // Check that all pages are removed.
+ for p in index.begin_used..index.end_used {
+ ensure!(!Pages::::contains_key(p), "Page should be gone");
}
+ ensure!(Pages::::iter_keys().next().is_none(), "Un-indexed pages");
- /// Exactly equivalent to `service_queue` but expects a mutable `page_index` to be passed
- /// in and any changes stored.
- fn do_service_queue(
- limit: Weight,
- page_index: &mut PageIndexData,
- messages_processed: &mut u8,
- ) -> Weight {
- let mut used = Weight::zero();
- while page_index.begin_used < page_index.end_used {
- let page = Pages::::take(page_index.begin_used);
- for (i, &(sent_at, ref data)) in page.iter().enumerate() {
- if *messages_processed >= MAX_MESSAGES_PER_BLOCK {
- // Exceeded block message limit - put the remaining messages back and bail
- Pages::::insert(page_index.begin_used, &page[i..]);
- return used
- }
- *messages_processed += 1;
- match Self::try_service_message(limit.saturating_sub(used), sent_at, &data[..])
- {
- Ok(w) => used += w,
- Err(..) => {
- // Too much weight needed - put the remaining messages back and bail
- Pages::::insert(page_index.begin_used, &page[i..]);
- return used
- },
- }
- }
- page_index.begin_used += 1;
- }
- if page_index.begin_used == page_index.end_used {
- // Reset if there's no pages left.
- page_index.begin_used = 0;
- page_index.end_used = 0;
- }
- used
+ // Check that all overweight messages are removed.
+ for i in 0..index.overweight_count {
+ ensure!(!Overweight::::contains_key(i), "Overweight message should be gone");
}
+ ensure!(Overweight::::iter_keys().next().is_none(), "Un-indexed overweight messages");
- /// Attempt to service an individual message. Will return `Ok` with the execution weight
- /// consumed unless the message was found to need more weight than `limit`.
- ///
- /// NOTE: This will return `Ok` in the case of an error decoding, weighing or executing
- /// the message. This is why it's called message "servicing" rather than "execution".
- pub(crate) fn try_service_message(
- limit: Weight,
- _sent_at: RelayBlockNumber,
- mut data: &[u8],
- ) -> Result {
- let message_hash = sp_io::hashing::blake2_256(data);
- let mut message_id = message_hash;
- let maybe_msg = VersionedXcm::::decode_all_with_depth_limit(
- MAX_XCM_DECODE_DEPTH,
- &mut data,
- )
- .map(Xcm::::try_from);
- match maybe_msg {
- Err(_) => {
- Self::deposit_event(Event::InvalidFormat { message_hash });
- Ok(Weight::zero())
- },
- Ok(Err(())) => {
- Self::deposit_event(Event::UnsupportedVersion { message_hash });
- Ok(Weight::zero())
- },
- Ok(Ok(x)) => {
- let outcome = T::XcmExecutor::prepare_and_execute(
- Parent,
- x,
- &mut message_id,
- limit,
- Weight::zero(),
- );
- match outcome {
- Outcome::Error(XcmError::WeightLimitReached(required_weight)) =>
- Err(ServiceMessageError { message_hash, message_id, required_weight }),
- outcome => {
- let weight_used = outcome.weight_used();
- Self::deposit_event(Event::ExecutedDownward {
- message_hash,
- message_id,
- outcome,
- });
- Ok(weight_used)
- },
- }
- },
- }
- }
+ Ok(())
}
- /// For an incoming downward message, this just adapts an XCM executor and executes DMP messages
- /// immediately up until some `MaxWeight` at which point it errors. Their origin is asserted to
- /// be the `Parent` location.
- impl DmpMessageHandler for Pallet {
- fn handle_dmp_messages(
- iter: impl Iterator- )>,
- limit: Weight,
- ) -> Weight {
- let mut messages_processed = 0;
- let mut page_index = PageIndex::::get();
- let config = Configuration::::get();
-
- // First try to use `max_weight` to service the current queue.
- let mut used = Self::do_service_queue(limit, &mut page_index, &mut messages_processed);
-
- // Then if the queue is empty, use the weight remaining to service the incoming messages
- // and once we run out of weight, place them in the queue.
- let item_count = iter.size_hint().0;
- let mut maybe_enqueue_page = if page_index.end_used > page_index.begin_used {
- // queue is already non-empty - start a fresh page.
- Some(Vec::with_capacity(item_count))
- } else {
- None
- };
-
- for (i, (sent_at, data)) in iter.enumerate() {
- if maybe_enqueue_page.is_none() {
- if messages_processed >= MAX_MESSAGES_PER_BLOCK {
- let item_count_left = item_count.saturating_sub(i);
- maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
-
- Self::deposit_event(Event::MaxMessagesExhausted {
- message_hash: sp_io::hashing::blake2_256(&data),
- });
- } else {
- // We're not currently enqueuing - try to execute inline.
- let remaining_weight = limit.saturating_sub(used);
- messages_processed += 1;
- match Self::try_service_message(remaining_weight, sent_at, &data[..]) {
- Ok(consumed) => used += consumed,
- Err(ServiceMessageError {
- message_hash,
- message_id,
- required_weight,
- }) =>
- // Too much weight required right now.
- {
- let is_under_limit =
- Overweight::::count() < MAX_OVERWEIGHT_MESSAGES;
- used.saturating_accrue(T::DbWeight::get().reads(1));
- if required_weight.any_gt(config.max_individual) && is_under_limit {
- // overweight - add to overweight queue and continue with
- // message execution.
- let overweight_index = page_index.overweight_count;
- Overweight::::insert(overweight_index, (sent_at, data));
- Self::deposit_event(Event::OverweightEnqueued {
- message_hash,
- message_id,
- overweight_index,
- required_weight,
- });
- page_index.overweight_count += 1;
- // Not needed for control flow, but only to ensure that the
- // compiler understands that we won't attempt to re-use `data`
- // later.
- continue
- } else {
- // not overweight. stop executing inline and enqueue normally
- // from here on.
- let item_count_left = item_count.saturating_sub(i);
- maybe_enqueue_page = Some(Vec::with_capacity(item_count_left));
- Self::deposit_event(Event::WeightExhausted {
- message_hash,
- message_id,
- remaining_weight,
- required_weight,
- });
- }
- },
- }
- }
- }
- // Cannot be an `else` here since the `maybe_enqueue_page` may have changed.
- if let Some(ref mut enqueue_page) = maybe_enqueue_page {
- enqueue_page.push((sent_at, data));
- }
- }
+ fn on_runtime_upgrade() -> Weight {
+ let index = PageIndex::::get();
+ log::info!(target: LOG, "Page index: {index:?}");
+ let (mut messages_migrated, mut pages_migrated) = (0u32, 0u32);
- // Deposit the enqueued page if any and save the index.
- if let Some(enqueue_page) = maybe_enqueue_page {
- Pages::::insert(page_index.end_used, enqueue_page);
- page_index.end_used += 1;
+ for p in index.begin_used..index.end_used {
+ let page = Pages::::take(p);
+ log::info!(target: LOG, "Migrating page #{p} with {} messages ...", page.len());
+ if page.is_empty() {
+ log::error!(target: LOG, "Page #{p}: EMPTY - storage corrupted?");
}
- PageIndex::::put(page_index);
- used
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate as dmp_queue;
-
- use codec::Encode;
- use cumulus_primitives_core::ParaId;
- use frame_support::{assert_noop, parameter_types, traits::OnIdle};
- use sp_core::H256;
- use sp_runtime::{
- traits::{BlakeTwo256, IdentityLookup},
- BuildStorage,
- DispatchError::BadOrigin,
- };
- use sp_version::RuntimeVersion;
- use std::cell::RefCell;
- use xcm::latest::{MultiLocation, OriginKind};
-
- type Block = frame_system::mocking::MockBlock;
- type Xcm = xcm::latest::Xcm;
-
- frame_support::construct_runtime!(
- pub enum Test
- {
- System: frame_system::{Pallet, Call, Config, Storage, Event},
- DmpQueue: dmp_queue::{Pallet, Call, Storage, Event},
- }
- );
-
- parameter_types! {
- pub const BlockHashCount: u64 = 250;
- pub Version: RuntimeVersion = RuntimeVersion {
- spec_name: sp_version::create_runtime_str!("test"),
- impl_name: sp_version::create_runtime_str!("system-test"),
- authoring_version: 1,
- spec_version: 1,
- impl_version: 1,
- apis: sp_version::create_apis_vec!([]),
- transaction_version: 1,
- state_version: 1,
- };
- pub const ParachainId: ParaId = ParaId::new(200);
- pub const ReservedXcmpWeight: Weight = Weight::zero();
- pub const ReservedDmpWeight: Weight = Weight::zero();
- }
-
- type AccountId = u64;
-
- impl frame_system::Config for Test {
- type RuntimeOrigin = RuntimeOrigin;
- type RuntimeCall = RuntimeCall;
- type Nonce = u64;
- type Hash = H256;
- type Hashing = BlakeTwo256;
- type AccountId = AccountId;
- type Lookup = IdentityLookup;
- type Block = Block;
- type RuntimeEvent = RuntimeEvent;
- type BlockHashCount = BlockHashCount;
- type BlockLength = ();
- type BlockWeights = ();
- type Version = Version;
- type PalletInfo = PalletInfo;
- type AccountData = ();
- type OnNewAccount = ();
- type OnKilledAccount = ();
- type DbWeight = ();
- type BaseCallFilter = frame_support::traits::Everything;
- type SystemWeightInfo = ();
- type SS58Prefix = ();
- type OnSetCode = ();
- type MaxConsumers = frame_support::traits::ConstU32<16>;
- }
+ for (m, (block, msg)) in page.iter().enumerate() {
+ let Ok(bound) = BoundedVec::::try_from(msg.clone()) else {
+ log::error!(target: LOG, "[Page {p}] Message #{m}: TOO LONG - ignoring");
+ continue;
+ };
- thread_local! {
- pub static TRACE: RefCell> = RefCell::new(Vec::new());
- }
- pub fn take_trace() -> Vec<(Xcm, Outcome)> {
- TRACE.with(|q| {
- let q = &mut *q.borrow_mut();
- let r = q.clone();
- q.clear();
- r
- })
- }
-
- pub struct MockPrepared(Xcm);
- impl PreparedMessage for MockPrepared {
- fn weight_of(&self) -> Weight {
- match ((self.0).0.len(), &(self.0).0.first()) {
- (1, Some(Transact { require_weight_at_most, .. })) => *require_weight_at_most,
- _ => Weight::from_parts(1, 1),
+ T::DmpHandler::handle_message(bound.as_bounded_slice());
+ messages_migrated.saturating_inc();
+ log::info!(target: LOG, "[Page {p}] Migrated message #{m} from block {block}");
}
+ pages_migrated.saturating_inc();
}
- }
- pub struct MockExec;
- impl ExecuteXcm for MockExec {
- type Prepared = MockPrepared;
+ log::info!(target: LOG, "Migrated {messages_migrated} messages from {pages_migrated} pages");
- fn prepare(message: Xcm) -> Result {
- Ok(MockPrepared(message))
- }
+ // Now migrate the overweight messages.
+ let mut overweight_migrated = 0u32;
+ log::info!(target: LOG, "Migrating {} overweight messages ...", index.overweight_count);
- fn execute(
- _origin: impl Into,
- prepared: MockPrepared,
- _id: &mut XcmHash,
- _weight_credit: Weight,
- ) -> Outcome {
- let message = prepared.0;
- let o = match (message.0.len(), &message.0.first()) {
- (1, Some(Transact { require_weight_at_most, .. })) =>
- Outcome::Complete(*require_weight_at_most),
- // use 1000 to decide that it's not supported.
- _ => Outcome::Incomplete(Weight::from_parts(1, 1), XcmError::Unimplemented),
+ for i in 0..index.overweight_count {
+ let Some((block, msg)) = Overweight::::take(i) else {
+ log::error!(target: LOG, "[Overweight {i}] Message: EMPTY - storage corrupted?");
+ continue;
+ };
+ let Ok(bound) = BoundedVec::::try_from(msg) else {
+ log::error!(target: LOG, "[Overweight {i}] Message: TOO LONG - ignoring");
+ continue;
};
- TRACE.with(|q| q.borrow_mut().push((message, o.clone())));
- o
- }
-
- fn charge_fees(_location: impl Into, _fees: MultiAssets) -> XcmResult {
- Err(XcmError::Unimplemented)
- }
- }
-
- impl Config for Test {
- type RuntimeEvent = RuntimeEvent;
- type XcmExecutor = MockExec;
- type ExecuteOverweightOrigin = frame_system::EnsureRoot;
- }
-
- pub(crate) fn new_test_ext() -> sp_io::TestExternalities {
- frame_system::GenesisConfig::::default().build_storage().unwrap().into()
- }
- fn enqueue(enqueued: &[Xcm]) {
- if !enqueued.is_empty() {
- let mut index = PageIndex::::get();
- Pages::::insert(
- index.end_used,
- enqueued
- .iter()
- .map(|m| (0, VersionedXcm::::from(m.clone()).encode()))
- .collect::>(),
- );
- index.end_used += 1;
- PageIndex::::put(index);
+ T::DmpHandler::handle_message(bound.as_bounded_slice());
+ overweight_migrated.saturating_inc();
+ log::info!(target: LOG, "[Overweight {i}] Migrated message from block {block}");
}
- }
-
- fn handle_messages(incoming: &[Xcm], limit: Weight) -> Weight {
- let iter = incoming
- .iter()
- .map(|m| (0, VersionedXcm::::from(m.clone()).encode()));
- DmpQueue::handle_dmp_messages(iter, limit)
- }
-
- fn msg(weight: u64) -> Xcm {
- Xcm(vec![Transact {
- origin_kind: OriginKind::Native,
- require_weight_at_most: Weight::from_parts(weight, weight),
- call: Vec::new().into(),
- }])
- }
-
- fn msg_complete(weight: u64) -> (Xcm, Outcome) {
- (msg(weight), Outcome::Complete(Weight::from_parts(weight, weight)))
- }
-
- fn pages_queued() -> PageCounter {
- PageIndex::::get().end_used - PageIndex::::get().begin_used
- }
-
- fn queue_is_empty() -> bool {
- pages_queued() == 0
- }
-
- fn overweights() -> Vec {
- (0..PageIndex::::get().overweight_count)
- .filter(|i| Overweight::::contains_key(i))
- .collect::>()
- }
-
- #[test]
- fn basic_setup_works() {
- new_test_ext().execute_with(|| {
- let weight_used = handle_messages(&[], Weight::from_parts(1000, 1000));
- assert_eq!(weight_used, Weight::zero());
- assert_eq!(take_trace(), Vec::new());
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_inline_complete_works() {
- new_test_ext().execute_with(|| {
- let incoming = vec![msg(1000), msg(1001)];
- let weight_used = handle_messages(&incoming, Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::from_parts(2001, 2001));
- assert_eq!(take_trace(), vec![msg_complete(1000), msg_complete(1001)]);
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_enqueued_works() {
- new_test_ext().execute_with(|| {
- let enqueued = vec![msg(1000), msg(1001), msg(1002)];
- enqueue(&enqueued);
- let weight_used = handle_messages(&[], Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::from_parts(2001, 2001));
- assert_eq!(take_trace(), vec![msg_complete(1000), msg_complete(1001),]);
- });
- }
-
- #[test]
- fn enqueue_works() {
- new_test_ext().execute_with(|| {
- let incoming = vec![msg(1000), msg(1001), msg(1002)];
- let weight_used = handle_messages(&incoming, Weight::from_parts(999, 999));
- assert_eq!(weight_used, Weight::zero());
- assert_eq!(
- PageIndex::::get(),
- PageIndexData { begin_used: 0, end_used: 1, overweight_count: 0 }
- );
- assert_eq!(Pages::::get(0).len(), 3);
- assert_eq!(take_trace(), vec![]);
-
- let weight_used = handle_messages(&[], Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::from_parts(2001, 2001));
- assert_eq!(take_trace(), vec![msg_complete(1000), msg_complete(1001)]);
-
- let weight_used = handle_messages(&[], Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::from_parts(1002, 1002));
- assert_eq!(take_trace(), vec![msg_complete(1002)]);
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_inline_then_enqueue_works() {
- new_test_ext().execute_with(|| {
- let incoming = vec![msg(1000), msg(1001), msg(1002)];
- let weight_used = handle_messages(&incoming, Weight::from_parts(1500, 1500));
- assert_eq!(weight_used, Weight::from_parts(1000, 1000));
- assert_eq!(pages_queued(), 1);
- assert_eq!(Pages::::get(0).len(), 2);
- assert_eq!(take_trace(), vec![msg_complete(1000)]);
-
- let weight_used = handle_messages(&[], Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::from_parts(2003, 2003));
- assert_eq!(take_trace(), vec![msg_complete(1001), msg_complete(1002),]);
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_enqueued_and_inline_works() {
- new_test_ext().execute_with(|| {
- let enqueued = vec![msg(1000), msg(1001)];
- let incoming = vec![msg(1002), msg(1003)];
- enqueue(&enqueued);
- let weight_used = handle_messages(&incoming, Weight::from_parts(5000, 5000));
- assert_eq!(weight_used, Weight::from_parts(4006, 4006));
- assert_eq!(
- take_trace(),
- vec![
- msg_complete(1000),
- msg_complete(1001),
- msg_complete(1002),
- msg_complete(1003),
- ]
- );
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_enqueued_partially_and_then_enqueue_works() {
- new_test_ext().execute_with(|| {
- let enqueued = vec![msg(1000), msg(10001)];
- let incoming = vec![msg(1002), msg(1003)];
- enqueue(&enqueued);
- let weight_used = handle_messages(&incoming, Weight::from_parts(5000, 5000));
- assert_eq!(weight_used, Weight::from_parts(1000, 1000));
- assert_eq!(take_trace(), vec![msg_complete(1000)]);
- assert_eq!(pages_queued(), 2);
-
- // 5000 is not enough to process the 10001 blocker, so nothing happens.
- let weight_used = handle_messages(&[], Weight::from_parts(5000, 5000));
- assert_eq!(weight_used, Weight::zero());
- assert_eq!(take_trace(), vec![]);
-
- // 20000 is now enough to process everything.
- let weight_used = handle_messages(&[], Weight::from_parts(20000, 20000));
- assert_eq!(weight_used, Weight::from_parts(12006, 12006));
- assert_eq!(
- take_trace(),
- vec![msg_complete(10001), msg_complete(1002), msg_complete(1003),]
- );
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_enqueued_completely_and_then_enqueue_works() {
- new_test_ext().execute_with(|| {
- let enqueued = vec![msg(1000), msg(1001)];
- let incoming = vec![msg(10002), msg(1003)];
- enqueue(&enqueued);
- let weight_used = handle_messages(&incoming, Weight::from_parts(5000, 5000));
- assert_eq!(weight_used, Weight::from_parts(2001, 2001));
- assert_eq!(take_trace(), vec![msg_complete(1000), msg_complete(1001)]);
- assert_eq!(pages_queued(), 1);
-
- // 20000 is now enough to process everything.
- let weight_used = handle_messages(&[], Weight::from_parts(20000, 20000));
- assert_eq!(weight_used, Weight::from_parts(11005, 11005));
- assert_eq!(take_trace(), vec![msg_complete(10002), msg_complete(1003),]);
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn service_enqueued_then_inline_then_enqueue_works() {
- new_test_ext().execute_with(|| {
- let enqueued = vec![msg(1000), msg(1001)];
- let incoming = vec![msg(1002), msg(10003)];
- enqueue(&enqueued);
- let weight_used = handle_messages(&incoming, Weight::from_parts(5000, 5000));
- assert_eq!(weight_used, Weight::from_parts(3003, 3003));
- assert_eq!(
- take_trace(),
- vec![msg_complete(1000), msg_complete(1001), msg_complete(1002),]
- );
- assert_eq!(pages_queued(), 1);
-
- // 20000 is now enough to process everything.
- let weight_used = handle_messages(&[], Weight::from_parts(20000, 20000));
- assert_eq!(weight_used, Weight::from_parts(10003, 10003));
- assert_eq!(take_trace(), vec![msg_complete(10003),]);
- assert!(queue_is_empty());
- });
- }
-
- #[test]
- fn page_crawling_works() {
- new_test_ext().execute_with(|| {
- let enqueued = vec![msg(1000), msg(1001)];
- enqueue(&enqueued);
- let weight_used = handle_messages(&[msg(1002)], Weight::from_parts(1500, 1500));
- assert_eq!(weight_used, Weight::from_parts(1000, 1000));
- assert_eq!(take_trace(), vec![msg_complete(1000)]);
- assert_eq!(pages_queued(), 2);
- assert_eq!(PageIndex::::get().begin_used, 0);
-
- let weight_used = handle_messages(&[msg(1003)], Weight::from_parts(1500, 1500));
- assert_eq!(weight_used, Weight::from_parts(1001, 1001));
- assert_eq!(take_trace(), vec![msg_complete(1001)]);
- assert_eq!(pages_queued(), 2);
- assert_eq!(PageIndex::::get().begin_used, 1);
-
- let weight_used = handle_messages(&[msg(1004)], Weight::from_parts(1500, 1500));
- assert_eq!(weight_used, Weight::from_parts(1002, 1002));
- assert_eq!(take_trace(), vec![msg_complete(1002)]);
- assert_eq!(pages_queued(), 2);
- assert_eq!(PageIndex::::get().begin_used, 2);
- });
- }
-
- #[test]
- fn overweight_should_not_block_queue() {
- new_test_ext().execute_with(|| {
- // Set the overweight threshold to 9999.
- Configuration::::put(ConfigData {
- max_individual: Weight::from_parts(9999, 9999),
- });
-
- let incoming = vec![msg(1000), msg(10001), msg(1002)];
- let weight_used = handle_messages(&incoming, Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::from_parts(2002, 2002));
- assert!(queue_is_empty());
- assert_eq!(take_trace(), vec![msg_complete(1000), msg_complete(1002),]);
-
- assert_eq!(overweights(), vec![0]);
- });
- }
-
- #[test]
- fn overweights_should_be_manually_executable() {
- new_test_ext().execute_with(|| {
- // Set the overweight threshold to 9999.
- Configuration::::put(ConfigData {
- max_individual: Weight::from_parts(9999, 9999),
- });
-
- let incoming = vec![msg(10000)];
- let weight_used = handle_messages(&incoming, Weight::from_parts(2500, 2500));
- assert_eq!(weight_used, Weight::zero());
- assert_eq!(take_trace(), vec![]);
- assert_eq!(overweights(), vec![0]);
-
- assert_noop!(
- DmpQueue::service_overweight(
- RuntimeOrigin::signed(1),
- 0,
- Weight::from_parts(20000, 20000)
- ),
- BadOrigin
- );
- assert_noop!(
- DmpQueue::service_overweight(
- RuntimeOrigin::root(),
- 1,
- Weight::from_parts(20000, 20000)
- ),
- Error::::Unknown
- );
- assert_noop!(
- DmpQueue::service_overweight(
- RuntimeOrigin::root(),
- 0,
- Weight::from_parts(9999, 9999)
- ),
- Error::::OverLimit
- );
- assert_eq!(take_trace(), vec![]);
-
- let base_weight =
- super::Call::::service_overweight { index: 0, weight_limit: Weight::zero() }
- .get_dispatch_info()
- .weight;
- use frame_support::dispatch::GetDispatchInfo;
- let info = DmpQueue::service_overweight(
- RuntimeOrigin::root(),
- 0,
- Weight::from_parts(20000, 20000),
- )
- .unwrap();
- let actual_weight = info.actual_weight.unwrap();
- assert_eq!(actual_weight, base_weight + Weight::from_parts(10000, 10000));
- assert_eq!(take_trace(), vec![msg_complete(10000)]);
- assert!(overweights().is_empty());
-
- assert_noop!(
- DmpQueue::service_overweight(
- RuntimeOrigin::root(),
- 0,
- Weight::from_parts(20000, 20000)
- ),
- Error::::Unknown
- );
- });
- }
-
- #[test]
- fn on_idle_should_service_queue() {
- new_test_ext().execute_with(|| {
- enqueue(&[msg(1000), msg(1001)]);
- enqueue(&[msg(1002), msg(1003)]);
- enqueue(&[msg(1004), msg(1005)]);
-
- let weight_used = DmpQueue::on_idle(1, Weight::from_parts(6000, 6000));
- assert_eq!(weight_used, Weight::from_parts(5010, 5010));
- assert_eq!(
- take_trace(),
- vec![
- msg_complete(1000),
- msg_complete(1001),
- msg_complete(1002),
- msg_complete(1003),
- msg_complete(1004),
- ]
- );
- assert_eq!(pages_queued(), 1);
- });
- }
- #[test]
- fn handle_max_messages_per_block() {
- new_test_ext().execute_with(|| {
- enqueue(&[msg(1000), msg(1001)]);
- enqueue(&[msg(1002), msg(1003)]);
- enqueue(&[msg(1004), msg(1005)]);
-
- let incoming =
- (0..MAX_MESSAGES_PER_BLOCK).map(|i| msg(1006 + i as u64)).collect::>();
- handle_messages(&incoming, Weight::from_parts(25000, 25000));
-
- assert_eq!(
- take_trace(),
- (0..MAX_MESSAGES_PER_BLOCK)
- .map(|i| msg_complete(1000 + i as u64))
- .collect::>(),
- );
- assert_eq!(pages_queued(), 1);
-
- handle_messages(&[], Weight::from_parts(25000, 25000));
- assert_eq!(
- take_trace(),
- (MAX_MESSAGES_PER_BLOCK..MAX_MESSAGES_PER_BLOCK + 6)
- .map(|i| msg_complete(1000 + i as u64))
- .collect::>(),
- );
- });
+ Weight::zero() // FAIL-CI
}
}
diff --git a/cumulus/pallets/dmp-queue/src/migration.rs b/cumulus/pallets/dmp-queue/src/migration.rs
deleted file mode 100644
index b2323f6a60fa..000000000000
--- a/cumulus/pallets/dmp-queue/src/migration.rs
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2022 Parity Technologies (UK) Ltd.
-// This file is part of Polkadot.
-
-// Polkadot is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Polkadot is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Polkadot. If not, see .
-
-//! A module that is responsible for migration of storage.
-
-use crate::{Config, Configuration, Overweight, Pallet, DEFAULT_POV_SIZE};
-use frame_support::{
- pallet_prelude::*,
- traits::{OnRuntimeUpgrade, StorageVersion},
- weights::{constants::WEIGHT_REF_TIME_PER_MILLIS, Weight},
-};
-
-/// The current storage version.
-pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(2);
-
-/// Migrates the pallet storage to the most recent version.
-pub struct Migration(PhantomData);
-
-impl OnRuntimeUpgrade for Migration {
- fn on_runtime_upgrade() -> Weight {
- let mut weight = T::DbWeight::get().reads(1);
-
- if StorageVersion::get::>() == 0 {
- weight.saturating_accrue(migrate_to_v1::());
- StorageVersion::new(1).put::>();
- weight.saturating_accrue(T::DbWeight::get().writes(1));
- }
-
- if StorageVersion::get::>() == 1 {
- weight.saturating_accrue(migrate_to_v2::());
- StorageVersion::new(2).put::>();
- weight.saturating_accrue(T::DbWeight::get().writes(1));
- }
-
- weight
- }
-}
-
-mod v0 {
- use super::*;
- use codec::{Decode, Encode};
-
- #[derive(Decode, Encode, Debug)]
- pub struct ConfigData {
- pub max_individual: u64,
- }
-
- impl Default for ConfigData {
- fn default() -> Self {
- ConfigData { max_individual: 10u64 * WEIGHT_REF_TIME_PER_MILLIS }
- }
- }
-}
-
-/// Migrates `QueueConfigData` from v1 (using only reference time weights) to v2 (with
-/// 2D weights).
-///
-/// NOTE: Only use this function if you know what you're doing. Default to using
-/// `migrate_to_latest`.
-pub fn migrate_to_v1() -> Weight {
- let translate = |pre: v0::ConfigData| -> super::ConfigData {
- super::ConfigData {
- max_individual: Weight::from_parts(pre.max_individual, DEFAULT_POV_SIZE),
- }
- };
-
- if Configuration::::translate(|pre| pre.map(translate)).is_err() {
- log::error!(
- target: "dmp_queue",
- "unexpected error when performing translation of the QueueConfig type during storage upgrade to v2"
- );
- }
-
- T::DbWeight::get().reads_writes(1, 1)
-}
-
-/// Migrates `Overweight` so that it initializes the storage map's counter.
-///
-/// NOTE: Only use this function if you know what you're doing. Default to using
-/// `migrate_to_latest`.
-pub fn migrate_to_v2() -> Weight {
- let overweight_messages = Overweight::::initialize_counter() as u64;
-
- T::DbWeight::get().reads_writes(overweight_messages, 1)
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::tests::{new_test_ext, Test};
-
- #[test]
- fn test_migration_to_v1() {
- let v0 = v0::ConfigData { max_individual: 30_000_000_000 };
-
- new_test_ext().execute_with(|| {
- frame_support::storage::unhashed::put_raw(
- &crate::Configuration::::hashed_key(),
- &v0.encode(),
- );
-
- migrate_to_v1::();
-
- let v1 = crate::Configuration::::get();
-
- assert_eq!(v0.max_individual, v1.max_individual.ref_time());
- assert_eq!(v1.max_individual.proof_size(), DEFAULT_POV_SIZE);
- });
- }
-}
diff --git a/cumulus/pallets/dmp-queue/src/tests.rs b/cumulus/pallets/dmp-queue/src/tests.rs
new file mode 100644
index 000000000000..738701340acb
--- /dev/null
+++ b/cumulus/pallets/dmp-queue/src/tests.rs
@@ -0,0 +1,145 @@
+// Copyright Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus. If not, see .
+
+//! Test the migration.
+
+#![cfg(test)]
+
+use super::*;
+
+#[cfg(feature = "try-runtime")]
+use frame_support::assert_ok;
+use frame_support::{
+ parameter_types,
+ traits::{Footprint, HandleMessage, OnRuntimeUpgrade},
+ StorageNoopGuard,
+};
+use sp_core::bounded_vec::BoundedSlice;
+use sp_io::TestExternalities as TestExt;
+
+parameter_types! {
+ static RecordedMessages: u32 = 0;
+}
+
+struct MockedDmpHandler;
+impl HandleMessage for MockedDmpHandler {
+ type MaxMessageLen = ConstU32<16>;
+
+ fn handle_message(_: BoundedSlice) {
+ RecordedMessages::mutate(|n| *n += 1);
+ }
+
+ fn handle_messages<'a>(_: impl Iterator
- >) {
+ unimplemented!()
+ }
+
+ fn sweep_queue() {
+ unimplemented!()
+ }
+
+ fn footprint() -> Footprint {
+ unimplemented!()
+ }
+}
+
+parameter_types! {
+ const PalletName: &'static str = "DmpQueue";
+}
+
+struct Runtime;
+impl MigrationConfig for Runtime {
+ type PalletName = PalletName;
+ type DmpHandler = MockedDmpHandler;
+ type DbWeight = ();
+}
+
+#[test]
+fn migration_works() {
+ TestExt::default().execute_with(|| {
+ // This test should leak no storage:
+ let _g = StorageNoopGuard::default();
+
+ // Setup the storage:
+ PageIndex::::set(PageIndexData {
+ begin_used: 10,
+ end_used: 20,
+ overweight_count: 5,
+ });
+
+ for p in 10..20 {
+ let msgs = (0..16).map(|i| (p, vec![i as u8; 1])).collect::>();
+ Pages::::insert(p, msgs);
+ }
+
+ for i in 0..5 {
+ Overweight::::insert(i, (0, vec![i as u8; 1]));
+ }
+
+ // Run the migration:
+ #[cfg(feature = "try-runtime")]
+ assert_ok!(UndeployDmpQueue::::pre_upgrade());
+ let _weight = UndeployDmpQueue::::on_runtime_upgrade();
+ #[cfg(feature = "try-runtime")]
+ assert_ok!(UndeployDmpQueue::::post_upgrade(vec![]));
+
+ assert_eq!(RecordedMessages::take(), 10 * 16 + 5);
+
+ // Test the storage removal:
+ assert!(PageIndex::::exists(), "Not gone yet");
+ DeleteDmpQueue::::on_runtime_upgrade();
+ assert!(!PageIndex::::exists());
+ assert!(!Pages::::contains_key(10));
+ assert!(!Overweight::::contains_key(0));
+ });
+}
+
+#[test]
+fn migration_too_long_ignored() {
+ TestExt::default().execute_with(|| {
+ // This test should leak no storage:
+ //let _g = StorageNoopGuard::default();
+
+ // Setup the storage:
+ PageIndex::::set(PageIndexData {
+ begin_used: 10,
+ end_used: 11,
+ overweight_count: 2,
+ });
+
+ let short = vec![1; 16];
+ let long = vec![0; 17];
+ Pages::::insert(10, vec![(10, short.clone()), (10, long.clone())]);
+ // Insert one good and one bad overweight msg:
+ Overweight::::insert(0, (0, short.clone()));
+ Overweight::::insert(1, (0, long.clone()));
+
+ // Run the migration:
+ #[cfg(feature = "try-runtime")]
+ assert_ok!(UndeployDmpQueue::::pre_upgrade());
+ let _weight = UndeployDmpQueue::::on_runtime_upgrade();
+ #[cfg(feature = "try-runtime")]
+ assert_ok!(UndeployDmpQueue::::post_upgrade(vec![]));
+
+ assert_eq!(RecordedMessages::take(), 2);
+
+ // Test the storage removal:
+ assert!(PageIndex::::exists(), "Not gone yet");
+ DeleteDmpQueue::::on_runtime_upgrade();
+ assert!(!PageIndex::::exists());
+ assert!(!Pages::::contains_key(10));
+ assert!(!Overweight::::contains_key(0));
+ });
+}
diff --git a/cumulus/pallets/parachain-system/Cargo.toml b/cumulus/pallets/parachain-system/Cargo.toml
index cfc439724f5a..b1d8826ff402 100644
--- a/cumulus/pallets/parachain-system/Cargo.toml
+++ b/cumulus/pallets/parachain-system/Cargo.toml
@@ -17,6 +17,7 @@ scale-info = { version = "2.9.0", default-features = false, features = ["derive"
# Substrate
frame-support = { path = "../../../substrate/frame/support", default-features = false}
frame-system = { path = "../../../substrate/frame/system", default-features = false}
+pallet-message-queue = { git = "https://github.com/paritytech/substrate", default-features = false, branch = "master" }
sp-core = { path = "../../../substrate/primitives/core", default-features = false}
sp-externalities = { path = "../../../substrate/primitives/externalities", default-features = false}
sp-inherents = { path = "../../../substrate/primitives/inherents", default-features = false}
@@ -40,6 +41,7 @@ cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inh
assert_matches = "1.5"
hex-literal = "0.4.1"
lazy_static = "1.4"
+rand = "0.8.5"
# Substrate
sc-client-api = { path = "../../../substrate/client/api" }
@@ -60,22 +62,34 @@ std = [
"cumulus-primitives-parachain-inherent/std",
"environmental/std",
"frame-support/std",
+ "frame-benchmarking/std",
"frame-system/std",
"log/std",
"scale-info/std",
+ "pallet-message-queue/std",
"sp-core/std",
"sp-externalities/std",
"sp-io/std",
"sp-runtime/std",
"sp-state-machine/std",
"sp-std/std",
+ "sp-tracing/std",
"sp-trie/std",
"trie-db/std",
"xcm/std",
]
-runtime-benchmarks = [ "sp-runtime/runtime-benchmarks" ]
+runtime-benchmarks = [
+ "frame-benchmarking/runtime-benchmarks",
+ "frame-support/runtime-benchmarks",
+ "sp-runtime/runtime-benchmarks",
+ "pallet-message-queue/runtime-benchmarks",
+ "cumulus-test-client/runtime-benchmarks",
+]
-try-runtime = [ "frame-support/try-runtime" ]
+try-runtime = [
+ "frame-support/try-runtime",
+ "pallet-message-queue/try-runtime",
+]
parameterized-consensus-hook = []
diff --git a/cumulus/pallets/parachain-system/src/benchmarking.rs b/cumulus/pallets/parachain-system/src/benchmarking.rs
new file mode 100644
index 000000000000..375feaa867b9
--- /dev/null
+++ b/cumulus/pallets/parachain-system/src/benchmarking.rs
@@ -0,0 +1,65 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Benchmarking for the parachain-system pallet.
+
+#![cfg(feature = "runtime-benchmarks")]
+
+use super::*;
+use cumulus_primitives_core::relay_chain::Hash as RelayHash;
+use frame_benchmarking::v2::*;
+use sp_runtime::traits::BlakeTwo256;
+
+#[benchmarks]
+mod benchmarks {
+ use super::*;
+
+ /// Enqueue `n` messages via `enqueue_inbound_downward_messages`.
+ #[benchmark]
+ fn enqueue_inbound_downward_messages(n: Linear<0, 1000>) {
+ let msg = InboundDownwardMessage {
+ sent_at: n, // The block number does not matter.
+ msg: vec![0u8; MaxDmpMessageLenOf::::get() as usize],
+ };
+ let msgs = vec![msg; n as usize];
+ let head = mqp_head(&msgs);
+
+ #[block]
+ {
+ Pallet::::enqueue_inbound_downward_messages(head, msgs);
+ }
+
+ assert_eq!(ProcessedDownwardMessages::::get(), n);
+ assert_eq!(LastDmqMqcHead::::get().head(), head);
+ }
+
+ /// Re-implements an easy version of the `MessageQueueChain` for testing purposes.
+ fn mqp_head(msgs: &Vec) -> RelayHash {
+ let mut head = Default::default();
+ for msg in msgs.iter() {
+ let msg_hash = BlakeTwo256::hash_of(&msg.msg);
+ head = BlakeTwo256::hash_of(&(head, msg.sent_at, msg_hash));
+ }
+ head
+ }
+
+ impl_benchmark_test_suite! {
+ Pallet,
+ crate::mock::new_test_ext(),
+ crate::mock::Test
+ }
+}
diff --git a/cumulus/pallets/parachain-system/src/consensus_hook.rs b/cumulus/pallets/parachain-system/src/consensus_hook.rs
index 2566eea9bbcc..146696a22afd 100644
--- a/cumulus/pallets/parachain-system/src/consensus_hook.rs
+++ b/cumulus/pallets/parachain-system/src/consensus_hook.rs
@@ -66,6 +66,14 @@ pub trait ConsensusHook {
fn on_state_proof(state_proof: &RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity);
}
+// testing
+#[cfg(any(feature = "std", feature = "runtime-benchmarks"))]
+impl ConsensusHook for () {
+ fn on_state_proof(_state_proof: &RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity) {
+ (Weight::zero(), UnincludedSegmentCapacity::from(NonZeroU32::new(1).unwrap()))
+ }
+}
+
/// A special consensus hook for handling the migration to asynchronous backing gracefully,
/// even if collators haven't been updated to provide the last included parent in the state
/// proof yet.
diff --git a/cumulus/pallets/parachain-system/src/lib.rs b/cumulus/pallets/parachain-system/src/lib.rs
index ded869bf6f79..67ebf9d4541e 100644
--- a/cumulus/pallets/parachain-system/src/lib.rs
+++ b/cumulus/pallets/parachain-system/src/lib.rs
@@ -29,18 +29,19 @@
use codec::{Decode, Encode, MaxEncodedLen};
use cumulus_primitives_core::{
- relay_chain, AbridgedHostConfiguration, ChannelStatus, CollationInfo, DmpMessageHandler,
- GetChannelInfo, InboundDownwardMessage, InboundHrmpMessage, MessageSendError,
- OutboundHrmpMessage, ParaId, PersistedValidationData, UpwardMessage, UpwardMessageSender,
- XcmpMessageHandler, XcmpMessageSource,
+ relay_chain, AbridgedHostConfiguration, ChannelStatus, CollationInfo, GetChannelInfo,
+ InboundDownwardMessage, InboundHrmpMessage, MessageSendError, OutboundHrmpMessage, ParaId,
+ PersistedValidationData, UpwardMessage, UpwardMessageSender, XcmpMessageHandler,
+ XcmpMessageSource,
};
use cumulus_primitives_parachain_inherent::{MessageQueueChain, ParachainInherentData};
use frame_support::{
+ defensive,
dispatch::{DispatchError, DispatchResult, Pays, PostDispatchInfo},
ensure,
inherent::{InherentData, InherentIdentifier, ProvideInherent},
storage,
- traits::Get,
+ traits::{Get, HandleMessage},
weights::Weight,
};
use frame_system::{ensure_none, ensure_root, pallet_prelude::HeaderFor};
@@ -52,15 +53,21 @@ use sp_runtime::{
InvalidTransaction, TransactionLongevity, TransactionSource, TransactionValidity,
ValidTransaction,
},
+ BoundedSlice,
RuntimeDebug,
};
use sp_std::{cmp, collections::btree_map::BTreeMap, prelude::*};
use xcm::latest::XcmHash;
+mod benchmarking;
pub mod migration;
-
+mod mock;
#[cfg(test)]
mod tests;
+pub mod weights;
+
+pub use weights::WeightInfo;
+
mod unincluded_segment;
pub mod consensus_hook;
@@ -177,6 +184,9 @@ where
check_version: bool,
}
+/// The maximal length of a DMP message.
+pub type MaxDmpMessageLenOf = <::DmpQueue as HandleMessage>::MaxMessageLen;
+
#[frame_support::pallet]
pub mod pallet {
use super::*;
@@ -202,17 +212,18 @@ pub mod pallet {
/// The place where outbound XCMP messages come from. This is queried in `finalize_block`.
type OutboundXcmpMessageSource: XcmpMessageSource;
- /// The message handler that will be invoked when messages are received via DMP.
- type DmpMessageHandler: DmpMessageHandler;
+ /// Queues inbound downward messages for delayed processing.
+ ///
+ /// All inbound DMP messages from the relay are pushed into this. The handler is expected to
+ /// eventually process all the messages that are pushed to it.
+ type DmpQueue: HandleMessage;
/// The weight we reserve at the beginning of the block for processing DMP messages.
type ReservedDmpWeight: Get;
/// The message handler that will be invoked when messages are received via XCMP.
///
- /// The messages are dispatched in the order they were relayed by the relay chain. If
- /// multiple messages were relayed at one block, these will be dispatched in ascending
- /// order of the sender's para ID.
+ /// This should normally link to the XCMP Queue pallet.
type XcmpMessageHandler: XcmpMessageHandler;
/// The weight we reserve at the beginning of the block for processing XCMP messages.
@@ -221,6 +232,9 @@ pub mod pallet {
/// Something that can check the associated relay parent block number.
type CheckAssociatedRelayNumber: CheckAssociatedRelayNumber;
+ /// Weight info for functions and calls.
+ type WeightInfo: WeightInfo;
+
/// An entry-point for higher-level logic to manage the backlog of unincluded parachain
/// blocks and authorship rights for those blocks.
///
@@ -601,15 +615,16 @@ pub mod pallet {
::on_validation_data(&vfp);
- total_weight += Self::process_inbound_downward_messages(
+ let mut total_weight = Weight::zero();
+ total_weight.saturating_accrue(Self::enqueue_inbound_downward_messages(
relevant_messaging_state.dmq_mqc_head,
downward_messages,
- );
- total_weight += Self::process_inbound_horizontal_messages(
+ ));
+ total_weight.saturating_accrue(Self::enqueue_inbound_horizontal_messages(
&relevant_messaging_state.ingress_channels,
horizontal_messages,
vfp.relay_parent_number,
- );
+ ));
Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No })
}
@@ -1055,7 +1070,7 @@ impl Pallet {
// inherent.
}
- /// Process all inbound downward messages relayed by the collator.
+ /// Enqueue all inbound downward messages relayed by the collator into the MQ pallet.
///
/// Checks if the sequence of the messages is valid, dispatches them and communicates the
/// number of processed messages to the collator via a storage update.
@@ -1064,26 +1079,34 @@ impl Pallet {
///
/// If it turns out that after processing all messages the Message Queue Chain
/// hash doesn't match the expected.
- fn process_inbound_downward_messages(
+ fn enqueue_inbound_downward_messages(
expected_dmq_mqc_head: relay_chain::Hash,
downward_messages: Vec,
) -> Weight {
let dm_count = downward_messages.len() as u32;
let mut dmq_head = >::get();
- let mut weight_used = Weight::zero();
+ let weight_used = T::WeightInfo::enqueue_inbound_downward_messages(dm_count);
if dm_count != 0 {
Self::deposit_event(Event::DownwardMessagesReceived { count: dm_count });
- let max_weight =
- >::get().unwrap_or_else(T::ReservedDmpWeight::get);
-
- let message_iter = downward_messages
- .into_iter()
- .inspect(|m| {
- dmq_head.extend_downward(m);
- })
- .map(|m| (m.sent_at, m.msg));
- weight_used += T::DmpMessageHandler::handle_dmp_messages(message_iter, max_weight);
+
+ // Eagerly update the MQC head hash:
+ for m in &downward_messages {
+ dmq_head.extend_downward(m);
+ }
+ // Note: we are not using `.defensive()` here since that prints the whole value to
+ // console. In case that the message is too long, this clogs up the log quite badly.
+ let bounded =
+ downward_messages
+ .iter()
+ .filter_map(|m| match BoundedSlice::try_from(&m.msg[..]) {
+ Ok(bounded) => Some(bounded),
+ Err(_) => {
+ defensive!("Inbound Downward message was too long; dropping");
+ None
+ },
+ });
+ T::DmpQueue::handle_messages(bounded);
>::put(&dmq_head);
Self::deposit_event(Event::DownwardMessagesProcessed {
@@ -1106,14 +1129,15 @@ impl Pallet {
/// Process all inbound horizontal messages relayed by the collator.
///
- /// This is similar to `Pallet::process_inbound_downward_messages`, but works on multiple
- /// inbound channels.
+ /// This is similar to [`enqueue_inbound_downward_messages`], but works on multiple inbound
+ /// channels. It immediately dispatches signals and queues all other XCM. Blob messages are
+ /// ignored.
///
/// **Panics** if either any of horizontal messages submitted by the collator was sent from
/// a para which has no open channel to this parachain or if after processing
/// messages across all inbound channels MQCs were obtained which do not
/// correspond to the ones found on the relay-chain.
- fn process_inbound_horizontal_messages(
+ fn enqueue_inbound_horizontal_messages(
ingress_channels: &[(ParaId, cumulus_primitives_core::AbridgedHrmpChannel)],
horizontal_messages: BTreeMap>,
relay_parent_number: relay_chain::BlockNumber,
diff --git a/cumulus/pallets/parachain-system/src/mock.rs b/cumulus/pallets/parachain-system/src/mock.rs
new file mode 100644
index 000000000000..ec5a5c6eeb24
--- /dev/null
+++ b/cumulus/pallets/parachain-system/src/mock.rs
@@ -0,0 +1,504 @@
+// Copyright Parity Technologies (UK) Ltd.
+// This file is part of Cumulus.
+
+// Cumulus is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Cumulus is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Cumulus. If not, see .
+
+//! Test setup and helpers.
+
+#![cfg(test)]
+
+use super::*;
+
+use codec::Encode;
+use cumulus_primitives_core::{
+ relay_chain::BlockNumber as RelayBlockNumber, AggregateMessageOrigin, InboundDownwardMessage,
+ InboundHrmpMessage, PersistedValidationData,
+};
+use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
+use frame_support::{
+ dispatch::UnfilteredDispatchable,
+ inherent::{InherentData, ProvideInherent},
+ parameter_types,
+ traits::{OnFinalize, OnInitialize, ProcessMessage, ProcessMessageError},
+ weights::{Weight, WeightMeter},
+};
+use frame_system::{pallet_prelude::BlockNumberFor, RawOrigin};
+use sp_core::H256;
+use sp_runtime::{
+ traits::{BlakeTwo256, IdentityLookup},
+ BuildStorage,
+};
+use sp_std::{collections::vec_deque::VecDeque, num::NonZeroU32};
+use sp_version::RuntimeVersion;
+use std::cell::RefCell;
+
+use crate as parachain_system;
+use crate::consensus_hook::UnincludedSegmentCapacity;
+
+type Block = frame_system::mocking::MockBlock<Test>;
+
+frame_support::construct_runtime!(
+ pub enum Test {
+ System: frame_system::{Pallet, Call, Config<T>, Storage, Event<T>},
+ ParachainSystem: parachain_system::{Pallet, Call, Config<T>, Storage, Inherent, Event<T>, ValidateUnsigned},
+ MessageQueue: pallet_message_queue::{Pallet, Call, Storage, Event<T>},
+ }
+);
+
+parameter_types! {
+ pub const BlockHashCount: u64 = 250;
+ pub Version: RuntimeVersion = RuntimeVersion {
+ spec_name: sp_version::create_runtime_str!("test"),
+ impl_name: sp_version::create_runtime_str!("system-test"),
+ authoring_version: 1,
+ spec_version: 1,
+ impl_version: 1,
+ apis: sp_version::create_apis_vec!([]),
+ transaction_version: 1,
+ state_version: 1,
+ };
+ pub const ParachainId: ParaId = ParaId::new(200);
+ pub const ReservedXcmpWeight: Weight = Weight::zero();
+ pub const ReservedDmpWeight: Weight = Weight::zero();
+}
+
+impl frame_system::Config for Test {
+ type RuntimeOrigin = RuntimeOrigin;
+ type RuntimeCall = RuntimeCall;
+ type Nonce = u64;
+ type Hash = H256;
+ type Hashing = BlakeTwo256;
+ type AccountId = u64;
+ type Lookup = IdentityLookup<Self::AccountId>;
+ type Block = Block;
+ type RuntimeEvent = RuntimeEvent;
+ type BlockHashCount = BlockHashCount;
+ type BlockLength = ();
+ type BlockWeights = ();
+ type Version = Version;
+ type PalletInfo = PalletInfo;
+ type AccountData = ();
+ type OnNewAccount = ();
+ type OnKilledAccount = ();
+ type DbWeight = ();
+ type BaseCallFilter = frame_support::traits::Everything;
+ type SystemWeightInfo = ();
+ type SS58Prefix = ();
+ type OnSetCode = ParachainSetCode;
+ type MaxConsumers = frame_support::traits::ConstU32<16>;
+}
+
+parameter_types! {
+ pub const RelayOrigin: AggregateMessageOrigin = AggregateMessageOrigin::Parent;
+}
+
+impl Config for Test {
+ type RuntimeEvent = RuntimeEvent;
+ type OnSystemEvent = ();
+ type SelfParaId = ParachainId;
+ type OutboundXcmpMessageSource = FromThreadLocal;
+ type DmpQueue = frame_support::traits::EnqueueWithOrigin<MessageQueue, RelayOrigin>;
+ type ReservedDmpWeight = ReservedDmpWeight;
+ type XcmpMessageHandler = SaveIntoThreadLocal;
+ type ReservedXcmpWeight = ReservedXcmpWeight;
+ type CheckAssociatedRelayNumber = AnyRelayNumber;
+ type ConsensusHook = TestConsensusHook;
+ type WeightInfo = ();
+}
+
+std::thread_local! {
+ pub static CONSENSUS_HOOK: RefCell<Box<dyn Fn(&RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity)>>
+ = RefCell::new(Box::new(|_| (Weight::zero(), NonZeroU32::new(1).unwrap().into())));
+}
+
+pub struct TestConsensusHook;
+
+impl ConsensusHook for TestConsensusHook {
+ fn on_state_proof(s: &RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity) {
+ CONSENSUS_HOOK.with(|f| f.borrow_mut()(s))
+ }
+}
+
+parameter_types! {
+ pub const MaxWeight: Weight = Weight::MAX;
+}
+
+impl pallet_message_queue::Config for Test {
+ type RuntimeEvent = RuntimeEvent;
+ #[cfg(feature = "runtime-benchmarks")]
+ type MessageProcessor =
+ pallet_message_queue::mock_helpers::NoopMessageProcessor<AggregateMessageOrigin>;
+ #[cfg(not(feature = "runtime-benchmarks"))]
+ type MessageProcessor = SaveIntoThreadLocal;
+ type Size = u32;
+ type QueueChangeHandler = ();
+ type QueuePausedQuery = ();
+ type HeapSize = sp_core::ConstU32<{ 64 * 1024 }>;
+ type MaxStale = sp_core::ConstU32<8>;
+ type ServiceWeight = MaxWeight;
+ type WeightInfo = ();
+}
+
+pub struct FromThreadLocal;
+pub struct SaveIntoThreadLocal;
+
+std::thread_local! {
+ pub static HANDLED_DMP_MESSAGES: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::new());
+ pub static HANDLED_XCMP_MESSAGES: RefCell<Vec<(ParaId, RelayBlockNumber, Vec<u8>)>> = RefCell::new(Vec::new());
+ pub static SENT_MESSAGES: RefCell<Vec<(ParaId, Vec<u8>)>> = RefCell::new(Vec::new());
+}
+
+pub fn send_message(dest: ParaId, message: Vec<u8>) {
+ SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message)));
+}
+
+impl XcmpMessageSource for FromThreadLocal {
+ fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
+ let mut ids = std::collections::BTreeSet::<ParaId>::new();
+ let mut taken_messages = 0;
+ let mut taken_bytes = 0;
+ let mut result = Vec::new();
+ SENT_MESSAGES.with(|ms| {
+ ms.borrow_mut().retain(|m| {
+ let status = <Pallet<Test> as GetChannelInfo>::get_channel_status(m.0);
+ let (max_size_now, max_size_ever) = match status {
+ ChannelStatus::Ready(now, ever) => (now, ever),
+ ChannelStatus::Closed => return false, // drop message
+ ChannelStatus::Full => return true, // keep message queued.
+ };
+
+ let msg_len = m.1.len();
+
+ if !ids.contains(&m.0) &&
+ taken_messages < maximum_channels &&
+ msg_len <= max_size_ever &&
+ taken_bytes + msg_len <= max_size_now
+ {
+ ids.insert(m.0);
+ taken_messages += 1;
+ taken_bytes += msg_len;
+ result.push(m.clone());
+ false
+ } else {
+ true
+ }
+ })
+ });
+ result
+ }
+}
+
+impl ProcessMessage for SaveIntoThreadLocal {
+ type Origin = AggregateMessageOrigin;
+
+ fn process_message(
+ message: &[u8],
+ origin: Self::Origin,
+ _meter: &mut WeightMeter,
+ _id: &mut [u8; 32],
+ ) -> Result<bool, ProcessMessageError> {
+ assert_eq!(origin, Self::Origin::Parent);
+
+ HANDLED_DMP_MESSAGES.with(|m| {
+ m.borrow_mut().push(message.to_vec());
+ Weight::zero()
+ });
+ Ok(true)
+ }
+}
+
+impl XcmpMessageHandler for SaveIntoThreadLocal {
+ fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
+ iter: I,
+ _max_weight: Weight,
+ ) -> Weight {
+ HANDLED_XCMP_MESSAGES.with(|m| {
+ for (sender, sent_at, message) in iter {
+ m.borrow_mut().push((sender, sent_at, message.to_vec()));
+ }
+ Weight::zero()
+ })
+ }
+}
+
+// This function basically just builds a genesis storage key/value store according to
+// our desired mockup.
+pub fn new_test_ext() -> sp_io::TestExternalities {
+ HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear());
+ HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear());
+
+ frame_system::GenesisConfig::<Test>::default().build_storage().unwrap().into()
+}
+
+#[allow(dead_code)]
+pub fn mk_dmp(sent_at: u32) -> InboundDownwardMessage {
+ InboundDownwardMessage { sent_at, msg: format!("down{}", sent_at).into_bytes() }
+}
+
+pub fn mk_hrmp(sent_at: u32) -> InboundHrmpMessage {
+ InboundHrmpMessage { sent_at, data: format!("{}", sent_at).into_bytes() }
+}
+
+pub struct ReadRuntimeVersion(pub Vec<u8>);
+
+impl sp_core::traits::ReadRuntimeVersion for ReadRuntimeVersion {
+ fn read_runtime_version(
+ &self,
+ _wasm_code: &[u8],
+ _ext: &mut dyn sp_externalities::Externalities,
+ ) -> Result<Vec<u8>, String> {
+ Ok(self.0.clone())
+ }
+}
+
+pub fn wasm_ext() -> sp_io::TestExternalities {
+ let version = RuntimeVersion {
+ spec_name: "test".into(),
+ spec_version: 2,
+ impl_version: 1,
+ ..Default::default()
+ };
+
+ let mut ext = new_test_ext();
+ ext.register_extension(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion(
+ version.encode(),
+ )));
+ ext
+}
+
+pub struct BlockTest {
+ n: BlockNumberFor<Test>,
+ within_block: Box<dyn Fn()>,
+ after_block: Option<Box<dyn Fn()>>,
+}
+
+/// BlockTests exist to test blocks with some setup: we have to assume that
+/// `validate_block` will mutate and check storage in certain predictable
+/// ways, for example, and we want to always ensure that tests are executed
+/// in the context of some particular block number.
+#[derive(Default)]
+pub struct BlockTests {
+ tests: Vec,
+ without_externalities: bool,
+ pending_upgrade: Option,
+ ran: bool,
+ relay_sproof_builder_hook:
+ Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder)>>,
+ inherent_data_hook:
+ Option<Box<dyn Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData)>>,
+ inclusion_delay: Option<usize>,
+ relay_block_number: Option<Box<dyn Fn(&BlockNumberFor<Test>) -> RelayChainBlockNumber>>,
+
+ included_para_head: Option<relay_chain::HeadData>,
+ pending_blocks: VecDeque<relay_chain::HeadData>,
+}
+
+impl BlockTests {
+ pub fn new() -> BlockTests {
+ Default::default()
+ }
+
+ pub fn new_without_externalities() -> BlockTests {
+ let mut tests = BlockTests::new();
+ tests.without_externalities = true;
+ tests
+ }
+
+ pub fn add_raw(mut self, test: BlockTest) -> Self {
+ self.tests.push(test);
+ self
+ }
+
+ pub fn add<F>(self, n: BlockNumberFor<Test>, within_block: F) -> Self
+ where
+ F: 'static + Fn(),
+ {
+ self.add_raw(BlockTest { n, within_block: Box::new(within_block), after_block: None })
+ }
+
+ pub fn add_with_post_test<F1, F2>(
+ self,
+ n: BlockNumberFor<Test>,
+ within_block: F1,
+ after_block: F2,
+ ) -> Self
+ where
+ F1: 'static + Fn(),
+ F2: 'static + Fn(),
+ {
+ self.add_raw(BlockTest {
+ n,
+ within_block: Box::new(within_block),
+ after_block: Some(Box::new(after_block)),
+ })
+ }
+
+ pub fn with_relay_sproof_builder<F>(mut self, f: F) -> Self
+ where
+ F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut RelayStateSproofBuilder),
+ {
+ self.relay_sproof_builder_hook = Some(Box::new(f));
+ self
+ }
+
+ pub fn with_relay_block_number<F>(mut self, f: F) -> Self
+ where
+ F: 'static + Fn(&BlockNumberFor<Test>) -> RelayChainBlockNumber,
+ {
+ self.relay_block_number = Some(Box::new(f));
+ self
+ }
+
+ pub fn with_inherent_data<F>(mut self, f: F) -> Self
+ where
+ F: 'static + Fn(&BlockTests, RelayChainBlockNumber, &mut ParachainInherentData),
+ {
+ self.inherent_data_hook = Some(Box::new(f));
+ self
+ }
+
+ pub fn with_inclusion_delay(mut self, inclusion_delay: usize) -> Self {
+ self.inclusion_delay.replace(inclusion_delay);
+ self
+ }
+
+ pub fn run(&mut self) {
+ wasm_ext().execute_with(|| {
+ self.run_without_ext();
+ });
+ }
+
+ pub fn run_without_ext(&mut self) {
+ self.ran = true;
+
+ let mut parent_head_data = {
+ let header = HeaderFor::<Test>::new_from_number(0);
+ relay_chain::HeadData(header.encode())
+ };
+
+ self.included_para_head = Some(parent_head_data.clone());
+
+ for BlockTest { n, within_block, after_block } in self.tests.iter() {
+ let relay_parent_number = self
+ .relay_block_number
+ .as_ref()
+ .map(|f| f(n))
+ .unwrap_or(*n as RelayChainBlockNumber);
+ // clear pending updates, as applicable
+ if let Some(upgrade_block) = self.pending_upgrade {
+ if n >= &upgrade_block.into() {
+ self.pending_upgrade = None;
+ }
+ }
+
+ // begin initialization
+ let parent_hash = BlakeTwo256::hash(&parent_head_data.0);
+ System::reset_events();
+ System::initialize(n, &parent_hash, &Default::default());
+
+ // now mess with the storage the way validate_block does
+ let mut sproof_builder = RelayStateSproofBuilder::default();
+ sproof_builder.included_para_head = self
+ .included_para_head
+ .clone()
+ .unwrap_or_else(|| parent_head_data.clone())
+ .into();
+ if let Some(ref hook) = self.relay_sproof_builder_hook {
+ hook(self, relay_parent_number, &mut sproof_builder);
+ }
+ let (relay_parent_storage_root, relay_chain_state) =
+ sproof_builder.into_state_root_and_proof();
+ let vfp = PersistedValidationData {
+ relay_parent_number,
+ relay_parent_storage_root,
+ ..Default::default()
+ };
+
+ <ValidationData<Test>>::put(&vfp);
+ NewValidationCode::::kill();
+
+ // It is insufficient to push the validation function params
+ // to storage; they must also be included in the inherent data.
+ let inherent_data = {
+ let mut inherent_data = InherentData::default();
+ let mut system_inherent_data = ParachainInherentData {
+ validation_data: vfp.clone(),
+ relay_chain_state,
+ downward_messages: Default::default(),
+ horizontal_messages: Default::default(),
+ };
+ if let Some(ref hook) = self.inherent_data_hook {
+ hook(self, relay_parent_number, &mut system_inherent_data);
+ }
+ inherent_data
+ .put_data(
+ cumulus_primitives_parachain_inherent::INHERENT_IDENTIFIER,
+ &system_inherent_data,
+ )
+ .expect("failed to put VFP inherent");
+ inherent_data
+ };
+
+ // execute the block
+ ParachainSystem::on_initialize(*n);
+ ParachainSystem::create_inherent(&inherent_data)
+ .expect("got an inherent")
+ .dispatch_bypass_filter(RawOrigin::None.into())
+ .expect("dispatch succeeded");
+ MessageQueue::on_initialize(*n);
+ within_block();
+ MessageQueue::on_finalize(*n);
+ ParachainSystem::on_finalize(*n);
+
+ // did block execution set new validation code?
+ if NewValidationCode::::exists() && self.pending_upgrade.is_some() {
+ panic!("attempted to set validation code while upgrade was pending");
+ }
+
+ // clean up
+ let header = System::finalize();
+ let head_data = relay_chain::HeadData(header.encode());
+ parent_head_data = head_data.clone();
+ match self.inclusion_delay {
+ Some(delay) if delay > 0 => {
+ self.pending_blocks.push_back(head_data);
+ if self.pending_blocks.len() > delay {
+ let included = self.pending_blocks.pop_front().unwrap();
+
+ self.included_para_head.replace(included);
+ }
+ },
+ _ => {
+ self.included_para_head.replace(head_data);
+ },
+ }
+
+ if let Some(after_block) = after_block {
+ after_block();
+ }
+ }
+ }
+}
+
+impl Drop for BlockTests {
+ fn drop(&mut self) {
+ if !self.ran {
+ if self.without_externalities {
+ self.run_without_ext();
+ } else {
+ self.run();
+ }
+ }
+ }
+}
diff --git a/cumulus/pallets/parachain-system/src/tests.rs b/cumulus/pallets/parachain-system/src/tests.rs
index 41e8dc63808d..8e2a9468b275 100755
--- a/cumulus/pallets/parachain-system/src/tests.rs
+++ b/cumulus/pallets/parachain-system/src/tests.rs
@@ -13,434 +13,20 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see .
+
+#![cfg(test)]
+
use super::*;
+use crate::mock::*;
-use codec::Encode;
-use cumulus_primitives_core::{
- relay_chain::BlockNumber as RelayBlockNumber, AbridgedHrmpChannel, InboundDownwardMessage,
- InboundHrmpMessage, PersistedValidationData,
-};
-use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
-use frame_support::{
- assert_ok,
- dispatch::UnfilteredDispatchable,
- inherent::{InherentData, ProvideInherent},
- parameter_types,
- traits::{OnFinalize, OnInitialize},
- weights::Weight,
-};
-use frame_system::{
- pallet_prelude::{BlockNumberFor, HeaderFor},
- RawOrigin,
-};
+use cumulus_primitives_core::{AbridgedHrmpChannel, InboundDownwardMessage, InboundHrmpMessage};
+use frame_support::{assert_ok, parameter_types, weights::Weight};
+use frame_system::RawOrigin;
use hex_literal::hex;
+use rand::Rng;
use relay_chain::HrmpChannelId;
-use sp_core::{blake2_256, H256};
-use sp_runtime::{
- traits::{BlakeTwo256, IdentityLookup},
- BuildStorage, DispatchErrorWithPostInfo,
-};
-use sp_std::{collections::vec_deque::VecDeque, num::NonZeroU32};
-use sp_version::RuntimeVersion;
-use std::cell::RefCell;
-
-use crate as parachain_system;
-use crate::consensus_hook::UnincludedSegmentCapacity;
-
-type Block = frame_system::mocking::MockBlock;
-
-frame_support::construct_runtime!(
- pub enum Test
- {
- System: frame_system::{Pallet, Call, Config, Storage, Event},
- ParachainSystem: parachain_system::{Pallet, Call, Config, Storage, Inherent, Event, ValidateUnsigned},
- }
-);
-
-parameter_types! {
- pub const BlockHashCount: u64 = 250;
- pub Version: RuntimeVersion = RuntimeVersion {
- spec_name: sp_version::create_runtime_str!("test"),
- impl_name: sp_version::create_runtime_str!("system-test"),
- authoring_version: 1,
- spec_version: 1,
- impl_version: 1,
- apis: sp_version::create_apis_vec!([]),
- transaction_version: 1,
- state_version: 1,
- };
- pub const ParachainId: ParaId = ParaId::new(200);
- pub const ReservedXcmpWeight: Weight = Weight::zero();
- pub const ReservedDmpWeight: Weight = Weight::zero();
-}
-impl frame_system::Config for Test {
- type RuntimeOrigin = RuntimeOrigin;
- type RuntimeCall = RuntimeCall;
- type Nonce = u64;
- type Hash = H256;
- type Hashing = BlakeTwo256;
- type AccountId = u64;
- type Lookup = IdentityLookup;
- type Block = Block;
- type RuntimeEvent = RuntimeEvent;
- type BlockHashCount = BlockHashCount;
- type BlockLength = ();
- type BlockWeights = ();
- type Version = Version;
- type PalletInfo = PalletInfo;
- type AccountData = ();
- type OnNewAccount = ();
- type OnKilledAccount = ();
- type DbWeight = ();
- type BaseCallFilter = frame_support::traits::Everything;
- type SystemWeightInfo = ();
- type SS58Prefix = ();
- type OnSetCode = ParachainSetCode;
- type MaxConsumers = frame_support::traits::ConstU32<16>;
-}
-impl Config for Test {
- type RuntimeEvent = RuntimeEvent;
- type OnSystemEvent = ();
- type SelfParaId = ParachainId;
- type OutboundXcmpMessageSource = FromThreadLocal;
- type DmpMessageHandler = SaveIntoThreadLocal;
- type ReservedDmpWeight = ReservedDmpWeight;
- type XcmpMessageHandler = SaveIntoThreadLocal;
- type ReservedXcmpWeight = ReservedXcmpWeight;
- type CheckAssociatedRelayNumber = AnyRelayNumber;
- type ConsensusHook = TestConsensusHook;
-}
-
-pub struct FromThreadLocal;
-pub struct SaveIntoThreadLocal;
-
-std::thread_local! {
- static HANDLED_DMP_MESSAGES: RefCell)>> = RefCell::new(Vec::new());
- static HANDLED_XCMP_MESSAGES: RefCell)>> = RefCell::new(Vec::new());
- static SENT_MESSAGES: RefCell)>> = RefCell::new(Vec::new());
- static CONSENSUS_HOOK: RefCell (Weight, UnincludedSegmentCapacity)>>
- = RefCell::new(Box::new(|_| (Weight::zero(), NonZeroU32::new(1).unwrap().into())));
-}
-
-pub struct TestConsensusHook;
-
-impl ConsensusHook for TestConsensusHook {
- fn on_state_proof(s: &RelayChainStateProof) -> (Weight, UnincludedSegmentCapacity) {
- CONSENSUS_HOOK.with(|f| f.borrow_mut()(s))
- }
-}
-
-fn send_message(dest: ParaId, message: Vec) {
- SENT_MESSAGES.with(|m| m.borrow_mut().push((dest, message)));
-}
-
-impl XcmpMessageSource for FromThreadLocal {
- fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec)> {
- let mut ids = std::collections::BTreeSet::::new();
- let mut taken_messages = 0;
- let mut taken_bytes = 0;
- let mut result = Vec::new();
- SENT_MESSAGES.with(|ms| {
- ms.borrow_mut().retain(|m| {
- let status = as GetChannelInfo>::get_channel_status(m.0);
- let (max_size_now, max_size_ever) = match status {
- ChannelStatus::Ready(now, ever) => (now, ever),
- ChannelStatus::Closed => return false, // drop message
- ChannelStatus::Full => return true, // keep message queued.
- };
-
- let msg_len = m.1.len();
-
- if !ids.contains(&m.0) &&
- taken_messages < maximum_channels &&
- msg_len <= max_size_ever &&
- taken_bytes + msg_len <= max_size_now
- {
- ids.insert(m.0);
- taken_messages += 1;
- taken_bytes += msg_len;
- result.push(m.clone());
- false
- } else {
- true
- }
- })
- });
- result
- }
-}
-
-impl DmpMessageHandler for SaveIntoThreadLocal {
- fn handle_dmp_messages(
- iter: impl Iterator
- )>,
- _max_weight: Weight,
- ) -> Weight {
- HANDLED_DMP_MESSAGES.with(|m| {
- for i in iter {
- m.borrow_mut().push(i);
- }
- Weight::zero()
- })
- }
-}
-
-impl XcmpMessageHandler for SaveIntoThreadLocal {
- fn handle_xcmp_messages<'a, I: Iterator
- >(
- iter: I,
- _max_weight: Weight,
- ) -> Weight {
- HANDLED_XCMP_MESSAGES.with(|m| {
- for (sender, sent_at, message) in iter {
- m.borrow_mut().push((sender, sent_at, message.to_vec()));
- }
- Weight::zero()
- })
- }
-}
-
-// This function basically just builds a genesis storage key/value store according to
-// our desired mockup.
-fn new_test_ext() -> sp_io::TestExternalities {
- HANDLED_DMP_MESSAGES.with(|m| m.borrow_mut().clear());
- HANDLED_XCMP_MESSAGES.with(|m| m.borrow_mut().clear());
-
- frame_system::GenesisConfig::::default().build_storage().unwrap().into()
-}
-
-struct ReadRuntimeVersion(Vec);
-
-impl sp_core::traits::ReadRuntimeVersion for ReadRuntimeVersion {
- fn read_runtime_version(
- &self,
- _wasm_code: &[u8],
- _ext: &mut dyn sp_externalities::Externalities,
- ) -> Result, String> {
- Ok(self.0.clone())
- }
-}
-
-fn wasm_ext() -> sp_io::TestExternalities {
- let version = RuntimeVersion {
- spec_name: "test".into(),
- spec_version: 2,
- impl_version: 1,
- ..Default::default()
- };
-
- let mut ext = new_test_ext();
- ext.register_extension(sp_core::traits::ReadRuntimeVersionExt::new(ReadRuntimeVersion(
- version.encode(),
- )));
- ext
-}
-
-struct BlockTest {
- n: BlockNumberFor,
- within_block: Box,
- after_block: Option>,
-}
-
-/// BlockTests exist to test blocks with some setup: we have to assume that
-/// `validate_block` will mutate and check storage in certain predictable
-/// ways, for example, and we want to always ensure that tests are executed
-/// in the context of some particular block number.
-#[derive(Default)]
-struct BlockTests {
- tests: Vec,
- pending_upgrade: Option,
- ran: bool,
- relay_sproof_builder_hook:
- Option>,
- inherent_data_hook:
- Option>,
- inclusion_delay: Option,
- relay_block_number: Option) -> RelayChainBlockNumber>>,
-
- included_para_head: Option,
- pending_blocks: VecDeque,
-}
-
-impl BlockTests {
- fn new() -> BlockTests {
- Default::default()
- }
-
- fn add_raw(mut self, test: BlockTest) -> Self {
- self.tests.push(test);
- self
- }
-
- fn add