Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll selector #1602

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
dae7fc0
Initial poll selector port from polling
Janrupf Aug 9, 2022
e6151cf
Add an error hint and fix a resource leak
Janrupf Aug 9, 2022
9cae6f5
Fix selector cloning and wakers
Janrupf Aug 9, 2022
4430339
Allow forcing poll api as a feature
Janrupf Aug 9, 2022
2acc95a
Unregister fds when dropping wakers
Janrupf Aug 9, 2022
3ebbf05
Finish doc comment
Janrupf Aug 9, 2022
a2d4448
Add missing .cargo back to .gitignore
Janrupf Aug 9, 2022
1652f75
Add did_wake to shell implementation
Janrupf Aug 9, 2022
1f0292e
Add proper cfg to LOWEST_FD in unix selector
Janrupf Aug 9, 2022
21850e8
Remove super verbose debug logging
Janrupf Aug 9, 2022
2baaade
Compiles on Haiku
Janrupf Aug 16, 2022
ef6b580
Add missing nonblock on Haiku
Janrupf Aug 17, 2022
d09c48f
Disable unix datagrams for haiku (not supported)
Janrupf Aug 17, 2022
a366956
Some platforms don't signal read closed
Janrupf Aug 17, 2022
f44704a
All doc tests pass on Haiku now
Janrupf Aug 17, 2022
ce45088
Fix poll selector not closing fd's
Janrupf Aug 17, 2022
942f439
Only break out of poll when receiving user events
Janrupf Aug 17, 2022
ecde672
Properly implement timeouts
Janrupf Aug 17, 2022
976bf25
Put huge cfg statements into macros
Janrupf Oct 1, 2022
a2d5fd0
Prepare for splitting io source states
Janrupf Oct 1, 2022
fa04dbc
Redirect source calls through state on unix
Janrupf Oct 1, 2022
9bd3c3c
Implement edge triggering for poll selector
Janrupf Oct 1, 2022
b39cd6e
Edge triggering should allow original code to work
Janrupf Oct 1, 2022
200dfb5
Actually initialize edge trigger state
Janrupf Oct 1, 2022
f2aea09
Remove interests which were triggered
Janrupf Oct 1, 2022
65c7153
Unbreak Waker API
Janrupf Oct 1, 2022
ca105df
All tests pass now with poll
Janrupf Oct 1, 2022
77e3250
Remove example used for debugging only
Janrupf Oct 1, 2022
6ea8066
Fix typo in comment
Janrupf Oct 1, 2022
aefc00a
Credit @Kestrer for original poll implementation
Janrupf Oct 1, 2022
2628894
Fix bad imports
Janrupf Oct 1, 2022
1387b17
Ignore pipe reset error
Janrupf Oct 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/
.cargo
Cargo.lock
target*
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ os-ext = [
# Enables `mio::net` module containing networking primitives.
net = []

# Forces the use of the poll system call instead of epoll on systems
# where both are available
force-old-poll = ["os-poll"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not going to add a feature for this or support poll on any OS that has something better (e.g. epoll or kqueue). Mio is complex enough as-is it maintain with the matrix OSs architectures we're not getting selectors in the mix as well.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will also be removed once this PR is ready, I currently use this to develop the poll selector on Linux because development on Haiku and the espidf is quite hard. I still run the tests for Haiku, but this allows me to iron out bugs on a system which is easy to develop for.

TL;DR: Will be removed when PR is ready


[dependencies]
log = "0.4.8"

Expand Down
12 changes: 5 additions & 7 deletions src/io_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,8 @@ where
) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate(registry)?;
registry
.selector()
.register(self.inner.as_raw_fd(), token, interests)
self.state
.register(registry, token, interests, self.inner.as_raw_fd())
}

fn reregister(
Expand All @@ -155,15 +154,14 @@ where
) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.check_association(registry)?;
registry
.selector()
.reregister(self.inner.as_raw_fd(), token, interests)
self.state
.reregister(registry, token, interests, self.inner.as_raw_fd())
}

fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.remove_association(registry)?;
registry.selector().deregister(self.inner.as_raw_fd())
self.state.deregister(registry, self.inner.as_raw_fd())
}
}

Expand Down
73 changes: 73 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,76 @@ macro_rules! cfg_any_os_ext {
)*
}
}

/// The `os-poll` feature or one feature that requires is is enabled and the system
/// supports epoll.
macro_rules! cfg_epoll_selector {
($($item:item)*) => {
$(
#[cfg(all(
any(feature = "os-poll", feature = "net"),
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
),
not(feature = "force-old-poll")
))]
$item
)*
};
}

/// The `os-poll` feature or one feature that requires is is enabled and the system
/// supports kqueue.
macro_rules! cfg_kqueue_selector {
($($item:item)*) => {
$(
#[cfg(all(
any(feature = "os-poll", feature = "net"),
any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
),
not(feature = "force-old-poll")
))]
$item
)*
};
}

/// The `os-poll` feature or one feature that requires is is enabled and the system
/// is a generic unix which does not support epoll nor kqueue.
macro_rules! cfg_poll_selector {
($($item:item)*) => {
$(
#[cfg(
all(
unix,
any(feature = "os-poll", feature = "net"),
any(
not(any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
)),
feature = "force-old-poll"
)
)
)]
$item
)*
};
}
4 changes: 3 additions & 1 deletion src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ pub use self::udp::UdpSocket;
#[cfg(unix)]
mod uds;
#[cfg(unix)]
pub use self::uds::{SocketAddr, UnixDatagram, UnixListener, UnixStream};
pub use self::uds::{SocketAddr, UnixListener, UnixStream};
#[cfg(all(unix, not(target_os = "haiku")))]
pub use self::uds::UnixDatagram;
2 changes: 2 additions & 0 deletions src/net/uds/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#[cfg(not(target_os = "haiku"))]
mod datagram;
#[cfg(not(target_os = "haiku"))]
pub use self::datagram::UnixDatagram;

mod listener;
Expand Down
71 changes: 66 additions & 5 deletions src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
use crate::{event, sys, Events, Interest, Token};
use log::trace;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration;
use std::{fmt, io};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit.

Suggested change

#[cfg(all(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't repeat this cfg block so often; I'd follow along with what the rest of mio does and create a macro that encapsulates the cfg.

unix,
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
),
not(feature = "force-old-poll")
))]
use std::os::unix::io::{AsRawFd, RawFd};

/// Polls for readiness events on all registered values.
///
/// `Poll` allows a program to monitor a large number of [`event::Source`]s,
Expand Down Expand Up @@ -412,7 +428,22 @@ impl Poll {
}
}

#[cfg(unix)]
#[cfg(all(
unix,
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
),
not(feature = "force-old-poll")
))]
impl AsRawFd for Poll {
fn as_raw_fd(&self) -> RawFd {
self.registry.as_raw_fd()
Expand Down Expand Up @@ -697,15 +728,45 @@ impl fmt::Debug for Registry {
}
}

#[cfg(unix)]
#[cfg(all(
unix,
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
),
not(feature = "force-old-poll")
))]
impl AsRawFd for Registry {
fn as_raw_fd(&self) -> RawFd {
self.selector.as_raw_fd()
}
}

cfg_os_poll! {
#[cfg(unix)]
#[cfg(all(
unix,
any(
target_os = "android",
target_os = "illumos",
target_os = "linux",
target_os = "redox",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
),
not(feature = "force-old-poll")
))]
#[test]
pub fn as_raw_fd() {
let poll = Poll::new().unwrap();
Expand Down
24 changes: 1 addition & 23 deletions src/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ macro_rules! syscall {

cfg_os_poll! {
mod selector;
pub(crate) use self::selector::{event, Event, Events, Selector};
pub(crate) use self::selector::{event, Event, Events, Selector, IoSourceState};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to put this inside of cfg_io_source!.


mod sourcefd;
pub use self::sourcefd::SourceFd;
Expand All @@ -32,28 +32,6 @@ cfg_os_poll! {
pub use self::uds::SocketAddr;
}

cfg_io_source! {
use std::io;

// Both `kqueue` and `epoll` don't need to hold any user space state.
pub(crate) struct IoSourceState;

impl IoSourceState {
pub fn new() -> IoSourceState {
IoSourceState
}

pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R>
where
F: FnOnce(&T) -> io::Result<R>,
{
// We don't hold state, so we can just call the function and
// return.
f(io)
}
}
}

cfg_os_ext! {
pub(crate) mod pipe;
}
Expand Down
17 changes: 11 additions & 6 deletions src/sys/unix/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ pub(crate) fn new_socket(domain: libc::c_int, socket_type: libc::c_int) -> io::R
.map(|_| socket)
});

// Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC.
#[cfg(any(target_os = "ios", target_os = "macos"))]
// Darwin nor Haiku have SOCK_NONBLOCK or SOCK_CLOEXEC.
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "haiku"))]
let socket = socket.and_then(|socket| {
// For platforms that don't support flags in socket, we need to
// set the flags ourselves.
syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))
.and_then(|_| syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket))
syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC))
.map(|_| socket)
.map_err(|e| {
// If either of the `fcntl` calls failed, ensure the socket is
// closed and return the error.
Expand Down Expand Up @@ -89,14 +89,18 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
sin_family: libc::AF_INET as libc::sa_family_t,
sin_port: addr.port().to_be(),
sin_addr,
#[cfg(not(target_os = "haiku"))]
sin_zero: [0; 8],
#[cfg(target_os = "haiku")]
sin_zero: [0; 24],
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
target_os = "openbsd",
target_os = "haiku",
))]
sin_len: 0,
};
Expand All @@ -120,7 +124,8 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
target_os = "openbsd",
target_os = "haiku",
))]
sin6_len: 0,
#[cfg(target_os = "illumos")]
Expand Down
17 changes: 13 additions & 4 deletions src/sys/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ use crate::{event, Interest, Registry, Token};
///
/// ```
/// # use std::io;
/// # use std::io::Read;
/// #
/// # use mio::{Poll, Events, Interest, Token};
/// # use mio::unix::pipe;
Expand Down Expand Up @@ -138,6 +139,15 @@ use crate::{event, Interest, Registry, Token};
/// println!("Sender dropped!");
/// return Ok(());
/// },
/// PIPE_RECV => {
/// // Some platforms only signal a read readines event
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: s/readines/readiness/

/// println!("Pipe is readable due to dropped sender!");
///
/// // Reading from a closed pipe always returns Ok(0)
/// let mut buf = [0; 1];
/// assert_eq!(receiver.read(&mut buf).ok(), Some(0));
/// return Ok(());
/// }
/// _ => unreachable!(),
/// }
/// }
Expand All @@ -163,7 +173,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
}
}

#[cfg(any(target_os = "ios", target_os = "macos"))]
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "haiku"))]
unsafe {
// For platforms that don't have `pipe2(2)` we need to manually set the
// correct flags on the file descriptor.
Expand All @@ -172,9 +182,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
}

for fd in &fds {
if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
|| libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
{
if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK | libc::FD_CLOEXEC) != 0 {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As pointed out by @Thomasdezeeuw elsewhere: "This uses F_SETFD and F_SETFL (FD vs FL), two different commands. So this doesn't work."

let err = io::Error::last_os_error();
// Don't leak file descriptors. Can't handle error though.
let _ = libc::close(fds[0]);
Expand All @@ -195,6 +203,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "macos",
target_os = "illumos",
target_os = "redox",
target_os = "haiku",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");

Expand Down
5 changes: 5 additions & 0 deletions src/sys/unix/selector/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ impl Selector {
})
}

pub(crate) fn register_waker_fd(&self, fd: RawFd, token: Token) -> io::Result<()> {
// No special handling required for epoll
self.register(fd, token, Interest::READABLE)
}

pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
let mut event = libc::epoll_event {
events: interests_to_epoll(interests),
Expand Down