From ccea232615c860ec6cbc58f6b25a1026f51f66ba Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 25 Jun 2023 06:52:41 -0700 Subject: [PATCH 1/5] Centralize all global state --- src/lib.rs | 319 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 185 insertions(+), 134 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5c87762..74cd88d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,8 +90,173 @@ mod sealed { pub trait Sealed {} } -/// An event delivered every time the SIGCHLD signal occurs. -static SIGCHLD: Event = Event::new(); +/// The zombie process reaper. +/// +/// This structure reaps zombie processes and emits the `SIGCHLD` signal. +struct Reaper { + /// An event delivered every time the SIGCHLD signal occurs. + sigchld: Event, + + /// The list of zombie processes. + zombies: Mutex>, + + /// The pipe that delivers signal notifications. + pipe: OnceCell, +} + +impl Reaper { + /// Get the singleton instance of the reaper. + fn get() -> &'static Self { + static REAPER: Reaper = Reaper { + sigchld: Event::new(), + zombies: Mutex::new(Vec::new()), + pipe: OnceCell::new(), + }; + + // Start up the reaper thread if we haven't already. + REAPER.pipe.get_or_init_blocking(|| { + thread::Builder::new() + .name("async-process".to_string()) + .spawn(|| REAPER.reap()) + .expect("cannot spawn async-process thread"); + + Pipe::new().expect("cannot create SIGCHLD pipe") + }); + + &REAPER + } + + /// Reap zombie processes forever. + fn reap(&'static self) -> ! { + loop { + // Wait for the next SIGCHLD signal. + self.pipe.get().unwrap().wait(); + + // Notify all listeners waiting on the SIGCHLD event. + self.sigchld.notify(std::usize::MAX); + + // Reap zombie processes. + let mut zombies = self.zombies.lock().unwrap(); + let mut i = 0; + while i < zombies.len() { + if let Ok(None) = zombies[i].try_wait() { + i += 1; + } else { + zombies.swap_remove(i); + } + } + } + } + + /// Register a process with this reaper. + fn register(&'static self, child: &std::process::Child) -> io::Result<()> { + self.pipe.get().unwrap().register(child) + } +} + +cfg_if::cfg_if! { + if #[cfg(windows)] { + use std::ffi::c_void; + use std::os::windows::io::AsRawHandle; + use std::sync::mpsc; + + use windows_sys::Win32::{ + Foundation::{BOOLEAN, HANDLE}, + System::Threading::{ + RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, + }, + }; + + /// Waits for the next SIGCHLD signal. + struct Pipe { + /// The sender channel for the SIGCHLD signal. + sender: mpsc::SyncSender<()>, + + /// The receiver channel for the SIGCHLD signal. + receiver: Mutex>, + } + + impl Pipe { + /// Creates a new pipe. + fn new() -> io::Result { + let (sender, receiver) = mpsc::sync_channel(1); + Ok(Pipe { + sender, + receiver: Mutex::new(receiver), + }) + } + + /// Waits for the next SIGCHLD signal. + fn wait(&self) { + self.receiver.lock().unwrap().recv().ok(); + } + + /// Register a process object into this pipe. + fn register(&self, _child: &std::process::Child) -> io::Result<()> { + // Called when a child exits. + unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { + callback_channel().0.try_send(()).ok(); + } + + // Register this child process to invoke `callback` on exit. + let mut wait_object = 0; + let ret = unsafe { + RegisterWaitForSingleObject( + &mut wait_object, + child.as_raw_handle() as HANDLE, + Some(callback), + std::ptr::null_mut(), + INFINITE, + WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, + ) + }; + + if ret == 0 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + } + } + + // Wraps a sync I/O type into an async I/O type. + fn wrap(io: T) -> io::Result> { + Ok(Unblock::new(io)) + } + } else if #[cfg(unix)] { + use async_signal::{Signal, Signals}; + + /// Waits for the next SIGCHLD signal. + struct Pipe { + /// The iterator over SIGCHLD signals. + signals: Signals, + } + + impl Pipe { + /// Creates a new pipe. + fn new() -> io::Result { + Ok(Pipe { + signals: Signals::new(Some(Signal::Child))?, + }) + } + + /// Waits for the next SIGCHLD signal. + fn wait(&self) { + async_io::block_on((&self.signals).next()); + } + + /// Register a process object into this pipe. + fn register(&self, _child: &std::process::Child) -> io::Result<()> { + Ok(()) + } + } + + /// Wrap a file descriptor into a non-blocking I/O type. + fn wrap(io: T) -> io::Result> { + Async::new(io) + } + } +} /// A guard that can kill child processes, or push them into the zombie list. struct ChildGuard { @@ -106,6 +271,21 @@ impl ChildGuard { } } +// When the last reference to the child process is dropped, push it into the zombie list. +impl Drop for ChildGuard { + fn drop(&mut self) { + if self.kill_on_drop { + self.get_mut().kill().ok(); + } + if self.reap_on_drop { + let mut zombies = Reaper::get().zombies.lock().unwrap(); + if let Ok(None) = self.get_mut().try_wait() { + zombies.push(self.inner.take().unwrap()); + } + } + } +} + /// A spawned child process. /// /// The process can be in running or exited state. Use [`status()`][`Child::status()`] or @@ -151,137 +331,8 @@ impl Child { let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout); let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); - cfg_if::cfg_if! { - if #[cfg(windows)] { - use std::ffi::c_void; - use std::os::windows::io::AsRawHandle; - use std::sync::mpsc; - - use windows_sys::Win32::{ - Foundation::{BOOLEAN, HANDLE}, - System::Threading::{ - RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, - }, - }; - - // This channel is used to simulate SIGCHLD on Windows. - fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex>) { - static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex>)> = - OnceCell::new(); - - let (s, r) = CALLBACK.get_or_init_blocking(|| { - let (s, r) = mpsc::sync_channel(1); - (s, Mutex::new(r)) - }); - - (s, r) - } - - // Called when a child exits. - unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - callback_channel().0.try_send(()).ok(); - } - - // Register this child process to invoke `callback` on exit. - let mut wait_object = 0; - let ret = unsafe { - RegisterWaitForSingleObject( - &mut wait_object, - child.as_raw_handle() as HANDLE, - Some(callback), - std::ptr::null_mut(), - INFINITE, - WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, - ) - }; - if ret == 0 { - return Err(io::Error::last_os_error()); - } - - // Waits for the next SIGCHLD signal. - fn wait_sigchld() { - callback_channel().1.lock().unwrap().recv().ok(); - } - - // Wraps a sync I/O type into an async I/O type. - fn wrap(io: T) -> io::Result> { - Ok(Unblock::new(io)) - } - - } else if #[cfg(unix)] { - use async_signal::{Signal, Signals}; - - static SIGNALS: OnceCell = OnceCell::new(); - - // Make sure the signal handler is registered before interacting with the process. - SIGNALS.get_or_init_blocking(|| { - Signals::new(Some(Signal::Child)) - .expect("Failed to register SIGCHLD handler") - }); - - // Waits for the next SIGCHLD signal. - fn wait_sigchld() { - async_io::block_on( - SIGNALS - .get() - .expect("Signals not registered") - .next() - ); - } - - // Wraps a sync I/O type into an async I/O type. - fn wrap(io: T) -> io::Result> { - Async::new(io) - } - } - } - - static ZOMBIES: OnceCell>> = OnceCell::new(); - - // Make sure the thread is started. - ZOMBIES.get_or_init_blocking(|| { - // Start a thread that handles SIGCHLD and notifies tasks when child processes exit. - thread::Builder::new() - .name("async-process".to_string()) - .spawn(move || { - loop { - // Wait for the next SIGCHLD signal. - wait_sigchld(); - - // Notify all listeners waiting on the SIGCHLD event. - SIGCHLD.notify(std::usize::MAX); - - // Reap zombie processes. - let mut zombies = ZOMBIES.get().unwrap().lock().unwrap(); - let mut i = 0; - while i < zombies.len() { - if let Ok(None) = zombies[i].try_wait() { - i += 1; - } else { - zombies.swap_remove(i); - } - } - } - }) - .expect("cannot spawn async-process thread"); - - Mutex::new(Vec::new()) - }); - - // When the last reference to the child process is dropped, push it into the zombie list. - impl Drop for ChildGuard { - fn drop(&mut self) { - if self.kill_on_drop { - self.get_mut().kill().ok(); - } - if self.reap_on_drop { - let mut zombies = ZOMBIES.get().unwrap().lock().unwrap(); - if let Ok(None) = self.get_mut().try_wait() { - zombies.push(self.inner.take().unwrap()); - } - } - } - } + // Register the child process in the global list. + Reaper::get().register(&child)?; Ok(Child { stdin, @@ -381,7 +432,7 @@ impl Child { let child = self.child.clone(); async move { - let listener = EventListener::new(&SIGCHLD); + let listener = EventListener::new(&Reaper::get().sigchld); let mut listening = false; futures_lite::pin!(listener); From 4d2738eb5ac9d7870cc0aa53ea630158cd804c0b Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 25 Jun 2023 07:25:36 -0700 Subject: [PATCH 2/5] Fix windows errors --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 74cd88d..64a6748 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,10 +192,10 @@ cfg_if::cfg_if! { } /// Register a process object into this pipe. - fn register(&self, _child: &std::process::Child) -> io::Result<()> { + fn register(&self, child: &std::process::Child) -> io::Result<()> { // Called when a child exits. unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - callback_channel().0.try_send(()).ok(); + Reaper::get().pipe.get().unwrap().sender.try_send(()).ok(); } // Register this child process to invoke `callback` on exit. From a5346e52f9a674cde8bfe4649832a3bca77ba4ac Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 28 Jun 2023 20:45:08 -0700 Subject: [PATCH 3/5] Move out the cell --- src/lib.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 64a6748..fe44c01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,36 +101,33 @@ struct Reaper { zombies: Mutex>, /// The pipe that delivers signal notifications. - pipe: OnceCell, + pipe: Pipe, } impl Reaper { /// Get the singleton instance of the reaper. fn get() -> &'static Self { - static REAPER: Reaper = Reaper { - sigchld: Event::new(), - zombies: Mutex::new(Vec::new()), - pipe: OnceCell::new(), - }; + static REAPER: OnceCell = OnceCell::new(); - // Start up the reaper thread if we haven't already. - REAPER.pipe.get_or_init_blocking(|| { + REAPER.get_or_init_blocking(|| { thread::Builder::new() .name("async-process".to_string()) - .spawn(|| REAPER.reap()) + .spawn(|| REAPER.get().unwrap().reap()) .expect("cannot spawn async-process thread"); - Pipe::new().expect("cannot create SIGCHLD pipe") - }); - - &REAPER + Reaper { + sigchld: Event::new(), + zombies: Mutex::new(Vec::new()), + pipe: Pipe::new().expect("cannot create SIGCHLD pipe"), + } + }) } /// Reap zombie processes forever. fn reap(&'static self) -> ! { loop { // Wait for the next SIGCHLD signal. - self.pipe.get().unwrap().wait(); + self.pipe.wait(); // Notify all listeners waiting on the SIGCHLD event. self.sigchld.notify(std::usize::MAX); @@ -150,7 +147,7 @@ impl Reaper { /// Register a process with this reaper. fn register(&'static self, child: &std::process::Child) -> io::Result<()> { - self.pipe.get().unwrap().register(child) + self.pipe.register(child) } } From 739703a3e67e0a9423134cc434a2c35ef8cdf518 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 10 Sep 2023 15:54:37 -0700 Subject: [PATCH 4/5] Fix the halting bug There was a panic where the reaper thread tried to access the OnceCell before it was initialized. This was fixed by using wait_blocking() instead of get().unwrap() Signed-off-by: John Nunley --- src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fe44c01..789f343 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -112,7 +112,7 @@ impl Reaper { REAPER.get_or_init_blocking(|| { thread::Builder::new() .name("async-process".to_string()) - .spawn(|| REAPER.get().unwrap().reap()) + .spawn(|| REAPER.wait_blocking().reap()) .expect("cannot spawn async-process thread"); Reaper { @@ -321,6 +321,8 @@ impl Child { /// The "async-process" thread waits for processes in the global list and cleans up the /// resources when they exit. fn new(cmd: &mut Command) -> io::Result { + // Make sure the reaper exists before we spawn the child process. + let reaper = Reaper::get(); let mut child = cmd.inner.spawn()?; // Convert sync I/O types into async I/O types. @@ -329,7 +331,7 @@ impl Child { let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr); // Register the child process in the global list. - Reaper::get().register(&child)?; + reaper.register(&child)?; Ok(Child { stdin, From 398bbbed1c331e7b0865ba98c65bef17cac161dc Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 10 Sep 2023 15:58:26 -0700 Subject: [PATCH 5/5] Fix Windows build error Signed-off-by: John Nunley --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 789f343..6cacc7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,7 +192,7 @@ cfg_if::cfg_if! { fn register(&self, child: &std::process::Child) -> io::Result<()> { // Called when a child exits. unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) { - Reaper::get().pipe.get().unwrap().sender.try_send(()).ok(); + Reaper::get().pipe.sender.try_send(()).ok(); } // Register this child process to invoke `callback` on exit.