Skip to content

Commit

Permalink
wake a new idle thread when last idle thread finds work
Browse files Browse the repository at this point in the history
a go-inspired addition
  • Loading branch information
nikomatsakis committed Sep 15, 2019
1 parent 7ca597d commit 2e96efb
Showing 1 changed file with 34 additions and 23 deletions.
57 changes: 34 additions & 23 deletions rayon-core/src/sleep/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,15 @@ impl Sleep {
self.thread_counts.fetch_add(MSW_ONE, Ordering::SeqCst);
}

fn sub_idle_thread(&self) {
/// Removes this thread from the "idle thread" list. Returns true
/// if we were the last idle thread and other threads are still
/// sleeping.
fn sub_idle_thread(&self) -> bool {
// Relaxed suffices: we don't use these reads as a signal that
// we can read some other memory.
self.thread_counts.fetch_sub(LSW_ONE, Ordering::Relaxed);
let thread_counts_before_sub = self.thread_counts.fetch_sub(LSW_ONE, Ordering::Relaxed);
let (num_awake_but_idle, num_sleeping) = Self::split_thread_counts(thread_counts_before_sub);
num_awake_but_idle == 1 && num_sleeping > 0
}

fn sub_sleeping_thread(&self) {
Expand All @@ -103,6 +108,23 @@ impl Sleep {
self.thread_counts.fetch_sub(MSW_ONE, Ordering::Relaxed);
}

/// Returns `(num_awake_but_idle, num_sleeping)`, the pair of:
///
/// - the number of threads that are awake but idle, looking for work
/// - the number of threads that are asleep
fn load_thread_counts(&self, ordering: Ordering) -> (u32, u32) {
// Relaxed suffices: we don't use these reads as a signal that
// we can read some other memory.
let thread_counts = self.thread_counts.load(ordering);
Self::split_thread_counts(thread_counts)
}

fn split_thread_counts(thread_counts: u64) -> (u32, u32) {
let num_sleeping = (thread_counts >> 32) as u32;
let num_awake_but_idle = (thread_counts as u32) - num_sleeping;
(num_awake_but_idle, num_sleeping)
}

fn split_j_s_counters(counters: u64) -> (JobsCounter, SleepyCounter) {
let jobs_counter = JobsCounter(counters as u32);
let sleepy_counter = SleepyCounter((counters >> 32) as u32);
Expand Down Expand Up @@ -179,19 +201,6 @@ impl Sleep {
true
}

/// Returns `(num_awake_but_idle, num_sleeping)`, the pair of:
///
/// - the number of threads that are awake but idle, looking for work
/// - the number of threads that are asleep
fn load_thread_counts(&self, ordering: Ordering) -> (u32, u32) {
// Relaxed suffices: we don't use these reads as a signal that
// we can read some other memory.
let thread_counts = self.thread_counts.load(ordering);
let num_sleeping = (thread_counts >> 32) as u32;
let num_awake_but_idle = (thread_counts as u32) - num_sleeping;
(num_awake_but_idle, num_sleeping)
}

#[inline]
pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
self.add_idle_thread();
Expand All @@ -211,7 +220,11 @@ impl Sleep {
worker: idle_state.worker_index,
yields: idle_state.rounds,
});
self.sub_idle_thread();
if self.sub_idle_thread() {
// If we were the last idle thread and other threads are still sleeping,
// then we should wake up another thread.
self.wake_threads(idle_state.worker_index, 1);
}
}

#[inline]
Expand Down Expand Up @@ -445,23 +458,21 @@ impl Sleep {
return;
}

self.new_jobs_cold(source_worker_index, num_jobs, num_awake_but_idle, num_sleepers);
let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
self.wake_threads(source_worker_index, num_to_wake);
}

#[cold]
fn new_jobs_cold(
fn wake_threads(
&self,
source_worker_index: usize,
num_jobs: u32,
num_awake_but_idle: u32,
num_sleepers: u32,
mut num_to_wake: u32,
) {
let mut num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
for (i, sleep_state) in self.worker_sleep_states.iter().enumerate() {
let is_asleep = sleep_state.is_asleep.lock().unwrap();
if *is_asleep {
sleep_state.condvar.notify_one();
log!(TickleAnyTarget {
log!(TickleAnyWakeThread {
source_worker: source_worker_index,
target_worker: i,
});
Expand Down

0 comments on commit 2e96efb

Please sign in to comment.