From 079321700a3961ef88a48afa26c2fff3883ef0e5 Mon Sep 17 00:00:00 2001 From: Anatoly Ikorsky Date: Thu, 7 Jan 2021 00:41:00 +0300 Subject: [PATCH] Add named pipes implementation --- tokio/Cargo.toml | 4 + tokio/src/macros/cfg.rs | 10 + tokio/src/net/mod.rs | 5 + tokio/src/net/windows/mod.rs | 3 + tokio/src/net/windows/named_pipe.rs | 335 ++++++++++++++++++++++++++++ tokio/tests/named_pipe.rs | 106 +++++++++ 6 files changed, 463 insertions(+) create mode 100644 tokio/src/net/windows/mod.rs create mode 100644 tokio/src/net/windows/named_pipe.rs create mode 100644 tokio/tests/named_pipe.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0849a369ab2..40393cf24f8 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -55,6 +55,7 @@ net = [ "mio/tcp", "mio/udp", "mio/uds", + "winapi/winbase", ] process = [ "bytes", @@ -111,6 +112,9 @@ signal-hook-registry = { version = "1.1.1", optional = true } libc = { version = "0.2.42" } nix = { version = "0.19.0" } +[target.'cfg(windows)'.dependencies.miow] +version = "0.3.6" + [target.'cfg(windows)'.dependencies.winapi] version = "0.3.8" default-features = false diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9ae098fb072..22285b78c3e 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -183,6 +183,16 @@ macro_rules! cfg_net_unix { } } +macro_rules! cfg_net_windows { + ($($item:item)*) => { + $( + #[cfg(all(target_os = "windows", feature = "net"))] + #[cfg_attr(docsrs, doc(cfg(all(target_os = "windows", feature = "net"))))] + $item + )* + } +} + macro_rules! cfg_process { ($($item:item)*) => { $( diff --git a/tokio/src/net/mod.rs b/tokio/src/net/mod.rs index 2f17f9eab5e..a69edaaf26a 100644 --- a/tokio/src/net/mod.rs +++ b/tokio/src/net/mod.rs @@ -46,3 +46,8 @@ cfg_net_unix! { pub use unix::listener::UnixListener; pub use unix::stream::UnixStream; } + +cfg_net_windows! { + pub mod windows; + pub use windows::named_pipe::{NamedPipe, NamedPipeServer}; +} diff --git a/tokio/src/net/windows/mod.rs b/tokio/src/net/windows/mod.rs new file mode 100644 index 00000000000..d4b530c0f89 --- /dev/null +++ b/tokio/src/net/windows/mod.rs @@ -0,0 +1,3 @@ +//! Windows platform functionality. + +pub mod named_pipe; diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs new file mode 100644 index 00000000000..6fb1db0a28f --- /dev/null +++ b/tokio/src/net/windows/named_pipe.rs @@ -0,0 +1,335 @@ +//! Windows named pipes. + +use mio::windows::NamedPipe as MioNamedPipe; +use miow::pipe::{NamedPipe as RawNamedPipe, NamedPipeBuilder}; +use winapi::{ + shared::winerror::*, + um::{namedpipeapi::WaitNamedPipeW, winbase::*}, +}; + +use std::{ + ffi::{OsStr, OsString}, + fs::OpenOptions, + future::Future, + io::{self, ErrorKind, IoSlice, Result}, + mem, + os::windows::prelude::*, + pin::Pin, + sync::Mutex, + task::{Context, Poll}, +}; + +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; + +const DEFAULT_BUFFER_SIZE: u32 = 65_536; + +/// Creates named pipe builder with default parameters. +fn default_builder(addr: &OsStr) -> NamedPipeBuilder { + let mut builder = NamedPipeBuilder::new(addr); + builder + .inbound(true) + .outbound(true) + .out_buffer_size(DEFAULT_BUFFER_SIZE) + .in_buffer_size(DEFAULT_BUFFER_SIZE) + .max_instances(PIPE_UNLIMITED_INSTANCES as u8); + builder +} + +fn mio_from_miow(pipe: RawNamedPipe) -> MioNamedPipe { + // Safety: nothing actually unsafe about this. The trait fn includes `unsafe`. + unsafe { MioNamedPipe::from_raw_handle(pipe.into_raw_handle()) } +} + +/// Connecting instance future. +#[derive(Debug)] +enum ConnectingInstance { + Connecting(NamedPipe), + Error(io::Error), + Ready(Option), +} + +impl ConnectingInstance { + fn from_mio(mio: MioNamedPipe) -> Result { + if connect(&mio)? { + Ok(Self::Ready(Some(NamedPipe::server(mio)?))) + } else { + Ok(Self::Connecting(NamedPipe::server(mio)?)) + } + } + + /// Creates the first instance of a named pipe server. + /// + /// # Errors + /// + /// It'll error if pipe with this name already exist. + fn first(addr: &OsStr) -> Result { + let mio = mio_from_miow(default_builder(addr).first(true).create()?); + Self::from_mio(mio) + } + + fn _new(addr: &OsStr) -> Result { + let mio = mio_from_miow(default_builder(addr).first(false).create()?); + Self::from_mio(mio) + } + + /// Creates an instance of a named pipe server. + /// + /// In case of an error it will not error immediately, + /// but will resolve to an error as a future. + fn new(addr: &OsStr) -> Self { + match Self::_new(addr) { + Ok(this) => this, + Err(err) => Self::Error(err), + } + } +} + +impl Future for ConnectingInstance { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match mem::replace(&mut *self, ConnectingInstance::Ready(None)) { + Self::Ready(None) => { + // poll on completed future + Poll::Pending + } + Self::Ready(Some(pipe)) => Poll::Ready(Ok(pipe)), + Self::Connecting(pipe) => match pipe.poll_write_ready(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Ok(pipe)), + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => { + *self = Self::Connecting(pipe); + Poll::Pending + } + }, + Self::Error(err) => Poll::Ready(Err(err)), + } + } +} + +/// Named pipe server. +#[derive(Debug)] +pub struct NamedPipeServer { + pipe_name: OsString, + // At least one instance will always exist. + next_instance: Mutex, +} + +impl NamedPipeServer { + /// Constructor. + /// + /// # Errors + /// + /// It'll error if pipe with this name already exists. + pub fn new>(addr: A) -> Result { + let pipe_name = addr.into(); + + // Bind pipe name or fail if already exist to avoid + // named pipe instance creation race condition. + let first_instance = ConnectingInstance::first(&pipe_name)?; + + Ok(Self { + pipe_name, + next_instance: Mutex::new(first_instance), + }) + } + + /// Returns `'static` future that will wait for a client. + pub fn accept(&self) -> impl Future> + 'static { + let next_instance = ConnectingInstance::new(&self.pipe_name); + mem::replace(&mut *self.next_instance.lock().unwrap(), next_instance) + } +} + +#[derive(Debug)] +enum NamedPipeInner { + Client(PollEvented), + Server(PollEvented), +} + +/// Non-blocking windows named pipe. +#[derive(Debug)] +pub struct NamedPipe { + inner: NamedPipeInner, +} + +impl NamedPipe { + fn server(mio: MioNamedPipe) -> Result { + let io = PollEvented::new(mio)?; + Ok(Self { + inner: NamedPipeInner::Server(io) + }) + } + + fn client(mio: MioNamedPipe) -> Result { + let io = PollEvented::new(mio)?; + Ok(Self { + inner: NamedPipeInner::Client(io) + }) + } + + fn io_ref(&self) -> &PollEvented { + match &self.inner { + NamedPipeInner::Client(io) | NamedPipeInner::Server(io) => io, + } + } + + /// Will try to connect to a named pipe. Returned pipe may not be writable. + /// + /// # Errors + /// + /// Will error with `ERROR_PIPE_BUSY` if there are no available instances. + fn open(addr: &OsStr) -> Result { + let file = OpenOptions::new() + .read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED) + .security_qos_flags(SECURITY_IDENTIFICATION) + .open(addr)?; + + let pipe = unsafe { MioNamedPipe::from_raw_handle(file.into_raw_handle()) }; + Self::client(pipe) + } + + /// Connects to a nemed pipe server by `addr`. + /// + /// # Errors + /// + /// It'll error if there is no such pipe. + pub async fn connect>(addr: A) -> Result { + let mut pipe_name = into_wide(addr.as_ref()); + let mut busy = false; + + loop { + if !busy { + // pipe instance may be available, so trying to open it + match Self::open(addr.as_ref()) { + Ok(pipe) => { + // Pipe is opened. + pipe.writable().await?; + return Ok(pipe); + } + Err(err) if err.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => { + // We should wait since there are no free instances + } + Err(err) => return Err(err), + } + } + + let (status, name) = wait_pipe(pipe_name).await?; + pipe_name = name; + busy = matches!(status, WaitPipeResult::Busy); + } + } + + /// Polls for read readiness. + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io_ref() + .registration() + .poll_read_ready(cx) + .map_ok(|_| ()) + } + + /// Polls for write readiness. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io_ref() + .registration() + .poll_write_ready(cx) + .map_ok(|_| ()) + } + + /// Polls for any of the requested ready states. + pub async fn ready(&self, interest: Interest) -> Result { + let event = self.io_ref().registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Waits for the pipe client or server to become readable. + pub async fn readable(&self) -> Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Waits for the pipe client or server to become writeable. + pub async fn writable(&self) -> Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } +} + +impl AsRawHandle for NamedPipe { + fn as_raw_handle(&self) -> RawHandle { + self.io_ref().as_raw_handle() + } +} + +impl AsyncRead for NamedPipe { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + unsafe { self.io_ref().poll_read(cx, buf) } + } +} + +impl AsyncWrite for NamedPipe { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + self.io_ref().poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.io_ref().poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +fn into_wide(s: &OsStr) -> Vec { + s.encode_wide().chain(Some(0)).collect() +} + +#[derive(Debug, Clone, Copy)] +enum WaitPipeResult { + Available, + Busy, +} + +// `wide_name` will be returned. +async fn wait_pipe(wide_name: Vec) -> Result<(WaitPipeResult, Vec)> { + crate::task::spawn_blocking(move || { + let result = unsafe { WaitNamedPipeW(wide_name.as_ptr(), 0) }; + if result > 0 { + Ok((WaitPipeResult::Available, wide_name)) + } else { + let err = io::Error::last_os_error(); + if err.raw_os_error() == Some(ERROR_SEM_TIMEOUT as i32) { + Ok((WaitPipeResult::Busy, wide_name)) + } else { + Err(err) + } + } + }) + .await? +} + +/// Connects server side of a named pipe. +/// +/// Returns `true` if pipe is connected. +fn connect(pipe: &MioNamedPipe) -> io::Result { + match pipe.connect() { + Ok(()) => Ok(true), + Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(false), + Err(err) => Err(err), + } +} diff --git a/tokio/tests/named_pipe.rs b/tokio/tests/named_pipe.rs new file mode 100644 index 00000000000..fadb86ddde5 --- /dev/null +++ b/tokio/tests/named_pipe.rs @@ -0,0 +1,106 @@ +#![cfg(feature = "full")] +#![warn(rust_2018_idioms)] +#![cfg(target_os = "windows")] + +use bytes::Buf; +use futures::{ + future::{select, try_join, Either}, + stream::FuturesUnordered, + StreamExt, +}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{NamedPipe, NamedPipeServer}, + time::{sleep, Duration}, +}; + +#[tokio::test] +async fn basic() -> std::io::Result<()> { + const NUM_CLIENTS: u32 = 255; + const PIPE_NAME: &'static str = r"\\.\pipe\test-named-pipe-basic"; + let mut buf = [0_u8; 16]; + + // Create server to avoid NotFound from clients. + let server = NamedPipeServer::new(PIPE_NAME)?; + + let server = async move { + let mut pipe; + for _ in 0..NUM_CLIENTS { + pipe = server.accept().await?; + let mut buf = Vec::new(); + pipe.read_buf(&mut buf).await?; + let i = (&*buf).get_u32_le(); + pipe.write_all(format!("Server to {}", i).as_bytes()) + .await?; + } + std::io::Result::Ok(()) + }; + + // concurrent clients + let clients = (0..NUM_CLIENTS) + .map(|i| async move { + let mut pipe = NamedPipe::connect(PIPE_NAME).await?; + pipe.write_all(&i.to_le_bytes()).await?; + let mut buf = Vec::new(); + pipe.read_buf(&mut buf).await?; + assert_eq!(buf, format!("Server to {}", i).as_bytes()); + std::io::Result::Ok(()) + }) + .collect::>() + .fold(Ok(()), |a, x| async move { a.and(x) }); + + try_join(server, clients).await?; + + // client returns not found if there is no server + let err = NamedPipe::connect(PIPE_NAME).await.unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::NotFound); + + let server = NamedPipeServer::new(PIPE_NAME)?.accept(); + let client = NamedPipe::connect(PIPE_NAME); + let (mut server, mut client) = try_join(server, client).await?; + + ping_pong(&mut server, &mut client).await?; + + drop(server); + + // Client reads when server is gone + let len = client.read(&mut buf).await.unwrap(); + assert_eq!(len, 0); + + drop(client); + + let server = NamedPipeServer::new(PIPE_NAME)?.accept(); + let client = NamedPipe::connect(PIPE_NAME); + let (mut server, mut client) = try_join(server, client).await?; + + ping_pong(&mut server, &mut client).await?; + + drop(client); + + // Server reads when client is gone + let len = server.read(&mut buf).await?; + assert_eq!(len, 0); + + // There is no way to connect to a connected server instance + // even if client is gone. + let timeout = sleep(Duration::from_millis(300)); + let client = NamedPipe::connect(PIPE_NAME); + futures::pin_mut!(client); + futures::pin_mut!(timeout); + let result = select(timeout, client).await; + assert!(matches!(result, Either::Left(_))); + + Ok(()) +} + +async fn ping_pong(l: &mut NamedPipe, r: &mut NamedPipe) -> std::io::Result<()> { + let mut buf = [b' '; 5]; + + l.write_all(b"ping").await?; + r.read(&mut buf).await?; + assert_eq!(&buf, b"ping "); + r.write_all(b"pong").await?; + l.read(&mut buf).await?; + assert_eq!(&buf, b"pong "); + Ok(()) +}