Add thread history projection observers#26527
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ca445e0899
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
liujianyeey-oss
left a comment
There was a problem hiding this comment.
Read through the three new projection observers — one Error-handling asymmetry stands out. TurnSummaryProjectionObserver and the v2 thread_history reducer both set TurnStatus::Failed when an EventMsg::Error arrives whose payload's affects_turn_status() returns true. LifecycleProjectionObserver doesn't have a corresponding arm, so the same event produces a TurnSummary Failed row and a v2 turn marked Failed, but no lifecycle mutation. Inline.
| "numTurns": payload.num_turns, | ||
| }), | ||
| ), | ||
| _ => return None, |
There was a problem hiding this comment.
turn_summary_projection.rs:98-104 and thread_history.rs:900-913 both treat EventMsg::Error with affects_turn_status() as a turn-failure signal:
// turn_summary_projection.rs:98
EventMsg::Error(payload) if payload.affects_turn_status() => {
let turn_id = self.current_turn_id.clone()?;
let summary = self.update_summary(&turn_id, |summary| {
summary.status = TurnStatus::Failed;
});
(turn_id, summary)
}
// thread_history.rs:900
fn handle_error(&mut self, payload: &ErrorEvent) {
if !payload.affects_turn_status() { return; }
let Some(turn) = self.current_turn.as_mut() else { return; };
turn.status = TurnStatus::Failed;
turn.error = Some(V2TurnError { ... });
}Here the same event falls into the catch-all and produces no mutation. The error classifier itself is broad — CodexErrorInfo::affects_turn_status() (protocol/src/protocol.rs:1640) returns true for ContextWindowExceeded, UsageLimitExceeded, ServerOverloaded, HttpConnectionFailed, InternalServerError, Unauthorized, BadRequest, SandboxError, ResponseStreamDisconnected, ResponseTooManyFailedAttempts, Other, plus ErrorEvent::affects_turn_status() at :1813 returns true even when codex_error_info is None. So the gap fires on any common turn-killing error.
session.rs:600-606 wires all three observers into the same projection vector, so each append batch runs them in lockstep against the same RolloutItem stream — divergence is observable per-batch in live_thread.rs::append_items (the three sets of mutations end up in the same AppendThreadItemsParams.thread_history_mutations). For this PR's tests it doesn't surface because there's no Error case in lifecycle_projection_tests.rs, but a durable lifecycle/outbox consumer that joins on TurnSummary's Failed rows will see orphans.
Two reasonable shapes for the fix, depending on what you want the lifecycle outbox to carry:
-
Add an
EventMsg::Error(payload) if payload.affects_turn_status()arm that emits something like("turn.failed", current_turn_id_or_payload_turn_id, json!({ "errorMessage": payload.message, "codexErrorInfo": ... })). This matches the spirit of the other three arms (one event type per terminal-ish lifecycle transition) and keeps the outbox row symmetric with what TurnSummary writes. -
If lifecycle mutations are intentionally a smaller set than TurnSummary mutations, a comment at the catch-all spelling out which
EventMsgvariants are deliberately non-lifecycle (Error, ItemStarted, ItemCompleted, …) would make that explicit — currently it reads like "these are the four we know about" rather than "these are the four we want".
Worth extending lifecycle_projection_tests.rs either way: the existing turn_summary_projection_tests.rs exercises the Error path, and giving lifecycle the same coverage would surface anything else that's missed when new lifecycle-relevant events get added later.
400a442 to
5702f96
Compare
LiveThreadruns installedThreadHistoryProjectionObservers for each append and forwards typed thread-history mutations to the ThreadStore alongside canonical rollout items. This allows ThreadStore implementation to persist these projections, which were previously only ever derived in memory from the underlying RolloutItems.ThreadItemProjectionObserver, which reduces raw append events into ThreadItem upserts/tombstones and carries checkpoint state for open items.TurnSummaryProjectionObserver, which derives turn summary mutations directly from turn lifecycle/status events.LifecycleProjectionObserver, which maps lifecycle events directly into lifecycle mutations.ThreadHistoryBuilderand the ThreadItem observer both sit on the same event-to-turn reduction.