Skip to content

Commit

Permalink
Bidirectional communication and streams for plugins (#11911)
Browse files Browse the repository at this point in the history
  • Loading branch information
devyn committed Feb 25, 2024
1 parent 461f69a commit 88f1f38
Show file tree
Hide file tree
Showing 47 changed files with 8,010 additions and 1,481 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ members = [
"crates/nu_plugin_inc",
"crates/nu_plugin_gstat",
"crates/nu_plugin_example",
"crates/nu_plugin_stream_example",
"crates/nu_plugin_query",
"crates/nu_plugin_custom_values",
"crates/nu_plugin_formats",
Expand Down
22 changes: 13 additions & 9 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use nu_cli::eval_source;
use nu_parser::parse;
use nu_plugin::{EncodingType, PluginResponse};
use nu_plugin::{Encoder, EncodingType, PluginCallResponse, PluginOutput};
use nu_protocol::{engine::EngineState, PipelineData, Span, Value};
use nu_utils::{get_default_config, get_default_env};
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -148,10 +148,12 @@ fn encoding_benchmarks(c: &mut Criterion) {
for fmt in ["json", "msgpack"] {
group.bench_function(&format!("{fmt} encode {row_cnt} * {col_cnt}"), |b| {
let mut res = vec![];
let test_data =
PluginResponse::Value(Box::new(encoding_test_data(row_cnt, col_cnt)));
let test_data = PluginOutput::CallResponse(
0,
PluginCallResponse::value(encoding_test_data(row_cnt, col_cnt)),
);
let encoder = EncodingType::try_from_bytes(fmt.as_bytes()).unwrap();
b.iter(|| encoder.encode_response(&test_data, &mut res))
b.iter(|| encoder.encode(&test_data, &mut res))
});
}
}
Expand All @@ -165,14 +167,16 @@ fn decoding_benchmarks(c: &mut Criterion) {
for fmt in ["json", "msgpack"] {
group.bench_function(&format!("{fmt} decode for {row_cnt} * {col_cnt}"), |b| {
let mut res = vec![];
let test_data =
PluginResponse::Value(Box::new(encoding_test_data(row_cnt, col_cnt)));
let test_data = PluginOutput::CallResponse(
0,
PluginCallResponse::value(encoding_test_data(row_cnt, col_cnt)),
);
let encoder = EncodingType::try_from_bytes(fmt.as_bytes()).unwrap();
encoder.encode_response(&test_data, &mut res).unwrap();
encoder.encode(&test_data, &mut res).unwrap();
let mut binary_data = std::io::Cursor::new(res);
b.iter(|| {
b.iter(|| -> Result<Option<PluginOutput>, _> {
binary_data.set_position(0);
encoder.decode_response(&mut binary_data)
encoder.decode(&mut binary_data)
})
});
}
Expand Down
4 changes: 4 additions & 0 deletions crates/nu-plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ bincode = "1.3"
rmp-serde = "1.1"
serde = { version = "1.0" }
serde_json = { version = "1.0" }
log = "0.4"
miette = "7.0"
semver = "1.0"
typetag = "0.2"
21 changes: 17 additions & 4 deletions crates/nu-plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! function, which will handle all of the input and output serialization when
//! invoked by Nushell.
//!
//! ```
//! ```rust,no_run
//! use nu_plugin::{EvaluatedCall, LabeledError, MsgPackSerializer, Plugin, serve_plugin};
//! use nu_protocol::{PluginSignature, Value};
//!
Expand Down Expand Up @@ -46,8 +46,21 @@
//! that demonstrates the full range of plugin capabilities.
mod plugin;
mod protocol;
mod sequence;
mod serializers;

pub use plugin::{get_signature, serve_plugin, Plugin, PluginDeclaration};
pub use protocol::{EvaluatedCall, LabeledError, PluginResponse};
pub use serializers::{json::JsonSerializer, msgpack::MsgPackSerializer, EncodingType};
pub use plugin::{serve_plugin, Plugin, PluginEncoder, StreamingPlugin};
pub use protocol::{EvaluatedCall, LabeledError};
pub use serializers::{json::JsonSerializer, msgpack::MsgPackSerializer};

// Used by other nu crates.
#[doc(hidden)]
pub use plugin::{get_signature, PluginDeclaration};
#[doc(hidden)]
pub use serializers::EncodingType;

// Used by external benchmarks.
#[doc(hidden)]
pub use plugin::Encoder;
#[doc(hidden)]
pub use protocol::{PluginCallResponse, PluginOutput};
46 changes: 46 additions & 0 deletions crates/nu-plugin/src/plugin/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::sync::{atomic::AtomicBool, Arc};

use nu_protocol::{
ast::Call,
engine::{EngineState, Stack},
};

/// Object safe trait for abstracting operations required of the plugin context.
pub(crate) trait PluginExecutionContext: Send + Sync {
/// The interrupt signal, if present
fn ctrlc(&self) -> Option<&Arc<AtomicBool>>;
}

/// The execution context of a plugin command. May be extended with more fields in the future.
pub(crate) struct PluginExecutionCommandContext {
ctrlc: Option<Arc<AtomicBool>>,
}

impl PluginExecutionCommandContext {
pub fn new(
engine_state: &EngineState,
_stack: &Stack,
_call: &Call,
) -> PluginExecutionCommandContext {
PluginExecutionCommandContext {
ctrlc: engine_state.ctrlc.clone(),
}
}
}

impl PluginExecutionContext for PluginExecutionCommandContext {
fn ctrlc(&self) -> Option<&Arc<AtomicBool>> {
self.ctrlc.as_ref()
}
}

/// A bogus execution context for testing that doesn't really implement anything properly
#[cfg(test)]
pub(crate) struct PluginExecutionBogusContext;

#[cfg(test)]
impl PluginExecutionContext for PluginExecutionBogusContext {
fn ctrlc(&self) -> Option<&Arc<AtomicBool>> {
None
}
}
166 changes: 37 additions & 129 deletions crates/nu-plugin/src/plugin/declaration.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use crate::EvaluatedCall;

use super::{call_plugin, create_command, get_plugin_encoding};
use crate::protocol::{
CallInfo, CallInput, PluginCall, PluginCustomValue, PluginData, PluginResponse,
};
use super::{PluginExecutionCommandContext, PluginIdentity};
use crate::protocol::{CallInfo, EvaluatedCall};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use nu_engine::eval_block;
use nu_protocol::engine::{Command, EngineState, Stack};
Expand All @@ -16,17 +13,15 @@ use nu_protocol::{Example, PipelineData, ShellError, Value};
pub struct PluginDeclaration {
name: String,
signature: PluginSignature,
filename: PathBuf,
shell: Option<PathBuf>,
identity: Arc<PluginIdentity>,
}

impl PluginDeclaration {
pub fn new(filename: PathBuf, signature: PluginSignature, shell: Option<PathBuf>) -> Self {
Self {
name: signature.sig.name.clone(),
signature,
filename,
shell,
identity: Arc::new(PluginIdentity::new(filename, shell)),
}
}
}
Expand Down Expand Up @@ -76,76 +71,18 @@ impl Command for PluginDeclaration {
call: &Call,
input: PipelineData,
) -> Result<PipelineData, ShellError> {
// Call the command with self path
// Decode information from plugin
// Create PipelineData
let source_file = Path::new(&self.filename);
let mut plugin_cmd = create_command(source_file, self.shell.as_deref());
// We need the current environment variables for `python` based plugins
// Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
let current_envs = nu_engine::env::env_to_strings(engine_state, stack).unwrap_or_default();
plugin_cmd.envs(current_envs);

let mut child = plugin_cmd.spawn().map_err(|err| {
let decl = engine_state.get_decl(call.decl_id);
ShellError::GenericError {
error: format!("Unable to spawn plugin for {}", decl.name()),
msg: format!("{err}"),
span: Some(call.head),
help: None,
inner: vec![],
}
})?;

let input = input.into_value(call.head);
let span = input.span();
let input = match input {
Value::CustomValue { val, .. } => {
match val.as_any().downcast_ref::<PluginCustomValue>() {
Some(plugin_data) if plugin_data.filename == self.filename => {
CallInput::Data(PluginData {
data: plugin_data.data.clone(),
span,
})
}
_ => {
let custom_value_name = val.value_string();
return Err(ShellError::GenericError {
error: format!(
"Plugin {} can not handle the custom value {}",
self.name, custom_value_name
),
msg: format!("custom value {custom_value_name}"),
span: Some(span),
help: None,
inner: vec![],
});
}
}
}
Value::LazyRecord { val, .. } => CallInput::Value(val.collect()?),
value => CallInput::Value(value),
};
// Create the EvaluatedCall to send to the plugin first - it's best for this to fail early,
// before we actually try to run the plugin command
let evaluated_call = EvaluatedCall::try_from_call(call, engine_state, stack)?;

// Fetch the configuration for a plugin
//
// The `plugin` must match the registered name of a plugin. For
// `register nu_plugin_example` the plugin config lookup uses `"example"`
let config = self
.filename
.file_stem()
.and_then(|file| {
file.to_string_lossy()
.clone()
.strip_prefix("nu_plugin_")
.map(|name| {
nu_engine::get_config(engine_state, stack)
.plugins
.get(name)
.cloned()
})
})
.flatten()
let config = nu_engine::get_config(engine_state, stack)
.plugins
.get(&self.identity.plugin_name)
.cloned()
.map(|value| {
let span = value.span();
match value {
Expand All @@ -164,70 +101,41 @@ impl Command for PluginDeclaration {
}
});

let plugin_call = PluginCall::CallInfo(CallInfo {
name: self.name.clone(),
call: EvaluatedCall::try_from_call(call, engine_state, stack)?,
input,
config,
});

let encoding = {
let stdout_reader = match &mut child.stdout {
Some(out) => out,
None => {
return Err(ShellError::PluginFailedToLoad {
msg: "Plugin missing stdout reader".into(),
})
}
};
get_plugin_encoding(stdout_reader)?
};
let response = call_plugin(&mut child, plugin_call, &encoding, call.head).map_err(|err| {
// We need the current environment variables for `python` based plugins
// Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
let current_envs = nu_engine::env::env_to_strings(engine_state, stack).unwrap_or_default();

// Start the plugin
let plugin = self.identity.clone().spawn(current_envs).map_err(|err| {
let decl = engine_state.get_decl(call.decl_id);
ShellError::GenericError {
error: format!("Unable to decode call for {}", decl.name()),
error: format!("Unable to spawn plugin for `{}`", decl.name()),
msg: err.to_string(),
span: Some(call.head),
help: None,
inner: vec![],
}
});

let pipeline_data = match response {
Ok(PluginResponse::Value(value)) => {
Ok(PipelineData::Value(value.as_ref().clone(), None))
}
Ok(PluginResponse::PluginData(name, plugin_data)) => Ok(PipelineData::Value(
Value::custom_value(
Box::new(PluginCustomValue {
name,
data: plugin_data.data,
filename: self.filename.clone(),
shell: self.shell.clone(),
source: engine_state.get_decl(call.decl_id).name().to_owned(),
}),
plugin_data.span,
),
None,
)),
Ok(PluginResponse::Error(err)) => Err(err.into()),
Ok(PluginResponse::Signature(..)) => Err(ShellError::GenericError {
error: "Plugin missing value".into(),
msg: "Received a signature from plugin instead of value".into(),
span: Some(call.head),
help: None,
inner: vec![],
}),
Err(err) => Err(err),
};

// We need to call .wait() on the child, or we'll risk summoning the zombie horde
let _ = child.wait();
})?;

pipeline_data
// Create the context to execute in
let context = Arc::new(PluginExecutionCommandContext::new(
engine_state,
stack,
call,
));

plugin.run(
CallInfo {
name: self.name.clone(),
call: evaluated_call,
input,
config,
},
context,
)
}

fn is_plugin(&self) -> Option<(&Path, Option<&Path>)> {
Some((&self.filename, self.shell.as_deref()))
Some((&self.identity.filename, self.identity.shell.as_deref()))
}
}
Loading

0 comments on commit 88f1f38

Please sign in to comment.