Route Python SDK turn notifications by ID#21778
Conversation
Co-authored-by: Codex <noreply@openai.com>
Co-authored-by: Codex <noreply@openai.com>
Co-authored-by: Codex <noreply@openai.com>
Co-authored-by: Codex <noreply@openai.com>
Co-authored-by: Codex <noreply@openai.com>
|
@codex review this |
There was a problem hiding this comment.
💡 Codex Review
turn_start() now registers the returned turn ID, so all turn/* and item notifications for that turn are routed away from the global queue. stream_until_methods("turn/completed") still waits on next_notification(), so the completion event is never observed and callers can block indefinitely. Route turn-scoped targets through next_turn_notification() or stop exposing this helper for turn methods.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
Co-authored-by: Codex <noreply@openai.com>
|
@codex review this |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b22c46a66b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| turn_queues = list(self._turn_notifications.values()) | ||
| # Put the same transport failure into every queue so no SDK call blocks | ||
| # forever waiting for a response that cannot arrive. | ||
| for waiter in response_waiters: | ||
| waiter.put(exc) | ||
| for turn_queue in turn_queues: | ||
| turn_queue.put(exc) | ||
| self._global_notifications.put(exc) |
There was a problem hiding this comment.
Preserve transport failures for pending turns
fail_all only wakes queues already in _turn_notifications. Early turn events can be buffered in _pending_turn_notifications before turn_start registers the turn; if stdout closes after the response is delivered but before registration, the handle drains those pending events and then blocks forever instead of seeing the transport error.
Useful? React with 👍 / 👎.
|
One thing I’d consider changing in def _notification_turn_id(self, notification: Notification) -> str | None:
payload = notification.payload
turn_id = getattr(payload, "turn_id", None)
if isinstance(turn_id, str):
return turn_id
turn = getattr(payload, "turn", None)
nested_turn_id = getattr(turn, "id", None)
if isinstance(nested_turn_id, str):
return nested_turn_id
return NoneThe For example, the SDK generator could emit a helper next to def notification_turn_id(notification: Notification) -> str | None:
payload = notification.payload
if isinstance(payload, DIRECT_TURN_ID_NOTIFICATION_TYPES):
return payload.turn_id
if isinstance(payload, NESTED_TURN_NOTIFICATION_TYPES):
return payload.turn.id
if isinstance(payload, UnknownNotification):
raw_turn_id = payload.params.get("turnId")
if isinstance(raw_turn_id, str):
return raw_turn_id
raw_turn = payload.params.get("turn")
if isinstance(raw_turn, dict) and isinstance(raw_turn.get("id"), str):
return raw_turn["id"]
return NoneThen |
owenlin0
left a comment
There was a problem hiding this comment.
btw it seems like a TurnHandle that is created but never consumed can leave a live queue forever?
owenlin0
left a comment
There was a problem hiding this comment.
pre-approving in case you want to address the above as followups
Generate typed turn-routing metadata for notifications, use it in the router, clear pending turn notifications when unregistered turns complete, and route stream_text through the per-turn queue. Add focused sync and async coverage for interleaved turn routing and async delegation. Co-authored-by: Codex <noreply@openai.com>
Drive interleaved turn routing through the real client reader loop with raw notification messages, so the test covers notification coercion, generated turn routing, and router demux behavior rather than calling the router directly. Co-authored-by: Codex <noreply@openai.com>
Why
The Python SDK previously protected the stdio transport with a single active turn-consumer guard. That avoided competing reads from stdout, but it also meant one
Codex/AsyncCodexclient could not stream multiple active turns at the same time. Notifications could also arrive before the caller received aTurnHandleand registered for streaming, so the SDK needed an explicit routing layer instead of letting individual API calls read directly from the shared transport.What Changed
MessageRouterthat owns per-request response queues, per-turn notification queues, pending turn-notification replay, and global notification delivery behind a single stdout reader thread.TurnHandle.stream()/run()andstream_text()consume only notifications for their own turn ID, whileAsyncAppServerClientno longer serializes all transport calls behind one async lock.Validation
uv run --extra dev ruff format scripts/update_sdk_artifacts.py src/codex_app_server/_message_router.py src/codex_app_server/client.py src/codex_app_server/generated/notification_registry.py tests/test_client_rpc_methods.py tests/test_public_api_runtime_behavior.py tests/test_async_client_behavior.pyuv run --extra dev ruff check scripts/update_sdk_artifacts.py src/codex_app_server/_message_router.py src/codex_app_server/client.py src/codex_app_server/generated/notification_registry.py tests/test_client_rpc_methods.py tests/test_public_api_runtime_behavior.py tests/test_async_client_behavior.pyuv run --extra dev pytest tests/test_client_rpc_methods.py tests/test_public_api_runtime_behavior.py tests/test_async_client_behavior.pygit diff --check