Conversation
Some metrics after running the load test:
declan-scale left a comment:
I think we do want to be able to modify the input (other customers have asked for this). Also, there are some improvements left on the table: SGP supports batch processing of spans, but we still call the `upsert_batch` endpoint once per span. Not necessary to refactor here, but I figured I'd call it out.
@declan-scale Thanks for the feedback; did another round on the fix. I agree that with SGP batching we could juice this even further, but that would require additional functionality and/or a contract change. Want me to mark this as deprecated and add it as future-state functionality?
I'll make an agentex ticket; if you don't mind, tag it as a TODO and link the following in the TODO comments: https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces
Force-pushed 28a6be2 → cdb2d10, then cdb2d10 → 00ba875.
Co-authored-by: Declan Brady <declan.brady@scale.com>
This is related to an issue we are seeing at a customer: they observe that memory grows over time and then takes a long time to free. This PR addresses that behavior by clearing the queue more aggressively, draining it in batches.
Summary
Fix unbounded memory growth in the async tracing pipeline under concurrent load.
Root cause: The `AsyncSpanQueue._drain_loop` processed items one at a time, serially. Each item blocks on HTTP calls to two APIs (Agentex + SGP), taking ~100-200ms per item. Under concurrent request load, spans were produced far faster than they could drain, causing unbounded queue buildup. Each queued item held a deep copy of the full span payload (system prompt + context), leading to OOM in K8s pods.

Changes
1. Concurrent batch drain (`span_queue.py`) — primary fix. `_drain_loop` now grabs batches of up to 50 items and processes them concurrently via `asyncio.gather`.
2. Drop input from END events (`trace.py`) — secondary fix. `end_span()` no longer re-serializes `span.input` (it was already sent at START); END events carry `input=None`, releasing the system prompt from memory immediately.
3. Release SGP span input after sending (`sgp_tracing_processor.py`) — tertiary fix. Clear `sgp_span.input` after `upsert_batch()` succeeds in `on_span_start`.

Load test results (10,000 spans x 50KB system prompt)
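The batch drain described in Change 1 can be sketched roughly as follows. This is a simplified, hypothetical version: the real `_drain_loop` also separates STARTs from ENDs within a batch and handles shutdown, and `BATCH_SIZE` and the `process` callback here are stand-ins.

```python
import asyncio

BATCH_SIZE = 50  # the PR describes batches of up to 50 items

async def drain_loop(queue: asyncio.Queue, process) -> None:
    """Drain the queue in batches, processing each batch concurrently."""
    while True:
        # Block until at least one item is available.
        batch = [await queue.get()]
        # Grab any other items that are already queued, up to the batch cap.
        while len(batch) < BATCH_SIZE:
            try:
                batch.append(queue.get_nowait())
            except asyncio.QueueEmpty:
                break
        # Process the whole batch concurrently instead of one at a time,
        # so N slow HTTP calls overlap rather than serialize.
        await asyncio.gather(*(process(item) for item in batch))
        for _ in batch:
            queue.task_done()
```

Because each iteration awaits only one `get()` and then opportunistically drains whatever is already queued, slow consumers no longer let the queue grow without bound under bursty producers.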
Backwards compatibility

- `AsyncSpanQueue()` still works without args.
- `exclude_none=True` in the Agentex processor naturally handles `input=None`.

Test plan

- Unit tests cover the `input=None` END-event behavior.
- Load test (`test_span_queue_load.py`) — run with `RUN_LOAD_TESTS=1`.

Greptile Summary
This PR fixes unbounded memory growth in the async tracing pipeline by replacing the serial `_drain_loop` with a concurrent batch drain (`asyncio.gather` over batches of up to 50 items), and adds `sgp_span.input = span.input` in `on_span_end` to keep the SGP span's input current at end time. The batch ordering logic (all STARTs before all ENDs within a batch) correctly preserves per-span start-before-end invariants.

Two follow-up gaps from the PR description:
- Releasing `sgp_span.input` after the START upsert (`on_span_start`) was not implemented — the stored span still holds the full input payload until `on_span_end`, leaving that portion of the described memory saving unrealised.
- The `sgp_span.input = span.input` line in `on_span_end` would silently overwrite SGP's stored input with `null` if `trace.py` is later updated to strip input from END events (as described in "Change 2"), creating a latent data-loss risk.

Confidence Score: 5/5
Safe to merge — all findings are P2 optimization gaps, no correctness or data-integrity bugs in the current state.
The primary fix (batch drain) is well-implemented with correct ordering guarantees and solid test coverage. Both remaining comments are P2: one is a missing memory optimisation that was described but not coded, and the other is a latent risk contingent on a future trace.py change that hasn't landed yet. Neither affects correctness today.
`src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py` — the tertiary memory fix is missing, and the new `on_span_end` input line needs a `None` guard before the planned `trace.py` change ships.
Important Files Changed
- Adds `sgp_span.input = span.input` in `on_span_end` (ensuring updated input is sent at end time), but the described tertiary fix of clearing `sgp_span.input` after `on_span_start`'s upsert is missing — the stored span still retains the full input in memory.
- The `test_end_event_preserves_modified_input` test documents that END events still carry the full (modified) input — intentionally contradicting the PR description's "Change 2".
- Tests verify `sgp_span.input` is updated on end. Missing a test that verifies `sgp_span.input` is cleared after the `on_span_start` upsert (the unstated tertiary fix).
- Load test is gated behind `RUN_LOAD_TESTS=1`; clear reporting and a loose sanity assertion. Good companion to the unit tests.

Sequence Diagram
```mermaid
sequenceDiagram
    participant C as Caller (AsyncTrace)
    participant Q as AsyncSpanQueue
    participant D as _drain_loop
    participant P as Processors (Agentex / SGP)
    C->>Q: enqueue(START, span_copy)
    C->>Q: enqueue(END, span_copy)
    Note over Q: Queue: [START-A, END-A, START-B, END-B ...]
    Q->>D: create_task(_drain_loop)
    loop Each batch (up to 50 items)
        D->>Q: await get() — block for first item
        D->>Q: get_nowait() x N — drain remaining ready items
        Note over D: Separate batch into starts[] and ends[]
        par Process all STARTs concurrently
            D->>P: on_span_start(span) via asyncio.gather
        end
        par Process all ENDs concurrently
            D->>P: on_span_end(span) via asyncio.gather
        end
        D->>Q: task_done() x batch_size
    end
    C->>Q: shutdown(timeout=30)
    Q->>Q: _stopping = True
    Q->>D: await queue.join()
    D-->>Q: all task_done() called
    Q->>D: cancel() if still running
```

Comments Outside Diff (2)
src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py, lines 74-84

`SGPSyncTracingProcessor.on_span_end` does not set `sgp_span.input = span.input`, but the async version added that line in this PR. For chatbot or multi-turn spans where the input is mutated after start, the sync flush will still send the original start-time input to SGP — the async processor corrects it; the sync one does not.
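A minimal sketch of the suggested sync-side fix. The classes below are hypothetical, stripped-down stand-ins (the real processor holds more state and a real SGP client); only the input-refresh behavior under discussion is modeled.

```python
class SGPSpan:
    """Hypothetical stand-in for the SGP-side span copy."""
    def __init__(self, input=None):
        self.input = input

class SGPSyncTracingProcessor:
    """Hypothetical skeleton of the sync processor."""
    def __init__(self):
        self._spans = {}  # span_id -> SGP-side span copy

    def on_span_start(self, span_id, input):
        # Snapshot the input at start time, as the processor does today.
        self._spans[span_id] = SGPSpan(input=input)

    def on_span_end(self, span_id, input):
        sgp_span = self._spans[span_id]
        # Mirror the async processor: refresh the input at end time so
        # mutations made after start reach SGP on the final flush.
        sgp_span.input = input
        return sgp_span
```

Without the refresh line in `on_span_end`, the span flushed at end time would still carry the start-time snapshot.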
src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py, lines 173-177

`shutdown()` crashes with `AttributeError` when SGP is disabled. When `self.disabled = True`, `self.sgp_async_client` is `None`, so calling `self.sgp_async_client.spans.upsert_batch(...)` raises `AttributeError: 'NoneType' object has no attribute 'spans'`. The existing guard pattern used in `on_span_start` and `on_span_end` should be applied here.
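A rough sketch of that guard. The class below is a hypothetical skeleton (attribute names mirror the comment, not the full real class), showing only the bail-out that keeps a `None` client from being dereferenced:

```python
import asyncio

class SGPTracingProcessor:
    """Hypothetical skeleton; only the disabled/None guard is modeled."""

    def __init__(self, disabled: bool = False):
        self.disabled = disabled
        # When SGP is disabled, no client is ever constructed.
        self.sgp_async_client = None if disabled else object()

    async def shutdown(self) -> None:
        # Same guard pattern as on_span_start / on_span_end: return
        # before touching the client, so a None client cannot raise.
        if self.disabled or self.sgp_async_client is None:
            return
        # ... flush pending spans, e.g. via
        # self.sgp_async_client.spans.upsert_batch(...)
```

With the guard in place, `shutdown()` on a disabled processor is a no-op instead of an `AttributeError`.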
Reviews (5). Last reviewed commit: "Fix lints"