Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added from_ method constructor for ChildStdin, ChildStdout & ChildStderr #391

Merged
merged 1 commit into from
May 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/net/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use io::MapNonBlock;
use std::io::{Read, Write};
use std::path::Path;
use bytes::{Buf, MutBuf};

pub use nix::sys::socket::Shutdown;
use std::process;

#[derive(Debug)]
pub struct UnixSocket {
Expand Down Expand Up @@ -231,6 +231,23 @@ pub struct PipeReader {
io: Io,
}

impl PipeReader {
pub fn from_stdout(stdout: process::ChildStdout) -> io::Result<Self> {
match sys::set_nonblock(&stdout) {
Err(e) => return Err(e),
_ => {},
}
return Ok(PipeReader::from(Io::from_raw_fd(stdout.into_raw_fd())));
}
pub fn from_stderr(stderr: process::ChildStderr) -> io::Result<Self> {
match sys::set_nonblock(&stderr) {
Err(e) => return Err(e),
_ => {},
}
return Ok(PipeReader::from(Io::from_raw_fd(stderr.into_raw_fd())));
}
}

impl Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
Expand Down Expand Up @@ -268,6 +285,16 @@ pub struct PipeWriter {
io: Io,
}

impl PipeWriter {
pub fn from_stdin(stdin: process::ChildStdin) -> io::Result<Self> {
match sys::set_nonblock(&stdin) {
Err(e) => return Err(e),
_ => {},
}
return Ok(PipeWriter::from(Io::from_raw_fd(stdin.into_raw_fd())));
}
}

impl Write for PipeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
Expand Down
2 changes: 2 additions & 0 deletions src/sys/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ pub use self::unix::{
UdpSocket,
UnixSocket,
pipe,
set_nonblock,
};


#[cfg(unix)]
mod unix;

Expand Down
9 changes: 9 additions & 0 deletions src/sys/unix/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use {io, poll, Evented, EventSet, Poll, PollOpt, Token};
use std::io::{Read, Write};
use std::os::unix::io::{IntoRawFd, AsRawFd, FromRawFd, RawFd};
use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};

pub fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(super::from_nix_error)
.map(|_| ())
}


/*
*
Expand Down Expand Up @@ -101,3 +109,4 @@ impl Drop for Io {
let _ = close(self.as_raw_fd());
}
}

2 changes: 1 addition & 1 deletion src/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod uds;

pub use self::awakener::Awakener;
pub use self::eventedfd::EventedFd;
pub use self::io::Io;
pub use self::io::{Io, set_nonblock};
pub use self::socket::Socket;
pub use self::tcp::{TcpStream, TcpListener};
pub use self::udp::UdpSocket;
Expand Down
10 changes: 2 additions & 8 deletions src/sys/unix/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use net2::TcpStreamExt;
#[allow(unused_imports)]
use net2::TcpListenerExt;

use nix::fcntl::FcntlArg::F_SETFL;
use nix::fcntl::{fcntl, O_NONBLOCK};

use {io, poll, Evented, EventSet, Poll, PollOpt, Token};

use sys::unix::eventedfd::EventedFd;
use sys::unix::io::set_nonblock;

#[derive(Debug)]
pub struct TcpStream {
Expand All @@ -28,11 +27,6 @@ pub struct TcpListener {
selector_id: Cell<Option<usize>>,
}

fn set_nonblock(s: &AsRawFd) -> io::Result<()> {
fcntl(s.as_raw_fd(), F_SETFL(O_NONBLOCK)).map_err(super::from_nix_error)
.map(|_| ())
}

impl TcpStream {
pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
try!(set_nonblock(&stream));
Expand Down
1 change: 1 addition & 0 deletions test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod test_timer;
mod test_udp_level;
mod test_udp_socket;
mod test_uds_shutdown;
mod test_subprocess_pipe;

// ===== Unix only tests =====
#[cfg(unix)]
Expand Down
263 changes: 263 additions & 0 deletions test/test_subprocess_pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
#![cfg(unix)]

extern crate mio;
extern crate bytes;
use std::mem;
use mio::*;
use std::io;
use mio::unix::{PipeReader, PipeWriter};
use std::process::{Command, Stdio, Child};


struct SubprocessClient {
stdin: Option<PipeWriter>,
stdout: Option<PipeReader>,
stderr: Option<PipeReader>,
stdin_token : Token,
stdout_token : Token,
stderr_token : Token,
output : Vec<u8>,
output_stderr : Vec<u8>,
input : Vec<u8>,
input_offset : usize,
buf : [u8; 65536],
}


// Sends a message and expects to receive the same exact message, one at a time
impl SubprocessClient {
fn new(stdin: Option<PipeWriter>, stdout : Option<PipeReader>, stderr : Option<PipeReader>, data : &[u8]) -> SubprocessClient {
SubprocessClient {
stdin: stdin,
stdout: stdout,
stderr: stderr,
stdin_token : Token(0),
stdout_token : Token(1),
stderr_token : Token(2),
output : Vec::<u8>::new(),
output_stderr : Vec::<u8>::new(),
buf : [0; 65536],
input : data.to_vec(),
input_offset : 0,
}
}

fn readable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket readable");
let mut eof = false;
match self.stdout {
None => unreachable!(),
Some (ref mut stdout) => match stdout.try_read(&mut self.buf[..]) {
Ok(None) => {
println!("CLIENT : spurious read wakeup");
}
Ok(Some(r)) => {
println!("CLIENT : We read {} bytes!", r);
if r == 0 {
eof = true;
} else {
self.output.extend(&self.buf[0..r]);
}
}
Err(e) => {
return Err(e);
}
}
};
if eof {
drop(self.stdout.take());
match self.stderr {
None => event_loop.shutdown(),
Some(_) => {},
}
}
return Ok(());
}

fn readable_stderr(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket readable");
let mut eof = false;
match self.stderr {
None => unreachable!(),
Some(ref mut stderr) => match stderr.try_read(&mut self.buf[..]) {
Ok(None) => {
println!("CLIENT : spurious read wakeup");
}
Ok(Some(r)) => {
println!("CLIENT : We read {} bytes!", r);
if r == 0 {
eof = true;
} else {
self.output_stderr.extend(&self.buf[0..r]);
}
}
Err(e) => {
return Err(e);
}
}
};
if eof {
drop(self.stderr.take());
match self.stdout {
None => event_loop.shutdown(),
Some(_) => {},
}
}
return Ok(());
}

fn writable(&mut self, event_loop: &mut EventLoop<SubprocessClient>) -> io::Result<()> {
println!("client socket writable");
let mut ok = true;
match self.stdin {
None => unreachable!(),
Some(ref mut stdin) => match stdin.try_write(&(&self.input)[self.input_offset..]) {
Ok(None) => {
println!("client flushing buf; WOULDBLOCK");
},
Ok(Some(r)) => {
if r == 0 {
println!("CLIENT : we wrote {} bytes!", r);
ok = false;
} else {
self.input_offset += r;
}
},
Err(e) => {
println!("not implemented; client err={:?}", e);
ok = false;
},
}
}
if self.input_offset == self.input.len() || !ok {
drop(self.stdin.take());
match self.stderr {
None => match self.stdout {
None => event_loop.shutdown(),
Some(_) => {},
},
Some(_) => {},
}
}
return Ok(());
}

}

impl Handler for SubprocessClient {
type Timeout = usize;
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<SubprocessClient>, token: Token,
events: EventSet) {
println!("ready {:?} {:?} {:}", token, events, events.is_readable());
if token == self.stderr_token {
let _x = self.readable_stderr(event_loop);
} else {
let _x = self.readable(event_loop);
}
if token == self.stdin_token {
let _y = self.writable(event_loop);
}
}
}




const TEST_DATA : [u8; 1024 * 4096] = [42; 1024 * 4096];
pub fn subprocess_communicate(mut process : Child, input : &[u8]) -> (Vec<u8>, Vec<u8>) {
let mut event_loop = EventLoop::<SubprocessClient>::new().unwrap();
let stdin : Option<PipeWriter>;
let stdin_exists : bool;
match process.stdin {
None => stdin_exists = false,
Some(_) => stdin_exists = true,
}
if stdin_exists {
match PipeWriter::from_stdin(process.stdin.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stdin = Some(pipe),
}
} else {
stdin = None;
}
let stdout_exists : bool;
let stdout : Option<PipeReader>;
match process.stdout {
None => stdout_exists = false,
Some(_) => stdout_exists = true,
}
if stdout_exists {
match PipeReader::from_stdout(process.stdout.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stdout = Some(pipe),
}
} else {
stdout = None;
}
let stderr_exists : bool;
let stderr : Option<PipeReader>;
match process.stderr {
None => stderr_exists = false,
Some(_) => stderr_exists = true,
}
if stderr_exists {
match PipeReader::from_stderr(process.stderr.take().unwrap()) {
Err(e) => panic!(e),
Ok(pipe) => stderr = Some(pipe),
}
} else {
stderr = None
}
//println!("listen for connections {:?} {:?}", , process.stdout.unwrap().as_raw_fd());
let mut subprocess = SubprocessClient::new(stdin,
stdout,
stderr,
input);
match subprocess.stdout {
Some(ref sub_stdout) => event_loop.register(sub_stdout, subprocess.stdout_token, EventSet::readable(),
PollOpt::level()).unwrap(),
None => {},
}

match subprocess.stderr {
Some(ref sub_stderr) => event_loop.register(sub_stderr, subprocess.stderr_token, EventSet::readable(),
PollOpt::level()).unwrap(),
None => {},
}

// Connect to the server
match subprocess.stdin {
Some (ref sub_stdin) => event_loop.register(sub_stdin, subprocess.stdin_token, EventSet::writable(),
PollOpt::level()).unwrap(),
None => {},
}

// Start the event loop
event_loop.run(&mut subprocess).unwrap();
let res = process.wait();
println!("Final output was {:} {:} {:?}\n", subprocess.output.len(), subprocess.output_stderr.len(), res);

let ret_stdout = mem::replace(&mut subprocess.output, Vec::<u8>::new());
let ret_stderr = mem::replace(&mut subprocess.output_stderr, Vec::<u8>::new());
return (ret_stdout, ret_stderr);
}

#[test]
fn test_subprocess_pipe() {
let process =
Command::new("/bin/cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn().unwrap();
let (ret_stdout, ret_stderr) = subprocess_communicate(process, &TEST_DATA[..]);
assert_eq!(TEST_DATA.len(), ret_stdout.len());
assert_eq!(0usize, ret_stderr.len());
let mut i : usize = 0;
for item in TEST_DATA.iter() {
assert_eq!(*item, ret_stdout[i]);
i += 1;
}
}