From 8b5dbbb15df18a3924544d953732ed7072fe180d Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 22 Sep 2023 17:16:11 -0700 Subject: [PATCH 1/3] Expose waitable handles in Windows This commit allows waitable handles to be polled in Windows. This allows I/O constructs like processes, mutexes and waitable events be registered into the poller and be polled just like anything else. cc #25 Signed-off-by: John Nunley --- examples/windows-command.rs | 34 +++++++ src/lib.rs | 1 + src/os.rs | 3 + src/os/windows.rs | 191 ++++++++++++++++++++++++++++++++++++ src/reactor/windows.rs | 72 +++++++++++--- 5 files changed, 287 insertions(+), 14 deletions(-) create mode 100644 examples/windows-command.rs create mode 100644 src/os/windows.rs diff --git a/examples/windows-command.rs b/examples/windows-command.rs new file mode 100644 index 0000000..f8c0ff5 --- /dev/null +++ b/examples/windows-command.rs @@ -0,0 +1,34 @@ +//! Runs a command using waitable handles on Windows. +//! +//! Run with: +//! +//! ``` +//! cargo run --example windows-command +//! ``` + +#[cfg(windows)] +fn main() -> std::io::Result<()> { + use async_io::os::windows::Waitable; + use std::process::Command; + + futures_lite::future::block_on(async { + // Spawn a process. + let process = Command::new("cmd") + .args(["/C", "echo hello"]) + .spawn() + .expect("failed to spawn process"); + + // Wrap the process in an `Async` object that waits for it to exit. + let process = Waitable::new(process)?; + + // Wait for the process to exit. + process.ready().await?; + + Ok(()) + }) +} + +#[cfg(not(windows))] +fn main() { + println!("This example is only supported on Windows."); +} diff --git a/src/lib.rs b/src/lib.rs index cf35fd4..702c0b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,6 +53,7 @@ //! # std::io::Result::Ok(()) }); //! ``` +#![allow(clippy::needless_pass_by_ref_mut)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc( html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" diff --git a/src/os.rs b/src/os.rs index 7561849..bb5e660 100644 --- a/src/os.rs +++ b/src/os.rs @@ -14,3 +14,6 @@ pub mod unix; target_os = "dragonfly", ))] pub mod kqueue; + +#[cfg(windows)] +pub mod windows; diff --git a/src/os/windows.rs b/src/os/windows.rs new file mode 100644 index 0000000..94dae2d --- /dev/null +++ b/src/os/windows.rs @@ -0,0 +1,191 @@ +//! Functionality that is only available on Windows. + +use crate::reactor::{Reactor, Readable, Registration}; +use crate::Async; + +use std::convert::TryFrom; +use std::future::Future; +use std::io::{self, Result}; +use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, OwnedHandle, RawHandle}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A waitable handle registered in the reactor. +/// +/// Some handles in Windows are “waitable”, which means that they emit a “readiness” signal after some event occurs. This function can be used to wait for such events to occur on a handle. This function can be used in addition to regular socket polling. +/// +/// Waitable objects include the following: +/// +/// - Console inputs +/// - Waitable events +/// - Mutexes +/// - Processes +/// - Semaphores +/// - Threads +/// - Timer +/// +/// This structure can be used to wait for any of these objects to become ready. +/// +/// ## Implementation +/// +/// The current implementation waits on the handle by registering it in the application-global +/// Win32 threadpool. However, in the futur it may be possible to migrate to an implementation +/// on Windows 10 that uses a mechanism similar to [`MsgWaitForMultipleObjectsEx`]. +/// +/// [`MsgWaitForMultipleObjectsEx`]: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-msgwaitformultipleobjectsex +/// +/// ## Caveats +/// +/// Read the documentation for the [`Async`](crate::Async) type for more information regarding the +/// abilities and caveats with using this type. +#[derive(Debug)] +pub struct Waitable(Async); + +impl AsRef for Waitable { + fn as_ref(&self) -> &T { + self.0.as_ref() + } +} + +impl Waitable { + /// Create a new [`Waitable`] around a waitable handle. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::windows::Waitable; + /// + /// // Create a new process to wait for. + /// let mut child = Command::new("sleep").arg("5").spawn().unwrap(); + /// + /// // Wrap the process in an `Async` object that waits for it to exit. + /// let process = Waitable::new(child).unwrap(); + /// + /// // Wait for the process to exit. + /// # async_io::block_on(async { + /// process.ready().await.unwrap(); + /// # }); + /// ``` + pub fn new(handle: T) -> Result { + Ok(Self(Async { + source: Reactor::get().insert_io(unsafe { Registration::new_waitable(&handle) })?, + io: Some(handle), + })) + } +} + +impl AsRawHandle for Waitable { + fn as_raw_handle(&self) -> RawHandle { + self.get_ref().as_raw_handle() + } +} + +impl AsHandle for Waitable { + fn as_handle(&self) -> BorrowedHandle<'_> { + self.get_ref().as_handle() + } +} + +impl> TryFrom for Waitable { + type Error = io::Error; + + fn try_from(handle: OwnedHandle) -> Result { + Self::new(handle.into()) + } +} + +impl> TryFrom> for OwnedHandle { + type Error = io::Error; + + fn try_from(value: Waitable) -> std::result::Result { + value.into_inner().map(|handle| handle.into()) + } +} + +impl Waitable { + /// Get a reference to the inner handle. + pub fn get_ref(&self) -> &T { + self.0.get_ref() + } + + /// Get a mutable reference to the inner handle. + /// + /// # Safety + /// + /// The underlying I/O source must not be dropped or moved out using this function. + pub unsafe fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } + + /// Consumes the [`Waitable`], returning the inner handle. + pub fn into_inner(self) -> Result { + self.0.into_inner() + } + + /// Waits until the [`Waitable`] object is ready. + /// + /// This method completes when the underlying [`Waitable`] object has completed. See the documentation + /// for the [`Waitable`] object for more information. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::windows::Waitable; + /// + /// # futures_lite::future::block_on(async { + /// let child = Command::new("sleep").arg("5").spawn()?; + /// let process = Waitable::new(child)?; + /// + /// // Wait for the process to exit. + /// process.ready().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ready(&self) -> Ready<'_, T> { + Ready(self.0.readable()) + } + + /// Polls the I/O handle for readiness. + /// + /// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification + /// that the underlying [`Waitable`] object is ready. See the documentation for the [`Waitable`] + /// object for more information. + /// + /// # Caveats + /// + /// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks + /// will just keep waking each other in turn, thus wasting CPU time. + /// + /// # Examples + /// + /// ```no_run + /// use std::process::Command; + /// use async_io::os::windows::Waitable; + /// use futures_lite::future; + /// + /// # futures_lite::future::block_on(async { + /// let child = Command::new("sleep").arg("5").spawn()?; + /// let process = Waitable::new(child)?; + /// + /// // Wait for the process to exit. + /// future::poll_fn(|cx| process.poll_ready(cx)).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.0.poll_readable(cx) + } +} + +/// Future for [`Filter::ready`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct Ready<'a, T>(Readable<'a, T>); + +impl Future for Ready<'_, T> { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) + } +} diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs index e81a7eb..c0156a9 100644 --- a/src/reactor/windows.rs +++ b/src/reactor/windows.rs @@ -1,25 +1,43 @@ // SPDX-License-Identifier: MIT OR Apache-2.0 -use polling::{Event, Poller}; +use polling::os::iocp::PollerIocpExt; +use polling::{Event, PollMode, Poller}; use std::fmt; use std::io::Result; -use std::os::windows::io::{AsRawSocket, BorrowedSocket, RawSocket}; +use std::os::windows::io::{ + AsHandle, AsRawHandle, AsRawSocket, AsSocket, BorrowedHandle, BorrowedSocket, RawHandle, + RawSocket, +}; /// The raw registration into the reactor. #[doc(hidden)] -pub struct Registration { +pub enum Registration { /// Raw socket handle on Windows. /// /// # Invariant /// /// This describes a valid socket that has not been `close`d. It will not be /// closed while this object is alive. - raw: RawSocket, + Socket(RawSocket), + + /// Waitable handle for Windows. + /// + /// # Invariant + /// + /// This describes a valid waitable handle that has not been `close`d. It will not be + /// closed while this object is alive. + Handle(RawHandle), } +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + impl fmt::Debug for Registration { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Debug::fmt(&self.raw, f) + match self { + Self::Socket(raw) => fmt::Debug::fmt(raw, f), + Self::Handle(handle) => fmt::Debug::fmt(handle, f), + } } } @@ -29,32 +47,58 @@ impl Registration { /// # Safety /// /// The provided file descriptor must be valid and not be closed while this object is alive. - pub(crate) unsafe fn new(f: BorrowedSocket<'_>) -> Self { - Self { - raw: f.as_raw_socket(), - } + pub(crate) unsafe fn new(f: impl AsSocket) -> Self { + Self::Socket(f.as_socket().as_raw_socket()) + } + + /// Create a new [`Registration`] around a waitable handle. + /// + /// # Safety + /// + /// The provided handle must be valid and not be closed while this object is alive. + pub(crate) unsafe fn new_waitable(f: impl AsHandle) -> Self { + Self::Handle(f.as_handle().as_raw_handle()) } /// Registers the object into the reactor. #[inline] pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> { // SAFETY: This object's existence validates the invariants of Poller::add - unsafe { poller.add(self.raw, Event::none(token)) } + unsafe { + match self { + Self::Socket(raw) => poller.add(*raw, Event::none(token)), + Self::Handle(handle) => { + poller.add_waitable(*handle, Event::none(token), PollMode::Oneshot) + } + } + } } /// Re-registers the object into the reactor. #[inline] pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> { // SAFETY: self.raw is a valid file descriptor - let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) }; - poller.modify(fd, interest) + match self { + Self::Socket(raw) => { + poller.modify(unsafe { BorrowedSocket::borrow_raw(*raw) }, interest) + } + Self::Handle(handle) => poller.modify_waitable( + unsafe { BorrowedHandle::borrow_raw(*handle) }, + interest, + PollMode::Oneshot, + ), + } } /// Deregisters the object from the reactor. #[inline] pub(crate) fn delete(&self, poller: &Poller) -> Result<()> { // SAFETY: self.raw is a valid file descriptor - let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) }; - poller.delete(fd) + match self { + Self::Socket(raw) => poller.delete(unsafe { BorrowedSocket::borrow_raw(*raw) }), + Self::Handle(handle) => { + poller.remove_waitable(unsafe { BorrowedHandle::borrow_raw(*handle) }) + } + } } } From 758b6ebc32d3afd48c5cf1503c39b2342dce23c3 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 22 Sep 2023 18:54:53 -0700 Subject: [PATCH 2/3] Clippy + fmt Signed-off-by: John Nunley --- src/lib.rs | 1 - src/os/windows.rs | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 702c0b4..cf35fd4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,7 +53,6 @@ //! # std::io::Result::Ok(()) }); //! ``` -#![allow(clippy::needless_pass_by_ref_mut)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![doc( html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" diff --git a/src/os/windows.rs b/src/os/windows.rs index 94dae2d..e2a56a9 100644 --- a/src/os/windows.rs +++ b/src/os/windows.rs @@ -25,13 +25,13 @@ use std::task::{Context, Poll}; /// - Timer /// /// This structure can be used to wait for any of these objects to become ready. -/// +/// /// ## Implementation -/// +/// /// The current implementation waits on the handle by registering it in the application-global /// Win32 threadpool. However, in the futur it may be possible to migrate to an implementation /// on Windows 10 that uses a mechanism similar to [`MsgWaitForMultipleObjectsEx`]. -/// +/// /// [`MsgWaitForMultipleObjectsEx`]: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-msgwaitformultipleobjectsex /// /// ## Caveats From 3f974ca38f68dec5a3320a9d7ff5cd5f8cc25b9a Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 2 Dec 2023 10:58:49 -0800 Subject: [PATCH 3/3] Update to newest master Signed-off-by: John Nunley --- src/os/windows.rs | 3 ++- src/reactor/windows.rs | 11 +++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/os/windows.rs b/src/os/windows.rs index e2a56a9..ef853c0 100644 --- a/src/os/windows.rs +++ b/src/os/windows.rs @@ -69,7 +69,8 @@ impl Waitable { /// ``` pub fn new(handle: T) -> Result { Ok(Self(Async { - source: Reactor::get().insert_io(unsafe { Registration::new_waitable(&handle) })?, + source: Reactor::get() + .insert_io(unsafe { Registration::new_waitable(handle.as_handle()) })?, io: Some(handle), })) } diff --git a/src/reactor/windows.rs b/src/reactor/windows.rs index c0156a9..75233c9 100644 --- a/src/reactor/windows.rs +++ b/src/reactor/windows.rs @@ -5,8 +5,7 @@ use polling::{Event, PollMode, Poller}; use std::fmt; use std::io::Result; use std::os::windows::io::{ - AsHandle, AsRawHandle, AsRawSocket, AsSocket, BorrowedHandle, BorrowedSocket, RawHandle, - RawSocket, + AsRawHandle, AsRawSocket, BorrowedHandle, BorrowedSocket, RawHandle, RawSocket, }; /// The raw registration into the reactor. @@ -47,8 +46,8 @@ impl Registration { /// # Safety /// /// The provided file descriptor must be valid and not be closed while this object is alive. - pub(crate) unsafe fn new(f: impl AsSocket) -> Self { - Self::Socket(f.as_socket().as_raw_socket()) + pub(crate) unsafe fn new(f: BorrowedSocket<'_>) -> Self { + Self::Socket(f.as_raw_socket()) } /// Create a new [`Registration`] around a waitable handle. @@ -56,8 +55,8 @@ impl Registration { /// # Safety /// /// The provided handle must be valid and not be closed while this object is alive. - pub(crate) unsafe fn new_waitable(f: impl AsHandle) -> Self { - Self::Handle(f.as_handle().as_raw_handle()) + pub(crate) unsafe fn new_waitable(f: BorrowedHandle<'_>) -> Self { + Self::Handle(f.as_raw_handle()) } /// Registers the object into the reactor.