Skip to content

Commit

Permalink
Encapsulate process pipeline building
Browse files Browse the repository at this point in the history
  • Loading branch information
jpikl committed Apr 20, 2024
1 parent 51d38ab commit 752b90d
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 155 deletions.
138 changes: 41 additions & 97 deletions src/commands/x.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ use crate::pattern::Item;
use crate::pattern::Pattern;
use crate::pattern::SimpleItem;
use crate::pattern::SimplePattern;
use crate::process::CommandEx;
use crate::process::Pipeline;
use crate::pipeline;
use crate::pipeline::Pipeline;
use crate::process::Spawned;
use crate::process::StdinMode;
use crate::shell::Shell;
use crate::stdbuf::StdBuf;
use anyhow::Context as _;
Expand All @@ -36,7 +37,6 @@ use std::env::current_exe;
use std::panic::resume_unwind;
use std::process::ChildStdout;
use std::process::Command;
use std::process::Stdio;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
Expand Down Expand Up @@ -187,10 +187,10 @@ fn run(context: &Context, args: &Args) -> Result<()> {
}

if let Some(pattern) = pattern.try_simplify() {
return eval_simple_pattern(context, &pattern);
eval_simple_pattern(context, &pattern)
} else {
eval_pattern(context, &pattern, args.shell.as_deref())
}

eval_pattern(context, &pattern, args.shell.as_deref())
}

fn eval_simple_pattern(context: &Context, pattern: &SimplePattern) -> Result<()> {
Expand Down Expand Up @@ -221,14 +221,16 @@ fn eval_pattern(context: &Context, pattern: &Pattern, shell: Option<&str>) -> Re
match &item {
Item::Constant(value) => items.push(EvalItem::Constant(value.clone())),
Item::Expression(ref expression) => {
let pipeline = builder.build_expression(expression)?;
let pipeline = builder.build(expression)?;

for child in pipeline.children {
children.push(child);
}

items.push(EvalItem::Reader(
pipeline.stdout.map(|inner| context.line_reader_from(inner)),
pipeline
.stdout
.move_context(|inner| context.line_reader_from(inner)),
));

if pipeline.stdin.is_some() {
Expand Down Expand Up @@ -343,104 +345,39 @@ impl<'a> CommandBuilder<'a> {
}
}

fn build_expression(&mut self, expr: &Expression) -> Result<Pipeline> {
fn build(&mut self, expr: &Expression) -> Result<Pipeline> {
let raw_expr = format!("{YELLOW}{}{RESET}", expr.raw_value);

let result = match &expr.value {
ExpressionValue::RawShell(command) => self.build_raw_shell(command, expr.no_stdin),
ExpressionValue::Pipeline(commands) => self.build_pipeline(commands, expr.no_stdin),
};

match result {
match self.build_pipeline(expr) {
Ok(pipeline) => Ok(pipeline.context(format!("expression: {raw_expr}"))),
Err(err) => Err(err.context(format!("failed to initialize expression {raw_expr}"))),
}
}

fn build_raw_shell(&self, shell_command: &str, no_stdin: bool) -> Result<Pipeline> {
let mut command = self.shell.build_command(shell_command);
command.stdout(Stdio::piped());

if no_stdin {
command.stdin(Stdio::null());
} else {
command.stdin(Stdio::piped());
}

let mut child = command.spawn_with_context()?;

let stdin = child.take_stdin();
let stdout = child
.take_stdout()
.expect("raw shell child process should have stdout");
fn build_pipeline(&mut self, expr: &Expression) -> Result<Pipeline> {
let mut pipeline = pipeline::Builder::new(expr.stdin_mode);

Ok(Pipeline {
stdin,
stdout,
children: vec![child],
})
}

fn build_pipeline(
&mut self,
commands: &[pattern::Command],
mut no_stdin: bool,
) -> Result<Pipeline> {
let mut children = Vec::new();
let mut stdin = None;
let mut stdout: Option<Spawned<ChildStdout>> = None;

for params in commands {
let (mut command, group) = self.build_command(params)?;

if group == Group::Generators {
command.stdin(Stdio::null());
no_stdin = true;
} else if let Some(stdout) = stdout {
command.stdin(Stdio::from(stdout.inner));
} else {
command.stdin(Stdio::piped());
match &expr.value {
ExpressionValue::RawShell(command) => {
let command = self.shell.build_command(command);
pipeline = pipeline.command(command, expr.stdin_mode)?;
}

command.stdout(Stdio::piped());

let mut child = command.spawn_with_context()?;

if no_stdin {
stdin = None;
} else if stdin.is_none() {
stdin = child.take_stdin(); // The first process in pipeline
ExpressionValue::Pipeline(commands) => {
for command in commands {
let (command, stdin_mode) = self.build_command(command)?;
pipeline = pipeline.command(command, stdin_mode)?;
}
if pipeline.is_empty() {
let command = self.default_internal_command()?;
pipeline = pipeline.command(command, StdinMode::Connected)?;
}
}
};

stdout = child.take_stdout();
children.push(child);
}

if stdout.is_none() {
let mut command = self.default_internal_command()?;

let mut child = command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn_with_context()?;

stdin = child.take_stdin();
stdout = child.take_stdout();
}

let stdin = stdin.take();
let stdout = stdout
.take()
.expect("pipeline child process should have stdout");

Ok(Pipeline {
stdin,
stdout,
children,
})
Ok(pipeline.build())
}

fn build_command(&mut self, params: &pattern::Command) -> Result<(Command, Group)> {
fn build_command(&mut self, params: &pattern::Command) -> Result<(Command, StdinMode)> {
let pattern::Command {
name,
args,
Expand All @@ -450,19 +387,19 @@ impl<'a> CommandBuilder<'a> {
if !external {
if let Some(meta) = get_meta(name) {
let command = self.build_internal_command(Some(name), args)?;
return Ok((command, meta.group));
return Ok((command, group_stdin_mode(meta.group)));
}

if name == crate_name!() {
if let Some((name, args)) = args.split_first() {
if let Some(meta) = get_meta(name) {
let command = self.build_internal_command(Some(name), args)?;
return Ok((command, meta.group));
return Ok((command, group_stdin_mode(meta.group)));
}
}

let command = self.build_internal_command(None, args)?;
return Ok((command, Group::Transformers));
return Ok((command, StdinMode::Connected));
}
}

Expand All @@ -474,7 +411,7 @@ impl<'a> CommandBuilder<'a> {
command.env("PYTHONUNBUFFERED", "1"); // Python programs
}

Ok((command, Group::Transformers))
Ok((command, StdinMode::Connected))
}

fn build_internal_command(&self, name: Option<&str>, args: &[String]) -> Result<Command> {
Expand All @@ -498,3 +435,10 @@ impl<'a> CommandBuilder<'a> {
self.build_internal_command(Some(cat::META.name), &[])
}
}

fn group_stdin_mode(group: Group) -> StdinMode {
match group {
Group::Generators => StdinMode::Disconnected,
_ => StdinMode::Connected,
}
}
23 changes: 1 addition & 22 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,10 @@ use crate::colors::RESET;
use anstream::eprint;
use anstream::eprintln;
use anstream::stdout;
use anyhow::Context as AnyhowContext;
use anyhow::Context;
use clap::Command;
use std::env;

#[derive(Clone)]
pub struct Context(Vec<String>);

impl Context {
pub fn new(value: impl Into<String>) -> Self {
Self(vec![value.into()])
}

pub fn add(&mut self, value: impl Into<String>) {
self.0.push(value.into());
}

pub fn apply<E: Into<anyhow::Error>>(&self, error: E) -> anyhow::Error {
let mut error = error.into();
for context in &self.0 {
error = error.context(context.clone());
}
error
}
}

pub struct Reporter {
app: Command,
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub mod pager;
#[doc(hidden)]
pub mod pattern;
#[doc(hidden)]
pub mod pipeline;
#[doc(hidden)]
pub mod process;
#[doc(hidden)]
pub mod range;
Expand Down
2 changes: 1 addition & 1 deletion src/pager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::process::CommandEx;
use crate::process::SpawnWithContext;
use anstream::stream::IsTerminal;
use anyhow::Result;
use std::io::stdout;
Expand Down
13 changes: 10 additions & 3 deletions src/pattern.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::colors::BOLD_RED;
use crate::colors::RESET;
use crate::colors::YELLOW;
use crate::process::StdinMode;
use derive_more::Display;
use derive_more::IsVariant;
use std::fmt;
Expand Down Expand Up @@ -92,7 +93,7 @@ pub enum SimpleItem {

#[derive(Debug, Clone, PartialEq)]
pub struct Expression {
pub no_stdin: bool,
pub stdin_mode: StdinMode,
pub value: ExpressionValue,
pub raw_value: String,
}
Expand Down Expand Up @@ -174,7 +175,7 @@ impl Display for Pattern {
impl Display for Expression {
fn fmt(&self, fmt: &mut Formatter<'_>) -> fmt::Result {
write!(fmt, "{{")?;
if self.no_stdin {
if self.stdin_mode == StdinMode::Disconnected {
write!(fmt, ":")?;
}
write!(fmt, "{}", self.value)?;
Expand Down Expand Up @@ -275,6 +276,12 @@ impl Parser<'_> {

self.consume_whitespaces();

let stdin_mode = if no_stdin {
StdinMode::Disconnected
} else {
StdinMode::Connected
};

let value = if raw_shell {
ExpressionValue::RawShell(self.parse_raw_shell()?)
} else {
Expand All @@ -283,7 +290,7 @@ impl Parser<'_> {

if self.try_consume(EXPR_END) {
Ok(Expression {
no_stdin,
stdin_mode,
value,
raw_value: self.input[start_offset..self.offset].into(),
})
Expand Down

0 comments on commit 752b90d

Please sign in to comment.