From 5b2980ab9676ea685aa3724602ed0127583e8371 Mon Sep 17 00:00:00 2001 From: Yorick Peterse Date: Tue, 23 May 2023 17:21:47 +0200 Subject: [PATCH] Push global jobs back when trying to steal them When stealing jobs from the global queue, it's possible the local queue is filled up while there are still jobs we stole. Prior to this commit those jobs would be lost, resulting in processes never running again. This commit fixes this by pushing these remaining jobs back onto the global queue. This fixes https://github.com/inko-lang/inko/issues/539. Changelog: fixed --- rt/src/scheduler/process.rs | 42 +++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/rt/src/scheduler/process.rs b/rt/src/scheduler/process.rs index 7b9fcfde6..6635496fe 100644 --- a/rt/src/scheduler/process.rs +++ b/rt/src/scheduler/process.rs @@ -442,9 +442,14 @@ impl Thread { if steal > 0 { // We're splitting at an index, so we must subtract one from the // amount. - for process in global.split_off(steal - 1) { + let mut to_steal = global.split_off(steal - 1); + + drop(global); + + while let Some(process) = to_steal.pop() { if let Err(process) = self.work.push(process) { - global.push(process); + to_steal.push(process); + self.pool.schedule_multiple(to_steal); break; } } @@ -1040,6 +1045,39 @@ mod tests { assert!(state.scheduler.pool.global.lock().unwrap().is_empty()); } + #[test] + fn test_thread_steal_from_global_with_full_local_queue() { + let class = empty_process_class("A"); + let process = new_main_process(*class, method).take_and_forget(); + let state = setup(); + let mut thread = Thread::new(0, 0, state.scheduler.pool.clone()); + + for _ in 0..LOCAL_QUEUE_CAPACITY { + thread.schedule(process); + } + + state.scheduler.pool.schedule(process); + state.scheduler.pool.schedule(process); + state.scheduler.pool.schedule(process); + state.scheduler.pool.schedule(process); + + // When the scheduler/threads are dropped, pending processes are + // deallocated. Since we're pushing the same process many times, we have + // to clear the queues first before setting any assertions that may + // fail. + let stolen = thread.steal_from_global().is_some(); + let global_len = state.scheduler.pool.global.lock().unwrap().len(); + + state.scheduler.pool.global.lock().unwrap().clear(); + + for _ in 0..LOCAL_QUEUE_CAPACITY { + thread.work.pop(); + } + + assert!(stolen); + assert_eq!(global_len, 3); + } + #[test] fn test_thread_run_as_backup() { let class = empty_process_class("A");