Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion codex-rs/app-server-protocol/src/protocol/thread_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ impl ThreadHistoryBuilder {
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::ForkReference(_)
| RolloutItem::RolloutReference(_)
| RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_) => {}
}
}

Expand Down
64 changes: 48 additions & 16 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4039,7 +4039,12 @@ impl CodexMessageProcessor {
let (mut thread, history) =
thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd);
if include_turns && let Some(history) = history {
thread.turns = build_turns_from_rollout_items(&history.items);
let items = codex_core::materialize_rollout_items_for_replay(
&self.config.codex_home,
&history.items,
)
.await;
thread.turns = build_turns_from_rollout_items(&items);
}
Ok(Some(thread))
}
Expand Down Expand Up @@ -4104,7 +4109,12 @@ impl CodexMessageProcessor {
.load_history(/*include_archived*/ true)
.await
.map_err(|err| thread_read_history_load_error(thread_id, err))?;
thread.turns = build_turns_from_rollout_items(&history.items);
let items = codex_core::materialize_rollout_items_for_replay(
&self.config.codex_home,
&history.items,
)
.await;
thread.turns = build_turns_from_rollout_items(&items);
}

Ok(())
Expand Down Expand Up @@ -4137,6 +4147,8 @@ impl CodexMessageProcessor {
.load_thread_turns_list_history(thread_uuid)
.await
.map_err(thread_read_view_error)?;
let items =
codex_core::materialize_rollout_items_for_replay(&self.config.codex_home, &items).await;
// This API optimizes network transfer by letting clients page through a
// thread's turns incrementally, but it still replays the entire rollout on
// every request. Rollback and compaction events can change earlier turns, so
Expand Down Expand Up @@ -4815,12 +4827,20 @@ impl CodexMessageProcessor {
) -> std::result::Result<Thread, String> {
let (mut thread, history) =
thread_from_stored_thread(stored_thread, fallback_provider, &self.config.cwd);
if include_turns && let Some(history) = history {
populate_thread_turns_from_history(
&mut thread,
if let Some(history) = history {
let history_items = codex_core::materialize_rollout_items_for_replay(
self.config.codex_home.as_path(),
&history.items,
/*active_turn*/ None,
)?;
)
.await;
thread.preview = preview_from_rollout_items(&history_items);
if include_turns {
populate_thread_turns_from_history(
&mut thread,
&history_items,
/*active_turn*/ None,
)?;
}
}
Ok(thread)
}
Expand Down Expand Up @@ -4925,7 +4945,11 @@ impl CodexMessageProcessor {
thread.id = thread_id.to_string();
thread.path = Some(rollout_path.to_path_buf());
if include_turns {
let history_items = thread_history.get_rollout_items();
let history_items = codex_core::materialize_rollout_items_for_replay(
self.config.codex_home.as_path(),
&thread_history.get_rollout_items(),
)
.await;
populate_thread_turns_from_history(
&mut thread,
&history_items,
Expand Down Expand Up @@ -5079,7 +5103,10 @@ impl CodexMessageProcessor {
let mut thread =
if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() {
let stored_thread = self
.read_stored_thread_for_new_fork(thread_id, include_turns)
// The forked rollout may contain a compact ForkReference. Load history
// even when excludeTurns is set so preview generation can materialize the
// referenced source rollout without returning turns.
.read_stored_thread_for_new_fork(thread_id, /*include_history*/ true)
.await?;
self.stored_thread_to_api_thread(
stored_thread,
Expand Down Expand Up @@ -8386,7 +8413,7 @@ async fn handle_thread_listener_command(
async fn handle_pending_thread_resume_request(
conversation_id: ThreadId,
conversation: &Arc<CodexThread>,
_codex_home: &Path,
codex_home: &Path,
thread_state_manager: &ThreadStateManager,
thread_state: &Arc<Mutex<ThreadState>>,
thread_watch_manager: &ThreadWatchManager,
Expand Down Expand Up @@ -8415,12 +8442,14 @@ async fn handle_pending_thread_resume_request(
let request_id = pending.request_id;
let connection_id = request_id.connection_id;
let mut thread = pending.thread_summary;
let history_items = if pending.include_turns {
codex_core::materialize_rollout_items_for_replay(codex_home, &pending.history_items).await
} else {
Vec::new()
};
if pending.include_turns
&& let Err(message) = populate_thread_turns_from_history(
&mut thread,
&pending.history_items,
active_turn.as_ref(),
)
&& let Err(message) =
populate_thread_turns_from_history(&mut thread, &history_items, active_turn.as_ref())
{
outgoing
.send_error(request_id, internal_error(message))
Expand Down Expand Up @@ -8508,7 +8537,7 @@ async fn handle_pending_thread_resume_request(
// paying the cost of turn reconstruction for historical usage replay.
if let Some(token_usage_thread) = token_usage_thread {
let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items(
&pending.history_items,
&history_items,
token_usage_thread.turns.as_slice(),
);
// Rejoining a loaded thread has the same UI contract as a cold resume, but
Expand Down Expand Up @@ -10840,6 +10869,7 @@ mod tests {

let session_meta = SessionMeta {
id: conversation_id,
segment_id: None,
timestamp: timestamp.clone(),
model_provider: None,
..SessionMeta::default()
Expand Down Expand Up @@ -10896,6 +10926,7 @@ mod tests {

let session_meta = SessionMeta {
id: conversation_id,
segment_id: None,
timestamp: timestamp.clone(),
source: SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
Expand Down Expand Up @@ -10944,6 +10975,7 @@ mod tests {

let session_meta = SessionMeta {
id: conversation_id,
segment_id: None,
forked_from_id: Some(forked_from_id),
timestamp: timestamp.clone(),
model_provider: Some("test-provider".to_string()),
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/app-server/tests/common/rollout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub fn create_fake_rollout_with_source(
// Build JSONL lines
let meta = SessionMeta {
id: conversation_id,
segment_id: None,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
Expand Down Expand Up @@ -215,6 +216,7 @@ pub fn create_fake_rollout_with_text_elements(
// Build JSONL lines
let meta = SessionMeta {
id: conversation_id,
segment_id: None,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
Expand Down
121 changes: 121 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_read.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
Expand Down Expand Up @@ -47,6 +48,7 @@ use codex_feedback::CodexFeedback;
use codex_protocol::models::BaseInstructions;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutReferenceItem;
use codex_protocol::protocol::SessionSource as ProtocolSessionSource;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::UserMessageEvent;
Expand Down Expand Up @@ -421,6 +423,125 @@ async fn thread_read_loaded_include_turns_reads_store_history_without_rollout_pa
Ok(())
}

#[tokio::test]
async fn thread_read_unloaded_include_turns_materializes_rollout_reference() -> Result<()> {
let codex_home = TempDir::new()?;
let referenced_thread_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
"history before segment rotation",
Some("mock_provider"),
/*git_info*/ None,
)?;
let referenced_rollout_path = rollout_path(
codex_home.path(),
"2025-01-05T12-00-00",
&referenced_thread_id,
);

let thread_id = codex_protocol::ThreadId::from_string("00000000-0000-4000-8000-000000000125")?;
let store_id = Uuid::new_v4().to_string();
create_config_toml_with_thread_store(codex_home.path(), &store_id)?;
let store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };
store
.create_thread(CreateThreadParams {
thread_id,
forked_from_id: None,
source: ProtocolSessionSource::Cli,
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
metadata: ThreadPersistenceMetadata {
cwd: None,
model_provider: "test-provider".to_string(),
memory_mode: ThreadMemoryMode::Disabled,
},
event_persistence_mode: ThreadEventPersistenceMode::default(),
})
.await?;
store
.append_items(AppendThreadItemsParams {
thread_id,
items: vec![
RolloutItem::RolloutReference(RolloutReferenceItem {
rollout_path: referenced_rollout_path,
thread_id: Some(codex_protocol::ThreadId::from_string(
&referenced_thread_id,
)?),
rollout_timestamp: None,
segment_id: None,
max_depth: 2,
}),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "history after segment rotation".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
})),
],
})
.await?;

let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
let client = in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli.into(),
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: true,
..Default::default()
}),
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;

let result = client
.request(ClientRequest::ThreadRead {
request_id: RequestId::Integer(1),
params: ThreadReadParams {
thread_id: thread_id.to_string(),
include_turns: true,
},
})
.await?
.expect("thread/read should succeed");
let ThreadReadResponse { thread, .. } = serde_json::from_value(result)?;

assert_eq!(
turn_user_texts(&thread.turns),
vec![
"history before segment rotation",
"history after segment rotation",
]
);

client.shutdown().await?;
Ok(())
}

#[tokio::test]
async fn thread_list_includes_store_thread_without_rollout_path() -> Result<()> {
let codex_home = TempDir::new()?;
Expand Down
Loading
Loading