Skip to content

Commit

Permalink
Push global jobs back when trying to steal them
Browse files Browse the repository at this point in the history
When stealing jobs from the global queue, it's possible the local queue
is filled up while there are still jobs we stole. Prior to this commit
those jobs would be lost, resulting in processes never running again.
This commit fixes this by pushing these remaining jobs back onto the
global queue.

This fixes #539.

Changelog: fixed
  • Loading branch information
yorickpeterse committed May 23, 2023
1 parent 95bfb8d commit 5b2980a
Showing 1 changed file with 40 additions and 2 deletions.
42 changes: 40 additions & 2 deletions rt/src/scheduler/process.rs
Expand Up @@ -442,9 +442,14 @@ impl Thread {
if steal > 0 {
// We're splitting at an index, so we must subtract one from the
// amount.
for process in global.split_off(steal - 1) {
let mut to_steal = global.split_off(steal - 1);

drop(global);

while let Some(process) = to_steal.pop() {
if let Err(process) = self.work.push(process) {
global.push(process);
to_steal.push(process);
self.pool.schedule_multiple(to_steal);
break;
}
}
Expand Down Expand Up @@ -1040,6 +1045,39 @@ mod tests {
assert!(state.scheduler.pool.global.lock().unwrap().is_empty());
}

#[test]
fn test_thread_steal_from_global_with_full_local_queue() {
let class = empty_process_class("A");
let process = new_main_process(*class, method).take_and_forget();
let state = setup();
let mut thread = Thread::new(0, 0, state.scheduler.pool.clone());

for _ in 0..LOCAL_QUEUE_CAPACITY {
thread.schedule(process);
}

state.scheduler.pool.schedule(process);
state.scheduler.pool.schedule(process);
state.scheduler.pool.schedule(process);
state.scheduler.pool.schedule(process);

// When the scheduler/threads are dropped, pending processes are
// deallocated. Since we're pushing the same process many times, we have
// to clear the queues first before setting any assertions that may
// fail.
let stolen = thread.steal_from_global().is_some();
let global_len = state.scheduler.pool.global.lock().unwrap().len();

state.scheduler.pool.global.lock().unwrap().clear();

for _ in 0..LOCAL_QUEUE_CAPACITY {
thread.work.pop();
}

assert!(stolen);
assert_eq!(global_len, 3);
}

#[test]
fn test_thread_run_as_backup() {
let class = empty_process_class("A");
Expand Down

0 comments on commit 5b2980a

Please sign in to comment.