Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 78 additions & 18 deletions codex-rs/rollout/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use std::collections::HashSet;
use std::fs;
use std::fs::File;
use std::io::Error as IoError;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -694,17 +697,14 @@ impl RolloutRecorder {

(None, Some(log_file_info), path, Some(session_meta))
}
RolloutRecorderParams::Resume { path } => (
Some(
tokio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?,
),
None,
path,
None,
),
RolloutRecorderParams::Resume { path } => {
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&path)?;
Comment on lines +701 to +704
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve append mode for resumed rollout writers

The same stale-offset corruption affects resumed live threads: after RolloutRecorder::new(...Resume...) opens this non-append handle and later update_thread_metadata appends a metadata line via a separate append-only handle, subsequent resumed-thread writes use this handle's old EOF position and overwrite that appended record. The resumed writer should retain append/O_APPEND behavior in addition to the read/write permissions needed for final-line repair.

Useful? React with 👍 / 👎.

repair_final_line(&mut file, &path)?;
(Some(tokio::fs::File::from_std(file)), None, path, None)
}
};

// Clone the cwd for the spawned task to collect git info asynchronously
Expand Down Expand Up @@ -1362,10 +1362,49 @@ fn open_log_file(path: &Path) -> std::io::Result<File> {
)));
};
fs::create_dir_all(parent)?;
std::fs::OpenOptions::new()
.append(true)
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)
.truncate(false)
Comment on lines +1365 to +1369
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve append mode for live rollout writers

When a live thread has been persisted and then metadata changes, update_thread_metadata appends a SessionMeta line through append_rollout_item_to_path while this live writer handle stays open. Because this handle is now plain read/write instead of append(true), its cursor remains at the old EOF; the next live rollout item writes from that stale offset and overwrites the metadata append, corrupting the JSONL history. Keep append semantics on the recorder handle while still allowing the read/truncate operations used for repair.

Useful? React with 👍 / 👎.

.open(path)?;
repair_final_line(&mut file, path)?;
Ok(file)
}

fn repair_final_line(file: &mut File, path: &Path) -> std::io::Result<()> {
let bytes = fs::read(path)?;
let len = bytes.len();
if len == 0 {
return Ok(());
}

if bytes.last() == Some(&b'\n') {
file.seek(SeekFrom::End(0))?;
return Ok(());
}

let truncate_to = bytes
.iter()
.rposition(|byte| *byte == b'\n')
.map_or(0, |index| index + 1);
if serde_json::from_slice::<RolloutLine>(&bytes[truncate_to..]).is_ok() {
file.seek(SeekFrom::End(0))?;
file.write_all(b"\n")?;
warn!(
"added missing final newline to rollout file: path={}, original_len={len}",
path.display()
);
return Ok(());
}

file.set_len(truncate_to as u64)?;
file.seek(SeekFrom::End(0))?;
warn!(
"truncated incomplete final rollout line: path={}, original_len={len}, truncate_to={truncate_to}",
path.display()
);
Ok(())
}

/// Mutable state owned by the background rollout writer.
Expand Down Expand Up @@ -1468,8 +1507,11 @@ impl RolloutWriterState {
let message = err.to_string();
if self.last_logged_error.as_ref() != Some(&message) {
error!(
"rollout writer failed for {}; buffered rollout items will be retried: {err}",
self.rollout_path.display()
"rollout writer failed for {}; buffered rollout items will be retried: {err}; \
error_kind={:?}; raw_os_error={:?}",
self.rollout_path.display(),
err.kind(),
err.raw_os_error()
);
}
self.last_logged_error = Some(message);
Expand Down Expand Up @@ -1648,8 +1690,26 @@ impl JsonlWriter {
async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> {
let mut json = serde_json::to_string(item)?;
json.push('\n');
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await?;
let start_len = self.file.metadata().await?.len();
if let Err(err) = async {
self.file.write_all(json.as_bytes()).await?;
self.file.flush().await
}
.await
{
if let Err(rollback_err) = self.file.set_len(start_len).await {
Comment on lines +1693 to +1700
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid truncating records from other appenders

If another writer appends to the same rollout after this start_len is captured and this writer then hits a write/flush error, set_len(start_len) rolls the entire file back to the stale length and deletes the other writer's successfully appended records. This can happen with the separate metadata append path (append_rollout_item_to_path) or another resumed process, so the rollback needs to be guarded to only remove bytes written by this attempt, or skipped when the file has grown beyond this write's range.

Useful? React with 👍 / 👎.

error!(
"failed to roll back failed rollout line write: start_len={start_len}, \
bytes_len={}, write_error={err}, write_raw_os_error={:?}, \
rollback_error={rollback_err}, rollback_raw_os_error={:?}",
json.len(),
err.raw_os_error(),
rollback_err.raw_os_error()
);
return Err(rollback_err);
}
return Err(err);
}
Ok(())
}
}
Expand Down
23 changes: 21 additions & 2 deletions codex-rs/rollout/src/recorder_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,8 @@ async fn persist_reports_filesystem_error_and_retries_buffered_items() -> std::i
async fn writer_state_retries_write_error_before_reporting_flush_success() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let rollout_path = home.path().join("rollout.jsonl");
File::create(&rollout_path)?;
let read_only_file = std::fs::OpenOptions::new().read(true).open(&rollout_path)?;
std::fs::write(&rollout_path, br#"{"timestamp":"partial"#)?;
let read_only_file = File::open(&rollout_path)?;
let mut state = RolloutWriterState::new(
Some(tokio::fs::File::from_std(read_only_file)),
/*deferred_log_file_info*/ None,
Expand All @@ -513,10 +513,29 @@ async fn writer_state_retries_write_error_before_reporting_flush_success() -> st

state.flush().await?;
let text_after_retry = std::fs::read_to_string(&rollout_path)?;
assert!(
!text_after_retry.contains("partial"),
"retry should remove an incomplete final line before appending"
);
assert!(
text_after_retry.contains("queued-after-writer-error"),
"flush should retry after reopening and write buffered items"
);
for line in text_after_retry.lines() {
serde_json::from_str::<RolloutLine>(line).expect("line should parse after retry");
}
Ok(())
}

#[test]
fn repair_final_line_preserves_valid_line_without_newline() -> std::io::Result<()> {
let home = TempDir::new().expect("temp dir");
let rollout_path = home.path().join("rollout.jsonl");
let line = r#"{"timestamp":"2026-05-25T00:00:00.000Z","type":"event_msg","payload":{"type":"user_message","message":"valid-without-newline","kind":"plain"}}"#;
std::fs::write(&rollout_path, line)?;
let mut file = File::options().read(true).write(true).open(&rollout_path)?;
repair_final_line(&mut file, &rollout_path)?;
assert_eq!(std::fs::read_to_string(&rollout_path)?, format!("{line}\n"));
Ok(())
}

Expand Down
Loading