This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

parachain-system: update RelevantMessagingState according to the unincluded segment #2948

Merged: 11 commits, Aug 2, 2023
95 changes: 74 additions & 21 deletions pallets/parachain-system/src/lib.rs
@@ -240,8 +240,12 @@ pub mod pallet {
return
},
};
let relevant_messaging_state = match Self::relevant_messaging_state() {
Some(ok) => ok,

// Before updating the relevant messaging state, we need to extract
// the total bandwidth limits for the purpose of updating the unincluded
// segment.
let total_bandwidth_out = match Self::relevant_messaging_state() {
Some(s) => OutboundBandwidthLimits::from_relay_chain_state(&s),
None => {
debug_assert!(
false,
@@ -252,22 +256,28 @@ pub mod pallet {
},
};

let total_bandwidth_out =
OutboundBandwidthLimits::from_relay_chain_state(&relevant_messaging_state);
let bandwidth_out = AggregatedUnincludedSegment::<T>::get().map(|segment| {
let mut bandwidth_out = total_bandwidth_out.clone();
bandwidth_out.subtract(segment.used_bandwidth());
bandwidth_out
});
// After this point, the `RelevantMessagingState` in storage reflects the
// unincluded segment.
Self::adjust_egress_bandwidth_limits();

let (ump_msg_count, ump_total_bytes) = <PendingUpwardMessages<T>>::mutate(|up| {
let bandwidth_out = bandwidth_out.as_ref().unwrap_or(&total_bandwidth_out);
let available_capacity = cmp::min(
bandwidth_out.ump_messages_remaining,
host_config.max_upward_message_num_per_candidate,
);

let available_size = bandwidth_out.ump_bytes_remaining;
let (available_capacity, available_size) = match Self::relevant_messaging_state() {
Some(limits) => (
limits.relay_dispatch_queue_remaining_capacity.remaining_count,
limits.relay_dispatch_queue_remaining_capacity.remaining_size,
),
None => {
debug_assert!(
false,
"relevant messaging state is promised to be set until `on_finalize`; \
qed",
);
return (0, 0)
},
};

let available_capacity =
cmp::min(available_capacity, host_config.max_upward_message_num_per_candidate);

// Count the number of messages we can possibly fit in the given constraints, i.e.
// available_capacity and available_size.
@@ -312,8 +322,9 @@ pub mod pallet {
.hrmp_max_message_num_per_candidate
.min(<AnnouncedHrmpMessagesPerCandidate<T>>::take()) as usize;

// TODO [now]: the `ChannelInfo` implementation for this pallet is what's
// important here for proper limiting.
// Note: this internally calls the `GetChannelInfo` implementation for this
// pallet, which draws on the `RelevantMessagingState`. That in turn has
// been adjusted above to reflect the correct limits in all channels.
let outbound_messages =
T::OutboundXcmpMessageSource::take_outbound_messages(maximum_channels)
.into_iter()
@@ -351,9 +362,7 @@ pub mod pallet {
let watermark = HrmpWatermark::<T>::get();
let watermark_update =
HrmpWatermarkUpdate::new(watermark, LastRelayChainBlockNumber::<T>::get());
// TODO: In order of this panic to be correct, outbound message source should
// respect bandwidth limits as well.
// <https://github.com/paritytech/cumulus/issues/2471>

aggregated_segment
.append(&ancestor, watermark_update, &total_bandwidth_out)
.expect("unincluded segment limits exceeded");
@@ -431,6 +440,9 @@ pub mod pallet {
4 + hrmp_max_message_num_per_candidate as u64,
);

// Weight for adjusting the unincluded segment in `on_finalize`.
weight += T::DbWeight::get().reads_writes(6, 3);

// Always try to read `UpgradeGoAhead` in `on_finalize`.
weight += T::DbWeight::get().reads(1);

@@ -557,6 +569,7 @@ pub mod pallet {
let host_config = relay_state_proof
.read_abridged_host_configuration()
.expect("Invalid host configuration in relay chain state proof");

let relevant_messaging_state = relay_state_proof
.read_messaging_state_snapshot(&host_config)
.expect("Invalid messaging state in relay chain state proof");
@@ -1227,6 +1240,46 @@ impl<T: Config> Pallet<T> {
weight_used
}

/// This adjusts the `RelevantMessagingState` according to the bandwidth limits in the
/// unincluded segment.
//
// Reads: 2
// Writes: 1
fn adjust_egress_bandwidth_limits() {
let unincluded_segment = match AggregatedUnincludedSegment::<T>::get() {
None => return,
Some(s) => s,
};

<RelevantMessagingState<T>>::mutate(|messaging_state| {
let messaging_state = match messaging_state {
None => return,
Some(s) => s,
};

let used_bandwidth = unincluded_segment.used_bandwidth();

let channels = &mut messaging_state.egress_channels;
for (para_id, used) in used_bandwidth.hrmp_outgoing.iter() {
let i = match channels.binary_search_by_key(para_id, |item| item.0) {
Ok(i) => i,
Err(_) => continue, // indicates channel closed.
};

let c = &mut channels[i].1;

c.total_size = (c.total_size + used.total_bytes).min(c.max_total_size);
c.msg_count = (c.msg_count + used.msg_count).min(c.max_capacity);
}

let upward_capacity = &mut messaging_state.relay_dispatch_queue_remaining_capacity;
upward_capacity.remaining_count =
upward_capacity.remaining_count.saturating_sub(used_bandwidth.ump_msg_count);

[Review thread]

Contributor: Sanity checking, do we want to be subtracting the entire used_bandwidth of the segment here each time?

rphmeier (author), Jul 28, 2023: Yeah. What we read from the relay-chain state is the total capacity as of the most recently included block. We then subtract the used bandwidth in the unincluded segment from it to get the actual remaining channel capacity. (A worked sketch of this arithmetic follows this file's diff.)

upward_capacity.remaining_size =
upward_capacity.remaining_size.saturating_sub(used_bandwidth.ump_total_bytes);
});
}

/// Put a new validation function into a particular location where polkadot
/// monitors for updates. Calling this function notifies polkadot that a new
/// upgrade has been scheduled.
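To make the accounting concrete, here is a minimal, self-contained sketch of the arithmetic described in the review thread above. The types are simplified stand-ins, not the pallet's own (`UpwardCapacity` and `UmpUsed` are hypothetical names); the real code performs the same saturating subtraction inside `adjust_egress_bandwidth_limits`.

```rust
/// Remaining relay dispatch queue capacity, as read from the relay-chain
/// state at the most recently included block.
struct UpwardCapacity {
    remaining_count: u32,
    remaining_size: u32,
}

/// Bandwidth already consumed by blocks in the unincluded segment.
struct UmpUsed {
    ump_msg_count: u32,
    ump_total_bytes: u32,
}

/// The capacity available to the block under construction is the relay-chain
/// figure minus whatever the unincluded segment has already used.
fn adjust(capacity: &mut UpwardCapacity, used: &UmpUsed) {
    capacity.remaining_count = capacity.remaining_count.saturating_sub(used.ump_msg_count);
    capacity.remaining_size = capacity.remaining_size.saturating_sub(used.ump_total_bytes);
}

fn main() {
    // Relay state: 10 messages / 1024 bytes remain. The unincluded segment has
    // already queued 3 messages totalling 200 bytes.
    let mut capacity = UpwardCapacity { remaining_count: 10, remaining_size: 1024 };
    let used = UmpUsed { ump_msg_count: 3, ump_total_bytes: 200 };
    adjust(&mut capacity, &used);
    assert_eq!((capacity.remaining_count, capacity.remaining_size), (7, 824));
}
```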
70 changes: 66 additions & 4 deletions pallets/parachain-system/src/tests.rs
@@ -137,15 +137,23 @@ fn send_message(dest: ParaId, message: Vec<u8>) {
impl XcmpMessageSource for FromThreadLocal {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut ids = std::collections::BTreeSet::<ParaId>::new();
let mut taken = 0;
let mut taken_messages = 0;
let mut taken_bytes = 0;
let mut result = Vec::new();
SENT_MESSAGES.with(|ms| {
ms.borrow_mut().retain(|m| {
let status = <Pallet<Test> as GetChannelInfo>::get_channel_status(m.0);
let ready = matches!(status, ChannelStatus::Ready(..));
if ready && !ids.contains(&m.0) && taken < maximum_channels {
let ChannelStatus::Ready(max_size_now, max_size_ever) = status else { return false };

[Review thread]

rphmeier (author): This change is wrong. As this is within a retain, returning false (e.g. when the channel is full) would prune the message forever.

Contributor: yes, my bad, pushed a fix

rphmeier (author): I pushed a more comprehensive version + added a bunch more to the test you wrote, just while I was playing around with it. (A sketch of the retain semantics at issue follows this file's diff.)

let msg_len = m.1.len();

if !ids.contains(&m.0) &&
taken_messages < maximum_channels &&
msg_len <= max_size_ever &&
taken_bytes + msg_len <= max_size_now
{
ids.insert(m.0);
taken += 1;
taken_messages += 1;
taken_bytes += msg_len;
result.push(m.clone());
false
} else {
@@ -679,6 +687,60 @@ fn inherent_processed_messages_are_ignored() {
});
}

#[test]
fn hrmp_outbound_respects_used_bandwidth() {
let recipient = ParaId::from(400);

CONSENSUS_HOOK.with(|c| {
*c.borrow_mut() = Box::new(|_| (Weight::zero(), NonZeroU32::new(3).unwrap().into()))
});

BlockTests::new()
.with_inclusion_delay(2)
.with_relay_sproof_builder(move |_, _, sproof| {
sproof.host_config.hrmp_max_message_num_per_candidate = 10;
let channel = sproof.upsert_outbound_channel(recipient);
channel.max_capacity = 2;
channel.max_total_size = 4;

channel.max_message_size = 10;
})
.add(1, || {})
.add_with_post_test(
2,
move || {
send_message(recipient, b"22".to_vec());
},
move || {
let v = HrmpOutboundMessages::<Test>::get();
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"22".to_vec() }]);
},
)
.add_with_post_test(
3,
move || {
send_message(recipient, b"333".to_vec());
},
move || {
// Parent has not been included, new message would've exceeded capacity.
let v = HrmpOutboundMessages::<Test>::get();
assert!(v.is_empty());
},
)
.add_with_post_test(
4,
move || {
send_message(recipient, b"a".to_vec());
send_message(recipient, b"b".to_vec());
},
move || {
let v = HrmpOutboundMessages::<Test>::get();
// One small message fits. This relies on the test implementation of [`XcmpMessageSource`].
assert_eq!(v, vec![OutboundHrmpMessage { recipient, data: b"a".to_vec() }]);
},
);
}

#[test]
fn events() {
BlockTests::new()
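The thread above hinges on `Vec::retain` semantics: a closure returning `false` removes the element permanently. Below is a minimal sketch, with simplified stand-in types rather than the test's real `ChannelStatus` and message source, of why a message that merely does not fit in the current block must return `true` so it stays queued.

```rust
#[derive(Clone, Copy)]
enum ChannelStatus {
    Closed,
    Full,
    Ready { max_size_now: usize },
}

fn take_outbound(messages: &mut Vec<Vec<u8>>, status: ChannelStatus) -> Vec<Vec<u8>> {
    let mut taken_bytes = 0;
    let mut result = Vec::new();
    messages.retain(|m| {
        let max_size_now = match status {
            ChannelStatus::Closed => return false, // channel gone: drop the message.
            ChannelStatus::Full => return true,    // no room this block: keep it queued.
            ChannelStatus::Ready { max_size_now } => max_size_now,
        };
        if taken_bytes + m.len() <= max_size_now {
            taken_bytes += m.len();
            result.push(m.clone());
            false // taken: remove from the queue.
        } else {
            true // doesn't fit this block: keep it queued.
        }
    });
    result
}

fn main() {
    let mut queue = vec![b"aa".to_vec(), b"bbb".to_vec()];
    let sent = take_outbound(&mut queue, ChannelStatus::Ready { max_size_now: 2 });
    assert_eq!(sent, vec![b"aa".to_vec()]);
    // The message that didn't fit survives for a later block.
    assert_eq!(queue, vec![b"bbb".to_vec()]);
}
```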
15 changes: 0 additions & 15 deletions pallets/parachain-system/src/unincluded_segment.rs
@@ -75,21 +75,6 @@ impl OutboundBandwidthLimits {
hrmp_outgoing,
}
}

/// Compute the remaining bandwidth when accounting for the used amounts provided.
pub fn subtract(&mut self, used: &UsedBandwidth) {
self.ump_messages_remaining =
self.ump_messages_remaining.saturating_sub(used.ump_msg_count);
self.ump_bytes_remaining = self.ump_bytes_remaining.saturating_sub(used.ump_total_bytes);
for (para_id, channel_limits) in self.hrmp_outgoing.iter_mut() {
if let Some(update) = used.hrmp_outgoing.get(para_id) {
channel_limits.bytes_remaining =
channel_limits.bytes_remaining.saturating_sub(update.total_bytes);
channel_limits.messages_remaining =
channel_limits.messages_remaining.saturating_sub(update.msg_count);
}
}
}
}

/// The error type for updating bandwidth used by a segment.
38 changes: 28 additions & 10 deletions test/relay-sproof-builder/src/lib.rs
@@ -91,16 +91,34 @@ impl RelayStateSproofBuilder {
in_index.insert(idx, sender);
}

self.hrmp_channels
.entry(relay_chain::HrmpChannelId { sender, recipient: self.para_id })
.or_insert_with(|| AbridgedHrmpChannel {
max_capacity: 0,
max_total_size: 0,
max_message_size: 0,
msg_count: 0,
total_size: 0,
mqc_head: None,
})
self.upsert_channel(relay_chain::HrmpChannelId { sender, recipient: self.para_id })
}

/// Returns a mutable reference to HRMP channel metadata for a channel (`self.para_id`, `recipient`).
///
/// If there is no channel, a new default one is created.
///
/// It also updates the `hrmp_egress_channel_index`, creating it if needed.
pub fn upsert_outbound_channel(&mut self, recipient: ParaId) -> &mut AbridgedHrmpChannel {
let in_index = self.hrmp_egress_channel_index.get_or_insert_with(Vec::new);
if let Err(idx) = in_index.binary_search(&recipient) {
in_index.insert(idx, recipient);
}

self.upsert_channel(relay_chain::HrmpChannelId { sender: self.para_id, recipient })
}

/// Creates a new default entry in the HRMP channels mapping if one does not exist, and
/// returns a mutable reference to it.
fn upsert_channel(&mut self, id: relay_chain::HrmpChannelId) -> &mut AbridgedHrmpChannel {
self.hrmp_channels.entry(id).or_insert_with(|| AbridgedHrmpChannel {
max_capacity: 0,
max_total_size: 0,
max_message_size: 0,
msg_count: 0,
total_size: 0,
mqc_head: None,
})
}

pub fn into_state_root_and_proof(
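Finally, a short usage sketch of the new `upsert_outbound_channel` helper as the test above exercises it. The `use` paths and the `Default` constructor are assumptions for illustration; `into_state_root_and_proof` appears in this diff.

```rust
// NOTE: crate paths and the `Default` impl are assumed for illustration.
use cumulus_primitives_core::ParaId;
use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;

fn main() {
    let recipient = ParaId::from(400);
    let mut sproof = RelayStateSproofBuilder::default();

    // Creates the (para_id -> recipient) channel entry if absent, registers the
    // recipient in `hrmp_egress_channel_index`, and returns the channel metadata.
    let channel = sproof.upsert_outbound_channel(recipient);
    channel.max_capacity = 2;
    channel.max_total_size = 4;
    channel.max_message_size = 10;

    // Turn the builder into a relay state root plus storage proof.
    let (_state_root, _proof) = sproof.into_state_root_and_proof();
}
```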