Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use overlapped interface with NamedPipe #17

Merged
merged 6 commits into from Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -26,6 +26,7 @@ features = [
"minwindef",
"namedpipeapi",
"ntdef",
"synchapi",
"winerror",
"winsock2",
"ws2def",
Expand Down
85 changes: 76 additions & 9 deletions src/handle.rs
@@ -1,11 +1,18 @@
use std::io;
use std::cmp;
use std::ptr;

use winapi::shared::minwindef::*;
use winapi::shared::ntdef::HANDLE;
use winapi::shared::ntdef::{
BOOLEAN,
FALSE,
HANDLE,
TRUE,
};
use winapi::shared::winerror::*;
use winapi::um::fileapi::*;
use winapi::um::handleapi::*;
use winapi::um::ioapiset::*;
use winapi::um::minwinbase::*;

#[derive(Debug)]
Expand Down Expand Up @@ -50,20 +57,50 @@ impl Handle {
}

pub unsafe fn read_overlapped(&self, buf: &mut [u8],
overlapped: *mut OVERLAPPED)
-> io::Result<Option<usize>> {
overlapped: *mut OVERLAPPED)
-> io::Result<Option<usize>> {
self.read_overlapped_helper(buf, overlapped, FALSE)
}

pub unsafe fn read_overlapped_wait(&self, buf: &mut [u8],
overlapped: *mut OVERLAPPED)
-> io::Result<usize> {
match self.read_overlapped_helper(buf, overlapped, TRUE) {
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => panic!("logic error"),
Err(e) => Err(e),
}
}

pub unsafe fn read_overlapped_helper(&self, buf: &mut [u8],
overlapped: *mut OVERLAPPED,
wait: BOOLEAN)
-> io::Result<Option<usize>> {
let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD;
let mut bytes = 0;
let res = ::cvt({
ReadFile(self.0,
buf.as_mut_ptr() as *mut _,
len,
&mut bytes,
ptr::null_mut(),
overlapped)
});
match res {
Ok(_) => Ok(Some(bytes as usize)),
Ok(_) => (),
Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
=> (),
Err(e) => return Err(e),
}

let mut bytes = 0;
let res = ::cvt({
GetOverlappedResult(self.0,
overlapped,
&mut bytes,
wait as BOOL)
});
match res {
Ok(_) => Ok(Some(bytes as usize)),
Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE
=> Ok(None),
Err(e) => Err(e),
}
Expand All @@ -72,18 +109,48 @@ impl Handle {
pub unsafe fn write_overlapped(&self, buf: &[u8],
overlapped: *mut OVERLAPPED)
-> io::Result<Option<usize>> {
self.write_overlapped_helper(buf, overlapped, FALSE)
}

pub unsafe fn write_overlapped_wait(&self, buf: &[u8],
overlapped: *mut OVERLAPPED)
-> io::Result<usize> {
match self.write_overlapped_helper(buf, overlapped, TRUE) {
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => panic!("logic error"),
Err(e) => Err(e),
}
}

unsafe fn write_overlapped_helper(&self, buf: &[u8],
overlapped: *mut OVERLAPPED,
wait: BOOLEAN)
-> io::Result<Option<usize>> {
let len = cmp::min(buf.len(), <DWORD>::max_value() as usize) as DWORD;
let mut bytes = 0;
let res = ::cvt({
WriteFile(self.0,
buf.as_ptr() as *const _,
len,
&mut bytes,
ptr::null_mut(),
overlapped)
});
match res {
Ok(_) => Ok(Some(bytes as usize)),
Ok(_) => (),
Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32)
=> (),
Err(e) => return Err(e),
}

let mut bytes = 0;
let res = ::cvt({
GetOverlappedResult(self.0,
overlapped,
&mut bytes,
wait as BOOL)
});
match res {
Ok(_) => Ok(Some(bytes as usize)),
Err(ref e) if e.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) && wait == FALSE
=> Ok(None),
Err(e) => Err(e),
}
Expand Down
22 changes: 21 additions & 1 deletion src/overlapped.rs
@@ -1,8 +1,14 @@
use std::fmt;
use std::io;
use std::mem;
use std::ptr;

use winapi::shared::ntdef::HANDLE;
use winapi::shared::ntdef::{
HANDLE,
NULL,
};
use winapi::um::minwinbase::*;
use winapi::um::synchapi::*;

/// A wrapper around `OVERLAPPED` to provide "rustic" accessors and
/// initializers.
Expand All @@ -26,6 +32,20 @@ impl Overlapped {
Overlapped(unsafe { mem::zeroed() })
}

/// Creates a new `Overlapped` with an initialized non-null `hEvent`. The caller is
/// responsible for calling `CloseHandle` on the `hEvent` field of the returned
/// `Overlapped`. The event is created with `bManualReset` set to `FALSE`, meaning after a
/// single thread waits on the event, it will be reset.
pub fn initialize_with_autoreset_event() -> io::Result<Overlapped> {
let event = unsafe {CreateEventW(ptr::null_mut(), 0i32, 0i32, ptr::null())};
if event == NULL {
return Err(io::Error::last_os_error());
}
let mut overlapped = Self::zero();
overlapped.set_event(event);
Ok(overlapped)
}

/// Creates a new `Overlapped` function pointer from the underlying
/// `OVERLAPPED`, wrapping in the "rusty" wrapper for working with
/// accessors.
Expand Down
96 changes: 92 additions & 4 deletions src/pipe.rs
@@ -1,5 +1,6 @@
//! Named pipes

use std::cell::RefCell;
use std::ffi::OsStr;
use std::fs::{OpenOptions, File};
use std::io::prelude::*;
Expand All @@ -18,6 +19,7 @@ use winapi::um::minwinbase::*;
use winapi::um::namedpipeapi::*;
use winapi::um::winbase::*;
use handle::Handle;
use overlapped::Overlapped;

/// Readable half of an anonymous pipe.
#[derive(Debug)]
Expand Down Expand Up @@ -318,21 +320,60 @@ impl NamedPipe {
}
}

thread_local! {
static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
}

/// Call a function with a threadlocal `Overlapped`. The function `f` should be
/// sure that the event is reset, either manually or by a thread being released.
fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
where F: FnOnce(&Overlapped) -> io::Result<usize>
{
NAMED_PIPE_OVERLAPPED.with(|overlapped| {
let mut mborrow = overlapped.borrow_mut();
if let None = *mborrow {
let op = Overlapped::initialize_with_autoreset_event()?;
*mborrow = Some(op);
}
f(mborrow.as_ref().unwrap())
})
}

impl Read for NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
}
impl<'a> Read for &'a NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { self.0.read(buf) }
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
}

impl Write for NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
fn flush(&mut self) -> io::Result<()> {
<&NamedPipe as Write>::flush(&mut &*self)
}
}
impl<'a> Write for &'a NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { self.0.write(buf) }
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
fn flush(&mut self) -> io::Result<()> {
::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
}
Expand Down Expand Up @@ -569,6 +610,53 @@ mod tests {
t!(t.join());
}

#[test]
fn named_read_write_multi() {
for _ in 0..5 {
named_read_write()
}
}

#[test]
fn named_read_write_multi_same_thread() {
let name1 = name();
let mut a1 = t!(NamedPipe::new(&name1));
let name2 = name();
let mut a2 = t!(NamedPipe::new(&name2));

let t = thread::spawn(move || {
let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
t!(f.write_all(&[1, 2, 3]));
let mut b = [0; 10];
assert_eq!(t!(f.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);

let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
t!(f.write_all(&[1, 2, 3]));
let mut b = [0; 10];
assert_eq!(t!(f.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
});

t!(a1.connect());
let mut b = [0; 10];
assert_eq!(t!(a1.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
t!(a1.write_all(&[1, 2, 3]));
t!(a1.flush());
t!(a1.disconnect());

t!(a2.connect());
let mut b = [0; 10];
assert_eq!(t!(a2.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
t!(a2.write_all(&[1, 2, 3]));
t!(a2.flush());
t!(a2.disconnect());

t!(t.join());
}

#[test]
fn named_read_overlapped() {
let name = name();
Expand Down