diff --git a/Cargo.lock b/Cargo.lock index b4bf48ce24ae..60d09a94eaaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4541,8 +4541,6 @@ dependencies = [ [[package]] name = "rayon-core" version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" dependencies = [ "crossbeam-channel", "crossbeam-deque", diff --git a/Cargo.toml b/Cargo.toml index 1437c7176330..5dcddbcc9c2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,6 +169,9 @@ web-sys = { path = "build/rust/dummy-web/web-sys" } # Overrides to allow easier use of common internal crates. moz_asserts = { path = "mozglue/static/rust/moz_asserts" } +# Patch rayon core to import https://github.com/rayon-rs/rayon/pull/1063 +rayon-core = { path = "third_party/rust/rayon-core" } + # Patch `rure` to disable building the cdylib and staticlib targets # Cargo has no way to disable building targets your dependencies provide which # you don't depend on, and linking the cdylib breaks during instrumentation diff --git a/supply-chain/config.toml b/supply-chain/config.toml index 2af18036af70..1559cde00259 100644 --- a/supply-chain/config.toml +++ b/supply-chain/config.toml @@ -177,6 +177,10 @@ notes = "This is a first-party crate which is entirely unrelated to the crates.i audit-as-crates-io = true notes = "This is a first-party crate which is also published to crates.io, but we should publish audits for it for the benefit of the ecosystem." +[policy.rayon-core] +audit-as-crates-io = true +notes = "Identical to upstream, with a Mozilla-authored PR, see Cargo.toml comment for details" + [policy.rure] audit-as-crates-io = true notes = "Identical to upstream, but with cdylib and staticlib targets disabled to avoid unnecessary build artifacts and linker errors." 
diff --git a/third_party/rust/rayon-core/src/lib.rs b/third_party/rust/rayon-core/src/lib.rs index c9694ee16144..ca8c8b9d6cbc 100644 --- a/third_party/rust/rayon-core/src/lib.rs +++ b/third_party/rust/rayon-core/src/lib.rs @@ -99,10 +99,10 @@ pub use self::registry::ThreadBuilder; pub use self::scope::{in_place_scope, scope, Scope}; pub use self::scope::{in_place_scope_fifo, scope_fifo, ScopeFifo}; pub use self::spawn::{spawn, spawn_fifo}; -pub use self::thread_pool::current_thread_has_pending_tasks; -pub use self::thread_pool::current_thread_index; -pub use self::thread_pool::ThreadPool; -pub use self::thread_pool::{yield_local, yield_now, Yield}; +pub use self::thread_pool::{ + clean_up_use_current_thread, current_thread_has_pending_tasks, current_thread_index, + yield_local, yield_now, ThreadPool, Yield, +}; use self::registry::{CustomSpawn, DefaultSpawn, ThreadSpawn}; @@ -175,6 +175,9 @@ pub struct ThreadPoolBuilder { /// If RAYON_NUM_THREADS is invalid or zero will use the default. num_threads: usize, + /// The thread we're building *from* will also be part of the pool. + use_current_thread: bool, + /// Custom closure, if any, to handle a panic that we cannot propagate /// anywhere else. panic_handler: Option<Box<PanicHandler>>, @@ -228,6 +231,7 @@ impl Default for ThreadPoolBuilder { fn default() -> Self { ThreadPoolBuilder { num_threads: 0, + use_current_thread: false, panic_handler: None, get_thread_name: None, stack_size: None, @@ -445,6 +449,7 @@ impl ThreadPoolBuilder { spawn_handler: CustomSpawn::new(spawn), // ..self num_threads: self.num_threads, + use_current_thread: self.use_current_thread, panic_handler: self.panic_handler, get_thread_name: self.get_thread_name, stack_size: self.stack_size, @@ -532,6 +537,34 @@ impl ThreadPoolBuilder { self } + /// Use the current thread as one of the threads in the pool. 
+ /// + /// The current thread is guaranteed to be at index 0, and since the thread is not managed by + /// rayon, the spawn and exit handlers do not run for that thread. + /// + /// Note that the current thread won't run the main work-stealing loop, so jobs spawned into + /// the thread-pool will generally not be picked up automatically by this thread unless you + /// yield to rayon in some way, like via [`yield_now()`], [`yield_local()`], [`scope()`], or + /// [`clean_up_use_current_thread()`]. + /// + /// # Panics + /// + /// This function won't panic itself, but [`ThreadPoolBuilder::build()`] will panic if you've + /// called this function and the current thread is already part of another [`ThreadPool`]. + /// + /// # Cleaning up a local thread-pool + /// + /// In order to properly clean up the worker thread state, for local thread-pools you should + /// call [`clean_up_use_current_thread()`] from the same thread that built the thread-pool. + /// See that function's documentation for more details. + /// + /// This call is not required, but without it the registry will leak even if the pool is + /// otherwise terminated. + pub fn use_current_thread(mut self) -> Self { + self.use_current_thread = true; + self + } + /// Returns a copy of the current panic handler. 
fn take_panic_handler(&mut self) -> Option<Box<PanicHandler>> { self.panic_handler.take() @@ -771,6 +804,7 @@ impl fmt::Debug for ThreadPoolBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ThreadPoolBuilder { ref num_threads, + ref use_current_thread, ref get_thread_name, ref panic_handler, ref stack_size, @@ -795,6 +829,7 @@ impl fmt::Debug for ThreadPoolBuilder { f.debug_struct("ThreadPoolBuilder") .field("num_threads", num_threads) + .field("use_current_thread", use_current_thread) .field("get_thread_name", &get_thread_name) .field("panic_handler", &panic_handler) .field("stack_size", &stack_size) diff --git a/third_party/rust/rayon-core/src/registry.rs b/third_party/rust/rayon-core/src/registry.rs index 5d56ac92765a..a658f58711d5 100644 --- a/third_party/rust/rayon-core/src/registry.rs +++ b/third_party/rust/rayon-core/src/registry.rs @@ -139,6 +139,9 @@ pub(super) struct Registry { start_handler: Option<Box<StartHandler>>, exit_handler: Option<Box<ExitHandler>>, + /// Whether the first thread of our pool is the creator of the thread pool. + used_creator_thread: bool, + // When this latch reaches 0, it means that all work on this // registry must be complete. This is ensured in the following ways: // @@ -210,26 +213,7 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> { // is stubbed out, and we won't have to change anything if they do add real threading. let unsupported = matches!(&result, Err(e) if e.is_unsupported()); if unsupported && WorkerThread::current().is_null() { - let builder = ThreadPoolBuilder::new() - .num_threads(1) - .spawn_handler(|thread| { - // Rather than starting a new thread, we're just taking over the current thread - // *without* running the main loop, so we can still return from here. - // The WorkerThread is leaked, but we never shutdown the global pool anyway. 
- let worker_thread = Box::leak(Box::new(WorkerThread::from(thread))); - let registry = &*worker_thread.registry; - let index = worker_thread.index; - - unsafe { - WorkerThread::set_current(worker_thread); - - // let registry know we are ready to do work - Latch::set(&registry.thread_infos[index].primed); - } - - Ok(()) - }); - + let builder = ThreadPoolBuilder::new().num_threads(1).use_current_thread(); + let fallback_result = Registry::new(builder); + if fallback_result.is_ok() { + return fallback_result; @@ -291,6 +275,7 @@ impl Registry { panic_handler: builder.take_panic_handler(), start_handler: builder.take_start_handler(), exit_handler: builder.take_exit_handler(), + used_creator_thread: builder.use_current_thread, }); // If we return early or panic, make sure to terminate existing threads. @@ -305,6 +290,23 @@ impl Registry { stealer, index, }; + + if index == 0 && builder.use_current_thread { + // Rather than starting a new thread, we're just taking over the current thread + // *without* running the main loop, so we can still return from here. + // The WorkerThread is leaked, but we never shutdown the global pool anyway. + // + // For local pools, the caller is responsible for cleaning this up if they need to + // by using clean_up_use_current_thread. 
+ let worker_thread = Box::into_raw(Box::new(WorkerThread::from(thread))); + + unsafe { + WorkerThread::set_current(worker_thread); + Latch::set(&registry.thread_infos[index].primed); + } + continue; + } + if let Err(e) = builder.get_spawn_handler().spawn(thread) { return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e))); } @@ -619,6 +621,10 @@ impl Registry { pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) { self.sleep.notify_worker_latch_is_set(target_worker_index); } + + pub(super) fn used_creator_thread(&self) -> bool { + self.used_creator_thread + } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -945,13 +951,8 @@ unsafe fn main_loop(thread: ThreadBuilder) { worker: index, terminate_addr: my_terminate_latch.as_core_latch().addr(), }); - worker_thread.wait_until(my_terminate_latch); - // Should not be any work left in our queue. - debug_assert!(worker_thread.take_local_job().is_none()); - - // let registry know we are done - Latch::set(&registry.thread_infos[index].stopped); + wait_until_out_of_work(worker_thread); // Normal termination, do not abort. mem::forget(abort_guard); @@ -965,6 +966,21 @@ unsafe fn main_loop(thread: ThreadBuilder) { } } +pub(crate) unsafe fn wait_until_out_of_work(worker_thread: &WorkerThread) { + debug_assert_eq!(worker_thread as *const _, WorkerThread::current()); + let registry = &*worker_thread.registry; + let index = worker_thread.index; + let my_terminate_latch = &registry.thread_infos[index].terminate; + + worker_thread.wait_until(my_terminate_latch); + + // Should not be any work left in our queue. + debug_assert!(worker_thread.take_local_job().is_none()); + + // let registry know we are done + Latch::set(&registry.thread_infos[index].stopped); +} + /// If already in a worker-thread, just execute `op`. Otherwise, /// execute `op` in the default thread-pool. Either way, block until /// `op` completes and return its return value. 
If `op` panics, that diff --git a/third_party/rust/rayon-core/src/thread_pool/mod.rs b/third_party/rust/rayon-core/src/thread_pool/mod.rs index c37826ef52a7..e9cfb6adaa0f 100644 --- a/third_party/rust/rayon-core/src/thread_pool/mod.rs +++ b/third_party/rust/rayon-core/src/thread_pool/mod.rs @@ -461,6 +461,39 @@ pub fn yield_local() -> Option<Yield> { } } +/// Waits for termination of the thread-pool (if pending), and cleans up resources allocated by +/// [`ThreadPoolBuilder::use_current_thread()`]. Should only be called from the thread that built +/// the thread-pool, and only when [`ThreadPoolBuilder::use_current_thread()`] is used. +/// +/// Calling this function from a thread pool job will block indefinitely. +/// +/// Calling this function before the thread-pool has been dropped will cause the thread to +/// not return control flow to the caller until that happens (stealing work as necessary). +/// +/// # Panics +/// +/// If the calling thread is not the creator thread of a thread-pool, or not part of that +/// thread-pool, via [`ThreadPoolBuilder::use_current_thread()`]. +pub fn clean_up_use_current_thread() { + unsafe { + let thread = WorkerThread::current() + .as_ref() + .expect("Should be called from a worker thread"); + assert!( + thread.registry().used_creator_thread(), + "Should only be used to clean up the pool creator constructor thread" + ); + assert_eq!( + thread.index(), + 0, + "Should be called from the thread that created the pool" + ); + crate::registry::wait_until_out_of_work(thread); + let _ = Box::from_raw(WorkerThread::current() as *mut WorkerThread); + } + assert!(WorkerThread::current().is_null()); +} + /// Result of [`yield_now()`] or [`yield_local()`]. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Yield {