Skip to content

Commit

Permalink
Merge 627d9a5 into 3573e41
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiburt committed Dec 11, 2021
2 parents 3573e41 + 627d9a5 commit e13a507
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 56 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ async-io = { version = "1.6.0", optional = true }
conpty = "0.2.1"

[target.'cfg(unix)'.dependencies]
ptyprocess = "0.2.0"
ptyprocess = { git = "https://github.com/zhiburt/ptyprocess", branch = "main" }
nix = "0.21.0"
12 changes: 6 additions & 6 deletions src/interact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,10 @@ where
{
/// Runs interact interactively.
/// See [Session::interact]
///
///
/// On process exit it tries to read available bytes from output in order to run callbacks.
/// But it is not guaranteed that all output will be read therefore some callbacks might be not called.
///
///
/// To mitigate such an issue you could use [Session::is_empty] to verify that there is nothing in processes output.
/// (at the point of the call)
#[cfg(unix)]
Expand Down Expand Up @@ -379,7 +379,7 @@ where
set_raw(STDIN_FILENO)?;
}

session.set_echo(true)?;
session.set_echo(true, None)?;

let result = interact(session, options);

Expand All @@ -395,7 +395,7 @@ where
)?;
}

session.set_echo(origin_pty_echo)?;
session.set_echo(origin_pty_echo, None)?;

result
}
Expand Down Expand Up @@ -543,7 +543,7 @@ where
set_raw(STDIN_FILENO)?;
}

session.set_echo(true)?;
session.set_echo(true, None)?;

let result = interact(session, options).await;

Expand All @@ -559,7 +559,7 @@ where
)?;
}

session.set_echo(origin_pty_echo)?;
session.set_echo(origin_pty_echo, None)?;

result
}
Expand Down
17 changes: 15 additions & 2 deletions src/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,33 @@ pub async fn spawn_bash() -> Result<ReplSession, Error> {
}

/// Spawn default python's IDLE.
#[cfg(not(feature = "async"))]
pub fn spawn_python() -> Result<ReplSession, Error> {
#[cfg(unix)]
{
let idle = ReplSession::spawn(Command::new("python"), ">>> ", Some("quit()"))?;
let mut idle = ReplSession::spawn(Command::new("python"), ">>> ", Some("quit()"))?;
idle._expect_prompt()?;
Ok(idle)
}
#[cfg(windows)]
{
// If we spawn it as ProcAttr::default().commandline("python") it will spawn processes endlessly....
let idle = ReplSession::spawn(ProcAttr::cmd("python".to_string()), ">>> ", Some("quit()"))?;
let mut idle =
ReplSession::spawn(ProcAttr::cmd("python".to_string()), ">>> ", Some("quit()"))?;
idle._expect_prompt()?;
Ok(idle)
}
}

/// Spawn default python's IDLE.
#[cfg(feature = "async")]
#[cfg(unix)]
pub async fn spawn_python() -> Result<ReplSession, Error> {
let mut idle = ReplSession::spawn(Command::new("python"), ">>> ", Some("quit()"))?;
idle._expect_prompt().await?;
Ok(idle)
}

/// Spawn a powershell session.
///
/// It uses a custom prompt to be able to controll the shell.
Expand Down
2 changes: 1 addition & 1 deletion src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Session {
#[cfg(unix)]
pub fn spawn(command: Command) -> Result<Self, Error> {
let ptyproc = PtyProcess::spawn(command)?;
let stream = Stream::new(ptyproc.get_pty_handle()?);
let stream = Stream::new(ptyproc.get_raw_handle()?);

Ok(Self {
proc: ptyproc,
Expand Down
75 changes: 51 additions & 24 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ mod unix {
/// Try to read in a non-blocking mode.
///
/// It raises io::ErrorKind::WouldBlock if there's nothing to read.
pub fn try_read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let fd = self.inner.as_raw_fd();
make_non_blocking(fd).map_err(nix_error_to_io)?;

let result = match self.read(&mut buf) {
let result = match self.read(buf) {
Ok(n) => Ok(n),
Err(err) => Err(err),
};
Expand All @@ -76,11 +76,11 @@ mod unix {
}

// non-buffered && non-blocking read
fn try_read_inner(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let fd = self.inner.as_raw_fd();
make_non_blocking(fd).map_err(nix_error_to_io)?;

let result = match self.reader.get_mut().inner.read(&mut buf) {
let result = match self.reader.get_mut().inner.read(buf) {
Ok(n) => Ok(n),
Err(err) => Err(err),
};
Expand All @@ -102,6 +102,8 @@ mod unix {
}

pub fn read_available(&mut self) -> std::io::Result<bool> {
self.flush_in_buffer();

let mut buf = [0; 248];
loop {
match self.try_read_inner(&mut buf) {
Expand All @@ -119,6 +121,8 @@ mod unix {
&mut self,
buf: &mut [u8],
) -> std::io::Result<Option<usize>> {
self.flush_in_buffer();

match self.try_read_inner(buf) {
Ok(0) => Ok(Some(0)),
Ok(n) => {
Expand All @@ -141,6 +145,16 @@ mod unix {
pub fn keep_in_buffer(&mut self, v: &[u8]) {
self.reader.get_mut().keep_in_buffer(v);
}

pub fn flush_in_buffer(&mut self) {
// Because we have 2 buffered streams there might appear inconsistancy
// in read operations and the data which was via `keep_in_buffer` function.
//
// To eliminate it we move BufReader buffer to our buffer.
let b = self.reader.buffer().to_vec();
self.reader.consume(b.len());
self.keep_in_buffer(&b);
}
}

impl Write for Stream {
Expand Down Expand Up @@ -265,6 +279,8 @@ mod unix {
}

pub async fn read_available(&mut self) -> std::io::Result<bool> {
self.flush_in_buffer();

let mut buf = [0; 248];
loop {
match self.try_read_inner(&mut buf).await {
Expand All @@ -282,6 +298,8 @@ mod unix {
&mut self,
buf: &mut [u8],
) -> std::io::Result<Option<usize>> {
self.flush_in_buffer();

match self.try_read_inner(buf).await {
Ok(0) => Ok(Some(0)),
Ok(n) => {
Expand All @@ -304,6 +322,16 @@ mod unix {
pub fn keep_in_buffer(&mut self, v: &[u8]) {
self.reader.get_mut().keep_in_buffer(v);
}

pub fn flush_in_buffer(&mut self) {
// Because we have 2 buffered streams there might appear inconsistancy
// in read operations and the data which was via `keep_in_buffer` function.
//
// To eliminate it we move BufReader buffer to our buffer.
let b = self.reader.buffer().to_vec();
self.reader.consume(b.len());
self.keep_in_buffer(&b);
}
}

impl AsyncWrite for AsyncStream {
Expand Down Expand Up @@ -385,10 +413,10 @@ mod win {
}
}

pub fn try_read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.output.get_mut().get_mut().set_non_blocking_mode()?;

let result = match self.read(&mut buf) {
let result = match self.read(buf) {
Ok(n) => Ok(n),
Err(err) => Err(err),
};
Expand All @@ -408,10 +436,10 @@ mod win {
}

// non-buffered && non-blocking read
fn try_read_inner(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.output.get_mut().get_mut().set_non_blocking_mode()?;

let result = match self.output.get_mut().inner.read(&mut buf) {
let result = match self.output.get_mut().inner.read(buf) {
Ok(n) => Ok(n),
Err(err) => Err(err),
};
Expand All @@ -422,6 +450,8 @@ mod win {
}

pub fn read_available(&mut self) -> std::io::Result<bool> {
self.flush_in_buffer();

let mut buf = [0; 248];
loop {
match self.try_read_inner(&mut buf) {
Expand All @@ -436,6 +466,8 @@ mod win {
}

pub fn read_available_once(&mut self, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
self.flush_in_buffer();

match self.try_read_inner(buf) {
Ok(0) => Ok(Some(0)),
Ok(n) => {
Expand All @@ -458,6 +490,16 @@ mod win {
pub fn keep_in_buffer(&mut self, v: &[u8]) {
self.output.get_mut().keep_in_buffer(v);
}

pub fn flush_in_buffer(&mut self) {
// Because we have 2 buffered streams there might appear inconsistancy
// in read operations and the data which was via `keep_in_buffer` function.
//
// To eliminate it we move BufReader buffer to our buffer.
let b = self.reader.buffer().to_vec();
self.reader.consume(b.len());
self.keep_in_buffer(&b);
}
}

impl Write for Stream {
Expand Down Expand Up @@ -521,28 +563,13 @@ impl<R: std::io::Read> ReaderWithBuffer<R> {
#[cfg(not(feature = "async"))]
impl<R: std::io::Read> std::io::Read for ReaderWithBuffer<R> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
// We intentinally try to read from inner buffer in any case
// because calling code might endlessly save into inner buffer something and actuall read won't be called at all,
//
// For example caller code waits before something appear in the buffer,
// And if its not the read data saved into our buffer.
// In such a situation we will return a buffer which will never be filled with expected data.
//
// As a down side we might lose a error which might be important to caller code.
if self.buffer.is_empty() {
self.inner.read(buf)
} else {
use std::io::Write;
let n = buf.write(&self.buffer)?;
self.buffer.drain(..n);

self.inner
.read(&mut buf[n..])
.map(|n1| {
// is it possible that overflow happen?
n + n1
})
.or(Ok(n))
Ok(n)
}
}
}
Expand Down
30 changes: 25 additions & 5 deletions tests/check.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use expectrl::{spawn, Any, Eof, NBytes, Regex};
use expectrl::{spawn, Any, Eof, NBytes, Regex, WaitStatus};
use std::thread;
use std::time::Duration;

Expand Down Expand Up @@ -91,8 +91,12 @@ fn check_eof() {
thread::sleep(Duration::from_millis(600));

let m = session.check(Eof).unwrap();
assert_eq!(m.first(), b"'Hello World'\r\n");

assert_eq!(m.before(), b"");
#[cfg(target_os = "linux")]
assert_eq!(m.first(), b"'Hello World'\r\n");
#[cfg(not(target_os = "linux"))]
assert!(m.matches().is_empty());
}

#[cfg(unix)]
Expand All @@ -105,8 +109,12 @@ fn check_eof() {
thread::sleep(Duration::from_millis(600));

let m = session.check(Eof).await.unwrap();
assert_eq!(m.first(), b"'Hello World'\r\n");

assert_eq!(m.before(), b"");
#[cfg(target_os = "linux")]
assert_eq!(m.first(), b"'Hello World'\r\n");
#[cfg(not(target_os = "linux"))]
assert!(m.matches().is_empty());
})
}

Expand Down Expand Up @@ -241,12 +249,18 @@ fn check_macro() {
fn check_macro_eof() {
let mut session = spawn("echo 'Hello World'").unwrap();

thread::sleep(Duration::from_millis(600));
assert_eq!(
WaitStatus::Exited(session.pid(), 0),
session.wait().unwrap()
);

expectrl::check!(
session,
output = Eof => {
#[cfg(target_os = "linux")]
assert_eq!(output.first(), b"'Hello World'\r\n");
#[cfg(not(target_os = "linux"))]
assert_eq!(output.first(), b"");
assert_eq!(output.before(), b"");
},
default => {
Expand All @@ -262,13 +276,19 @@ fn check_macro_eof() {
fn check_macro_eof() {
let mut session = spawn("echo 'Hello World'").unwrap();

thread::sleep(Duration::from_millis(600));
assert_eq!(
WaitStatus::Exited(session.pid(), 0),
session.wait().unwrap()
);

futures_lite::future::block_on(async {
expectrl::check!(
session,
output = Eof => {
#[cfg(target_os = "linux")]
assert_eq!(output.first(), b"'Hello World'\r\n");
#[cfg(not(target_os = "linux"))]
assert_eq!(output.first(), b"");
assert_eq!(output.before(), b"");
},
default => {
Expand Down
4 changes: 4 additions & 0 deletions tests/interact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ fn interact_callbacks_called_after_exit() {
"Nix error ECHILD: No child processes"
);

#[cfg(target_os = "linux")]
assert_eq!(*opts.get_state(), 1);

#[cfg(not(target_os = "linux"))]
assert_eq!(*opts.get_state(), 0);
}

#[cfg(unix)]
Expand Down
5 changes: 4 additions & 1 deletion tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,19 @@ fn try_read_after_eof() {

#[test]
#[cfg(unix)]
// #[cfg(not(target_os = "macos"))]
fn try_read_after_process_exit() {
let mut command = Command::new("echo");
command.arg("hello cat");
let mut proc = Session::spawn(command).unwrap();

assert_eq!(proc.wait().unwrap(), WaitStatus::Exited(proc.pid(), 0));

#[cfg(target_os = "linux")]
assert_eq!(_p_try_read(&mut proc, &mut [0; 128]).unwrap(), 11);

#[cfg(not(target_os = "linux"))]
assert_eq!(_p_try_read(&mut proc, &mut [0; 128]).unwrap(), 0);

assert_eq!(_p_try_read(&mut proc, &mut [0; 128]).unwrap(), 0);
assert!(_p_is_empty(&mut proc).unwrap());

Expand Down
Loading

0 comments on commit e13a507

Please sign in to comment.