diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 1766c971b1e..19e811cedc5 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -2783,15 +2783,20 @@ impl ThreadRequestProcessor { /*include_history*/ true, ) .await?; - if source_thread.thread_id != existing_thread_id { - return Err(invalid_request(format!( - "cannot resume running thread {existing_thread_id} from source thread {}", - source_thread.thread_id - ))); - } Some((existing_thread_id, existing_thread, source_thread)) } else { - None + let source_thread = self + .read_stored_thread_for_resume( + ¶ms.thread_id, + params.path.as_ref(), + /*include_history*/ true, + ) + .await?; + let existing_thread_id = source_thread.thread_id; + match self.thread_manager.get_thread(existing_thread_id).await { + Ok(existing_thread) => Some((existing_thread_id, existing_thread, source_thread)), + Err(_) => None, + } }; if let Some((existing_thread_id, existing_thread, source_thread)) = running_thread { @@ -2808,6 +2813,48 @@ impl ThreadRequestProcessor { active_path.display() ))); } + let config_snapshot = existing_thread.config_snapshot().await; + let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot); + if !mismatch_details.is_empty() { + let has_subscribers = !self + .thread_state_manager + .subscribed_connection_ids(existing_thread_id) + .await + .is_empty(); + let loaded_status = self + .thread_watch_manager + .loaded_status_for_thread(&existing_thread_id.to_string()) + .await; + let is_running = + matches!(existing_thread.agent_status().await, AgentStatus::Running); + + if !has_subscribers && matches!(loaded_status, ThreadStatus::Idle) && !is_running { + // A loaded idle thread is only a cache entry. Shut it down + // before removing it so cold resume cannot duplicate a + // thread that timed out during shutdown. + match wait_for_thread_shutdown(&existing_thread).await { + ThreadShutdownResult::Complete => { + self.thread_manager.remove_thread(&existing_thread_id).await; + self.finalize_thread_teardown(existing_thread_id).await; + return Ok(false); + } + ThreadShutdownResult::SubmitFailed => { + warn!("failed to submit Shutdown to thread {existing_thread_id}"); + } + ThreadShutdownResult::TimedOut => { + warn!("thread {existing_thread_id} shutdown timed out"); + } + } + } + + // Preserve rejoin semantics when another client can still observe + // the loaded thread or shutdown did not complete. + tracing::warn!( + "thread/resume overrides ignored for loaded thread {}: {}", + existing_thread_id, + mismatch_details.join("; ") + ); + } let redact_resume_payloads = should_redact_thread_resume_payloads(app_server_client_name.as_deref()); let history_items = source_thread @@ -2837,15 +2884,6 @@ impl ThreadRequestProcessor { ) .await?; - let config_snapshot = existing_thread.config_snapshot().await; - let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot); - if !mismatch_details.is_empty() { - tracing::warn!( - "thread/resume overrides ignored for running thread {}: {}", - existing_thread_id, - mismatch_details.join("; ") - ); - } let mut summary_source_thread = source_thread; summary_source_thread.history = None; let mut thread_summary = self.stored_thread_to_api_thread( diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 6416499663a..8e60c90a011 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -43,6 +43,7 @@ use codex_app_server_protocol::ThreadSource; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadUnsubscribeParams; use codex_app_server_protocol::TurnItemsView; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; @@ -1929,7 +1930,7 @@ async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is } #[tokio::test] -async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> { +async fn thread_resume_defers_updated_at_until_turn_start() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?; @@ -1973,6 +1974,33 @@ async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() - let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; assert_eq!(after_modified, rollout.before_modified); + let unsubscribe_id = mcp + .send_thread_unsubscribe_request(ThreadUnsubscribeParams { + thread_id: thread_id.clone(), + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)), + ) + .await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: "not-a-valid-thread-id".to_string(), + path: Some(normalized_existing_path(&rollout.rollout_file_path)?), + cwd: Some(codex_home.path().to_string_lossy().to_string()), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { cwd, .. } = to_response::(resume_resp)?; + assert_eq!(cwd, AbsolutePathBuf::from_absolute_path(codex_home.path())?); + let turn_id = mcp .send_turn_start_request(TurnStartParams { thread_id, @@ -2291,9 +2319,10 @@ async fn thread_resume_rejects_mismatched_path_for_running_thread_id() -> Result ) .await??; - let stale_path = rollout_path(codex_home.path(), "2025-01-01T00-00-00", &thread_id); + let stale_thread_id = Uuid::new_v4().to_string(); + let stale_path = rollout_path(codex_home.path(), "2025-01-01T00-00-00", &stale_thread_id); std::fs::create_dir_all(stale_path.parent().expect("stale path parent"))?; - let thread_uuid = Uuid::parse_str(&thread_id)?; + let thread_uuid = Uuid::parse_str(&stale_thread_id)?; let mut stale_file = std::fs::File::create(&stale_path)?; let stale_meta = json!({ "timestamp": "2025-01-01T00:00:00Z", diff --git a/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs b/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs index c0188add8ce..45437b7ee11 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs @@ -317,6 +317,7 @@ async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Resu let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id, + cwd: Some(codex_home.path().to_string_lossy().to_string()), ..Default::default() }) .await?;