diff --git a/pallets/parachain-system/Cargo.toml b/pallets/parachain-system/Cargo.toml index a58104c1ffb..8990a848ad0 100644 --- a/pallets/parachain-system/Cargo.toml +++ b/pallets/parachain-system/Cargo.toml @@ -80,4 +80,4 @@ runtime-benchmarks = [ try-runtime = ["frame-support/try-runtime"] -parameterized-consensus-hook = [] \ No newline at end of file +parameterized-consensus-hook = [] diff --git a/pallets/parachain-system/src/lib.rs b/pallets/parachain-system/src/lib.rs index 20e85eee041..5239bde2a16 100644 --- a/pallets/parachain-system/src/lib.rs +++ b/pallets/parachain-system/src/lib.rs @@ -240,8 +240,12 @@ pub mod pallet { return }, }; - let relevant_messaging_state = match Self::relevant_messaging_state() { - Some(ok) => ok, + + // Before updating the relevant messaging state, we need to extract + // the total bandwidth limits for the purpose of updating the unincluded + // segment. + let total_bandwidth_out = match Self::relevant_messaging_state() { + Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s), None => { debug_assert!( false, @@ -252,22 +256,28 @@ pub mod pallet { }, }; - let total_bandwidth_out = - OutboundBandwidthLimits::from_relay_chain_state(&relevant_messaging_state); - let bandwidth_out = AggregatedUnincludedSegment::::get().map(|segment| { - let mut bandwidth_out = total_bandwidth_out.clone(); - bandwidth_out.subtract(segment.used_bandwidth()); - bandwidth_out - }); + // After this point, the `RelevantMessagingState` in storage reflects the + // unincluded segment. + Self::adjust_egress_bandwidth_limits(); let (ump_msg_count, ump_total_bytes) = >::mutate(|up| { - let bandwidth_out = bandwidth_out.as_ref().unwrap_or(&total_bandwidth_out); - let available_capacity = cmp::min( - bandwidth_out.ump_messages_remaining, - host_config.max_upward_message_num_per_candidate, - ); - - let available_size = bandwidth_out.ump_bytes_remaining; + let (available_capacity, available_size) = match Self::relevant_messaging_state() { + Some(limits) => ( + limits.relay_dispatch_queue_remaining_capacity.remaining_count, + limits.relay_dispatch_queue_remaining_capacity.remaining_size, + ), + None => { + debug_assert!( + false, + "relevant messaging state is promised to be set until `on_finalize`; \ + qed", + ); + return (0, 0) + }, + }; + + let available_capacity = + cmp::min(available_capacity, host_config.max_upward_message_num_per_candidate); // Count the number of messages we can possibly fit in the given constraints, i.e. // available_capacity and available_size. @@ -312,8 +322,9 @@ pub mod pallet { .hrmp_max_message_num_per_candidate .min(>::take()) as usize; - // TODO [now]: the `ChannelInfo` implementation for this pallet is what's - // important here for proper limiting. + // Note: this internally calls the `GetChannelInfo` implementation for this + // pallet, which draws on the `RelevantMessagingState`. That in turn has + // been adjusted above to reflect the correct limits in all channels. let outbound_messages = T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels) .into_iter() @@ -351,9 +362,7 @@ pub mod pallet { let watermark = HrmpWatermark::::get(); let watermark_update = HrmpWatermarkUpdate::new(watermark, LastRelayChainBlockNumber::::get()); - // TODO: In order of this panic to be correct, outbound message source should - // respect bandwidth limits as well. - // + aggregated_segment .append(&ancestor, watermark_update, &total_bandwidth_out) .expect("unincluded segment limits exceeded"); @@ -431,6 +440,9 @@ pub mod pallet { 4 + hrmp_max_message_num_per_candidate as u64, ); + // Weight for adjusting the unincluded segment in `on_finalize`. + weight += T::DbWeight::get().reads_writes(6, 3); + // Always try to read `UpgradeGoAhead` in `on_finalize`. weight += T::DbWeight::get().reads(1); @@ -557,6 +569,7 @@ pub mod pallet { let host_config = relay_state_proof .read_abridged_host_configuration() .expect("Invalid host configuration in relay chain state proof"); + let relevant_messaging_state = relay_state_proof .read_messaging_state_snapshot(&host_config) .expect("Invalid messaging state in relay chain state proof"); @@ -1227,6 +1240,46 @@ impl Pallet { weight_used } + /// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the + /// unincluded segment. + // + // Reads: 2 + // Writes: 1 + fn adjust_egress_bandwidth_limits() { + let unincluded_segment = match AggregatedUnincludedSegment::::get() { + None => return, + Some(s) => s, + }; + + >::mutate(|messaging_state| { + let messaging_state = match messaging_state { + None => return, + Some(s) => s, + }; + + let used_bandwidth = unincluded_segment.used_bandwidth(); + + let channels = &mut messaging_state.egress_channels; + for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() { + let i = match channels.binary_search_by_key(para_id, |item| item.0) { + Ok(i) => i, + Err(_) => continue, // indicates channel closed. + }; + + let c = &mut channels[i].1; + + c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size); + c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity); + } + + let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity; + upward_capacity.remaining_count = + upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count); + upward_capacity.remaining_size = + upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes); + }); + } + /// Put a new validation function into a particular location where polkadot /// monitors for updates. Calling this function notifies polkadot that a new /// upgrade has been scheduled. diff --git a/pallets/parachain-system/src/tests.rs b/pallets/parachain-system/src/tests.rs index 1f5a1208a11..41e8dc63808 100755 --- a/pallets/parachain-system/src/tests.rs +++ b/pallets/parachain-system/src/tests.rs @@ -137,15 +137,28 @@ fn send_message(dest: ParaId, message: Vec) { impl XcmpMessageSource for FromThreadLocal { fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec)> { let mut ids = std::collections::BTreeSet::::new(); - let mut taken = 0; + let mut taken_messages = 0; + let mut taken_bytes = 0; let mut result = Vec::new(); SENT_MESSAGES.with(|ms| { ms.borrow_mut().retain(|m| { let status = as GetChannelInfo>::get_channel_status(m.0); - let ready = matches!(status, ChannelStatus::Ready(..)); - if ready && !ids.contains(&m.0) && taken < maximum_channels { + let (max_size_now, max_size_ever) = match status { + ChannelStatus::Ready(now, ever) => (now, ever), + ChannelStatus::Closed => return false, // drop message + ChannelStatus::Full => return true, // keep message queued. + }; + + let msg_len = m.1.len(); + + if !ids.contains(&m.0) && + taken_messages < maximum_channels && + msg_len <= max_size_ever && + taken_bytes + msg_len <= max_size_now + { ids.insert(m.0); - taken += 1; + taken_messages += 1; + taken_bytes += msg_len; result.push(m.clone()); false } else { @@ -435,6 +448,47 @@ fn block_tests_run_on_drop() { BlockTests::new().add(123, || panic!("if this test passes, block tests run properly")); } +#[test] +fn test_xcmp_source_keeps_messages() { + let recipient = ParaId::from(400); + + CONSENSUS_HOOK.with(|c| { + *c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(3).unwrap().into())) + }); + + BlockTests::new() + .with_inclusion_delay(2) + .with_relay_sproof_builder(move |_, block_number, sproof| { + sproof.host_config.hrmp_max_message_num_per_candidate = 10; + let channel = sproof.upsert_outbound_channel(recipient); + channel.max_total_size = 10; + channel.max_message_size = 10; + + // Only fit messages starting from 3rd block. + channel.max_capacity = if block_number < 3 { 0 } else { 1 }; + }) + .add(1, || {}) + .add_with_post_test( + 2, + move || { + send_message(recipient, b"22".to_vec()); + }, + move || { + let v = HrmpOutboundMessages::::get(); + assert!(v.is_empty()); + }, + ) + .add_with_post_test( + 3, + move || {}, + move || { + // Not discarded. + let v = HrmpOutboundMessages::::get(); + assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"22".to_vec() }]); + }, + ); +} + #[test] fn unincluded_segment_works() { CONSENSUS_HOOK.with(|c| { @@ -679,6 +733,142 @@ fn inherent_processed_messages_are_ignored() { }); } +#[test] +fn hrmp_outbound_respects_used_bandwidth() { + let recipient = ParaId::from(400); + + CONSENSUS_HOOK.with(|c| { + *c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(3).unwrap().into())) + }); + + BlockTests::new() + .with_inclusion_delay(2) + .with_relay_sproof_builder(move |_, block_number, sproof| { + sproof.host_config.hrmp_max_message_num_per_candidate = 10; + let channel = sproof.upsert_outbound_channel(recipient); + channel.max_capacity = 2; + channel.max_total_size = 4; + + channel.max_message_size = 10; + + // states: + // [relay_chain][unincluded_segment] + [message_queue] + // 2: []["2"] + ["2222"] + // 3: []["2", "3"] + ["2222"] + // 4: []["2", "3"] + ["2222", "444", "4"] + // 5: ["2"]["3"] + ["2222", "444", "4"] + // 6: ["2", "3"][] + ["2222", "444", "4"] + // 7: ["3"]["444"] + ["2222", "4"] + // 8: []["444", "4"] + ["2222"] + // + // 2 tests max bytes - there is message space but no byte space. + // 4 tests max capacity - there is byte space but no message space + + match block_number { + 5 => { + // 2 included. + // one message added + channel.msg_count = 1; + channel.total_size = 1; + }, + 6 => { + // 3 included. + // one message added + channel.msg_count = 2; + channel.total_size = 2; + }, + 7 => { + // 4 included. + // one message drained. + channel.msg_count = 1; + channel.total_size = 1; + }, + 8 => { + // 5 included. no messages added, one drained. + channel.msg_count = 0; + channel.total_size = 0; + }, + _ => { + channel.msg_count = 0; + channel.total_size = 0; + }, + } + }) + .add(1, || {}) + .add_with_post_test( + 2, + move || { + send_message(recipient, b"2".to_vec()); + send_message(recipient, b"2222".to_vec()); + }, + move || { + let v = HrmpOutboundMessages::::get(); + assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"2".to_vec() }]); + }, + ) + .add_with_post_test( + 3, + move || { + send_message(recipient, b"3".to_vec()); + }, + move || { + let v = HrmpOutboundMessages::::get(); + assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"3".to_vec() }]); + }, + ) + .add_with_post_test( + 4, + move || { + send_message(recipient, b"444".to_vec()); + send_message(recipient, b"4".to_vec()); + }, + move || { + // Queue has byte capacity but not message capacity. + let v = HrmpOutboundMessages::::get(); + assert!(v.is_empty()); + }, + ) + .add_with_post_test( + 5, + || {}, + move || { + // 1 is included here, channel not drained yet. nothing fits. + let v = HrmpOutboundMessages::::get(); + assert!(v.is_empty()); + }, + ) + .add_with_post_test( + 6, + || {}, + move || { + // 2 is included here. channel is totally full. + let v = HrmpOutboundMessages::::get(); + assert!(v.is_empty()); + }, + ) + .add_with_post_test( + 7, + || {}, + move || { + // 3 is included here. One message was drained out. The 3-byte message + // finally fits + let v = HrmpOutboundMessages::::get(); + // This line relies on test implementation of [`XcmpMessageSource`]. + assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"444".to_vec() }]); + }, + ) + .add_with_post_test( + 8, + || {}, + move || { + // 4 is included here. Relay-chain side of the queue is empty, + let v = HrmpOutboundMessages::::get(); + // This line relies on test implementation of [`XcmpMessageSource`]. + assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"4".to_vec() }]); + }, + ); +} + #[test] fn events() { BlockTests::new() diff --git a/pallets/parachain-system/src/unincluded_segment.rs b/pallets/parachain-system/src/unincluded_segment.rs index 2ca052cd39b..1a0f986ec81 100644 --- a/pallets/parachain-system/src/unincluded_segment.rs +++ b/pallets/parachain-system/src/unincluded_segment.rs @@ -23,11 +23,12 @@ use super::relay_state_snapshot::{MessagingStateSnapshot, RelayDispatchQueueRemainingCapacity}; use codec::{Decode, Encode}; use cumulus_primitives_core::{relay_chain, ParaId}; +use frame_support::RuntimeDebug; use scale_info::TypeInfo; use sp_std::{collections::btree_map::BTreeMap, marker::PhantomData}; /// Constraints on outbound HRMP channel. -#[derive(Clone)] +#[derive(Clone, RuntimeDebug)] pub struct HrmpOutboundLimits { /// The maximum bytes that can be written to the channel. pub bytes_remaining: u32, @@ -36,7 +37,7 @@ pub struct HrmpOutboundLimits { } /// Limits on outbound message bandwidth. -#[derive(Clone)] +#[derive(Clone, RuntimeDebug)] pub struct OutboundBandwidthLimits { /// The amount of UMP messages remaining. pub ump_messages_remaining: u32, @@ -75,25 +76,10 @@ impl OutboundBandwidthLimits { hrmp_outgoing, } } - - /// Compute the remaining bandwidth when accounting for the used amounts provided. - pub fn subtract(&mut self, used: &UsedBandwidth) { - self.ump_messages_remaining = - self.ump_messages_remaining.saturating_sub(used.ump_msg_count); - self.ump_bytes_remaining = self.ump_bytes_remaining.saturating_sub(used.ump_total_bytes); - for (para_id, channel_limits) in self.hrmp_outgoing.iter_mut() { - if let Some(update) = used.hrmp_outgoing.get(para_id) { - channel_limits.bytes_remaining = - channel_limits.bytes_remaining.saturating_sub(update.total_bytes); - channel_limits.messages_remaining = - channel_limits.messages_remaining.saturating_sub(update.msg_count); - } - } - } } /// The error type for updating bandwidth used by a segment. -#[derive(Debug)] +#[derive(RuntimeDebug)] pub enum BandwidthUpdateError { /// Too many messages submitted to HRMP channel. HrmpMessagesOverflow { @@ -140,7 +126,7 @@ pub enum BandwidthUpdateError { } /// The number of messages and size in bytes submitted to HRMP channel. -#[derive(Debug, Default, Copy, Clone, Encode, Decode, TypeInfo)] +#[derive(RuntimeDebug, Default, Copy, Clone, Encode, Decode, TypeInfo)] pub struct HrmpChannelUpdate { /// The amount of messages submitted to the channel. pub msg_count: u32, @@ -199,7 +185,7 @@ impl HrmpChannelUpdate { /// /// This struct can be created with pub items, however, it should /// never hit the storage directly to avoid bypassing limitations checks. -#[derive(Default, Clone, Encode, Decode, TypeInfo)] +#[derive(Default, Clone, Encode, Decode, TypeInfo, RuntimeDebug)] pub struct UsedBandwidth { /// The amount of UMP messages sent. pub ump_msg_count: u32, @@ -260,7 +246,7 @@ impl UsedBandwidth { /// Ancestor of the block being currently executed, not yet included /// into the relay chain. -#[derive(Encode, Decode, TypeInfo)] +#[derive(Encode, Decode, TypeInfo, RuntimeDebug)] pub struct Ancestor { /// Bandwidth used by this block. used_bandwidth: UsedBandwidth, @@ -326,7 +312,7 @@ impl HrmpWatermarkUpdate { /// Struct that keeps track of bandwidth used by the unincluded part of the chain /// along with the latest HRMP watermark. -#[derive(Default, Encode, Decode, TypeInfo)] +#[derive(Default, Encode, Decode, TypeInfo, RuntimeDebug)] pub struct SegmentTracker { /// Bandwidth used by the segment. used_bandwidth: UsedBandwidth, diff --git a/test/relay-sproof-builder/src/lib.rs b/test/relay-sproof-builder/src/lib.rs index 55130904b09..901fda942c0 100644 --- a/test/relay-sproof-builder/src/lib.rs +++ b/test/relay-sproof-builder/src/lib.rs @@ -91,16 +91,34 @@ impl RelayStateSproofBuilder { in_index.insert(idx, sender); } - self.hrmp_channels - .entry(relay_chain::HrmpChannelId { sender, recipient: self.para_id }) - .or_insert_with(|| AbridgedHrmpChannel { - max_capacity: 0, - max_total_size: 0, - max_message_size: 0, - msg_count: 0, - total_size: 0, - mqc_head: None, - }) + self.upsert_channel(relay_chain::HrmpChannelId { sender, recipient: self.para_id }) + } + + /// Returns a mutable reference to HRMP channel metadata for a channel (`self.para_id`, `recipient`). + /// + /// If there is no channel, a new default one is created. + /// + /// It also updates the `hrmp_egress_channel_index`, creating it if needed. + pub fn upsert_outbound_channel(&mut self, recipient: ParaId) -> &mut AbridgedHrmpChannel { + let in_index = self.hrmp_egress_channel_index.get_or_insert_with(Vec::new); + if let Err(idx) = in_index.binary_search(&recipient) { + in_index.insert(idx, recipient); + } + + self.upsert_channel(relay_chain::HrmpChannelId { sender: self.para_id, recipient }) + } + + /// Creates a new default entry in the hrmp channels mapping if not exists, and returns mutable + /// reference to it. + fn upsert_channel(&mut self, id: relay_chain::HrmpChannelId) -> &mut AbridgedHrmpChannel { + self.hrmp_channels.entry(id).or_insert_with(|| AbridgedHrmpChannel { + max_capacity: 0, + max_total_size: 0, + max_message_size: 0, + msg_count: 0, + total_size: 0, + mqc_head: None, + }) } pub fn into_state_root_and_proof(