Skip to content

Commit

Permalink
net: add poll_{recv,send}_ready methods to udp and uds_datagram (
Browse files Browse the repository at this point in the history
  • Loading branch information
baloo committed Sep 24, 2021
1 parent 7875f26 commit d32acd9
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 0 deletions.
66 changes: 66 additions & 0 deletions tokio/src/net/udp.rs
Expand Up @@ -443,6 +443,39 @@ impl UdpSocket {
Ok(())
}

/// Polls for write/send readiness.
///
/// If the udp stream is not currently ready for sending, this method will
/// store a clone of the `Waker` from the provided `Context`. When the udp
/// stream becomes ready for sending, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the udp stream is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}

/// Sends data on the socket to the remote address that the socket is
/// connected to.
///
Expand Down Expand Up @@ -630,6 +663,39 @@ impl UdpSocket {
Ok(())
}

/// Polls for read/receive readiness.
///
/// If the udp stream is not currently ready for receiving, this method will
/// store a clone of the `Waker` from the provided `Context`. When the udp
/// socket becomes ready for reading, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
/// `poll_peek`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup. (However,
/// `poll_send_ready` retains a second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the udp stream is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}

/// Receives a single datagram message on the socket from the remote address
/// to which it is connected. On success, returns the number of bytes read.
///
Expand Down
66 changes: 66 additions & 0 deletions tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -226,6 +226,39 @@ impl UnixDatagram {
Ok(())
}

/// Polls for write/send readiness.
///
/// If the socket is not currently ready for sending, this method will
/// store a clone of the `Waker` from the provided `Context`. When the socket
/// becomes ready for sending, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
/// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
/// second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`writable`] is not feasible. Where possible, using [`writable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready for writing.
/// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`writable`]: method@Self::writable
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}

/// Wait for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
Expand Down Expand Up @@ -289,6 +322,39 @@ impl UnixDatagram {
Ok(())
}

/// Polls for read/receive readiness.
///
/// If the socket is not currently ready for receiving, this method will
/// store a clone of the `Waker` from the provided `Context`. When the
/// socket becomes ready for reading, `Waker::wake` will be called on the
/// waker.
///
/// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
/// `poll_peek`, only the `Waker` from the `Context` passed to the most
/// recent call is scheduled to receive a wakeup. (However,
/// `poll_send_ready` retains a second, independent waker.)
///
/// This function is intended for cases where creating and pinning a future
/// via [`readable`] is not feasible. Where possible, using [`readable`] is
/// preferred, as this supports polling from multiple tasks at once.
///
/// # Return value
///
/// The function returns:
///
/// * `Poll::Pending` if the socket is not ready for reading.
/// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
/// * `Poll::Ready(Err(e))` if an error is encountered.
///
/// # Errors
///
/// This function may encounter any standard I/O error except `WouldBlock`.
///
/// [`readable`]: method@Self::readable
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}

/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
Expand Down
44 changes: 44 additions & 0 deletions tokio/tests/udp.rs
Expand Up @@ -5,6 +5,7 @@ use futures::future::poll_fn;
use std::io;
use std::sync::Arc;
use tokio::{io::ReadBuf, net::UdpSocket};
use tokio_test::assert_ok;

const MSG: &[u8] = b"hello";
const MSG_LEN: usize = MSG.len();
Expand Down Expand Up @@ -440,3 +441,46 @@ async fn try_recv_buf_from() {
}
}
}

#[tokio::test]
async fn poll_ready() {
// Create listener
let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let saddr = server.local_addr().unwrap();

// Create socket pair
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let caddr = client.local_addr().unwrap();

for _ in 0..5 {
loop {
assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);

match client.try_send_to(b"hello world", saddr) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}

loop {
assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);

let mut buf = Vec::with_capacity(512);

match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr, caddr);
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}
}
47 changes: 47 additions & 0 deletions tokio/tests/uds_datagram.rs
Expand Up @@ -328,3 +328,50 @@ async fn try_recv_buf_never_block() -> io::Result<()> {

Ok(())
}

#[tokio::test]
async fn poll_ready() -> io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let server_path = dir.path().join("server.sock");
let client_path = dir.path().join("client.sock");

// Create listener
let server = UnixDatagram::bind(&server_path)?;

// Create socket pair
let client = UnixDatagram::bind(&client_path)?;

for _ in 0..5 {
loop {
poll_fn(|cx| client.poll_send_ready(cx)).await?;

match client.try_send_to(b"hello world", &server_path) {
Ok(n) => {
assert_eq!(n, 11);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}

loop {
poll_fn(|cx| server.poll_recv_ready(cx)).await?;

let mut buf = Vec::with_capacity(512);

match server.try_recv_buf_from(&mut buf) {
Ok((n, addr)) => {
assert_eq!(n, 11);
assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
assert_eq!(&buf[0..11], &b"hello world"[..]);
break;
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => panic!("{:?}", e),
}
}
}

Ok(())
}

0 comments on commit d32acd9

Please sign in to comment.