Skip to content

Commit

Permalink
feat(merkle tree): Allow random-order tree recovery (#485)
Browse files Browse the repository at this point in the history
## What ❔

Allow recovery the Merkle tree from entries provided in an arbitrary
order.

## Why ❔

This is necessary to implement the snapshot recovery PoC and could be
beneficial in the long run.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli committed Nov 17, 2023
1 parent e8bbf76 commit 146e4cf
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 37 deletions.
29 changes: 22 additions & 7 deletions core/lib/merkle_tree/examples/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ struct Cli {
/// Number of entries per update.
#[arg(name = "ops")]
writes_per_update: usize,
/// Perform random recovery instead of linear recovery.
#[arg(name = "random", long)]
random: bool,
/// Use a no-op hashing function.
#[arg(name = "no-hash", long)]
no_hashing: bool,
Expand Down Expand Up @@ -89,17 +92,29 @@ impl Cli {
let started_at = Instant::now();
let recovery_entries = (0..self.writes_per_update)
.map(|_| {
last_key += key_step - Key::from(rng.gen::<u64>());
// ^ Increases the key by a random increment close to `key` step with some randomness.
last_leaf_index += 1;
RecoveryEntry {
key: last_key,
value: ValueHash::zero(),
leaf_index: last_leaf_index,
if self.random {
RecoveryEntry {
key: Key::from(rng.gen::<[u8; 32]>()),
value: ValueHash::zero(),
leaf_index: last_leaf_index,
}
} else {
last_key += key_step - Key::from(rng.gen::<u64>());
// ^ Increases the key by a random increment close to `key` step with some randomness.
RecoveryEntry {
key: last_key,
value: ValueHash::zero(),
leaf_index: last_leaf_index,
}
}
})
.collect();
recovery.extend(recovery_entries);
if self.random {
recovery.extend_random(recovery_entries);
} else {
recovery.extend_linear(recovery_entries);
}
tracing::info!(
"Updated tree with recovery chunk #{updated_idx} in {:?}",
started_at.elapsed()
Expand Down
31 changes: 27 additions & 4 deletions core/lib/merkle_tree/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
storage.greatest_key()
}

/// Extends a tree with a chunk of entries.
/// Extends a tree with a chunk of linearly ordered entries.
///
/// Entries must be ordered by increasing `key`, and the key of the first entry must be greater
/// than [`Self::last_processed_key()`].
Expand All @@ -154,12 +154,35 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
%entries.key_range = entries_key_range(&entries),
),
)]
pub fn extend(&mut self, entries: Vec<RecoveryEntry>) {
pub fn extend_linear(&mut self, entries: Vec<RecoveryEntry>) {
tracing::debug!("Started extending tree");

let started_at = Instant::now();
let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false);
let patch = storage.extend_during_recovery(entries);
let patch = storage.extend_during_linear_recovery(entries);
tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed());

let started_at = Instant::now();
self.db.apply_patch(patch);
tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed());
}

/// Extends a tree with a chunk of entries. Unlike [`Self::extend_linear()`], entries may be
/// ordered in any way you like.
#[tracing::instrument(
level = "debug",
skip_all,
fields(
recovered_version = self.recovered_version,
entries.len = entries.len(),
),
)]
pub fn extend_random(&mut self, entries: Vec<RecoveryEntry>) {
tracing::debug!("Started extending tree");

let started_at = Instant::now();
let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false);
let patch = storage.extend_during_random_recovery(entries);
tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed());

let started_at = Instant::now();
Expand Down Expand Up @@ -262,7 +285,7 @@ mod tests {
value: ValueHash::repeat_byte(1),
leaf_index: 1,
};
recovery.extend(vec![recovery_entry]);
recovery.extend_linear(vec![recovery_entry]);
let tree = recovery.finalize();

assert_eq!(tree.latest_version(), Some(42));
Expand Down
28 changes: 27 additions & 1 deletion core/lib/merkle_tree/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,10 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
Some(self.updater.load_greatest_key(self.db)?.0.full_key)
}

pub fn extend_during_recovery(mut self, recovery_entries: Vec<RecoveryEntry>) -> PatchSet {
pub fn extend_during_linear_recovery(
mut self,
recovery_entries: Vec<RecoveryEntry>,
) -> PatchSet {
let (mut prev_key, mut prev_nibbles) = match self.updater.load_greatest_key(self.db) {
Some((leaf, nibbles)) => (Some(leaf.full_key), nibbles),
None => (None, Nibbles::EMPTY),
Expand Down Expand Up @@ -353,6 +356,29 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
patch
}

pub fn extend_during_random_recovery(
mut self,
recovery_entries: Vec<RecoveryEntry>,
) -> PatchSet {
let load_nodes_latency = BLOCK_TIMINGS.load_nodes.start();
let sorted_keys = SortedKeys::new(recovery_entries.iter().map(|entry| entry.key));
let parent_nibbles = self.updater.load_ancestors(&sorted_keys, self.db);
let load_nodes_latency = load_nodes_latency.observe();
tracing::debug!("Load stage took {load_nodes_latency:?}");

let extend_patch_latency = BLOCK_TIMINGS.extend_patch.start();
for (entry, parent_nibbles) in recovery_entries.into_iter().zip(parent_nibbles) {
self.updater
.insert(entry.key, entry.value, &parent_nibbles, || entry.leaf_index);
self.leaf_count += 1;
}
let extend_patch_latency = extend_patch_latency.observe();
tracing::debug!("Tree traversal stage took {extend_patch_latency:?}");

let (_, patch) = self.finalize();
patch
}

fn finalize(self) -> (ValueHash, PatchSet) {
tracing::debug!(
"Finished updating tree; total leaf count: {}, stats: {:?}",
Expand Down
66 changes: 52 additions & 14 deletions core/lib/merkle_tree/src/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ fn recovery_flattens_node_versions() {
leaf_index: i + 1,
});
let patch = Storage::new(&PatchSet::default(), &(), recovery_version, false)
.extend_during_recovery(recovery_entries.collect());
.extend_during_linear_recovery(recovery_entries.collect());
assert_eq!(patch.patches_by_version.len(), 1);
let (updated_version, patch) = patch.patches_by_version.into_iter().next().unwrap();
assert_eq!(updated_version, recovery_version);
Expand Down Expand Up @@ -546,7 +546,7 @@ fn test_recovery_with_node_hierarchy(chunk_size: usize) {
let mut db = PatchSet::default();
for recovery_chunk in recovery_entries.chunks(chunk_size) {
let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(recovery_chunk.to_vec());
.extend_during_linear_recovery(recovery_chunk.to_vec());
db.apply_patch(patch);
}
assert_eq!(db.updated_version, Some(recovery_version));
Expand Down Expand Up @@ -605,7 +605,7 @@ fn test_recovery_with_deep_node_hierarchy(chunk_size: usize) {
let mut db = PatchSet::default();
for recovery_chunk in recovery_entries.chunks(chunk_size) {
let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(recovery_chunk.to_vec());
.extend_during_linear_recovery(recovery_chunk.to_vec());
db.apply_patch(patch);
}
let mut patch = db.patches_by_version.remove(&recovery_version).unwrap();
Expand Down Expand Up @@ -673,7 +673,7 @@ fn recovery_workflow_with_multiple_stages() {
leaf_index: i,
});
let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(recovery_entries.collect());
.extend_during_linear_recovery(recovery_entries.collect());
assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 100);
db.apply_patch(patch);

Expand All @@ -684,7 +684,7 @@ fn recovery_workflow_with_multiple_stages() {
});

let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(more_recovery_entries.collect());
.extend_during_linear_recovery(more_recovery_entries.collect());
assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 200);
db.apply_patch(patch);

Expand All @@ -701,6 +701,7 @@ fn recovery_workflow_with_multiple_stages() {
}

fn test_recovery_pruning_equivalence(
is_linear: bool,
chunk_size: usize,
recovery_chunk_size: usize,
hasher: &dyn HashTree,
Expand Down Expand Up @@ -752,13 +753,21 @@ fn test_recovery_pruning_equivalence(
});
let mut recovery_entries: Vec<_> = recovery_entries.collect();
assert_eq!(recovery_entries.len(), 100);
recovery_entries.sort_unstable_by_key(|entry| entry.key);
if is_linear {
recovery_entries.sort_unstable_by_key(|entry| entry.key);
} else {
recovery_entries.shuffle(&mut rng);
}

// Recover the tree.
let mut recovered_db = PatchSet::default();
for recovery_chunk in recovery_entries.chunks(recovery_chunk_size) {
let patch = Storage::new(&recovered_db, hasher, recovered_version, false)
.extend_during_recovery(recovery_chunk.to_vec());
let storage = Storage::new(&recovered_db, hasher, recovered_version, false);
let patch = if is_linear {
storage.extend_during_linear_recovery(recovery_chunk.to_vec())
} else {
storage.extend_during_random_recovery(recovery_chunk.to_vec())
};
recovered_db.apply_patch(patch);
}
let sub_patch = recovered_db
Expand Down Expand Up @@ -798,25 +807,54 @@ fn test_recovery_pruning_equivalence(
}

#[test]
fn recovery_pruning_equivalence() {
fn linear_recovery_pruning_equivalence() {
for chunk_size in [3, 5, 7, 11, 21, 42, 99, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(true, chunk_size, 100, &());
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 6, 19, 50, 73] {
test_recovery_pruning_equivalence(true, chunk_size, recovery_chunk_size, &());
}
}
}

#[test]
fn random_recovery_pruning_equivalence() {
for chunk_size in [3, 5, 7, 11, 21, 42, 99, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(chunk_size, 100, &());
test_recovery_pruning_equivalence(false, chunk_size, 100, &());
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 6, 19, 50, 73] {
test_recovery_pruning_equivalence(chunk_size, recovery_chunk_size, &());
test_recovery_pruning_equivalence(false, chunk_size, recovery_chunk_size, &());
}
}
}

#[test]
fn recovery_pruning_equivalence_with_hashing() {
fn linear_recovery_pruning_equivalence_with_hashing() {
for chunk_size in [3, 7, 21, 42, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(chunk_size, 100, &Blake2Hasher);
test_recovery_pruning_equivalence(true, chunk_size, 100, &Blake2Hasher);
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 19, 73] {
test_recovery_pruning_equivalence(chunk_size, recovery_chunk_size, &Blake2Hasher);
test_recovery_pruning_equivalence(true, chunk_size, recovery_chunk_size, &Blake2Hasher);
}
}
}

#[test]
fn random_recovery_pruning_equivalence_with_hashing() {
for chunk_size in [3, 7, 21, 42, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(false, chunk_size, 100, &Blake2Hasher);
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 19, 73] {
test_recovery_pruning_equivalence(
false,
chunk_size,
recovery_chunk_size,
&Blake2Hasher,
);
}
}
}
43 changes: 34 additions & 9 deletions core/lib/merkle_tree/tests/integration/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn recovery_basics() {

let recovered_version = 123;
let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), recovered_version);
recovery.extend(recovery_entries);
recovery.extend_linear(recovery_entries);

assert_eq!(recovery.last_processed_key(), Some(greatest_key));
assert_eq!(recovery.root_hash(), *expected_hash);
Expand All @@ -36,7 +36,7 @@ fn recovery_basics() {
tree.verify_consistency(recovered_version).unwrap();
}

fn test_recovery_in_chunks<DB: PruneDatabase>(mut create_db: impl FnMut() -> DB) {
fn test_recovery_in_chunks<DB: PruneDatabase>(is_linear: bool, mut create_db: impl FnMut() -> DB) {
let (kvs, expected_hash) = &*KVS_AND_HASH;
let recovery_entries = kvs
.iter()
Expand All @@ -47,15 +47,25 @@ fn test_recovery_in_chunks<DB: PruneDatabase>(mut create_db: impl FnMut() -> DB)
leaf_index: i as u64 + 1,
});
let mut recovery_entries: Vec<_> = recovery_entries.collect();
recovery_entries.sort_unstable_by_key(|entry| entry.key);
let greatest_key = recovery_entries[99].key;
if is_linear {
recovery_entries.sort_unstable_by_key(|entry| entry.key);
}
let greatest_key = recovery_entries
.iter()
.map(|entry| entry.key)
.max()
.unwrap();

let recovered_version = 123;
for chunk_size in [6, 10, 17, 42] {
let mut db = create_db();
let mut recovery = MerkleTreeRecovery::new(&mut db, recovered_version);
for (i, chunk) in recovery_entries.chunks(chunk_size).enumerate() {
recovery.extend(chunk.to_vec());
if is_linear {
recovery.extend_linear(chunk.to_vec());
} else {
recovery.extend_random(chunk.to_vec());
}
if i % 3 == 1 {
recovery = MerkleTreeRecovery::new(&mut db, recovered_version);
// ^ Simulate recovery interruption and restart
Expand Down Expand Up @@ -119,8 +129,13 @@ fn test_tree_after_recovery<DB: Database>(
}

#[test]
fn recovery_in_chunks() {
test_recovery_in_chunks(PatchSet::default);
fn linear_recovery_in_chunks() {
test_recovery_in_chunks(true, PatchSet::default);
}

#[test]
fn random_recovery_in_chunks() {
test_recovery_in_chunks(false, PatchSet::default);
}

mod rocksdb {
Expand All @@ -130,10 +145,20 @@ mod rocksdb {
use zksync_merkle_tree::RocksDBWrapper;

#[test]
fn recovery_in_chunks() {
fn linear_recovery_in_chunks() {
let temp_dir = TempDir::new().unwrap();
let mut counter = 0;
test_recovery_in_chunks(true, || {
counter += 1;
RocksDBWrapper::new(&temp_dir.path().join(counter.to_string()))
});
}

#[test]
fn random_recovery_in_chunks() {
let temp_dir = TempDir::new().unwrap();
let mut counter = 0;
test_recovery_in_chunks(|| {
test_recovery_in_chunks(false, || {
counter += 1;
RocksDBWrapper::new(&temp_dir.path().join(counter.to_string()))
});
Expand Down
2 changes: 0 additions & 2 deletions prover/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 146e4cf

Please sign in to comment.