From 2b7c0967a7362303946deb3d4ca2ae507af6c72d Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Sat, 24 Oct 2020 17:12:37 +0000 Subject: [PATCH] Add unix::pipe (#1356) Adds one new function: unix::pipe, which is a wrapper around the pipe(2) system call, and two new types: Sender and Receiver, wrappers around the file descriptors. This is a port of https://github.com/Thomasdezeeuw/mio-pipe, commit 8c3025edf128e90733e95327d88493887b93fcdd. --- Cargo.toml | 1 + ci/azure-test-stable.yml | 2 +- src/lib.rs | 30 ++- src/macros/mod.rs | 32 ++- src/sys/mod.rs | 6 +- src/sys/shell/mod.rs | 2 +- src/sys/shell/selector.rs | 2 +- src/sys/unix/mod.rs | 6 +- src/sys/unix/pipe.rs | 383 +++++++++++++++++++++++++++++ src/sys/unix/selector/epoll.rs | 2 +- src/sys/unix/selector/kqueue.rs | 2 +- src/sys/windows/afd.rs | 2 +- src/sys/windows/io_status_block.rs | 2 +- src/sys/windows/mod.rs | 6 +- src/sys/windows/selector.rs | 8 +- tests/unix_pipe.rs | 279 +++++++++++++++++++++ 16 files changed, 743 insertions(+), 22 deletions(-) create mode 100644 src/sys/unix/pipe.rs create mode 100644 tests/unix_pipe.rs diff --git a/Cargo.toml b/Cargo.toml index e1a2b20d3..dfbba5313 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ include = [ default = [] os-poll = [] os-util = [] +pipe = ["os-poll"] tcp = [] udp = [] uds = [] diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index 06c412a56..7219a3585 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -29,7 +29,7 @@ jobs: - ${{ if eq(parameters.cmd, 'test') }}: - script: | cargo install cargo-hack - cargo hack check --feature-powerset --skip guide + cargo hack check --feature-powerset --skip guide,extra-docs displayName: Check feature powerset - script: cargo ${{ parameters.cmd }} --all-features diff --git a/src/lib.rs b/src/lib.rs index 6e98c8798..83f0debb9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,9 +69,11 @@ mod waker; pub mod event; -cfg_net! { +cfg_io_source! { mod io_source; +} +cfg_net! { pub mod net; } @@ -82,11 +84,27 @@ pub use poll::{Poll, Registry}; pub use token::Token; pub use waker::Waker; -#[cfg(all(unix, feature = "os-util"))] -#[cfg_attr(docsrs, doc(cfg(all(unix, feature = "os-util"))))] +#[cfg(all(unix, any(feature = "os-util", feature = "pipe")))] +#[cfg_attr( + docsrs, + doc(cfg(all(unix, any(feature = "os-util", feature = "pipe")))) +)] pub mod unix { //! Unix only extensions. + + #[cfg(feature = "os-util")] + #[cfg_attr(docsrs, doc(cfg(all(unix, feature = "os-util"))))] pub use crate::sys::SourceFd; + + cfg_pipe! { + pub mod pipe { + //! Unix pipe. + //! + //! See the [`new`] function for documentation. + + pub use crate::sys::pipe::{new, Receiver, Sender}; + } + } } #[cfg(all(windows, feature = "os-util"))] @@ -121,6 +139,12 @@ pub mod features { //! `os-util` enables additional OS specific facilities. Currently this //! means the `unix` module (with `SourceFd`) becomes available. //! + #![cfg_attr(feature = "pipe", doc = "## `pipe` (enabled)")] + #![cfg_attr(not(feature = "pipe"), doc = "## `pipe` (disabled)")] + //! + //! The `pipe` feature adds `unix::pipe`, and related types, a non-blocking + //! wrapper around the `pipe(2)` system call. + //! //! ## Network types //! //! Mio provide three features to enable network types: diff --git a/src/macros/mod.rs b/src/macros/mod.rs index 7db25795d..2275ed9fa 100644 --- a/src/macros/mod.rs +++ b/src/macros/mod.rs @@ -36,6 +36,18 @@ macro_rules! cfg_net { } } +/// One of the features enabled that needs `IoSource`. That is `tcp`, or `udp`, +/// or on Unix `uds` or `pipe`. +macro_rules! cfg_io_source { + ($($item:item)*) => { + $( + #[cfg(any(feature = "tcp", feature = "udp", all(unix, any(feature = "uds", feature = "pipe"))))] + #[cfg_attr(docsrs, doc(any(feature = "tcp", feature = "udp", all(unix, any(feature = "uds", feature = "pipe")))))] + $item + )* + } +} + /// One of the `tcp`, `udp` features enabled. #[cfg(windows)] macro_rules! cfg_net { @@ -82,13 +94,25 @@ macro_rules! cfg_uds { } } +/// Feature `pipe` enabled. +#[cfg(unix)] +macro_rules! cfg_pipe { + ($($item:item)*) => { + $( + #[cfg(feature = "pipe")] + #[cfg_attr(docsrs, doc(cfg(feature = "pipe")))] + $item + )* + } +} + /// Feature `os-util` enabled, or one of the features that need `os-util`. #[cfg(unix)] macro_rules! cfg_any_os_util { ($($item:item)*) => { $( - #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds"))))] + #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds", feature = "pipe"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "uds", feature = "pipe"))))] $item )* } @@ -99,8 +123,8 @@ macro_rules! cfg_any_os_util { macro_rules! cfg_any_os_util { ($($item:item)*) => { $( - #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp"))))] + #[cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "pipe"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "os-util", feature = "tcp", feature = "udp", feature = "pipe"))))] $item )* } diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 885233331..08bd271b7 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -72,7 +72,11 @@ cfg_os_poll! { pub(crate) use self::unix::uds; } - cfg_net! { + cfg_pipe! { + pub(crate) use self::unix::pipe; + } + + cfg_io_source! { pub(crate) use self::unix::IoSourceState; } } diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 830379705..a63760a5a 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -23,7 +23,7 @@ cfg_uds! { pub(crate) mod uds; } -cfg_net! { +cfg_io_source! { use std::io; #[cfg(windows)] use std::os::windows::io::RawSocket; diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs index 5667628ba..69be37097 100644 --- a/src/sys/shell/selector.rs +++ b/src/sys/shell/selector.rs @@ -44,7 +44,7 @@ cfg_any_os_util! { } } -cfg_net! { +cfg_io_source! { #[cfg(debug_assertions)] impl Selector { pub fn id(&self) -> usize { diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 96d7f4dc2..f045fb511 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -38,7 +38,7 @@ cfg_os_poll! { pub use self::uds::SocketAddr; } - cfg_net! { + cfg_io_source! { use std::io; // Both `kqueue` and `epoll` don't need to hold any user space state. @@ -59,6 +59,10 @@ cfg_os_poll! { } } } + + cfg_pipe! { + pub(crate) mod pipe; + } } cfg_not_os_poll! { diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs new file mode 100644 index 000000000..d838ebcc8 --- /dev/null +++ b/src/sys/unix/pipe.rs @@ -0,0 +1,383 @@ +//! Unix pipe. +//! +//! See the [`new`] function for documentation. + +use std::fs::File; +use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::process::{ChildStderr, ChildStdin, ChildStdout}; + +use crate::io_source::IoSource; +use crate::{event, Interest, Registry, Token}; + +/// Create a new non-blocking Unix pipe. +/// +/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as +/// inter-process or thread communication channel. +/// +/// This channel may be created before forking the process and then one end used +/// in each process, e.g. the parent process has the sending end to send command +/// to the child process. +/// +/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html +/// +/// # Events +/// +/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive +/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is +/// written to the `Sender` the `Receiver` will receive an [readable event]. +/// +/// In addition to those events, events will also be generated if the other side +/// is dropped. To check if the `Sender` is dropped you'll need to check +/// [`is_read_closed`] on events for the `Receiver`, if it returns true the +/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it +/// returns true the `Receiver` was dropped. Also see the second example below. +/// +/// [`WRITABLE`]: Interest::WRITABLE +/// [writable events]: event::Event::is_writable +/// [`READABLE`]: Interest::READABLE +/// [readable event]: event::Event::is_readable +/// [`is_read_closed`]: event::Event::is_read_closed +/// [`is_write_closed`]: event::Event::is_write_closed +/// +/// # Deregistering +/// +/// Both `Sender` and `Receiver` will deregister themselves when dropped, +/// **iff** the file descriptors are not duplicated (via [`dup(2)`]). +/// +/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html +/// +/// # Examples +/// +/// Simple example that writes data into the sending end and read it from the +/// receiving end. +/// +/// ``` +/// use std::io::{self, Read, Write}; +/// +/// use mio::{Poll, Events, Interest, Token}; +/// use mio::unix::pipe; +/// +/// // Unique tokens for the two ends of the channel. +/// const PIPE_RECV: Token = Token(0); +/// const PIPE_SEND: Token = Token(1); +/// +/// # fn main() -> io::Result<()> { +/// // Create our `Poll` instance and the `Events` container. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// // Create a new pipe. +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// // Register both ends of the channel. +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// const MSG: &[u8; 11] = b"Hello world"; +/// +/// loop { +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_SEND => sender.write(MSG) +/// .and_then(|n| if n != MSG.len() { +/// // We'll consider a short write an error in this +/// // example. NOTE: we can't use `write_all` with +/// // non-blocking I/O. +/// Err(io::ErrorKind::WriteZero.into()) +/// } else { +/// Ok(()) +/// })?, +/// PIPE_RECV => { +/// let mut buf = [0; 11]; +/// let n = receiver.read(&mut buf)?; +/// println!("received: {:?}", &buf[0..n]); +/// assert_eq!(n, MSG.len()); +/// assert_eq!(&buf, &*MSG); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// } +/// # } +/// ``` +/// +/// Example that receives an event once the `Sender` is dropped. +/// +/// ``` +/// # use std::io; +/// # +/// # use mio::{Poll, Events, Interest, Token}; +/// # use mio::unix::pipe; +/// # +/// # const PIPE_RECV: Token = Token(0); +/// # const PIPE_SEND: Token = Token(1); +/// # +/// # fn main() -> io::Result<()> { +/// // Same setup as in the example above. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// // Drop the sender. +/// drop(sender); +/// +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_RECV if event.is_read_closed() => { +/// // Detected that the sender was dropped. +/// println!("Sender dropped!"); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// # unreachable!(); +/// # } +/// ``` +pub fn new() -> io::Result<(Sender, Receiver)> { + let mut fds: [RawFd; 2] = [-1, -1]; + + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + ))] + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))] + unsafe { + // For platforms that don't have `pipe2(2)` we need to manually set the + // correct flags on the file descriptor. + if libc::pipe(fds.as_mut_ptr()) != 0 { + return Err(io::Error::last_os_error()); + } + + for fd in &fds { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 + || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + // Don't leak file descriptors. Can't handle error though. + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "ios", + target_os = "macos", + target_os = "solaris", + )))] + compile_error!("unsupported target for `mio::unix::pipe`"); + + // Safety: we just initialised the `fds` above. + let r = unsafe { Receiver::from_raw_fd(fds[0]) }; + let w = unsafe { Sender::from_raw_fd(fds[1]) }; + Ok((w, r)) +} + +/// Sending end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Sender { + inner: IoSource, +} + +impl Sender { + /// Set the `Sender` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Sender { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Write for Sender { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From for Sender { + fn from(stdin: ChildStdin) -> Sender { + // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. + unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } + } +} + +impl FromRawFd for Sender { + unsafe fn from_raw_fd(fd: RawFd) -> Sender { + Sender { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Sender { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +/// Receiving end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Receiver { + inner: IoSource, +} + +impl Receiver { + /// Set the `Receiver` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Read for Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From for Receiver { + fn from(stdout: ChildStdout) -> Receiver { + // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From for Receiver { + fn from(stderr: ChildStderr) -> Receiver { + // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } + } +} + +impl FromRawFd for Receiver { + unsafe fn from_raw_fd(fd: RawFd) -> Receiver { + Receiver { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Receiver { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let value = nonblocking as libc::c_int; + if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs index b8a31a13f..76ee7f91a 100644 --- a/src/sys/unix/selector/epoll.rs +++ b/src/sys/unix/selector/epoll.rs @@ -106,7 +106,7 @@ impl Selector { } } -cfg_net! { +cfg_io_source! { impl Selector { #[cfg(debug_assertions)] pub fn id(&self) -> usize { diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs index fc5b9ed15..454f47d1c 100644 --- a/src/sys/unix/selector/kqueue.rs +++ b/src/sys/unix/selector/kqueue.rs @@ -303,7 +303,7 @@ fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result< Ok(()) } -cfg_net! { +cfg_io_source! { #[cfg(debug_assertions)] impl Selector { pub fn id(&self) -> usize { diff --git a/src/sys/windows/afd.rs b/src/sys/windows/afd.rs index 82c8e9ed7..b2e3b11a6 100644 --- a/src/sys/windows/afd.rs +++ b/src/sys/windows/afd.rs @@ -111,7 +111,7 @@ impl Afd { } } -cfg_net! { +cfg_io_source! { use miow::iocp::CompletionPort; use ntapi::ntioapi::FILE_OPEN; use ntapi::ntioapi::NtCreateFile; diff --git a/src/sys/windows/io_status_block.rs b/src/sys/windows/io_status_block.rs index db6729c84..9da5e7a90 100644 --- a/src/sys/windows/io_status_block.rs +++ b/src/sys/windows/io_status_block.rs @@ -4,7 +4,7 @@ use std::ops::{Deref, DerefMut}; pub struct IoStatusBlock(IO_STATUS_BLOCK); -cfg_net! { +cfg_io_source! { use ntapi::ntioapi::IO_STATUS_BLOCK_u; impl IoStatusBlock { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 7bba6dda2..25590c262 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -42,6 +42,10 @@ mod waker; pub(crate) use waker::Waker; cfg_net! { + mod net; +} + +cfg_io_source! { use std::io; use std::os::windows::io::RawSocket; use std::pin::Pin; @@ -49,8 +53,6 @@ cfg_net! { use crate::{poll, Interest, Registry, Token}; - mod net; - struct InternalState { selector: Arc, token: Token, diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index 971558277..df2c3f0ef 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -47,7 +47,7 @@ impl AfdGroup { } } -cfg_net! { +cfg_io_source! { const POLL_GROUP__MAX_GROUP_SIZE: usize = 32; impl AfdGroup { @@ -256,7 +256,7 @@ impl SockState { } } -cfg_net! { +cfg_io_source! { impl SockState { fn new(raw_socket: RawSocket, afd: Arc) -> io::Result { Ok(SockState { @@ -380,7 +380,7 @@ impl Selector { } } -cfg_net! { +cfg_io_source! { use super::InternalState; use crate::Token; @@ -535,7 +535,7 @@ impl SelectorInner { } } -cfg_net! { +cfg_io_source! { use std::mem::size_of; use std::ptr::null_mut; use winapi::um::mswsock; diff --git a/tests/unix_pipe.rs b/tests/unix_pipe.rs new file mode 100644 index 000000000..7e757ccbb --- /dev/null +++ b/tests/unix_pipe.rs @@ -0,0 +1,279 @@ +#![cfg(all(unix, all(feature = "os-util", feature = "pipe")))] + +use std::io::{self, Read, Write}; +use std::process::{Command, Stdio}; +use std::sync::{Arc, Barrier}; +use std::thread; +use std::time::Duration; + +use mio::event::Event; +use mio::unix::pipe::{self, Receiver, Sender}; +use mio::{Events, Interest, Poll, Token}; + +const RECEIVER: Token = Token(0); +const SENDER: Token = Token(1); + +const DATA1: &[u8; 11] = b"Hello world"; + +#[test] +fn smoke() { + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(8); + + let (mut sender, mut receiver) = pipe::new().unwrap(); + + let mut buf = [0; 20]; + assert_would_block(receiver.read(&mut buf)); + + poll.registry() + .register(&mut receiver, RECEIVER, Interest::READABLE) + .unwrap(); + poll.registry() + .register(&mut sender, SENDER, Interest::WRITABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(SENDER, Interest::WRITABLE)], + ); + let n = sender.write(DATA1).unwrap(); + assert_eq!(n, DATA1.len()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(RECEIVER, Interest::READABLE)], + ); + let n = receiver.read(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(&buf[..n], &*DATA1); +} + +#[test] +fn event_when_sender_is_dropped() { + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(8); + + let (mut sender, mut receiver) = pipe::new().unwrap(); + poll.registry() + .register(&mut receiver, RECEIVER, Interest::READABLE) + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let thread_barrier = barrier.clone(); + + let handle = thread::spawn(move || { + let n = sender.write(DATA1).unwrap(); + assert_eq!(n, DATA1.len()); + thread_barrier.wait(); + + thread_barrier.wait(); + drop(sender); + thread_barrier.wait(); + }); + + barrier.wait(); // Wait for the write to complete. + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(RECEIVER, Interest::READABLE)], + ); + + barrier.wait(); // Unblock the thread. + barrier.wait(); // Wait until the sending end is dropped. + + expect_one_closed_event(&mut poll, &mut events, RECEIVER, true); + + handle.join().unwrap(); +} + +#[test] +fn event_when_receiver_is_dropped() { + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(8); + + let (mut sender, receiver) = pipe::new().unwrap(); + poll.registry() + .register(&mut sender, SENDER, Interest::WRITABLE) + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let thread_barrier = barrier.clone(); + + let handle = thread::spawn(move || { + thread_barrier.wait(); + drop(receiver); + thread_barrier.wait(); + }); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(SENDER, Interest::WRITABLE)], + ); + + barrier.wait(); // Unblock the thread. + barrier.wait(); // Wait until the receiving end is dropped. + + expect_one_closed_event(&mut poll, &mut events, SENDER, false); + + handle.join().unwrap(); +} + +#[test] +fn from_child_process_io() { + // `cat` simply echo everything that we write via standard in. + let mut child = Command::new("cat") + .env_clear() + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .expect("failed to start `cat` command"); + + let mut sender = Sender::from(child.stdin.take().unwrap()); + let mut receiver = Receiver::from(child.stdout.take().unwrap()); + + let mut poll = Poll::new().unwrap(); + let mut events = Events::with_capacity(8); + + poll.registry() + .register(&mut receiver, RECEIVER, Interest::READABLE) + .unwrap(); + poll.registry() + .register(&mut sender, SENDER, Interest::WRITABLE) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(SENDER, Interest::WRITABLE)], + ); + let n = sender.write(DATA1).unwrap(); + assert_eq!(n, DATA1.len()); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(RECEIVER, Interest::READABLE)], + ); + let mut buf = [0; 20]; + let n = receiver.read(&mut buf).unwrap(); + assert_eq!(n, DATA1.len()); + assert_eq!(&buf[..n], &*DATA1); + + drop(sender); + + expect_one_closed_event(&mut poll, &mut events, RECEIVER, true); + + child.wait().unwrap(); +} + +#[test] +fn nonblocking_child_process_io() { + // `cat` simply echo everything that we write via standard in. + let mut child = Command::new("cat") + .env_clear() + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .expect("failed to start `cat` command"); + + let sender = Sender::from(child.stdin.take().unwrap()); + let mut receiver = Receiver::from(child.stdout.take().unwrap()); + + receiver.set_nonblocking(true).unwrap(); + + let mut buf = [0; 20]; + assert_would_block(receiver.read(&mut buf)); + + drop(sender); + child.wait().unwrap(); +} + +/// An event that is expected to show up when `Poll` is polled, see +/// `expect_events`. +#[derive(Debug)] +pub struct ExpectEvent { + token: Token, + interests: Interest, +} + +impl ExpectEvent { + pub const fn new(token: Token, interests: Interest) -> ExpectEvent { + ExpectEvent { token, interests } + } + + fn matches(&self, event: &Event) -> bool { + event.token() == self.token && + // If we expect a readiness then also match on the event. + // In maths terms that is p -> q, which is the same as !p || q. + (!self.interests.is_readable() || event.is_readable()) && + (!self.interests.is_writable() || event.is_writable()) && + (!self.interests.is_aio() || event.is_aio()) && + (!self.interests.is_lio() || event.is_lio()) + } +} + +pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec) { + // In a lot of calls we expect more then one event, but it could be that + // poll returns the first event only in a single call. To be a bit more + // lenient we'll poll a couple of times. + for _ in 0..3 { + poll.poll(events, Some(Duration::from_millis(500))) + .expect("unable to poll"); + + for event in events.iter() { + let index = expected.iter().position(|expected| expected.matches(event)); + + if let Some(index) = index { + expected.swap_remove(index); + } else { + // Must accept sporadic events. + println!("got unexpected event: {:?}", event); + } + } + + if expected.is_empty() { + return; + } + } + + assert!( + expected.is_empty(), + "the following expected events were not found: {:?}", + expected + ); +} + +/// Assert that the provided result is an `io::Error` with kind `WouldBlock`. +pub fn assert_would_block(result: io::Result) { + match result { + Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => panic!("unexpected error result: {}", err), + } +} + +/// Expected a closed event. If `read` is true is checks for `is_read_closed`, +/// otherwise for `is_write_closed`. +pub fn expect_one_closed_event(poll: &mut Poll, events: &mut Events, token: Token, read: bool) { + poll.poll(events, Some(Duration::from_secs(1))).unwrap(); + let mut iter = events.iter(); + let event = iter.next().unwrap(); + assert_eq!(event.token(), token, "invalid token, event: {:#?}", event); + if read { + assert!( + event.is_read_closed(), + "expected closed or error, event: {:#?}", + event + ); + } else { + assert!( + event.is_write_closed(), + "expected closed or error, event: {:#?}", + event + ); + } + assert!(iter.next().is_none()); +}