Skip to content

Commit

Permalink
Fix error handling in NamedPipe::write
Browse files Browse the repository at this point in the history
  • Loading branch information
AIkorsky authored and Thomasdezeeuw committed Feb 22, 2021
1 parent 6f86b92 commit aec872b
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 35 deletions.
131 changes: 96 additions & 35 deletions src/sys/windows/named_pipe.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{poll, Registry};
use crate::event::Source;
use crate::sys::windows::{Event, Overlapped};
use crate::{poll, Registry};
use winapi::um::minwinbase::OVERLAPPED_ENTRY;

use std::ffi::OsStr;
Expand All @@ -9,8 +9,8 @@ use std::io::{self, Read, Write};
use std::mem;
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::slice;
use std::sync::atomic::{AtomicUsize, AtomicBool};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};

use crate::{Interest, Token};
Expand Down Expand Up @@ -128,9 +128,7 @@ fn would_block() -> io::Error {
impl NamedPipe {
/// Creates a new named pipe at the specified `addr` given a "reasonable
/// set" of initial configuration options.
pub fn new<A: AsRef<OsStr>>(
addr: A,
) -> io::Result<NamedPipe> {
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
let pipe = pipe::NamedPipe::new(addr)?;
// Safety: nothing actually unsafe about this. The trait fn includes
// `unsafe`.
Expand Down Expand Up @@ -226,9 +224,7 @@ impl NamedPipe {
}

impl FromRawHandle for NamedPipe {
unsafe fn from_raw_handle(
handle: RawHandle,
) -> NamedPipe {
unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
NamedPipe {
inner: Arc::new(Inner {
// Safety: not really unsafe
Expand Down Expand Up @@ -281,9 +277,7 @@ impl<'a> Read for &'a NamedPipe {
match mem::replace(&mut state.read, State::None) {
// In theory not possible with `token` checked above,
// but return would block for now.
State::None => {
Err(would_block())
}
State::None => Err(would_block()),

// A read is in flight, still waiting for it to finish
State::Pending(buf, amt) => {
Expand Down Expand Up @@ -324,7 +318,7 @@ impl<'a> Read for &'a NamedPipe {
}

impl<'a> Write for &'a NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// Make sure there's no writes pending
let mut io = self.inner.io.lock().unwrap();

Expand All @@ -334,6 +328,12 @@ impl<'a> Write for &'a NamedPipe {

match io.write {
State::None => {}
State::Err(_) => match mem::replace(&mut io.write, State::None) {
State::Err(e) => return Err(e),
// `io` is locked, so this branch is unreachable
_ => unreachable!(),
},
// any other state should be handled in `write_done`
_ => {
return Err(would_block());
}
Expand All @@ -342,17 +342,26 @@ impl<'a> Write for &'a NamedPipe {
// Move `buf` onto the heap and fire off the write
let mut owned_buf = self.inner.get_buffer();
owned_buf.extend(buf);
Inner::schedule_write(&self.inner, owned_buf, 0, &mut io, None);
Ok(buf.len())
match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
// Some bytes are written immediately
Some(n) => Ok(n),
// Write operation is anqueued for whole buffer
None => Ok(buf.len()),
}
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

impl Source for NamedPipe {
fn register(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
fn register(
&mut self,
registry: &Registry,
token: Token,
interest: Interest,
) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();

io.check_association(registry, false)?;
Expand All @@ -368,7 +377,10 @@ impl Source for NamedPipe {
io.cp = Some(poll::selector(registry).clone_port());

let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
poll::selector(registry).inner.cp.add_handle(inner_token, &self.inner.handle)?;
poll::selector(registry)
.inner
.cp
.add_handle(inner_token, &self.inner.handle)?;
}

io.token = Some(token);
Expand All @@ -381,7 +393,12 @@ impl Source for NamedPipe {
Ok(())
}

fn reregister(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> {
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interest: Interest,
) -> io::Result<()> {
let mut io = self.inner.io.lock().unwrap();

io.check_association(registry, true)?;
Expand Down Expand Up @@ -491,19 +508,61 @@ impl Inner {
}
}

fn schedule_write(me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, events: Option<&mut Vec<Event>>) {
/// Maybe schedules overlapped write operation.
///
/// * `None` means that overlapped operation was enqueued
/// * `Some(n)` means that `n` bytes was immediately written.
/// Note, that `write_done` will fire anyway to clean up the state.
fn maybe_schedule_write(
me: &Arc<Inner>,
buf: Vec<u8>,
pos: usize,
io: &mut Io,
) -> io::Result<Option<usize>> {
// Very similar to `schedule_read` above, just done for the write half.
let e = unsafe {
let overlapped = me.write.as_ptr() as *mut _;
me.handle.write_overlapped(&buf[pos..], overlapped)
};

// See `connect` above for the rationale behind `forget`
match e {
// See `connect` above for the rationale behind `forget`
Ok(_) => {
// `n` bytes are written immediately
Ok(Some(n)) => {
io.write = State::Ok(buf, pos);
mem::forget(me.clone());
Ok(Some(n))
}
// write operation is enqueued
Ok(None) => {
io.write = State::Pending(buf, pos);
mem::forget(me.clone())
mem::forget(me.clone());
Ok(None)
}
Err(e) => Err(e),
}
}

fn schedule_write(
me: &Arc<Inner>,
buf: Vec<u8>,
pos: usize,
io: &mut Io,
events: Option<&mut Vec<Event>>,
) {
match Inner::maybe_schedule_write(me, buf, pos, io) {
Ok(Some(_)) => {
// immediate result will be handled in `write_done`,
// so we'll reinterpret the `Ok` state
let state = mem::replace(&mut io.write, State::None);
io.write = match state {
State::Ok(buf, pos) => State::Pending(buf, pos),
// io is locked, so this branch is unreachable
_ => unreachable!(),
};
mem::forget(me.clone());
}
Ok(None) => (),
Err(e) => {
io.write = State::Err(e);
io.notify_writable(events);
Expand Down Expand Up @@ -610,6 +669,12 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
// then we're writable again and otherwise we schedule another write.
let mut io = me.io.lock().unwrap();
let (buf, pos) = match mem::replace(&mut io.write, State::None) {
// `Ok` here means, that the operation was completed immediately
// `bytes_transferred` is already reported to a client
State::Ok(..) => {
io.notify_writable(events);
return;
}
State::Pending(buf, pos) => (buf, pos),
_ => unreachable!(),
};
Expand Down Expand Up @@ -638,18 +703,14 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
impl Io {
fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
match self.cp {
Some(ref cp) if !poll::selector(registry).same_port(cp) => {
Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"I/O source already registered with a different `Registry`"
))
}
None if required => {
Err(io::Error::new(
io::ErrorKind::NotFound,
"I/O source not registered with `Registry`"
))
}
Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"I/O source already registered with a different `Registry`",
)),
None if required => Err(io::Error::new(
io::ErrorKind::NotFound,
"I/O source not registered with `Registry`",
)),
_ => Ok(()),
}
}
Expand Down
39 changes: 39 additions & 0 deletions tests/win_named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::time::Duration;
use mio::windows::NamedPipe;
use mio::{Events, Interest, Poll, Token};
use rand::Rng;
use winapi::shared::winerror::*;
use winapi::um::winbase::FILE_FLAG_OVERLAPPED;

fn _assert_kinds() {
Expand Down Expand Up @@ -177,6 +178,44 @@ fn connect_after_client() {
}
}

#[test]
fn write_disconnected() {
let mut poll = t!(Poll::new());
let (mut server, mut client) = pipe();
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));

drop(client);

let mut events = Events::with_capacity(128);
t!(poll.poll(&mut events, None));
assert!(events.iter().count() > 0);

// this should not hang
let mut i = 0;
loop {
i += 1;
assert!(i < 16, "too many iterations");

match server.write(&[0]) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
t!(poll.poll(&mut events, None));
assert!(events.iter().count() > 0);
}
Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => break,
e => panic!("{:?}", e),
}
}
}

#[test]
fn write_then_drop() {
let (mut server, mut client) = pipe();
Expand Down

0 comments on commit aec872b

Please sign in to comment.