Skip to content

Commit

Permalink
Added from_ method constructor for ChildStdin, ChildStdout and ChildS…
Browse files Browse the repository at this point in the history
…tderr that may interact with mio to fix #387

Uses into_raw_fd to hand over ownership to mio's Pipe{Writer,Reader}
  • Loading branch information
danielrh committed May 20, 2016
1 parent e7ac893 commit 67d8872
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 7 deletions.
29 changes: 28 additions & 1 deletion src/net/unix.rs
Expand Up @@ -3,8 +3,8 @@ use io::MapNonBlock;
use std::io::{Read, Write};
use std::path::Path;
use bytes::{Buf, MutBuf};

pub use nix::sys::socket::Shutdown;
use std::process;

#[derive(Debug)]
pub struct UnixSocket {
Expand Down Expand Up @@ -231,6 +231,23 @@ pub struct PipeReader {
io: Io,
}

impl PipeReader {
pub fn from_stdout(stdout: process::ChildStdout) -> io::Result<Self> {
match sys::set_nonblock(&stdout) {
Err(e) => return Err(e),
_ => {},
}
return Ok(PipeReader::from(Io::from_raw_fd(stdout.into_raw_fd())));
}
pub fn from_stderr(stderr: process::ChildStderr) -> io::Result<Self> {
match sys::set_nonblock(&stderr) {
Err(e) => return Err(e),
_ => {},
}
return Ok(PipeReader::from(Io::from_raw_fd(stderr.into_raw_fd())));
}
}

impl Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
Expand Down Expand Up @@ -268,6 +285,16 @@ pub struct PipeWriter {
io: Io,
}

impl PipeWriter {
pub fn from_stdin(stdin: process::ChildStdin) -> io::Result<Self> {
match sys::set_nonblock(&stdin) {
Err(e) => return Err(e),
_ => {},
}
return Ok(PipeWriter::from(Io::from_raw_fd(stdin.into_raw_fd())));
}
}

impl Write for PipeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
Expand Down
2 changes: 2 additions & 0 deletions src/sys/mod.rs
Expand Up @@ -10,8 +10,10 @@ pub use self::unix::{
UdpSocket,
UnixSocket,
pipe,
set_nonblock,
};


#[cfg(unix)]
mod unix;

Expand Down
9 changes: 9 additions & 0 deletions src/sys/unix/io.rs
@@ -1,6 +1,14 @@
use {io, poll, Evented, EventSet, Poll, PollOpt, Token};
use std::io::{Read, Write};
use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd};
use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};

pub fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(super::from_nix_error)
.map(|_| ())
}


/*
*
Expand Down Expand Up @@ -101,3 +109,4 @@ impl Drop for Io {
let _ = close(self.as_raw_fd());
}
}

2 changes: 1 addition & 1 deletion src/sys/unix/mod.rs
Expand Up @@ -25,7 +25,7 @@ mod uds;

pub use self::awakener::Awakener;
pub use self::eventedfd::EventedFd;
pub use self::io::Io;
pub use self::io::{Io, set_nonblock};
pub use self::socket::Socket;
pub use self::tcp::{TcpStream, TcpListener};
pub use self::udp::UdpSocket;
Expand Down
7 changes: 2 additions & 5 deletions src/sys/unix/tcp.rs
Expand Up @@ -14,7 +14,9 @@ use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};

use {io, poll, Evented, EventSet, Poll, PollOpt, Token};

use sys::unix::eventedfd::EventedFd;
use sys::unix::io::set_nonblock;

#[derive(Debug)]
pub struct TcpStream {
Expand All @@ -28,11 +30,6 @@ pub struct TcpListener {
selector_id: Cell<Option<usize>>,
}

fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(super::from_nix_error)
.map(|_| ())
}

impl TcpStream {
pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
try!(set_nonblock(&stream));
Expand Down
1 change: 1 addition & 0 deletions test/mod.rs
Expand Up @@ -27,6 +27,7 @@ mod test_timer;
mod test_udp_level;
mod test_udp_socket;
mod test_uds_shutdown;
mod test_subprocess_pipe;

// ===== Unix only tests =====
#[cfg(unix)]
Expand Down
263 changes: 263 additions & 0 deletions test/test_subprocess_pipe.rs
@@ -0,0 +1,263 @@
#![cfg(unix)]

extern crate mio;
extern crate bytes;
use std::mem;
use mio::*;
use std::io;
use mio::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio, Child};


struct SubprocessClient {
stdin: Option<PipeWriter>,
stdout: Option<PipeReader>,
stderr: Option<PipeReader>,
stdin_token : Token,
stdout_token : Token,
stderr_token : Token,
output : Vec<u8>,
output_stderr : Vec<u8>,
input : Vec<u8>,
input_offset : usize,
buf : [u8; 65536],
}


// Sends a message and expects to receive the same exact message, one at a time
impl SubprocessClient {
fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
SubprocessClient {
stdin: stdin,
stdout: stdout,
stderr: stderr,
stdin_token : Token(0),
stdout_token : Token(1),
stderr_token : Token(2),
output : Vec::<u8>::new(),
output_stderr : Vec::<u8>::new(),
buf : [0; 65536],
input : data.to_vec(),
input_offset : 0,
}
}

fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket readable");
let mut eof = false;
match self.stdout {
None => unreachable!(),
Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
Ok(None) => {
println!("CLIENT : spurious read wakeup");
}
Ok(Some(r)) => {
println!("CLIENT : We read {} bytes!", r);
if r == 0 {
eof = true;
} else {
self.output.extend(&self.buf[0..r]);
}
}
Err(e) => {
return Err(e);
}
}
};
if eof {
drop(self.stdout.take());
match self.stderr {
None => event_loop.shutdown(),
Some(_) => {},
}
}
return Ok(());
}

fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket readable");
let mut eof = false;
match self.stderr {
None => unreachable!(),
Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
Ok(None) => {
println!("CLIENT : spurious read wakeup");
}
Ok(Some(r)) => {
println!("CLIENT : We read {} bytes!", r);
if r == 0 {
eof = true;
} else {
self.output_stderr.extend(&self.buf[0..r]);
}
}
Err(e) => {
return Err(e);
}
}
};
if eof {
drop(self.stderr.take());
match self.stdout {
None => event_loop.shutdown(),
Some(_) => {},
}
}
return Ok(());
}

fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket writable");
let mut ok = true;
match self.stdin {
None => unreachable!(),
Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
Ok(None) => {
println!("client flushing buf; WOULDBLOCK");
},
Ok(Some(r)) => {
if r == 0 {
println!("CLIENT : we wrote {} bytes!", r);
ok = false;
} else {
self.input_offset += r;
}
},
Err(e) => {
println!("not implemented; client err={:?}", e);
ok = false;
},
}
}
if self.input_offset == self.input.len() || !ok {
drop(self.stdin.take());
match self.stderr {
None => match self.stdout {
None => event_loop.shutdown(),
Some(_) => {},
},
Some(_) => {},
}
}
return Ok(());
}

}

impl Handler for SubprocessClient {
type Timeout = usize;
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
events: EventSet) {
println!("ready {:?} {:?} {:}", token, events, events.is_readable());
if token == self.stderr_token {
let _x = self.readable_stderr(event_loop);
} else {
let _x = self.readable(event_loop);
}
if token == self.stdin_token {
let _y = self.writable(event_loop);
}
}
}




const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
let stdin : Option<PipeWriter>;
let stdin_exists : bool;
match process.stdin {
None => stdin_exists = false,
Some(_) => stdin_exists = true,
}
if stdin_exists {
match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stdin = Some(pipe),
}
} else {
stdin = None;
}
let stdout_exists : bool;
let stdout : Option<PipeReader>;
match process.stdout {
None => stdout_exists = false,
Some(_) => stdout_exists = true,
}
if stdout_exists {
match PipeReader::from_stdout(process.stdout.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stdout = Some(pipe),
}
} else {
stdout = None;
}
let stderr_exists : bool;
let stderr : Option<PipeReader>;
match process.stderr {
None => stderr_exists = false,
Some(_) => stderr_exists = true,
}
if stderr_exists {
match PipeReader::from_stderr(process.stderr.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stderr = Some(pipe),
}
} else {
stderr = None
}
//println!("listen for connections {:?} {:?}", , process.stdout.unwrap().as_raw_fd());
let mut subprocess = SubprocessClient::new(stdin,
stdout,
stderr,
input);
match subprocess.stdout {
Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, EventSet::readable(),
PollOpt::level()).unwrap(),
None => {},
}

match subprocess.stderr {
Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, EventSet::readable(),
PollOpt::level()).unwrap(),
None => {},
}

// Connect to the server
match subprocess.stdin {
Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, EventSet::writable(),
PollOpt::level()).unwrap(),
None => {},
}

// Start the event loop
event_loop.run(&mut subprocess).unwrap();
let res = process.wait();
println!("Final output was {:} {:} {:?}\n", subprocess.output.len(), subprocess.output_stderr.len(), res);

let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
return (ret_stdout, ret_stderr);
}

#[test]
fn test_subprocess_pipe() {
let process =
Command::new("/bin/cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn().unwrap();
let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]);
assert_eq!(TEST_DATA.len(), ret_stdout.len());
assert_eq!(0usize, ret_stderr.len());
let mut i : usize = 0;
for item in TEST_DATA.iter() {
assert_eq!(*item, ret_stdout[i]);
i += 1;
}
}

0 comments on commit 67d8872

Please sign in to comment.