From fafe7d4766b131ab7df9782e82c4fc4e9f87f63c Mon Sep 17 00:00:00 2001 From: jasta Date: Fri, 7 Jul 2023 16:32:58 -0700 Subject: [PATCH] Implement poll-based backend Introduces a new backend for UNIX using the level triggered poll() syscall instead of epoll or kqueue. This support is crucial for embedded systems like the esp32 family but also for alternative operating systems like Haiku. This diff does not introduce any new platform support targets itself but provides the core technical implementation necessary to support these other targets. Future PRs will introduce specific platform support however due to reasons outlined in #1602 (many thanks for this initial effort BTW!) it is not possible to automate tests for those platforms. We will instead rely on the fact that existing strong POSIX platforms like Linux can serve as a proxy to prove that the mio code is working nominally. --- src/io_source.rs | 10 +- src/macros.rs | 57 ++ src/poll.rs | 30 +- src/sys/mod.rs | 1 + src/sys/unix/mod.rs | 21 +- .../edge_triggered/io_source_state.rs | 47 ++ .../selector/adapters/edge_triggered/mod.rs | 7 + .../edge_triggered/waker_registrar.rs | 19 + .../level_triggered/io_source_state.rs | 108 +++ .../selector/adapters/level_triggered/mod.rs | 8 + .../level_triggered/waker_registrar.rs | 26 + src/sys/unix/selector/adapters/mod.rs | 27 + src/sys/unix/selector/mod.rs | 54 +- src/sys/unix/selector/poll.rs | 670 ++++++++++++++++++ src/sys/unix/sourcefd.rs | 2 +- src/sys/unix/waker.rs | 17 +- tests/tcp_stream.rs | 1 + tests/unix_stream.rs | 2 + tests/util/mod.rs | 6 +- 19 files changed, 1032 insertions(+), 81 deletions(-) create mode 100644 src/sys/unix/selector/adapters/edge_triggered/io_source_state.rs create mode 100644 src/sys/unix/selector/adapters/edge_triggered/mod.rs create mode 100644 src/sys/unix/selector/adapters/edge_triggered/waker_registrar.rs create mode 100644 src/sys/unix/selector/adapters/level_triggered/io_source_state.rs create mode 100644 
src/sys/unix/selector/adapters/level_triggered/mod.rs create mode 100644 src/sys/unix/selector/adapters/level_triggered/waker_registrar.rs create mode 100644 src/sys/unix/selector/adapters/mod.rs create mode 100644 src/sys/unix/selector/poll.rs diff --git a/src/io_source.rs b/src/io_source.rs index 99623c116..cfbcbe99f 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -142,9 +142,7 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; - registry - .selector() - .register(self.inner.as_raw_fd(), token, interests) + self.state.register(registry, token, interests, self.inner.as_raw_fd()) } fn reregister( @@ -155,15 +153,13 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; - registry - .selector() - .reregister(self.inner.as_raw_fd(), token, interests) + self.state.reregister(registry, token, interests, self.inner.as_raw_fd()) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(registry)?; - registry.selector().deregister(self.inner.as_raw_fd()) + self.state.deregister(registry, self.inner.as_raw_fd()) } } diff --git a/src/macros.rs b/src/macros.rs index e380c6b14..26b71c949 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -69,6 +69,63 @@ macro_rules! cfg_any_os_ext { } } +/// The current platform supports epoll. +macro_rules! cfg_epoll_selector { + ($($item:item)*) => { + $( + #[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + )))] + $item + )* + } +} + +macro_rules! cfg_poll_selector { + ($($item:item)*) => { + $( + #[cfg(mio_unsupported_force_poll_poll)] + $item + )* + } +} + +/// The current platform supports kqueue. +macro_rules! 
cfg_kqueue_selector { + ($($item:item)*) => { + $( + #[cfg(all(not(mio_unsupported_force_poll_poll), any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + )))] + $item + )* + } +} + +macro_rules! cfg_selector_has_fd { + ($($item:item)*) => { + $( + #[cfg(all( + unix, + not(mio_unsupported_force_poll_poll), + ))] + $item + )* + } +} + macro_rules! trace { ($($t:tt)*) => { log!(trace, $($t)*) diff --git a/src/poll.rs b/src/poll.rs index 25a273ad2..686ab2a77 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,6 +1,7 @@ use crate::{event, sys, Events, Interest, Token}; -#[cfg(unix)] -use std::os::unix::io::{AsRawFd, RawFd}; +cfg_selector_has_fd! { + use std::os::unix::io::{AsRawFd, RawFd}; +} use std::time::Duration; use std::{fmt, io}; @@ -411,7 +412,10 @@ impl Poll { } } -#[cfg(unix)] +#[cfg(all( + unix, + not(mio_unsupported_force_poll_poll), +))] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() @@ -696,18 +700,20 @@ impl fmt::Debug for Registry { } } -#[cfg(unix)] -impl AsRawFd for Registry { - fn as_raw_fd(&self) -> RawFd { - self.selector.as_raw_fd() +cfg_selector_has_fd! { + impl AsRawFd for Registry { + fn as_raw_fd(&self) -> RawFd { + self.selector.as_raw_fd() + } } } cfg_os_poll! { - #[cfg(unix)] - #[test] - pub fn as_raw_fd() { - let poll = Poll::new().unwrap(); - assert!(poll.as_raw_fd() > 0); + cfg_selector_has_fd! { + #[test] + pub fn as_raw_fd() { + let poll = Poll::new().unwrap(); + assert!(poll.as_raw_fd() > 0); + } } } diff --git a/src/sys/mod.rs b/src/sys/mod.rs index 2a968b265..c4e80f6bf 100644 --- a/src/sys/mod.rs +++ b/src/sys/mod.rs @@ -13,6 +13,7 @@ //! methods. //! * `tcp` and `udp` modules: see the [`crate::net`] module. //! * `Waker`: see [`crate::Waker`]. +//! * `WakerRegistrar`: state for `Waker` type. cfg_os_poll! { macro_rules! 
debug_detail { diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 231480a5d..0556823c8 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -16,6 +16,7 @@ macro_rules! syscall { cfg_os_poll! { mod selector; pub(crate) use self::selector::{event, Event, Events, Selector}; + pub(crate) use self::selector::WakerRegistrar; mod sourcefd; pub use self::sourcefd::SourceFd; @@ -33,25 +34,7 @@ cfg_os_poll! { } cfg_io_source! { - use std::io; - - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; - - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState - } - - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) - } - } + pub(crate) use self::selector::{IoSourceState}; } cfg_os_ext! { diff --git a/src/sys/unix/selector/adapters/edge_triggered/io_source_state.rs b/src/sys/unix/selector/adapters/edge_triggered/io_source_state.rs new file mode 100644 index 000000000..55c73462d --- /dev/null +++ b/src/sys/unix/selector/adapters/edge_triggered/io_source_state.rs @@ -0,0 +1,47 @@ +use crate::{Interest, Registry, Token}; +use std::io; +use std::os::unix::io::RawFd; + +pub(crate) struct IoSourceState; + +impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. 
+ f(io) + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().register(fd, token, interests) + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().reregister(fd, token, interests) + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().deregister(fd) + } +} diff --git a/src/sys/unix/selector/adapters/edge_triggered/mod.rs b/src/sys/unix/selector/adapters/edge_triggered/mod.rs new file mode 100644 index 000000000..90192bc52 --- /dev/null +++ b/src/sys/unix/selector/adapters/edge_triggered/mod.rs @@ -0,0 +1,7 @@ +//! Implementation details for when we have an edge-triggered backend (i.e. epoll and kqueue). + +cfg_io_source! 
{ + pub(super) mod io_source_state; +} + +pub(super) mod waker_registrar; diff --git a/src/sys/unix/selector/adapters/edge_triggered/waker_registrar.rs b/src/sys/unix/selector/adapters/edge_triggered/waker_registrar.rs new file mode 100644 index 000000000..3be22f607 --- /dev/null +++ b/src/sys/unix/selector/adapters/edge_triggered/waker_registrar.rs @@ -0,0 +1,19 @@ +use std::io; +use std::os::unix::io::RawFd; +use crate::{Interest, Token}; +use crate::sys::Selector; + +#[derive(Debug)] +pub(crate) struct WakerRegistrar; + +impl WakerRegistrar { + pub fn register(selector: &Selector, fd: RawFd, token: Token) -> io::Result<Self> { + selector.register(fd, token, Interest::READABLE)?; + Ok(Self) + } + + pub fn prepare_to_wake(&self) -> io::Result<()> { + // Nothing to do in the case that we are using an edge-triggered API + Ok(()) + } +} \ No newline at end of file diff --git a/src/sys/unix/selector/adapters/level_triggered/io_source_state.rs b/src/sys/unix/selector/adapters/level_triggered/io_source_state.rs new file mode 100644 index 000000000..f61cd5395 --- /dev/null +++ b/src/sys/unix/selector/adapters/level_triggered/io_source_state.rs @@ -0,0 +1,110 @@ +use crate::sys::Selector; +use crate::{Interest, Registry, Token}; +use std::io; +use std::os::unix::io::RawFd; +use std::sync::Arc; +use crate::sys::unix::selector::poll::RegistrationRecord; + +/// Registration details kept so the interests can be re-registered and the fd deregistered on drop. +struct InternalState { + selector: Selector, + token: Token, + interests: Interest, + fd: RawFd, + shared_record: Arc<RegistrationRecord>, +} + +impl Drop for InternalState { + fn drop(&mut self) { + if self.shared_record.is_registered() { + let _ = self.selector.deregister(self.fd); + } + } +} + +/// Per-source state for the level-triggered backend; `inner` is `None` until the source is registered. +pub(crate) struct IoSourceState { + inner: Option<Box<InternalState>>, +} + +impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + /// Runs `f` on `io`. If `f` fails with `WouldBlock`, the stored interests are re-registered + /// with the selector first; an error from that re-registration is returned instead. + pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R> + where + F: FnOnce(&T) -> io::Result<R>, + { + let result = f(io); + + if let Err(err) = &result { + if err.kind() == 
io::ErrorKind::WouldBlock { + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.fd, state.token, state.interests) + })?; + } + } + + result + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let selector = registry.selector().try_clone()?; + + selector.register_internal(fd, token, interests).map(move |shared_record| { + let state = InternalState { + selector, + token, + interests, + fd, + shared_record, + }; + + self.inner = Some(Box::new(state)); + }) + } + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => registry + .selector() + .reregister(fd, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }), + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + if let Some(state) = self.inner.take() { + // Marking unregistered will short circuit the drop behaviour of calling + // deregister so the call to deregister below is strictly required. + state.shared_record.mark_unregistered(); + } + + registry.selector().deregister(fd) + } +} diff --git a/src/sys/unix/selector/adapters/level_triggered/mod.rs b/src/sys/unix/selector/adapters/level_triggered/mod.rs new file mode 100644 index 000000000..fade83241 --- /dev/null +++ b/src/sys/unix/selector/adapters/level_triggered/mod.rs @@ -0,0 +1,8 @@ +//! Implementation details for when we need to mimic an edge-triggered backend but actually have a +//! level-triggered backend (e.g. poll). + +cfg_io_source! 
{ + pub(super) mod io_source_state; +} + +pub(super) mod waker_registrar; diff --git a/src/sys/unix/selector/adapters/level_triggered/waker_registrar.rs b/src/sys/unix/selector/adapters/level_triggered/waker_registrar.rs new file mode 100644 index 000000000..38270bd8c --- /dev/null +++ b/src/sys/unix/selector/adapters/level_triggered/waker_registrar.rs @@ -0,0 +1,28 @@ +use std::io; +use std::os::unix::io::RawFd; +use crate::sys::Selector; +use crate::{Interest, Token}; + +/// Remembers how the waker's fd was registered so that `prepare_to_wake` can re-register its +/// read interest with the same selector. +#[derive(Debug)] +pub(crate) struct WakerRegistrar { + selector: Selector, + fd: RawFd, + token: Token, +} + +impl WakerRegistrar { + /// Clones `selector`, then registers `fd` for readable events under `token`. + /// Returns an error, without panicking, if either the clone or the registration fails. + pub fn register(selector: &Selector, fd: RawFd, token: Token) -> io::Result<Self> { + let selector = selector.try_clone()?; + selector.register(fd, token, Interest::READABLE)?; + Ok(WakerRegistrar { selector, fd, token }) + } + + /// Re-registers the waker's fd for readable events. + pub fn prepare_to_wake(&self) -> io::Result<()> { + self.selector.reregister(self.fd, self.token, Interest::READABLE) + } +} \ No newline at end of file diff --git a/src/sys/unix/selector/adapters/mod.rs b/src/sys/unix/selector/adapters/mod.rs new file mode 100644 index 000000000..79104b6f5 --- /dev/null +++ b/src/sys/unix/selector/adapters/mod.rs @@ -0,0 +1,27 @@ +cfg_epoll_selector! { + mod edge_triggered; + pub(crate) use self::edge_triggered::waker_registrar::WakerRegistrar; + + cfg_io_source! { + pub(crate) use self::edge_triggered::io_source_state::IoSourceState; + } +} + +cfg_kqueue_selector! { + mod edge_triggered; + pub(crate) use self::edge_triggered::waker_registrar::WakerRegistrar; + + cfg_io_source! { + pub(crate) use self::edge_triggered::io_source_state::IoSourceState; + } +} + +cfg_poll_selector! { + mod level_triggered; + pub(crate) use self::level_triggered::waker_registrar::WakerRegistrar; + + cfg_io_source! 
{ + pub(crate) use self::level_triggered::io_source_state::IoSourceState; + } +} + diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 3ccbdeadf..2e3154866 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,42 +1,24 @@ -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", -))] -mod epoll; +mod adapters; +pub(crate) use self::adapters::WakerRegistrar; -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", -))] -pub(crate) use self::epoll::{event, Event, Events, Selector}; +cfg_io_source! { + pub(crate) use self::adapters::IoSourceState; +} -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", -))] -mod kqueue; +cfg_epoll_selector! { + mod epoll; + pub(crate) use self::epoll::{event, Event, Events, Selector}; +} -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", -))] -pub(crate) use self::kqueue::{event, Event, Events, Selector}; +cfg_poll_selector! { + mod poll; + pub(crate) use self::poll::{event, Event, Events, Selector}; +} + +cfg_kqueue_selector! { + mod kqueue; + pub(crate) use self::kqueue::{event, Event, Events, Selector}; +} /// Lowest file descriptor used in `Selector::try_clone`. /// diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..56e598d88 --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,668 @@ +// This implementation is based on the one in the `polling` crate. +// Thanks to https://github.com/Kestrer for the original implementation! 
+// Permission to use this code has been granted by original author: +// https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 + +use crate::{Interest, Token}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::os::unix::io::RawFd; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; +use std::{fmt, io}; +// NOTE(review): `LOWEST_FD` is no longer referenced here; confirm it is not left unused in poll-only builds. + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[cfg(target_os = "espidf")] +type NotifyType = u64; + +#[cfg(not(target_os = "espidf"))] +type NotifyType = u8; + +/// Handle to the poll-based selector; clones made by `try_clone` share one `SelectorState`. +#[derive(Debug)] +pub struct Selector { + state: Arc<SelectorState>, + /// Whether this selector currently has an associated waker. + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result<Selector> { + let state = SelectorState::new()?; + + Ok(Selector { + state: Arc::new(state), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result<Selector> { + let state = self.state.clone(); + + Ok(Selector { + state, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { + self.state.select(events, timeout) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.register(fd, token, interests) + } + + /// Same as `register`, but hands back the `RegistrationRecord` shared with the selector state. + pub(crate) fn register_internal(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<Arc<RegistrationRecord>> { + self.state.register_internal(fd, token, interests) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.reregister(fd, token, interests) + } + + pub fn deregister(&self, fd: 
RawFd) -> io::Result<()> { + self.state.deregister(fd) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } +} + +cfg_io_source! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.state.id + } + } +} + +/// Interface to poll. +#[derive(Debug)] +struct SelectorState { + /// File descriptors to poll. + fds: Mutex, + + /// File descriptors which will be removed before the next poll call. + /// + /// When a file descriptor is deregistered while a poll is running, we need to filter + /// out all removed descriptors after that poll is finished running. + pending_removal: Mutex>, + + /// The file descriptor of the read half of the notify pipe. This is also stored as the first + /// file descriptor in `fds.poll_fds`. + notify_read: RawFd, + /// The file descriptor of the write half of the notify pipe. + /// + /// Data is written to this to wake up the current instance of `wait`, which can occur when the + /// user notifies it (in which case `notified` would have been set) or when an operation needs + /// to occur (in which case `waiting_operations` would have been incremented). + notify_write: RawFd, + + /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the + /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero + /// again. + waiting_operations: AtomicUsize, + /// The condition variable that gets notified when `waiting_operations` reaches zero or + /// `notified` becomes true. + /// + /// This is used with the `fds` mutex. + operations_complete: Condvar, + + /// This selectors id. + #[cfg(debug_assertions)] + id: usize, +} + +/// The file descriptors to poll in a `Poller`. +#[derive(Debug, Clone)] +struct Fds { + /// The list of `pollfds` taken by poll. + /// + /// The first file descriptor is always present and is used to notify the poller. It is also + /// stored in `notify_read`. 
+ poll_fds: Vec, + /// The map of each file descriptor to data associated with it. This does not include the file + /// descriptors `notify_read` or `notify_write`. + fd_data: HashMap, +} + +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the +/// `extra_traits` feature of `libc`. +#[repr(transparent)] +#[derive(Clone)] +struct PollFd(libc::pollfd); + +impl Debug for PollFd { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("pollfd") + .field("fd", &self.0.fd) + .field("events", &self.0.events) + .field("revents", &self.0.revents) + .finish() + } +} + +/// Data associated with a file descriptor in a poller. +#[derive(Debug, Clone)] +struct FdData { + /// The index into `poll_fds` this file descriptor is. + poll_fds_index: usize, + /// The key of the `Event` associated with this file descriptor. + token: Token, + /// Used to communicate with IoSourceState when we need to internally deregister + /// based on a closed fd. 
+ shared_record: Arc, +} + +impl SelectorState { + pub fn new() -> io::Result { + let notify_fds = Self::create_notify_fds()?; + + Ok(Self { + fds: Mutex::new(Fds { + poll_fds: vec![PollFd(libc::pollfd { + fd: notify_fds[0], + events: libc::POLLRDNORM, + revents: 0, + })], + fd_data: HashMap::new(), + }), + pending_removal: Mutex::new(Vec::new()), + notify_read: notify_fds[0], + notify_write: notify_fds[1], + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + }) + } + + fn create_notify_fds() -> io::Result<[libc::c_int; 2]> { + let mut notify_fd = [0, 0]; + + // Note that the eventfd() implementation in ESP-IDF deviates from the specification in the following ways: + // 1) The file descriptor is always in a non-blocking mode, as if EFD_NONBLOCK was passed as a flag; + // passing EFD_NONBLOCK or calling fcntl(.., F_GETFL/F_SETFL) on the eventfd() file descriptor is not supported + // 2) It always returns the counter value, even if it is 0. This is contrary to the specification which mandates + // that it should instead fail with EAGAIN + // + // (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway + // (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified + #[cfg(target_os = "espidf")] + { + extern "C" { + fn eventfd(initval: libc::c_uint, flags: libc::c_int) -> libc::c_int; + } + + let fd = unsafe { eventfd(0, 0) }; + if fd == -1 { + // TODO: Switch back to syscall! once + // https://github.com/rust-lang/libc/pull/2864 is published + return Err(std::io::ErrorKind::Other.into()); + } + + notify_fd[0] = fd; + notify_fd[1] = fd; + } + + #[cfg(not(target_os = "espidf"))] + { + syscall!(pipe(notify_fd.as_mut_ptr()))?; + + // Put the reading side into non-blocking mode. 
+ let notify_read_flags = syscall!(fcntl(notify_fd[0], libc::F_GETFL))?; + + syscall!(fcntl( + notify_fd[0], + libc::F_SETFL, + notify_read_flags | libc::O_NONBLOCK + ))?; + } + + Ok(notify_fd) + } + + pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { + let deadline = timeout.map(|t| Instant::now() + t); + + events.clear(); + + let mut fds = self.fds.lock().unwrap(); + let mut closed_raw_fds = Vec::new(); + + loop { + // Complete all current operations. + loop { + if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } + + fds = self.operations_complete.wait(fds).unwrap(); + } + + // Perform the poll. + trace!("Polling on {:?}", &fds); + let num_events = poll(&mut fds.poll_fds, deadline)?; + if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { + // timeout + return Ok(()); + } + + trace!("Poll finished: {:?}", &fds); + let notified = fds.poll_fds[0].0.revents != 0; + let num_fd_events = if notified { num_events - 1 } else { num_events }; + + // Read all notifications. + if notified { + if self.notify_read != self.notify_write { + // When using the `pipe` syscall, we have to read all accumulated notifications in the pipe. + while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)) + .is_ok() + {} + } else { + // When using the `eventfd` syscall, it is OK to read just once, so as to clear the counter. + // In fact, reading in a loop will result in an endless loop on the ESP-IDF + // which is not following the specification strictly. + let _ = self.pop_notification(); + } + } + + // We now check whether this poll was performed with descriptors which were pending + // for removal and filter out any matching. + let mut pending_removal_guard = self.pending_removal.lock().unwrap(); + let mut pending_removal = std::mem::take(&mut *pending_removal_guard); + drop(pending_removal_guard); + + // Store the events if there were any. 
+ if num_fd_events > 0 { + let fds = &mut *fds; + + events.reserve(num_fd_events); + for fd_data in fds.fd_data.values_mut() { + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + + if pending_removal.contains(&poll_fd.fd) { + // Fd was removed while poll was running + continue; + } + + if poll_fd.revents != 0 { + // Store event + events.push(Event { + token: fd_data.token, + events: poll_fd.revents, + }); + + if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 { + pending_removal.push(poll_fd.fd); + closed_raw_fds.push(poll_fd.fd); + } + + // Remove the interest which just got triggered + // the IoSourceState/WakerRegistrar used with this selector will add back + // the interest using reregister. + poll_fd.events &= !poll_fd.revents; + + if events.len() == num_fd_events { + break; + } + } + } + + break; + } + } + + drop(fds); + let _ = self.deregister_all(&closed_raw_fds); + + Ok(()) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.register_internal(fd, token, interests).map(|_| ()) + } + + pub fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result> { + if fd == self.notify_read || fd == self.notify_write { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + + // We must handle the unlikely case that the following order of operations happens: + // + // register(1 as RawFd) + // deregister(1 as RawFd) + // register(1 as RawFd) + // + // + // Fd's pending removal only get cleared when poll has been run. It is possible that + // between registering and deregistering and then _again_ registering the file descriptor + // poll never gets called, thus the fd stays stuck in the pending removal list. + // + // To avoid this scenario we remove an fd from pending removals when registering it. 
+ let mut pending_removal = self.pending_removal.lock().unwrap(); + if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) { + pending_removal.remove(idx); + } + drop(pending_removal); + + self.modify_fds(|fds| { + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "\ + same file descriptor registered twice for polling \ + (an old file descriptor might have been closed without deregistration)\ + ", + )); + } + + let poll_fds_index = fds.poll_fds.len(); + let record = Arc::new(RegistrationRecord::new()); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + token, + shared_record: record.clone(), + }, + ); + + fds.poll_fds.push(PollFd(libc::pollfd { + fd, + events: interests_to_poll(interests), + revents: 0, + })); + + Ok(record) + }) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.modify_fds(|fds| { + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.token = token; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests); + + Ok(()) + }) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.deregister_all(&[fd]) + .map_err(|_| io::ErrorKind::NotFound)?; + Ok(()) + } + + /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running. + fn modify_fds(&self, f: impl FnOnce(&mut Fds) -> T) -> T { + self.waiting_operations.fetch_add(1, Ordering::SeqCst); + + // Wake up the current caller of `wait` if there is one. + let sent_notification = self.notify_inner().is_ok(); + + let mut fds = self.fds.lock().unwrap(); + + // If there was no caller of `wait` our notification was not removed from the pipe. 
+ if sent_notification { + let _ = self.pop_notification(); + } + + let res = f(&mut *fds); + + if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 { + self.operations_complete.notify_one(); + } + + res + } + + /// Special optimized version of [Self::deregister] which handles multiple removals + /// at once. Ok result if all removals were performed, Err if any entries + /// were not found. + fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> { + if targets.is_empty() { + return Ok(()); + } + + let mut pending_removal = self.pending_removal.lock().unwrap(); + pending_removal.extend(targets); + drop(pending_removal); + + self.modify_fds(|fds| { + for target in targets { + let data = fds.fd_data.remove(target).ok_or(())?; + data.shared_record.mark_unregistered(); + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.0.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } + } + + Ok(()) + }) + } + + /// Wake the current thread that is calling `wait`. + fn notify_inner(&self) -> io::Result<()> { + syscall!(write( + self.notify_write, + &(1 as NotifyType) as *const _ as *const _, + std::mem::size_of::<NotifyType>() + ))?; + Ok(()) + } + + /// Remove a notification created by `notify_inner`. + fn pop_notification(&self) -> io::Result<()> { + syscall!(read( + self.notify_read, + &mut [0; std::mem::size_of::<NotifyType>()] as *mut _ as *mut _, + std::mem::size_of::<NotifyType>() + ))?; + Ok(()) + } +} + +impl Drop for SelectorState { + fn drop(&mut self) { + let _ = syscall!(close(self.notify_read)); + + if self.notify_read != self.notify_write { + let _ = syscall!(close(self.notify_write)); + } + } +} + +/// Shared record between IoSourceState and SelectorState that allows us to internally +/// deregister partially or fully closed fds (i.e. when we get POLLHUP or POLLERR) without +/// confusing IoSourceState and trying to deregister twice. 
This isn't strictly +/// required as technically deregister is idempotent but it is confusing +/// when trying to debug behaviour as we get imbalanced calls to register/deregister and +/// superfluous NotFound errors. +#[derive(Debug)] +pub(crate) struct RegistrationRecord { + is_unregistered: AtomicBool, +} + +impl RegistrationRecord { + pub fn new() -> Self { + Self { is_unregistered: AtomicBool::new(false) } + } + + pub fn mark_unregistered(&self) { + self.is_unregistered.store(true, Ordering::Relaxed); + } + + pub fn is_registered(&self) -> bool { + !self.is_unregistered.load(Ordering::Relaxed) + } +} + +const READ_EVENTS: libc::c_short = libc::POLLIN | libc::POLLRDHUP; + +const WRITE_EVENTS: libc::c_short = libc::POLLOUT; + +const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI; + +/// Get the input poll events for the given event. +fn interests_to_poll(interest: Interest) -> libc::c_short { + let mut kind = 0; + + if interest.is_readable() { + kind |= READ_EVENTS; + } + + if interest.is_writable() { + kind |= WRITE_EVENTS; + } + + if interest.is_priority() { + kind |= PRIORITY_EVENTS; + } + + kind +} + +/// Helper function to call poll. +fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { + loop { + // Convert the timeout to milliseconds. + let timeout_ms = deadline + .map(|deadline| { + let timeout = deadline.saturating_duration_since(Instant::now()); + + // Round up to a whole millisecond. + let mut ms = timeout.as_millis().try_into().unwrap_or(u64::MAX); + if Duration::from_millis(ms) < timeout { + ms = ms.saturating_add(1); + } + ms.try_into().unwrap_or(i32::MAX) + }) + .unwrap_or(-1); + + let res = syscall!(poll( + fds.as_mut_ptr() as *mut libc::pollfd, + fds.len() as libc::nfds_t, + timeout_ms, + )); + + match res { + Ok(num_events) => break Ok(num_events as usize), + // poll returns EAGAIN if we can retry it. 
+            Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue,
+            Err(e) => return Err(e),
+        }
+    }
+}
+
+/// A readiness event: the token of the source plus the raw poll event bits.
+#[derive(Debug, Clone)]
+pub struct Event {
+    token: Token,
+    events: libc::c_short,
+}
+
+/// Collection of events handed back to the caller.
+pub type Events = Vec<Event>;
+
+/// Accessors that interpret the raw poll bits stored in an [`Event`].
+pub mod event {
+    use crate::sys::Event;
+    use crate::Token;
+    use std::fmt;
+
+    /// Returns the token stored in the event.
+    pub fn token(event: &Event) -> Token {
+        event.token
+    }
+
+    /// True if `POLLIN` or `POLLPRI` is set.
+    pub fn is_readable(event: &Event) -> bool {
+        (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0
+    }
+
+    /// True if `POLLOUT` is set.
+    pub fn is_writable(event: &Event) -> bool {
+        (event.events & libc::POLLOUT) != 0
+    }
+
+    /// True if `POLLERR` is set.
+    pub fn is_error(event: &Event) -> bool {
+        (event.events & libc::POLLERR) != 0
+    }
+
+    /// True if `POLLHUP` or `POLLRDHUP` is set.
+    pub fn is_read_closed(event: &Event) -> bool {
+        // Both halves of the socket have closed
+        (event.events & libc::POLLHUP) != 0
+            // Socket has received FIN or called shutdown(SHUT_RD)
+            || (event.events & libc::POLLRDHUP) != 0
+    }
+
+    /// True if `POLLHUP` is set, if `POLLOUT` and `POLLERR` are both set, or if
+    /// `POLLERR` is the only bit set.
+    pub fn is_write_closed(event: &Event) -> bool {
+        // Both halves of the socket have closed
+        (event.events & libc::POLLHUP) != 0
+            // Unix pipe write end has closed
+            || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0)
+            // The other side (read end) of a Unix pipe has closed.
+            || (event.events == libc::POLLERR)
+    }
+
+    /// True if `POLLPRI` is set.
+    pub fn is_priority(event: &Event) -> bool {
+        (event.events & libc::POLLPRI) != 0
+    }
+
+    /// Always `false` for this backend.
+    pub fn is_aio(_: &Event) -> bool {
+        // Not supported in the kernel, only in libc.
+        false
+    }
+
+    /// Always `false` for this backend.
+    pub fn is_lio(_: &Event) -> bool {
+        // Not supported.
+        false
+    }
+
+    /// Writes a debug representation of `event`: its token and the names of the
+    /// poll flags that are set in `event.events`.
+    pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
+        #[allow(clippy::trivially_copy_pass_by_ref)]
+        fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool {
+            (*got & want) != 0
+        }
+        debug_detail!(
+            EventsDetails(libc::c_short),
+            check_events,
+            libc::POLLIN,
+            libc::POLLPRI,
+            libc::POLLOUT,
+            libc::POLLRDNORM,
+            libc::POLLRDBAND,
+            libc::POLLWRNORM,
+            libc::POLLWRBAND,
+            libc::POLLERR,
+            libc::POLLHUP,
+            libc::POLLRDHUP,
+        );
+
+        // This is the poll backend's own `Event`, so label it as such instead of
+        // reusing the epoll selector's struct and field names.
+        f.debug_struct("poll_event")
+            .field("token", &event.token)
+            .field("events", &EventsDetails(event.events))
+            .finish()
+    }
+}
diff --git a/src/sys/unix/sourcefd.rs b/src/sys/unix/sourcefd.rs
index 84e776d21..f861f18b8 100644
--- a/src/sys/unix/sourcefd.rs
+++ b/src/sys/unix/sourcefd.rs
@@ -98,7 +98,7 @@ impl<'a> event::Source for SourceFd<'a> {
         token: Token,
         interests: Interest,
     ) -> io::Result<()> {
-        registry.selector().register(*self.0, token, interests)
+        registry.selector().register(*self.0, token, interests).map(|_| ())
     }
 
     fn reregister(
diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs
index 65002d690..476f4f23e 100644
--- a/src/sys/unix/waker.rs
+++ b/src/sys/unix/waker.rs
@@ -4,7 +4,8 @@
 ))]
 mod eventfd {
     use crate::sys::Selector;
-    use crate::{Interest, Token};
+    use crate::sys::WakerRegistrar;
+    use crate::Token;
 
     use std::fs::File;
     use std::io::{self, Read, Write};
@@ -18,6 +19,7 @@ mod eventfd {
     /// reset the count to 0, returning the count.
#[derive(Debug)]
     pub struct Waker {
+        registrar: WakerRegistrar,
         fd: File,
     }
 
@@ -26,11 +28,13 @@ mod eventfd {
             let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
             let file = unsafe { File::from_raw_fd(fd) };
 
-            selector.register(fd, token, Interest::READABLE)?;
-            Ok(Waker { fd: file })
+            let registrar = WakerRegistrar::register(selector, fd, token)?;
+            Ok(Waker { registrar, fd: file })
         }
 
         pub fn wake(&self) -> io::Result<()> {
+            self.registrar.prepare_to_wake()?;
+
             let buf: [u8; 8] = 1u64.to_ne_bytes();
             match (&self.fd).write(&buf) {
                 Ok(_) => Ok(()),
@@ -139,6 +143,7 @@ mod pipe {
     /// if writing to it (waking) fails.
     #[derive(Debug)]
     pub struct Waker {
+        registrar: WakerRegistrar,
         sender: File,
         receiver: File,
     }
@@ -150,8 +155,8 @@ mod pipe {
             let sender = unsafe { File::from_raw_fd(fds[1]) };
             let receiver = unsafe { File::from_raw_fd(fds[0]) };
 
-            selector.register(fds[0], token, Interest::READABLE)?;
-            Ok(Waker { sender, receiver })
+            let registrar = WakerRegistrar::register(selector, fds[0], token)?;
+            Ok(Waker { registrar, sender, receiver })
         }
 
         pub fn wake(&self) -> io::Result<()> {
@@ -161,6 +166,8 @@ mod pipe {
             #[cfg(target_os = "illumos")]
             self.empty();
 
+            self.registrar.prepare_to_wake()?;
+
             match (&self.sender).write(&[1]) {
                 Ok(_) => Ok(()),
                 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs
index df9b3f2e3..3a6010ce9 100644
--- a/tests/tcp_stream.rs
+++ b/tests/tcp_stream.rs
@@ -647,6 +647,7 @@ fn tcp_reset_close_event() {
     loop {
         poll.poll(&mut events, Some(Duration::from_millis(100)))
             .expect("poll failed");
+        println!("Got: {events:?}");
         if events.iter().count() == 0 {
             break;
         }
diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs
index fcab0a057..babf18df9 100644
--- a/tests/unix_stream.rs
+++ b/tests/unix_stream.rs
@@ -451,6 +451,8 @@ where
 
     assert!(stream.take_error().unwrap().is_none());
 
+    assert_would_block(stream.read(&mut buf));
+
     let bufs = [IoSlice::new(DATA1),
IoSlice::new(DATA2)]; let wrote = stream.write_vectored(&bufs).unwrap(); assert_eq!(wrote, DATA1_LEN + DATA2_LEN); diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 7a192d9b0..e1974aaa9 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -135,13 +135,17 @@ impl From for Readiness { } pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec) { + println!("Expecting: {expected:?}..."); + // In a lot of calls we expect more then one event, but it could be that // poll returns the first event only in a single call. To be a bit more // lenient we'll poll a couple of times. - for _ in 0..3 { + for i in 0..3 { poll.poll(events, Some(Duration::from_millis(500))) .expect("unable to poll"); + println!("[{i}] Got events: {events:?}"); + for event in events.iter() { let index = expected.iter().position(|expected| expected.matches(event));