feat(tracing): Add background queue for async span processing#303
Merged
Conversation
SGPSyncTracingProcessor and SGPAsyncTracingProcessor accumulated spans in the self._spans dict on every request but never removed them: on_span_end() used dict.get() (read-only) instead of dict.pop() (read-and-remove), and the only cleanup path was shutdown(), which is never called. After this fix, spans are removed from the dict when they complete, preventing unbounded memory growth.
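The get-vs-pop leak can be sketched as below. The _spans dict and the on_span_start/on_span_end hooks follow the PR description; the Span type and the leaky flag are illustrative stand-ins, not the real processor code.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Span:
    id: str

class Processor:
    """Stand-in for the tracked-span dict described in the PR."""

    def __init__(self, leaky: bool = False):
        self._spans: dict[str, Span] = {}
        self._leaky = leaky

    def on_span_start(self, span: Span) -> None:
        self._spans[span.id] = span

    def on_span_end(self, span: Span) -> None:
        if self._leaky:
            self._spans.get(span.id)        # BUG: read-only, entry lives forever
        else:
            self._spans.pop(span.id, None)  # FIX: read-and-remove

leaky, fixed = Processor(leaky=True), Processor()
for p in (leaky, fixed):
    for i in range(1000):
        span = Span(id=str(i))
        p.on_span_start(span)
        p.on_span_end(span)

print(len(leaky._spans), len(fixed._spans))  # → 1000 0
```

After 1000 request cycles the leaky variant still holds every span, while the fixed variant holds none.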
AsyncTrace.start_span() and end_span() previously awaited processor HTTP calls inline, adding 8 blocking round-trips per request. This moves processor calls into a background FIFO queue so callers enqueue and return immediately.
- Add AsyncSpanQueue with sequential drain loop, error logging, and graceful shutdown with configurable timeout
- Wire shutdown into FastAPI lifespan teardown in base_acp_server
- FIFO ordering preserves the start-before-end invariant required by SGPAsyncTracingProcessor's internal _spans dict
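A minimal sketch of such a queue, assuming the behavior described in this PR (enqueue via put_nowait, a sequential drain loop, logged errors, and Queue.join()-based shutdown). The real AsyncSpanQueue lives in span_queue.py and its internals may differ.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional

logger = logging.getLogger(__name__)

@dataclass
class _SpanQueueItem:
    handler: Callable[[Any], Awaitable[None]]  # e.g. processor.on_span_start
    span: Any

class AsyncSpanQueue:
    def __init__(self) -> None:
        self._queue: "asyncio.Queue[_SpanQueueItem]" = asyncio.Queue()  # unbounded FIFO
        self._drain_task: Optional[asyncio.Task] = None

    def enqueue(self, handler: Callable[[Any], Awaitable[None]], span: Any) -> None:
        # put_nowait never blocks on an unbounded queue: callers return instantly
        self._queue.put_nowait(_SpanQueueItem(handler, span))
        self._ensure_drain_running()

    def _ensure_drain_running(self) -> None:
        # Restart guard: relaunch the drain loop if it ever terminated
        if self._drain_task is None or self._drain_task.done():
            self._drain_task = asyncio.ensure_future(self._drain())

    async def _drain(self) -> None:
        # Single sequential loop preserves FIFO start-before-end ordering
        while True:
            item = await self._queue.get()
            try:
                await item.handler(item.span)
            except Exception:
                # Processor failures are logged and never crash the drain loop
                logger.exception("span processor call failed")
            finally:
                self._queue.task_done()

    async def shutdown(self, timeout: float = 30.0) -> None:
        # Graceful shutdown: wait for pending items, then cancel the loop
        try:
            await asyncio.wait_for(self._queue.join(), timeout)
        except asyncio.TimeoutError:
            logger.warning("shutdown timed out; ~%d items unprocessed", self._queue.qsize())
        if self._drain_task is not None:
            self._drain_task.cancel()
```

A caller would enqueue a start event and an end event and rely on the sequential drain to deliver them in that order before shutdown() returns.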
@patch decorators on _make_processor expired before the test bodies ran, so on_span_start/on_span_end hit the real create_span and flush. Refactored to @staticmethod helpers with with patch(...) context managers held open for the duration of each test, matching the async test class pattern.
Use an explicit dict[str, object] annotation to avoid an invariance error when assigning to Span.output (typed Dict[str, object] | ... | None).
danielmillerp
approved these changes
Apr 3, 2026
Processors like SGPAsyncTracingProcessor mutate span.data in place via _add_source_to_span. With async background processing, this raced with the caller, which holds a reference to the same span object. Deep-copying via model_copy(deep=True) decouples the two.
Summary
AsyncTrace.start_span() and end_span() no longer await processor HTTP calls inline, removing 8 blocking round-trips per request.

Problem
Users reported high latency with tracing enabled. Datadog showed significant time spent in /v5/spans/batch: every span start/end awaited HTTP calls to tracing processors inline. With 4 spans per request and 2 events each, that's 8 blocking network round-trips per request just for telemetry.

Solution
New AsyncSpanQueue class in span_queue.py with:
- enqueue() via put_nowait, so callers return instantly
- a single sequential drain loop, preserving the start-before-end FIFO ordering required by SGPAsyncTracingProcessor's internal _spans dict
- asyncio.gather(..., return_exceptions=True), so processor failures don't crash the drain loop
- shutdown(timeout) using Queue.join(), wired into FastAPI lifespan teardown in base_acp_server.py

Test plan
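The lifespan teardown wiring can be sketched as below. shutdown_default_span_queue is the hook named in this PR, stubbed here so the example is self-contained; the real implementation drains the default queue with a timeout.

```python
import asyncio
from contextlib import asynccontextmanager

state = {"drained": False}

async def shutdown_default_span_queue(timeout: float = 30.0) -> None:
    # Stand-in for: await the default queue's join() with a timeout,
    # then cancel its drain task
    state["drained"] = True

@asynccontextmanager
async def lifespan(app):
    yield  # the app serves requests while suspended here
    await shutdown_default_span_queue()  # teardown: drain pending span events

# FastAPI would receive this as FastAPI(lifespan=lifespan); simulated here:
async def main():
    async with lifespan(app=None):
        pass

asyncio.run(main())
print(state["drained"])  # → True
```

Running the teardown inside the lifespan context guarantees queued span events are flushed before the worker process exits.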
Greptile Summary
This PR removes 8 blocking HTTP round-trips per request by introducing AsyncSpanQueue, a background FIFO drain task that processes span start/end events asynchronously while preserving the start-before-end ordering required by the SGP processor. Graceful drain-before-exit is wired into the FastAPI lifespan via shutdown_default_span_queue.

The previous concern about a shared mutable Span reference racing with the background drain task has been resolved: both start_span and end_span now enqueue a span.model_copy(deep=True), fully decoupling the live span object from the background processor.

Key observations:
- Error isolation (return_exceptions=True + per-result logging) is correctly implemented.
- Shutdown via Queue.join() + cancel is a sound pattern; the _ensure_drain_running restart guard adds resilience if the drain task ever unexpectedly terminates.
- The queue is unbounded (asyncio.Queue() with no maxsize), which could lead to uncapped memory growth under sustained processor latency.
- The shutdown warning reports qsize(), which excludes the one item currently being processed by the drain task, so the count can be off by one.
- The 30.0-second default timeout is duplicated across AsyncSpanQueue.shutdown and shutdown_default_span_queue; extracting it to a named constant would avoid drift.
- AsyncTrace integration.

Confidence Score: 5/5
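The unbounded-queue hardening flagged above could look like the sketch below. The maxsize value and the drop-and-count policy are assumptions for illustration; the PR itself uses an unbounded asyncio.Queue().

```python
import asyncio

async def main():
    # Capping maxsize bounds memory under sustained processor latency,
    # at the cost of dropping telemetry once the cap is hit.
    q: "asyncio.Queue[str]" = asyncio.Queue(maxsize=2)
    dropped = 0
    for item in ("start-1", "end-1", "start-2"):
        try:
            q.put_nowait(item)      # still non-blocking for callers
        except asyncio.QueueFull:
            dropped += 1            # drop rather than grow without bound
    return q.qsize(), dropped

print(asyncio.run(main()))  # → (2, 1)
```

Dropping paired start/end events would break the processor's start-before-end invariant for the dropped span, so a real implementation would also need a policy for partial pairs.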
Safe to merge; all remaining findings are non-blocking P2 style/hardening suggestions.
No P0 or P1 issues found. The core race condition from the previous review round is addressed by deep-copying spans before enqueue. Shutdown sequencing, FIFO ordering, and error isolation are all correctly implemented. The three P2 comments (unbounded queue, off-by-one warning count, duplicated magic number) are improvements worth making but do not affect correctness or production safety of the feature.
src/agentex/lib/core/tracing/span_queue.py — review the unbounded queue and shutdown warning accuracy.
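The qsize() accuracy caveat is easy to reproduce with a bare asyncio.Queue: an item taken by get() but not yet marked task_done() no longer counts toward qsize(), so a timeout warning based on qsize() undercounts unfinished work by one.

```python
import asyncio

async def main():
    q: "asyncio.Queue[int]" = asyncio.Queue()
    q.put_nowait(1)
    q.put_nowait(2)
    in_flight = await q.get()  # drain task is "processing" item 1
    size = q.qsize()           # reports 1, though 2 items are unfinished
    q.task_done()
    return size

print(asyncio.run(main()))  # → 1
```

join(), by contrast, tracks unfinished task_done() calls, which is why it is the right primitive for the shutdown wait itself.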
Important Files Changed
Sequence Diagram
sequenceDiagram
    participant H as Request Handler
    participant AT as AsyncTrace
    participant Q as AsyncSpanQueue
    participant DL as Drain Task (bg)
    participant P as TracingProcessor
    H->>AT: await start_span(name)
    AT->>AT: build Span + model_copy(deep=True)
    AT->>Q: enqueue(START, span_copy, processors)
    Q->>Q: _ensure_drain_running()
    Q-->>H: return (non-blocking)
    AT-->>H: return live Span
    Note over DL,P: Background drain loop
    DL->>Q: await queue.get()
    Q-->>DL: _SpanQueueItem(START, span_copy)
    DL->>P: on_span_start(span_copy)
    P-->>DL: (HTTP call completes)
    DL->>Q: task_done()
    H->>AT: await end_span(span)
    AT->>AT: snapshot + model_copy(deep=True)
    AT->>Q: enqueue(END, span_copy, processors)
    Q-->>H: return (non-blocking)
    DL->>Q: await queue.get()
    Q-->>DL: _SpanQueueItem(END, span_copy)
    DL->>P: on_span_end(span_copy)
    P-->>DL: (HTTP call completes)
    DL->>Q: task_done()
    Note over Q,DL: Graceful shutdown (FastAPI lifespan teardown)
    Q->>Q: _stopping = True
    Q->>Q: await queue.join() [timeout=30s]
    Q->>DL: cancel()
    DL-->>Q: CancelledError (caught)
Reviews (2): Last reviewed commit: "fix(tracing): Deep-copy spans before enq..."
Context used:
Learnt From
scaleapi/scaleapi#126388