Skip to content

Commit

Permalink
io: implement try_new and try_with_interest for AsyncFd (#6345)
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej authored Mar 14, 2024
1 parent c9e7578 commit e37bd63
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 8 deletions.
114 changes: 107 additions & 7 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use crate::runtime::io::{ReadyEvent, Registration};
use crate::runtime::scheduler;

use mio::unix::SourceFd;
use std::error::Error;
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::{task::Context, task::Poll};
Expand Down Expand Up @@ -249,15 +251,69 @@ impl<T: AsRawFd> AsyncFd<T> {
handle: scheduler::Handle,
interest: Interest,
) -> io::Result<Self> {
let fd = inner.as_raw_fd();
Self::try_new_with_handle_and_interest(inner, handle, interest).map_err(Into::into)
}

let registration =
Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle)?;
/// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
/// implementing [`AsRawFd`]. The backing file descriptor is cached at the
/// time of creation.
///
/// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
/// control, use [`AsyncFd::try_with_interest`].
///
/// This method must be called in the context of a tokio runtime.
///
/// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
/// passed to this function.
///
/// # Panics
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
#[inline]
#[track_caller]
pub fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>>
where
T: AsRawFd,
{
Self::try_with_interest(inner, Interest::READABLE | Interest::WRITABLE)
}

Ok(AsyncFd {
registration,
inner: Some(inner),
})
/// Creates an [`AsyncFd`] backed by (and taking ownership of) an object
/// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
/// file descriptor is cached at the time of creation.
///
/// In the case of failure, it returns [`AsyncFdTryNewError`] that contains the original object
/// passed to this function.
///
/// # Panics
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
#[inline]
#[track_caller]
pub fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>>
where
T: AsRawFd,
{
Self::try_new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
}

#[track_caller]
pub(crate) fn try_new_with_handle_and_interest(
inner: T,
handle: scheduler::Handle,
interest: Interest,
) -> Result<Self, AsyncFdTryNewError<T>> {
let fd = inner.as_raw_fd();

match Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle) {
Ok(registration) => Ok(AsyncFd {
registration,
inner: Some(inner),
}),
Err(cause) => Err(AsyncFdTryNewError { inner, cause }),
}
}

/// Returns a shared reference to the backing object of this [`AsyncFd`].
Expand Down Expand Up @@ -1257,3 +1313,47 @@ impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<
/// [`try_io`]: method@AsyncFdReadyGuard::try_io
#[derive(Debug)]
pub struct TryIoError(());

/// Error returned by [`try_new`] or [`try_with_interest`].
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
pub struct AsyncFdTryNewError<T> {
inner: T,
cause: io::Error,
}

impl<T> AsyncFdTryNewError<T> {
/// Returns the original object passed to [`try_new`] or [`try_with_interest`]
/// alongside the error that caused these functions to fail.
///
/// [`try_new`]: AsyncFd::try_new
/// [`try_with_interest`]: AsyncFd::try_with_interest
pub fn into_parts(self) -> (T, io::Error) {
(self.inner, self.cause)
}
}

impl<T> fmt::Display for AsyncFdTryNewError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.cause, f)
}
}

impl<T> fmt::Debug for AsyncFdTryNewError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.cause, f)
}
}

impl<T> Error for AsyncFdTryNewError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.cause)
}
}

impl<T> From<AsyncFdTryNewError<T>> for io::Error {
fn from(value: AsyncFdTryNewError<T>) -> Self {
value.cause
}
}
2 changes: 1 addition & 1 deletion tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ cfg_net_unix! {

pub mod unix {
//! Asynchronous IO structures specific to Unix-like operating systems.
pub use super::async_fd::{AsyncFd, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError};
pub use super::async_fd::{AsyncFd, AsyncFdTryNewError, AsyncFdReadyGuard, AsyncFdReadyMutGuard, TryIoError};
}
}

Expand Down
30 changes: 30 additions & 0 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use nix::unistd::{close, read, write};
use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio::io::Interest;
use tokio_test::{assert_err, assert_pending};

struct TestWaker {
Expand Down Expand Up @@ -834,3 +835,32 @@ async fn await_error_readiness_invalid_address() {
let guard = fd.ready(Interest::ERROR).await.unwrap();
assert_eq!(guard.ready(), Ready::ERROR);
}

#[derive(Debug, PartialEq, Eq)]
struct InvalidSource;

impl AsRawFd for InvalidSource {
fn as_raw_fd(&self) -> RawFd {
-1
}
}

#[tokio::test]
async fn try_new() {
let original = Arc::new(InvalidSource);

let error = AsyncFd::try_new(original.clone()).unwrap_err();
let (returned, _cause) = error.into_parts();

assert!(Arc::ptr_eq(&original, &returned));
}

#[tokio::test]
async fn try_with_interest() {
let original = Arc::new(InvalidSource);

let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
let (returned, _cause) = error.into_parts();

assert!(Arc::ptr_eq(&original, &returned));
}
45 changes: 45 additions & 0 deletions tokio/tests/io_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,48 @@ fn async_fd_with_interest_panic_caller() -> Result<(), Box<dyn Error>> {

Ok(())
}

#[test]
#[cfg(unix)]
fn async_fd_try_new_panic_caller() -> Result<(), Box<dyn Error>> {
use tokio::io::unix::AsyncFd;
use tokio::runtime::Builder;

let panic_location_file = test_panic(|| {
// Runtime without `enable_io` so it has no IO driver set.
let rt = Builder::new_current_thread().build().unwrap();
rt.block_on(async {
let fd = unix::MockFd;

let _ = AsyncFd::try_new(fd);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

#[test]
#[cfg(unix)]
fn async_fd_try_with_interest_panic_caller() -> Result<(), Box<dyn Error>> {
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::runtime::Builder;

let panic_location_file = test_panic(|| {
// Runtime without `enable_io` so it has no IO driver set.
let rt = Builder::new_current_thread().build().unwrap();
rt.block_on(async {
let fd = unix::MockFd;

let _ = AsyncFd::try_with_interest(fd, Interest::READABLE);
});
});

// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());

Ok(())
}

0 comments on commit e37bd63

Please sign in to comment.