From 7c3d009c2cce17dc985b3d0f5ad429f7c05ef0bc Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Fri, 4 Aug 2023 14:44:08 +0200 Subject: [PATCH] Move has_waker boolean to Registry And share it properly between all Registries --- src/poll.rs | 26 ++++++++++++++++++++------ src/sys/shell/selector.rs | 5 ----- src/sys/unix/selector/epoll.rs | 13 +------------ src/sys/unix/selector/kqueue.rs | 13 +------------ src/sys/unix/selector/poll.rs | 16 +--------------- src/sys/windows/selector.rs | 11 ----------- tests/waker.rs | 24 ++++++++++++++++++++++++ 7 files changed, 47 insertions(+), 61 deletions(-) diff --git a/src/poll.rs b/src/poll.rs index 695bede5d..d904eb97c 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,9 +1,14 @@ -use crate::{event, sys, Events, Interest, Token}; #[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(all(debug_assertions, not(target_os = "wasi")))] +use std::sync::atomic::{AtomicBool, Ordering}; +#[cfg(all(debug_assertions, not(target_os = "wasi")))] +use std::sync::Arc; use std::time::Duration; use std::{fmt, io}; +use crate::{event, sys, Events, Interest, Token}; + /// Polls for readiness events on all registered values. /// /// `Poll` allows a program to monitor a large number of [`event::Source`]s, @@ -252,6 +257,9 @@ pub struct Poll { /// Registers I/O resources. pub struct Registry { selector: sys::Selector, + /// Whether this selector currently has an associated waker. + #[cfg(all(debug_assertions, not(target_os = "wasi")))] + has_waker: Arc, } impl Poll { @@ -298,7 +306,11 @@ impl Poll { /// ``` pub fn new() -> io::Result { sys::Selector::new().map(|selector| Poll { - registry: Registry { selector }, + registry: Registry { + selector, + #[cfg(all(debug_assertions, not(target_os = "wasi")))] + has_waker: Arc::new(AtomicBool::new(false)), + }, }) } } @@ -668,9 +680,11 @@ impl Registry { /// Event sources registered with this `Registry` will be registered with /// the original `Registry` and `Poll` instance. pub fn try_clone(&self) -> io::Result { - self.selector - .try_clone() - .map(|selector| Registry { selector }) + self.selector.try_clone().map(|selector| Registry { + selector, + #[cfg(all(debug_assertions, not(target_os = "wasi")))] + has_waker: Arc::clone(&self.has_waker), + }) } /// Internal check to ensure only a single `Waker` is active per [`Poll`] @@ -678,7 +692,7 @@ impl Registry { #[cfg(all(debug_assertions, not(target_os = "wasi")))] pub(crate) fn register_waker(&self) { assert!( - !self.selector.register_waker(), + !self.has_waker.swap(true, Ordering::AcqRel), "Only a single `Waker` can be active per `Poll` instance" ); } diff --git a/src/sys/shell/selector.rs b/src/sys/shell/selector.rs index bfc598a12..0a168980d 100644 --- a/src/sys/shell/selector.rs +++ b/src/sys/shell/selector.rs @@ -18,11 +18,6 @@ impl Selector { pub fn select(&self, _: &mut Events, _: Option) -> io::Result<()> { os_required!(); } - - #[cfg(all(debug_assertions, not(target_os = "wasi")))] - pub fn register_waker(&self) -> bool { - os_required!(); - } } #[cfg(unix)] diff --git a/src/sys/unix/selector/epoll.rs b/src/sys/unix/selector/epoll.rs index f3e0988c6..c0a8a4c16 100644 --- a/src/sys/unix/selector/epoll.rs +++ b/src/sys/unix/selector/epoll.rs @@ -3,7 +3,7 @@ use crate::{Interest, Token}; use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP}; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, i32, io, ptr}; @@ -16,8 +16,6 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, ep: RawFd, - #[cfg(debug_assertions)] - has_waker: AtomicBool, } impl Selector { @@ -60,8 +58,6 @@ impl Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), ep, - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), }) } @@ -71,8 +67,6 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, ep, - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -138,11 +132,6 @@ impl Selector { pub fn deregister(&self, fd: RawFd) -> io::Result<()> { syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ()) } - - #[cfg(debug_assertions)] - pub fn register_waker(&self) -> bool { - self.has_waker.swap(true, Ordering::AcqRel) - } } cfg_io_source! { diff --git a/src/sys/unix/selector/kqueue.rs b/src/sys/unix/selector/kqueue.rs index 8e9aa4c47..0a2a24a57 100644 --- a/src/sys/unix/selector/kqueue.rs +++ b/src/sys/unix/selector/kqueue.rs @@ -3,7 +3,7 @@ use std::mem::{self, MaybeUninit}; use std::ops::{Deref, DerefMut}; use std::os::unix::io::{AsRawFd, RawFd}; #[cfg(debug_assertions)] -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use std::{cmp, io, ptr, slice}; @@ -66,8 +66,6 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, kq: RawFd, - #[cfg(debug_assertions)] - has_waker: AtomicBool, } impl Selector { @@ -77,8 +75,6 @@ impl Selector { #[cfg(debug_assertions)] id: NEXT_ID.fetch_add(1, Ordering::Relaxed), kq, - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), }; syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC))?; @@ -91,8 +87,6 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, kq, - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -213,11 +207,6 @@ impl Selector { kevent_register(self.kq, &mut changes, &[libc::ENOENT as i64]) } - #[cfg(debug_assertions)] - pub fn register_waker(&self) -> bool { - self.has_waker.swap(true, Ordering::AcqRel) - } - // Used by `Waker`. #[cfg(any( target_os = "freebsd", diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs index ab2eea3c8..cfafbc182 100644 --- a/src/sys/unix/selector/poll.rs +++ b/src/sys/unix/selector/poll.rs @@ -22,9 +22,6 @@ static NEXT_ID: AtomicUsize = AtomicUsize::new(1); #[derive(Debug)] pub struct Selector { state: Arc, - /// Whether this selector currently has an associated waker. - #[cfg(debug_assertions)] - has_waker: AtomicBool, } impl Selector { @@ -33,8 +30,6 @@ impl Selector { Ok(Selector { state: Arc::new(state), - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), }) } @@ -44,11 +39,7 @@ impl Selector { let state = self.state.clone(); - Ok(Selector { - state, - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), - }) + Ok(Selector { state }) } pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { @@ -77,11 +68,6 @@ impl Selector { self.state.deregister(fd) } - #[cfg(debug_assertions)] - pub fn register_waker(&self) -> bool { - self.has_waker.swap(true, Ordering::AcqRel) - } - pub fn wake(&self, token: Token) -> io::Result<()> { self.state.wake(token) } diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index ac5152c9f..23d7bc5c1 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -328,8 +328,6 @@ pub struct Selector { #[cfg(debug_assertions)] id: usize, pub(super) inner: Arc, - #[cfg(debug_assertions)] - has_waker: AtomicBool, } impl Selector { @@ -341,8 +339,6 @@ impl Selector { #[cfg(debug_assertions)] id, inner: Arc::new(inner), - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(false), } }) } @@ -352,8 +348,6 @@ impl Selector { #[cfg(debug_assertions)] id: self.id, inner: Arc::clone(&self.inner), - #[cfg(debug_assertions)] - has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), }) } @@ -365,11 +359,6 @@ impl Selector { self.inner.select(events, timeout) } - #[cfg(debug_assertions)] - pub fn register_waker(&self) -> bool { - self.has_waker.swap(true, Ordering::AcqRel) - } - pub(super) fn clone_port(&self) -> Arc { self.inner.cp.clone() } diff --git a/tests/waker.rs b/tests/waker.rs index 56945d2cb..0893b0fd3 100644 --- a/tests/waker.rs +++ b/tests/waker.rs @@ -128,6 +128,30 @@ fn using_multiple_wakers_panics() { drop(waker2); } +#[test] +#[cfg_attr( + not(debug_assertions), + ignore = "only works with debug_assertions enabled" +)] +#[should_panic = "Only a single `Waker` can be active per `Poll` instance"] +fn using_multiple_wakers_panics_different_cloned_registries() { + init(); + + let poll = Poll::new().expect("unable to create new Poll instance"); + let registry1 = poll.registry().try_clone().unwrap(); + let registry2 = poll.registry().try_clone().unwrap(); + + let token1 = Token(10); + let token2 = Token(11); + + let waker1 = Waker::new(®istry1, token1).expect("unable to first waker"); + // This should panic. + let waker2 = Waker::new(®istry2, token2).unwrap(); + + drop(waker1); + drop(waker2); +} + fn expect_waker_event(poll: &mut Poll, events: &mut Events, token: Token) { poll.poll(events, Some(Duration::from_millis(100))).unwrap(); assert!(!events.is_empty());