diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index c13bad06cfe5..838e79786357 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -23,8 +23,11 @@ use codex_protocol::ThreadId; use codex_protocol::error::CodexErr; use codex_protocol::error::Result as CodexResult; use codex_protocol::models::ContentItem; +use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::MessagePhase; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::CollabCloseEndEvent; +use codex_protocol::protocol::Event; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::Op; @@ -37,11 +40,17 @@ use codex_protocol::user_input::UserInput; use codex_rollout::state_db; use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::ReadThreadParams; +use codex_tools::create_compact_parent_context_tool; +use codex_tools::create_watchdog_close_self_tool; +use codex_tools::create_watchdog_snooze_tool; +use codex_tools::create_watchdog_tools_namespace; use serde::Serialize; use std::collections::HashMap; use std::collections::VecDeque; use std::sync::Arc; use std::sync::Weak; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use tokio::sync::watch; use tracing::warn; @@ -49,6 +58,8 @@ const AGENT_NAMES: &str = include_str!("agent_names.txt"); const ROOT_LAST_TASK_MESSAGE: &str = "Main thread"; const CODEX_EXPERIMENTAL_FORK_PREVIOUS_RESPONSE_ID_ENV: &str = "CODEX_EXPERIMENTAL_FORK_PREVIOUS_RESPONSE_ID"; +const WATCHDOG_BOOT_TOOL_SEARCH_CALL_ID: &str = "synthetic_watchdog_tool_search"; +const WATCHDOG_BOOT_LIST_AGENTS_CALL_ID: &str = "synthetic_watchdog_list_agents"; #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum SpawnAgentForkMode { @@ -77,6 +88,18 @@ pub(crate) struct ListedAgent { pub(crate) last_task_message: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum WatchdogParentCompactionResult { + NotWatchdogHelper, + ParentBusy { + parent_thread_id: ThreadId, + }, + Submitted { + parent_thread_id: ThreadId, + submission_id: String, + }, +} + fn default_agent_nickname_list() -> Vec<&'static str> { AGENT_NAMES .lines() @@ -141,6 +164,70 @@ fn is_watchdog_helper_source(session_source: &SessionSource) -> bool { ) } +fn unix_timestamp_seconds() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs()) + .unwrap_or_default() +} + +fn synthetic_watchdog_tool_search_items() -> Vec { + let namespace = create_watchdog_tools_namespace(vec![ + create_compact_parent_context_tool(), + create_watchdog_close_self_tool(), + create_watchdog_snooze_tool(), + ]); + let Ok(namespace) = serde_json::to_value(namespace) else { + return Vec::new(); + }; + + vec![ + RolloutItem::ResponseItem(ResponseItem::ToolSearchCall { + id: None, + call_id: Some(WATCHDOG_BOOT_TOOL_SEARCH_CALL_ID.to_string()), + status: Some("completed".to_string()), + execution: "client".to_string(), + arguments: serde_json::json!({ + "query": "watchdog namespace tools", + }), + }), + RolloutItem::ResponseItem(ResponseItem::ToolSearchOutput { + call_id: Some(WATCHDOG_BOOT_TOOL_SEARCH_CALL_ID.to_string()), + status: "completed".to_string(), + execution: "client".to_string(), + tools: vec![namespace], + }), + ] +} + +fn synthetic_watchdog_list_agents_items( + owner_thread_id: ThreadId, + agents: Vec, +) -> Vec { + let envelope = serde_json::json!({ + "source": "pre_injected_agents_list", + "generated_at": unix_timestamp_seconds(), + "owner_thread_id": owner_thread_id.to_string(), + "agents": agents, + }); + let mut output = FunctionCallOutputPayload::from_text(envelope.to_string()); + output.success = Some(true); + + vec![ + RolloutItem::ResponseItem(ResponseItem::FunctionCall { + id: None, + name: "list_agents".to_string(), + namespace: None, + arguments: "{}".to_string(), + call_id: WATCHDOG_BOOT_LIST_AGENTS_CALL_ID.to_string(), + }), + RolloutItem::ResponseItem(ResponseItem::FunctionCallOutput { + call_id: WATCHDOG_BOOT_LIST_AGENTS_CALL_ID.to_string(), + output, + }), + ] +} + /// Control-plane handle for multi-agent operations. /// `AgentControl` is held by each session (via `SessionServices`). It provides capability to /// spawn new agents and the inter-agent communication layer. @@ -497,6 +584,12 @@ impl AgentControl { keep_forked_rollout_item(item) }); } + if is_watchdog_helper_source(&session_source) { + forked_rollout_items.extend( + self.watchdog_boot_context_items(state, parent_thread_id) + .await, + ); + } state .fork_thread_with_source( @@ -906,6 +999,36 @@ impl AgentControl { .await } + pub(crate) async fn finish_watchdog_helper(&self, helper_thread_id: ThreadId) -> bool { + let Some(watchdogs) = self.watchdogs.as_ref() else { + return false; + }; + watchdogs.finish_active_helper(helper_thread_id).await + } + + pub(crate) async fn finish_watchdog_helper_thread( + &self, + agent_id: ThreadId, + ) -> CodexResult<()> { + let state = self.upgrade()?; + if let Ok(thread) = state.get_thread(agent_id).await { + if let Some(state_db_ctx) = thread.state_db() + && let Err(err) = state_db_ctx + .set_thread_spawn_edge_status( + agent_id, + DirectionalThreadSpawnEdgeStatus::Closed, + ) + .await + { + warn!("failed to persist thread-spawn edge status for {agent_id}: {err}"); + } + thread.codex.session.flush_rollout().await?; + } + let _ = state.remove_thread(&agent_id).await; + self.state.release_spawned_thread(agent_id); + Ok(()) + } + fn watchdog_manager(&self) -> CodexResult<&Arc> { self.watchdogs.as_ref().ok_or_else(|| { CodexErr::UnsupportedOperation("watchdog manager unavailable".to_string()) @@ -954,7 +1077,8 @@ impl AgentControl { let Ok(thread) = state.get_thread(agent_id).await else { return AgentStatus::NotFound; }; - thread.agent_status().await + self.reported_agent_status(agent_id, thread.agent_status().await) + .await } pub(crate) fn register_session_root( @@ -1086,7 +1210,9 @@ impl AgentControl { { agents.push(ListedAgent { agent_name: root_path.to_string(), - agent_status: root_thread.agent_status().await, + agent_status: self + .reported_agent_status(root_thread_id, root_thread.agent_status().await) + .await, last_task_message: Some(ROOT_LAST_TASK_MESSAGE.to_string()), }); } @@ -1095,6 +1221,14 @@ impl AgentControl { let Some(thread_id) = metadata.agent_id else { continue; }; + if let Some(watchdogs) = self.watchdogs.as_ref() + && watchdogs + .target_for_active_helper(thread_id) + .await + .is_some() + { + continue; + } if resolved_prefix .as_ref() .is_some_and(|prefix| !agent_matches_prefix(metadata.agent_path.as_ref(), prefix)) @@ -1113,7 +1247,9 @@ impl AgentControl { let last_task_message = metadata.last_task_message.clone(); agents.push(ListedAgent { agent_name, - agent_status: thread.agent_status().await, + agent_status: self + .reported_agent_status(thread_id, thread.agent_status().await) + .await, last_task_message, }); } @@ -1121,6 +1257,108 @@ impl AgentControl { Ok(agents) } + async fn reported_agent_status(&self, agent_id: ThreadId, status: AgentStatus) -> AgentStatus { + if matches!(status, AgentStatus::PendingInit) + && let Some(watchdogs) = self.watchdogs.as_ref() + && watchdogs.is_watchdog_handle(agent_id).await + { + return AgentStatus::Running; + } + status + } + + pub(crate) async fn send_watchdog_close_event( + &self, + owner_thread_id: ThreadId, + target_thread_id: ThreadId, + receiver_agent_nickname: Option, + receiver_agent_role: Option, + status: AgentStatus, + ) -> CodexResult<()> { + let state = self.upgrade()?; + let owner_thread = state.get_thread(owner_thread_id).await?; + owner_thread + .codex + .session + .send_event_raw(Event { + id: format!("watchdog-close-{target_thread_id}"), + msg: CollabCloseEndEvent { + call_id: format!("watchdog-close-{target_thread_id}"), + sender_thread_id: owner_thread_id, + receiver_thread_id: target_thread_id, + receiver_agent_nickname, + receiver_agent_role, + status, + } + .into(), + }) + .await; + Ok(()) + } + + pub(crate) async fn compact_parent_for_watchdog_helper( + &self, + helper_thread_id: ThreadId, + ) -> CodexResult { + let Some(watchdogs) = self.watchdogs.as_ref() else { + return Ok(WatchdogParentCompactionResult::NotWatchdogHelper); + }; + let Some(parent_thread_id) = watchdogs.owner_for_active_helper(helper_thread_id).await + else { + return Ok(WatchdogParentCompactionResult::NotWatchdogHelper); + }; + let state = self.upgrade()?; + let parent_thread = state.get_thread(parent_thread_id).await?; + if parent_thread + .codex + .session + .active_turn + .lock() + .await + .is_some() + { + return Ok(WatchdogParentCompactionResult::ParentBusy { parent_thread_id }); + } + + state + .send_op(parent_thread_id, Op::Compact) + .await + .map(|submission_id| WatchdogParentCompactionResult::Submitted { + parent_thread_id, + submission_id, + }) + } + + async fn watchdog_boot_context_items( + &self, + state: &Arc, + owner_thread_id: ThreadId, + ) -> Vec { + let owner_source = match state.get_thread(owner_thread_id).await { + Ok(owner_thread) => { + owner_thread + .codex + .thread_config_snapshot() + .await + .session_source + } + Err(_) => SessionSource::Cli, + }; + self.register_session_root(owner_thread_id, &owner_source); + let agents = self + .list_agents(&owner_source, /*path_prefix*/ None) + .await + .unwrap_or_default(); + + synthetic_watchdog_tool_search_items() + .into_iter() + .chain(synthetic_watchdog_list_agents_items( + owner_thread_id, + agents, + )) + .collect() + } + /// Starts a detached watcher for sub-agents spawned from another thread. /// /// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 24da1fca966b..3ab53d3ce7f7 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -589,6 +589,29 @@ async fn watchdog_helper_forks_owner_history() { &history_items, "previous owner response: pong 81 (118)" )); + assert!(history_items.iter().any(|item| matches!( + item, + ResponseItem::ToolSearchCall { call_id: Some(call_id), .. } + if call_id == "synthetic_watchdog_tool_search" + ))); + assert!(history_items.iter().any(|item| match item { + ResponseItem::ToolSearchOutput { + call_id: Some(call_id), + tools, + .. + } if call_id == "synthetic_watchdog_tool_search" => { + let rendered = serde_json::to_string(tools).expect("tools should serialize"); + rendered.contains("compact_parent_context") + && rendered.contains("close_self") + && rendered.contains("snooze") + } + _ => false, + })); + assert!(history_items.iter().any(|item| matches!( + item, + ResponseItem::FunctionCall { name, call_id, .. } + if name == "list_agents" && call_id == "synthetic_watchdog_list_agents" + ))); assert!( !helper_thread .codex @@ -715,6 +738,226 @@ async fn watchdog_forwards_completed_helper_without_waiting_for_interval() { .expect("watchdog should forward completed helper output without interval delay"); } +#[tokio::test] +async fn watchdog_snooze_delays_next_helper_and_resumes_after_delay() { + let harness = AgentControlHarness::new().await; + let (owner_thread_id, owner_thread) = harness.start_thread().await; + let (target_thread_id, _) = harness.start_thread().await; + let mut config = harness.config.clone(); + config + .features + .enable(Feature::AgentWatchdog) + .expect("test config should allow feature update"); + + harness + .control + .register_watchdog(WatchdogRegistration { + owner_thread_id, + target_thread_id, + child_depth: 0, + interval_s: 1, + prompt: "snooze scheduling check".to_string(), + config, + }) + .await + .expect("watchdog registration should succeed"); + + let owner_turn = owner_thread.codex.session.new_default_turn().await; + owner_thread + .codex + .session + .send_event( + owner_turn.as_ref(), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: owner_turn.sub_id.clone(), + last_agent_message: Some("root done".to_string()), + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }), + ) + .await; + + let first_helper_id = timeout(Duration::from_secs(5), async { + loop { + if let Some((thread_id, _)) = harness.manager.captured_ops().into_iter().find( + |(thread_id, op)| { + *thread_id != owner_thread_id + && *thread_id != target_thread_id + && matches!(op, Op::UserInput { items, .. } if items.iter().any(|item| match item { + UserInput::Text { text, .. } => text.contains("snooze scheduling check"), + UserInput::Image { .. } + | UserInput::LocalImage { .. } + | UserInput::Skill { .. } + | UserInput::Mention { .. } => false, + _ => false, + })) + }, + ) { + break thread_id; + } + sleep(Duration::from_millis(50)).await; + } + }) + .await + .expect("watchdog should spawn a helper before snooze"); + + let result = harness + .control + .snooze_watchdog_helper(first_helper_id, /*delay_seconds*/ None) + .await + .expect("active helper should snooze its watchdog"); + assert_eq!(result.target_thread_id, target_thread_id); + assert_eq!(result.delay_seconds, 1); + harness + .control + .finish_watchdog_helper_thread(first_helper_id) + .await + .expect("snoozed helper should finish"); + + sleep(Duration::from_millis(300)).await; + assert!( + !harness + .manager + .captured_ops() + .into_iter() + .any(|(thread_id, op)| { + thread_id != owner_thread_id + && thread_id != target_thread_id + && thread_id != first_helper_id + && matches!(op, Op::UserInput { items, .. } if items.iter().any(|item| match item { + UserInput::Text { text, .. } => text.contains("snooze scheduling check"), + UserInput::Image { .. } + | UserInput::LocalImage { .. } + | UserInput::Skill { .. } + | UserInput::Mention { .. } => false, + _ => false, + })) + }), + "watchdog should not spawn another helper before the snooze delay elapses" + ); + + timeout(Duration::from_secs(5), async { + loop { + if harness.manager.captured_ops().into_iter().any(|(thread_id, op)| { + thread_id != owner_thread_id + && thread_id != target_thread_id + && thread_id != first_helper_id + && matches!(op, Op::UserInput { items, .. } if items.iter().any(|item| match item { + UserInput::Text { text, .. } => text.contains("snooze scheduling check"), + UserInput::Image { .. } + | UserInput::LocalImage { .. } + | UserInput::Skill { .. } + | UserInput::Mention { .. } => false, + _ => false, + })) + }) { + break; + } + sleep(Duration::from_millis(50)).await; + } + }) + .await + .expect("watchdog should resume spawning helpers after the snooze delay"); +} + +#[tokio::test] +async fn watchdog_plain_goodbye_final_message_closes_handle() { + let harness = AgentControlHarness::new().await; + let (owner_thread_id, owner_thread) = harness.start_thread().await; + let (target_thread_id, _) = harness.start_thread().await; + let mut config = harness.config.clone(); + config + .features + .enable(Feature::AgentWatchdog) + .expect("test config should allow feature update"); + + harness + .control + .register_watchdog(WatchdogRegistration { + owner_thread_id, + target_thread_id, + child_depth: 0, + interval_s: 60, + prompt: "check in".to_string(), + config, + }) + .await + .expect("watchdog registration should succeed"); + + let owner_turn = owner_thread.codex.session.new_default_turn().await; + owner_thread + .codex + .session + .send_event( + owner_turn.as_ref(), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: owner_turn.sub_id.clone(), + last_agent_message: Some("root done".to_string()), + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }), + ) + .await; + + let helper_thread_id = timeout(Duration::from_secs(5), async { + loop { + if let Some((thread_id, _)) = harness.manager.captured_ops().into_iter().find( + |(thread_id, op)| { + *thread_id != owner_thread_id + && *thread_id != target_thread_id + && matches!(op, Op::UserInput { items, .. } if items.iter().any(|item| match item { + UserInput::Text { text, .. } => text.contains("check in"), + UserInput::Image { .. } + | UserInput::LocalImage { .. } + | UserInput::Skill { .. } + | UserInput::Mention { .. } => false, + _ => false, + })) + }, + ) { + break thread_id; + } + sleep(Duration::from_millis(50)).await; + } + }) + .await + .expect("watchdog should spawn a helper"); + + let helper_thread = harness + .manager + .get_thread(helper_thread_id) + .await + .expect("helper thread should be registered"); + let helper_turn = helper_thread.codex.session.new_default_turn().await; + helper_thread + .codex + .session + .send_event( + helper_turn.as_ref(), + EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: helper_turn.sub_id.clone(), + last_agent_message: Some("goodbye".to_string()), + completed_at: None, + duration_ms: None, + time_to_first_token_ms: None, + }), + ) + .await; + + timeout(Duration::from_secs(5), async { + loop { + if !harness.control.is_watchdog_handle(target_thread_id).await { + break; + } + sleep(Duration::from_millis(50)).await; + } + }) + .await + .expect("plain goodbye final message should close the watchdog handle"); +} + #[tokio::test] async fn send_input_errors_when_thread_missing() { let harness = AgentControlHarness::new().await; @@ -1065,6 +1308,37 @@ async fn spawn_agent_creates_thread_and_sends_prompt() { assert_eq!(captured, Some(expected)); } +#[tokio::test] +async fn spawn_agent_fork_rejects_missing_parent_spawn_call_id_for_non_watchdogs() { + let harness = AgentControlHarness::new().await; + let (parent_thread_id, _) = harness.start_thread().await; + + let err = harness + .control + .spawn_agent_with_metadata( + harness.config.clone(), + text_input("child task"), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: None, + })), + SpawnAgentOptions { + fork_mode: Some(SpawnAgentForkMode::FullHistory), + ..Default::default() + }, + ) + .await + .expect_err("forked worker spawns should require the parent spawn call id"); + + assert_eq!( + err.to_string(), + "Fatal error: spawn_agent fork requires a parent spawn call id" + ); +} + #[tokio::test] async fn spawn_agent_can_fork_parent_thread_history_with_sanitized_items() { let harness = AgentControlHarness::new().await; @@ -1160,8 +1434,8 @@ async fn spawn_agent_can_fork_parent_thread_history_with_sanitized_items() { agent_role: None, })), SpawnAgentOptions { + fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()), fork_mode: Some(SpawnAgentForkMode::FullHistory), - fork_parent_spawn_call_id: Some(parent_spawn_call_id), ..Default::default() }, ) @@ -1480,8 +1754,8 @@ async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() { agent_role: None, })), SpawnAgentOptions { + fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()), fork_mode: Some(SpawnAgentForkMode::FullHistory), - fork_parent_spawn_call_id: Some(parent_spawn_call_id), ..Default::default() }, ) @@ -1590,8 +1864,8 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() { agent_role: None, })), SpawnAgentOptions { + fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()), fork_mode: Some(SpawnAgentForkMode::LastNTurns(2)), - fork_parent_spawn_call_id: Some(parent_spawn_call_id), ..Default::default() }, ) diff --git a/codex-rs/core/src/agent/mod.rs b/codex-rs/core/src/agent/mod.rs index 2855a51cbbcf..463eefd66f55 100644 --- a/codex-rs/core/src/agent/mod.rs +++ b/codex-rs/core/src/agent/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod watchdog; pub(crate) use codex_protocol::protocol::AgentStatus; pub(crate) use control::AgentControl; +pub(crate) use control::WatchdogParentCompactionResult; pub(crate) use mailbox::Mailbox; pub(crate) use mailbox::MailboxReceiver; pub(crate) use registry::exceeds_thread_spawn_depth_limit; diff --git a/codex-rs/core/src/agent/watchdog.rs b/codex-rs/core/src/agent/watchdog.rs index 8bb37aaa74b0..193019a24a6a 100644 --- a/codex-rs/core/src/agent/watchdog.rs +++ b/codex-rs/core/src/agent/watchdog.rs @@ -307,19 +307,32 @@ impl WatchdogManager { return; } let helper_suppressed = self.take_suppressed_helper(helper_id).await; + let mut close_watchdog_handle = false; if let AgentStatus::Completed(Some(message)) = helper_status && !helper_suppressed - && let Err(err) = control_for_spawn + { + close_watchdog_handle = final_message_requests_watchdog_close(&message); + if let Err(err) = control_for_spawn .send_watchdog_wakeup(snapshot.owner_thread_id, message) .await - { - warn!( - helper_id = %helper_id, - owner_thread_id = %snapshot.owner_thread_id, - "watchdog helper forward failed: {err}" - ); + { + warn!( + helper_id = %helper_id, + owner_thread_id = %snapshot.owner_thread_id, + "watchdog helper forward failed: {err}" + ); + } } let _ = control_for_spawn.shutdown_live_agent(helper_id).await; + if close_watchdog_handle { + let _ = control_for_spawn + .unregister_watchdog_handle(target_thread_id) + .await; + let _ = control_for_spawn + .shutdown_live_agent(target_thread_id) + .await; + return; + } self.update_after_spawn( target_thread_id, generation, @@ -534,6 +547,25 @@ impl WatchdogManager { Some(result) } + pub(crate) async fn finish_active_helper(&self, helper_thread_id: ThreadId) -> bool { + let found = { + let mut registrations = self.registrations.lock().await; + let Some(entry) = registrations + .values_mut() + .find(|entry| entry.active_helper_id == Some(helper_thread_id)) + else { + return false; + }; + entry.active_helper_id = None; + true + }; + self.suppressed_helpers + .lock() + .await + .insert(helper_thread_id); + found + } + async fn update_after_spawn( &self, target_thread_id: ThreadId, @@ -582,6 +614,10 @@ fn is_watchdog_terminated(status: &AgentStatus) -> bool { matches!(status, AgentStatus::Shutdown | AgentStatus::NotFound) } +fn final_message_requests_watchdog_close(message: &str) -> bool { + message.trim().eq_ignore_ascii_case("goodbye") +} + async fn get_status(manager_state: &Arc, thread_id: ThreadId) -> AgentStatus { match manager_state.get_thread(thread_id).await { Ok(thread) => thread.agent_status().await, diff --git a/codex-rs/core/src/config/config_tests.rs b/codex-rs/core/src/config/config_tests.rs index d506c8797fbc..50dcd5248d5c 100644 --- a/codex-rs/core/src/config/config_tests.rs +++ b/codex-rs/core/src/config/config_tests.rs @@ -6158,6 +6158,51 @@ async fn load_config_rejects_unsafe_agent_role_nickname_candidates() -> std::io: Ok(()) } +#[tokio::test] +async fn load_config_reads_top_level_watchdog_interval() -> std::io::Result<()> { + let codex_home = TempDir::new()?; + let cfg = ConfigToml { + watchdog_interval_s: Some(3), + ..Default::default() + }; + + let config = Config::load_from_base_config_with_overrides( + cfg, + ConfigOverrides::default(), + codex_home.abs(), + ) + .await?; + + assert_eq!(config.watchdog_interval_s, 3); + + Ok(()) +} + +#[tokio::test] +async fn load_config_rejects_nonpositive_top_level_watchdog_interval() -> std::io::Result<()> { + let codex_home = TempDir::new()?; + let cfg = ConfigToml { + watchdog_interval_s: Some(0), + ..Default::default() + }; + + let err = Config::load_from_base_config_with_overrides( + cfg, + ConfigOverrides::default(), + codex_home.abs(), + ) + .await + .expect_err("nonpositive watchdog interval should fail"); + + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + assert_eq!( + err.to_string(), + "watchdog_interval_s must be greater than zero" + ); + + Ok(()) +} + #[tokio::test] async fn model_catalog_json_loads_from_path() -> std::io::Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/core/src/tools/handlers/multi_agents.rs b/codex-rs/core/src/tools/handlers/multi_agents.rs index 9c3ae75c1db1..fc6276586fc4 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents.rs @@ -58,6 +58,7 @@ pub(crate) fn parse_agent_id_targets( } pub(crate) use close_agent::Handler as CloseAgentHandler; +pub(crate) use compact_parent_context::Handler as CompactParentContextHandler; pub(crate) use resume_agent::Handler as ResumeAgentHandler; pub(crate) use send_input::Handler as SendInputHandler; pub(crate) use spawn::Handler as SpawnAgentHandler; @@ -66,6 +67,7 @@ pub(crate) use watchdog_self_close::Handler as WatchdogSelfCloseHandler; pub(crate) use watchdog_snooze::Handler as WatchdogSnoozeHandler; pub(crate) mod close_agent; +mod compact_parent_context; mod resume_agent; mod send_input; mod spawn; diff --git a/codex-rs/core/src/tools/handlers/multi_agents/compact_parent_context.rs b/codex-rs/core/src/tools/handlers/multi_agents/compact_parent_context.rs new file mode 100644 index 000000000000..ed7346f1b752 --- /dev/null +++ b/codex-rs/core/src/tools/handlers/multi_agents/compact_parent_context.rs @@ -0,0 +1,90 @@ +use super::*; +use crate::agent::WatchdogParentCompactionResult; + +pub(crate) struct Handler; + +impl ToolHandler for Handler { + type Output = CompactParentContextResult; + + fn kind(&self) -> ToolKind { + ToolKind::Function + } + + fn matches_kind(&self, payload: &ToolPayload) -> bool { + matches!(payload, ToolPayload::Function { .. }) + } + + async fn handle(&self, invocation: ToolInvocation) -> Result { + let ToolInvocation { + session, payload, .. + } = invocation; + let arguments = function_arguments(payload)?; + let args: CompactParentContextArgs = parse_arguments(&arguments)?; + let _ = (args.reason, args.evidence); + let result = session + .services + .agent_control + .compact_parent_for_watchdog_helper(session.conversation_id) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!("compact_parent_context failed: {err}")) + })?; + Ok(CompactParentContextResult::from(result)) + } +} + +#[derive(Debug, Deserialize)] +struct CompactParentContextArgs { + reason: Option, + evidence: Option, +} + +#[derive(Debug, Serialize)] +pub(crate) struct CompactParentContextResult { + kind: &'static str, + parent_thread_id: Option, + submission_id: Option, +} + +impl From for CompactParentContextResult { + fn from(value: WatchdogParentCompactionResult) -> Self { + match value { + WatchdogParentCompactionResult::NotWatchdogHelper => Self { + kind: "not_watchdog_helper", + parent_thread_id: None, + submission_id: None, + }, + WatchdogParentCompactionResult::ParentBusy { parent_thread_id } => Self { + kind: "parent_busy", + parent_thread_id: Some(parent_thread_id.to_string()), + submission_id: None, + }, + WatchdogParentCompactionResult::Submitted { + parent_thread_id, + submission_id, + } => Self { + kind: "submitted", + parent_thread_id: Some(parent_thread_id.to_string()), + submission_id: Some(submission_id), + }, + } + } +} + +impl ToolOutput for CompactParentContextResult { + fn log_preview(&self) -> String { + tool_output_json_text(self, "compact_parent_context") + } + + fn success_for_logging(&self) -> bool { + true + } + + fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem { + tool_output_response_item(call_id, payload, self, Some(true), "compact_parent_context") + } + + fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue { + tool_output_code_mode_result(self, "compact_parent_context") + } +} diff --git a/codex-rs/core/src/tools/handlers/multi_agents/watchdog_self_close.rs b/codex-rs/core/src/tools/handlers/multi_agents/watchdog_self_close.rs index d1e1101cfd51..5d5691c4a5e5 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents/watchdog_self_close.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents/watchdog_self_close.rs @@ -27,7 +27,7 @@ impl ToolHandler for Handler { .await else { return Err(FunctionCallError::RespondToModel( - "watchdog_self_close is only available in watchdog check-in threads.".to_string(), + "watchdog.close_self is only available in watchdog check-in threads.".to_string(), )); }; let Some(target_thread_id) = session @@ -37,7 +37,7 @@ impl ToolHandler for Handler { .await else { return Err(FunctionCallError::RespondToModel( - "watchdog_self_close is only available in watchdog check-in threads.".to_string(), + "watchdog.close_self is only available in watchdog check-in threads.".to_string(), )); }; @@ -46,6 +46,23 @@ impl ToolHandler for Handler { .agent_control .get_status(target_thread_id) .await; + let receiver_agent = session + .services + .agent_control + .get_agent_metadata(target_thread_id) + .unwrap_or_default(); + + let _ = session + .services + .agent_control + .send_watchdog_close_event( + owner_thread_id, + target_thread_id, + receiver_agent.agent_nickname, + receiver_agent.agent_role, + status.clone(), + ) + .await; if let Some(message) = args.message && !message.trim().is_empty() @@ -82,7 +99,7 @@ pub(crate) struct WatchdogSelfCloseResult { impl ToolOutput for WatchdogSelfCloseResult { fn log_preview(&self) -> String { - tool_output_json_text(self, "watchdog_self_close") + tool_output_json_text(self, "close_self") } fn success_for_logging(&self) -> bool { @@ -90,10 +107,10 @@ impl ToolOutput for WatchdogSelfCloseResult { } fn to_response_item(&self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem { - tool_output_response_item(call_id, payload, self, Some(true), "watchdog_self_close") + tool_output_response_item(call_id, payload, self, Some(true), "close_self") } fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue { - tool_output_code_mode_result(self, "watchdog_self_close") + tool_output_code_mode_result(self, "close_self") } } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index 66821e9e29eb..4e05d9fc7da6 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -443,6 +443,61 @@ async fn multi_agent_v2_spawn_defaults_to_full_fork_and_rejects_child_model_over ); } +#[tokio::test] +async fn multi_agent_v2_spawn_watchdog_role_returns_inert_handle_and_ignores_fork_turns() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + let agent_control = manager.agent_control(); + session.services.agent_control = agent_control.clone(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::AgentWatchdog) + .expect("test config should allow feature update"); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + + let output = SpawnAgentHandlerV2 + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "spawn_agent", + function_payload(json!({ + "message": "check in later", + "task_name": "watchdog", + "agent_type": "watchdog", + "fork_turns": "ignored for watchdogs" + })), + )) + .await + .expect("watchdog spawn should ignore fork_turns and succeed"); + let (content, success) = expect_text_output(output); + let result: serde_json::Value = + serde_json::from_str(&content).expect("spawn_agent result should be json"); + assert_eq!(result["task_name"], "/root/watchdog"); + + let watchdog_id = manager + .captured_ops() + .into_iter() + .filter_map(|(thread_id, op)| (thread_id != root.thread_id).then_some((thread_id, op))) + .find_map(|(thread_id, op)| (op == Op::Interrupt).then_some(thread_id)) + .expect("watchdog handle should receive only an inert interrupt"); + assert_eq!(success, Some(true)); + assert_eq!( + agent_control.get_status(watchdog_id).await, + AgentStatus::Running + ); + assert!(agent_control.is_watchdog_handle(watchdog_id).await); +} + #[tokio::test] async fn multi_agent_v2_spawn_partial_fork_turns_allows_agent_type_override() { let (mut session, mut turn) = make_session_and_context().await; @@ -564,7 +619,7 @@ async fn spawn_agent_watchdog_role_returns_inert_handle() { assert_eq!(success, Some(true)); assert_eq!( agent_control.get_status(agent_id).await, - AgentStatus::PendingInit + AgentStatus::Running ); let ops_for_agent = manager .captured_ops() @@ -575,6 +630,71 @@ async fn spawn_agent_watchdog_role_returns_inert_handle() { assert!(agent_control.is_watchdog_handle(agent_id).await); } +#[tokio::test] +async fn compact_parent_context_submits_compaction_for_idle_parent() { + let (mut session, turn) = make_session_and_context().await; + let manager = thread_manager(); + let agent_control = manager.agent_control(); + session.services.agent_control = agent_control.clone(); + let owner = manager + .start_thread((*turn.config).clone()) + .await + .expect("owner thread should start"); + let target = manager + .start_thread((*turn.config).clone()) + .await + .expect("watchdog handle should start"); + let helper_thread_id = ThreadId::new(); + agent_control + .register_watchdog(WatchdogRegistration { + owner_thread_id: owner.thread_id, + target_thread_id: target.thread_id, + child_depth: 1, + interval_s: 60, + prompt: "check in".to_string(), + config: (*turn.config).clone(), + }) + .await + .expect("watchdog registration should succeed"); + agent_control + .set_watchdog_active_helper_for_tests(target.thread_id, helper_thread_id) + .await; + session.conversation_id = helper_thread_id; + + let output = CompactParentContextHandler + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "compact_parent_context", + function_payload(json!({"reason": "root is idle"})), + )) + .await + .expect("watchdog helper should request parent compaction"); + let (content, success) = expect_text_output(output); + + assert_eq!(success, Some(true)); + let result: serde_json::Value = + serde_json::from_str(&content).expect("compact_parent_context result should be json"); + let submission_id = result + .get("submission_id") + .and_then(|value| value.as_str()) + .expect("submitted compaction should include a submission id"); + assert!(!submission_id.is_empty()); + assert_eq!( + result, + json!({ + "kind": "submitted", + "parent_thread_id": owner.thread_id.to_string(), + "submission_id": submission_id, + }) + ); + let captured = manager + .captured_ops() + .into_iter() + .find(|(thread_id, op)| *thread_id == owner.thread_id && matches!(op, Op::Compact)); + assert_eq!(captured, Some((owner.thread_id, Op::Compact))); +} + #[tokio::test] async fn watchdog_snooze_rejects_non_watchdog_thread() { let (session, turn) = make_session_and_context().await; @@ -661,17 +781,198 @@ async fn watchdog_snooze_suppresses_helper_and_clears_active_helper() { .watchdog_helper_is_suppressed_for_tests(helper_thread_id) .await ); + assert_eq!( + agent_control.get_status(helper_thread_id).await, + AgentStatus::NotFound + ); + assert!( + !manager + .captured_ops() + .iter() + .any(|(thread_id, op)| *thread_id == helper_thread_id && matches!(op, Op::Shutdown)), + "snooze should finish the helper turn without a shutdown op" + ); +} + +#[tokio::test] +async fn multi_agent_v2_watchdog_followup_task_parent_wakes_owner_and_finishes_helper() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let agent_control = manager.agent_control(); + let owner = manager + .start_thread((*turn.config).clone()) + .await + .expect("owner thread should start"); + let target = manager + .start_thread((*turn.config).clone()) + .await + .expect("watchdog handle should start"); + let helper_thread_id = session.conversation_id; + session.services.agent_control = agent_control.clone(); + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::AgentWatchdog) + .expect("test config should allow watchdog feature update"); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow multi-agent v2 feature update"); + turn.config = Arc::new(config.clone()); + turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: owner.thread_id, + depth: 1, + agent_path: Some(AgentPath::try_from("/root/watchdog").expect("watchdog path")), + agent_nickname: None, + agent_role: Some("watchdog".to_string()), + }); + + agent_control + .register_watchdog(WatchdogRegistration { + owner_thread_id: owner.thread_id, + target_thread_id: target.thread_id, + child_depth: 0, + interval_s: 60, + prompt: "check in".to_string(), + config, + }) + .await + .expect("watchdog registration should succeed"); + agent_control + .set_watchdog_active_helper_for_tests(target.thread_id, helper_thread_id) + .await; + + let output = FollowupTaskHandlerV2 + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "followup_task", + function_payload(json!({ + "target": "parent", + "message": "continue the user task" + })), + )) + .await + .expect("watchdog helper should wake its parent"); + let (_, success) = expect_text_output(output); + + assert_eq!(success, Some(true)); + assert_eq!( + agent_control + .watchdog_target_for_active_helper(helper_thread_id) + .await, + None + ); + assert!( + agent_control + .watchdog_helper_is_suppressed_for_tests(helper_thread_id) + .await + ); + assert_eq!( + agent_control.get_status(helper_thread_id).await, + AgentStatus::NotFound + ); + assert!(manager.captured_ops().iter().any(|(thread_id, op)| { + *thread_id == owner.thread_id + && matches!( + op, + Op::InterAgentCommunication { communication } + if communication.author.as_str() == "/root/watchdog" + && communication.recipient == AgentPath::root() + && communication.other_recipients.is_empty() + && communication.content == "continue the user task" + && communication.trigger_turn + ) + })); } #[tokio::test] -async fn watchdog_self_close_rejects_non_watchdog_thread() { +async fn multi_agent_v2_watchdog_send_message_parent_is_rejected() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let agent_control = manager.agent_control(); + let owner = manager + .start_thread((*turn.config).clone()) + .await + .expect("owner thread should start"); + let target = manager + .start_thread((*turn.config).clone()) + .await + .expect("watchdog handle should start"); + let helper_thread_id = session.conversation_id; + session.services.agent_control = agent_control.clone(); + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::AgentWatchdog) + .expect("test config should allow watchdog feature update"); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow multi-agent v2 feature update"); + turn.config = Arc::new(config.clone()); + turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: owner.thread_id, + depth: 1, + agent_path: Some(AgentPath::try_from("/root/watchdog").expect("watchdog path")), + agent_nickname: None, + agent_role: Some("watchdog".to_string()), + }); + + agent_control + .register_watchdog(WatchdogRegistration { + owner_thread_id: owner.thread_id, + target_thread_id: target.thread_id, + child_depth: 0, + interval_s: 60, + prompt: "check in".to_string(), + config, + }) + .await + .expect("watchdog registration should succeed"); + agent_control + .set_watchdog_active_helper_for_tests(target.thread_id, helper_thread_id) + .await; + + let Err(err) = SendMessageHandlerV2 + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "send_message", + function_payload(json!({ + "target": "parent", + "message": "queued watchdog update" + })), + )) + .await + else { + panic!("watchdog helper send_message to parent should be rejected"); + }; + + assert_eq!( + err, + FunctionCallError::RespondToModel( + "watchdog check-in threads must use followup_task with target `parent` to message their parent." + .to_string() + ) + ); + assert_eq!( + agent_control + .watchdog_target_for_active_helper(helper_thread_id) + .await, + Some(target.thread_id) + ); +} + +#[tokio::test] +async fn watchdog_close_self_rejects_non_watchdog_thread() { let (session, turn) = make_session_and_context().await; let err = WatchdogSelfCloseHandler .handle(invocation( Arc::new(session), Arc::new(turn), - "watchdog_self_close", + "close_self", function_payload(json!({})), )) .await @@ -680,13 +981,13 @@ async fn watchdog_self_close_rejects_non_watchdog_thread() { assert_eq!( err, FunctionCallError::RespondToModel( - "watchdog_self_close is only available in watchdog check-in threads.".to_string(), + "watchdog.close_self is only available in watchdog check-in threads.".to_string(), ) ); } #[tokio::test] -async fn watchdog_self_close_notifies_owner_and_unregisters_handle() { +async fn watchdog_close_self_notifies_owner_and_unregisters_handle() { let (mut session, mut turn) = make_session_and_context().await; let manager = thread_manager(); let agent_control = manager.agent_control(); @@ -726,7 +1027,7 @@ async fn watchdog_self_close_notifies_owner_and_unregisters_handle() { .handle(invocation( Arc::new(session), Arc::new(turn), - "watchdog_self_close", + "close_self", function_payload(json!({"message": "watchdog done"})), )) .await @@ -736,7 +1037,7 @@ async fn watchdog_self_close_notifies_owner_and_unregisters_handle() { serde_json::from_str(&content).expect("self-close result should be json"); assert_eq!(success, Some(true)); - assert_eq!(result["previous_status"], json!("pending_init")); + assert_eq!(result["previous_status"], json!("running")); assert!(!agent_control.is_watchdog_handle(target.thread_id).await); assert_eq!( agent_control.get_status(target.thread_id).await, @@ -753,6 +1054,23 @@ async fn watchdog_self_close_notifies_owner_and_unregisters_handle() { thread_id == owner.thread_id && matches!(op, Op::InterAgentCommunication { communication } if communication == expected) })); + let close_event = timeout(Duration::from_secs(1), async { + loop { + let event = owner + .thread + .next_event() + .await + .expect("owner event channel should stay open"); + if let EventMsg::CollabCloseEnd(close) = event.msg { + break close; + } + } + }) + .await + .expect("watchdog self-close should publish a close event for the handle"); + assert_eq!(close_event.sender_thread_id, owner.thread_id); + assert_eq!(close_event.receiver_thread_id, target.thread_id); + assert_eq!(close_event.status, AgentStatus::Running); } #[tokio::test] @@ -1220,6 +1538,79 @@ async fn multi_agent_v2_followup_task_rejects_root_target_from_child() { ); } +#[tokio::test] +async fn multi_agent_v2_followup_task_rejects_parent_target_from_non_watchdog_child() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + session.services.agent_control = manager.agent_control(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + turn.config = Arc::new(config); + + let child_path = AgentPath::try_from("/root/worker").expect("agent path"); + let child_thread_id = session + .services + .agent_control + .spawn_agent_with_metadata( + (*turn.config).clone(), + vec![UserInput::Text { + text: "inspect this repo".to_string(), + text_elements: Vec::new(), + }] + .into(), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: root.thread_id, + depth: 1, + agent_path: Some(child_path.clone()), + agent_nickname: None, + agent_role: None, + })), + crate::agent::control::SpawnAgentOptions::default(), + ) + .await + .expect("worker spawn should succeed") + .thread_id; + session.conversation_id = child_thread_id; + turn.session_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: root.thread_id, + depth: 1, + agent_path: Some(child_path), + agent_nickname: None, + agent_role: None, + }); + + let Err(err) = FollowupTaskHandlerV2 + .handle(invocation( + Arc::new(session), + Arc::new(turn), + "followup_task", + function_payload(json!({ + "target": "parent", + "message": "wake up" + })), + )) + .await + else { + panic!("non-watchdog followup_task should reject the direct parent target"); + }; + + assert_eq!( + err, + FunctionCallError::RespondToModel( + "Only watchdog check-in threads can use followup_task with target `parent`; use send_message for parent updates." + .to_string() + ) + ); +} + #[tokio::test] async fn multi_agent_v2_list_agents_returns_completed_status_and_last_task_message() { let (mut session, mut turn) = make_session_and_context().await; @@ -1465,6 +1856,153 @@ async fn multi_agent_v2_list_agents_omits_closed_agents() { ); } +#[tokio::test] +async fn watchdog_handle_is_listed_and_close_agent_removes_it() { + let (mut session, mut turn) = make_session_and_context().await; + let manager = thread_manager(); + let root = manager + .start_thread((*turn.config).clone()) + .await + .expect("root thread should start"); + let agent_control = manager.agent_control(); + session.services.agent_control = agent_control.clone(); + session.conversation_id = root.thread_id; + let mut config = (*turn.config).clone(); + config + .features + .enable(Feature::AgentWatchdog) + .expect("test config should allow feature update"); + config + .features + .enable(Feature::MultiAgentV2) + .expect("test config should allow feature update"); + let enabled_config = config.clone(); + turn.config = Arc::new(config); + + let session = Arc::new(session); + let turn = Arc::new(turn); + let spawn_output = SpawnAgentHandler + .handle(invocation( + session.clone(), + turn.clone(), + "spawn_agent", + function_payload(json!({ + "message": "check this branch periodically", + "agent_type": "watchdog" + })), + )) + .await + .expect("watchdog spawn should succeed"); + let (spawn_content, spawn_success) = expect_text_output(spawn_output); + let spawn_result: serde_json::Value = + serde_json::from_str(&spawn_content).expect("watchdog spawn result should be json"); + let watchdog_id = parse_agent_id( + spawn_result["agent_id"] + .as_str() + .expect("watchdog spawn result should include agent_id"), + ); + assert_eq!(spawn_success, Some(true)); + assert!(agent_control.is_watchdog_handle(watchdog_id).await); + + let helper_id = agent_control + .spawn_agent( + enabled_config, + vec![UserInput::Text { + text: "watchdog helper implementation detail".to_string(), + text_elements: Vec::new(), + }] + .into(), + Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id: root.thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: Some("watchdog".to_string()), + })), + ) + .await + .expect("watchdog helper should start"); + agent_control + .set_watchdog_active_helper_for_tests(watchdog_id, helper_id) + .await; + + let list_output = ListAgentsHandlerV2 + .handle(invocation( + session.clone(), + turn.clone(), + "list_agents", + function_payload(json!({})), + )) + .await + .expect("list_agents should include the watchdog handle"); + let (list_content, list_success) = expect_text_output(list_output); + let list_result: ListAgentsResult = + serde_json::from_str(&list_content).expect("list_agents result should be json"); + assert_eq!(list_success, Some(true)); + let watchdog_listing = list_result + .agents + .iter() + .find(|agent| agent.agent_name == watchdog_id.to_string()) + .expect("list_agents should include the watchdog handle"); + assert_eq!(watchdog_listing.agent_status, json!("running")); + assert!( + !list_result + .agents + .iter() + .any(|agent| agent.agent_name == helper_id.to_string()), + "active watchdog helpers should not be exposed as targetable list_agents entries" + ); + + let close_output = CloseAgentHandlerV2 + .handle(invocation( + session.clone(), + turn.clone(), + "close_agent", + function_payload(json!({"target": watchdog_id.to_string()})), + )) + .await + .expect("close_agent should close the watchdog handle"); + let (close_content, close_success) = expect_text_output(close_output); + let close_result: close_agent::CloseAgentResult = + serde_json::from_str(&close_content).expect("close_agent result should be json"); + assert_eq!(close_success, Some(true)); + assert_eq!(close_result.previous_status, AgentStatus::PendingInit); + assert!(!agent_control.is_watchdog_handle(watchdog_id).await); + assert_eq!( + agent_control.get_status(watchdog_id).await, + AgentStatus::NotFound + ); + assert_eq!( + agent_control.get_status(helper_id).await, + AgentStatus::NotFound + ); + assert!( + manager + .captured_ops() + .iter() + .any(|(thread_id, op)| *thread_id == helper_id && matches!(op, Op::Shutdown)) + ); + + let list_after_close_output = ListAgentsHandlerV2 + .handle(invocation( + session, + turn, + "list_agents", + function_payload(json!({})), + )) + .await + .expect("list_agents should omit the closed watchdog handle"); + let (list_after_close_content, _) = expect_text_output(list_after_close_output); + let list_after_close_result: ListAgentsResult = serde_json::from_str(&list_after_close_content) + .expect("list_agents result after close should be json"); + assert!( + !list_after_close_result + .agents + .iter() + .any(|agent| agent.agent_name == watchdog_id.to_string()) + ); +} + #[tokio::test] async fn multi_agent_v2_send_message_rejects_legacy_items_field() { let (mut session, mut turn) = make_session_and_context().await; @@ -2101,13 +2639,7 @@ async fn spawn_agent_allows_depth_up_to_configured_max_depth() { } #[tokio::test] -async fn multi_agent_v2_spawn_agent_ignores_configured_max_depth() { - #[derive(Debug, Deserialize)] - struct SpawnAgentResult { - task_name: String, - nickname: Option, - } - +async fn multi_agent_v2_spawn_agent_rejects_when_depth_limit_exceeded() { let (mut session, mut turn) = make_session_and_context().await; let manager = thread_manager(); let mut config = (*turn.config).clone(); @@ -2142,16 +2674,15 @@ async fn multi_agent_v2_spawn_agent_ignores_configured_max_depth() { "fork_turns": "none" })), ); - let output = SpawnAgentHandlerV2 - .handle(invocation) - .await - .expect("multi-agent v2 spawn should ignore max depth"); - let (content, success) = expect_text_output(output); - let result: SpawnAgentResult = - serde_json::from_str(&content).expect("spawn_agent result should be json"); - assert_eq!(result.task_name, "/root/parent/child"); - assert!(result.nickname.is_some()); - assert_eq!(success, Some(true)); + let Err(err) = SpawnAgentHandlerV2.handle(invocation).await else { + panic!("multi-agent v2 spawn should fail when depth limit exceeded"); + }; + assert_eq!( + err, + FunctionCallError::RespondToModel( + "Agent depth limit reached. Solve the task yourself.".to_string() + ) + ); } #[tokio::test] @@ -2807,7 +3338,7 @@ async fn wait_agent_rejects_only_watchdog_handles() { panic!("expected model-facing error"); }; assert!(message.contains("watchdog handle ids")); - assert!(message.contains("pending_init")); + assert!(message.contains("running")); } #[tokio::test] diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/followup_task.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/followup_task.rs index bcb3f49dea51..e618f5da2bf0 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/followup_task.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/followup_task.rs @@ -25,6 +25,7 @@ impl ToolHandler for Handler { MessageDeliveryMode::TriggerTurn, args.target, args.message, + args.interrupt, ) .await } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs index a42cde8f62fe..34b6671471c1 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/message_tool.rs @@ -4,8 +4,13 @@ //! resulting `InterAgentCommunication` should wake the target immediately. use super::*; +use crate::session::turn_context::TurnContext; use crate::tools::context::FunctionToolOutput; +use codex_protocol::ThreadId; use codex_protocol::protocol::InterAgentCommunication; +use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; +use std::sync::Arc; #[derive(Clone, Copy, PartialEq, Eq)] pub(crate) enum MessageDeliveryMode { @@ -43,6 +48,8 @@ pub(crate) struct SendMessageArgs { pub(crate) struct FollowupTaskArgs { pub(crate) target: String, pub(crate) message: String, + #[serde(default)] + pub(crate) interrupt: bool, } fn message_content(message: String) -> Result { @@ -60,8 +67,16 @@ pub(crate) async fn handle_message_string_tool( mode: MessageDeliveryMode, target: String, message: String, + interrupt: bool, ) -> Result { - handle_message_submission(invocation, mode, target, message_content(message)?).await + handle_message_submission( + invocation, + mode, + target, + message_content(message)?, + interrupt, + ) + .await } async fn handle_message_submission( @@ -69,6 +84,7 @@ async fn handle_message_submission( mode: MessageDeliveryMode, target: String, prompt: String, + interrupt: bool, ) -> Result { let ToolInvocation { session, @@ -76,22 +92,62 @@ async fn handle_message_submission( call_id, .. } = invocation; - let receiver_thread_id = resolve_agent_target(&session, &turn, &target).await?; + let target_is_parent = target == "parent"; + let receiver_thread_id = resolve_message_target(&session, &turn, &target).await?; + let direct_parent_thread_id = direct_parent_thread_id(&turn.session_source); + let is_direct_parent = direct_parent_thread_id == Some(receiver_thread_id); + let watchdog_owner_thread_id = session + .services + .agent_control + .watchdog_owner_for_active_helper(session.conversation_id) + .await; + let is_watchdog_parent = watchdog_owner_thread_id == Some(receiver_thread_id); let receiver_agent = session .services .agent_control .get_agent_metadata(receiver_thread_id) .unwrap_or_default(); + if mode == MessageDeliveryMode::QueueOnly && is_watchdog_parent { + return Err(FunctionCallError::RespondToModel( + "watchdog check-in threads must use followup_task with target `parent` to message their parent." + .to_string(), + )); + } + if mode == MessageDeliveryMode::TriggerTurn + && is_direct_parent + && !is_watchdog_parent + && target_is_parent + { + return Err(FunctionCallError::RespondToModel( + "Only watchdog check-in threads can use followup_task with target `parent`; use send_message for parent updates." + .to_string(), + )); + } if mode == MessageDeliveryMode::TriggerTurn && receiver_agent .agent_path .as_ref() .is_some_and(AgentPath::is_root) + && !is_watchdog_parent { return Err(FunctionCallError::RespondToModel( "Tasks can't be assigned to the root agent".to_string(), )); } + if mode == MessageDeliveryMode::TriggerTurn && is_direct_parent && !is_watchdog_parent { + return Err(FunctionCallError::RespondToModel( + "Only watchdog check-in threads can use followup_task with target `parent`; use send_message for parent updates." + .to_string(), + )); + } + if interrupt { + session + .services + .agent_control + .interrupt_agent(receiver_thread_id) + .await + .map_err(|err| collab_agent_error(receiver_thread_id, err))?; + } session .send_event( &turn, @@ -104,9 +160,17 @@ async fn handle_message_submission( .into(), ) .await; - let receiver_agent_path = receiver_agent.agent_path.clone().ok_or_else(|| { - FunctionCallError::RespondToModel("target agent is missing an agent_path".to_string()) - })?; + let receiver_agent_path = receiver_agent + .agent_path + .clone() + .or_else(|| { + is_direct_parent + .then(|| direct_parent_path(&turn.session_source)) + .flatten() + }) + .ok_or_else(|| { + FunctionCallError::RespondToModel("target agent is missing an agent_path".to_string()) + })?; let communication = InterAgentCommunication::new( turn.session_source .get_agent_path() @@ -143,6 +207,72 @@ async fn handle_message_submission( ) .await; result?; + if mode == MessageDeliveryMode::TriggerTurn && is_watchdog_parent { + let _ = session + .services + .agent_control + .finish_watchdog_helper(session.conversation_id) + .await; + session + .services + .agent_control + .finish_watchdog_helper_thread(session.conversation_id) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!( + "failed to finish watchdog helper after followup_task: {err}" + )) + })?; + } Ok(FunctionToolOutput::from_text(String::new(), Some(true))) } + +async fn resolve_message_target( + session: &Arc, + turn: &Arc, + target: &str, +) -> Result { + if target == "parent" { + return direct_parent_thread_id(&turn.session_source).ok_or_else(|| { + FunctionCallError::RespondToModel( + "target `parent` is only available from a spawned agent.".to_string(), + ) + }); + } + resolve_agent_target(session, turn, target).await +} + +fn direct_parent_thread_id(session_source: &SessionSource) -> Option { + match session_source { + SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, .. + }) => Some(*parent_thread_id), + SessionSource::Cli + | SessionSource::VSCode + | SessionSource::Exec + | SessionSource::Mcp + | SessionSource::Custom(_) + | SessionSource::Internal(_) + | SessionSource::SubAgent(SubAgentSource::Review) + | SessionSource::SubAgent(SubAgentSource::Compact) + | SessionSource::SubAgent(SubAgentSource::MemoryConsolidation) + | SessionSource::SubAgent(SubAgentSource::Other(_)) + | SessionSource::Unknown => None, + } +} + +fn direct_parent_path(session_source: &SessionSource) -> Option { + parent_path(session_source.get_agent_path()?.as_str()) +} + +fn parent_path(agent_path: &str) -> Option { + if agent_path == AgentPath::ROOT { + return None; + } + let parent = agent_path.rsplit_once('/')?.0; + if parent.is_empty() { + return None; + } + AgentPath::try_from(parent).ok() +} diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/send_message.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/send_message.rs index b327ccf52002..0d142623a9c6 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/send_message.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/send_message.rs @@ -25,6 +25,7 @@ impl ToolHandler for Handler { MessageDeliveryMode::QueueOnly, args.target, args.message, + /*interrupt*/ false, ) .await } diff --git a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs index 26b6750c46f5..8cafe287c966 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs @@ -1,14 +1,24 @@ use super::*; +use crate::agent::RemovedWatchdog; +use crate::agent::WatchdogRegistration; use crate::agent::control::SpawnAgentForkMode; use crate::agent::control::SpawnAgentOptions; use crate::agent::control::render_input_preview; +use crate::agent::exceeds_thread_spawn_depth_limit; use crate::agent::next_thread_spawn_depth; use crate::agent::role::DEFAULT_ROLE_NAME; use crate::agent::role::apply_role_to_config; +use crate::agent::role::watchdog_interval_for_role; +use crate::config::Config; use crate::session::turn_context::TurnEnvironment; +use codex_features::Feature; use codex_protocol::AgentPath; +use codex_protocol::ThreadId; use codex_protocol::protocol::InterAgentCommunication; use codex_protocol::protocol::Op; +use codex_protocol::protocol::SessionSource; +use std::collections::HashMap; +use std::collections::HashSet; pub(crate) struct Handler; @@ -33,18 +43,35 @@ impl ToolHandler for Handler { } = invocation; let arguments = function_arguments(payload)?; let args: SpawnAgentArgs = parse_arguments(&arguments)?; - let fork_mode = args.fork_mode()?; let role_name = args .agent_type .as_deref() .map(str::trim) .filter(|role| !role.is_empty()); - let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?; - let prompt = render_input_preview(&initial_operation); - let session_source = turn.session_source.clone(); let child_depth = next_thread_spawn_depth(&session_source); + let max_depth = turn.config.agent_max_depth; + let watchdog_interval_s = watchdog_interval_for_role(&turn.config, role_name); + let is_watchdog = watchdog_interval_s.is_some(); + if is_watchdog && !turn.config.features.enabled(Feature::AgentWatchdog) { + return Err(FunctionCallError::RespondToModel( + "watchdogs are disabled".to_string(), + )); + } + if is_watchdog && matches!(session_source, SessionSource::SubAgent(_)) { + return Err(FunctionCallError::RespondToModel( + "watchdogs can only be spawned by root agents".to_string(), + )); + } + if exceeds_thread_spawn_depth_limit(child_depth, max_depth) { + return Err(FunctionCallError::RespondToModel( + "Agent depth limit reached. Solve the task yourself.".to_string(), + )); + } + let fork_mode = if is_watchdog { None } else { args.fork_mode()? }; + let initial_operation = parse_collab_input(Some(args.message), /*items*/ None)?; + let prompt = render_input_preview(&initial_operation); session .send_event( &turn, @@ -89,45 +116,68 @@ impl ToolHandler for Handler { role_name, Some(args.task_name.clone()), )?; - let result = session - .services - .agent_control - .spawn_agent_with_metadata( + let result = if let Some(watchdog_interval_s) = watchdog_interval_s { + let thread_id = spawn_watchdog( + &session.services.agent_control, config, - match (spawn_source.get_agent_path(), initial_operation) { - (Some(recipient), Op::UserInput { items, .. }) - if items - .iter() - .all(|item| matches!(item, UserInput::Text { .. })) => - { - Op::InterAgentCommunication { - communication: InterAgentCommunication::new( - turn.session_source - .get_agent_path() - .unwrap_or_else(AgentPath::root), - recipient, - Vec::new(), - prompt.clone(), - /*trigger_turn*/ true, - ), - } - } - (_, initial_operation) => initial_operation, - }, - Some(spawn_source), - SpawnAgentOptions { - fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()), - fork_mode, - environments: Some( - turn.environments - .iter() - .map(TurnEnvironment::selection) - .collect(), - ), - }, + prompt.clone(), + session.conversation_id, + child_depth, + watchdog_interval_s, + spawn_source, ) .await - .map_err(collab_spawn_error); + .map_err(collab_spawn_error)?; + Ok(crate::agent::control::LiveAgent { + thread_id, + metadata: session + .services + .agent_control + .get_agent_metadata(thread_id) + .unwrap_or_default(), + status: session.services.agent_control.get_status(thread_id).await, + }) + } else { + session + .services + .agent_control + .spawn_agent_with_metadata( + config, + match (spawn_source.get_agent_path(), initial_operation) { + (Some(recipient), Op::UserInput { items, .. }) + if items + .iter() + .all(|item| matches!(item, UserInput::Text { .. })) => + { + Op::InterAgentCommunication { + communication: InterAgentCommunication::new( + turn.session_source + .get_agent_path() + .unwrap_or_else(AgentPath::root), + recipient, + Vec::new(), + prompt.clone(), + /*trigger_turn*/ true, + ), + } + } + (_, initial_operation) => initial_operation, + }, + Some(spawn_source), + SpawnAgentOptions { + fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()), + fork_mode, + environments: Some( + turn.environments + .iter() + .map(TurnEnvironment::selection) + .collect(), + ), + }, + ) + .await + .map_err(collab_spawn_error) + }; let (new_thread_id, new_agent_metadata, status) = match &result { Ok(spawned_agent) => ( Some(spawned_agent.thread_id), @@ -211,6 +261,68 @@ impl ToolHandler for Handler { } } +async fn spawn_watchdog( + agent_control: &crate::agent::AgentControl, + config: Config, + prompt: String, + owner_thread_id: ThreadId, + child_depth: i32, + interval_s: i64, + spawn_source: SessionSource, +) -> codex_protocol::error::Result { + let mut handle_config = config.clone(); + handle_config + .mcp_servers + .set(HashMap::new()) + .map_err(|err| { + codex_protocol::error::CodexErr::UnsupportedOperation(format!( + "failed to clear watchdog MCP servers: {err}" + )) + })?; + let target_thread_id = agent_control + .spawn_agent(handle_config, Op::Interrupt, Some(spawn_source)) + .await?; + let superseded_before_register = agent_control + .unregister_watchdogs_for_owner(owner_thread_id) + .await; + shutdown_removed_watchdogs(agent_control, superseded_before_register).await; + let registration = WatchdogRegistration { + owner_thread_id, + target_thread_id, + child_depth, + interval_s, + prompt, + config, + }; + let superseded_after_register = match agent_control.register_watchdog(registration).await { + Ok(removed) => removed, + Err(err) => { + let _ = agent_control.close_agent(target_thread_id).await; + return Err(err); + } + }; + shutdown_removed_watchdogs(agent_control, superseded_after_register).await; + Ok(target_thread_id) +} + +async fn shutdown_removed_watchdogs( + agent_control: &crate::agent::AgentControl, + removed_watchdogs: Vec, +) { + let mut thread_ids = HashSet::new(); + for removed in removed_watchdogs { + thread_ids.insert(removed.target_thread_id); + if let Some(helper_id) = removed.active_helper_id { + thread_ids.insert(helper_id); + } + } + let mut thread_ids = thread_ids.into_iter().collect::>(); + thread_ids.sort_by_key(ToString::to_string); + for thread_id in thread_ids { + let _ = agent_control.close_agent(thread_id).await; + } +} + #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] struct SpawnAgentArgs { diff --git a/codex-rs/core/src/tools/spec.rs b/codex-rs/core/src/tools/spec.rs index e93bba3a503f..31296949f681 100644 --- a/codex-rs/core/src/tools/spec.rs +++ b/codex-rs/core/src/tools/spec.rs @@ -1,6 +1,7 @@ use crate::shell::Shell; use crate::shell::ShellType; use crate::tools::handlers::agent_jobs::BatchJobHandler; +use crate::tools::handlers::multi_agents::CompactParentContextHandler; use crate::tools::handlers::multi_agents::WatchdogSelfCloseHandler; use crate::tools::handlers::multi_agents::WatchdogSnoozeHandler; use crate::tools::handlers::multi_agents_common::DEFAULT_WAIT_TIMEOUT_MS; @@ -216,6 +217,9 @@ pub(crate) fn build_specs_with_discoverable_tools( ToolHandlerKind::CodeModeWait => { builder.register_handler(handler.name, code_mode_wait_handler.clone()); } + ToolHandlerKind::CompactParentContext => { + builder.register_handler(handler.name, Arc::new(CompactParentContextHandler)); + } ToolHandlerKind::DynamicTool => { builder.register_handler(handler.name, dynamic_tool_handler.clone()); } diff --git a/codex-rs/tools/src/agent_tool.rs b/codex-rs/tools/src/agent_tool.rs index bbaae5d2fc18..28d559f896d7 100644 --- a/codex-rs/tools/src/agent_tool.rs +++ b/codex-rs/tools/src/agent_tool.rs @@ -128,7 +128,8 @@ pub fn create_send_message_tool() -> ToolSpec { ( "target".to_string(), JsonSchema::string(Some( - "Relative or canonical task name to message (from spawn_agent).".to_string(), + "Relative or canonical task name to message (from spawn_agent), or `parent` from a spawned non-watchdog agent." + .to_string(), )), ), ( @@ -159,7 +160,8 @@ pub fn create_followup_task_tool() -> ToolSpec { ( "target".to_string(), JsonSchema::string(Some( - "Agent id or canonical task name to message (from spawn_agent).".to_string(), + "Agent id or canonical task name to message (from spawn_agent), or `parent` from a watchdog check-in." + .to_string(), )), ), ( @@ -172,7 +174,7 @@ pub fn create_followup_task_tool() -> ToolSpec { ToolSpec::Function(ResponsesApiTool { name: "followup_task".to_string(), - description: "Send a message to an existing non-root target agent and trigger a turn in that target. If the target is currently mid-turn, the message is queued and will be used to start the target's next turn, after the current turn completes." + description: "Send a string message to an existing non-root agent and trigger a turn in the target. Watchdog check-ins may use target `parent`. Use interrupt=true to redirect work immediately. If interrupt=false and the target's turn has not completed, the message is queued and starts the target's next turn after the current turn completes." .to_string(), strict: false, defer_loading: None, @@ -260,18 +262,18 @@ pub fn create_close_agent_tool_v1() -> ToolSpec { }) } -pub fn create_watchdog_self_close_tool() -> ToolSpec { +pub fn create_watchdog_close_self_tool() -> ToolSpec { let properties = BTreeMap::from([( "message".to_string(), JsonSchema::string(Some( - "Optional final message to send to the parent/root thread before closing this watchdog handle and ending this check-in immediately." + "Optional final message sent to the parent agent before closing this watchdog." .to_string(), )), )]); ToolSpec::Function(ResponsesApiTool { - name: "watchdog_self_close".to_string(), - description: "Watchdog-only: send an optional final message to the parent/root thread, close this watchdog's persistent handle, and end this check-in immediately." + name: "close_self".to_string(), + description: "Watchdog-only: send an optional final message to the parent agent, stop future wakeups for this watchdog, and end the current check-in immediately. Use this tool, not a final assistant message, when the watchdog must shut down." .to_string(), strict: false, defer_loading: Some(true), @@ -280,6 +282,33 @@ pub fn create_watchdog_self_close_tool() -> ToolSpec { }) } +pub fn create_compact_parent_context_tool() -> ToolSpec { + let properties = BTreeMap::from([ + ( + "reason".to_string(), + JsonSchema::string(Some( + "Short reason why the parent/root thread should be compacted.".to_string(), + )), + ), + ( + "evidence".to_string(), + JsonSchema::string(Some( + "Specific observation that the parent/root thread is idle or stuck.".to_string(), + )), + ), + ]); + + ToolSpec::Function(ResponsesApiTool { + name: "compact_parent_context".to_string(), + description: "Watchdog-only: request compaction for this watchdog helper's parent/root thread when it is idle and appears stuck." + .to_string(), + strict: false, + defer_loading: Some(true), + parameters: JsonSchema::object(properties, /*required*/ None, Some(false.into())), + output_schema: None, + }) +} + pub fn create_watchdog_snooze_tool() -> ToolSpec { let properties = BTreeMap::from([ ( diff --git a/codex-rs/tools/src/agent_tool_tests.rs b/codex-rs/tools/src/agent_tool_tests.rs index 3157cfc547c2..150700e2048a 100644 --- a/codex-rs/tools/src/agent_tool_tests.rs +++ b/codex-rs/tools/src/agent_tool_tests.rs @@ -154,7 +154,9 @@ fn send_message_tool_requires_message_and_has_no_output_schema() { properties .get("target") .and_then(|schema| schema.description.as_deref()), - Some("Relative or canonical task name to message (from spawn_agent).") + Some( + "Relative or canonical task name to message (from spawn_agent), or `parent` from a spawned non-watchdog agent." + ) ); assert_eq!( parameters.required.as_ref(), diff --git a/codex-rs/tools/src/lib.rs b/codex-rs/tools/src/lib.rs index 3df4a1a70803..8bb477f86e0f 100644 --- a/codex-rs/tools/src/lib.rs +++ b/codex-rs/tools/src/lib.rs @@ -31,6 +31,7 @@ pub use agent_tool::SpawnAgentToolOptions; pub use agent_tool::WaitAgentTimeoutOptions; pub use agent_tool::create_close_agent_tool_v1; pub use agent_tool::create_close_agent_tool_v2; +pub use agent_tool::create_compact_parent_context_tool; pub use agent_tool::create_followup_task_tool; pub use agent_tool::create_list_agents_tool; pub use agent_tool::create_resume_agent_tool; @@ -40,7 +41,7 @@ pub use agent_tool::create_spawn_agent_tool_v1; pub use agent_tool::create_spawn_agent_tool_v2; pub use agent_tool::create_wait_agent_tool_v1; pub use agent_tool::create_wait_agent_tool_v2; -pub use agent_tool::create_watchdog_self_close_tool; +pub use agent_tool::create_watchdog_close_self_tool; pub use agent_tool::create_watchdog_snooze_tool; pub use agent_tool::create_watchdog_tools_namespace; pub use apply_patch_tool::ApplyPatchToolArgs; diff --git a/codex-rs/tools/src/tool_registry_plan.rs b/codex-rs/tools/src/tool_registry_plan.rs index 26bb159b17f0..23e4b883eb01 100644 --- a/codex-rs/tools/src/tool_registry_plan.rs +++ b/codex-rs/tools/src/tool_registry_plan.rs @@ -57,7 +57,7 @@ use crate::create_view_image_tool; use crate::create_wait_agent_tool_v1; use crate::create_wait_agent_tool_v2; use crate::create_wait_tool; -use crate::create_watchdog_self_close_tool; +use crate::create_watchdog_close_self_tool; use crate::create_watchdog_snooze_tool; use crate::create_watchdog_tools_namespace; use crate::create_web_search_tool; @@ -500,14 +500,14 @@ pub fn build_tool_registry_plan( if config.agent_watchdog { plan.push_spec( create_watchdog_tools_namespace(vec![ - create_watchdog_self_close_tool(), + create_watchdog_close_self_tool(), create_watchdog_snooze_tool(), ]), /*supports_parallel_tool_calls*/ false, config.code_mode_enabled, ); plan.register_handler( - crate::ToolName::namespaced("watchdog", "watchdog_self_close"), + crate::ToolName::namespaced("watchdog", "close_self"), ToolHandlerKind::WatchdogSelfClose, ); plan.register_handler( diff --git a/codex-rs/tools/src/tool_registry_plan_types.rs b/codex-rs/tools/src/tool_registry_plan_types.rs index 59b92e262e05..55f160b6cd6c 100644 --- a/codex-rs/tools/src/tool_registry_plan_types.rs +++ b/codex-rs/tools/src/tool_registry_plan_types.rs @@ -16,6 +16,7 @@ pub enum ToolHandlerKind { CloseAgentV2, CodeModeExecute, CodeModeWait, + CompactParentContext, DynamicTool, FollowupTaskV2, Goal,