perf(tracing): span queue linger + per-loop httpx keepalive #362
Two compounding causes of slow SGP trace export under load:

- The async drain loop returned size-1 batches almost every time because there was no time window for spans to accumulate. Adds a 100ms linger (tunable via `AGENTEX_SPAN_QUEUE_LINGER_MS`) so concurrently emitted spans coalesce into one `upsert_batch` call.
- httpx keepalive was disabled (`max_keepalive_connections=0`) in `SGPAsyncTracingProcessor`, `AgentexAsyncTracingProcessor`, and the ADK `TracingModule` to avoid "bound to a different event loop" errors in sync-ACP, so each span paid a full TLS handshake. Replaced with a per-event-loop client cache keyed on `id(asyncio.get_running_loop())`; connections are reused within a loop and cross-loop safety is preserved.

Tests cover linger coalescing, batch-size cap interaction, per-loop client caching, a keepalive-enabled regression guard, and disabled-processor null-client behavior.
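A minimal sketch of the linger-then-dispatch drain loop described above, written against a plain `asyncio.Queue`. The function name `drain_with_linger` and the `export` callback are hypothetical stand-ins, not the SDK's actual `AsyncSpanQueue` internals. The shape follows the PR's sequence diagram: block for the first item, then call `wait_for(queue.get(), timeout=remaining)` until the window closes or the batch fills.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def drain_with_linger(
    queue: "asyncio.Queue[Any]",
    export: Callable[[list[Any]], Awaitable[None]],
    batch_size: int = 50,
    linger_ms: int = 100,
) -> None:
    """Hypothetical drain loop: export spans in batches until cancelled.

    After the first item arrives, keep collecting for up to `linger_ms`
    (or until `batch_size` items are gathered) before calling `export`.
    """
    loop = asyncio.get_running_loop()
    while True:
        # Block until at least one span is available.
        batch = [await queue.get()]
        deadline = loop.time() + linger_ms / 1000
        # Linger window: gather more spans until the deadline or the cap.
        while len(batch) < batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
            except asyncio.TimeoutError:
                break  # window elapsed with the queue empty
        try:
            await export(batch)
        finally:
            # One task_done() per dequeued item so queue.join() stays accurate.
            for _ in batch:
                queue.task_done()
```

Spans that are already queued (or that arrive within the window) leave in one call, and a lone trailing span waits out the full linger before it is exported, which is why the batch-size cap test below benefits from a short `linger_ms`.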
Addresses Greptile review feedback on PR #362. The original `dict[int, AsyncSGPClient]` cache used `id(asyncio.get_running_loop())` as the key. In CPython `id()` returns a memory address, and once a loop is garbage-collected its address can be assigned to a new loop — a fresh loop hashing to a stale entry would receive a client whose httpx.AsyncClient was bound to the dead loop, reintroducing the "bound to a different event loop" error this PR was built to prevent. Switching the cache to `weakref.WeakKeyDictionary` keyed on the loop object itself fixes the bug: the entry is evicted automatically when the loop is collected, so id() recycling can't cause stale-client reuse. Multi-loop caching benefit is preserved (better than the single-slot pattern in TracingModule for agents that bounce between loops). Same fix applied to AgentexAsyncTracingProcessor. Added a regression test verifying the cache evicts a closed/dropped loop's entry after gc.collect().
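The eviction argument can be illustrated with a small per-loop cache built on `weakref.WeakKeyDictionary`. This is a sketch under assumptions: `PerLoopCache` and its `factory` argument are hypothetical names, and any zero-argument callable (here a plain `object`) stands in for the SDK's client constructor.

```python
import asyncio
import weakref
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class PerLoopCache(Generic[T]):
    """Hypothetical helper: one lazily built value per running event loop."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        # Keyed on the loop object itself, not id(loop): the entry disappears
        # when the loop is garbage-collected, so a new loop that lands at a
        # recycled address can never pick up a stale value.
        self._by_loop: "weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, T]" = (
            weakref.WeakKeyDictionary()
        )

    def get(self) -> T:
        loop = asyncio.get_running_loop()  # raises RuntimeError outside a loop
        value = self._by_loop.get(loop)
        if value is None:
            value = self._factory()  # built at most once per live loop
            self._by_loop[loop] = value
        return value

    def __len__(self) -> int:
        return len(self._by_loop)
```

One `WeakKeyDictionary` caveat applies: if a cached value holds a strong reference to its own loop, the key can never be collected and the entry never evicts.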
Addresses both Greptile P3 findings on PR #362:

- `AgentexAsyncTracingProcessor` implemented the same per-loop client cache pattern as `SGPAsyncTracingProcessor` but had no dedicated test file. Added `test_agentex_tracing_processor.py` mirroring the SGP coverage: single build per loop, keepalive-enabled regression guard, and `WeakKeyDictionary` eviction after GC. The file is skipped cleanly with `pytest.importorskip` when `pydantic_ai` isn't installed (the SDK dev venv state), since `agentex_tracing_processor` pulls in `agentex.lib.adk`, which requires it.
- `test_linger_respects_batch_size_cap` used `linger_ms=500`, forcing the tail singleton batch to wait out the full 500ms timeout. The test only asserts that no batch exceeds the cap, so dropping to `linger_ms=50` keeps correctness while cutting wall time by ~10x.
Both Greptile P3 follow-ups addressed in 9bb4ae6:
Verified locally: 3 new Agentex tests pass when `pydantic_ai` is available; cap test still passes; full
SDK Validation Notes:

The Good News: PR #362 completely fixed our ingest queueing latency. At both 1x and 2x traffic, our p99 dropped from ~2.4s to ~0.5s.

The Problem: When we push to 2x load (~42 RPS), the export pipeline inside the Mock Agent collapses and starts silently dropping data (though dropped spans also occurred in the 1x load run). Here are the metrics detailing the export breakdown.

1. Client export did not scale with RPS
At 2x scale, the agent sent only ~17% of the PUTs it was sending at 1x. Egress completely missed the floor target.

2. Server batch ~169/s vs. mock client ~10/s: the platform is not idle
This doesn't seem to be an EGP bottleneck. The backend API runs steadily at ~169 PUT/s; the mock agent seems to be the issue.

3. ~9.1 KB/PUT unchanged: fewer PUTs, not smaller bodies
The batches aren't getting smaller; they stay flat at ~9.1 KB/PUT. The agent is just sending far fewer of them.

4. Memory is not the 2× blocker; EGP saturation matches history
This is not an OOM issue. Memory stayed incredibly stable at ~52%.

Spans Getting Dropped (Logs): I think the agent is silently dropping ~25-span batches instead of retrying them; I'm not sure whether this is by design. Under heavy load, the agent drain task is not keeping up and is causing a backlog.

NOTE:
@smoreinis Quick update: I just finished auditing the DB across all traces from the 1x and 2x runs, and the data is all there. The median trace persisted the full expected number of spans, so the ~86 KB/s egress isn't due to missing rows or silent data drops. I think the issue lies purely with the queue drain mechanics. No need to dig deeper into the Spans Getting Dropped issue; it is a separate, smaller issue.
Under load, span export failures were silently dropped and unbounded
queue growth was invisible. Add observability and a narrow retry:
- Bound the queue (AGENTEX_SPAN_QUEUE_MAX_SIZE, 0=unbounded default) and
expose a dropped_spans counter + depth so span loss is measurable
instead of silent.
- Re-enqueue only transient HTTP failures {429,500,502,503,504} up to
AGENTEX_SPAN_QUEUE_MAX_RETRIES (default 1 = no retry). Auth/4xx (e.g.
the 401s seen in the load test) and non-HTTP exceptions are dropped and
counted, never retried, preserving the drain-continues-on-error contract.
Defaults preserve prior behavior (unbounded, no retry).
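One way the bounded queue and the narrow retry could fit together is sketched below. `BoundedSpanQueue`, `QueuedItem`, and `handle_failure` are hypothetical names, not the SDK's API. `max_retries` mirrors the documented semantics of `AGENTEX_SPAN_QUEUE_MAX_RETRIES` (the default of 1 means a single attempt and no retry), and a status of `None` stands in for a non-HTTP exception.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional

# Transient statuses eligible for re-enqueue, per the commit message above.
RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})


@dataclass
class QueuedItem:
    span: Any
    attempts: int = 1  # attempts made so far, including the first


@dataclass
class BoundedSpanQueue:
    """Hypothetical bounded queue that counts every span it gives up on."""

    max_size: int = 0     # 0 = unbounded, matching the documented default
    max_retries: int = 1  # 1 = single attempt, no retry
    dropped_spans: int = 0
    _queue: "asyncio.Queue[QueuedItem]" = field(init=False)

    def __post_init__(self) -> None:
        # asyncio.Queue treats maxsize <= 0 as unbounded.
        self._queue = asyncio.Queue(maxsize=self.max_size)

    @property
    def depth(self) -> int:
        return self._queue.qsize()

    def enqueue(self, item: QueuedItem) -> bool:
        """Non-blocking put; a full queue drops and counts instead of raising."""
        try:
            self._queue.put_nowait(item)
            return True
        except asyncio.QueueFull:
            self.dropped_spans += 1
            return False

    def handle_failure(self, item: QueuedItem, status: Optional[int]) -> None:
        """Re-enqueue transient HTTP failures; drop and count everything else."""
        if status in RETRYABLE_STATUSES and item.attempts < self.max_retries:
            self.enqueue(QueuedItem(item.span, item.attempts + 1))
        else:
            # Auth/4xx errors, non-HTTP exceptions, and exhausted retries.
            self.dropped_spans += 1
```

Counting at both drop sites (queue full and failure not retried) is what turns silent loss into a measurable number, while the drain loop itself never raises on an export error.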
…ger-keepalive # Conflicts: # .stats.yml # pyproject.toml # src/agentex/lib/cli/templates/default-pydantic-ai/project/acp.py.j2 # src/agentex/lib/cli/templates/sync-pydantic-ai/project/acp.py.j2 # src/agentex/lib/cli/templates/temporal-pydantic-ai/project/workflow.py.j2
Summary
Two compounding causes of slow SGP trace export under load testing, fixed together:

- `AsyncSpanQueue` now lingers up to 100ms (env-tunable via `AGENTEX_SPAN_QUEUE_LINGER_MS`) so concurrently emitted spans coalesce into one `upsert_batch` call. It stops early when the batch fills or on shutdown.
- `SGPAsyncTracingProcessor`, `AgentexAsyncTracingProcessor`, and the ADK `TracingModule` disabled keepalive (`max_keepalive_connections=0`) to avoid "bound to a different event loop" errors in sync-ACP, paying a full TLS handshake per span. Replaced with a per-loop client cache: `max_keepalive_connections=20` within each loop, and cross-loop safety is preserved by giving each loop its own client. The two processors key the cache on the loop object via `weakref.WeakKeyDictionary`; `TracingModule` still guards a single slot with `id(asyncio.get_running_loop())`.

The combination is what matters: keepalive alone is wasted if requests stay size-1, and batching alone is wasted if every request pays a TLS handshake.
Test plan
- `rye run lint`: ruff + pyright clean
- `rye run pytest tests/lib/core/tracing/`: 38 passed, 2 skipped (pre-existing load-test gates)
- `linger_ms=0` preserves the old immediate-drain behavior
- Linger respects the `batch_size` cap
- `_get_client` caches per event loop and only builds the client once
- `max_keepalive_connections > 0` regression guard
- Disabled processor (no `sgp_api_key`/`sgp_account_id`) returns a `None` client
- `WeakKeyDictionary` evicts the entry after a closed loop is GC'd (regression guard against the `id()`-recycling bug)
- End-to-end run of `examples/tutorials/00_sync/030_langgraph` with real SGP credentials (see below)

End-to-end verification
Validated the fix against `examples/tutorials/00_sync/030_langgraph` (sync-ACP + LangGraph + SGP, the exact pattern that motivated the per-loop client cache). Editable-installed this branch's SDK into the tutorial's `uv` venv, ran `agentex agents run --manifest manifest.yaml` against the local AgentEx stack, sent three messages, then queried SGP back via the API.

- `add_tracing_processor_config` at module load, processor registers, `_get_client()` returns a real client (not `None`).
- `8916dd2f-…`: `AGENT_WORKFLOW:message` → `COMPLETION:llm:gpt-5`
- `2c534aa7-…`: `AGENT_WORKFLOW:message` → `COMPLETION:llm:gpt-5`
- `8e5346a6-…`: `AGENT_WORKFLOW:message` → 2× `COMPLETION:llm:gpt-5` + `CUSTOM:tool:get_weather`

Greptile Summary
This PR addresses two compounding causes of slow SGP trace export: a missing linger window in `AsyncSpanQueue` that caused size-1 batches on every drain cycle, and disabled httpx keepalive, now replaced by per-event-loop keepalive connections for `SGPAsyncTracingProcessor` and `AgentexAsyncTracingProcessor`.

- `AsyncSpanQueue` now waits up to `linger_ms` (default 100ms, env-tunable) for spans to coalesce before dispatching, with early exit on batch-full or shutdown. The PR also adds bounded-queue support with drop counting and per-processor retry logic for transient HTTP errors (429/5xx).
- The `Async*TracingProcessor` classes replace the old `max_keepalive_connections=0` workaround with a `weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, Client]` cache, giving each event loop its own connection pool.
- `TracingModule` in `tracing.py` also enables keepalive but retains the original single-slot `id(loop)`-based guard instead of being updated to `WeakKeyDictionary`.

Confidence Score: 4/5
Safe to merge for the two processors; TracingModule retains a loop-detection gap that could reintroduce the error the PR fixes.
The Async*TracingProcessor changes are well-implemented with correct WeakKeyDictionary eviction, sound task_done() bookkeeping in the linger path, and comprehensive test coverage. The one gap is TracingModule._tracing_service: it now enables keepalive (max_keepalive_connections=20) but still tracks the current loop via id() stored in _bound_loop_id. When CPython recycles the memory address of a GC'd loop, the stale httpx.AsyncClient (with live keepalive connections) is returned for the new loop — the same condition the PR was built to prevent. This affects the ADK span/start_span/end_span path in sync-ACP and streaming contexts.
src/agentex/lib/adk/_modules/tracing.py — TracingModule._tracing_service loop-detection logic
Sequence Diagram
```mermaid
sequenceDiagram
    participant Agent
    participant AsyncSpanQueue
    participant DrainLoop
    participant Processor as SGP/Agentex Processor
    participant ClientCache as WeakKeyDictionary[Loop→Client]
    participant SGP as SGP API

    Agent->>AsyncSpanQueue: enqueue(span_start)
    AsyncSpanQueue->>DrainLoop: create_task(_drain_loop)
    Note over DrainLoop: await queue.get() → first item

    rect rgb(200, 230, 255)
        Note over DrainLoop: Linger window (default 100ms)
        DrainLoop->>AsyncSpanQueue: "wait_for(queue.get(), timeout=remaining)"
        AsyncSpanQueue-->>DrainLoop: span_1 (20ms later)
        DrainLoop->>AsyncSpanQueue: "wait_for(queue.get(), timeout=remaining)"
        AsyncSpanQueue-->>DrainLoop: span_2 (40ms later)
        DrainLoop->>AsyncSpanQueue: "wait_for(queue.get(), timeout=remaining)"
        Note over DrainLoop: TimeoutError → linger ends
    end

    DrainLoop->>Processor: on_spans_start([span_0, span_1, span_2])
    Processor->>ClientCache: get(running_loop)
    ClientCache-->>Processor: cached AsyncClient (keepalive)
    Processor->>SGP: upsert_batch([3 spans])
    SGP-->>Processor: 200 OK
    DrainLoop->>AsyncSpanQueue: task_done() × 3

    Note over DrainLoop: On transient error (429/5xx)
    DrainLoop->>AsyncSpanQueue: put_nowait(item, attempts+1)
```

Comments Outside Diff (1)
`src/agentex/lib/adk/_modules/tracing.py`, line 57-83

`id()` recycling risk survives keepalive change

`TracingModule._tracing_service` compares `id(loop)` against `_bound_loop_id`. If a loop is GC'd and CPython reuses its memory address for a new loop, `id(new_loop) == _bound_loop_id`, so the guard doesn't fire and the stale `_tracing_service_lazy` (whose `httpx.AsyncClient` is bound to the dead loop) is returned. That is exactly the "bound to a different event loop" `RuntimeError` the PR aims to prevent.

Both `SGPAsyncTracingProcessor` and `AgentexAsyncTracingProcessor` were correctly migrated to `weakref.WeakKeyDictionary` for the same reason (see the thread on this PR), but `TracingModule` was updated only to enable keepalive, leaving the `id()`-based guard in place. With keepalive now on, the live connection pool makes the stale-client scenario more likely to produce an error rather than just incurring a fresh TLS handshake.
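One possible shape for a fix that keeps `TracingModule`'s single-slot design is to remember the bound loop through `weakref.ref` instead of `id()`. A weak reference to a collected loop returns `None`, and an identity check against a live loop cannot collide with a recycled address. `SingleSlotPerLoop` is a hypothetical helper, not the module's actual code, and a real version would also need to decide what to do with the superseded client.

```python
import asyncio
import weakref
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class SingleSlotPerLoop(Generic[T]):
    """Hypothetical single-slot cache that tracks its loop via weakref.ref."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._loop_ref: Optional["weakref.ReferenceType[asyncio.AbstractEventLoop]"] = None
        self._value: Optional[T] = None

    def get(self) -> T:
        loop = asyncio.get_running_loop()
        # Dereference: None if the bound loop was collected, else the loop object.
        bound = self._loop_ref() if self._loop_ref is not None else None
        if bound is not loop or self._value is None:
            # Different or dead loop: rebuild instead of reusing a value whose
            # connections belong to another loop. The old value is simply dropped.
            self._value = self._factory()
            self._loop_ref = weakref.ref(loop)
        return self._value
```

Compared with the `WeakKeyDictionary` cache in the processors, this keeps only the most recent loop's client, so an agent that alternates between loops rebuilds on every switch, but it never hands out a client bound to a different loop.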
Reviews (5): Last reviewed commit: "Merge remote-tracking branch 'origin/nex..."