diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index f72ccb523f57..74161018bcfa 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -6331,180 +6331,173 @@ impl CodexMessageProcessor { app_server_client_name: Option, app_server_client_version: Option, ) { - if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { - self.track_error_response( - &request_id, - &error, - Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), - ); - self.outgoing.send_error(request_id, error).await; - return; - } - let (_, thread) = match self.load_thread(¶ms.thread_id).await { - Ok(v) => v, - Err(error) => { - self.track_error_response(&request_id, &error, /*error_type*/ None); - self.outgoing.send_error(request_id, error).await; - return; + let result = async { + if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { + self.track_error_response( + &request_id, + &error, + Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), + ); + return Err(error); } - }; - if let Err(error) = Self::set_app_server_client_info( - thread.as_ref(), - app_server_client_name, - app_server_client_version, - ) - .await - { - self.track_error_response(&request_id, &error, /*error_type*/ None); - self.outgoing.send_error(request_id, error).await; - return; - } + let (_, thread) = self + .load_thread(¶ms.thread_id) + .await + .inspect_err(|error| { + self.track_error_response(&request_id, error, /*error_type*/ None); + })?; + Self::set_app_server_client_info( + thread.as_ref(), + app_server_client_name, + app_server_client_version, + ) + .await + .inspect_err(|error| { + self.track_error_response(&request_id, error, /*error_type*/ None); + })?; - let collaboration_modes_config = CollaborationModesConfig { - default_mode_request_user_input: thread.enabled(Feature::DefaultModeRequestUserInput), - }; - let collaboration_mode = params.collaboration_mode.map(|mode| { - self.normalize_turn_start_collaboration_mode(mode, collaboration_modes_config) - }); - let environments: Option> = - params.environments.map(|environments| { - environments - .into_iter() - .map(|environment| TurnEnvironmentSelection { - environment_id: environment.environment_id, - cwd: environment.cwd, - }) - .collect() + let collaboration_modes_config = CollaborationModesConfig { + default_mode_request_user_input: thread + .enabled(Feature::DefaultModeRequestUserInput), + }; + let collaboration_mode = params.collaboration_mode.map(|mode| { + self.normalize_turn_start_collaboration_mode(mode, collaboration_modes_config) }); - if let Some(environments) = environments.as_ref() - && let Err(err) = self - .thread_manager - .validate_environment_selections(environments) - { - self.send_invalid_request_error(request_id, environment_selection_error_message(err)) - .await; - return; - } + let environments: Option> = + params.environments.map(|environments| { + environments + .into_iter() + .map(|environment| TurnEnvironmentSelection { + environment_id: environment.environment_id, + cwd: environment.cwd, + }) + .collect() + }); + if let Some(environments) = environments.as_ref() { + self.thread_manager + .validate_environment_selections(environments) + .map_err(|err| invalid_request(environment_selection_error_message(err)))?; + } - // Map v2 input items to core input items. - let mapped_items: Vec = params - .input - .into_iter() - .map(V2UserInput::into_core) - .collect(); + // Map v2 input items to core input items. + let mapped_items: Vec = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect(); + + let has_any_overrides = params.cwd.is_some() + || params.approval_policy.is_some() + || params.approvals_reviewer.is_some() + || params.sandbox_policy.is_some() + || params.permission_profile.is_some() + || params.model.is_some() + || params.service_tier.is_some() + || params.effort.is_some() + || params.summary.is_some() + || collaboration_mode.is_some() + || params.personality.is_some(); + + if params.sandbox_policy.is_some() && params.permission_profile.is_some() { + return Err(invalid_request( + "`permissionProfile` cannot be combined with `sandboxPolicy`", + )); + } - let has_any_overrides = params.cwd.is_some() - || params.approval_policy.is_some() - || params.approvals_reviewer.is_some() - || params.sandbox_policy.is_some() - || params.permission_profile.is_some() - || params.model.is_some() - || params.service_tier.is_some() - || params.effort.is_some() - || params.summary.is_some() - || collaboration_mode.is_some() - || params.personality.is_some(); - - if params.sandbox_policy.is_some() && params.permission_profile.is_some() { - self.send_invalid_request_error( - request_id, - "`permissionProfile` cannot be combined with `sandboxPolicy`".to_string(), - ) - .await; - return; - } + let cwd = params.cwd; + let approval_policy = params.approval_policy.map(AskForApproval::to_core); + let approvals_reviewer = params + .approvals_reviewer + .map(codex_app_server_protocol::ApprovalsReviewer::to_core); + let sandbox_policy = params.sandbox_policy.map(|p| p.to_core()); + let permission_profile = params.permission_profile.map(Into::into); + let model = params.model; + let effort = params.effort.map(Some); + let summary = params.summary; + let service_tier = params.service_tier; + let personality = params.personality; + + // If any overrides are provided, validate them synchronously so the + // request can fail before accepting user input. The actual update is + // still queued together with the input below to preserve submission order. + if has_any_overrides { + thread + .validate_turn_context_overrides(CodexThreadTurnContextOverrides { + cwd: cwd.clone(), + approval_policy, + approvals_reviewer, + sandbox_policy: sandbox_policy.clone(), + permission_profile: permission_profile.clone(), + windows_sandbox_level: None, + model: model.clone(), + effort, + summary, + service_tier, + collaboration_mode: collaboration_mode.clone(), + personality, + }) + .await + .map_err(|err| { + invalid_request(format!("invalid turn context override: {err}")) + })?; + } - let cwd = params.cwd; - let approval_policy = params.approval_policy.map(AskForApproval::to_core); - let approvals_reviewer = params - .approvals_reviewer - .map(codex_app_server_protocol::ApprovalsReviewer::to_core); - let sandbox_policy = params.sandbox_policy.map(|p| p.to_core()); - let permission_profile = params.permission_profile.map(Into::into); - let model = params.model; - let effort = params.effort.map(Some); - let summary = params.summary; - let service_tier = params.service_tier; - let personality = params.personality; - - // If any overrides are provided, validate them synchronously so the - // request can fail before accepting user input. The actual update is - // still queued together with the input below to preserve submission order. - if has_any_overrides { - let result = thread - .validate_turn_context_overrides(CodexThreadTurnContextOverrides { - cwd: cwd.clone(), + // Start the turn by submitting the user input. Return its submission id as turn_id. + let turn_op = if has_any_overrides { + Op::UserInputWithTurnContext { + items: mapped_items, + environments, + final_output_json_schema: params.output_schema, + responsesapi_client_metadata: params.responsesapi_client_metadata, + cwd, approval_policy, approvals_reviewer, - sandbox_policy: sandbox_policy.clone(), - permission_profile: permission_profile.clone(), + sandbox_policy, + permission_profile, windows_sandbox_level: None, - model: model.clone(), + model, effort, summary, service_tier, - collaboration_mode: collaboration_mode.clone(), + collaboration_mode, personality, - }) - .await; - if let Err(err) = result { - self.send_invalid_request_error( - request_id, - format!("invalid turn context override: {err}"), - ) - .await; - return; - } - } + } + } else { + Op::UserInput { + items: mapped_items, + environments, + final_output_json_schema: params.output_schema, + responsesapi_client_metadata: params.responsesapi_client_metadata, + } + }; + let turn_id = self + .submit_core_op(&request_id, thread.as_ref(), turn_op) + .await + .map_err(|err| { + let error = internal_error(format!("failed to start turn: {err}")); + self.track_error_response(&request_id, &error, /*error_type*/ None); + error + })?; - // Start the turn by submitting the user input. Return its submission id as turn_id. - let turn_op = if has_any_overrides { - Op::UserInputWithTurnContext { - items: mapped_items, - environments, - final_output_json_schema: params.output_schema, - responsesapi_client_metadata: params.responsesapi_client_metadata, - cwd, - approval_policy, - approvals_reviewer, - sandbox_policy, - permission_profile, - windows_sandbox_level: None, - model, - effort, - summary, - service_tier, - collaboration_mode, - personality, - } - } else { - Op::UserInput { - items: mapped_items, - environments, - final_output_json_schema: params.output_schema, - responsesapi_client_metadata: params.responsesapi_client_metadata, - } - }; - let turn_id = self - .submit_core_op(&request_id, thread.as_ref(), turn_op) - .await; + self.outgoing + .record_request_turn_id(&request_id, &turn_id) + .await; + let turn = Turn { + id: turn_id, + items: vec![], + error: None, + status: TurnStatus::InProgress, + started_at: None, + completed_at: None, + duration_ms: None, + }; - match turn_id { - Ok(turn_id) => { - self.outgoing - .record_request_turn_id(&request_id, &turn_id) - .await; - let turn = Turn { - id: turn_id.clone(), - items: vec![], - error: None, - status: TurnStatus::InProgress, - started_at: None, - completed_at: None, - duration_ms: None, - }; + Ok::<_, JSONRPCErrorError>(TurnStartResponse { turn }) + } + .await; - let response = TurnStartResponse { turn }; + match result { + Ok(response) => { self.analytics_events_client.track_response( request_id.connection_id.0, ClientResponse::TurnStart { @@ -6514,13 +6507,7 @@ impl CodexMessageProcessor { ); self.outgoing.send_response(request_id, response).await; } - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to start turn: {err}"), - data: None, - }; - self.track_error_response(&request_id, &error, /*error_type*/ None); + Err(error) => { self.outgoing.send_error(request_id, error).await; } } @@ -6531,15 +6518,17 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: ThreadInjectItemsParams, ) { - let (_, thread) = match self.load_thread(¶ms.thread_id).await { - Ok(value) => value, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let result = self.thread_inject_items_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn thread_inject_items_response( + &self, + params: ThreadInjectItemsParams, + ) -> Result { + let (_, thread) = self.load_thread(¶ms.thread_id).await?; - let items = match params + let items = params .items .into_iter() .enumerate() @@ -6548,31 +6537,16 @@ impl CodexMessageProcessor { .map_err(|err| format!("items[{index}] is not a valid response item: {err}")) }) .collect::, _>>() - { - Ok(items) => items, - Err(message) => { - self.send_invalid_request_error(request_id, message).await; - return; - } - }; + .map_err(invalid_request)?; - match thread.inject_response_items(items).await { - Ok(()) => { - self.outgoing - .send_response(request_id, ThreadInjectItemsResponse {}) - .await; - } - Err(CodexErr::InvalidRequest(message)) => { - self.send_invalid_request_error(request_id, message).await; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to inject response items: {err}"), - ) - .await; - } - } + thread + .inject_response_items(items) + .await + .map_err(|err| match err { + CodexErr::InvalidRequest(message) => invalid_request(message), + err => internal_error(format!("failed to inject response items: {err}")), + })?; + Ok(ThreadInjectItemsResponse {}) } async fn set_app_server_client_info( @@ -6591,52 +6565,116 @@ impl CodexMessageProcessor { } async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) { - let (_, thread) = match self.load_thread(¶ms.thread_id).await { - Ok(v) => v, - Err(error) => { - self.track_error_response(&request_id, &error, /*error_type*/ None); - self.outgoing.send_error(request_id, error).await; - return; + let result = async { + let (_, thread) = self + .load_thread(¶ms.thread_id) + .await + .inspect_err(|error| { + self.track_error_response(&request_id, error, /*error_type*/ None); + })?; + + if params.expected_turn_id.is_empty() { + return Err(invalid_request("expectedTurnId must not be empty")); + } + self.outgoing + .record_request_turn_id(&request_id, ¶ms.expected_turn_id) + .await; + if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { + self.track_error_response( + &request_id, + &error, + Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), + ); + return Err(error); } - }; - if params.expected_turn_id.is_empty() { - self.send_invalid_request_error( - request_id, - "expectedTurnId must not be empty".to_string(), - ) - .await; - return; - } - self.outgoing - .record_request_turn_id(&request_id, ¶ms.expected_turn_id) - .await; - if let Err(error) = Self::validate_v2_input_limit(¶ms.input) { - self.track_error_response( - &request_id, - &error, - Some(AnalyticsJsonRpcError::Input(InputError::TooLarge)), - ); - self.outgoing.send_error(request_id, error).await; - return; + let mapped_items: Vec = params + .input + .into_iter() + .map(V2UserInput::into_core) + .collect(); + + let turn_id = thread + .steer_input( + mapped_items, + Some(¶ms.expected_turn_id), + params.responsesapi_client_metadata, + ) + .await + .map_err(|err| { + let (code, message, data, error_type) = match err { + SteerInputError::NoActiveTurn(_) => ( + INVALID_REQUEST_ERROR_CODE, + "no active turn to steer".to_string(), + None, + Some(AnalyticsJsonRpcError::TurnSteer( + TurnSteerRequestError::NoActiveTurn, + )), + ), + SteerInputError::ExpectedTurnMismatch { expected, actual } => ( + INVALID_REQUEST_ERROR_CODE, + format!("expected active turn id `{expected}` but found `{actual}`"), + None, + Some(AnalyticsJsonRpcError::TurnSteer( + TurnSteerRequestError::ExpectedTurnMismatch, + )), + ), + SteerInputError::ActiveTurnNotSteerable { turn_kind } => { + let (message, turn_steer_error) = match turn_kind { + codex_protocol::protocol::NonSteerableTurnKind::Review => ( + "cannot steer a review turn".to_string(), + TurnSteerRequestError::NonSteerableReview, + ), + codex_protocol::protocol::NonSteerableTurnKind::Compact => ( + "cannot steer a compact turn".to_string(), + TurnSteerRequestError::NonSteerableCompact, + ), + }; + let error = TurnError { + message: message.clone(), + codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable { + turn_kind: turn_kind.into(), + }), + additional_details: None, + }; + let data = match serde_json::to_value(error) { + Ok(data) => Some(data), + Err(error) => { + tracing::error!( + ?error, + "failed to serialize active-turn-not-steerable turn error" + ); + None + } + }; + ( + INVALID_REQUEST_ERROR_CODE, + message, + data, + Some(AnalyticsJsonRpcError::TurnSteer(turn_steer_error)), + ) + } + SteerInputError::EmptyInput => ( + INVALID_REQUEST_ERROR_CODE, + "input must not be empty".to_string(), + None, + Some(AnalyticsJsonRpcError::Input(InputError::Empty)), + ), + }; + let error = JSONRPCErrorError { + code, + message, + data, + }; + self.track_error_response(&request_id, &error, error_type); + error + })?; + Ok::<_, JSONRPCErrorError>(TurnSteerResponse { turn_id }) } + .await; - let mapped_items: Vec = params - .input - .into_iter() - .map(V2UserInput::into_core) - .collect(); - - match thread - .steer_input( - mapped_items, - Some(¶ms.expected_turn_id), - params.responsesapi_client_metadata, - ) - .await - { - Ok(turn_id) => { - let response = TurnSteerResponse { turn_id }; + match result { + Ok(response) => { self.analytics_events_client.track_response( request_id.connection_id.0, ClientResponse::TurnSteer { @@ -6646,72 +6684,7 @@ impl CodexMessageProcessor { ); self.outgoing.send_response(request_id, response).await; } - Err(err) => { - let (code, message, data, error_type) = match err { - SteerInputError::NoActiveTurn(_) => ( - INVALID_REQUEST_ERROR_CODE, - "no active turn to steer".to_string(), - None, - Some(AnalyticsJsonRpcError::TurnSteer( - TurnSteerRequestError::NoActiveTurn, - )), - ), - SteerInputError::ExpectedTurnMismatch { expected, actual } => ( - INVALID_REQUEST_ERROR_CODE, - format!("expected active turn id `{expected}` but found `{actual}`"), - None, - Some(AnalyticsJsonRpcError::TurnSteer( - TurnSteerRequestError::ExpectedTurnMismatch, - )), - ), - SteerInputError::ActiveTurnNotSteerable { turn_kind } => { - let (message, turn_steer_error) = match turn_kind { - codex_protocol::protocol::NonSteerableTurnKind::Review => ( - "cannot steer a review turn".to_string(), - TurnSteerRequestError::NonSteerableReview, - ), - codex_protocol::protocol::NonSteerableTurnKind::Compact => ( - "cannot steer a compact turn".to_string(), - TurnSteerRequestError::NonSteerableCompact, - ), - }; - let error = TurnError { - message: message.clone(), - codex_error_info: Some(CodexErrorInfo::ActiveTurnNotSteerable { - turn_kind: turn_kind.into(), - }), - additional_details: None, - }; - let data = match serde_json::to_value(error) { - Ok(data) => Some(data), - Err(error) => { - tracing::error!( - ?error, - "failed to serialize active-turn-not-steerable turn error" - ); - None - } - }; - ( - INVALID_REQUEST_ERROR_CODE, - message, - data, - Some(AnalyticsJsonRpcError::TurnSteer(turn_steer_error)), - ) - } - SteerInputError::EmptyInput => ( - INVALID_REQUEST_ERROR_CODE, - "input must not be empty".to_string(), - None, - Some(AnalyticsJsonRpcError::Input(InputError::Empty)), - ), - }; - let error = JSONRPCErrorError { - code, - message, - data, - }; - self.track_error_response(&request_id, &error, error_type); + Err(error) => { self.outgoing.send_error(request_id, error).await; } } @@ -6719,16 +6692,10 @@ impl CodexMessageProcessor { async fn prepare_realtime_conversation_thread( &self, - request_id: ConnectionRequestId, + request_id: &ConnectionRequestId, thread_id: &str, - ) -> Option<(ThreadId, Arc)> { - let (thread_id, thread) = match self.load_thread(thread_id).await { - Ok(v) => v, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return None; - } - }; + ) -> Result)>, JSONRPCErrorError> { + let (thread_id, thread) = self.load_thread(thread_id).await?; match self .ensure_conversation_listener( @@ -6741,24 +6708,18 @@ impl CodexMessageProcessor { { Ok(EnsureConversationListenerResult::Attached) => {} Ok(EnsureConversationListenerResult::ConnectionClosed) => { - return None; - } - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return None; + return Ok(None); } + Err(error) => return Err(error), } if !thread.enabled(Feature::RealtimeConversation) { - self.send_invalid_request_error( - request_id, - format!("thread {thread_id} does not support realtime conversation"), - ) - .await; - return None; + return Err(invalid_request(format!( + "thread {thread_id} does not support realtime conversation" + ))); } - Some((thread_id, thread)) + Ok(Some((thread_id, thread))) } async fn thread_realtime_start( @@ -6766,15 +6727,14 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: ThreadRealtimeStartParams, ) { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(request_id.clone(), ¶ms.thread_id) - .await - else { - return; - }; - - let submit = self - .submit_core_op( + let result = async { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( &request_id, thread.as_ref(), Op::RealtimeConversationStart(ConversationStartParams { @@ -6792,22 +6752,14 @@ impl CodexMessageProcessor { voice: params.voice, }), ) - .await; - - match submit { - Ok(_) => { - self.outgoing - .send_response(request_id, ThreadRealtimeStartResponse::default()) - .await; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to start realtime conversation: {err}"), - ) - .await; - } + .await + .map_err(|err| { + internal_error(format!("failed to start realtime conversation: {err}")) + })?; + Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStartResponse::default())) } + .await; + self.send_optional_result(request_id, result).await; } async fn thread_realtime_append_audio( @@ -6815,37 +6767,30 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: ThreadRealtimeAppendAudioParams, ) { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(request_id.clone(), ¶ms.thread_id) - .await - else { - return; - }; - - let submit = self - .submit_core_op( + let result = async { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( &request_id, thread.as_ref(), Op::RealtimeConversationAudio(ConversationAudioParams { frame: params.audio.into(), }), ) - .await; - - match submit { - Ok(_) => { - self.outgoing - .send_response(request_id, ThreadRealtimeAppendAudioResponse::default()) - .await; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to append realtime conversation audio: {err}"), - ) - .await; - } + .await + .map_err(|err| { + internal_error(format!( + "failed to append realtime conversation audio: {err}" + )) + })?; + Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendAudioResponse::default())) } + .await; + self.send_optional_result(request_id, result).await; } async fn thread_realtime_append_text( @@ -6853,35 +6798,28 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: ThreadRealtimeAppendTextParams, ) { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(request_id.clone(), ¶ms.thread_id) - .await - else { - return; - }; - - let submit = self - .submit_core_op( + let result = async { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op( &request_id, thread.as_ref(), Op::RealtimeConversationText(ConversationTextParams { text: params.text }), ) - .await; - - match submit { - Ok(_) => { - self.outgoing - .send_response(request_id, ThreadRealtimeAppendTextResponse::default()) - .await; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to append realtime conversation text: {err}"), - ) - .await; - } + .await + .map_err(|err| { + internal_error(format!( + "failed to append realtime conversation text: {err}" + )) + })?; + Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeAppendTextResponse::default())) } + .await; + self.send_optional_result(request_id, result).await; } async fn thread_realtime_stop( @@ -6889,31 +6827,22 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: ThreadRealtimeStopParams, ) { - let Some((_, thread)) = self - .prepare_realtime_conversation_thread(request_id.clone(), ¶ms.thread_id) - .await - else { - return; - }; - - let submit = self - .submit_core_op(&request_id, thread.as_ref(), Op::RealtimeConversationClose) - .await; - - match submit { - Ok(_) => { - self.outgoing - .send_response(request_id, ThreadRealtimeStopResponse::default()) - .await; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to stop realtime conversation: {err}"), - ) - .await; - } + let result = async { + let Some((_, thread)) = self + .prepare_realtime_conversation_thread(&request_id, ¶ms.thread_id) + .await? + else { + return Ok(None); + }; + self.submit_core_op(&request_id, thread.as_ref(), Op::RealtimeConversationClose) + .await + .map_err(|err| { + internal_error(format!("failed to stop realtime conversation: {err}")) + })?; + Ok::<_, JSONRPCErrorError>(Some(ThreadRealtimeStopResponse::default())) } + .await; + self.send_optional_result(request_id, result).await; } async fn thread_realtime_list_voices(