Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions codex-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ members = [
"utils/cache",
"utils/image",
"utils/json-to-toml",
"utils/log",
"utils/home-dir",
"utils/pty",
"utils/readiness",
Expand Down Expand Up @@ -221,6 +222,7 @@ codex-utils-fuzzy-match = { path = "utils/fuzzy-match" }
codex-utils-home-dir = { path = "utils/home-dir" }
codex-utils-image = { path = "utils/image" }
codex-utils-json-to-toml = { path = "utils/json-to-toml" }
codex-utils-log = { path = "utils/log" }
codex-utils-oss = { path = "utils/oss" }
codex-utils-output-truncation = { path = "utils/output-truncation" }
codex-utils-path = { path = "utils/path-utils" }
Expand Down
1 change: 1 addition & 0 deletions codex-rs/analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ codex-login = { workspace = true }
codex-model-provider = { workspace = true }
codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-log = { workspace = true }
os_info = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use codex_login::CodexAuth;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_utils_log::bounded_str;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
Expand Down Expand Up @@ -454,7 +455,7 @@ async fn send_track_events_request(auth: &CodexAuth, url: &str, events: Vec<Trac
Ok(response) => {
let status = response.status();
let body = response.text().await.unwrap_or_default();
tracing::warn!("events failed with status {status}: {body}");
tracing::warn!("events failed with status {status}: {}", bounded_str(&body));
}
Err(err) => {
tracing::warn!("failed to send events request: {err}");
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ codex-thread-store = { workspace = true }
codex-tools = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
codex-utils-log = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout::StateDbHandle;
use codex_state::log_db::LogDbLayer;
use codex_utils_log::bounded_debug;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -615,14 +616,14 @@ impl MessageProcessor {
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
// Currently, we do not expect to receive any notifications from the
// client, so we just log them.
tracing::info!("<- notification: {:?}", notification);
tracing::info!("<- notification: {}", bounded_debug(&notification));
}

/// Handles typed notifications from in-process clients.
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
// Currently, we do not expect to receive any typed notifications from
// in-process clients, so we just log them.
tracing::info!("<- typed notification: {:?}", notification);
tracing::info!("<- typed notification: {}", bounded_debug(&notification));
}

async fn run_request_with_context<F>(
Expand Down Expand Up @@ -722,14 +723,14 @@ impl MessageProcessor {

/// Handle a standalone JSON-RPC response originating from the peer.
pub(crate) async fn process_response(&self, response: JSONRPCResponse) {
tracing::info!("<- response: {:?}", response);
tracing::info!("<- response: {}", bounded_debug(&response));
let JSONRPCResponse { id, result, .. } = response;
self.outgoing.notify_client_response(id, result).await
}

/// Handle an error object received from the peer.
pub(crate) async fn process_error(&self, err: JSONRPCError) {
tracing::error!("<- error: {:?}", err);
tracing::error!("<- error: {}", bounded_debug(&err));
self.outgoing.notify_client_error(err.id, err.error).await;
}

Expand Down
74 changes: 58 additions & 16 deletions codex-rs/app-server/src/outgoing_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ use codex_otel::span_w3c_trace_context;
use codex_protocol::ThreadId;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_utils_log::bounded_debug;
use codex_utils_log::bounded_display;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::Instrument;
use tracing::Level;
use tracing::Span;
use tracing::enabled;
use tracing::warn;

use crate::error_code::internal_error;
Expand Down Expand Up @@ -342,7 +346,10 @@ impl OutgoingMessageSender {
};

if let Err(err) = send_result {
warn!("failed to send request {outgoing_message_id:?} to client: {err:?}");
warn!(
"failed to send request {outgoing_message_id:?} to client: {}",
bounded_debug(&err)
);
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback.remove(&outgoing_message_id);
}
Expand All @@ -365,7 +372,10 @@ impl OutgoingMessageSender {
})
.await
{
warn!("failed to resend request to client: {err:?}");
warn!(
"failed to resend request to client: {}",
bounded_debug(&err)
);
}
}
}
Expand All @@ -383,7 +393,10 @@ impl OutgoingMessageSender {
.track_server_response(completed_at_ms, response);
}
if let Err(err) = entry.callback.send(Ok(result)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
warn!(
"could not notify callback for {id:?} due to: {}",
bounded_debug(&err)
);
}
}
None => {
Expand All @@ -397,11 +410,17 @@ impl OutgoingMessageSender {

match entry {
Some((id, entry)) => {
warn!("client responded with error for {id:?}: {error:?}");
warn!(
"client responded with error for {id:?}: {}",
bounded_debug(&error)
);
self.analytics_events_client
.track_server_request_aborted(now_unix_timestamp_ms(), id.clone());
if let Err(err) = entry.callback.send(Err(error)) {
warn!("could not notify callback for {id:?} due to: {err:?}");
warn!(
"could not notify callback for {id:?} due to: {}",
bounded_debug(&err)
);
}
}
None => {
Expand Down Expand Up @@ -437,7 +456,10 @@ impl OutgoingMessageSender {
&& let Err(err) = entry.callback.send(Err(error.clone()))
{
let request_id = entry.request.id();
warn!("could not notify callback for {request_id:?} due to: {err:?}");
warn!(
"could not notify callback for {request_id:?} due to: {}",
bounded_debug(&err)
);
}
}
}
Expand Down Expand Up @@ -495,7 +517,10 @@ impl OutgoingMessageSender {
&& let Err(err) = entry.callback.send(Err(error.clone()))
{
let request_id = entry.request.id();
warn!("could not notify callback for {request_id:?} due to: {err:?}",);
warn!(
"could not notify callback for {request_id:?} due to: {}",
bounded_debug(&err)
);
}
}
}
Expand Down Expand Up @@ -560,10 +585,13 @@ impl OutgoingMessageSender {
connection_ids: &[ConnectionId],
notification: ServerNotification,
) {
tracing::trace!(
targeted_connections = connection_ids.len(),
"app-server event: {notification}"
);
if enabled!(Level::TRACE) {
let notification_log = bounded_display(&notification);
tracing::trace!(
targeted_connections = connection_ids.len(),
"app-server event: {notification_log}"
);
}
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
if connection_ids.is_empty() {
if let Err(err) = self
Expand All @@ -573,7 +601,10 @@ impl OutgoingMessageSender {
})
.await
{
warn!("failed to send server notification to client: {err:?}");
warn!(
"failed to send server notification to client: {}",
bounded_debug(&err)
);
}
return;
}
Expand All @@ -587,7 +618,10 @@ impl OutgoingMessageSender {
})
.await
{
warn!("failed to send server notification to client: {err:?}");
warn!(
"failed to send server notification to client: {}",
bounded_debug(&err)
);
}
}
}
Expand All @@ -597,7 +631,9 @@ impl OutgoingMessageSender {
connection_id: ConnectionId,
notification: ServerNotification,
) {
tracing::trace!("app-server event: {notification}");
if enabled!(Level::TRACE) {
tracing::trace!("app-server event: {}", bounded_display(&notification));
}
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
let (write_complete_tx, write_complete_rx) = oneshot::channel();
if let Err(err) = self
Expand All @@ -609,7 +645,10 @@ impl OutgoingMessageSender {
})
.await
{
warn!("failed to send server notification to client: {err:?}");
warn!(
"failed to send server notification to client: {}",
bounded_debug(&err)
);
}
let _ = write_complete_rx.await;
}
Expand Down Expand Up @@ -678,7 +717,10 @@ impl OutgoingMessageSender {
};

if let Err(err) = send_result {
warn!("failed to send {message_kind} to client: {err:?}");
warn!(
"failed to send {message_kind} to client: {}",
bounded_debug(&err)
);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions codex-rs/codex-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ bytes = { workspace = true }
chrono = { workspace = true }
codex-client = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-log = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion codex-rs/codex-api/src/endpoint/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use codex_client::HttpTransport;
use codex_client::RequestTelemetry;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelsResponse;
use codex_utils_log::bounded_bytes_lossy;
use http::HeaderMap;
use http::Method;
use http::header::ETAG;
Expand Down Expand Up @@ -65,7 +66,7 @@ impl<T: HttpTransport> ModelsClient<T> {
.map_err(|e| {
ApiError::Stream(format!(
"failed to decode models response: {e}; body: {}",
String::from_utf8_lossy(&resp.body)
bounded_bytes_lossy(&resp.body)
))
})?;

Expand Down
5 changes: 3 additions & 2 deletions codex-rs/codex-api/src/endpoint/realtime_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use bytes::Bytes;
use codex_client::HttpTransport;
use codex_client::RequestBody;
use codex_client::RequestTelemetry;
use codex_utils_log::bounded_str;
use http::HeaderMap;
use http::HeaderValue;
use http::Method;
Expand Down Expand Up @@ -119,7 +120,7 @@ impl<T: HttpTransport> RealtimeCallClient<T> {
session_config: RealtimeSessionConfig,
extra_headers: HeaderMap,
) -> Result<RealtimeCallResponse, ApiError> {
trace!(target: "codex_api::realtime_websocket::wire", "realtime call request SDP: {sdp}");
trace!(target: "codex_api::realtime_websocket::wire", "realtime call request SDP: {}", bounded_str(&sdp));
// WebRTC can begin inference as soon as the peer connection comes up, so the initial
// session payload is sent with call creation. The sideband WebSocket still sends its normal
// session.update after it joins.
Expand Down Expand Up @@ -202,7 +203,7 @@ fn decode_call_id_from_location(headers: &HeaderMap) -> Result<String, ApiError>
.ok_or_else(|| ApiError::Stream("realtime call response missing Location".to_string()))?
.to_str()
.map_err(|err| ApiError::Stream(format!("invalid realtime call Location: {err}")))?;
trace!("realtime call Location: {location}");
trace!("realtime call Location: {}", bounded_str(location));

location
.split('?')
Expand Down
Loading
Loading