Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 54 additions & 16 deletions codex-rs/app-server/src/request_processors/thread_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2783,15 +2783,20 @@ impl ThreadRequestProcessor {
/*include_history*/ true,
)
.await?;
if source_thread.thread_id != existing_thread_id {
return Err(invalid_request(format!(
"cannot resume running thread {existing_thread_id} from source thread {}",
source_thread.thread_id
)));
}
Some((existing_thread_id, existing_thread, source_thread))
} else {
None
let source_thread = self
.read_stored_thread_for_resume(
&params.thread_id,
params.path.as_ref(),
Comment thread
etraut-openai marked this conversation as resolved.
/*include_history*/ true,
)
.await?;
let existing_thread_id = source_thread.thread_id;
match self.thread_manager.get_thread(existing_thread_id).await {
Ok(existing_thread) => Some((existing_thread_id, existing_thread, source_thread)),
Err(_) => None,
}
};

if let Some((existing_thread_id, existing_thread, source_thread)) = running_thread {
Expand All @@ -2808,6 +2813,48 @@ impl ThreadRequestProcessor {
active_path.display()
)));
}
let config_snapshot = existing_thread.config_snapshot().await;
let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot);
Comment thread
etraut-openai marked this conversation as resolved.
if !mismatch_details.is_empty() {
let has_subscribers = !self
.thread_state_manager
.subscribed_connection_ids(existing_thread_id)
.await
.is_empty();
let loaded_status = self
.thread_watch_manager
.loaded_status_for_thread(&existing_thread_id.to_string())
.await;
let is_running =
matches!(existing_thread.agent_status().await, AgentStatus::Running);

if !has_subscribers && matches!(loaded_status, ThreadStatus::Idle) && !is_running {
// A loaded idle thread is only a cache entry. Shut it down
// before removing it so cold resume cannot duplicate a
// thread that timed out during shutdown.
match wait_for_thread_shutdown(&existing_thread).await {
ThreadShutdownResult::Complete => {
self.thread_manager.remove_thread(&existing_thread_id).await;
self.finalize_thread_teardown(existing_thread_id).await;
return Ok(false);
}
ThreadShutdownResult::SubmitFailed => {
warn!("failed to submit Shutdown to thread {existing_thread_id}");
}
ThreadShutdownResult::TimedOut => {
warn!("thread {existing_thread_id} shutdown timed out");
}
}
}

// Preserve rejoin semantics when another client can still observe
// the loaded thread or shutdown did not complete.
tracing::warn!(
"thread/resume overrides ignored for loaded thread {}: {}",
existing_thread_id,
mismatch_details.join("; ")
);
}
let redact_resume_payloads =
should_redact_thread_resume_payloads(app_server_client_name.as_deref());
let history_items = source_thread
Expand Down Expand Up @@ -2837,15 +2884,6 @@ impl ThreadRequestProcessor {
)
.await?;

let config_snapshot = existing_thread.config_snapshot().await;
let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot);
if !mismatch_details.is_empty() {
tracing::warn!(
"thread/resume overrides ignored for running thread {}: {}",
existing_thread_id,
mismatch_details.join("; ")
);
}
let mut summary_source_thread = source_thread;
summary_source_thread.history = None;
let mut thread_summary = self.stored_thread_to_api_thread(
Expand Down
35 changes: 32 additions & 3 deletions codex-rs/app-server/tests/suite/v2/thread_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use codex_app_server_protocol::ThreadSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::TurnItemsView;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
Expand Down Expand Up @@ -1929,7 +1930,7 @@ async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is
}

#[tokio::test]
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
async fn thread_resume_defers_updated_at_until_turn_start() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
Expand Down Expand Up @@ -1973,6 +1974,33 @@ async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -
let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert_eq!(after_modified, rollout.before_modified);

let unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
thread_id: thread_id.clone(),
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)),
)
.await??;

let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: "not-a-valid-thread-id".to_string(),
path: Some(normalized_existing_path(&rollout.rollout_file_path)?),
cwd: Some(codex_home.path().to_string_lossy().to_string()),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { cwd, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(cwd, AbsolutePathBuf::from_absolute_path(codex_home.path())?);

let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id,
Expand Down Expand Up @@ -2291,9 +2319,10 @@ async fn thread_resume_rejects_mismatched_path_for_running_thread_id() -> Result
)
.await??;

let stale_path = rollout_path(codex_home.path(), "2025-01-01T00-00-00", &thread_id);
let stale_thread_id = Uuid::new_v4().to_string();
let stale_path = rollout_path(codex_home.path(), "2025-01-01T00-00-00", &stale_thread_id);
std::fs::create_dir_all(stale_path.parent().expect("stale path parent"))?;
let thread_uuid = Uuid::parse_str(&thread_id)?;
let thread_uuid = Uuid::parse_str(&stale_thread_id)?;
let mut stale_file = std::fs::File::create(&stale_path)?;
let stale_meta = json!({
"timestamp": "2025-01-01T00:00:00Z",
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Resu
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id,
cwd: Some(codex_home.path().to_string_lossy().to_string()),
..Default::default()
})
.await?;
Expand Down
Loading