From 448b1c6d1e9c5d7e627f788cd4243c14888502f5 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 24 Apr 2026 19:54:35 +0000 Subject: [PATCH 1/4] feat(mcp): add generate_oneshot_command tool Add a new MCP tool that generates curl or skit-cli commands for executing oneshot (batch processing) pipelines. The tool validates the pipeline YAML before generating the command, returning diagnostics if validation fails. Key details: - Supports 'curl' (default) and 'skit-cli' output formats - Validates YAML with mode=oneshot before generating commands - Uses 'config' as the multipart field name (matching the server API) - Includes integration tests for both formats, invalid YAML, and permission denial Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/mcp.rs | 166 +++++++++++++++++++++++- apps/skit/tests/mcp_integration_test.rs | 163 +++++++++++++++++++++++ 2 files changed, 327 insertions(+), 2 deletions(-) diff --git a/apps/skit/src/mcp.rs b/apps/skit/src/mcp.rs index 4d5e8138..c6cf2a7a 100644 --- a/apps/skit/src/mcp.rs +++ b/apps/skit/src/mcp.rs @@ -54,6 +54,30 @@ fn extract_auth( // MCP tool argument structs // --------------------------------------------------------------------------- +#[derive(Debug, Deserialize, schemars::JsonSchema)] +pub struct OneshotInput { + /// Input field name matching a node ID in the pipeline (e.g., "input"). + pub field: String, + /// Path to the input file on the local filesystem. + pub path: String, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +pub struct GenerateOneshotCommandArgs { + /// Pipeline YAML for the oneshot run. + pub yaml: String, + /// Input file(s) to include in the request. + pub inputs: Vec, + /// Path where the output should be saved. + pub output: String, + /// Server URL (defaults to "http://localhost:4545"). + #[serde(default)] + pub server_url: Option, + /// Command format: "curl" or "skit-cli". Defaults to "curl". + #[serde(default)] + pub format: Option, +} + #[derive(Debug, Deserialize, schemars::JsonSchema)] pub struct ValidatePipelineArgs { /// Pipeline YAML to validate. @@ -428,6 +452,66 @@ impl StreamKitMcp { Ok(CallToolResult::success(vec![Content::text(json)])) } + // -- generate_oneshot_command ------------------------------------------- + + #[tool( + description = "Generate a curl or skit-cli command to execute a oneshot (batch processing) pipeline. The oneshot runs through the HTTP data plane (POST /api/v1/process), not through MCP. Use validate_pipeline with mode='oneshot' first to ensure the YAML is valid." + )] + async fn generate_oneshot_command( + &self, + Parameters(args): Parameters, + ctx: RequestContext, + ) -> Result { + let (_role_name, perms) = extract_auth(&ctx, &self.app_state)?; + + if !perms.create_sessions { + return Err(McpError::invalid_request( + "Permission denied: create_sessions required", + None, + )); + } + + // Validate the YAML before generating a command. + let validation = crate::server::validate_pipeline_yaml( + &self.app_state, + &perms, + &args.yaml, + Some(crate::server::PipelineMode::Oneshot), + ) + .map_err(|e| McpError::internal_error(e, None))?; + + let validation_json = serde_json::to_value(&validation) + .map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?; + + if validation_json["valid"] == false { + let pretty = serde_json::to_string_pretty(&validation_json) + .map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?; + return Ok(CallToolResult::success(vec![Content::text(format!( + "Pipeline validation failed. Fix the errors before generating a command:\n{pretty}" + ))])); + } + + let server_url = args.server_url.as_deref().unwrap_or("http://localhost:4545"); + let format = args.format.as_deref().unwrap_or("curl"); + + let command = match format { + "curl" => generate_curl_command(&args.yaml, &args.inputs, &args.output, server_url), + "skit-cli" => { + generate_skit_cli_command(&args.yaml, &args.inputs, &args.output, server_url) + }, + other => { + return Err(McpError::invalid_params( + format!("Invalid format '{other}'. Must be 'curl' or 'skit-cli'."), + None, + )); + }, + }; + + info!(format, "MCP generate_oneshot_command"); + + Ok(CallToolResult::success(vec![Content::text(command)])) + } + // -- destroy_session --------------------------------------------------- #[tool( @@ -523,6 +607,82 @@ impl StreamKitMcp { } } +// --------------------------------------------------------------------------- +// Command generation helpers +// --------------------------------------------------------------------------- + +fn generate_curl_command( + yaml: &str, + inputs: &[OneshotInput], + output: &str, + server_url: &str, +) -> String { + use std::fmt::Write; + + let mut cmd = String::new(); + let _ = writeln!(cmd, "# Save pipeline YAML to a temporary file, then run curl."); + let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'PIPELINE_EOF'"); + let _ = writeln!(cmd, "{yaml}"); + let _ = writeln!(cmd, "PIPELINE_EOF"); + let _ = writeln!(cmd); + let _ = write!( + cmd, + "curl -X POST {server_url}/api/v1/process \\\n -F \"config= String { + use std::fmt::Write; + + let mut cmd = String::new(); + let _ = writeln!(cmd, "# Save pipeline YAML to a temporary file, then run the CLI."); + let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'PIPELINE_EOF'"); + let _ = writeln!(cmd, "{yaml}"); + let _ = writeln!(cmd, "PIPELINE_EOF"); + let _ = writeln!(cmd); + + // The CLI takes one positional input mapped to the "media" field, + // plus optional --input field=path for additional inputs. + let (primary, extras): (Vec<_>, Vec<_>) = inputs.iter().partition(|i| i.field == "media"); + + if let Some(primary_input) = primary.first() { + let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml {}", primary_input.path); + } else if let Some(first) = inputs.first() { + // No input named "media" — use the first as positional and re-add + // it via --input so the server receives the correct field name. + let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml {}", first.path); + } else { + let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml "); + } + + let _ = write!(cmd, " {output}"); + + // Emit --input flags for non-primary inputs. + for input in &extras { + let _ = write!(cmd, " --input {}={}", input.field, input.path); + } + // If the first input was not "media" and was used as positional, + // add all original inputs via --input flags. + if primary.is_empty() { + for input in inputs { + let _ = write!(cmd, " --input {}={}", input.field, input.path); + } + } + + let _ = write!(cmd, " --server {server_url}"); + cmd +} + // --------------------------------------------------------------------------- // ServerHandler trait impl // --------------------------------------------------------------------------- @@ -532,9 +692,11 @@ impl ServerHandler for StreamKitMcp { let capabilities = ServerCapabilities::builder().enable_tools().build(); let mut info = ServerInfo::new(capabilities).with_instructions( "StreamKit MCP server. Use list_nodes to discover available \ - processing nodes, validate_pipeline to check YAML, and \ + processing nodes, validate_pipeline to check YAML, \ create_session / list_sessions / get_pipeline / destroy_session \ - to manage dynamic pipeline sessions.", + to manage dynamic pipeline sessions, and \ + generate_oneshot_command to get a curl or skit-cli command for \ + batch processing via the HTTP data plane.", ); info.server_info = rmcp::model::Implementation::new("streamkit", env!("CARGO_PKG_VERSION")); info diff --git a/apps/skit/tests/mcp_integration_test.rs b/apps/skit/tests/mcp_integration_test.rs index 72acdc3b..3db389e0 100644 --- a/apps/skit/tests/mcp_integration_test.rs +++ b/apps/skit/tests/mcp_integration_test.rs @@ -402,6 +402,14 @@ async fn mcp_config_endpoint_validation() { /// Minimal valid pipeline YAML for dynamic sessions. const PASSTHROUGH_YAML: &str = "nodes:\n pass:\n kind: core::passthrough"; +/// Minimal valid pipeline YAML for oneshot (steps-based). +const ONESHOT_PASSTHROUGH_YAML: &str = "\ +mode: oneshot\n\ +steps:\n\ + - kind: streamkit::http_input\n\ + - kind: core::passthrough\n\ + - kind: streamkit::http_output"; + #[tokio::test] async fn mcp_list_nodes_returns_definitions() { let _ = tracing_subscriber::fmt::try_init(); @@ -551,3 +559,158 @@ async fn mcp_create_list_get_destroy_session_round_trip() { "destroyed session should not appear in list_sessions" ); } + +// ----------------------------------------------------------------------- +// generate_oneshot_command tests +// ----------------------------------------------------------------------- + +#[tokio::test] +async fn mcp_generate_oneshot_command_curl() { + let _ = tracing_subscriber::fmt::try_init(); + + let (addr, _handle, token, _dir) = start_mcp_server().await; + let client = reqwest::Client::new(); + let session_id = init_mcp_session(&client, addr, &token).await; + + let call = json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { + "name": "generate_oneshot_command", + "arguments": { + "yaml": ONESHOT_PASSTHROUGH_YAML, + "inputs": [{ "field": "media", "path": "/tmp/input.wav" }], + "output": "/tmp/output.wav", + "server_url": "http://localhost:4545", + "format": "curl" + } + } + }); + let res = mcp_post_with_session(&client, addr, &call, &token, &session_id).await; + assert_eq!(res.status(), StatusCode::OK); + + let body_text = res.text().await.unwrap(); + let body = extract_sse_json(&body_text); + let result = &body["result"]; + assert!(!result.is_null(), "expected result, got: {body}"); + + let text = result["content"][0]["text"].as_str().expect("expected text content"); + assert!(text.contains("curl"), "curl command expected in output: {text}"); + assert!(text.contains("/api/v1/process"), "endpoint expected in output: {text}"); + assert!(text.contains("config="), "config field expected in output: {text}"); + assert!(text.contains("media=@/tmp/input.wav"), "input field expected in output: {text}"); + assert!(text.contains("-o /tmp/output.wav"), "output path expected in output: {text}"); +} + +#[tokio::test] +async fn mcp_generate_oneshot_command_skit_cli() { + let _ = tracing_subscriber::fmt::try_init(); + + let (addr, _handle, token, _dir) = start_mcp_server().await; + let client = reqwest::Client::new(); + let session_id = init_mcp_session(&client, addr, &token).await; + + let call = json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { + "name": "generate_oneshot_command", + "arguments": { + "yaml": ONESHOT_PASSTHROUGH_YAML, + "inputs": [{ "field": "media", "path": "/tmp/input.wav" }], + "output": "/tmp/output.wav", + "server_url": "http://localhost:9999", + "format": "skit-cli" + } + } + }); + let res = mcp_post_with_session(&client, addr, &call, &token, &session_id).await; + assert_eq!(res.status(), StatusCode::OK); + + let body_text = res.text().await.unwrap(); + let body = extract_sse_json(&body_text); + let result = &body["result"]; + assert!(!result.is_null(), "expected result, got: {body}"); + + let text = result["content"][0]["text"].as_str().expect("expected text content"); + assert!( + text.contains("streamkit-client oneshot"), + "skit-cli command expected in output: {text}" + ); + assert!(text.contains("/tmp/input.wav"), "input path expected in output: {text}"); + assert!(text.contains("/tmp/output.wav"), "output path expected in output: {text}"); + assert!( + text.contains("--server http://localhost:9999"), + "server URL expected in output: {text}" + ); +} + +#[tokio::test] +async fn mcp_generate_oneshot_command_invalid_yaml() { + let _ = tracing_subscriber::fmt::try_init(); + + let (addr, _handle, token, _dir) = start_mcp_server().await; + let client = reqwest::Client::new(); + let session_id = init_mcp_session(&client, addr, &token).await; + + let call = json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { + "name": "generate_oneshot_command", + "arguments": { + "yaml": "not: valid: yaml: [[", + "inputs": [{ "field": "media", "path": "/tmp/input.wav" }], + "output": "/tmp/output.wav" + } + } + }); + let res = mcp_post_with_session(&client, addr, &call, &token, &session_id).await; + assert_eq!(res.status(), StatusCode::OK); + + let body_text = res.text().await.unwrap(); + let body = extract_sse_json(&body_text); + let result = &body["result"]; + assert!(!result.is_null(), "expected result (validation error), got: {body}"); + + let text = result["content"][0]["text"].as_str().expect("expected text content"); + assert!(text.contains("validation failed"), "expected validation failure message, got: {text}"); +} + +#[tokio::test] +async fn mcp_generate_oneshot_command_permission_denied() { + let _ = tracing_subscriber::fmt::try_init(); + + let (addr, _handle, token, _dir) = start_restricted_mcp_server().await; + let client = reqwest::Client::new(); + let session_id = init_mcp_session(&client, addr, &token).await; + + let call = json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { + "name": "generate_oneshot_command", + "arguments": { + "yaml": ONESHOT_PASSTHROUGH_YAML, + "inputs": [{ "field": "media", "path": "/tmp/input.wav" }], + "output": "/tmp/output.wav" + } + } + }); + let res = mcp_post_with_session(&client, addr, &call, &token, &session_id).await; + assert_eq!(res.status(), StatusCode::OK); + + let body_text = res.text().await.unwrap(); + let body = extract_sse_json(&body_text); + let error = &body["error"]; + assert!(!error.is_null(), "expected error for permission denied, got: {body}"); + let error_msg = error["message"].as_str().unwrap_or(""); + assert!( + error_msg.contains("Permission denied") || error_msg.contains("permission"), + "expected permission error, got: {error_msg}" + ); +} From 5bffb28be7c5421ef1c3b0300a47af9dca5b4019 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 24 Apr 2026 20:00:02 +0000 Subject: [PATCH 2/4] fix(mcp): fix duplicate --input flags and add shell quoting - Fix duplicate --input flags in generate_skit_cli_command when no input has field 'media' (extras and inputs were both iterated) - Shell-quote all interpolated values (paths, URLs) in generated commands to handle spaces and metacharacters safely Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/mcp.rs | 47 +++++++++++++++---------- apps/skit/tests/mcp_integration_test.rs | 10 +++--- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/apps/skit/src/mcp.rs b/apps/skit/src/mcp.rs index c6cf2a7a..b17a472d 100644 --- a/apps/skit/src/mcp.rs +++ b/apps/skit/src/mcp.rs @@ -611,6 +611,12 @@ impl StreamKitMcp { // Command generation helpers // --------------------------------------------------------------------------- +/// Shell-quote a value by wrapping it in single quotes and escaping any +/// embedded single quotes (`'` → `'\''`). +fn shell_quote(s: &str) -> String { + format!("'{}'", s.replace('\'', "'\\''")) +} + fn generate_curl_command( yaml: &str, inputs: &[OneshotInput], @@ -625,14 +631,13 @@ fn generate_curl_command( let _ = writeln!(cmd, "{yaml}"); let _ = writeln!(cmd, "PIPELINE_EOF"); let _ = writeln!(cmd); - let _ = write!( - cmd, - "curl -X POST {server_url}/api/v1/process \\\n -F \"config=, Vec<_>) = inputs.iter().partition(|i| i.field == "media"); if let Some(primary_input) = primary.first() { - let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml {}", primary_input.path); + let _ = write!( + cmd, + "streamkit-client oneshot /tmp/pipeline.yaml {}", + shell_quote(&primary_input.path) + ); } else if let Some(first) = inputs.first() { // No input named "media" — use the first as positional and re-add // it via --input so the server receives the correct field name. - let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml {}", first.path); + let _ = + write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml {}", shell_quote(&first.path)); } else { let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml "); } - let _ = write!(cmd, " {output}"); + let _ = write!(cmd, " {}", shell_quote(output)); - // Emit --input flags for non-primary inputs. - for input in &extras { - let _ = write!(cmd, " --input {}={}", input.field, input.path); - } - // If the first input was not "media" and was used as positional, - // add all original inputs via --input flags. - if primary.is_empty() { + // Emit --input flags: when a "media" input exists, only extras need + // flags; otherwise all inputs are emitted (the first was used as the + // positional arg but with a non-"media" field name). + if !primary.is_empty() { + for input in &extras { + let _ = write!(cmd, " --input {}={}", input.field, shell_quote(&input.path)); + } + } else { for input in inputs { - let _ = write!(cmd, " --input {}={}", input.field, input.path); + let _ = write!(cmd, " --input {}={}", input.field, shell_quote(&input.path)); } } - let _ = write!(cmd, " --server {server_url}"); + let _ = write!(cmd, " --server {}", shell_quote(server_url)); cmd } diff --git a/apps/skit/tests/mcp_integration_test.rs b/apps/skit/tests/mcp_integration_test.rs index 3db389e0..683ffbfb 100644 --- a/apps/skit/tests/mcp_integration_test.rs +++ b/apps/skit/tests/mcp_integration_test.rs @@ -599,8 +599,8 @@ async fn mcp_generate_oneshot_command_curl() { assert!(text.contains("curl"), "curl command expected in output: {text}"); assert!(text.contains("/api/v1/process"), "endpoint expected in output: {text}"); assert!(text.contains("config="), "config field expected in output: {text}"); - assert!(text.contains("media=@/tmp/input.wav"), "input field expected in output: {text}"); - assert!(text.contains("-o /tmp/output.wav"), "output path expected in output: {text}"); + assert!(text.contains("'media=@/tmp/input.wav'"), "input field expected in output: {text}"); + assert!(text.contains("-o '/tmp/output.wav'"), "output path expected in output: {text}"); } #[tokio::test] @@ -639,10 +639,10 @@ async fn mcp_generate_oneshot_command_skit_cli() { text.contains("streamkit-client oneshot"), "skit-cli command expected in output: {text}" ); - assert!(text.contains("/tmp/input.wav"), "input path expected in output: {text}"); - assert!(text.contains("/tmp/output.wav"), "output path expected in output: {text}"); + assert!(text.contains("'/tmp/input.wav'"), "input path expected in output: {text}"); + assert!(text.contains("'/tmp/output.wav'"), "output path expected in output: {text}"); assert!( - text.contains("--server http://localhost:9999"), + text.contains("--server 'http://localhost:9999'"), "server URL expected in output: {text}" ); } From ad33ad800668e3bf06349e7f3705b11b74714a37 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 24 Apr 2026 20:05:09 +0000 Subject: [PATCH 3/4] fix(mcp): shell-quote field names in skit-cli command generator Quote the entire field=path pair as a single shell token in generate_skit_cli_command, consistent with how generate_curl_command handles it. Prevents shell injection via crafted field names. Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/mcp.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/skit/src/mcp.rs b/apps/skit/src/mcp.rs index b17a472d..b1ba4977 100644 --- a/apps/skit/src/mcp.rs +++ b/apps/skit/src/mcp.rs @@ -682,11 +682,13 @@ fn generate_skit_cli_command( // positional arg but with a non-"media" field name). if !primary.is_empty() { for input in &extras { - let _ = write!(cmd, " --input {}={}", input.field, shell_quote(&input.path)); + let _ = + write!(cmd, " --input {}", shell_quote(&format!("{}={}", input.field, input.path))); } } else { for input in inputs { - let _ = write!(cmd, " --input {}={}", input.field, shell_quote(&input.path)); + let _ = + write!(cmd, " --input {}", shell_quote(&format!("{}={}", input.field, input.path))); } } From 5fca7ed985c45f6f8b86b6f6fea0e476225dc19c Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Fri, 24 Apr 2026 20:11:09 +0000 Subject: [PATCH 4/4] fix(mcp): use dynamic heredoc delimiter to prevent content injection Choose a heredoc delimiter that does not appear in the YAML content, preventing premature termination if YAML contains the literal delimiter string. Applies to both curl and skit-cli generators. Signed-off-by: Devin AI Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/mcp.rs | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/apps/skit/src/mcp.rs b/apps/skit/src/mcp.rs index b1ba4977..42733a9b 100644 --- a/apps/skit/src/mcp.rs +++ b/apps/skit/src/mcp.rs @@ -617,6 +617,21 @@ fn shell_quote(s: &str) -> String { format!("'{}'", s.replace('\'', "'\\''")) } +/// Return a heredoc delimiter that does not appear in `content`. +fn unique_heredoc_delimiter(content: &str) -> String { + let base = "PIPELINE_EOF"; + if !content.contains(base) { + return base.to_string(); + } + for i in 0u32.. { + let candidate = format!("{base}_{i}"); + if !content.contains(&candidate) { + return candidate; + } + } + unreachable!() +} + fn generate_curl_command( yaml: &str, inputs: &[OneshotInput], @@ -625,11 +640,13 @@ fn generate_curl_command( ) -> String { use std::fmt::Write; + let delim = unique_heredoc_delimiter(yaml); + let mut cmd = String::new(); let _ = writeln!(cmd, "# Save pipeline YAML to a temporary file, then run curl."); - let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'PIPELINE_EOF'"); + let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'{delim}'"); let _ = writeln!(cmd, "{yaml}"); - let _ = writeln!(cmd, "PIPELINE_EOF"); + let _ = writeln!(cmd, "{delim}"); let _ = writeln!(cmd); let url = format!("{server_url}/api/v1/process"); let _ = write!(cmd, "curl -X POST {} \\\n -F 'config= String { use std::fmt::Write; + let delim = unique_heredoc_delimiter(yaml); + let mut cmd = String::new(); let _ = writeln!(cmd, "# Save pipeline YAML to a temporary file, then run the CLI."); - let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'PIPELINE_EOF'"); + let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'{delim}'"); let _ = writeln!(cmd, "{yaml}"); - let _ = writeln!(cmd, "PIPELINE_EOF"); + let _ = writeln!(cmd, "{delim}"); let _ = writeln!(cmd); // The CLI takes one positional input mapped to the "media" field,