diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 83ca0d11915e..9e369bd407d0 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1821,6 +1821,7 @@ dependencies = [ "codex-plugin", "codex-protocol", "codex-utils-absolute-path", + "codex-utils-log", "os_info", "pretty_assertions", "serde", @@ -1852,6 +1853,7 @@ dependencies = [ "chrono", "codex-client", "codex-protocol", + "codex-utils-log", "codex-utils-rustls-provider", "eventsource-stream", "futures", @@ -1926,6 +1928,7 @@ dependencies = [ "codex-utils-cargo-bin", "codex-utils-cli", "codex-utils-json-to-toml", + "codex-utils-log", "codex-utils-pty", "core_test_support", "flate2", @@ -2532,6 +2535,7 @@ dependencies = [ "codex-utils-cargo-bin", "codex-utils-home-dir", "codex-utils-image", + "codex-utils-log", "codex-utils-output-truncation", "codex-utils-path", "codex-utils-plugins", @@ -3049,6 +3053,7 @@ dependencies = [ "codex-otel", "codex-protocol", "codex-terminal-detection", + "codex-utils-log", "codex-utils-template", "core_test_support", "jsonwebtoken", @@ -3493,6 +3498,7 @@ dependencies = [ "codex-protocol", "codex-utils-cargo-bin", "codex-utils-home-dir", + "codex-utils-log", "codex-utils-pty", "futures", "keyring", @@ -3800,6 +3806,7 @@ dependencies = [ "codex-utils-elapsed", "codex-utils-fuzzy-match", "codex-utils-home-dir", + "codex-utils-log", "codex-utils-oss", "codex-utils-path", "codex-utils-plugins", @@ -3964,6 +3971,14 @@ dependencies = [ "toml 0.9.11+spec-1.1.0", ] +[[package]] +name = "codex-utils-log" +version = "0.0.0" +dependencies = [ + "pretty_assertions", + "sha2", +] + [[package]] name = "codex-utils-oss" version = "0.0.0" diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index 7207fd3aba4d..a8be9085afce 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -84,6 +84,7 @@ members = [ "utils/cache", "utils/image", "utils/json-to-toml", + "utils/log", "utils/home-dir", "utils/pty", "utils/readiness", @@ -221,6 +222,7 @@ codex-utils-fuzzy-match = { path = "utils/fuzzy-match" } codex-utils-home-dir = { path = "utils/home-dir" } codex-utils-image = { path = "utils/image" } codex-utils-json-to-toml = { path = "utils/json-to-toml" } +codex-utils-log = { path = "utils/log" } codex-utils-oss = { path = "utils/oss" } codex-utils-output-truncation = { path = "utils/output-truncation" } codex-utils-path = { path = "utils/path-utils" } diff --git a/codex-rs/analytics/Cargo.toml b/codex-rs/analytics/Cargo.toml index 918e7edc720e..15c9c8a1528c 100644 --- a/codex-rs/analytics/Cargo.toml +++ b/codex-rs/analytics/Cargo.toml @@ -19,6 +19,7 @@ codex-login = { workspace = true } codex-model-provider = { workspace = true } codex-plugin = { workspace = true } codex-protocol = { workspace = true } +codex-utils-log = { workspace = true } os_info = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index fbcfa32dc5e1..f8f4dd5f599d 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -34,6 +34,7 @@ use codex_login::CodexAuth; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::request_permissions::RequestPermissionsResponse; +use codex_utils_log::bounded_str; use std::collections::HashSet; use std::sync::Arc; use std::sync::Mutex; @@ -454,7 +455,7 @@ async fn send_track_events_request(auth: &CodexAuth, url: &str, events: Vec { let status = response.status(); let body = response.text().await.unwrap_or_default(); - tracing::warn!("events failed with status {status}: {body}"); + tracing::warn!("events failed with status {status}: {}", bounded_str(&body)); } Err(err) => { tracing::warn!("failed to send events request: {err}"); diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index 95baac4e9edf..5445ce808dd8 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -71,6 +71,7 @@ codex-thread-store = { workspace = true } codex-tools = { workspace = true } codex-utils-absolute-path = { workspace = true } codex-utils-json-to-toml = { workspace = true } +codex-utils-log = { workspace = true } chrono = { workspace = true } clap = { workspace = true, features = ["derive"] } futures = { workspace = true } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index fc835b82ecf0..45f32914879c 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -79,6 +79,7 @@ use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::W3cTraceContext; use codex_rollout::StateDbHandle; use codex_state::log_db::LogDbLayer; +use codex_utils_log::bounded_debug; use tokio::sync::Mutex; use tokio::sync::Semaphore; use tokio::sync::broadcast; @@ -615,14 +616,14 @@ impl MessageProcessor { pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) { // Currently, we do not expect to receive any notifications from the // client, so we just log them. - tracing::info!("<- notification: {:?}", notification); + tracing::info!("<- notification: {}", bounded_debug(¬ification)); } /// Handles typed notifications from in-process clients. pub(crate) async fn process_client_notification(&self, notification: ClientNotification) { // Currently, we do not expect to receive any typed notifications from // in-process clients, so we just log them. - tracing::info!("<- typed notification: {:?}", notification); + tracing::info!("<- typed notification: {}", bounded_debug(¬ification)); } async fn run_request_with_context( @@ -722,14 +723,14 @@ impl MessageProcessor { /// Handle a standalone JSON-RPC response originating from the peer. pub(crate) async fn process_response(&self, response: JSONRPCResponse) { - tracing::info!("<- response: {:?}", response); + tracing::info!("<- response: {}", bounded_debug(&response)); let JSONRPCResponse { id, result, .. } = response; self.outgoing.notify_client_response(id, result).await } /// Handle an error object received from the peer. pub(crate) async fn process_error(&self, err: JSONRPCError) { - tracing::error!("<- error: {:?}", err); + tracing::error!("<- error: {}", bounded_debug(&err)); self.outgoing.notify_client_error(err.id, err.error).await; } diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 97ef37f74234..75dd67df551a 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -18,11 +18,15 @@ use codex_otel::span_w3c_trace_context; use codex_protocol::ThreadId; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::request_permissions::RequestPermissionsResponse; +use codex_utils_log::bounded_debug; +use codex_utils_log::bounded_display; use tokio::sync::Mutex; use tokio::sync::mpsc; use tokio::sync::oneshot; use tracing::Instrument; +use tracing::Level; use tracing::Span; +use tracing::enabled; use tracing::warn; use crate::error_code::internal_error; @@ -342,7 +346,10 @@ impl OutgoingMessageSender { }; if let Err(err) = send_result { - warn!("failed to send request {outgoing_message_id:?} to client: {err:?}"); + warn!( + "failed to send request {outgoing_message_id:?} to client: {}", + bounded_debug(&err) + ); let mut request_id_to_callback = self.request_id_to_callback.lock().await; request_id_to_callback.remove(&outgoing_message_id); } @@ -365,7 +372,10 @@ impl OutgoingMessageSender { }) .await { - warn!("failed to resend request to client: {err:?}"); + warn!( + "failed to resend request to client: {}", + bounded_debug(&err) + ); } } } @@ -383,7 +393,10 @@ impl OutgoingMessageSender { .track_server_response(completed_at_ms, response); } if let Err(err) = entry.callback.send(Ok(result)) { - warn!("could not notify callback for {id:?} due to: {err:?}"); + warn!( + "could not notify callback for {id:?} due to: {}", + bounded_debug(&err) + ); } } None => { @@ -397,11 +410,17 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { - warn!("client responded with error for {id:?}: {error:?}"); + warn!( + "client responded with error for {id:?}: {}", + bounded_debug(&error) + ); self.analytics_events_client .track_server_request_aborted(now_unix_timestamp_ms(), id.clone()); if let Err(err) = entry.callback.send(Err(error)) { - warn!("could not notify callback for {id:?} due to: {err:?}"); + warn!( + "could not notify callback for {id:?} due to: {}", + bounded_debug(&err) + ); } } None => { @@ -437,7 +456,10 @@ impl OutgoingMessageSender { && let Err(err) = entry.callback.send(Err(error.clone())) { let request_id = entry.request.id(); - warn!("could not notify callback for {request_id:?} due to: {err:?}"); + warn!( + "could not notify callback for {request_id:?} due to: {}", + bounded_debug(&err) + ); } } } @@ -495,7 +517,10 @@ impl OutgoingMessageSender { && let Err(err) = entry.callback.send(Err(error.clone())) { let request_id = entry.request.id(); - warn!("could not notify callback for {request_id:?} due to: {err:?}",); + warn!( + "could not notify callback for {request_id:?} due to: {}", + bounded_debug(&err) + ); } } } @@ -560,10 +585,13 @@ impl OutgoingMessageSender { connection_ids: &[ConnectionId], notification: ServerNotification, ) { - tracing::trace!( - targeted_connections = connection_ids.len(), - "app-server event: {notification}" - ); + if enabled!(Level::TRACE) { + let notification_log = bounded_display(¬ification); + tracing::trace!( + targeted_connections = connection_ids.len(), + "app-server event: {notification_log}" + ); + } let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone()); if connection_ids.is_empty() { if let Err(err) = self @@ -573,7 +601,10 @@ impl OutgoingMessageSender { }) .await { - warn!("failed to send server notification to client: {err:?}"); + warn!( + "failed to send server notification to client: {}", + bounded_debug(&err) + ); } return; } @@ -587,7 +618,10 @@ impl OutgoingMessageSender { }) .await { - warn!("failed to send server notification to client: {err:?}"); + warn!( + "failed to send server notification to client: {}", + bounded_debug(&err) + ); } } } @@ -597,7 +631,9 @@ impl OutgoingMessageSender { connection_id: ConnectionId, notification: ServerNotification, ) { - tracing::trace!("app-server event: {notification}"); + if enabled!(Level::TRACE) { + tracing::trace!("app-server event: {}", bounded_display(¬ification)); + } let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone()); let (write_complete_tx, write_complete_rx) = oneshot::channel(); if let Err(err) = self @@ -609,7 +645,10 @@ impl OutgoingMessageSender { }) .await { - warn!("failed to send server notification to client: {err:?}"); + warn!( + "failed to send server notification to client: {}", + bounded_debug(&err) + ); } let _ = write_complete_rx.await; } @@ -678,7 +717,10 @@ impl OutgoingMessageSender { }; if let Err(err) = send_result { - warn!("failed to send {message_kind} to client: {err:?}"); + warn!( + "failed to send {message_kind} to client: {}", + bounded_debug(&err) + ); } } } diff --git a/codex-rs/codex-api/Cargo.toml b/codex-rs/codex-api/Cargo.toml index 08f70cf33cf1..a12fea9fc5bf 100644 --- a/codex-rs/codex-api/Cargo.toml +++ b/codex-rs/codex-api/Cargo.toml @@ -12,6 +12,7 @@ bytes = { workspace = true } chrono = { workspace = true } codex-client = { workspace = true } codex-protocol = { workspace = true } +codex-utils-log = { workspace = true } codex-utils-rustls-provider = { workspace = true } futures = { workspace = true } http = { workspace = true } diff --git a/codex-rs/codex-api/src/endpoint/models.rs b/codex-rs/codex-api/src/endpoint/models.rs index ec9ee7aac6d3..25bcd19fef23 100644 --- a/codex-rs/codex-api/src/endpoint/models.rs +++ b/codex-rs/codex-api/src/endpoint/models.rs @@ -6,6 +6,7 @@ use codex_client::HttpTransport; use codex_client::RequestTelemetry; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelsResponse; +use codex_utils_log::bounded_bytes_lossy; use http::HeaderMap; use http::Method; use http::header::ETAG; @@ -65,7 +66,7 @@ impl ModelsClient { .map_err(|e| { ApiError::Stream(format!( "failed to decode models response: {e}; body: {}", - String::from_utf8_lossy(&resp.body) + bounded_bytes_lossy(&resp.body) )) })?; diff --git a/codex-rs/codex-api/src/endpoint/realtime_call.rs b/codex-rs/codex-api/src/endpoint/realtime_call.rs index b0342c53498d..3a4eec43b8ba 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_call.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_call.rs @@ -8,6 +8,7 @@ use bytes::Bytes; use codex_client::HttpTransport; use codex_client::RequestBody; use codex_client::RequestTelemetry; +use codex_utils_log::bounded_str; use http::HeaderMap; use http::HeaderValue; use http::Method; @@ -119,7 +120,7 @@ impl RealtimeCallClient { session_config: RealtimeSessionConfig, extra_headers: HeaderMap, ) -> Result { - trace!(target: "codex_api::realtime_websocket::wire", "realtime call request SDP: {sdp}"); + trace!(target: "codex_api::realtime_websocket::wire", "realtime call request SDP: {}", bounded_str(&sdp)); // WebRTC can begin inference as soon as the peer connection comes up, so the initial // session payload is sent with call creation. The sideband WebSocket still sends its normal // session.update after it joins. @@ -202,7 +203,7 @@ fn decode_call_id_from_location(headers: &HeaderMap) -> Result .ok_or_else(|| ApiError::Stream("realtime call response missing Location".to_string()))? .to_str() .map_err(|err| ApiError::Stream(format!("invalid realtime call Location: {err}")))?; - trace!("realtime call Location: {location}"); + trace!("realtime call Location: {}", bounded_str(location)); location .split('?') diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 9fcca1c3e318..b776d713814d 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -18,6 +18,8 @@ use crate::provider::Provider; use codex_client::backoff; use codex_client::maybe_build_rustls_client_config_with_custom_ca; use codex_protocol::protocol::RealtimeTranscriptDelta; +use codex_utils_log::bounded_debug; +use codex_utils_log::bounded_str; use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use futures::SinkExt; use futures::StreamExt; @@ -345,7 +347,7 @@ impl RealtimeWebsocketWriter { async fn send_json(&self, message: &RealtimeOutboundMessage) -> Result<(), ApiError> { let payload = serde_json::to_string(message) .map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?; - debug!(?message, "realtime websocket request"); + debug!(message = %bounded_debug(message), "realtime websocket request"); self.send_payload(payload).await } @@ -356,7 +358,7 @@ impl RealtimeWebsocketWriter { )); } - trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket request: {payload}"); + trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket request: {}", bounded_str(&payload)); self.stream .send(Message::Text(payload.into())) .await @@ -390,10 +392,10 @@ impl RealtimeWebsocketEvents { match msg { Message::Text(text) => { - trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket event: {text}"); + trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket event: {}", bounded_str(&text)); if let Some(mut event) = parse_realtime_event(&text, self.event_parser) { self.update_active_transcript(&mut event).await; - debug!(?event, "realtime websocket parsed event"); + debug!(event = %bounded_debug(&event), "realtime websocket parsed event"); return Ok(Some(event)); } debug!("realtime websocket ignored unsupported text frame"); diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_common.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_common.rs index 2c96280672f0..e5f9b0431189 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_common.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_common.rs @@ -1,6 +1,7 @@ use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::RealtimeTranscriptDelta; use codex_protocol::protocol::RealtimeTranscriptDone; +use codex_utils_log::bounded_str; use serde_json::Value; use tracing::debug; @@ -8,7 +9,10 @@ pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option let parsed: Value = match serde_json::from_str(payload) { Ok(message) => message, Err(err) => { - debug!("failed to parse {parser_name} event: {err}, data: {payload}"); + debug!( + "failed to parse {parser_name} event: {err}, data: {}", + bounded_str(payload) + ); return None; } }; @@ -16,7 +20,10 @@ pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option let message_type = match parsed.get("type").and_then(Value::as_str) { Some(message_type) => message_type.to_string(), None => { - debug!("received {parser_name} event without type field: {payload}"); + debug!( + "received {parser_name} event without type field: {}", + bounded_str(payload) + ); return None; } }; diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs index 3c1d25aed755..43cfbdd6a181 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v1.rs @@ -6,6 +6,7 @@ use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_done_ use codex_protocol::protocol::RealtimeAudioFrame; use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::RealtimeHandoffRequested; +use codex_utils_log::bounded_str; use serde_json::Value; use tracing::debug; @@ -83,7 +84,10 @@ pub(super) fn parse_realtime_event_v1(payload: &str) -> Option { } "error" => parse_error_event(&parsed), _ => { - debug!("received unsupported realtime v1 event type: {message_type}, data: {payload}"); + debug!( + "received unsupported realtime v1 event type: {message_type}, data: {}", + bounded_str(payload) + ); None } } diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs index ee0bba994614..a187be24f256 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs @@ -11,6 +11,7 @@ use codex_protocol::protocol::RealtimeNoopRequested; use codex_protocol::protocol::RealtimeResponseCancelled; use codex_protocol::protocol::RealtimeResponseCreated; use codex_protocol::protocol::RealtimeResponseDone; +use codex_utils_log::bounded_str; use serde_json::Map as JsonMap; use serde_json::Value; use tracing::debug; @@ -72,7 +73,10 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option { })), "error" => parse_error_event(&parsed), _ => { - debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}"); + debug!( + "received unsupported realtime v2 event type: {message_type}, data: {}", + bounded_str(payload) + ); None } } diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index f0a001981725..f8438214d214 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -11,6 +11,7 @@ use crate::sse::process_responses_event; use crate::telemetry::WebsocketTelemetry; use codex_client::TransportError; use codex_client::maybe_build_rustls_client_config_with_custom_ca; +use codex_utils_log::bounded_str; use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use futures::SinkExt; use futures::StreamExt; @@ -702,7 +703,7 @@ async fn run_websocket_response_stream( match message { Message::Text(text) => { - trace!("websocket event: {text}"); + trace!("websocket event: {}", bounded_str(&text)); if let Some(wrapped_error) = parse_wrapped_websocket_error_event(&text) && let Some(error) = map_wrapped_websocket_error_event(wrapped_error, text.to_string()) @@ -713,7 +714,10 @@ async fn run_websocket_response_stream( let event = match serde_json::from_str::(&text) { Ok(event) => event, Err(err) => { - debug!("failed to parse websocket event: {err}, data: {text}"); + debug!( + "failed to parse websocket event: {err}, data: {}", + bounded_str(&text) + ); continue; } }; @@ -787,7 +791,7 @@ async fn send_websocket_request( ))); } }; - trace!("websocket request: {request_text}"); + trace!("websocket request: {}", bounded_str(&request_text)); let request_start = Instant::now(); let result = tokio::time::timeout( diff --git a/codex-rs/codex-api/src/sse/responses.rs b/codex-rs/codex-api/src/sse/responses.rs index c9f35e5a4ec8..cba1689e95df 100644 --- a/codex-rs/codex-api/src/sse/responses.rs +++ b/codex-rs/codex-api/src/sse/responses.rs @@ -8,6 +8,7 @@ use codex_client::StreamResponse; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::ModelVerification; use codex_protocol::protocol::TokenUsage; +use codex_utils_log::bounded_str; use eventsource_stream::Eventsource; use futures::StreamExt; use serde::Deserialize; @@ -415,7 +416,7 @@ pub async fn process_sse( let sse = match response { Ok(Some(Ok(sse))) => sse, Ok(Some(Err(e))) => { - debug!("SSE Error: {e:#}"); + debug!("SSE Error: {}", bounded_str(&format!("{e:#}"))); let _ = tx_event.send(Err(ApiError::Stream(e.to_string()))).await; return; } @@ -434,12 +435,15 @@ pub async fn process_sse( } }; - trace!("SSE event: {}", &sse.data); + trace!("SSE event: {}", bounded_str(&sse.data)); let event: ResponsesStreamEvent = match serde_json::from_str(&sse.data) { Ok(event) => event, Err(e) => { - debug!("Failed to parse SSE event: {e}, data: {}", &sse.data); + debug!( + "Failed to parse SSE event: {e}, data: {}", + bounded_str(&sse.data) + ); continue; } }; diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 8472918dcae9..7756176a4556 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -65,6 +65,7 @@ codex-utils-absolute-path = { workspace = true } codex-utils-cache = { workspace = true } codex-utils-image = { workspace = true } codex-utils-home-dir = { workspace = true } +codex-utils-log = { workspace = true } codex-utils-output-truncation = { workspace = true } codex-utils-path = { workspace = true } codex-utils-plugins = { workspace = true } diff --git a/codex-rs/core/src/mcp_tool_call.rs b/codex-rs/core/src/mcp_tool_call.rs index 8f6e3afaf5af..b65b9686cb6f 100644 --- a/codex-rs/core/src/mcp_tool_call.rs +++ b/codex-rs/core/src/mcp_tool_call.rs @@ -80,6 +80,7 @@ use codex_rmcp_client::ElicitationAction; use codex_rmcp_client::ElicitationResponse; use codex_rollout::state_db; use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_log::bounded_debug; use codex_utils_output_truncation::TruncationPolicy; use codex_utils_output_truncation::truncate_text; use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP; @@ -373,7 +374,7 @@ async fn handle_approved_mcp_tool_call( )) .await; if let Err(error) = &result { - tracing::warn!("MCP tool call error: {error:?}"); + tracing::warn!("MCP tool call error: {}", bounded_debug(error)); } let duration = start.elapsed(); notify_mcp_tool_call_completed( diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 249b3ae15f46..06332dc1ceb7 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -45,6 +45,7 @@ use codex_protocol::protocol::RealtimeHandoffRequested; use codex_protocol::protocol::RealtimeOutputModality; use codex_protocol::protocol::RealtimeVoice; use codex_protocol::protocol::RealtimeVoicesList; +use codex_utils_log::bounded_debug; use http::HeaderMap; use http::HeaderValue; use http::header::AUTHORIZATION; @@ -834,7 +835,7 @@ async fn handle_start_inner( RealtimeEvent::AudioOut(_) => {} _ => { info!( - event = ?event, + event = %bounded_debug(&event), "received realtime conversation event" ); } diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index a26b9bb3f6fe..3a065477860f 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -5,6 +5,7 @@ use crate::realtime_conversation::handle_text as handle_realtime_conversation_te use async_channel::Receiver; use codex_otel::set_parent_from_w3c_trace_context; use codex_protocol::protocol::Submission; +use codex_utils_log::bounded_debug; use tracing::Instrument; use tracing::debug_span; use tracing::info_span; @@ -738,7 +739,7 @@ pub(super) async fn submission_loop( // To break out of this loop, send Op::Shutdown. let mut shutdown_received = false; while let Ok(sub) = rx_sub.recv().await { - debug!(?sub, "Submission"); + debug!(submission = %bounded_debug(&sub), "Submission"); let dispatch_span = submission_dispatch_span(&sub); let should_exit = async { match sub.op.clone() { diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index aee4bd360ff5..910b89069b12 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -3,6 +3,8 @@ use std::collections::HashSet; use std::sync::Arc; use std::sync::atomic::Ordering; +use codex_utils_log::bounded_str; + use crate::SkillInjections; use crate::SkillLoadOutcome; use crate::build_skill_injections; @@ -662,7 +664,7 @@ pub(crate) async fn run_turn( break; } Err(e) => { - info!("Turn error: {e:#}"); + info!("Turn error: {}", bounded_str(&format!("{e:#}"))); let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None)); sess.send_event(&turn_context, event).await; // let the user continue the conversation diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index bc44fe0c3b07..a17a1ec59e69 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -29,6 +29,7 @@ use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; use codex_rollout::state_db; use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_log::bounded_debug; use codex_utils_stream_parser::strip_proposed_plan_blocks; use futures::Future; use tracing::debug; @@ -454,7 +455,7 @@ pub(crate) async fn handle_non_tool_response_item( item: &ResponseItem, plan_mode: bool, ) -> Option { - debug!(?item, "Output item"); + debug!(item = %bounded_debug(item), "Output item"); match item { ResponseItem::Message { .. } diff --git a/codex-rs/login/Cargo.toml b/codex-rs/login/Cargo.toml index e914fa0c7a98..e23c3d57ec07 100644 --- a/codex-rs/login/Cargo.toml +++ b/codex-rs/login/Cargo.toml @@ -20,6 +20,7 @@ codex-model-provider-info = { workspace = true } codex-otel = { workspace = true } codex-protocol = { workspace = true } codex-terminal-detection = { workspace = true } +codex-utils-log = { workspace = true } codex-utils-template = { workspace = true } once_cell = { workspace = true } os_info = { workspace = true } diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index a2e4e8e0d866..aacbf8470394 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -42,6 +42,7 @@ use codex_protocol::account::PlanType as AccountPlanType; use codex_protocol::auth::PlanType as InternalPlanType; use codex_protocol::auth::RefreshTokenFailedError; use codex_protocol::auth::RefreshTokenFailedReason; +use codex_utils_log::bounded_str; use serde_json::Value; use thiserror::Error; @@ -842,7 +843,7 @@ async fn request_chatgpt_token_refresh( Ok(refresh_response) } else { let body = response.text().await.unwrap_or_default(); - tracing::error!("Failed to refresh token: {status}: {body}"); + tracing::error!("Failed to refresh token: {status}: {}", bounded_str(&body)); if status == StatusCode::UNAUTHORIZED { let failed = classify_refresh_token_failure(&body); Err(RefreshTokenError::Permanent(failed)) @@ -869,7 +870,7 @@ fn classify_refresh_token_failure(body: &str) -> RefreshTokenFailedError { if reason == RefreshTokenFailedReason::Other { tracing::warn!( backend_code = normalized_code.as_deref(), - backend_body = body, + backend_body = %bounded_str(body), "Encountered unknown 401 response while refreshing token" ); } diff --git a/codex-rs/login/src/auth/util.rs b/codex-rs/login/src/auth/util.rs index a993bbf4a378..efa9b94b61dc 100644 --- a/codex-rs/login/src/auth/util.rs +++ b/codex-rs/login/src/auth/util.rs @@ -1,7 +1,8 @@ +use codex_utils_log::bounded_str; use tracing::debug; pub(crate) fn try_parse_error_message(text: &str) -> String { - debug!("Parsing server error response: {}", text); + debug!("Parsing server error response: {}", bounded_str(text)); let json = serde_json::from_str::(text).unwrap_or_default(); if let Some(error) = json.get("error") && let Some(message) = error.get("message") diff --git a/codex-rs/rmcp-client/Cargo.toml b/codex-rs/rmcp-client/Cargo.toml index 0efd8a4178aa..0b7f8b264ca8 100644 --- a/codex-rs/rmcp-client/Cargo.toml +++ b/codex-rs/rmcp-client/Cargo.toml @@ -20,6 +20,7 @@ codex-config = { workspace = true } codex-exec-server = { workspace = true } codex-keyring-store = { workspace = true } codex-protocol = { workspace = true } +codex-utils-log = { workspace = true } codex-utils-pty = { workspace = true } codex-utils-home-dir = { workspace = true } bytes = { workspace = true } diff --git a/codex-rs/rmcp-client/src/executor_process_transport.rs b/codex-rs/rmcp-client/src/executor_process_transport.rs index 41f0b7660d95..fffe0cf190b1 100644 --- a/codex-rs/rmcp-client/src/executor_process_transport.rs +++ b/codex-rs/rmcp-client/src/executor_process_transport.rs @@ -20,7 +20,6 @@ use std::future::Future; use std::io; -use std::mem::take; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -41,9 +40,10 @@ use serde_json::to_vec; use tokio::runtime::Handle; use tokio::sync::broadcast; use tracing::debug; -use tracing::info; use tracing::warn; +use crate::stderr_log::StderrLogBuffer; + static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1); // Remote public implementation. @@ -76,7 +76,7 @@ pub(super) struct ExecutorProcessTransport { stdout: Vec, /// Buffered stderr bytes for diagnostic logging. - stderr: Vec, + stderr: StderrLogBuffer, /// Whether the executor has reported process closure or a terminal /// subscription failure. Once closed, any remaining partial stdout line is @@ -101,12 +101,13 @@ impl ExecutorProcessTransport { // process event log will replay anything that landed before this // subscriber was attached. let events = process.subscribe_events(); + let stderr = StderrLogBuffer::new(program_name.clone()); Self { process, events, program_name, stdout: Vec::new(), - stderr: Vec::new(), + stderr, closed: false, terminated: false, last_seq: 0, @@ -312,33 +313,11 @@ impl ExecutorProcessTransport { } fn push_stderr(&mut self, bytes: &[u8]) { - // Keep stderr line-oriented in logs so a chatty MCP server does not - // produce one log record per byte chunk. - self.stderr.extend_from_slice(bytes); - while let Some(index) = self.stderr.iter().position(|byte| *byte == b'\n') { - let mut line = self.stderr.drain(..=index).collect::>(); - line.pop(); - if line.last() == Some(&b'\r') { - line.pop(); - } - info!( - "MCP server stderr ({}): {}", - self.program_name, - String::from_utf8_lossy(&line) - ); - } + self.stderr.push(bytes); } fn flush_stderr(&mut self) { - if self.stderr.is_empty() { - return; - } - let line = take(&mut self.stderr); - info!( - "MCP server stderr ({}): {}", - self.program_name, - String::from_utf8_lossy(&line) - ); + self.stderr.flush(); } fn trim_trailing_carriage_return(mut line: Vec) -> Vec { diff --git a/codex-rs/rmcp-client/src/lib.rs b/codex-rs/rmcp-client/src/lib.rs index e1ee18c75324..79da406c0542 100644 --- a/codex-rs/rmcp-client/src/lib.rs +++ b/codex-rs/rmcp-client/src/lib.rs @@ -8,6 +8,7 @@ mod oauth; mod perform_oauth_login; mod program_resolver; mod rmcp_client; +mod stderr_log; mod stdio_server_launcher; mod utils; diff --git a/codex-rs/rmcp-client/src/logging_client_handler.rs b/codex-rs/rmcp-client/src/logging_client_handler.rs index 0c3da0fe2cd1..df126d058c0d 100644 --- a/codex-rs/rmcp-client/src/logging_client_handler.rs +++ b/codex-rs/rmcp-client/src/logging_client_handler.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use codex_utils_log::bounded_debug; +use codex_utils_log::bounded_display; use rmcp::ClientHandler; use rmcp::RoleClient; use rmcp::model::CancelledNotificationParam; @@ -12,7 +14,9 @@ use rmcp::model::ProgressNotificationParam; use rmcp::model::ResourceUpdatedNotificationParam; use rmcp::service::NotificationContext; use rmcp::service::RequestContext; +use tracing::Level; use tracing::debug; +use tracing::enabled; use tracing::error; use tracing::info; use tracing::warn; @@ -51,9 +55,11 @@ impl ClientHandler for LoggingClientHandler { params: CancelledNotificationParam, _context: NotificationContext, ) { + let request_id = bounded_display(¶ms.request_id); + let reason = bounded_debug(¶ms.reason); info!( - "MCP server cancelled request (request_id: {}, reason: {:?})", - params.request_id, params.reason + "MCP server cancelled request (request_id: {}, reason: {})", + request_id, reason ); } @@ -62,9 +68,11 @@ impl ClientHandler for LoggingClientHandler { params: ProgressNotificationParam, _context: NotificationContext, ) { + let progress_token = bounded_debug(¶ms.progress_token); + let message = bounded_debug(¶ms.message); info!( - "MCP server progress notification (token: {:?}, progress: {}, total: {:?}, message: {:?})", - params.progress_token, params.progress, params.total, params.message + "MCP server progress notification (token: {}, progress: {}, total: {:?}, message: {})", + progress_token, params.progress, params.total, message ); } @@ -73,7 +81,8 @@ impl ClientHandler for LoggingClientHandler { params: ResourceUpdatedNotificationParam, _context: NotificationContext, ) { - info!("MCP server resource updated (uri: {})", params.uri); + let uri = bounded_display(¶ms.uri); + info!("MCP server resource updated (uri: {})", uri); } async fn on_resource_list_changed(&self, _context: NotificationContext) { @@ -108,28 +117,44 @@ impl ClientHandler for LoggingClientHandler { | LoggingLevel::Alert | LoggingLevel::Critical | LoggingLevel::Error => { - error!( - "MCP server log message (level: {:?}, logger: {:?}, data: {})", - level, logger, data - ); + if enabled!(Level::ERROR) { + let logger = bounded_debug(&logger); + let data = bounded_display(&data); + error!( + "MCP server log message (level: {:?}, logger: {}, data: {})", + level, logger, data + ); + } } LoggingLevel::Warning => { - warn!( - "MCP server log message (level: {:?}, logger: {:?}, data: {})", - level, logger, data - ); + if enabled!(Level::WARN) { + let logger = bounded_debug(&logger); + let data = bounded_display(&data); + warn!( + "MCP server log message (level: {:?}, logger: {}, data: {})", + level, logger, data + ); + } } LoggingLevel::Notice | LoggingLevel::Info => { - info!( - "MCP server log message (level: {:?}, logger: {:?}, data: {})", - level, logger, data - ); + if enabled!(Level::INFO) { + let logger = bounded_debug(&logger); + let data = bounded_display(&data); + info!( + "MCP server log message (level: {:?}, logger: {}, data: {})", + level, logger, data + ); + } } LoggingLevel::Debug => { - debug!( - "MCP server log message (level: {:?}, logger: {:?}, data: {})", - level, logger, data - ); + if enabled!(Level::DEBUG) { + let logger = bounded_debug(&logger); + let data = bounded_display(&data); + debug!( + "MCP server log message (level: {:?}, logger: {}, data: {})", + level, logger, data + ); + } } } } diff --git a/codex-rs/rmcp-client/src/stderr_log.rs b/codex-rs/rmcp-client/src/stderr_log.rs new file mode 100644 index 000000000000..2b318b7f7df6 --- /dev/null +++ b/codex-rs/rmcp-client/src/stderr_log.rs @@ -0,0 +1,110 @@ +use codex_utils_log::DEFAULT_BOUNDED_LOG_VALUE_BYTES; +use codex_utils_log::bounded_bytes_lossy; +use tracing::info; + +pub(crate) struct StderrLogBuffer { + program_name: String, + buffer: Vec, +} + +impl StderrLogBuffer { + pub(crate) fn new(program_name: String) -> Self { + Self { + program_name, + buffer: Vec::new(), + } + } + + pub(crate) fn push(&mut self, mut bytes: &[u8]) { + while !bytes.is_empty() { + if let Some(newline_index) = bytes.iter().position(|byte| *byte == b'\n') { + self.push_without_newline(&bytes[..newline_index]); + self.log_complete_line(); + bytes = &bytes[newline_index + 1..]; + } else { + self.push_without_newline(bytes); + return; + } + } + } + + pub(crate) fn flush(&mut self) { + if self.buffer.is_empty() { + return; + } + self.log_line("MCP server stderr"); + self.buffer.clear(); + } + + fn push_without_newline(&mut self, mut bytes: &[u8]) { + while !bytes.is_empty() { + let remaining_capacity = + DEFAULT_BOUNDED_LOG_VALUE_BYTES.saturating_sub(self.buffer.len()); + if remaining_capacity == 0 { + self.log_line("MCP server stderr line exceeded limit"); + self.buffer.clear(); + continue; + } + + let chunk_len = remaining_capacity.min(bytes.len()); + self.buffer.extend_from_slice(&bytes[..chunk_len]); + bytes = &bytes[chunk_len..]; + + if self.buffer.len() >= DEFAULT_BOUNDED_LOG_VALUE_BYTES { + self.log_line("MCP server stderr line exceeded limit"); + self.buffer.clear(); + } + } + } + + fn log_complete_line(&mut self) { + if self.buffer.last() == Some(&b'\r') { + self.buffer.pop(); + } + if self.buffer.is_empty() { + return; + } + self.log_line("MCP server stderr"); + self.buffer.clear(); + } + + fn log_line(&self, label: &str) { + info!( + "{} ({}): {}", + label, + self.program_name, + bounded_bytes_lossy(&self.buffer) + ); + } + + #[cfg(test)] + fn buffered_len(&self) -> usize { + self.buffer.len() + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn push_without_newline_does_not_grow_past_log_limit() { + let mut buffer = StderrLogBuffer::new("server".to_string()); + let bytes = vec![b'a'; DEFAULT_BOUNDED_LOG_VALUE_BYTES * 2 + 17]; + + buffer.push(&bytes); + + assert_eq!(17, buffer.buffered_len()); + } + + #[test] + fn push_newline_clears_buffer() { + let mut buffer = StderrLogBuffer::new("server".to_string()); + + buffer.push(b"hello\n"); + + assert_eq!(0, buffer.buffered_len()); + } +} diff --git a/codex-rs/rmcp-client/src/stdio_server_launcher.rs b/codex-rs/rmcp-client/src/stdio_server_launcher.rs index 9928511bb6c7..6b3e5ee4433c 100644 --- a/codex-rs/rmcp-client/src/stdio_server_launcher.rs +++ b/codex-rs/rmcp-client/src/stdio_server_launcher.rs @@ -46,14 +46,13 @@ use rmcp::service::RxJsonRpcMessage; use rmcp::service::TxJsonRpcMessage; use rmcp::transport::Transport; use rmcp::transport::child_process::TokioChildProcess; -use tokio::io::AsyncBufReadExt; -use tokio::io::BufReader; +use tokio::io::AsyncReadExt; use tokio::process::Command; -use tracing::info; use tracing::warn; use crate::executor_process_transport::ExecutorProcessTransport; use crate::program_resolver; +use crate::stderr_log::StderrLogBuffer; use crate::utils::create_env_for_mcp_server; use crate::utils::create_env_overlay_for_remote_mcp_server; use crate::utils::remote_mcp_env_var_names; @@ -272,13 +271,16 @@ impl LocalStdioServerLauncher { if let Some(stderr) = stderr { tokio::spawn(async move { - let mut reader = BufReader::new(stderr).lines(); + let mut stderr = stderr; + let mut stderr_log = StderrLogBuffer::new(program_name.clone()); + let mut buffer = [0_u8; 8192]; loop { - match reader.next_line().await { - Ok(Some(line)) => { - info!("MCP server stderr ({program_name}): {line}"); + match stderr.read(&mut buffer).await { + Ok(0) => { + stderr_log.flush(); + break; } - Ok(None) => break, + Ok(bytes_read) => stderr_log.push(&buffer[..bytes_read]), Err(error) => { warn!("Failed to read MCP server stderr ({program_name}): {error}"); break; diff --git a/codex-rs/tui/Cargo.toml b/codex-rs/tui/Cargo.toml index c213e92bae41..4696f83d489a 100644 --- a/codex-rs/tui/Cargo.toml +++ b/codex-rs/tui/Cargo.toml @@ -62,6 +62,7 @@ codex-utils-cli = { workspace = true } codex-utils-elapsed = { workspace = true } codex-utils-fuzzy-match = { workspace = true } codex-utils-home-dir = { workspace = true } +codex-utils-log = { workspace = true } codex-utils-oss = { workspace = true } codex-utils-path = { workspace = true } codex-utils-plugins = { workspace = true } diff --git a/codex-rs/tui/src/app/app_server_events.rs b/codex-rs/tui/src/app/app_server_events.rs index 413e8b8c8070..a301eed38fbc 100644 --- a/codex-rs/tui/src/app/app_server_events.rs +++ b/codex-rs/tui/src/app/app_server_events.rs @@ -121,7 +121,10 @@ impl App { }; if let Err(err) = result { - tracing::warn!("failed to enqueue app-server notification: {err}"); + tracing::warn!( + "failed to enqueue app-server notification: {}", + codex_utils_log::bounded_display(&err) + ); } return; } @@ -180,7 +183,10 @@ impl App { self.enqueue_thread_request(thread_id, request).await }; if let Err(err) = result { - tracing::warn!("failed to enqueue app-server request: {err}"); + tracing::warn!( + "failed to enqueue app-server request: {}", + codex_utils_log::bounded_display(&err) + ); } } } diff --git a/codex-rs/tui/src/app_event_sender.rs b/codex-rs/tui/src/app_event_sender.rs index 5b8da9ec09d7..1d89a851008f 100644 --- a/codex-rs/tui/src/app_event_sender.rs +++ b/codex-rs/tui/src/app_event_sender.rs @@ -39,7 +39,10 @@ impl AppEventSender { session_log::log_inbound_app_event(&event); } if let Err(e) = self.app_event_tx.send(event) { - tracing::error!("failed to send event: {e}"); + tracing::error!( + "failed to send event: {}", + codex_utils_log::bounded_display(&e) + ); } } diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index e76b4c0b96a5..80fa6695b056 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -216,6 +216,7 @@ use codex_protocol::ThreadId; use codex_protocol::user_input::ByteRange; use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS; use codex_protocol::user_input::TextElement; +use codex_utils_log::bounded_str; mod attachment_state; mod draft_state; @@ -900,7 +901,7 @@ impl ChatComposer { // so we can directly try to read the image dimensions. match image::image_dimensions(&path_buf) { Ok((width, height)) => { - tracing::info!("OK: {pasted}"); + tracing::info!("OK: {}", bounded_str(&pasted)); tracing::debug!("image dimensions={}x{}", width, height); let format = pasted_image_format(&path_buf); tracing::debug!("attached image format={}", format.label()); diff --git a/codex-rs/tui/src/chatwidget/protocol_requests.rs b/codex-rs/tui/src/chatwidget/protocol_requests.rs index 5b53f44fdf8f..187e3aa1d14b 100644 --- a/codex-rs/tui/src/chatwidget/protocol_requests.rs +++ b/codex-rs/tui/src/chatwidget/protocol_requests.rs @@ -137,7 +137,10 @@ impl ChatWidget { } pub(super) fn on_turn_diff(&mut self, unified_diff: String) { - debug!("TurnDiffEvent: {unified_diff}"); + debug!( + "TurnDiffEvent: {}", + codex_utils_log::bounded_str(&unified_diff) + ); self.refresh_status_line(); } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 871cdcc0dd90..6d57c25afa41 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -1216,7 +1216,7 @@ async fn run_ratatui_app( // (including backtraces) after we restore the terminal. let prev_hook = std::panic::take_hook(); std::panic::set_hook(Box::new(move |info| { - tracing::error!("panic: {info}"); + tracing::error!("panic: {}", codex_utils_log::bounded_display(info)); prev_hook(info); })); let mut terminal = tui::init()?; diff --git a/codex-rs/tui/src/markdown_stream.rs b/codex-rs/tui/src/markdown_stream.rs index 66b5cab121f7..d9c993914544 100644 --- a/codex-rs/tui/src/markdown_stream.rs +++ b/codex-rs/tui/src/markdown_stream.rs @@ -8,6 +8,9 @@ //! On finalization, `finalize_and_drain_source()` flushes whatever remains (the last line, which //! may lack a trailing newline). +use codex_utils_log::bounded_debug; +#[cfg(test)] +use codex_utils_log::bounded_str; #[cfg(test)] use ratatui::text::Line; use std::path::Path; @@ -75,7 +78,7 @@ impl MarkdownStreamCollector { /// Append a raw streaming delta to the internal source buffer. pub fn push_delta(&mut self, delta: &str) { - tracing::trace!("push_delta: {delta:?}"); + tracing::trace!("push_delta: {}", bounded_debug(&delta)); self.buffer.push_str(delta); } @@ -174,7 +177,10 @@ impl MarkdownStreamCollector { self.buffer.len(), source.len() ); - tracing::trace!("markdown finalize (raw source):\n---\n{source}\n---"); + tracing::trace!( + "markdown finalize (raw source):\n---\n{}\n---", + bounded_str(&source) + ); let mut rendered: Vec> = Vec::new(); markdown::append_markdown(&source, self.width, Some(self.cwd.as_path()), &mut rendered); diff --git a/codex-rs/utils/log/BUILD.bazel b/codex-rs/utils/log/BUILD.bazel new file mode 100644 index 000000000000..e27294af63cb --- /dev/null +++ b/codex-rs/utils/log/BUILD.bazel @@ -0,0 +1,6 @@ +load("//:defs.bzl", "codex_rust_crate") + +codex_rust_crate( + name = "log", + crate_name = "codex_utils_log", +) diff --git a/codex-rs/utils/log/Cargo.toml b/codex-rs/utils/log/Cargo.toml new file mode 100644 index 000000000000..82c42f3a6623 --- /dev/null +++ b/codex-rs/utils/log/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "codex-utils-log" +version.workspace = true +edition.workspace = true +license.workspace = true + +[lints] +workspace = true + +[dependencies] +sha2 = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } + +[lib] +doctest = false diff --git a/codex-rs/utils/log/src/lib.rs b/codex-rs/utils/log/src/lib.rs new file mode 100644 index 000000000000..cc47314a5006 --- /dev/null +++ b/codex-rs/utils/log/src/lib.rs @@ -0,0 +1,289 @@ +use std::fmt; + +use sha2::Digest; +use sha2::Sha256; + +pub const DEFAULT_BOUNDED_LOG_VALUE_BYTES: usize = 16 * 1024; + +pub fn bounded_display(value: &T) -> String +where + T: fmt::Display + ?Sized, +{ + bounded_format_with_limit(format_args!("{value}"), DEFAULT_BOUNDED_LOG_VALUE_BYTES) +} + +pub fn bounded_debug(value: &T) -> String +where + T: fmt::Debug + ?Sized, +{ + bounded_format_with_limit(format_args!("{value:?}"), DEFAULT_BOUNDED_LOG_VALUE_BYTES) +} + +pub fn bounded_str(value: &str) -> String { + bounded_str_with_limit(value, DEFAULT_BOUNDED_LOG_VALUE_BYTES) +} + +pub fn bounded_str_with_limit(value: &str, max_bytes: usize) -> String { + bounded_utf8_bytes(value.as_bytes(), max_bytes) +} + +pub fn bounded_bytes_lossy(value: &[u8]) -> String { + bounded_bytes_lossy_with_limit(value, DEFAULT_BOUNDED_LOG_VALUE_BYTES) +} + +pub fn bounded_bytes_lossy_with_limit(value: &[u8], max_bytes: usize) -> String { + if value.len() <= max_bytes { + return String::from_utf8_lossy(value).into_owned(); + } + + let (prefix, suffix) = split_bytes(value, max_bytes); + let shown_bytes = prefix.len().saturating_add(suffix.len()); + assemble_bounded_log_value( + &String::from_utf8_lossy(prefix), + &String::from_utf8_lossy(suffix), + value.len(), + shown_bytes, + digest_hex(value), + ) +} + +fn bounded_utf8_bytes(value: &[u8], max_bytes: usize) -> String { + if value.len() <= max_bytes { + return String::from_utf8_lossy(value).into_owned(); + } + + let text = String::from_utf8_lossy(value); + let prefix_len = utf8_prefix_boundary(&text, max_bytes / 2); + let suffix_start = utf8_suffix_boundary(&text, max_bytes - max_bytes / 2); + let prefix = &text[..prefix_len]; + let suffix = &text[suffix_start..]; + let shown_bytes = prefix.len().saturating_add(suffix.len()); + assemble_bounded_log_value(prefix, suffix, value.len(), shown_bytes, digest_hex(value)) +} + +fn bounded_format_with_limit(args: fmt::Arguments<'_>, max_bytes: usize) -> String { + let mut writer = BoundedFormatWriter::new(max_bytes); + if fmt::write(&mut writer, args).is_err() { + return String::from(""); + } + writer.finish() +} + +struct BoundedFormatWriter { + max_bytes: usize, + head: String, + tail: String, + full: Option, + original_bytes: usize, + digest: Sha256, +} + +impl BoundedFormatWriter { + fn new(max_bytes: usize) -> Self { + Self { + max_bytes, + head: String::new(), + tail: String::new(), + full: Some(String::new()), + original_bytes: 0, + digest: Sha256::new(), + } + } + + fn finish(self) -> String { + if let Some(full) = self.full { + return full; + } + + let shown_bytes = self.head.len().saturating_add(self.tail.len()); + let digest = self.digest.finalize(); + assemble_bounded_log_value( + &self.head, + &self.tail, + self.original_bytes, + shown_bytes, + format!("{digest:x}"), + ) + } + + fn head_capacity(&self) -> usize { + self.max_bytes / 2 + } + + fn tail_capacity(&self) -> usize { + self.max_bytes - self.head_capacity() + } + + fn push_head(&mut self, value: &str) { + let remaining = self.head_capacity().saturating_sub(self.head.len()); + if remaining == 0 { + return; + } + + let boundary = utf8_prefix_boundary(value, remaining); + self.head.push_str(&value[..boundary]); + } + + fn push_tail(&mut self, value: &str) { + let capacity = self.tail_capacity(); + if capacity == 0 { + return; + } + + if value.len() >= capacity { + let start = utf8_suffix_boundary(value, capacity); + self.tail.clear(); + self.tail.push_str(&value[start..]); + return; + } + + self.tail.push_str(value); + if self.tail.len() > capacity { + let start = utf8_suffix_boundary(&self.tail, capacity); + self.tail.replace_range(..start, ""); + } + } +} + +impl fmt::Write for BoundedFormatWriter { + fn write_str(&mut self, value: &str) -> fmt::Result { + let original_bytes = self.original_bytes.saturating_add(value.len()); + self.digest.update(value.as_bytes()); + self.push_head(value); + self.push_tail(value); + + if let Some(full) = &mut self.full { + if original_bytes <= self.max_bytes { + full.push_str(value); + } else { + self.full = None; + } + } + + self.original_bytes = original_bytes; + Ok(()) + } +} + +fn split_bytes(value: &[u8], max_bytes: usize) -> (&[u8], &[u8]) { + let prefix_len = max_bytes / 2; + let suffix_len = max_bytes - prefix_len; + let suffix_start = value.len().saturating_sub(suffix_len); + (&value[..prefix_len], &value[suffix_start..]) +} + +fn utf8_prefix_boundary(value: &str, max_bytes: usize) -> usize { + if value.len() <= max_bytes { + return value.len(); + } + let mut boundary = max_bytes; + while boundary > 0 && !value.is_char_boundary(boundary) { + boundary -= 1; + } + boundary +} + +fn utf8_suffix_boundary(value: &str, max_bytes: usize) -> usize { + if value.len() <= max_bytes { + return 0; + } + let mut boundary = value.len().saturating_sub(max_bytes); + while boundary < value.len() && !value.is_char_boundary(boundary) { + boundary += 1; + } + boundary +} + +fn assemble_bounded_log_value( + prefix: &str, + suffix: &str, + original_bytes: usize, + shown_bytes: usize, + sha256: String, +) -> String { + let omitted_bytes = original_bytes.saturating_sub(shown_bytes); + format!( + "{prefix}...[truncated: original_bytes={original_bytes} shown_bytes={shown_bytes} omitted_bytes={omitted_bytes} sha256={sha256}]...{suffix}" + ) +} + +fn digest_hex(value: &[u8]) -> String { + let digest = Sha256::digest(value); + format!("{digest:x}") +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn bounded_str_returns_short_values_unchanged() { + assert_eq!("hello", bounded_str_with_limit("hello", /*max_bytes*/ 16)); + } + + #[test] + fn bounded_str_preserves_prefix_suffix_and_reports_metadata() { + let value = "abcdefghijklmnop"; + + let bounded = bounded_str_with_limit(value, /*max_bytes*/ 8); + + assert!(bounded.starts_with("abcd...[truncated: original_bytes=16 shown_bytes=8")); + assert!(bounded.contains(" omitted_bytes=8 ")); + assert!(bounded.contains(" sha256=")); + assert!(bounded.ends_with("]...mnop")); + } + + #[test] + fn bounded_str_respects_utf8_boundaries() { + let value = "αβγδεζηθ"; + + let bounded = bounded_str_with_limit(value, /*max_bytes*/ 9); + + assert!(bounded.starts_with("αβ...[truncated:")); + assert!(bounded.ends_with("]...ηθ")); + } + + #[test] + fn bounded_bytes_lossy_hashes_original_bytes() { + let value = b"abcdef\xffghijklmnop"; + + let bounded = bounded_bytes_lossy_with_limit(value, /*max_bytes*/ 8); + + assert!(bounded.starts_with("abcd...[truncated: original_bytes=17 shown_bytes=8")); + assert!(bounded.ends_with("]...mnop")); + assert!(bounded.contains(&digest_hex(value))); + } + + #[test] + fn bounded_debug_formats_then_bounds() { + let value = Some("a".repeat(DEFAULT_BOUNDED_LOG_VALUE_BYTES)); + + let bounded = bounded_debug(&value); + + assert!(bounded.starts_with("Some(\"")); + assert!(bounded.contains("[truncated:")); + } + + #[test] + fn bounded_format_streams_prefix_suffix_and_hash() { + struct ChunkedDisplay; + + impl fmt::Display for ChunkedDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for chunk in ["abcd", "efgh", "ijkl", "mnop"] { + f.write_str(chunk)?; + } + Ok(()) + } + } + + let bounded = + bounded_format_with_limit(format_args!("{ChunkedDisplay}"), /*max_bytes*/ 8); + + assert!(bounded.starts_with("abcd...[truncated: original_bytes=16 shown_bytes=8")); + assert!(bounded.ends_with("]...mnop")); + assert!(bounded.contains(&digest_hex(b"abcdefghijklmnop"))); + } +}