Skip to content

Commit

Permalink
add try_recv_timeout method, implement for Windows
Browse files Browse the repository at this point in the history
  • Loading branch information
Xaeroxe committed Nov 17, 2021
1 parent 483439c commit eb08381
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ sc = { version = "0.2.2", optional = true }
crossbeam-utils = "0.7"

[target.'cfg(target_os = "windows")'.dependencies]
winapi = {version = "0.3.7", features = ["minwindef", "ioapiset", "memoryapi", "namedpipeapi", "handleapi", "fileapi", "impl-default"]}
winapi = {version = "0.3.7", features = ["minwindef", "ioapiset", "memoryapi", "namedpipeapi", "handleapi", "fileapi", "impl-default", "synchapi"]}
14 changes: 14 additions & 0 deletions src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::time::Duration;

thread_local! {
static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell<Vec<OsOpaqueIpcChannel>> =
Expand Down Expand Up @@ -263,6 +264,19 @@ impl<T> IpcReceiver<T> where T: for<'de> Deserialize<'de> + Serialize {
.map_err(TryRecvError::IpcError)
}

/// Blocks for up to the specified duration attempting to receive a message.
///
/// This may block for longer than the specified duration if the channel is busy.
pub fn try_recv_timeout(&self, duration: Duration) -> Result<T, TryRecvError> {
let (data, os_ipc_channels, os_ipc_shared_memory_regions) =
self.os_receiver.try_recv_timeout(duration)?;
OpaqueIpcMessage::new(data, os_ipc_channels, os_ipc_shared_memory_regions)
.to()
.map_err(IpcError::Bincode)
.map_err(TryRecvError::IpcError)
}


/// Erase the type of the channel.
///
/// Useful for adding routes to a `RouterProxy`.
Expand Down
49 changes: 35 additions & 14 deletions src/platform/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::ipc;
use libc::intptr_t;
use std::cell::{Cell, RefCell};
use std::cmp::PartialEq;
use std::convert::TryInto;
use std::default::Default;
use std::env;
use std::error::Error as StdError;
Expand All @@ -26,11 +27,13 @@ use std::ops::{Deref, DerefMut, RangeFrom};
use std::ptr;
use std::slice;
use std::thread;
use std::time::Duration;
use uuid::Uuid;
use winapi::um::winnt::{HANDLE};
use winapi::um::handleapi::{INVALID_HANDLE_VALUE};
use winapi::shared::minwindef::{LPVOID};
use winapi::shared::minwindef::LPVOID;
use winapi;
use winapi::um::synchapi::CreateEventA;

mod aliased_cell;
use self::aliased_cell::AliasedCell;
Expand Down Expand Up @@ -590,17 +593,22 @@ impl MessageReader {
// issue the read to the buffer, at the current length offset
self.r#async = Some(AliasedCell::new(AsyncData {
handle: self.handle.take(),
ov: NoDebug(Box::new(mem::zeroed())),
ov: NoDebug(Box::new({
let mut overlapped: winapi::um::minwinbase::OVERLAPPED = mem::zeroed();
// Create a manually reset event. The documentation for GetOverlappedResultEx
// states you must do this in the remarks section.
overlapped.hEvent = CreateEventA(ptr::null_mut(), winapi::shared::minwindef::TRUE, winapi::shared::minwindef::FALSE, ptr::null_mut());
overlapped
})),
buf: mem::replace(&mut self.read_buf, vec![]),
}));
let mut bytes_read: u32 = 0;
let ok = {
let async_data = self.r#async.as_mut().unwrap().alias_mut();
let remaining_buf = &mut async_data.buf[buf_len..];
winapi::um::fileapi::ReadFile(async_data.handle.as_raw(),
remaining_buf.as_mut_ptr() as LPVOID,
remaining_buf.len() as u32,
&mut bytes_read,
ptr::null_mut(),
&mut **async_data.ov.deref_mut())
};

Expand Down Expand Up @@ -722,30 +730,38 @@ impl MessageReader {
/// since it's still aliased by the kernel.
/// (And there is nothing new to pick up anyway.)
/// It will only become available again
/// when `fetch_async_result()` returns sucessfully upon retry.
/// when `fetch_async_result()` returns successfully upon retry.
/// (Or the async read is aborted with `cancel_io()`.)
fn fetch_async_result(&mut self, blocking_mode: BlockingMode) -> Result<(), WinError> {
unsafe {
// Get the overlapped result, blocking if we need to.
let mut nbytes: u32 = 0;
let block = match blocking_mode {
BlockingMode::Blocking => winapi::shared::minwindef::TRUE,
BlockingMode::Nonblocking => winapi::shared::minwindef::FALSE,
let timeout = match blocking_mode {
BlockingMode::Blocking => winapi::um::winbase::INFINITE,
BlockingMode::Nonblocking => 0,
BlockingMode::Timeout(duration) => duration.as_millis().try_into().unwrap_or(winapi::um::winbase::INFINITE),
};
let ok = winapi::um::ioapiset::GetOverlappedResult(self.r#async.as_ref().unwrap().alias().handle.as_raw(),
let ok = winapi::um::ioapiset::GetOverlappedResultEx(self.r#async.as_ref().unwrap().alias().handle.as_raw(),
&mut **self.r#async.as_mut().unwrap().alias_mut().ov.deref_mut(),
&mut nbytes,
block);
timeout,
winapi::shared::minwindef::FALSE);
winapi::um::synchapi::ResetEvent(self.r#async.as_mut().unwrap().alias_mut().ov.deref_mut().hEvent);
let io_result = if ok == winapi::shared::minwindef::FALSE {
let err = GetLastError();
if blocking_mode == BlockingMode::Nonblocking && err == winapi::shared::winerror::ERROR_IO_INCOMPLETE {
if blocking_mode != BlockingMode::Blocking && err == winapi::shared::winerror::ERROR_IO_INCOMPLETE {
// Async read hasn't completed yet.
// Inform the caller, while keeping the read in flight.
return Err(WinError::NoData);
}
// Timeout has elapsed, so we must cancel the read operation before proceeding
if err == winapi::shared::winerror::WAIT_TIMEOUT {
self.cancel_io();
return Err(WinError::NoData);
}
// We pass err through to notify_completion so
// that it can handle other errors.
Err(WinError::from_system(err, "GetOverlappedResult"))
Err(WinError::from_system(err, "GetOverlappedResultEx"))
} else {
Ok(())
};
Expand Down Expand Up @@ -934,6 +950,7 @@ fn write_buf(handle: &WinHandle, bytes: &[u8], atomic: AtomicMode) -> Result<(),
enum BlockingMode {
Blocking,
Nonblocking,
Timeout(Duration),
}

#[derive(Debug)]
Expand Down Expand Up @@ -1002,14 +1019,13 @@ impl OsIpcReceiver {
OsIpcReceiver::from_handle(reader.handle.take())
}

// This is only used for recv/try_recv. When this is added to an IpcReceiverSet, then
// This is only used for recv/try_recv/try_recv_timeout. When this is added to an IpcReceiverSet, then
// the implementation in select() is used. It does much the same thing, but across multiple
// channels.
fn receive_message(&self, mut blocking_mode: BlockingMode)
-> Result<(Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),WinError> {
let mut reader = self.reader.borrow_mut();
assert!(reader.entry_id.is_none(), "receive_message is only valid before this OsIpcReceiver was added to a Set");

// This function loops, because in the case of a blocking read, we may need to
// read multiple sets of bytes from the pipe to receive a complete message.
loop {
Expand Down Expand Up @@ -1048,6 +1064,11 @@ impl OsIpcReceiver {
self.receive_message(BlockingMode::Nonblocking)
}

pub fn try_recv_timeout(&self, duration: Duration) -> Result<(Vec<u8>, Vec<OsOpaqueIpcChannel>, Vec<OsIpcSharedMemory>),WinError> {
win32_trace!("try_recv_timeout");
self.receive_message(BlockingMode::Timeout(duration))
}

/// Do a pipe connect.
///
/// Only used for one-shot servers.
Expand Down
28 changes: 28 additions & 0 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::ipc::IpcOneShotServer;
target_os = "ios"
)))]
use std::io::Error;
use std::time::{Duration, Instant};

#[cfg(not(any(
feature = "force-inprocess",
Expand Down Expand Up @@ -511,6 +512,33 @@ fn try_recv() {
}
}

#[test]
fn try_recv_timeout() {
let person = ("Jacob Kiesel".to_owned(), 25);
let (tx, rx) = ipc::channel().unwrap();
let timeout = Duration::from_millis(250);
let start_recv = Instant::now();
match rx.try_recv_timeout(timeout) {
Err(ipc::TryRecvError::Empty) => assert!(start_recv.elapsed() >= timeout),
v => panic!("Expected empty channel err: {:?}", v),
}
tx.send(person.clone()).unwrap();
let start_recv = Instant::now();
let received_person = rx.try_recv_timeout(timeout).unwrap();
assert!(start_recv.elapsed() < timeout);
assert_eq!(person, received_person);
let start_recv = Instant::now();
match rx.try_recv_timeout(timeout) {
Err(ipc::TryRecvError::Empty) => assert!(start_recv.elapsed() >= timeout),
v => panic!("Expected empty channel err: {:?}", v),
}
drop(tx);
match rx.try_recv_timeout(timeout) {
Err(ipc::TryRecvError::IpcError(ipc::IpcError::Disconnected)) => (),
v => panic!("Expected disconnected err: {:?}", v),
}
}

#[test]
fn multiple_paths_to_a_sender() {
let person = ("Patrick Walton".to_owned(), 29);
Expand Down

0 comments on commit eb08381

Please sign in to comment.