Skip to content

fix(tracing) batch span events in SGPAsyncTracingProcessor#342

Closed
mohammadatallah-scale wants to merge 4 commits into
mainfrom
mohammad/ove-2b-batch-span-events
Closed

fix(tracing) batch span events in SGPAsyncTracingProcessor#342
mohammadatallah-scale wants to merge 4 commits into
mainfrom
mohammad/ove-2b-batch-span-events

Conversation

@mohammadatallah-scale
Copy link
Copy Markdown

@mohammadatallah-scale mohammadatallah-scale commented May 4, 2026

Closes OVE-4 and also a TODO in the code. Should help BP's agent that's running in start_span occasional timeouts as well.

Bug

SGPAsyncTracingProcessor.on_span_start and on_span_end each await client.spans.upsert_batch(items=[one_span]) on the agent's hot path. The synchronous processor inherits the SDK's TraceQueueManager and is fine; the async one needs to match its behavior.

Fix

Replace the per-event upsert with a buffer-plus-flush model in asyncio:

  • asyncio.Queue (max 4000) plus an asyncio.Task worker that drains in batches of up to 50, every 4s or when 200+ events queued. Constants mirror scale_gp_beta.lib.tracing.trace_queue_manager.
  • Retry policy mirrors the SDK: 4 attempts with exponential backoff (0.4s -> 20s capped). Unexpected exceptions are dropped, not retried.
  • Per-iteration try / except in the worker so one bad batch doesn't break subsequent flushes.
  • shutdown() drains the queue, signals the worker, joins with a 10s timeout. Spans whose end was never recorded are re-enqueued so they aren't silently lost.

Impact

  • Hot path becomes non-blocking. on_span_start and on_span_end are now in-memory queue inserts (~µs) instead of awaited HTTP roundtrips (~50-200ms). The agent no longer waits on the SGP API per span event.
  • ~50× fewer HTTP calls to the SGP traces API in typical workloads (50 spans per upsert vs 1), with proportional reductions in identity-service auth load and TLS handshakes.
  • Removes the start-span timeout failure class by taking the network call off the agent's critical path.
  • Aligns async with sync. The sync processor already used the SDK's queue-and-worker model; this brings the async processor in line.
  • Backpressure is bounded. If the queue fills (4000 spans), new spans are dropped with a warning rather than blocking the agent. In practice this only happens when the worker is far behind (outage, slow server).

Applies to any agent emitting spans through the async processor.

Code organization

  • _run is split into _is_shutting_down, _wait_for_flush_signal, and _safe_drain so the worker reads top-to-bottom: "while not shutting down, wait for trigger, drain. Final drain. Done."
  • All helpers (_enqueue, _ensure_started, _drain, _upsert_with_retry, _wait_for_flush_signal, _safe_drain) have docstrings explaining inputs, side effects, and dropped-batch semantics.

Observability

Disabled processors (no sgp_api_key or sgp_account_id) now emit a single WARNING at __init__ instead of one warning per span event. The previous per-event log would flood at agent throughput.

Behavior preserved

  • on_span_start / on_span_end / shutdown signatures unchanged.
  • _spans dict still tracks SGPSpan objects between start and end.
  • Externally injected sgp_async_client is still respected.
  • test_sgp_span_input_updated_on_end updated: no upsert on the hot path; the upsert is observed after shutdown() instead.

Tests

15 tests in test_sgp_tracing_processor.py (6 pre-existing lifecycle/mem-leak tests + 9 added here):

Batching (TestSGPAsyncTracingProcessorBatching):

  • test_span_event_does_not_trigger_immediate_upsert — single event must not hit the network.
  • test_shutdown_flushes_queued_spans_in_one_batch — 5 lifecycles (10 events) coalesce into one upsert.
  • test_drain_splits_into_multiple_batches_above_max_batch_size — 80 events split across multiple upserts, each capped at 50.
  • test_worker_continues_after_unexpected_exception_in_one_batchRuntimeError on one upsert drops that batch; worker stays alive and processes the next one.

Edge cases (TestSGPAsyncTracingProcessorEdgeCases):

  • test_disabled_processor_never_enqueues_or_calls_upsert — disabled processor builds no client, no queue, no worker.
  • test_shutdown_is_safe_when_called_multiple_times — idempotent shutdown.
  • test_shutdown_before_any_event_is_noop — early-return when worker was never started.
  • test_apierror_triggers_retry_then_drops_batch_on_exhaustionAPIError retried up to DEFAULT_RETRIES, batch dropped on exhaustion.

Test plan

  • pytest tests/lib/core/tracing/ (31 passed, 2 skipped pre-existing)
  • ruff check clean
  • ruff format --check clean
  • python -c "import agentex" succeeds
  • CI passes

Risks

Mitigated by this PR (should not be seen in practice)

  • Worker death from an unhandled exception during a batch. Per-iteration try / except in _run plus a broad-except in _upsert_with_retry catch any non-CancelledError exception, drop the offending batch, and keep the worker running. to_request_params failures are also caught and drop only the affected span.
  • Worker death from CancelledError during normal flow. Cancellation only happens via shutdown(), which drains before signalling cancel.

Actually possible in production

Risk When it happens Severity
Live-tracing latency Always. Up to 4s of buffer lag (or until 200 events queue). Intentional behavior change. Aligns async UX with the existing sync processor's UX.
Hard process kill (SIGKILL / OOM kill) Process terminated before shutdown() runs. In-memory queue lost. Inherent to any in-process buffer; same exposure as the sync processor's daemon thread.
Memory pressure Large LLM contexts × deep queue. 4000 × 10KB-sized payloads = Tens of MBs of process memory. Queue depth bounded; payload size is not.
Retry exhaustion SGP outage > ~50s (4 attempts × backoff to 20s). One batch (≤50 spans) dropped per outage window. Same exposure as the SDK sync side.
Loop-swap data loss Sync-ACP / per-request event loops where the loop swaps mid-process. Items in the previous loop's queue are lost. Rare in standard agentex deployments.
Shutdown timeout Queue near 4000 + slow network → 10s wait isn't enough. Whatever didn't drain in 10s is lost. Edge case.

Likelihood ranking, highest to lowest: live-tracing latency → hard kill loss → memory pressure → retry exhaustion → loop-swap → shutdown timeout.

Greptile Summary

This PR replaces the per-event await upsert_batch(items=[one_span]) in SGPAsyncTracingProcessor with an asyncio buffer-plus-flush model (queue max 4000, worker batches up to 50, flushes every 4s or at 200+ queued), bringing the async processor in line with the SDK's sync TraceQueueManager and removing awaited HTTP calls from the agent hot path.

  • P1 regression: sgp_span.input = span.input was silently removed from on_span_end. Callers that update span.input between start and end (e.g., growing conversation contexts) will have only the start-time input persisted to SGP. test_sgp_span_input_updated_on_end was simultaneously weakened to only assert upsert timing, not that the input was actually updated, masking this change.
  • P2: After a shutdown timeout, self._worker.cancel() is called but self._worker is never set to None, so a stray _ensure_started call during the brief cancellation window would find the task not-yet-done and skip creating a replacement worker.

Confidence Score: 3/5

Not safe to merge as-is — a silent behavioral regression drops updated span inputs that were set after on_span_start.

One P1 present: sgp_span.input is no longer updated in on_span_end, and the covering test was weakened to not assert the input value, so the regression ships silently. P1 ceiling is 4/5 but the fact that the test masking the regression was intentionally modified pulls confidence to 3/5.

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py — on_span_end input update and shutdown timeout cleanup; tests/lib/core/tracing/processors/test_sgp_tracing_processor.py — test_sgp_span_input_updated_on_end assertion coverage.

Important Files Changed

Filename Overview
src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py Core implementation of the batching rewrite. One P1 regression: sgp_span.input is no longer updated in on_span_end, silently dropping input mutations made after span start. Shutdown timeout path also leaves _worker non-None after cancel, though this is P2.
tests/lib/core/tracing/processors/test_sgp_tracing_processor.py 9 new tests covering batching, edge cases, and retry logic. test_sgp_span_input_updated_on_end was weakened to check only upsert timing, no longer asserting that the input was actually updated on end, masking the P1 regression in the processor.

Sequence Diagram

sequenceDiagram
    participant Agent
    participant Processor as SGPAsyncTracingProcessor
    participant Queue as asyncio.Queue
    participant Worker as _run() task
    participant SGP as SGP API

    Agent->>Processor: on_span_start(span)
    Processor->>Processor: _ensure_started()
    Processor->>Queue: put_nowait(sgp_span)
    Note over Queue: qsize >= 200? set _flush_event

    Agent->>Processor: on_span_end(span)
    Processor->>Queue: put_nowait(sgp_span)

    loop Every 4s or flush_event set
        Worker->>Worker: _wait_for_flush_signal()
        Worker->>Queue: drain up to 50 items
        Worker->>SGP: upsert_batch(items=[<=50])
        Note over Worker,SGP: retry up to 4x on APIError
    end

    Agent->>Processor: shutdown()
    Processor->>Queue: re-enqueue unclosed spans
    Processor->>Worker: set _shutdown_event + _flush_event
    Worker->>Queue: final drain
    Worker->>SGP: upsert_batch(remaining items)
    Worker-->>Processor: done
    Processor-->>Agent: shutdown complete
Loading

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py:218-221
**`sgp_span.input` no longer updated on span end**

`on_span_end` previously contained `sgp_span.input = span.input`, which allowed callers to augment the input between start and end (the canonical case being LLM conversation messages that grow as the turn completes). That line is absent from this PR; the end-enqueue only carries `output`, `metadata`, and `end_time`. Any caller that sets `span.input` after `on_span_start` — exactly the scenario exercised by `test_sgp_span_input_updated_on_end` — will have the stale start-time input persisted to SGP instead of the final value.

The test was updated to check batching mechanics but the assertion that the updated input reached the upsert was removed, hiding this regression. If the intent is to intentionally align with the sync processor (which also skips `input` on end), that should be documented in the PR description and the test should be renamed to remove the misleading "input updated on end" framing.

### Issue 2 of 2
src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py:252-254
After `asyncio.wait_for` raises `TimeoutError` the wrapped task is **already cancelled** internally by the wait machinery. Calling `self._worker.cancel()` a second time is harmless, but `self._worker` is never set to `None`, so a subsequent call to `_ensure_started` (e.g., a stray `on_span_start` after shutdown) will find `_worker.done() == False` (cancellation hasn't propagated yet) and skip creating a new worker, silently discarding those spans. Setting `self._worker = None` on the timeout path prevents this.

```suggestion
        except asyncio.TimeoutError:
            logger.warning(f"Async tracing worker did not exit within {SHUTDOWN_DRAIN_TIMEOUT}s; cancelling")
            self._worker.cancel()
            self._worker = None
```

Reviews (5): Last reviewed commit: "fix(tracing): import APIError from scale..." | Re-trigger Greptile

Greptile also left 1 inline comment on this PR.

@mohammadatallah-scale mohammadatallah-scale changed the base branch from mohammad/ove-2a-keepalive-async-client to main May 4, 2026 18:13
@mohammadatallah-scale mohammadatallah-scale marked this pull request as ready for review May 4, 2026 18:13
Comment thread src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
@mohammadatallah-scale mohammadatallah-scale force-pushed the mohammad/ove-2b-batch-span-events branch from 26306bc to dc129ef Compare May 4, 2026 18:24
`SGPAsyncTracingProcessor` previously `await client.spans.upsert_batch(items=[one_span])`
on every `on_span_start` and every `on_span_end`, awaited synchronously
on the agent's hot path. Under burst load this surfaces as start-span
timeouts. The synchronous processor inherits the SDK's
`TraceQueueManager` and is fine; this change brings the async one in
line.

Replace the per-event upsert with a buffer-plus-flush model in asyncio:

- asyncio.Queue (max 4000) plus an asyncio.Task worker that drains in
  batches of up to 50, every 4s or when 200+ events queued. Constants
  mirror scale_gp_beta.lib.tracing.trace_queue_manager.
- Retry policy mirrors the SDK: 4 attempts with exponential backoff
  (0.4s -> 20s capped). Unexpected exceptions are dropped, not retried.
- Per-iteration try / except in the worker so one bad batch doesn't
  break subsequent flushes.
- shutdown() drains the queue, signals the worker, joins with a 10s
  timeout. Spans whose end was never recorded are re-enqueued so they
  aren't silently lost.

Behavior preserved:
- on_span_start / on_span_end / shutdown signatures unchanged.
- _spans dict still tracks SGPSpan objects between start and end.
- Externally injected sgp_async_client is still respected.
- The existing AsyncSGPClient construction in __init__, with the
  max_keepalive_connections=0 workaround, is kept untouched.

Closes OVE-4.
@mohammadatallah-scale mohammadatallah-scale force-pushed the mohammad/ove-2b-batch-span-events branch from dc129ef to 81e2021 Compare May 4, 2026 18:31
@mohammadatallah-scale mohammadatallah-scale changed the title fix(tracing) [2/2]: batch span events in SGPAsyncTracingProcessor fix(tracing) batch span events in SGPAsyncTracingProcessor May 4, 2026
Code clarity changes (no behavior change):
- Split SGPAsyncTracingProcessor._run into _is_shutting_down,
  _wait_for_flush_signal, and _safe_drain helpers so the loop reads
  top-to-bottom: "while not shutting down, wait for trigger, drain."
- Add docstrings on _enqueue, _ensure_started, _drain, and
  _upsert_with_retry covering inputs, side effects, and dropped-batch
  semantics.

New tests (regression coverage that was missing):
- test_drain_splits_into_multiple_batches_above_max_batch_size: 80
  enqueued events split into multiple upsert_batch calls, each batch
  capped at MAX_BATCH_SIZE (50).
- test_worker_continues_after_unexpected_exception_in_one_batch: a
  RuntimeError on one upsert drops that batch; the worker keeps
  flushing and a subsequent batch lands. Exercises the per-iteration
  try/except in _run.
…se tests

Two related changes addressing review feedback:

1. Restore observability for disabled processors. When the rewrite
   moved the per-event 'SGP is disabled, skipping span upsert' warning
   out of on_span_start it left the disabled state silent. The original
   per-event log was spammy (one entry per span event at agent
   throughput) and inconsistent (only on_span_start had it, on_span_end
   did not). Replace it with a single warning at __init__ time.

2. Edge-case tests in TestSGPAsyncTracingProcessorEdgeCases:
   - test_disabled_processor_never_enqueues_or_calls_upsert: confirms
     a disabled processor builds no client, no queue, no worker, and
     shutdown is a no-op.
   - test_shutdown_is_safe_when_called_multiple_times: idempotency
     regression. Second shutdown after the worker has already exited
     does not re-flush or raise.
   - test_shutdown_before_any_event_is_noop: shutdown invoked before
     any span event must early-return without spinning up a worker.
   - test_apierror_triggers_retry_then_drops_batch_on_exhaustion:
     APIError is retried up to DEFAULT_RETRIES, batch is dropped after
     exhaustion. asyncio.sleep is patched to keep the test fast.
CI pyright reports reportPrivateImportUsage on
`from scale_gp_beta._exceptions import APIError` because the
`_exceptions` submodule is private. Re-export of `APIError` is
available at the package root, so use that instead.
Comment on lines 218 to 221
self._add_source_to_span(span)
sgp_span.input = span.input # type: ignore[assignment]
sgp_span.output = span.output # type: ignore[assignment]
sgp_span.metadata = span.data # type: ignore[assignment]
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 sgp_span.input no longer updated on span end

on_span_end previously contained sgp_span.input = span.input, which allowed callers to augment the input between start and end (the canonical case being LLM conversation messages that grow as the turn completes). That line is absent from this PR; the end-enqueue only carries output, metadata, and end_time. Any caller that sets span.input after on_span_start — exactly the scenario exercised by test_sgp_span_input_updated_on_end — will have the stale start-time input persisted to SGP instead of the final value.

The test was updated to check batching mechanics but the assertion that the updated input reached the upsert was removed, hiding this regression. If the intent is to intentionally align with the sync processor (which also skips input on end), that should be documented in the PR description and the test should be renamed to remove the misleading "input updated on end" framing.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Line: 218-221

Comment:
**`sgp_span.input` no longer updated on span end**

`on_span_end` previously contained `sgp_span.input = span.input`, which allowed callers to augment the input between start and end (the canonical case being LLM conversation messages that grow as the turn completes). That line is absent from this PR; the end-enqueue only carries `output`, `metadata`, and `end_time`. Any caller that sets `span.input` after `on_span_start` — exactly the scenario exercised by `test_sgp_span_input_updated_on_end` — will have the stale start-time input persisted to SGP instead of the final value.

The test was updated to check batching mechanics but the assertion that the updated input reached the upsert was removed, hiding this regression. If the intent is to intentionally align with the sync processor (which also skips `input` on end), that should be documented in the PR description and the test should be renamed to remove the misleading "input updated on end" framing.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

@smoreinis
Copy link
Copy Markdown
Contributor

curious what the overlap here is with https://github.com/scaleapi/scale-agentex-python/pull/303/changes which added a background queue for processor.on_span_start calls.
Hoping to better understand if that approach did not have the desired effect (moving a trace's on_span_start off the hot path), or if there's value in having both of these approaches implemented

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants