From 1bd6754d7992a48880d9132a1953e6628e7a0741 Mon Sep 17 00:00:00 2001 From: rhan-oai Date: Wed, 29 Apr 2026 14:14:04 -0700 Subject: [PATCH] fix guardian reused review attribution --- codex-rs/core/src/guardian/review_session.rs | 410 ++++++++++++++++++- codex-rs/core/src/guardian/tests.rs | 109 +++++ 2 files changed, 509 insertions(+), 10 deletions(-) diff --git a/codex-rs/core/src/guardian/review_session.rs b/codex-rs/core/src/guardian/review_session.rs index 22651c23d8e6..ac8f9f4c9953 100644 --- a/codex-rs/core/src/guardian/review_session.rs +++ b/codex-rs/core/src/guardian/review_session.rs @@ -13,6 +13,7 @@ use codex_protocol::models::PermissionProfile; use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::Op; @@ -452,6 +453,18 @@ impl GuardianReviewSessionManager { } } + #[cfg(test)] + pub(crate) async fn send_trunk_event_raw_for_test(&self, event: Event) { + let trunk = self + .state + .lock() + .await + .trunk + .clone() + .expect("guardian trunk should exist"); + trunk.codex.session.send_event_raw(event).await; + } + async fn remove_trunk_if_current( &self, trunk: &Arc, @@ -699,8 +712,8 @@ async fn run_review_on_session( })), ) .await; - match submit_result { - Ok(Ok(_)) => {} + let child_turn_id = match submit_result { + Ok(Ok(child_turn_id)) => child_turn_id, Ok(Err(err)) => { return ( GuardianReviewSessionOutcome::SessionFailed(err.into()), @@ -709,11 +722,12 @@ async fn run_review_on_session( ); } Err(outcome) => return (outcome, false, analytics_result), - } + }; analytics_result.reviewed_action_truncated = reviewed_action_truncated; let outcome = wait_for_guardian_review( review_session, + child_turn_id.as_str(), deadline, params.external_cancel.as_ref(), &mut analytics_result, @@ -758,6 +772,7 @@ async fn load_rollout_items_for_fork( async fn wait_for_guardian_review( review_session: &GuardianReviewSession, + expected_turn_id: &str, deadline: tokio::time::Instant, external_cancel: Option<&CancellationToken>, analytics_result: &mut GuardianReviewAnalyticsResult, @@ -769,7 +784,12 @@ async fn wait_for_guardian_review( loop { tokio::select! { _ = &mut timeout => { - let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok(); + let keep_review_session = interrupt_and_drain_turn( + &review_session.codex, + expected_turn_id, + ) + .await + .is_ok(); return (GuardianReviewSessionOutcome::TimedOut, keep_review_session, false); } _ = async { @@ -779,11 +799,17 @@ async fn wait_for_guardian_review( std::future::pending::<()>().await; } } => { - let keep_review_session = interrupt_and_drain_turn(&review_session.codex).await.is_ok(); + let keep_review_session = interrupt_and_drain_turn( + &review_session.codex, + expected_turn_id, + ) + .await + .is_ok(); return (GuardianReviewSessionOutcome::Aborted, keep_review_session, false); } event = review_session.codex.next_event() => { match event { + Ok(event) if !event_matches_turn(&event, expected_turn_id) => {} Ok(event) => match event.msg { EventMsg::TurnComplete(turn_complete) => { analytics_result.time_to_first_token_ms = turn_complete @@ -825,6 +851,20 @@ async fn wait_for_guardian_review( } } +fn event_matches_turn(event: &Event, expected_turn_id: &str) -> bool { + if event.id != expected_turn_id { + return false; + } + + match &event.msg { + EventMsg::TurnComplete(turn_complete) => turn_complete.turn_id == expected_turn_id, + EventMsg::TurnAborted(turn_aborted) => { + turn_aborted.turn_id.as_deref() == Some(expected_turn_id) + } + _ => true, + } +} + pub(crate) fn build_guardian_review_session_config( parent_config: &Config, live_network_config: Option, @@ -932,16 +972,18 @@ async fn run_before_review_deadline_with_cancel( result } -async fn interrupt_and_drain_turn(codex: &Codex) -> anyhow::Result<()> { +async fn interrupt_and_drain_turn(codex: &Codex, expected_turn_id: &str) -> anyhow::Result<()> { let _ = codex.submit(Op::Interrupt).await; tokio::time::timeout(GUARDIAN_INTERRUPT_DRAIN_TIMEOUT, async { loop { let event = codex.next_event().await?; - if matches!( - event.msg, - EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_) - ) { + if event_matches_turn(&event, expected_turn_id) + && matches!( + event.msg, + EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_) + ) + { return Ok::<(), anyhow::Error>(()); } } @@ -955,6 +997,114 @@ async fn interrupt_and_drain_turn(codex: &Codex) -> anyhow::Result<()> { #[cfg(test)] mod tests { use super::*; + use codex_protocol::protocol::AgentStatus; + use codex_protocol::protocol::ErrorEvent; + use codex_protocol::protocol::Submission; + use codex_protocol::protocol::TurnAbortReason; + use codex_protocol::protocol::TurnAbortedEvent; + use codex_protocol::protocol::TurnCompleteEvent; + + async fn test_review_session() -> ( + GuardianReviewSession, + async_channel::Sender, + async_channel::Receiver, + ) { + let (session, _turn, _rx) = crate::session::tests::make_session_and_context_with_rx().await; + let (tx_sub, rx_sub) = async_channel::bounded(4); + let (tx_event, rx_event) = async_channel::unbounded(); + let (_agent_status_tx, agent_status) = + tokio::sync::watch::channel(AgentStatus::PendingInit); + let reuse_key = + GuardianReviewSessionReuseKey::from_spawn_config(session.get_config().await.as_ref()); + + ( + GuardianReviewSession { + codex: Codex { + tx_sub, + rx_event, + agent_status, + session, + session_loop_termination: crate::session::completed_session_loop_termination(), + }, + cancel_token: CancellationToken::new(), + reuse_key, + review_lock: Semaphore::new(/*permits*/ 1), + state: Mutex::new(GuardianReviewState { + prior_review_count: 0, + last_reviewed_transcript_cursor: None, + last_committed_fork_snapshot: None, + }), + }, + tx_event, + rx_sub, + ) + } + + fn turn_complete_event( + turn_id: &str, + last_agent_message: Option<&str>, + time_to_first_token_ms: Option, + ) -> Event { + Event { + id: turn_id.to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn_id.to_string(), + last_agent_message: last_agent_message.map(str::to_string), + completed_at: None, + duration_ms: None, + time_to_first_token_ms, + }), + } + } + + fn turn_aborted_event(turn_id: &str) -> Event { + Event { + id: turn_id.to_string(), + msg: EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some(turn_id.to_string()), + reason: TurnAbortReason::Interrupted, + completed_at: None, + duration_ms: None, + }), + } + } + + async fn test_review_params() -> GuardianReviewSessionParams { + let (session, turn) = crate::session::tests::make_session_and_context().await; + let model = turn.model_info.slug.clone(); + let reasoning_effort = turn.reasoning_effort; + let reasoning_summary = turn.reasoning_summary; + let personality = turn.personality; + let cwd = turn.cwd.clone(); + let spawn_config = build_guardian_review_session_config( + turn.config.as_ref(), + /*live_network_config*/ None, + model.as_str(), + reasoning_effort, + ) + .expect("guardian config"); + + GuardianReviewSessionParams { + parent_session: Arc::new(session), + parent_turn: Arc::new(turn), + spawn_config, + request: GuardianApprovalRequest::Shell { + id: "shell-1".to_string(), + command: vec!["git".to_string(), "status".to_string()], + cwd, + sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault, + additional_permissions: None, + justification: Some("Inspect repo state.".to_string()), + }, + retry_reason: None, + schema: super::super::prompt::guardian_output_schema(), + model, + reasoning_effort, + reasoning_summary, + personality, + external_cancel: None, + } + } #[tokio::test] async fn guardian_review_session_config_change_invalidates_cached_session() { @@ -1163,4 +1313,244 @@ mod tests { } ); } + + #[tokio::test] + async fn run_review_on_reused_session_waits_for_submitted_turn() { + let (review_session, tx_event, rx_sub) = test_review_session().await; + { + let mut state = review_session.state.lock().await; + state.prior_review_count = 1; + state.last_reviewed_transcript_cursor = Some(GuardianTranscriptCursor { + parent_history_version: 0, + transcript_entry_count: 0, + }); + } + let params = test_review_params().await; + + let review = tokio::spawn(async move { + run_review_on_session( + &review_session, + ¶ms, + GuardianReviewSessionKind::TrunkReused, + tokio::time::Instant::now() + Duration::from_secs(1), + ) + .await + }); + let submission = rx_sub.recv().await.expect("guardian submission"); + tx_event + .send(turn_complete_event("prior-turn", Some("stale"), Some(9))) + .await + .expect("queue prior turn completion"); + tx_event + .send(turn_complete_event( + submission.id.as_str(), + Some("fresh"), + Some(42), + )) + .await + .expect("queue submitted turn completion"); + + let (outcome, keep_review_session, analytics_result) = + review.await.expect("review task should complete"); + let GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) = outcome else { + panic!("expected submitted turn completion"); + }; + assert_eq!(last_agent_message.as_deref(), Some("fresh")); + assert_eq!(analytics_result.time_to_first_token_ms, Some(42)); + assert!(keep_review_session); + } + + #[tokio::test] + async fn wait_for_guardian_review_ignores_prior_turn_completion() { + let (review_session, tx_event, _rx_sub) = test_review_session().await; + tx_event + .send(turn_complete_event("prior-turn", Some("stale"), Some(9))) + .await + .expect("queue prior turn completion"); + tx_event + .send(turn_complete_event("current-turn", Some("fresh"), Some(42))) + .await + .expect("queue current turn completion"); + + let mut analytics_result = GuardianReviewAnalyticsResult::without_session(); + let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review( + &review_session, + "current-turn", + tokio::time::Instant::now() + Duration::from_secs(1), + /*external_cancel*/ None, + &mut analytics_result, + ) + .await; + + let GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) = outcome else { + panic!("expected current turn completion"); + }; + assert_eq!(last_agent_message.as_deref(), Some("fresh")); + assert_eq!(analytics_result.time_to_first_token_ms, Some(42)); + assert!(keep_review_session); + assert!(capture_token_usage); + } + + #[tokio::test] + async fn wait_for_guardian_review_ignores_prior_turn_errors() { + let (review_session, tx_event, _rx_sub) = test_review_session().await; + tx_event + .send(Event { + id: "prior-turn".to_string(), + msg: EventMsg::Error(ErrorEvent { + message: "stale guardian error".to_string(), + codex_error_info: None, + }), + }) + .await + .expect("queue prior turn error"); + tx_event + .send(turn_complete_event( + "current-turn", + /*last_agent_message*/ None, + Some(42), + )) + .await + .expect("queue current turn completion"); + + let mut analytics_result = GuardianReviewAnalyticsResult::without_session(); + let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review( + &review_session, + "current-turn", + tokio::time::Instant::now() + Duration::from_secs(1), + /*external_cancel*/ None, + &mut analytics_result, + ) + .await; + + let GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) = outcome else { + panic!("expected current turn completion"); + }; + assert_eq!(last_agent_message, None); + assert_eq!(analytics_result.time_to_first_token_ms, Some(42)); + assert!(keep_review_session); + assert!(capture_token_usage); + } + + #[tokio::test] + async fn wait_for_guardian_review_ignores_prior_turn_aborts() { + let (review_session, tx_event, _rx_sub) = test_review_session().await; + tx_event + .send(turn_aborted_event("prior-turn")) + .await + .expect("queue prior turn abort"); + tx_event + .send(turn_complete_event("current-turn", Some("fresh"), Some(42))) + .await + .expect("queue current turn completion"); + + let mut analytics_result = GuardianReviewAnalyticsResult::without_session(); + let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review( + &review_session, + "current-turn", + tokio::time::Instant::now() + Duration::from_secs(1), + /*external_cancel*/ None, + &mut analytics_result, + ) + .await; + + let GuardianReviewSessionOutcome::Completed(Ok(last_agent_message)) = outcome else { + panic!("expected current turn completion"); + }; + assert_eq!(last_agent_message.as_deref(), Some("fresh")); + assert_eq!(analytics_result.time_to_first_token_ms, Some(42)); + assert!(keep_review_session); + assert!(capture_token_usage); + } + + #[tokio::test] + async fn wait_for_guardian_review_timeout_drains_expected_turn_after_stale_terminal_event() { + let (review_session, tx_event, rx_sub) = test_review_session().await; + tx_event + .send(turn_complete_event("prior-turn", Some("stale"), Some(9))) + .await + .expect("queue prior turn completion"); + let tx_interrupt_event = tx_event.clone(); + let interrupt_response = tokio::spawn(async move { + let submission = rx_sub.recv().await.expect("interrupt submission"); + assert!(matches!(submission.op, Op::Interrupt)); + tx_interrupt_event + .send(turn_aborted_event("current-turn")) + .await + .expect("queue current turn abort"); + }); + + let mut analytics_result = GuardianReviewAnalyticsResult::without_session(); + let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review( + &review_session, + "current-turn", + tokio::time::Instant::now() + Duration::from_millis(10), + /*external_cancel*/ None, + &mut analytics_result, + ) + .await; + + interrupt_response + .await + .expect("interrupt response task should complete"); + assert!(matches!(outcome, GuardianReviewSessionOutcome::TimedOut)); + assert!(keep_review_session); + assert!(!capture_token_usage); + } + + #[tokio::test] + async fn wait_for_guardian_review_cancel_drains_expected_turn_after_stale_terminal_event() { + let (review_session, tx_event, rx_sub) = test_review_session().await; + tx_event + .send(turn_complete_event("prior-turn", Some("stale"), Some(9))) + .await + .expect("queue prior turn completion"); + let tx_interrupt_event = tx_event.clone(); + let interrupt_response = tokio::spawn(async move { + let submission = rx_sub.recv().await.expect("interrupt submission"); + assert!(matches!(submission.op, Op::Interrupt)); + tx_interrupt_event + .send(turn_aborted_event("current-turn")) + .await + .expect("queue current turn abort"); + }); + let external_cancel = CancellationToken::new(); + external_cancel.cancel(); + + let mut analytics_result = GuardianReviewAnalyticsResult::without_session(); + let (outcome, keep_review_session, capture_token_usage) = wait_for_guardian_review( + &review_session, + "current-turn", + tokio::time::Instant::now() + Duration::from_secs(1), + Some(&external_cancel), + &mut analytics_result, + ) + .await; + + interrupt_response + .await + .expect("interrupt response task should complete"); + assert!(matches!(outcome, GuardianReviewSessionOutcome::Aborted)); + assert!(keep_review_session); + assert!(!capture_token_usage); + } + + #[tokio::test] + async fn interrupt_and_drain_turn_ignores_prior_turn_completion() { + let (review_session, tx_event, _rx_sub) = test_review_session().await; + tx_event + .send(turn_complete_event("prior-turn", Some("stale"), Some(9))) + .await + .expect("queue prior turn completion"); + tx_event + .send(turn_aborted_event("current-turn")) + .await + .expect("queue current turn abort"); + + interrupt_and_drain_turn(&review_session.codex, "current-turn") + .await + .expect("drain current turn"); + + assert!(review_session.codex.rx_event.try_recv().is_err()); + } } diff --git a/codex-rs/core/src/guardian/tests.rs b/codex-rs/core/src/guardian/tests.rs index 6761d87022ff..78362b6f8985 100644 --- a/codex-rs/core/src/guardian/tests.rs +++ b/codex-rs/core/src/guardian/tests.rs @@ -30,6 +30,7 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::PermissionProfile; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::GranularApprovalConfig; use codex_protocol::protocol::GuardianAssessmentStatus; @@ -38,6 +39,7 @@ use codex_protocol::protocol::GuardianUserAuthorization; use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SandboxPolicy; +use codex_protocol::protocol::TurnCompleteEvent; use core_test_support::PathBufExt; use core_test_support::TempDirExt; use core_test_support::context_snapshot; @@ -1674,6 +1676,113 @@ async fn guardian_reuses_prompt_cache_key_and_appends_prior_reviews() -> anyhow: Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn guardian_reused_trunk_ignores_stale_prior_turn_completion() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let request_log = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-guardian-1"), + ev_assistant_message( + "msg-guardian-1", + "{\"risk_level\":\"low\",\"user_authorization\":\"high\",\"outcome\":\"allow\",\"rationale\":\"first guardian rationale\"}", + ), + ev_completed("resp-guardian-1"), + ]), + sse(vec![ + ev_response_created("resp-guardian-2"), + ev_assistant_message( + "msg-guardian-2", + "{\"risk_level\":\"low\",\"user_authorization\":\"high\",\"outcome\":\"allow\",\"rationale\":\"second guardian rationale\"}", + ), + ev_completed("resp-guardian-2"), + ]), + ], + ) + .await; + + let (session, turn) = guardian_test_session_and_turn(&server).await; + let first_outcome = run_guardian_review_session_for_test( + Arc::clone(&session), + Arc::clone(&turn), + GuardianApprovalRequest::Shell { + id: "shell-1".to_string(), + command: vec!["git".to_string(), "push".to_string()], + cwd: test_path_buf("/repo/codex-rs/core").abs(), + sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault, + additional_permissions: None, + justification: Some("Need to push the first docs fix.".to_string()), + }, + /*retry_reason*/ None, + guardian_output_schema(), + /*external_cancel*/ None, + ) + .await; + let (GuardianReviewOutcome::Completed(first_assessment), first_metadata) = first_outcome else { + panic!("expected first guardian assessment"); + }; + assert_eq!(first_assessment.rationale, "first guardian rationale"); + assert!(matches!( + first_metadata.guardian_session_kind, + Some(codex_analytics::GuardianReviewSessionKind::TrunkNew) + )); + + session + .guardian_review_session + .send_trunk_event_raw_for_test(Event { + id: "stale-turn".to_string(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "stale-turn".to_string(), + last_agent_message: Some( + "{\"risk_level\":\"high\",\"user_authorization\":\"low\",\"outcome\":\"deny\",\"rationale\":\"stale guardian rationale\"}" + .to_string(), + ), + completed_at: None, + duration_ms: None, + time_to_first_token_ms: Some(1), + }), + }) + .await; + + let second_outcome = run_guardian_review_session_for_test( + Arc::clone(&session), + Arc::clone(&turn), + GuardianApprovalRequest::Shell { + id: "shell-2".to_string(), + command: vec!["git".to_string(), "push".to_string()], + cwd: test_path_buf("/repo/codex-rs/core").abs(), + sandbox_permissions: crate::sandboxing::SandboxPermissions::UseDefault, + additional_permissions: None, + justification: Some("Need to push the second docs fix.".to_string()), + }, + /*retry_reason*/ None, + guardian_output_schema(), + /*external_cancel*/ None, + ) + .await; + let (GuardianReviewOutcome::Completed(second_assessment), second_metadata) = second_outcome + else { + panic!("expected second guardian assessment"); + }; + assert_eq!(second_assessment.outcome, GuardianAssessmentOutcome::Allow); + assert_eq!(second_assessment.rationale, "second guardian rationale"); + assert!(matches!( + second_metadata.guardian_session_kind, + Some(codex_analytics::GuardianReviewSessionKind::TrunkReused) + )); + + assert_eq!( + request_log.requests().len(), + 2, + "the reused trunk should wait for the real follow-up review" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn guardian_review_surfaces_responses_api_errors_in_rejection_reason() -> anyhow::Result<()> { skip_if_no_network!(Ok(()));