perf(en): Parallelize persistence and chunk processing during tree recovery (#2050)

## What ❔

Persists chunks during tree recovery in parallel with processing subsequent chunks.

## Why ❔

- Could speed up tree recovery ~2x on mainnet, since persistence and
processing of a single chunk of 200k entries each take ~3s (see the sketch below).
- Significantly easier to implement and reason about than alternatives.
- May be used together with alternatives.
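
In effect, recovery becomes a two-stage pipeline: chunk *N* is processed while chunk *N − 1* is persisted on a background thread, and a bounded buffer between the stages provides back-pressure. A minimal, self-contained sketch of the pattern (illustrative only; the names and the channel-based implementation are assumptions, not code from this PR):

```rust
use std::{sync::mpsc, thread};

/// Stand-in for the database patch produced by processing one chunk.
struct Patch(u64);

fn process_chunk(idx: u64) -> Patch {
    Patch(idx) // ~3s of hashing / tree extension per chunk in real recovery
}

fn persist(_patch: Patch) {
    // ~3s of RocksDB writes per chunk in real recovery
}

fn main() {
    // Bounded capacity = back-pressure: processing stalls once 4 patches
    // are waiting to be persisted, which also bounds RAM usage.
    let (sender, receiver) = mpsc::sync_channel::<Patch>(4);

    // Background persistence thread: applies patches in order.
    let persistence = thread::spawn(move || {
        for patch in receiver {
            persist(patch);
        }
    });

    for chunk_idx in 0..100 {
        let patch = process_chunk(chunk_idx);
        sender.send(patch).expect("persistence thread terminated");
    }
    drop(sender); // close the channel so the thread can exit
    persistence.join().unwrap();
}
```

Since both stages take roughly the same time, they overlap almost completely; hence the ~2x estimate.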

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
slowli committed Jun 6, 2024
1 parent 4ab4922 commit b08a667
Showing 15 changed files with 956 additions and 147 deletions.
7 changes: 7 additions & 0 deletions core/bin/external_node/src/config/mod.rs
@@ -755,6 +755,12 @@ pub(crate) struct ExperimentalENConfig {
/// of recovery and then restarted with a different config).
#[serde(default = "ExperimentalENConfig::default_snapshots_recovery_tree_chunk_size")]
pub snapshots_recovery_tree_chunk_size: u64,
/// Buffer capacity for parallel persistence operations. Should be reasonably small, since a larger buffer means more RAM usage;
/// buffer elements are persisted tree chunks. On the other hand, a buffer that is too small can make parallel persistence inefficient.
///
/// If not set, parallel persistence will be disabled.
#[serde(default)] // Temporarily use a conservative option (sequential recovery) as default
pub snapshots_recovery_tree_parallel_persistence_buffer: Option<NonZeroUsize>,

// Commitment generator
/// Maximum degree of parallelism during commitment generation, i.e., the maximum number of L1 batches being processed in parallel.
@@ -779,6 +785,7 @@ impl ExperimentalENConfig {
state_keeper_db_max_open_files: None,
snapshots_recovery_l1_batch: None,
snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
snapshots_recovery_tree_parallel_persistence_buffer: None,
commitment_generator_max_parallelism: None,
}
}
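
When the option is `None`, recovery stays sequential; when set, the consumer can enable parallel persistence with the given capacity. A hypothetical sketch of that branch (the actual wiring lives in the metadata calculator, outside this hunk; import paths are approximate):

```rust
use std::num::NonZeroUsize;

use zksync_merkle_tree::{recovery::MerkleTreeRecovery, RocksDBWrapper};

// Hypothetical helper mirroring how a consumer of the new option could act on it.
fn maybe_parallelize(
    recovery: &mut MerkleTreeRecovery<RocksDBWrapper>,
    buffer: Option<NonZeroUsize>,
) -> anyhow::Result<()> {
    if let Some(capacity) = buffer {
        // The returned handle is only needed to emulate persistence crashes
        // in tests; it can be dropped here.
        recovery.parallelize_persistence(capacity.get())?;
    }
    Ok(())
}
```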
3 changes: 3 additions & 0 deletions core/bin/external_node/src/main.rs
@@ -140,6 +140,9 @@ async fn run_tree(
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
recovery: MetadataCalculatorRecoveryConfig {
desired_chunk_size: config.experimental.snapshots_recovery_tree_chunk_size,
parallel_persistence_buffer: config
.experimental
.snapshots_recovery_tree_parallel_persistence_buffer,
},
};

36 changes: 23 additions & 13 deletions core/lib/merkle_tree/examples/recovery.rs
@@ -32,6 +32,9 @@ struct Cli {
/// Perform testing on in-memory DB rather than RocksDB (i.e., with focus on hashing logic).
#[arg(long = "in-memory", short = 'M')]
in_memory: bool,
/// Parallelize DB persistence with processing.
#[arg(long = "parallelize", conflicts_with = "in_memory")]
parallelize: bool,
/// Block cache capacity for RocksDB in bytes.
#[arg(long = "block-cache", conflicts_with = "in_memory")]
block_cache: Option<usize>,
@@ -52,11 +55,13 @@ impl Cli {
Self::init_logging();
tracing::info!("Launched with options: {self:?}");

let (mut mock_db, mut rocksdb);
let mut _temp_dir = None;
let db: &mut dyn PruneDatabase = if self.in_memory {
mock_db = PatchSet::default();
&mut mock_db
let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
let recovered_version = 123;

if self.in_memory {
let recovery =
MerkleTreeRecovery::with_hasher(PatchSet::default(), recovered_version, hasher)?;
self.recover_tree(recovery, recovered_version)
} else {
let dir = TempDir::new().context("failed creating temp dir for RocksDB")?;
tracing::info!(
@@ -69,24 +74,29 @@
};
let db =
RocksDB::with_options(dir.path(), db_options).context("failed creating RocksDB")?;
rocksdb = RocksDBWrapper::from(db);
_temp_dir = Some(dir);
&mut rocksdb
};
let db = RocksDBWrapper::from(db);
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)?;
if self.parallelize {
recovery.parallelize_persistence(4)?;
}
self.recover_tree(recovery, recovered_version)
}
}

let hasher: &dyn HashTree = if self.no_hashing { &() } else { &Blake2Hasher };
fn recover_tree<DB: PruneDatabase>(
self,
mut recovery: MerkleTreeRecovery<DB, &dyn HashTree>,
recovered_version: u64,
) -> anyhow::Result<()> {
let mut rng = StdRng::seed_from_u64(self.rng_seed);

let recovered_version = 123;
let key_step =
Key::MAX / (Key::from(self.update_count) * Key::from(self.writes_per_update));
assert!(key_step > Key::from(u64::MAX));
// ^ Total number of generated keys is <2^128.

let mut last_key = Key::zero();
let mut last_leaf_index = 0;
let mut recovery = MerkleTreeRecovery::with_hasher(db, recovered_version, hasher)
.context("cannot create tree")?;
let recovery_started_at = Instant::now();
for updated_idx in 0..self.update_count {
let started_at = Instant::now();
3 changes: 3 additions & 0 deletions core/lib/merkle_tree/src/metrics.rs
@@ -365,6 +365,7 @@ pub(crate) static PRUNING_TIMINGS: Global<PruningTimings> = Global::new();
pub(crate) enum RecoveryStage {
Extend,
ApplyPatch,
ParallelPersistence,
}

const CHUNK_SIZE_BUCKETS: Buckets = Buckets::values(&[
Expand All @@ -391,6 +392,8 @@ pub(crate) struct RecoveryMetrics {
/// Latency of a specific stage of recovery for a single chunk.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub stage_latency: Family<RecoveryStage, Histogram<Duration>>,
/// Number of buffered commands if parallel persistence is used.
pub parallel_persistence_buffer_size: Gauge<usize>,
}

#[vise::register]
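
The gauge is presumably updated as the persistence thread enqueues and drains commands. A hedged sketch of the instrumentation (the real code lives in the new `parallel` storage module, which is not expanded in this view; `buffer`, `patch`, and `persist_chunk` are hypothetical stand-ins):

```rust
// Hypothetical usage of the metrics declared above (not code from this PR).
RECOVERY_METRICS
    .parallel_persistence_buffer_size
    .set(buffer.len());
let stage_latency =
    RECOVERY_METRICS.stage_latency[&RecoveryStage::ParallelPersistence].start();
persist_chunk(patch)?;
stage_latency.observe();
```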
core/lib/merkle_tree/src/recovery/mod.rs
@@ -40,17 +40,21 @@ use std::{collections::HashMap, time::Instant};
use anyhow::Context as _;
use zksync_crypto::hasher::blake2::Blake2Hasher;

pub use crate::storage::PersistenceThreadHandle;
use crate::{
hasher::{HashTree, HasherWithStats},
metrics::{RecoveryStage, RECOVERY_METRICS},
storage::{PatchSet, PruneDatabase, PrunePatchSet, Storage},
storage::{Database, MaybeParallel, PatchSet, PruneDatabase, PrunePatchSet, Storage},
types::{Key, Manifest, Root, TreeEntry, TreeTags, ValueHash},
};

#[cfg(test)]
mod tests;

/// Handle to a Merkle tree during its recovery.
#[derive(Debug)]
pub struct MerkleTreeRecovery<DB, H = Blake2Hasher> {
pub(crate) db: DB,
pub(crate) db: MaybeParallel<DB>,
hasher: H,
recovered_version: u64,
}
@@ -105,7 +109,7 @@ impl&lt;DB: PruneDatabase, H: HashTree&gt; MerkleTreeRecovery&lt;DB, H&gt; {
db.apply_patch(PatchSet::from_manifest(manifest))?;

Ok(Self {
db,
db: MaybeParallel::Sequential(db),
hasher,
recovered_version,
})
@@ -257,7 +261,54 @@ impl&lt;DB: PruneDatabase, H: HashTree&gt; MerkleTreeRecovery&lt;DB, H&gt; {
self.db.apply_patch(PatchSet::from_manifest(manifest))?;
tracing::debug!("Updated tree manifest to mark recovery as complete");

Ok(self.db)
self.db.join()
}
}

impl<DB: 'static + Clone + PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
/// Offloads database persistence to a background thread so that it can run concurrently with processing of subsequent chunks.
/// Chunks are still guaranteed to be persisted atomically and in order.
///
/// # Arguments
///
/// - `buffer_capacity` determines how many chunks can be buffered before persistence blocks (i.e., back-pressure).
/// Also controls memory usage, since each chunk translates into a non-trivial database patch (order of 1 kB / entry;
/// i.e., a chunk with 200,000 entries would translate to a 200 MB patch).
///
/// # Return value
///
/// On success, returns a handle that allows controlling the background persistence thread. For now, it can only be used
/// to emulate persistence crashes; otherwise, the handle can simply be dropped.
///
/// # Safety
///
/// If recovery is interrupted (e.g., its process crashes), then some of the latest chunks may not be persisted,
/// and will need to be processed again. It is **unsound** to restart recovery while a persistence thread may be active;
/// this may lead to a corrupted database state.
///
/// # Errors
///
/// Returns an error if `buffer_capacity` is 0, or if persistence was already parallelized.
pub fn parallelize_persistence(
&mut self,
buffer_capacity: usize,
) -> anyhow::Result<PersistenceThreadHandle> {
anyhow::ensure!(buffer_capacity > 0, "Buffer capacity must be positive");
self.db
.parallelize(self.recovered_version, buffer_capacity)
.context("persistence is already parallelized")
}

/// Waits until all changes in the underlying database are persisted, i.e. all chunks are flushed into it.
/// This is only relevant if [persistence was parallelized](Self::parallelize_persistence()) earlier;
/// otherwise, this method will return immediately.
///
/// # Errors
///
/// Propagates database I/O errors, should they occur during persistence.
pub fn wait_for_persistence(self) -> anyhow::Result<()> {
self.db.join()?;
Ok(())
}
}
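
Putting the new methods together, a recovery driver might look roughly as follows (a sketch based on the signatures above; import paths and the chunked input are assumptions):

```rust
use zksync_merkle_tree::{recovery::MerkleTreeRecovery, RocksDBWrapper, TreeEntry};

fn recover_in_parallel(
    db: RocksDBWrapper,
    chunks: Vec<Vec<TreeEntry>>,
    recovered_version: u64,
) -> anyhow::Result<()> {
    let mut recovery = MerkleTreeRecovery::new(db, recovered_version)?;
    // Buffer up to 4 chunk patches before `extend_linear` starts blocking.
    let _handle = recovery.parallelize_persistence(4)?;
    for chunk in chunks {
        // This chunk is processed while earlier ones are persisted
        // on the background thread, atomically and in order.
        recovery.extend_linear(chunk)?;
    }
    // `finalize()` joins the persistence thread and marks recovery complete;
    // `wait_for_persistence()` could be used instead to only flush the chunks.
    recovery.finalize()?;
    Ok(())
}
```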

@@ -267,63 +318,3 @@ fn entries_key_range(entries: &[TreeEntry]) -> String {
};
format!("{:0>64x}..={:0>64x}", first.key, last.key)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{hasher::HasherWithStats, types::LeafNode, MerkleTree};

#[test]
fn recovery_for_initialized_tree() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123)
.unwrap()
.finalize()
.unwrap();
let err = MerkleTreeRecovery::new(db, 123).unwrap_err().to_string();
assert!(
err.contains("Tree is expected to be in the process of recovery"),
"{err}"
);
}

#[test]
fn recovery_for_different_version() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123).unwrap();
let err = MerkleTreeRecovery::new(&mut db, 42)
.unwrap_err()
.to_string();
assert!(
err.contains("Requested to recover tree version 42"),
"{err}"
);
}

#[test]
fn recovering_empty_tree() {
let db = MerkleTreeRecovery::new(PatchSet::default(), 42)
.unwrap()
.finalize()
.unwrap();
let tree = MerkleTree::new(db).unwrap();
assert_eq!(tree.latest_version(), Some(42));
assert_eq!(tree.root(42), Some(Root::Empty));
}

#[test]
fn recovering_tree_with_single_node() {
let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42).unwrap();
let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
recovery.extend_linear(vec![recovery_entry]).unwrap();
let tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();

assert_eq!(tree.latest_version(), Some(42));
let mut hasher = HasherWithStats::new(&Blake2Hasher);
assert_eq!(
tree.latest_root_hash(),
LeafNode::new(recovery_entry).hash(&mut hasher, 0)
);
tree.verify_consistency(42, true).unwrap();
}
}
56 changes: 56 additions & 0 deletions core/lib/merkle_tree/src/recovery/tests.rs
@@ -0,0 +1,56 @@
use super::*;
use crate::{hasher::HasherWithStats, types::LeafNode, MerkleTree};

#[test]
fn recovery_for_initialized_tree() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123)
.unwrap()
.finalize()
.unwrap();
let err = MerkleTreeRecovery::new(db, 123).unwrap_err().to_string();
assert!(
err.contains("Tree is expected to be in the process of recovery"),
"{err}"
);
}

#[test]
fn recovery_for_different_version() {
let mut db = PatchSet::default();
MerkleTreeRecovery::new(&mut db, 123).unwrap();
let err = MerkleTreeRecovery::new(&mut db, 42)
.unwrap_err()
.to_string();
assert!(
err.contains("Requested to recover tree version 42"),
"{err}"
);
}

#[test]
fn recovering_empty_tree() {
let db = MerkleTreeRecovery::new(PatchSet::default(), 42)
.unwrap()
.finalize()
.unwrap();
let tree = MerkleTree::new(db).unwrap();
assert_eq!(tree.latest_version(), Some(42));
assert_eq!(tree.root(42), Some(Root::Empty));
}

#[test]
fn recovering_tree_with_single_node() {
let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), 42).unwrap();
let recovery_entry = TreeEntry::new(Key::from(123), 1, ValueHash::repeat_byte(1));
recovery.extend_linear(vec![recovery_entry]).unwrap();
let tree = MerkleTree::new(recovery.finalize().unwrap()).unwrap();

assert_eq!(tree.latest_version(), Some(42));
let mut hasher = HasherWithStats::new(&Blake2Hasher);
assert_eq!(
tree.latest_root_hash(),
LeafNode::new(recovery_entry).hash(&mut hasher, 0)
);
tree.verify_consistency(42, true).unwrap();
}
7 changes: 6 additions & 1 deletion core/lib/merkle_tree/src/storage/mod.rs
@@ -1,11 +1,15 @@
//! Storage-related logic.

pub(crate) use self::patch::{LoadAncestorsResult, WorkingPatchSet};
pub use self::{
database::{Database, NodeKeys, Patched, PruneDatabase, PrunePatchSet},
parallel::PersistenceThreadHandle,
patch::PatchSet,
rocksdb::{MerkleTreeColumnFamily, RocksDBWrapper},
};
pub(crate) use self::{
parallel::MaybeParallel,
patch::{LoadAncestorsResult, WorkingPatchSet},
};
use crate::{
hasher::HashTree,
metrics::{TreeUpdaterStats, BLOCK_TIMINGS, GENERAL_METRICS},
Expand All @@ -16,6 +20,7 @@ use crate::{
};

mod database;
mod parallel;
mod patch;
mod proofs;
mod rocksdb;
(Diffs for the remaining changed files are not expanded.)
