From 86b5d3dadd5cf10f656ca6144c4080dc8aeaf1f4 Mon Sep 17 00:00:00 2001 From: Chris Sosnin <48099298+slumber@users.noreply.github.com> Date: Sat, 29 Apr 2023 20:19:04 +0300 Subject: [PATCH] (async backing) parachain-system: track limitations for unincluded blocks (#2438) * unincluded segment draft * read para head from storage proof * read_para_head -> read_included_para_head * Provide pub interface * add errors * fix unincluded segment update * BlockTracker -> Ancestor * add a dmp limit * Read para head depending on the storage switch * doc comments * storage items docs * add a sanity check on block initialize * Check watermark * append to the segment on block finalize * Move segment update into set_validation_data * Resolve para head todo * option watermark * fix comment * Drop dmq check * fix weight * doc-comments on inherent invariant * Remove TODO * add todo * primitives tests * pallet tests * doc comments --- Cargo.lock | 1 + pallets/parachain-system/Cargo.toml | 1 + pallets/parachain-system/src/lib.rs | 185 +++++- .../src/relay_state_snapshot.rs | 10 + pallets/parachain-system/src/tests.rs | 97 ++- .../src/unincluded_segment.rs | 611 ++++++++++++++++++ .../parachain-inherent/src/client_side.rs | 1 + test/relay-sproof-builder/src/lib.rs | 5 + 8 files changed, 898 insertions(+), 13 deletions(-) create mode 100644 pallets/parachain-system/src/unincluded_segment.rs diff --git a/Cargo.lock b/Cargo.lock index 97e893c299e..10b520c0ad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2150,6 +2150,7 @@ dependencies = [ name = "cumulus-pallet-parachain-system" version = "0.1.0" dependencies = [ + "assert_matches", "bytes", "cumulus-pallet-parachain-system-proc-macro", "cumulus-primitives-core", diff --git a/pallets/parachain-system/Cargo.toml b/pallets/parachain-system/Cargo.toml index 668457be002..0d6f1954b9e 100644 --- a/pallets/parachain-system/Cargo.toml +++ b/pallets/parachain-system/Cargo.toml @@ -36,6 +36,7 @@ cumulus-primitives-core = { path = 
"../../primitives/core", default-features = f cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent", default-features = false } [dev-dependencies] +assert_matches = "1.5" hex-literal = "0.4.1" lazy_static = "1.4" diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index ac3b8648c65..3edcc6dfb14 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -48,7 +48,7 @@ use frame_system::{ensure_none, ensure_root}; use polkadot_parachain::primitives::RelayChainBlockNumber; use scale_info::TypeInfo; use sp_runtime::{ - traits::{Block as BlockT, BlockNumberProvider, Hash}, + traits::{Block as BlockT, BlockNumberProvider, Hash, Zero}, transaction_validity::{ InvalidTransaction, TransactionLongevity, TransactionSource, TransactionValidity, ValidTransaction, @@ -59,11 +59,16 @@ use xcm::latest::XcmHash; mod migration; mod relay_state_snapshot; +mod unincluded_segment; #[macro_use] pub mod validate_block; #[cfg(test)] mod tests; +use unincluded_segment::{ + Ancestor, HrmpChannelUpdate, SegmentTracker, TotalBandwidthLimits, UsedBandwidth, +}; + /// Register the `validate_block` function that is used by parachains to validate blocks on a /// validator. /// @@ -232,7 +237,7 @@ pub mod pallet { }, }; - >::mutate(|up| { + let (ump_msg_count, ump_total_bytes) = >::mutate(|up| { let (count, size) = relevant_messaging_state.relay_dispatch_queue_size; let available_capacity = cmp::min( @@ -243,24 +248,32 @@ pub mod pallet { // Count the number of messages we can possibly fit in the given constraints, i.e. // available_capacity and available_size. 
- let num = up + let (num, total_size) = up .iter() - .scan((available_capacity as usize, available_size as usize), |state, msg| { - let (cap_left, size_left) = *state; - match (cap_left.checked_sub(1), size_left.checked_sub(msg.len())) { - (Some(new_cap), Some(new_size)) => { + .scan((0u32, 0u32), |state, msg| { + let (cap_used, size_used) = *state; + let new_cap = cap_used.saturating_add(1); + let new_size = size_used.saturating_add(msg.len() as u32); + match available_capacity + .checked_sub(new_cap) + .and(available_size.checked_sub(new_size)) + { + Some(_) => { *state = (new_cap, new_size); - Some(()) + Some(*state) }, _ => None, } }) - .count(); + .last() + .unwrap_or_default(); // TODO: #274 Return back messages that do not longer fit into the queue. - UpwardMessages::::put(&up[..num]); - *up = up.split_off(num); + UpwardMessages::::put(&up[..num as usize]); + *up = up.split_off(num as usize); + + (num, total_size) }); // Sending HRMP messages is a little bit more involved. There are the following @@ -282,6 +295,43 @@ pub mod pallet { .map(|(recipient, data)| OutboundHrmpMessage { recipient, data }) .collect::>(); + if MaxUnincludedLen::::get().map_or(false, |max_len| !max_len.is_zero()) { + // NOTE: these limits don't account for the amount of processed messages from + // downward and horizontal queues. + // + // This is correct because: + // - inherent never contains messages that were previously processed. + // - current implementation always attempts to exhaust each message queue. + // + // + let limits = TotalBandwidthLimits::new(&relevant_messaging_state); + + let hrmp_outgoing = outbound_messages + .iter() + .map(|msg| { + ( + msg.recipient, + HrmpChannelUpdate { msg_count: 1, total_bytes: msg.data.len() as u32 }, + ) + }) + .collect(); + let used_bandwidth = + UsedBandwidth { ump_msg_count, ump_total_bytes, hrmp_outgoing }; + // The bandwidth constructed was ensured to satisfy relay chain constraints. 
+ let ancestor = Ancestor::new_unchecked(used_bandwidth); + + let watermark = HrmpWatermark::::get(); + AggregatedUnincludedSegment::::mutate(|agg| { + let agg = agg.get_or_insert_with(SegmentTracker::default); + // TODO: In order for this panic to be correct, outbound message source should + // respect bandwidth limits as well. + // + agg.append(&ancestor, watermark, &limits) + .expect("unincluded segment limits exceeded"); + }); + // Check in `on_initialize` guarantees there's space for this block. + UnincludedSegment::::append(ancestor); + } HrmpOutboundMessages::::put(outbound_messages); } @@ -296,6 +346,23 @@ pub mod pallet { weight += T::DbWeight::get().writes(1); } + // New para head was unknown during block finalization, update it. + if MaxUnincludedLen::::get().map_or(false, |max_len| !max_len.is_zero()) { + >::mutate(|chain| { + if let Some(ancestor) = chain.last_mut() { + let parent = frame_system::Pallet::::parent_hash(); + // Ancestor is the latest finalized block, thus current parent is + // its output head. + ancestor.replace_para_head_hash(parent); + } + }); + weight += T::DbWeight::get().reads_writes(1, 1); + + // Weight used during finalization. + weight += T::DbWeight::get().reads_writes(2, 2); + } + weight += T::DbWeight::get().reads(1); + // Remove the validation from the old block. ValidationData::::kill(); ProcessedDownwardMessages::::kill(); @@ -336,6 +403,9 @@ pub mod pallet { 4 + hrmp_max_message_num_per_candidate as u64, ); + // Always try to read `MaxUnincludedLen` in `on_finalize`. + weight += T::DbWeight::get().reads(1); + weight } } @@ -364,6 +434,12 @@ pub mod pallet { "ValidationData must be updated only once in a block", ); + // NOTE: the inherent data is expected to be unique, even if this block is built + // in the context of the same relay parent as the previous one. In particular, + // the inherent shouldn't contain messages that were already processed by any of the + // ancestors.
+ // + // This invariant should be upheld by the `ProvideInherent` implementation. let ParachainInherentData { validation_data: vfp, relay_chain_state, @@ -442,6 +518,7 @@ pub mod pallet { horizontal_messages, vfp.relay_parent_number, ); + total_weight += Self::maybe_drop_included_ancestors(&relay_state_proof); Ok(PostDispatchInfo { actual_weight: Some(total_weight), pays_fee: Pays::No }) } @@ -544,6 +621,29 @@ pub mod pallet { Unauthorized, } + /// Maximum number of latest included block descendants the runtime is allowed to accept. In other words, + /// these are ancestors of the block being currently executed, not yet sent to the relay chain runtime. + /// + /// This value is optional, but once set to `Some` by the governance, should never go back to `None`. + /// Requires latest included para head to be present in the relay chain storage proof. + #[pallet::storage] + pub(super) type MaxUnincludedLen = StorageValue<_, T::BlockNumber, OptionQuery>; + + /// Latest included block descendants the runtime accepted. In other words, these are + /// ancestors of the block being currently executed, not yet sent to the relay chain runtime. + /// + /// The segment length is limited by [`MaxUnincludedLen`]. + #[pallet::storage] + pub(super) type UnincludedSegment = + StorageValue<_, Vec>, ValueQuery>; + + /// Storage field that keeps track of bandwidth used by the unincluded segment along with + /// the latest HRMP watermark. Used for limiting the acceptance of new blocks with respect to relay + /// chain constraints. + #[pallet::storage] + pub(super) type AggregatedUnincludedSegment = + StorageValue<_, SegmentTracker, OptionQuery>; + /// In case of a scheduled upgrade, this storage field contains the validation code to be applied.
/// /// As soon as the relay chain gives us the go-ahead signal, we will overwrite the [`:code`][well_known_keys::CODE] @@ -960,6 +1060,69 @@ impl Pallet { weight_used } + /// Drop blocks from the unincluded segment with respect to the latest parachain head. + /// + /// No-op if [`MaxUnincludedLen`] is not set. + fn maybe_drop_included_ancestors(relay_state_proof: &RelayChainStateProof) -> Weight { + let mut weight_used = Weight::zero(); + // If `MaxUnincludedLen` is present in the storage, parachain head + // is always expected to be included into the relay storage proof. + let para_head_with_len = >::get().map(|max_len| { + ( + relay_state_proof + .read_included_para_head() + .expect("Invalid para head in relay chain state proof"), + max_len, + ) + }); + weight_used += T::DbWeight::get().reads(1); + let Some((para_head, max_len)) = para_head_with_len else { return weight_used }; + + let para_head_hash = T::Hashing::hash(¶_head.0); + if !max_len.is_zero() { + let (dropped, left_count): (Vec>, u32) = + >::mutate(|chain| { + // Drop everything up to the block with an included para head, if present. + let idx = chain + .iter() + .position(|block| { + let head_hash = block.para_head_hash().expect( + "para head hash is updated during block initialization; qed", + ); + head_hash == ¶_head_hash + }) + .map_or(0, |idx| idx + 1); // inclusive. + + let left_count = (idx..chain.len()).count() as u32; + let dropped = chain.drain(..idx).collect(); + (dropped, left_count) + }); + weight_used += T::DbWeight::get().reads_writes(1, 1); + + // sanity-check there's place for the block at finalization phase. + // + // If this fails, the max segment len is reached and parachain should wait + // for ancestor's inclusion. 
+ assert!( + max_len > left_count.into(), + "no space left for the block in the unincluded segment" + ); + + if !dropped.is_empty() { + >::mutate(|agg| { + let agg = agg.as_mut().expect( + "dropped part of the segment wasn't empty, hence value exists; qed", + ); + for block in dropped { + agg.subtract(&block); + } + }); + weight_used += T::DbWeight::get().reads_writes(1, 1); + } + } + weight_used + } + /// Put a new validation function into a particular location where polkadot /// monitors for updates. Calling this function notifies polkadot that a new /// upgrade has been scheduled. diff --git a/pallets/parachain-system/src/relay_state_snapshot.rs b/pallets/parachain-system/src/relay_state_snapshot.rs index 0a6426a8012..9da5a03ce83 100644 --- a/pallets/parachain-system/src/relay_state_snapshot.rs +++ b/pallets/parachain-system/src/relay_state_snapshot.rs @@ -85,6 +85,8 @@ pub enum Error { HrmpEgressChannelIndex(ReadEntryErr), /// The channel identified by the sender and receiver cannot be extracted. HrmpChannel(ParaId, ParaId, ReadEntryErr), + /// The latest included parachain head cannot be extracted. + ParaHead(ReadEntryErr), } #[derive(Debug)] @@ -235,6 +237,14 @@ impl RelayChainStateProof { .map_err(Error::Config) } + /// Read latest included parachain [head data](`relay_chain::HeadData`) from the relay chain state proof. + /// + /// Returns an error if anything failed at reading or decoding. + pub fn read_included_para_head(&self) -> Result { + read_entry(&self.trie_backend, &relay_chain::well_known_keys::para_head(self.para_id), None) + .map_err(Error::ParaHead) + } + /// Read the [`Slot`](relay_chain::Slot) from the relay chain state proof. /// /// The slot is slot of the relay chain block this state proof was extracted from. 
diff --git a/pallets/parachain-system/src/tests.rs b/pallets/parachain-system/src/tests.rs index 70e4c106bf2..8edbabaf5a7 100755 --- a/pallets/parachain-system/src/tests.rs +++ b/pallets/parachain-system/src/tests.rs @@ -38,6 +38,7 @@ use sp_runtime::{ traits::{BlakeTwo256, IdentityLookup}, DispatchErrorWithPostInfo, }; +use sp_std::collections::vec_deque::VecDeque; use sp_version::RuntimeVersion; use std::cell::RefCell; @@ -231,6 +232,11 @@ struct BlockTests { persisted_validation_data_hook: Option>, inherent_data_hook: Option>, + inclusion_delay: Option, + max_unincluded_len: Option, + + included_para_head: Option, + pending_blocks: VecDeque, } impl BlockTests { @@ -291,9 +297,25 @@ impl BlockTests { self } + fn with_unincluded_segment(mut self, inclusion_delay: usize, max_unincluded_len: u64) -> Self { + self.inclusion_delay.replace(inclusion_delay); + self.max_unincluded_len.replace(max_unincluded_len); + self + } + fn run(&mut self) { self.ran = true; wasm_ext().execute_with(|| { + let mut parent_head_data = { + let header = Header::new_from_number(0); + relay_chain::HeadData(header.encode()) + }; + + if let Some(max_unincluded_len) = self.max_unincluded_len { + // Initialize included head if the segment is enabled. 
+ self.included_para_head.replace(parent_head_data.clone()); + >::put(max_unincluded_len); + } for BlockTest { n, within_block, after_block } in self.tests.iter() { // clear pending updates, as applicable if let Some(upgrade_block) = self.pending_upgrade { @@ -303,11 +325,17 @@ impl BlockTests { } // begin initialization + let parent_hash = BlakeTwo256::hash(&parent_head_data.0); System::reset_events(); - System::initialize(&n, &Default::default(), &Default::default()); + System::initialize(&n, &parent_hash, &Default::default()); // now mess with the storage the way validate_block does let mut sproof_builder = RelayStateSproofBuilder::default(); + sproof_builder.included_para_head = self + .included_para_head + .clone() + .unwrap_or_else(|| parent_head_data.clone()) + .into(); if let Some(ref hook) = self.relay_sproof_builder_hook { hook(self, *n as RelayChainBlockNumber, &mut sproof_builder); } @@ -364,7 +392,23 @@ impl BlockTests { } // clean up - System::finalize(); + let header = System::finalize(); + let head_data = relay_chain::HeadData(header.encode()); + parent_head_data = head_data.clone(); + match self.inclusion_delay { + Some(delay) if delay > 0 => { + self.pending_blocks.push_back(head_data); + if self.pending_blocks.len() > delay { + let included = self.pending_blocks.pop_front().unwrap(); + + self.included_para_head.replace(included); + } + }, + _ => { + self.included_para_head.replace(head_data); + }, + } + if let Some(after_block) = after_block { after_block(); } @@ -387,6 +431,55 @@ fn block_tests_run_on_drop() { BlockTests::new().add(123, || panic!("if this test passes, block tests run properly")); } +#[test] +fn unincluded_segment_works() { + BlockTests::new() + .with_unincluded_segment(1, 10) + .add_with_post_test( + 123, + || {}, + || { + let segment = >::get(); + assert_eq!(segment.len(), 1); + assert!(>::get().is_some()); + }, + ) + .add_with_post_test( + 124, + || {}, + || { + let segment = >::get(); + assert_eq!(segment.len(), 2); + }, + ) 
+ .add_with_post_test( + 125, + || {}, + || { + let segment = >::get(); + // Block 123 was popped from the segment, the len is still 2. + assert_eq!(segment.len(), 2); + }, + ); +} + +#[test] +#[should_panic] +fn unincluded_segment_is_limited() { + BlockTests::new() + .with_unincluded_segment(10, 1) + .add_with_post_test( + 123, + || {}, + || { + let segment = >::get(); + assert_eq!(segment.len(), 1); + assert!(>::get().is_some()); + }, + ) + .add(124, || {}); // The previous block wasn't included yet, should panic in `create_inherent`. +} + #[test] fn events() { BlockTests::new() diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs new file mode 100644 index 00000000000..d0e5dd47f61 --- /dev/null +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -0,0 +1,611 @@ +// Copyright 2023 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +//! Primitives used for tracking message queues constraints in an unincluded block segment +//! of the parachain. +//! +//! Unincluded segment describes a chain of latest included block descendants, which are not yet +//! sent to relay chain. 
+ +use super::relay_state_snapshot::MessagingStateSnapshot; +use codec::{Decode, Encode}; +use cumulus_primitives_core::{relay_chain, ParaId}; +use scale_info::TypeInfo; +use sp_std::{collections::btree_map::BTreeMap, marker::PhantomData}; + +/// Constraints on outbound HRMP channel. +pub struct HrmpOutboundLimits { + /// The maximum bytes that can be written to the channel. + pub bytes_remaining: u32, + /// The maximum messages that can be written to the channel. + pub messages_remaining: u32, +} + +/// Constraints imposed on the entire segment, i.e. based on the latest included parablock. +pub struct TotalBandwidthLimits { + /// The amount of UMP messages remaining. + pub ump_messages_remaining: u32, + /// The amount of UMP bytes remaining. + pub ump_bytes_remaining: u32, + /// The limitations of all registered outbound HRMP channels. + pub hrmp_outgoing: BTreeMap, +} + +impl TotalBandwidthLimits { + /// Creates new limits from the messaging state. + pub fn new(messaging_state: &MessagingStateSnapshot) -> Self { + let (ump_messages_remaining, ump_bytes_remaining) = + messaging_state.relay_dispatch_queue_size; + let hrmp_outgoing = messaging_state + .egress_channels + .iter() + .map(|(id, channel)| { + ( + *id, + HrmpOutboundLimits { + bytes_remaining: channel.max_total_size, + messages_remaining: channel.max_capacity, + }, + ) + }) + .collect(); + + Self { ump_messages_remaining, ump_bytes_remaining, hrmp_outgoing } + } +} + +/// The error type for updating bandwidth used by a segment. +#[derive(Debug)] +pub enum BandwidthUpdateError { + /// Too many messages submitted to HRMP channel. + HrmpMessagesOverflow { + /// Parachain id of the recipient. + recipient: ParaId, + /// The amount of remaining messages in the capacity of the channel. + messages_remaining: u32, + /// The amount of messages submitted to the channel. + messages_submitted: u32, + }, + /// Too many bytes submitted to HRMP channel. + HrmpBytesOverflow { + /// Parachain id of the recipient. 
+ recipient: ParaId, + /// The amount of remaining bytes in the capacity of the channel. + bytes_remaining: u32, + /// The amount of bytes submitted to the channel. + bytes_submitted: u32, + }, + /// Too many messages submitted to UMP queue. + UmpMessagesOverflow { + /// The amount of remaining messages in the capacity of UMP. + messages_remaining: u32, + /// The amount of messages submitted to UMP. + messages_submitted: u32, + }, + /// Too many bytes submitted to UMP. + UmpBytesOverflow { + /// The amount of remaining bytes in the capacity of UMP. + bytes_remaining: u32, + /// The amount of bytes submitted to UMP. + bytes_submitted: u32, + }, + /// Invalid HRMP watermark. + InvalidHrmpWatermark { + /// HRMP watermark submitted by the candidate. + submitted: relay_chain::BlockNumber, + /// Latest tracked HRMP watermark. + latest: relay_chain::BlockNumber, + }, +} + +/// The number of messages and size in bytes submitted to HRMP channel. +#[derive(Debug, Default, Copy, Clone, Encode, Decode, TypeInfo)] +pub struct HrmpChannelUpdate { + /// The amount of messages submitted to the channel. + pub msg_count: u32, + /// The amount of bytes submitted to the channel. + pub total_bytes: u32, +} + +impl HrmpChannelUpdate { + /// Returns `true` if the update is empty, `false` otherwise. + fn is_empty(&self) -> bool { + self.msg_count == 0 && self.total_bytes == 0 + } + + /// Tries to append another update, respecting given bandwidth limits. 
+ fn append( + &self, + other: &Self, + recipient: ParaId, + limits: &TotalBandwidthLimits, + ) -> Result { + let limits = limits + .hrmp_outgoing + .get(&recipient) + .expect("limit for declared hrmp channel must be present; qed"); + + let mut new = *self; + + new.msg_count = new.msg_count.saturating_add(other.msg_count); + if new.msg_count > limits.messages_remaining { + return Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining: limits.messages_remaining, + messages_submitted: new.msg_count, + }) + } + new.total_bytes = new.total_bytes.saturating_add(other.total_bytes); + if new.total_bytes > limits.bytes_remaining { + return Err(BandwidthUpdateError::HrmpBytesOverflow { + recipient, + bytes_remaining: limits.bytes_remaining, + bytes_submitted: new.total_bytes, + }) + } + + Ok(new) + } + + /// Subtracts previously added channel update. + fn subtract(&mut self, other: &Self) { + self.msg_count -= other.msg_count; + self.total_bytes -= other.total_bytes; + } +} + +/// Bandwidth used by a parachain block(s). +/// +/// This struct can be created with pub items, however, it should +/// never hit the storage directly to avoid bypassing limitations checks. +#[derive(Default, Clone, Encode, Decode, TypeInfo)] +pub struct UsedBandwidth { + /// The amount of UMP messages sent. + pub ump_msg_count: u32, + /// The amount of UMP bytes sent. + pub ump_total_bytes: u32, + /// Outbound HRMP channels updates. + pub hrmp_outgoing: BTreeMap, +} + +impl UsedBandwidth { + /// Tries to append another update, respecting given bandwidth limits. 
+ fn append( + &self, + other: &Self, + limits: &TotalBandwidthLimits, + ) -> Result { + let mut new = self.clone(); + + new.ump_msg_count = new.ump_msg_count.saturating_add(other.ump_msg_count); + if new.ump_msg_count > limits.ump_messages_remaining { + return Err(BandwidthUpdateError::UmpMessagesOverflow { + messages_remaining: limits.ump_messages_remaining, + messages_submitted: new.ump_msg_count, + }) + } + new.ump_total_bytes = new.ump_total_bytes.saturating_add(other.ump_total_bytes); + if new.ump_total_bytes > limits.ump_bytes_remaining { + return Err(BandwidthUpdateError::UmpBytesOverflow { + bytes_remaining: limits.ump_bytes_remaining, + bytes_submitted: new.ump_total_bytes, + }) + } + + for (id, channel) in other.hrmp_outgoing.iter() { + let current = new.hrmp_outgoing.entry(*id).or_default(); + *current = current.append(channel, *id, limits)?; + } + + Ok(new) + } + + /// Subtracts previously added bandwidth update. + fn subtract(&mut self, other: &Self) { + self.ump_msg_count -= other.ump_msg_count; + self.ump_total_bytes -= other.ump_total_bytes; + + for (id, channel) in other.hrmp_outgoing.iter() { + let entry = self + .hrmp_outgoing + .get_mut(id) + .expect("entry's been inserted earlier with `append`; qed"); + entry.subtract(channel); + } + + self.hrmp_outgoing.retain(|_, channel| !channel.is_empty()); + } +} + +/// Ancestor of the block being currently executed, not yet included +/// into the relay chain. +#[derive(Encode, Decode, TypeInfo)] +pub struct Ancestor { + /// Bandwidth used by this block. + used_bandwidth: UsedBandwidth, + /// Output head data hash of this block. This may be optional in case the head data has not + /// yet been posted on chain, but should be updated during initialization of the next block. + para_head_hash: Option, +} + +impl Ancestor { + /// Creates new ancestor without validating the bandwidth used. 
+ pub fn new_unchecked(used_bandwidth: UsedBandwidth) -> Self { + Self { used_bandwidth, para_head_hash: None } + } + + /// Returns [`UsedBandwidth`] of this block. + pub fn used_bandwidth(&self) -> &UsedBandwidth { + &self.used_bandwidth + } + + /// Returns hashed [output head data](`relay_chain::HeadData`) of this block. + pub fn para_head_hash(&self) -> Option<&H> { + self.para_head_hash.as_ref() + } + + /// Set para head hash of this block. + pub fn replace_para_head_hash(&mut self, para_head_hash: H) { + self.para_head_hash.replace(para_head_hash); + } +} + +/// Struct that keeps track of bandwidth used by the unincluded part of the chain +/// along with the latest HRMP watermark. +#[derive(Default, Encode, Decode, TypeInfo)] +pub struct SegmentTracker { + /// Bandwidth used by the segment. + used_bandwidth: UsedBandwidth, + /// The mark which specifies the block number up to which all inbound HRMP messages are processed. + hrmp_watermark: Option, + /// `H` is the type of para head hash. + phantom_data: PhantomData, +} + +impl SegmentTracker { + /// Tries to append another block to the tracker, respecting given bandwidth limits. + pub fn append( + &mut self, + block: &Ancestor, + hrmp_watermark: relay_chain::BlockNumber, + limits: &TotalBandwidthLimits, + ) -> Result<(), BandwidthUpdateError> { + if let Some(watermark) = self.hrmp_watermark.as_ref() { + if &hrmp_watermark <= watermark { + return Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted: hrmp_watermark, + latest: *watermark, + }) + } + } + + self.used_bandwidth = self.used_bandwidth.append(block.used_bandwidth(), limits)?; + self.hrmp_watermark.replace(hrmp_watermark); + + Ok(()) + } + + /// Removes previously added block from the tracker. + pub fn subtract(&mut self, block: &Ancestor) { + self.used_bandwidth.subtract(block.used_bandwidth()); + // Watermark doesn't need to be updated since the block is always dropped + // from the tail of the segment.
+ } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + + #[test] + fn hrmp_msg_count_limits() { + let para_0 = ParaId::from(0); + let para_0_limits = HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: 5 }; + + let para_1 = ParaId::from(1); + let para_1_limits = HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: 3 }; + let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut hrmp_update = HrmpChannelUpdate::default(); + assert!(hrmp_update.is_empty()); + + for _ in 0..5 { + hrmp_update = hrmp_update + .append(&HrmpChannelUpdate { msg_count: 1, total_bytes: 10 }, para_0, &limits) + .expect("update is withing the limits"); + } + assert_matches!( + hrmp_update.append( + &HrmpChannelUpdate { msg_count: 1, total_bytes: 10 }, + para_0, + &limits, + ), + Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining, + messages_submitted, + }) if recipient == para_0 && messages_remaining == 5 && messages_submitted == 6 + ); + + let mut hrmp_update = HrmpChannelUpdate::default(); + hrmp_update = hrmp_update + .append(&HrmpChannelUpdate { msg_count: 2, total_bytes: 10 }, para_1, &limits) + .expect("update is withing the limits"); + assert_matches!( + hrmp_update.append( + &HrmpChannelUpdate { msg_count: 3, total_bytes: 10 }, + para_1, + &limits, + ), + Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining, + messages_submitted, + }) if recipient == para_1 && messages_remaining == 3 && messages_submitted == 5 + ); + } + + #[test] + fn hrmp_bytes_limits() { + let para_0 = ParaId::from(0); + let para_0_limits = + HrmpOutboundLimits { bytes_remaining: 25, messages_remaining: u32::MAX }; + + let hrmp_outgoing = [(para_0, para_0_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + 
ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut hrmp_update = HrmpChannelUpdate::default(); + assert!(hrmp_update.is_empty()); + + for _ in 0..5 { + hrmp_update = hrmp_update + .append(&HrmpChannelUpdate { msg_count: 1, total_bytes: 4 }, para_0, &limits) + .expect("update is withing the limits"); + } + assert_matches!( + hrmp_update.append( + &HrmpChannelUpdate { msg_count: 1, total_bytes: 6 }, + para_0, + &limits, + ), + Err(BandwidthUpdateError::HrmpBytesOverflow { + recipient, + bytes_remaining, + bytes_submitted, + }) if recipient == para_0 && bytes_remaining == 25 && bytes_submitted == 26 + ); + } + + #[test] + fn hrmp_limits_with_segment() { + let create_used_hrmp = + |hrmp_outgoing| UsedBandwidth { ump_msg_count: 0, ump_total_bytes: 0, hrmp_outgoing }; + + let para_0 = ParaId::from(0); + let para_0_limits = HrmpOutboundLimits { bytes_remaining: 30, messages_remaining: 10 }; + + let para_1 = ParaId::from(1); + let para_1_limits = HrmpOutboundLimits { bytes_remaining: 20, messages_remaining: 3 }; + let hrmp_outgoing = [(para_0, para_0_limits), (para_1, para_1_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut segment = SegmentTracker::default(); + + let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 6 }; + let ancestor_0 = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::, + }; + segment.append(&ancestor_0, 0, &limits).expect("update is withing the limits"); + + for watermark in 1..5 { + let ancestor = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::, + }; + segment + .append(&ancestor, watermark, &limits) + .expect("update is withing the limits"); + } + + let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 1 }; + let ancestor_5 = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + 
para_head_hash: None::, + }; + assert_matches!( + segment.append(&ancestor_5, 5, &limits), + Err(BandwidthUpdateError::HrmpBytesOverflow { + recipient, + bytes_remaining, + bytes_submitted, + }) if recipient == para_0 && bytes_remaining == 30 && bytes_submitted == 31 + ); + // Remove the first ancestor from the segment to make space. + segment.subtract(&ancestor_0); + segment.append(&ancestor_5, 5, &limits).expect("update is withing the limits"); + + let para_1_update = HrmpChannelUpdate { msg_count: 3, total_bytes: 10 }; + let ancestor = Ancestor { + used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()), + para_head_hash: None::, + }; + segment.append(&ancestor, 6, &limits).expect("update is withing the limits"); + + assert_matches!( + segment.append(&ancestor, 7, &limits), + Err(BandwidthUpdateError::HrmpMessagesOverflow { + recipient, + messages_remaining, + messages_submitted, + }) if recipient == para_1 && messages_remaining == 3 && messages_submitted == 6 + ); + } + + #[test] + fn ump_limits_with_segment() { + let create_used_ump = |(ump_msg_count, ump_total_bytes)| UsedBandwidth { + ump_msg_count, + ump_total_bytes, + hrmp_outgoing: BTreeMap::default(), + }; + + let limits = TotalBandwidthLimits { + ump_messages_remaining: 5, + ump_bytes_remaining: 50, + hrmp_outgoing: BTreeMap::default(), + }; + + let mut segment = SegmentTracker::default(); + + let ancestor_0 = Ancestor { + used_bandwidth: create_used_ump((1, 10)), + para_head_hash: None::, + }; + segment.append(&ancestor_0, 0, &limits).expect("update is withing the limits"); + + for watermark in 1..4 { + let ancestor = Ancestor { + used_bandwidth: create_used_ump((1, 10)), + para_head_hash: None::, + }; + segment + .append(&ancestor, watermark, &limits) + .expect("update is withing the limits"); + } + + let ancestor_4 = Ancestor { + used_bandwidth: create_used_ump((1, 30)), + para_head_hash: None::, + }; + assert_matches!( + segment.append(&ancestor_4, 4, &limits), + 
Err(BandwidthUpdateError::UmpBytesOverflow { + bytes_remaining, + bytes_submitted, + }) if bytes_remaining == 50 && bytes_submitted == 70 + ); + + let ancestor = Ancestor { + used_bandwidth: create_used_ump((1, 5)), + para_head_hash: None::<relay_chain::Hash>, + }; + segment.append(&ancestor, 4, &limits).expect("update is within the limits"); + assert_matches!( + segment.append(&ancestor, 5, &limits), + Err(BandwidthUpdateError::UmpMessagesOverflow { + messages_remaining, + messages_submitted, + }) if messages_remaining == 5 && messages_submitted == 6 + ); + } + + #[test] + fn segment_hrmp_watermark() { + let mut segment = SegmentTracker::default(); + + let ancestor = Ancestor { + used_bandwidth: UsedBandwidth::default(), + para_head_hash: None::<relay_chain::Hash>, + }; + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing: BTreeMap::default(), + }; + + segment + .append(&ancestor, 0, &limits) + .expect("nothing to compare the watermark with in default segment"); + assert_matches!( + segment.append(&ancestor, 0, &limits), + Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted, + latest, + }) if submitted == 0 && latest == 0 + ); + + for watermark in 1..5 { + segment.append(&ancestor, watermark, &limits).expect("hrmp watermark is valid"); + } + for watermark in 0..5 { + assert_matches!( + segment.append(&ancestor, watermark, &limits), + Err(BandwidthUpdateError::InvalidHrmpWatermark { + submitted, + latest, + }) if submitted == watermark && latest == 4 + ); + } + } + + #[test] + fn segment_drops_empty_hrmp_channels() { + let create_used_hrmp = + |hrmp_outgoing| UsedBandwidth { ump_msg_count: 0, ump_total_bytes: 0, hrmp_outgoing }; + + let para_0 = ParaId::from(0); + let para_0_limits = + HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: u32::MAX }; + + let para_1 = ParaId::from(1); + let para_1_limits = + HrmpOutboundLimits { bytes_remaining: u32::MAX, messages_remaining: u32::MAX }; + let hrmp_outgoing = [(para_0,
para_0_limits), (para_1, para_1_limits)].into(); + let limits = TotalBandwidthLimits { + ump_messages_remaining: 0, + ump_bytes_remaining: 0, + hrmp_outgoing, + }; + + let mut segment = SegmentTracker::default(); + + let para_0_update = HrmpChannelUpdate { msg_count: 1, total_bytes: 1 }; + let ancestor_0 = Ancestor { + used_bandwidth: create_used_hrmp([(para_0, para_0_update)].into()), + para_head_hash: None::<relay_chain::Hash>, + }; + segment.append(&ancestor_0, 0, &limits).expect("update is within the limits"); + let para_1_update = HrmpChannelUpdate { msg_count: 3, total_bytes: 10 }; + let ancestor_1 = Ancestor { + used_bandwidth: create_used_hrmp([(para_1, para_1_update)].into()), + para_head_hash: None::<relay_chain::Hash>, + }; + segment.append(&ancestor_1, 1, &limits).expect("update is within the limits"); + + assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 2); + + segment.subtract(&ancestor_0); + assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 1); + + segment.subtract(&ancestor_1); + assert_eq!(segment.used_bandwidth.hrmp_outgoing.len(), 0); + } +} diff --git a/primitives/parachain-inherent/src/client_side.rs b/primitives/parachain-inherent/src/client_side.rs index 6f2cd5eb504..a6e0bd1641b 100644 --- a/primitives/parachain-inherent/src/client_side.rs +++ b/primitives/parachain-inherent/src/client_side.rs @@ -104,6 +104,7 @@ async fn collect_relay_storage_proof( relevant_keys.push(relay_well_known_keys::hrmp_egress_channel_index(para_id)); relevant_keys.push(relay_well_known_keys::upgrade_go_ahead_signal(para_id)); relevant_keys.push(relay_well_known_keys::upgrade_restriction_signal(para_id)); + relevant_keys.push(relay_well_known_keys::para_head(para_id)); relevant_keys.extend(ingress_channels.into_iter().map(|sender| { relay_well_known_keys::hrmp_channels(HrmpChannelId { sender, recipient: para_id }) })); diff --git a/test/relay-sproof-builder/src/lib.rs b/test/relay-sproof-builder/src/lib.rs index decc6ee3aa0..b63ecdf5fc7 100644 --- a/test/relay-sproof-builder/src/lib.rs +++
b/test/relay-sproof-builder/src/lib.rs @@ -46,6 +46,7 @@ pub struct RelayStateSproofBuilder { pub current_epoch: u64, pub randomness: relay_chain::Hash, pub additional_key_values: Vec<(Vec<u8>, Vec<u8>)>, + pub included_para_head: Option<relay_chain::HeadData>, } impl Default for RelayStateSproofBuilder { @@ -73,6 +74,7 @@ impl Default for RelayStateSproofBuilder { current_epoch: 0u64, randomness: relay_chain::Hash::default(), additional_key_values: vec![], + included_para_head: None, } } } @@ -124,6 +126,9 @@ impl RelayStateSproofBuilder { dmq_mqc_head.encode(), ); } + if let Some(para_head) = self.included_para_head { + insert(relay_chain::well_known_keys::para_head(self.para_id), para_head.encode()); + } if let Some(relay_dispatch_queue_size) = self.relay_dispatch_queue_size { insert( relay_chain::well_known_keys::relay_dispatch_queue_size(self.para_id),