Skip to content

Commit

Permalink
Implement signal handling for Unix
Browse files Browse the repository at this point in the history
This is done using the "self-pipe" trick. The Tokio-based driver waits
for readability on the read end of the pipe until the signal handler
writes a single byte to the write end. This allows for communicating
with the outside from within the primitive world of the signal handler.

The current implementation fulfills futures only once, with subsequent
polls returning `Ready`. This is done by enabling the signal's `caught`
bit. This saves having to check readability of the pipe again.

TODO items:

* Implement futures for Windows.

* Implement futures for reasonably reusable signals.

* Check whether a signal has already been registered. Alternatively, we
  can instead allow for registering multiple signal handlers by queuing
  their wakers. Although this seems rather odd and perhaps unnecessary.
  • Loading branch information
nvzqz committed Feb 5, 2020
1 parent 2709e06 commit 6488093
Show file tree
Hide file tree
Showing 15 changed files with 1,081 additions and 384 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@ categories = ["asynchronous", "concurrency"]
include = ["Cargo.toml", "src", "README*", "CHANGELOG*", "LICENSE*"]

[features]
default = ["once"]
once = []

[dependencies]
futures = { version = "0.3.1", optional = true }
tokio = { version = "0.2.11", default-features = false, features = ["io-driver"] }

[target.'cfg(unix)'.dependencies]
cfg-if = "0.1.10"
libc = "0.2.66"
mio = "0.6.15"

[target.'cfg(windows)'.dependencies]
winapi = "0.3.8"
Expand Down
49 changes: 7 additions & 42 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,15 @@
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]

use std::task::{Context, Poll};
// mod ctrlc;

#[macro_use]
mod macros;
#[cfg(any(docsrs, feature = "once"))]
#[cfg_attr(docsrs, doc(cfg(feature = "once")))]
pub mod once;

mod util;

cfg_unix! {
pub mod unix;
}
#[cfg(any(unix, docsrs))]
#[cfg_attr(docsrs, doc(cfg(unix)))]
pub mod unix;

#[cfg(windows)]
mod windows;

/// A future for `CTRL` + `C` signals.
#[derive(Debug)]
pub struct CtrlC {
_private: (),
}

impl CtrlC {
/// Receive the next signal notification event.
#[inline]
pub async fn recv(&mut self) -> Option<()> {
util::poll_fn(|cx| self.poll_recv(cx)).await
}

/// Poll to receive the next signal notification event, outside of an
/// `async` context.
pub fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
unimplemented!()
}
}

cfg_futures! {
impl futures::stream::Stream for CtrlC {
type Item = ();

#[inline]
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<()>> {
self.poll_recv(cx)
}
}
}
19 changes: 0 additions & 19 deletions src/macros.rs

This file was deleted.

90 changes: 90 additions & 0 deletions src/once/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//! Futures that are fulfilled once.

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[cfg(any(unix, docsrs))]
#[cfg_attr(docsrs, doc(cfg(unix)))]
pub mod unix;

#[cfg(unix)]
type CtrlCOnceInner = unix::SignalSetOnce;

/// A future that is fulfilled once upon receiving `CTRL` + `C`.
///
/// After an instance is fulfilled, all subsequent polls will return `Ready`.
#[derive(Debug)]
pub struct CtrlCOnce(CtrlCOnceInner);

impl Future for CtrlCOnce {
type Output = ();

#[inline]
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}

impl CtrlCOnce {
/// Registers the `CTRL` + `C` handler.
#[inline]
pub fn register() -> Result<Self, RegisterCtrlCOnceError> {
// Register via `Signal` instead of `SignalSet` since it's slightly more
// efficient.
#[cfg(unix)]
let inner = unix::SignalSetOnce::from(
crate::unix::Signal::Interrupt.register_once()?,
);

Ok(Self(inner))
}

/// Registers the handler for all signals that would otherwise terminate.
///
/// # Unix Behavior
///
/// On Unix-like systems, this corresponds to: [`Alarm`], [`Hangup`],
/// [`Interrupt`], [`Pipe`], [`Quit`], [`Terminate`], [`UserDefined1`], and
/// [`UserDefined2`].
///
/// [`Alarm`]: ../unix/enum.Signal.html#variant.Alarm
/// [`Hangup`]: ../unix/enum.Signal.html#variant.Hangup
/// [`Interrupt`]: ../unix/enum.Signal.html#variant.Interrupt
/// [`Pipe`]: ../unix/enum.Signal.html#variant.Pipe
/// [`Quit`]: ../unix/enum.Signal.html#variant.Quit
/// [`Terminate`]: ../unix/enum.Signal.html#variant.Terminate
/// [`UserDefined1`]: ../unix/enum.Signal.html#variant.UserDefined1
/// [`UserDefined2`]: ../unix/enum.Signal.html#variant.UserDefined2
#[inline]
pub fn register_termination() -> Result<Self, RegisterCtrlCOnceError> {
#[cfg(unix)]
let inner = crate::unix::SignalSet::new()
.termination_set()
.register_once()?;

Ok(Self(inner))
}
}

#[cfg(unix)]
type RegisterCtrlCOnceErrorInner = unix::RegisterOnceError;

/// An error returned when registering a [`Signal`] or [`SignalSet`] fails.
///
/// [`Signal`]: ../../unix/enum.Signal.html
/// [`SignalSet`]: ../../unix/struct.SignalSet.html
#[derive(Debug)]
pub struct RegisterCtrlCOnceError(RegisterCtrlCOnceErrorInner);

impl From<RegisterCtrlCOnceErrorInner> for RegisterCtrlCOnceError {
#[inline]
fn from(error: RegisterCtrlCOnceErrorInner) -> Self {
Self(error)
}
}
113 changes: 113 additions & 0 deletions src/once/unix/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//! Unix-specific functionality.

use std::{
io, mem, ptr,
sync::atomic::Ordering,
task::{Context, Poll},
};
use tokio::io::PollEvented;

use crate::unix::{pipe, Signal, SignalSet};

mod signal;
mod signal_set;
mod table;

pub use {signal::SignalOnce, signal_set::SignalSetOnce};

/// The event driver for when the pipe can be read.
#[derive(Debug)]
struct Driver(PollEvented<pipe::Reader>);

impl Driver {
pub fn new(reader: pipe::Reader) -> io::Result<Self> {
Ok(Self(PollEvented::new(reader)?))
}

pub fn poll(&self, cx: &mut Context) -> Poll<()> {
match self.0.poll_read_ready(cx, mio::Ready::readable()) {
Poll::Ready(Ok(_)) => Poll::Ready(()),
Poll::Ready(Err(error)) => panic!("Error on self-pipe: {}", error),
Poll::Pending => Poll::Pending,
}
}
}

/// An error returned when registering a [`Signal`] or [`SignalSet`] fails.
///
/// [`Signal`]: ../../unix/enum.Signal.html
/// [`SignalSet`]: ../../unix/struct.SignalSet.html
#[derive(Debug)]
pub enum RegisterOnceError {
/// Signals were already registered.
Registered(SignalSet),
/// An I/O error.
Io(io::Error),
}

impl From<io::Error> for RegisterOnceError {
#[inline]
fn from(error: io::Error) -> Self {
Self::Io(error)
}
}

fn register_signal(signal: Signal) -> io::Result<RegisteredSignal> {
extern "C" fn signal_handler(signal: libc::c_int) {
if let Some(signal) = Signal::from_raw(signal) {
let table = table::Table::global();

// Set the flag before waking up the reading end.
table.caught.enable(signal, Ordering::SeqCst);
table.entry(signal).load_writer(Ordering::SeqCst).wake();
}
}

let raw_signal = signal.into_raw();

// A custom `sigaction` union type is used because:
//
// 1. The `sa_handler` field is used regardless of platform, since `libc`
// specifies some having only `sa_sigaction` or `sa_handler`. This is a
// restriction based on Rust not having had unions at the time.
//
// 2. The union allows for ensuring the correct offset for the `sa_flags`
// field and overall size/alignment of the type.
let new_action = {
#[allow(non_camel_case_types)]
union sigaction {
sa_handler: Option<extern "C" fn(signal: libc::c_int)>,
libc: libc::sigaction,
}

unsafe {
let mut action: sigaction = mem::zeroed();
action.sa_handler = Some(signal_handler);
action.libc.sa_flags = libc::SA_RESTART | libc::SA_NOCLDSTOP;
action.libc
}
};

let mut old_action: libc::sigaction = unsafe { mem::zeroed() };

match unsafe { libc::sigaction(raw_signal, &new_action, &mut old_action) } {
0 => Ok(RegisteredSignal {
raw_signal,
old_action,
}),
_ => Err(io::Error::last_os_error()),
}
}

struct RegisteredSignal {
pub raw_signal: libc::c_int,
pub old_action: libc::sigaction,
}

impl RegisteredSignal {
pub fn reset(&self) {
unsafe {
libc::sigaction(self.raw_signal, &self.old_action, ptr::null_mut());
}
}
}
71 changes: 71 additions & 0 deletions src/once/unix/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::{
future::Future,
pin::Pin,
sync::atomic::Ordering,
task::{Context, Poll},
};

use super::{table::Table, Driver, RegisterOnceError};
use crate::unix::{pipe, Signal};

/// A future that is fulfilled once upon receiving a [`Signal`].
///
/// After an instance is fulfilled, all subsequent polls will return [`Ready`].
///
/// [`Signal`]: ../../unix/enum.Signal.html
///
/// [`Ready`]: https://doc.rust-lang.org/std/task/enum.Poll.html#variant.Ready
#[derive(Debug)]
pub struct SignalOnce {
pub(super) signal: Signal,
pub(super) driver: Driver,
}

impl Future for SignalOnce {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let table = Table::global();

if table.caught.contains(self.signal, Ordering::SeqCst) {
return Poll::Ready(());
}

self.driver.poll(cx)
}
}

impl SignalOnce {
/// Registers a handler for `signal` that will only be fulfilled once.
pub fn register(signal: Signal) -> Result<Self, RegisterOnceError> {
// TODO: Handle `signal` already being registered.

let (reader, writer) = pipe::pipe()?;

let close_pipe = || unsafe {
libc::close(reader.0);
libc::close(writer.0);
};

let driver = match Driver::new(reader) {
Ok(d) => d,
Err(error) => {
close_pipe();
return Err(error.into());
}
};

Table::global()
.entry(signal)
.writer_fd()
.store(writer.0, Ordering::SeqCst);

match super::register_signal(signal) {
Ok(_) => Ok(Self { signal, driver }),
Err(error) => {
close_pipe();
Err(error.into())
}
}
}
}

0 comments on commit 6488093

Please sign in to comment.