Merged
62 changes: 34 additions & 28 deletions codex-rs/core/src/memories/README.md
@@ -70,7 +70,8 @@ Phase 2 consolidates the latest stage-1 outputs into the filesystem memory artif

What it does:

- claims a single global phase-2 job (so only one consolidation runs at a time)
- claims a single global phase-2 lock before touching the memories root (so only one consolidation
inspects or mutates the workspace at a time)
- loads a bounded set of stage-1 outputs from the state DB using phase-2
selection rules:
- ignores memories whose `last_usage` falls outside the configured
@@ -82,53 +83,58 @@ What it does:
- computes a completion watermark from the claimed watermark + newest input timestamps
- syncs local memory artifacts under the memories root:
- `raw_memories.md` (merged raw memories, latest first)
- `rollout_summaries/` (one summary file per retained rollout)
- prunes stale rollout summaries that are no longer retained
- finds old resource files from memory extensions under
`memories_extensions/<extension>/resources/` for extension directories that
have an `instructions.md`, using the memory module retention window
- if there are no Phase 1 inputs or old extension resources, marks the job
successful and exits

If there is input, it then:
- `rollout_summaries/` (one summary file per selected rollout)
- keeps the memories root itself as a git-baseline directory, initialized under
`~/.codex/memories/.git` by `codex-git-utils`
- prunes stale rollout summaries that are no longer selected
- prunes memory extension resource files older than the extension retention
window, so cleanup appears in the workspace diff
- writes `phase2_workspace_diff.md` in the memories root with the git-style diff
from the previous successful Phase 2 baseline to the current worktree
- if the memory workspace has no changes after artifact sync/pruning, marks the
job successful and exits
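
The single-consolidation guarantee in the first bullet can be sketched with an atomic create-new lock file. This is an illustrative stand-in: the actual implementation claims a job row in the state DB, and the file name and helper names below are hypothetical.

```rust
use std::fs::OpenOptions;
use std::io::ErrorKind;
use std::path::Path;

// Sketch: claim a global phase-2 lock by atomically creating a lock file.
// `create_new(true)` fails if the file already exists, so at most one
// process wins the claim. (The real codex implementation claims a job row
// in its state DB instead; this lock-file version only shows the shape.)
fn claim_phase2_lock(memories_root: &Path) -> std::io::Result<bool> {
    let lock_path = memories_root.join("phase2.lock");
    match OpenOptions::new().write(true).create_new(true).open(&lock_path) {
        Ok(_) => Ok(true), // we own the lock
        Err(e) if e.kind() == ErrorKind::AlreadyExists => Ok(false), // another run owns it
        Err(e) => Err(e),
    }
}

fn release_phase2_lock(memories_root: &Path) -> std::io::Result<()> {
    std::fs::remove_file(memories_root.join("phase2.lock"))
}
```

A second claim while the lock is held returns `Ok(false)` rather than erroring, which lets callers simply skip the run.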

If the memory workspace has changes, it then:

- spawns an internal consolidation sub-agent
- builds the Phase 2 prompt with a diff of the current Phase 1 input
selection versus the last successful Phase 2 selection (`added`,
`retained`, `removed`)
- includes old extension resource paths in the prompt diff
- builds the Phase 2 prompt with the path to the generated workspace diff
- points the agent at `phase2_workspace_diff.md` for the detailed diff context
- runs it with no approvals, no network, and local write access only
- disables collab for that agent (to prevent recursive delegation)
- watches the agent status and heartbeats the global job lease while it runs
- resets the memory git baseline after the agent completes successfully; the
generated diff file is removed before this reset so deleted content is not
kept in the prompt artifact or unreachable git objects
- marks the phase-2 job success/failure in the state DB when the agent finishes
- prunes old extension resource files after the consolidation agent completes
and the successful Phase 2 job is recorded
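
The heartbeat bullet above can be sketched as a background thread that refreshes a lease timestamp while the agent works. The real implementation heartbeats the global job lease in the DB; this in-memory version, with hypothetical names, only shows the loop shape.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Sketch: while the consolidation agent runs, a background thread keeps
// bumping the lease's last-heartbeat timestamp so other processes can tell
// the job is still alive (and can reclaim a lease that stops beating).
fn run_with_heartbeat(lease: Arc<AtomicU64>, work: impl FnOnce()) {
    let done = Arc::new(AtomicBool::new(false));
    let done_flag = done.clone();
    let beat = thread::spawn(move || {
        while !done_flag.load(Ordering::Relaxed) {
            let now_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64;
            lease.store(now_ms, Ordering::Relaxed);
            thread::sleep(Duration::from_millis(10));
        }
    });
    work(); // the "agent" runs here
    done.store(true, Ordering::Relaxed);
    beat.join().unwrap();
}
```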

Selection diff behavior:
Selection and workspace-diff behavior:

- successful Phase 2 runs mark the exact stage-1 snapshots they consumed with
`selected_for_phase2 = 1` and persist the matching
`selected_for_phase2_source_updated_at`
- Phase 1 upserts preserve the previous `selected_for_phase2` baseline until
the next successful Phase 2 run rewrites it
- the next Phase 2 run compares the current top-N stage-1 inputs against that
prior snapshot selection to label inputs as `added` or `retained`; a
refreshed thread stays `added` until Phase 2 successfully selects its newer
snapshot
- rows that were previously selected but still exist outside the current top-N
selection are surfaced as `removed`
- before the agent starts, local `rollout_summaries/` and `raw_memories.md`
keep the union of the current selection and the previous successful
selection, so removed-thread evidence stays available during forgetting
- Phase 2 loads only the current top-N selected stage-1 inputs, syncs
`rollout_summaries/` and `raw_memories.md` directly to that selection, then
lets the git-style workspace diff surface additions, modifications, and
deletions against the previous successful memory baseline
- when the selected input set is empty, stale `rollout_summaries/` files are
removed and `raw_memories.md` is rewritten to the empty-input placeholder;
consolidated outputs such as `MEMORY.md`, `memory_summary.md`, and `skills/`
are left for the agent to update
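
The sync-to-selection step above can be sketched as follows, assuming summaries are keyed by thread id and stored one-per-file; the naming scheme and helper name are assumptions, not the real implementation.

```rust
use std::collections::HashMap;
use std::fs;
use std::path::Path;

// Sketch: make `rollout_summaries/` mirror the current selection exactly.
// Selected summaries are (re)written; files for threads that fell out of
// the selection are deleted, so a later git-style workspace diff surfaces
// both additions and removals against the previous baseline.
fn sync_rollout_summaries(dir: &Path, selected: &HashMap<String, String>) -> std::io::Result<()> {
    fs::create_dir_all(dir)?;
    for (thread_id, summary) in selected {
        fs::write(dir.join(format!("{thread_id}.md")), summary)?;
    }
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name().to_string_lossy().into_owned();
        let keep = name
            .strip_suffix(".md")
            .is_some_and(|id| selected.contains_key(id));
        if !keep {
            fs::remove_file(entry.path())?; // stale summary: no longer selected
        }
    }
    Ok(())
}
```

Running the sync twice with the same selection is idempotent, which matters when a failed run is retried.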

Watermark behavior:

- The global phase-2 job claim includes an input watermark representing the latest input timestamp known when the job was claimed.
- The global phase-2 lock does not use DB watermarks as a dirty check; git
workspace dirtiness decides whether an agent needs to run.
- The global phase-2 job row still tracks an input watermark as bookkeeping
for the latest DB input timestamp known when the job was claimed.
- Phase 2 recomputes a `new_watermark` using the max of:
- the claimed watermark
- the newest `source_updated_at` timestamp in the stage-1 inputs it actually loaded
- On success, Phase 2 stores that completion watermark in the DB.
- This lets later phase-2 runs know whether new stage-1 data arrived since the last successful consolidation (dirty vs not dirty), while also avoiding moving the watermark backwards.
- This avoids moving the recorded completion watermark backwards, but does not
decide whether Phase 2 has work.
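
The watermark recomputation can be shown as a small pure function. Epoch-millis integers stand in for the real timestamp type, and the function name is hypothetical.

```rust
// Sketch: the completion watermark never moves backwards. It is the max of
// the watermark recorded when the job was claimed and the newest
// `source_updated_at` among the stage-1 inputs actually loaded.
fn completion_watermark(claimed: u64, input_source_updated_at: &[u64]) -> u64 {
    input_source_updated_at
        .iter()
        .copied()
        .max()
        .map_or(claimed, |newest| claimed.max(newest))
}
```

With no inputs the claimed watermark is returned unchanged, so an empty run still records a monotone value.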

In practice, this phase is responsible for refreshing the on-disk memory workspace and producing/updating the higher-level consolidated memory outputs.

188 changes: 19 additions & 169 deletions codex-rs/core/src/memories/extensions.rs
@@ -4,46 +4,27 @@ use chrono::Duration;
use chrono::NaiveDateTime;
use chrono::Utc;
use std::path::Path;
use std::path::PathBuf;
use tracing::warn;

const FILENAME_TS_FORMAT: &str = "%Y-%m-%dT%H-%M-%S";
pub(super) const EXTENSION_RESOURCE_RETENTION_DAYS: i64 = 7;

#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct RemovedExtensionResource {
pub(super) extension: String,
pub(super) resource_path: String,
pub(super) async fn prune_old_extension_resources(memory_root: &Path) {
prune_old_extension_resources_with_now(memory_root, Utc::now()).await
}
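
The entry point above injects `Utc::now()` so tests can pin the clock. A std-only sketch of the same now-injection pattern, with hypothetical helper names (the real code works on chrono types and whole resource directories):

```rust
use std::time::{Duration, SystemTime};

const RETENTION: Duration = Duration::from_secs(7 * 24 * 60 * 60); // 7 days

// Public entry point uses the real clock...
fn is_expired(modified: SystemTime) -> bool {
    is_expired_with_now(modified, SystemTime::now())
}

// ...while tests drive the `_with_now` variant with a fixed instant,
// mirroring how `prune_old_extension_resources` delegates to
// `prune_old_extension_resources_with_now`.
fn is_expired_with_now(modified: SystemTime, now: SystemTime) -> bool {
    now.duration_since(modified)
        .map(|age| age >= RETENTION) // exactly at the cutoff counts as old
        .unwrap_or(false) // a future `modified` counts as fresh
}
```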

#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct PendingExtensionResourceRemoval {
pub(super) removed: RemovedExtensionResource,
path: PathBuf,
}

pub(super) async fn find_old_extension_resources(
memory_root: &Path,
) -> Vec<PendingExtensionResourceRemoval> {
find_old_extension_resources_with_now(memory_root, Utc::now()).await
}

async fn find_old_extension_resources_with_now(
memory_root: &Path,
now: DateTime<Utc>,
) -> Vec<PendingExtensionResourceRemoval> {
let mut pending = Vec::new();
async fn prune_old_extension_resources_with_now(memory_root: &Path, now: DateTime<Utc>) {
let cutoff = now - Duration::days(EXTENSION_RESOURCE_RETENTION_DAYS);
let extensions_root = memory_extensions_root(memory_root);
let mut extensions = match tokio::fs::read_dir(&extensions_root).await {
Ok(extensions) => extensions,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return pending,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return,
Err(err) => {
warn!(
"failed reading memory extensions root {}: {err}",
extensions_root.display()
);
return pending;
return;
}
};

@@ -52,19 +33,10 @@ async fn find_old_extension_resources_with_now(
let Ok(file_type) = extension_entry.file_type().await else {
continue;
};
if !file_type.is_dir() {
continue;
}
let Some(extension) = extension_path
.file_name()
.and_then(|name| name.to_str())
.map(ToOwned::to_owned)
else {
continue;
};
if !tokio::fs::try_exists(extension_path.join("instructions.md"))
.await
.unwrap_or(false)
if !file_type.is_dir()
|| !tokio::fs::try_exists(extension_path.join("instructions.md"))
.await
.unwrap_or(false)
{
continue;
}
@@ -106,34 +78,14 @@ async fn find_old_extension_resources_with_now(
continue;
}

pending.push(PendingExtensionResourceRemoval {
removed: RemovedExtensionResource {
extension: extension.clone(),
resource_path: format!("resources/{file_name}"),
},
path: resource_file_path,
});
}
}

pending.sort_by(|left, right| {
left.removed
.extension
.cmp(&right.removed.extension)
.then_with(|| left.removed.resource_path.cmp(&right.removed.resource_path))
});
pending
}

pub(super) async fn remove_extension_resources(resources: &[PendingExtensionResourceRemoval]) {
for resource in resources {
if let Err(err) = tokio::fs::remove_file(&resource.path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed pruning old memory extension resource {}: {err}",
resource.path.display()
);
if let Err(err) = tokio::fs::remove_file(&resource_file_path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed pruning old memory extension resource {}: {err}",
resource_file_path.display()
);
}
}
}
}
@@ -145,107 +97,5 @@ fn resource_timestamp(file_name: &str) -> Option<DateTime<Utc>> {
}

#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tempfile::TempDir;

#[tokio::test]
async fn finds_only_old_resources_from_extensions_with_instructions() {
let codex_home = TempDir::new().expect("create temp codex home");
let memory_root = codex_home.path().join("memories");
let extensions_root = memory_extensions_root(&memory_root);
let chronicle_resources = extensions_root.join("chronicle/resources");
tokio::fs::create_dir_all(&chronicle_resources)
.await
.expect("create chronicle resources");
tokio::fs::write(
extensions_root.join("chronicle/instructions.md"),
"instructions",
)
.await
.expect("write chronicle instructions");

let now = DateTime::from_naive_utc_and_offset(
NaiveDateTime::parse_from_str("2026-04-14T12-00-00", FILENAME_TS_FORMAT)
.expect("parse now"),
Utc,
);
let old_file = chronicle_resources.join("2026-04-06T11-59-59-abcd-10min-old.md");
let exact_cutoff_file =
chronicle_resources.join("2026-04-07T12-00-00-abcd-10min-cutoff.md");
let recent_file = chronicle_resources.join("2026-04-08T12-00-00-abcd-10min-recent.md");
let invalid_file = chronicle_resources.join("not-a-timestamp.md");
for file in [&old_file, &exact_cutoff_file, &recent_file, &invalid_file] {
tokio::fs::write(file, "resource")
.await
.expect("write chronicle resource");
}

let ignored_resources = extensions_root.join("ignored/resources");
tokio::fs::create_dir_all(&ignored_resources)
.await
.expect("create ignored resources");
let ignored_old_file = ignored_resources.join("2026-04-06T11-59-59-abcd-10min-old.md");
tokio::fs::write(&ignored_old_file, "ignored")
.await
.expect("write ignored resource");

let pending = find_old_extension_resources_with_now(&memory_root, now).await;

assert_eq!(
pending
.iter()
.map(|resource| resource.removed.clone())
.collect::<Vec<_>>(),
vec![
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-06T11-59-59-abcd-10min-old.md".to_string(),
},
RemovedExtensionResource {
extension: "chronicle".to_string(),
resource_path: "resources/2026-04-07T12-00-00-abcd-10min-cutoff.md".to_string(),
},
]
);
assert!(
tokio::fs::try_exists(&old_file)
.await
.expect("check old file before remove")
);
assert!(
tokio::fs::try_exists(&exact_cutoff_file)
.await
.expect("check cutoff file before remove")
);

remove_extension_resources(&pending).await;

assert!(
!tokio::fs::try_exists(&old_file)
.await
.expect("check old file")
);
assert!(
!tokio::fs::try_exists(&exact_cutoff_file)
.await
.expect("check cutoff file")
);
assert!(
tokio::fs::try_exists(&recent_file)
.await
.expect("check recent file")
);
assert!(
tokio::fs::try_exists(&invalid_file)
.await
.expect("check invalid file")
);
assert!(
tokio::fs::try_exists(&ignored_old_file)
.await
.expect("check ignored old file")
);
}
}
#[path = "extensions_tests.rs"]
mod tests;
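
The retained `resource_timestamp` parses the leading `%Y-%m-%dT%H-%M-%S` filename prefix with chrono. Because that format is fixed-width and sorts lexicographically, a std-only sketch can do the age check by comparing prefix strings directly; the helper names here are illustrative, not the real API.

```rust
// Sketch: `%Y-%m-%dT%H-%M-%S` is fixed-width and lexicographically ordered,
// so an age check can compare a filename's 19-char timestamp prefix against
// a cutoff string. The real `resource_timestamp` instead parses with chrono
// and returns `Option<DateTime<Utc>>`.
fn timestamp_prefix(file_name: &str) -> Option<&str> {
    let prefix = file_name.get(0..19)?;
    let bytes = prefix.as_bytes();
    let digits = [0, 1, 2, 3, 5, 6, 8, 9, 11, 12, 14, 15, 17, 18];
    let seps = [(4, b'-'), (7, b'-'), (10, b'T'), (13, b'-'), (16, b'-')];
    if digits.iter().all(|&i| bytes[i].is_ascii_digit())
        && seps.iter().all(|&(i, c)| bytes[i] == c)
    {
        Some(prefix)
    } else {
        None // malformed names are skipped, never pruned
    }
}

fn is_older_than_cutoff(file_name: &str, cutoff: &str) -> bool {
    // At or before the cutoff counts as old, matching the retention tests.
    timestamp_prefix(file_name).is_some_and(|ts| ts <= cutoff)
}
```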