diff --git a/rt/src/scheduler/process.rs b/rt/src/scheduler/process.rs index 7b9fcfde6..6635496fe 100644 --- a/rt/src/scheduler/process.rs +++ b/rt/src/scheduler/process.rs @@ -442,9 +442,14 @@ impl Thread { if steal > 0 { // We're splitting at an index, so we must subtract one from the // amount. - for process in global.split_off(steal - 1) { + let mut to_steal = global.split_off(steal - 1); + + drop(global); + + while let Some(process) = to_steal.pop() { if let Err(process) = self.work.push(process) { - global.push(process); + to_steal.push(process); + self.pool.schedule_multiple(to_steal); break; } } @@ -1040,6 +1045,39 @@ mod tests { assert!(state.scheduler.pool.global.lock().unwrap().is_empty()); } + #[test] + fn test_thread_steal_from_global_with_full_local_queue() { + let class = empty_process_class("A"); + let process = new_main_process(*class, method).take_and_forget(); + let state = setup(); + let mut thread = Thread::new(0, 0, state.scheduler.pool.clone()); + + for _ in 0..LOCAL_QUEUE_CAPACITY { + thread.schedule(process); + } + + state.scheduler.pool.schedule(process); + state.scheduler.pool.schedule(process); + state.scheduler.pool.schedule(process); + state.scheduler.pool.schedule(process); + + // When the scheduler/threads are dropped, pending processes are + // deallocated. Since we're pushing the same process many times, we have + // to clear the queues first before setting any assertions that may + // fail. + let stolen = thread.steal_from_global().is_some(); + let global_len = state.scheduler.pool.global.lock().unwrap().len(); + + state.scheduler.pool.global.lock().unwrap().clear(); + + for _ in 0..LOCAL_QUEUE_CAPACITY { + thread.work.pop(); + } + + assert!(stolen); + assert_eq!(global_len, 3); + } + #[test] fn test_thread_run_as_backup() { let class = empty_process_class("A");