
fix(tests): repair test_streaming_model so all 28 tests run and pass #334

Merged
smoreinis merged 1 commit into main from fix/streaming-model-tests
Apr 30, 2026

Conversation

@smoreinis (Contributor) commented Apr 30, 2026

Summary

The entire test_streaming_model.py suite has been broken on main for some time — running it shows 4 failed + 24 errors, 0 passing. None of those tests have actually been exercising TemporalStreamingModel since whichever refactor introduced the drift. This PR fixes the four root causes so the suite runs end-to-end (28/28 pass) and starts catching real regressions again.

Bugs fixed

Bug 1: Fixture name typo (24 errors)

conftest.py defines the streaming-mock fixture as mock_adk_streaming (no leading underscore), but every test in TestStreamingModelSettings and TestStreamingModelTools requests it as _mock_adk_streaming. Pytest can't resolve the fixture, so all 24 tests error before their bodies run.

The fixture is autouse=True and the parameter value was never used in any test body, so the param was purely vestigial — the original author appears to have intended the leading underscore as the standard pytest convention for intentionally unused side-effect fixtures, but applied it to the consumer instead of the fixture name.

Fix: Replaced the dead _mock_adk_streaming parameter with the new _streaming_context_vars fixture (see Bug 2), which these tests genuinely need.
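
A minimal sketch of the mismatch (the fixture body and test name here are illustrative, not the real ones):

```python
import pytest

# conftest.py registers the fixture WITHOUT a leading underscore
@pytest.fixture(autouse=True)
def mock_adk_streaming():
    """Illustrative body: patch the ADK streaming service for the module."""
    yield

# test_streaming_model.py requested a name pytest cannot resolve, so every
# such test errors at setup with "fixture '_mock_adk_streaming' not found"
def test_temperature_setting(_mock_adk_streaming):
    ...
```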

Bug 2: Test/code drift on validation contract (3 of 4 original failures)

TemporalStreamingModel.get_response() reads task_id, trace_id, and parent_span_id from ContextVars populated by ContextInterceptor from request headers in real Temporal flows. Tests had been passing task_id=... as a kwarg, which is silently swallowed by **kwargs and ignored — so all three ContextVars stayed at their defaults, the validation at the top of get_response raised, and no test in TestStreamingModelBasics ever reached its assertions.

Fix: New _streaming_context_vars fixture in conftest.py populates all three ContextVars (with proper teardown via tokens), simulating what ContextInterceptor does in production. Tests that need the validation to pass now request this fixture.
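
A hedged sketch of what such a fixture can look like. The ContextVar names and test values follow this PR's description and sequence diagram; everything else (the module they live in, the `sample_task_id` helper) is an assumption:

```python
from contextvars import ContextVar

import pytest

# Assumed stand-ins for the ContextVars that ContextInterceptor populates
streaming_task_id: ContextVar[str | None] = ContextVar("streaming_task_id", default=None)
streaming_trace_id: ContextVar[str | None] = ContextVar("streaming_trace_id", default=None)
streaming_parent_span_id: ContextVar[str | None] = ContextVar("streaming_parent_span_id", default=None)

@pytest.fixture
def sample_task_id() -> str:
    return "task_abc123"  # illustrative value

@pytest.fixture
def _streaming_context_vars(sample_task_id):
    # Simulate ContextInterceptor: set all three vars, keep the reset tokens
    tokens = [
        streaming_task_id.set(sample_task_id),
        streaming_trace_id.set("test-trace-id"),
        streaming_parent_span_id.set("test-parent-span-id"),
    ]
    yield sample_task_id  # tests assert against this, not a hardcoded kwarg
    # Teardown: reset() with the tokens restores whatever was set before
    for var, token in zip(
        (streaming_task_id, streaming_trace_id, streaming_parent_span_id), tokens
    ):
        var.reset(token)
```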

Bug 3: sample_computer_tool fixture vs. recent ComputerTool narrowing (1 error)

Commit 4c269080 ("Narrow ComputerTool.computer union for Responses API serialization") added isinstance(computer, (Computer, AsyncComputer)) validation to _convert_tools, but sample_computer_tool still constructed a bare MagicMock(). test_computer_tool started failing at conversion time.

Fix: Switched to MagicMock(spec=Computer) so isinstance passes. Same pattern other fixtures in the file already use (HostedMCPTool, ImageGenerationTool, etc.).
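
The pattern, roughly (import locations assumed from the OpenAI Agents SDK):

```python
from unittest.mock import MagicMock

import pytest
from agents import Computer, ComputerTool  # assumed import path

@pytest.fixture
def sample_computer_tool() -> ComputerTool:
    # spec=Computer makes isinstance(mock, Computer) pass in _convert_tools
    # without implementing the full Computer interface
    return ComputerTool(computer=MagicMock(spec=Computer))
```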

Bug 4: Stale event mocks (1 of 4 original failures + 2 surfaced after Bug 2)

Three tests in TestStreamingModelBasics that assert on streaming_task_message_context calls built their event sequences with raw MagicMock(type="..."). Production dispatches via isinstance(event, ResponseOutputItemAddedEvent) etc. — a bare MagicMock never satisfies that check, so dispatch was silently skipped and the assertions failed.

Fix: Switched to MagicMock(spec=ResponseOutputItemAddedEvent) (and the other event types) for each event in those streams. spec= makes isinstance pass without triggering pydantic validation on the event's required fields (which would also need sequence_number, real Response instances, etc.).
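
For example, for the added-item event (the `item` attribute shown is just one the real type defines):

```python
from unittest.mock import MagicMock

from openai.types.responses import ResponseOutputItemAddedEvent

# A spec-bound mock reports the spec as its __class__, so the dispatcher's
# isinstance check passes without constructing a real pydantic event
added = MagicMock(spec=ResponseOutputItemAddedEvent)
added.item = MagicMock()  # allowed: `item` exists on the real event type
assert isinstance(added, ResponseOutputItemAddedEvent)
```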

test_task_id_threading additionally asserted against a hardcoded task_id="test_task_12345" that never actually got threaded anywhere — the kwarg was ignored just like in Bug 2. Updated the assertion to use the value yielded by _streaming_context_vars, which is what production reads from the ContextVar.

Verification

| Check | Result |
| --- | --- |
| `pytest src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py` | 28 passed in 8.01s (was 4 failed + 24 errors) |
| `ruff check` on changed files | 0 errors |
| `pyright` on changed files | 0 errors, 0 warnings |

Test plan

  • Run the full file: 28/28 pass locally
  • Lint and typecheck on changed files clean
  • Confirm CI on the PR shows the same 28/28 (was 0/28 on main)
  • Confirm no other test files relied on the _mock_adk_streaming name (none found in the codebase, but worth a CI re-check)

Why these are real fixes, not just "make the failures go away"

  • The new _streaming_context_vars fixture isn't a workaround — it explicitly mirrors what ContextInterceptor does in production. Tests that exercise get_response() now actually hit the same validation/dispatch path real flows do.
  • MagicMock(spec=...) is the canonical unittest.mock pattern for satisfying isinstance without rebuilding production data structures from scratch. The same approach is already used elsewhere in this conftest.
  • Pre-existing assertions about streaming_task_message_context being called are now actually verifiable — previously the dispatcher was skipping over MagicMock events without anyone noticing.

Greptile Summary

This PR correctly fixes four root causes that left all 28 streaming model tests broken. The conftest.py changes are clean — the new _streaming_context_vars fixture properly mirrors production's ContextInterceptor with correct token-based teardown, and the MagicMock(spec=Computer) fix is the right approach. One residual bare mock in test_multiple_computer_tools_error could cause the test to assert the wrong exception if the isinstance guard fires first.

Confidence Score: 4/5

Safe to merge after addressing the one P1 finding: `computer2 = MagicMock()` (no `spec=Computer`) at `test_streaming_model.py` line 551, inside `test_multiple_computer_tools_error`, could cause the test to raise the wrong error if the isinstance guard in `_convert_tools` fires before the multi-tool check. All other findings are P2 quality suggestions that don't block merge.

Important Files Changed

| Filename | Overview |
| --- | --- |
| `src/agentex/lib/core/temporal/plugins/openai_agents/tests/conftest.py` | Adds `_streaming_context_vars` fixture with proper ContextVar setup/teardown via tokens, and fixes `sample_computer_tool` to use `MagicMock(spec=Computer)` — both changes are correct and well-motivated. |
| `src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py` | Replaces `_mock_adk_streaming` with `_streaming_context_vars`, upgrades event mocks to spec-bound objects in `TestStreamingModelBasics`, and rewires `test_task_id_threading` to use the ContextVar value. One residual bare `MagicMock()` for `computer2` (line 551) may fail the isinstance guard before the intended multi-tool error is raised; a self-referential `initial_content` assertion (line 779) is vacuously true; Settings/Tools tests still use bare completed-event mocks that skip production dispatch. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Test
    participant Fixture as _streaming_context_vars
    participant ContextVar as ContextVars (streaming_task_id etc.)
    participant GetResponse as TemporalStreamingModel.get_response()
    participant Interceptor as ContextInterceptor (production)

    Note over Interceptor,ContextVar: Production flow
    Interceptor->>ContextVar: set(task_id, trace_id, span_id) from request headers

    Note over Test,ContextVar: Test flow (this PR)
    Test->>Fixture: request _streaming_context_vars
    Fixture->>ContextVar: set(sample_task_id, test-trace-id, test-parent-span-id)
    Fixture-->>Test: yield sample_task_id (token held)
    Test->>GetResponse: get_response(...) — no task_id kwarg needed
    GetResponse->>ContextVar: read streaming_task_id.get()
    ContextVar-->>GetResponse: sample_task_id
    GetResponse-->>Test: ModelResponse
    Test->>Fixture: teardown
    Fixture->>ContextVar: reset(token) — restore prior state
```

Comments Outside Diff (3)

  1. src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py, line 551

    P1 Bare MagicMock() for second ComputerTool may fail isinstance check before the multi-tool guard

    The PR's Bug 3 fix (switching sample_computer_tool to MagicMock(spec=Computer)) was needed because production's _convert_tools runs isinstance(computer, (Computer, AsyncComputer)) validation. The same validation applies to every ComputerTool passed in, including second_computer_tool here. If the isinstance check fires before the "only one computer tool" guard, this test will raise a different error than ValueError("You can only provide one computer tool") — causing it to fail or produce a false positive.

    Apply the same pattern used in sample_computer_tool:

    computer2 = MagicMock(spec=Computer)

  2. src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py, line 777-780

    P2 Self-referential initial_content assertion is trivially true

    The assert_called_with(...) check reads the actual call_args.kwargs['initial_content'] as the expected value, so it unconditionally passes for any non-empty call. Only the task_id comparison does real work here. Either assert against a concrete expected value for initial_content, or drop to assert_called_once() if the content is intentionally unspecified.
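
    A hypothetical reconstruction of the anti-pattern and a tighter alternative (mock and variable names are illustrative):

    ```python
    from unittest.mock import MagicMock

    ctx_mock, expected_task_id = MagicMock(), "task_abc123"
    ctx_mock(task_id=expected_task_id, initial_content="anything")  # stand-in for the real call

    # Vacuous: the "expected" initial_content is read back from the actual
    # call, so this passes for ANY value that was passed
    kwargs = ctx_mock.call_args.kwargs
    ctx_mock.assert_called_with(task_id=expected_task_id, initial_content=kwargs["initial_content"])

    # Tighter: assert only what the test actually verifies
    ctx_mock.assert_called_once()
    assert ctx_mock.call_args.kwargs["task_id"] == expected_task_id
    ```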


  3. src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py, line 29-32

    P2 Bare MagicMock completed events in TestStreamingModelSettings / TestStreamingModelTools skip production event dispatch

    All ~24 mock streams in these classes still use MagicMock(type="response.completed", ...). Because production dispatches via isinstance(event, ResponseCompletedEvent), these bare mocks fail the isinstance check and the completed event is silently ignored. The tests pass only because they assert on responses.create call arguments (captured before streaming), not on the returned ModelResponse. Consider applying MagicMock(spec=ResponseCompletedEvent) (with .response = MagicMock(output=[], usage=MagicMock())), consistent with what TestStreamingModelBasics does after this PR.
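
    The reviewer's suggestion, concretely (a sketch, not the file's actual code):

    ```python
    from unittest.mock import MagicMock

    from openai.types.responses import ResponseCompletedEvent

    completed = MagicMock(spec=ResponseCompletedEvent)
    completed.response = MagicMock(output=[], usage=MagicMock())  # minimal shape the model reads
    assert isinstance(completed, ResponseCompletedEvent)  # passes thanks to spec
    ```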


Four pre-existing bugs left this entire test file unrunnable on main (4
failures + 24 errors); fixing them here so the suite actually exercises
TemporalStreamingModel and protects against regressions.

Bug 1 (24 errors): `conftest.py` defines fixture `mock_adk_streaming` (no
underscore) but every test in TestStreamingModelSettings and
TestStreamingModelTools requested it as `_mock_adk_streaming`, so pytest
failed to resolve the fixture before the body ever ran. The fixture is
``autouse=True`` and the param value was never used in any test body, so
the parameter was vestigial — replaced with `_streaming_context_vars`,
which provides the ContextVar setup these tests now actually need.

Bug 2 (4 failures): `TemporalStreamingModel.get_response()` reads
`task_id`, `trace_id`, and `parent_span_id` from ContextVars populated
by `ContextInterceptor` from request headers in real Temporal flows.
Tests had been passing `task_id=...` as a kwarg, which is silently
swallowed by `**kwargs` and ignored, so all three ContextVars stayed at
their defaults and the validation at the top of `get_response` raised
before any work happened. New `_streaming_context_vars` fixture in
conftest sets all three vars (and resets them on teardown), simulating
what `ContextInterceptor` does in production.

Bug 3 (test_computer_tool): A recent commit narrowed `ComputerTool`
serialization to require an actual `Computer`/`AsyncComputer` instance,
but `sample_computer_tool` still built a bare `MagicMock`. Switched to
`MagicMock(spec=Computer)` so the production isinstance check passes.

Bug 4 (3 streaming-context tests): The 3 tests in TestStreamingModelBasics
that assert on `streaming_task_message_context` calls built event
sequences with raw `MagicMock(type="...")`. Production dispatches via
`isinstance(event, ResponseOutputItemAddedEvent)` etc., which `MagicMock`
without `spec` never satisfies, so dispatch was silently skipped and
the assertions failed. Switched to `MagicMock(spec=...)` for each event
type — passes isinstance without triggering pydantic validation on the
event's required fields. Also fixed `test_task_id_threading` which had
been asserting against a hardcoded `task_id="test_task_12345"` that was
never actually threaded anywhere (the kwarg was ignored, just like in
Bug 2); it now asserts against the value yielded by the fixture, which
is the value production reads from the ContextVar.

After all four fixes: 28/28 pass, ruff clean, pyright clean.
smoreinis merged commit 7e5e69c into main on Apr 30, 2026
32 checks passed
smoreinis deleted the fix/streaming-model-tests branch on Apr 30, 2026 at 15:50
stainless-app (bot) mentioned this pull request on Apr 30, 2026
smoreinis added a commit that referenced this pull request Apr 30, 2026
test(streaming): add coalescing-layer tests; loosen one model assertion (full message appears in the #333 squash below)
smoreinis added a commit that referenced this pull request Apr 30, 2026
perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window) (#333)

* perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window)

Per-token Redis publishes from TemporalStreamingModel were adding ~45s
(56-62%) overhead to agent response latency, mostly from head-of-line
blocking on the model's event loop: each `await streaming_context.stream_update(...)`
inside the OpenAI stream `async for` paused token consumption until the
publish round-trip completed.

This change introduces a `CoalescingBuffer` driven by an `asyncio.Event`,
so the producer never awaits on Redis. Deltas are merged consecutive-only
(preserving character order in every (type, index) channel) and flushed
on a 50ms timer, on a 128-char size threshold, or immediately for the
first delta to keep perceived responsiveness high. The buffer's `close()`
drains remaining deltas before the DONE event, so consumers see the full
sequence in order.

A new `StreamingMode = Literal["off", "per_token", "coalesced"]` lives
in `streaming.py` as the single source of truth and is plumbed through
the adk streaming module, `StreamingService.streaming_task_message_context`,
and `StreamingTaskMessageContext`. Default is `"coalesced"` everywhere,
so all 13+ existing context callers (claude_agents, langgraph, litellm
provider, openai sync provider, etc.) benefit automatically.
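
A much-simplified sketch of that flush policy, under assumed names and signatures (the real CoalescingBuffer also merges per-channel deltas and coordinates the DONE event):

```python
import asyncio
from typing import Awaitable, Callable

class CoalescingBuffer:
    """Sketch: flush on a 50ms window, a 128-char threshold, or the first delta."""

    def __init__(self, publish: Callable[[str], Awaitable[None]],
                 window: float = 0.05, max_chars: int = 128) -> None:
        self._publish = publish      # the Redis round-trip lives behind this
        self._window = window
        self._max_chars = max_chars
        self._buf: list[str] = []
        self._chars = 0
        self._seen_first = False
        self._closed = False
        self._wake = asyncio.Event()
        self._task = asyncio.create_task(self._run())

    def add(self, delta: str) -> None:
        # Producer side: synchronous, never awaits on Redis
        if self._closed:
            return                   # add-after-close is a no-op
        self._buf.append(delta)
        self._chars += len(delta)
        if not self._seen_first or self._chars >= self._max_chars:
            self._wake.set()         # flush now: first delta or size threshold
        self._seen_first = True

    async def _run(self) -> None:
        while True:
            try:
                await asyncio.wait_for(self._wake.wait(), timeout=self._window)
            except asyncio.TimeoutError:
                pass                 # window elapsed with no explicit wake
            self._wake.clear()
            if self._buf:
                drained, self._buf, self._chars = self._buf, [], 0
                await self._publish("".join(drained))

    async def close(self) -> None:
        self._closed = True
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        if self._buf:                # final drain before the DONE event
            await self._publish("".join(self._buf))
```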

* chore(streaming): fix import ordering (ruff I001)

* fix(streaming): address greptile review findings

- _run: when CancelledError is raised mid-flush in the for-loop, re-enqueue
  the in-flight item plus any remaining items in the local `drained` list
  back into self._buf so close()'s final drain can recover them. Previously
  the local `drained` list was unreachable after CancelledError exited the
  for-loop, causing the last coalesced batch to be silently dropped on
  close-during-flush races. Trade-off: the in-flight item may be duplicated
  on the consumer side (Redis pub may have completed before cancel was
  delivered), which is preferable to silent loss for streaming UX.

- _merge_pair: replace `return b` fallback with AssertionError. All six
  current TaskMessageDelta variants have explicit isinstance branches, so
  the fallback is unreachable today. But _can_merge returns True for any
  same-type pair, so adding a 7th delta variant without updating
  _merge_pair would silently drop `a`'s accumulated content. Asserting
  turns a future silent data-loss into an immediate, diagnosable crash.
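
The re-enqueue described in the _run bullet above, roughly, as a drop-in replacement for `_run` in the earlier sketch (names follow the commit text; the rest is assumed):

```python
    async def _run(self) -> None:
        while True:
            await self._wake.wait()
            self._wake.clear()
            drained, self._buf, self._chars = self._buf, [], 0
            i = 0
            try:
                for i, item in enumerate(drained):
                    await self._publish(item)
            except asyncio.CancelledError:
                # Re-enqueue the in-flight item and everything after it so
                # close()'s final drain can recover them; the in-flight item
                # may be duplicated downstream, which beats silent loss.
                self._buf[:0] = drained[i:]
                raise
```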

* test(streaming): add coalescing-layer tests; loosen one model assertion

After merging the test-suite repair from main (#334) into this branch, one
model test (test_responses_api_streaming) regressed because its
assert_called_with strict-matched all kwargs of streaming_task_message_context
and didn't tolerate the new `streaming_mode='coalesced'` kwarg this PR
adds. Switched to assert_called() + targeted kwarg checks so the test
verifies what it cares about (task_id threading) without locking in
implementation details.

Replaced the ad-hoc smoke scripts that lived in conversation with a real
pytest module at tests/lib/core/services/adk/test_streaming.py covering:

- _delta_char_len, _can_merge, _merge_pair: per-channel correctness +
  None-handling
- _merge_consecutive: pure-text collapse, cross-channel order preservation,
  per-channel reconstruction matches per-token semantics
- CoalescingBuffer: first-delta-immediate flush within ~20ms,
  size-threshold flush before timer fires, multi-delta coalescing within
  one window, idle close, add-after-close no-op
- CoalescingBuffer cancel-during-flush regression test for the P1 fix:
  five queued chunks must all surface across publishes when close()
  cancels mid-flush (asserts substring presence rather than exact
  ordering, since the documented trade-off allows duplicates of the
  in-flight item)
- StreamingTaskMessageContext mode dispatch: "off" suppresses publishes
  but persists full content, "per_token" publishes each delta synchronously,
  "coalesced" batches and persists full content

* chore(streaming): route TemporalStreamingModel logger through make_logger

The model file used raw ``logging.getLogger("agentex.temporal.streaming")``,
which returns a logger with no handler attached and no level configured —
so the existing ``[TemporalStreamingModel] Initialized ... streaming_mode=...``
INFO log was silently dropped, making it impossible to verify at runtime
that a coalesced (or any) streaming mode was actually wired.

Switch to the SDK's ``make_logger`` helper (level=INFO, RichHandler in
local mode, StreamHandler otherwise) used everywhere else in the SDK.
The explicit logger name ``agentex.temporal.streaming`` is preserved so
any external logging configuration targeting that name keeps working.
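
Schematically (the make_logger import path here is a guess):

```python
import logging

from agentex.lib.utils.logging import make_logger  # assumed module path

# Before: no handler or level configured anywhere, so INFO records were dropped
logger = logging.getLogger("agentex.temporal.streaming")

# After: SDK helper attaches a handler (RichHandler locally) at level=INFO
logger = make_logger("agentex.temporal.streaming")
```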
declan-scale added a commit that referenced this pull request Apr 30, 2026
* feat(api): api update

* codegen metadata (×9)

* chore(internal): more robust bootstrap script

* codegen metadata (×55)

* fix: use correct field name format for multipart file arrays

* codegen metadata (×5)

* feat: support setting headers via env

* codegen metadata (×40)

* fix: allow litellm security patch (#336)

* fix(adk): Always inject headers on execute activity (#337)

* perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window) (#333)

* codegen metadata

* feat(api): api update

* release: 0.10.3

---------

Co-authored-by: stainless-app[bot] <142633134+stainless-app[bot]@users.noreply.github.com>
Co-authored-by: Brandon Allen <brandon.allen@scale.com>
Co-authored-by: Declan Brady <declan.brady@scale.com>
Co-authored-by: Stas Moreinis <stas.moreinis@scale.com>
