Skip to content

Commit

Permalink
Add UDS support for Unix systems (#1098)
Browse files Browse the repository at this point in the history
### Summary

This change adds UDS support for Unix systems. The networking primitives
`src/net/uds`. These modules provide the structs for `UnixDatagram`s,
`UnixListener`s, and `UnixStream`s. Each of the methods on these structs
dispatch to the Unix system structs in `src/sys/unix/uds.rs`.

Use of these new structs should feel fairly similar to the `Tcp[...]`
equivalents. The tests added also reflect the similar behavior that would be
expected.

### Details

Since this change only adds UDS support, there should be no behavior changes
anywhere else. For reviewing, I recommend starting from the networking
primitives and seeing what methods they dispatch to.

The tests added in `tests/uds.rs` should feel fairly similar to
`tcp_stream.rs`--the main difference coming from supporting `PathBuf`s as the
socket address to connect to.

I originally pulled in some testing infrastructure added to `v0.6.x` in #944,
but ultimately found the functionality to not be as helpful due to a lot of
changes that have since taken place in `master`. I think #944 could still be
helpful, but as a separate change that is more about improving testing
functionality and not included in a change that introduces additional
functionality.
  • Loading branch information
kleimkuhler authored and carllerche committed Oct 9, 2019
1 parent 145a837 commit 9fe594e
Show file tree
Hide file tree
Showing 18 changed files with 1,647 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,6 @@ pub use waker::Waker;
#[cfg(unix)]
pub mod unix {
//! Unix only extensions.

pub use crate::sys::SocketAddr;
pub use crate::sys::SourceFd;
}
5 changes: 5 additions & 0 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ pub use self::tcp_stream::TcpStream;

mod udp;
pub use self::udp::UdpSocket;

#[cfg(unix)]
mod uds;
#[cfg(unix)]
pub use self::uds::{UnixDatagram, UnixListener, UnixStream};
163 changes: 163 additions & 0 deletions src/net/uds/datagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use crate::event::Source;
#[cfg(debug_assertions)]
use crate::poll::SelectorId;
use crate::{sys, Interests, Registry, Token};

use std::io;
use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::Path;

/// A Unix datagram socket.
#[derive(Debug)]
pub struct UnixDatagram {
sys: sys::UnixDatagram,
#[cfg(debug_assertions)]
selector_id: SelectorId,
}

impl UnixDatagram {
fn new(sys: sys::UnixDatagram) -> UnixDatagram {
UnixDatagram {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
}
}

/// Creates a Unix datagram socket bound to the given path.
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let sys = sys::UnixDatagram::bind(path.as_ref())?;
Ok(UnixDatagram::new(sys))
}

/// Connects the socket to the specified address.
pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
self.sys.connect(path)
}

/// Creates a Unix Datagram socket which is not bound to any address.
pub fn unbound() -> io::Result<UnixDatagram> {
let sys = sys::UnixDatagram::unbound()?;
Ok(UnixDatagram::new(sys))
}

/// Create an unnamed pair of connected sockets.
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
let (a, b) = sys::UnixDatagram::pair()?;
let a = UnixDatagram::new(a);
let b = UnixDatagram::new(b);
Ok((a, b))
}

/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `UnixListener` is a reference to the same socket that this
/// object references. Both handles can be used to accept incoming
/// connections and options set on one listener will affect the other.
pub fn try_clone(&self) -> io::Result<UnixDatagram> {
let sys = self.sys.try_clone()?;
Ok(UnixDatagram::new(sys))
}

/// Returns the address of this socket.
pub fn local_addr(&self) -> io::Result<sys::SocketAddr> {
self.sys.local_addr()
}

/// Returns the address of this socket's peer.
///
/// The `connect` method will connect the socket to a peer.
pub fn peer_addr(&self) -> io::Result<sys::SocketAddr> {
self.sys.peer_addr()
}

/// Receives data from the socket.
///
/// On success, returns the number of bytes read and the address from
/// whence the data came.
pub fn recv_from(&self, dst: &mut [u8]) -> io::Result<(usize, sys::SocketAddr)> {
self.sys.recv_from(dst)
}

/// Receives data from the socket.
///
/// On success, returns the number of bytes read.
pub fn recv(&self, dst: &mut [u8]) -> io::Result<usize> {
self.sys.recv(dst)
}

/// Sends data on the socket to the specified address.
///
/// On success, returns the number of bytes written.
pub fn send_to<P: AsRef<Path>>(&self, src: &[u8], path: P) -> io::Result<usize> {
self.sys.send_to(src, path)
}

/// Sends data on the socket to the socket's peer.
///
/// The peer address may be set by the `connect` method, and this method
/// will return an error if the socket has not already been connected.
///
/// On success, returns the number of bytes written.
pub fn send(&self, src: &[u8]) -> io::Result<usize> {
self.sys.send(src)
}

/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.sys.take_error()
}

/// Shut down the read, write, or both halves of this connection.
///
/// This function will cause all pending and future I/O calls on the
/// specified portions to immediately return with an appropriate value
/// (see the documentation of `Shutdown`).
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.sys.shutdown(how)
}
}

impl Source for UnixDatagram {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate_selector(registry)?;
self.sys.register(registry, token, interests)
}

fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
self.sys.reregister(registry, token, interests)
}

fn deregister(&self, registry: &Registry) -> io::Result<()> {
self.sys.deregister(registry)
}
}

impl AsRawFd for UnixDatagram {
fn as_raw_fd(&self) -> RawFd {
self.sys.as_raw_fd()
}
}

impl FromRawFd for UnixDatagram {
/// Converts a `std` `RawFd` to a `mio` `UnixDatagram`.
///
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram {
UnixDatagram::new(FromRawFd::from_raw_fd(fd))
}
}

impl IntoRawFd for UnixDatagram {
fn into_raw_fd(self) -> RawFd {
self.sys.into_raw_fd()
}
}
109 changes: 109 additions & 0 deletions src/net/uds/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use crate::event::Source;
use crate::net::UnixStream;
#[cfg(debug_assertions)]
use crate::poll::SelectorId;
use crate::unix::SocketAddr;
use crate::{sys, Interests, Registry, Token};

use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::path::Path;

/// A non-blocking Unix domain socket server.
#[derive(Debug)]
pub struct UnixListener {
sys: sys::UnixListener,
#[cfg(debug_assertions)]
selector_id: SelectorId,
}

impl UnixListener {
fn new(sys: sys::UnixListener) -> UnixListener {
UnixListener {
sys,
#[cfg(debug_assertions)]
selector_id: SelectorId::new(),
}
}

/// Creates a new `UnixListener` bound to the specified socket.
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let sys = sys::UnixListener::bind(path.as_ref())?;
Ok(UnixListener::new(sys))
}

/// Accepts a new incoming connection to this listener.
///
/// The call is responsible for ensuring that the listening socket is in
/// non-blocking mode.
pub fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
let (sys, sockaddr) = self.sys.accept()?;
Ok((UnixStream::new(sys), sockaddr))
}

/// Creates a new independently owned handle to the underlying socket.
///
/// The returned `UnixListener` is a reference to the same socket that this
/// object references. Both handles can be used to accept incoming
/// connections and options set on one listener will affect the other.
pub fn try_clone(&self) -> io::Result<UnixListener> {
let sys = self.sys.try_clone()?;
Ok(UnixListener::new(sys))
}

/// Returns the local socket address of this listener.
pub fn local_addr(&self) -> io::Result<sys::SocketAddr> {
self.sys.local_addr()
}

/// Returns the value of the `SO_ERROR` option.
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
self.sys.take_error()
}
}

impl Source for UnixListener {
fn register(&self, registry: &Registry, token: Token, interests: Interests) -> io::Result<()> {
#[cfg(debug_assertions)]
self.selector_id.associate_selector(registry)?;
self.sys.reregister(registry, token, interests)
}

fn reregister(
&self,
registry: &Registry,
token: Token,
interests: Interests,
) -> io::Result<()> {
self.sys.reregister(registry, token, interests)
}

fn deregister(&self, registry: &Registry) -> io::Result<()> {
self.sys.deregister(registry)
}
}

#[cfg(unix)]
impl AsRawFd for UnixListener {
fn as_raw_fd(&self) -> RawFd {
self.sys.as_raw_fd()
}
}

#[cfg(unix)]
impl IntoRawFd for UnixListener {
fn into_raw_fd(self) -> RawFd {
self.sys.into_raw_fd()
}
}

#[cfg(unix)]
impl FromRawFd for UnixListener {
/// Converts a `std` `RawFd` to a `mio` `UnixListener`.
///
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> UnixListener {
UnixListener::new(FromRawFd::from_raw_fd(fd))
}
}
8 changes: 8 additions & 0 deletions src/net/uds/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod datagram;
pub use self::datagram::UnixDatagram;

mod listener;
pub use self::listener::UnixListener;

mod stream;
pub use self::stream::UnixStream;
Loading

0 comments on commit 9fe594e

Please sign in to comment.