Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 241 additions & 3 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ use codex_protocol::ThreadId;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::CollabCloseEndEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::Op;
Expand All @@ -37,18 +40,26 @@ use codex_protocol::user_input::UserInput;
use codex_rollout::state_db;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_thread_store::ReadThreadParams;
use codex_tools::create_compact_parent_context_tool;
use codex_tools::create_watchdog_close_self_tool;
use codex_tools::create_watchdog_snooze_tool;
use codex_tools::create_watchdog_tools_namespace;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Weak;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::sync::watch;
use tracing::warn;

const AGENT_NAMES: &str = include_str!("agent_names.txt");
const ROOT_LAST_TASK_MESSAGE: &str = "Main thread";
const CODEX_EXPERIMENTAL_FORK_PREVIOUS_RESPONSE_ID_ENV: &str =
"CODEX_EXPERIMENTAL_FORK_PREVIOUS_RESPONSE_ID";
const WATCHDOG_BOOT_TOOL_SEARCH_CALL_ID: &str = "synthetic_watchdog_tool_search";
const WATCHDOG_BOOT_LIST_AGENTS_CALL_ID: &str = "synthetic_watchdog_list_agents";

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum SpawnAgentForkMode {
Expand Down Expand Up @@ -77,6 +88,18 @@ pub(crate) struct ListedAgent {
pub(crate) last_task_message: Option<String>,
}

/// Outcome of a watchdog helper's request to compact its parent thread, as produced by
/// `AgentControl::compact_parent_for_watchdog_helper`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum WatchdogParentCompactionResult {
/// No watchdog manager is available, or no owner is recorded for the requesting helper
/// thread; nothing was submitted.
NotWatchdogHelper,
/// The parent thread had an active turn when checked, so no compaction was submitted.
ParentBusy {
/// Thread that owns the requesting helper.
parent_thread_id: ThreadId,
},
/// An `Op::Compact` was sent to the parent thread.
Submitted {
/// Thread that received the compaction op.
parent_thread_id: ThreadId,
/// Identifier returned when the op was sent.
submission_id: String,
},
}

fn default_agent_nickname_list() -> Vec<&'static str> {
AGENT_NAMES
.lines()
Expand Down Expand Up @@ -141,6 +164,70 @@ fn is_watchdog_helper_source(session_source: &SessionSource) -> bool {
)
}

/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time earlier than the epoch.
fn unix_timestamp_seconds() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // A pre-epoch clock is not worth failing over; fall back to the default value.
        Err(_) => 0,
    }
}

/// Builds a synthetic, already-completed tool-search exchange that exposes the watchdog tools.
///
/// The result is a `ToolSearchCall` followed by its matching `ToolSearchOutput`, both keyed by
/// `WATCHDOG_BOOT_TOOL_SEARCH_CALL_ID`. The output carries the serialized watchdog tools
/// namespace. If that namespace cannot be serialized to JSON, an empty list is returned.
fn synthetic_watchdog_tool_search_items() -> Vec<RolloutItem> {
    let watchdog_tools = vec![
        create_compact_parent_context_tool(),
        create_watchdog_close_self_tool(),
        create_watchdog_snooze_tool(),
    ];
    let namespace_json = match serde_json::to_value(create_watchdog_tools_namespace(watchdog_tools))
    {
        Ok(value) => value,
        // Without a serializable namespace there is nothing useful to inject.
        Err(_) => return Vec::new(),
    };

    // The call and its output share one call id so they pair up in the history.
    let call_id = WATCHDOG_BOOT_TOOL_SEARCH_CALL_ID.to_string();
    let search_call = ResponseItem::ToolSearchCall {
        id: None,
        call_id: Some(call_id.clone()),
        status: Some("completed".to_string()),
        execution: "client".to_string(),
        arguments: serde_json::json!({
            "query": "watchdog namespace tools",
        }),
    };
    let search_output = ResponseItem::ToolSearchOutput {
        call_id: Some(call_id),
        status: "completed".to_string(),
        execution: "client".to_string(),
        tools: vec![namespace_json],
    };

    vec![
        RolloutItem::ResponseItem(search_call),
        RolloutItem::ResponseItem(search_output),
    ]
}

/// Builds a synthetic `list_agents` function call and its successful output.
///
/// The output text is a JSON envelope containing the source tag, the generation time in Unix
/// seconds, the owner thread id, and the supplied `agents`. Both items are keyed by
/// `WATCHDOG_BOOT_LIST_AGENTS_CALL_ID`.
fn synthetic_watchdog_list_agents_items(
    owner_thread_id: ThreadId,
    agents: Vec<ListedAgent>,
) -> Vec<RolloutItem> {
    let payload_text = serde_json::json!({
        "source": "pre_injected_agents_list",
        "generated_at": unix_timestamp_seconds(),
        "owner_thread_id": owner_thread_id.to_string(),
        "agents": agents,
    })
    .to_string();
    let mut output = FunctionCallOutputPayload::from_text(payload_text);
    output.success = Some(true);

    let list_call = ResponseItem::FunctionCall {
        id: None,
        name: "list_agents".to_string(),
        namespace: None,
        arguments: "{}".to_string(),
        call_id: WATCHDOG_BOOT_LIST_AGENTS_CALL_ID.to_string(),
    };
    let list_output = ResponseItem::FunctionCallOutput {
        call_id: WATCHDOG_BOOT_LIST_AGENTS_CALL_ID.to_string(),
        output,
    };

    vec![
        RolloutItem::ResponseItem(list_call),
        RolloutItem::ResponseItem(list_output),
    ]
}

/// Control-plane handle for multi-agent operations.
/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to
/// spawn new agents and the inter-agent communication layer.
Expand Down Expand Up @@ -497,6 +584,12 @@ impl AgentControl {
keep_forked_rollout_item(item)
});
}
if is_watchdog_helper_source(&session_source) {
forked_rollout_items.extend(
self.watchdog_boot_context_items(state, parent_thread_id)
.await,
);
}

state
.fork_thread_with_source(
Expand Down Expand Up @@ -906,6 +999,36 @@ impl AgentControl {
.await
}

/// Asks the watchdog manager to finish the active helper identified by `helper_thread_id`.
///
/// Returns `false` when no watchdog manager is available; otherwise returns whatever the
/// manager's `finish_active_helper` reports.
pub(crate) async fn finish_watchdog_helper(&self, helper_thread_id: ThreadId) -> bool {
    match self.watchdogs.as_ref() {
        Some(manager) => manager.finish_active_helper(helper_thread_id).await,
        None => false,
    }
}

/// Tears down a finished watchdog helper thread.
///
/// If the thread is still known to the thread manager, its thread-spawn edge is persisted as
/// `Closed` (a failure there is only logged) and its rollout is flushed. The thread is then
/// removed from the manager and its spawned-thread slot is released.
///
/// # Errors
///
/// Returns an error if the thread manager state cannot be upgraded, or if flushing the rollout
/// fails. A flush failure is reported only after teardown has run: the edge has already been
/// marked `Closed` by then, so returning early would leave a closed helper registered and its
/// slot never released.
pub(crate) async fn finish_watchdog_helper_thread(
    &self,
    agent_id: ThreadId,
) -> CodexResult<()> {
    let state = self.upgrade()?;
    let flush_outcome = match state.get_thread(agent_id).await {
        Ok(thread) => {
            if let Some(state_db_ctx) = thread.state_db()
                && let Err(err) = state_db_ctx
                    .set_thread_spawn_edge_status(
                        agent_id,
                        DirectionalThreadSpawnEdgeStatus::Closed,
                    )
                    .await
            {
                warn!("failed to persist thread-spawn edge status for {agent_id}: {err}");
            }
            // Capture the flush result instead of propagating it here, so the cleanup below
            // always runs.
            Some(thread.codex.session.flush_rollout().await)
        }
        // Thread is already gone: nothing to flush, but still release bookkeeping below.
        Err(_) => None,
    };

    let _ = state.remove_thread(&agent_id).await;
    self.state.release_spawned_thread(agent_id);

    if let Some(flush_result) = flush_outcome {
        flush_result?;
    }
    Ok(())
}

fn watchdog_manager(&self) -> CodexResult<&Arc<WatchdogManager>> {
self.watchdogs.as_ref().ok_or_else(|| {
CodexErr::UnsupportedOperation("watchdog manager unavailable".to_string())
Expand Down Expand Up @@ -954,7 +1077,8 @@ impl AgentControl {
let Ok(thread) = state.get_thread(agent_id).await else {
return AgentStatus::NotFound;
};
thread.agent_status().await
self.reported_agent_status(agent_id, thread.agent_status().await)
.await
}

pub(crate) fn register_session_root(
Expand Down Expand Up @@ -1086,7 +1210,9 @@ impl AgentControl {
{
agents.push(ListedAgent {
agent_name: root_path.to_string(),
agent_status: root_thread.agent_status().await,
agent_status: self
.reported_agent_status(root_thread_id, root_thread.agent_status().await)
.await,
last_task_message: Some(ROOT_LAST_TASK_MESSAGE.to_string()),
});
}
Expand All @@ -1095,6 +1221,14 @@ impl AgentControl {
let Some(thread_id) = metadata.agent_id else {
continue;
};
if let Some(watchdogs) = self.watchdogs.as_ref()
&& watchdogs
.target_for_active_helper(thread_id)
.await
.is_some()
{
continue;
}
if resolved_prefix
.as_ref()
.is_some_and(|prefix| !agent_matches_prefix(metadata.agent_path.as_ref(), prefix))
Expand All @@ -1113,14 +1247,118 @@ impl AgentControl {
let last_task_message = metadata.last_task_message.clone();
agents.push(ListedAgent {
agent_name,
agent_status: thread.agent_status().await,
agent_status: self
.reported_agent_status(thread_id, thread.agent_status().await)
.await,
last_task_message,
});
}

Ok(agents)
}

/// Adjusts the status reported for `agent_id`.
///
/// A `PendingInit` status is reported as `Running` when a watchdog manager is available and it
/// identifies `agent_id` as a watchdog handle. Every other case returns `status` unchanged.
async fn reported_agent_status(&self, agent_id: ThreadId, status: AgentStatus) -> AgentStatus {
    // Only the pending-init state is ever rewritten; skip the manager lookup otherwise.
    if !matches!(status, AgentStatus::PendingInit) {
        return status;
    }
    let Some(manager) = self.watchdogs.as_ref() else {
        return status;
    };
    if manager.is_watchdog_handle(agent_id).await {
        AgentStatus::Running
    } else {
        status
    }
}

/// Emits a `CollabCloseEndEvent` for `target_thread_id` on the owner thread's session.
///
/// The event id and the event's `call_id` are both `watchdog-close-<target_thread_id>`. The
/// owner is recorded as the sender and the target as the receiver.
///
/// # Errors
///
/// Returns an error if the thread manager state cannot be upgraded or the owner thread cannot
/// be looked up.
pub(crate) async fn send_watchdog_close_event(
    &self,
    owner_thread_id: ThreadId,
    target_thread_id: ThreadId,
    receiver_agent_nickname: Option<String>,
    receiver_agent_role: Option<String>,
    status: AgentStatus,
) -> CodexResult<()> {
    let state = self.upgrade()?;
    let owner_thread = state.get_thread(owner_thread_id).await?;

    // The same identifier is used for the event envelope and the call it closes.
    let close_id = format!("watchdog-close-{target_thread_id}");
    let close_end = CollabCloseEndEvent {
        call_id: close_id.clone(),
        sender_thread_id: owner_thread_id,
        receiver_thread_id: target_thread_id,
        receiver_agent_nickname,
        receiver_agent_role,
        status,
    };
    let event = Event {
        id: close_id,
        msg: close_end.into(),
    };

    owner_thread.codex.session.send_event_raw(event).await;
    Ok(())
}

/// Requests a context compaction of the parent thread that owns `helper_thread_id`.
///
/// Returns:
/// - `NotWatchdogHelper` when no watchdog manager is available or no owner is recorded for the
///   helper;
/// - `ParentBusy` when the parent currently has an active turn (nothing is submitted);
/// - `Submitted` with the submission id once `Op::Compact` has been sent to the parent.
///
/// # Errors
///
/// Returns an error if the thread manager state cannot be upgraded, the parent thread cannot be
/// looked up, or sending the op fails.
pub(crate) async fn compact_parent_for_watchdog_helper(
    &self,
    helper_thread_id: ThreadId,
) -> CodexResult<WatchdogParentCompactionResult> {
    let manager = match self.watchdogs.as_ref() {
        Some(manager) => manager,
        None => return Ok(WatchdogParentCompactionResult::NotWatchdogHelper),
    };
    let parent_thread_id = match manager.owner_for_active_helper(helper_thread_id).await {
        Some(owner) => owner,
        None => return Ok(WatchdogParentCompactionResult::NotWatchdogHelper),
    };

    let state = self.upgrade()?;
    let parent_thread = state.get_thread(parent_thread_id).await?;

    // The lock guard is a temporary of this statement, so it is released before the op is sent.
    // NOTE(review): the busy check and the submission are therefore not atomic.
    let parent_is_busy = parent_thread
        .codex
        .session
        .active_turn
        .lock()
        .await
        .is_some();
    if parent_is_busy {
        return Ok(WatchdogParentCompactionResult::ParentBusy { parent_thread_id });
    }

    let submission_id = state.send_op(parent_thread_id, Op::Compact).await?;
    Ok(WatchdogParentCompactionResult::Submitted {
        parent_thread_id,
        submission_id,
    })
}

/// Assembles the synthetic rollout items injected when a watchdog helper starts.
///
/// The owner thread's session source is read from its config snapshot, falling back to
/// `SessionSource::Cli` when the owner thread cannot be looked up. The owner is registered as a
/// session root and the agents visible from it are listed; a listing failure yields an empty
/// list. The result is the synthetic tool-search exchange followed by the synthetic
/// `list_agents` exchange.
async fn watchdog_boot_context_items(
    &self,
    state: &Arc<ThreadManagerState>,
    owner_thread_id: ThreadId,
) -> Vec<RolloutItem> {
    let owner_source = if let Ok(owner_thread) = state.get_thread(owner_thread_id).await {
        owner_thread
            .codex
            .thread_config_snapshot()
            .await
            .session_source
    } else {
        SessionSource::Cli
    };
    self.register_session_root(owner_thread_id, &owner_source);

    // Listing is best-effort: the boot context is still useful with an empty agent list.
    let agents = match self.list_agents(&owner_source, /*path_prefix*/ None).await {
        Ok(listed) => listed,
        Err(_) => Vec::new(),
    };

    let mut boot_items = synthetic_watchdog_tool_search_items();
    boot_items.extend(synthetic_watchdog_list_agents_items(
        owner_thread_id,
        agents,
    ));
    boot_items
}

/// Starts a detached watcher for sub-agents spawned from another thread.
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
Expand Down
Loading
Loading