Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ use codex_thread_store::ThreadMetadataPatch as StoreThreadMetadataPatch;
use codex_thread_store::ThreadSortKey as StoreThreadSortKey;
use codex_thread_store::ThreadStore;
use codex_thread_store::ThreadStoreError;
use codex_thread_store::UpdateThreadMetadataParams as StoreUpdateThreadMetadataParams;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use std::collections::HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl ExternalAgentConfigRequestProcessor {
.thread
.update_thread_metadata(
ThreadMetadataPatch {
name: Some(name),
name: Some(Some(name)),
..Default::default()
},
/*include_archived*/ false,
Expand Down
78 changes: 22 additions & 56 deletions codex-rs/app-server/src/request_processors/thread_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1417,17 +1417,17 @@ impl ThreadRequestProcessor {
};

let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_store
.update_thread_metadata(StoreUpdateThreadMetadataParams {
self.thread_manager
.update_thread_metadata(
thread_id,
patch: StoreThreadMetadataPatch {
name: Some(name.clone()),
StoreThreadMetadataPatch {
name: Some(Some(name.clone())),
..Default::default()
},
include_archived: false,
})
/*include_archived*/ false,
)
.await
.map_err(|err| thread_store_write_error("set thread name", err))?;
.map_err(|err| core_thread_write_error("set thread name", err))?;

Ok((
ThreadSetNameResponse {},
Expand All @@ -1446,33 +1446,17 @@ impl ThreadRequestProcessor {
let thread_id = ThreadId::from_string(&thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

if let Ok(thread) = self.thread_manager.get_thread(thread_id).await {
if thread.config_snapshot().await.ephemeral {
return Err(invalid_request(format!(
"ephemeral thread does not support memory mode updates: {thread_id}"
)));
}

thread
.set_thread_memory_mode(mode.to_core())
.await
.map_err(|err| {
internal_error(format!("failed to set thread memory mode: {err}"))
})?;
return Ok(ThreadMemoryModeSetResponse {});
}

self.thread_store
.update_thread_metadata(StoreUpdateThreadMetadataParams {
self.thread_manager
.update_thread_metadata(
thread_id,
patch: StoreThreadMetadataPatch {
StoreThreadMetadataPatch {
memory_mode: Some(mode.to_core()),
..Default::default()
},
include_archived: false,
})
/*include_archived*/ false,
)
.await
.map_err(|err| thread_store_write_error("set thread memory mode", err))?;
.map_err(|err| core_thread_write_error("set thread memory mode", err))?;

Ok(ThreadMemoryModeSetResponse {})
}
Expand Down Expand Up @@ -1538,35 +1522,19 @@ impl ThreadRequestProcessor {
..Default::default()
};

let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let updated_thread = {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
if let Some(loaded_thread) = loaded_thread.as_ref() {
if loaded_thread.config_snapshot().await.ephemeral {
return Err(invalid_request(format!(
"ephemeral thread does not support metadata updates: {thread_id}"
)));
}
loaded_thread
.update_thread_metadata(patch, /*include_archived*/ true)
.await
} else {
self.thread_store
.update_thread_metadata(StoreUpdateThreadMetadataParams {
thread_id: thread_uuid,
patch,
include_archived: true,
})
.await
}
.map_err(|err| thread_store_write_error("update thread metadata", err))?
self.thread_manager
.update_thread_metadata(thread_uuid, patch, /*include_archived*/ true)
.await
.map_err(|err| core_thread_write_error("update thread metadata", err))?
};
let (mut thread, _) = thread_from_stored_thread(
updated_thread,
self.config.model_provider_id.as_str(),
&self.config.cwd,
);
if let Some(loaded_thread) = loaded_thread.as_ref() {
if let Ok(loaded_thread) = self.thread_manager.get_thread(thread_uuid).await {
thread.session_id = loaded_thread.session_configured().session_id.to_string();
}
self.attach_thread_name(thread_uuid, &mut thread).await;
Expand Down Expand Up @@ -3694,15 +3662,13 @@ fn conversation_summary_rollout_path_read_error(
}
}

fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError {
fn core_thread_write_error(operation: &str, err: CodexErr) -> JSONRPCErrorError {
match err {
ThreadStoreError::ThreadNotFound { thread_id } => {
CodexErr::ThreadNotFound(thread_id) => {
invalid_request(format!("thread not found: {thread_id}"))
}
ThreadStoreError::InvalidRequest { message } => invalid_request(message),
ThreadStoreError::Unsupported { operation } => {
unsupported_thread_store_operation(operation)
}
CodexErr::InvalidRequest(message) => invalid_request(message),
CodexErr::UnsupportedOperation(message) => method_not_found(message),
err => internal_error(format!("failed to {operation}: {err}")),
}
}
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/app-server/tests/suite/v2/thread_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ async fn seed_pathless_store_thread(
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
name: Some("named pathless thread".to_string()),
name: Some(Some("named pathless thread".to_string())),
..Default::default()
},
include_archived: true,
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/app-server/tests/suite/v2/thread_unarchive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async fn thread_unarchive_preserves_pathless_store_metadata() -> Result<()> {
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
name: Some("named pathless thread".to_string()),
name: Some(Some("named pathless thread".to_string())),
..Default::default()
},
include_archived: true,
Expand Down
19 changes: 16 additions & 3 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2866,12 +2866,21 @@ impl Session {
&self,
turn_context: &TurnContext,
token_usage: Option<&TokenUsage>,
) {
self.record_token_usage_info(turn_context, token_usage)
.await;
self.send_token_count_event(turn_context).await;
}

pub(crate) async fn record_token_usage_info(
&self,
turn_context: &TurnContext,
token_usage: Option<&TokenUsage>,
) {
if let Some(token_usage) = token_usage {
let mut state = self.state.lock().await;
state.update_token_info_from_usage(token_usage, turn_context.model_context_window());
}
self.send_token_count_event(turn_context).await;
}

pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) {
Expand Down Expand Up @@ -2912,11 +2921,15 @@ impl Session {
turn_context: &TurnContext,
new_rate_limits: RateLimitSnapshot,
) {
self.record_rate_limits_info(new_rate_limits).await;
self.send_token_count_event(turn_context).await;
}

pub(crate) async fn record_rate_limits_info(&self, new_rate_limits: RateLimitSnapshot) {
{
let mut state = self.state.lock().await;
state.set_rate_limits(new_rate_limits);
}
self.send_token_count_event(turn_context).await;
}

pub(crate) async fn mcp_dependency_prompted(&self) -> HashSet<String> {
Expand Down Expand Up @@ -2947,7 +2960,7 @@ impl Session {
state.set_server_reasoning_included(included);
}

async fn send_token_count_event(&self, turn_context: &TurnContext) {
pub(crate) async fn send_token_count_event(&self, turn_context: &TurnContext) {
let (info, rate_limits) = {
let state = self.state.lock().await;
state.token_info_and_rate_limits()
Expand Down
15 changes: 13 additions & 2 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1874,6 +1874,7 @@ async fn try_run_sampling_request(
Box<dyn ToolArgumentDiffConsumer>,
)> = None;
let mut should_emit_turn_diff = false;
let mut should_emit_token_count = false;
let reasoning_effort = turn_context.effective_reasoning_effort_for_tracing();
let plan_mode = turn_context.collaboration_mode.mode == ModeKind::Plan;
let mut assistant_message_stream_parsers = AssistantMessageStreamParsers::new(plan_mode);
Expand Down Expand Up @@ -2100,7 +2101,8 @@ async fn try_run_sampling_request(
ResponseEvent::RateLimits(snapshot) => {
// Update internal state with latest rate limits, but defer sending until
// token usage is available to avoid duplicate TokenCount events.
sess.update_rate_limits(&turn_context, snapshot).await;
sess.record_rate_limits_info(snapshot).await;
should_emit_token_count = true;
}
ResponseEvent::ModelsEtag(etag) => {
// Update internal state with latest models etag
Expand All @@ -2118,8 +2120,9 @@ async fn try_run_sampling_request(
&mut assistant_message_stream_parsers,
)
.await;
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
sess.record_token_usage_info(&turn_context, token_usage.as_ref())
.await;
should_emit_token_count = true;
should_emit_turn_diff = true;
if let Some(false) = end_turn {
needs_follow_up = true;
Expand Down Expand Up @@ -2247,6 +2250,14 @@ async fn try_run_sampling_request(

drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;

if should_emit_token_count {
// A tool call such as request_user_input can intentionally pause the turn. Emit token
// counts only after pending tools resolve so clients do not see progress events while the
// turn is waiting on the user. This also needs to happen before returning cancellation so
// token usage already recorded from the completed response is still persisted.
sess.send_token_count_event(&turn_context).await;
}

if cancellation_token.is_cancelled() {
return Err(CodexErr::TurnAborted);
}
Expand Down
53 changes: 53 additions & 0 deletions codex-rs/core/src/thread_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ use codex_thread_store::LocalThreadStoreConfig;
use codex_thread_store::ReadThreadByRolloutPathParams;
use codex_thread_store::ReadThreadParams;
use codex_thread_store::StoredThread;
use codex_thread_store::ThreadMetadataPatch;
use codex_thread_store::ThreadStore;
use codex_thread_store::ThreadStoreError;
use codex_thread_store::UpdateThreadMetadataParams;
use codex_utils_absolute_path::AbsolutePathBuf;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
Expand Down Expand Up @@ -456,6 +458,44 @@ impl ThreadManager {
self.state.get_thread(thread_id).await
}

/// Updates metadata for loaded and cold threads through one entrypoint.
///
/// Loaded threads route through `CodexThread`/`LiveThread`, so metadata changes stay ordered
/// with live rollout writes. Cold threads go directly to the store, which owns unloaded JSONL
/// compatibility and SQLite metadata updates.
pub async fn update_thread_metadata(
&self,
thread_id: ThreadId,
patch: ThreadMetadataPatch,
include_archived: bool,
) -> CodexResult<StoredThread> {
if let Ok(thread) = self.get_thread(thread_id).await {
if thread.config_snapshot().await.ephemeral {
return Err(CodexErr::InvalidRequest(format!(
"ephemeral thread does not support metadata updates: {thread_id}"
)));
}
return thread
.update_thread_metadata(patch, include_archived)
.await
.map_err(|err| thread_store_metadata_update_error(thread_id, err));
}
self.state
.thread_store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch,
include_archived,
})
.await
.map_err(|err| match err {
ThreadStoreError::ThreadNotFound { thread_id } => {
CodexErr::ThreadNotFound(thread_id)
}
err => thread_store_metadata_update_error(thread_id, err),
})
}

/// List `thread_id` plus all known descendants in its spawn subtree.
pub async fn list_agent_subtree_thread_ids(
&self,
Expand Down Expand Up @@ -1268,6 +1308,19 @@ fn thread_store_rollout_read_error(err: ThreadStoreError) -> CodexErr {
}
}

fn thread_store_metadata_update_error(thread_id: ThreadId, err: ThreadStoreError) -> CodexErr {
match err {
ThreadStoreError::ThreadNotFound { thread_id } => CodexErr::ThreadNotFound(thread_id),
ThreadStoreError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
ThreadStoreError::Unsupported { operation } => CodexErr::UnsupportedOperation(format!(
"thread metadata update is not supported by this store: {operation}"
)),
err => CodexErr::Fatal(format!(
"failed to update thread metadata {thread_id}: {err}"
)),
}
}

/// Return a fork snapshot cut strictly before the nth user message (0-based).
///
/// Out-of-range values keep the full committed history at a turn boundary, but
Expand Down
32 changes: 0 additions & 32 deletions codex-rs/core/tests/suite/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2497,38 +2497,6 @@ async fn token_count_includes_rate_limits_snapshot() {
.await
.unwrap();

let first_token_event =
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TokenCount(_))).await;
let rate_limit_only = match first_token_event {
EventMsg::TokenCount(ev) => ev,
_ => unreachable!(),
};

let rate_limit_json = serde_json::to_value(&rate_limit_only).unwrap();
pretty_assertions::assert_eq!(
rate_limit_json,
json!({
"info": null,
"rate_limits": {
"limit_id": "codex",
"limit_name": null,
"primary": {
"used_percent": 12.5,
"window_minutes": 10,
"resets_at": 1704069000
},
"secondary": {
"used_percent": 40.0,
"window_minutes": 60,
"resets_at": 1704074400
},
"credits": null,
"plan_type": null,
"rate_limit_reached_type": null
}
})
);

let token_event = wait_for_event(
&codex,
|msg| matches!(msg, EventMsg::TokenCount(ev) if ev.info.is_some()),
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/tests/suite/otel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ async fn handle_response_item_records_tool_result_for_custom_tool_call() {
.await
.unwrap();

wait_for_event(&codex, |ev| matches!(ev, EventMsg::TokenCount(_))).await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;

logs_assert(|lines: &[&str]| {
let line = lines
Expand Down
Loading
Loading