Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 26 additions & 37 deletions codex-rs/app-server/src/in_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ use std::time::Duration;

use crate::analytics_utils::analytics_events_client_from_config;
use crate::config_manager::ConfigManager;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::OVERLOADED_ERROR_CODE;
use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use crate::message_processor::ConnectionSessionState;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
Expand Down Expand Up @@ -526,11 +526,9 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
entry.insert(response_tx);
}
Entry::Occupied(_) => {
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("duplicate request id: {request_id:?}"),
data: None,
}));
let _ = response_tx.send(Err(invalid_request(format!(
"duplicate request id: {request_id:?}"
))));
continue;
}
}
Expand All @@ -553,13 +551,9 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
if let Some(response_tx) =
pending_request_responses.remove(&request_id)
{
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message:
"in-process app-server request processor is closed"
.to_string(),
data: None,
}));
let _ = response_tx.send(Err(internal_error(
"in-process app-server request processor is closed",
)));
}
break;
}
Expand Down Expand Up @@ -627,15 +621,20 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
if let Err(send_error) = event_tx
.try_send(InProcessServerEvent::ServerRequest(request))
{
let (code, message, inner) = match send_error {
let (error, inner) = match send_error {
mpsc::error::TrySendError::Full(inner) => (
OVERLOADED_ERROR_CODE,
"in-process server request queue is full",
JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message:
"in-process server request queue is full".to_string(),
data: None,
},
inner,
),
mpsc::error::TrySendError::Closed(inner) => (
INTERNAL_ERROR_CODE,
"in-process server request consumer is closed",
internal_error(
"in-process server request consumer is closed",
),
inner,
),
};
Expand All @@ -644,14 +643,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
_ => unreachable!("we just sent a ServerRequest variant"),
};
outgoing_message_sender
.notify_client_error(
request_id,
JSONRPCErrorError {
code,
message: message.to_string(),
data: None,
},
)
.notify_client_error(request_id, error)
.await;
}
}
Expand Down Expand Up @@ -688,21 +680,17 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
drop(writer_rx);
drop(processor_tx);
outgoing_message_sender
.cancel_all_requests(Some(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "in-process app-server runtime is shutting down".to_string(),
data: None,
}))
.cancel_all_requests(Some(internal_error(
"in-process app-server runtime is shutting down",
)))
.await;
// Drop the runtime's last sender before awaiting the router task so
// `outgoing_rx.recv()` can observe channel closure and exit cleanly.
drop(outgoing_message_sender);
for (_, response_tx) in pending_request_responses {
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "in-process app-server runtime is shutting down".to_string(),
data: None,
}));
let _ = response_tx.send(Err(internal_error(
"in-process app-server runtime is shutting down",
)));
}

if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
Expand Down Expand Up @@ -731,6 +719,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
#[cfg(test)]
mod tests {
use super::*;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::DeviceKeyPublicParams;
Expand Down
32 changes: 11 additions & 21 deletions codex-rs/app-server/src/outgoing_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use tracing::Instrument;
use tracing::Span;
use tracing::warn;

use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::internal_error;
use crate::server_request_error::TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON;
pub(crate) use codex_app_server_transport::ConnectionId;
Expand Down Expand Up @@ -157,11 +156,14 @@ impl ThreadScopedOutgoingMessageSender {
self.outgoing
.cancel_requests_for_thread(
self.thread_id,
Some(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "client request resolved because the turn state was changed"
.to_string(),
data: Some(serde_json::json!({ "reason": TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON })),
Some({
let mut error = internal_error(
"client request resolved because the turn state was changed",
);
error.data = Some(serde_json::json!({
"reason": TURN_TRANSITION_PENDING_REQUEST_ERROR_REASON,
}));
error
}),
)
.await
Expand Down Expand Up @@ -1011,11 +1013,7 @@ mod tests {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
};
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "boom".to_string(),
data: None,
};
let error = internal_error("boom");

outgoing.send_error(request_id.clone(), error.clone()).await;

Expand Down Expand Up @@ -1139,11 +1137,7 @@ mod tests {
))
.await;

let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "refresh failed".to_string(),
data: None,
};
let error = internal_error("refresh failed");

outgoing
.notify_client_error(request_id, error.clone())
Expand Down Expand Up @@ -1253,11 +1247,7 @@ mod tests {
},
))
.await;
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "tracked request cancelled".to_string(),
data: None,
};
let error = internal_error("tracked request cancelled");

outgoing
.cancel_requests_for_thread(thread_id, Some(error.clone()))
Expand Down
3 changes: 0 additions & 3 deletions codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ use crate::command_exec::CommandExecManager;
use crate::command_exec::StartCommandExecParams;
use crate::config_manager::ConfigManager;
use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_PARAMS_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::invalid_params;
use crate::models::supported_models;
use crate::outgoing_message::ConnectionId;
Expand Down
120 changes: 36 additions & 84 deletions codex-rs/app-server/src/request_processors/account_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,9 @@ impl AccountRequestProcessor {
}

fn external_auth_active_error(&self) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "External auth is active. Use account/login/start (chatgptAuthTokens) to update it or account/logout to clear it."
.to_string(),
data: None,
}
invalid_request(
"External auth is active. Use account/login/start (chatgptAuthTokens) to update it or account/logout to clear it.",
)
}

async fn login_api_key_common(
Expand All @@ -263,11 +260,9 @@ impl AccountRequestProcessor {
self.config.forced_login_method,
Some(ForcedLoginMethod::Chatgpt)
) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "API key login is disabled. Use ChatGPT login instead.".to_string(),
data: None,
});
return Err(invalid_request(
"API key login is disabled. Use ChatGPT login instead.",
));
}

// Cancel any active login attempt.
Expand All @@ -287,11 +282,7 @@ impl AccountRequestProcessor {
self.auth_manager.reload().await;
Ok(())
}
Err(err) => Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to save api key: {err}"),
data: None,
}),
Err(err) => Err(internal_error(format!("failed to save api key: {err}"))),
}
}

Expand Down Expand Up @@ -321,11 +312,9 @@ impl AccountRequestProcessor {
}

if matches!(config.forced_login_method, Some(ForcedLoginMethod::Api)) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "ChatGPT login is disabled. Use API key login instead.".to_string(),
data: None,
});
return Err(invalid_request(
"ChatGPT login is disabled. Use API key login instead.",
));
}

let opts = LoginServerOptions {
Expand Down Expand Up @@ -354,18 +343,10 @@ impl AccountRequestProcessor {

fn login_chatgpt_device_code_start_error(err: IoError) -> JSONRPCErrorError {
let is_not_found = err.kind() == std::io::ErrorKind::NotFound;
JSONRPCErrorError {
code: if is_not_found {
INVALID_REQUEST_ERROR_CODE
} else {
INTERNAL_ERROR_CODE
},
message: if is_not_found {
err.to_string()
} else {
format!("failed to request device code: {err}")
},
data: None,
if is_not_found {
invalid_request(err.to_string())
} else {
internal_error(format!("failed to request device code: {err}"))
}
}

Expand Down Expand Up @@ -698,11 +679,7 @@ impl AccountRequestProcessor {
match self.auth_manager.logout_with_revoke().await {
Ok(_) => {}
Err(err) => {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("logout failed: {err}"),
data: None,
});
return Err(internal_error(format!("logout failed: {err}")));
}
}

Expand Down Expand Up @@ -885,28 +862,19 @@ impl AccountRequestProcessor {
params: SendAddCreditsNudgeEmailParams,
) -> Result<AddCreditsNudgeEmailStatus, JSONRPCErrorError> {
let Some(auth) = self.auth_manager.auth().await else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "codex account authentication required to notify workspace owner"
.to_string(),
data: None,
});
return Err(invalid_request(
"codex account authentication required to notify workspace owner",
));
};

if !auth.uses_codex_backend() {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "chatgpt authentication required to notify workspace owner".to_string(),
data: None,
});
return Err(invalid_request(
"chatgpt authentication required to notify workspace owner",
));
}

let client = BackendClient::from_auth(self.config.chatgpt_base_url.clone(), &auth)
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to construct backend client: {err}"),
data: None,
})?;
.map_err(|err| internal_error(format!("failed to construct backend client: {err}")))?;

match client
.send_add_credits_nudge_email(Self::backend_credit_type(params.credit_type))
Expand All @@ -916,11 +884,9 @@ impl AccountRequestProcessor {
Err(err) if err.status().is_some_and(|status| status.as_u16() == 429) => {
Ok(AddCreditsNudgeEmailStatus::CooldownActive)
}
Err(err) => Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to notify workspace owner: {err}"),
data: None,
}),
Err(err) => Err(internal_error(format!(
"failed to notify workspace owner: {err}"
))),
}
}

Expand All @@ -941,42 +907,28 @@ impl AccountRequestProcessor {
JSONRPCErrorError,
> {
let Some(auth) = self.auth_manager.auth().await else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "codex account authentication required to read rate limits".to_string(),
data: None,
});
return Err(invalid_request(
"codex account authentication required to read rate limits",
));
};

if !auth.uses_codex_backend() {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "chatgpt authentication required to read rate limits".to_string(),
data: None,
});
return Err(invalid_request(
"chatgpt authentication required to read rate limits",
));
}

let client = BackendClient::from_auth(self.config.chatgpt_base_url.clone(), &auth)
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to construct backend client: {err}"),
data: None,
})?;
.map_err(|err| internal_error(format!("failed to construct backend client: {err}")))?;

let snapshots = client
.get_rate_limits_many()
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to fetch codex rate limits: {err}"),
data: None,
})?;
.map_err(|err| internal_error(format!("failed to fetch codex rate limits: {err}")))?;
if snapshots.is_empty() {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "failed to fetch codex rate limits: no snapshots returned".to_string(),
data: None,
});
return Err(internal_error(
"failed to fetch codex rate limits: no snapshots returned",
));
}

let rate_limits_by_limit_id: HashMap<String, CoreRateLimitSnapshot> = snapshots
Expand Down
Loading
Loading