WIP: abort on duplicate layers & fail early
fixes #7790
problame committed May 17, 2024
1 parent 1a2a3cb commit 7e91015
Showing 4 changed files with 46 additions and 214 deletions.
39 changes: 35 additions & 4 deletions pageserver/src/tenant/storage_layer/layer.rs
@@ -277,10 +277,41 @@ impl Layer {

let downloaded = resident.expect("just initialized");

// if the rename works, the path is as expected
// TODO: sync system call
std::fs::rename(temp_path, owner.local_path())
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;
// We never want to overwrite an existing file, so we use `RENAME_NOREPLACE`.
// TODO: move to `virtual_file` module, next to `VirtualFile::crashsafe_overwrite`
// TODO: do the same check for when L0s are being written.
{
#[cfg(target_os = "linux")]
{
nix::fcntl::renameat2(
None,
temp_path,
None,
owner.local_path(),
nix::fcntl::RenameFlags::RENAME_NOREPLACE,
)
}
#[cfg(target_os = "macos")]
{
// use getattrlist system call to check for VOL_CAP_INT_RENAME_EXCL
// which is the precondition for using RENAME_EXCL, according to the
// macos manpage for renamex_np: https://www.manpagez.com/man/2/renamex_np/osx-10.12.3.php
let supports_rename_excl: bool = { todo!() };

if !supports_rename_excl {
anyhow::bail!("filesystem does not support atomic rename");
}

todo!("use renamex_np with RENAME_EXCL to rename {temp_path} to {owner}", owner = owner.local_path());

std::io::Result::Ok(())
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
std::compile_error!("OS (and filesystem) must support atomic rename with no-replace mode");
}
}
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;

Ok(ResidentLayer { downloaded, owner })
}
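
For reference, below is a sketch of the cross-platform helper that the `virtual_file` TODO above gestures at. Everything here is an assumption rather than part of this commit: the name `rename_noreplace`, the error mapping, and the choice to call `libc::renamex_np` directly on macOS, which fails with `ENOTSUP` when the filesystem lacks `VOL_CAP_INT_RENAME_EXCL`, sidestepping the `getattrlist` probe the WIP code plans.

```rust
use std::io;
use std::path::Path;

/// Rename `src` to `dst`, failing with `EEXIST` if `dst` already exists.
#[cfg(target_os = "linux")]
pub fn rename_noreplace(src: &Path, dst: &Path) -> io::Result<()> {
    // Mirrors the renameat2 call in the diff above; dirfd `None` resolves
    // both paths relative to the current working directory.
    nix::fcntl::renameat2(
        None,
        src,
        None,
        dst,
        nix::fcntl::RenameFlags::RENAME_NOREPLACE,
    )
    .map_err(|errno| io::Error::from_raw_os_error(errno as i32))
}

/// Same contract on macOS, via renamex_np(2) with RENAME_EXCL.
#[cfg(target_os = "macos")]
pub fn rename_noreplace(src: &Path, dst: &Path) -> io::Result<()> {
    use std::ffi::CString;
    use std::os::unix::ffi::OsStrExt;

    let from = CString::new(src.as_os_str().as_bytes())?;
    let to = CString::new(dst.as_os_str().as_bytes())?;
    // Fails with EEXIST if `to` exists, and with ENOTSUP if the filesystem
    // does not advertise VOL_CAP_INT_RENAME_EXCL (APFS and HFS+ both do).
    let rc = unsafe { libc::renamex_np(from.as_ptr(), to.as_ptr(), libc::RENAME_EXCL) };
    if rc == 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error())
    }
}
```

With a helper of this shape, the `cfg` dispatch above collapses into one call site, and the same no-replace guarantee could be reused when L0 layers are written (the second TODO).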
36 changes: 11 additions & 25 deletions pageserver/src/tenant/timeline.rs
@@ -52,7 +52,7 @@ use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use std::{
array,
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
sync::atomic::AtomicU64,
};
use std::{
@@ -4499,43 +4499,29 @@ impl Timeline {
) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;

let mut duplicated_layers = HashSet::new();

let mut insert_layers = Vec::with_capacity(new_deltas.len());

for l in new_deltas {
if guard.contains(l.as_ref()) {
// expected in tests
tracing::error!(layer=%l, "duplicated L1 layer");

// good ways to cause a duplicate: we repeatedly error after taking the writelock
// `guard` on self.layers. as of writing this, there are no error returns except
// for compact_level0_phase1 creating an L0, which does not happen in practice
// because we have not implemented L0 => L0 compaction.
duplicated_layers.insert(l.layer_desc().key());
// we should abort
error!("duplicate L1 layer; \
likely we already overwrote the existing file on disk with a possibly different bit pattern, \
causing incoherency with in-memory copies (PS page cache): {l}");
std::process::abort(); // scary to run the `struct Layer` destructor (?)
} else if LayerMap::is_l0(l.layer_desc()) {
bail!("compaction generates a L0 layer file as output, which will cause infinite compaction.");
} else {
insert_layers.push(l.clone());
// we should abort; otherwise we might leave some of the remaining layers unchecked
error!("compaction generates an L0 layer file as output, which will cause infinite compaction.");
std::process::abort(); // if we returned an error, we'd not check the remaining `new_deltas` for duplicates
}
}

// only remove those inputs which were not outputs
let remove_layers: Vec<Layer> = layers_to_remove
.iter()
.filter(|l| !duplicated_layers.contains(&l.layer_desc().key()))
.cloned()
.collect();

if !new_images.is_empty() {
guard.track_new_image_layers(new_images, &self.metrics);
}

// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
guard.finish_compact_l0(layers_to_remove, &new_deltas, &self.metrics);

if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
remote_client.schedule_compaction_update(layers_to_remove, new_deltas)?;
}

drop_wlock(guard);
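
One step in the reasoning above deserves a concrete illustration: why `std::process::abort()` instead of returning an error. An early `Err` return unwinds the scope and runs destructors, and `Layer`'s drop path can touch on-disk state (the `garbage_collect_on_drop` behavior referenced in the comment above), while `abort` terminates the process before any `Drop` impl runs. A self-contained sketch of that difference, illustration only, not pageserver code:

```rust
struct Guarded(&'static str);

impl Drop for Guarded {
    fn drop(&mut self) {
        // In the pageserver, a destructor like this could schedule deletion
        // of a layer file we no longer trust.
        println!("dropping {}", self.0);
    }
}

fn with_error_return() -> Result<(), String> {
    let _layer = Guarded("error-path layer");
    // Returning an error unwinds this scope: the destructor above runs.
    Err("duplicate layer".to_string())
}

fn with_abort() {
    let _layer = Guarded("abort-path layer");
    // Aborting terminates immediately: no destructor runs, and on-disk
    // state is left exactly as it was, for post-mortem inspection.
    std::process::abort();
}

fn main() {
    let _ = with_error_return(); // prints "dropping error-path layer"
    with_abort(); // prints nothing; the process dies here
}
```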
42 changes: 0 additions & 42 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -373,48 +373,6 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result::default());
}

// This failpoint is used together with the `test_duplicate_layers` integration test.
// It returns, as the compaction result, exactly the same layers that were the input to compaction.
// We want to ensure that this will not cause any problem when updating the layer map
// after the compaction is finished.
//
// Currently, there are two rare edge cases that will cause duplicated layers to be
// inserted.
// 1. The compaction job is interrupted / did not finish successfully. Assume we have files 1, 2, 3, 4, which
// are compacted to 5, but the page server is shut down; next time we start the page server we will get a layer
// map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger L0 compaction at this
// point again, it is likely that we will get a file 6 which has the same content and key range as 5,
// and this causes an overwrite. This is acceptable because the content is the same, and we should do a
// layer replace instead of the normal remove / upload process.
// 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and of target file
// size. Compaction will likely create the same set of n files afterwards.
//
// This failpoint covers a superset of both of these cases.
if cfg!(feature = "testing") {
let active = (|| {
::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
false
})();

if active {
let mut new_layers = Vec::with_capacity(level0_deltas.len());
for delta in &level0_deltas {
// we are just faking these layers as being produced again for this failpoint
new_layers.push(
delta
.download_and_keep_resident()
.await
.context("download layer for failpoint")?,
);
}
tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
return Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: level0_deltas,
});
}
}

// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
143 changes: 0 additions & 143 deletions test_runner/regress/test_duplicate_layers.py

This file was deleted.
