From dae7fc06b65769d3ea929da18a44f6bcd36c5789 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 17:05:11 +0200 Subject: [PATCH 01/32] Initial poll selector port from polling --- .gitignore | 3 +- src/poll.rs | 12 +- src/sys/unix/selector/mod.rs | 8 +- src/sys/unix/selector/poll.rs | 531 ++++++++++++++++++++++++++++++++++ 4 files changed, 545 insertions(+), 9 deletions(-) create mode 100644 src/sys/unix/selector/poll.rs diff --git a/.gitignore b/.gitignore index ac1542720..e5649029d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ -.cargo +.idea/ +cargo Cargo.lock target* libs diff --git a/src/poll.rs b/src/poll.rs index 289d6686c..acc65d6b7 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,7 +1,7 @@ use crate::{event, sys, Events, Interest, Token}; use log::trace; -#[cfg(unix)] -use std::os::unix::io::{AsRawFd, RawFd}; +// #[cfg(unix)] +// use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use std::{fmt, io}; @@ -412,12 +412,12 @@ impl Poll { } } -#[cfg(unix)] +/* #[cfg(unix)] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() } -} +} */ impl fmt::Debug for Poll { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -697,12 +697,12 @@ impl fmt::Debug for Registry { } } -#[cfg(unix)] +/* #[cfg(unix)] impl AsRawFd for Registry { fn as_raw_fd(&self) -> RawFd { self.selector.as_raw_fd() } -} +} */ cfg_os_poll! { #[cfg(unix)] diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 9ae4c1416..dacb8b5c1 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,7 +1,7 @@ #[cfg(any( target_os = "android", target_os = "illumos", - target_os = "linux", + // target_os = "linux", target_os = "redox", ))] mod epoll; @@ -9,7 +9,7 @@ mod epoll; #[cfg(any( target_os = "android", target_os = "illumos", - target_os = "linux", + // target_os = "linux", target_os = "redox", ))] pub(crate) use self::epoll::{event, Event, Events, Selector}; @@ -34,6 +34,9 @@ mod kqueue; ))] pub(crate) use self::kqueue::{event, Event, Events, Selector}; +mod poll; +pub(crate) use self::poll::{event, Event, Events, Selector}; + /// Lowest file descriptor used in `Selector::try_clone`. /// /// # Notes @@ -42,4 +45,5 @@ pub(crate) use self::kqueue::{event, Event, Events, Selector}; /// blindly assume this to be true, which means using any one of those a select /// could result in some interesting and unexpected errors. Avoid that by using /// an fd that doesn't have a pre-determined usage. +#[allow(dead_code)] const LOWEST_FD: libc::c_int = 3; diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..8d92d11d1 --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,531 @@ +use crate::{Interest, Token}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::os::unix::io::RawFd; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Condvar, Mutex}; +use std::time::{Duration, Instant}; +use std::{fmt, io}; + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[cfg(target_os = "espidf")] +type NotifyType = u64; + +#[cfg(not(target_os = "espidf"))] +type NotifyType = u8; + +/// Interface to poll. +#[derive(Debug)] +pub struct Selector { + /// File descriptors to poll. + fds: Mutex, + + /// The file descriptor of the read half of the notify pipe. This is also stored as the first + /// file descriptor in `fds.poll_fds`. + notify_read: RawFd, + /// The file descriptor of the write half of the notify pipe. + /// + /// Data is written to this to wake up the current instance of `wait`, which can occur when the + /// user notifies it (in which case `notified` would have been set) or when an operation needs + /// to occur (in which case `waiting_operations` would have been incremented). + notify_write: RawFd, + + /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the + /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero + /// again. + waiting_operations: AtomicUsize, + /// The condition variable that gets notified when `waiting_operations` reaches zero or + /// `notified` becomes true. + /// + /// This is used with the `fds` mutex. + operations_complete: Condvar, + + /// This selectors id. + #[cfg(debug_assertions)] + id: usize, + + /// Whether this selector currently has an associated waker. + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +/// The file descriptors to poll in a `Poller`. +#[derive(Debug, Clone)] +struct Fds { + /// The list of `pollfds` taken by poll. + /// + /// The first file descriptor is always present and is used to notify the poller. It is also + /// stored in `notify_read`. + poll_fds: Vec, + /// The map of each file descriptor to data associated with it. This does not include the file + /// descriptors `notify_read` or `notify_write`. + fd_data: HashMap, +} + +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the +/// `extra_traits` feature of `libc`. +#[repr(transparent)] +#[derive(Clone)] +struct PollFd(libc::pollfd); + +impl Debug for PollFd { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("pollfd") + .field("fd", &self.0.fd) + .field("events", &self.0.events) + .field("revents", &self.0.revents) + .finish() + } +} + +/// Data associated with a file descriptor in a poller. +#[derive(Debug, Clone)] +struct FdData { + /// The index into `poll_fds` this file descriptor is. + poll_fds_index: usize, + /// The key of the `Event` associated with this file descriptor. + token: Token, +} + +impl Selector { + pub fn new() -> io::Result { + let notify_fds = Self::create_notify_fds()?; + + Ok(Self { + fds: Mutex::new(Fds { + poll_fds: vec![PollFd(libc::pollfd { + fd: notify_fds[0], + events: libc::POLLRDNORM, + revents: 0, + })], + fd_data: HashMap::new(), + }), + notify_read: notify_fds[0], + notify_write: notify_fds[1], + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + fn create_notify_fds() -> io::Result<[libc::c_int; 2]> { + let mut notify_fd = [0, 0]; + + // Note that the eventfd() implementation in ESP-IDF deviates from the specification in the following ways: + // 1) The file descriptor is always in a non-blocking mode, as if EFD_NONBLOCK was passed as a flag; + // passing EFD_NONBLOCK or calling fcntl(.., F_GETFL/F_SETFL) on the eventfd() file descriptor is not supported + // 2) It always returns the counter value, even if it is 0. This is contrary to the specification which mandates + // that it should instead fail with EAGAIN + // + // (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway + // (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified + #[cfg(target_os = "espidf")] + { + extern "C" { + fn eventfd(initval: libc::c_uint, flags: libc::c_int) -> libc::c_int; + } + + let fd = unsafe { eventfd(0, 0) }; + if fd == -1 { + // TODO: Switch back to syscall! once + // https://github.com/rust-lang/libc/pull/2864 is published + return Err(std::io::ErrorKind::Other.into()); + } + + notify_fd[0] = fd; + notify_fd[1] = fd; + } + + #[cfg(not(target_os = "espidf"))] + { + syscall!(pipe(notify_fd.as_mut_ptr()))?; + + // Put the reading side into non-blocking mode. + let notify_read_flags = syscall!(fcntl(notify_fd[0], libc::F_GETFL))?; + + syscall!(fcntl( + notify_fd[0], + libc::F_SETFL, + notify_read_flags | libc::O_NONBLOCK + ))?; + } + + log::trace!( + "new: notify_read={}, notify_write={}", + notify_fd[0], + notify_fd[1] + ); + + Ok(notify_fd) + } + + pub fn try_clone(&self) -> io::Result { + let mut fds = self.modify_fds(|fds| Ok(fds.clone()))?; + + let notify_fds = Self::create_notify_fds()?; + + fds.poll_fds[0] = PollFd(libc::pollfd { + fd: notify_fds[0], + events: libc::POLLRDNORM, + revents: 0, + }); + + Ok(Self { + fds: Mutex::new(fds), + notify_read: notify_fds[0], + notify_write: notify_fds[1], + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + log::trace!( + "select: notify_read={}, timeout={:?}", + self.notify_read, + timeout + ); + + let deadline = timeout.map(|t| Instant::now() + t); + + events.clear(); + + let mut fds = self.fds.lock().unwrap(); + + // Complete all current operations. + loop { + if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } + + fds = self.operations_complete.wait(fds).unwrap(); + } + + // Perform the poll. + let num_events = poll(&mut fds.poll_fds, deadline)?; + let notified = fds.poll_fds[0].0.revents != 0; + let num_fd_events = if notified { num_events - 1 } else { num_events }; + log::trace!( + "new events: notify_read={}, num={}", + self.notify_read, + num_events + ); + log::trace!("fds = {:?}", fds); + + // Read all notifications. + if notified { + if self.notify_read != self.notify_write { + // When using the `pipe` syscall, we have to read all accumulated notifications in the pipe. + while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)).is_ok() + { + } + } else { + // When using the `eventfd` syscall, it is OK to read just once, so as to clear the counter. + // In fact, reading in a loop will result in an endless loop on the ESP-IDF + // which is not following the specification strictly. + let _ = self.pop_notification(); + } + } + + // Store the events if there were any. + if num_fd_events > 0 { + let fds = &mut *fds; + + events.reserve(num_fd_events); + for fd_data in fds.fd_data.values_mut() { + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + if poll_fd.revents != 0 { + // Store event + events.push(Event { + token: fd_data.token, + events: poll_fd.revents, + }); + + if events.len() == num_fd_events { + break; + } + } + } + } + + Ok(()) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + if fd == self.notify_read || fd == self.notify_write { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + + log::trace!( + "register: notify_read={}, fd={}, token={:?}, interests={:?}", + self.notify_read, + fd, + token, + interests + ); + + self.modify_fds(|fds| { + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::from(io::ErrorKind::AlreadyExists)); + } + + let poll_fds_index = fds.poll_fds.len(); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + token, + }, + ); + + fds.poll_fds.push(PollFd(libc::pollfd { + fd, + events: interests_to_poll(interests), + revents: 0, + })); + + Ok(()) + }) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + log::trace!( + "reregister: notify_read={}, fd={}, token={:?}, interests={:?}", + self.notify_read, + fd, + token, + interests + ); + + self.modify_fds(|fds| { + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.token = token; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests); + + Ok(()) + }) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + log::trace!("deregister: notify_read={}, fd={}", self.notify_read, fd); + + self.modify_fds(|fds| { + let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.0.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } + + Ok(()) + }) + } + + /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running. + fn modify_fds(&self, f: impl FnOnce(&mut Fds) -> io::Result) -> io::Result { + self.waiting_operations.fetch_add(1, Ordering::SeqCst); + + // Wake up the current caller of `wait` if there is one. + let sent_notification = self.notify_inner().is_ok(); + + let mut fds = self.fds.lock().unwrap(); + + // If there was no caller of `wait` our notification was not removed from the pipe. + if sent_notification { + let _ = self.pop_notification(); + } + + let res = f(&mut *fds); + + if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 { + self.operations_complete.notify_one(); + } + + res + } + + /// Wake the current thread that is calling `wait`. + fn notify_inner(&self) -> io::Result<()> { + syscall!(write( + self.notify_write, + &(1 as NotifyType) as *const _ as *const _, + std::mem::size_of::() + ))?; + Ok(()) + } + + /// Remove a notification created by `notify_inner`. + fn pop_notification(&self) -> io::Result<()> { + syscall!(read( + self.notify_read, + &mut [0; std::mem::size_of::()] as *mut _ as *mut _, + std::mem::size_of::() + ))?; + Ok(()) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } +} + +cfg_io_source! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.id + } + } +} + +/// Get the input poll events for the given event. +fn interests_to_poll(interest: Interest) -> libc::c_short { + let mut kind = 0; + + if interest.is_readable() { + kind |= libc::POLLIN | libc::POLLPRI | libc::POLLHUP; + } + + if interest.is_writable() { + kind |= libc::POLLOUT | libc::POLLWRBAND; + } + + kind +} + +/// Helper function to call poll. +fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { + loop { + // Convert the timeout to milliseconds. + let timeout_ms = deadline + .map(|deadline| { + let timeout = deadline.saturating_duration_since(Instant::now()); + + // Round up to a whole millisecond. + let mut ms = timeout.as_millis().try_into().unwrap_or(u64::MAX); + if Duration::from_millis(ms) < timeout { + ms = ms.saturating_add(1); + } + ms.try_into().unwrap_or(i32::MAX) + }) + .unwrap_or(-1); + + log::trace!("Polling on {:?}", fds); + let res = syscall!(poll( + fds.as_mut_ptr() as *mut libc::pollfd, + fds.len() as libc::nfds_t, + timeout_ms, + )); + log::trace!("Polling finished: {:?} = {:?}", res, fds); + + match res { + Ok(num_events) => break Ok(num_events as usize), + // poll returns EAGAIN if we can retry it. + Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, + Err(e) => return Err(e), + } + } +} + +#[derive(Debug, Clone)] +pub struct Event { + token: Token, + events: libc::c_short, +} + +pub type Events = Vec; + +pub mod event { + use crate::sys::Event; + use crate::Token; + use std::fmt; + + pub fn token(event: &Event) -> Token { + event.token + } + + pub fn is_readable(event: &Event) -> bool { + (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + (event.events & libc::POLLOUT) != 0 + } + + pub fn is_error(event: &Event) -> bool { + (event.events & libc::POLLERR) != 0 + } + + pub fn is_read_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Socket has received FIN or called shutdown(SHUT_RD) + || ((event.events & libc::POLLIN) != 0 && (event.events & libc::POLLRDHUP) != 0) + } + + pub fn is_write_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Unix pipe write end has closed + || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0) + // The other side (read end) of a Unix pipe has closed. + || (event.events == libc::POLLERR) + } + + pub fn is_priority(event: &Event) -> bool { + (event.events & libc::POLLPRI) != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported in the kernel, only in libc. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool { + (*got & want) != 0 + } + debug_detail!( + EventsDetails(libc::c_short), + check_events, + libc::POLLIN, + libc::POLLPRI, + libc::POLLOUT, + libc::POLLRDNORM, + libc::POLLRDBAND, + libc::POLLWRNORM, + libc::POLLWRBAND, + libc::POLLERR, + libc::POLLHUP, + libc::POLLRDHUP, + ); + + // Can't reference fields in packed structures. + let e_u64 = event.token.0; + f.debug_struct("epoll_event") + .field("events", &EventsDetails(event.events)) + .field("u64", &e_u64) + .finish() + } +} From e6151cf3fd0f6a7f49224dc8549bf760c7ab68ee Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 18:13:30 +0200 Subject: [PATCH 02/32] Add an error hint and fix a resource leak --- src/sys/unix/selector/poll.rs | 8 +++++++- src/token.rs | 4 +++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 8d92d11d1..5f6fce6af 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -277,7 +277,13 @@ impl Selector { self.modify_fds(|fds| { if fds.fd_data.contains_key(&fd) { - return Err(io::Error::from(io::ErrorKind::AlreadyExists)); + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "\ + same file descriptor registered twice for polling \ + (an old file descriptor might have been closed without deregistration)\ + ", + )); } let poll_fds_index = fds.poll_fds.len(); diff --git a/src/token.rs b/src/token.rs index 91601cde0..1b9459599 100644 --- a/src/token.rs +++ b/src/token.rs @@ -107,9 +107,11 @@ /// token => { /// // Always operate in a loop /// loop { -/// match sockets.get_mut(&token).unwrap().read(&mut buf) { +/// let socket = sockets.get_mut(&token).unwrap(); +/// match socket.read(&mut buf) { /// Ok(0) => { /// // Socket is closed, remove it from the map +/// poll.registry().deregister(socket)?; /// sockets.remove(&token); /// break; /// } From 9cae6f50e23d3604fae3c8dae552fef781051376 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 21:59:35 +0200 Subject: [PATCH 03/32] Fix selector cloning and wakers --- src/sys/unix/selector/poll.rs | 118 ++++++++++++++++++++-------------- src/sys/unix/waker.rs | 10 +++ src/sys/windows/waker.rs | 2 + src/waker.rs | 7 ++ 4 files changed, 87 insertions(+), 50 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 5f6fce6af..194e8deb8 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -4,7 +4,7 @@ use std::convert::TryInto; use std::fmt::{Debug, Formatter}; use std::os::unix::io::RawFd; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use std::{fmt, io}; @@ -18,9 +18,69 @@ type NotifyType = u64; #[cfg(not(target_os = "espidf"))] type NotifyType = u8; -/// Interface to poll. #[derive(Debug)] pub struct Selector { + state: Arc, + /// Whether this selector currently has an associated waker. + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result { + let state = SelectorState::new()?; + + Ok(Selector { + state: Arc::new(state), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result { + let state = self.state.clone(); + + Ok(Selector { + state, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + self.state.select(events, timeout) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.register(fd, token, interests) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.reregister(fd, token, interests) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.state.deregister(fd) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } +} + +cfg_io_source! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.state.id + } + } +} + +/// Interface to poll. +#[derive(Debug)] +struct SelectorState { /// File descriptors to poll. fds: Mutex, @@ -47,10 +107,6 @@ pub struct Selector { /// This selectors id. #[cfg(debug_assertions)] id: usize, - - /// Whether this selector currently has an associated waker. - #[cfg(debug_assertions)] - has_waker: AtomicBool, } /// The file descriptors to poll in a `Poller`. @@ -91,8 +147,8 @@ struct FdData { token: Token, } -impl Selector { - pub fn new() -> io::Result { +impl SelectorState { + pub fn new() -> io::Result { let notify_fds = Self::create_notify_fds()?; Ok(Self { @@ -110,8 +166,6 @@ impl Selector { operations_complete: Condvar::new(), #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), }) } @@ -166,30 +220,6 @@ impl Selector { Ok(notify_fd) } - pub fn try_clone(&self) -> io::Result { - let mut fds = self.modify_fds(|fds| Ok(fds.clone()))?; - - let notify_fds = Self::create_notify_fds()?; - - fds.poll_fds[0] = PollFd(libc::pollfd { - fd: notify_fds[0], - events: libc::POLLRDNORM, - revents: 0, - }); - - Ok(Self { - fds: Mutex::new(fds), - notify_read: notify_fds[0], - notify_write: notify_fds[1], - waiting_operations: AtomicUsize::new(0), - operations_complete: Condvar::new(), - #[cfg(debug_assertions)] - id: NEXT_ID.fetch_add(1, Ordering::Relaxed), - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), - }) - } - pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { log::trace!( "select: notify_read={}, timeout={:?}", @@ -221,7 +251,6 @@ impl Selector { self.notify_read, num_events ); - log::trace!("fds = {:?}", fds); // Read all notifications. if notified { @@ -383,32 +412,21 @@ impl Selector { ))?; Ok(()) } - - #[cfg(debug_assertions)] - pub fn register_waker(&self) -> bool { - self.has_waker.swap(true, Ordering::AcqRel) - } } -cfg_io_source! { - impl Selector { - #[cfg(debug_assertions)] - pub fn id(&self) -> usize { - self.id - } - } -} +const READ_EVENTS: libc::c_short = libc::POLLIN | libc::POLLRDHUP; +const WRITE_EVENTS: libc::c_short = libc::POLLOUT; /// Get the input poll events for the given event. fn interests_to_poll(interest: Interest) -> libc::c_short { let mut kind = 0; if interest.is_readable() { - kind |= libc::POLLIN | libc::POLLPRI | libc::POLLHUP; + kind |= READ_EVENTS; } if interest.is_writable() { - kind |= libc::POLLOUT | libc::POLLWRBAND; + kind |= WRITE_EVENTS; } kind diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index de88e3181..aab9b6ba2 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -44,6 +44,10 @@ mod eventfd { } } + pub fn did_wake(&self) { + let _ = self.reset(); + } + /// Reset the eventfd object, only need to call this if `wake` fails. fn reset(&self) -> io::Result<()> { let mut buf: [u8; 8] = 0u64.to_ne_bytes(); @@ -92,6 +96,8 @@ mod kqueue { pub fn wake(&self) -> io::Result<()> { self.selector.wake(self.token) } + + pub fn did_wake(&self) {} } } @@ -156,6 +162,10 @@ mod pipe { } } + pub fn did_wake(&self) { + self.empty(); + } + /// Empty the pipe's buffer, only need to call this if `wake` fails. /// This ignores any errors. fn empty(&self) { diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs index 103aa01a7..c6c678c4a 100644 --- a/src/sys/windows/waker.rs +++ b/src/sys/windows/waker.rs @@ -26,4 +26,6 @@ impl Waker { self.port.post(ev.to_completion_status()) } + + pub fn did_wake(&self) {} } diff --git a/src/waker.rs b/src/waker.rs index 92fdb4c16..39fd85e71 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -93,4 +93,11 @@ impl Waker { pub fn wake(&self) -> io::Result<()> { self.inner.wake() } + + /// Notifies the waker that it was actually woken. + /// + /// This is required when using a level triggered polling api. + pub fn did_wake(&self) { + self.inner.did_wake() + } } From 44303392b61f12681f1d00006f87ad677e99a70f Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 22:06:47 +0200 Subject: [PATCH 04/32] Allow forcing poll api as a feature --- Cargo.toml | 4 ++ src/poll.rs | 75 ++++++++++++++++++++++++++--- src/sys/unix/selector/mod.rs | 93 ++++++++++++++++++++++++++---------- 3 files changed, 140 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8433f91ca..2c1ac636e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,10 @@ os-ext = [ # Enables `mio::net` module containing networking primitives. net = [] +# Forces the use of the poll system call instead of epoll on systems +# where both are available +force-old-poll = ["os-poll"] + [dependencies] log = "0.4.8" diff --git a/src/poll.rs b/src/poll.rs index acc65d6b7..9f14d4a4b 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,10 +1,26 @@ use crate::{event, sys, Events, Interest, Token}; use log::trace; -// #[cfg(unix)] -// use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use std::{fmt, io}; +#[cfg(all( + unix, + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") +))] +use std::os::unix::io::{AsRawFd, RawFd}; + /// Polls for readiness events on all registered values. /// /// `Poll` allows a program to monitor a large number of [`event::Source`]s, @@ -412,12 +428,27 @@ impl Poll { } } -/* #[cfg(unix)] +#[cfg(all( + unix, + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") +))] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() } -} */ +} impl fmt::Debug for Poll { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -697,15 +728,45 @@ impl fmt::Debug for Registry { } } -/* #[cfg(unix)] +#[cfg(all( + unix, + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") +))] impl AsRawFd for Registry { fn as_raw_fd(&self) -> RawFd { self.selector.as_raw_fd() } -} */ +} cfg_os_poll! { - #[cfg(unix)] + #[cfg(all( + unix, + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") + ))] #[test] pub fn as_raw_fd() { let poll = Poll::new().unwrap(); diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index dacb8b5c1..f1c19a9b7 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,40 +1,83 @@ -#[cfg(any( - target_os = "android", - target_os = "illumos", - // target_os = "linux", - target_os = "redox", +#[cfg(all( + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ), + not(feature = "force-old-poll") ))] mod epoll; -#[cfg(any( - target_os = "android", - target_os = "illumos", - // target_os = "linux", - target_os = "redox", +#[cfg(all( + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ), + not(feature = "force-old-poll") ))] pub(crate) use self::epoll::{event, Event, Events, Selector}; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" +#[cfg(all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") ))] mod kqueue; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" +#[cfg(all( + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") ))] pub(crate) use self::kqueue::{event, Event, Events, Selector}; +#[cfg(any( + not(any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + )), + feature = "force-old-poll" +))] mod poll; + +#[cfg(any( + not(any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + )), + feature = "force-old-poll" +))] pub(crate) use self::poll::{event, Event, Events, Selector}; /// Lowest file descriptor used in `Selector::try_clone`. @@ -45,5 +88,5 @@ pub(crate) use self::poll::{event, Event, Events, Selector}; /// blindly assume this to be true, which means using any one of those a select /// could result in some interesting and unexpected errors. Avoid that by using /// an fd that doesn't have a pre-determined usage. -#[allow(dead_code)] +#[cfg(not(feature = "force-old-poll"))] const LOWEST_FD: libc::c_int = 3; From 2acc95a8eaf2f9f1d72e23f8508b53b9cb92920f Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 22:27:14 +0200 Subject: [PATCH 05/32] Unregister fds when dropping wakers --- src/sys/unix/waker.rs | 30 ++++++++++++++++++++++++++---- src/waker.rs | 6 +++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index aab9b6ba2..f9b62d2d8 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -5,7 +5,7 @@ mod eventfd { use std::fs::File; use std::io::{self, Read, Write}; - use std::os::unix::io::FromRawFd; + use std::os::unix::io::{AsRawFd, FromRawFd}; /// Waker backed by `eventfd`. /// @@ -16,17 +16,20 @@ mod eventfd { #[derive(Debug)] pub struct Waker { fd: File, + selector: Selector, } impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { + let selector = selector.try_clone()?; + syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| { // Turn the file descriptor into a file first so we're ensured // it's closed when dropped, e.g. when register below fails. let file = unsafe { File::from_raw_fd(fd) }; selector .register(fd, token, Interest::READABLE) - .map(|()| Waker { fd: file }) + .map(|()| Waker { fd: file, selector }) }) } @@ -60,6 +63,12 @@ mod eventfd { } } } + + impl Drop for Waker { + fn drop(&mut self) { + let _ = self.selector.deregister(self.fd.as_raw_fd()); + } + } } #[cfg(any(target_os = "linux", target_os = "android"))] @@ -117,7 +126,7 @@ mod pipe { use std::fs::File; use std::io::{self, Read, Write}; - use std::os::unix::io::FromRawFd; + use std::os::unix::io::{AsRawFd, FromRawFd}; /// Waker backed by a unix pipe. /// @@ -127,10 +136,13 @@ mod pipe { pub struct Waker { sender: File, receiver: File, + selector: Selector, } impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { + let selector = selector.try_clone()?; + let mut fds = [-1; 2]; syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; // Turn the file descriptors into files first so we're ensured @@ -139,7 +151,11 @@ mod pipe { let receiver = unsafe { File::from_raw_fd(fds[0]) }; selector .register(fds[0], token, Interest::READABLE) - .map(|()| Waker { sender, receiver }) + .map(|()| Waker { + sender, + receiver, + selector, + }) } pub fn wake(&self) -> io::Result<()> { @@ -178,6 +194,12 @@ mod pipe { } } } + + impl Drop for Waker { + fn drop(&mut self) { + let _ = self.selector.deregister(self.receiver.as_raw_fd()); + } + } } #[cfg(any( diff --git a/src/waker.rs b/src/waker.rs index 39fd85e71..14c8874b4 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -70,6 +70,8 @@ use std::io; /// let waker_event = events.iter().next().unwrap(); /// assert!(waker_event.is_readable()); /// assert_eq!(waker_event.token(), WAKE_TOKEN); +/// +/// // If we were to use the waker again, we need /// # handle.join().unwrap(); /// # Ok(()) /// # } @@ -96,7 +98,9 @@ impl Waker { /// Notifies the waker that it was actually woken. /// - /// This is required when using a level triggered polling api. + /// This is required when using a level triggered polling api, + /// as otherwise the waker will not loose its woken status and keep + /// waking. pub fn did_wake(&self) { self.inner.did_wake() } From 3ebbf05324117cb9b60d0d2d504497c5b8cb7b03 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 22:29:02 +0200 Subject: [PATCH 06/32] Finish doc comment --- src/waker.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/waker.rs b/src/waker.rs index 14c8874b4..6a2ff5c17 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -71,7 +71,9 @@ use std::io; /// assert!(waker_event.is_readable()); /// assert_eq!(waker_event.token(), WAKE_TOKEN); /// -/// // If we were to use the waker again, we need +/// // We need to tell the waker that we woke up, us otherwise +/// // it might wake us again when polling +/// waker.did_wake(); /// # handle.join().unwrap(); /// # Ok(()) /// # } From a2d44487133f6f971369dc55dc90a0a03780b978 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 22:54:09 +0200 Subject: [PATCH 07/32] Add missing .cargo back to .gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index e5649029d..6ebb15549 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .idea/ -cargo +.cargo Cargo.lock target* libs From 1652f758e2d7c6ccf33b75d19e50129b47ce7325 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 23:04:29 +0200 Subject: [PATCH 08/32] Add did_wake to shell implementation --- src/sys/shell/waker.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/sys/shell/waker.rs b/src/sys/shell/waker.rs index bbdd7c33a..11935c54a 100644 --- a/src/sys/shell/waker.rs +++ b/src/sys/shell/waker.rs @@ -13,4 +13,8 @@ impl Waker { pub fn wake(&self) -> io::Result<()> { os_required!(); } + + pub fn did_wake(&self) { + os_required!(); + } } From 1f0292e2fe363e6def8071b02f5074363e8fb175 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 9 Aug 2022 23:49:42 +0200 Subject: [PATCH 09/32] Add proper cfg to LOWEST_FD in unix selector --- src/sys/unix/selector/mod.rs | 65 +++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index f1c19a9b7..ef5947164 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,14 +1,3 @@ -#[cfg(all( - any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", - ), - not(feature = "force-old-poll") -))] -mod epoll; - #[cfg(all( any( target_os = "android", @@ -19,7 +8,6 @@ mod epoll; not(feature = "force-old-poll") ))] pub(crate) use self::epoll::{event, Event, Events, Selector}; - #[cfg(all( any( target_os = "dragonfly", @@ -31,37 +19,47 @@ pub(crate) use self::epoll::{event, Event, Events, Selector}; ), not(feature = "force-old-poll") ))] -mod kqueue; - -#[cfg(all( - any( +pub(crate) use self::kqueue::{event, Event, Events, Selector}; +#[cfg(any( + not(any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", target_os = "openbsd" - ), - not(feature = "force-old-poll") + )), + feature = "force-old-poll" ))] -pub(crate) use self::kqueue::{event, Event, Events, Selector}; +pub(crate) use self::poll::{event, Event, Events, Selector}; -#[cfg(any( - not(any( +#[cfg(all( + any( target_os = "android", target_os = "illumos", target_os = "linux", target_os = "redox", + ), + not(feature = "force-old-poll") +))] +mod epoll; + +#[cfg(all( + any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", target_os = "openbsd" - )), - feature = "force-old-poll" + ), + not(feature = "force-old-poll") ))] -mod poll; +mod kqueue; #[cfg(any( not(any( @@ -78,7 +76,7 @@ mod poll; )), feature = "force-old-poll" ))] -pub(crate) use self::poll::{event, Event, Events, Selector}; +mod poll; /// Lowest file descriptor used in `Selector::try_clone`. /// @@ -88,5 +86,20 @@ pub(crate) use self::poll::{event, Event, Events, Selector}; /// blindly assume this to be true, which means using any one of those a select /// could result in some interesting and unexpected errors. Avoid that by using /// an fd that doesn't have a pre-determined usage. -#[cfg(not(feature = "force-old-poll"))] +#[cfg(all( + unix, + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") +))] const LOWEST_FD: libc::c_int = 3; From 21850e8dd9b789dbb1f02f62b67319a2e2cca5b6 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 10 Aug 2022 00:45:59 +0200 Subject: [PATCH 10/32] Remove super verbose debug logging --- src/sys/unix/selector/poll.rs | 37 ----------------------------------- 1 file changed, 37 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 194e8deb8..546804a98 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -211,22 +211,10 @@ impl SelectorState { ))?; } - log::trace!( - "new: notify_read={}, notify_write={}", - notify_fd[0], - notify_fd[1] - ); - Ok(notify_fd) } pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { - log::trace!( - "select: notify_read={}, timeout={:?}", - self.notify_read, - timeout - ); - let deadline = timeout.map(|t| Instant::now() + t); events.clear(); @@ -246,11 +234,6 @@ impl SelectorState { let num_events = poll(&mut fds.poll_fds, deadline)?; let notified = fds.poll_fds[0].0.revents != 0; let num_fd_events = if notified { num_events - 1 } else { num_events }; - log::trace!( - "new events: notify_read={}, num={}", - self.notify_read, - num_events - ); // Read all notifications. if notified { @@ -296,14 +279,6 @@ impl SelectorState { return Err(io::Error::from(io::ErrorKind::InvalidInput)); } - log::trace!( - "register: notify_read={}, fd={}, token={:?}, interests={:?}", - self.notify_read, - fd, - token, - interests - ); - self.modify_fds(|fds| { if fds.fd_data.contains_key(&fd) { return Err(io::Error::new( @@ -335,14 +310,6 @@ impl SelectorState { } pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { - log::trace!( - "reregister: notify_read={}, fd={}, token={:?}, interests={:?}", - self.notify_read, - fd, - token, - interests - ); - self.modify_fds(|fds| { let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; data.token = token; @@ -354,8 +321,6 @@ impl SelectorState { } pub fn deregister(&self, fd: RawFd) -> io::Result<()> { - log::trace!("deregister: notify_read={}, fd={}", self.notify_read, fd); - self.modify_fds(|fds| { let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; fds.poll_fds.swap_remove(data.poll_fds_index); @@ -449,13 +414,11 @@ fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { }) .unwrap_or(-1); - log::trace!("Polling on {:?}", fds); let res = syscall!(poll( fds.as_mut_ptr() as *mut libc::pollfd, fds.len() as libc::nfds_t, timeout_ms, )); - log::trace!("Polling finished: {:?} = {:?}", res, fds); match res { Ok(num_events) => break Ok(num_events as usize), From 2baaadebebb91e2cf735bcfe7a793c01c0c1c65c Mon Sep 17 00:00:00 2001 From: Janrupf Date: Tue, 16 Aug 2022 22:47:10 +0000 Subject: [PATCH 11/32] Compiles on Haiku --- src/sys/unix/net.rs | 9 +++++++-- src/sys/unix/pipe.rs | 3 ++- src/sys/unix/selector/poll.rs | 14 +++++++++++++- src/sys/unix/tcp.rs | 3 ++- src/sys/unix/uds/listener.rs | 2 ++ src/sys/unix/uds/mod.rs | 6 +++--- src/sys/unix/waker.rs | 27 ++++++++++++++++++++++++++- 7 files changed, 55 insertions(+), 9 deletions(-) diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs index 78f1387b1..f58396b01 100644 --- a/src/sys/unix/net.rs +++ b/src/sys/unix/net.rs @@ -89,14 +89,18 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_ sin_family: libc::AF_INET as libc::sa_family_t, sin_port: addr.port().to_be(), sin_addr, + #[cfg(not(target_os = "haiku"))] sin_zero: [0; 8], + #[cfg(target_os = "haiku")] + sin_zero: [0; 24], #[cfg(any( target_os = "dragonfly", target_os = "freebsd", target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "haiku", ))] sin_len: 0, }; @@ -120,7 +124,8 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_ target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "haiku", ))] sin6_len: 0, #[cfg(target_os = "illumos")] diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index 7a95b9697..a7ed1c155 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -163,7 +163,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { } } - #[cfg(any(target_os = "ios", target_os = "macos"))] + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "haiku"))] unsafe { // For platforms that don't have `pipe2(2)` we need to manually set the // correct flags on the file descriptor. @@ -195,6 +195,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { target_os = "macos", target_os = "illumos", target_os = "redox", + target_os = "haiku", )))] compile_error!("unsupported target for `mio::unix::pipe`"); diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 546804a98..67cd24984 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -3,7 +3,9 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fmt::{Debug, Formatter}; use std::os::unix::io::RawFd; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; +#[cfg(debug_assertions)] +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use std::{fmt, io}; @@ -379,7 +381,11 @@ impl SelectorState { } } +#[cfg(not(target_os = "haiku"))] const READ_EVENTS: libc::c_short = libc::POLLIN | libc::POLLRDHUP; +#[cfg(target_os = "haiku")] +const READ_EVENTS: libc::c_short = libc::POLLIN; + const WRITE_EVENTS: libc::c_short = libc::POLLOUT; /// Get the input poll events for the given event. @@ -458,6 +464,7 @@ pub mod event { (event.events & libc::POLLERR) != 0 } + #[cfg(not(target_os = "haiku"))] pub fn is_read_closed(event: &Event) -> bool { // Both halves of the socket have closed (event.events & libc::POLLHUP) != 0 @@ -465,6 +472,10 @@ pub mod event { || ((event.events & libc::POLLIN) != 0 && (event.events & libc::POLLRDHUP) != 0) } + pub fn is_read_closed(event: &Event) -> bool { + event.events & libc::POLLHUP != 0 + } + pub fn is_write_closed(event: &Event) -> bool { // Both halves of the socket have closed (event.events & libc::POLLHUP) != 0 @@ -505,6 +516,7 @@ pub mod event { libc::POLLWRBAND, libc::POLLERR, libc::POLLHUP, + #[cfg(not(target_os = "haiku"))] libc::POLLRDHUP, ); diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs index c4d7e9469..27f68365d 100644 --- a/src/sys/unix/tcp.rs +++ b/src/sys/unix/tcp.rs @@ -88,7 +88,8 @@ pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, all(target_arch = "x86", target_os = "android"), target_os = "ios", target_os = "macos", - target_os = "redox" + target_os = "redox", + target_os = "haiku", ))] let stream = { syscall!(accept( diff --git a/src/sys/unix/uds/listener.rs b/src/sys/unix/uds/listener.rs index 79bd14ee0..8fdb06904 100644 --- a/src/sys/unix/uds/listener.rs +++ b/src/sys/unix/uds/listener.rs @@ -43,6 +43,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So target_os = "macos", target_os = "netbsd", target_os = "redox", + target_os = "haiku", // Android x86's seccomp profile forbids calls to `accept4(2)` // See https://github.com/tokio-rs/mio/issues/1445 for details all( @@ -66,6 +67,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So target_os = "macos", target_os = "netbsd", target_os = "redox", + target_os = "haiku", all(target_arch = "x86", target_os = "android") ))] let socket = syscall!(accept( diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs index 526bbdfd0..8d28bb8cf 100644 --- a/src/sys/unix/uds/mod.rs +++ b/src/sys/unix/uds/mod.rs @@ -77,20 +77,20 @@ cfg_os_poll! { fn pair(flags: libc::c_int) -> io::Result<(T, T)> where T: FromRawFd, { - #[cfg(not(any(target_os = "ios", target_os = "macos")))] + #[cfg(not(any(target_os = "ios", target_os = "macos", target_os = "haiku")))] let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC; let mut fds = [-1; 2]; syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) }; - // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. + // Darwin nor Haiku have SOCK_NONBLOCK or SOCK_CLOEXEC. // // In order to set those flags, additional `fcntl` sys calls must be // performed. If a `fnctl` fails after the sockets have been created, // the file descriptors will leak. Creating `pair` above ensures that if // there is an error, the file descriptors are closed. - #[cfg(any(target_os = "ios", target_os = "macos"))] + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "haiku"))] { syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?; syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?; diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index f9b62d2d8..6da5df4a0 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -119,6 +119,7 @@ pub use self::kqueue::Waker; target_os = "netbsd", target_os = "openbsd", target_os = "redox", + target_os = "haiku", ))] mod pipe { use crate::sys::unix::Selector; @@ -144,7 +145,30 @@ mod pipe { let selector = selector.try_clone()?; let mut fds = [-1; 2]; - syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; + + #[cfg(not(target_os = "haiku"))] + { + syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; + } + + #[cfg(target_os = "haiku")] + { + // Haiku does not have `pipe2(2)`, manually set nonblocking + syscall!(pipe(fds.as_mut_ptr()))?; + + for fd in &fds { + unsafe { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 || + libc::fcntl(*fd, libc::F_SETFL, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + } // Turn the file descriptors into files first so we're ensured // they're closed when dropped, e.g. when register below fails. let sender = unsafe { File::from_raw_fd(fds[1]) }; @@ -208,5 +232,6 @@ mod pipe { target_os = "netbsd", target_os = "openbsd", target_os = "redox", + target_os = "haiku", ))] pub use self::pipe::Waker; From ef6b58012029ae01a58752aee6afa75266fde37a Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 00:24:39 +0000 Subject: [PATCH 12/32] Add missing nonblock on Haiku --- src/sys/unix/net.rs | 4 ++-- src/sys/unix/selector/poll.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs index f58396b01..59149f176 100644 --- a/src/sys/unix/net.rs +++ b/src/sys/unix/net.rs @@ -41,8 +41,8 @@ pub(crate) fn new_socket(domain: libc::c_int, socket_type: libc::c_int) -> io::R .map(|_| socket) }); - // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. - #[cfg(any(target_os = "ios", target_os = "macos"))] + // Darwin nor Haiku have SOCK_NONBLOCK or SOCK_CLOEXEC. + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "haiku"))] let socket = socket.and_then(|socket| { // For platforms that don't support flags in socket, we need to // set the flags ourselves. diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 67cd24984..80855c2f8 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -472,6 +472,7 @@ pub mod event { || ((event.events & libc::POLLIN) != 0 && (event.events & libc::POLLRDHUP) != 0) } + #[cfg(target_os = "haiku")] pub fn is_read_closed(event: &Event) -> bool { event.events & libc::POLLHUP != 0 } From d09c48ff066667a6b1d87c6109fb2015762f4b0e Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 00:43:49 +0000 Subject: [PATCH 13/32] Disable unix datagrams for haiku (not supported) --- examples/test_example.rs | 0 src/net/mod.rs | 4 +++- src/net/uds/mod.rs | 2 ++ src/sys/unix/uds/mod.rs | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 examples/test_example.rs diff --git a/examples/test_example.rs b/examples/test_example.rs new file mode 100644 index 000000000..e69de29bb diff --git a/src/net/mod.rs b/src/net/mod.rs index 7d714ca00..796f79c21 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -36,4 +36,6 @@ pub use self::udp::UdpSocket; #[cfg(unix)] mod uds; #[cfg(unix)] -pub use self::uds::{SocketAddr, UnixDatagram, UnixListener, UnixStream}; +pub use self::uds::{SocketAddr, UnixListener, UnixStream}; +#[cfg(all(unix, not(target_os = "haiku")))] +pub use self::uds::UnixDatagram; diff --git a/src/net/uds/mod.rs b/src/net/uds/mod.rs index 6b4ffdc43..08806f150 100644 --- a/src/net/uds/mod.rs +++ b/src/net/uds/mod.rs @@ -1,4 +1,6 @@ +#[cfg(not(target_os = "haiku"))] mod datagram; +#[cfg(not(target_os = "haiku"))] pub use self::datagram::UnixDatagram; mod listener; diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs index 8d28bb8cf..6ee93c133 100644 --- a/src/sys/unix/uds/mod.rs +++ b/src/sys/unix/uds/mod.rs @@ -20,6 +20,7 @@ cfg_os_poll! { use std::path::Path; use std::{io, mem}; + #[cfg(not(target_os = "haiku"))] pub(crate) mod datagram; pub(crate) mod listener; pub(crate) mod stream; From a366956d0f0ae90dc63114f115b0c316850f4ac7 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 01:41:00 +0000 Subject: [PATCH 14/32] Some platforms don't signal read closed --- Cargo.toml | 4 ++++ examples/test_example.rs | 39 +++++++++++++++++++++++++++++++++++++++ src/sys/unix/pipe.rs | 14 +++++++++++--- tests/regressions.rs | 2 +- tests/unix_datagram.rs | 2 +- 5 files changed, 56 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2c1ac636e..fa9082315 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,3 +101,7 @@ required-features = ["os-poll", "net"] [[example]] name = "udp_server" required-features = ["os-poll", "net"] + +[[example]] +name = "test_example" +required-features = ["os-poll", "net"] diff --git a/examples/test_example.rs b/examples/test_example.rs index e69de29bb..a52642f6a 100644 --- a/examples/test_example.rs +++ b/examples/test_example.rs @@ -0,0 +1,39 @@ +use std::io; + +use mio::{Poll, Events, Interest, Token}; +use mio::unix::pipe; +use std::io::Read; + +const PIPE_RECV: Token = Token(0); + +fn main() -> io::Result<()> { + env_logger::init(); + +// Same setup as in the example above. + let mut poll = Poll::new()?; + let mut events = Events::with_capacity(8); + let (sender, mut receiver) = pipe::new()?; + poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +// Drop the sender. + drop(sender); + poll.poll(&mut events, None)?; + for event in events.iter() { + log::info!("Got event {:?}", event); + match event.token() { + PIPE_RECV if event.is_read_closed() => { + // Detected that the sender was dropped. + println!("Sender dropped!"); + return Ok(()); + }, + PIPE_RECV => { + // Some platforms don't support detecting that the write end has been closed + println!("Receiving end is readable, but doesn't know the write has been closed!"); + let mut buf = [0u8; 1]; + assert_eq!(receiver.read(&mut buf).ok(), Some(0)); + return Ok(()); + } + _ => unreachable!(), + } + } + unreachable!(); + } \ No newline at end of file diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index a7ed1c155..6335d7a9b 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -109,22 +109,21 @@ use crate::{event, Interest, Registry, Token}; /// /// ``` /// # use std::io; +/// # use std::io::Read; /// # /// # use mio::{Poll, Events, Interest, Token}; /// # use mio::unix::pipe; /// # /// # const PIPE_RECV: Token = Token(0); -/// # const PIPE_SEND: Token = Token(1); /// # /// # fn main() -> io::Result<()> { /// // Same setup as in the example above. /// let mut poll = Poll::new()?; /// let mut events = Events::with_capacity(8); /// -/// let (mut sender, mut receiver) = pipe::new()?; +/// let (sender, mut receiver) = pipe::new()?; /// /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; -/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; /// /// // Drop the sender. /// drop(sender); @@ -138,6 +137,15 @@ use crate::{event, Interest, Registry, Token}; /// println!("Sender dropped!"); /// return Ok(()); /// }, +/// PIPE_RECV => { +/// // Some platforms only signal a read readines event +/// println!("Pipe is readable due to dropped sender!"); +/// +/// // Reading from a closed pipe always returns Ok(0) +/// let mut buf = [0; 1]; +/// assert_eq!(receiver.read(&mut buf).ok(), Some(0)); +/// return Ok(()); +/// } /// _ => unreachable!(), /// } /// } diff --git a/tests/regressions.rs b/tests/regressions.rs index 0e3e5a9d8..5a3028253 100644 --- a/tests/regressions.rs +++ b/tests/regressions.rs @@ -107,7 +107,7 @@ fn issue_1205() { } #[test] -#[cfg(unix)] +#[cfg(all(unix, not(target_os = "haiku")))] fn issue_1403() { use mio::net::UnixDatagram; use util::temp_file; diff --git a/tests/unix_datagram.rs b/tests/unix_datagram.rs index 7c4b01f14..48a8e0bf5 100644 --- a/tests/unix_datagram.rs +++ b/tests/unix_datagram.rs @@ -1,4 +1,4 @@ -#![cfg(all(unix, feature = "os-poll", feature = "net"))] +#![cfg(all(unix, not(target_os = "haiku"), feature = "os-poll", feature = "net"))] use mio::net::UnixDatagram; use mio::{Interest, Token}; From f44704a6841b7330eb38f43d00a57ef9e9e02335 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 02:06:13 +0000 Subject: [PATCH 15/32] All doc tests pass on Haiku now --- examples/test_example.rs | 70 +++++++++++++++++------------------ src/sys/unix/net.rs | 4 +- src/sys/unix/pipe.rs | 3 +- src/sys/unix/selector/poll.rs | 2 + src/sys/unix/uds/listener.rs | 6 +-- src/sys/unix/uds/mod.rs | 6 +-- src/sys/unix/waker.rs | 3 +- 7 files changed, 44 insertions(+), 50 deletions(-) diff --git a/examples/test_example.rs b/examples/test_example.rs index a52642f6a..e4723ffef 100644 --- a/examples/test_example.rs +++ b/examples/test_example.rs @@ -1,39 +1,39 @@ -use std::io; - -use mio::{Poll, Events, Interest, Token}; -use mio::unix::pipe; -use std::io::Read; - -const PIPE_RECV: Token = Token(0); - -fn main() -> io::Result<()> { +fn main() -> Result<(), Box> { + use std::thread; + use std::time::Duration; + use std::sync::Arc; + use mio::{Events, Token, Poll, Waker}; env_logger::init(); -// Same setup as in the example above. + const WAKE_TOKEN: Token = Token(10); let mut poll = Poll::new()?; - let mut events = Events::with_capacity(8); - let (sender, mut receiver) = pipe::new()?; - poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; -// Drop the sender. - drop(sender); + let mut events = Events::with_capacity(2); + let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?); +// We need to keep the Waker alive, so we'll create a clone for the +// thread we create below. + let waker1 = waker.clone(); + let handle = thread::spawn(move || { + // Working hard, or hardly working? + thread::sleep(Duration::from_millis(500)); + log::trace!("WAKING!"); + // Now we'll wake the queue on the other thread. + waker1.wake().expect("unable to wake"); + }); +// On our current thread we'll poll for events, without a timeout. poll.poll(&mut events, None)?; - for event in events.iter() { - log::info!("Got event {:?}", event); - match event.token() { - PIPE_RECV if event.is_read_closed() => { - // Detected that the sender was dropped. - println!("Sender dropped!"); - return Ok(()); - }, - PIPE_RECV => { - // Some platforms don't support detecting that the write end has been closed - println!("Receiving end is readable, but doesn't know the write has been closed!"); - let mut buf = [0u8; 1]; - assert_eq!(receiver.read(&mut buf).ok(), Some(0)); - return Ok(()); - } - _ => unreachable!(), - } - } - unreachable!(); - } \ No newline at end of file +// After about 500 milliseconds we should be awoken by the other thread and +// get a single event. + assert!(!events.is_empty()); + let waker_event = events.iter().next().unwrap(); + assert!(waker_event.is_readable()); + assert_eq!(waker_event.token(), WAKE_TOKEN); +// We need to tell the waker that we woke up, us otherwise +// it might wake us again when polling + log::trace!("Signalling waker it did wake!"); + waker.did_wake(); + + log::trace!("About to join thread!"); +handle.join().unwrap(); + log::trace!("Thread joined!"); + Ok(()) +} \ No newline at end of file diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs index 59149f176..983ee9f42 100644 --- a/src/sys/unix/net.rs +++ b/src/sys/unix/net.rs @@ -46,8 +46,8 @@ pub(crate) fn new_socket(domain: libc::c_int, socket_type: libc::c_int) -> io::R let socket = socket.and_then(|socket| { // For platforms that don't support flags in socket, we need to // set the flags ourselves. - syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK)) - .and_then(|_| syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket)) + syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC)) + .map(|_| socket) .map_err(|e| { // If either of the `fcntl` calls failed, ensure the socket is // closed and return the error. diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index 6335d7a9b..77701ce7e 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -180,8 +180,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { } for fd in &fds { - if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 - || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC) != 0 { let err = io::Error::last_os_error(); // Don't leak file descriptors. Can't handle error though. diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 80855c2f8..e9fa9c73e 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -233,7 +233,9 @@ impl SelectorState { } // Perform the poll. + log::trace!("POLLing on {:?}", fds); let num_events = poll(&mut fds.poll_fds, deadline)?; + log::trace!("Poll finished: {:?}", fds); let notified = fds.poll_fds[0].0.revents != 0; let num_fd_events = if notified { num_events - 1 } else { num_events }; diff --git a/src/sys/unix/uds/listener.rs b/src/sys/unix/uds/listener.rs index 8fdb06904..995b381c5 100644 --- a/src/sys/unix/uds/listener.rs +++ b/src/sys/unix/uds/listener.rs @@ -79,11 +79,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So // Ensure the socket is closed if either of the `fcntl` calls // error below. let s = unsafe { net::UnixStream::from_raw_fd(socket) }; - syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC))?; - - // See https://github.com/tokio-rs/mio/issues/1450 - #[cfg(all(target_arch = "x86", target_os = "android"))] - syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))?; + syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC | libc::O_NONBLOCK))?; Ok(s) }); diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs index 6ee93c133..024481a6c 100644 --- a/src/sys/unix/uds/mod.rs +++ b/src/sys/unix/uds/mod.rs @@ -93,10 +93,8 @@ cfg_os_poll! { // there is an error, the file descriptors are closed. #[cfg(any(target_os = "ios", target_os = "macos", target_os = "haiku"))] { - syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?; - syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?; - syscall!(fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK))?; - syscall!(fcntl(fds[1], libc::F_SETFD, libc::FD_CLOEXEC))?; + syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC | libc::O_NONBLOCK))?; + syscall!(fcntl(fds[1], libc::F_SETFL, libc::FD_CLOEXEC | libc::O_NONBLOCK))?; } Ok(pair) } diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 6da5df4a0..d1c9d70b8 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -158,8 +158,7 @@ mod pipe { for fd in &fds { unsafe { - if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 || - libc::fcntl(*fd, libc::F_SETFL, libc::FD_CLOEXEC) != 0 + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC) != 0 { let err = io::Error::last_os_error(); let _ = libc::close(fds[0]); From ce4508872aa22550195618c5037eaa61404dc8ab Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 19:54:46 +0200 Subject: [PATCH 16/32] Fix poll selector not closing fd's --- src/sys/unix/selector/poll.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index e9fa9c73e..337d2c344 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -3,9 +3,9 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fmt::{Debug, Formatter}; use std::os::unix::io::RawFd; -use std::sync::atomic::{AtomicUsize, Ordering}; #[cfg(debug_assertions)] use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use std::{fmt, io}; @@ -383,6 +383,16 @@ impl SelectorState { } } +impl Drop for SelectorState { + fn drop(&mut self) { + let _ = syscall!(close(self.notify_read)); + + if self.notify_read != self.notify_write { + let _ = syscall!(close(self.notify_write)); + } + } +} + #[cfg(not(target_os = "haiku"))] const READ_EVENTS: libc::c_short = libc::POLLIN | libc::POLLRDHUP; #[cfg(target_os = "haiku")] From 942f439bfe79f3ae68ddd9748e0d53ee4ce68945 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 20:20:07 +0200 Subject: [PATCH 17/32] Only break out of poll when receiving user events --- src/sys/unix/selector/poll.rs | 86 ++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 41 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 337d2c344..441a8ef7d 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -223,55 +223,59 @@ impl SelectorState { let mut fds = self.fds.lock().unwrap(); - // Complete all current operations. loop { - if self.waiting_operations.load(Ordering::SeqCst) == 0 { - break; - } + // Complete all current operations. + loop { + if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } - fds = self.operations_complete.wait(fds).unwrap(); - } + fds = self.operations_complete.wait(fds).unwrap(); + } - // Perform the poll. - log::trace!("POLLing on {:?}", fds); - let num_events = poll(&mut fds.poll_fds, deadline)?; - log::trace!("Poll finished: {:?}", fds); - let notified = fds.poll_fds[0].0.revents != 0; - let num_fd_events = if notified { num_events - 1 } else { num_events }; - - // Read all notifications. - if notified { - if self.notify_read != self.notify_write { - // When using the `pipe` syscall, we have to read all accumulated notifications in the pipe. - while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)).is_ok() - { + // Perform the poll. + log::trace!("POLLing on {:?}", fds); + let num_events = poll(&mut fds.poll_fds, deadline)?; + log::trace!("Poll finished: {:?}", fds); + let notified = fds.poll_fds[0].0.revents != 0; + let num_fd_events = if notified { num_events - 1 } else { num_events }; + + // Read all notifications. + if notified { + if self.notify_read != self.notify_write { + // When using the `pipe` syscall, we have to read all accumulated notifications in the pipe. + while syscall!(read(self.notify_read, &mut [0; 64] as *mut _ as *mut _, 64)) + .is_ok() + {} + } else { + // When using the `eventfd` syscall, it is OK to read just once, so as to clear the counter. + // In fact, reading in a loop will result in an endless loop on the ESP-IDF + // which is not following the specification strictly. + let _ = self.pop_notification(); } - } else { - // When using the `eventfd` syscall, it is OK to read just once, so as to clear the counter. - // In fact, reading in a loop will result in an endless loop on the ESP-IDF - // which is not following the specification strictly. - let _ = self.pop_notification(); } - } - // Store the events if there were any. - if num_fd_events > 0 { - let fds = &mut *fds; - - events.reserve(num_fd_events); - for fd_data in fds.fd_data.values_mut() { - let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; - if poll_fd.revents != 0 { - // Store event - events.push(Event { - token: fd_data.token, - events: poll_fd.revents, - }); - - if events.len() == num_fd_events { - break; + // Store the events if there were any. + if num_fd_events > 0 { + let fds = &mut *fds; + + events.reserve(num_fd_events); + for fd_data in fds.fd_data.values_mut() { + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + if poll_fd.revents != 0 { + // Store event + events.push(Event { + token: fd_data.token, + events: poll_fd.revents, + }); + + if events.len() == num_fd_events { + break; + } } } + + break; } } From ecde67265dfc0993c2c8d2af0a423957728914be Mon Sep 17 00:00:00 2001 From: Janrupf Date: Wed, 17 Aug 2022 20:25:11 +0200 Subject: [PATCH 18/32] Properly implement timeouts --- src/sys/unix/selector/poll.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 441a8ef7d..d6dd57014 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -236,6 +236,11 @@ impl SelectorState { // Perform the poll. log::trace!("POLLing on {:?}", fds); let num_events = poll(&mut fds.poll_fds, deadline)?; + if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { + // timeout + return Ok(()); + } + log::trace!("Poll finished: {:?}", fds); let notified = fds.poll_fds[0].0.revents != 0; let num_fd_events = if notified { num_events - 1 } else { num_events }; From 976bf2549e70e88fc0b6b56424298c0a2586ad86 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 17:21:51 +0200 Subject: [PATCH 19/32] Put huge cfg statements into macros --- src/macros.rs | 73 +++++++++++++++++++++++++++++ src/sys/unix/selector/mod.rs | 90 ++++++------------------------------ 2 files changed, 86 insertions(+), 77 deletions(-) diff --git a/src/macros.rs b/src/macros.rs index f97f90911..c93573f01 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -68,3 +68,76 @@ macro_rules! cfg_any_os_ext { )* } } + +/// The `os-poll` feature or one feature that requires is is enabled and the system +/// supports epoll. +macro_rules! cfg_epoll_selector { + ($($item:item)*) => { + $( + #[cfg(all( + any(feature = "os-poll", feature = "net"), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ), + not(feature = "force-old-poll") + ))] + $item + )* + }; +} + +/// The `os-poll` feature or one feature that requires is is enabled and the system +/// supports kqueue. +macro_rules! cfg_kqueue_selector { + ($($item:item)*) => { + $( + #[cfg(all( + any(feature = "os-poll", feature = "net"), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ), + not(feature = "force-old-poll") + ))] + $item + )* + }; +} + +/// The `os-poll` feature or one feature that requires is is enabled and the system +/// is a generic unix which does not support epoll nor kqueue. +macro_rules! cfg_poll_selector { + ($($item:item)*) => { + $( + #[cfg( + all( + unix, + any(feature = "os-poll", feature = "net"), + any( + not(any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + )), + feature = "force-old-poll" + ) + ) + )] + $item + )* + }; +} diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index ef5947164..22288cdc5 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,82 +1,17 @@ -#[cfg(all( - any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", - ), - not(feature = "force-old-poll") -))] -pub(crate) use self::epoll::{event, Event, Events, Selector}; -#[cfg(all( - any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" - ), - not(feature = "force-old-poll") -))] -pub(crate) use self::kqueue::{event, Event, Events, Selector}; -#[cfg(any( - not(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" - )), - feature = "force-old-poll" -))] -pub(crate) use self::poll::{event, Event, Events, Selector}; +cfg_epoll_selector! { + mod epoll; + pub(crate) use self::epoll::{event, Event, Events, Selector}; +} -#[cfg(all( - any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", - ), - not(feature = "force-old-poll") -))] -mod epoll; +cfg_kqueue_selector! { + mod kqueue; + pub(crate) use self::kqueue::{event, Event, Events, Selector}; +} -#[cfg(all( - any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" - ), - not(feature = "force-old-poll") -))] -mod kqueue; - -#[cfg(any( - not(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd" - )), - feature = "force-old-poll" -))] -mod poll; +cfg_poll_selector! { + mod poll; + pub(crate) use self::poll::{event, Event, Events, Selector}; +} /// Lowest file descriptor used in `Selector::try_clone`. /// @@ -86,6 +21,7 @@ mod poll; /// blindly assume this to be true, which means using any one of those a select /// could result in some interesting and unexpected errors. Avoid that by using /// an fd that doesn't have a pre-determined usage. +// TODO: Compact this into a macro too (naming?) #[cfg(all( unix, any( From a2d5fd04baef228398b5f886e1c6eb524aa1cc5e Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 17:27:53 +0200 Subject: [PATCH 20/32] Prepare for splitting io source states --- src/sys/unix/mod.rs | 24 +------------------- src/sys/unix/selector/io_source/mod.rs | 1 + src/sys/unix/selector/io_source/stateless.rs | 18 +++++++++++++++ src/sys/unix/selector/mod.rs | 7 ++++++ 4 files changed, 27 insertions(+), 23 deletions(-) create mode 100644 src/sys/unix/selector/io_source/mod.rs create mode 100644 src/sys/unix/selector/io_source/stateless.rs diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 231480a5d..f01ce8810 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -15,7 +15,7 @@ macro_rules! syscall { cfg_os_poll! { mod selector; - pub(crate) use self::selector::{event, Event, Events, Selector}; + pub(crate) use self::selector::{event, Event, Events, Selector, IoSourceState}; mod sourcefd; pub use self::sourcefd::SourceFd; @@ -32,28 +32,6 @@ cfg_os_poll! { pub use self::uds::SocketAddr; } - cfg_io_source! { - use std::io; - - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; - - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState - } - - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) - } - } - } - cfg_os_ext! { pub(crate) mod pipe; } diff --git a/src/sys/unix/selector/io_source/mod.rs b/src/sys/unix/selector/io_source/mod.rs new file mode 100644 index 000000000..28d232aa0 --- /dev/null +++ b/src/sys/unix/selector/io_source/mod.rs @@ -0,0 +1 @@ +pub(super) mod stateless; diff --git a/src/sys/unix/selector/io_source/stateless.rs b/src/sys/unix/selector/io_source/stateless.rs new file mode 100644 index 000000000..03dd905e1 --- /dev/null +++ b/src/sys/unix/selector/io_source/stateless.rs @@ -0,0 +1,18 @@ +use std::io; + +pub(crate) struct IoSourceState; + +impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } +} diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 22288cdc5..56087cac7 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,16 +1,23 @@ +cfg_io_source! { + mod io_source; +} + cfg_epoll_selector! { mod epoll; pub(crate) use self::epoll::{event, Event, Events, Selector}; + pub(crate) use self::io_source::stateless::IoSourceState; } cfg_kqueue_selector! { mod kqueue; pub(crate) use self::kqueue::{event, Event, Events, Selector}; + pub(crate) use self::io_source::stateless::IoSourceState; } cfg_poll_selector! { mod poll; pub(crate) use self::poll::{event, Event, Events, Selector}; + pub(crate) use self::io_source::stateless::IoSourceState; } /// Lowest file descriptor used in `Selector::try_clone`. From fa04dbce9dace23a28d541802d17bbb9b99a96d8 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 17:35:35 +0200 Subject: [PATCH 21/32] Redirect source calls through state on unix --- src/io_source.rs | 12 +++---- .../unix/selector/io_source/edge_triggered.rs | 0 src/sys/unix/selector/io_source/mod.rs | 1 + src/sys/unix/selector/io_source/stateless.rs | 33 +++++++++++++++++-- 4 files changed, 37 insertions(+), 9 deletions(-) create mode 100644 src/sys/unix/selector/io_source/edge_triggered.rs diff --git a/src/io_source.rs b/src/io_source.rs index 99623c116..06dc5e17e 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -142,9 +142,8 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; - registry - .selector() - .register(self.inner.as_raw_fd(), token, interests) + self.state + .register(registry, token, interests, self.inner.as_raw_fd()) } fn reregister( @@ -155,15 +154,14 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; - registry - .selector() - .reregister(self.inner.as_raw_fd(), token, interests) + self.state + .reregister(registry, token, interests, self.inner.as_raw_fd()) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(registry)?; - registry.selector().deregister(self.inner.as_raw_fd()) + self.state.deregister(registry, self.inner.as_raw_fd()) } } diff --git a/src/sys/unix/selector/io_source/edge_triggered.rs b/src/sys/unix/selector/io_source/edge_triggered.rs new file mode 100644 index 000000000..e69de29bb diff --git a/src/sys/unix/selector/io_source/mod.rs b/src/sys/unix/selector/io_source/mod.rs index 28d232aa0..e50c978cf 100644 --- a/src/sys/unix/selector/io_source/mod.rs +++ b/src/sys/unix/selector/io_source/mod.rs @@ -1 +1,2 @@ +pub(super) mod edge_triggered; pub(super) mod stateless; diff --git a/src/sys/unix/selector/io_source/stateless.rs b/src/sys/unix/selector/io_source/stateless.rs index 03dd905e1..dbf5bd71f 100644 --- a/src/sys/unix/selector/io_source/stateless.rs +++ b/src/sys/unix/selector/io_source/stateless.rs @@ -1,4 +1,6 @@ +use crate::{Interest, Registry, Token}; use std::io; +use std::os::unix::io::RawFd; pub(crate) struct IoSourceState; @@ -8,11 +10,38 @@ impl IoSourceState { } pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, + where + F: FnOnce(&T) -> io::Result, { // We don't hold state, so we can just call the function and // return. f(io) } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().register(fd, token, interests) + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().reregister(fd, token, interests) + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().deregister(fd) + } } From 9bd3c3cc82f70db3b9debb13225b441c3d41578f Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 17:52:59 +0200 Subject: [PATCH 22/32] Implement edge triggering for poll selector --- .../unix/selector/io_source/edge_triggered.rs | 87 +++++++++++++++++++ src/sys/unix/selector/io_source/mod.rs | 13 ++- src/sys/unix/selector/mod.rs | 2 +- src/sys/unix/selector/poll.rs | 7 +- 4 files changed, 105 insertions(+), 4 deletions(-) diff --git a/src/sys/unix/selector/io_source/edge_triggered.rs b/src/sys/unix/selector/io_source/edge_triggered.rs index e69de29bb..7ea40cfc3 100644 --- a/src/sys/unix/selector/io_source/edge_triggered.rs +++ b/src/sys/unix/selector/io_source/edge_triggered.rs @@ -0,0 +1,87 @@ +use crate::sys::Selector; +use crate::{Interest, Registry, Token}; +use std::io; +use std::os::unix::io::RawFd; +use std::sync::Arc; + +struct InternalState { + selector: Arc, + token: Token, + interests: Interest, + fd: RawFd, + is_registered: bool, +} + +impl Drop for InternalState { + fn drop(&mut self) { + if self.is_registered { + let _ = self.selector.deregister(self.fd); + } + } +} + +pub(crate) struct IoSourceState { + inner: Option>, +} + +impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + + if let Err(err) = &result { + if err.kind() == io::ErrorKind::WouldBlock { + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.fd, state.token, state.interests) + })?; + } + } + + result + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + registry.selector().register(fd, token, interests) + } + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => registry + .selector() + .reregister(fd, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }), + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + registry.selector().deregister(fd) + } +} diff --git a/src/sys/unix/selector/io_source/mod.rs b/src/sys/unix/selector/io_source/mod.rs index e50c978cf..f6cd533e6 100644 --- a/src/sys/unix/selector/io_source/mod.rs +++ b/src/sys/unix/selector/io_source/mod.rs @@ -1,2 +1,11 @@ -pub(super) mod edge_triggered; -pub(super) mod stateless; +cfg_epoll_selector! { + pub(super) mod stateless; +} + +cfg_kqueue_selector! { + pub(super) mod stateless; +} + +cfg_poll_selector! { + pub(super) mod edge_triggered; +} diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 56087cac7..11bac1711 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -17,7 +17,7 @@ cfg_kqueue_selector! { cfg_poll_selector! { mod poll; pub(crate) use self::poll::{event, Event, Events, Selector}; - pub(crate) use self::io_source::stateless::IoSourceState; + pub(crate) use self::io_source::edge_triggered ::IoSourceState; } /// Lowest file descriptor used in `Selector::try_clone`. diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index d6dd57014..45e87baf8 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -234,7 +234,7 @@ impl SelectorState { } // Perform the poll. - log::trace!("POLLing on {:?}", fds); + log::trace!("Polling on {:?}", fds); let num_events = poll(&mut fds.poll_fds, deadline)?; if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { // timeout @@ -274,6 +274,11 @@ impl SelectorState { events: poll_fd.revents, }); + // Remove interest + // the IoSourceState used with this selector will add back the interest + // as soon as an WouldBlock I/O error occurred + poll_fd.events = 0; + if events.len() == num_fd_events { break; } From b39cd6e99109b09fa4f7279e44f56dc42da17172 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 17:58:04 +0200 Subject: [PATCH 23/32] Edge triggering should allow original code to work --- src/sys/unix/pipe.rs | 7 ++++--- src/token.rs | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index 77701ce7e..63ed6fea5 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -115,15 +115,17 @@ use crate::{event, Interest, Registry, Token}; /// # use mio::unix::pipe; /// # /// # const PIPE_RECV: Token = Token(0); +/// # const PIPE_SEND: Token = Token(1); /// # /// # fn main() -> io::Result<()> { /// // Same setup as in the example above. /// let mut poll = Poll::new()?; /// let mut events = Events::with_capacity(8); /// -/// let (sender, mut receiver) = pipe::new()?; +/// let (mut sender, mut receiver) = pipe::new()?; /// /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; /// /// // Drop the sender. /// drop(sender); @@ -180,8 +182,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { } for fd in &fds { - if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC) != 0 - { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC) != 0 { let err = io::Error::last_os_error(); // Don't leak file descriptors. Can't handle error though. let _ = libc::close(fds[0]); diff --git a/src/token.rs b/src/token.rs index 1b9459599..91601cde0 100644 --- a/src/token.rs +++ b/src/token.rs @@ -107,11 +107,9 @@ /// token => { /// // Always operate in a loop /// loop { -/// let socket = sockets.get_mut(&token).unwrap(); -/// match socket.read(&mut buf) { +/// match sockets.get_mut(&token).unwrap().read(&mut buf) { /// Ok(0) => { /// // Socket is closed, remove it from the map -/// poll.registry().deregister(socket)?; /// sockets.remove(&token); /// break; /// } From 200dfb576b2113bb52e9f5db7ca5f8571181da94 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 18:03:59 +0200 Subject: [PATCH 24/32] Actually initialize edge trigger state --- .../unix/selector/io_source/edge_triggered.rs | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/sys/unix/selector/io_source/edge_triggered.rs b/src/sys/unix/selector/io_source/edge_triggered.rs index 7ea40cfc3..dedce9672 100644 --- a/src/sys/unix/selector/io_source/edge_triggered.rs +++ b/src/sys/unix/selector/io_source/edge_triggered.rs @@ -2,10 +2,9 @@ use crate::sys::Selector; use crate::{Interest, Registry, Token}; use std::io; use std::os::unix::io::RawFd; -use std::sync::Arc; struct InternalState { - selector: Arc, + selector: Selector, token: Token, interests: Interest, fd: RawFd, @@ -58,7 +57,19 @@ impl IoSourceState { if self.inner.is_some() { Err(io::ErrorKind::AlreadyExists.into()) } else { - registry.selector().register(fd, token, interests) + let selector = registry.selector().try_clone()?; + + selector.register(fd, token, interests).map(move |()| { + let state = InternalState { + selector, + token, + interests, + fd, + is_registered: false, + }; + + self.inner = Some(Box::new(state)); + }) } } @@ -82,6 +93,12 @@ impl IoSourceState { } pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + if let Some(mut state) = self.inner.take() { + // Deregistration _may_ fail below, however, dropping the state would only + // do the same thing twice anyway + state.is_registered = false; + } + registry.selector().deregister(fd) } } From f2aea09415b6f72be4ec42d27bcb38c1a71878ae Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 19:10:37 +0200 Subject: [PATCH 25/32] Remove interests which were triggered --- examples/test_example.rs | 22 ++++----- .../unix/selector/io_source/edge_triggered.rs | 2 +- src/sys/unix/selector/poll.rs | 45 ++++++++++++++++++- 3 files changed, 55 insertions(+), 14 deletions(-) diff --git a/examples/test_example.rs b/examples/test_example.rs index e4723ffef..7943c8ca3 100644 --- a/examples/test_example.rs +++ b/examples/test_example.rs @@ -1,16 +1,16 @@ fn main() -> Result<(), Box> { + use mio::{Events, Poll, Token, Waker}; + use std::sync::Arc; use std::thread; use std::time::Duration; - use std::sync::Arc; - use mio::{Events, Token, Poll, Waker}; env_logger::init(); const WAKE_TOKEN: Token = Token(10); let mut poll = Poll::new()?; let mut events = Events::with_capacity(2); let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?); -// We need to keep the Waker alive, so we'll create a clone for the -// thread we create below. + // We need to keep the Waker alive, so we'll create a clone for the + // thread we create below. let waker1 = waker.clone(); let handle = thread::spawn(move || { // Working hard, or hardly working? @@ -19,21 +19,21 @@ fn main() -> Result<(), Box> { // Now we'll wake the queue on the other thread. waker1.wake().expect("unable to wake"); }); -// On our current thread we'll poll for events, without a timeout. + // On our current thread we'll poll for events, without a timeout. poll.poll(&mut events, None)?; -// After about 500 milliseconds we should be awoken by the other thread and -// get a single event. + // After about 500 milliseconds we should be awoken by the other thread and + // get a single event. assert!(!events.is_empty()); let waker_event = events.iter().next().unwrap(); assert!(waker_event.is_readable()); assert_eq!(waker_event.token(), WAKE_TOKEN); -// We need to tell the waker that we woke up, us otherwise -// it might wake us again when polling + // We need to tell the waker that we woke up, us otherwise + // it might wake us again when polling log::trace!("Signalling waker it did wake!"); waker.did_wake(); log::trace!("About to join thread!"); -handle.join().unwrap(); + handle.join().unwrap(); log::trace!("Thread joined!"); Ok(()) -} \ No newline at end of file +} diff --git a/src/sys/unix/selector/io_source/edge_triggered.rs b/src/sys/unix/selector/io_source/edge_triggered.rs index dedce9672..2cde390af 100644 --- a/src/sys/unix/selector/io_source/edge_triggered.rs +++ b/src/sys/unix/selector/io_source/edge_triggered.rs @@ -65,7 +65,7 @@ impl IoSourceState { token, interests, fd, - is_registered: false, + is_registered: true, }; self.inner = Some(Box::new(state)); diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index 45e87baf8..ac095bfca 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -86,6 +86,12 @@ struct SelectorState { /// File descriptors to poll. fds: Mutex, + /// File descriptors which will be removed before the next poll call. + /// + /// When a file descriptor is deregistered while a poll is running, we need to filter + /// out all removed descriptors after that poll is finished running. + pending_removal: Mutex>, + /// The file descriptor of the read half of the notify pipe. This is also stored as the first /// file descriptor in `fds.poll_fds`. notify_read: RawFd, @@ -162,6 +168,7 @@ impl SelectorState { })], fd_data: HashMap::new(), }), + pending_removal: Mutex::new(Vec::new()), notify_read: notify_fds[0], notify_write: notify_fds[1], waiting_operations: AtomicUsize::new(0), @@ -260,6 +267,12 @@ impl SelectorState { } } + // We now check whether this poll was performed with descriptors which were pending + // for removal and filter out any matching. + let mut pending_removal_guard = self.pending_removal.lock().unwrap(); + let pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new()); + drop(pending_removal_guard); + // Store the events if there were any. if num_fd_events > 0 { let fds = &mut *fds; @@ -267,6 +280,12 @@ impl SelectorState { events.reserve(num_fd_events); for fd_data in fds.fd_data.values_mut() { let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + + if pending_removal.contains(&poll_fd.fd) { + // Fd was removed while poll was running + continue; + } + if poll_fd.revents != 0 { // Store event events.push(Event { @@ -274,10 +293,10 @@ impl SelectorState { events: poll_fd.revents, }); - // Remove interest + // Remove the interest which just got triggered // the IoSourceState used with this selector will add back the interest // as soon as an WouldBlock I/O error occurred - poll_fd.events = 0; + poll_fd.events &= !poll_fd.revents; if events.len() == num_fd_events { break; @@ -297,6 +316,24 @@ impl SelectorState { return Err(io::Error::from(io::ErrorKind::InvalidInput)); } + // We must handle the unlikely case that the following order of operations happens: + // + // register(1 as RawFd) + // deregister(1 as RawFd) + // register(1 as RawFd) + // + // + // Fd's pending removal only get cleared when poll has been run. It is possible that + // between registering and deregistering and then _again_ registering the file descriptor + // poll never gets called, thus the fd stays stuck in the pending removal list. + // + // To avoid this scenario we remove an fd from pending removals when registering it. + let mut pending_removal = self.pending_removal.lock().unwrap(); + if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) { + pending_removal.remove(idx); + } + drop(pending_removal); + self.modify_fds(|fds| { if fds.fd_data.contains_key(&fd) { return Err(io::Error::new( @@ -339,6 +376,10 @@ impl SelectorState { } pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + let mut pending_removal = self.pending_removal.lock().unwrap(); + pending_removal.push(fd); + drop(pending_removal); + self.modify_fds(|fds| { let data = fds.fd_data.remove(&fd).ok_or(io::ErrorKind::NotFound)?; fds.poll_fds.swap_remove(data.poll_fds_index); From 65c71531ef28d493d04af0a85f2c42c39ae83141 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 20:07:26 +0200 Subject: [PATCH 26/32] Unbreak Waker API --- examples/test_example.rs | 95 +++++++++++++++++++++++----------- src/sys/shell/waker.rs | 4 -- src/sys/unix/selector/epoll.rs | 5 ++ src/sys/unix/selector/poll.rs | 45 ++++++++++++++-- src/sys/unix/waker.rs | 53 +++++++++++-------- src/sys/windows/waker.rs | 2 - src/waker.rs | 13 ----- 7 files changed, 141 insertions(+), 76 deletions(-) diff --git a/examples/test_example.rs b/examples/test_example.rs index 7943c8ca3..fa201ac30 100644 --- a/examples/test_example.rs +++ b/examples/test_example.rs @@ -1,39 +1,76 @@ +macro_rules! checked_write { + ($socket: ident . $method: ident ( $data: expr $(, $arg: expr)* ) ) => {{ + let data = $data; + let n = $socket.$method($data $(, $arg)*) + .expect("unable to write to socket"); + assert_eq!(n, data.len(), "short write"); + }}; +} + +/// Assert that the provided result is an `io::Error` with kind `WouldBlock`. +pub fn assert_would_block(result: std::io::Result) { + match result { + Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), + Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {} + Err(err) => panic!("unexpected error result: {}", err), + } +} + fn main() -> Result<(), Box> { - use mio::{Events, Poll, Token, Waker}; + use mio::{net, net::TcpStream, Events, Interest, Poll, Token, Waker}; + use std::io::{Read, Write}; use std::sync::Arc; use std::thread; use std::time::Duration; env_logger::init(); - const WAKE_TOKEN: Token = Token(10); + const DATA1: &[u8] = b"Hello world!"; + const ID1: Token = Token(1); + let mut poll = Poll::new()?; - let mut events = Events::with_capacity(2); - let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?); - // We need to keep the Waker alive, so we'll create a clone for the - // thread we create below. - let waker1 = waker.clone(); - let handle = thread::spawn(move || { - // Working hard, or hardly working? - thread::sleep(Duration::from_millis(500)); - log::trace!("WAKING!"); - // Now we'll wake the queue on the other thread. - waker1.wake().expect("unable to wake"); - }); - // On our current thread we'll poll for events, without a timeout. + let mut events = Events::with_capacity(1024); + + let listener = net::TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap(); + let sockaddr = listener.local_addr().unwrap(); + let mut stream = TcpStream::connect(sockaddr).unwrap(); + + poll.registry() + .register(&mut stream, ID1, Interest::READABLE.add(Interest::WRITABLE)) + .unwrap(); + + let server_stream = listener.accept().unwrap(); + poll.poll(&mut events, None)?; - // After about 500 milliseconds we should be awoken by the other thread and - // get a single event. - assert!(!events.is_empty()); - let waker_event = events.iter().next().unwrap(); - assert!(waker_event.is_readable()); - assert_eq!(waker_event.token(), WAKE_TOKEN); - // We need to tell the waker that we woke up, us otherwise - // it might wake us again when polling - log::trace!("Signalling waker it did wake!"); - waker.did_wake(); - - log::trace!("About to join thread!"); - handle.join().unwrap(); - log::trace!("Thread joined!"); + + /* expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Interest::WRITABLE)], + ); */ + checked_write!(stream.write(DATA1)); + + // Try to read something. + assert_would_block(stream.read(&mut [0])); + + // Server goes away. + drop(server_stream); + + poll.poll(&mut events, None)?; + + /* expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)], + ); */ + + // Make sure we quiesce. `expect_no_events` seems to flake sometimes on mac/freebsd. + loop { + poll.poll(&mut events, Some(Duration::from_millis(100))) + .expect("poll failed"); + if events.iter().count() == 0 { + break; + } + } + Ok(()) } diff --git a/src/sys/shell/waker.rs b/src/sys/shell/waker.rs index 11935c54a..bbdd7c33a 100644 --- a/src/sys/shell/waker.rs +++ b/src/sys/shell/waker.rs @@ -13,8 +13,4 @@ impl Waker { pub fn wake(&self) -> io::Result<()> { os_required!(); } - - pub fn did_wake(&self) { - os_required!(); - } } diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs index 1256663da..0c0c8f13c 100644 --- a/src/sys/unix/selector/epoll.rs +++ b/src/sys/unix/selector/epoll.rs @@ -116,6 +116,11 @@ impl Selector { }) } + pub(crate) fn register_waker_fd(&self, fd: RawFd, token: Token) -> io::Result<()> { + // No special handling required for epoll + self.register(fd, token, Interest::READABLE) + } + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { let mut event = libc::epoll_event { events: interests_to_epoll(interests), diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index ac095bfca..c2a917f28 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -53,6 +53,10 @@ impl Selector { self.state.select(events, timeout) } + pub fn register_waker_fd(&self, fd: RawFd, token: Token) -> io::Result<()> { + self.state.register_waker_fd(fd, token) + } + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { self.state.register(fd, token, interests) } @@ -153,6 +157,8 @@ struct FdData { poll_fds_index: usize, /// The key of the `Event` associated with this file descriptor. token: Token, + /// Indicates whether this is a waker fd which needs to be reset after becoming ready + is_waker: bool, } impl SelectorState { @@ -286,6 +292,15 @@ impl SelectorState { continue; } + if poll_fd.events == 0 { + // We can get events even when we didn't ask for them. + // This mainly happens when we have a HUP but did not ask for read nor for + // write readiness. + // + // TODO: Can this cause busy loops? + continue; + } + if poll_fd.revents != 0 { // Store event events.push(Event { @@ -293,10 +308,15 @@ impl SelectorState { events: poll_fd.revents, }); - // Remove the interest which just got triggered - // the IoSourceState used with this selector will add back the interest - // as soon as an WouldBlock I/O error occurred - poll_fd.events &= !poll_fd.revents; + if fd_data.is_waker { + // Don't remove interests, instead tell the waker to reset itself + crate::sys::Waker::reset_by_fd(poll_fd.fd)?; + } else { + // Remove the interest which just got triggered + // the IoSourceState used with this selector will add back the interest + // as soon as an WouldBlock I/O error occurred + poll_fd.events &= !poll_fd.revents; + } if events.len() == num_fd_events { break; @@ -311,7 +331,21 @@ impl SelectorState { Ok(()) } + pub(crate) fn register_waker_fd(&self, fd: RawFd, token: Token) -> io::Result<()> { + self.register_internal(fd, token, Interest::READABLE, true) + } + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.register_internal(fd, token, interests, false) + } + + pub fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + is_waker: bool, + ) -> io::Result<()> { if fd == self.notify_read || fd == self.notify_write { return Err(io::Error::from(io::ErrorKind::InvalidInput)); } @@ -351,6 +385,7 @@ impl SelectorState { FdData { poll_fds_index, token, + is_waker, }, ); @@ -536,7 +571,7 @@ pub mod event { // Both halves of the socket have closed (event.events & libc::POLLHUP) != 0 // Socket has received FIN or called shutdown(SHUT_RD) - || ((event.events & libc::POLLIN) != 0 && (event.events & libc::POLLRDHUP) != 0) + || (event.events & libc::POLLRDHUP) != 0 } #[cfg(target_os = "haiku")] diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index d1c9d70b8..e7b5f15e3 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -1,11 +1,12 @@ #[cfg(any(target_os = "linux", target_os = "android"))] mod eventfd { use crate::sys::Selector; - use crate::{Interest, Token}; + use crate::Token; use std::fs::File; use std::io::{self, Read, Write}; - use std::os::unix::io::{AsRawFd, FromRawFd}; + use std::mem::ManuallyDrop; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Waker backed by `eventfd`. /// @@ -28,7 +29,7 @@ mod eventfd { // it's closed when dropped, e.g. when register below fails. let file = unsafe { File::from_raw_fd(fd) }; selector - .register(fd, token, Interest::READABLE) + .register_waker_fd(fd, token) .map(|()| Waker { fd: file, selector }) }) } @@ -47,14 +48,17 @@ mod eventfd { } } - pub fn did_wake(&self) { - let _ = self.reset(); + /// Reset the eventfd object + fn reset(&self) -> io::Result<()> { + Self::reset_by_fd(self.fd.as_raw_fd()) } - /// Reset the eventfd object, only need to call this if `wake` fails. - fn reset(&self) -> io::Result<()> { + /// Reset the eventfd object + pub(crate) fn reset_by_fd(fd: RawFd) -> io::Result<()> { + let mut file = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); + let mut buf: [u8; 8] = 0u64.to_ne_bytes(); - match (&self.fd).read(&mut buf) { + match file.read(&mut buf) { Ok(_) => Ok(()), // If the `Waker` hasn't been awoken yet this will return a // `WouldBlock` error which we can safely ignore. @@ -106,7 +110,10 @@ mod kqueue { self.selector.wake(self.token) } - pub fn did_wake(&self) {} + pub(crate) fn reset_by_fd(fd: RawFd) -> io::Result<()> { + // No-op for kqueue + Ok(()) + } } } @@ -172,13 +179,11 @@ mod pipe { // they're closed when dropped, e.g. when register below fails. let sender = unsafe { File::from_raw_fd(fds[1]) }; let receiver = unsafe { File::from_raw_fd(fds[0]) }; - selector - .register(fds[0], token, Interest::READABLE) - .map(|()| Waker { - sender, - receiver, - selector, - }) + selector.register_waker_fd(fds[0], token).map(|()| Waker { + sender, + receiver, + selector, + }) } pub fn wake(&self) -> io::Result<()> { @@ -193,7 +198,7 @@ mod pipe { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { // The reading end is full so we'll empty the buffer and try // again. - self.empty(); + self.reset(); self.wake() } Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(), @@ -201,18 +206,20 @@ mod pipe { } } - pub fn did_wake(&self) { - self.empty(); + fn reset(&self) { + Self::reset_by_fd(self.receiver.as_raw_fd()); } - /// Empty the pipe's buffer, only need to call this if `wake` fails. + /// Empty the pipe's buffer. /// This ignores any errors. - fn empty(&self) { + pub(crate) fn reset_by_fd(fd: RawFd) -> io::Result<()> { + let mut file = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) }); + let mut buf = [0; 4096]; loop { - match (&self.receiver).read(&mut buf) { + match file.read(&mut buf) { Ok(n) if n > 0 => continue, - _ => return, + _ => return Ok(()), } } } diff --git a/src/sys/windows/waker.rs b/src/sys/windows/waker.rs index c6c678c4a..103aa01a7 100644 --- a/src/sys/windows/waker.rs +++ b/src/sys/windows/waker.rs @@ -26,6 +26,4 @@ impl Waker { self.port.post(ev.to_completion_status()) } - - pub fn did_wake(&self) {} } diff --git a/src/waker.rs b/src/waker.rs index 6a2ff5c17..92fdb4c16 100644 --- a/src/waker.rs +++ b/src/waker.rs @@ -70,10 +70,6 @@ use std::io; /// let waker_event = events.iter().next().unwrap(); /// assert!(waker_event.is_readable()); /// assert_eq!(waker_event.token(), WAKE_TOKEN); -/// -/// // We need to tell the waker that we woke up, us otherwise -/// // it might wake us again when polling -/// waker.did_wake(); /// # handle.join().unwrap(); /// # Ok(()) /// # } @@ -97,13 +93,4 @@ impl Waker { pub fn wake(&self) -> io::Result<()> { self.inner.wake() } - - /// Notifies the waker that it was actually woken. - /// - /// This is required when using a level triggered polling api, - /// as otherwise the waker will not loose its woken status and keep - /// waking. - pub fn did_wake(&self) { - self.inner.did_wake() - } } From ca105dfb41a1e8aa1232c9e38a817aec62f2fe44 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 20:44:38 +0200 Subject: [PATCH 27/32] All tests pass now with poll --- src/sys/unix/selector/poll.rs | 2 +- tests/tcp_stream.rs | 15 +++++++-------- tests/unix_stream.rs | 7 ++++++- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index c2a917f28..efc1e4124 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -298,7 +298,7 @@ impl SelectorState { // write readiness. // // TODO: Can this cause busy loops? - continue; + // continue; } if poll_fd.revents != 0 { diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index a2288a173..3906255b9 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -639,14 +639,13 @@ fn tcp_reset_close_event() { vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)], ); - // Make sure we quiesce. `expect_no_events` seems to flake sometimes on mac/freebsd. - loop { - poll.poll(&mut events, Some(Duration::from_millis(100))) - .expect("poll failed"); - if events.iter().count() == 0 { - break; - } - } + // Something weird is going on here. Linux keeps emitting HUP, so event though everything is + // closed down you still get events. In the linux kernel source there is a comment stating + // that EPOLLHUP is not done correctly (see net/ipv4/tcp.c, v6.0-rc7 tcp_poll) and other flaky + // things are going on. + // TODO: investigate what exactly is going on here and why a loop expecting no more events + // ever worked (linux documentation says that HUP is not maskable and causes poll to + // always return, no exceptions) } #[test] diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index 79b7c3d4b..0403ec4de 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -2,7 +2,7 @@ use mio::net::UnixStream; use mio::{Interest, Token}; -use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::io::{self, ErrorKind, IoSlice, IoSliceMut, Read, Write}; use std::net::Shutdown; use std::os::unix::net; use std::path::Path; @@ -442,6 +442,11 @@ where ); expect_read!(stream.read(&mut buf), DATA1); + // mio can only guarantee further readiness events when WouldBlock is returned + assert_eq!( + stream.read(&mut buf).map_err(|err| err.kind()).err(), + Some(ErrorKind::WouldBlock) + ); assert!(stream.take_error().unwrap().is_none()); From 77e3250f1bb85d701c78e17c2915ca8e7ffe5c5c Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 20:45:28 +0200 Subject: [PATCH 28/32] Remove example used for debugging only --- Cargo.toml | 4 --- examples/test_example.rs | 76 ---------------------------------------- 2 files changed, 80 deletions(-) delete mode 100644 examples/test_example.rs diff --git a/Cargo.toml b/Cargo.toml index fa9082315..2c1ac636e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,7 +101,3 @@ required-features = ["os-poll", "net"] [[example]] name = "udp_server" required-features = ["os-poll", "net"] - -[[example]] -name = "test_example" -required-features = ["os-poll", "net"] diff --git a/examples/test_example.rs b/examples/test_example.rs deleted file mode 100644 index fa201ac30..000000000 --- a/examples/test_example.rs +++ /dev/null @@ -1,76 +0,0 @@ -macro_rules! checked_write { - ($socket: ident . $method: ident ( $data: expr $(, $arg: expr)* ) ) => {{ - let data = $data; - let n = $socket.$method($data $(, $arg)*) - .expect("unable to write to socket"); - assert_eq!(n, data.len(), "short write"); - }}; -} - -/// Assert that the provided result is an `io::Error` with kind `WouldBlock`. -pub fn assert_would_block(result: std::io::Result) { - match result { - Ok(_) => panic!("unexpected OK result, expected a `WouldBlock` error"), - Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {} - Err(err) => panic!("unexpected error result: {}", err), - } -} - -fn main() -> Result<(), Box> { - use mio::{net, net::TcpStream, Events, Interest, Poll, Token, Waker}; - use std::io::{Read, Write}; - use std::sync::Arc; - use std::thread; - use std::time::Duration; - env_logger::init(); - - const DATA1: &[u8] = b"Hello world!"; - const ID1: Token = Token(1); - - let mut poll = Poll::new()?; - let mut events = Events::with_capacity(1024); - - let listener = net::TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap(); - let sockaddr = listener.local_addr().unwrap(); - let mut stream = TcpStream::connect(sockaddr).unwrap(); - - poll.registry() - .register(&mut stream, ID1, Interest::READABLE.add(Interest::WRITABLE)) - .unwrap(); - - let server_stream = listener.accept().unwrap(); - - poll.poll(&mut events, None)?; - - /* expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Interest::WRITABLE)], - ); */ - checked_write!(stream.write(DATA1)); - - // Try to read something. - assert_would_block(stream.read(&mut [0])); - - // Server goes away. - drop(server_stream); - - poll.poll(&mut events, None)?; - - /* expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)], - ); */ - - // Make sure we quiesce. `expect_no_events` seems to flake sometimes on mac/freebsd. - loop { - poll.poll(&mut events, Some(Duration::from_millis(100))) - .expect("poll failed"); - if events.iter().count() == 0 { - break; - } - } - - Ok(()) -} From 6ea8066508cf6f1421e328c89f548ecc6c67e65c Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 20:46:46 +0200 Subject: [PATCH 29/32] Fix typo in comment --- tests/tcp_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 3906255b9..b0842aaa0 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -639,7 +639,7 @@ fn tcp_reset_close_event() { vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)], ); - // Something weird is going on here. Linux keeps emitting HUP, so event though everything is + // Something weird is going on here. Linux keeps emitting HUP, so even though everything is // closed down you still get events. In the linux kernel source there is a comment stating // that EPOLLHUP is not done correctly (see net/ipv4/tcp.c, v6.0-rc7 tcp_poll) and other flaky // things are going on. From aefc00a7c054c222adc730cac54a83dfe1e4e809 Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 20:49:53 +0200 Subject: [PATCH 30/32] Credit @Kestrer for original poll implementation --- src/sys/unix/selector/poll.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index efc1e4124..905724a76 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -1,3 +1,6 @@ +// This implementation is based on the one in the `polling` crate. +// Thanks to https://github.com/Kestrer for the original implementation! + use crate::{Interest, Token}; use std::collections::HashMap; use std::convert::TryInto; From 26288942a6ef8aee9b549f7ed17477594abdc1dd Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 21:40:07 +0200 Subject: [PATCH 31/32] Fix bad imports --- src/sys/unix/waker.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index e7b5f15e3..ede9770f7 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -130,11 +130,12 @@ pub use self::kqueue::Waker; ))] mod pipe { use crate::sys::unix::Selector; - use crate::{Interest, Token}; + use crate::Token; use std::fs::File; use std::io::{self, Read, Write}; - use std::os::unix::io::{AsRawFd, FromRawFd}; + use std::mem::ManuallyDrop; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; /// Waker backed by a unix pipe. /// From 1387b178629f80de65b112c416ce41f047447e9e Mon Sep 17 00:00:00 2001 From: Janrupf Date: Sat, 1 Oct 2022 21:40:30 +0200 Subject: [PATCH 32/32] Ignore pipe reset error --- src/sys/unix/waker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index ede9770f7..f46537b57 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -208,7 +208,7 @@ mod pipe { } fn reset(&self) { - Self::reset_by_fd(self.receiver.as_raw_fd()); + let _ = Self::reset_by_fd(self.receiver.as_raw_fd()); } /// Empty the pipe's buffer.