feat: support batch in session. #401
Code Review
This pull request introduces batch support for sessions, enabling gang scheduling by allocating executors and dispatching tasks in coordinated groups. It includes updates to the protobuf definitions, CLI, scheduler actions, and storage engines to handle batch configurations. Feedback highlights several critical issues, including a busy-wait loop in the task polling logic that could lead to high CPU usage and a race condition during batch index assignment. Performance improvements were also suggested regarding inefficient map cloning and sorting in the task selection process, along with a fix for potential integer truncation when handling task IDs.
    ctx.waker().wake_by_ref();
    return Poll::Pending;
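The summary flags this pattern as a busy-wait: waking the task's own waker right before returning `Poll::Pending` asks the executor to poll again immediately, so the future spins while no task is available. One possible alternative is to store the waker and let the producer wake it when work arrives. The sketch below assumes that approach; `TaskQueue`, `NextTask`, `WakeCounter`, and `park_and_wake` are hypothetical names for illustration, not types from this PR.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state: queued task IDs plus the waker of a parked consumer.
#[derive(Default)]
struct Shared {
    tasks: VecDeque<i64>,
    waker: Option<Waker>,
}

/// Hypothetical queue handle (not a type from the PR).
#[derive(Clone, Default)]
struct TaskQueue {
    shared: Arc<Mutex<Shared>>,
}

impl TaskQueue {
    /// Producer side: enqueue a task, then wake the parked consumer, if any.
    fn push(&self, task_id: i64) {
        let waker = {
            let mut shared = self.shared.lock().unwrap();
            shared.tasks.push_back(task_id);
            shared.waker.take()
        };
        // Wake after the guard is dropped so the consumer can lock right away.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Consumer side: a future that resolves once a task is available.
    fn next_task(&self) -> NextTask {
        NextTask { queue: self.clone() }
    }
}

struct NextTask {
    queue: TaskQueue,
}

impl Future for NextTask {
    type Output = i64;

    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<i64> {
        let mut shared = self.queue.shared.lock().unwrap();
        match shared.tasks.pop_front() {
            Some(task_id) => Poll::Ready(task_id),
            None => {
                // Park: remember the waker instead of calling wake_by_ref() here.
                shared.waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that only counts how often it is woken (for the demonstration below).
#[derive(Default)]
struct WakeCounter(AtomicUsize);

impl Wake for WakeCounter {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Drives one park/wake cycle by hand and reports:
/// (first poll pending?, wakes while idle, wakes after push, second poll result).
fn park_and_wake() -> (bool, usize, usize, Poll<i64>) {
    let queue = TaskQueue::default();
    let counter = Arc::new(WakeCounter::default());
    let waker = Waker::from(counter.clone());
    let mut ctx = Context::from_waker(&waker);

    let mut fut = queue.next_task();
    let first_pending = Pin::new(&mut fut).poll(&mut ctx).is_pending();
    let wakes_while_idle = counter.0.load(Ordering::SeqCst);

    queue.push(7);
    let wakes_after_push = counter.0.load(Ordering::SeqCst);
    let second = Pin::new(&mut fut).poll(&mut ctx);
    (first_pending, wakes_while_idle, wakes_after_push, second)
}
```

With this shape the waker fires only when `push` runs, so an idle consumer costs no CPU. The sketch supports a single parked consumer; multiple consumers would need a list of wakers or an existing notification primitive.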
    pub async fn next_batch_index(&self, ssn_id: &SessionID, batch_size: u32) -> Result<u32, FlameError> {
        let exe_map = lock_ptr!(self.executors)?;

        let bound_count = exe_map
            .values()
            .filter_map(|exe_ptr| exe_ptr.lock().ok())
            .filter(|exe| exe.ssn_id.as_ref() == Some(ssn_id))
            .count();

        Ok((bound_count as u32) % batch_size)
    }
There is a race condition in next_batch_index. The bound_count is calculated based on a snapshot of the executors, but the assignment of the index to the executor happens after the lock on the executor map is released. If multiple executors attempt to bind to the same session concurrently, they may receive the same batch_index.
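One way to close this window is to count the bound executors and record the new binding under the same lock guard. The following is a minimal sketch under that assumption; `Registry`, `Executor`, `bind_with_batch_index`, and `bind_concurrently` are hypothetical, simplified stand-ins and do not reflect the PR's actual types or locking macros.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, simplified executor record (the real type has more fields).
#[derive(Default)]
struct Executor {
    ssn_id: Option<String>,
    batch_index: Option<u32>,
}

/// Hypothetical registry holding all executors behind a single lock.
#[derive(Default)]
struct Registry {
    executors: Mutex<HashMap<u32, Executor>>,
}

impl Registry {
    /// Counts executors already bound to `ssn_id` and binds `exe_id` with the
    /// next index while still holding the lock, so concurrent callers cannot
    /// observe the same count.
    fn bind_with_batch_index(&self, exe_id: u32, ssn_id: &str, batch_size: u32) -> Option<u32> {
        if batch_size == 0 {
            return None; // avoid a modulo-by-zero panic
        }
        let mut exe_map = self.executors.lock().ok()?;
        let bound_count = exe_map
            .values()
            .filter(|exe| exe.ssn_id.as_deref() == Some(ssn_id))
            .count();
        let index = (bound_count as u32) % batch_size;

        // Record the binding before the guard is dropped.
        let exe = exe_map.entry(exe_id).or_default();
        exe.ssn_id = Some(ssn_id.to_string());
        exe.batch_index = Some(index);
        Some(index)
    }
}

/// Binds `n` executors from `n` threads and returns the sorted indices handed out.
fn bind_concurrently(n: u32) -> Vec<u32> {
    let registry = Arc::new(Registry::default());
    let handles: Vec<_> = (0..n)
        .map(|exe_id| {
            let registry = Arc::clone(&registry);
            thread::spawn(move || registry.bind_with_batch_index(exe_id, "ssn-1", n).unwrap())
        })
        .collect();
    let mut indices: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    indices.sort_unstable();
    indices
}
```

Because the count and the write happen in one critical section, eight concurrent binds with a batch size of eight receive eight distinct indices. A count-based index can still repeat after an executor unbinds, so tracking free indices explicitly may be worth considering.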
    if let Some((task_id, _)) = pending_tasks.clone().iter().next() {
        return pending_tasks.remove(task_id);
    }
Cloning the entire pending_tasks map just to retrieve the first element is highly inefficient. You can use .keys().next() to get an optional key without cloning the collection.
Suggested change:

    - if let Some((task_id, _)) = pending_tasks.clone().iter().next() {
    -     return pending_tasks.remove(task_id);
    - }
    + if let Some(task_id) = pending_tasks.keys().next().cloned() {
    +     return pending_tasks.remove(&task_id);
    + }
    let mut sorted_ids: Vec<_> = pending_tasks.keys().cloned().collect();
    sorted_ids.sort();
Sorting all pending task IDs on every call to pop_pending_task costs O(n log n) per call. Consider using an ordered map (e.g. BTreeMap) for tasks_index or caching the sorted keys.
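A minimal sketch of the ordered-map idea, assuming pending tasks are keyed by an `i64` task ID. `Task`, `pop_first_pending`, and `pop_pending_for_batch` are hypothetical names, and the use of `rem_euclid` for negative IDs is an added assumption rather than something the PR specifies.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the PR's task type.
#[derive(Debug, PartialEq)]
struct Task {
    name: String,
}

/// Removes and returns the pending task with the smallest ID, if any.
/// `BTreeMap` keeps keys ordered, so no clone or sort is needed.
fn pop_first_pending(pending: &mut BTreeMap<i64, Task>) -> Option<Task> {
    pending.pop_first().map(|(_, task)| task)
}

/// Removes and returns the lowest-ID pending task that belongs to `batch_index`.
fn pop_pending_for_batch(
    pending: &mut BTreeMap<i64, Task>,
    batch_size: u32,
    batch_index: u32,
) -> Option<Task> {
    if batch_size == 0 {
        return None; // avoid a modulo-by-zero panic
    }
    let size = i64::from(batch_size);
    // keys() already iterates in ascending order; stop at the first match.
    let task_id = pending
        .keys()
        .copied()
        .find(|id| id.rem_euclid(size) == i64::from(batch_index))?;
    pending.remove(&task_id)
}
```

The batch lookup is still a linear scan in the worst case, but it avoids allocating and sorting a key vector on every call and usually stops early.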
    sorted_ids.sort();

    for task_id in sorted_ids {
        if (task_id as u32) % batch_size == batch_index {
Casting task_id (i64) to u32 can lead to incorrect batch assignment due to truncation if task IDs exceed u32::MAX. It is safer to perform the modulo operation on the original i64 type or cast batch_size to i64.
Suggested change:

    - if (task_id as u32) % batch_size == batch_index {
    + if (task_id % batch_size as i64) == batch_index as i64 {
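To make the truncation concrete, the sketch below compares the original cast with a widened modulo. `batch_slot` and `truncated_slot` are hypothetical helper names; using `rem_euclid` so that negative IDs also map into `0..batch_size` is an added assumption, since the suggestion above uses plain `%`.

```rust
/// Maps a task ID to its slot within a batch without narrowing the ID first.
/// Returns `None` for a zero batch size.
fn batch_slot(task_id: i64, batch_size: u32) -> Option<u32> {
    if batch_size == 0 {
        return None; // avoid a modulo-by-zero panic
    }
    let slot = task_id.rem_euclid(i64::from(batch_size));
    // `slot` lies in `0..batch_size`, so narrowing back to u32 is lossless.
    Some(slot as u32)
}

/// The original expression, kept for comparison: the cast wraps modulo 2^32
/// before the remainder is taken.
fn truncated_slot(task_id: i64, batch_size: u32) -> u32 {
    (task_id as u32) % batch_size
}
```

For a task ID of 4294967297 (that is, `u32::MAX + 2`) and a batch size of 3, the truncating version yields slot 1 while the widened version yields slot 2, so the two disagree as soon as IDs pass `u32::MAX`.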
1a5e5ac to 5b47043
17c6766 to c3bf371
✅ There are no secrets present in this pull request anymore. If these secrets were true positives and are still valid, we highly recommend that you revoke them.
66be9ed to 676c279
- Add batch_index field to executor for tracking batch assignment
- Add derive_events_path_tests.rs with 12 tests covering sqlite://, fs://, and fallback paths including FLAME_TEST_DIR env var handling
- Add 8 comprehensive FsEventManager tests for multiple events, sessions, persistence, and FLAME_TEST_DIR integration
- Fix test isolation by using unique subdirectories per test to avoid shared events directory conflicts in parallel test runs