Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 184 additions & 134 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,170 @@ mod sealed {
pub trait Sealed {}
}

/// An event delivered every time the SIGCHLD signal occurs.
static SIGCHLD: Event = Event::new();
/// The zombie process reaper.
///
/// This structure reaps zombie processes and emits the `SIGCHLD` signal.
struct Reaper {
/// An event delivered every time the SIGCHLD signal occurs.
sigchld: Event,

/// The list of zombie processes.
zombies: Mutex<Vec<std::process::Child>>,

/// The pipe that delivers signal notifications.
pipe: Pipe,
}

impl Reaper {
/// Get the singleton instance of the reaper.
fn get() -> &'static Self {
static REAPER: OnceCell<Reaper> = OnceCell::new();

REAPER.get_or_init_blocking(|| {
thread::Builder::new()
.name("async-process".to_string())
.spawn(|| REAPER.wait_blocking().reap())
.expect("cannot spawn async-process thread");

Reaper {
sigchld: Event::new(),
zombies: Mutex::new(Vec::new()),
pipe: Pipe::new().expect("cannot create SIGCHLD pipe"),
}
})
}

/// Reap zombie processes forever.
fn reap(&'static self) -> ! {
loop {
// Wait for the next SIGCHLD signal.
self.pipe.wait();

// Notify all listeners waiting on the SIGCHLD event.
self.sigchld.notify(std::usize::MAX);

// Reap zombie processes.
let mut zombies = self.zombies.lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
}

/// Register a process with this reaper.
fn register(&'static self, child: &std::process::Child) -> io::Result<()> {
self.pipe.register(child)
}
}

cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;

use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};

/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The sender channel for the SIGCHLD signal.
sender: mpsc::SyncSender<()>,

/// The receiver channel for the SIGCHLD signal.
receiver: Mutex<mpsc::Receiver<()>>,
}

impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
let (sender, receiver) = mpsc::sync_channel(1);
Ok(Pipe {
sender,
receiver: Mutex::new(receiver),
})
}

/// Waits for the next SIGCHLD signal.
fn wait(&self) {
self.receiver.lock().unwrap().recv().ok();
}

/// Register a process object into this pipe.
fn register(&self, child: &std::process::Child) -> io::Result<()> {
// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
Reaper::get().pipe.sender.try_send(()).ok();
}

// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};

if ret == 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
}

// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}
} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};

/// Waits for the next SIGCHLD signal.
struct Pipe {
/// The iterator over SIGCHLD signals.
signals: Signals,
}

impl Pipe {
/// Creates a new pipe.
fn new() -> io::Result<Pipe> {
Ok(Pipe {
signals: Signals::new(Some(Signal::Child))?,
})
}

/// Waits for the next SIGCHLD signal.
fn wait(&self) {
async_io::block_on((&self.signals).next());
}

/// Register a process object into this pipe.
fn register(&self, _child: &std::process::Child) -> io::Result<()> {
Ok(())
}
}

/// Wrap a file descriptor into a non-blocking I/O type.
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
}

/// A guard that can kill child processes, or push them into the zombie list.
struct ChildGuard {
Expand All @@ -106,6 +268,21 @@ impl ChildGuard {
}
}

// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = Reaper::get().zombies.lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
}

/// A spawned child process.
///
/// The process can be in running or exited state. Use [`status()`][`Child::status()`] or
Expand Down Expand Up @@ -144,144 +321,17 @@ impl Child {
/// The "async-process" thread waits for processes in the global list and cleans up the
/// resources when they exit.
fn new(cmd: &mut Command) -> io::Result<Child> {
// Make sure the reaper exists before we spawn the child process.
let reaper = Reaper::get();
let mut child = cmd.inner.spawn()?;

// Convert sync I/O types into async I/O types.
let stdin = child.stdin.take().map(wrap).transpose()?.map(ChildStdin);
let stdout = child.stdout.take().map(wrap).transpose()?.map(ChildStdout);
let stderr = child.stderr.take().map(wrap).transpose()?.map(ChildStderr);

cfg_if::cfg_if! {
if #[cfg(windows)] {
use std::ffi::c_void;
use std::os::windows::io::AsRawHandle;
use std::sync::mpsc;

use windows_sys::Win32::{
Foundation::{BOOLEAN, HANDLE},
System::Threading::{
RegisterWaitForSingleObject, INFINITE, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
},
};

// This channel is used to simulate SIGCHLD on Windows.
fn callback_channel() -> (&'static mpsc::SyncSender<()>, &'static Mutex<mpsc::Receiver<()>>) {
static CALLBACK: OnceCell<(mpsc::SyncSender<()>, Mutex<mpsc::Receiver<()>>)> =
OnceCell::new();

let (s, r) = CALLBACK.get_or_init_blocking(|| {
let (s, r) = mpsc::sync_channel(1);
(s, Mutex::new(r))
});

(s, r)
}

// Called when a child exits.
unsafe extern "system" fn callback(_: *mut c_void, _: BOOLEAN) {
callback_channel().0.try_send(()).ok();
}

// Register this child process to invoke `callback` on exit.
let mut wait_object = 0;
let ret = unsafe {
RegisterWaitForSingleObject(
&mut wait_object,
child.as_raw_handle() as HANDLE,
Some(callback),
std::ptr::null_mut(),
INFINITE,
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
)
};
if ret == 0 {
return Err(io::Error::last_os_error());
}

// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
callback_channel().1.lock().unwrap().recv().ok();
}

// Wraps a sync I/O type into an async I/O type.
fn wrap<T>(io: T) -> io::Result<Unblock<T>> {
Ok(Unblock::new(io))
}

} else if #[cfg(unix)] {
use async_signal::{Signal, Signals};

static SIGNALS: OnceCell<Signals> = OnceCell::new();

// Make sure the signal handler is registered before interacting with the process.
SIGNALS.get_or_init_blocking(|| {
Signals::new(Some(Signal::Child))
.expect("Failed to register SIGCHLD handler")
});

// Waits for the next SIGCHLD signal.
fn wait_sigchld() {
async_io::block_on(
SIGNALS
.get()
.expect("Signals not registered")
.next()
);
}

// Wraps a sync I/O type into an async I/O type.
fn wrap<T: std::os::unix::io::AsRawFd>(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
}

static ZOMBIES: OnceCell<Mutex<Vec<std::process::Child>>> = OnceCell::new();

// Make sure the thread is started.
ZOMBIES.get_or_init_blocking(|| {
// Start a thread that handles SIGCHLD and notifies tasks when child processes exit.
thread::Builder::new()
.name("async-process".to_string())
.spawn(move || {
loop {
// Wait for the next SIGCHLD signal.
wait_sigchld();

// Notify all listeners waiting on the SIGCHLD event.
SIGCHLD.notify(std::usize::MAX);

// Reap zombie processes.
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
let mut i = 0;
while i < zombies.len() {
if let Ok(None) = zombies[i].try_wait() {
i += 1;
} else {
zombies.swap_remove(i);
}
}
}
})
.expect("cannot spawn async-process thread");

Mutex::new(Vec::new())
});

// When the last reference to the child process is dropped, push it into the zombie list.
impl Drop for ChildGuard {
fn drop(&mut self) {
if self.kill_on_drop {
self.get_mut().kill().ok();
}
if self.reap_on_drop {
let mut zombies = ZOMBIES.get().unwrap().lock().unwrap();
if let Ok(None) = self.get_mut().try_wait() {
zombies.push(self.inner.take().unwrap());
}
}
}
}
// Register the child process in the global list.
reaper.register(&child)?;

Ok(Child {
stdin,
Expand Down Expand Up @@ -381,7 +431,7 @@ impl Child {
let child = self.child.clone();

async move {
let listener = EventListener::new(&SIGCHLD);
let listener = EventListener::new(&Reaper::get().sigchld);
let mut listening = false;
futures_lite::pin!(listener);

Expand Down