Skip to content

Commit

Permalink
Add a main_handler which is passed an argument to run the proper main…
Browse files Browse the repository at this point in the history
… loop
  • Loading branch information
Zoxc committed Jan 31, 2019
1 parent 729d7cc commit e58b9c3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
29 changes: 29 additions & 0 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ pub struct ThreadPoolBuilder {
/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// Closure invoked on worker thread start.
main_handler: Option<Box<MainHandler>>,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
Expand Down Expand Up @@ -173,6 +176,12 @@ type StartHandler = Fn(usize) + Send + Sync;
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = Fn(usize) + Send + Sync;

/// The type for a closure that gets invoked with a
/// function which runs rayon tasks.
/// The closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;

impl ThreadPoolBuilder {
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
pub fn new() -> ThreadPoolBuilder {
Expand Down Expand Up @@ -383,6 +392,23 @@ impl ThreadPoolBuilder {
self.exit_handler = Some(Box::new(exit_handler));
self
}

/// Takes the current thread main callback, leaving `None`.
fn take_main_handler(&mut self) -> Option<Box<MainHandler>> {
self.main_handler.take()
}

/// Set a callback to be invoked on thread main.
///
/// The closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
pub fn main_handler<H>(mut self, main_handler: H) -> ThreadPoolBuilder
where H: Fn(usize, &mut FnMut()) + Send + Sync + 'static
{
self.main_handler = Some(Box::new(main_handler));
self
}
}

#[allow(deprecated)]
Expand Down Expand Up @@ -500,6 +526,7 @@ impl fmt::Debug for ThreadPoolBuilder {
ref panic_handler,
ref stack_size,
ref start_handler,
ref main_handler,
ref exit_handler,
ref breadth_first,
} = *self;
Expand All @@ -516,6 +543,7 @@ impl fmt::Debug for ThreadPoolBuilder {
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);

f.debug_struct("ThreadPoolBuilder")
.field("num_threads", num_threads)
Expand All @@ -524,6 +552,7 @@ impl fmt::Debug for ThreadPoolBuilder {
.field("stack_size", &stack_size)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("main_handler", &main_handler)
.field("breadth_first", &breadth_first)
.finish()
}
Expand Down
21 changes: 19 additions & 2 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::thread;
use std::usize;
use unwind;
use util::leak;
use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, ThreadPoolBuilder};
use {ErrorKind, ExitHandler, PanicHandler, StartHandler,
MainHandler, ThreadPoolBuildError, ThreadPoolBuilder};

pub struct Registry {
thread_infos: Vec<ThreadInfo>,
Expand All @@ -28,6 +29,7 @@ pub struct Registry {
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
main_handler: Option<Box<MainHandler>>,

// When this latch reaches 0, it means that all work on this
// registry must be complete. This is ensured in the following ways:
Expand Down Expand Up @@ -117,6 +119,7 @@ impl Registry {
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
main_handler: builder.take_main_handler(),
exit_handler: builder.take_exit_handler(),
});

Expand Down Expand Up @@ -689,7 +692,21 @@ unsafe fn main_loop(
}
}

worker_thread.wait_until(&registry.terminate_latch);
let mut work = || {
worker_thread.wait_until(&registry.terminate_latch);
};

if let Some(ref handler) = registry.main_handler {
match unwind::halt_unwinding(|| handler(index, &mut work)) {
Ok(()) => {
}
Err(err) => {
registry.handle_panic(err);
}
}
} else {
work();
}

// Should not be any work left in our queue.
debug_assert!(worker_thread.take_local_job().is_none());
Expand Down

0 comments on commit e58b9c3

Please sign in to comment.