diff --git a/src/sys/windows/named_pipe.rs b/src/sys/windows/named_pipe.rs index a5688ce90..8c81f38a2 100644 --- a/src/sys/windows/named_pipe.rs +++ b/src/sys/windows/named_pipe.rs @@ -1,6 +1,6 @@ -use crate::{poll, Registry}; use crate::event::Source; use crate::sys::windows::{Event, Overlapped}; +use crate::{poll, Registry}; use winapi::um::minwinbase::OVERLAPPED_ENTRY; use std::ffi::OsStr; @@ -9,8 +9,8 @@ use std::io::{self, Read, Write}; use std::mem; use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; use std::slice; -use std::sync::atomic::{AtomicUsize, AtomicBool}; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Mutex}; use crate::{Interest, Token}; @@ -128,9 +128,7 @@ fn would_block() -> io::Error { impl NamedPipe { /// Creates a new named pipe at the specified `addr` given a "reasonable /// set" of initial configuration options. - pub fn new>( - addr: A, - ) -> io::Result { + pub fn new>(addr: A) -> io::Result { let pipe = pipe::NamedPipe::new(addr)?; // Safety: nothing actually unsafe about this. The trait fn includes // `unsafe`. @@ -226,9 +224,7 @@ impl NamedPipe { } impl FromRawHandle for NamedPipe { - unsafe fn from_raw_handle( - handle: RawHandle, - ) -> NamedPipe { + unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe { NamedPipe { inner: Arc::new(Inner { // Safety: not really unsafe @@ -281,9 +277,7 @@ impl<'a> Read for &'a NamedPipe { match mem::replace(&mut state.read, State::None) { // In theory not possible with `token` checked above, // but return would block for now. - State::None => { - Err(would_block()) - } + State::None => Err(would_block()), // A read is in flight, still waiting for it to finish State::Pending(buf, amt) => { @@ -324,7 +318,7 @@ impl<'a> Read for &'a NamedPipe { } impl<'a> Write for &'a NamedPipe { - fn write(&mut self, buf: &[u8]) -> io::Result { + fn write(&mut self, buf: &[u8]) -> io::Result { // Make sure there's no writes pending let mut io = self.inner.io.lock().unwrap(); @@ -334,6 +328,12 @@ impl<'a> Write for &'a NamedPipe { match io.write { State::None => {} + State::Err(_) => match mem::replace(&mut io.write, State::None) { + State::Err(e) => return Err(e), + // `io` is locked, so this branch is unreachable + _ => unreachable!(), + }, + // any other state should be handled in `write_done` _ => { return Err(would_block()); } @@ -342,17 +342,26 @@ impl<'a> Write for &'a NamedPipe { // Move `buf` onto the heap and fire off the write let mut owned_buf = self.inner.get_buffer(); owned_buf.extend(buf); - Inner::schedule_write(&self.inner, owned_buf, 0, &mut io, None); - Ok(buf.len()) + match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? { + // Some bytes are written immediately + Some(n) => Ok(n), + // Write operation is anqueued for whole buffer + None => Ok(buf.len()), + } } - fn flush(&mut self) -> io::Result<()> { - Ok(()) + fn flush(&mut self) -> io::Result<()> { + Ok(()) } } impl Source for NamedPipe { - fn register(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> { + fn register( + &mut self, + registry: &Registry, + token: Token, + interest: Interest, + ) -> io::Result<()> { let mut io = self.inner.io.lock().unwrap(); io.check_association(registry, false)?; @@ -368,7 +377,10 @@ impl Source for NamedPipe { io.cp = Some(poll::selector(registry).clone_port()); let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2; - poll::selector(registry).inner.cp.add_handle(inner_token, &self.inner.handle)?; + poll::selector(registry) + .inner + .cp + .add_handle(inner_token, &self.inner.handle)?; } io.token = Some(token); @@ -381,7 +393,12 @@ impl Source for NamedPipe { Ok(()) } - fn reregister(&mut self, registry: &Registry, token: Token, interest: Interest) -> io::Result<()> { + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interest: Interest, + ) -> io::Result<()> { let mut io = self.inner.io.lock().unwrap(); io.check_association(registry, true)?; @@ -491,19 +508,61 @@ impl Inner { } } - fn schedule_write(me: &Arc, buf: Vec, pos: usize, io: &mut Io, events: Option<&mut Vec>) { + /// Maybe schedules overlapped write operation. + /// + /// * `None` means that overlapped operation was enqueued + /// * `Some(n)` means that `n` bytes was immediately written. + /// Note, that `write_done` will fire anyway to clean up the state. + fn maybe_schedule_write( + me: &Arc, + buf: Vec, + pos: usize, + io: &mut Io, + ) -> io::Result> { // Very similar to `schedule_read` above, just done for the write half. let e = unsafe { let overlapped = me.write.as_ptr() as *mut _; me.handle.write_overlapped(&buf[pos..], overlapped) }; + // See `connect` above for the rationale behind `forget` match e { - // See `connect` above for the rationale behind `forget` - Ok(_) => { + // `n` bytes are written immediately + Ok(Some(n)) => { + io.write = State::Ok(buf, pos); + mem::forget(me.clone()); + Ok(Some(n)) + } + // write operation is enqueued + Ok(None) => { io.write = State::Pending(buf, pos); - mem::forget(me.clone()) + mem::forget(me.clone()); + Ok(None) } + Err(e) => Err(e), + } + } + + fn schedule_write( + me: &Arc, + buf: Vec, + pos: usize, + io: &mut Io, + events: Option<&mut Vec>, + ) { + match Inner::maybe_schedule_write(me, buf, pos, io) { + Ok(Some(_)) => { + // immediate result will be handled in `write_done`, + // so we'll reinterpret the `Ok` state + let state = mem::replace(&mut io.write, State::None); + io.write = match state { + State::Ok(buf, pos) => State::Pending(buf, pos), + // io is locked, so this branch is unreachable + _ => unreachable!(), + }; + mem::forget(me.clone()); + } + Ok(None) => (), Err(e) => { io.write = State::Err(e); io.notify_writable(events); @@ -610,6 +669,12 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { // then we're writable again and otherwise we schedule another write. let mut io = me.io.lock().unwrap(); let (buf, pos) = match mem::replace(&mut io.write, State::None) { + // `Ok` here means, that the operation was completed immediately + // `bytes_transferred` is already reported to a client + State::Ok(..) => { + io.notify_writable(events); + return; + } State::Pending(buf, pos) => (buf, pos), _ => unreachable!(), }; @@ -638,18 +703,14 @@ fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec>) { impl Io { fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> { match self.cp { - Some(ref cp) if !poll::selector(registry).same_port(cp) => { - Err(io::Error::new( - io::ErrorKind::AlreadyExists, - "I/O source already registered with a different `Registry`" - )) - } - None if required => { - Err(io::Error::new( - io::ErrorKind::NotFound, - "I/O source not registered with `Registry`" - )) - } + Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "I/O source already registered with a different `Registry`", + )), + None if required => Err(io::Error::new( + io::ErrorKind::NotFound, + "I/O source not registered with `Registry`", + )), _ => Ok(()), } } diff --git a/tests/win_named_pipe.rs b/tests/win_named_pipe.rs index b1a191347..5d4d3022d 100644 --- a/tests/win_named_pipe.rs +++ b/tests/win_named_pipe.rs @@ -9,6 +9,7 @@ use std::time::Duration; use mio::windows::NamedPipe; use mio::{Events, Interest, Poll, Token}; use rand::Rng; +use winapi::shared::winerror::*; use winapi::um::winbase::FILE_FLAG_OVERLAPPED; fn _assert_kinds() { @@ -177,6 +178,44 @@ fn connect_after_client() { } } +#[test] +fn write_disconnected() { + let mut poll = t!(Poll::new()); + let (mut server, mut client) = pipe(); + t!(poll.registry().register( + &mut server, + Token(0), + Interest::READABLE | Interest::WRITABLE, + )); + t!(poll.registry().register( + &mut client, + Token(1), + Interest::READABLE | Interest::WRITABLE, + )); + + drop(client); + + let mut events = Events::with_capacity(128); + t!(poll.poll(&mut events, None)); + assert!(events.iter().count() > 0); + + // this should not hang + let mut i = 0; + loop { + i += 1; + assert!(i < 16, "too many iterations"); + + match server.write(&[0]) { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + t!(poll.poll(&mut events, None)); + assert!(events.iter().count() > 0); + } + Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => break, + e => panic!("{:?}", e), + } + } +} + #[test] fn write_then_drop() { let (mut server, mut client) = pipe();