feat: add real WebSocket and gRPC streaming transports #90
vsilent merged 8 commits into trydirect:master from feature/streaming-pipes
Conversation
vsilent
commented
Apr 16, 2026
- Replace WS stub with real tokio-tungstenite client (ws_fetch_source, ws_send_target, connect_and_stream)
- Add gRPC client transport (grpc_fetch_source, grpc_send_target) with tonic/prost and JSON↔prost_types conversion
- Add proto/pipe.proto with PipeService (Send + Subscribe RPCs)
- Add build.rs for tonic-build protobuf compilation
- Route ws:// and grpc:// target URLs in handle_trigger_pipe to appropriate streaming transports
- Add tokio-tungstenite, tonic, prost, prost-types dependencies
- All 371 lib tests + 19 integration tests passing
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
This PR implements real streaming transports for the "trigger_pipe" mechanism by adding WebSocket (tokio-tungstenite) and gRPC (tonic/prost) client implementations, plus protobuf generation wiring, and routes ws:// and grpc:// targets to the new transports.
Changes:
- Replaced the WebSocket transport stub with real client send/fetch/stream functions using tokio-tungstenite.
- Added a tonic-based gRPC client transport with a new PipeService proto and JSON ↔ prost_types::Struct conversion helpers.
- Updated trigger-pipe routing to dispatch external targets based on URL scheme (ws://, wss://, grpc://) and added build/dependency plumbing for proto compilation.
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 14 comments.
| File | Description |
|---|---|
| src/transport/websocket.rs | Implements real WS client functions (ws_fetch_source, ws_send_target, connect_and_stream). |
| src/transport/mod.rs | Exposes the new grpc_client module. |
| src/transport/grpc_client.rs | Adds tonic client + JSON/prost-types conversion helpers + a small unit test. |
| src/commands/stacker.rs | Routes external trigger_pipe targets to WS or gRPC transports based on URL prefix. |
| proto/pipe.proto | Defines PipeService with Send and Subscribe RPCs plus message types. |
| build.rs | Adds tonic-build protobuf compilation for pipe.proto. |
| Cargo.toml | Adds tokio-tungstenite/tonic/prost deps and tonic-build build dependency. |
| Cargo.lock | Locks new dependency graph (including additional versions pulled by new deps). |
| CLAUDE.md | Adds repository guidance / agent workflow documentation. |
| .claude/settings.local.json | Adds local tool-permissions configuration for Claude tooling. |
| .claude/agents/tester.md | Adds a "tester" agent spec for local automation. |
| .claude/agents/planner.md | Adds a "planner" agent spec for local automation. |
| .claude/agents/code-reviewer.md | Adds a "code-reviewer" agent spec for local automation. |
```rust
let (_write, mut read) = ws_stream.split();

while let Some(msg) = read.next().await {
    match msg {
        Ok(Message::Text(text)) => {
            debug!(len = text.len(), "stream message received");
        }
        Ok(Message::Ping(_)) => {
            debug!("stream ping received");
        }
        Ok(Message::Close(_)) => {
            info!("stream closed by server");
```
connect_and_stream also drops the write half and only logs pings. Tungstenite does not automatically send Pong responses, so this can cause the server to close an otherwise healthy stream. Keep the write half and send Message::Pong(...) when receiving Message::Ping(...) (and consider handling the Close handshake).
Suggested change:
```diff
-let (_write, mut read) = ws_stream.split();
-while let Some(msg) = read.next().await {
-    match msg {
-        Ok(Message::Text(text)) => {
-            debug!(len = text.len(), "stream message received");
-        }
-        Ok(Message::Ping(_)) => {
-            debug!("stream ping received");
-        }
-        Ok(Message::Close(_)) => {
-            info!("stream closed by server");
+let (mut write, mut read) = ws_stream.split();
+while let Some(msg) = read.next().await {
+    match msg {
+        Ok(Message::Text(text)) => {
+            debug!(len = text.len(), "stream message received");
+        }
+        Ok(Message::Ping(payload)) => {
+            debug!(len = payload.len(), "stream ping received");
+            if let Err(e) = write.send(Message::Pong(payload)).await {
+                warn!(error = %e, "failed to send pong");
+                break;
+            }
+        }
+        Ok(Message::Close(frame)) => {
+            info!("stream closed by server");
+            if let Err(e) = write.send(Message::Close(frame)).await {
+                warn!(error = %e, "failed to acknowledge close frame");
+            }
```
```rust
let kind = match value {
    Value::Null => Kind::NullValue(0),
    Value::Bool(b) => Kind::BoolValue(*b),
    Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
    Value::String(s) => Kind::StringValue(s.clone()),
```
json_to_prost_value converts numbers via as_f64().unwrap_or(0.0), which silently corrupts values it can’t represent (e.g., large integers become 0.0). Prefer failing the conversion with a clear error (propagate Result) or at least preserve intent (e.g., stringify) rather than silently substituting 0.0.
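The fail-instead-of-corrupt behavior the comment asks for can be sketched in pure std; int_to_f64_exact below is a hypothetical helper for illustration, not code from this PR:

```rust
// Hypothetical helper sketching the review's suggestion: convert an integer
// to f64 only when the round-trip is exact; otherwise return an error rather
// than silently substituting a wrong value (as as_f64().unwrap_or(0.0) would).
fn int_to_f64_exact(n: i64) -> Result<f64, String> {
    let f = n as f64;
    // f64 has a 53-bit mantissa, so integers beyond 2^53 may round.
    if f as i64 == n {
        Ok(f)
    } else {
        Err(format!("integer {n} cannot be represented exactly as f64"))
    }
}

fn main() {
    // 42 round-trips exactly; 2^53 + 1 does not and is rejected.
    assert_eq!(int_to_f64_exact(42), Ok(42.0));
    assert!(int_to_f64_exact(9_007_199_254_740_993).is_err());
}
```

In the actual converter this error would be propagated as a Result instead of being swallowed by unwrap_or.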
```rust
if target_value.starts_with("ws://") || target_value.starts_with("wss://") {
    crate::transport::websocket::ws_send_target(&target_value, &mapped_data)
        .await
        .map_err(|e| anyhow::anyhow!(e))
} else if target_value.starts_with("grpc://") {
    let grpc_endpoint = target_value.replacen("grpc://", "http://", 1);
    crate::transport::grpc_client::grpc_send_target(
        &grpc_endpoint,
        &data.pipe_instance_id,
        "",
        &mapped_data,
    )
    .await
    .map_err(|e| anyhow::anyhow!(e))
} else {
    send_trigger_pipe_request(&target_value, &data.target_method, &mapped_data).await
```
New ws:// and grpc:// branches in handle_trigger_pipe are not covered by tests, but this file already has trigger_pipe_handler_tests. Add tests that exercise URL routing for WS and gRPC targets (at least verifying the correct transport function is invoked / correct error handling), so regressions are caught.
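One way to make that routing testable without opening sockets is to factor the scheme dispatch into a pure function; this is a hypothetical refactor sketch (route_target and Transport are not names from the PR):

```rust
// Hypothetical sketch: pull the URL-scheme dispatch out of handle_trigger_pipe
// into a pure function, so the routing decision can be unit-tested directly.
#[derive(Debug, PartialEq)]
enum Transport {
    WebSocket,
    Grpc,
    Http,
}

fn route_target(target: &str) -> Transport {
    if target.starts_with("ws://") || target.starts_with("wss://") {
        Transport::WebSocket
    } else if target.starts_with("grpc://") {
        Transport::Grpc
    } else {
        // Anything else falls through to the existing HTTP path.
        Transport::Http
    }
}

fn main() {
    assert_eq!(route_target("wss://example.com/feed"), Transport::WebSocket);
    assert_eq!(route_target("grpc://example.com:50051"), Transport::Grpc);
    assert_eq!(route_target("https://example.com/api"), Transport::Http);
}
```

Tests against such a function catch routing regressions cheaply; transport-level error handling can then be exercised separately with mock servers.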
```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(false)
        .build_client(true)
        .compile_protos(&["proto/pipe.proto"], &["proto"])?;
```
Adding a build script that compiles protos introduces an implicit dependency on protoc being available in build environments (including CI and cross-compiles). Consider using a vendored protoc (e.g., via protoc-bin-vendored) or clearly documenting/enforcing the requirement; otherwise builds can fail unexpectedly.
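A vendored-protoc build.rs could look like this sketch (assumes protoc-bin-vendored is added under [build-dependencies]; this is not the PR's actual build script, and on edition 2024 set_var must be wrapped in unsafe):

```rust
// Hypothetical build.rs sketch: point tonic-build at a vendored protoc binary
// so the build does not depend on a system-wide protoc install.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    std::env::set_var("PROTOC", protoc_bin_vendored::protoc_bin_path()?);
    tonic_build::configure()
        .build_server(false)
        .build_client(true)
        .compile_protos(&["proto/pipe.proto"], &["proto"])?;
    Ok(())
}
```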
```toml
chrono = { version = "0.4", features = ["serde"] }
serde_yaml = "0.9"
futures-util = "0.3"
tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] }
```
tokio-tungstenite = 0.24 introduces a second tokio-tungstenite/tungstenite version alongside Axum’s dependency (Cargo.lock shows 0.28 as well). This increases binary size and can cause subtle type/version mismatches. Prefer aligning your direct dependency version with the one pulled in by axum (or otherwise justify the pin).
Suggested change:
```diff
-tokio-tungstenite = { version = "0.24", features = ["rustls-tls-webpki-roots"] }
+tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] }
```
```rust
crate::transport::grpc_client::grpc_send_target(
    &grpc_endpoint,
    &data.pipe_instance_id,
    "",
    &mapped_data,
)
.await
.map_err(|e| anyhow::anyhow!(e))
```
The gRPC path always passes an empty step_id to grpc_send_target. If step_id is required for routing/observability on the receiver side, this will break downstream behavior. Consider deriving step_id from existing TriggerPipeCommand fields (or extend the command schema) and fail early if it’s missing rather than sending an empty string.
Suggested change:
```diff
-crate::transport::grpc_client::grpc_send_target(
-    &grpc_endpoint,
-    &data.pipe_instance_id,
-    "",
-    &mapped_data,
-)
-.await
-.map_err(|e| anyhow::anyhow!(e))
+let step_id = data.pipe_instance_id.trim();
+if step_id.is_empty() {
+    Err(anyhow::anyhow!(
+        "trigger_pipe gRPC target requires a non-empty step_id"
+    ))
+} else {
+    crate::transport::grpc_client::grpc_send_target(
+        &grpc_endpoint,
+        &data.pipe_instance_id,
+        step_id,
+        &mapped_data,
+    )
+    .await
+    .map_err(|e| anyhow::anyhow!(e))
+}
```
```rust
} else if target_value.starts_with("grpc://") {
    let grpc_endpoint = target_value.replacen("grpc://", "http://", 1);
```
grpc:// targets are rewritten to http:// unconditionally, which forces plaintext even though the dependency enables TLS. Consider supporting grpcs:// (map to https://) and/or honoring grpc:// vs grpcs:// so operators can choose secure defaults.
Suggested change:
```diff
-} else if target_value.starts_with("grpc://") {
-    let grpc_endpoint = target_value.replacen("grpc://", "http://", 1);
+} else if target_value.starts_with("grpc://")
+    || target_value.starts_with("grpcs://")
+{
+    let grpc_endpoint = if target_value.starts_with("grpcs://") {
+        target_value.replacen("grpcs://", "https://", 1)
+    } else {
+        target_value.replacen("grpc://", "http://", 1)
+    };
```
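The scheme mapping described above can also be isolated in a small helper; this is an illustrative sketch (to_grpc_http_endpoint is hypothetical, not part of the PR):

```rust
// Hypothetical helper: map gRPC URL schemes onto the http/https endpoint form
// tonic expects, so operators can opt in to TLS via grpcs://.
fn to_grpc_http_endpoint(target: &str) -> Option<String> {
    if let Some(rest) = target.strip_prefix("grpcs://") {
        Some(format!("https://{rest}")) // grpcs:// selects TLS
    } else if let Some(rest) = target.strip_prefix("grpc://") {
        Some(format!("http://{rest}")) // plain grpc:// stays plaintext
    } else {
        None // not a gRPC target
    }
}

fn main() {
    assert_eq!(
        to_grpc_http_endpoint("grpc://host:50051"),
        Some("http://host:50051".to_string())
    );
    assert_eq!(
        to_grpc_http_endpoint("grpcs://host:50051"),
        Some("https://host:50051".to_string())
    );
    assert_eq!(to_grpc_http_endpoint("https://host"), None);
}
```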
```rust
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .build_server(false)
        .build_client(true)
        .compile_protos(&["proto/pipe.proto"], &["proto"])?;
```
build.rs doesn’t emit any cargo:rerun-if-changed=... directives. This causes Cargo to rerun the build script more often than necessary, and makes incremental builds slower. Add println!("cargo:rerun-if-changed=proto/pipe.proto"); (and for any imported protos if applicable).
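The requested directives would sit at the top of build.rs; a sketch (not the PR's actual file):

```rust
// Hypothetical build.rs sketch: tell Cargo exactly which inputs should
// trigger a rebuild of the generated protobuf code.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("cargo:rerun-if-changed=proto/pipe.proto");
    // Once any rerun-if directive is emitted, build.rs itself is no longer
    // tracked automatically, so re-add it explicitly.
    println!("cargo:rerun-if-changed=build.rs");
    tonic_build::configure()
        .build_server(false)
        .build_client(true)
        .compile_protos(&["proto/pipe.proto"], &["proto"])?;
    Ok(())
}
```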
```json
"allow": [
    "Bash(cargo test:*)",
    "Bash(cargo build:*)",
    "Bash(gh pr:*)",
    "Bash(gh api:*)",
    "Bash(cargo clippy:*)",
    "Bash(cargo update:*)",
    "Bash(cargo audit:*)"
]
```
.claude/settings.local.json appears to be a local, developer-specific tool-permissions file. Committing it can unintentionally standardize local permissions or leak workflow assumptions. Consider removing it from version control and adding .claude/ (or at least settings.local.json) to .gitignore, or rename to a non-local template (e.g., settings.json.example).
Suggested change:
```diff
-"allow": [
-    "Bash(cargo test:*)",
-    "Bash(cargo build:*)",
-    "Bash(gh pr:*)",
-    "Bash(gh api:*)",
-    "Bash(cargo clippy:*)",
-    "Bash(cargo update:*)",
-    "Bash(cargo audit:*)"
-]
+"allow": []
```
```rust
let (_write, mut read) = ws_stream.split();

match read.next().await {
    Some(Ok(Message::Text(text))) => {
        debug!(len = text.len(), "ws_fetch_source: received text");
        serde_json::from_str::<Value>(&text)
            .with_context(|| "ws_fetch_source: failed to parse JSON")
    }
    Some(Ok(Message::Binary(bin))) => {
        debug!(len = bin.len(), "ws_fetch_source: received binary");
        serde_json::from_slice::<Value>(&bin)
            .with_context(|| "ws_fetch_source: failed to parse binary JSON")
    }
    Some(Ok(other)) => Ok(serde_json::json!({ "raw": other.to_string() })),
    Some(Err(e)) => Err(anyhow::anyhow!("ws_fetch_source read error: {e}")),
    None => Err(anyhow::anyhow!(
        "ws_fetch_source: stream closed without data"
    )),
}
```
ws_fetch_source drops the write half of the socket (let (_write, mut read) = ...), so it cannot respond to Ping frames. If the server sends a Ping before the first data message (common with some gateways), the connection can be closed and this function may return a raw ping/close instead of the expected payload. Keep the write half and explicitly reply with Pong (and ignore/continue on Ping/other control frames until a data message arrives).
Suggested change:
```diff
-let (_write, mut read) = ws_stream.split();
-match read.next().await {
-    Some(Ok(Message::Text(text))) => {
-        debug!(len = text.len(), "ws_fetch_source: received text");
-        serde_json::from_str::<Value>(&text)
-            .with_context(|| "ws_fetch_source: failed to parse JSON")
-    }
-    Some(Ok(Message::Binary(bin))) => {
-        debug!(len = bin.len(), "ws_fetch_source: received binary");
-        serde_json::from_slice::<Value>(&bin)
-            .with_context(|| "ws_fetch_source: failed to parse binary JSON")
-    }
-    Some(Ok(other)) => Ok(serde_json::json!({ "raw": other.to_string() })),
-    Some(Err(e)) => Err(anyhow::anyhow!("ws_fetch_source read error: {e}")),
-    None => Err(anyhow::anyhow!(
-        "ws_fetch_source: stream closed without data"
-    )),
-}
+let (mut write, mut read) = ws_stream.split();
+while let Some(msg) = read.next().await {
+    match msg {
+        Ok(Message::Text(text)) => {
+            debug!(len = text.len(), "ws_fetch_source: received text");
+            return serde_json::from_str::<Value>(&text)
+                .with_context(|| "ws_fetch_source: failed to parse JSON");
+        }
+        Ok(Message::Binary(bin)) => {
+            debug!(len = bin.len(), "ws_fetch_source: received binary");
+            return serde_json::from_slice::<Value>(&bin)
+                .with_context(|| "ws_fetch_source: failed to parse binary JSON");
+        }
+        Ok(Message::Ping(payload)) => {
+            debug!(len = payload.len(), "ws_fetch_source: received ping");
+            write
+                .send(Message::Pong(payload))
+                .await
+                .with_context(|| "ws_fetch_source: failed to send pong")?;
+        }
+        Ok(Message::Pong(payload)) => {
+            debug!(len = payload.len(), "ws_fetch_source: received pong");
+        }
+        Ok(Message::Close(frame)) => {
+            return Err(anyhow::anyhow!(
+                "ws_fetch_source: stream closed before data: {:?}",
+                frame
+            ));
+        }
+        Ok(other) => {
+            debug!(message = %other.to_string(), "ws_fetch_source: ignoring non-data frame");
+        }
+        Err(e) => return Err(anyhow::anyhow!("ws_fetch_source read error: {e}")),
+    }
+}
+Err(anyhow::anyhow!(
+    "ws_fetch_source: stream closed without data"
+))
```
…status into feature/streaming-pipes