Skip to content

Commit

Permalink
net: add UnixDatagram readiness and non-blocking ops
Browse files Browse the repository at this point in the history
  • Loading branch information
cssivision committed Dec 7, 2020
1 parent cd13b95 commit fbbd3d1
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 4 deletions.
141 changes: 137 additions & 4 deletions tokio/src/net/unix/datagram/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::io::{Interest, PollEvented, ReadBuf};
use crate::io::{Interest, PollEvented, ReadBuf, Ready};
use crate::net::unix::SocketAddr;

use std::convert::TryFrom;
Expand Down Expand Up @@ -84,6 +84,131 @@ cfg_net_unix! {
}

impl UnixDatagram {
/// Wait for any of the requested ready states.
///
/// This function is usually paired with `try_recv()` or `try_send()`. It
/// can be used to concurrently recv / send to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`.
///
/// # Examples
///
/// Concurrently receive from and send to the socket on the same task
/// without splitting.
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}

/// Wait for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is
/// usually paired with `try_send()` or `try_send_to()`.
///
/// The function may complete without the socket being writable. This is a
/// false-positive and attempting a `try_send()` will return with
/// `io::ErrorKind::WouldBlock`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixDatagram;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let dir = tempfile::tempdir().unwrap();
/// let client_path = dir.path().join("client.sock");
/// let server_path = dir.path().join("server.sock");
/// let socket = UnixDatagram::bind(&client_path)?;
/// socket.connect(&server_path)?;
///
/// loop {
/// // Wait for the socket to be writable
/// socket.writable().await?;
///
/// // Try to send data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_send(b"hello world") {
/// Ok(n) => {
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn writable(&self) -> io::Result<()> {
self.ready(Interest::WRITABLE).await?;
Ok(())
}

/// Wait for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_recv()`.
///
/// The function may complete without the socket being readable. This is a
/// false-positive and attempting a `try_recv()` will return with
/// `io::ErrorKind::WouldBlock`.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixDatagram;
/// use std::io;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// // Connect to a peer
/// let dir = tempfile::tempdir().unwrap();
/// let client_path = dir.path().join("client.sock");
/// let server_path = dir.path().join("server.sock");
/// let socket = UnixDatagram::bind(&client_path)?;
/// socket.connect(&server_path)?;
///
/// loop {
/// // Wait for the socket to be readable
/// socket.readable().await?;
///
/// // The buffer is **not** included in the async task and will
/// // only exist on the stack.
/// let mut buf = [0; 1024];
///
/// // Try to recv data, this may still fail with `WouldBlock`
/// // if the readiness event is a false positive.
/// match socket.try_recv(&mut buf) {
/// Ok(n) => {
/// println!("GOT {:?}", &buf[..n]);
/// break;
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
/// }
/// Err(e) => {
/// return Err(e);
/// }
/// }
/// }
///
/// Ok(())
/// }
/// ```
pub async fn readable(&self) -> io::Result<()> {
self.ready(Interest::READABLE).await?;
Ok(())
}

/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
Expand Down Expand Up @@ -332,7 +457,9 @@ impl UnixDatagram {
/// # }
/// ```
pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
self.io.send(buf)
self.io
.registration()
.try_io(Interest::WRITABLE, || self.io.send(buf))
}

/// Try to send a datagram to the peer without waiting.
Expand Down Expand Up @@ -432,7 +559,9 @@ impl UnixDatagram {
/// # }
/// ```
pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.recv(buf)
self.io
.registration()
.try_io(Interest::READABLE, || self.io.recv(buf))
}

/// Sends data on the socket to the specified address.
Expand Down Expand Up @@ -694,7 +823,11 @@ impl UnixDatagram {
/// # }
/// ```
pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let (n, addr) = self.io.recv_from(buf)?;
let (n, addr) = self
.io
.registration()
.try_io(Interest::READABLE, || self.io.recv_from(buf))?;

Ok((n, SocketAddr(addr)))
}

Expand Down
3 changes: 3 additions & 0 deletions tokio/tests/uds_datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ async fn try_send_recv_never_block() -> io::Result<()> {

// Send until we hit the OS `net.unix.max_dgram_qlen`.
loop {
dgram1.writable().await.unwrap();

match dgram1.try_send(payload) {
Err(err) => match err.kind() {
io::ErrorKind::WouldBlock | io::ErrorKind::Other => break,
Expand All @@ -98,6 +100,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {

// Read every dgram we sent.
while count > 0 {
dgram2.readable().await.unwrap();
let len = dgram2.try_recv(&mut recv_buf[..])?;
assert_eq!(len, payload.len());
assert_eq!(payload, &recv_buf[..len]);
Expand Down

0 comments on commit fbbd3d1

Please sign in to comment.