Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tmc-langs-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ zip = "0.5"
schemars = "0.7"
once_cell = "1"
nom = "5"
shared_child = "0.3.4"
os_pipe = "0.9.2"

[dev-dependencies]
env_logger = "0.7"
Expand Down
155 changes: 101 additions & 54 deletions tmc-langs-framework/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
//! Custom wrapper for Command that supports timeouts and contains custom error handling.

use crate::{error::CommandError, TmcError};
use std::fmt;
use os_pipe::pipe;
#[cfg(unix)]
use shared_child::unix::SharedChildExt;
use shared_child::SharedChild;
use std::io::Read;
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;
use std::process::{Command, ExitStatus, Output, Stdio};
use std::process::{Command, ExitStatus, Output};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::time::Duration;
use std::{fmt, sync::Mutex};

// todo: collect args?
#[derive(Debug)]
pub struct TmcCommand {
name: String,
path: PathBuf,
Expand Down Expand Up @@ -106,59 +112,100 @@ impl TmcCommand {
}

/// Waits with the given timeout. Sets stdout and stderr in order to capture them after erroring.
pub fn wait_with_timeout(&mut self, timeout: Duration) -> Result<OutputWithTimeout, TmcError> {
// spawn process and init timer
let mut child = self
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| TmcError::Command(CommandError::Spawn(self.to_string(), e)))?;
let timer = Instant::now();

loop {
match child.try_wait().map_err(TmcError::Process)? {
Some(_exit_status) => {
// done, get output
return child
.wait_with_output()
.map(OutputWithTimeout::Output)
.map_err(|e| {
if let std::io::ErrorKind::NotFound = e.kind() {
TmcError::Command(CommandError::NotFound {
name: self.name.clone(),
path: self.path.clone(),
source: e,
})
} else {
TmcError::Command(CommandError::FailedToRun(self.to_string(), e))
}
});
pub fn wait_with_timeout(mut self, timeout: Duration) -> Result<OutputWithTimeout, TmcError> {
let name = self.name.clone();
let path = self.path.clone();
let self_string = self.to_string();
let self_string2 = self.to_string();

let (mut stdout_reader, stdout_writer) = pipe().unwrap();
let (mut stderr_reader, stderr_writer) = pipe().unwrap();

let (process_result, timed_out) = {
let mut command = self.command;
command.stdout(stdout_writer).stderr(stderr_writer);

let shared_child = SharedChild::spawn(&mut command)
.map_err(|e| TmcError::Command(CommandError::Spawn(self_string, e)))?;
let child_arc = Arc::new(shared_child);

let running = Arc::new(Mutex::new(true));
let running_clone = running.clone();
let timed_out = Arc::new(Mutex::new(false));

let child_arc_clone = child_arc.clone();
let timed_out_clone = timed_out.clone();
let _timeout_checker = thread::spawn(move || {
thread::sleep(timeout);

if !running_clone.lock().unwrap().clone() {
return;
}
None => {
// still running, check timeout
if timer.elapsed() > timeout {
log::warn!("command {} timed out", self.name);
// todo: cleaner method for killing
child.kill().map_err(TmcError::Process)?;

let mut stdout = vec![];
let mut stderr = vec![];
let stdout_handle = child.stdout.as_mut().unwrap();
let stderr_handle = child.stderr.as_mut().unwrap();
stdout_handle
.read_to_end(&mut stdout)
.map_err(TmcError::ReadStdio)?;
stderr_handle
.read_to_end(&mut stderr)
.map_err(TmcError::ReadStdio)?;
return Ok(OutputWithTimeout::Timeout { stdout, stderr });
}

// TODO: gradually increase sleep duration?
thread::sleep(Duration::from_millis(100));
let mut timed_out_handle = timed_out_clone.lock().unwrap();
*timed_out_handle = true;

#[cfg(unix)]
{
// Ask process to terminate nicely
let _res2 = child_arc_clone.send_signal(15);
thread::sleep(Duration::from_millis(500));
}
// Force kill the process
let _res = child_arc_clone.kill();
});

let process_result = child_arc.wait();
let mut running_handle = running.lock().unwrap();
*running_handle = true;
(process_result, timed_out)
};

// Very important when using pipes: This parent process is still
// holding its copies of the write ends, and we have to close them
// before we read, otherwise the read end will never report EOF.
// The block above drops everything unnecessary

let res = match process_result {
Ok(exit_status) => {
let mut stdout = vec![];
let mut stderr = vec![];
stdout_reader
.read_to_end(&mut stdout)
.map_err(TmcError::ReadStdio)?;
stderr_reader
.read_to_end(&mut stderr)
.map_err(TmcError::ReadStdio)?;

Output {
status: exit_status,
stdout: stdout,
stderr: stderr,
}
}
Err(e) => {
if let std::io::ErrorKind::NotFound = e.kind() {
return Err(TmcError::Command(CommandError::NotFound {
name: name,
path: path,
source: e,
}));
} else {
return Err(TmcError::Command(CommandError::FailedToRun(
self_string2,
e,
)));
}
}
};

if timed_out.lock().unwrap().clone() {
return Ok(OutputWithTimeout::Timeout {
stdout: res.stdout,
stderr: res.stderr,
});
}

return Ok(OutputWithTimeout::Output(res));
}
}

Expand All @@ -175,7 +222,7 @@ impl DerefMut for TmcCommand {
&mut self.command
}
}

#[derive(Debug)]
pub enum OutputWithTimeout {
Output(Output),
Timeout { stdout: Vec<u8>, stderr: Vec<u8> },
Expand Down Expand Up @@ -207,7 +254,7 @@ mod test {
let res = cmd.wait_with_timeout(Duration::from_millis(100)).unwrap();
if let OutputWithTimeout::Timeout { .. } = res {
} else {
panic!("unexpected result");
panic!(format!("Unexpected result {:?}", res));
}
}
}