Skip to content

Commit

Permalink
Always retain the highest incremental snapshot for all full snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
mvines committed May 2, 2022
1 parent 475e7d1 commit 74b586a
Showing 1 changed file with 88 additions and 80 deletions.
168 changes: 88 additions & 80 deletions runtime/src/snapshot_utils.rs
Expand Up @@ -27,8 +27,8 @@ use {
solana_measure::measure::Measure,
solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey},
std::{
cmp::{max, Ordering},
collections::HashSet,
cmp::Ordering,
collections::{HashMap, HashSet},
fmt,
fs::{self, File},
io::{BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, Write},
Expand Down Expand Up @@ -1346,86 +1346,78 @@ pub fn purge_old_snapshot_archives<P>(
maximum_full_snapshot_archives_to_retain,
maximum_incremental_snapshot_archives_to_retain
);
let mut snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
snapshot_archives.sort_unstable();
snapshot_archives.reverse();
let max_snaps = max(1, maximum_full_snapshot_archives_to_retain); // Always keep at least one snapshot

let mut full_snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
full_snapshot_archives.sort_unstable();
full_snapshot_archives.reverse();

let num_to_retain = full_snapshot_archives.len().min(
maximum_full_snapshot_archives_to_retain
.max(1 /* Always keep at least one full snapshot */),
);
trace!(
"There are {} full snapshot archives, purging {} of them",
snapshot_archives.len(),
snapshot_archives.len().saturating_sub(max_snaps)
"There are {} full snapshot archives, retaining {}",
full_snapshot_archives.len(),
num_to_retain,
);

for old_archive in snapshot_archives.into_iter().skip(max_snaps) {
let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
if full_snapshot_archives.is_empty() {
None
} else {
Some(full_snapshot_archives.split_at(num_to_retain))
}
.unwrap_or_default();

let retained_full_snapshot_slots = full_snapshot_archives_to_retain
.iter()
.map(|ai| ai.slot())
.collect::<HashSet<_>>();

// Best-effort removal of every archive file in `archives`.
//
// Each removal is traced before it is attempted. A failed removal is reported
// at info level and does not stop the remaining archives from being processed.
fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
    archives.iter().for_each(|archive| {
        let archive_path = archive.path();
        trace!("Removing snapshot archive: {}", archive_path.display());
        if let Err(err) = fs::remove_file(archive_path) {
            info!("Failed to remove {}: {}", archive_path.display(), err);
        }
    });
}
remove_archives(full_snapshot_archives_to_remove);

let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
for incremental_snapshot_archive in get_incremental_snapshot_archives(&snapshot_archives_dir) {
incremental_snapshot_archives_by_base_slot
.entry(incremental_snapshot_archive.base_slot())
.or_default()
.push(incremental_snapshot_archive)
}

let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
{
incremental_snapshot_archives.sort_unstable();
let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
maximum_incremental_snapshot_archives_to_retain
} else if retained_full_snapshot_slots.contains(&base_slot) {
1
} else {
0
};
trace!(
"Purging old full snapshot archive: {}",
old_archive.path().display()
"There are {} incremental snapshot archives for base slot {}, removing {} of them",
incremental_snapshot_archives.len(),
base_slot,
incremental_snapshot_archives
.len()
.saturating_sub(num_to_retain),
);
fs::remove_file(old_archive.path())
.unwrap_or_else(|err| info!("Failed to remove old full snapshot archive: {}", err));
}

// Purge incremental snapshots with a different base slot than the highest full snapshot slot.
// Of the incremental snapshots with the same base slot, purge the oldest ones and retain the
// latest.
//
// First split the incremental snapshot archives into two vectors:
// - One vector will be all the incremental snapshot archives with a _different_ base slot than
// the highest full snapshot slot.
// - The other vector will be all the incremental snapshot archives with the _same_ base slot
// as the highest full snapshot slot.
//
// To find the incremental snapshot archives to retain, first sort the second vector (the
// _same_ base slot), then reverse (so highest slots are first) and skip the first
// `maximum_incremental_snapshot_archives_to_retain`.
//
// Purge all the rest.
let highest_full_snapshot_slot = get_highest_full_snapshot_archive_slot(&snapshot_archives_dir);
let mut incremental_snapshot_archives_with_same_base_slot = vec![];
let mut incremental_snapshot_archives_with_different_base_slot = vec![];
get_incremental_snapshot_archives(&snapshot_archives_dir)
.drain(..)
.for_each(|incremental_snapshot_archive| {
if Some(incremental_snapshot_archive.base_slot()) == highest_full_snapshot_slot {
incremental_snapshot_archives_with_same_base_slot
.push(incremental_snapshot_archive);
} else {
incremental_snapshot_archives_with_different_base_slot
.push(incremental_snapshot_archive);
}
});

if !incremental_snapshot_archives_with_different_base_slot.is_empty() {
trace!(
"Purging {} incremental snapshot archives with a different base slot than the highest full snapshot slot",
incremental_snapshot_archives_with_different_base_slot.len()
incremental_snapshot_archives.truncate(
incremental_snapshot_archives
.len()
.saturating_sub(num_to_retain),
);
remove_archives(&incremental_snapshot_archives);
}
trace!(
"There are {} incremental snapshots with same base slot as the highest full snapshot slot, purging {} of them",
incremental_snapshot_archives_with_same_base_slot.len(),
incremental_snapshot_archives_with_same_base_slot.len()
.saturating_sub(maximum_incremental_snapshot_archives_to_retain)
);

incremental_snapshot_archives_with_same_base_slot.sort_unstable();
incremental_snapshot_archives_with_different_base_slot
.iter()
.chain(
incremental_snapshot_archives_with_same_base_slot
.iter()
.rev()
.skip(maximum_incremental_snapshot_archives_to_retain),
)
.for_each(|incremental_snapshot_archive| {
trace!(
"Purging old incremental snapshot archive: {}",
incremental_snapshot_archive.path().display()
);
fs::remove_file(incremental_snapshot_archive.path()).unwrap_or_else(|err| {
info!("Failed to remove old incremental snapshot archive: {}", err)
})
});
}

fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
Expand Down Expand Up @@ -2648,7 +2640,7 @@ mod tests {
snap_name
);
}
assert!(retained_snaps.len() == expected_snapshots.len());
assert_eq!(retained_snaps.len(), expected_snapshots.len());
}

#[test]
Expand Down Expand Up @@ -2728,7 +2720,7 @@ mod tests {
);
let mut full_snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
full_snapshot_archives.sort_unstable();
assert_eq!(full_snapshot_archives.len(), maximum_snapshots_to_retain,);
assert_eq!(full_snapshot_archives.len(), maximum_snapshots_to_retain);
assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
Expand All @@ -2738,6 +2730,7 @@ mod tests {

#[test]
fn test_purge_old_incremental_snapshot_archives() {
solana_logger::setup();
let snapshot_archives_dir = tempfile::TempDir::new().unwrap();
let starting_slot = 100_000;

Expand Down Expand Up @@ -2794,19 +2787,34 @@ mod tests {
maximum_full_snapshot_archives_to_retain,
);
remaining_full_snapshot_archives.sort_unstable();
let latest_full_snapshot_archive_slot =
remaining_full_snapshot_archives.last().unwrap().slot();

// Ensure correct number of incremental snapshot archives are purged/retained
let mut remaining_incremental_snapshot_archives =
get_incremental_snapshot_archives(snapshot_archives_dir.path());
assert_eq!(
remaining_incremental_snapshot_archives.len(),
maximum_incremental_snapshot_archives_to_retain
+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)
);
remaining_incremental_snapshot_archives.sort_unstable();
remaining_incremental_snapshot_archives.reverse();

// Ensure there exists one incremental snapshot for all but the latest full snapshot
for i in (1..maximum_full_snapshot_archives_to_retain).rev() {
let incremental_snapshot_archive =
remaining_incremental_snapshot_archives.pop().unwrap();

let expected_base_slot =
latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
let expected_slot = expected_base_slot
+ (full_snapshot_interval - incremental_snapshot_interval) as u64;
assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
}

// Ensure all remaining incremental snapshots are only for the latest full snapshot
let latest_full_snapshot_archive_slot =
remaining_full_snapshot_archives.last().unwrap().slot();
for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
assert_eq!(
incremental_snapshot_archive.base_slot(),
Expand All @@ -2823,13 +2831,13 @@ mod tests {
num_incremental_snapshots_per_full_snapshot
- maximum_incremental_snapshot_archives_to_retain,
)
.collect::<Vec<_>>();
.collect::<HashSet<_>>();

let actual_remaining_incremental_snapshot_archive_slots =
remaining_incremental_snapshot_archives
.iter()
.map(|snapshot| snapshot.slot())
.collect::<Vec<_>>();
.collect::<HashSet<_>>();
assert_eq!(
actual_remaining_incremental_snapshot_archive_slots,
expected_remaing_incremental_snapshot_archive_slots
Expand Down

0 comments on commit 74b586a

Please sign in to comment.