From 9461f7bb49409bee2568291f3629d70265b30880 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Mon, 5 Jun 2023 16:54:25 -0700 Subject: [PATCH 1/6] core: registry: Allow using the current thread in a new pool. See discussion in #1052. Closes #1052. --- rayon-core/src/lib.rs | 13 +++++++++++++ rayon-core/src/registry.rs | 36 ++++++++++++++++-------------------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index e80a10c6f..0596e2a80 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -174,6 +174,9 @@ pub struct ThreadPoolBuilder { /// If RAYON_NUM_THREADS is invalid or zero will use the default. num_threads: usize, + /// The thread we're building *from* will also be part of the pool. + use_current: bool, + /// Custom closure, if any, to handle a panic that we cannot propagate /// anywhere else. panic_handler: Option>, @@ -227,6 +230,7 @@ impl Default for ThreadPoolBuilder { fn default() -> Self { ThreadPoolBuilder { num_threads: 0, + use_current: false, panic_handler: None, get_thread_name: None, stack_size: None, @@ -437,6 +441,7 @@ impl ThreadPoolBuilder { spawn_handler: CustomSpawn::new(spawn), // ..self num_threads: self.num_threads, + use_current: self.use_current, panic_handler: self.panic_handler, get_thread_name: self.get_thread_name, stack_size: self.stack_size, @@ -529,6 +534,12 @@ impl ThreadPoolBuilder { self } + /// Use the current thread as one of the threads in the pool. + pub fn use_current(mut self) -> Self { + self.use_current = true; + self + } + /// Returns a copy of the current panic handler. fn take_panic_handler(&mut self) -> Option> { self.panic_handler.take() @@ -768,6 +779,7 @@ impl fmt::Debug for ThreadPoolBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ThreadPoolBuilder { ref num_threads, + ref use_current, ref get_thread_name, ref panic_handler, ref stack_size, @@ -792,6 +804,7 @@ impl fmt::Debug for ThreadPoolBuilder { f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) + .field("use_current", use_current) .field("get_thread_name", &get_thread_name) .field("panic_handler", &panic_handler) .field("stack_size", &stack_size) diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 0c8224edc..0fb17797e 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -207,26 +207,7 @@ fn default_global_registry() -> Result, ThreadPoolBuildError> { // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new() - .num_threads(1) - .spawn_handler(|thread| { - // Rather than starting a new thread, we're just taking over the current thread - // *without* running the main loop, so we can still return from here. - // The WorkerThread is leaked, but we never shutdown the global pool anyway. - let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); - let registry = &*worker_thread.registry; - let index = worker_thread.index; - - unsafe { - WorkerThread::set_current(worker_thread); - - // let registry know we are ready to do work - Latch::set(®istry.thread_infos[index].primed); - } - - Ok(()) - }); - + let builder = ThreadPoolBuilder::new().num_threads(1).use_current(); let fallback_result = Registry::new(builder); if fallback_result.is_ok() { return fallback_result; @@ -300,6 +281,21 @@ impl Registry { stealer, index, }; + + if index == 0 && builder.use_current { + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + // TODO: what about non-global thread pools? + let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); + + unsafe { + WorkerThread::set_current(worker_thread); + Latch::set(®istry.thread_infos[index].primed); + } + continue; + } + if let Err(e) = builder.get_spawn_handler().spawn(thread) { return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); } From 18f68616799ed2e74eff55ee9382eff75b1e9f81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Sun, 25 Jun 2023 14:05:54 +0200 Subject: [PATCH 2/6] core: registry: Rename ThreadPoolBuilder::use_current to use_current_thread. As per suggestion in #1052. --- rayon-core/src/lib.rs | 14 +++++++------- rayon-core/src/registry.rs | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index 0596e2a80..ebfba1201 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -175,7 +175,7 @@ pub struct ThreadPoolBuilder { num_threads: usize, /// The thread we're building *from* will also be part of the pool. - use_current: bool, + use_current_thread: bool, /// Custom closure, if any, to handle a panic that we cannot propagate /// anywhere else. @@ -230,7 +230,7 @@ impl Default for ThreadPoolBuilder { fn default() -> Self { ThreadPoolBuilder { num_threads: 0, - use_current: false, + use_current_thread: false, panic_handler: None, get_thread_name: None, stack_size: None, @@ -441,7 +441,7 @@ impl ThreadPoolBuilder { spawn_handler: CustomSpawn::new(spawn), // ..self num_threads: self.num_threads, - use_current: self.use_current, + use_current_thread: self.use_current_thread, panic_handler: self.panic_handler, get_thread_name: self.get_thread_name, stack_size: self.stack_size, @@ -535,8 +535,8 @@ impl ThreadPoolBuilder { } /// Use the current thread as one of the threads in the pool. - pub fn use_current(mut self) -> Self { - self.use_current = true; + pub fn use_current_thread(mut self) -> Self { + self.use_current_thread = true; self } @@ -779,7 +779,7 @@ impl fmt::Debug for ThreadPoolBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ThreadPoolBuilder { ref num_threads, - ref use_current, + ref use_current_thread, ref get_thread_name, ref panic_handler, ref stack_size, @@ -804,7 +804,7 @@ impl fmt::Debug for ThreadPoolBuilder { f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) - .field("use_current", use_current) + .field("use_current_thread", use_current_thread) .field("get_thread_name", &get_thread_name) .field("panic_handler", &panic_handler) .field("stack_size", &stack_size) diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 0fb17797e..130f2c3a3 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -207,7 +207,7 @@ fn default_global_registry() -> Result, ThreadPoolBuildError> { // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new().num_threads(1).use_current(); + let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); let fallback_result = Registry::new(builder); if fallback_result.is_ok() { return fallback_result; @@ -282,7 +282,7 @@ impl Registry { index, }; - if index == 0 && builder.use_current { + if index == 0 && builder.use_current_thread { // Rather than starting a new thread, we're just taking over the current thread // *without* running the main loop, so we can still return from here. // The WorkerThread is leaked, but we never shutdown the global pool anyway. From 87274ad093b354384b50e73121366b07662bad1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Sun, 25 Jun 2023 14:05:54 +0200 Subject: [PATCH 3/6] core: registry: Add some more documentation for ThreadPoolBuilder::use_current_thread. --- rayon-core/src/lib.rs | 17 +++++++++++++++++ rayon-core/src/registry.rs | 3 +-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index ebfba1201..d5751746a 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -535,6 +535,23 @@ impl ThreadPoolBuilder { } /// Use the current thread as one of the threads in the pool. + /// + /// The current thread is guaranteed to be at index 0, and since the thread is not managed by + /// rayon, the spawn and exit handlers do not run for that thread. + /// + /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into + /// the thread-pool will generally not be picked up automatically by this thread unless you + /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`]. + /// + /// # Panics + /// + /// This function won't panic itself, but [`ThreadPoolBuilder::build()`] will panic if you've + /// called this function and the current thread is already part of another [`ThreadPool`]. + /// + /// # Local thread-pools + /// + /// Using this in a local thread-pool means the registry will be leaked. In future versions + /// there might be a way of cleaning up the current-thread state. pub fn use_current_thread(mut self) -> Self { self.use_current_thread = true; self diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 130f2c3a3..927a4b551 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -286,8 +286,7 @@ impl Registry { // Rather than starting a new thread, we're just taking over the current thread // *without* running the main loop, so we can still return from here. // The WorkerThread is leaked, but we never shutdown the global pool anyway. - // TODO: what about non-global thread pools? - let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); + let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); unsafe { WorkerThread::set_current(worker_thread); From f4db4d711e6382b13b4a7da88c0087e71a6252e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Sun, 25 Jun 2023 14:05:54 +0200 Subject: [PATCH 4/6] core: tests: Add some basic tests for ThreadPoolBuilder::use_current_thread. Ideas for testing the "call cleanup function from a job" case would be great. --- rayon-core/Cargo.toml | 4 +++ rayon-core/tests/use_current_thread.rs | 47 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 rayon-core/tests/use_current_thread.rs diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml index f6e4797b4..2cd5372bb 100644 --- a/rayon-core/Cargo.toml +++ b/rayon-core/Cargo.toml @@ -53,3 +53,7 @@ path = "tests/simple_panic.rs" [[test]] name = "scoped_threadpool" path = "tests/scoped_threadpool.rs" + +[[test]] +name = "use_current_thread" +path = "tests/use_current_thread.rs" diff --git a/rayon-core/tests/use_current_thread.rs b/rayon-core/tests/use_current_thread.rs new file mode 100644 index 000000000..d72d3b235 --- /dev/null +++ b/rayon-core/tests/use_current_thread.rs @@ -0,0 +1,47 @@ +use rayon_core::ThreadPoolBuilder; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread::{self, JoinHandle}; + +#[test] +fn use_current_thread_basic() { + static JOIN_HANDLES: Mutex>> = Mutex::new(Vec::new()); + let pool = ThreadPoolBuilder::new() + .num_threads(2) + .use_current_thread() + .spawn_handler(|builder| { + let handle = thread::Builder::new().spawn(|| builder.run())?; + JOIN_HANDLES.lock().unwrap().push(handle); + Ok(()) + }) + .build() + .unwrap(); + assert_eq!(rayon_core::current_thread_index(), Some(0)); + assert_eq!( + JOIN_HANDLES.lock().unwrap().len(), + 1, + "Should only spawn one extra thread" + ); + + let pair = Arc::new((Mutex::new(false), Condvar::new())); + let pair2 = Arc::clone(&pair); + pool.spawn(move || { + assert_ne!(rayon_core::current_thread_index(), Some(0)); + // This should execute even if the current thread is blocked, since we have two threads in + // the pool. + let &(ref started, ref condvar) = &*pair2; + *started.lock().unwrap() = true; + condvar.notify_one(); + }); + + let _guard = pair + .1 + .wait_while(pair.0.lock().unwrap(), |ran| !*ran) + .unwrap(); + std::mem::drop(pool); // Drop the pool. + + // Wait until all threads have actually exited. This is not really needed, other than to + // reduce noise of leak-checking tools. + for handle in std::mem::take(&mut *JOIN_HANDLES.lock().unwrap()) { + let _ = handle.join(); + } +} From 40b59c0e443fc803c6b88b26fdf7a5194a19ca81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Cobos=20=C3=81lvarez?= Date: Wed, 20 Sep 2023 12:14:39 +0200 Subject: [PATCH 5/6] core: Make use_current_thread error rather than panic when already in the pool. --- rayon-core/src/lib.rs | 13 +++++++------ rayon-core/src/registry.rs | 5 +++++ rayon-core/tests/use_current_thread.rs | 9 +++++++++ 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs index d5751746a..7001c8c1d 100644 --- a/rayon-core/src/lib.rs +++ b/rayon-core/src/lib.rs @@ -147,6 +147,7 @@ pub struct ThreadPoolBuildError { #[derive(Debug)] enum ErrorKind { GlobalPoolAlreadyInitialized, + CurrentThreadAlreadyInPool, IOError(io::Error), } @@ -543,11 +544,6 @@ impl ThreadPoolBuilder { /// the thread-pool will generally not be picked up automatically by this thread unless you /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], or [`scope()`]. /// - /// # Panics - /// - /// This function won't panic itself, but [`ThreadPoolBuilder::build()`] will panic if you've - /// called this function and the current thread is already part of another [`ThreadPool`]. - /// /// # Local thread-pools /// /// Using this in a local thread-pool means the registry will be leaked. In future versions @@ -759,18 +755,22 @@ impl ThreadPoolBuildError { const GLOBAL_POOL_ALREADY_INITIALIZED: &str = "The global thread pool has already been initialized."; +const CURRENT_THREAD_ALREADY_IN_POOL: &str = + "The current thread is already part of another thread pool."; + impl Error for ThreadPoolBuildError { #[allow(deprecated)] fn description(&self) -> &str { match self.kind { ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED, + ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL, ErrorKind::IOError(ref e) => e.description(), } } fn source(&self) -> Option<&(dyn Error + 'static)> { match &self.kind { - ErrorKind::GlobalPoolAlreadyInitialized => None, + ErrorKind::GlobalPoolAlreadyInitialized | ErrorKind::CurrentThreadAlreadyInPool => None, ErrorKind::IOError(e) => Some(e), } } @@ -779,6 +779,7 @@ impl Error for ThreadPoolBuildError { impl fmt::Display for ThreadPoolBuildError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.kind { + ErrorKind::CurrentThreadAlreadyInPool => CURRENT_THREAD_ALREADY_IN_POOL.fmt(f), ErrorKind::GlobalPoolAlreadyInitialized => GLOBAL_POOL_ALREADY_INITIALIZED.fmt(f), ErrorKind::IOError(e) => e.fmt(f), } diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs index 927a4b551..342a9cfb5 100644 --- a/rayon-core/src/registry.rs +++ b/rayon-core/src/registry.rs @@ -283,6 +283,11 @@ impl Registry { }; if index == 0 && builder.use_current_thread { + if !WorkerThread::current().is_null() { + return Err(ThreadPoolBuildError::new( + ErrorKind::CurrentThreadAlreadyInPool, + )); + } // Rather than starting a new thread, we're just taking over the current thread // *without* running the main loop, so we can still return from here. // The WorkerThread is leaked, but we never shutdown the global pool anyway. diff --git a/rayon-core/tests/use_current_thread.rs b/rayon-core/tests/use_current_thread.rs index d72d3b235..de1c10350 100644 --- a/rayon-core/tests/use_current_thread.rs +++ b/rayon-core/tests/use_current_thread.rs @@ -22,6 +22,15 @@ fn use_current_thread_basic() { "Should only spawn one extra thread" ); + let another_pool = ThreadPoolBuilder::new() + .num_threads(2) + .use_current_thread() + .build(); + assert!( + another_pool.is_err(), + "Should error if the thread is already part of a pool" + ); + let pair = Arc::new((Mutex::new(false), Condvar::new())); let pair2 = Arc::clone(&pair); pool.spawn(move || { From 01d28003764758b6589e29cff40f8ca8d0d51797 Mon Sep 17 00:00:00 2001 From: Josh Stone Date: Wed, 20 Sep 2023 09:38:29 -0700 Subject: [PATCH 6/6] Ignore the multi-threaded test on emscripten/wasm --- rayon-core/tests/use_current_thread.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rayon-core/tests/use_current_thread.rs b/rayon-core/tests/use_current_thread.rs index de1c10350..ec801c98d 100644 --- a/rayon-core/tests/use_current_thread.rs +++ b/rayon-core/tests/use_current_thread.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, Condvar, Mutex}; use std::thread::{self, JoinHandle}; #[test] +#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)] fn use_current_thread_basic() { static JOIN_HANDLES: Mutex>> = Mutex::new(Vec::new()); let pool = ThreadPoolBuilder::new()