diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 23b05b9defa..19b2b8aa9a6 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -4,6 +4,9 @@ use std::collections::HashSet; use std::fs; use std::fs::File; use std::io::Error as IoError; +use std::io::Seek; +use std::io::SeekFrom; +use std::io::Write; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -694,17 +697,14 @@ impl RolloutRecorder { (None, Some(log_file_info), path, Some(session_meta)) } - RolloutRecorderParams::Resume { path } => ( - Some( - tokio::fs::OpenOptions::new() - .append(true) - .open(&path) - .await?, - ), - None, - path, - None, - ), + RolloutRecorderParams::Resume { path } => { + let mut file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&path)?; + repair_final_line(&mut file, &path)?; + (Some(tokio::fs::File::from_std(file)), None, path, None) + } }; // Clone the cwd for the spawned task to collect git info asynchronously @@ -1362,10 +1362,49 @@ fn open_log_file(path: &Path) -> std::io::Result { ))); }; fs::create_dir_all(parent)?; - std::fs::OpenOptions::new() - .append(true) + let mut file = std::fs::OpenOptions::new() + .read(true) + .write(true) .create(true) - .open(path) + .truncate(false) + .open(path)?; + repair_final_line(&mut file, path)?; + Ok(file) +} + +fn repair_final_line(file: &mut File, path: &Path) -> std::io::Result<()> { + let bytes = fs::read(path)?; + let len = bytes.len(); + if len == 0 { + return Ok(()); + } + + if bytes.last() == Some(&b'\n') { + file.seek(SeekFrom::End(0))?; + return Ok(()); + } + + let truncate_to = bytes + .iter() + .rposition(|byte| *byte == b'\n') + .map_or(0, |index| index + 1); + if serde_json::from_slice::(&bytes[truncate_to..]).is_ok() { + file.seek(SeekFrom::End(0))?; + file.write_all(b"\n")?; + warn!( + "added missing final newline to rollout file: path={}, original_len={len}", + path.display() + ); + return Ok(()); + } + + file.set_len(truncate_to as u64)?; + file.seek(SeekFrom::End(0))?; + warn!( + "truncated incomplete final rollout line: path={}, original_len={len}, truncate_to={truncate_to}", + path.display() + ); + Ok(()) } /// Mutable state owned by the background rollout writer. @@ -1468,8 +1507,11 @@ impl RolloutWriterState { let message = err.to_string(); if self.last_logged_error.as_ref() != Some(&message) { error!( - "rollout writer failed for {}; buffered rollout items will be retried: {err}", - self.rollout_path.display() + "rollout writer failed for {}; buffered rollout items will be retried: {err}; \ + error_kind={:?}; raw_os_error={:?}", + self.rollout_path.display(), + err.kind(), + err.raw_os_error() ); } self.last_logged_error = Some(message); @@ -1648,8 +1690,26 @@ impl JsonlWriter { async fn write_line(&mut self, item: &impl serde::Serialize) -> std::io::Result<()> { let mut json = serde_json::to_string(item)?; json.push('\n'); - self.file.write_all(json.as_bytes()).await?; - self.file.flush().await?; + let start_len = self.file.metadata().await?.len(); + if let Err(err) = async { + self.file.write_all(json.as_bytes()).await?; + self.file.flush().await + } + .await + { + if let Err(rollback_err) = self.file.set_len(start_len).await { + error!( + "failed to roll back failed rollout line write: start_len={start_len}, \ + bytes_len={}, write_error={err}, write_raw_os_error={:?}, \ + rollback_error={rollback_err}, rollback_raw_os_error={:?}", + json.len(), + err.raw_os_error(), + rollback_err.raw_os_error() + ); + return Err(rollback_err); + } + return Err(err); + } Ok(()) } } diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 90fe15c3120..43db7a7dd8a 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -494,8 +494,8 @@ async fn persist_reports_filesystem_error_and_retries_buffered_items() -> std::i async fn writer_state_retries_write_error_before_reporting_flush_success() -> std::io::Result<()> { let home = TempDir::new().expect("temp dir"); let rollout_path = home.path().join("rollout.jsonl"); - File::create(&rollout_path)?; - let read_only_file = std::fs::OpenOptions::new().read(true).open(&rollout_path)?; + std::fs::write(&rollout_path, br#"{"timestamp":"partial"#)?; + let read_only_file = File::open(&rollout_path)?; let mut state = RolloutWriterState::new( Some(tokio::fs::File::from_std(read_only_file)), /*deferred_log_file_info*/ None, @@ -513,10 +513,29 @@ async fn writer_state_retries_write_error_before_reporting_flush_success() -> st state.flush().await?; let text_after_retry = std::fs::read_to_string(&rollout_path)?; + assert!( + !text_after_retry.contains("partial"), + "retry should remove an incomplete final line before appending" + ); assert!( text_after_retry.contains("queued-after-writer-error"), "flush should retry after reopening and write buffered items" ); + for line in text_after_retry.lines() { + serde_json::from_str::(line).expect("line should parse after retry"); + } + Ok(()) +} + +#[test] +fn repair_final_line_preserves_valid_line_without_newline() -> std::io::Result<()> { + let home = TempDir::new().expect("temp dir"); + let rollout_path = home.path().join("rollout.jsonl"); + let line = r#"{"timestamp":"2026-05-25T00:00:00.000Z","type":"event_msg","payload":{"type":"user_message","message":"valid-without-newline","kind":"plain"}}"#; + std::fs::write(&rollout_path, line)?; + let mut file = File::options().read(true).write(true).open(&rollout_path)?; + repair_final_line(&mut file, &rollout_path)?; + assert_eq!(std::fs::read_to_string(&rollout_path)?, format!("{line}\n")); Ok(()) }