[ISSUE #4539] ✨ Add performance optimizations and new benchmarks for ScheduleMessageService #4540
rocketmq-rust-bot merged 1 commit into main
Conversation
Walkthrough

This change introduces performance benchmarks and optimizations for ScheduleMessageService, including pre-allocation constants, batch processing capabilities, backpressure mechanisms, and refactored task handle management from `Arc<Mutex>` to `Option`. A new Criterion-based benchmark module measures VecDeque allocation, lock contention, and batch processing performance.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45–60 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (5 passed)
🔊 @mxsm 🚀 Thanks for your contribution 🎉! 💡 CodeRabbit (AI) will review your code first 🔥! Note: 🚨 The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review 💥.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-broker/src/schedule/schedule_message_service.rs (1)
1130-1147: Bug: Backpressure check is performed after message delivery, causing delivered messages to be lost.

The new `MAX_PENDING_QUEUE_SIZE` check at lines 1136-1144 is performed after `deliver_message()` has already been called (lines 1131-1133). When backpressure triggers and returns `Ok(false)`, the message has been delivered to the store but the `result_process` is not added to `processes_queue`. This means:
- The message is delivered but the result is not tracked
- The offset won't be updated for that message
- On service restart, the message may be re-delivered
The backpressure check should be performed before calling `deliver_message()`, similar to the existing flow control check.

```diff
+        // Performance optimization: Check queue size before delivering
+        if processes_queue.len() >= MAX_PENDING_QUEUE_SIZE {
+            warn!(
+                "Pending queue for delay level {} is full (size={}), applying backpressure",
+                self.delay_level,
+                processes_queue.len()
+            );
+            // Return false to signal backpressure
+            return Ok(false);
+        }
+
         // Deliver the message
         let result_process = self
             .deliver_message(msg_inner, msg_id, offset, offset_py, size_py, true)
             .await?;

-        // Performance optimization: Check queue size before adding
-        if processes_queue.len() >= MAX_PENDING_QUEUE_SIZE {
-            warn!(
-                "Pending queue for delay level {} is full (size={}), applying backpressure",
-                self.delay_level,
-                processes_queue.len()
-            );
-            // Return false to signal backpressure
-            return Ok(false);
-        }
-
         // Add to pending queue
         processes_queue.push_back(result_process);
```
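The check-before-deliver flow can also be sketched in isolation. The snippet below is illustrative only: `deliver_with_backpressure`, the `u64` payloads, and the closure standing in for `deliver_message()` are simplified placeholders, not the service's real API.

```rust
use std::collections::VecDeque;

// Simplified stand-in for the service's constant.
const MAX_PENDING_QUEUE_SIZE: usize = 10_000;

/// Sketch of the corrected flow: apply backpressure BEFORE the
/// side-effecting delivery, so a delivered result is never dropped.
fn deliver_with_backpressure(
    queue: &mut VecDeque<u64>,
    msg_offset: u64,
    deliver: impl Fn(u64) -> u64, // stand-in for deliver_message()
) -> Result<bool, String> {
    // Capacity check first: nothing has been delivered yet, so an early
    // return cannot lose an untracked delivery result.
    if queue.len() >= MAX_PENDING_QUEUE_SIZE {
        return Ok(false); // signal backpressure to the caller
    }
    let result = deliver(msg_offset); // side effect happens only when it will be tracked
    queue.push_back(result);
    Ok(true)
}
```

The key invariant is that the only early-return path sits before the side effect, so `Ok(false)` always means "nothing was delivered".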
🧹 Nitpick comments (3)
rocketmq-broker/benches/schedule_message_service_performance.rs (2)
77-105: Consider adding concurrent reader benchmarks for a more meaningful RwLock comparison.

The current benchmark tests sequential reads, but RwLock's advantage over Mutex is primarily in concurrent read scenarios where multiple readers can acquire the lock simultaneously. The current sequential benchmark may not show significant differences between the two.
Consider adding a concurrent variant:
```diff
+    group.bench_function("rwlock_concurrent_read", |b| {
+        b.iter(|| {
+            rt.block_on(async {
+                let handles: Vec<_> = (0..4)
+                    .map(|_| {
+                        let data = Arc::clone(&rwlock_data);
+                        tokio::spawn(async move {
+                            let guard = data.read().await;
+                            black_box(guard.len())
+                        })
+                    })
+                    .collect();
+                for h in handles {
+                    let _ = h.await;
+                }
+            })
+        });
+    });
```
112-134: Unbounded vector growth may skew benchmark results.

The `mutex_data` and `rwlock_data` vectors grow continuously across all benchmark iterations without being reset. This causes:
- Increasing memory usage throughout the benchmark
- Periodic Vec reallocations that add noise
- Measurements that change based on iteration count
```diff
 group.bench_function("mutex_sequential_write", |b| {
+    rt.block_on(async { mutex_data.lock().await.clear() });
     b.iter(|| {
         rt.block_on(async {
             let mut data = mutex_data.lock().await;
             data.push(black_box(42));
+            if data.len() > 1000 {
+                data.clear();
+            }
         })
     });
 });

 group.bench_function("rwlock_sequential_write", |b| {
+    rt.block_on(async { rwlock_data.write().await.clear() });
     b.iter(|| {
         rt.block_on(async {
             let mut data = rwlock_data.write().await;
             data.push(black_box(42));
+            if data.len() > 1000 {
+                data.clear();
+            }
         })
     });
 });
```

rocketmq-broker/src/schedule/schedule_message_service.rs (1)
258-259: Minor: Capacity calculation may over-allocate when async delivery is disabled.

The capacity `len * 2 + 1` assumes both delivery and async handler tasks per level, but async handler tasks are only spawned when `enable_async_deliver` is true. This is a minor inefficiency.

```diff
-        // Pre-allocate task_handles vector for all delay levels + persist task
-        let mut task_handles = Vec::with_capacity(this.delay_level_table.len() * 2 + 1);
+        // Pre-allocate task_handles vector for all delay levels + persist task
+        let multiplier = if this.enable_async_deliver { 2 } else { 1 };
+        let mut task_handles = Vec::with_capacity(this.delay_level_table.len() * multiplier + 1);
```
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- rocketmq-broker/Cargo.toml (1 hunks)
- rocketmq-broker/benches/schedule_message_service_performance.rs (1 hunks)
- rocketmq-broker/src/schedule/schedule_message_service.rs (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Agent
- GitHub Check: Check (fmt + clippy)
- GitHub Check: auto-approve
🔇 Additional comments (8)
rocketmq-broker/Cargo.toml (1)
77-83: LGTM!

The benchmark configuration is correct. Using `harness = false` is required for Criterion-based benchmarks, and the naming follows the existing convention in the project.

rocketmq-broker/benches/schedule_message_service_performance.rs (1)
171-211: LGTM!

The atomic operations benchmark and helper functions are well-structured. The benchmark covers the key atomic operations used in the service (`load`, `store`, `fetch_add`), and the helper functions provide appropriate workload simulation for batch processing comparisons.

rocketmq-broker/src/schedule/schedule_message_service.rs (6)
70-76: LGTM!

The performance constants are well-chosen and documented. The values align with common practices: `MAX_BATCH_SIZE = 32` is a reasonable batch size for message processing, `INITIAL_QUEUE_CAPACITY = 128` reduces early reallocations, and `MAX_PENDING_QUEUE_SIZE = 10000` provides a sensible upper bound for backpressure.
135-136: Good simplification of the task handle storage.

Changing from `Arc<Mutex<Vec<...>>>` to `Option<Vec<...>>` is appropriate since task handles are only written during `start()` and consumed during `shutdown()`. This eliminates unnecessary synchronization overhead.

Also applies to: 155-156
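The write-once/consume-once pattern can be sketched with a minimal standalone type. This is a hedged sketch, not the service's real code: `TaskOwner` is hypothetical, and `std::thread` handles stand in for the tokio `JoinHandle`s the service actually stores.

```rust
use std::thread::JoinHandle;

/// Handles are written once in start() and drained once in shutdown(),
/// so Option<Vec<_>> suffices and no Arc<Mutex<_>> is needed.
struct TaskOwner {
    task_handles: Option<Vec<JoinHandle<u32>>>,
}

impl TaskOwner {
    fn new() -> Self {
        Self { task_handles: None }
    }

    fn start(&mut self) {
        // Write-once: populate the Option with all spawned handles.
        let handles = (0u32..3).map(|i| std::thread::spawn(move || i * 2)).collect();
        self.task_handles = Some(handles);
    }

    /// take() consumes the handles exactly once; calling shutdown before
    /// start, or a second time, sees None and returns early.
    fn shutdown(&mut self) -> Option<Vec<u32>> {
        let handles = self.task_handles.take()?;
        Some(handles.into_iter().filter_map(|h| h.join().ok()).collect())
    }
}
```

Because `take()` replaces the field with `None`, a double shutdown is automatically a no-op without any lock.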
344-355: LGTM!

The shutdown implementation correctly uses `take()` for one-time consumption of task handles. The early return with a warning handles the edge case where shutdown is called without a prior start (or called twice).
588-594: LGTM!

Pre-allocating the `VecDeque` with `INITIAL_QUEUE_CAPACITY` is a good optimization that reduces reallocations during normal message processing operations.
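The effect of pre-allocation can be shown in a minimal sketch; the constant value 128 mirrors `INITIAL_QUEUE_CAPACITY` as described above, but the function name is hypothetical.

```rust
use std::collections::VecDeque;

const INITIAL_QUEUE_CAPACITY: usize = 128;

/// Pushes that stay within the pre-allocated capacity never trigger a
/// reallocation, unlike a queue that grows up from the default capacity.
fn preallocated_queue() -> VecDeque<u64> {
    VecDeque::with_capacity(INITIAL_QUEUE_CAPACITY)
}
```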
1656-1709: LGTM with observation.

The batch processing optimization is well-implemented. The early exit on `Running` status and breaks on `Exception`/`Skip` preserve the original semantics while allowing batch processing for successful completions. The queue size logging at >1000 items provides useful operational visibility.

Note: When `Exception` or `Skip` status is encountered, the batch effectively processes only one item before breaking. This is intentional and correct for maintaining proper error handling order.
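The drain semantics described above can be sketched abstractly. The enum variants, the queue of bare statuses, and `drain_batch` are simplified stand-ins for the service's real types, and the exact treatment of a failed head item is an assumption.

```rust
use std::collections::VecDeque;

const MAX_BATCH_SIZE: usize = 32;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum ProcessStatus {
    Success,
    Running,
    Exception,
}

/// Drain up to MAX_BATCH_SIZE completed results per cycle.
fn drain_batch(queue: &mut VecDeque<ProcessStatus>) -> usize {
    let mut processed = 0;
    while processed < MAX_BATCH_SIZE {
        // Copy the head status out so the queue can be mutated below.
        match queue.front().copied() {
            // A still-running head means nothing further can be committed yet.
            None | Some(ProcessStatus::Running) => break,
            Some(ProcessStatus::Success) => {
                queue.pop_front();
                processed += 1;
            }
            Some(ProcessStatus::Exception) => {
                // Consume the failed item, then stop the batch so error
                // handling keeps its original ordering.
                queue.pop_front();
                processed += 1;
                break;
            }
        }
    }
    processed
}
```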
1742-1884: LGTM!

The test coverage is comprehensive, covering:

- `DelayOffsetSerializeWrapper` creation and accessors
- Bidirectional `queue_id`/`delay_level` conversions
- `ProcessStatus` enum traits
- Edge cases with negative and large values
- JSON serialization format verification
The tests appropriately focus on the public API and helper functions without requiring a full service instantiation.
Pull request overview
This PR adds performance optimizations and comprehensive benchmarks for the ScheduleMessageService component, focusing on reducing memory allocations, improving batch processing, and implementing backpressure mechanisms.
Key Changes
- Refactored task handle management from `Arc<Mutex<Vec>>` to `Option<Vec>` for a one-time initialization/consumption pattern
- Added `VecDeque` capacity pre-allocation to reduce reallocations during normal operation
- Implemented batch processing optimization with a `MAX_BATCH_SIZE` constant (32 items per cycle)
- Added a backpressure mechanism when the pending queue exceeds `MAX_PENDING_QUEUE_SIZE` (10,000 items)
- Created a comprehensive benchmark suite covering VecDeque allocation, lock performance, batch processing, and atomic operations
- Added unit tests for utility functions and data structures
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| rocketmq-broker/src/schedule/schedule_message_service.rs | Core optimizations including task handle refactoring, VecDeque capacity pre-allocation, backpressure logic, batch processing improvements, and new unit tests |
| rocketmq-broker/benches/schedule_message_service_performance.rs | New comprehensive benchmark suite measuring memory allocation overhead, lock performance, and batch processing efficiency |
| rocketmq-broker/Cargo.toml | Added benchmark configuration for the new performance test suite |
```rust
// Performance optimization: Check queue size before adding
if processes_queue.len() >= MAX_PENDING_QUEUE_SIZE {
    warn!(
        "Pending queue for delay level {} is full (size={}), applying backpressure",
        self.delay_level,
        processes_queue.len()
    );
    // Return false to signal backpressure
    return Ok(false);
}
```
The backpressure check occurs after deliver_message has already been called (line 1131-1133), which means the message has been delivered and processed but the result is discarded if the queue is full. This could lead to message loss or duplicate processing. The backpressure check should be performed before calling deliver_message, similar to the existing flow control check at lines 1113-1120.
```rust
/// Test compute_deliver_timestamp with known delay level
#[test]
fn test_compute_deliver_timestamp() {
    // This test requires a ScheduleMessageService instance with delay_level_table populated
    // We'll test the logic by understanding the function behavior
    let store_timestamp = 1000i64;
    let delay_time = 5000i64; // 5 seconds
    let expected = store_timestamp + delay_time;

    assert_eq!(expected, 6000);
}
```
This test doesn't actually test the compute_deliver_timestamp function - it only performs arithmetic verification (1000 + 5000 = 6000) without calling any actual method. Consider either removing this test or implementing it to actually test the compute_deliver_timestamp method with a properly configured ScheduleMessageService instance.
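One way to make the test exercise real code is to extract the arithmetic into a free function and test that. The function below is a hypothetical refactor for illustration, not the existing `compute_deliver_timestamp` method.

```rust
/// Hypothetical extraction of the timestamp arithmetic into a free
/// function, so it can be unit-tested without constructing a full
/// ScheduleMessageService.
fn deliver_timestamp(store_timestamp: i64, delay_time: i64) -> i64 {
    store_timestamp.saturating_add(delay_time)
}

#[test]
fn test_deliver_timestamp_arithmetic() {
    assert_eq!(deliver_timestamp(1000, 5000), 6000);
    // saturating_add guards against overflow for extreme inputs
    assert_eq!(deliver_timestamp(i64::MAX, 1), i64::MAX);
}
```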
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #4540      +/-   ##
==========================================
+ Coverage   30.29%   30.36%   +0.06%
==========================================
  Files         673      673
  Lines       97651    97736      +85
==========================================
+ Hits        29588    29676      +88
+ Misses      68063    68060       -3
```

☔ View full report in Codecov by Sentry.
rocketmq-rust-bot left a comment
LGTM - All CI checks passed ✅
Which Issue(s) This PR Fixes (Closes)
Fixes #4539
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Tests
Chores