Skip to content

Commit

Permalink
Group together output from multi exec commands
Browse files Browse the repository at this point in the history
So that if multiple `--exec` options are given, and the commands are run
in parallel, the buffered output for related commands will be
consecutive.
  • Loading branch information
tmccombs authored and sharkdp committed Mar 8, 2022
1 parent e54e352 commit 9fb0c5d
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 69 deletions.
118 changes: 82 additions & 36 deletions src/exec/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,47 +6,93 @@ use std::sync::Mutex;
use crate::error::print_error;
use crate::exit_codes::ExitCode;

struct Outputs {
stdout: Vec<u8>,
stderr: Vec<u8>,
}
struct OutputBuf<'a> {
out_perm: &'a Mutex<()>,
outputs: Vec<Outputs>,
}

impl<'a> OutputBuf<'a> {
fn new(out_perm: &'a Mutex<()>) -> Self {
Self {
out_perm,
outputs: Vec::new(),
}
}

fn push(&mut self, stdout: Vec<u8>, stderr: Vec<u8>) {
self.outputs.push(Outputs { stdout, stderr });
}

fn write(self) {
// avoid taking the lock if there is nothing to do
if self.outputs.is_empty() {
return;
}
// While this lock is active, this thread will be the only thread allowed
// to write its outputs.
let _lock = self.out_perm.lock().unwrap();

let stdout = io::stdout();
let stderr = io::stderr();

let mut stdout = stdout.lock();
let mut stderr = stderr.lock();

for output in self.outputs.iter() {
let _ = stdout.write_all(&output.stdout);
let _ = stderr.write_all(&output.stderr);
}
}
}

/// Executes a command.
pub fn execute_command(
mut cmd: Command,
pub fn execute_commands<I: Iterator<Item = Command>>(
cmds: I,
out_perm: &Mutex<()>,
enable_output_buffering: bool,
) -> ExitCode {
// Spawn the supplied command.
let output = if enable_output_buffering {
cmd.output()
} else {
// If running on only one thread, don't buffer output
// Allows for viewing and interacting with intermediate command output
cmd.spawn().and_then(|c| c.wait_with_output())
};

// Then wait for the command to exit, if it was spawned.
match output {
Ok(output) => {
// While this lock is active, this thread will be the only thread allowed
// to write its outputs.
let _lock = out_perm.lock().unwrap();

let stdout = io::stdout();
let stderr = io::stderr();

let _ = stdout.lock().write_all(&output.stdout);
let _ = stderr.lock().write_all(&output.stderr);

if output.status.code() == Some(0) {
ExitCode::Success
} else {
ExitCode::GeneralError
let mut out_buf = OutputBuf::new(out_perm);
for mut cmd in cmds {
// Spawn the supplied command.
let output = if enable_output_buffering {
cmd.output()
} else {
// If running on only one thread, don't buffer output
// Allows for viewing and interacting with intermediate command output
cmd.spawn().and_then(|c| c.wait_with_output())
};

// Then wait for the command to exit, if it was spawned.
match output {
Ok(output) => {
if enable_output_buffering {
out_buf.push(output.stdout, output.stderr);
}
if output.status.code() != Some(0) {
out_buf.write();
return ExitCode::GeneralError;
}
}
Err(why) => {
out_buf.write();
return handle_cmd_error(&cmd, why);
}
}
Err(ref why) if why.kind() == io::ErrorKind::NotFound => {
print_error(format!("Command not found: {:?}", cmd));
ExitCode::GeneralError
}
Err(why) => {
print_error(format!("Problem while executing command: {}", why));
ExitCode::GeneralError
}
}
out_buf.write();
ExitCode::Success
}

pub fn handle_cmd_error(cmd: &Command, err: io::Error) -> ExitCode {
if err.kind() == io::ErrorKind::NotFound {
print_error(format!("Command not found: {:?}", cmd));
ExitCode::GeneralError
} else {
print_error(format!("Problem while executing command: {}", err));
ExitCode::GeneralError
}
}
5 changes: 2 additions & 3 deletions src/exec/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ pub fn batch(
rx: Receiver<WorkerResult>,
cmd: &CommandSet,
show_filesystem_errors: bool,
buffer_output: bool,
limit: usize,
) -> ExitCode {
let paths = rx.iter().filter_map(|value| match value {
Expand All @@ -63,14 +62,14 @@ pub fn batch(
});
if limit == 0 {
// no limit
return cmd.execute_batch(paths, buffer_output);
return cmd.execute_batch(paths);
}

let mut exit_codes = Vec::new();
let mut peekable = paths.peekable();
while peekable.peek().is_some() {
let limited = peekable.by_ref().take(limit);
let exit_code = cmd.execute_batch(limited, buffer_output);
let exit_code = cmd.execute_batch(limited);
exit_codes.push(exit_code);
}
merge_exitcodes(exit_codes)
Expand Down
38 changes: 15 additions & 23 deletions src/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use regex::Regex;

use crate::exit_codes::ExitCode;

use self::command::execute_command;
use self::command::{execute_commands, handle_cmd_error};
use self::input::{basename, dirname, remove_extension};
pub use self::job::{batch, job};
use self::token::Token;
Expand Down Expand Up @@ -86,25 +86,22 @@ impl CommandSet {
buffer_output: bool,
) -> ExitCode {
let path_separator = self.path_separator.as_deref();
for cmd in &self.commands {
let exit =
cmd.generate_and_execute(input, path_separator, &mut out_perm, buffer_output);
if exit != ExitCode::Success {
return exit;
}
}
ExitCode::Success
let commands = self
.commands
.iter()
.map(|c| c.generate(input, path_separator));
execute_commands(commands, &mut out_perm, buffer_output)
}

pub fn execute_batch<I>(&self, paths: I, buffer_output: bool) -> ExitCode
pub fn execute_batch<I>(&self, paths: I) -> ExitCode
where
I: Iterator<Item = PathBuf>,
{
let path_separator = self.path_separator.as_deref();
let mut paths = paths.collect::<Vec<_>>();
paths.sort();
for cmd in &self.commands {
let exit = cmd.generate_and_execute_batch(&paths, path_separator, buffer_output);
let exit = cmd.generate_and_execute_batch(&paths, path_separator);
if exit != ExitCode::Success {
return exit;
}
Expand Down Expand Up @@ -189,27 +186,19 @@ impl CommandTemplate {
/// Generates and executes a command.
///
/// Using the internal `args` field, and a supplied `input` variable, a `Command` will be
/// build. Once all arguments have been processed, the command is executed.
fn generate_and_execute(
&self,
input: &Path,
path_separator: Option<&str>,
out_perm: &mut Arc<Mutex<()>>,
buffer_output: bool,
) -> ExitCode {
/// build.
fn generate(&self, input: &Path, path_separator: Option<&str>) -> Command {
let mut cmd = Command::new(self.args[0].generate(&input, path_separator));
for arg in &self.args[1..] {
cmd.arg(arg.generate(&input, path_separator));
}

execute_command(cmd, out_perm, buffer_output)
cmd
}

fn generate_and_execute_batch(
&self,
paths: &[PathBuf],
path_separator: Option<&str>,
buffer_output: bool,
) -> ExitCode {
let mut cmd = Command::new(self.args[0].generate("", None));
cmd.stdin(Stdio::inherit());
Expand All @@ -230,7 +219,10 @@ impl CommandTemplate {
}

if has_path {
execute_command(cmd, &Mutex::new(()), buffer_output)
match cmd.spawn().and_then(|mut c| c.wait()) {
Ok(_) => ExitCode::Success,
Err(e) => handle_cmd_error(&cmd, e),
}
} else {
ExitCode::Success
}
Expand Down
8 changes: 1 addition & 7 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,7 @@ fn spawn_receiver(
// This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = config.command {
if cmd.in_batch_mode() {
exec::batch(
rx,
cmd,
show_filesystem_errors,
enable_output_buffering,
config.batch_size,
)
exec::batch(rx, cmd, show_filesystem_errors, config.batch_size)
} else {
let shared_rx = Arc::new(Mutex::new(rx));

Expand Down

0 comments on commit 9fb0c5d

Please sign in to comment.