
(bug): fix to queue drain #327

Merged
declan-scale merged 4 commits into next from levilentz/bug-span-queue-fix on Apr 21, 2026

Conversation

@levilentz (Contributor) commented Apr 17, 2026

This relates to an issue a customer is seeing: memory grows over time and then takes a long time to free. This PR addresses that behavior by draining the queue more aggressively, in concurrent batches.

Summary

Fix unbounded memory growth in the async tracing pipeline under concurrent load.

Root cause: The AsyncSpanQueue._drain_loop processed items one at a time, serially. Each item blocks on HTTP calls to two APIs (Agentex + SGP), taking ~100-200ms per item. Under concurrent request load, spans were produced far faster than they could drain, causing unbounded queue buildup. Each queued item held a deep copy of the full span payload (system prompt + context), leading to OOM in K8s pods.

Changes

1. Concurrent batch drain (span_queue.py) — Primary fix

  • Rewrote _drain_loop to grab batches of up to 50 items and process them concurrently via asyncio.gather
  • All STARTs in a batch are flushed concurrently, then all ENDs — preserving per-span start-before-end ordering
  • Throughput improvement: ~5 items/sec (serial) → ~250 items/sec (concurrent), keeping the queue near-empty under load
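
The batched drain described above can be sketched as follows. This is a minimal illustration, not the actual span_queue.py code — `QueueItem`, `drain_once`, `flush`, and `BATCH_SIZE` are hypothetical names, and `flush` stands in for the real per-item HTTP calls:

```python
import asyncio
from dataclasses import dataclass

BATCH_SIZE = 50  # assumption: mirrors the "up to 50 items" in the PR


@dataclass
class QueueItem:
    kind: str      # "START" or "END"
    span_id: str


async def flush(item: QueueItem, log: list) -> None:
    # Stand-in for the per-item HTTP calls to the Agentex/SGP processors.
    await asyncio.sleep(0)
    log.append(f"{item.kind}:{item.span_id}")


async def drain_once(queue: asyncio.Queue, log: list) -> None:
    first = await queue.get()          # block for the first item
    batch = [first]
    while len(batch) < BATCH_SIZE:     # drain remaining ready items
        try:
            batch.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            break
    starts = [i for i in batch if i.kind == "START"]
    ends = [i for i in batch if i.kind == "END"]
    # Flush all STARTs concurrently, then all ENDs: per-span
    # start-before-end ordering is preserved within the batch.
    await asyncio.gather(*(flush(i, log) for i in starts))
    await asyncio.gather(*(flush(i, log) for i in ends))
    for _ in batch:
        queue.task_done()


async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    for span in ("A", "B"):
        q.put_nowait(QueueItem("START", span))
        q.put_nowait(QueueItem("END", span))
    log: list = []
    await drain_once(q, log)
    return log


print(asyncio.run(demo()))
```

Even though END-A was enqueued before START-B, both STARTs are flushed before any END, which is safe because ordering only matters per span.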

2. Drop input from END events (trace.py) — Secondary fix

  • end_span() no longer re-serializes span.input (it was already sent at START)
  • The deep copy for the END queue event has input=None, releasing the system prompt from memory immediately
  • Reduces per-span queue memory by ~40-50%
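
A minimal sketch of this idea, assuming a simplified span shape (the real types in trace.py differ; `Span` and `make_end_event_copy` here are illustrative names only):

```python
import copy
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Span:
    # Hypothetical span shape; the real Span carries more fields.
    id: str
    input: Optional[dict] = None
    output: Optional[dict] = None


def make_end_event_copy(span: Span) -> Span:
    end_copy = copy.deepcopy(span)
    # input was already persisted at START time, so the END event's
    # deep copy can drop it and release the large system prompt.
    end_copy.input = None
    return end_copy


live = Span(id="s1", input={"system_prompt": "x" * 50_000}, output={"text": "done"})
queued = make_end_event_copy(live)
print(queued.input)  # the queued END event no longer references the prompt
```

The live span keeps its input for the caller; only the queued copy drops it.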

3. Release SGP span input after sending (sgp_tracing_processor.py) — Tertiary fix

  • Clear sgp_span.input after upsert_batch() succeeds in on_span_start
  • The retained SGP span only needs output/metadata/end_time for the end event

Load test results (10,000 spans x 50KB system prompt)

| Metric | Before (serial drain) | After (batch drain) |
| --- | --- | --- |
| Peak queue depth | 19,097 items | 524 items |
| Drain time | 114.88s | 0.14s |
| Total time | 135.10s | 20.35s |

Backwards compatibility

  • No public API changes — AsyncSpanQueue() still works without args
  • Processor interface unchanged
  • No data loss: input is persisted to APIs at START time. END events send only output/end_time via PATCH-style update. exclude_none=True in the Agentex processor naturally handles input=None
  • Agents do not need to be updated
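
To illustrate the exclude_none behaviour this relies on, here is a stdlib-only sketch that mimics Pydantic-style `exclude_none` serialization. `SpanUpdate` and `dump_exclude_none` are illustrative names, not the actual processor code:

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class SpanUpdate:
    id: str
    input: Optional[dict] = None
    output: Optional[dict] = None
    end_time: Optional[str] = None


def dump_exclude_none(model: SpanUpdate) -> dict:
    # Mimics exclude_none=True: None-valued fields are omitted from the
    # payload, so an END event never sends "input": null to the API.
    return {k: v for k, v in asdict(model).items() if v is not None}


end_event = SpanUpdate(id="span-1", output={"text": "done"},
                       end_time="2026-04-21T14:00:00Z")
print(dump_exclude_none(end_event))
```

Because `input` is omitted rather than sent as null, the server treats the END event as a PATCH-style partial update and the START-time input is left intact.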

Test plan

  • Existing span queue tests updated for batch semantics (ordering, error handling, shutdown)
  • New batch concurrency tests (parallel processing, faster-than-serial)
  • Memory reduction tests (END events <5% of START event size, >40% queue payload reduction)
  • SGP processor memory leak tests (spans cleaned up after lifecycle, input cleared after start)
  • Integration test verifying END events have input=None
  • Manual load test added (test_span_queue_load.py) — run with RUN_LOAD_TESTS=1
  • Deploy to staging and monitor pod RSS under production-like load

Greptile Summary

This PR fixes unbounded memory growth in the async tracing pipeline by replacing the serial _drain_loop with a concurrent batch drain (asyncio.gather over batches of up to 50 items), and adds sgp_span.input = span.input in on_span_end to keep the SGP span's input current at end time. The batch ordering logic (all STARTs before all ENDs within a batch) correctly preserves per-span start-before-end invariants.

Two follow-up gaps from the PR description:

  • The "tertiary fix" of clearing sgp_span.input after the START upsert (on_span_start) was not implemented — the stored span still holds the full input payload until on_span_end, leaving that portion of the described memory saving unrealised.
  • The sgp_span.input = span.input line in on_span_end would silently overwrite SGP's stored input with null if trace.py is later updated to strip input from END events (as described in "Change 2"), creating a latent data-loss risk.

Confidence Score: 5/5

Safe to merge — all findings are P2 optimization gaps, no correctness or data-integrity bugs in the current state.

The primary fix (batch drain) is well-implemented with correct ordering guarantees and solid test coverage. Both remaining comments are P2: one is a missing memory optimisation that was described but not coded, and the other is a latent risk contingent on a future trace.py change that hasn't landed yet. Neither affects correctness today.

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py — the tertiary memory fix is missing and the new on_span_end input line needs a None guard before the planned trace.py change ships.

Important Files Changed

Filename Overview
src/agentex/lib/core/tracing/span_queue.py Core fix: replaces serial drain with concurrent batch drain. Logic is sound — per-span start-before-end ordering is preserved via START/END phase separation within each batch.
src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py Adds sgp_span.input = span.input in on_span_end (ensuring updated input is sent at end time), but the described tertiary fix of clearing sgp_span.input after on_span_start's upsert is missing — the stored span still retains the full input in memory.
src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py Only adds TODO comment linking to batch-API Linear ticket — no functional change.
tests/lib/core/tracing/test_span_queue.py Good coverage for new batch semantics: ordering, concurrency, error handling, shutdown, and integration. The test_end_event_preserves_modified_input test documents that END events still carry the full (modified) input — intentionally contradicting the PR description's "Change 2".
tests/lib/core/tracing/processors/test_sgp_tracing_processor.py Adds memory-leak tests (lifecycle cleanup) and a new test verifying sgp_span.input is updated on end. Missing a test that verifies sgp_span.input is cleared after on_span_start upsert (the unstated tertiary fix).
tests/lib/core/tracing/test_span_queue_load.py Manual load test gated behind RUN_LOAD_TESTS=1; clear reporting and a loose sanity assertion. Good companion to the unit tests.

Sequence Diagram

sequenceDiagram
    participant C as Caller (AsyncTrace)
    participant Q as AsyncSpanQueue
    participant D as _drain_loop
    participant P as Processors (Agentex / SGP)

    C->>Q: enqueue(START, span_copy)
    C->>Q: enqueue(END, span_copy)
    Note over Q: Queue: [START-A, END-A, START-B, END-B ...]

    Q->>D: create_task(_drain_loop)

    loop Each batch (up to 50 items)
        D->>Q: await get() — block for first item
        D->>Q: get_nowait() x N — drain remaining ready items
        Note over D: Separate batch into starts[] and ends[]
        par Process all STARTs concurrently
            D->>P: on_span_start(span) via asyncio.gather
        end
        par Process all ENDs concurrently
            D->>P: on_span_end(span) via asyncio.gather
        end
        D->>Q: task_done() x batch_size
    end

    C->>Q: shutdown(timeout=30)
    Q->>Q: _stopping = True
    Q->>D: await queue.join()
    D-->>Q: all task_done() called
    Q->>D: cancel() if still running

Comments Outside Diff (2)

  1. src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py, lines 74-84

    P1 Sync processor missing input update at end time

    SGPSyncTracingProcessor.on_span_end does not set sgp_span.input = span.input, but the async version added that line in this PR. For chatbot or multi-turn spans where input is mutated after start, the sync flush will still send the original start-time input to SGP — the async processor corrects it, the sync one does not.


  2. src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py, lines 173-177

    P1 shutdown() crashes with AttributeError when SGP is disabled

    When self.disabled = True, self.sgp_async_client is None. Calling self.sgp_async_client.spans.upsert_batch(...) will raise AttributeError: 'NoneType' object has no attribute 'spans'. The existing guard pattern used in on_span_start and on_span_end should be applied here:


Additional review comments (2)

  1. src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py, line 161

    P2 Latent data-loss risk if trace.py strips input from END events

    The PR description's "Change 2" says end_span() will stop re-serialising span.input (setting it to None) to save memory. If that change lands later, sgp_span.input = span.input would copy None into sgp_span, and the subsequent upsert_batch call would send input: null to SGP, overwriting the input that was already persisted at START. SGP's upsert_batch would have to be a true partial update (PATCH) for this to be safe.

    Adding a None guard here future-proofs the code:
    ```suggestion
            sgp_span.input = span.input if span.input is not None else sgp_span.input  # type: ignore[assignment]
    ```
    Alternatively, document that sgp_span.input must never be set to None before calling upsert_batch for the END event.

  2. src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py, lines 147-151

    P2 Tertiary memory fix described in PR not implemented

    The PR description states: "Clear sgp_span.input after upsert_batch() succeeds in on_span_start — the retained SGP span only needs output/metadata/end_time for the end event." That clearing step was not added. sgp_span is stored in self._spans with its full input field intact, keeping a reference to the large payload (e.g. a 50 KB system prompt) in memory until on_span_end is called.

    Adding sgp_span.input = None after the successful upsert would release that reference immediately:
    ```python
            await self.sgp_async_client.spans.upsert_batch(
                items=[sgp_span.to_request_params()]
            )
            sgp_span.input = None  # release large input payload; END event will re-attach if needed
            self._spans[span.id] = sgp_span
    ```

Reviews (5) — last reviewed commit: "Fix lints"

@levilentz (Contributor, Author) commented:

Some metrics after running the load test:

| Metric | Before | After | Improvement |
| --- | --- | --- | --- |
| Peak queue depth | 19,372 items | 5,734 items | 3.4x lower |
| Drain time | 116.43s | 1.52s | 77x faster |
| Total time | 119.76s | 4.88s | 25x faster |

@declan-scale declan-scale changed the base branch from main to next April 20, 2026 20:41
Comment thread src/agentex/lib/core/tracing/trace.py Outdated
@declan-scale (Contributor) left a comment:
I think we do want to be able to modify the input (other customers have asked for this). Also, there are some improvements left on the table, SGP supports batch processing of spans but we still just call the upsert_batch endpoint per span. Not necessary to refactor here but figured I'd call out

@levilentz (Contributor, Author) replied:

@declan-scale thanks for the feedback. did another round on the fix. I agree that with sgp batching, we could juice this even further, but that would require additional functionality added and/or the contract changed. want me to mark this as deprecated and add that in as a future state functionality?

@declan-scale (Contributor) left a comment:
Looks good, thanks, can you just run the lint fix script?

@declan-scale (Contributor) commented Apr 20, 2026:

> @declan-scale thanks for the feedback. did another round on the fix. I agree that with sgp batching, we could juice this even further, but that would require additional functionality added and/or the contract changed. want me to mark this as deprecated and add that in as a future state functionality?

I'll make an agentex ticket if you don't mind tagging it as a todo. If you don't mind linking the following in some TODO comments:

https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces
https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans

@declan-scale force-pushed the levilentz/bug-span-queue-fix branch from 28a6be2 to cdb2d10 on April 21, 2026 14:00
@declan-scale force-pushed the levilentz/bug-span-queue-fix branch from cdb2d10 to 00ba875 on April 21, 2026 14:07
@declan-scale declan-scale merged commit b59d6d8 into next Apr 21, 2026
9 checks passed
@declan-scale declan-scale deleted the levilentz/bug-span-queue-fix branch April 21, 2026 14:15
stainless-app (bot) mentioned this pull request Apr 21, 2026
stainless-app Bot pushed a commit that referenced this pull request Apr 21, 2026
Co-authored-by: Declan Brady <declan.brady@scale.com>