Skip to content

Commit

Permalink
Merge pull request #650 from tursodatabase/fix-snapshot-compaction
Browse files Browse the repository at this point in the history
fix snapshot compaction
  • Loading branch information
MarinPostma committed Nov 20, 2023
2 parents f602f1c + 433e097 commit 3640528
Show file tree
Hide file tree
Showing 2 changed files with 411 additions and 84 deletions.
44 changes: 21 additions & 23 deletions libsql-server/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ unsafe impl WalHook for ReplicationLoggerHook {
replicator.submit_frames(frame_count as u32);
}

if let Err(e) = ctx.logger.log_file.write().maybe_compact(
ctx.logger.compactor.clone(),
ntruncate,
&ctx.logger.db_path,
) {
if let Err(e) = ctx
.logger
.log_file
.write()
.maybe_compact(ctx.logger.compactor.clone(), &ctx.logger.db_path)
{
tracing::error!("fatal error: {e}, exiting");
std::process::abort()
}
Expand Down Expand Up @@ -604,26 +605,16 @@ impl LogFile {
compact
}

fn maybe_compact(
&mut self,
compactor: LogCompactor,
size_after: u32,
path: &Path,
) -> anyhow::Result<()> {
fn maybe_compact(&mut self, compactor: LogCompactor, path: &Path) -> anyhow::Result<()> {
if self.should_compact() {
self.do_compaction(compactor, size_after, path)
self.do_compaction(compactor, path)
} else {
Ok(())
}
}

/// perform the log compaction.
fn do_compaction(
&mut self,
compactor: LogCompactor,
size_after: u32,
path: &Path,
) -> anyhow::Result<()> {
fn do_compaction(&mut self, compactor: LogCompactor, path: &Path) -> anyhow::Result<()> {
assert_eq!(self.uncommitted_frame_count, 0);

// nothing to compact
Expand All @@ -632,12 +623,18 @@ impl LogFile {
}

tracing::info!("performing log compaction");
let temp_log_path = path.join("temp_log");
// To perform the compaction, we create a new, empty file in the `to_compact` directory.
// We will then atomically swap that file with the current log file.
// In case of a crash, when filling the compactor job queue, if we find that we find a log
// file that doesn't contains only a header, we can safely assume that it was from a
// previous crash that happenned in the middle of this operation.
let to_compact_id = Uuid::new_v4();
let to_compact_log_path = path.join("to_compact").join(to_compact_id.to_string());
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&temp_log_path)?;
.open(&to_compact_log_path)?;
let mut new_log_file = LogFile::new(file, self.max_log_frame_count, self.max_log_duration)?;
let new_header = LogFileHeader {
start_frame_no: self.header.last_frame_no().unwrap() + 1,
Expand All @@ -648,9 +645,10 @@ impl LogFile {
new_log_file.header = new_header;
new_log_file.write_header().unwrap();
// swap old and new snapshot
atomic_rename(&temp_log_path, path.join("wallog")).unwrap();
// FIXME(marin): the dest path never changes, store it somewhere.
atomic_rename(&to_compact_log_path, path.join("wallog")).unwrap();
let old_log_file = std::mem::replace(self, new_log_file);
compactor.compact(old_log_file, temp_log_path, size_after)?;
compactor.compact(old_log_file, to_compact_log_path)?;

Ok(())
}
Expand Down Expand Up @@ -981,7 +979,7 @@ impl ReplicationLogger {
let size_after = last_frame.header().size_after;
assert!(size_after != 0);

log_file.do_compaction(self.compactor.clone(), size_after, &self.db_path)?;
log_file.do_compaction(self.compactor.clone(), &self.db_path)?;
Ok(true)
}
}
Expand Down
Loading

0 comments on commit 3640528

Please sign in to comment.