Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 196 additions & 2 deletions apps/skit/src/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ fn extract_auth(
// MCP tool argument structs
// ---------------------------------------------------------------------------

#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct OneshotInput {
/// Input field name matching a node ID in the pipeline (e.g., "input").
pub field: String,
/// Path to the input file on the local filesystem.
pub path: String,
}

#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct GenerateOneshotCommandArgs {
/// Pipeline YAML for the oneshot run.
pub yaml: String,
/// Input file(s) to include in the request.
pub inputs: Vec<OneshotInput>,
/// Path where the output should be saved.
pub output: String,
/// Server URL (defaults to "http://localhost:4545").
#[serde(default)]
pub server_url: Option<String>,
/// Command format: "curl" or "skit-cli". Defaults to "curl".
#[serde(default)]
pub format: Option<String>,
}

#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct ValidatePipelineArgs {
/// Pipeline YAML to validate.
Expand Down Expand Up @@ -428,6 +452,66 @@ impl StreamKitMcp {
Ok(CallToolResult::success(vec![Content::text(json)]))
}

// -- generate_oneshot_command -------------------------------------------

#[tool(
description = "Generate a curl or skit-cli command to execute a oneshot (batch processing) pipeline. The oneshot runs through the HTTP data plane (POST /api/v1/process), not through MCP. Use validate_pipeline with mode='oneshot' first to ensure the YAML is valid."
)]
async fn generate_oneshot_command(
&self,
Parameters(args): Parameters<GenerateOneshotCommandArgs>,
ctx: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let (_role_name, perms) = extract_auth(&ctx, &self.app_state)?;

if !perms.create_sessions {
return Err(McpError::invalid_request(
"Permission denied: create_sessions required",
None,
));
}

// Validate the YAML before generating a command.
let validation = crate::server::validate_pipeline_yaml(
&self.app_state,
&perms,
&args.yaml,
Some(crate::server::PipelineMode::Oneshot),
)
.map_err(|e| McpError::internal_error(e, None))?;

let validation_json = serde_json::to_value(&validation)
.map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?;

if validation_json["valid"] == false {
let pretty = serde_json::to_string_pretty(&validation_json)
.map_err(|e| McpError::internal_error(format!("serialization error: {e}"), None))?;
return Ok(CallToolResult::success(vec![Content::text(format!(
"Pipeline validation failed. Fix the errors before generating a command:\n{pretty}"
))]));
}

let server_url = args.server_url.as_deref().unwrap_or("http://localhost:4545");
let format = args.format.as_deref().unwrap_or("curl");

let command = match format {
"curl" => generate_curl_command(&args.yaml, &args.inputs, &args.output, server_url),
"skit-cli" => {
generate_skit_cli_command(&args.yaml, &args.inputs, &args.output, server_url)
},
other => {
return Err(McpError::invalid_params(
format!("Invalid format '{other}'. Must be 'curl' or 'skit-cli'."),
None,
));
},
};

info!(format, "MCP generate_oneshot_command");

Ok(CallToolResult::success(vec![Content::text(command)]))
}

// -- destroy_session ---------------------------------------------------

#[tool(
Expand Down Expand Up @@ -523,6 +607,114 @@ impl StreamKitMcp {
}
}

// ---------------------------------------------------------------------------
// Command generation helpers
// ---------------------------------------------------------------------------

/// Shell-quote a value by wrapping it in single quotes and escaping any
/// embedded single quotes (`'` → `'\''`).
fn shell_quote(s: &str) -> String {
format!("'{}'", s.replace('\'', "'\\''"))
}

/// Return a heredoc delimiter that does not appear in `content`.
fn unique_heredoc_delimiter(content: &str) -> String {
let base = "PIPELINE_EOF";
if !content.contains(base) {
return base.to_string();
}
for i in 0u32.. {
let candidate = format!("{base}_{i}");
if !content.contains(&candidate) {
return candidate;
}
}
unreachable!()
}

fn generate_curl_command(
yaml: &str,
inputs: &[OneshotInput],
output: &str,
server_url: &str,
) -> String {
use std::fmt::Write;

let delim = unique_heredoc_delimiter(yaml);

let mut cmd = String::new();
let _ = writeln!(cmd, "# Save pipeline YAML to a temporary file, then run curl.");
let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'{delim}'");
let _ = writeln!(cmd, "{yaml}");
let _ = writeln!(cmd, "{delim}");
let _ = writeln!(cmd);
let url = format!("{server_url}/api/v1/process");
let _ = write!(cmd, "curl -X POST {} \\\n -F 'config=</tmp/pipeline.yaml'", shell_quote(&url));
for input in inputs {
let _ =
write!(cmd, " \\\n -F {}", shell_quote(&format!("{}=@{}", input.field, input.path)));
}
let _ = write!(cmd, " \\\n -o {}", shell_quote(output));
cmd
}

fn generate_skit_cli_command(
yaml: &str,
inputs: &[OneshotInput],
output: &str,
server_url: &str,
) -> String {
use std::fmt::Write;

let delim = unique_heredoc_delimiter(yaml);

let mut cmd = String::new();
let _ = writeln!(cmd, "# Save pipeline YAML to a temporary file, then run the CLI.");
let _ = writeln!(cmd, "cat > /tmp/pipeline.yaml <<'{delim}'");
let _ = writeln!(cmd, "{yaml}");
let _ = writeln!(cmd, "{delim}");
let _ = writeln!(cmd);

// The CLI takes one positional input mapped to the "media" field,
// plus optional --input field=path for additional inputs.
let (primary, extras): (Vec<_>, Vec<_>) = inputs.iter().partition(|i| i.field == "media");

if let Some(primary_input) = primary.first() {
let _ = write!(
cmd,
"streamkit-client oneshot /tmp/pipeline.yaml {}",
shell_quote(&primary_input.path)
);
} else if let Some(first) = inputs.first() {
// No input named "media" — use the first as positional and re-add
// it via --input so the server receives the correct field name.
let _ =
write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml {}", shell_quote(&first.path));
} else {
let _ = write!(cmd, "streamkit-client oneshot /tmp/pipeline.yaml <INPUT_FILE>");
}

let _ = write!(cmd, " {}", shell_quote(output));

// Emit --input flags: when a "media" input exists, only extras need
// flags; otherwise all inputs are emitted (the first was used as the
// positional arg but with a non-"media" field name).
if !primary.is_empty() {
for input in &extras {
let _ =
write!(cmd, " --input {}", shell_quote(&format!("{}={}", input.field, input.path)));
}
} else {
for input in inputs {
let _ =
write!(cmd, " --input {}", shell_quote(&format!("{}={}", input.field, input.path)));
}
}
Comment on lines +688 to +712
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 skit-cli command sends duplicate input when no 'media' field exists

When no input has field name "media", the generate_skit_cli_command function uses the first input's path as the CLI's required positional input argument (which the CLI always maps to field "media" per apps/skit-cli/src/main.rs:366), and then also emits ALL inputs as --input field=path flags (lines 688-692). This means the first input file is sent twice: once as "media" (positional) and once with its actual field name. The comment at line 670-671 acknowledges this is intentional to ensure the server receives the correct field name, but the extra "media" multipart part is spurious and could confuse pipelines that don't expect it. Whether this causes a runtime error depends on how the oneshot engine handles unexpected input fields — worth verifying.

Open in Devin Review (Staging)

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation. This is a known limitation of the skit-cli's design — the positional input arg always maps to the "media" field internally. When the user's pipeline uses a different field name, the generated command necessarily sends the file twice: once as the positional (mapped to "media") and once via --input with the correct field name. The oneshot engine ignores unrecognized input fields, so the extra "media" part is harmless but redundant. A cleaner solution would require a CLI change to accept the positional input's field name — out of scope for this PR but worth considering.


let _ = write!(cmd, " --server {}", shell_quote(server_url));
cmd
}
Comment thread
staging-devin-ai-integration[bot] marked this conversation as resolved.

// ---------------------------------------------------------------------------
// ServerHandler trait impl
// ---------------------------------------------------------------------------
Expand All @@ -532,9 +724,11 @@ impl ServerHandler for StreamKitMcp {
let capabilities = ServerCapabilities::builder().enable_tools().build();
let mut info = ServerInfo::new(capabilities).with_instructions(
"StreamKit MCP server. Use list_nodes to discover available \
processing nodes, validate_pipeline to check YAML, and \
processing nodes, validate_pipeline to check YAML, \
create_session / list_sessions / get_pipeline / destroy_session \
to manage dynamic pipeline sessions.",
to manage dynamic pipeline sessions, and \
generate_oneshot_command to get a curl or skit-cli command for \
batch processing via the HTTP data plane.",
);
info.server_info = rmcp::model::Implementation::new("streamkit", env!("CARGO_PKG_VERSION"));
info
Expand Down
Loading