Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
649 changes: 532 additions & 117 deletions codex-rs/core/src/codex/memory_startup.rs

Large diffs are not rendered by default.

41 changes: 35 additions & 6 deletions codex-rs/core/src/memories/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,28 @@ use std::path::PathBuf;
/// Subagent source label used to identify consolidation tasks.
pub(crate) const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation";
/// Maximum number of rollout candidates processed per startup pass.
pub(crate) const MAX_ROLLOUTS_PER_STARTUP: usize = 8;
pub(crate) const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
/// Concurrency cap for startup memory extraction and consolidation scheduling.
pub(crate) const PHASE_ONE_CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP;
/// Maximum number of recent raw memories retained per working directory.
pub(crate) const MAX_RAW_MEMORIES_PER_CWD: usize = 10;
/// Maximum number of recent raw memories retained per scope.
pub(crate) const MAX_RAW_MEMORIES_PER_SCOPE: usize = 64;
/// Maximum rollout age considered for phase-1 extraction.
pub(crate) const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
/// Lease duration (seconds) for phase-1 job ownership.
pub(crate) const PHASE_ONE_JOB_LEASE_SECONDS: i64 = 3_600;
/// Lease duration (seconds) for per-cwd consolidation locks.
pub(crate) const CONSOLIDATION_LOCK_LEASE_SECONDS: i64 = 600;
pub(crate) const MEMORY_SCOPE_KIND_CWD: &str = "cwd";
pub(crate) const MEMORY_SCOPE_KIND_USER: &str = "user";
pub(crate) const MEMORY_SCOPE_KEY_USER: &str = "user";

const MEMORY_SUBDIR: &str = "memory";
const RAW_MEMORIES_SUBDIR: &str = "raw_memories";
const MEMORY_SUMMARY_FILENAME: &str = "memory_summary.md";
const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
const LEGACY_CONSOLIDATED_FILENAME: &str = "consolidated.md";
const SKILLS_SUBDIR: &str = "skills";
const CWD_MEMORY_BUCKET_HEX_LEN: usize = 16;

pub(crate) use phase_one::RAW_MEMORY_PROMPT;
pub(crate) use phase_one::parse_stage_one_output;
Expand All @@ -43,8 +51,9 @@ pub(crate) use rollout::StageOneRolloutFilter;
pub(crate) use rollout::serialize_filtered_rollout_response_items;
pub(crate) use selection::select_rollout_candidates_from_db;
pub(crate) use storage::prune_to_recent_memories_and_rebuild_summary;
pub(crate) use storage::rebuild_memory_summary_from_memories;
pub(crate) use storage::sync_raw_memories_from_memories;
pub(crate) use storage::wipe_consolidation_outputs;
pub(crate) use storage::write_raw_memory;
pub(crate) use types::RolloutCandidate;

/// Returns the on-disk memory root directory for a given working directory.
Expand All @@ -56,6 +65,21 @@ pub(crate) fn memory_root_for_cwd(codex_home: &Path, cwd: &Path) -> PathBuf {
codex_home.join("memories").join(bucket).join(MEMORY_SUBDIR)
}

/// Returns the DB scope key for a cwd-scoped memory entry.
///
/// This uses the same normalization/fallback behavior as cwd bucket derivation.
pub(crate) fn memory_scope_key_for_cwd(cwd: &Path) -> String {
normalize_cwd_for_memory(cwd).display().to_string()
}

/// Returns the on-disk user-shared memory root directory.
pub(crate) fn memory_root_for_user(codex_home: &Path) -> PathBuf {
codex_home
.join("memories")
.join(MEMORY_SCOPE_KEY_USER)
.join(MEMORY_SUBDIR)
}

fn raw_memories_dir(root: &Path) -> PathBuf {
root.join(RAW_MEMORIES_SUBDIR)
}
Expand All @@ -70,9 +94,14 @@ pub(crate) async fn ensure_layout(root: &Path) -> std::io::Result<()> {
}

fn memory_bucket_for_cwd(cwd: &Path) -> String {
let normalized = normalize_for_path_comparison(cwd).unwrap_or_else(|_| cwd.to_path_buf());
let normalized = normalize_cwd_for_memory(cwd);
let normalized = normalized.to_string_lossy();
let mut hasher = Sha256::new();
hasher.update(normalized.as_bytes());
format!("{:x}", hasher.finalize())
let full_hash = format!("{:x}", hasher.finalize());
full_hash[..CWD_MEMORY_BUCKET_HEX_LEN].to_string()
}

fn normalize_cwd_for_memory(cwd: &Path) -> PathBuf {
normalize_for_path_comparison(cwd).unwrap_or_else(|_| cwd.to_path_buf())
}
21 changes: 7 additions & 14 deletions codex-rs/core/src/memories/selection.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,40 @@
use chrono::Duration;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_state::ThreadMemory;
use codex_state::ThreadMetadata;
use std::collections::BTreeMap;

use super::types::RolloutCandidate;

/// Selects rollout candidates that need stage-1 memory extraction.
///
/// A rollout is selected when it is not the active thread and has no memory yet
/// (or the stored memory is older than the thread metadata timestamp).
/// A rollout is selected when it is not the active thread and was updated
/// within the configured max age window.
pub(crate) fn select_rollout_candidates_from_db(
items: &[ThreadMetadata],
current_thread_id: ThreadId,
existing_memories: &[ThreadMemory],
max_items: usize,
max_age_days: i64,
) -> Vec<RolloutCandidate> {
if max_items == 0 {
return Vec::new();
}

let memory_updated_by_thread = existing_memories
.iter()
.map(|memory| (memory.thread_id.to_string(), memory.updated_at))
.collect::<BTreeMap<_, _>>();
let cutoff = Utc::now() - Duration::days(max_age_days.max(0));

let mut candidates = Vec::new();

for item in items {
if item.id == current_thread_id {
continue;
}

let memory_updated_at = memory_updated_by_thread.get(&item.id.to_string());
if memory_updated_at.is_some_and(|memory_updated_at| *memory_updated_at >= item.updated_at)
{
if item.updated_at < cutoff {
continue;
}

candidates.push(RolloutCandidate {
thread_id: item.id,
rollout_path: item.rollout_path.clone(),
cwd: item.cwd.clone(),
title: item.title.clone(),
updated_at: Some(item.updated_at.to_rfc3339()),
});

Expand Down
120 changes: 60 additions & 60 deletions codex-rs/core/src/memories/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,63 +6,60 @@ use std::path::PathBuf;
use tracing::warn;

use super::LEGACY_CONSOLIDATED_FILENAME;
use super::MAX_RAW_MEMORIES_PER_CWD;
use super::MAX_RAW_MEMORIES_PER_SCOPE;
use super::MEMORY_REGISTRY_FILENAME;
use super::SKILLS_SUBDIR;
use super::ensure_layout;
use super::memory_summary_file;
use super::raw_memories_dir;
use super::types::RolloutCandidate;

/// Writes (or replaces) the per-thread markdown raw memory on disk.
///
/// This also removes older files for the same thread id to keep one canonical
/// raw memory file per thread.
pub(crate) async fn write_raw_memory(
/// Prunes stale raw memory files and rebuilds the routing summary for recent memories.
pub(crate) async fn prune_to_recent_memories_and_rebuild_summary(
root: &Path,
candidate: &RolloutCandidate,
raw_memory: &str,
) -> std::io::Result<PathBuf> {
let slug = build_memory_slug(&candidate.title);
let filename = format!("{}_{}.md", candidate.thread_id, slug);
let path = raw_memories_dir(root).join(filename);
memories: &[ThreadMemory],
) -> std::io::Result<()> {
ensure_layout(root).await?;

remove_outdated_thread_raw_memories(root, &candidate.thread_id.to_string(), &path).await?;
let keep = memories
.iter()
.take(MAX_RAW_MEMORIES_PER_SCOPE)
.map(|memory| memory.thread_id.to_string())
.collect::<BTreeSet<_>>();

let mut body = String::new();
writeln!(body, "thread_id: {}", candidate.thread_id)
.map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
writeln!(body, "cwd: {}", candidate.cwd.display())
.map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
writeln!(body, "rollout_path: {}", candidate.rollout_path.display())
.map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
if let Some(updated_at) = candidate.updated_at.as_deref() {
writeln!(body, "updated_at: {updated_at}")
.map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
}
writeln!(body).map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
body.push_str(raw_memory.trim());
body.push('\n');
prune_raw_memories(root, &keep).await?;
rebuild_memory_summary(root, memories).await
}

tokio::fs::write(&path, body).await?;
Ok(path)
/// Rebuild `memory_summary.md` for a scope without pruning raw memory files.
pub(crate) async fn rebuild_memory_summary_from_memories(
root: &Path,
memories: &[ThreadMemory],
) -> std::io::Result<()> {
ensure_layout(root).await?;
rebuild_memory_summary(root, memories).await
}

/// Prunes stale raw memory files and rebuilds the routing summary for recent memories.
pub(crate) async fn prune_to_recent_memories_and_rebuild_summary(
/// Syncs canonical raw memory files from DB-backed memory rows.
pub(crate) async fn sync_raw_memories_from_memories(
root: &Path,
memories: &[ThreadMemory],
) -> std::io::Result<()> {
ensure_layout(root).await?;

let keep = memories
let retained = memories
.iter()
.take(MAX_RAW_MEMORIES_PER_SCOPE)
.collect::<Vec<_>>();
let keep = retained
.iter()
.take(MAX_RAW_MEMORIES_PER_CWD)
.map(|memory| memory.thread_id.to_string())
.collect::<BTreeSet<_>>();

prune_raw_memories(root, &keep).await?;
rebuild_memory_summary(root, memories).await

for memory in retained {
write_raw_memory_for_thread(root, memory).await?;
}
Ok(())
}

/// Clears consolidation outputs so a fresh consolidation run can regenerate them.
Expand Down Expand Up @@ -103,7 +100,7 @@ async fn rebuild_memory_summary(root: &Path, memories: &[ThreadMemory]) -> std::
}

body.push_str("Map of concise summaries to thread IDs (latest first):\n\n");
for memory in memories.iter().take(MAX_RAW_MEMORIES_PER_CWD) {
for memory in memories.iter().take(MAX_RAW_MEMORIES_PER_SCOPE) {
let summary = compact_summary_for_index(&memory.memory_summary);
writeln!(body, "- {summary} (thread: `{}`)", memory.thread_id)
.map_err(|err| std::io::Error::other(format!("format memory summary: {err}")))?;
Expand Down Expand Up @@ -179,27 +176,25 @@ async fn remove_outdated_thread_raw_memories(
Ok(())
}

fn build_memory_slug(value: &str) -> String {
let mut slug = String::new();
let mut last_was_sep = false;

for ch in value.chars() {
let normalized = ch.to_ascii_lowercase();
if normalized.is_ascii_alphanumeric() {
slug.push(normalized);
last_was_sep = false;
} else if !last_was_sep {
slug.push('_');
last_was_sep = true;
}
}
async fn write_raw_memory_for_thread(
root: &Path,
memory: &ThreadMemory,
) -> std::io::Result<PathBuf> {
let path = raw_memories_dir(root).join(format!("{}.md", memory.thread_id));

let slug = slug.trim_matches('_').to_string();
if slug.is_empty() {
"memory".to_string()
} else {
slug.chars().take(64).collect()
}
remove_outdated_thread_raw_memories(root, &memory.thread_id.to_string(), &path).await?;

let mut body = String::new();
writeln!(body, "thread_id: {}", memory.thread_id)
.map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
writeln!(body, "updated_at: {}", memory.updated_at.to_rfc3339())
.map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
writeln!(body).map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
body.push_str(memory.raw_memory.trim());
body.push('\n');

tokio::fs::write(&path, body).await?;
Ok(path)
}

fn compact_summary_for_index(summary: &str) -> String {
Expand All @@ -208,10 +203,15 @@ fn compact_summary_for_index(summary: &str) -> String {

fn extract_thread_id_from_summary_filename(file_name: &str) -> Option<&str> {
let stem = file_name.strip_suffix(".md")?;
let (thread_id, _) = stem.split_once('_')?;
if thread_id.is_empty() {
if stem.is_empty() {
None
} else if let Some((thread_id, _legacy_slug)) = stem.split_once('_') {
if thread_id.is_empty() {
None
} else {
Some(thread_id)
}
} else {
Some(thread_id)
Some(stem)
}
}
Loading
Loading