Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to customize thread spawning #636

Merged
merged 6 commits into from
Jun 5, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ num_cpus = "1.2"
lazy_static = "1"
crossbeam-deque = "0.6.3"
crossbeam-queue = "0.1.2"
crossbeam-utils = "0.6.5"

[dev-dependencies]
rand = "0.6"
rand_xorshift = "0.1"
scoped-tls = "1.0"

[target.'cfg(unix)'.dev-dependencies]
libc = "0.2"
Expand All @@ -49,3 +51,7 @@ path = "tests/scope_join.rs"
[[test]]
name = "simple_panic"
path = "tests/simple_panic.rs"

[[test]]
name = "scoped_threadpool"
path = "tests/scoped_threadpool.rs"
119 changes: 110 additions & 9 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::str::FromStr;

extern crate crossbeam_deque;
extern crate crossbeam_queue;
extern crate crossbeam_utils;
#[cfg(any(debug_assertions, rayon_unstable))]
#[macro_use]
extern crate lazy_static;
Expand All @@ -46,6 +47,8 @@ extern crate rand_xorshift;

#[macro_use]
mod log;
#[macro_use]
mod private;

mod job;
mod join;
Expand All @@ -64,13 +67,16 @@ mod test;
#[cfg(rayon_unstable)]
pub mod internal;
pub use join::{join, join_context};
pub use registry::ThreadBuilder;
pub use scope::{scope, Scope};
pub use scope::{scope_fifo, ScopeFifo};
pub use spawn::{spawn, spawn_fifo};
pub use thread_pool::current_thread_has_pending_tasks;
pub use thread_pool::current_thread_index;
pub use thread_pool::ThreadPool;

use registry::{CustomSpawn, DefaultSpawn, ThreadSpawn};

/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
/// the number of threads for the thread-pool of the current
Expand Down Expand Up @@ -123,8 +129,7 @@ enum ErrorKind {
///
/// [`ThreadPool`]: struct.ThreadPool.html
/// [`build_global()`]: struct.ThreadPoolBuilder.html#method.build_global
#[derive(Default)]
pub struct ThreadPoolBuilder {
pub struct ThreadPoolBuilder<S = DefaultSpawn> {
/// The number of threads in the rayon thread pool.
/// If zero will use the RAYON_NUM_THREADS environment variable.
/// If RAYON_NUM_THREADS is invalid or zero will use the default.
Expand All @@ -146,6 +151,9 @@ pub struct ThreadPoolBuilder {
/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// Closure invoked to spawn threads.
spawn_handler: S,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
Expand Down Expand Up @@ -174,12 +182,35 @@ type StartHandler = Fn(usize) + Send + Sync;
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = Fn(usize) + Send + Sync;

// NB: We can't `#[derive(Default)]` because `S` is left ambiguous.
impl Default for ThreadPoolBuilder {
fn default() -> Self {
ThreadPoolBuilder {
num_threads: 0,
panic_handler: None,
get_thread_name: None,
stack_size: None,
start_handler: None,
exit_handler: None,
spawn_handler: DefaultSpawn,
breadth_first: false,
}
}
}

impl ThreadPoolBuilder {
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
pub fn new() -> ThreadPoolBuilder {
ThreadPoolBuilder::default()
pub fn new() -> Self {
Self::default()
}
}

/// Note: the `S: ThreadSpawn` constraint is an internal implementation detail for the
/// default spawn and those set by [`spawn_handler`](#method.spawn_handler).
impl<S> ThreadPoolBuilder<S>
where
S: ThreadSpawn,
{
/// Create a new `ThreadPool` initialized using this configuration.
pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
ThreadPool::build(self)
Expand Down Expand Up @@ -207,6 +238,75 @@ impl ThreadPoolBuilder {
registry.wait_until_primed();
Ok(())
}
}

impl ThreadPoolBuilder {
/// Create a scoped `ThreadPool` initialized using this configuration.
///
/// This is a convenience function for building a pool using a `crossbeam`
cuviper marked this conversation as resolved.
Show resolved Hide resolved
/// scope to spawn threads in a [`spawn_handler`](#method.spawn_handler).
/// The threads in this pool will start by calling `wrapper`, which should
/// do initialization and continue by calling `ThreadBuilder::run()`.
cuviper marked this conversation as resolved.
Show resolved Hide resolved
pub fn build_scoped<W, F, R>(self, wrapper: W, with_pool: F) -> Result<R, ThreadPoolBuildError>
where
W: Fn(ThreadBuilder) + Sync, // expected to call `run()`
F: FnOnce(&ThreadPool) -> R,
{
let result = crossbeam_utils::thread::scope(|scope| {
let wrapper = &wrapper;
let pool = self
.spawn_handler(|thread| {
let mut builder = scope.builder();
if let Some(name) = thread.name() {
builder = builder.name(name.to_string());
}
if let Some(size) = thread.stack_size() {
builder = builder.stack_size(size);
}
builder.spawn(move |_| wrapper(thread))?;
Ok(())
})
.build()?;
Ok(with_pool(&pool))
});

match result {
Ok(result) => result,
Err(err) => unwind::resume_unwinding(err),
}
}
}

impl<S> ThreadPoolBuilder<S> {
/// Set a custom function for spawning threads.
///
/// Note that the threads will not exit until after the pool is dropped. It
/// is up to the caller to wait for thread termination if that is important
/// for any invariants. For instance, threads created in `crossbeam::scope`
cuviper marked this conversation as resolved.
Show resolved Hide resolved
/// will be joined before that scope returns, and this will block indefinitely
/// if the pool is leaked. Furthermore, the global thread pool doesn't terminate
/// until the entire process exits!
pub fn spawn_handler<F>(self, spawn: F) -> ThreadPoolBuilder<CustomSpawn<F>>
where
F: FnMut(ThreadBuilder) -> io::Result<()>,
{
ThreadPoolBuilder {
spawn_handler: CustomSpawn::new(spawn),
// ..self
num_threads: self.num_threads,
panic_handler: self.panic_handler,
get_thread_name: self.get_thread_name,
stack_size: self.stack_size,
start_handler: self.start_handler,
exit_handler: self.exit_handler,
breadth_first: self.breadth_first,
}
}

/// Returns a reference to the current spawn handler.
fn get_spawn_handler(&mut self) -> &mut S {
&mut self.spawn_handler
}

/// Get the number of threads that will be used for the thread
/// pool. See `num_threads()` for more information.
Expand Down Expand Up @@ -276,7 +376,7 @@ impl ThreadPoolBuilder {
/// replacement of the now deprecated `RAYON_RS_NUM_CPUS` environment
/// variable. If both variables are specified, `RAYON_NUM_THREADS` will
/// be prefered.
pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolBuilder {
pub fn num_threads(mut self, num_threads: usize) -> Self {
self.num_threads = num_threads;
self
}
Expand All @@ -300,7 +400,7 @@ impl ThreadPoolBuilder {
/// If the panic handler itself panics, this will abort the
/// process. To prevent this, wrap the body of your panic handler
/// in a call to `std::panic::catch_unwind()`.
pub fn panic_handler<H>(mut self, panic_handler: H) -> ThreadPoolBuilder
pub fn panic_handler<H>(mut self, panic_handler: H) -> Self
where
H: Fn(Box<Any + Send>) + Send + Sync + 'static,
{
Expand Down Expand Up @@ -368,7 +468,7 @@ impl ThreadPoolBuilder {
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
/// If that handler returns, then startup will continue normally.
pub fn start_handler<H>(mut self, start_handler: H) -> ThreadPoolBuilder
pub fn start_handler<H>(mut self, start_handler: H) -> Self
where
H: Fn(usize) + Send + Sync + 'static,
{
Expand All @@ -387,7 +487,7 @@ impl ThreadPoolBuilder {
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
/// If that handler returns, then the thread will exit normally.
pub fn exit_handler<H>(mut self, exit_handler: H) -> ThreadPoolBuilder
pub fn exit_handler<H>(mut self, exit_handler: H) -> Self
where
H: Fn(usize) + Send + Sync + 'static,
{
Expand Down Expand Up @@ -503,7 +603,7 @@ pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
config.into_builder().build_global().map_err(Box::from)
}

impl fmt::Debug for ThreadPoolBuilder {
impl<S> fmt::Debug for ThreadPoolBuilder<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let ThreadPoolBuilder {
ref num_threads,
Expand All @@ -512,6 +612,7 @@ impl fmt::Debug for ThreadPoolBuilder {
ref stack_size,
ref start_handler,
ref exit_handler,
spawn_handler: _,
ref breadth_first,
} = *self;

Expand Down
26 changes: 26 additions & 0 deletions rayon-core/src/private.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! The public parts of this private module are used to create traits
//! that cannot be implemented outside of our own crate. This way we
//! can feel free to extend those traits without worrying about it
//! being a breaking change for other implementations.

/// If this type is pub but not publicly reachable, third parties
/// can't name it and can't implement traits using it.
#[allow(missing_debug_implementations)]
pub struct PrivateMarker;

macro_rules! private_decl {
() => {
/// This trait is private; this method exists to make it
/// impossible to implement outside the crate.
#[doc(hidden)]
fn __rayon_private__(&self) -> ::private::PrivateMarker;
}
}

macro_rules! private_impl {
() => {
fn __rayon_private__(&self) -> ::private::PrivateMarker {
::private::PrivateMarker
}
}
}
Loading