Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,32 +233,31 @@ impl AgentControl {
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match (session_source, options.fork_mode.as_ref()) {
(Some(session_source), Some(_)) => {
self.spawn_forked_thread(
Box::pin(self.spawn_forked_thread(
&state,
config,
session_source,
&options,
inherited_shell_snapshot,
inherited_exec_policy,
)
))
.await?
}
(Some(session_source), None) => {
state
.spawn_new_thread_with_source(
config.clone(),
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
)
.await?
Box::pin(state.spawn_new_thread_with_source(
config.clone(),
self.clone(),
session_source,
/*thread_source*/ Some(ThreadSource::Subagent),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
options.environments.clone(),
))
.await?
}
(None, _) => state.spawn_new_thread(config.clone(), self.clone()).await?,
(None, _) => Box::pin(state.spawn_new_thread(config.clone(), self.clone())).await?,
};
agent_metadata.agent_id = Some(new_thread.thread_id);
reservation.commit(agent_metadata.clone());
Expand Down
165 changes: 87 additions & 78 deletions codex-rs/core/src/tools/handlers/multi_agents/close_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,86 +20,95 @@ impl ToolHandler for Handler {
matches!(payload, ToolPayload::Function { .. })
}

async fn handle(&self, invocation: ToolInvocation) -> Result<Self::Output, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = parse_agent_id_target(&args.target)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
fn handle(
&self,
invocation: ToolInvocation,
) -> impl std::future::Future<Output = Result<Self::Output, FunctionCallError>> + Send {
Box::pin(handle_close_agent(invocation))
}
}

async fn handle_close_agent(
invocation: ToolInvocation,
) -> Result<CloseAgentResult, FunctionCallError> {
let ToolInvocation {
session,
turn,
payload,
call_id,
..
} = invocation;
let arguments = function_arguments(payload)?;
let args: CloseAgentArgs = parse_arguments(&arguments)?;
let agent_id = parse_agent_id_target(&args.target)?;
let receiver_agent = session
.services
.agent_control
.get_agent_metadata(agent_id)
.unwrap_or_default();
session
.send_event(
&turn,
CollabCloseBeginEvent {
call_id: call_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
}
};
let result = Box::pin(session.services.agent_control.close_agent(agent_id))
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;
.into(),
)
.await;
let status = match session
.services
.agent_control
.subscribe_status(agent_id)
.await
{
Ok(mut status_rx) => status_rx.borrow_and_update().clone(),
Err(err) => {
let status = session.services.agent_control.get_status(agent_id).await;
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id: call_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname.clone(),
receiver_agent_role: receiver_agent.agent_role.clone(),
status,
}
.into(),
)
.await;
return Err(collab_agent_error(agent_id, err));
}
};
let result = Box::pin(session.services.agent_control.close_agent(agent_id))
.await
.map_err(|err| collab_agent_error(agent_id, err))
.map(|_| ());
session
.send_event(
&turn,
CollabCloseEndEvent {
call_id,
completed_at_ms: now_unix_timestamp_ms(),
sender_thread_id: session.conversation_id,
receiver_thread_id: agent_id,
receiver_agent_nickname: receiver_agent.agent_nickname,
receiver_agent_role: receiver_agent.agent_role,
status: status.clone(),
}
.into(),
)
.await;
result?;

Ok(CloseAgentResult {
previous_status: status,
})
}
Ok(CloseAgentResult {
previous_status: status,
})
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down
Loading
Loading