feat(stream): real-time NDJSON streaming with throttled display #42
nextlevelshit merged 5 commits into main
Pull request overview
Adds real-time tool activity streaming support and improves human-readable output by throttling stream_activity rendering, while extending NDJSON event schemas for better downstream observability (model/adapter + ETA field consistency).
Changes:
- Extend Claude streaming support: parse NDJSON stream events, enrich tool targets (WebFetch/WebSearch/NotebookEdit + heuristic), and bridge tool_use events into pipeline `stream_activity` events.
- Add display-layer `ThrottledProgressEmitter` (coalesced rendering) and wire it into output emitter factories.
- Extend event schema with `model`/`adapter` on step-start events and ensure `estimated_time_ms` always serializes (no `omitempty`).
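The effect of dropping `omitempty` can be illustrated with a minimal sketch. The two structs below are hypothetical stand-ins, not the real `event.Event`; only the `estimated_time_ms` key is taken from the PR, and everything else is assumed for illustration.

```go
package main

import "encoding/json"

// withOmit and withoutOmit are hypothetical stand-ins for event.Event.
// They differ only in the tag on EstimatedTimeMs.
type withOmit struct {
	State           string `json:"state"`
	EstimatedTimeMs int64  `json:"estimated_time_ms,omitempty"`
}

type withoutOmit struct {
	State           string `json:"state"`
	EstimatedTimeMs int64  `json:"estimated_time_ms"`
}

// marshal returns the JSON encoding of v as a string, or "" on error.
func marshal(v any) string {
	b, err := json.Marshal(v)
	if err != nil {
		return ""
	}
	return string(b)
}
```

With a zero ETA, `marshal(withOmit{State: "running"})` leaves the key out entirely, while `marshal(withoutOmit{State: "running"})` keeps `"estimated_time_ms":0`, so NDJSON consumers can rely on the field being present on every line.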
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| cmd/wave/commands/output.go | Wrap progress emitters with ThrottledProgressEmitter for text/quiet/auto modes |
| cmd/wave/commands/output_test.go | Update factory tests to assert throttle wrapper presence and JSON mode behavior |
| internal/display/throttled_emitter.go | New throttling wrapper for stream_activity display events |
| internal/display/throttled_emitter_test.go | Unit tests for throttling behavior, flushing, and concurrency |
| internal/adapter/claude.go | Extend extractToolTarget with new tools + heuristic fallback |
| internal/adapter/claude_test.go | Add table-driven tests for tool target extraction and stream line parsing |
| internal/pipeline/executor.go | Populate step-start events with Model and Adapter metadata |
| internal/pipeline/executor_test.go | Add executor test verifying stream-activity event bridge behavior |
| internal/event/emitter.go | Add model/adapter fields; remove omitempty from estimated_time_ms |
| internal/event/emitter_test.go | Tests for step-start metadata serialization and ETA field presence |
| specs/026-stream-verbosity/** | New/updated spec artifacts (spec/plan/data-model/contracts/checklists/tasks) documenting feature |
| specs/024-add-verbose-flag/**, specs/025-stream-verbosity/plan.md | Additional spec artifacts included in this PR |
| func (t *ThrottledProgressEmitter) EmitProgress(evt event.Event) error { | ||
| t.mu.Lock() | ||
| defer t.mu.Unlock() | ||
|
|
||
| if evt.State != event.StateStreamActivity { | ||
| // Non-stream-activity events: flush any pending stream event first, | ||
| // then forward immediately. | ||
| if t.pendingStreamActivity != nil { | ||
| pending := t.pendingStreamActivity | ||
| t.pendingStreamActivity = nil | ||
| if t.inner != nil { | ||
| t.inner.EmitProgress(*pending) | ||
| } | ||
| } | ||
| if t.inner != nil { | ||
| return t.inner.EmitProgress(evt) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // stream_activity event handling | ||
| now := time.Now() | ||
|
|
||
| if t.lastStreamActivityTime.IsZero() || now.Sub(t.lastStreamActivityTime) >= t.throttleInterval { | ||
| // First event ever, or throttle window has passed: forward immediately | ||
| t.lastStreamActivityTime = now | ||
| t.pendingStreamActivity = nil | ||
| if t.inner != nil { | ||
| return t.inner.EmitProgress(evt) | ||
| } |
EmitProgress holds the mutex while calling the inner emitter. If the inner emitter blocks (I/O, rendering) this will serialize all callers and can amplify backpressure during bursts; it also makes it easier to introduce deadlocks if an inner implementation ever calls back into the wrapper. A safer pattern is to decide what to emit while holding the lock, release the lock, then call inner.EmitProgress outside the critical section.
| Message: fmt.Sprintf("Starting %s persona in %s", step.Persona, workspacePath), | ||
| CurrentAction: "Initializing", | ||
| Model: persona.Model, | ||
| Adapter: adapterDef.Binary, |
The step-start metadata sets Event.Adapter to adapterDef.Binary (the executable path/name used to run the adapter). For FR-010, consumers typically want a stable adapter identifier (e.g., the manifest adapter key like persona.Adapter: "claude", "opencode"), not an install-specific binary path. Consider emitting the adapter key for Adapter, and (if needed) introduce a separate field for the resolved binary path.
| - Adapter: adapterDef.Binary, | |
| + Adapter: persona.Adapter, |
| ## 3. Event Schema Extension (Step Metadata) | ||
|
|
||
| ### Decision | ||
|
|
||
| Add two new string fields to the `event.Event` struct in `internal/event/emitter.go`: | ||
|
|
||
| ```go | ||
| Model string `json:"model,omitempty"` // Model used for this step | ||
| AdapterType string `json:"adapter_type,omitempty"` // Adapter type (e.g., "claude", "opencode") | ||
| ``` | ||
|
|
||
| Populate these fields in the executor's step-start event (the `"running"` state emission at step begin). The model value comes from `persona.Model` (already available in `AdapterRunConfig.Model`) and the adapter type comes from `adapterDef.Binary` or a normalized adapter name. | ||
|
|
||
| ### Rationale | ||
|
|
research.md still specifies the step-start event field as AdapterType/adapter_type, but the implemented schema (internal/event/emitter.go + executor.go) uses Adapter/adapter. This inconsistency will confuse consumers reading the docs; update research.md to match the actual Event fields (or explicitly mark it as superseded).
| // T003: TestStreamEventCallback — verify OnStreamEvent invocation via parseStreamLine | ||
| func TestStreamEventCallback(t *testing.T) { | ||
| lines := []struct { | ||
| label string | ||
| data []byte | ||
| wantOK bool | ||
| wantType string | ||
| toolName string | ||
| toolIn string | ||
| }{ | ||
| { | ||
| label: "tool_use event returns true with correct fields", | ||
| data: []byte(`{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Bash","input":{"command":"go test ./..."}}],"usage":{"input_tokens":200,"output_tokens":80}}}`), | ||
| wantOK: true, | ||
| wantType: "tool_use", | ||
| toolName: "Bash", | ||
| toolIn: "go test ./...", | ||
| }, | ||
| { | ||
| label: "second tool_use event with Write", | ||
| data: []byte(`{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Write","input":{"file_path":"/tmp/out.txt"}}],"usage":{"input_tokens":50,"output_tokens":20}}}`), | ||
| wantOK: true, | ||
| wantType: "tool_use", | ||
| toolName: "Write", | ||
| toolIn: "/tmp/out.txt", | ||
| }, | ||
| { | ||
| label: "text event returns true with different Type", | ||
| data: []byte(`{"type":"assistant","message":{"content":[{"type":"text","text":"Analyzing the code..."}],"usage":{}}}`), | ||
| wantOK: true, | ||
| wantType: "text", | ||
| }, | ||
| { | ||
| label: "tool_result event returns false", | ||
| data: []byte(`{"type":"tool_result"}`), | ||
| wantOK: false, | ||
| }, | ||
| { | ||
| label: "system event returns true with Type system", | ||
| data: []byte(`{"type":"system","subtype":"init"}`), | ||
| wantOK: true, | ||
| wantType: "system", | ||
| }, | ||
| } | ||
|
|
||
| for _, tc := range lines { | ||
| t.Run(tc.label, func(t *testing.T) { | ||
| evt, ok := parseStreamLine(tc.data) | ||
| if ok != tc.wantOK { | ||
| t.Fatalf("ok = %v, want %v", ok, tc.wantOK) | ||
| } | ||
| if !tc.wantOK { | ||
| return | ||
| } | ||
| if evt.Type != tc.wantType { | ||
| t.Errorf("Type = %q, want %q", evt.Type, tc.wantType) | ||
| } | ||
| if tc.toolName != "" && evt.ToolName != tc.toolName { | ||
| t.Errorf("ToolName = %q, want %q", evt.ToolName, tc.toolName) | ||
| } | ||
| if tc.toolIn != "" && evt.ToolInput != tc.toolIn { | ||
| t.Errorf("ToolInput = %q, want %q", evt.ToolInput, tc.toolIn) | ||
| } | ||
| }) | ||
| } | ||
| } |
TestStreamEventCallback doesn't exercise any OnStreamEvent callback behavior; it only asserts parseStreamLine parsing results. The name/comment are misleading and make it harder to understand what coverage this test provides. Either rename the test to reflect what it actually validates (parseStreamLine parsing), or extend it to cover the adapter's stream scanning path where OnStreamEvent is invoked.
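To cover the callback path rather than parsing alone, a test would need to drive a scan loop of roughly the following shape. This is a sketch under assumptions: `StreamEvent`, `parseLine`, `scanStream`, and `collect` are hypothetical reductions, not the adapter's real functions, and the 2 MiB line limit is an arbitrary choice.

```go
package main

import (
	"bufio"
	"encoding/json"
	"io"
	"strings"
)

// StreamEvent is a simplified stand-in for the adapter's event type.
type StreamEvent struct{ Type, ToolName string }

// rawLine mirrors only the JSON fields used by the fixtures in the test above.
type rawLine struct {
	Type    string `json:"type"`
	Message struct {
		Content []struct {
			Type string `json:"type"`
			Name string `json:"name"`
		} `json:"content"`
	} `json:"message"`
}

// parseLine is an assumed, reduced version of parseStreamLine.
func parseLine(data []byte) (StreamEvent, bool) {
	var raw rawLine
	if err := json.Unmarshal(data, &raw); err != nil {
		return StreamEvent{}, false
	}
	switch raw.Type {
	case "system", "result":
		return StreamEvent{Type: raw.Type}, true
	case "assistant":
		for _, c := range raw.Message.Content {
			if c.Type == "tool_use" {
				return StreamEvent{Type: "tool_use", ToolName: c.Name}, true
			}
			if c.Type == "text" {
				return StreamEvent{Type: "text"}, true
			}
		}
	}
	return StreamEvent{}, false
}

// scanStream reads NDJSON from r and invokes onEvent once per parsed line.
func scanStream(r io.Reader, onEvent func(StreamEvent)) error {
	sc := bufio.NewScanner(r)
	// Raise the per-line limit above bufio's 64 KiB default.
	sc.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)
	for sc.Scan() {
		if evt, ok := parseLine(sc.Bytes()); ok && onEvent != nil {
			onEvent(evt)
		}
	}
	return sc.Err()
}

// collect scans s and returns the event types the callback received, in order.
func collect(s string) ([]string, error) {
	var types []string
	err := scanStream(strings.NewReader(s), func(e StreamEvent) {
		types = append(types, e.Type)
	})
	return types, err
}
```

A callback test can then assert on what `collect` returns for a multi-line input, including lines that should be skipped (malformed JSON, `tool_result`), which a per-line parsing test cannot show.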
| // T004: TestResultAccumulation — verify result extraction from stream | ||
| func TestResultAccumulation(t *testing.T) { | ||
| t.Run("result with standard token counts", func(t *testing.T) { | ||
| line := []byte(`{"type":"result","usage":{"input_tokens":5000,"output_tokens":2000}}`) | ||
| evt, ok := parseStreamLine(line) | ||
| if !ok { | ||
| t.Fatal("expected ok=true for result event") | ||
| } | ||
| if evt.TokensIn != 5000 { | ||
| t.Errorf("TokensIn = %d, want 5000", evt.TokensIn) | ||
| } | ||
| if evt.TokensOut != 2000 { | ||
| t.Errorf("TokensOut = %d, want 2000", evt.TokensOut) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("result parses correctly after tool_use events", func(t *testing.T) { | ||
| // Parse several tool_use events first — shouldn't affect result parsing | ||
| toolLines := [][]byte{ | ||
| []byte(`{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Read","input":{"file_path":"/a.go"}}],"usage":{"input_tokens":10,"output_tokens":5}}}`), | ||
| []byte(`{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Glob","input":{"pattern":"*.go"}}],"usage":{"input_tokens":20,"output_tokens":10}}}`), | ||
| } | ||
| for _, tl := range toolLines { | ||
| _, ok := parseStreamLine(tl) | ||
| if !ok { | ||
| t.Fatal("expected ok=true for tool_use event") | ||
| } | ||
| } | ||
|
|
||
| // Now parse the result event | ||
| resultLine := []byte(`{"type":"result","usage":{"input_tokens":5000,"output_tokens":2000}}`) | ||
| evt, ok := parseStreamLine(resultLine) | ||
| if !ok { | ||
| t.Fatal("expected ok=true for result event") | ||
| } | ||
| if evt.TokensIn != 5000 { | ||
| t.Errorf("TokensIn = %d, want 5000", evt.TokensIn) | ||
| } | ||
| if evt.TokensOut != 2000 { | ||
| t.Errorf("TokensOut = %d, want 2000", evt.TokensOut) | ||
| } | ||
| }) | ||
|
|
||
| t.Run("result with zero tokens", func(t *testing.T) { | ||
| line := []byte(`{"type":"result","usage":{"input_tokens":0,"output_tokens":0}}`) | ||
| evt, ok := parseStreamLine(line) | ||
| if !ok { | ||
| t.Fatal("expected ok=true for result event") | ||
| } | ||
| if evt.TokensIn != 0 { | ||
| t.Errorf("TokensIn = %d, want 0", evt.TokensIn) | ||
| } | ||
| if evt.TokensOut != 0 { | ||
| t.Errorf("TokensOut = %d, want 0", evt.TokensOut) | ||
| } | ||
| }) | ||
| } |
TestResultAccumulation currently only asserts that parseStreamLine parses a single "result" event; it doesn't validate the adapter's actual result accumulation behavior across a full streamed run (multiple events, then final result extraction into AdapterResult). Consider renaming this test to reflect its current scope or adding an integration-style test around ClaudeAdapter.Run/parseOutput to ensure accumulated AdapterResult fields match expectations.
| # Contract: ThrottledProgressEmitter | ||
|
|
||
| ## Interface | ||
| Implements `ProgressEmitter` interface from `internal/display/types.go`. |
This contract references the ProgressEmitter interface as coming from internal/display/types.go, but the actual ProgressEmitter interface is defined in internal/event/emitter.go (and ThrottledProgressEmitter correctly imports internal/event). Updating the contract to the correct location will prevent implementers from importing the wrong package/path.
| - Implements `ProgressEmitter` interface from `internal/display/types.go`. | |
| + Implements `ProgressEmitter` interface from `internal/event/emitter.go`. |
|
|
||
| ## Function Signature | ||
| ```go | ||
| func extractToolTarget(toolName string, input map[string]json.RawMessage) string |
The documented extractToolTarget signature takes input map[string]json.RawMessage, but the actual implementation (internal/adapter/claude.go) takes input json.RawMessage and unmarshals internally. This mismatch can mislead readers and cause accidental API changes; align the contract signature with the code.
| - func extractToolTarget(toolName string, input map[string]json.RawMessage) string | |
| + func extractToolTarget(toolName string, input json.RawMessage) string |
|
|
||
| 2. **Generic heuristic fallback for unrecognized tools.** After the explicit `switch` cases, check the tool's input JSON for well-known field names in priority order: | ||
| ``` | ||
| file_path > url > pattern > command > query > notebook_path > path > description |
The heuristic field priority list here includes path and description, but the implemented heuristic (internal/adapter/claude.go) and the downstream contract/tasks list only file_path, url, pattern, command, query, notebook_path. Align this list with the implemented/contracted behavior to avoid readers assuming additional fields are supported.
| - file_path > url > pattern > command > query > notebook_path > path > description | |
| + file_path > url > pattern > command > query > notebook_path |
| case OutputFormatText: | ||
| progress := display.NewBasicProgressDisplayWithVerbose(cfg.Verbose) | ||
| + throttled := display.NewThrottledProgressEmitter(progress) | ||
| return EmitterResult{ | ||
| - Emitter: event.NewProgressOnlyEmitter(progress), | ||
| - Progress: progress, | ||
| + Emitter: event.NewProgressOnlyEmitter(throttled), | ||
| + Progress: throttled, | ||
| Cleanup: func() {}, | ||
| } | ||
|
|
CreateEmitter now returns Progress as *display.ThrottledProgressEmitter. Downstream code (e.g., cmd/wave/commands/run.go) type-asserts result.Progress to *display.BubbleTeaProgressDisplay to configure the UI (deliverable tracker); with the wrapper this assertion will never succeed, causing a functional regression in BubbleTea mode. Consider returning the inner progress display in EmitterResult.Progress (while still passing the throttled wrapper to the event emitter), or exposing an unwrap/accessor so callers can reach the underlying display when needed.
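The accessor option can be sketched as follows. All types here are stand-ins for the real `display` and `event` types, and `Unwrap` and `unwrapAll` are hypothetical names; the PR does not define them.

```go
package main

// Event and ProgressEmitter are simplified stand-ins for the internal/event types.
type Event struct{ State string }

type ProgressEmitter interface{ EmitProgress(Event) error }

// bubbleTeaDisplay stands in for *display.BubbleTeaProgressDisplay.
type bubbleTeaDisplay struct{ name string }

func (d *bubbleTeaDisplay) EmitProgress(Event) error { return nil }

// throttledEmitter stands in for *display.ThrottledProgressEmitter.
type throttledEmitter struct{ inner ProgressEmitter }

func (t *throttledEmitter) EmitProgress(e Event) error { return t.inner.EmitProgress(e) }

// Unwrap is a hypothetical accessor exposing the wrapped display.
func (t *throttledEmitter) Unwrap() ProgressEmitter { return t.inner }

// unwrapAll peels off any number of wrappers that provide Unwrap and
// returns the innermost emitter.
func unwrapAll(p ProgressEmitter) ProgressEmitter {
	for {
		u, ok := p.(interface{ Unwrap() ProgressEmitter })
		if !ok {
			return p
		}
		p = u.Unwrap()
	}
}
```

A caller such as `run.go` would then assert on `unwrapAll(result.Progress)` instead of `result.Progress`. Returning the inner display in `EmitterResult.Progress` is the simpler fix if no caller needs the wrapper itself.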
| if evt.State != event.StateStreamActivity { | ||
| // Non-stream-activity events: flush any pending stream event first, | ||
| // then forward immediately. | ||
| if t.pendingStreamActivity != nil { | ||
| pending := t.pendingStreamActivity | ||
| t.pendingStreamActivity = nil | ||
| if t.inner != nil { | ||
| t.inner.EmitProgress(*pending) | ||
| } | ||
| } | ||
| if t.inner != nil { | ||
| return t.inner.EmitProgress(evt) |
When flushing a pending stream_activity on a non-stream event, the code emits the pending event but does not update lastStreamActivityTime. This can violate the throttle guarantee (you may emit a flushed stream_activity and then emit another stream_activity less than throttleInterval later). Flushing should count as an emission: update lastStreamActivityTime accordingly, and consider propagating/handling any error returned by the inner emitter instead of ignoring it.
Add Implementation Scope section identifying existing vs. new work, specify throttling architecture (ThrottledProgressEmitter in display layer), extend tool target extraction with generic heuristic fallback, phase ETA data source, and confirm callback-based event bridge pattern.
Spec evolution from 024-add-verbose-flag through 025/026-stream-verbosity with research, data model, plan, tasks, checklists, and contracts.
Replace --output-format json with stream-json for real-time NDJSON streaming of tool calls during pipeline execution.
- Add parseStreamLine for NDJSON event parsing (system, tool_use, text, result)
- Extend extractToolTarget with WebFetch/WebSearch/NotebookEdit + generic heuristic
- Add stream_activity event bridge in executor with pipeline-enriched metadata
- Add Model and Adapter fields to step-start events (FR-010)
- Fix EstimatedTimeMs to always serialize (remove omitempty for ETA field)
- Add ThrottledProgressEmitter for 1s coalesced display rendering
- Integrate throttle layer across all output format emitters
daaaca9 to 21355ed
The bubbletea progress display was still actively rendering with cursor repositioning while deliverables were printed to stderr, causing garbled output. Now cleanup runs explicitly before post-run output, with sync.Once ensuring the deferred safety-net call is a no-op.
Raise parse-time limits from 60/80 to 200 chars so data isn't discarded before reaching the display layer. Display now computes available space from actual terminal width instead of hardcoded limits.
feat(stream): real-time NDJSON streaming with throttled display
… live updates
- Rich input display: GitHub/GitLab URLs parsed into "PR #695" / "Issue #42" instead of raw URLs, with forge-agnostic heuristics
- Header: run ID inline after title, description with proper spacing
- Step details panel positioned between IO and tab bar (not below tabs)
- Flattened containers: removed nested borders from tabcontent
- Auto-refresh: polling fallback every 5s when SSE unavailable
- Running step gantt bar expands over time (animated width)
- Humanized JSON keys in smart renderer (snake_case → Title Case)
- Removed clip-path shapes on gates/conditionals (visual noise)
- Fixed OUT card byte formatting (formatBytes helper)
- Title-cased failure class badges
- Shortened run IDs with tooltip for full value
…iff, hover states
- Pending steps in completed pipelines now show as "skipped" (not "pending")
- Step prompt rendered as markdown (headers, code blocks, bold) via marked.js
- Changed Files: split-pane layout (file tree left, diff viewer right)
- Skipped/pending steps: lighter on hover/focus for readability
- Minimal steps (no data from old runs): collapsed with reduced opacity
- Tab content now renders BELOW tab bar (was incorrectly above)
- Step config panel separate from tab content (both visible simultaneously)
- Rich input: forge URL parsing into "PR #695" / "Issue #42"
- Humanized JSON keys in smart renderer
Summary
- Add `parseStreamLine` to parse real-time NDJSON events from Claude Code's `--output-format stream-json`, handling system, tool_use, text, and result event types
- Extend `extractToolTarget` with explicit support for WebFetch, WebSearch, NotebookEdit and a generic heuristic fallback for unknown tools
- Use the `OnStreamEvent` callback in the pipeline executor to emit pipeline-enriched `stream_activity` events with step/persona context
- Add `Model` and `Adapter` fields to step-start events (FR-010) for downstream consumers to identify which model and adapter are running
- Add `ThrottledProgressEmitter` (1s coalesced rendering) to prevent display flooding during rapid tool call bursts, integrated across all output format emitters
- Remove `omitempty` from `EstimatedTimeMs` so zero values serialize consistently in NDJSON output

Spec
See `specs/026-stream-verbosity/` for the full specification, plan, data model, tasks, checklists, and contracts.

Test plan
- `go test -race ./...` (0 failures)
- `internal/adapter/claude_test.go`: Table-driven tests for `extractToolTarget` (13 cases including edge cases), `parseStreamLine` (8 event type cases + 1MB stress test), stream event callback verification, result accumulation, and mid-stream termination safety
- `internal/event/emitter_test.go`: Step-start metadata serialization (model/adapter fields present/absent), ETA field always-present validation
- `internal/pipeline/executor_test.go`: Stream activity event bridge integration test verifying pipeline-enriched metadata, filtering of non-tool_use and empty-ToolName events
- `internal/display/throttled_emitter_test.go` + `tests/unit/display/`: Throttle interval enforcement, pass-through for non-stream events, pending event flush on lifecycle transitions, nil inner safety, concurrent access with race detector
- `cmd/wave/commands/output_test.go`: All emitter factories verified to produce `ThrottledProgressEmitter` wrappers (text, quiet, auto TTY/non-TTY modes)

Known limitations
- `--output-format stream-json` adapter flag integration (actually switching Claude Code from `json` to `stream-json`) is not yet wired; this change adds the parsing and display infrastructure. The flag switch itself is a follow-up.
- The throttle interval can be set through `NewThrottledProgressEmitterWithInterval` but is not exposed through CLI flags yet.