From a6edda761203f5411d5d057d7ec8e0dcd03b538f Mon Sep 17 00:00:00 2001 From: jasta Date: Fri, 7 Jul 2023 16:32:58 -0700 Subject: [PATCH 01/24] Implement poll-based backend Introduces a new backend for UNIX using the level triggered poll() syscall instead of epoll or kqueue. This support is crucial for embedded systems like the esp32 family but also for alternative operating systems like Haiku. This diff does not introduce any new platform support targets itself but provides the core technical implementation necessary to support these other targets. Future PRs will introduce specific platform support however due to reasons outlined in #1602 (many thanks for this initial effort BTW!) it is not possible to automate tests for those platforms. We will instead rely on the fact that Linux can serve as a proxy to prove that the mio code is working nominally. Note that only Linux has a sufficiently complex implementation to pass all tests. This is due to SIGRDHUP missing on other platforms and is required for about a dozen or so tests that check is_read_closed(). --- .github/workflows/ci.yml | 13 + src/io_source.rs | 10 +- src/poll.rs | 11 +- src/sys/unix/mod.rs | 68 +++- src/sys/unix/selector/mod.rs | 74 ++-- src/sys/unix/selector/poll.rs | 718 ++++++++++++++++++++++++++++++++++ src/sys/unix/waker.rs | 163 ++++++-- tests/aio.rs | 5 +- tests/unix_stream.rs | 2 + 9 files changed, 984 insertions(+), 80 deletions(-) create mode 100644 src/sys/unix/selector/poll.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe50a343d..4cbb920cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,18 @@ jobs: run: cargo test --all-features - name: Tests release build run: cargo test --release --all-features + TestPoll: + runs-on: ubuntu-latest + timeout-minutes: 10 + env: + RUSTFLAGS="--cfg mio_unsupported_force_poll_poll" + steps: + - uses: actions/checkout@v3 + - uses: dtolnay/rust-toolchain@stable + - name: Tests + run: cargo test --all-features + - name: Tests release build + run: cargo test --release --all-features MinimalVersions: runs-on: ${{ matrix.os }} timeout-minutes: 10 @@ -133,6 +145,7 @@ jobs: runs-on: ubuntu-latest needs: - Test + - TestPoll - MinimalVersions - MSRV - Nightly diff --git a/src/io_source.rs b/src/io_source.rs index 99623c116..cfbcbe99f 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -142,9 +142,7 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; - registry - .selector() - .register(self.inner.as_raw_fd(), token, interests) + self.state.register(registry, token, interests, self.inner.as_raw_fd()) } fn reregister( @@ -155,15 +153,13 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; - registry - .selector() - .reregister(self.inner.as_raw_fd(), token, interests) + self.state.reregister(registry, token, interests, self.inner.as_raw_fd()) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(registry)?; - registry.selector().deregister(self.inner.as_raw_fd()) + self.state.deregister(registry, self.inner.as_raw_fd()) } } diff --git a/src/poll.rs b/src/poll.rs index 25a273ad2..695bede5d 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,5 +1,5 @@ use crate::{event, sys, Events, Interest, Token}; -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use std::{fmt, io}; @@ -411,7 +411,7 @@ impl Poll { } } -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() @@ -696,7 +696,7 @@ impl fmt::Debug for Registry { } } -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] impl AsRawFd for Registry { fn as_raw_fd(&self) -> RawFd { self.selector.as_raw_fd() @@ -704,7 +704,10 @@ impl AsRawFd for Registry { } cfg_os_poll! { - #[cfg(unix)] + #[cfg(all( + unix, + not(mio_unsupported_force_poll_poll) + ))] #[test] pub fn as_raw_fd() { let poll = Poll::new().unwrap(); diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 231480a5d..53b579df3 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -33,25 +33,65 @@ cfg_os_poll! { } cfg_io_source! { - use std::io; - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; + #[cfg(not(mio_unsupported_force_poll_poll))] + mod stateless_io_source { + use std::io; + use std::os::unix::io::RawFd; + use crate::Registry; + use crate::Token; + use crate::Interest; - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState - } + pub(crate) struct IoSourceState; + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().register(fd, token, interests) + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().reregister(fd, token, interests) + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().deregister(fd) + } } } + + #[cfg(not(mio_unsupported_force_poll_poll))] + pub(crate) use self::stateless_io_source::IoSourceState; + + #[cfg(mio_unsupported_force_poll_poll)] + pub(crate) use self::selector::IoSourceState; } cfg_os_ext! { diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 3ccbdeadf..16ff5c010 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,40 +1,58 @@ -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ) ))] mod epoll; -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ) ))] pub(crate) use self::epoll::{event, Event, Events, Selector}; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", +#[cfg(mio_unsupported_force_poll_poll)] +mod poll; + +#[cfg(mio_unsupported_force_poll_poll)] +pub(crate) use self::poll::{event, Event, Events, Selector, IoSourceState}; + +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + ) ))] mod kqueue; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + ), ))] pub(crate) use self::kqueue::{event, Event, Events, Selector}; diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..537031132 --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,718 @@ +// This implementation is based on the one in the `polling` crate. +// Thanks to https://github.com/Kestrer for the original implementation! +// Permission to use this code has been granted by original author: +// https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 + +use crate::{Interest, Token}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::os::unix::io::RawFd; +#[cfg(debug_assertions)] +use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; +use std::{fmt, io}; +use std::os::fd::AsRawFd; +use crate::sys::unix::selector::LOWEST_FD; +use crate::sys::unix::waker::WakerInternal; + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +pub struct Selector { + state: Arc, + /// Whether this selector currently has an associated waker. + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result { + let state = SelectorState::new()?; + + Ok(Selector { + state: Arc::new(state), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result { + // Just to keep the compiler happy :) + let _ = LOWEST_FD; + + let state = self.state.clone(); + + Ok(Selector { + state, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + self.state.select(events, timeout) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.register(fd, token, interests) + } + + #[allow(dead_code)] + pub(crate) fn register_internal(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result> { + self.state.register_internal(fd, token, interests) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.reregister(fd, token, interests) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.state.deregister(fd) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + + pub fn wake(&self, token: Token) -> io::Result<()> { + self.state.wake(token) + } +} + +cfg_io_source! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.state.id + } + } +} + +/// Interface to poll. +#[derive(Debug)] +struct SelectorState { + /// File descriptors to poll. + fds: Mutex, + + /// File descriptors which will be removed before the next poll call. + /// + /// When a file descriptor is deregistered while a poll is running, we need to filter + /// out all removed descriptors after that poll is finished running. + pending_removal: Mutex>, + + /// Tokens associated with Wakers that have recently asked to wake. This will + /// cause a synthetic behaviour where on any wakeup we add all pending tokens + /// to the list of emitted events. + pending_wake_tokens: Mutex>, + + /// Data is written to this to wake up the current instance of `wait`, which can occur when the + /// user notifies it (in which case `notified` would have been set) or when an operation needs + /// to occur (in which case `waiting_operations` would have been incremented). + notify_waker: WakerInternal, + + /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the + /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero + /// again. + waiting_operations: AtomicUsize, + /// The condition variable that gets notified when `waiting_operations` reaches zero or + /// `notified` becomes true. + /// + /// This is used with the `fds` mutex. + operations_complete: Condvar, + + /// This selectors id. + #[cfg(debug_assertions)] + #[allow(dead_code)] + id: usize, +} + +/// The file descriptors to poll in a `Poller`. +#[derive(Debug, Clone)] +struct Fds { + /// The list of `pollfds` taken by poll. + /// + /// The first file descriptor is always present and is used to notify the poller. + poll_fds: Vec, + /// The map of each file descriptor to data associated with it. This does not include the file + /// descriptors created by the internal notify waker. + fd_data: HashMap, +} + +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the +/// `extra_traits` feature of `libc`. +#[repr(transparent)] +#[derive(Clone)] +struct PollFd(libc::pollfd); + +impl Debug for PollFd { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("pollfd") + .field("fd", &self.0.fd) + .field("events", &self.0.events) + .field("revents", &self.0.revents) + .finish() + } +} + +/// Data associated with a file descriptor in a poller. +#[derive(Debug, Clone)] +struct FdData { + /// The index into `poll_fds` this file descriptor is. + poll_fds_index: usize, + /// The key of the `Event` associated with this file descriptor. + token: Token, + /// Used to communicate with IoSourceState when we need to internally deregister + /// based on a closed fd. + shared_record: Arc, +} + +impl SelectorState { + pub fn new() -> io::Result { + let notify_waker = WakerInternal::new()?; + + Ok(Self { + fds: Mutex::new(Fds { + poll_fds: vec![PollFd(libc::pollfd { + fd: notify_waker.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + })], + fd_data: HashMap::new(), + }), + pending_removal: Mutex::new(Vec::new()), + pending_wake_tokens: Mutex::new(Vec::new()), + notify_waker, + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + let deadline = timeout.map(|t| Instant::now() + t); + + events.clear(); + + let mut fds = self.fds.lock().unwrap(); + let mut closed_raw_fds = Vec::new(); + + loop { + // Complete all current operations. + loop { + if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } + + fds = self.operations_complete.wait(fds).unwrap(); + } + + // Perform the poll. + log::trace!("Polling on {:?}", fds); + let num_events = poll(&mut fds.poll_fds, deadline)?; + if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { + // timeout + return Ok(()); + } + + log::trace!("Poll finished: {:?}", fds); + let notified_events = fds.poll_fds[0].0.revents; + let notified = notified_events != 0; + let mut num_fd_events = if notified { num_events - 1 } else { num_events }; + + let mut pending_wake_tokens_guard = self.pending_wake_tokens.lock().unwrap(); + let pending_wake_tokens = std::mem::replace(pending_wake_tokens_guard.as_mut(), Vec::new()); + drop(pending_wake_tokens_guard); + + if notified { + self.notify_waker.ack_and_reset(); + num_fd_events += pending_wake_tokens.len(); + } + + // We now check whether this poll was performed with descriptors which were pending + // for removal and filter out any matching. + let mut pending_removal_guard = self.pending_removal.lock().unwrap(); + let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new()); + drop(pending_removal_guard); + + // Store the events if there were any. + if num_fd_events > 0 { + let fds = &mut *fds; + + events.reserve(num_fd_events); + + for pending_wake_token in pending_wake_tokens { + events.push(Event { + token: pending_wake_token, + events: notified_events, + }); + } + + for fd_data in fds.fd_data.values_mut() { + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + + if pending_removal.contains(&poll_fd.fd) { + // Fd was removed while poll was running + continue; + } + + if poll_fd.revents != 0 { + // Store event + events.push(Event { + token: fd_data.token, + events: poll_fd.revents, + }); + + if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 { + pending_removal.push(poll_fd.fd); + closed_raw_fds.push(poll_fd.fd); + } + + // Remove the interest which just got triggered + // the IoSourceState/WakerRegistrar used with this selector will add back + // the interest using reregister. + poll_fd.events &= !poll_fd.revents; + + if events.len() == num_fd_events { + break; + } + } + } + + break; + } + } + + drop(fds); + let _ = self.deregister_all(&closed_raw_fds); + + Ok(()) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.register_internal(fd, token, interests).map(|_| ()) + } + + pub fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result> { + #[cfg(debug_assertions)] + if fd == self.notify_waker.as_raw_fd() { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + + // We must handle the unlikely case that the following order of operations happens: + // + // register(1 as RawFd) + // deregister(1 as RawFd) + // register(1 as RawFd) + // + // + // Fd's pending removal only get cleared when poll has been run. It is possible that + // between registering and deregistering and then _again_ registering the file descriptor + // poll never gets called, thus the fd stays stuck in the pending removal list. + // + // To avoid this scenario we remove an fd from pending removals when registering it. + let mut pending_removal = self.pending_removal.lock().unwrap(); + if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) { + pending_removal.remove(idx); + } + drop(pending_removal); + + self.modify_fds(|fds| { + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "\ + same file descriptor registered twice for polling \ + (an old file descriptor might have been closed without deregistration)\ + ", + )); + } + + let poll_fds_index = fds.poll_fds.len(); + let record = Arc::new(RegistrationRecord::new()); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + token, + shared_record: record.clone(), + }, + ); + + fds.poll_fds.push(PollFd(libc::pollfd { + fd, + events: interests_to_poll(interests), + revents: 0, + })); + + Ok(record) + }) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.modify_fds(|fds| { + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.token = token; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests); + + Ok(()) + }) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.deregister_all(&[fd]) + .map_err(|_| io::ErrorKind::NotFound)?; + Ok(()) + } + + /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running. + fn modify_fds(&self, f: impl FnOnce(&mut Fds) -> T) -> T { + self.waiting_operations.fetch_add(1, Ordering::SeqCst); + + // Wake up the current caller of `wait` if there is one. + let sent_notification = self.notify_waker.wake().is_ok(); + + let mut fds = self.fds.lock().unwrap(); + + // If there was no caller of `wait` our notification was not removed from the pipe. + if sent_notification { + self.notify_waker.ack_and_reset(); + } + + let res = f(&mut *fds); + + if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 { + self.operations_complete.notify_one(); + } + + res + } + + /// Special optimized version of [Self::deregister] which handles multiple removals + /// at once. Ok result if all removals were performed, Err if any entries + /// were not found. + fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> { + if targets.is_empty() { + return Ok(()); + } + + let mut pending_removal = self.pending_removal.lock().unwrap(); + pending_removal.extend(targets); + drop(pending_removal); + + self.modify_fds(|fds| { + for target in targets { + let data = fds.fd_data.remove(&target).ok_or(())?; + data.shared_record.mark_unregistered(); + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.0.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } + } + + Ok(()) + }) + } + + pub fn wake(&self, token: Token) -> io::Result<()> { + let mut pending_wake_tokens = self.pending_wake_tokens.lock().unwrap(); + pending_wake_tokens.push(token); + drop(pending_wake_tokens); + self.notify_waker.wake() + } +} + +/// Shared record between IoSourceState and SelectorState that allows us to internally +/// deregister partially or fully closed fds (i.e. when we get POLLHUP or PULLERR) without +/// confusing IoSourceState and trying to deregister twice. This isn't strictly +/// required as technically deregister is idempotent but it is confusing +/// when trying to debug behaviour as we get imbalanced calls to register/deregister and +/// superfluous NotFound errors. +#[derive(Debug)] +pub(crate) struct RegistrationRecord { + is_unregistered: AtomicBool, +} + +impl RegistrationRecord { + pub fn new() -> Self { + Self { is_unregistered: AtomicBool::new(false) } + } + + pub fn mark_unregistered(&self) { + self.is_unregistered.store(true, Ordering::Relaxed); + } + + #[allow(dead_code)] + pub fn is_registered(&self) -> bool { + !self.is_unregistered.load(Ordering::Relaxed) + } +} + +#[cfg(target_os = "linux")] +const POLLRDHUP: libc::c_short = libc::POLLRDHUP; +#[cfg(not(target_os = "linux"))] +const POLLRDHUP: libc::c_short = 0; + +const READ_EVENTS: libc::c_short = libc::POLLIN | POLLRDHUP; + +const WRITE_EVENTS: libc::c_short = libc::POLLOUT; + +const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI; + +/// Get the input poll events for the given event. +fn interests_to_poll(interest: Interest) -> libc::c_short { + let mut kind = 0; + + if interest.is_readable() { + kind |= READ_EVENTS; + } + + if interest.is_writable() { + kind |= WRITE_EVENTS; + } + + if interest.is_priority() { + kind |= PRIORITY_EVENTS; + } + + kind +} + +/// Helper function to call poll. +fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { + loop { + // Convert the timeout to milliseconds. + let timeout_ms = deadline + .map(|deadline| { + let timeout = deadline.saturating_duration_since(Instant::now()); + + // Round up to a whole millisecond. + let mut ms = timeout.as_millis().try_into().unwrap_or(u64::MAX); + if Duration::from_millis(ms) < timeout { + ms = ms.saturating_add(1); + } + ms.try_into().unwrap_or(i32::MAX) + }) + .unwrap_or(-1); + + let res = syscall!(poll( + fds.as_mut_ptr() as *mut libc::pollfd, + fds.len() as libc::nfds_t, + timeout_ms, + )); + + match res { + Ok(num_events) => break Ok(num_events as usize), + // poll returns EAGAIN if we can retry it. + Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, + Err(e) => return Err(e), + } + } +} + +#[derive(Debug, Clone)] +pub struct Event { + token: Token, + events: libc::c_short, +} + +pub type Events = Vec; + +pub mod event { + use crate::sys::Event; + use crate::Token; + use std::fmt; + use crate::sys::unix::selector::poll::POLLRDHUP; + + pub fn token(event: &Event) -> Token { + event.token + } + + pub fn is_readable(event: &Event) -> bool { + (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + (event.events & libc::POLLOUT) != 0 + } + + pub fn is_error(event: &Event) -> bool { + (event.events & libc::POLLERR) != 0 + } + + pub fn is_read_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Socket has received FIN or called shutdown(SHUT_RD) + || (event.events & POLLRDHUP) != 0 + } + + pub fn is_write_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Unix pipe write end has closed + || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0) + // The other side (read end) of a Unix pipe has closed. + || (event.events == libc::POLLERR) + } + + pub fn is_priority(event: &Event) -> bool { + (event.events & libc::POLLPRI) != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported in the kernel, only in libc. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool { + (*got & want) != 0 + } + debug_detail!( + EventsDetails(libc::c_short), + check_events, + libc::POLLIN, + libc::POLLPRI, + libc::POLLOUT, + libc::POLLRDNORM, + libc::POLLRDBAND, + libc::POLLWRNORM, + libc::POLLWRBAND, + libc::POLLERR, + libc::POLLHUP, + ); + + // Can't reference fields in packed structures. + let e_u64 = event.token.0; + f.debug_struct("epoll_event") + .field("events", &EventsDetails(event.events)) + .field("u64", &e_u64) + .finish() + } +} + +cfg_io_source! { + use crate::Registry; + + struct InternalState { + selector: Selector, + token: Token, + interests: Interest, + fd: RawFd, + shared_record: Arc, + } + + impl Drop for InternalState { + fn drop(&mut self) { + if self.shared_record.is_registered() { + let _ = self.selector.deregister(self.fd); + } + } + } + + pub(crate) struct IoSourceState { + inner: Option>, + } + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + + if let Err(err) = &result { + if err.kind() == io::ErrorKind::WouldBlock { + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.fd, state.token, state.interests) + })?; + } + } + + result + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let selector = registry.selector().try_clone()?; + + selector.register_internal(fd, token, interests).map(move |shared_record| { + let state = InternalState { + selector, + token, + interests, + fd, + shared_record, + }; + + self.inner = Some(Box::new(state)); + }) + } + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => registry + .selector() + .reregister(fd, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }), + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + if let Some(state) = self.inner.take() { + // Marking unregistered will short circuit the drop behaviour of calling + // deregister so the call to deregister below is strictly required. + state.shared_record.mark_unregistered(); + } + + registry.selector().deregister(fd) + } + } +} \ No newline at end of file diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 65002d690..e22b8ce8a 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -1,13 +1,77 @@ #[cfg(all( - not(mio_unsupported_force_waker_pipe), - any(target_os = "linux", target_os = "android") + not(mio_unsupported_force_poll_poll), + not(all( + not(mio_unsupported_force_waker_pipe), + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ) + )) ))] -mod eventfd { +mod fdbased { + use std::io; + use std::os::fd::AsRawFd; use crate::sys::Selector; + #[cfg(all( + not(mio_unsupported_force_waker_pipe), + any(target_os = "linux", target_os = "android"), + ))] + use crate::sys::unix::waker::eventfd::WakerInternal; + #[cfg(any( + mio_unsupported_force_waker_pipe, + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ))] + use crate::sys::unix::waker::pipe::WakerInternal; use crate::{Interest, Token}; + #[derive(Debug)] + pub struct Waker { + waker: WakerInternal, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result { + let waker = WakerInternal::new()?; + selector.register(waker.as_raw_fd(), token, Interest::READABLE)?; + Ok(Waker { waker }) + } + + pub fn wake(&self) -> io::Result<()> { + self.waker.wake() + } + } +} + +#[cfg(all( + not(mio_unsupported_force_poll_poll), + not(all( + not(mio_unsupported_force_waker_pipe), + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ) + )) +))] +pub use self::fdbased::Waker; + +#[cfg(all( + not(mio_unsupported_force_waker_pipe), + any(target_os = "linux", target_os = "android") +))] +mod eventfd { use std::fs::File; use std::io::{self, Read, Write}; + use std::os::fd::{AsRawFd, RawFd}; use std::os::unix::io::FromRawFd; /// Waker backed by `eventfd`. @@ -17,17 +81,15 @@ mod eventfd { /// unsigned integer and added to the count. Reads must also be 8 bytes and /// reset the count to 0, returning the count. #[derive(Debug)] - pub struct Waker { + pub struct WakerInternal { fd: File, } - impl Waker { - pub fn new(selector: &Selector, token: Token) -> io::Result { + impl WakerInternal { + pub fn new() -> io::Result { let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; let file = unsafe { File::from_raw_fd(fd) }; - - selector.register(fd, token, Interest::READABLE)?; - Ok(Waker { fd: file }) + Ok(WakerInternal { fd: file }) } pub fn wake(&self) -> io::Result<()> { @@ -44,6 +106,11 @@ mod eventfd { } } + #[cfg(mio_unsupported_force_poll_poll)] + pub fn ack_and_reset(&self) { + let _ = self.reset(); + } + /// Reset the eventfd object, only need to call this if `wake` fails. fn reset(&self) -> io::Result<()> { let mut buf: [u8; 8] = 0u64.to_ne_bytes(); @@ -56,13 +123,20 @@ mod eventfd { } } } + + impl AsRawFd for WakerInternal { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } + } } #[cfg(all( + mio_unsupported_force_poll_poll, not(mio_unsupported_force_waker_pipe), any(target_os = "linux", target_os = "android") ))] -pub use self::eventfd::Waker; +pub(crate) use self::eventfd::WakerInternal; #[cfg(all( not(mio_unsupported_force_waker_pipe), @@ -126,11 +200,9 @@ pub use self::kqueue::Waker; target_os = "redox", ))] mod pipe { - use crate::sys::unix::Selector; - use crate::{Interest, Token}; - use std::fs::File; use std::io::{self, Read, Write}; + use std::os::fd::{AsRawFd, RawFd}; use std::os::unix::io::FromRawFd; /// Waker backed by a unix pipe. @@ -138,20 +210,19 @@ mod pipe { /// Waker controls both the sending and receiving ends and empties the pipe /// if writing to it (waking) fails. #[derive(Debug)] - pub struct Waker { + pub struct WakerInternal { sender: File, receiver: File, } - impl Waker { - pub fn new(selector: &Selector, token: Token) -> io::Result { + impl WakerInternal { + pub fn new() -> io::Result { let mut fds = [-1; 2]; syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; let sender = unsafe { File::from_raw_fd(fds[1]) }; let receiver = unsafe { File::from_raw_fd(fds[0]) }; - selector.register(fds[0], token, Interest::READABLE)?; - Ok(Waker { sender, receiver }) + Ok(WakerInternal { sender, receiver }) } pub fn wake(&self) -> io::Result<()> { @@ -174,6 +245,11 @@ mod pipe { } } + #[cfg(mio_unsupported_force_poll_poll)] + pub fn ack_and_reset(&self) { + self.empty(); + } + /// Empty the pipe's buffer, only need to call this if `wake` fails. /// This ignores any errors. fn empty(&self) { @@ -186,14 +262,49 @@ mod pipe { } } } + + impl AsRawFd for WakerInternal { + fn as_raw_fd(&self) -> RawFd { + self.receiver.as_raw_fd() + } + } } -#[cfg(any( - mio_unsupported_force_waker_pipe, - target_os = "dragonfly", - target_os = "illumos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox", +#[cfg(all( + mio_unsupported_force_poll_poll, + any( + mio_unsupported_force_waker_pipe, + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ) ))] -pub use self::pipe::Waker; +pub(crate) use self::pipe::WakerInternal; + +#[cfg(mio_unsupported_force_poll_poll)] +mod poll { + use std::io; + use crate::sys::Selector; + use crate::Token; + + #[derive(Debug)] + pub struct Waker { + selector: Selector, + token: Token, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result { + Ok(Waker { selector: selector.try_clone()?, token }) + } + + pub fn wake(&self) -> io::Result<()> { + self.selector.wake(self.token) + } + } +} + +#[cfg(mio_unsupported_force_poll_poll)] +pub use self::poll::Waker; diff --git a/tests/aio.rs b/tests/aio.rs index b8c1b47b0..19ea02003 100644 --- a/tests/aio.rs +++ b/tests/aio.rs @@ -1,4 +1,7 @@ -#![cfg(any(target_os = "freebsd", target_os = "dragonfly"))] +#![cfg(all( + not(mio_unsupported_force_poll_poll), + any(target_os = "freebsd", target_os = "dragonfly"), +))] #![cfg(all(feature = "os-poll", feature = "net"))] use mio::{event::Source, Events, Interest, Poll, Registry, Token}; diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index fcab0a057..babf18df9 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -451,6 +451,8 @@ where assert!(stream.take_error().unwrap().is_none()); + assert_would_block(stream.read(&mut buf)); + let bufs = [IoSlice::new(DATA1), IoSlice::new(DATA2)]; let wrote = stream.write_vectored(&bufs).unwrap(); assert_eq!(wrote, DATA1_LEN + DATA2_LEN); From 496bc0e368e7062179ed745326d79300dd1fd342 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Mon, 24 Jul 2023 16:55:47 -0700 Subject: [PATCH 02/24] Fixed syntax error in ci workflow --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4cbb920cd..f95e0cebb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 10 env: - RUSTFLAGS="--cfg mio_unsupported_force_poll_poll" + RUSTFLAGS: "--cfg mio_unsupported_force_poll_poll" steps: - uses: actions/checkout@v3 - uses: dtolnay/rust-toolchain@stable From 28895574efce1a9ee271ec9426a825691a10cfe7 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Tue, 25 Jul 2023 10:17:56 -0700 Subject: [PATCH 03/24] Fix build break with --no-default-features --features net --- src/sys/shell/mod.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index 8a3175f76..e017bb54f 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -23,8 +23,9 @@ cfg_io_source! { use std::io; #[cfg(windows)] use std::os::windows::io::RawSocket; + #[cfg(not(windows))] + use std::os::unix::io::RawFd; - #[cfg(windows)] use crate::{Registry, Token, Interest}; pub(crate) struct IoSourceState; @@ -44,6 +45,33 @@ cfg_io_source! { } } + #[cfg(not(windows))] + impl IoSourceState { + pub fn register( + &mut self, + _: &Registry, + _: Token, + _: Interest, + _: RawFd, + ) -> io::Result<()> { + os_required!() + } + + pub fn reregister( + &mut self, + _: &Registry, + _: Token, + _: Interest, + _: RawFd, + ) -> io::Result<()> { + os_required!() + } + + pub fn deregister(&mut self, _: &Registry, _: RawFd) -> io::Result<()> { + os_required!() + } + } + #[cfg(windows)] impl IoSourceState { pub fn register( From 1813a63748b6e21c82af002e55c5d0355b96a85a Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Tue, 25 Jul 2023 10:22:21 -0700 Subject: [PATCH 04/24] Fix mio_unsupported_force_poll_poll release builds --- src/sys/unix/selector/poll.rs | 1 - tests/tcp.rs | 2 +- tests/udp_socket.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 537031132..cf4c12f9a 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -8,7 +8,6 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fmt::{Debug, Formatter}; use std::os::unix::io::RawFd; -#[cfg(debug_assertions)] use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; diff --git a/tests/tcp.rs b/tests/tcp.rs index 4b783bcb0..82ed6bcbf 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -21,7 +21,7 @@ const CLIENT: Token = Token(1); const SERVER: Token = Token(2); #[test] -#[cfg(all(unix, not(debug_assertions)))] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll), not(debug_assertions)))] fn assert_size() { use mio::net::*; use std::mem::size_of; diff --git a/tests/udp_socket.rs b/tests/udp_socket.rs index 79b5d0312..2a3a20c9b 100644 --- a/tests/udp_socket.rs +++ b/tests/udp_socket.rs @@ -30,7 +30,7 @@ const ID2: Token = Token(3); const ID3: Token = Token(4); #[test] -#[cfg(all(unix, not(debug_assertions)))] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll), not(debug_assertions)))] fn assert_size() { use mio::net::*; use std::mem::size_of; From cfc13bc6bc3e8a8bb3121455fc44409ab2100070 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Tue, 25 Jul 2023 10:27:07 -0700 Subject: [PATCH 05/24] Fix non-conforming usage of log::trace --- src/sys/unix/selector/poll.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index cf4c12f9a..d49cf3931 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -213,14 +213,14 @@ impl SelectorState { } // Perform the poll. - log::trace!("Polling on {:?}", fds); + trace!("Polling on {:?}", &fds); let num_events = poll(&mut fds.poll_fds, deadline)?; if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { // timeout return Ok(()); } - log::trace!("Poll finished: {:?}", fds); + trace!("Poll finished: {:?}", &fds); let notified_events = fds.poll_fds[0].0.revents; let notified = notified_events != 0; let mut num_fd_events = if notified { num_events - 1 } else { num_events }; From ca576a50c16c70c68c5210c226c9fd9470501c62 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Tue, 25 Jul 2023 10:27:42 -0700 Subject: [PATCH 06/24] Run rustfmt --- src/io_source.rs | 6 ++++-- src/sys/unix/selector/mod.rs | 2 +- src/sys/unix/selector/poll.rs | 24 ++++++++++++++++-------- src/sys/unix/waker.rs | 13 ++++++++----- 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/io_source.rs b/src/io_source.rs index cfbcbe99f..06dc5e17e 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -142,7 +142,8 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; - self.state.register(registry, token, interests, self.inner.as_raw_fd()) + self.state + .register(registry, token, interests, self.inner.as_raw_fd()) } fn reregister( @@ -153,7 +154,8 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; - self.state.reregister(registry, token, interests, self.inner.as_raw_fd()) + self.state + .reregister(registry, token, interests, self.inner.as_raw_fd()) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 16ff5c010..7fa73b8d6 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -24,7 +24,7 @@ pub(crate) use self::epoll::{event, Event, Events, Selector}; mod poll; #[cfg(mio_unsupported_force_poll_poll)] -pub(crate) use self::poll::{event, Event, Events, Selector, IoSourceState}; +pub(crate) use self::poll::{event, Event, Events, IoSourceState, Selector}; #[cfg(all( not(mio_unsupported_force_poll_poll), diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index d49cf3931..3aaaf67d5 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -3,19 +3,19 @@ // Permission to use this code has been granted by original author: // https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 +use crate::sys::unix::selector::LOWEST_FD; +use crate::sys::unix::waker::WakerInternal; use crate::{Interest, Token}; use std::collections::HashMap; use std::convert::TryInto; use std::fmt::{Debug, Formatter}; +use std::os::fd::AsRawFd; use std::os::unix::io::RawFd; use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use std::{fmt, io}; -use std::os::fd::AsRawFd; -use crate::sys::unix::selector::LOWEST_FD; -use crate::sys::unix::waker::WakerInternal; /// Unique id for use as `SelectorId`. #[cfg(debug_assertions)] @@ -62,7 +62,12 @@ impl Selector { } #[allow(dead_code)] - pub(crate) fn register_internal(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result> { + pub(crate) fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result> { self.state.register_internal(fd, token, interests) } @@ -226,7 +231,8 @@ impl SelectorState { let mut num_fd_events = if notified { num_events - 1 } else { num_events }; let mut pending_wake_tokens_guard = self.pending_wake_tokens.lock().unwrap(); - let pending_wake_tokens = std::mem::replace(pending_wake_tokens_guard.as_mut(), Vec::new()); + let pending_wake_tokens = + std::mem::replace(pending_wake_tokens_guard.as_mut(), Vec::new()); drop(pending_wake_tokens_guard); if notified { @@ -449,7 +455,9 @@ pub(crate) struct RegistrationRecord { impl RegistrationRecord { pub fn new() -> Self { - Self { is_unregistered: AtomicBool::new(false) } + Self { + is_unregistered: AtomicBool::new(false), + } } pub fn mark_unregistered(&self) { @@ -533,10 +541,10 @@ pub struct Event { pub type Events = Vec; pub mod event { + use crate::sys::unix::selector::poll::POLLRDHUP; use crate::sys::Event; use crate::Token; use std::fmt; - use crate::sys::unix::selector::poll::POLLRDHUP; pub fn token(event: &Event) -> Token { event.token @@ -714,4 +722,4 @@ cfg_io_source! { registry.selector().deregister(fd) } } -} \ No newline at end of file +} diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index e22b8ce8a..ba78fccbd 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -12,9 +12,6 @@ )) ))] mod fdbased { - use std::io; - use std::os::fd::AsRawFd; - use crate::sys::Selector; #[cfg(all( not(mio_unsupported_force_waker_pipe), any(target_os = "linux", target_os = "android"), @@ -29,7 +26,10 @@ mod fdbased { target_os = "redox", ))] use crate::sys::unix::waker::pipe::WakerInternal; + use crate::sys::Selector; use crate::{Interest, Token}; + use std::io; + use std::os::fd::AsRawFd; #[derive(Debug)] pub struct Waker { @@ -285,9 +285,9 @@ pub(crate) use self::pipe::WakerInternal; #[cfg(mio_unsupported_force_poll_poll)] mod poll { - use std::io; use crate::sys::Selector; use crate::Token; + use std::io; #[derive(Debug)] pub struct Waker { @@ -297,7 +297,10 @@ mod poll { impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { - Ok(Waker { selector: selector.try_clone()?, token }) + Ok(Waker { + selector: selector.try_clone()?, + token, + }) } pub fn wake(&self) -> io::Result<()> { From ad3cc9697d818b4b76ae349206f5e9392f77242a Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:04:24 -0700 Subject: [PATCH 07/24] Update src/sys/unix/selector/poll.rs Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 3aaaf67d5..e99f48fa9 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -87,10 +87,7 @@ impl Selector { pub fn wake(&self, token: Token) -> io::Result<()> { self.state.wake(token) } -} - -cfg_io_source! { - impl Selector { + cfg_io_source! { #[cfg(debug_assertions)] pub fn id(&self) -> usize { self.state.id From 4b8a9354a91398bf4c182901dcc5fd4f3074a6c0 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:23:52 -0700 Subject: [PATCH 08/24] poll: Optimize register corner case Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index e99f48fa9..6cdfd3404 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -326,7 +326,7 @@ impl SelectorState { // To avoid this scenario we remove an fd from pending removals when registering it. let mut pending_removal = self.pending_removal.lock().unwrap(); if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) { - pending_removal.remove(idx); + pending_removal.swap_remove(idx); } drop(pending_removal); From 037a538823457842f74fe0437e3171981dd44e13 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:24:31 -0700 Subject: [PATCH 09/24] poll: Slight change to error message consistency Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 6cdfd3404..ae445a191 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -334,10 +334,8 @@ impl SelectorState { if fds.fd_data.contains_key(&fd) { return Err(io::Error::new( io::ErrorKind::AlreadyExists, - "\ - same file descriptor registered twice for polling \ - (an old file descriptor might have been closed without deregistration)\ - ", + "I/O source already registered this `Registry` \ + (an old file descriptor might have been closed without deregistration)", )); } From 5fda03c23d46e13ed55e66d95838e06ce63dbb87 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:25:08 -0700 Subject: [PATCH 10/24] poll: Fix event copy/paste debug_details formatting Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index ae445a191..908753688 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -606,11 +606,9 @@ pub mod event { libc::POLLHUP, ); - // Can't reference fields in packed structures. - let e_u64 = event.token.0; - f.debug_struct("epoll_event") + f.debug_struct("poll_event") + .field("token", &event.token) .field("events", &EventsDetails(event.events)) - .field("u64", &e_u64) .finish() } } From f7d08856c8ea0f3932eb799c7ab1080d424be21c Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:25:25 -0700 Subject: [PATCH 11/24] poll: Manually fix missing rustfmt case Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 908753688..c5491c5ee 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -648,14 +648,14 @@ cfg_io_source! { let result = f(io); if let Err(err) = &result { - if err.kind() == io::ErrorKind::WouldBlock { - self.inner.as_ref().map_or(Ok(()), |state| { - state - .selector - .reregister(state.fd, state.token, state.interests) - })?; + if err.kind() == io::ErrorKind::WouldBlock { + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.fd, state.token, state.interests) + })?; + } } - } result } From a4c72a8f75eb232efd6c3b21ae5cc7048f0f0939 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:14:06 -0700 Subject: [PATCH 12/24] poll: Fix try_clone not carrying has_waker state forward --- src/sys/unix/selector/poll.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index c5491c5ee..bc5b32fb0 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -49,7 +49,7 @@ impl Selector { Ok(Selector { state, #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } From 1de715cb653095c38bcdd1cf60b68e087c78e51f Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:21:02 -0700 Subject: [PATCH 13/24] poll: Simplify and optimize pending_wake_tokens to use a singular Token --- src/sys/unix/selector/poll.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index bc5b32fb0..7bec9a70a 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -107,10 +107,10 @@ struct SelectorState { /// out all removed descriptors after that poll is finished running. pending_removal: Mutex>, - /// Tokens associated with Wakers that have recently asked to wake. This will + /// Token associated with Waker that have recently asked to wake. This will /// cause a synthetic behaviour where on any wakeup we add all pending tokens /// to the list of emitted events. - pending_wake_tokens: Mutex>, + pending_wake_token: Mutex>, /// Data is written to this to wake up the current instance of `wait`, which can occur when the /// user notifies it (in which case `notified` would have been set) or when an operation needs @@ -187,7 +187,7 @@ impl SelectorState { fd_data: HashMap::new(), }), pending_removal: Mutex::new(Vec::new()), - pending_wake_tokens: Mutex::new(Vec::new()), + pending_wake_token: Mutex::new(None), notify_waker, waiting_operations: AtomicUsize::new(0), operations_complete: Condvar::new(), @@ -227,14 +227,13 @@ impl SelectorState { let notified = notified_events != 0; let mut num_fd_events = if notified { num_events - 1 } else { num_events }; - let mut pending_wake_tokens_guard = self.pending_wake_tokens.lock().unwrap(); - let pending_wake_tokens = - std::mem::replace(pending_wake_tokens_guard.as_mut(), Vec::new()); - drop(pending_wake_tokens_guard); + let pending_wake_token = self.pending_wake_token.lock().unwrap().take(); if notified { self.notify_waker.ack_and_reset(); - num_fd_events += pending_wake_tokens.len(); + if pending_wake_token.is_some() { + num_fd_events += 1; + } } // We now check whether this poll was performed with descriptors which were pending @@ -249,7 +248,7 @@ impl SelectorState { events.reserve(num_fd_events); - for pending_wake_token in pending_wake_tokens { + if let Some(pending_wake_token) = pending_wake_token { events.push(Event { token: pending_wake_token, events: notified_events, @@ -430,9 +429,7 @@ impl SelectorState { } pub fn wake(&self, token: Token) -> io::Result<()> { - let mut pending_wake_tokens = self.pending_wake_tokens.lock().unwrap(); - pending_wake_tokens.push(token); - drop(pending_wake_tokens); + self.pending_wake_token.lock().unwrap().replace(token); self.notify_waker.wake() } } From 0ce239edf5a88a291ec87d3a8d90bcb0d73fab4e Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:33:43 -0700 Subject: [PATCH 14/24] poll: No longer special case poll returning 0 and redundantly checking for an expired timeout --- src/sys/unix/selector/poll.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 7bec9a70a..5e01ad607 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -217,12 +217,12 @@ impl SelectorState { // Perform the poll. trace!("Polling on {:?}", &fds); let num_events = poll(&mut fds.poll_fds, deadline)?; - if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { - // timeout + trace!("Poll finished: {:?}", &fds); + + if num_events == 0 { return Ok(()); } - trace!("Poll finished: {:?}", &fds); let notified_events = fds.poll_fds[0].0.revents; let notified = notified_events != 0; let mut num_fd_events = if notified { num_events - 1 } else { num_events }; From e15d449c51e5d125e68efecb324ee48e99b2137b Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:38:32 -0700 Subject: [PATCH 15/24] poll: Copy timeout determination logic from epoll impl --- src/sys/unix/selector/poll.rs | 43 +++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 5e01ad607..1c3802924 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -7,15 +7,14 @@ use crate::sys::unix::selector::LOWEST_FD; use crate::sys::unix::waker::WakerInternal; use crate::{Interest, Token}; use std::collections::HashMap; -use std::convert::TryInto; use std::fmt::{Debug, Formatter}; use std::os::fd::AsRawFd; use std::os::unix::io::RawFd; use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; -use std::time::{Duration, Instant}; -use std::{fmt, io}; +use std::time::Duration; +use std::{cmp, fmt, io}; /// Unique id for use as `SelectorId`. #[cfg(debug_assertions)] @@ -197,8 +196,6 @@ impl SelectorState { } pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { - let deadline = timeout.map(|t| Instant::now() + t); - events.clear(); let mut fds = self.fds.lock().unwrap(); @@ -216,7 +213,7 @@ impl SelectorState { // Perform the poll. trace!("Polling on {:?}", &fds); - let num_events = poll(&mut fds.poll_fds, deadline)?; + let num_events = poll(&mut fds.poll_fds, timeout)?; trace!("Poll finished: {:?}", &fds); if num_events == 0 { @@ -493,26 +490,34 @@ fn interests_to_poll(interest: Interest) -> libc::c_short { } /// Helper function to call poll. -fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { +fn poll(fds: &mut [PollFd], timeout: Option) -> io::Result { loop { - // Convert the timeout to milliseconds. - let timeout_ms = deadline - .map(|deadline| { - let timeout = deadline.saturating_duration_since(Instant::now()); - - // Round up to a whole millisecond. - let mut ms = timeout.as_millis().try_into().unwrap_or(u64::MAX); - if Duration::from_millis(ms) < timeout { - ms = ms.saturating_add(1); - } - ms.try_into().unwrap_or(i32::MAX) + // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ + // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits + // architectures. The magic number is the same constant used by libuv. + #[cfg(target_pointer_width = "32")] + const MAX_SAFE_TIMEOUT: u128 = 1789569; + #[cfg(not(target_pointer_width = "32"))] + const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128; + + let timeout = timeout + .map(|to| { + // `Duration::as_millis` truncates, so round up. This avoids + // turning sub-millisecond timeouts into a zero timeout, unless + // the caller explicitly requests that by specifying a zero + // timeout. + let to_ms = to + .checked_add(Duration::from_nanos(999_999)) + .unwrap_or(to) + .as_millis(); + cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int }) .unwrap_or(-1); let res = syscall!(poll( fds.as_mut_ptr() as *mut libc::pollfd, fds.len() as libc::nfds_t, - timeout_ms, + timeout, )); match res { From c61d221e6d200df832f9fe611d62deba99c03b54 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:50:53 -0700 Subject: [PATCH 16/24] poll: Clarify nested poll loop behaviour slightly Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 1c3802924..42aa91635 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -285,6 +285,8 @@ impl SelectorState { break; } + + // If we didn't break above it means we got woken up internally (for example for adding an fd), so we poll again. } drop(fds); From fa88f19fb227dcad9675b3ee7fb419cc5e9b6b8b Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:52:22 -0700 Subject: [PATCH 17/24] poll: Further clarify nested poll loop behaviour Co-authored-by: Thomas de Zeeuw --- src/sys/unix/selector/poll.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 42aa91635..47afdd8f1 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -283,7 +283,7 @@ impl SelectorState { } } - break; + break; // No more polling. } // If we didn't break above it means we got woken up internally (for example for adding an fd), so we poll again. From 185dbf597455fe23f6cf1d7754b22602e36c7d1e Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:48:27 -0700 Subject: [PATCH 18/24] poll: rename notified_events for consistency --- src/sys/unix/selector/poll.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 47afdd8f1..0d96cd0a0 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -220,8 +220,8 @@ impl SelectorState { return Ok(()); } - let notified_events = fds.poll_fds[0].0.revents; - let notified = notified_events != 0; + let waker_events = fds.poll_fds[0].0.revents; + let notified = waker_events != 0; let mut num_fd_events = if notified { num_events - 1 } else { num_events }; let pending_wake_token = self.pending_wake_token.lock().unwrap().take(); @@ -248,7 +248,7 @@ impl SelectorState { if let Some(pending_wake_token) = pending_wake_token { events.push(Event { token: pending_wake_token, - events: notified_events, + events: waker_events, }); } From 71c635f63d5d9badbc55ad0829231220017f6194 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Wed, 26 Jul 2023 12:51:54 -0700 Subject: [PATCH 19/24] poll: Minor documentation for pending_wake_token behaviour --- src/sys/unix/selector/poll.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 0d96cd0a0..6a6583de8 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -245,6 +245,7 @@ impl SelectorState { events.reserve(num_fd_events); + // Add synthetic events we picked up from calls to wake() if let Some(pending_wake_token) = pending_wake_token { events.push(Event { token: pending_wake_token, From 0970313c82b81c2e8b962ac6650efcbc16dd46a2 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Sun, 30 Jul 2023 12:16:27 -0700 Subject: [PATCH 20/24] poll: Document design choice quirks in comments --- src/sys/unix/selector/poll.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 6a6583de8..f5f4d5cdd 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -199,6 +199,11 @@ impl SelectorState { events.clear(); let mut fds = self.fds.lock().unwrap(); + + // Keep track of fds that receive POLLHUP or POLLERR (i.e. won't receive further + // events) and internally deregister them before they are externally deregister'd. See + // IoSourceState below to track how the external deregister call will be handled + // when this state occurs. let mut closed_raw_fds = Vec::new(); loop { @@ -278,6 +283,9 @@ impl SelectorState { // the interest using reregister. poll_fd.events &= !poll_fd.revents; + // Minor optimization to potentially avoid looping n times where n is the + // number of input fds (i.e. we might loop between m and n times where m is + // the number of fds with revents != 0). if events.len() == num_fd_events { break; } From fc0a15397580c2557a95ff266798ef9ddb6386a2 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Sun, 30 Jul 2023 12:16:54 -0700 Subject: [PATCH 21/24] poll: Avoid early return on error in deregister_all --- src/sys/unix/selector/poll.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index f5f4d5cdd..e546f70f0 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -420,19 +420,25 @@ impl SelectorState { drop(pending_removal); self.modify_fds(|fds| { + let mut all_successful = true; + for target in targets { - let data = fds.fd_data.remove(&target).ok_or(())?; - data.shared_record.mark_unregistered(); - fds.poll_fds.swap_remove(data.poll_fds_index); - if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { - fds.fd_data - .get_mut(&swapped_pollfd.0.fd) - .unwrap() - .poll_fds_index = data.poll_fds_index; + match fds.fd_data.remove(target).ok_or(()) { + Ok(data) => { + data.shared_record.mark_unregistered(); + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.0.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } + } + Err(_) => all_successful = false, } } - Ok(()) + if all_successful { Ok(()) } else { Err(()) } }) } From 8570ec9cdc6c378a9a9292799ce8bffa550d7d47 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Sun, 30 Jul 2023 12:31:41 -0700 Subject: [PATCH 22/24] poll: Rerun rustfmt --- src/sys/unix/selector/poll.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index e546f70f0..917052773 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -294,7 +294,7 @@ impl SelectorState { break; // No more polling. } - + // If we didn't break above it means we got woken up internally (for example for adding an fd), so we poll again. } @@ -438,7 +438,11 @@ impl SelectorState { } } - if all_successful { Ok(()) } else { Err(()) } + if all_successful { + Ok(()) + } else { + Err(()) + } }) } From 189a117ddfe61f3a7908711fdc9bc34e778caa03 Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Sun, 30 Jul 2023 12:42:50 -0700 Subject: [PATCH 23/24] poll: Fix target_os="wasi" compilation error --- src/sys/shell/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sys/shell/mod.rs b/src/sys/shell/mod.rs index e017bb54f..76085b8a2 100644 --- a/src/sys/shell/mod.rs +++ b/src/sys/shell/mod.rs @@ -23,9 +23,10 @@ cfg_io_source! { use std::io; #[cfg(windows)] use std::os::windows::io::RawSocket; - #[cfg(not(windows))] + #[cfg(unix)] use std::os::unix::io::RawFd; + #[cfg(any(windows, unix))] use crate::{Registry, Token, Interest}; pub(crate) struct IoSourceState; @@ -45,7 +46,7 @@ cfg_io_source! { } } - #[cfg(not(windows))] + #[cfg(unix)] impl IoSourceState { pub fn register( &mut self, From a7c41ee6914193aeac46a384b70b3dadfd6fa03b Mon Sep 17 00:00:00 2001 From: Josh Guilfoyle Date: Sun, 30 Jul 2023 12:49:27 -0700 Subject: [PATCH 24/24] poll: Fix MSRV by not using std::os::fd --- src/sys/unix/selector/poll.rs | 3 +-- src/sys/unix/waker.rs | 8 +++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 917052773..ab2eea3c8 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -8,8 +8,7 @@ use crate::sys::unix::waker::WakerInternal; use crate::{Interest, Token}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; -use std::os::fd::AsRawFd; -use std::os::unix::io::RawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use std::sync::atomic::AtomicBool; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index ba78fccbd..b24fd5058 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -29,7 +29,7 @@ mod fdbased { use crate::sys::Selector; use crate::{Interest, Token}; use std::io; - use std::os::fd::AsRawFd; + use std::os::unix::io::AsRawFd; #[derive(Debug)] pub struct Waker { @@ -71,8 +71,7 @@ pub use self::fdbased::Waker; mod eventfd { use std::fs::File; use std::io::{self, Read, Write}; - use std::os::fd::{AsRawFd, RawFd}; - use std::os::unix::io::FromRawFd; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Waker backed by `eventfd`. /// @@ -202,8 +201,7 @@ pub use self::kqueue::Waker; mod pipe { use std::fs::File; use std::io::{self, Read, Write}; - use std::os::fd::{AsRawFd, RawFd}; - use std::os::unix::io::FromRawFd; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Waker backed by a unix pipe. ///