Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Yieldable queues for pallet MessageQueue #13424

Merged
merged 15 commits into from
Feb 25, 2023
2 changes: 1 addition & 1 deletion bin/node/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ impl pallet_message_queue::Config for Runtime {
type RuntimeEvent = RuntimeEvent;
type WeightInfo = ();
/// NOTE: Always set this to `NoopMessageProcessor` for benchmarking.
type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor;
type MessageProcessor = pallet_message_queue::mock_helpers::NoopMessageProcessor<u32>;
type Size = u32;
type QueueChangeHandler = ();
type HeapSize = ConstU32<{ 64 * 1024 }>;
Expand Down
129 changes: 122 additions & 7 deletions frame/message-queue/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
use crate::{
mock::{
new_test_ext, CountingMessageProcessor, IntoWeight, MockedWeightInfo, NumMessagesProcessed,
SuspendedQueues,
},
mock_helpers::MessageOrigin,
*,
};

Expand All @@ -39,6 +41,7 @@ use sp_runtime::{
testing::Header,
traits::{BlakeTwo256, IdentityLookup},
};
use std::collections::{BTreeMap, BTreeSet};

type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic<Test>;
type Block = frame_system::mocking::MockBlock<Test>;
Expand Down Expand Up @@ -100,7 +103,8 @@ impl Config for Test {

/// Simulates heavy usage by enqueueing and processing large amounts of messages.
///
/// Best to run with `-r`, `RUST_LOG=info` and `RUSTFLAGS='-Cdebug-assertions=y'`.
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
Expand Down Expand Up @@ -130,29 +134,131 @@ fn stress_test_enqueue_and_service() {
let mut msgs_remaining = 0;
for _ in 0..blocks {
// Start by enqueuing a large number of messages.
let (enqueued, _) =
let enqueued =
enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng);
msgs_remaining += enqueued;

// Pick a fraction of all messages currently in queue and process them.
let processed = rng.gen_range(1..=msgs_remaining);
log::info!("Processing {} of all messages {}", processed, msgs_remaining);
process_messages(processed); // This also advances the block.
process_some_messages(processed); // This also advances the block.
msgs_remaining -= processed;
}
log::info!("Processing all remaining {} messages", msgs_remaining);
process_messages(msgs_remaining);
process_all_messages(msgs_remaining);
post_conditions();
});
}

/// Simulates heavy usage of the suspension logic via `Yield`.
///
/// Each simulated block enqueues a random batch of messages, suspends and resumes random queues
/// and then services a random share of the messages that sit in non-suspended queues. After the
/// last block, all messages of non-suspended queues are drained, the suspension list is cleared
/// and whatever is left gets drained as well.
///
/// Best to run with `RUST_LOG=info RUSTFLAGS='-Cdebug-assertions=y' cargo test -r -p
/// pallet-message-queue -- --ignored`.
///
/// # Example output
///
/// ```pre
/// Enqueued 11776 messages across 2526 queues. Payload 173.94 KiB
/// Suspended 63 and resumed 7 queues of 2526 in total
/// Processing 593 messages. Resumed msgs: 11599, All msgs: 11776
/// Enqueued 30104 messages across 5533 queues. Payload 416.62 KiB
/// Suspended 24 and resumed 15 queues of 5533 in total
/// Processing 12841 messages. Resumed msgs: 40857, All msgs: 41287
/// Processing all 28016 remaining resumed messages
/// Resumed all 64 suspended queues
/// Processing all remaining 430 messages
/// ```
#[test]
#[ignore] // Only run in the CI.
fn stress_test_queue_suspension() {
	let blocks = 20;
	let max_queues = 10_000;
	let max_messages_per_queue = 10_000;
	let (max_suspend_per_block, max_resume_per_block) = (100, 50);
	let max_msg_len = MaxMessageLenOf::<Test>::get();
	// Fixed seed: the order of the `rng` calls below determines the whole test run.
	let mut rng = StdRng::seed_from_u64(41);

	new_test_ext::<Test>().execute_with(|| {
		// Indices of the queues that are currently suspended.
		let mut paused = BTreeSet::<u32>::new();
		let mut msgs_remaining = 0;

		for _ in 0..blocks {
			// Start by enqueuing a large number of messages.
			msgs_remaining +=
				enqueue_messages(max_queues, max_messages_per_queue, max_msg_len, &mut rng);
			let queue_sizes = msgs_per_queue();
			let num_queues = queue_sizes.len();

			// Suspend a random subset of queues. Picks are made with replacement, so fewer than
			// `to_suspend` distinct queues may end up being added.
			let to_suspend = rng.gen_range(0..max_suspend_per_block).min(num_queues);
			for _ in 0..to_suspend {
				let pick = rng.gen_range(0..num_queues);
				let origin = *queue_sizes.keys().nth(pick).unwrap();
				paused.insert(origin);
			}

			// Resume a random subset of suspended queues.
			let to_resume = rng.gen_range(0..max_resume_per_block).min(paused.len());
			for _ in 0..to_resume {
				let pick = rng.gen_range(0..paused.len());
				let origin = *paused.iter().nth(pick).unwrap();
				paused.remove(&origin);
			}

			log::info!(
				"Suspended {} and resumed {} queues of {} in total",
				to_suspend,
				to_resume,
				num_queues
			);
			SuspendedQueues::set(paused.iter().copied().map(MessageOrigin::Everywhere).collect());

			// Pick a fraction of all messages in non-suspended queues and process them.
			let resumed_messages: u32 = queue_sizes
				.iter()
				.filter_map(|(origin, count)| (!paused.contains(origin)).then_some(*count))
				.sum();
			// NOTE(review): `gen_range` panics on an empty range, so this relies on at least one
			// message sitting in a non-suspended queue - confirm this holds if the seed changes.
			let processed = rng.gen_range(1..=resumed_messages);
			log::info!(
				"Processing {} messages. Resumed msgs: {}, All msgs: {}",
				processed,
				resumed_messages,
				msgs_remaining
			);
			process_some_messages(processed); // This also advances the block.
			msgs_remaining -= processed;
		}

		// Drain everything that is still sitting in a non-suspended queue.
		let resumed_messages: u32 = msgs_per_queue()
			.iter()
			.filter_map(|(origin, count)| (!paused.contains(origin)).then_some(*count))
			.sum();
		log::info!("Processing all {} remaining resumed messages", resumed_messages);
		process_all_messages(resumed_messages);
		msgs_remaining -= resumed_messages;

		// Clear the suspension list. This must stay outside of the log macro so that it runs
		// regardless of the configured log level.
		let resumed = SuspendedQueues::take();
		log::info!("Resumed all {} suspended queues", resumed.len());
		log::info!("Processing all remaining {} messages", msgs_remaining);
		process_all_messages(msgs_remaining);
		post_conditions();
	});
}

/// Returns the number of messages in each queue, keyed by the index of its
/// `MessageOrigin::Everywhere` origin.
///
/// The counts are read from `BookStateFor`. Any other origin variant is treated as unreachable.
fn msgs_per_queue() -> BTreeMap<u32, u32> {
	BookStateFor::<Test>::iter()
		.map(|(origin, book)| match origin {
			MessageOrigin::Everywhere(index) => (index, book.message_count as u32),
			_ => unreachable!(),
		})
		.collect()
}

/// Enqueue a random number of random messages into a random number of queues.
///
/// Returns the total number of enqueued messages.
fn enqueue_messages(
max_queues: u32,
max_per_queue: u32,
max_msg_len: u32,
rng: &mut StdRng,
) -> (u32, usize) {
) -> u32 {
let num_queues = rng.gen_range(1..max_queues);
let mut num_messages = 0;
let mut total_msg_len = 0;
Expand All @@ -179,11 +285,11 @@ fn enqueue_messages(
num_queues,
total_msg_len as f64 / 1024.0
);
(num_messages, total_msg_len as usize)
num_messages
}

/// Process the number of messages.
fn process_messages(num_msgs: u32) {
fn process_some_messages(num_msgs: u32) {
let weight = (num_msgs as u64).into_weight();
ServiceWeight::set(Some(weight));
let consumed = next_block();
Expand All @@ -192,6 +298,15 @@ fn process_messages(num_msgs: u32) {
assert_eq!(NumMessagesProcessed::take(), num_msgs as usize);
}

/// Process all remaining messages and assert their number.
///
/// Sets the service weight to `Weight::MAX`, advances to the next block and then asserts that
/// the consumed weight equals `Weight::from_all(expected)` and that exactly `expected` messages
/// were processed.
fn process_all_messages(expected: u32) {
	ServiceWeight::set(Some(Weight::MAX));
	let want_weight = Weight::from_all(u64::from(expected));

	let got_weight = next_block();
	assert_eq!(got_weight, want_weight);

	// Taking the counter also resets it for the next round.
	let handled = NumMessagesProcessed::take();
	assert_eq!(handled, expected as usize);
}

/// Returns the weight consumed by `MessageQueue::on_initialize()`.
fn next_block() -> Weight {
MessageQueue::on_finalize(System::block_number());
Expand Down
55 changes: 40 additions & 15 deletions frame/message-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@ pub mod pallet {
Queued,
/// There is temporarily not enough weight to continue servicing messages.
InsufficientWeight,
/// This message is temporarily unprocessable.
///
/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
/// retrying.
TemporarilyUnprocessable,
}

/// The index of the first and last (non-empty) pages.
Expand Down Expand Up @@ -588,6 +593,9 @@ pub mod pallet {

/// Execute an overweight message.
///
/// Temporary processing errors will be propagated whereas permanent errors are treated
/// as a success condition.
///
/// - `origin`: Must be `Signed`.
/// - `message_origin`: The origin from which the message to be executed arrived.
/// - `page`: The page in the queue in which the message to be executed is sitting.
Expand Down Expand Up @@ -621,6 +629,10 @@ pub mod pallet {
enum PageExecutionStatus {
/// The execution bailed because there was not enough weight remaining.
Bailed,
/// The page did not make any progress on its execution.
///
/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
NoProgress,
/// No more messages could be loaded. This does _not_ imply `page.is_complete()`.
///
/// The reasons for this status are:
Expand All @@ -634,6 +646,10 @@ enum PageExecutionStatus {
enum ItemExecutionStatus {
/// The execution bailed because there was not enough weight remaining.
Bailed,
/// The item did not make any progress on its execution.
///
/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
NoProgress,
/// The item was not found.
NoItem,
/// Whether the execution of an item resulted in it being processed.
Expand All @@ -651,8 +667,8 @@ enum MessageExecutionStatus {
Overweight,
/// The message was processed successfully.
Processed,
/// The message was processed and resulted in a permanent error.
Unprocessable,
/// The message was processed and resulted in a, possibly permanent, error.
Unprocessable { permanent: bool },
}

impl<T: Config> Pallet<T> {
Expand Down Expand Up @@ -814,7 +830,8 @@ impl<T: Config> Pallet<T> {
// additional overweight event being deposited.
) {
Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
Unprocessable | Processed => {
Unprocessable { permanent: false } => Err(Error::<T>::TemporarilyUnprocessable),
Unprocessable { permanent: true } | Processed => {
page.note_processed_at_pos(pos);
book_state.message_count.saturating_dec();
book_state.size.saturating_reduce(payload_len);
Expand Down Expand Up @@ -921,6 +938,7 @@ impl<T: Config> Pallet<T> {
weight: &mut WeightMeter,
overweight_limit: Weight,
) -> (bool, Option<MessageOriginOf<T>>) {
use PageExecutionStatus::*;
if !weight.check_accrue(
T::WeightInfo::service_queue_base().saturating_add(T::WeightInfo::ready_ring_unknit()),
) {
Expand All @@ -936,9 +954,9 @@ impl<T: Config> Pallet<T> {
total_processed.saturating_accrue(processed);
match status {
// Store the page progress and do not go to the next one.
PageExecutionStatus::Bailed => break,
Bailed | NoProgress => break,
// Go to the next page if this one is at the end.
PageExecutionStatus::NoMore => (),
NoMore => (),
};
book_state.begin.saturating_inc();
}
Expand Down Expand Up @@ -1003,6 +1021,7 @@ impl<T: Config> Pallet<T> {
) {
Bailed => break PageExecutionStatus::Bailed,
NoItem => break PageExecutionStatus::NoMore,
NoProgress => break PageExecutionStatus::NoProgress,
// Keep going as long as we make progress...
Executed(true) => total_processed.saturating_inc(),
Executed(false) => (),
Expand Down Expand Up @@ -1053,7 +1072,8 @@ impl<T: Config> Pallet<T> {
overweight_limit,
) {
InsufficientWeight => return ItemExecutionStatus::Bailed,
Processed | Unprocessable => true,
Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
Processed | Unprocessable { permanent: true } => true,
Overweight => false,
};

Expand Down Expand Up @@ -1125,12 +1145,14 @@ impl<T: Config> Pallet<T> {
page_index: PageIndex,
message_index: T::Size,
message: &[u8],
weight: &mut WeightMeter,
meter: &mut WeightMeter,
overweight_limit: Weight,
) -> MessageExecutionStatus {
let hash = T::Hashing::hash(message);
use ProcessMessageError::Overweight;
match T::MessageProcessor::process_message(message, origin.clone(), weight.remaining()) {
use ProcessMessageError::*;
let prev_consumed = meter.consumed;

match T::MessageProcessor::process_message(message, origin.clone(), meter) {
Err(Overweight(w)) if w.any_gt(overweight_limit) => {
// Permanently overweight.
Self::deposit_event(Event::<T>::OverweightEnqueued {
Expand All @@ -1146,16 +1168,19 @@ impl<T: Config> Pallet<T> {
// queue.
MessageExecutionStatus::InsufficientWeight
},
Err(error) => {
Err(Yield) => {
// Processing should be reattempted later.
MessageExecutionStatus::Unprocessable { permanent: false }
},
Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
// Permanent error - drop
Self::deposit_event(Event::<T>::ProcessingFailed { hash, origin, error });
MessageExecutionStatus::Unprocessable
MessageExecutionStatus::Unprocessable { permanent: true }
},
Ok((success, weight_used)) => {
Ok(success) => {
// Success
weight.defensive_saturating_accrue(weight_used);
let event = Event::<T>::Processed { hash, origin, weight_used, success };
Self::deposit_event(event);
let weight_used = meter.consumed.saturating_sub(prev_consumed);
Self::deposit_event(Event::<T>::Processed { hash, origin, weight_used, success });
MessageExecutionStatus::Processed
},
}
Expand Down
Loading