Skip to content

Commit

Permalink
feat(merkle-tree): Do not wait for tree initialization when starting node (#992)
Browse files Browse the repository at this point in the history

## What ❔

Makes tree initialization async in metadata calculator (i.e., moves it
from the constructor to the `run()` method).

## Why ❔

It currently takes ~1 minute to initialize the Merkle tree RocksDB for external nodes (ENs); doing this in the constructor delays node startup by that amount.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli committed Feb 2, 2024
1 parent a8699a2 commit fdbfcb1
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 39 deletions.
22 changes: 20 additions & 2 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,27 @@ pub(crate) struct MerkleTreeInfo {
pub leaf_count: u64,
}

/// Health details for a Merkle tree.
///
/// Serialized with the variant name (converted to `snake_case`) stored under the `stage` key,
/// next to the variant's own fields.
#[derive(Debug, Serialize)]
#[serde(tag = "stage", rename_all = "snake_case")]
pub(super) enum MerkleTreeHealth {
/// Reported while the tree is being initialized (i.e., before its RocksDB instance is opened).
Initialization,
/// Reported while the tree is being recovered chunk by chunk.
Recovery {
/// Number of chunks in the recovery (presumably the total; confirm against the recovery code).
chunk_count: u64,
/// Number of chunks recovered so far (presumably out of `chunk_count`; confirm).
recovered_chunk_count: u64,
},
/// Carries general information about the tree; judging by the name, this is reported
/// once the tree is initialized and processes L1 batches (confirm against the updater).
MainLoop(MerkleTreeInfo),
}

impl From<MerkleTreeHealth> for Health {
fn from(details: MerkleTreeHealth) -> Self {
Self::from(HealthStatus::Ready).with_details(details)
}
}

impl From<MerkleTreeInfo> for Health {
fn from(tree_info: MerkleTreeInfo) -> Self {
Self::from(HealthStatus::Ready).with_details(tree_info)
fn from(info: MerkleTreeInfo) -> Self {
Self::from(HealthStatus::Ready).with_details(MerkleTreeHealth::MainLoop(info))
}
}

Expand Down
58 changes: 41 additions & 17 deletions core/lib/zksync_core/src/metadata_calculator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use std::{
future::{self, Future},
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use anyhow::Context as _;
use tokio::sync::watch;
use zksync_config::configs::{
chain::OperationsManagerConfig,
Expand All @@ -24,7 +25,7 @@ use zksync_types::{

pub(crate) use self::helpers::{AsyncTreeReader, L1BatchWithLogs, MerkleTreeInfo};
use self::{
helpers::{create_db, Delayer, GenericAsyncTree},
helpers::{create_db, Delayer, GenericAsyncTree, MerkleTreeHealth},
metrics::{TreeUpdateStage, METRICS},
updater::TreeUpdater,
};
Expand Down Expand Up @@ -80,7 +81,7 @@ impl MetadataCalculatorConfig {

#[derive(Debug)]
pub struct MetadataCalculator {
tree: GenericAsyncTree,
config: MetadataCalculatorConfig,
tree_reader: watch::Sender<Option<AsyncTreeReader>>,
object_store: Option<Arc<dyn ObjectStore>>,
delayer: Delayer,
Expand All @@ -99,24 +100,14 @@ impl MetadataCalculator {
"Maximum L1 batches per iteration is misconfigured to be 0; please update it to positive value"
);

let db = create_db(
config.db_path.clone().into(),
config.block_cache_capacity,
config.memtable_capacity,
config.stalled_writes_timeout,
config.multi_get_chunk_size,
)
.await?;
let tree = GenericAsyncTree::new(db, config.mode).await;

let (_, health_updater) = ReactiveHealthCheck::new("tree");
Ok(Self {
tree,
tree_reader: watch::channel(None).0,
object_store,
delayer: Delayer::new(config.delay_interval),
health_updater,
max_l1_batches_per_iter: config.max_l1_batches_per_iter,
config,
})
}

Expand All @@ -141,19 +132,52 @@ impl MetadataCalculator {
}
}

/// Opens the Merkle tree RocksDB and wraps it into a [`GenericAsyncTree`].
///
/// Before doing any work, the health check is switched to the `Initialization` stage.
/// The time spent opening the database is logged together with the calculator configuration.
///
/// # Errors
///
/// Returns an error (with the configuration attached as context) if the database
/// cannot be created / opened.
async fn create_tree(&self) -> anyhow::Result<GenericAsyncTree> {
    let config = &self.config;
    self.health_updater
        .update(MerkleTreeHealth::Initialization.into());

    let open_timer = Instant::now();
    let open_result = create_db(
        config.db_path.clone().into(),
        config.block_cache_capacity,
        config.memtable_capacity,
        config.stalled_writes_timeout,
        config.multi_get_chunk_size,
    )
    .await;
    let rocksdb = open_result.with_context(|| {
        format!(
            "failed opening Merkle tree RocksDB with configuration {:?}",
            config
        )
    })?;

    let elapsed = open_timer.elapsed();
    tracing::info!(
        "Opened Merkle tree RocksDB with configuration {:?} in {:?}",
        config,
        elapsed
    );

    let tree = GenericAsyncTree::new(rocksdb, config.mode).await;
    Ok(tree)
}

pub async fn run(
self,
pool: ConnectionPool,
stop_receiver: watch::Receiver<bool>,
) -> anyhow::Result<()> {
let tree = self
.tree
let tree = self.create_tree().await?;
let tree = tree
.ensure_ready(&pool, &stop_receiver, &self.health_updater)
.await?;
let Some(tree) = tree else {
return Ok(()); // recovery was aborted because a stop signal was received
};
self.tree_reader.send_replace(Some(tree.reader()));
let tree_reader = tree.reader();
tracing::info!(
"Merkle tree is initialized and ready to process L1 batches: {:?}",
tree_reader.clone().info().await
);
self.tree_reader.send_replace(Some(tree_reader));

let updater = TreeUpdater::new(tree, self.max_l1_batches_per_iter, self.object_store);
updater
Expand Down
20 changes: 5 additions & 15 deletions core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,17 @@ use std::{
use anyhow::Context as _;
use async_trait::async_trait;
use futures::future;
use serde::{Deserialize, Serialize};
use tokio::sync::{watch, Mutex, Semaphore};
use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_health_check::{Health, HealthStatus, HealthUpdater};
use zksync_health_check::HealthUpdater;
use zksync_merkle_tree::TreeEntry;
use zksync_types::{
snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus},
MiniblockNumber, H256,
};

use super::{
helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree},
helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree, MerkleTreeHealth},
metrics::{ChunkRecoveryStage, RecoveryStage, RECOVERY_METRICS},
};

Expand All @@ -68,14 +67,6 @@ trait HandleRecoveryEvent: fmt::Debug + Send + Sync {
}
}

/// Information about a Merkle tree during its snapshot recovery.
///
/// Used as health check details while recovery is in progress.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct RecoveryMerkleTreeInfo {
mode: &'static str, // always set to "recovery" to distinguish from `MerkleTreeInfo`
// Number of chunks in the recovery (presumably the total; confirm against the recovery code).
chunk_count: u64,
// Number of chunks recovered so far (presumably out of `chunk_count`; confirm).
recovered_chunk_count: u64,
}

/// [`HealthUpdater`]-based [`HandleRecoveryEvent`] implementation.
#[derive(Debug)]
struct RecoveryHealthUpdater<'a> {
Expand Down Expand Up @@ -109,12 +100,11 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
RECOVERY_METRICS
.recovered_chunk_count
.set(recovered_chunk_count);
let health = Health::from(HealthStatus::Ready).with_details(RecoveryMerkleTreeInfo {
mode: "recovery",
let health = MerkleTreeHealth::Recovery {
chunk_count: self.chunk_count,
recovered_chunk_count,
});
self.inner.update(health);
};
self.inner.update(health.into());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use zksync_config::configs::{
chain::OperationsManagerConfig,
database::{MerkleTreeConfig, MerkleTreeMode},
};
use zksync_health_check::{CheckHealth, ReactiveHealthCheck};
use zksync_health_check::{CheckHealth, HealthStatus, ReactiveHealthCheck};
use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction};
use zksync_types::{L1BatchNumber, L2ChainId, StorageLog};

Expand Down
10 changes: 6 additions & 4 deletions core/lib/zksync_core/src/metadata_calculator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ async fn genesis_creation() {
run_calculator(calculator, pool.clone()).await;
let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await;

let GenericAsyncTree::Ready(tree) = &calculator.tree else {
panic!("Unexpected tree state: {:?}", calculator.tree);
let tree = calculator.create_tree().await.unwrap();
let GenericAsyncTree::Ready(tree) = tree else {
panic!("Unexpected tree state: {tree:?}");
};
assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1));
}
Expand All @@ -77,8 +78,9 @@ async fn basic_workflow() {
assert!(merkle_paths.iter().all(|log| log.is_write));

let (calculator, _) = setup_calculator(temp_dir.path(), &pool).await;
let GenericAsyncTree::Ready(tree) = &calculator.tree else {
panic!("Unexpected tree state: {:?}", calculator.tree);
let tree = calculator.create_tree().await.unwrap();
let GenericAsyncTree::Ready(tree) = tree else {
panic!("Unexpected tree state: {tree:?}");
};
assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(2));
}
Expand Down

0 comments on commit fdbfcb1

Please sign in to comment.