diff --git a/tokio/src/net/unix/mod.rs b/tokio/src/net/unix/mod.rs index 97b632744dc..a49b70af34a 100644 --- a/tokio/src/net/unix/mod.rs +++ b/tokio/src/net/unix/mod.rs @@ -1,4 +1,4 @@ -//! Unix domain socket utility types. +//! Unix specific network types. // This module does not currently provide any public API, but it was // unintentionally defined as a public module. Hide it from the documentation // instead of changing it to a private module to avoid breakage. @@ -22,6 +22,8 @@ pub(crate) use stream::UnixStream; mod ucred; pub use ucred::UCred; +pub mod pipe; + /// A type representing process and process group IDs. #[allow(non_camel_case_types)] pub type uid_t = u32; diff --git a/tokio/src/net/unix/pipe.rs b/tokio/src/net/unix/pipe.rs new file mode 100644 index 00000000000..0775717b0c6 --- /dev/null +++ b/tokio/src/net/unix/pipe.rs @@ -0,0 +1,1206 @@ +//! Unix pipe types. + +use crate::io::interest::Interest; +use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; + +use mio::unix::pipe as mio_pipe; +use std::fs::File; +use std::io::{self, Read, Write}; +use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + use bytes::BufMut; +} + +/// Options and flags which can be used to configure how a FIFO file is opened. +/// +/// This builder allows configuring how to create a pipe end from a FIFO file. +/// Generally speaking, when using `OpenOptions`, you'll first call [`new`], +/// then chain calls to methods to set each option, then call either +/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you +/// are trying to open. This will give you a [`io::Result`][result] with a pipe +/// end inside that you can further operate on. +/// +/// [`new`]: OpenOptions::new +/// [`open_receiver`]: OpenOptions::open_receiver +/// [`open_sender`]: OpenOptions::open_sender +/// [result]: std::io::Result +/// +/// # Examples +/// +/// Opening a pair of pipe ends from a FIFO file: +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// # use std::error::Error; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box> { +/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO: +/// +/// ```ignore +/// use tokio::net::unix::pipe; +/// use nix::{unistd::mkfifo, sys::stat::Mode}; +/// # use std::error::Error; +/// +/// // Our program has exclusive access to this path. +/// const FIFO_NAME: &str = "path/to/a/new/fifo"; +/// +/// # async fn dox() -> Result<(), Box> { +/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; +/// let tx = pipe::OpenOptions::new() +/// .read_write(true) +/// .unchecked(true) +/// .open_sender(FIFO_NAME)?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone, Debug)] +pub struct OpenOptions { + #[cfg(target_os = "linux")] + read_write: bool, + unchecked: bool, +} + +impl OpenOptions { + /// Creates a blank new set of options ready for configuration. + /// + /// All options are initially set to `false`. + pub fn new() -> OpenOptions { + OpenOptions { + #[cfg(target_os = "linux")] + read_write: false, + unchecked: false, + } + } + + /// Sets the option for read-write access. + /// + /// This option, when true, will indicate that a FIFO file will be opened + /// in read-write access mode. This operation is not defined by the POSIX + /// standard and is only guaranteed to work on Linux. + /// + /// # Examples + /// + /// Opening a [`Sender`] even if there are no open reading ends: + /// + /// ```ignore + /// use tokio::net::unix::pipe; + /// + /// let tx = pipe::OpenOptions::new() + /// .read_write(true) + /// .open_sender("path/to/a/fifo"); + /// ``` + /// + /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not + /// fail with [`UnexpectedEof`] during reading if all writing ends of the + /// pipe close the FIFO file. + /// + /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof + /// + /// ```ignore + /// use tokio::net::unix::pipe; + /// + /// let tx = pipe::OpenOptions::new() + /// .read_write(true) + /// .open_receiver("path/to/a/fifo"); + /// ``` + #[cfg(target_os = "linux")] + #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))] + pub fn read_write(&mut self, value: bool) -> &mut Self { + self.read_write = value; + self + } + + /// Sets the option to skip the check for FIFO file type. + /// + /// By default, [`open_receiver`] and [`open_sender`] functions will check + /// if the opened file is a FIFO file. Set this option to `true` if you are + /// sure the file is a FIFO file. + /// + /// [`open_receiver`]: OpenOptions::open_receiver + /// [`open_sender`]: OpenOptions::open_sender + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use nix::{unistd::mkfifo, sys::stat::Mode}; + /// # use std::error::Error; + /// + /// // Our program has exclusive access to this path. + /// const FIFO_NAME: &str = "path/to/a/new/fifo"; + /// + /// # async fn dox() -> Result<(), Box> { + /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; + /// let rx = pipe::OpenOptions::new() + /// .unchecked(true) + /// .open_receiver(FIFO_NAME)?; + /// # Ok(()) + /// # } + /// ``` + pub fn unchecked(&mut self, value: bool) -> &mut Self { + self.unchecked = value; + self + } + + /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`. + /// + /// This function will open the FIFO file at the specified path, possibly + /// check if it is a pipe, and associate the pipe with the default event + /// loop for reading. + /// + /// # Errors + /// + /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. + /// This function may also fail with other standard OS errors. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn open_receiver>(&self, path: P) -> io::Result { + let file = self.open(path.as_ref(), PipeEnd::Receiver)?; + Receiver::from_file_unchecked(file) + } + + /// Creates a [`Sender`] from a FIFO file with the options specified by `self`. + /// + /// This function will open the FIFO file at the specified path, possibly + /// check if it is a pipe, and associate the pipe with the default event + /// loop for writing. + /// + /// # Errors + /// + /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. + /// If the file is not opened in read-write access mode and the file is not + /// currently open for reading, this function will fail with `ENXIO`. + /// This function may also fail with other standard OS errors. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn open_sender>(&self, path: P) -> io::Result { + let file = self.open(path.as_ref(), PipeEnd::Sender)?; + Sender::from_file_unchecked(file) + } + + fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result { + let mut options = std::fs::OpenOptions::new(); + options + .read(pipe_end == PipeEnd::Receiver) + .write(pipe_end == PipeEnd::Sender) + .custom_flags(libc::O_NONBLOCK); + + #[cfg(target_os = "linux")] + if self.read_write { + options.read(true).write(true); + } + + let file = options.open(path)?; + + if !self.unchecked && !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + Ok(file) + } +} + +impl Default for OpenOptions { + fn default() -> OpenOptions { + OpenOptions::new() + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +enum PipeEnd { + Sender, + Receiver, +} + +/// Writing end of a Unix pipe. +/// +/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`]. +/// +/// Opening a named pipe for writing involves a few steps. +/// Call to [`OpenOptions::open_sender`] might fail with an error indicating +/// different things: +/// +/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path. +/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO. +/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading. +/// Sleep for a while and try again. +/// * Other OS errors not specific to opening FIFO files. +/// +/// Opening a `Sender` from a FIFO file should look like this: +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// use tokio::time::{self, Duration}; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box> { +/// // Wait for a reader to open the file. +/// let tx = loop { +/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) { +/// Ok(tx) => break tx, +/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}, +/// Err(e) => return Err(e.into()), +/// } +/// +/// time::sleep(Duration::from_millis(50)).await; +/// }; +/// # Ok(()) +/// # } +/// ``` +/// +/// On Linux, it is possible to create a `Sender` without waiting in a sleeping +/// loop. This is done by opening a named pipe in read-write access mode with +/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold +/// both a writing end and a reading end, and the latter allows to open a FIFO +/// without [`ENXIO`] error since the pipe is open for reading as well. +/// +/// `Sender` cannot be used to read from a pipe, so in practice the read access +/// is only used when a FIFO is opened. However, using a `Sender` in read-write +/// mode **may lead to lost data**, because written data will be dropped by the +/// system as soon as all pipe ends are closed. To avoid lost data you have to +/// make sure that a reading end has been opened before dropping a `Sender`. +/// +/// Note that using read-write access mode with FIFO files is not defined by +/// the POSIX standard and it is only guaranteed to work on Linux. +/// +/// ```ignore +/// use tokio::io::AsyncWriteExt; +/// use tokio::net::unix::pipe; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box> { +/// let mut tx = pipe::OpenOptions::new() +/// .read_write(true) +/// .open_sender(FIFO_NAME)?; +/// +/// // Asynchronously write to the pipe before a reader. +/// tx.write_all(b"hello world").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html +#[derive(Debug)] +pub struct Sender { + io: PollEvented, +} + +impl Sender { + fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result { + let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?; + Ok(Sender { io }) + } + + /// Creates a new `Sender` from a [`File`]. + /// + /// This function is intended to construct a pipe from a [`File`] representing + /// a special FIFO file. It will check if the file is a pipe and has write access, + /// set it in non-blocking mode and perform the conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it + /// does not have write access. Also fails with any standard OS error if it occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file(mut file: File) -> io::Result { + if !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + let flags = get_file_flags(&file)?; + if has_write_access(flags) { + set_nonblocking(&mut file, flags)?; + Sender::from_file_unchecked(file) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "not in O_WRONLY or O_RDWR access mode", + )) + } + } + + /// Creates a new `Sender` from a [`File`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from a File representing + /// a special FIFO file. The conversion assumes nothing about the underlying + /// file; it is left up to the user to make sure it is opened with write access, + /// represents a pipe and is set in non-blocking mode. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::fs::OpenOptions; + /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; + /// # use std::error::Error; + /// + /// const FIFO_NAME: &str = "path/to/a/fifo"; + /// + /// # async fn dox() -> Result<(), Box> { + /// let file = OpenOptions::new() + /// .write(true) + /// .custom_flags(libc::O_NONBLOCK) + /// .open(FIFO_NAME)?; + /// if file.metadata()?.file_type().is_fifo() { + /// let tx = pipe::Sender::from_file_unchecked(file)?; + /// /* use the Sender */ + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file_unchecked(file: File) -> io::Result { + let raw_fd = file.into_raw_fd(); + let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) }; + Sender::from_mio(mio_tx) + } + + /// Waits for any of the requested ready states. + /// + /// This function can be used instead of [`writable()`] to check the returned + /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events. + /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// [`writable()`]: Self::writable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Waits for the pipe to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with [`try_write()`]. + /// + /// [`try_write()`]: Self::try_write + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a writing end of a fifo + /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// tx.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match tx.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Polls for write readiness. + /// + /// If the pipe is not currently ready for writing, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for writing, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// [`writable`]: Self::writable + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Tries to write a buffer to the pipe, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. If the length of `buf` is not + /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the + /// write is guaranteed to be atomic, i.e. either the entire content of + /// `buf` will be written or this method will fail with `WouldBlock`. There + /// is no such guarantee if `buf` is larger than `PIPE_BUF`. + /// + /// This function is usually paired with [`writable`]. + /// + /// [`writable`]: Self::writable + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a writing end of a fifo + /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// tx.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match tx.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write(&self, buf: &[u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } + + /// Tries to write several buffers to the pipe, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// If the total length of buffers is not greater than `PIPE_BUF` (an OS + /// constant, 4096 under Linux), then the write is guaranteed to be atomic, + /// i.e. either the entire contents of buffers will be written or this + /// method will fail with `WouldBlock`. There is no such guarantee if the + /// total length of buffers is greater than `PIPE_BUF`. + /// + /// This function is usually paired with [`writable`]. + /// + /// [`try_write()`]: Self::try_write() + /// [`writable`]: Self::writable + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a writing end of a fifo + /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the pipe to be writable + /// tx.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match tx.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } +} + +impl AsyncWrite for Sender { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.io.poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + self.io.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + true + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} + +/// Reading end of a Unix pipe. +/// +/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`]. +/// +/// # Examples +/// +/// Receiving messages from a named pipe in a loop: +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// use tokio::io::{self, AsyncReadExt}; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box> { +/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// loop { +/// let mut msg = vec![0; 256]; +/// match rx.read_exact(&mut msg).await { +/// Ok(_) => { +/// /* handle the message */ +/// } +/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { +/// // Writing end has been closed, we should reopen the pipe. +/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// } +/// Err(e) => return Err(e.into()), +/// } +/// } +/// # } +/// ``` +/// +/// On Linux, you can use a `Receiver` in read-write access mode to implement +/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only +/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof` +/// when the writing end is closed. This way, a `Receiver` can asynchronously +/// wait for the next writer to open the pipe. +/// +/// You should not use functions waiting for EOF such as [`read_to_end`] with +/// a `Receiver` in read-write access mode, since it **may wait forever**. +/// `Receiver` in this mode also holds an open writing end, which prevents +/// receiving EOF. +/// +/// To set the read-write access mode you can use `OpenOptions::read_write`. +/// Note that using read-write access mode with FIFO files is not defined by +/// the POSIX standard and it is only guaranteed to work on Linux. +/// +/// ```ignore +/// use tokio::net::unix::pipe; +/// use tokio::io::AsyncReadExt; +/// # use std::error::Error; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box> { +/// let mut rx = pipe::OpenOptions::new() +/// .read_write(true) +/// .open_receiver(FIFO_NAME)?; +/// loop { +/// let mut msg = vec![0; 256]; +/// rx.read_exact(&mut msg).await?; +/// /* handle the message */ +/// } +/// # } +/// ``` +/// +/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end +#[derive(Debug)] +pub struct Receiver { + io: PollEvented, +} + +impl Receiver { + fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result { + let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?; + Ok(Receiver { io }) + } + + /// Creates a new `Receiver` from a [`File`]. + /// + /// This function is intended to construct a pipe from a [`File`] representing + /// a special FIFO file. It will check if the file is a pipe and has read access, + /// set it in non-blocking mode and perform the conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it + /// does not have read access. Also fails with any standard OS error if it occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file(mut file: File) -> io::Result { + if !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + let flags = get_file_flags(&file)?; + if has_read_access(flags) { + set_nonblocking(&mut file, flags)?; + Receiver::from_file_unchecked(file) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "not in O_RDONLY or O_RDWR access mode", + )) + } + } + + /// Creates a new `Receiver` from a [`File`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from a File representing + /// a special FIFO file. The conversion assumes nothing about the underlying + /// file; it is left up to the user to make sure it is opened with read access, + /// represents a pipe and is set in non-blocking mode. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::fs::OpenOptions; + /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; + /// # use std::error::Error; + /// + /// const FIFO_NAME: &str = "path/to/a/fifo"; + /// + /// # async fn dox() -> Result<(), Box> { + /// let file = OpenOptions::new() + /// .read(true) + /// .custom_flags(libc::O_NONBLOCK) + /// .open(FIFO_NAME)?; + /// if file.metadata()?.file_type().is_fifo() { + /// let rx = pipe::Receiver::from_file_unchecked(file)?; + /// /* use the Receiver */ + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file_unchecked(file: File) -> io::Result { + let raw_fd = file.into_raw_fd(); + let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) }; + Receiver::from_mio(mio_rx) + } + + /// Waits for any of the requested ready states. + /// + /// This function can be used instead of [`readable()`] to check the returned + /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events. + /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// [`readable()`]: Self::readable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Waits for the pipe to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with [`try_read()`]. + /// + /// [`try_read()`]: Self::try_read() + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Polls for read readiness. + /// + /// If the pipe is not currently ready for reading, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for reading, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// [`readable`]: Self::readable + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + + /// Tries to read data from the pipe into the provided buffer, returning how + /// many bytes were read. + /// + /// Reads any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually [`readable()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. The pipe's writing end is closed and will no longer write data. + /// 2. The specified buffer was 0 bytes in length. + /// + /// If the pipe is not ready to read data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub fn try_read(&self, buf: &mut [u8]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + /// Tries to read data from the pipe into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Reads any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's writing end is + /// closed and will no longer write data. If the pipe is not ready to read + /// data `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// io::IoSliceMut::new(&mut buf_a), + /// io::IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + + cfg_io_util! { + /// Tries to read data from the pipe into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Reads any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable + /// [`ready()`]: Self::ready + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's writing end is + /// closed and will no longer write data. If the pipe is not ready to read + /// data `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// let mut buf = Vec::with_capacity(4096); + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read_buf(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_buf(&self, buf: &mut B) -> io::Result { + self.io.registration().try_io(Interest::READABLE, || { + use std::io::Read; + + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit] as *mut [u8]) }; + + // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, + // which correctly handles reads into uninitialized memory. + let n = (&*self.io).read(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } +} + +impl AsyncRead for Receiver { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, + // which correctly handles reads into uninitialized memory. + unsafe { self.io.poll_read(cx, buf) } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} + +/// Checks if file is a FIFO +fn is_fifo(file: &File) -> io::Result { + Ok(file.metadata()?.file_type().is_fifo()) +} + +/// Gets file descriptor's flags by fcntl. +fn get_file_flags(file: &File) -> io::Result { + let fd = file.as_raw_fd(); + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + if flags < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(flags) + } +} + +/// Checks for O_RDONLY or O_RDWR access mode. +fn has_read_access(flags: libc::c_int) -> bool { + let mode = flags & libc::O_ACCMODE; + mode == libc::O_RDONLY || mode == libc::O_RDWR +} + +/// Checks for O_WRONLY or O_RDWR access mode. +fn has_write_access(flags: libc::c_int) -> bool { + let mode = flags & libc::O_ACCMODE; + mode == libc::O_WRONLY || mode == libc::O_RDWR +} + +/// Sets file's flags with O_NONBLOCK by fcntl. +fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> { + let fd = file.as_raw_fd(); + + let flags = current_flags | libc::O_NONBLOCK; + + if flags != current_flags { + let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }; + if ret < 0 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index e9c4040c0ca..0c2c34a0796 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -263,6 +263,19 @@ mod unix_datagram { async_assert_fn!(UnixStream::writable(_): Send & Sync & !Unpin); } +#[cfg(unix)] +mod unix_pipe { + use super::*; + use tokio::net::unix::pipe::*; + assert_value!(OpenOptions: Send & Sync & Unpin); + assert_value!(Receiver: Send & Sync & Unpin); + assert_value!(Sender: Send & Sync & Unpin); + async_assert_fn!(Receiver::readable(_): Send & Sync & !Unpin); + async_assert_fn!(Receiver::ready(_, tokio::io::Interest): Send & Sync & !Unpin); + async_assert_fn!(Sender::ready(_, tokio::io::Interest): Send & Sync & !Unpin); + async_assert_fn!(Sender::writable(_): Send & Sync & !Unpin); +} + #[cfg(windows)] mod windows_named_pipe { use super::*; diff --git a/tokio/tests/net_unix_pipe.rs b/tokio/tests/net_unix_pipe.rs new file mode 100644 index 00000000000..c96d6e70fbd --- /dev/null +++ b/tokio/tests/net_unix_pipe.rs @@ -0,0 +1,429 @@ +#![cfg(feature = "full")] +#![cfg(unix)] + +use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; +use tokio::net::unix::pipe; +use tokio_test::task; +use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok}; + +use std::fs::File; +use std::io; +use std::os::unix::fs::OpenOptionsExt; +use std::os::unix::io::AsRawFd; +use std::path::{Path, PathBuf}; + +/// Helper struct which will clean up temporary files once dropped. +struct TempFifo { + path: PathBuf, + _dir: tempfile::TempDir, +} + +impl TempFifo { + fn new(name: &str) -> io::Result { + let dir = tempfile::Builder::new() + .prefix("tokio-fifo-tests") + .tempdir()?; + let path = dir.path().join(name); + nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?; + + Ok(TempFifo { path, _dir: dir }) + } +} + +impl AsRef for TempFifo { + fn as_ref(&self) -> &Path { + self.path.as_ref() + } +} + +#[tokio::test] +async fn fifo_simple_send() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + let fifo = TempFifo::new("simple_send")?; + + // Create a reading task which should wait for data from the pipe. + let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + let mut read_fut = task::spawn(async move { + let mut buf = vec![0; DATA.len()]; + reader.read_exact(&mut buf).await?; + Ok::<_, io::Error>(buf) + }); + assert_pending!(read_fut.poll()); + + let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?; + writer.write_all(DATA).await?; + + // Let the IO driver poll events for the reader. + while !read_fut.is_woken() { + tokio::task::yield_now().await; + } + + // Reading task should be ready now. + let read_data = assert_ready_ok!(read_fut.poll()); + assert_eq!(&read_data, DATA); + + Ok(()) +} + +#[tokio::test] +#[cfg(target_os = "linux")] +async fn fifo_simple_send_sender_first() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + // Create a new fifo file with *no reading ends open*. + let fifo = TempFifo::new("simple_send_sender_first")?; + + // Simple `open_sender` should fail with ENXIO (no such device or address). + let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo)); + assert_eq!(err.raw_os_error(), Some(libc::ENXIO)); + + // `open_sender` in read-write mode should succeed and the pipe should be ready to write. + let mut writer = pipe::OpenOptions::new() + .read_write(true) + .open_sender(&fifo)?; + writer.write_all(DATA).await?; + + // Read the written data and validate. + let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + let mut read_data = vec![0; DATA.len()]; + reader.read_exact(&mut read_data).await?; + assert_eq!(&read_data, DATA); + + Ok(()) +} + +// Opens a FIFO file, write and *close the writer*. +async fn write_and_close(path: impl AsRef, msg: &[u8]) -> io::Result<()> { + let mut writer = pipe::OpenOptions::new().open_sender(path)?; + writer.write_all(msg).await?; + drop(writer); // Explicit drop. + Ok(()) +} + +/// Checks EOF behavior with single reader and writers sequentially opening +/// and closing a FIFO. +#[tokio::test] +async fn fifo_multiple_writes() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + let fifo = TempFifo::new("fifo_multiple_writes")?; + + let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + + write_and_close(&fifo, DATA).await?; + let ev = reader.ready(Interest::READABLE).await?; + assert!(ev.is_readable()); + let mut read_data = vec![0; DATA.len()]; + assert_ok!(reader.read_exact(&mut read_data).await); + + // Check that reader hits EOF. + let err = assert_err!(reader.read_exact(&mut read_data).await); + assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof); + + // Write more data and read again. + write_and_close(&fifo, DATA).await?; + assert_ok!(reader.read_exact(&mut read_data).await); + + Ok(()) +} + +/// Checks behavior of a resilient reader (Receiver in O_RDWR access mode) +/// with writers sequentially opening and closing a FIFO. +#[tokio::test] +#[cfg(target_os = "linux")] +async fn fifo_resilient_reader() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + let fifo = TempFifo::new("fifo_resilient_reader")?; + + // Open reader in read-write access mode. + let mut reader = pipe::OpenOptions::new() + .read_write(true) + .open_receiver(&fifo)?; + + write_and_close(&fifo, DATA).await?; + let ev = reader.ready(Interest::READABLE).await?; + let mut read_data = vec![0; DATA.len()]; + reader.read_exact(&mut read_data).await?; + + // Check that reader didn't hit EOF. + assert!(!ev.is_read_closed()); + + // Resilient reader can asynchronously wait for the next writer. + let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data)); + assert_pending!(second_read_fut.poll()); + + // Write more data and read again. + write_and_close(&fifo, DATA).await?; + assert_ok!(second_read_fut.await); + + Ok(()) +} + +#[tokio::test] +async fn open_detects_not_a_fifo() -> io::Result<()> { + let dir = tempfile::Builder::new() + .prefix("tokio-fifo-tests") + .tempdir() + .unwrap(); + let path = dir.path().join("not_a_fifo"); + + // Create an ordinary file. + File::create(&path)?; + + // Check if Sender detects invalid file type. + let err = assert_err!(pipe::OpenOptions::new().open_sender(&path)); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + // Check if Receiver detects invalid file type. + let err = assert_err!(pipe::OpenOptions::new().open_sender(&path)); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + Ok(()) +} + +#[tokio::test] +async fn from_file() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + let fifo = TempFifo::new("from_file")?; + + // Construct a Receiver from a File. + let file = std::fs::OpenOptions::new() + .read(true) + .custom_flags(libc::O_NONBLOCK) + .open(&fifo)?; + let mut reader = pipe::Receiver::from_file(file)?; + + // Construct a Sender from a File. + let file = std::fs::OpenOptions::new() + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(&fifo)?; + let mut writer = pipe::Sender::from_file(file)?; + + // Write and read some data to test async. + let mut read_fut = task::spawn(async move { + let mut buf = vec![0; DATA.len()]; + reader.read_exact(&mut buf).await?; + Ok::<_, io::Error>(buf) + }); + assert_pending!(read_fut.poll()); + + writer.write_all(DATA).await?; + + let read_data = assert_ok!(read_fut.await); + assert_eq!(&read_data, DATA); + + Ok(()) +} + +#[tokio::test] +async fn from_file_detects_not_a_fifo() -> io::Result<()> { + let dir = tempfile::Builder::new() + .prefix("tokio-fifo-tests") + .tempdir() + .unwrap(); + let path = dir.path().join("not_a_fifo"); + + // Create an ordinary file. + File::create(&path)?; + + // Check if Sender detects invalid file type. + let file = std::fs::OpenOptions::new().write(true).open(&path)?; + let err = assert_err!(pipe::Sender::from_file(file)); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + // Check if Receiver detects invalid file type. + let file = std::fs::OpenOptions::new().read(true).open(&path)?; + let err = assert_err!(pipe::Receiver::from_file(file)); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + Ok(()) +} + +#[tokio::test] +async fn from_file_detects_wrong_access_mode() -> io::Result<()> { + let fifo = TempFifo::new("wrong_access_mode")?; + + // Open a read end to open the fifo for writing. + let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + + // Check if Receiver detects write-only access mode. + let wronly = std::fs::OpenOptions::new() + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(&fifo)?; + let err = assert_err!(pipe::Receiver::from_file(wronly)); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + // Check if Sender detects read-only access mode. + let rdonly = std::fs::OpenOptions::new() + .read(true) + .custom_flags(libc::O_NONBLOCK) + .open(&fifo)?; + let err = assert_err!(pipe::Sender::from_file(rdonly)); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + Ok(()) +} + +fn is_nonblocking(fd: &T) -> io::Result { + let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?; + Ok((flags & libc::O_NONBLOCK) != 0) +} + +#[tokio::test] +async fn from_file_sets_nonblock() -> io::Result<()> { + let fifo = TempFifo::new("sets_nonblock")?; + + // Open read and write ends to let blocking files open. + let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + let _writer = pipe::OpenOptions::new().open_sender(&fifo)?; + + // Check if Receiver sets the pipe in non-blocking mode. + let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?; + assert!(!is_nonblocking(&rdonly)?); + let reader = pipe::Receiver::from_file(rdonly)?; + assert!(is_nonblocking(&reader)?); + + // Check if Sender sets the pipe in non-blocking mode. + let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?; + assert!(!is_nonblocking(&wronly)?); + let writer = pipe::Sender::from_file(wronly)?; + assert!(is_nonblocking(&writer)?); + + Ok(()) +} + +fn writable_by_poll(writer: &pipe::Sender) -> bool { + task::spawn(writer.writable()).poll().is_ready() +} + +#[tokio::test] +async fn try_read_write() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + // Create a pipe pair over a fifo file. + let fifo = TempFifo::new("try_read_write")?; + let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + let writer = pipe::OpenOptions::new().open_sender(&fifo)?; + + // Fill the pipe buffer with `try_write`. + let mut write_data = Vec::new(); + while writable_by_poll(&writer) { + match writer.try_write(DATA) { + Ok(n) => write_data.extend(&DATA[..n]), + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } + + // Drain the pipe buffer with `try_read`. + let mut read_data = vec![0; write_data.len()]; + let mut i = 0; + while i < write_data.len() { + reader.readable().await?; + match reader.try_read(&mut read_data[i..]) { + Ok(n) => i += n, + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + continue; + } + } + } + + assert_eq!(read_data, write_data); + + Ok(()) +} + +#[tokio::test] +async fn try_read_write_vectored() -> io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + // Create a pipe pair over a fifo file. + let fifo = TempFifo::new("try_read_write_vectored")?; + let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + let writer = pipe::OpenOptions::new().open_sender(&fifo)?; + + let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect(); + + // Fill the pipe buffer with `try_write_vectored`. + let mut write_data = Vec::new(); + while writable_by_poll(&writer) { + match writer.try_write_vectored(&write_bufs) { + Ok(n) => write_data.extend(&DATA[..n]), + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } + + // Drain the pipe buffer with `try_read_vectored`. + let mut read_data = vec![0; write_data.len()]; + let mut i = 0; + while i < write_data.len() { + reader.readable().await?; + + let mut read_bufs: Vec<_> = read_data[i..] + .chunks_mut(0x10000) + .map(io::IoSliceMut::new) + .collect(); + match reader.try_read_vectored(&mut read_bufs) { + Ok(n) => i += n, + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + continue; + } + } + } + + assert_eq!(read_data, write_data); + + Ok(()) +} + +#[tokio::test] +async fn try_read_buf() -> std::io::Result<()> { + const DATA: &[u8] = b"this is some data to write to the fifo"; + + // Create a pipe pair over a fifo file. + let fifo = TempFifo::new("try_read_write_vectored")?; + let reader = pipe::OpenOptions::new().open_receiver(&fifo)?; + let writer = pipe::OpenOptions::new().open_sender(&fifo)?; + + // Fill the pipe buffer with `try_write`. + let mut write_data = Vec::new(); + while writable_by_poll(&writer) { + match writer.try_write(DATA) { + Ok(n) => write_data.extend(&DATA[..n]), + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } + + // Drain the pipe buffer with `try_read_buf`. + let mut read_data = vec![0; write_data.len()]; + let mut i = 0; + while i < write_data.len() { + reader.readable().await?; + match reader.try_read_buf(&mut read_data) { + Ok(n) => i += n, + Err(e) => { + assert_eq!(e.kind(), io::ErrorKind::WouldBlock); + continue; + } + } + } + + assert_eq!(read_data, write_data); + + Ok(()) +}