Skip to content

Commit

Permalink
Data flows across commands via streams now
Browse files Browse the repository at this point in the history
  • Loading branch information
wycats committed May 23, 2019
1 parent 31dd579 commit 625a356
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 157 deletions.
1 change: 1 addition & 0 deletions rustfmt.toml
@@ -0,0 +1 @@
edition = "2018"
71 changes: 35 additions & 36 deletions src/cli.rs
@@ -1,12 +1,11 @@
use crate::prelude::*;

use crate::commands::classified::{ClassifiedCommand, ExternalCommand, InternalCommand};
use crate::commands::command::ReturnValue;
use crate::context::Context;
crate use crate::env::Host;
crate use crate::errors::ShellError;
crate use crate::format::{EntriesListView, GenericView};
use crate::object::Value;
use crate::stream::empty_stream;

use rustyline::error::ReadlineError;
use rustyline::{self, ColorMode, Config, Editor};
Expand All @@ -29,7 +28,7 @@ impl<T> MaybeOwned<'a, T> {
}
}

pub fn cli() -> Result<(), Box<Error>> {
pub async fn cli() -> Result<(), Box<Error>> {
let config = Config::builder().color_mode(ColorMode::Forced).build();
let h = crate::shell::Helper::new();
let mut rl: Editor<crate::shell::Helper> = Editor::with_config(config);
Expand All @@ -50,8 +49,6 @@ pub fn cli() -> Result<(), Box<Error>> {
use crate::commands::*;

context.add_commands(vec![
("format", Arc::new(format)),
("format-list", Arc::new(format_list)),
("ps", Arc::new(ps::ps)),
("ls", Arc::new(ls::ls)),
("cd", Arc::new(cd::cd)),
Expand All @@ -67,15 +64,18 @@ pub fn cli() -> Result<(), Box<Error>> {
}

loop {
let readline = rl.readline(&format!("{}> ", context.env.cwd().display().to_string()));
let readline = rl.readline(&format!(
"{}> ",
context.env.lock().unwrap().cwd().display().to_string()
));

match process_line(readline, &mut context) {
match process_line(readline, &mut context).await {
LineResult::Success(line) => {
rl.add_history_entry(line.clone());
}

LineResult::Error(err) => {
context.host.stdout(&err);
context.host.lock().unwrap().stdout(&err);
}

LineResult::Break => {
Expand All @@ -85,6 +85,8 @@ pub fn cli() -> Result<(), Box<Error>> {
LineResult::FatalError(err) => {
context
.host
.lock()
.unwrap()
.stdout(&format!("A surprising fatal error occurred.\n{:?}", err));
}
}
Expand All @@ -103,7 +105,7 @@ enum LineResult {
FatalError(ShellError),
}

fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult {
async fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> LineResult {
match &readline {
Ok(line) if line.trim() == "exit" => LineResult::Break,

Expand All @@ -120,29 +122,25 @@ fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> L

let parsed = result.1;

let mut input = VecDeque::new();
let mut input: InputStream = VecDeque::new().boxed();

for item in parsed {
input = match process_command(item.clone(), input, ctx) {
input = match process_command(item.clone(), input, ctx).await {
Ok(val) => val,
Err(err) => return LineResult::Error(format!("{}", err.description())),
};
}

if input.len() > 0 {
if equal_shapes(&input) {
let array = crate::commands::stream_to_array(input);
let input_vec: VecDeque<_> = input.collect().await;

if input_vec.len() > 0 {
if equal_shapes(&input_vec) {
let array = crate::commands::stream_to_array(input_vec.boxed()).await;
let args = CommandArgs::from_context(ctx, vec![], array);
match format(args) {
Ok(_) => {}
Err(err) => return LineResult::Error(err.to_string()),
}
format(args).await;
} else {
let args = CommandArgs::from_context(ctx, vec![], input);
match format(args) {
Ok(_) => {}
Err(err) => return LineResult::Error(err.to_string()),
}
let args = CommandArgs::from_context(ctx, vec![], input_vec.boxed());
format(args).await;
}
}

Expand All @@ -163,14 +161,14 @@ fn process_line(readline: Result<String, ReadlineError>, ctx: &mut Context) -> L
}
}

fn process_command(
async fn process_command(
parsed: Vec<crate::parser::Item>,
input: VecDeque<Value>,
input: InputStream,
context: &mut Context,
) -> Result<VecDeque<Value>, ShellError> {
) -> Result<InputStream, ShellError> {
let command = classify_command(&parsed, context)?;

command.run(input, context)
command.run(input, context).await
}

fn classify_command(
Expand Down Expand Up @@ -199,25 +197,26 @@ fn classify_command(
}
}

fn format(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
let last = args.input.len() - 1;
for (i, item) in args.input.iter().enumerate() {
async fn format(args: CommandArgs) -> OutputStream {
let input: Vec<_> = args.input.collect().await;
let last = input.len() - 1;
for (i, item) in input.iter().enumerate() {
let view = GenericView::new(item);
crate::format::print_view(&view, args.host);
crate::format::print_view(&view, &mut *args.host.lock().unwrap());

if last != i {
println!("");
}
}

Ok(VecDeque::new())
empty_stream()
}

fn format_list(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
let view = EntriesListView::from_stream(args.input);
crate::format::print_view(&view, args.host);
async fn format_list(args: CommandArgs) -> OutputStream {
let view = EntriesListView::from_stream(args.input).await;
crate::format::print_view(&view, &mut *args.host.lock().unwrap());

Ok(VecDeque::new())
empty_stream()
}

fn equal_shapes(input: &VecDeque<Value>) -> bool {
Expand Down
6 changes: 3 additions & 3 deletions src/commands/cd.rs
Expand Up @@ -2,18 +2,18 @@ use crate::errors::ShellError;
use crate::prelude::*;
use std::env;

pub fn cd(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn cd(args: CommandArgs) -> Result<OutputStream, ShellError> {
let target = match args.args.first() {
// TODO: This needs better infra
None => return Err(ShellError::string(format!("cd must take one arg"))),
Some(v) => v.as_string()?.clone(),
};

let cwd = args.env.cwd().to_path_buf();
let cwd = args.env.lock().unwrap().cwd().to_path_buf();

let mut stream = VecDeque::new();
let path = dunce::canonicalize(cwd.join(&target).as_path())?;
let _ = env::set_current_dir(&path);
stream.push_back(ReturnValue::change_cwd(path));
Ok(stream)
Ok(stream.boxed())
}
33 changes: 17 additions & 16 deletions src/commands/classified.rs
Expand Up @@ -8,37 +8,38 @@ crate enum ClassifiedCommand {
}

impl ClassifiedCommand {
crate fn run(
crate async fn run(
self,
input: VecDeque<Value>,
input: InputStream,
context: &mut Context,
) -> Result<VecDeque<Value>, ShellError> {
) -> Result<InputStream, ShellError> {
match self {
ClassifiedCommand::Internal(internal) => {
let result = context.run_command(internal.command, internal.args, input)?;
let env = context.env.clone();

let mut next = VecDeque::new();
let stream = result.filter_map(move |v| match v {
ReturnValue::Action(action) => match action {
CommandAction::ChangeCwd(cwd) => {
env.lock().unwrap().cwd = cwd;
futures::future::ready(None)
}
},

for v in result {
match v {
ReturnValue::Action(action) => match action {
CommandAction::ChangeCwd(cwd) => context.env.cwd = cwd,
},
ReturnValue::Value(v) => futures::future::ready(Some(v)),
});

ReturnValue::Value(v) => next.push_back(v),
}
}

Ok(next)
Ok(stream.boxed() as InputStream)
}

ClassifiedCommand::External(external) => {
Exec::shell(&external.name)
.args(&external.args)
.cwd(context.env.cwd())
.cwd(context.env.lock().unwrap().cwd())
.join()
.unwrap();
Ok(VecDeque::new())

Ok(VecDeque::new().boxed() as InputStream)
}
}
}
Expand Down
24 changes: 12 additions & 12 deletions src/commands/command.rs
Expand Up @@ -3,22 +3,22 @@ use crate::object::Value;
use crate::prelude::*;
use std::path::PathBuf;

pub struct CommandArgs<'caller> {
pub host: &'caller mut dyn Host,
pub env: &'caller Environment,
pub struct CommandArgs {
pub host: Arc<Mutex<dyn Host>>,
pub env: Arc<Mutex<Environment>>,
pub args: Vec<Value>,
pub input: VecDeque<Value>,
pub input: InputStream,
}

impl CommandArgs<'caller> {
impl CommandArgs {
crate fn from_context(
ctx: &'caller mut Context,
args: Vec<Value>,
input: VecDeque<Value>,
) -> CommandArgs<'caller> {
input: InputStream,
) -> CommandArgs {
CommandArgs {
host: &mut ctx.host,
env: &ctx.env,
host: ctx.host.clone(),
env: ctx.env.clone(),
args,
input,
}
Expand Down Expand Up @@ -49,14 +49,14 @@ impl ReturnValue {
}

pub trait Command {
fn run(&self, args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError>;
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError>;
}

impl<F> Command for F
where
F: Fn(CommandArgs<'_>) -> Result<VecDeque<ReturnValue>, ShellError>,
F: Fn(CommandArgs) -> Result<OutputStream, ShellError>,
{
fn run(&self, args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
fn run(&self, args: CommandArgs) -> Result<OutputStream, ShellError> {
self(args)
}
}
6 changes: 3 additions & 3 deletions src/commands/ls.rs
Expand Up @@ -2,8 +2,8 @@ use crate::errors::ShellError;
use crate::object::{dir_entry_dict, Value};
use crate::prelude::*;

pub fn ls(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellError> {
let cwd = args.env.cwd().to_path_buf();
pub fn ls(args: CommandArgs) -> Result<OutputStream, ShellError> {
let cwd = args.env.lock().unwrap().cwd().to_path_buf();

let entries = std::fs::read_dir(&cwd).map_err(|e| ShellError::string(format!("{:?}", e)))?;

Expand All @@ -14,5 +14,5 @@ pub fn ls(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellError
shell_entries.push_back(ReturnValue::Value(value))
}

Ok(shell_entries)
Ok(shell_entries.boxed())
}
4 changes: 2 additions & 2 deletions src/commands/ps.rs
Expand Up @@ -4,7 +4,7 @@ use crate::object::Value;
use crate::prelude::*;
use sysinfo::SystemExt;

pub fn ps(_args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn ps(_args: CommandArgs) -> Result<OutputStream, ShellError> {
let mut system = sysinfo::System::new();
system.refresh_all();

Expand All @@ -15,5 +15,5 @@ pub fn ps(_args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellErr
.map(|(_, process)| ReturnValue::Value(Value::Object(process_dict(process))))
.collect::<VecDeque<_>>();

Ok(list)
Ok(list.boxed())
}
12 changes: 5 additions & 7 deletions src/commands/reject.rs
Expand Up @@ -3,20 +3,18 @@ use crate::object::base::reject_fields;
use crate::object::Value;
use crate::prelude::*;

pub fn reject(args: CommandArgs<'value>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn reject(args: CommandArgs) -> Result<OutputStream, ShellError> {
if args.args.is_empty() {
return Err(ShellError::string("select requires a field"));
}

let fields: Result<Vec<String>, _> = args.args.iter().map(|a| a.as_string()).collect();
let fields = fields?;

let objects = args
let stream = args
.input
.iter()
.map(|item| Value::Object(reject_fields(item, &fields)))
.map(|item| ReturnValue::Value(item))
.collect();
.map(move |item| Value::Object(reject_fields(&item, &fields)))
.map(|item| ReturnValue::Value(item));

Ok(objects)
Ok(stream.boxed())
}
11 changes: 5 additions & 6 deletions src/commands/select.rs
Expand Up @@ -3,7 +3,7 @@ use crate::object::base::select_fields;
use crate::object::Value;
use crate::prelude::*;

pub fn select(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, ShellError> {
pub fn select(args: CommandArgs) -> Result<OutputStream, ShellError> {
if args.args.is_empty() {
return Err(ShellError::string("select requires a field"));
}
Expand All @@ -13,10 +13,9 @@ pub fn select(args: CommandArgs<'caller>) -> Result<VecDeque<ReturnValue>, Shell

let objects = args
.input
.iter()
.map(|item| Value::Object(select_fields(item, &fields)))
.map(|item| ReturnValue::Value(item))
.collect();
.map(move |item| Value::Object(select_fields(&item, &fields)))
.map(|item| ReturnValue::Value(item));

Ok(objects)
let stream = Pin::new(Box::new(objects));
Ok(stream)
}

0 comments on commit 625a356

Please sign in to comment.