From 1efe9e15f953f65e4487fc4c74503a85107c5a3f Mon Sep 17 00:00:00 2001 From: vsilent Date: Wed, 15 Apr 2026 20:07:18 +0300 Subject: [PATCH 1/6] feat: add real WebSocket and gRPC streaming transports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace WS stub with real tokio-tungstenite client (ws_fetch_source, ws_send_target, connect_and_stream) - Add gRPC client transport (grpc_fetch_source, grpc_send_target) with tonic/prost and JSON↔prost_types conversion - Add proto/pipe.proto with PipeService (Send + Subscribe RPCs) - Add build.rs for tonic-build protobuf compilation - Route ws:// and grpc:// target URLs in handle_trigger_pipe to appropriate streaming transports - Add tokio-tungstenite, tonic, prost, prost-types dependencies - All 371 lib tests + 19 integration tests passing Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .claude/agents/code-reviewer.md | 27 +++ .claude/agents/planner.md | 28 +++ .claude/agents/tester.md | 28 +++ .claude/settings.local.json | 13 ++ CLAUDE.md | 70 ++++++ Cargo.lock | 394 ++++++++++++++++++++++++++++++-- Cargo.toml | 7 + build.rs | 7 + proto/pipe.proto | 31 +++ src/commands/stacker.rs | 18 +- src/transport/grpc_client.rs | 150 ++++++++++++ src/transport/mod.rs | 1 + src/transport/websocket.rs | 94 +++++++- 13 files changed, 837 insertions(+), 31 deletions(-) create mode 100644 .claude/agents/code-reviewer.md create mode 100644 .claude/agents/planner.md create mode 100644 .claude/agents/tester.md create mode 100644 .claude/settings.local.json create mode 100644 CLAUDE.md create mode 100644 build.rs create mode 100644 proto/pipe.proto create mode 100644 src/transport/grpc_client.rs diff --git a/.claude/agents/code-reviewer.md b/.claude/agents/code-reviewer.md new file mode 100644 index 0000000..c0d7c05 --- /dev/null +++ b/.claude/agents/code-reviewer.md @@ -0,0 +1,27 @@ +--- +name: code-reviewer +description: Reviews status panel code for system security, privilege handling, and correctness. +tools: + - Read + - Grep + - Glob +--- + +You are a senior security-focused code reviewer for a privileged system agent. + +This code runs with elevated privileges on deployed servers. Security is paramount. + +Check for: +1. **Command Injection** — all system commands properly sanitized, no shell interpolation +2. **Authentication** — HMAC signature verification on all endpoints +3. **Docker Safety** — container operations validated, no arbitrary image execution +4. **Self-Update Security** — binary integrity verified before replacement +5. **WebSocket Safety** — message validation, connection limits, no data leaks +6. **Resource Limits** — no unbounded memory/CPU usage from user requests +7. **Error Handling** — no system information leaked in error responses +8. **Async Safety** — no blocking calls, proper timeout handling +9. **Feature Flags** — code works correctly in both docker and minimal modes +10. **Test Coverage** — security-critical paths tested + +Output: severity-rated findings with file:line references. +CRITICAL for any command injection or auth bypass. diff --git a/.claude/agents/planner.md b/.claude/agents/planner.md new file mode 100644 index 0000000..48decbe --- /dev/null +++ b/.claude/agents/planner.md @@ -0,0 +1,28 @@ +--- +name: planner +description: Plans changes for the status panel. Understands Axum, WebSocket, Docker management, and system security. +tools: + - Read + - Grep + - Glob + - LS +--- + +You are a senior Rust engineer planning changes for a server-side status panel agent. + +This runs on deployed servers: Axum HTTP/WebSocket server with Docker management, system metrics, and self-update capability. + +1. Research src/lib.rs for core logic and routing +2. Check security patterns (HMAC auth, signature verification) +3. Review Docker integration via Bollard +4. Check feature flags: `docker` vs `minimal` +5. Create a step-by-step implementation plan +6. Identify risks: privilege escalation, command injection, DoS + +RULES: +- NEVER write code. Only plan. +- ALWAYS consider both feature configurations (docker / minimal) +- ALWAYS evaluate security implications — this runs with system privileges +- Flag any changes to command execution or Docker operations +- Consider resource usage — this runs alongside user applications +- Estimate complexity of each step (small / medium / large) diff --git a/.claude/agents/tester.md b/.claude/agents/tester.md new file mode 100644 index 0000000..e8f902a --- /dev/null +++ b/.claude/agents/tester.md @@ -0,0 +1,28 @@ +--- +name: tester +description: Writes and runs tests for the status panel. Tests HTTP routes, security, and self-update with mocks. +tools: + - Read + - Write + - Bash + - Grep + - Glob +--- + +You are a QA engineer for a Rust/Axum system agent running on deployed servers. + +1. Read existing test patterns in tests/ (http_routes, security_integration, self_update_integration) +2. Write new tests following the established patterns +3. Run the FULL test suite: `cargo test` +4. Also test minimal feature: `cargo test --no-default-features --features minimal` +5. Report: what passed, what failed, root cause analysis + +RULES: +- TDD: Write failing test FIRST, then verify it fails, then implement fix +- ALWAYS run full suite: `cargo test` +- ALWAYS test both feature configurations +- Use tower::ServiceExt for Axum handler testing +- Use mockito for external HTTP mocks +- Test security: HMAC validation, invalid signatures, replay attacks +- Test WebSocket connections with tokio-test +- Do NOT modify existing passing tests unless explicitly asked diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..ae596e7 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,13 @@ +{ + "permissions": { + "allow": [ + "Bash(cargo test:*)", + "Bash(cargo build:*)", + "Bash(gh pr:*)", + "Bash(gh api:*)", + "Bash(cargo clippy:*)", + "Bash(cargo update:*)", + "Bash(cargo audit:*)" + ] + } +} diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..0062556 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,70 @@ +# Status Panel + +On-server status panel agent. Runs on deployed servers to report health metrics, manage containers, handle self-updates, and provide a WebSocket interface for real-time monitoring. + +## Tech Stack +- **Language**: Rust (2021 edition) +- **Framework**: Axum 0.8 (with WebSocket support) +- **Async**: Tokio (full features) +- **Docker**: Bollard 0.19 (Docker API via Unix socket, optional) +- **HTTP Client**: reqwest 0.12 (rustls-tls) +- **System Metrics**: sysinfo 0.30 +- **Security**: HMAC-SHA256, ring 0.17 +- **Daemonization**: daemonize 0.5 +- **Testing**: assert_cmd, tokio-test, mockito, tower + +## Project Structure +``` +src/ + main.rs # Binary entry point + lib.rs # Library root (core logic) + test_utils.rs # Shared test utilities +tests/ + http_routes.rs # HTTP route integration tests + security_integration.rs # Security/auth integration tests + self_update_integration.rs # Self-update mechanism tests +examples/ + command_execution.rs # Command execution example +``` + +## Features +- `default = ["docker"]` — includes Docker management via Bollard +- `docker` — Docker container management (Unix socket) +- `minimal` — builds without Docker support + +## Commands +```bash +# Build +cargo build + +# Build minimal (without Docker) +cargo build --no-default-features --features minimal + +# Run tests +cargo test + +# Run tests without Docker feature +cargo test --no-default-features --features minimal + +# Format & lint +cargo fmt +cargo clippy -- -D warnings + +# Run +cargo run --bin status +``` + +## Critical Rules +- NEVER expose system commands without HMAC authentication +- NEVER trust incoming WebSocket data without signature verification +- ALWAYS validate self-update binary integrity before replacing +- Docker socket access is privileged — validate all container operations +- System metrics collection must not block the async runtime +- Test both `docker` and `minimal` feature configurations +- Test with `cargo test` after every change +- DO NOT yet add to repo .claude CLAUDE.md .copilot directories and files + +## Agents +- Use `planner` before any feature work +- Use `tester` after every code change (must run cargo test) +- Use `code-reviewer` before commits — focus on security and system safety \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 54304b2..4d287ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -135,13 +146,40 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "axum" version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ - "axum-core", + "axum-core 0.5.5", "base64", "bytes", "form_urlencoded", @@ -152,7 +190,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -164,13 +202,33 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite", - "tower", + "tokio-tungstenite 0.28.0", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.5.5" @@ -237,7 +295,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_urlencoded", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-util", "tower-service", @@ -275,6 +333,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" @@ -549,6 +613,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "fnv" version = "1.0.7" @@ -842,7 +912,20 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", + "webpki-roots 1.0.4", +] + +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", ] [[package]] @@ -863,7 +946,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.1", "tokio", "tower-service", "tracing", @@ -1071,6 +1154,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1147,6 +1239,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -1211,6 +1309,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "nix" version = "0.29.0" @@ -1349,6 +1453,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.12.1", +] + [[package]] name = "phf" version = "0.11.3" @@ -1387,6 +1501,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -1450,6 +1584,16 @@ dependencies = [ "termtree", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -1459,6 +1603,58 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" +dependencies = [ + "heck", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "quinn" version = "0.11.9" @@ -1472,8 +1668,8 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", - "thiserror", + "socket2 0.6.1", + "thiserror 2.0.17", "tokio", "tracing", "web-time", @@ -1494,7 +1690,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror", + "thiserror 2.0.17", "tinyvec", "tracing", "web-time", @@ -1509,7 +1705,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.1", "tracing", "windows-sys 0.60.2", ] @@ -1694,14 +1890,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", - "tower", + "tower 0.5.2", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 1.0.4", ] [[package]] @@ -1762,6 +1958,7 @@ version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ + "log", "once_cell", "ring", "rustls-pki-types", @@ -1770,6 +1967,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.13.1" @@ -2036,6 +2242,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.1" @@ -2058,7 +2274,7 @@ version = "0.1.7" dependencies = [ "anyhow", "assert_cmd", - "axum", + "axum 0.8.7", "base64", "bollard", "bytes", @@ -2072,6 +2288,8 @@ dependencies = [ "hyper", "mockito", "nix", + "prost", + "prost-types", "rand 0.8.5", "regex", "reqwest", @@ -2086,10 +2304,13 @@ dependencies = [ "sysinfo", "tempfile", "tera", - "thiserror", + "thiserror 2.0.17", "tokio", "tokio-test", - "tower", + "tokio-tungstenite 0.24.0", + "tonic", + "tonic-build", + "tower 0.5.2", "tower-http", "tracing", "tracing-subscriber", @@ -2195,13 +2416,33 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.17", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2292,7 +2533,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.1", "tokio-macros", "windows-sys 0.61.2", ] @@ -2342,6 +2583,22 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tungstenite 0.24.0", + "webpki-roots 0.26.11", +] + [[package]] name = "tokio-tungstenite" version = "0.28.0" @@ -2351,7 +2608,7 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.28.0", ] [[package]] @@ -2367,6 +2624,72 @@ dependencies = [ "tokio", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.9", + "base64", + "bytes", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-pemfile", + "socket2 0.5.10", + "tokio", + "tokio-rustls", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -2405,7 +2728,7 @@ dependencies = [ "pin-project-lite", "tokio", "tokio-util", - "tower", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -2504,6 +2827,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.8.5", + "rustls", + "rustls-pki-types", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.28.0" @@ -2517,7 +2860,7 @@ dependencies = [ "log", "rand 0.9.2", "sha1", - "thiserror", + "thiserror 2.0.17", "utf-8", ] @@ -2737,6 +3080,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.4", +] + [[package]] name = "webpki-roots" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 6290b3e..40d7bb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,10 @@ uuid = { version = "1", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } serde_yaml = "0.9" futures-util = "0.3" +tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] } +tonic = { version = "0.12", features = ["tls"] } +prost = "0.13" +prost-types = "0.13" tera = "1" tower-http = { version = "0.6", features = ["fs"] } base64 = "0.22" @@ -54,6 +58,9 @@ nix = { version = "0.29", features = ["signal"] } name = "status" path = "src/main.rs" +[build-dependencies] +tonic-build = "0.12" + [dev-dependencies] assert_cmd = "2.0" tokio-test = "0.4" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..a9c616e --- /dev/null +++ b/build.rs @@ -0,0 +1,7 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .build_server(false) + .build_client(true) + .compile_protos(&["proto/pipe.proto"], &["proto"])?; + Ok(()) +} diff --git a/proto/pipe.proto b/proto/pipe.proto new file mode 100644 index 0000000..bd068bc --- /dev/null +++ b/proto/pipe.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package pipe; + +import "google/protobuf/struct.proto"; + +service PipeService { + // Send data to a pipe target (unary) + rpc Send(PipeMessage) returns (PipeResponse); + + // Subscribe to a pipe source (server-streaming) + rpc Subscribe(SubscribeRequest) returns (stream PipeMessage); +} + +message PipeMessage { + string pipe_instance_id = 1; + string step_id = 2; + google.protobuf.Struct payload = 3; + int64 timestamp_ms = 4; +} + +message PipeResponse { + bool success = 1; + string message = 2; +} + +message SubscribeRequest { + string pipe_instance_id = 1; + string step_id = 2; + map filters = 3; +} diff --git a/src/commands/stacker.rs b/src/commands/stacker.rs index 5aa3335..464424b 100644 --- a/src/commands/stacker.rs +++ b/src/commands/stacker.rs @@ -2138,7 +2138,23 @@ async fn handle_trigger_pipe( let send_result = match target_mode { "external" => { - send_trigger_pipe_request(&target_value, &data.target_method, &mapped_data).await + if target_value.starts_with("ws://") || target_value.starts_with("wss://") { + crate::transport::websocket::ws_send_target(&target_value, &mapped_data) + .await + .map_err(|e| anyhow::anyhow!(e)) + } else if target_value.starts_with("grpc://") { + let grpc_endpoint = target_value.replacen("grpc://", "http://", 1); + crate::transport::grpc_client::grpc_send_target( + &grpc_endpoint, + &data.pipe_instance_id, + "", + &mapped_data, + ) + .await + .map_err(|e| anyhow::anyhow!(e)) + } else { + send_trigger_pipe_request(&target_value, &data.target_method, &mapped_data).await + } } "container" => { send_trigger_pipe_container_request( diff --git a/src/transport/grpc_client.rs b/src/transport/grpc_client.rs new file mode 100644 index 0000000..dc9b446 --- /dev/null +++ b/src/transport/grpc_client.rs @@ -0,0 +1,150 @@ +use anyhow::{Context, Result}; +use serde_json::Value; +use tracing::info; + +pub mod pipe_proto { + tonic::include_proto!("pipe"); +} + +use pipe_proto::pipe_service_client::PipeServiceClient; +use pipe_proto::{PipeMessage, SubscribeRequest}; + +/// Subscribe to a gRPC pipe source and read the first message. +pub async fn grpc_fetch_source( + endpoint: &str, + pipe_instance_id: &str, + step_id: &str, +) -> Result { + info!(endpoint, "grpc_fetch_source: connecting"); + let mut client = PipeServiceClient::connect(endpoint.to_string()) + .await + .with_context(|| format!("gRPC connection failed: {endpoint}"))?; + + let request = tonic::Request::new(SubscribeRequest { + pipe_instance_id: pipe_instance_id.to_string(), + step_id: step_id.to_string(), + filters: Default::default(), + }); + + let mut stream = client + .subscribe(request) + .await + .with_context(|| "gRPC subscribe failed")? + .into_inner(); + + match stream.message().await { + Ok(Some(msg)) => { + let payload = msg + .payload + .map(|s| struct_to_json(&s)) + .unwrap_or_else(|| serde_json::json!({})); + Ok(payload) + } + Ok(None) => Err(anyhow::anyhow!("gRPC stream closed without data")), + Err(e) => Err(anyhow::anyhow!("gRPC read error: {e}")), + } +} + +/// Send data to a gRPC pipe target via unary RPC. +pub async fn grpc_send_target( + endpoint: &str, + pipe_instance_id: &str, + step_id: &str, + data: &Value, +) -> Result<(u16, Value)> { + info!(endpoint, "grpc_send_target: connecting"); + let mut client = PipeServiceClient::connect(endpoint.to_string()) + .await + .with_context(|| format!("gRPC connection failed: {endpoint}"))?; + + let payload_struct = json_to_struct(data); + + let request = tonic::Request::new(PipeMessage { + pipe_instance_id: pipe_instance_id.to_string(), + step_id: step_id.to_string(), + payload: Some(payload_struct), + timestamp_ms: chrono::Utc::now().timestamp_millis(), + }); + + let response = client + .send(request) + .await + .with_context(|| "gRPC send failed")? + .into_inner(); + + let status = if response.success { 200 } else { 500 }; + Ok(( + status, + serde_json::json!({ + "grpc_delivered": response.success, + "message": response.message, + }), + )) +} + +// ── Conversion helpers: serde_json ↔ prost_types::Struct ── + +fn json_to_struct(value: &Value) -> prost_types::Struct { + let fields = match value.as_object() { + Some(map) => map + .iter() + .map(|(k, v)| (k.clone(), json_to_prost_value(v))) + .collect(), + None => Default::default(), + }; + prost_types::Struct { fields } +} + +fn json_to_prost_value(value: &Value) -> prost_types::Value { + use prost_types::value::Kind; + let kind = match value { + Value::Null => Kind::NullValue(0), + Value::Bool(b) => Kind::BoolValue(*b), + Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)), + Value::String(s) => Kind::StringValue(s.clone()), + Value::Array(arr) => Kind::ListValue(prost_types::ListValue { + values: arr.iter().map(json_to_prost_value).collect(), + }), + Value::Object(_) => Kind::StructValue(json_to_struct(value)), + }; + prost_types::Value { kind: Some(kind) } +} + +fn struct_to_json(s: &prost_types::Struct) -> Value { + let map: serde_json::Map = s + .fields + .iter() + .map(|(k, v)| (k.clone(), prost_value_to_json(v))) + .collect(); + Value::Object(map) +} + +fn prost_value_to_json(v: &prost_types::Value) -> Value { + use prost_types::value::Kind; + match &v.kind { + Some(Kind::NullValue(_)) => Value::Null, + Some(Kind::BoolValue(b)) => Value::Bool(*b), + Some(Kind::NumberValue(n)) => serde_json::json!(*n), + Some(Kind::StringValue(s)) => Value::String(s.clone()), + Some(Kind::ListValue(list)) => { + Value::Array(list.values.iter().map(prost_value_to_json).collect()) + } + Some(Kind::StructValue(s)) => struct_to_json(s), + None => Value::Null, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_json_struct_roundtrip() { + let original = serde_json::json!({"name": "test", "count": 42, "active": true}); + let proto = json_to_struct(&original); + let back = struct_to_json(&proto); + assert_eq!(back["name"], "test"); + assert_eq!(back["count"], 42.0); + assert_eq!(back["active"], true); + } +} diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 43afae4..fc4271c 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,3 +1,4 @@ +pub mod grpc_client; pub mod http_polling; pub mod retry; pub mod websocket; diff --git a/src/transport/websocket.rs b/src/transport/websocket.rs index 2810ba3..07dfb5b 100644 --- a/src/transport/websocket.rs +++ b/src/transport/websocket.rs @@ -1,11 +1,87 @@ -use anyhow::Result; -use tracing::{debug, info}; - -/// Placeholder for WebSocket streaming (logs/metrics/status). -/// This stub will be replaced with a `tokio_tungstenite` client. -pub async fn connect_and_stream(_ws_url: &str) -> Result<()> { - info!("WebSocket stub: connect_and_stream called"); - // TODO: implement ping/pong heartbeat and reconnection - debug!("Streaming stub active"); +use anyhow::{Context, Result}; +use futures_util::{SinkExt, StreamExt}; +use serde_json::Value; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tracing::{debug, info, warn}; + +/// Connect to a WebSocket endpoint and read the first message as pipe source data. +pub async fn ws_fetch_source(url: &str) -> Result { + info!(url, "ws_fetch_source: connecting"); + let (ws_stream, _) = connect_async(url) + .await + .with_context(|| format!("WebSocket connection failed: {url}"))?; + + let (_write, mut read) = ws_stream.split(); + + match read.next().await { + Some(Ok(Message::Text(text))) => { + debug!(len = text.len(), "ws_fetch_source: received text"); + serde_json::from_str::(&text) + .with_context(|| "ws_fetch_source: failed to parse JSON") + } + Some(Ok(Message::Binary(bin))) => { + debug!(len = bin.len(), "ws_fetch_source: received binary"); + serde_json::from_slice::(&bin) + .with_context(|| "ws_fetch_source: failed to parse binary JSON") + } + Some(Ok(other)) => Ok(serde_json::json!({ "raw": other.to_string() })), + Some(Err(e)) => Err(anyhow::anyhow!("ws_fetch_source read error: {e}")), + None => Err(anyhow::anyhow!( + "ws_fetch_source: stream closed without data" + )), + } +} + +/// Send JSON data to a WebSocket endpoint (pipe target). +pub async fn ws_send_target(url: &str, data: &Value) -> Result<(u16, Value)> { + info!(url, "ws_send_target: connecting"); + let (ws_stream, _) = connect_async(url) + .await + .with_context(|| format!("WebSocket connection failed: {url}"))?; + + let (mut write, _read) = ws_stream.split(); + + let payload = + serde_json::to_string(data).with_context(|| "ws_send_target: failed to serialize")?; + + write + .send(Message::Text(payload)) + .await + .with_context(|| "ws_send_target: failed to send")?; + + info!(url, "ws_send_target: data sent"); + Ok((200, serde_json::json!({"ws_delivered": true}))) +} + +/// Connect to a WebSocket endpoint for streaming logs/metrics/status. +/// Reads messages in a loop until the stream closes or an error occurs. +pub async fn connect_and_stream(ws_url: &str) -> Result<()> { + info!(ws_url, "connect_and_stream: connecting"); + let (ws_stream, _) = connect_async(ws_url) + .await + .with_context(|| format!("WebSocket streaming connection failed: {ws_url}"))?; + + let (_write, mut read) = ws_stream.split(); + + while let Some(msg) = read.next().await { + match msg { + Ok(Message::Text(text)) => { + debug!(len = text.len(), "stream message received"); + } + Ok(Message::Ping(_)) => { + debug!("stream ping received"); + } + Ok(Message::Close(_)) => { + info!("stream closed by server"); + break; + } + Err(e) => { + warn!(error = %e, "stream error"); + break; + } + _ => {} + } + } + Ok(()) } From d5ae04287a3ae5db9a6bf71ed4112b6ea5938f6c Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 17 Apr 2026 11:36:08 +0300 Subject: [PATCH 2/6] .claude/settings.local.json appears to be a local, developer-specific tool-permissions file. Committing it can unintentionally standardize local permissions or leak workflow assumptions. Consider removing it from version control and adding .claude/ (or at least settings.local.json) to .gitignore, or rename to a non-local template. Fix --- .claude/agents/code-reviewer.md | 27 --------------------------- .claude/agents/planner.md | 28 ---------------------------- .claude/agents/tester.md | 28 ---------------------------- .claude/settings.local.json | 13 ------------- 4 files changed, 96 deletions(-) delete mode 100644 .claude/agents/code-reviewer.md delete mode 100644 .claude/agents/planner.md delete mode 100644 .claude/agents/tester.md delete mode 100644 .claude/settings.local.json diff --git a/.claude/agents/code-reviewer.md b/.claude/agents/code-reviewer.md deleted file mode 100644 index c0d7c05..0000000 --- a/.claude/agents/code-reviewer.md +++ /dev/null @@ -1,27 +0,0 @@ ---- -name: code-reviewer -description: Reviews status panel code for system security, privilege handling, and correctness. -tools: - - Read - - Grep - - Glob ---- - -You are a senior security-focused code reviewer for a privileged system agent. - -This code runs with elevated privileges on deployed servers. Security is paramount. - -Check for: -1. **Command Injection** — all system commands properly sanitized, no shell interpolation -2. **Authentication** — HMAC signature verification on all endpoints -3. **Docker Safety** — container operations validated, no arbitrary image execution -4. **Self-Update Security** — binary integrity verified before replacement -5. **WebSocket Safety** — message validation, connection limits, no data leaks -6. **Resource Limits** — no unbounded memory/CPU usage from user requests -7. **Error Handling** — no system information leaked in error responses -8. **Async Safety** — no blocking calls, proper timeout handling -9. **Feature Flags** — code works correctly in both docker and minimal modes -10. **Test Coverage** — security-critical paths tested - -Output: severity-rated findings with file:line references. -CRITICAL for any command injection or auth bypass. diff --git a/.claude/agents/planner.md b/.claude/agents/planner.md deleted file mode 100644 index 48decbe..0000000 --- a/.claude/agents/planner.md +++ /dev/null @@ -1,28 +0,0 @@ ---- -name: planner -description: Plans changes for the status panel. Understands Axum, WebSocket, Docker management, and system security. -tools: - - Read - - Grep - - Glob - - LS ---- - -You are a senior Rust engineer planning changes for a server-side status panel agent. - -This runs on deployed servers: Axum HTTP/WebSocket server with Docker management, system metrics, and self-update capability. - -1. Research src/lib.rs for core logic and routing -2. Check security patterns (HMAC auth, signature verification) -3. Review Docker integration via Bollard -4. Check feature flags: `docker` vs `minimal` -5. Create a step-by-step implementation plan -6. Identify risks: privilege escalation, command injection, DoS - -RULES: -- NEVER write code. Only plan. -- ALWAYS consider both feature configurations (docker / minimal) -- ALWAYS evaluate security implications — this runs with system privileges -- Flag any changes to command execution or Docker operations -- Consider resource usage — this runs alongside user applications -- Estimate complexity of each step (small / medium / large) diff --git a/.claude/agents/tester.md b/.claude/agents/tester.md deleted file mode 100644 index e8f902a..0000000 --- a/.claude/agents/tester.md +++ /dev/null @@ -1,28 +0,0 @@ ---- -name: tester -description: Writes and runs tests for the status panel. Tests HTTP routes, security, and self-update with mocks. -tools: - - Read - - Write - - Bash - - Grep - - Glob ---- - -You are a QA engineer for a Rust/Axum system agent running on deployed servers. - -1. Read existing test patterns in tests/ (http_routes, security_integration, self_update_integration) -2. Write new tests following the established patterns -3. Run the FULL test suite: `cargo test` -4. Also test minimal feature: `cargo test --no-default-features --features minimal` -5. Report: what passed, what failed, root cause analysis - -RULES: -- TDD: Write failing test FIRST, then verify it fails, then implement fix -- ALWAYS run full suite: `cargo test` -- ALWAYS test both feature configurations -- Use tower::ServiceExt for Axum handler testing -- Use mockito for external HTTP mocks -- Test security: HMAC validation, invalid signatures, replay attacks -- Test WebSocket connections with tokio-test -- Do NOT modify existing passing tests unless explicitly asked diff --git a/.claude/settings.local.json b/.claude/settings.local.json deleted file mode 100644 index ae596e7..0000000 --- a/.claude/settings.local.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "permissions": { - "allow": [ - "Bash(cargo test:*)", - "Bash(cargo build:*)", - "Bash(gh pr:*)", - "Bash(gh api:*)", - "Bash(cargo clippy:*)", - "Bash(cargo update:*)", - "Bash(cargo audit:*)" - ] - } -} From e58eb4501fea24ea2b0c95cdb937cc4103532c80 Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 17 Apr 2026 11:37:02 +0300 Subject: [PATCH 3/6] ignore claude files --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 1f8702f..57cc9e1 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ __pycache__ .DS_Store .ai target -.env \ No newline at end of file +.env +.claude \ No newline at end of file From f3c1af6b96bc4e81e25b60a45610268142a12a65 Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 17 Apr 2026 12:05:36 +0300 Subject: [PATCH 4/6] fix reviewer comments --- Cargo.lock | 82 +++--------------- Cargo.toml | 2 +- build.rs | 1 + src/commands/stacker.rs | 155 ++++++++++++++++++++++++++++++++--- src/transport/grpc_client.rs | 68 +++++++++++---- src/transport/websocket.rs | 77 ++++++++++++----- 6 files changed, 270 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d287ca..b78f7e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -202,7 +202,7 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite 0.28.0", + "tokio-tungstenite", "tower 0.5.2", "tower-layer", "tower-service", @@ -295,7 +295,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_urlencoded", - "thiserror 2.0.17", + "thiserror", "tokio", "tokio-util", "tower-service", @@ -333,12 +333,6 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytes" version = "1.11.1" @@ -1669,7 +1663,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2 0.6.1", - "thiserror 2.0.17", + "thiserror", "tokio", "tracing", "web-time", @@ -1690,7 +1684,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror 2.0.17", + "thiserror", "tinyvec", "tracing", "web-time", @@ -2304,10 +2298,10 @@ dependencies = [ "sysinfo", "tempfile", "tera", - "thiserror 2.0.17", + "thiserror", "tokio", "tokio-test", - "tokio-tungstenite 0.24.0", + "tokio-tungstenite", "tonic", "tonic-build", "tower 0.5.2", @@ -2416,33 +2410,13 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" -[[package]] -name = "thiserror" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" -dependencies = [ - "thiserror-impl 1.0.69", -] - [[package]] name = "thiserror" version = "2.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" dependencies = [ - "thiserror-impl 2.0.17", -] - -[[package]] -name = "thiserror-impl" -version = "1.0.69" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "thiserror-impl", ] [[package]] @@ -2585,9 +2559,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.24.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" dependencies = [ "futures-util", "log", @@ -2595,22 +2569,10 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls", - "tungstenite 0.24.0", + "tungstenite", "webpki-roots 0.26.11", ] -[[package]] -name = "tokio-tungstenite" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d25a406cddcc431a75d3d9afc6a7c0f7428d4891dd973e4d54c56b46127bf857" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite 0.28.0", -] - [[package]] name = "tokio-util" version = "0.7.17" @@ -2827,26 +2789,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tungstenite" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" -dependencies = [ - "byteorder", - "bytes", - "data-encoding", - "http", - "httparse", - "log", - "rand 0.8.5", - "rustls", - "rustls-pki-types", - "sha1", - "thiserror 1.0.69", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.28.0" @@ -2859,8 +2801,10 @@ dependencies = [ "httparse", "log", "rand 0.9.2", + "rustls", + "rustls-pki-types", "sha1", - "thiserror 2.0.17", + "thiserror", "utf-8", ] diff --git a/Cargo.toml b/Cargo.toml index 40d7bb4..b221e64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ uuid = { version = "1", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } serde_yaml = "0.9" futures-util = "0.3" -tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] } +tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] } tonic = { version = "0.12", features = ["tls"] } prost = "0.13" prost-types = "0.13" diff --git a/build.rs b/build.rs index a9c616e..ab76254 100644 --- a/build.rs +++ b/build.rs @@ -1,4 +1,5 @@ fn main() -> Result<(), Box> { + println!("cargo:rerun-if-changed=proto/pipe.proto"); tonic_build::configure() .build_server(false) .build_client(true) diff --git a/src/commands/stacker.rs b/src/commands/stacker.rs index 464424b..14557b4 100644 --- a/src/commands/stacker.rs +++ b/src/commands/stacker.rs @@ -196,6 +196,130 @@ mod trigger_pipe_handler_tests { Some("trigger_pipe requires input_data or source_container") ); } + + #[tokio::test] + async fn handle_trigger_pipe_routes_ws_target() { + // Use a port that is not listening so the WS connection fails fast + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "pipe-ws-1".into(), + input_data: Some(json!({ "key": "value" })), + source_container: None, + source_endpoint: "/".into(), + source_method: "GET".into(), + target_url: Some("ws://127.0.0.1:19999".into()), + target_container: None, + target_endpoint: "/ws-target".into(), + target_method: "POST".into(), + field_mapping: None, + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should return structured result"); + + // WS connection will fail (nothing listening), so we expect a failed status + assert_eq!(result.status, "failed"); + let error = result.error.as_deref().unwrap_or(""); + assert!( + error.contains("WebSocket") || error.contains("Connection refused"), + "expected WS connection error, got: {error}" + ); + } + + #[tokio::test] + async fn handle_trigger_pipe_routes_grpc_target() { + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "pipe-grpc-1".into(), + input_data: Some(json!({ "key": "value" })), + source_container: None, + source_endpoint: "/".into(), + source_method: "GET".into(), + target_url: Some("grpc://127.0.0.1:19998".into()), + target_container: None, + target_endpoint: "/grpc-target".into(), + target_method: "POST".into(), + field_mapping: None, + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should return structured result"); + + // gRPC connection will fail (nothing listening), so we expect a failed status + assert_eq!(result.status, "failed"); + let error = result.error.as_deref().unwrap_or(""); + assert!( + error.contains("gRPC") || error.contains("connection") || error.contains("connect"), + "expected gRPC connection error, got: {error}" + ); + } + + #[tokio::test] + async fn handle_trigger_pipe_routes_grpcs_target() { + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "pipe-grpcs-1".into(), + input_data: Some(json!({ "key": "value" })), + source_container: None, + source_endpoint: "/".into(), + source_method: "GET".into(), + target_url: Some("grpcs://127.0.0.1:19997".into()), + target_container: None, + target_endpoint: "/".into(), + target_method: "POST".into(), + field_mapping: None, + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should return structured result"); + + // grpcs:// should be routed to gRPC transport (fails on connect) + assert_eq!(result.status, "failed"); + let error = result.error.as_deref().unwrap_or(""); + assert!( + error.contains("gRPC") || error.contains("connection") || error.contains("connect"), + "expected gRPC connection error for grpcs://, got: {error}" + ); + } + + #[tokio::test] + async fn handle_trigger_pipe_grpc_rejects_empty_pipe_instance_id() { + let agent_cmd = make_trigger_agent_command(); + let data = TriggerPipeCommand { + deployment_hash: "dep-123".into(), + pipe_instance_id: "".into(), + input_data: Some(json!({ "key": "value" })), + source_container: None, + source_endpoint: "/".into(), + source_method: "GET".into(), + target_url: Some("grpc://127.0.0.1:19996".into()), + target_container: None, + target_endpoint: "/".into(), + target_method: "POST".into(), + field_mapping: None, + trigger_type: "manual".into(), + }; + + let result = handle_trigger_pipe(&agent_cmd, &data) + .await + .expect("trigger_pipe should return structured result"); + + assert_eq!(result.status, "failed"); + let error = result.error.as_deref().unwrap_or(""); + assert!( + error.contains("non-empty"), + "expected empty step_id error, got: {error}" + ); + } } impl std::fmt::Display for ContainerRuntime { @@ -2142,16 +2266,27 @@ async fn handle_trigger_pipe( crate::transport::websocket::ws_send_target(&target_value, &mapped_data) .await .map_err(|e| anyhow::anyhow!(e)) - } else if target_value.starts_with("grpc://") { - let grpc_endpoint = target_value.replacen("grpc://", "http://", 1); - crate::transport::grpc_client::grpc_send_target( - &grpc_endpoint, - &data.pipe_instance_id, - "", - &mapped_data, - ) - .await - .map_err(|e| anyhow::anyhow!(e)) + } else if target_value.starts_with("grpc://") || target_value.starts_with("grpcs://") { + let grpc_endpoint = if target_value.starts_with("grpcs://") { + target_value.replacen("grpcs://", "https://", 1) + } else { + target_value.replacen("grpc://", "http://", 1) + }; + let step_id = data.pipe_instance_id.trim(); + if step_id.is_empty() { + Err(anyhow::anyhow!( + "trigger_pipe gRPC target requires a non-empty pipe_instance_id for step_id" + )) + } else { + crate::transport::grpc_client::grpc_send_target( + &grpc_endpoint, + &data.pipe_instance_id, + step_id, + &mapped_data, + ) + .await + .map_err(|e| anyhow::anyhow!(e)) + } } else { send_trigger_pipe_request(&target_value, &data.target_method, &mapped_data).await } diff --git a/src/transport/grpc_client.rs b/src/transport/grpc_client.rs index dc9b446..973a9cf 100644 --- a/src/transport/grpc_client.rs +++ b/src/transport/grpc_client.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; use serde_json::Value; +use std::time::Duration; use tracing::info; pub mod pipe_proto { @@ -9,6 +10,9 @@ pub mod pipe_proto { use pipe_proto::pipe_service_client::PipeServiceClient; use pipe_proto::{PipeMessage, SubscribeRequest}; +const GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); +const GRPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); + /// Subscribe to a gRPC pipe source and read the first message. pub async fn grpc_fetch_source( endpoint: &str, @@ -16,10 +20,16 @@ pub async fn grpc_fetch_source( step_id: &str, ) -> Result { info!(endpoint, "grpc_fetch_source: connecting"); - let mut client = PipeServiceClient::connect(endpoint.to_string()) + let channel = tonic::transport::Endpoint::from_shared(endpoint.to_string()) + .with_context(|| format!("invalid gRPC endpoint: {endpoint}"))? + .connect_timeout(GRPC_CONNECT_TIMEOUT) + .timeout(GRPC_REQUEST_TIMEOUT) + .connect() .await .with_context(|| format!("gRPC connection failed: {endpoint}"))?; + let mut client = PipeServiceClient::new(channel); + let request = tonic::Request::new(SubscribeRequest { pipe_instance_id: pipe_instance_id.to_string(), step_id: step_id.to_string(), @@ -53,11 +63,18 @@ pub async fn grpc_send_target( data: &Value, ) -> Result<(u16, Value)> { info!(endpoint, "grpc_send_target: connecting"); - let mut client = PipeServiceClient::connect(endpoint.to_string()) + let channel = tonic::transport::Endpoint::from_shared(endpoint.to_string()) + .with_context(|| format!("invalid gRPC endpoint: {endpoint}"))? + .connect_timeout(GRPC_CONNECT_TIMEOUT) + .timeout(GRPC_REQUEST_TIMEOUT) + .connect() .await .with_context(|| format!("gRPC connection failed: {endpoint}"))?; - let payload_struct = json_to_struct(data); + let mut client = PipeServiceClient::new(channel); + + let payload_struct = + json_to_struct(data).with_context(|| "failed to convert payload to gRPC Struct")?; let request = tonic::Request::new(PipeMessage { pipe_instance_id: pipe_instance_id.to_string(), @@ -84,30 +101,45 @@ pub async fn grpc_send_target( // ── Conversion helpers: serde_json ↔ prost_types::Struct ── -fn json_to_struct(value: &Value) -> prost_types::Struct { +fn json_to_struct(value: &Value) -> Result { let fields = match value.as_object() { Some(map) => map .iter() - .map(|(k, v)| (k.clone(), json_to_prost_value(v))) - .collect(), - None => Default::default(), + .map(|(k, v)| Ok((k.clone(), json_to_prost_value(v)?))) + .collect::>()?, + None => { + return Err(anyhow::anyhow!( + "gRPC Struct conversion requires a JSON object, got: {}", + match value { + Value::Array(_) => "array", + Value::String(_) => "string", + Value::Number(_) => "number", + Value::Bool(_) => "bool", + Value::Null => "null", + _ => "unknown", + } + )); + } }; - prost_types::Struct { fields } + Ok(prost_types::Struct { fields }) } -fn json_to_prost_value(value: &Value) -> prost_types::Value { +fn json_to_prost_value(value: &Value) -> Result { use prost_types::value::Kind; let kind = match value { Value::Null => Kind::NullValue(0), Value::Bool(b) => Kind::BoolValue(*b), - Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)), + Value::Number(n) => Kind::NumberValue( + n.as_f64() + .ok_or_else(|| anyhow::anyhow!("number {n} cannot be represented as f64"))?, + ), Value::String(s) => Kind::StringValue(s.clone()), Value::Array(arr) => Kind::ListValue(prost_types::ListValue { - values: arr.iter().map(json_to_prost_value).collect(), + values: arr.iter().map(json_to_prost_value).collect::>()?, }), - Value::Object(_) => Kind::StructValue(json_to_struct(value)), + Value::Object(_) => Kind::StructValue(json_to_struct(value)?), }; - prost_types::Value { kind: Some(kind) } + Ok(prost_types::Value { kind: Some(kind) }) } fn struct_to_json(s: &prost_types::Struct) -> Value { @@ -141,10 +173,18 @@ mod tests { #[test] fn test_json_struct_roundtrip() { let original = serde_json::json!({"name": "test", "count": 42, "active": true}); - let proto = json_to_struct(&original); + let proto = json_to_struct(&original).unwrap(); let back = struct_to_json(&proto); assert_eq!(back["name"], "test"); assert_eq!(back["count"], 42.0); assert_eq!(back["active"], true); } + + #[test] + fn test_json_to_struct_rejects_non_object() { + assert!(json_to_struct(&serde_json::json!("string")).is_err()); + assert!(json_to_struct(&serde_json::json!(42)).is_err()); + assert!(json_to_struct(&serde_json::json!([1, 2])).is_err()); + assert!(json_to_struct(&serde_json::json!(null)).is_err()); + } } diff --git a/src/transport/websocket.rs b/src/transport/websocket.rs index 07dfb5b..39a609f 100644 --- a/src/transport/websocket.rs +++ b/src/transport/websocket.rs @@ -1,9 +1,12 @@ use anyhow::{Context, Result}; use futures_util::{SinkExt, StreamExt}; use serde_json::Value; +use std::time::Duration; use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing::{debug, info, warn}; +const WS_READ_TIMEOUT: Duration = Duration::from_secs(30); + /// Connect to a WebSocket endpoint and read the first message as pipe source data. pub async fn ws_fetch_source(url: &str) -> Result { info!(url, "ws_fetch_source: connecting"); @@ -11,24 +14,49 @@ pub async fn ws_fetch_source(url: &str) -> Result { .await .with_context(|| format!("WebSocket connection failed: {url}"))?; - let (_write, mut read) = ws_stream.split(); + let (mut write, mut read) = ws_stream.split(); - match read.next().await { - Some(Ok(Message::Text(text))) => { - debug!(len = text.len(), "ws_fetch_source: received text"); - serde_json::from_str::(&text) - .with_context(|| "ws_fetch_source: failed to parse JSON") - } - Some(Ok(Message::Binary(bin))) => { - debug!(len = bin.len(), "ws_fetch_source: received binary"); - serde_json::from_slice::(&bin) - .with_context(|| "ws_fetch_source: failed to parse binary JSON") + loop { + let msg = tokio::time::timeout(WS_READ_TIMEOUT, read.next()) + .await + .with_context(|| format!("ws_fetch_source: timed out after {WS_READ_TIMEOUT:?}"))?; + + match msg { + Some(Ok(Message::Text(text))) => { + debug!(len = text.len(), "ws_fetch_source: received text"); + return serde_json::from_str::(&text) + .with_context(|| "ws_fetch_source: failed to parse JSON"); + } + Some(Ok(Message::Binary(bin))) => { + debug!(len = bin.len(), "ws_fetch_source: received binary"); + return serde_json::from_slice::(&bin) + .with_context(|| "ws_fetch_source: failed to parse binary JSON"); + } + Some(Ok(Message::Ping(payload))) => { + debug!(len = payload.len(), "ws_fetch_source: received ping"); + write + .send(Message::Pong(payload)) + .await + .with_context(|| "ws_fetch_source: failed to send pong")?; + } + Some(Ok(Message::Pong(_))) => { + debug!("ws_fetch_source: received pong, ignoring"); + } + Some(Ok(Message::Close(frame))) => { + return Err(anyhow::anyhow!( + "ws_fetch_source: stream closed before data: {frame:?}" + )); + } + Some(Ok(other)) => { + debug!(message = %other, "ws_fetch_source: ignoring non-data frame"); + } + Some(Err(e)) => return Err(anyhow::anyhow!("ws_fetch_source read error: {e}")), + None => { + return Err(anyhow::anyhow!( + "ws_fetch_source: stream closed without data" + )) + } } - Some(Ok(other)) => Ok(serde_json::json!({ "raw": other.to_string() })), - Some(Err(e)) => Err(anyhow::anyhow!("ws_fetch_source read error: {e}")), - None => Err(anyhow::anyhow!( - "ws_fetch_source: stream closed without data" - )), } } @@ -45,7 +73,7 @@ pub async fn ws_send_target(url: &str, data: &Value) -> Result<(u16, Value)> { serde_json::to_string(data).with_context(|| "ws_send_target: failed to serialize")?; write - .send(Message::Text(payload)) + .send(Message::Text(payload.into())) .await .with_context(|| "ws_send_target: failed to send")?; @@ -61,18 +89,25 @@ pub async fn connect_and_stream(ws_url: &str) -> Result<()> { .await .with_context(|| format!("WebSocket streaming connection failed: {ws_url}"))?; - let (_write, mut read) = ws_stream.split(); + let (mut write, mut read) = ws_stream.split(); while let Some(msg) = read.next().await { match msg { Ok(Message::Text(text)) => { debug!(len = text.len(), "stream message received"); } - Ok(Message::Ping(_)) => { - debug!("stream ping received"); + Ok(Message::Ping(payload)) => { + debug!(len = payload.len(), "stream ping received"); + if let Err(e) = write.send(Message::Pong(payload)).await { + warn!(error = %e, "failed to send pong"); + break; + } } - Ok(Message::Close(_)) => { + Ok(Message::Close(frame)) => { info!("stream closed by server"); + if let Err(e) = write.send(Message::Close(frame)).await { + warn!(error = %e, "failed to acknowledge close frame"); + } break; } Err(e) => { From 742cfca8305ee14e8719849a8a56cc843cdb5dda Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 17 Apr 2026 20:56:57 +0300 Subject: [PATCH 5/6] protoc deps --- Cargo.lock | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + build.rs | 3 +++ 3 files changed, 69 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index b78f7e8..a7e8ba6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,6 +1649,70 @@ dependencies = [ "prost", ] +[[package]] +name = "protoc-bin-vendored" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1c381df33c98266b5f08186583660090a4ffa0889e76c7e9a5e175f645a67fa" +dependencies = [ + "protoc-bin-vendored-linux-aarch_64", + "protoc-bin-vendored-linux-ppcle_64", + "protoc-bin-vendored-linux-s390_64", + "protoc-bin-vendored-linux-x86_32", + "protoc-bin-vendored-linux-x86_64", + "protoc-bin-vendored-macos-aarch_64", + "protoc-bin-vendored-macos-x86_64", + "protoc-bin-vendored-win32", +] + +[[package]] +name = "protoc-bin-vendored-linux-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c350df4d49b5b9e3ca79f7e646fde2377b199e13cfa87320308397e1f37e1a4c" + +[[package]] +name = "protoc-bin-vendored-linux-ppcle_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55a63e6c7244f19b5c6393f025017eb5d793fd5467823a099740a7a4222440c" + +[[package]] +name = "protoc-bin-vendored-linux-s390_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dba5565db4288e935d5330a07c264a4ee8e4a5b4a4e6f4e83fad824cc32f3b0" + +[[package]] +name = "protoc-bin-vendored-linux-x86_32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8854774b24ee28b7868cd71dccaae8e02a2365e67a4a87a6cd11ee6cdbdf9cf5" + +[[package]] +name = "protoc-bin-vendored-linux-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b38b07546580df720fa464ce124c4b03630a6fb83e05c336fea2a241df7e5d78" + +[[package]] +name = "protoc-bin-vendored-macos-aarch_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89278a9926ce312e51f1d999fee8825d324d603213344a9a706daa009f1d8092" + +[[package]] +name = "protoc-bin-vendored-macos-x86_64" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81745feda7ccfb9471d7a4de888f0652e806d5795b61480605d4943176299756" + +[[package]] +name = "protoc-bin-vendored-win32" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95067976aca6421a523e491fce939a3e65249bac4b977adee0ee9771568e8aa3" + [[package]] name = "quinn" version = "0.11.9" @@ -2284,6 +2348,7 @@ dependencies = [ "nix", "prost", "prost-types", + "protoc-bin-vendored", "rand 0.8.5", "regex", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index b221e64..04c4701 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ path = "src/main.rs" [build-dependencies] tonic-build = "0.12" +protoc-bin-vendored = "3" [dev-dependencies] assert_cmd = "2.0" diff --git a/build.rs b/build.rs index ab76254..586c11b 100644 --- a/build.rs +++ b/build.rs @@ -1,5 +1,8 @@ fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=proto/pipe.proto"); + // Vendor protoc so builds work without a system-installed protoc + let _protoc = protoc_bin_vendored::protoc_bin_path().expect("vendored protoc not found"); + std::env::set_var("PROTOC", &_protoc); tonic_build::configure() .build_server(false) .build_client(true) From b942b7db0e89bea59391614037efedcae3208b11 Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 17 Apr 2026 23:19:14 +0300 Subject: [PATCH 6/6] clippy blockers fix --- src/commands/stacker.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/commands/stacker.rs b/src/commands/stacker.rs index 14557b4..a878528 100644 --- a/src/commands/stacker.rs +++ b/src/commands/stacker.rs @@ -5474,10 +5474,8 @@ fn extract_html_forms(html: &str, page_path: &str) -> Vec { for attr_cap in attr_re.captures_iter(attrs_str) { match attr_cap[1].to_lowercase().as_str() { "id" => id = attr_cap[2].to_string(), - "name" => { - if id.is_empty() { - id = attr_cap[2].to_string(); - } + "name" if id.is_empty() => { + id = attr_cap[2].to_string(); } "action" => action = attr_cap[2].to_string(), "method" => method = attr_cap[2].to_uppercase(),