From e5a237c9973b6eea8503e2b245418870d0106ce5 Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Tue, 28 Oct 2025 15:32:10 -0700 Subject: [PATCH] [codex][app-server] resume conversation from history --- codex-rs/app-server-protocol/src/protocol.rs | 8 +- .../app-server/src/codex_message_processor.rs | 141 +++++++++++----- .../app-server/tests/suite/list_resume.rs | 154 +++++++++++++++++- codex-rs/core/src/conversation_manager.rs | 10 ++ codex-rs/core/src/rollout/recorder.rs | 2 +- 5 files changed, 263 insertions(+), 52 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol.rs b/codex-rs/app-server-protocol/src/protocol.rs index 496bc81009..7ade5f4e3b 100644 --- a/codex-rs/app-server-protocol/src/protocol.rs +++ b/codex-rs/app-server-protocol/src/protocol.rs @@ -11,6 +11,7 @@ use codex_protocol::config_types::ReasoningEffort; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::SandboxMode; use codex_protocol::config_types::Verbosity; +use codex_protocol::models::ResponseItem; use codex_protocol::parse_command::ParsedCommand; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::EventMsg; @@ -499,12 +500,15 @@ pub struct LogoutAccountResponse {} #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] pub struct ResumeConversationParams { - /// Absolute path to the rollout JSONL file. If omitted, `conversationId` must be provided. + /// Absolute path to the rollout JSONL file, when explicitly resuming a known rollout. #[serde(skip_serializing_if = "Option::is_none")] pub path: Option, - /// If the rollout path is not known, it can be discovered via the conversation id at at the cost of extra latency. + /// If the rollout path is not known, it can be discovered via the conversation id at the cost of extra latency. #[serde(skip_serializing_if = "Option::is_none")] pub conversation_id: Option, + /// if the rollout path or conversation id is not known, it can be resumed from given history + #[serde(skip_serializing_if = "Option::is_none")] + pub history: Option>, /// Optional overrides to apply when spawning the resumed session. #[serde(skip_serializing_if = "Option::is_none")] pub overrides: Option, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 9edb73c5d1..fabdf03c89 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -64,6 +64,7 @@ use codex_core::CodexConversation; use codex_core::ConversationManager; use codex_core::Cursor as RolloutCursor; use codex_core::INTERACTIVE_SESSION_SOURCES; +use codex_core::InitialHistory; use codex_core::NewConversation; use codex_core::RolloutRecorder; use codex_core::SessionMeta; @@ -79,6 +80,7 @@ use codex_core::config_edit::persist_overrides_and_clear_if_none; use codex_core::default_client::get_codex_user_agent; use codex_core::exec::ExecParams; use codex_core::exec_env::create_env; +use codex_core::find_conversation_path_by_id_str; use codex_core::get_platform_sandbox; use codex_core::git_info::git_diff_to_remote; use codex_core::protocol::ApplyPatchApprovalRequestEvent; @@ -97,6 +99,7 @@ use codex_protocol::config_types::ForcedLoginMethod; use codex_protocol::items::TurnItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::RateLimitSnapshot; +use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::USER_MESSAGE_BEGIN; use codex_protocol::user_input::UserInput as CoreInputItem; use codex_utils_json_to_toml::json_to_toml; @@ -1015,45 +1018,15 @@ impl CodexMessageProcessor { request_id: RequestId, params: ResumeConversationParams, ) { - let path = match params { - ResumeConversationParams { - path: Some(path), .. - } => path, - ResumeConversationParams { - conversation_id: Some(conversation_id), - .. - } => { - match codex_core::find_conversation_path_by_id_str( - &self.config.codex_home, - &conversation_id.to_string(), - ) - .await - { - Ok(Some(p)) => p, - _ => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "unable to locate rollout path".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - } - } - _ => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "either path or conversation id must be provided".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let ResumeConversationParams { + path, + conversation_id, + history, + overrides, + } = params; // Derive a Config using the same logic as new conversation, honoring overrides if provided. - let config = match params.overrides { + let config = match overrides { Some(overrides) => { derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone()).await } @@ -1062,19 +1035,90 @@ impl CodexMessageProcessor { let config = match config { Ok(cfg) => cfg, Err(err) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("error deriving config: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; + self.send_invalid_request_error( + request_id, + format!("error deriving config: {err}"), + ) + .await; return; } }; + let conversation_history = if let Some(path) = path { + match RolloutRecorder::get_rollout_history(&path).await { + Ok(initial_history) => initial_history, + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to load rollout `{}`: {err}", path.display()), + ) + .await; + return; + } + } + } else if let Some(conversation_id) = conversation_id { + match find_conversation_path_by_id_str( + &self.config.codex_home, + &conversation_id.to_string(), + ) + .await + { + Ok(Some(found_path)) => { + match RolloutRecorder::get_rollout_history(&found_path).await { + Ok(initial_history) => initial_history, + Err(err) => { + self.send_invalid_request_error( + request_id, + format!( + "failed to load rollout `{}` for conversation {conversation_id}: {err}", + found_path.display() + ), + ).await; + return; + } + } + } + Ok(None) => { + self.send_invalid_request_error( + request_id, + format!("no rollout found for conversation id {conversation_id}"), + ) + .await; + return; + } + Err(err) => { + self.send_invalid_request_error( + request_id, + format!("failed to locate conversation id {conversation_id}: {err}"), + ) + .await; + return; + } + } + } else { + match history { + Some(history) if !history.is_empty() => InitialHistory::Forked( + history.into_iter().map(RolloutItem::ResponseItem).collect(), + ), + Some(_) | None => { + self.send_invalid_request_error( + request_id, + "either path, conversation id or non empty history must be provided" + .to_string(), + ) + .await; + return; + } + } + }; + match self .conversation_manager - .resume_conversation_from_rollout(config, path.clone(), self.auth_manager.clone()) + .resume_conversation_with_history( + config, + conversation_history, + self.auth_manager.clone(), + ) .await { Ok(NewConversation { @@ -1119,6 +1163,15 @@ impl CodexMessageProcessor { } } + async fn send_invalid_request_error(&self, request_id: RequestId, message: String) { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message, + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + async fn archive_conversation(&self, request_id: RequestId, params: ArchiveConversationParams) { let ArchiveConversationParams { conversation_id, diff --git a/codex-rs/app-server/tests/suite/list_resume.rs b/codex-rs/app-server/tests/suite/list_resume.rs index e0d17ec224..76b92b3edc 100644 --- a/codex-rs/app-server/tests/suite/list_resume.rs +++ b/codex-rs/app-server/tests/suite/list_resume.rs @@ -11,6 +11,9 @@ use codex_app_server_protocol::ResumeConversationParams; use codex_app_server_protocol::ResumeConversationResponse; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::SessionConfiguredNotification; +use codex_core::protocol::EventMsg; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; use pretty_assertions::assert_eq; use serde_json::json; use std::fs; @@ -168,11 +171,14 @@ async fn test_list_and_resume_conversations() -> Result<()> { assert!(empty_items.is_empty()); assert!(empty_next.is_none()); - // Now resume one of the sessions and expect a SessionConfigured notification and response. + let first_item = &items[0]; + + // Now resume one of the sessions from an explicit rollout path. let resume_req_id = mcp .send_resume_conversation_request(ResumeConversationParams { - path: Some(items[0].path.clone()), + path: Some(first_item.path.clone()), conversation_id: None, + history: None, overrides: Some(NewConversationParams { model: Some("o3".to_string()), ..Default::default() @@ -187,17 +193,25 @@ async fn test_list_and_resume_conversations() -> Result<()> { ) .await??; let session_configured: ServerNotification = notification.try_into()?; - // Basic shape assertion: ensure event type is sessionConfigured let ServerNotification::SessionConfigured(SessionConfiguredNotification { model, rollout_path, + initial_messages: session_initial_messages, .. }) = session_configured else { unreachable!("expected sessionConfigured notification"); }; assert_eq!(model, "o3"); - assert_eq!(items[0].path.clone(), rollout_path); + assert_eq!(rollout_path, first_item.path.clone()); + let session_initial_messages = session_initial_messages + .expect("expected initial messages when resuming from rollout path"); + match session_initial_messages.as_slice() { + [EventMsg::UserMessage(message)] => { + assert_eq!(message.message, first_item.preview.clone()); + } + other => panic!("unexpected initial messages from rollout resume: {other:#?}"), + } // Then the response for resumeConversation let resume_resp: JSONRPCResponse = timeout( @@ -206,10 +220,140 @@ async fn test_list_and_resume_conversations() -> Result<()> { ) .await??; let ResumeConversationResponse { - conversation_id, .. + conversation_id, + model: resume_model, + initial_messages: response_initial_messages, + .. } = to_response::(resume_resp)?; // conversation id should be a valid UUID assert!(!conversation_id.to_string().is_empty()); + assert_eq!(resume_model, "o3"); + let response_initial_messages = + response_initial_messages.expect("expected initial messages in resume response"); + match response_initial_messages.as_slice() { + [EventMsg::UserMessage(message)] => { + assert_eq!(message.message, first_item.preview.clone()); + } + other => panic!("unexpected initial messages in resume response: {other:#?}"), + } + + // Resuming with only a conversation id should locate the rollout automatically. + let resume_by_id_req_id = mcp + .send_resume_conversation_request(ResumeConversationParams { + path: None, + conversation_id: Some(first_item.conversation_id), + history: None, + overrides: Some(NewConversationParams { + model: Some("o3".to_string()), + ..Default::default() + }), + }) + .await?; + let notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("sessionConfigured"), + ) + .await??; + let session_configured: ServerNotification = notification.try_into()?; + let ServerNotification::SessionConfigured(SessionConfiguredNotification { + model, + rollout_path, + initial_messages: session_initial_messages, + .. + }) = session_configured + else { + unreachable!("expected sessionConfigured notification"); + }; + assert_eq!(model, "o3"); + assert_eq!(rollout_path, first_item.path.clone()); + let session_initial_messages = session_initial_messages + .expect("expected initial messages when resuming from conversation id"); + match session_initial_messages.as_slice() { + [EventMsg::UserMessage(message)] => { + assert_eq!(message.message, first_item.preview.clone()); + } + other => panic!("unexpected initial messages from conversation id resume: {other:#?}"), + } + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_by_id_req_id)), + ) + .await??; + let ResumeConversationResponse { + conversation_id: by_id_conversation_id, + model: by_id_model, + initial_messages: by_id_initial_messages, + .. + } = to_response::(resume_resp)?; + assert!(!by_id_conversation_id.to_string().is_empty()); + assert_eq!(by_id_model, "o3"); + let by_id_initial_messages = by_id_initial_messages + .expect("expected initial messages when resuming from conversation id response"); + match by_id_initial_messages.as_slice() { + [EventMsg::UserMessage(message)] => { + assert_eq!(message.message, first_item.preview.clone()); + } + other => { + panic!("unexpected initial messages in conversation id resume response: {other:#?}") + } + } + + // Resuming with explicit history should succeed even without a stored rollout. + let fork_history_text = "Hello from history"; + let history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: fork_history_text.to_string(), + }], + }]; + let resume_with_history_req_id = mcp + .send_resume_conversation_request(ResumeConversationParams { + path: None, + conversation_id: None, + history: Some(history), + overrides: Some(NewConversationParams { + model: Some("o3".to_string()), + ..Default::default() + }), + }) + .await?; + let notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("sessionConfigured"), + ) + .await??; + let session_configured: ServerNotification = notification.try_into()?; + let ServerNotification::SessionConfigured(SessionConfiguredNotification { + model, + initial_messages: session_initial_messages, + .. + }) = session_configured + else { + unreachable!("expected sessionConfigured notification"); + }; + assert_eq!(model, "o3"); + assert!( + session_initial_messages.as_ref().is_none_or(Vec::is_empty), + "expected no initial messages when resuming from explicit history but got {session_initial_messages:#?}" + ); + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_with_history_req_id)), + ) + .await??; + let ResumeConversationResponse { + conversation_id: history_conversation_id, + model: history_model, + initial_messages: history_initial_messages, + .. + } = to_response::(resume_resp)?; + assert!(!history_conversation_id.to_string().is_empty()); + assert_eq!(history_model, "o3"); + assert!( + history_initial_messages.as_ref().is_none_or(Vec::is_empty), + "expected no initial messages in resume response when history is provided but got {history_initial_messages:#?}" + ); Ok(()) } diff --git a/codex-rs/core/src/conversation_manager.rs b/codex-rs/core/src/conversation_manager.rs index b911526f68..f0aa8b068d 100644 --- a/codex-rs/core/src/conversation_manager.rs +++ b/codex-rs/core/src/conversation_manager.rs @@ -132,6 +132,16 @@ impl ConversationManager { auth_manager: Arc, ) -> CodexResult { let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?; + self.resume_conversation_with_history(config, initial_history, auth_manager) + .await + } + + pub async fn resume_conversation_with_history( + &self, + config: Config, + initial_history: InitialHistory, + auth_manager: Arc, + ) -> CodexResult { let CodexSpawnOk { codex, conversation_id, diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index d575da9652..a39f85c823 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -207,7 +207,7 @@ impl RolloutRecorder { .map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}"))) } - pub(crate) async fn get_rollout_history(path: &Path) -> std::io::Result { + pub async fn get_rollout_history(path: &Path) -> std::io::Result { info!("Resuming rollout from {path:?}"); let text = tokio::fs::read_to_string(path).await?; if text.trim().is_empty() {