Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/analytics/src/analytics_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use tokio::sync::mpsc;
fn sample_thread(thread_id: &str, ephemeral: bool) -> Thread {
Thread {
id: thread_id.to_string(),
forked_from_id: None,
preview: "first prompt".to_string(),
ephemeral,
model_provider: "openai".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2425,6 +2425,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12186,6 +12186,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10041,6 +10041,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,13 @@
"description": "Whether the thread is ephemeral and should not be materialized on disk.",
"type": "boolean"
},
"forkedFromId": {
"description": "Source thread id when this thread was created by forking another thread.",
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/app-server-protocol/schema/typescript/v2/Thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import type { ThreadStatus } from "./ThreadStatus";
import type { Turn } from "./Turn";

export type Thread = { id: string,
/**
* Source thread id when this thread was created by forking another thread.
*/
forkedFromId: string | null,
/**
* Usually the first user message in the thread, if available.
*/
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,7 @@ mod tests {
response: v2::ThreadStartResponse {
thread: v2::Thread {
id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
forked_from_id: None,
preview: "first prompt".to_string(),
ephemeral: true,
model_provider: "openai".to_string(),
Expand Down Expand Up @@ -1343,6 +1344,7 @@ mod tests {
"response": {
"thread": {
"id": "67e55044-10b1-426f-9247-bb680e5fe0c8",
"forkedFromId": null,
"preview": "first prompt",
"ephemeral": true,
"modelProvider": "openai",
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3574,6 +3574,8 @@ impl From<CoreSkillErrorInfo> for SkillErrorInfo {
#[ts(export_to = "v2/")]
pub struct Thread {
pub id: String,
/// Source thread id when this thread was created by forking another thread.
pub forked_from_id: Option<String>,
/// Usually the first user message in the thread, if available.
pub preview: String,
/// Whether the thread is ephemeral and should not be materialized on disk.
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Example with notification opt-out:

- `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
Expand Down
69 changes: 68 additions & 1 deletion codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3523,6 +3523,11 @@ impl CodexMessageProcessor {
}
build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path)
};
if thread.forked_from_id.is_none()
&& let Some(rollout_path) = rollout_path.as_ref()
{
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
}
self.attach_thread_name(thread_uuid, &mut thread).await;

if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
Expand Down Expand Up @@ -4352,7 +4357,12 @@ impl CodexMessageProcessor {
)
.await
{
Ok(summary) => summary_to_thread(summary),
Ok(summary) => {
let mut thread = summary_to_thread(summary);
thread.forked_from_id =
forked_from_id_from_rollout(fork_rollout_path.as_path()).await;
thread
}
Err(err) => {
self.send_internal_error(
request_id,
Expand Down Expand Up @@ -4386,6 +4396,14 @@ impl CodexMessageProcessor {
}
};
thread.preview = preview_from_rollout_items(&history_items);
thread.forked_from_id = source_thread_id
.or_else(|| {
history_items.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.id),
_ => None,
})
})
.map(|id| id.to_string());
if let Err(message) = populate_thread_turns(
&mut thread,
ThreadTurnSource::HistoryItems(&history_items),
Expand Down Expand Up @@ -8566,6 +8584,7 @@ async fn load_thread_summary_for_rollout(
rollout_path.display()
)
})?;
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
if let Some(persisted_metadata) = persisted_metadata {
merge_mutable_thread_metadata(
&mut thread,
Expand All @@ -8577,6 +8596,14 @@ async fn load_thread_summary_for_rollout(
Ok(thread)
}

async fn forked_from_id_from_rollout(path: &Path) -> Option<String> {
read_session_meta_line(path)
.await
.ok()
.and_then(|meta_line| meta_line.meta.forked_from_id)
.map(|thread_id| thread_id.to_string())
}

fn merge_mutable_thread_metadata(thread: &mut Thread, persisted_thread: Thread) {
thread.git_info = persisted_thread.git_info;
}
Expand Down Expand Up @@ -8657,6 +8684,7 @@ fn build_thread_from_snapshot(
let now = time::OffsetDateTime::now_utc().unix_timestamp();
Thread {
id: thread_id.to_string(),
forked_from_id: None,
preview: String::new(),
ephemeral: config_snapshot.ephemeral,
model_provider: config_snapshot.model_provider_id.clone(),
Expand Down Expand Up @@ -8699,6 +8727,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {

Thread {
id: conversation_id.to_string(),
forked_from_id: None,
preview,
ephemeral: false,
model_provider,
Expand Down Expand Up @@ -9195,6 +9224,44 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn read_summary_from_rollout_preserves_forked_from_id() -> Result<()> {
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMetaLine;
use std::fs;

let temp_dir = TempDir::new()?;
let path = temp_dir.path().join("rollout.jsonl");

let conversation_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;
let forked_from_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let timestamp = "2025-09-05T16:53:11.850Z".to_string();

let session_meta = SessionMeta {
id: conversation_id,
forked_from_id: Some(forked_from_id),
timestamp: timestamp.clone(),
model_provider: Some("test-provider".to_string()),
..SessionMeta::default()
};

let line = RolloutLine {
timestamp,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: session_meta,
git: None,
}),
};
fs::write(&path, format!("{}\n", serde_json::to_string(&line)?))?;

assert_eq!(
forked_from_id_from_rollout(path.as_path()).await,
Some(forked_from_id.to_string())
);
Ok(())
}

#[tokio::test]
async fn aborting_pending_request_clears_pending_state() -> Result<()> {
let thread_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/src/thread_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ mod tests {
fn test_thread(thread_id: &str, source: codex_app_server_protocol::SessionSource) -> Thread {
Thread {
id: thread_id.to_string(),
forked_from_id: None,
preview: String::new(),
ephemeral: false,
model_provider: "mock-provider".to_string(),
Expand Down
1 change: 1 addition & 0 deletions codex-rs/app-server/tests/suite/v2/thread_fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
);

assert_ne!(thread.id, conversation_id);
assert_eq!(thread.forked_from_id, Some(conversation_id.clone()));
assert_eq!(thread.preview, preview);
assert_eq!(thread.model_provider, "mock_provider");
assert_eq!(thread.status, ThreadStatus::Idle);
Expand Down
Loading
Loading