Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 52 additions & 10 deletions codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::codex_message_processor::read_summary_from_rollout;
use crate::codex_message_processor::summary_to_thread;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::outgoing_message::ClientRequestResult;
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::thread_state::ThreadState;
use crate::thread_state::TurnSummary;
Expand Down Expand Up @@ -1408,12 +1409,25 @@ async fn handle_error(

async fn on_patch_approval_response(
call_id: String,
receiver: oneshot::Receiver<JsonValue>,
receiver: oneshot::Receiver<ClientRequestResult>,
codex: Arc<CodexThread>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Ok(Ok(value)) => value,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
if let Err(submit_err) = codex
.submit(Op::PatchApproval {
id: call_id.clone(),
decision: ReviewDecision::Denied,
})
.await
{
error!("failed to submit denied PatchApproval after request failure: {submit_err}");
}
return;
}
Err(err) => {
error!("request failed: {err:?}");
if let Err(submit_err) = codex
Expand Down Expand Up @@ -1451,12 +1465,16 @@ async fn on_patch_approval_response(
async fn on_exec_approval_response(
call_id: String,
turn_id: String,
receiver: oneshot::Receiver<JsonValue>,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Ok(Ok(value)) => value,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
return;
}
Err(err) => {
error!("request failed: {err:?}");
return;
Expand Down Expand Up @@ -1488,12 +1506,28 @@ async fn on_exec_approval_response(

async fn on_request_user_input_response(
event_turn_id: String,
receiver: oneshot::Receiver<JsonValue>,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Ok(Ok(value)) => value,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
let empty = CoreRequestUserInputResponse {
answers: HashMap::new(),
};
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
response: empty,
})
.await
{
error!("failed to submit UserInputAnswer: {err}");
}
return;
}
Err(err) => {
error!("request failed: {err:?}");
let empty = CoreRequestUserInputResponse {
Expand Down Expand Up @@ -1628,14 +1662,14 @@ async fn on_file_change_request_approval_response(
conversation_id: ThreadId,
item_id: String,
changes: Vec<FileUpdateChange>,
receiver: oneshot::Receiver<JsonValue>,
receiver: oneshot::Receiver<ClientRequestResult>,
codex: Arc<CodexThread>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
) {
let response = receiver.await;
let (decision, completion_status) = match response {
Ok(value) => {
Ok(Ok(value)) => {
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
Expand All @@ -1650,6 +1684,10 @@ async fn on_file_change_request_approval_response(
// Only short-circuit on declines/cancels/failures.
(decision, completion_status)
}
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
}
Err(err) => {
error!("request failed: {err:?}");
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
Expand Down Expand Up @@ -1688,13 +1726,13 @@ async fn on_command_execution_request_approval_response(
command: String,
cwd: PathBuf,
command_actions: Vec<V2ParsedCommand>,
receiver: oneshot::Receiver<JsonValue>,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
outgoing: ThreadScopedOutgoingMessageSender,
) {
let response = receiver.await;
let (decision, completion_status) = match response {
Ok(value) => {
Ok(Ok(value)) => {
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
Expand Down Expand Up @@ -1729,6 +1767,10 @@ async fn on_command_execution_request_approval_response(
};
(decision, completion_status)
}
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
}
Err(err) => {
error!("request failed: {err:?}");
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
Expand Down
25 changes: 23 additions & 2 deletions codex-rs/app-server/src/dynamic_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,35 @@ use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::error;

use crate::outgoing_message::ClientRequestResult;

pub(crate) async fn on_call_response(
call_id: String,
receiver: oneshot::Receiver<serde_json::Value>,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Ok(Ok(value)) => value,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
let fallback = CoreDynamicToolResponse {
content_items: vec![CoreDynamicToolCallOutputContentItem::InputText {
text: "dynamic tool request failed".to_string(),
}],
success: false,
};
if let Err(err) = conversation
.submit(Op::DynamicToolResponse {
id: call_id.clone(),
response: fallback,
})
.await
{
error!("failed to submit DynamicToolResponse: {err}");
}
return;
}
Err(err) => {
error!("request failed: {err:?}");
let fallback = CoreDynamicToolResponse {
Expand Down
17 changes: 14 additions & 3 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,20 @@ impl ExternalAuthRefresher for ExternalAuthRefreshBridge {
.await;

let result = match timeout(EXTERNAL_AUTH_REFRESH_TIMEOUT, rx).await {
Ok(result) => result.map_err(|err| {
std::io::Error::other(format!("auth refresh request canceled: {err}"))
})?,
Ok(result) => {
// Two failure scenarios:
// 1) `oneshot::Receiver` failed (sender dropped) => request canceled/channel closed.
// 2) client answered with JSON-RPC error payload => propagate code/message.
let result = result.map_err(|err| {
std::io::Error::other(format!("auth refresh request canceled: {err}"))
})?;
result.map_err(|err| {
std::io::Error::other(format!(
"auth refresh request failed: code={} message={}",
err.code, err.message
))
})?
}
Err(_) => {
let _canceled = self.outgoing.cancel_request(&request_id).await;
return Err(std::io::Error::other(format!(
Expand Down
55 changes: 48 additions & 7 deletions codex-rs/app-server/src/outgoing_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::error_code::INTERNAL_ERROR_CODE;
#[cfg(test)]
use codex_protocol::account::PlanType;

pub(crate) type ClientRequestResult = std::result::Result<Result, JSONRPCErrorError>;

/// Stable identifier for a transport connection.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct ConnectionId(pub(crate) u64);
Expand All @@ -46,7 +48,7 @@ pub(crate) enum OutgoingEnvelope {
pub(crate) struct OutgoingMessageSender {
next_server_request_id: AtomicI64,
sender: mpsc::Sender<OutgoingEnvelope>,
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<Result>>>,
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<ClientRequestResult>>>,
}

#[derive(Clone)]
Expand All @@ -69,7 +71,7 @@ impl ThreadScopedOutgoingMessageSender {
pub(crate) async fn send_request(
&self,
payload: ServerRequestPayload,
) -> oneshot::Receiver<Result> {
) -> oneshot::Receiver<ClientRequestResult> {
if self.connection_ids.is_empty() {
let (_tx, rx) = oneshot::channel();
return rx;
Expand Down Expand Up @@ -118,7 +120,7 @@ impl OutgoingMessageSender {
&self,
connection_ids: &[ConnectionId],
request: ServerRequestPayload,
) -> oneshot::Receiver<Result> {
) -> oneshot::Receiver<ClientRequestResult> {
let (_id, rx) = self
.send_request_with_id_to_connections(connection_ids, request)
.await;
Expand All @@ -128,15 +130,15 @@ impl OutgoingMessageSender {
pub(crate) async fn send_request_with_id(
&self,
request: ServerRequestPayload,
) -> (RequestId, oneshot::Receiver<Result>) {
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
self.send_request_with_id_to_connections(&[], request).await
}

async fn send_request_with_id_to_connections(
&self,
connection_ids: &[ConnectionId],
request: ServerRequestPayload,
) -> (RequestId, oneshot::Receiver<Result>) {
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
let id = RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed));
let outgoing_message_id = id.clone();
let (tx_approve, rx_approve) = oneshot::channel();
Expand Down Expand Up @@ -190,7 +192,7 @@ impl OutgoingMessageSender {

match entry {
Some((id, sender)) => {
if let Err(err) = sender.send(result) {
if let Err(err) = sender.send(Ok(result)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
}
}
Expand All @@ -207,8 +209,11 @@ impl OutgoingMessageSender {
};

match entry {
Some((id, _sender)) => {
Some((id, sender)) => {
warn!("client responded with error for {id:?}: {error:?}");
if let Err(err) = sender.send(Err(error)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
}
}
None => {
warn!("could not find callback for {id:?}");
Expand Down Expand Up @@ -390,11 +395,13 @@ mod tests {
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::ApplyPatchApprovalParams;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_protocol::ThreadId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tokio::time::timeout;
Expand Down Expand Up @@ -609,4 +616,38 @@ mod tests {
other => panic!("expected targeted error envelope, got: {other:?}"),
}
}

#[tokio::test]
async fn notify_client_error_forwards_error_to_waiter() {
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);

let (request_id, wait_for_result) = outgoing
.send_request_with_id(ServerRequestPayload::ApplyPatchApproval(
ApplyPatchApprovalParams {
conversation_id: ThreadId::new(),
call_id: "call-id".to_string(),
file_changes: HashMap::new(),
reason: None,
grant_root: None,
},
))
.await;

let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "refresh failed".to_string(),
data: None,
};

outgoing
.notify_client_error(request_id, error.clone())
.await;

let result = timeout(Duration::from_secs(1), wait_for_result)
.await
.expect("wait should not time out")
.expect("waiter should receive a callback");
assert_eq!(result, Err(error));
}
}
Loading