diff --git a/codex-rs/core/src/mcp_tool_call_tests.rs b/codex-rs/core/src/mcp_tool_call_tests.rs index 5ab2c6c4e89a..ab63be0c88df 100644 --- a/codex-rs/core/src/mcp_tool_call_tests.rs +++ b/codex-rs/core/src/mcp_tool_call_tests.rs @@ -702,6 +702,32 @@ async fn mcp_tool_call_request_meta_includes_turn_metadata_for_custom_server() { ); } +#[tokio::test] +async fn mcp_tool_call_request_meta_includes_turn_started_at_unix_ms() { + let (_, turn_context) = make_session_and_context().await; + turn_context + .turn_metadata_state + .set_turn_started_at_unix_ms(/*turn_started_at_unix_ms*/ 1_700_000_000_123); + + let meta = build_mcp_tool_call_request_meta( + &turn_context, + "custom_server", + "call-custom", + /*metadata*/ None, + ) + .expect("custom servers should receive turn metadata"); + let turn_metadata = meta + .get(crate::X_CODEX_TURN_METADATA_HEADER) + .expect("turn metadata should be present"); + + assert_eq!( + turn_metadata + .get("turn_started_at_unix_ms") + .and_then(serde_json::Value::as_i64), + Some(1_700_000_000_123) + ); +} + #[tokio::test] async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps_meta() { let (_, turn_context) = make_session_and_context().await; diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 15b688742806..e0286f3090a2 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -299,10 +299,13 @@ impl Session { let task_kind = task.kind(); let span_name = task.span_name(); let started_at = Instant::now(); - turn_context + let turn_started_at_unix_ms = turn_context .turn_timing_state .mark_turn_started(started_at) .await; + turn_context + .turn_metadata_state + .set_turn_started_at_unix_ms(turn_started_at_unix_ms); let token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default(); let cancellation_token = CancellationToken::new(); diff --git a/codex-rs/core/src/turn_metadata.rs b/codex-rs/core/src/turn_metadata.rs index 095fb8e21b57..97902ce1e16e 100644 --- a/codex-rs/core/src/turn_metadata.rs +++ b/codex-rs/core/src/turn_metadata.rs @@ -18,6 +18,8 @@ use codex_protocol::models::PermissionProfile; use codex_protocol::protocol::SessionSource; use codex_utils_absolute_path::AbsolutePathBuf; +const TURN_STARTED_AT_UNIX_MS_KEY: &str = "turn_started_at_unix_ms"; + #[derive(Clone, Debug, Default)] struct WorkspaceGitMetadata { associated_remote_urls: Option>, @@ -73,16 +75,31 @@ impl TurnMetadataBag { } } -fn merge_responsesapi_client_metadata( +fn merge_turn_metadata( header: &str, + turn_started_at_unix_ms: Option, responsesapi_client_metadata: Option<&HashMap>, ) -> Option { - let responsesapi_client_metadata = responsesapi_client_metadata?; + if turn_started_at_unix_ms.is_none() && responsesapi_client_metadata.is_none() { + return None; + } + let mut metadata = serde_json::from_str::>(header).ok()?; - for (key, value) in responsesapi_client_metadata { - metadata - .entry(key.clone()) - .or_insert_with(|| Value::String(value.clone())); + if let Some(turn_started_at_unix_ms) = turn_started_at_unix_ms { + metadata.insert( + TURN_STARTED_AT_UNIX_MS_KEY.to_string(), + Value::Number(turn_started_at_unix_ms.into()), + ); + } + if let Some(responsesapi_client_metadata) = responsesapi_client_metadata { + for (key, value) in responsesapi_client_metadata { + if key == TURN_STARTED_AT_UNIX_MS_KEY { + continue; + } + metadata + .entry(key.clone()) + .or_insert_with(|| Value::String(value.clone())); + } } serde_json::to_string(&metadata).ok() } @@ -153,6 +170,7 @@ pub(crate) struct TurnMetadataState { base_metadata: TurnMetadataBag, base_header: String, enriched_header: Arc>>, + turn_started_at_unix_ms: Arc>>, responsesapi_client_metadata: Arc>>>, enrichment_task: Arc>>>, } @@ -194,6 +212,7 @@ impl TurnMetadataState { base_metadata, base_header, enriched_header: Arc::new(RwLock::new(None)), + turn_started_at_unix_ms: Arc::new(RwLock::new(None)), responsesapi_client_metadata: Arc::new(RwLock::new(None)), enrichment_task: Arc::new(Mutex::new(None)), } @@ -211,13 +230,21 @@ impl TurnMetadataState { } else { self.base_header.clone() }; + let turn_started_at_unix_ms = *self + .turn_started_at_unix_ms + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); let responsesapi_client_metadata = self .responsesapi_client_metadata .read() .unwrap_or_else(std::sync::PoisonError::into_inner) .clone(); - merge_responsesapi_client_metadata(&header, responsesapi_client_metadata.as_ref()) - .or(Some(header)) + merge_turn_metadata( + &header, + turn_started_at_unix_ms, + responsesapi_client_metadata.as_ref(), + ) + .or(Some(header)) } pub(crate) fn current_meta_value(&self) -> Option { @@ -236,6 +263,13 @@ impl TurnMetadataState { Some(responsesapi_client_metadata); } + pub(crate) fn set_turn_started_at_unix_ms(&self, turn_started_at_unix_ms: i64) { + *self + .turn_started_at_unix_ms + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(turn_started_at_unix_ms); + } + pub(crate) fn spawn_git_enrichment_task(&self) { if self.repo_root.is_none() { return; diff --git a/codex-rs/core/src/turn_metadata_tests.rs b/codex-rs/core/src/turn_metadata_tests.rs index 0004633c415c..5201eb35605b 100644 --- a/codex-rs/core/src/turn_metadata_tests.rs +++ b/codex-rs/core/src/turn_metadata_tests.rs @@ -122,6 +122,58 @@ fn turn_metadata_state_classifies_subagent_thread_source() { assert!(json.get("session_source").is_none()); } +#[test] +fn turn_metadata_state_includes_turn_started_at_unix_ms_after_start() { + let temp_dir = TempDir::new().expect("temp dir"); + let cwd = temp_dir.path().abs(); + let permission_profile = PermissionProfile::read_only(); + + let state = TurnMetadataState::new( + "session-a".to_string(), + &SessionSource::Exec, + "turn-a".to_string(), + cwd, + &permission_profile, + WindowsSandboxLevel::Disabled, + /*enforce_managed_network*/ false, + ); + state.set_turn_started_at_unix_ms(/*turn_started_at_unix_ms*/ 1_700_000_000_123); + + let header = state.current_header_value().expect("header"); + let json: Value = serde_json::from_str(&header).expect("json"); + + assert_eq!( + json["turn_started_at_unix_ms"].as_i64(), + Some(1_700_000_000_123) + ); +} + +#[test] +fn turn_metadata_state_ignores_client_turn_started_at_unix_ms_before_start() { + let temp_dir = TempDir::new().expect("temp dir"); + let cwd = temp_dir.path().abs(); + let permission_profile = PermissionProfile::read_only(); + + let state = TurnMetadataState::new( + "session-a".to_string(), + &SessionSource::Exec, + "turn-a".to_string(), + cwd, + &permission_profile, + WindowsSandboxLevel::Disabled, + /*enforce_managed_network*/ false, + ); + state.set_responsesapi_client_metadata(HashMap::from([( + "turn_started_at_unix_ms".to_string(), + "client-supplied".to_string(), + )])); + + let header = state.current_header_value().expect("header"); + let json: Value = serde_json::from_str(&header).expect("json"); + + assert!(json.get("turn_started_at_unix_ms").is_none()); +} + #[test] fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields() { let temp_dir = TempDir::new().expect("temp dir"); @@ -141,7 +193,12 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields( ("fiber_run_id".to_string(), "fiber-123".to_string()), ("session_id".to_string(), "client-supplied".to_string()), ("thread_source".to_string(), "client-supplied".to_string()), + ( + "turn_started_at_unix_ms".to_string(), + "client-supplied".to_string(), + ), ])); + state.set_turn_started_at_unix_ms(/*turn_started_at_unix_ms*/ 1_700_000_000_123); let header = state.current_header_value().expect("header"); let json: Value = serde_json::from_str(&header).expect("json"); @@ -150,4 +207,8 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields( assert_eq!(json["session_id"].as_str(), Some("session-a")); assert_eq!(json["thread_source"].as_str(), Some("user")); assert_eq!(json["turn_id"].as_str(), Some("turn-a")); + assert_eq!( + json["turn_started_at_unix_ms"].as_i64(), + Some(1_700_000_000_123) + ); } diff --git a/codex-rs/core/src/turn_timing.rs b/codex-rs/core/src/turn_timing.rs index 214d6cafd943..d6bf37253f6e 100644 --- a/codex-rs/core/src/turn_timing.rs +++ b/codex-rs/core/src/turn_timing.rs @@ -53,12 +53,14 @@ struct TurnTimingStateInner { } impl TurnTimingState { - pub(crate) async fn mark_turn_started(&self, started_at: Instant) { + pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 { + let started_at_unix_ms = now_unix_timestamp_ms(); let mut state = self.state.lock().await; state.started_at = Some(started_at); - state.started_at_unix_secs = Some(now_unix_timestamp_secs()); + state.started_at_unix_secs = Some(started_at_unix_ms / 1000); state.first_token_at = None; state.first_message_at = None; + started_at_unix_ms } pub(crate) async fn started_at_unix_secs(&self) -> Option { @@ -102,10 +104,14 @@ impl TurnTimingState { } fn now_unix_timestamp_secs() -> i64 { + now_unix_timestamp_ms() / 1000 +} + +fn now_unix_timestamp_ms() -> i64 { let duration = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default(); - i64::try_from(duration.as_secs()).unwrap_or(i64::MAX) + i64::try_from(duration.as_millis()).unwrap_or(i64::MAX) } impl TurnTimingStateInner { diff --git a/codex-rs/core/src/turn_timing_tests.rs b/codex-rs/core/src/turn_timing_tests.rs index ffa366e59297..a6675aea8eb6 100644 --- a/codex-rs/core/src/turn_timing_tests.rs +++ b/codex-rs/core/src/turn_timing_tests.rs @@ -5,6 +5,8 @@ use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseItem; use pretty_assertions::assert_eq; use std::time::Instant; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use super::TurnTimingState; use super::response_item_records_turn_ttft; @@ -76,6 +78,27 @@ async fn turn_timing_state_records_ttfm_independently_of_ttft() { ); } +#[tokio::test] +async fn turn_timing_state_records_turn_started_epoch_millis() { + let state = TurnTimingState::default(); + let before = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after unix epoch") + .as_millis(); + + let started_at_unix_ms = state.mark_turn_started(Instant::now()).await; + + let after = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after unix epoch") + .as_millis(); + assert!(u128::try_from(started_at_unix_ms).is_ok_and(|ms| before <= ms && ms <= after)); + assert_eq!( + state.started_at_unix_secs().await, + Some(started_at_unix_ms / 1000) + ); +} + #[test] fn response_item_records_turn_ttft_for_first_output_signals() { assert!(response_item_records_turn_ttft( diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index f94b5edde672..56e98931163c 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -434,6 +434,14 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e() !initial_turn_id.is_empty(), "turn_id should not be empty in x-codex-turn-metadata" ); + let initial_turn_started_at_unix_ms = initial_parsed + .get("turn_started_at_unix_ms") + .and_then(serde_json::Value::as_i64) + .expect("turn_started_at_unix_ms should be present"); + assert!( + initial_turn_started_at_unix_ms > 0, + "turn_started_at_unix_ms should be positive" + ); assert_eq!( initial_parsed .get("sandbox") @@ -537,6 +545,22 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e() .get("turn_id") .and_then(serde_json::Value::as_str) .expect("second turn_id should be present"); + let first_turn_started_at_unix_ms = first_parsed + .get("turn_started_at_unix_ms") + .and_then(serde_json::Value::as_i64) + .expect("first turn_started_at_unix_ms should be present"); + let second_turn_started_at_unix_ms = second_parsed + .get("turn_started_at_unix_ms") + .and_then(serde_json::Value::as_i64) + .expect("second turn_started_at_unix_ms should be present"); + assert!( + first_turn_started_at_unix_ms > 0, + "first turn_started_at_unix_ms should be positive" + ); + assert_eq!( + first_turn_started_at_unix_ms, second_turn_started_at_unix_ms, + "requests in the same turn should share turn_started_at_unix_ms" + ); assert_eq!( first_parsed .get("thread_source") diff --git a/codex-rs/core/tests/suite/search_tool.rs b/codex-rs/core/tests/suite/search_tool.rs index 1794f2fd3fe2..859f53711085 100644 --- a/codex-rs/core/tests/suite/search_tool.rs +++ b/codex-rs/core/tests/suite/search_tool.rs @@ -604,6 +604,27 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() - .is_some_and(|turn_id| !turn_id.is_empty()), "apps tools/call should include turn metadata turn_id: {apps_tool_call:?}" ); + let mcp_turn_started_at_unix_ms = apps_tool_call + .pointer("/params/_meta/x-codex-turn-metadata/turn_started_at_unix_ms") + .and_then(Value::as_i64) + .expect("apps tools/call should include turn_started_at_unix_ms"); + assert!( + mcp_turn_started_at_unix_ms > 0, + "apps tools/call should include a positive turn_started_at_unix_ms: {apps_tool_call:?}" + ); + + let first_request_turn_metadata: Value = serde_json::from_str( + &requests[0] + .header("x-codex-turn-metadata") + .expect("first response request should include turn metadata"), + ) + .expect("first response request turn metadata should be valid JSON"); + assert_eq!( + first_request_turn_metadata + .get("turn_started_at_unix_ms") + .and_then(Value::as_i64), + Some(mcp_turn_started_at_unix_ms) + ); let first_request_body = requests[0].body_json(); let first_request_tools = tool_names(&first_request_body);