perf(tracing): span queue linger + per-loop httpx keepalive#362
Open
smoreinis wants to merge 6 commits into
Open
perf(tracing): span queue linger + per-loop httpx keepalive#362smoreinis wants to merge 6 commits into
smoreinis wants to merge 6 commits into
Conversation
Two compounding causes of slow SGP trace export under load: - The async drain loop returned size-1 batches almost every time because there was no time window for spans to accumulate. Adds a 100ms linger (tunable via AGENTEX_SPAN_QUEUE_LINGER_MS) so concurrently-emitted spans coalesce into one upsert_batch call. - httpx keepalive was disabled (max_keepalive_connections=0) in SGPAsyncTracingProcessor, AgentexAsyncTracingProcessor, and the ADK TracingModule to avoid "bound to a different event loop" errors in sync-ACP. Each span paid a full TLS handshake. Replaced with a per-event-loop client cache keyed on id(asyncio.get_running_loop()); connections are reused within a loop and cross-loop safety is preserved. Tests cover linger coalescing, batch-size cap interaction, per-loop client caching, a keepalive-enabled regression guard, and disabled-processor null-client behavior.
Addresses Greptile review feedback on PR #362. The original `dict[int, AsyncSGPClient]` cache used `id(asyncio.get_running_loop())` as the key. In CPython `id()` returns a memory address, and once a loop is garbage-collected its address can be assigned to a new loop — a fresh loop hashing to a stale entry would receive a client whose httpx.AsyncClient was bound to the dead loop, reintroducing the "bound to a different event loop" error this PR was built to prevent. Switching the cache to `weakref.WeakKeyDictionary` keyed on the loop object itself fixes the bug: the entry is evicted automatically when the loop is collected, so id() recycling can't cause stale-client reuse. Multi-loop caching benefit is preserved (better than the single-slot pattern in TracingModule for agents that bounce between loops). Same fix applied to AgentexAsyncTracingProcessor. Added a regression test verifying the cache evicts a closed/dropped loop's entry after gc.collect().
danielmillerp
approved these changes
May 20, 2026
Addresses both Greptile P3 findings on PR #362: - AgentexAsyncTracingProcessor implemented the same per-loop client cache pattern as SGPAsyncTracingProcessor but had no dedicated test file. Added test_agentex_tracing_processor.py mirroring the SGP coverage: single-build-per-loop, keepalive-enabled regression guard, and WeakKeyDictionary eviction after GC. Skipped cleanly with pytest.importorskip when pydantic_ai isn't installed (the SDK dev venv state), since agentex_tracing_processor pulls in agentex.lib.adk which requires it. - test_linger_respects_batch_size_cap used linger_ms=500, forcing the tail singleton batch to wait out the full 500ms timeout — the test only asserts no batch exceeds the cap, so dropping to linger_ms=50 keeps correctness while cutting wall time by ~10x.
Contributor
Author
|
Both Greptile P3 follow-ups addressed in 9bb4ae6:
Verified locally: 3 new Agentex tests pass when pydantic_ai is available; cap test still passes; full |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Two compounding causes of slow SGP trace export under load test, fixed together:
AsyncSpanQueuenow lingers up to 100ms (env-tunable viaAGENTEX_SPAN_QUEUE_LINGER_MS) so concurrently-emitted spans coalesce into oneupsert_batchcall. Stops early when the batch fills or on shutdown.SGPAsyncTracingProcessor,AgentexAsyncTracingProcessor, and the ADKTracingModuledisabled keepalive (max_keepalive_connections=0) to avoid "bound to a different event loop" errors in sync-ACP, paying a full TLS handshake per span. Replaced with a per-loop client cache keyed onid(asyncio.get_running_loop()):max_keepalive_connections=20within each loop, and cross-loop safety is preserved by giving each loop its own client.The combination is what matters: keepalive alone is wasted if requests stay size-1, and batching alone is wasted if every request pays a TLS handshake.
Test plan
rye run lint— ruff + pyright cleanrye run pytest tests/lib/core/tracing/— 38 passed, 2 skipped (pre-existing load-test gates)linger_ms=0preserves the old immediate-drain behaviorbatch_sizecap_get_clientcaches per event loop and only builds the client oncemax_keepalive_connections > 0regression guardsgp_api_key/sgp_account_id) returnsNoneclientWeakKeyDictionaryevicts the entry after a closed loop is GC'd (regression guard against theid()-recycling bug)examples/tutorials/00_sync/030_langgraphwith real SGP credentials (see below)End-to-end verification
Validated the fix against
examples/tutorials/00_sync/030_langgraph(sync-ACP + LangGraph + SGP — the exact pattern that motivated the per-loop client cache). Editable-installed this branch's SDK into the tutorial'suvvenv, ranagentex agents run --manifest manifest.yamlagainst the local AgentEx stack, sent three messages, then queried SGP back via the API.add_tracing_processor_configat module load, processor registers,_get_client()returns a real client (not None).8916dd2f-…AGENT_WORKFLOW:message→COMPLETION:llm:gpt-52c534aa7-…AGENT_WORKFLOW:message→COMPLETION:llm:gpt-58e5346a6-…AGENT_WORKFLOW:message→ 2×COMPLETION:llm:gpt-5+CUSTOM:tool:get_weatherGreptile Summary
This PR addresses two compounding performance bottlenecks in SGP trace export: a span queue that returned size-1 batches due to no coalescing window, and per-request TLS handshakes caused by disabled HTTP keepalive. Both are fixed together, since batching without keepalive and keepalive without batching each only deliver partial benefit.
AsyncSpanQueuenow waits up tolinger_ms(default 100ms, env-tunable) after the first item arrives, usingasyncio.wait_forwith a deadline. Items stop accumulating on batch fill, timeout, or shutdown. The linger is suppressed immediately when_stoppingis set so new batches after shutdown begin draining at once.SGPAsyncTracingProcessor,AgentexAsyncTracingProcessor,TracingModule) replacedmax_keepalive_connections=0with aWeakKeyDictionary[asyncio.AbstractEventLoop, client]cache keyed on the live loop object. Cross-loop safety is preserved because each loop gets its own client, and GC'd loops (and their stale clients) are evicted automatically — closing the id()-recycling race that the previous review flagged.linger_ms=0back-compat, batch-size cap enforcement, per-loop caching, keepalive regression, disabled-processor short-circuit, andWeakKeyDictionaryeviction.Confidence Score: 5/5
Safe to merge — changes are purely additive performance improvements with no behavioral regression in the happy path and well-tested edge cases.
The linger window is bounded, shutdown interaction is correct (task_done always fires in finally, _queue.join() waits for in-flight batches), and the WeakKeyDictionary cache addresses the id()-recycling issue raised in the previous review. All three processors now use consistent patterns with comprehensive regression tests.
No files require special attention.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%% flowchart TD A["Queue.get() — block for first item"] --> B["batch = [first]"] B --> C{linger_ms > 0 AND not _stopping?} C -- Yes --> D["deadline = loop.time() + linger_ms/1000"] D --> E{len batch < batch_size?} E -- Yes --> F["remaining = deadline minus now"] F --> G{remaining > 0?} G -- No --> P G -- Yes --> H["await wait_for(queue.get(), timeout=remaining)"] H -- item --> I["append to batch"] --> E H -- TimeoutError --> P C -- No --> J["get_nowait() until empty or batch_size reached"] J --> P P["Process: START items to on_spans_start, END items to on_spans_end"] --> Q["finally: task_done x len batch"] Q --> A subgraph Per-loop client lookup R["get_running_loop()"] -- RuntimeError --> S["build one-off client no cache"] R -- loop --> T{loop in WeakKeyDict?} T -- Yes --> U["return cached client"] T -- No --> V["_build_client() + store in WeakKeyDict"] --> U endReviews (3): Last reviewed commit: "test(tracing): cover Agentex per-loop ca..." | Re-trigger Greptile