Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions codex-rs/app-server-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::config_types::Verbosity;
use codex_protocol::models::ResponseItem;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::EventMsg;
Expand Down Expand Up @@ -499,12 +500,15 @@ pub struct LogoutAccountResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct ResumeConversationParams {
/// Absolute path to the rollout JSONL file. If omitted, `conversationId` must be provided.
/// Absolute path to the rollout JSONL file, when explicitly resuming a known rollout.
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<PathBuf>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's have a separate request for it. This current approach makes it possible to send a request without path or history.

/// If the rollout path is not known, it can be discovered via the conversation id at at the cost of extra latency.
/// If the rollout path is not known, it can be discovered via the conversation id at the cost of extra latency.
#[serde(skip_serializing_if = "Option::is_none")]
pub conversation_id: Option<ConversationId>,
/// if the rollout path or conversation id is not known, it can be resumed from given history
#[serde(skip_serializing_if = "Option::is_none")]
pub history: Option<Vec<ResponseItem>>,
/// Optional overrides to apply when spawning the resumed session.
#[serde(skip_serializing_if = "Option::is_none")]
pub overrides: Option<NewConversationParams>,
Expand Down
141 changes: 97 additions & 44 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::Cursor as RolloutCursor;
use codex_core::INTERACTIVE_SESSION_SOURCES;
use codex_core::InitialHistory;
use codex_core::NewConversation;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
Expand All @@ -79,6 +80,7 @@ use codex_core::config_edit::persist_overrides_and_clear_if_none;
use codex_core::default_client::get_codex_user_agent;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::find_conversation_path_by_id_str;
use codex_core::get_platform_sandbox;
use codex_core::git_info::git_diff_to_remote;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
Expand All @@ -97,6 +99,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_utils_json_to_toml::json_to_toml;
Expand Down Expand Up @@ -1015,45 +1018,15 @@ impl CodexMessageProcessor {
request_id: RequestId,
params: ResumeConversationParams,
) {
let path = match params {
ResumeConversationParams {
path: Some(path), ..
} => path,
ResumeConversationParams {
conversation_id: Some(conversation_id),
..
} => {
match codex_core::find_conversation_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
)
.await
{
Ok(Some(p)) => p,
_ => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "unable to locate rollout path".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
}
}
_ => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "either path or conversation id must be provided".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let ResumeConversationParams {
path,
conversation_id,
history,
overrides,
} = params;

// Derive a Config using the same logic as new conversation, honoring overrides if provided.
let config = match params.overrides {
let config = match overrides {
Some(overrides) => {
derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone()).await
}
Expand All @@ -1062,19 +1035,90 @@ impl CodexMessageProcessor {
let config = match config {
Ok(cfg) => cfg,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.send_invalid_request_error(
request_id,
format!("error deriving config: {err}"),
)
.await;
return;
}
};

let conversation_history = if let Some(path) = path {
match RolloutRecorder::get_rollout_history(&path).await {
Ok(initial_history) => initial_history,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to load rollout `{}`: {err}", path.display()),
)
.await;
return;
}
}
} else if let Some(conversation_id) = conversation_id {
match find_conversation_path_by_id_str(
&self.config.codex_home,
&conversation_id.to_string(),
)
.await
{
Ok(Some(found_path)) => {
match RolloutRecorder::get_rollout_history(&found_path).await {
Ok(initial_history) => initial_history,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
found_path.display()
),
).await;
return;
}
}
}
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for conversation id {conversation_id}"),
)
.await;
return;
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate conversation id {conversation_id}: {err}"),
)
.await;
return;
}
}
} else {
match history {
Some(history) if !history.is_empty() => InitialHistory::Forked(
history.into_iter().map(RolloutItem::ResponseItem).collect(),
),
Some(_) | None => {
self.send_invalid_request_error(
request_id,
"either path, conversation id or non empty history must be provided"
.to_string(),
)
.await;
return;
}
}
};

match self
.conversation_manager
.resume_conversation_from_rollout(config, path.clone(), self.auth_manager.clone())
.resume_conversation_with_history(
config,
conversation_history,
self.auth_manager.clone(),
)
.await
{
Ok(NewConversation {
Expand Down Expand Up @@ -1119,6 +1163,15 @@ impl CodexMessageProcessor {
}
}

async fn send_invalid_request_error(&self, request_id: RequestId, message: String) {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
}

async fn archive_conversation(&self, request_id: RequestId, params: ArchiveConversationParams) {
let ArchiveConversationParams {
conversation_id,
Expand Down
Loading
Loading