feat(tracing): Add background queue for async span processing#303

Merged
smoreinis merged 6 commits into main from stas/async-span-queue on Apr 3, 2026
Conversation

@smoreinis smoreinis commented Apr 3, 2026

Summary

  • Add background FIFO queue for async span processing — AsyncTrace.start_span() and end_span() no longer await processor HTTP calls inline, removing 8 blocking round-trips per request
  • Wire graceful shutdown into FastAPI lifespan so the queue drains before server exit

Problem

Users reported high latency with tracing enabled. Datadog showed significant time spent in /v5/spans/batch — every span start/end awaited HTTP calls to tracing processors inline. With 4 spans per request and 2 events each, that's 8 blocking network round-trips per request just for telemetry.

Solution

New AsyncSpanQueue class in span_queue.py with:

  • Synchronous enqueue() via put_nowait — callers return instantly
  • Background drain task processing events sequentially (FIFO ordering preserves SGP processor's start-before-end invariant for its internal _spans dict)
  • Error logging via asyncio.gather(..., return_exceptions=True) — processor failures don't crash the drain loop
  • Graceful shutdown(timeout) using Queue.join(), wired into FastAPI lifespan teardown in base_acp_server.py
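  The enqueue/drain shape described above can be sketched as follows. This is a minimal illustration, not the shipped code: `_SpanQueueItem`, the `kind` field, and the processor hook signatures are assumptions standing in for whatever `span_queue.py` actually defines.

  ```python
  import asyncio
  import logging
  from dataclasses import dataclass
  from typing import Any

  logger = logging.getLogger(__name__)


  @dataclass
  class _SpanQueueItem:
      kind: str            # "start" or "end" (hypothetical field names)
      span: Any            # deep-copied span snapshot
      processors: list     # tracing processors to notify


  class AsyncSpanQueue:
      def __init__(self) -> None:
          self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue()
          self._drain_task: asyncio.Task | None = None

      def enqueue(self, item: _SpanQueueItem) -> None:
          """Synchronous: put_nowait never awaits, so callers return instantly."""
          self._queue.put_nowait(item)
          self._ensure_drain_running()

      def _ensure_drain_running(self) -> None:
          # Restart guard: relaunch the drain task if it ever died.
          if self._drain_task is None or self._drain_task.done():
              self._drain_task = asyncio.ensure_future(self._drain())

      async def _drain(self) -> None:
          # Sequential FIFO processing preserves the start-before-end
          # invariant the SGP processor's _spans dict relies on.
          while True:
              item = await self._queue.get()
              try:
                  calls = [
                      p.on_span_start(item.span) if item.kind == "start"
                      else p.on_span_end(item.span)
                      for p in item.processors
                  ]
                  # return_exceptions=True: a failing processor is logged,
                  # never allowed to crash the drain loop.
                  results = await asyncio.gather(*calls, return_exceptions=True)
                  for r in results:
                      if isinstance(r, Exception):
                          logger.error("Span processor failed: %s", r)
              finally:
                  self._queue.task_done()
  ```

  Because `enqueue` is synchronous and the drain loop pulls one item at a time, callers pay only the cost of a `put_nowait` while HTTP latency is absorbed in the background.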

Test plan

  • 7 tests for span queue (non-blocking enqueue, FIFO ordering, error resilience, shutdown drain, timeout, post-shutdown safety, AsyncTrace integration)
  • Full test suite passes (598 passed, 959 skipped)
  • Lint + typecheck clean

Greptile Summary

This PR removes 8 blocking HTTP round-trips per request by introducing AsyncSpanQueue — a background FIFO drain task that processes span start/end events asynchronously while preserving the start-before-end ordering required by the SGP processor. Graceful drain-before-exit is wired into the FastAPI lifespan via shutdown_default_span_queue.

The previous concern about a shared mutable Span reference racing with the background drain task has been resolved: both start_span and end_span now enqueue a span.model_copy(deep=True), fully decoupling the live span object from the background processor.
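A minimal illustration of that decoupling, assuming a pydantic v2 `Span` model (the field names here are hypothetical):

```python
from pydantic import BaseModel


class Span(BaseModel):
    name: str
    data: dict = {}


live = Span(name="llm_call", data={"tokens": 10})

# What gets enqueued: a deep snapshot, not the live object.
snapshot = live.model_copy(deep=True)

# A processor mutating the enqueued copy in-place (e.g. tagging a
# source) no longer races with the caller still holding `live`.
snapshot.data["source"] = "agentex"
assert "source" not in live.data
```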

Key observations:

  • The FIFO invariant and error isolation (return_exceptions=True + per-result logging) are correctly implemented.
  • Graceful shutdown via Queue.join() + cancel is a sound pattern; the _ensure_drain_running restart guard adds resilience if the drain task ever unexpectedly terminates.
  • The queue is unbounded (asyncio.Queue() with no maxsize), which could lead to uncapped memory growth under sustained processor latency.
  • The shutdown warning logs qsize(), which excludes the one item currently being processed by the drain task — the count can be off by one.
  • The 30.0-second default timeout is duplicated across AsyncSpanQueue.shutdown and shutdown_default_span_queue; extracting it to a named constant would avoid drift.
  • Test coverage is solid: 7 tests covering non-blocking enqueueing, FIFO ordering, error resilience, drain-on-shutdown, timeout, post-shutdown drops, and end-to-end AsyncTrace integration.

Confidence Score: 5/5

Safe to merge; all remaining findings are non-blocking P2 style/hardening suggestions.

No P0 or P1 issues found. The core race condition from the previous review round is addressed by deep-copying spans before enqueue. Shutdown sequencing, FIFO ordering, and error isolation are all correctly implemented. The three P2 comments (unbounded queue, off-by-one warning count, duplicated magic number) are improvements worth making but do not affect correctness or production safety of the feature.

src/agentex/lib/core/tracing/span_queue.py — review the unbounded queue and shutdown warning accuracy.

Important Files Changed

  • src/agentex/lib/core/tracing/span_queue.py: New AsyncSpanQueue class; well-structured FIFO drain loop with error isolation and graceful shutdown, but the queue is unbounded, the shutdown warning miscounts in-flight items, and the 30.0s timeout magic number is repeated.
  • src/agentex/lib/core/tracing/trace.py: Async processor calls replaced with span_queue.enqueue(); the span is deep-copied before enqueuing (addressing the previous mutable-reference concern), and the optional span_queue parameter threads through cleanly.
  • src/agentex/lib/core/tracing/tracer.py: Minimal change; span_queue parameter added to AsyncTracer.trace() and passed through to AsyncTrace. No issues.
  • src/agentex/lib/sdk/fastacp/base/base_acp_server.py: Lifespan wrapped in try/finally to call shutdown_default_span_queue on teardown; graceful drain wired correctly.
  • tests/lib/core/tracing/test_span_queue.py: 7 tests covering non-blocking enqueue, FIFO ordering, error resilience, shutdown drain, timeout, post-shutdown safety, and AsyncTrace integration; good coverage of core invariants.
  • src/agentex/lib/core/tracing/__init__.py: Exports updated to include AsyncSpanQueue, get_default_span_queue, and shutdown_default_span_queue; no issues.

Sequence Diagram

```mermaid
sequenceDiagram
    participant H as Request Handler
    participant AT as AsyncTrace
    participant Q as AsyncSpanQueue
    participant DL as Drain Task (bg)
    participant P as TracingProcessor

    H->>AT: await start_span(name)
    AT->>AT: build Span + model_copy(deep=True)
    AT->>Q: enqueue(START, span_copy, processors)
    Q->>Q: _ensure_drain_running()
    Q-->>H: return (non-blocking)
    AT-->>H: return live Span

    Note over DL,P: Background drain loop
    DL->>Q: await queue.get()
    Q-->>DL: _SpanQueueItem(START, span_copy)
    DL->>P: on_span_start(span_copy)
    P-->>DL: (HTTP call completes)
    DL->>Q: task_done()

    H->>AT: await end_span(span)
    AT->>AT: snapshot + model_copy(deep=True)
    AT->>Q: enqueue(END, span_copy, processors)
    Q-->>H: return (non-blocking)

    DL->>Q: await queue.get()
    Q-->>DL: _SpanQueueItem(END, span_copy)
    DL->>P: on_span_end(span_copy)
    P-->>DL: (HTTP call completes)
    DL->>Q: task_done()

    Note over Q,DL: Graceful shutdown (FastAPI lifespan teardown)
    Q->>Q: _stopping = True
    Q->>Q: await queue.join() [timeout=30s]
    Q->>DL: cancel()
    DL-->>Q: CancelledError (caught)
```
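
The graceful-shutdown leg of the diagram (drain with a timeout, then cancel the background task) can be sketched as a standalone function; the names and the free-function shape are illustrative, not the exact method layout of AsyncSpanQueue:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def shutdown(queue: asyncio.Queue, drain_task: asyncio.Task,
                   timeout: float = 30.0) -> None:
    try:
        # Wait until every enqueued item has been task_done()-ed.
        await asyncio.wait_for(queue.join(), timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("Span queue shutdown timed out after %.1fs", timeout)
    drain_task.cancel()
    try:
        await drain_task
    except asyncio.CancelledError:
        pass  # expected: the drain loop exits via cancellation
```

Cancelling only after `join()` (or its timeout) is what makes the drain graceful: items already enqueued are flushed before the task is torn down.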
Review Comments
This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/span_queue.py
Line: 85-88

Comment:
**Timeout warning undercounts unprocessed spans by one**

`queue.qsize()` returns the number of items waiting in the queue's internal deque. It does **not** include the item the drain task has already `get()`-ed but not yet `task_done()`-ed. Because the drain loop processes items one at a time, when the timeout fires while the drain task is mid-processing, the log will read "0 items remaining" even though one span is still in flight and will be abandoned.

Consider logging `qsize()` alongside a note that one additional item may be in-flight:

```suggestion
            logger.warning(
                "Span queue shutdown timed out after %.1fs with ~%d items remaining (excludes any item currently in-flight)",
                timeout,
                self._queue.qsize(),
            )
```


---

This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/span_queue.py
Line: 37

Comment:
**Unbounded queue — memory can grow without limit under slow processors**

`asyncio.Queue()` with no `maxsize` means the queue can accumulate spans indefinitely if the processor HTTP calls are consistently slower than span production. Under a burst of activity (e.g. many parallel agent tasks) this could balloon memory before the problem is noticed.

Consider adding a bounded `maxsize` and deciding on a drop/block policy, or at minimum adding a monitoring metric for queue depth so you get early warning:

```suggestion
        self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=10_000)
```

`put_nowait` already raises `asyncio.QueueFull` when the limit is hit; `enqueue` can catch that and log a warning rather than crashing.
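
A sketch of that drop-and-warn policy; the `maxsize` and class name are illustrative, not shipped defaults:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class BoundedSpanQueue:
    def __init__(self, maxsize: int = 10_000) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._dropped = 0

    def enqueue(self, item) -> None:
        try:
            self._queue.put_nowait(item)
        except asyncio.QueueFull:
            # Drop telemetry rather than block or crash the request path.
            self._dropped += 1
            logger.warning("Span queue full; dropped %d spans so far",
                           self._dropped)
```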


---

This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/span_queue.py
Line: 79

Comment:
**Magic number `30.0` repeated across multiple symbols**

The default shutdown timeout of `30.0` seconds appears both in `AsyncSpanQueue.shutdown()` and `shutdown_default_span_queue()`. If the value ever needs to change it must be updated in two places, and the semantic meaning is implicit.

Extract to a named module-level constant:

```python
_DEFAULT_SHUTDOWN_TIMEOUT_SECONDS: float = 30.0
```

then reference it in both signatures:

```python
async def shutdown(self, timeout: float = _DEFAULT_SHUTDOWN_TIMEOUT_SECONDS) -> None:
```
```python
async def shutdown_default_span_queue(timeout: float = _DEFAULT_SHUTDOWN_TIMEOUT_SECONDS) -> None:
```

**Rule Used:** Store magic numbers as class or instance variables... ([source](https://app.greptile.com/review/custom-context?memory=002e0051-41ad-46c1-9098-47433c580150))

**Learnt From**
[scaleapi/scaleapi#126388](https://github.com/scaleapi/scaleapi/pull/126388)



Commit messages

SGPSyncTracingProcessor and SGPAsyncTracingProcessor accumulated spans in their self._spans dict on every request but never removed them, since on_span_end() used dict.get() (read-only) instead of dict.pop() (read-and-remove). The only cleanup was in shutdown(), which is never called. After this fix, spans are removed from the dict when they complete, preventing unbounded memory growth.

AsyncTrace.start_span() and end_span() previously awaited processor HTTP calls inline, adding 8 blocking round-trips per request. This moves processor calls into a background FIFO queue so callers enqueue and return immediately.

  • Add AsyncSpanQueue with a sequential drain loop, error logging, and graceful shutdown with a configurable timeout
  • Wire shutdown into the FastAPI lifespan teardown in base_acp_server
  • FIFO ordering preserves the start-before-end invariant required by SGPAsyncTracingProcessor's internal _spans dict

@patch decorators on _make_processor expired before the test bodies ran, so on_span_start/on_span_end hit the real create_span and flush. Refactored to @staticmethod with `with patch(...)` context managers, matching the async test class pattern.

Use an explicit dict[str, object] annotation to avoid an invariance error when assigning to Span.output (Dict[str, object] | ... | None).
@smoreinis smoreinis marked this pull request as ready for review April 3, 2026 01:22
@smoreinis smoreinis requested a review from danielmillerp April 3, 2026 01:22
@smoreinis smoreinis changed the title from "fix(tracing): Fix memory leak and add background span queue" to "feat(tracing): Add background queue for async span processing" on Apr 3, 2026
Processors like SGPAsyncTracingProcessor mutate span.data in-place
via _add_source_to_span. With async background processing, this raced
with the caller who holds a reference to the same span object.
Deep-copying via model_copy(deep=True) decouples the two.
@smoreinis smoreinis merged commit 3a60add into main Apr 3, 2026
32 checks passed
@smoreinis smoreinis deleted the stas/async-span-queue branch April 3, 2026 16:55
@stainless-app stainless-app bot mentioned this pull request Apr 3, 2026