🔧 fix: Capture anthropic streaming with dedicated logic #192
jpmcb merged 3 commits into papercomputeco:main
Conversation
| Filename | Overview |
|---|---|
| pkg/capture/anthropic.go | New Anthropic reducer: correctly classifies content-type to route SSE vs one-shot, uses io.TeeReader for zero-copy client forwarding, and handles partial captures gracefully by returning diagnostic metadata instead of errors. |
| pkg/capture/anthropic_state.go | Solid SSE state machine: index-keyed block map handles out-of-order events, input_json_delta fragments are correctly assembled at content_block_stop, and all partial-capture scenarios (mid-stream error, truncated EOF, malformed tool input) are surfaced via Extra rather than swallowed. |
| pkg/capture/reducer.go | Clean registry pattern with RWMutex for thread safety; panic-on-duplicate registration gives loud init-time failure for build bugs; ErrUnknownProvider wraps correctly for errors.Is. |
| ingest/ingest.go | Good sentinel error / status-code split (400/422/502) for single-turn ingest. handleBatchIngest is missing all Prometheus metric calls introduced by this PR, creating a monitoring blind spot; it also returns 202 even when 0 of N turns succeed. |
| ingest/metrics.go | Per-server registry avoids global Prometheus state leaking between test runs; closed IngestResult enum prevents label typos; ExponentialBucketsRange for body-byte histogram is well-sized for expected 256 B–16 MB range. |
| proxy/proxy.go | handleSSEStreamViaCapture correctly tees httpResp.Body to pw so the client sees data in real-time while capture.Reduce assembles the canonical response; context.Background() is appropriate since the goroutine outlives the fiber handler. |
| pkg/capture/anthropic_events.go | Wire-shape types closely mirror the Anthropic docs streaming format; unmarshalStrict wraps errors with event type context for actionable operator messages; oneshotResponse duplicates anthropic/types.go intentionally to keep pkg/capture free of provider dependency. |
| ingest/contract_test.go | DescribeTable covers the 400/422/202 status matrix for single-turn ingest; Consistently assertion for the no-row cases prevents false positives from async workers. |
| pkg/capture/canonical_equivalence_test.go | Equivalence test correctly uses merkle.NewNode (the actual production hash path) rather than field comparison, covering text, tool_use, and thinking fixture pairs. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant Proxy
    participant Upstream as Anthropic API
    participant Capture as pkg/capture
    participant Worker as Worker Pool
    Client->>Proxy: POST /v1/messages (stream:true)
    Proxy->>Upstream: Forward request (context.Background())
    Upstream-->>Proxy: 200 text/event-stream
    Proxy->>Proxy: io.TeeReader(httpResp.Body, pw)
    note over Proxy: goroutine: handleSSEStreamViaCapture
    loop SSE events (tee: client & reducer)
        Upstream-->>Proxy: SSE frame (message_start / content_block_* / message_stop)
        Proxy-->>Client: Forward raw SSE chunk (via pw)
        Proxy->>Capture: dispatchStreamEvent → turnState
    end
    Capture-->>Proxy: *llm.ChatResponse (finalize)
    Proxy->>Worker: Enqueue(Job{Req, Resp})
    note over Proxy: defer pw.Close() → EOF to client
```
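The core of the diagram is the tee: the reducer, not the client loop, drives reads from the upstream body. A minimal sketch of that shape, assuming a standalone `reduce` function and `Job` type, since the real signatures in pkg/capture and the proxy aren't shown on this page:

```go
package main

import (
	"context"
	"io"
	"log"
	"net/http"
)

// Job stands in for the proxy worker-pool job type (hypothetical shape).
type Job struct{ Resp any }

// reduce stands in for capture.Reduce; the real signature isn't shown on
// this page, so this one is an assumption.
func reduce(ctx context.Context, r io.Reader) (any, error) {
	_, err := io.ReadAll(r) // a real reducer parses SSE frames as it reads
	return nil, err
}

func handleSSEStreamViaCapture(httpResp *http.Response, pw *io.PipeWriter, enqueue func(Job)) {
	go func() {
		defer pw.Close() // client sees EOF once upstream is drained

		// TeeReader: every byte the reducer reads is first copied to the
		// client pipe, so forwarding stays real-time.
		tee := io.TeeReader(httpResp.Body, pw)

		// context.Background() because this goroutine outlives the handler.
		resp, err := reduce(context.Background(), tee)
		if err != nil {
			log.Printf("capture: reduce failed: %v", err)
			return
		}
		enqueue(Job{Resp: resp})
	}()
}

func main() {} // sketch only; wiring a live proxy is out of scope here
```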
Comments Outside Diff (1)
- ingest/ingest.go, lines 283-285: **Fully-rejected batch still returns HTTP 202**

  When every turn in a batch fails `processTurn`, `result.Accepted == 0` and `result.Rejected == len(payload.Turns)`, but the handler still returns `fiber.StatusAccepted` (202). Callers checking only the HTTP status code cannot distinguish "all turns stored" from "all turns silently discarded." The partial-failure test in `ingest_test.go` asserts 202 for a mix, which is reasonable, but a total failure is semantically different and would benefit from a 4xx (e.g., 422) so callers that don't parse the body get an unambiguous signal.
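A minimal sketch of that suggestion, using the field names the comment quotes; the real handler in ingest/ingest.go isn't shown on this page, so treat this as illustrative:

```go
package ingest

import "github.com/gofiber/fiber/v2"

// batchResult mirrors the fields the review comment quotes; the real
// struct in ingest/ingest.go may differ.
type batchResult struct {
	Accepted int `json:"accepted"`
	Rejected int `json:"rejected"`
}

// respondBatch is a hypothetical helper showing the suggested fix: a
// fully-rejected batch returns 422, anything else keeps the existing 202.
func respondBatch(c *fiber.Ctx, result batchResult, totalTurns int) error {
	if totalTurns > 0 && result.Accepted == 0 && result.Rejected == totalTurns {
		// Status-only callers get an unambiguous failure signal.
		return c.Status(fiber.StatusUnprocessableEntity).JSON(result)
	}
	return c.Status(fiber.StatusAccepted).JSON(result)
}
```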
Reviews (1): Last reviewed commit: "🧹 chore: route Anthropic streaming thro..."
```diff
@@ -159,21 +284,47 @@ func (s *Server) handleBatchIngest(c *fiber.Ctx) error {
 	return c.Status(fiber.StatusAccepted).JSON(result)
 }
```
**Batch ingest silently skips all Prometheus metrics**

`handleBatchIngest` never calls `s.metrics.ObserveWrite(...)`, `s.metrics.ObserveDAGLatency(...)`, or `s.recordProcessTurnError(...)`. Every turn accepted or rejected via the batch endpoint is invisible to the Prometheus scrape, which will cause dashboards and alerts built on `tapes_ingest_writes_total` to under-count actual traffic whenever the batch path is used.
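For illustration, where those calls could land in the batch loop. The method signatures and field shapes below are guesses from the names in the comment, not the actual ingest API:

```go
package ingest

import "time"

// Stubs standing in for the real ingest types; shapes are assumptions
// based on names quoted in the review comment.
type Turn struct{ Provider string }

type Metrics interface {
	ObserveWrite(provider, outcome string)
	ObserveDAGLatency(d time.Duration)
}

type Server struct{ metrics Metrics }

func (s *Server) processTurn(t Turn) error         { return nil }
func (s *Server) recordProcessTurnError(err error) {}

// batchWithMetrics shows where the missing metric calls would go in the
// batch loop, mirroring what the single-turn path already does.
func (s *Server) batchWithMetrics(turns []Turn) (accepted, rejected int) {
	for _, turn := range turns {
		start := time.Now()
		if err := s.processTurn(turn); err != nil {
			s.recordProcessTurnError(err) // classify and count the failure
			s.metrics.ObserveWrite(turn.Provider, "rejected")
			rejected++
			continue
		}
		s.metrics.ObserveWrite(turn.Provider, "accepted")
		s.metrics.ObserveDAGLatency(time.Since(start)) // same histogram as single-turn
		accepted++
	}
	return accepted, rejected
}
```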
jpmcb left a comment:
Very nice work on this: tricky stuff. Just one architecture comment about relying on `init` and global registry state.
```go
func init() {
	Register(ProviderAnthropic, NewAnthropicReducer())
}
```
We shouldn't use `init`: when debugging or tracking down issues, it's really hard to follow what's happening when a "magic" `init` fires from a package, and you end up needing global async state mechanisms (like the registry) to track it. `init` can also make testing difficult since it runs as soon as the package is imported.
We should simplify and just expect the caller to handle it in their call loops.
This would also eliminate the need for a package-scoped registry of reducers with an RW mutex. The proxy or other consumers could just handle it like:
```go
anthReducer := capture.NewAnthropicReducer()
openAIReducer := capture.NewOpenAIReducer()

// etc. etc. - inside loop of handling SSE messages
anthReducer.Reduce()
openAIReducer.Reduce()
```
or if we needed to be fancy, could just have it managed in a slice of reducers that the SSE handler manages itself:
```go
// during proxy bootstrapping ...
reducers := []capture.Reducer{}
reducers = append(reducers, capture.NewAnthropicReducer())
reducers = append(reducers, capture.NewOpenAIReducer())

// etc. etc. - inside loop of handling SSE messages
for _, r := range reducers {
	r.Reduce()
}
```
The only case where `init` can be really useful is if we provide some sort of "package" mechanism for people to bring their own reducer that automatically gets init'd / imported ... but I can't imagine we'll need that since there are a fairly limited number of LLM providers and most have aligned with OpenAI's API shape.
This is great! I've pushed an update to refactor to avoid the init path.
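For illustration, a minimal sketch of the explicit-construction shape the refactor lands on (per the commit message below, each consumer holds its own provider→reducer map). The `Reducer` method set here is an assumption; the real interface in pkg/capture isn't shown on this page:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Reducer approximates pkg/capture's interface; the real method set is
// not shown on this page, so this signature is an assumption.
type Reducer interface {
	Reduce(r io.Reader) (any, error)
}

// anthropicReducer is a stub standing in for capture.NewAnthropicReducer().
type anthropicReducer struct{}

func (anthropicReducer) Reduce(r io.Reader) (any, error) {
	return io.ReadAll(r) // the real reducer walks SSE frames into a ChatResponse
}

func main() {
	// Each consumer owns its dispatch table: no init(), no global
	// registry, no RWMutex. Unknown providers fail loudly at the call site.
	reducers := map[string]Reducer{
		"anthropic": anthropicReducer{},
	}

	provider := "anthropic"
	r, ok := reducers[provider]
	if !ok {
		fmt.Printf("no reducer for provider %q\n", provider)
		return
	}
	out, _ := r.Reduce(strings.NewReader("event: message_stop\n\n"))
	fmt.Printf("reduced %d bytes\n", len(out.([]byte)))
}
```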
Force-pushed 3c2963f to 9a6fc0e
For four weeks the cloud capture sidecar silently rejected every streaming response. The hand-rolled reducer there had three compounding bugs — body chunks overwriting instead of appending, dispatch firing on the first SSE frame instead of EndOfStream, and json.Marshal errors swallowed into empty POST bodies. The local proxy had its own working reducer the whole time, so neither side saw the other's regression. Lifting the local proxy's logic into a shared library means the cloud sidecar (and any future capture site) can't drift again.

pkg/capture introduces a Reducer interface, an Anthropic Messages reducer that walks the full streaming wire format (message_start → content_block_start/delta/stop → message_delta → message_stop, plus ping keep-alives and mid-stream error events), and the LimitedBuffer-free in-memory state machine that produces a canonical *llm.ChatResponse on end-of-stream. Callers construct reducers explicitly via NewAnthropicReducer (no init() side effects, no global registry); each consumer holds its own provider→reducer map so dispatch is visible at the call site.

input_json_delta fragments are concatenated per-block and parsed on content_block_stop so tool_use blocks carry a structured ToolInput. Thinking blocks accumulate thinking_delta + signature_delta into ContentBlock.Thinking / ThinkingSignature (those fields are added to the ContentBlock type here, and the existing non-streaming anthropic.ParseResponse gets a matching case so both paths represent thinking content identically).

Partial captures — mid-stream error, EOF before message_stop, malformed tool_input JSON — return a ChatResponse with diagnostic metadata in Extra (error, incomplete, tool_input_parse_errors) rather than an error. Callers see what happened instead of silence. ToolInput stays nil on parse failure so downstream consumers checking ToolInput != nil don't mistake a broken payload for a valid tool call.

Per-turn memory scales with the upstream's output and is NOT bounded inside the reducer. Anthropic's current hard output ceilings (128K tokens for Opus 4.7, model-dependent otherwise) put realistic plain-text turns under ~5 MB on the wire; tool-use turns with large input_json_delta payloads can push higher. Callers that need a hard ceiling should impose one themselves; sidecar deployments track turn size via metrics rather than enforcing a cap so the full tape is preserved.

Streamed and non-streamed captures of the same turn must produce the same merkle node so the content-addressed DAG dedups them. A DescribeTable golden test across three fixture pairs — text, tool_use, thinking — builds a merkle.Bucket from each reduction (the exact projection proxy/worker/pool.go makes in production) and asserts the two sides produce identical node hashes via merkle.NewNode. No separate canonical encoder needed — node.go's jsontext.Value.Canonicalize (RFC 8785 JCS) is the one canonicalization in the codebase. stop_sequence is surfaced via Extra; CreatedAt is set at finalize time in both paths so the streamed and non-streamed envelopes have consistent time semantics (CreatedAt lives outside the Bucket, so it doesn't affect the hash).

The fixtures under pkg/capture/testdata/anthropic/ are hand-crafted against the streaming specification; they exercise the state machine against the wire format as documented, not against recorded production traffic. A deliberate refresh against a sandbox key is a manual follow-up when someone has access — not promised by this change.
Signed-off-by: Matt Yeazel <matt@papercompute.com>
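The fragment-assembly behavior described above (concatenate input_json_delta per block, parse once at content_block_stop, keep ToolInput nil on failure) is the subtle part of the state machine. A minimal self-contained sketch with invented names; the real types in pkg/capture/anthropic_state.go are not shown on this page:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// blockState is a hypothetical per-index accumulator; the real struct in
// anthropic_state.go is keyed the same way (by block index) but its
// fields are assumptions here.
type blockState struct {
	jsonFrags strings.Builder // raw input_json_delta fragments, in arrival order
	ToolInput map[string]any  // parsed on content_block_stop; nil on failure
	ParseErr  error
}

// onInputJSONDelta appends one fragment; fragments are NOT valid JSON on
// their own, so nothing is parsed here.
func (b *blockState) onInputJSONDelta(partial string) {
	b.jsonFrags.WriteString(partial)
}

// onContentBlockStop parses the concatenated fragments exactly once.
// ToolInput stays nil on failure, so consumers checking ToolInput != nil
// can't mistake a broken payload for a valid tool call.
func (b *blockState) onContentBlockStop() {
	raw := b.jsonFrags.String()
	if raw == "" {
		return
	}
	var in map[string]any
	if err := json.Unmarshal([]byte(raw), &in); err != nil {
		b.ParseErr = err // surfaced via Extra rather than returned as an error
		return
	}
	b.ToolInput = in
}

func main() {
	b := &blockState{}
	b.onInputJSONDelta(`{"query": "wea`)
	b.onInputJSONDelta(`ther in SF"}`)
	b.onContentBlockStop()
	fmt.Printf("tool input: %v\n", b.ToolInput)
}
```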
…s query

Every ingest failure used to collapse to HTTP 400 — malformed envelope, unknown provider, worker-queue saturation, all indistinguishable in the access log. That collapse is one of the mechanisms that kept the four-week capture regression invisible.

Splits the handler into three failure classes via sentinel errors: 400 for malformed envelope, 422 for unprocessable turn (unknown provider, parse failure), 502 for downstream (worker queue full, DAG outage). errors.Is-based classification means new call sites can't accidentally regress.

Lands a Prometheus metric set on a fresh per-server registry with a scrapeable /metrics endpoint mounted next to /ping: writes_total by provider and outcome, dag_write_seconds histogram, worker queue-depth gauge, body_bytes histogram. The IngestResult enumeration is closed so dashboards can't split on a typo. A contract_test.go DescribeTable enumerates the full status-code × shape matrix with post-condition assertions on the in-memory driver — adding a new status transition forces adding a row.

Also adds GET /v1/ingest/nodes?agent= as a minimal verification surface for the streaming-capture e2e and the staging canary. Filters driver.List by Bucket.AgentName, returns a count+items envelope. O(N) and not intended as a general-purpose query surface — the tapes-api remains the right place for rich reads — but enough to confirm "a row for this canary run landed" without standing up the full query stack.

Signed-off-by: Matt Yeazel <matt@papercompute.com>
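A minimal sketch of the sentinel-error classification this commit describes. The sentinel names below are invented; only the 400/422/502 mapping and the errors.Is usage come from the text:

```go
package ingest

import (
	"errors"

	"github.com/gofiber/fiber/v2"
)

// Hypothetical sentinels; the real names in ingest/ingest.go aren't shown
// on this page, but the three failure classes they drive are.
var (
	ErrMalformedEnvelope = errors.New("ingest: malformed envelope")
	ErrUnprocessableTurn = errors.New("ingest: unprocessable turn")
	ErrDownstream        = errors.New("ingest: downstream unavailable")
)

// statusFor maps a processing error onto the three failure classes via
// errors.Is, so wrapped errors from new call sites classify correctly
// instead of regressing to a blanket 400.
func statusFor(err error) int {
	switch {
	case errors.Is(err, ErrMalformedEnvelope):
		return fiber.StatusBadRequest // 400
	case errors.Is(err, ErrUnprocessableTurn):
		return fiber.StatusUnprocessableEntity // 422
	case errors.Is(err, ErrDownstream):
		return fiber.StatusBadGateway // 502
	default:
		return fiber.StatusInternalServerError
	}
}
```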
…roxy

The local proxy had a bespoke Anthropic SSE reducer in proxy.go since long before this work. It was never broken — it's the implementation that didn't drift. Lifting that logic into pkg/capture (the prior commit) gives the cloud sidecar a place to consume the same code, and this commit closes the loop by retiring the local copy: when the resolved provider has a reducer registered in pkg/capture (today: anthropic only), handleSSEStream tees the upstream body through capture.Reduce on end-of-stream. The client pipe still receives chunks as they arrive — we stream directly into Reduce rather than buffering the full body into an intermediate []byte first. OpenAI and Ollama stay on the legacy in-file extraction helpers until they migrate into the shared library too.

The Anthropic branches in extractContentFromJSON / extractUsageFromSSE were dead once anthropic routed through capture; they're deleted here and the matching proxy_usage_test.go cases retired (coverage moved into pkg/capture's reducer suite). No user-visible behavior change. proxy_capture_test.go asserts that an Anthropic SSE stream routed through the proxy produces both verbatim chunk forwarding AND a canonical assistant node in the DAG. A split-TCP-write regression test covers the legacy reader for events that straddle multiple flushes.

Signed-off-by: Matt Yeazel <matt@papercompute.com>
Force-pushed 9a6fc0e to 03b33ec
No docs update needed. This PR fixes Anthropic streaming bugs and refactors SSE handling into a shared library.

PR #192 was merged: 🔧 fix: Capture anthropic streaming with dedicated logic
Summary
Streaming responses from Anthropic Messages don't quite work as expected in non-local environments. The cloud sidecar reducer had three compounding bugs — body chunks overwriting instead of appending, dispatch firing on the first SSE frame instead of `EndOfStream`, and a `json.Marshal` error being swallowed into an empty POST body. The local proxy has its own working reducer, but the two capture paths never shared code, so local would work and the cloud path didn't.

This PR fixes the regression and prevents its recurrence by consolidating streaming reduction into one library and tightening ingest visibility:

- `pkg/capture` (new): Shared streaming-reduction library. Anthropic is the first provider implemented; the state machine walks the full Messages wire format — text, `tool_use` with `input_json_delta` reassembly, extended thinking with signature, ping keep-alives, mid-stream `error` events, EOF before `message_stop`.
- `tapes-ingest` hygiene: Every ingest failure used to collapse to HTTP 400. Splits responses into 400 / 422 / 502 via sentinel errors so we can distinguish envelope errors from unprocessable turns from downstream outages. Adds a Prometheus metric set on `/metrics`, a `GET /v1/ingest/nodes?agent=` verification surface for e2e tests, and a `contract_test.go` `DescribeTable` enumerating the full status-code × shape matrix.
- Local proxy: Routes Anthropic SSE through `capture.Reduce`, so both local and cloud paths use the same code.

TL;DR is we now look for the full stream before trying to create the turn so that it will successfully parse into the data structure we expect. Since different providers will likely have different logic/implementations, this breaks the code out into a `capture` package that can be used for each provider.

Testing

- `GOEXPERIMENT=jsonv2 go test -race -count=1 ./pkg/capture/...` (24 specs: state machine across text / tool_use / thinking / mid-stream error / truncated stream, registry thread safety, canonical equivalence across 3 fixture pairs)
- `GOEXPERIMENT=jsonv2 go test -race -count=1 ./ingest/...` (status-code matrix `DescribeTable`, Prometheus scrape assertions, `GET /v1/ingest/nodes?agent=` filter + empty-result tests)
- `GOEXPERIMENT=jsonv2 go test -race -count=1 ./proxy/...` (Anthropic-via-capture end-to-end, split-TCP-write regression for the legacy SSE reader, existing OpenAI/Ollama suites unchanged)
- `GOEXPERIMENT=jsonv2 go build ./...` clean