Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions codex-rs/core/src/mcp_tool_call_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,32 @@ async fn mcp_tool_call_request_meta_includes_turn_metadata_for_custom_server() {
);
}

#[tokio::test]
async fn mcp_tool_call_request_meta_includes_turn_started_at_unix_ms() {
let (_, turn_context) = make_session_and_context().await;
turn_context
.turn_metadata_state
.set_turn_started_at_unix_ms(/*turn_started_at_unix_ms*/ 1_700_000_000_123);

let meta = build_mcp_tool_call_request_meta(
&turn_context,
"custom_server",
"call-custom",
/*metadata*/ None,
)
.expect("custom servers should receive turn metadata");
let turn_metadata = meta
.get(crate::X_CODEX_TURN_METADATA_HEADER)
.expect("turn metadata should be present");

assert_eq!(
turn_metadata
.get("turn_started_at_unix_ms")
.and_then(serde_json::Value::as_i64),
Some(1_700_000_000_123)
);
}

#[tokio::test]
async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps_meta() {
let (_, turn_context) = make_session_and_context().await;
Expand Down
5 changes: 4 additions & 1 deletion codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,13 @@ impl Session {
let task_kind = task.kind();
let span_name = task.span_name();
let started_at = Instant::now();
turn_context
let turn_started_at_unix_ms = turn_context
.turn_timing_state
.mark_turn_started(started_at)
.await;
turn_context
.turn_metadata_state
.set_turn_started_at_unix_ms(turn_started_at_unix_ms);
let token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default();

let cancellation_token = CancellationToken::new();
Expand Down
50 changes: 42 additions & 8 deletions codex-rs/core/src/turn_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use codex_protocol::models::PermissionProfile;
use codex_protocol::protocol::SessionSource;
use codex_utils_absolute_path::AbsolutePathBuf;

const TURN_STARTED_AT_UNIX_MS_KEY: &str = "turn_started_at_unix_ms";

#[derive(Clone, Debug, Default)]
struct WorkspaceGitMetadata {
associated_remote_urls: Option<BTreeMap<String, String>>,
Expand Down Expand Up @@ -73,16 +75,31 @@ impl TurnMetadataBag {
}
}

fn merge_responsesapi_client_metadata(
fn merge_turn_metadata(
header: &str,
turn_started_at_unix_ms: Option<i64>,
responsesapi_client_metadata: Option<&HashMap<String, String>>,
) -> Option<String> {
let responsesapi_client_metadata = responsesapi_client_metadata?;
if turn_started_at_unix_ms.is_none() && responsesapi_client_metadata.is_none() {
return None;
}

let mut metadata = serde_json::from_str::<serde_json::Map<String, Value>>(header).ok()?;
for (key, value) in responsesapi_client_metadata {
metadata
.entry(key.clone())
.or_insert_with(|| Value::String(value.clone()));
if let Some(turn_started_at_unix_ms) = turn_started_at_unix_ms {
metadata.insert(
TURN_STARTED_AT_UNIX_MS_KEY.to_string(),
Value::Number(turn_started_at_unix_ms.into()),
);
}
if let Some(responsesapi_client_metadata) = responsesapi_client_metadata {
for (key, value) in responsesapi_client_metadata {
if key == TURN_STARTED_AT_UNIX_MS_KEY {
continue;
}
metadata
.entry(key.clone())
.or_insert_with(|| Value::String(value.clone()));
}
}
serde_json::to_string(&metadata).ok()
}
Expand Down Expand Up @@ -153,6 +170,7 @@ pub(crate) struct TurnMetadataState {
base_metadata: TurnMetadataBag,
base_header: String,
enriched_header: Arc<RwLock<Option<String>>>,
turn_started_at_unix_ms: Arc<RwLock<Option<i64>>>,
responsesapi_client_metadata: Arc<RwLock<Option<HashMap<String, String>>>>,
enrichment_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
Expand Down Expand Up @@ -194,6 +212,7 @@ impl TurnMetadataState {
base_metadata,
base_header,
enriched_header: Arc::new(RwLock::new(None)),
turn_started_at_unix_ms: Arc::new(RwLock::new(None)),
responsesapi_client_metadata: Arc::new(RwLock::new(None)),
enrichment_task: Arc::new(Mutex::new(None)),
}
Expand All @@ -211,13 +230,21 @@ impl TurnMetadataState {
} else {
self.base_header.clone()
};
let turn_started_at_unix_ms = *self
.turn_started_at_unix_ms
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let responsesapi_client_metadata = self
.responsesapi_client_metadata
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
merge_responsesapi_client_metadata(&header, responsesapi_client_metadata.as_ref())
.or(Some(header))
merge_turn_metadata(
&header,
turn_started_at_unix_ms,
responsesapi_client_metadata.as_ref(),
)
.or(Some(header))
}

pub(crate) fn current_meta_value(&self) -> Option<serde_json::Value> {
Expand All @@ -236,6 +263,13 @@ impl TurnMetadataState {
Some(responsesapi_client_metadata);
}

pub(crate) fn set_turn_started_at_unix_ms(&self, turn_started_at_unix_ms: i64) {
*self
.turn_started_at_unix_ms
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(turn_started_at_unix_ms);
}

pub(crate) fn spawn_git_enrichment_task(&self) {
if self.repo_root.is_none() {
return;
Expand Down
61 changes: 61 additions & 0 deletions codex-rs/core/src/turn_metadata_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,58 @@ fn turn_metadata_state_classifies_subagent_thread_source() {
assert!(json.get("session_source").is_none());
}

#[test]
fn turn_metadata_state_includes_turn_started_at_unix_ms_after_start() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().abs();
let permission_profile = PermissionProfile::read_only();

let state = TurnMetadataState::new(
"session-a".to_string(),
&SessionSource::Exec,
"turn-a".to_string(),
cwd,
&permission_profile,
WindowsSandboxLevel::Disabled,
/*enforce_managed_network*/ false,
);
state.set_turn_started_at_unix_ms(/*turn_started_at_unix_ms*/ 1_700_000_000_123);

let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");

assert_eq!(
json["turn_started_at_unix_ms"].as_i64(),
Some(1_700_000_000_123)
);
}

#[test]
fn turn_metadata_state_ignores_client_turn_started_at_unix_ms_before_start() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().abs();
let permission_profile = PermissionProfile::read_only();

let state = TurnMetadataState::new(
"session-a".to_string(),
&SessionSource::Exec,
"turn-a".to_string(),
cwd,
&permission_profile,
WindowsSandboxLevel::Disabled,
/*enforce_managed_network*/ false,
);
state.set_responsesapi_client_metadata(HashMap::from([(
"turn_started_at_unix_ms".to_string(),
"client-supplied".to_string(),
)]));

let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");

assert!(json.get("turn_started_at_unix_ms").is_none());
}

#[test]
fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields() {
let temp_dir = TempDir::new().expect("temp dir");
Expand All @@ -141,7 +193,12 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields(
("fiber_run_id".to_string(), "fiber-123".to_string()),
("session_id".to_string(), "client-supplied".to_string()),
("thread_source".to_string(), "client-supplied".to_string()),
(
"turn_started_at_unix_ms".to_string(),
"client-supplied".to_string(),
),
]));
state.set_turn_started_at_unix_ms(/*turn_started_at_unix_ms*/ 1_700_000_000_123);

let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
Expand All @@ -150,4 +207,8 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields(
assert_eq!(json["session_id"].as_str(), Some("session-a"));
assert_eq!(json["thread_source"].as_str(), Some("user"));
assert_eq!(json["turn_id"].as_str(), Some("turn-a"));
assert_eq!(
json["turn_started_at_unix_ms"].as_i64(),
Some(1_700_000_000_123)
);
}
12 changes: 9 additions & 3 deletions codex-rs/core/src/turn_timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ struct TurnTimingStateInner {
}

impl TurnTimingState {
pub(crate) async fn mark_turn_started(&self, started_at: Instant) {
pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 {
let started_at_unix_ms = now_unix_timestamp_ms();
let mut state = self.state.lock().await;
state.started_at = Some(started_at);
state.started_at_unix_secs = Some(now_unix_timestamp_secs());
state.started_at_unix_secs = Some(started_at_unix_ms / 1000);
state.first_token_at = None;
state.first_message_at = None;
started_at_unix_ms
}

pub(crate) async fn started_at_unix_secs(&self) -> Option<i64> {
Expand Down Expand Up @@ -102,10 +104,14 @@ impl TurnTimingState {
}

fn now_unix_timestamp_secs() -> i64 {
now_unix_timestamp_ms() / 1000
}

fn now_unix_timestamp_ms() -> i64 {
let duration = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
i64::try_from(duration.as_secs()).unwrap_or(i64::MAX)
i64::try_from(duration.as_millis()).unwrap_or(i64::MAX)
}

impl TurnTimingStateInner {
Expand Down
23 changes: 23 additions & 0 deletions codex-rs/core/src/turn_timing_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use super::TurnTimingState;
use super::response_item_records_turn_ttft;
Expand Down Expand Up @@ -76,6 +78,27 @@ async fn turn_timing_state_records_ttfm_independently_of_ttft() {
);
}

#[tokio::test]
async fn turn_timing_state_records_turn_started_epoch_millis() {
let state = TurnTimingState::default();
let before = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after unix epoch")
.as_millis();

let started_at_unix_ms = state.mark_turn_started(Instant::now()).await;

let after = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time should be after unix epoch")
.as_millis();
assert!(u128::try_from(started_at_unix_ms).is_ok_and(|ms| before <= ms && ms <= after));
assert_eq!(
state.started_at_unix_secs().await,
Some(started_at_unix_ms / 1000)
);
}

#[test]
fn response_item_records_turn_ttft_for_first_output_signals() {
assert!(response_item_records_turn_ttft(
Expand Down
24 changes: 24 additions & 0 deletions codex-rs/core/tests/responses_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,14 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
!initial_turn_id.is_empty(),
"turn_id should not be empty in x-codex-turn-metadata"
);
let initial_turn_started_at_unix_ms = initial_parsed
.get("turn_started_at_unix_ms")
.and_then(serde_json::Value::as_i64)
.expect("turn_started_at_unix_ms should be present");
assert!(
initial_turn_started_at_unix_ms > 0,
"turn_started_at_unix_ms should be positive"
);
assert_eq!(
initial_parsed
.get("sandbox")
Expand Down Expand Up @@ -537,6 +545,22 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
.get("turn_id")
.and_then(serde_json::Value::as_str)
.expect("second turn_id should be present");
let first_turn_started_at_unix_ms = first_parsed
.get("turn_started_at_unix_ms")
.and_then(serde_json::Value::as_i64)
.expect("first turn_started_at_unix_ms should be present");
let second_turn_started_at_unix_ms = second_parsed
.get("turn_started_at_unix_ms")
.and_then(serde_json::Value::as_i64)
.expect("second turn_started_at_unix_ms should be present");
assert!(
first_turn_started_at_unix_ms > 0,
"first turn_started_at_unix_ms should be positive"
);
assert_eq!(
first_turn_started_at_unix_ms, second_turn_started_at_unix_ms,
"requests in the same turn should share turn_started_at_unix_ms"
);
assert_eq!(
first_parsed
.get("thread_source")
Expand Down
21 changes: 21 additions & 0 deletions codex-rs/core/tests/suite/search_tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,27 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
.is_some_and(|turn_id| !turn_id.is_empty()),
"apps tools/call should include turn metadata turn_id: {apps_tool_call:?}"
);
let mcp_turn_started_at_unix_ms = apps_tool_call
.pointer("/params/_meta/x-codex-turn-metadata/turn_started_at_unix_ms")
.and_then(Value::as_i64)
.expect("apps tools/call should include turn_started_at_unix_ms");
assert!(
mcp_turn_started_at_unix_ms > 0,
"apps tools/call should include a positive turn_started_at_unix_ms: {apps_tool_call:?}"
);

let first_request_turn_metadata: Value = serde_json::from_str(
&requests[0]
.header("x-codex-turn-metadata")
.expect("first response request should include turn metadata"),
)
.expect("first response request turn metadata should be valid JSON");
assert_eq!(
first_request_turn_metadata
.get("turn_started_at_unix_ms")
.and_then(Value::as_i64),
Some(mcp_turn_started_at_unix_ms)
);

let first_request_body = requests[0].body_json();
let first_request_tools = tool_names(&first_request_body);
Expand Down
Loading