diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe50a343d..f95e0cebb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,18 @@ jobs: run: cargo test --all-features - name: Tests release build run: cargo test --release --all-features + TestPoll: + runs-on: ubuntu-latest + timeout-minutes: 10 + env: + RUSTFLAGS: "--cfg mio_unsupported_force_poll_poll" + steps: + - uses: actions/checkout@v3 + - uses: dtolnay/rust-toolchain@stable + - name: Tests + run: cargo test --all-features + - name: Tests release build + run: cargo test --release --all-features MinimalVersions: runs-on: ${{ matrix.os }} timeout-minutes: 10 @@ -133,6 +145,7 @@ jobs: runs-on: ubuntu-latest needs: - Test + - TestPoll - MinimalVersions - MSRV - Nightly diff --git a/src/io_source.rs b/src/io_source.rs index 99623c116..06dc5e17e 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -142,9 +142,8 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; - registry - .selector() - .register(self.inner.as_raw_fd(), token, interests) + self.state + .register(registry, token, interests, self.inner.as_raw_fd()) } fn reregister( @@ -155,15 +154,14 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; - registry - .selector() - .reregister(self.inner.as_raw_fd(), token, interests) + self.state + .reregister(registry, token, interests, self.inner.as_raw_fd()) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(registry)?; - registry.selector().deregister(self.inner.as_raw_fd()) + self.state.deregister(registry, self.inner.as_raw_fd()) } } diff --git a/src/poll.rs b/src/poll.rs index 25a273ad2..695bede5d 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,5 +1,5 @@ use crate::{event, sys, Events, Interest, Token}; -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use std::{fmt, io}; @@ -411,7 +411,7 @@ impl Poll { } } -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() @@ -696,7 +696,7 @@ impl fmt::Debug for Registry { } } -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] impl AsRawFd for Registry { fn as_raw_fd(&self) -> RawFd { self.selector.as_raw_fd() @@ -704,7 +704,10 @@ impl AsRawFd for Registry { } cfg_os_poll! { - #[cfg(unix)] + #[cfg(all( + unix, + not(mio_unsupported_force_poll_poll) + ))] #[test] pub fn as_raw_fd() { let poll = Poll::new().unwrap(); diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 8a3175f76..76085b8a2 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -23,8 +23,10 @@ cfg_io_source! { use std::io; #[cfg(windows)] use std::os::windows::io::RawSocket; + #[cfg(unix)] + use std::os::unix::io::RawFd; - #[cfg(windows)] + #[cfg(any(windows, unix))] use crate::{Registry, Token, Interest}; pub(crate) struct IoSourceState; @@ -44,6 +46,33 @@ cfg_io_source! { } } + #[cfg(unix)] + impl IoSourceState { + pub fn register( + &mut self, + _: &Registry, + _: Token, + _: Interest, + _: RawFd, + ) -> io::Result<()> { + os_required!() + } + + pub fn reregister( + &mut self, + _: &Registry, + _: Token, + _: Interest, + _: RawFd, + ) -> io::Result<()> { + os_required!() + } + + pub fn deregister(&mut self, _: &Registry, _: RawFd) -> io::Result<()> { + os_required!() + } + } + #[cfg(windows)] impl IoSourceState { pub fn register( diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 231480a5d..53b579df3 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -33,25 +33,65 @@ cfg_os_poll! { } cfg_io_source! { - use std::io; - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; + #[cfg(not(mio_unsupported_force_poll_poll))] + mod stateless_io_source { + use std::io; + use std::os::unix::io::RawFd; + use crate::Registry; + use crate::Token; + use crate::Interest; - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState - } + pub(crate) struct IoSourceState; + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().register(fd, token, interests) + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().reregister(fd, token, interests) + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().deregister(fd) + } } } + + #[cfg(not(mio_unsupported_force_poll_poll))] + pub(crate) use self::stateless_io_source::IoSourceState; + + #[cfg(mio_unsupported_force_poll_poll)] + pub(crate) use self::selector::IoSourceState; } cfg_os_ext! { diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 3ccbdeadf..7fa73b8d6 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,40 +1,58 @@ -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ) ))] mod epoll; -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ) ))] pub(crate) use self::epoll::{event, Event, Events, Selector}; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", +#[cfg(mio_unsupported_force_poll_poll)] +mod poll; + +#[cfg(mio_unsupported_force_poll_poll)] +pub(crate) use self::poll::{event, Event, Events, IoSourceState, Selector}; + +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + ) ))] mod kqueue; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + ), ))] pub(crate) use self::kqueue::{event, Event, Events, Selector}; diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..ab2eea3c8 --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,740 @@ +// This implementation is based on the one in the `polling` crate. +// Thanks to https://github.com/Kestrer for the original implementation! +// Permission to use this code has been granted by original author: +// https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 + +use crate::sys::unix::selector::LOWEST_FD; +use crate::sys::unix::waker::WakerInternal; +use crate::{Interest, Token}; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; +use std::{cmp, fmt, io}; + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +pub struct Selector { + state: Arc, + /// Whether this selector currently has an associated waker. + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result { + let state = SelectorState::new()?; + + Ok(Selector { + state: Arc::new(state), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result { + // Just to keep the compiler happy :) + let _ = LOWEST_FD; + + let state = self.state.clone(); + + Ok(Selector { + state, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + self.state.select(events, timeout) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.register(fd, token, interests) + } + + #[allow(dead_code)] + pub(crate) fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result> { + self.state.register_internal(fd, token, interests) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.reregister(fd, token, interests) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.state.deregister(fd) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + + pub fn wake(&self, token: Token) -> io::Result<()> { + self.state.wake(token) + } + cfg_io_source! { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.state.id + } + } +} + +/// Interface to poll. +#[derive(Debug)] +struct SelectorState { + /// File descriptors to poll. + fds: Mutex, + + /// File descriptors which will be removed before the next poll call. + /// + /// When a file descriptor is deregistered while a poll is running, we need to filter + /// out all removed descriptors after that poll is finished running. + pending_removal: Mutex>, + + /// Token associated with Waker that have recently asked to wake. This will + /// cause a synthetic behaviour where on any wakeup we add all pending tokens + /// to the list of emitted events. + pending_wake_token: Mutex>, + + /// Data is written to this to wake up the current instance of `wait`, which can occur when the + /// user notifies it (in which case `notified` would have been set) or when an operation needs + /// to occur (in which case `waiting_operations` would have been incremented). + notify_waker: WakerInternal, + + /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the + /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero + /// again. + waiting_operations: AtomicUsize, + /// The condition variable that gets notified when `waiting_operations` reaches zero or + /// `notified` becomes true. + /// + /// This is used with the `fds` mutex. + operations_complete: Condvar, + + /// This selectors id. + #[cfg(debug_assertions)] + #[allow(dead_code)] + id: usize, +} + +/// The file descriptors to poll in a `Poller`. +#[derive(Debug, Clone)] +struct Fds { + /// The list of `pollfds` taken by poll. + /// + /// The first file descriptor is always present and is used to notify the poller. + poll_fds: Vec, + /// The map of each file descriptor to data associated with it. This does not include the file + /// descriptors created by the internal notify waker. + fd_data: HashMap, +} + +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the +/// `extra_traits` feature of `libc`. +#[repr(transparent)] +#[derive(Clone)] +struct PollFd(libc::pollfd); + +impl Debug for PollFd { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("pollfd") + .field("fd", &self.0.fd) + .field("events", &self.0.events) + .field("revents", &self.0.revents) + .finish() + } +} + +/// Data associated with a file descriptor in a poller. +#[derive(Debug, Clone)] +struct FdData { + /// The index into `poll_fds` this file descriptor is. + poll_fds_index: usize, + /// The key of the `Event` associated with this file descriptor. + token: Token, + /// Used to communicate with IoSourceState when we need to internally deregister + /// based on a closed fd. + shared_record: Arc, +} + +impl SelectorState { + pub fn new() -> io::Result { + let notify_waker = WakerInternal::new()?; + + Ok(Self { + fds: Mutex::new(Fds { + poll_fds: vec![PollFd(libc::pollfd { + fd: notify_waker.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + })], + fd_data: HashMap::new(), + }), + pending_removal: Mutex::new(Vec::new()), + pending_wake_token: Mutex::new(None), + notify_waker, + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + events.clear(); + + let mut fds = self.fds.lock().unwrap(); + + // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further + // events) and internally deregister them before they are externally deregister'd. See + // IoSourceState below to track how the external deregister call will be handled + // when this state occurs. + let mut closed_raw_fds = Vec::new(); + + loop { + // Complete all current operations. + loop { + if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } + + fds = self.operations_complete.wait(fds).unwrap(); + } + + // Perform the poll. + trace!("Polling on {:?}", &fds); + let num_events = poll(&mut fds.poll_fds, timeout)?; + trace!("Poll finished: {:?}", &fds); + + if num_events == 0 { + return Ok(()); + } + + let waker_events = fds.poll_fds[0].0.revents; + let notified = waker_events != 0; + let mut num_fd_events = if notified { num_events - 1 } else { num_events }; + + let pending_wake_token = self.pending_wake_token.lock().unwrap().take(); + + if notified { + self.notify_waker.ack_and_reset(); + if pending_wake_token.is_some() { + num_fd_events += 1; + } + } + + // We now check whether this poll was performed with descriptors which were pending + // for removal and filter out any matching. + let mut pending_removal_guard = self.pending_removal.lock().unwrap(); + let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new()); + drop(pending_removal_guard); + + // Store the events if there were any. + if num_fd_events > 0 { + let fds = &mut *fds; + + events.reserve(num_fd_events); + + // Add synthetic events we picked up from calls to wake() + if let Some(pending_wake_token) = pending_wake_token { + events.push(Event { + token: pending_wake_token, + events: waker_events, + }); + } + + for fd_data in fds.fd_data.values_mut() { + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + + if pending_removal.contains(&poll_fd.fd) { + // Fd was removed while poll was running + continue; + } + + if poll_fd.revents != 0 { + // Store event + events.push(Event { + token: fd_data.token, + events: poll_fd.revents, + }); + + if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 { + pending_removal.push(poll_fd.fd); + closed_raw_fds.push(poll_fd.fd); + } + + // Remove the interest which just got triggered + // the IoSourceState/WakerRegistrar used with this selector will add back + // the interest using reregister. + poll_fd.events &= !poll_fd.revents; + + // Minor optimization to potentially avoid looping n times where n is the + // number of input fds (i.e. we might loop between m and n times where m is + // the number of fds with revents != 0). + if events.len() == num_fd_events { + break; + } + } + } + + break; // No more polling. + } + + // If we didn't break above it means we got woken up internally (for example for adding an fd), so we poll again. + } + + drop(fds); + let _ = self.deregister_all(&closed_raw_fds); + + Ok(()) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.register_internal(fd, token, interests).map(|_| ()) + } + + pub fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result> { + #[cfg(debug_assertions)] + if fd == self.notify_waker.as_raw_fd() { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + + // We must handle the unlikely case that the following order of operations happens: + // + // register(1 as RawFd) + // deregister(1 as RawFd) + // register(1 as RawFd) + // + // + // Fd's pending removal only get cleared when poll has been run. It is possible that + // between registering and deregistering and then _again_ registering the file descriptor + // poll never gets called, thus the fd stays stuck in the pending removal list. + // + // To avoid this scenario we remove an fd from pending removals when registering it. + let mut pending_removal = self.pending_removal.lock().unwrap(); + if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) { + pending_removal.swap_remove(idx); + } + drop(pending_removal); + + self.modify_fds(|fds| { + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "I/O source already registered this `Registry` \ + (an old file descriptor might have been closed without deregistration)", + )); + } + + let poll_fds_index = fds.poll_fds.len(); + let record = Arc::new(RegistrationRecord::new()); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + token, + shared_record: record.clone(), + }, + ); + + fds.poll_fds.push(PollFd(libc::pollfd { + fd, + events: interests_to_poll(interests), + revents: 0, + })); + + Ok(record) + }) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.modify_fds(|fds| { + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.token = token; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests); + + Ok(()) + }) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.deregister_all(&[fd]) + .map_err(|_| io::ErrorKind::NotFound)?; + Ok(()) + } + + /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running. + fn modify_fds(&self, f: impl FnOnce(&mut Fds) -> T) -> T { + self.waiting_operations.fetch_add(1, Ordering::SeqCst); + + // Wake up the current caller of `wait` if there is one. + let sent_notification = self.notify_waker.wake().is_ok(); + + let mut fds = self.fds.lock().unwrap(); + + // If there was no caller of `wait` our notification was not removed from the pipe. + if sent_notification { + self.notify_waker.ack_and_reset(); + } + + let res = f(&mut *fds); + + if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 { + self.operations_complete.notify_one(); + } + + res + } + + /// Special optimized version of [Self::deregister] which handles multiple removals + /// at once. Ok result if all removals were performed, Err if any entries + /// were not found. + fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> { + if targets.is_empty() { + return Ok(()); + } + + let mut pending_removal = self.pending_removal.lock().unwrap(); + pending_removal.extend(targets); + drop(pending_removal); + + self.modify_fds(|fds| { + let mut all_successful = true; + + for target in targets { + match fds.fd_data.remove(target).ok_or(()) { + Ok(data) => { + data.shared_record.mark_unregistered(); + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.0.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } + } + Err(_) => all_successful = false, + } + } + + if all_successful { + Ok(()) + } else { + Err(()) + } + }) + } + + pub fn wake(&self, token: Token) -> io::Result<()> { + self.pending_wake_token.lock().unwrap().replace(token); + self.notify_waker.wake() + } +} + +/// Shared record between IoSourceState and SelectorState that allows us to internally +/// deregister partially or fully closed fds (i.e. when we get POLLHUP or PULLERR) without +/// confusing IoSourceState and trying to deregister twice. This isn't strictly +/// required as technically deregister is idempotent but it is confusing +/// when trying to debug behaviour as we get imbalanced calls to register/deregister and +/// superfluous NotFound errors. +#[derive(Debug)] +pub(crate) struct RegistrationRecord { + is_unregistered: AtomicBool, +} + +impl RegistrationRecord { + pub fn new() -> Self { + Self { + is_unregistered: AtomicBool::new(false), + } + } + + pub fn mark_unregistered(&self) { + self.is_unregistered.store(true, Ordering::Relaxed); + } + + #[allow(dead_code)] + pub fn is_registered(&self) -> bool { + !self.is_unregistered.load(Ordering::Relaxed) + } +} + +#[cfg(target_os = "linux")] +const POLLRDHUP: libc::c_short = libc::POLLRDHUP; +#[cfg(not(target_os = "linux"))] +const POLLRDHUP: libc::c_short = 0; + +const READ_EVENTS: libc::c_short = libc::POLLIN | POLLRDHUP; + +const WRITE_EVENTS: libc::c_short = libc::POLLOUT; + +const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI; + +/// Get the input poll events for the given event. +fn interests_to_poll(interest: Interest) -> libc::c_short { + let mut kind = 0; + + if interest.is_readable() { + kind |= READ_EVENTS; + } + + if interest.is_writable() { + kind |= WRITE_EVENTS; + } + + if interest.is_priority() { + kind |= PRIORITY_EVENTS; + } + + kind +} + +/// Helper function to call poll. +fn poll(fds: &mut [PollFd], timeout: Option) -> io::Result { + loop { + // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ + // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits + // architectures. The magic number is the same constant used by libuv. + #[cfg(target_pointer_width = "32")] + const MAX_SAFE_TIMEOUT: u128 = 1789569; + #[cfg(not(target_pointer_width = "32"))] + const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128; + + let timeout = timeout + .map(|to| { + // `Duration::as_millis` truncates, so round up. This avoids + // turning sub-millisecond timeouts into a zero timeout, unless + // the caller explicitly requests that by specifying a zero + // timeout. + let to_ms = to + .checked_add(Duration::from_nanos(999_999)) + .unwrap_or(to) + .as_millis(); + cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int + }) + .unwrap_or(-1); + + let res = syscall!(poll( + fds.as_mut_ptr() as *mut libc::pollfd, + fds.len() as libc::nfds_t, + timeout, + )); + + match res { + Ok(num_events) => break Ok(num_events as usize), + // poll returns EAGAIN if we can retry it. + Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, + Err(e) => return Err(e), + } + } +} + +#[derive(Debug, Clone)] +pub struct Event { + token: Token, + events: libc::c_short, +} + +pub type Events = Vec; + +pub mod event { + use crate::sys::unix::selector::poll::POLLRDHUP; + use crate::sys::Event; + use crate::Token; + use std::fmt; + + pub fn token(event: &Event) -> Token { + event.token + } + + pub fn is_readable(event: &Event) -> bool { + (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + (event.events & libc::POLLOUT) != 0 + } + + pub fn is_error(event: &Event) -> bool { + (event.events & libc::POLLERR) != 0 + } + + pub fn is_read_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Socket has received FIN or called shutdown(SHUT_RD) + || (event.events & POLLRDHUP) != 0 + } + + pub fn is_write_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Unix pipe write end has closed + || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0) + // The other side (read end) of a Unix pipe has closed. + || (event.events == libc::POLLERR) + } + + pub fn is_priority(event: &Event) -> bool { + (event.events & libc::POLLPRI) != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported in the kernel, only in libc. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool { + (*got & want) != 0 + } + debug_detail!( + EventsDetails(libc::c_short), + check_events, + libc::POLLIN, + libc::POLLPRI, + libc::POLLOUT, + libc::POLLRDNORM, + libc::POLLRDBAND, + libc::POLLWRNORM, + libc::POLLWRBAND, + libc::POLLERR, + libc::POLLHUP, + ); + + f.debug_struct("poll_event") + .field("token", &event.token) + .field("events", &EventsDetails(event.events)) + .finish() + } +} + +cfg_io_source! { + use crate::Registry; + + struct InternalState { + selector: Selector, + token: Token, + interests: Interest, + fd: RawFd, + shared_record: Arc, + } + + impl Drop for InternalState { + fn drop(&mut self) { + if self.shared_record.is_registered() { + let _ = self.selector.deregister(self.fd); + } + } + } + + pub(crate) struct IoSourceState { + inner: Option>, + } + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + + if let Err(err) = &result { + if err.kind() == io::ErrorKind::WouldBlock { + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.fd, state.token, state.interests) + })?; + } + } + + result + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let selector = registry.selector().try_clone()?; + + selector.register_internal(fd, token, interests).map(move |shared_record| { + let state = InternalState { + selector, + token, + interests, + fd, + shared_record, + }; + + self.inner = Some(Box::new(state)); + }) + } + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => registry + .selector() + .reregister(fd, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }), + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + if let Some(state) = self.inner.take() { + // Marking unregistered will short circuit the drop behaviour of calling + // deregister so the call to deregister below is strictly required. + state.shared_record.mark_unregistered(); + } + + registry.selector().deregister(fd) + } + } +} diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 65002d690..b24fd5058 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -1,14 +1,77 @@ #[cfg(all( - not(mio_unsupported_force_waker_pipe), - any(target_os = "linux", target_os = "android") + not(mio_unsupported_force_poll_poll), + not(all( + not(mio_unsupported_force_waker_pipe), + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ) + )) ))] -mod eventfd { +mod fdbased { + #[cfg(all( + not(mio_unsupported_force_waker_pipe), + any(target_os = "linux", target_os = "android"), + ))] + use crate::sys::unix::waker::eventfd::WakerInternal; + #[cfg(any( + mio_unsupported_force_waker_pipe, + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ))] + use crate::sys::unix::waker::pipe::WakerInternal; use crate::sys::Selector; use crate::{Interest, Token}; + use std::io; + use std::os::unix::io::AsRawFd; + + #[derive(Debug)] + pub struct Waker { + waker: WakerInternal, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result { + let waker = WakerInternal::new()?; + selector.register(waker.as_raw_fd(), token, Interest::READABLE)?; + Ok(Waker { waker }) + } + + pub fn wake(&self) -> io::Result<()> { + self.waker.wake() + } + } +} + +#[cfg(all( + not(mio_unsupported_force_poll_poll), + not(all( + not(mio_unsupported_force_waker_pipe), + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ) + )) +))] +pub use self::fdbased::Waker; +#[cfg(all( + not(mio_unsupported_force_waker_pipe), + any(target_os = "linux", target_os = "android") +))] +mod eventfd { use std::fs::File; use std::io::{self, Read, Write}; - use std::os::unix::io::FromRawFd; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Waker backed by `eventfd`. /// @@ -17,17 +80,15 @@ mod eventfd { /// unsigned integer and added to the count. Reads must also be 8 bytes and /// reset the count to 0, returning the count. #[derive(Debug)] - pub struct Waker { + pub struct WakerInternal { fd: File, } - impl Waker { - pub fn new(selector: &Selector, token: Token) -> io::Result { + impl WakerInternal { + pub fn new() -> io::Result { let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; let file = unsafe { File::from_raw_fd(fd) }; - - selector.register(fd, token, Interest::READABLE)?; - Ok(Waker { fd: file }) + Ok(WakerInternal { fd: file }) } pub fn wake(&self) -> io::Result<()> { @@ -44,6 +105,11 @@ mod eventfd { } } + #[cfg(mio_unsupported_force_poll_poll)] + pub fn ack_and_reset(&self) { + let _ = self.reset(); + } + /// Reset the eventfd object, only need to call this if `wake` fails. fn reset(&self) -> io::Result<()> { let mut buf: [u8; 8] = 0u64.to_ne_bytes(); @@ -56,13 +122,20 @@ mod eventfd { } } } + + impl AsRawFd for WakerInternal { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } + } } #[cfg(all( + mio_unsupported_force_poll_poll, not(mio_unsupported_force_waker_pipe), any(target_os = "linux", target_os = "android") ))] -pub use self::eventfd::Waker; +pub(crate) use self::eventfd::WakerInternal; #[cfg(all( not(mio_unsupported_force_waker_pipe), @@ -126,32 +199,28 @@ pub use self::kqueue::Waker; target_os = "redox", ))] mod pipe { - use crate::sys::unix::Selector; - use crate::{Interest, Token}; - use std::fs::File; use std::io::{self, Read, Write}; - use std::os::unix::io::FromRawFd; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Waker backed by a unix pipe. /// /// Waker controls both the sending and receiving ends and empties the pipe /// if writing to it (waking) fails. #[derive(Debug)] - pub struct Waker { + pub struct WakerInternal { sender: File, receiver: File, } - impl Waker { - pub fn new(selector: &Selector, token: Token) -> io::Result { + impl WakerInternal { + pub fn new() -> io::Result { let mut fds = [-1; 2]; syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; let sender = unsafe { File::from_raw_fd(fds[1]) }; let receiver = unsafe { File::from_raw_fd(fds[0]) }; - selector.register(fds[0], token, Interest::READABLE)?; - Ok(Waker { sender, receiver }) + Ok(WakerInternal { sender, receiver }) } pub fn wake(&self) -> io::Result<()> { @@ -174,6 +243,11 @@ mod pipe { } } + #[cfg(mio_unsupported_force_poll_poll)] + pub fn ack_and_reset(&self) { + self.empty(); + } + /// Empty the pipe's buffer, only need to call this if `wake` fails. /// This ignores any errors. fn empty(&self) { @@ -186,14 +260,52 @@ mod pipe { } } } + + impl AsRawFd for WakerInternal { + fn as_raw_fd(&self) -> RawFd { + self.receiver.as_raw_fd() + } + } } -#[cfg(any( - mio_unsupported_force_waker_pipe, - target_os = "dragonfly", - target_os = "illumos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox", +#[cfg(all( + mio_unsupported_force_poll_poll, + any( + mio_unsupported_force_waker_pipe, + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ) ))] -pub use self::pipe::Waker; +pub(crate) use self::pipe::WakerInternal; + +#[cfg(mio_unsupported_force_poll_poll)] +mod poll { + use crate::sys::Selector; + use crate::Token; + use std::io; + + #[derive(Debug)] + pub struct Waker { + selector: Selector, + token: Token, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result { + Ok(Waker { + selector: selector.try_clone()?, + token, + }) + } + + pub fn wake(&self) -> io::Result<()> { + self.selector.wake(self.token) + } + } +} + +#[cfg(mio_unsupported_force_poll_poll)] +pub use self::poll::Waker; diff --git a/tests/aio.rs b/tests/aio.rs index b8c1b47b0..19ea02003 100644 --- a/tests/aio.rs +++ b/tests/aio.rs @@ -1,4 +1,7 @@ -#![cfg(any(target_os = "freebsd", target_os = "dragonfly"))] +#![cfg(all( + not(mio_unsupported_force_poll_poll), + any(target_os = "freebsd", target_os = "dragonfly"), +))] #![cfg(all(feature = "os-poll", feature = "net"))] use mio::{event::Source, Events, Interest, Poll, Registry, Token}; diff --git a/tests/tcp.rs b/tests/tcp.rs index 4b783bcb0..82ed6bcbf 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -21,7 +21,7 @@ const CLIENT: Token = Token(1); const SERVER: Token = Token(2); #[test] -#[cfg(all(unix, not(debug_assertions)))] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll), not(debug_assertions)))] fn assert_size() { use mio::net::*; use std::mem::size_of; diff --git a/tests/udp_socket.rs b/tests/udp_socket.rs index 79b5d0312..2a3a20c9b 100644 --- a/tests/udp_socket.rs +++ b/tests/udp_socket.rs @@ -30,7 +30,7 @@ const ID2: Token = Token(3); const ID3: Token = Token(4); #[test] -#[cfg(all(unix, not(debug_assertions)))] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll), not(debug_assertions)))] fn assert_size() { use mio::net::*; use std::mem::size_of; diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index fcab0a057..babf18df9 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -451,6 +451,8 @@ where assert!(stream.take_error().unwrap().is_none()); + assert_would_block(stream.read(&mut buf)); + let bufs = [IoSlice::new(DATA1), IoSlice::new(DATA2)]; let wrote = stream.write_vectored(&bufs).unwrap(); assert_eq!(wrote, DATA1_LEN + DATA2_LEN);