diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index b7f918f4bca..de05efe396c 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -399,12 +399,15 @@ async fn main() -> anyhow::Result<()> {
             L1ExecutedBatchesRevert::Allowed,
         );
 
-        let mut connection = connection_pool.access_storage().await.unwrap();
+        let mut connection = connection_pool.access_storage().await?;
         let sealed_l1_batch_number = connection
             .blocks_dal()
             .get_sealed_l1_batch_number()
             .await
-            .unwrap();
+            .context("Failed getting sealed L1 batch number")?
+            .context(
+                "Cannot roll back pending L1 batch since there are no L1 batches in Postgres",
+            )?;
         drop(connection);
 
         tracing::info!("Rolling back to l1 batch number {sealed_l1_batch_number}");
@@ -418,9 +421,7 @@ async fn main() -> anyhow::Result<()> {
     }
 
     let sigint_receiver = setup_sigint_handler();
-    tracing::warn!("The external node is in the alpha phase, and should be used with caution.");
-
     tracing::info!("Started the external node");
     tracing::info!("Main node URL is: {}", main_node_url);
diff --git a/core/bin/snapshots_creator/src/main.rs b/core/bin/snapshots_creator/src/main.rs
index b3c0f0b54ee..e8f1fa3bf7f 100644
--- a/core/bin/snapshots_creator/src/main.rs
+++ b/core/bin/snapshots_creator/src/main.rs
@@ -148,9 +148,9 @@ async fn run(
     // We subtract 1 so that after restore, EN node has at least one L1 batch to fetch
     let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?;
-    assert_ne!(
-        sealed_l1_batch_number,
-        L1BatchNumber(0),
+    let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?;
+    anyhow::ensure!(
+        sealed_l1_batch_number != L1BatchNumber(0),
         "Cannot create snapshot when only the genesis L1 batch is present in Postgres"
     );
     let l1_batch_number = sealed_l1_batch_number - 1;
diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs
index ba3c314a135..ae3ef48bcbb 100644
--- a/core/lib/dal/src/blocks_dal.rs
+++ b/core/lib/dal/src/blocks_dal.rs
@@ -43,8 +43,8 @@ impl BlocksDal<'_, '_> {
         Ok(count == 0)
     }
 
-    pub async fn get_sealed_l1_batch_number(&mut self) -> anyhow::Result<L1BatchNumber> {
-        let number = sqlx::query!(
+    pub async fn get_sealed_l1_batch_number(&mut self) -> sqlx::Result<Option<L1BatchNumber>> {
+        let row = sqlx::query!(
             r#"
             SELECT
                 MAX(number) AS "number"
@@ -57,11 +57,9 @@ impl BlocksDal<'_, '_> {
         .instrument("get_sealed_block_number")
         .report_latency()
         .fetch_one(self.storage.conn())
-        .await?
-        .number
-        .context("DAL invocation before genesis")?;
+        .await?;
 
-        Ok(L1BatchNumber(number as u32))
+        Ok(row.number.map(|num| L1BatchNumber(num as u32)))
     }
 
     pub async fn get_sealed_miniblock_number(&mut self) -> sqlx::Result<MiniblockNumber> {
diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs
index 96d22727144..30f1fe2779e 100644
--- a/core/lib/state/src/rocksdb/mod.rs
+++ b/core/lib/state/src/rocksdb/mod.rs
@@ -135,15 +135,16 @@ impl RocksdbStorage {
     /// in Postgres.
     pub async fn update_from_postgres(&mut self, conn: &mut StorageProcessor<'_>) {
         let latency = METRICS.update.start();
-        let latest_l1_batch_number = conn
+        let Some(latest_l1_batch_number) = conn
             .blocks_dal()
             .get_sealed_l1_batch_number()
             .await
-            .unwrap();
-        tracing::debug!(
-            "loading storage for l1 batch number {}",
-            latest_l1_batch_number.0
-        );
+            .unwrap()
+        else {
+            // No L1 batches are persisted in Postgres; update is not necessary.
+            return;
+        };
+        tracing::debug!("Loading storage for l1 batch number {latest_l1_batch_number}");
 
         let mut current_l1_batch_number = self.l1_batch_number().0;
         assert!(
diff --git a/core/lib/zksync_core/src/consensus/testonly.rs b/core/lib/zksync_core/src/consensus/testonly.rs
index c2a7a4ec475..4195cd76a05 100644
--- a/core/lib/zksync_core/src/consensus/testonly.rs
+++ b/core/lib/zksync_core/src/consensus/testonly.rs
@@ -306,7 +306,8 @@ async fn run_mock_metadata_calculator(ctx: &ctx::Ctx, pool: ConnectionPool) -> a
         .blocks_dal()
         .get_sealed_l1_batch_number()
         .await
-        .context("get_sealed_l1_batch_number()")?;
+        .context("get_sealed_l1_batch_number()")?
+        .context("no L1 batches in Postgres")?;
 
     while n <= last {
         let metadata = create_l1_batch_metadata(n.0);
diff --git a/core/lib/zksync_core/src/eth_sender/aggregator.rs b/core/lib/zksync_core/src/eth_sender/aggregator.rs
index 549d0a5acf9..7368aaa0cd2 100644
--- a/core/lib/zksync_core/src/eth_sender/aggregator.rs
+++ b/core/lib/zksync_core/src/eth_sender/aggregator.rs
@@ -95,11 +95,15 @@ impl Aggregator {
         protocol_version_id: ProtocolVersionId,
         l1_verifier_config: L1VerifierConfig,
     ) -> Option<AggregatedOperation> {
-        let last_sealed_l1_batch_number = storage
+        let Some(last_sealed_l1_batch_number) = storage
             .blocks_dal()
             .get_sealed_l1_batch_number()
             .await
-            .unwrap();
+            .unwrap()
+        else {
+            return None; // No L1 batches in Postgres; no operations are ready yet
+        };
+
         if let Some(op) = self
             .get_execute_operations(
                 storage,
diff --git a/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs b/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs
index 2ed1f97fee7..b52edb96375 100644
--- a/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs
+++ b/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs
@@ -20,14 +20,16 @@ impl L1BatchMetricsReporter {
     }
 
     async fn report_metrics(&self) {
+        let mut block_metrics = vec![];
         let mut conn = self.connection_pool.access_storage().await.unwrap();
-        let mut block_metrics = vec![(
-            conn.blocks_dal()
-                .get_sealed_l1_batch_number()
-                .await
-                .unwrap(),
-            BlockStage::Sealed,
-        )];
+        let last_l1_batch = conn
+            .blocks_dal()
+            .get_sealed_l1_batch_number()
+            .await
+            .unwrap();
+        if let Some(number) = last_l1_batch {
+            block_metrics.push((number, BlockStage::Sealed));
+        }
 
         let last_l1_batch_with_metadata = conn
             .blocks_dal()
diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
similarity index 57%
rename from core/lib/zksync_core/src/metadata_calculator/recovery.rs
rename to core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
index 52c254a1dbe..0d37dd02417 100644
--- a/core/lib/zksync_core/src/metadata_calculator/recovery.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
@@ -38,7 +38,7 @@ use tokio::sync::{watch, Mutex, Semaphore};
 use zksync_dal::{ConnectionPool, StorageProcessor};
 use zksync_health_check::{Health, HealthStatus, HealthUpdater};
 use zksync_merkle_tree::TreeEntry;
-use zksync_types::{L1BatchNumber, MiniblockNumber, H256, U256};
+use zksync_types::{snapshots::SnapshotRecoveryStatus, MiniblockNumber, H256, U256};
 use zksync_utils::u256_to_h256;
 
 use super::{
@@ -46,6 +46,9 @@ use super::{
     metrics::{ChunkRecoveryStage, RecoveryStage, RECOVERY_METRICS},
 };
 
+#[cfg(test)]
+mod tests;
+
 /// Handler of recovery life cycle events. This functionality is encapsulated in a trait to be able
 /// to control recovery behavior in tests.
 #[async_trait]
@@ -125,22 +128,11 @@ impl SnapshotParameters {
     /// (i.e., not changed after a node restart).
     const DESIRED_CHUNK_SIZE: u64 = 200_000;
 
-    async fn new(pool: &ConnectionPool, l1_batch: L1BatchNumber) -> anyhow::Result<Self> {
+    async fn new(pool: &ConnectionPool, recovery: &SnapshotRecoveryStatus) -> anyhow::Result<Self> {
+        let miniblock = recovery.miniblock_number;
+        let expected_root_hash = recovery.l1_batch_root_hash;
+
         let mut storage = pool.access_storage().await?;
-        let (_, miniblock) = storage
-            .blocks_dal()
-            .get_miniblock_range_of_l1_batch(l1_batch)
-            .await
-            .with_context(|| format!("Failed getting miniblock range for L1 batch #{l1_batch}"))?
-            .with_context(|| format!("L1 batch #{l1_batch} doesn't have miniblocks"))?;
-        let expected_root_hash = storage
-            .blocks_dal()
-            .get_l1_batch_metadata(l1_batch)
-            .await
-            .with_context(|| format!("Failed getting metadata for L1 batch #{l1_batch}"))?
-            .with_context(|| format!("L1 batch #{l1_batch} has no metadata"))?
-            .metadata
-            .root_hash;
         let log_count = storage
             .storage_logs_dal()
             .count_miniblock_storage_logs(miniblock)
@@ -176,28 +168,29 @@ impl GenericAsyncTree {
         stop_receiver: &watch::Receiver<bool>,
         health_updater: &HealthUpdater,
     ) -> anyhow::Result<Option<AsyncTree>> {
-        let (tree, l1_batch) = match self {
+        let (tree, snapshot_recovery) = match self {
             Self::Ready(tree) => return Ok(Some(tree)),
             Self::Recovering(tree) => {
-                let l1_batch = snapshot_l1_batch(pool).await?.context(
-                    "Merkle tree is recovering, but Postgres doesn't contain snapshot L1 batch",
+                let snapshot_recovery = get_snapshot_recovery(pool).await?.context(
+                    "Merkle tree is recovering, but Postgres doesn't contain snapshot recovery information",
                 )?;
                 let recovered_version = tree.recovered_version();
                 anyhow::ensure!(
-                    u64::from(l1_batch.0) == recovered_version,
-                    "Snapshot L1 batch in Postgres ({l1_batch}) differs from the recovered Merkle tree version \
+                    u64::from(snapshot_recovery.l1_batch_number.0) == recovered_version,
+                    "Snapshot L1 batch in Postgres ({snapshot_recovery:?}) differs from the recovered Merkle tree version \
                      ({recovered_version})"
                 );
-                tracing::info!("Resuming tree recovery with snapshot L1 batch #{l1_batch}");
-                (tree, l1_batch)
+                tracing::info!("Resuming tree recovery with status: {snapshot_recovery:?}");
+                (tree, snapshot_recovery)
             }
             Self::Empty { db, mode } => {
-                if let Some(l1_batch) = snapshot_l1_batch(pool).await? {
+                if let Some(snapshot_recovery) = get_snapshot_recovery(pool).await? {
                     tracing::info!(
-                        "Starting Merkle tree recovery with snapshot L1 batch #{l1_batch}"
+                        "Starting Merkle tree recovery with status {snapshot_recovery:?}"
                     );
+                    let l1_batch = snapshot_recovery.l1_batch_number;
                     let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode);
-                    (tree, l1_batch)
+                    (tree, snapshot_recovery)
                 } else {
                     // Start the tree from scratch. The genesis block will be filled in `TreeUpdater::loop_updating_tree()`.
return Ok(Some(AsyncTree::new(db, mode))); @@ -205,7 +198,7 @@ impl GenericAsyncTree { } }; - let snapshot = SnapshotParameters::new(pool, l1_batch).await?; + let snapshot = SnapshotParameters::new(pool, &snapshot_recovery).await?; tracing::debug!("Obtained snapshot parameters: {snapshot:?}"); let recovery_options = RecoveryOptions { chunk_count: snapshot.chunk_count(), @@ -421,253 +414,12 @@ impl AsyncTreeRecovery { } } -async fn snapshot_l1_batch(_pool: &ConnectionPool) -> anyhow::Result> { - Ok(None) // FIXME (PLA-708): implement real logic -} - -#[cfg(test)] -mod tests { - use std::{path::PathBuf, time::Duration}; - - use assert_matches::assert_matches; - use tempfile::TempDir; - use test_casing::test_casing; - use zksync_config::configs::database::MerkleTreeMode; - use zksync_health_check::{CheckHealth, ReactiveHealthCheck}; - use zksync_types::{L2ChainId, StorageLog}; - use zksync_utils::h256_to_u256; - - use super::*; - use crate::{ - genesis::{ensure_genesis_state, GenesisParams}, - metadata_calculator::{ - helpers::create_db, - tests::{extend_db_state, gen_storage_logs, run_calculator, setup_calculator}, - }, - }; - - #[test] - fn calculating_hashed_key_ranges_with_single_chunk() { - let mut ranges = AsyncTreeRecovery::hashed_key_ranges(1); - let full_range = ranges.next().unwrap(); - assert_eq!(full_range, H256::zero()..=H256([0xff; 32])); - } - - #[test] - fn calculating_hashed_key_ranges_for_256_chunks() { - let ranges = AsyncTreeRecovery::hashed_key_ranges(256); - let mut start = H256::zero(); - let mut end = H256([0xff; 32]); - - for (i, range) in ranges.enumerate() { - let i = u8::try_from(i).unwrap(); - start.0[0] = i; - end.0[0] = i; - assert_eq!(range, start..=end); - } - } - - #[test_casing(5, [3, 7, 23, 100, 255])] - fn calculating_hashed_key_ranges_for_arbitrary_chunks(chunk_count: usize) { - let ranges: Vec<_> = AsyncTreeRecovery::hashed_key_ranges(chunk_count).collect(); - assert_eq!(ranges.len(), chunk_count); - - for window in ranges.windows(2) { - let [prev_range, range] = window else { - unreachable!(); - }; - assert_eq!( - h256_to_u256(*range.start()), - h256_to_u256(*prev_range.end()) + 1 - ); - } - assert_eq!(*ranges.first().unwrap().start(), H256::zero()); - assert_eq!(*ranges.last().unwrap().end(), H256([0xff; 32])); - } - - #[test] - fn calculating_chunk_count() { - let mut snapshot = SnapshotParameters { - miniblock: MiniblockNumber(1), - log_count: 160_000_000, - expected_root_hash: H256::zero(), - }; - assert_eq!(snapshot.chunk_count(), 800); - - snapshot.log_count += 1; - assert_eq!(snapshot.chunk_count(), 801); - - snapshot.log_count = 100; - assert_eq!(snapshot.chunk_count(), 1); - } - - async fn create_tree_recovery(path: PathBuf, l1_batch: L1BatchNumber) -> AsyncTreeRecovery { - let db = create_db( - path, - 0, - 16 << 20, // 16 MiB, - Duration::ZERO, // writes should never be stalled in tests - 500, - ) - .await; - AsyncTreeRecovery::new(db, l1_batch.0.into(), MerkleTreeMode::Full) - } - - #[tokio::test] - async fn basic_recovery_workflow() { - let pool = ConnectionPool::test_pool().await; - let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let root_hash = prepare_recovery_snapshot(&pool, &temp_dir).await; - let snapshot = SnapshotParameters::new(&pool, L1BatchNumber(1)) - .await - .unwrap(); - - assert!(snapshot.log_count > 200); - assert_eq!(snapshot.miniblock, MiniblockNumber(1)); - assert_eq!(snapshot.expected_root_hash, root_hash); - - let (_stop_sender, stop_receiver) = watch::channel(false); - for 
chunk_count in [1, 4, 9, 16, 60, 256] { - println!("Recovering tree with {chunk_count} chunks"); - - let tree_path = temp_dir.path().join(format!("recovery-{chunk_count}")); - let tree = create_tree_recovery(tree_path, L1BatchNumber(1)).await; - let (health_check, health_updater) = ReactiveHealthCheck::new("tree"); - let recovery_options = RecoveryOptions { - chunk_count, - concurrency_limit: 1, - events: Box::new(RecoveryHealthUpdater::new(&health_updater)), - }; - let tree = tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) - .await - .unwrap() - .expect("Tree recovery unexpectedly aborted"); - - assert_eq!(tree.root_hash(), root_hash); - let health = health_check.check_health().await; - assert_matches!(health.status(), HealthStatus::Ready); - } - } - - async fn prepare_recovery_snapshot(pool: &ConnectionPool, temp_dir: &TempDir) -> H256 { - let mut storage = pool.access_storage().await.unwrap(); - ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock()) - .await - .unwrap(); - let mut logs = gen_storage_logs(100..300, 1).pop().unwrap(); - - // Add all logs from the genesis L1 batch to `logs` so that they cover all state keys. - let genesis_logs = storage - .storage_logs_dal() - .get_touched_slots_for_l1_batch(L1BatchNumber(0)) - .await; - let genesis_logs = genesis_logs - .into_iter() - .map(|(key, value)| StorageLog::new_write_log(key, value)); - logs.extend(genesis_logs); - extend_db_state(&mut storage, vec![logs]).await; - drop(storage); - - // Ensure that metadata for L1 batch #1 is present in the DB. - let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool).await; - run_calculator(calculator, pool.clone()).await - } - - #[derive(Debug)] - struct TestEventListener { - expected_recovered_chunks: usize, - stop_threshold: usize, - processed_chunk_count: AtomicUsize, - stop_sender: watch::Sender, - } - - impl TestEventListener { - fn new(stop_threshold: usize, stop_sender: watch::Sender) -> Self { - Self { - expected_recovered_chunks: 0, - stop_threshold, - processed_chunk_count: AtomicUsize::new(0), - stop_sender, - } - } - - fn expect_recovered_chunks(mut self, count: usize) -> Self { - self.expected_recovered_chunks = count; - self - } - } - - #[async_trait] - impl HandleRecoveryEvent for TestEventListener { - fn recovery_started(&mut self, _chunk_count: usize, recovered_chunk_count: usize) { - assert_eq!(recovered_chunk_count, self.expected_recovered_chunks); - } - - async fn chunk_recovered(&self) { - let processed_chunk_count = - self.processed_chunk_count.fetch_add(1, Ordering::SeqCst) + 1; - if processed_chunk_count >= self.stop_threshold { - self.stop_sender.send_replace(true); - } - } - } - - #[test_casing(3, [5, 7, 8])] - #[tokio::test] - async fn recovery_fault_tolerance(chunk_count: usize) { - let pool = ConnectionPool::test_pool().await; - let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); - let root_hash = prepare_recovery_snapshot(&pool, &temp_dir).await; - - let tree_path = temp_dir.path().join("recovery"); - let tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; - let (stop_sender, stop_receiver) = watch::channel(false); - let recovery_options = RecoveryOptions { - chunk_count, - concurrency_limit: 1, - events: Box::new(TestEventListener::new(1, stop_sender)), - }; - let snapshot = SnapshotParameters::new(&pool, L1BatchNumber(1)) - .await - .unwrap(); - assert!(tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) - .await - .unwrap() - 
.is_none()); - - // Emulate a restart and recover 2 more chunks. - let mut tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; - assert_ne!(tree.root_hash().await, root_hash); - let (stop_sender, stop_receiver) = watch::channel(false); - let recovery_options = RecoveryOptions { - chunk_count, - concurrency_limit: 1, - events: Box::new(TestEventListener::new(2, stop_sender).expect_recovered_chunks(1)), - }; - assert!(tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) - .await - .unwrap() - .is_none()); - - // Emulate another restart and recover remaining chunks. - let mut tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; - assert_ne!(tree.root_hash().await, root_hash); - let (stop_sender, stop_receiver) = watch::channel(false); - let recovery_options = RecoveryOptions { - chunk_count, - concurrency_limit: 1, - events: Box::new( - TestEventListener::new(usize::MAX, stop_sender).expect_recovered_chunks(3), - ), - }; - let tree = tree - .recover(snapshot, recovery_options, &pool, &stop_receiver) - .await - .unwrap() - .expect("Tree recovery unexpectedly aborted"); - assert_eq!(tree.root_hash(), root_hash); - } +async fn get_snapshot_recovery( + pool: &ConnectionPool, +) -> anyhow::Result> { + let mut storage = pool.access_storage_tagged("metadata_calculator").await?; + Ok(storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await?) } diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs new file mode 100644 index 00000000000..d121990dc28 --- /dev/null +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/tests.rs @@ -0,0 +1,431 @@ +//! Tests for metadata calculator snapshot recovery. + +use std::{path::PathBuf, time::Duration}; + +use assert_matches::assert_matches; +use tempfile::TempDir; +use test_casing::test_casing; +use tokio::sync::mpsc; +use zksync_config::configs::{ + chain::OperationsManagerConfig, + database::{MerkleTreeConfig, MerkleTreeMode}, +}; +use zksync_health_check::{CheckHealth, ReactiveHealthCheck}; +use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction}; +use zksync_types::{ + block::{L1BatchHeader, MiniblockHeader}, + L1BatchNumber, L2ChainId, ProtocolVersion, ProtocolVersionId, StorageLog, +}; +use zksync_utils::h256_to_u256; + +use super::*; +use crate::{ + genesis::{ensure_genesis_state, GenesisParams}, + metadata_calculator::{ + helpers::create_db, + tests::{ + extend_db_state, extend_db_state_from_l1_batch, gen_storage_logs, run_calculator, + setup_calculator, + }, + MetadataCalculator, MetadataCalculatorConfig, MetadataCalculatorModeConfig, + }, +}; + +#[test] +fn calculating_hashed_key_ranges_with_single_chunk() { + let mut ranges = AsyncTreeRecovery::hashed_key_ranges(1); + let full_range = ranges.next().unwrap(); + assert_eq!(full_range, H256::zero()..=H256([0xff; 32])); +} + +#[test] +fn calculating_hashed_key_ranges_for_256_chunks() { + let ranges = AsyncTreeRecovery::hashed_key_ranges(256); + let mut start = H256::zero(); + let mut end = H256([0xff; 32]); + + for (i, range) in ranges.enumerate() { + let i = u8::try_from(i).unwrap(); + start.0[0] = i; + end.0[0] = i; + assert_eq!(range, start..=end); + } +} + +#[test_casing(5, [3, 7, 23, 100, 255])] +fn calculating_hashed_key_ranges_for_arbitrary_chunks(chunk_count: usize) { + let ranges: Vec<_> = AsyncTreeRecovery::hashed_key_ranges(chunk_count).collect(); + assert_eq!(ranges.len(), chunk_count); + + for window in ranges.windows(2) { + 
let [prev_range, range] = window else { + unreachable!(); + }; + assert_eq!( + h256_to_u256(*range.start()), + h256_to_u256(*prev_range.end()) + 1 + ); + } + assert_eq!(*ranges.first().unwrap().start(), H256::zero()); + assert_eq!(*ranges.last().unwrap().end(), H256([0xff; 32])); +} + +#[test] +fn calculating_chunk_count() { + let mut snapshot = SnapshotParameters { + miniblock: MiniblockNumber(1), + log_count: 160_000_000, + expected_root_hash: H256::zero(), + }; + assert_eq!(snapshot.chunk_count(), 800); + + snapshot.log_count += 1; + assert_eq!(snapshot.chunk_count(), 801); + + snapshot.log_count = 100; + assert_eq!(snapshot.chunk_count(), 1); +} + +async fn create_tree_recovery(path: PathBuf, l1_batch: L1BatchNumber) -> AsyncTreeRecovery { + let db = create_db( + path, + 0, + 16 << 20, // 16 MiB, + Duration::ZERO, // writes should never be stalled in tests + 500, + ) + .await; + AsyncTreeRecovery::new(db, l1_batch.0.into(), MerkleTreeMode::Full) +} + +#[tokio::test] +async fn basic_recovery_workflow() { + let pool = ConnectionPool::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let snapshot_recovery = prepare_recovery_snapshot(&pool, &temp_dir).await; + let snapshot = SnapshotParameters::new(&pool, &snapshot_recovery) + .await + .unwrap(); + + assert!(snapshot.log_count > 200); + + let (_stop_sender, stop_receiver) = watch::channel(false); + for chunk_count in [1, 4, 9, 16, 60, 256] { + println!("Recovering tree with {chunk_count} chunks"); + + let tree_path = temp_dir.path().join(format!("recovery-{chunk_count}")); + let tree = create_tree_recovery(tree_path, L1BatchNumber(1)).await; + let (health_check, health_updater) = ReactiveHealthCheck::new("tree"); + let recovery_options = RecoveryOptions { + chunk_count, + concurrency_limit: 1, + events: Box::new(RecoveryHealthUpdater::new(&health_updater)), + }; + let tree = tree + .recover(snapshot, recovery_options, &pool, &stop_receiver) + .await + .unwrap() + .expect("Tree recovery unexpectedly aborted"); + + assert_eq!(tree.root_hash(), snapshot_recovery.l1_batch_root_hash); + let health = health_check.check_health().await; + assert_matches!(health.status(), HealthStatus::Ready); + } +} + +async fn prepare_recovery_snapshot( + pool: &ConnectionPool, + temp_dir: &TempDir, +) -> SnapshotRecoveryStatus { + let mut storage = pool.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::from(270), &GenesisParams::mock()) + .await + .unwrap(); + let mut logs = gen_storage_logs(100..300, 1).pop().unwrap(); + + // Add all logs from the genesis L1 batch to `logs` so that they cover all state keys. + let genesis_logs = storage + .storage_logs_dal() + .get_touched_slots_for_l1_batch(L1BatchNumber(0)) + .await; + let genesis_logs = genesis_logs + .into_iter() + .map(|(key, value)| StorageLog::new_write_log(key, value)); + logs.extend(genesis_logs); + extend_db_state(&mut storage, vec![logs]).await; + drop(storage); + + // Ensure that metadata for L1 batch #1 is present in the DB. 
+ let (calculator, _) = setup_calculator(&temp_dir.path().join("init"), pool).await; + let l1_batch_root_hash = run_calculator(calculator, pool.clone()).await; + + SnapshotRecoveryStatus { + l1_batch_number: L1BatchNumber(1), + l1_batch_root_hash, + miniblock_number: MiniblockNumber(1), + miniblock_root_hash: H256::zero(), // not used + last_finished_chunk_id: Some(0), + total_chunk_count: 1, + } +} + +#[derive(Debug)] +struct TestEventListener { + expected_recovered_chunks: usize, + stop_threshold: usize, + processed_chunk_count: AtomicUsize, + stop_sender: watch::Sender, +} + +impl TestEventListener { + fn new(stop_threshold: usize, stop_sender: watch::Sender) -> Self { + Self { + expected_recovered_chunks: 0, + stop_threshold, + processed_chunk_count: AtomicUsize::new(0), + stop_sender, + } + } + + fn expect_recovered_chunks(mut self, count: usize) -> Self { + self.expected_recovered_chunks = count; + self + } +} + +#[async_trait] +impl HandleRecoveryEvent for TestEventListener { + fn recovery_started(&mut self, _chunk_count: usize, recovered_chunk_count: usize) { + assert_eq!(recovered_chunk_count, self.expected_recovered_chunks); + } + + async fn chunk_recovered(&self) { + let processed_chunk_count = self.processed_chunk_count.fetch_add(1, Ordering::SeqCst) + 1; + if processed_chunk_count >= self.stop_threshold { + self.stop_sender.send_replace(true); + } + } +} + +#[test_casing(3, [5, 7, 8])] +#[tokio::test] +async fn recovery_fault_tolerance(chunk_count: usize) { + let pool = ConnectionPool::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let snapshot_recovery = prepare_recovery_snapshot(&pool, &temp_dir).await; + + let tree_path = temp_dir.path().join("recovery"); + let tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; + let (stop_sender, stop_receiver) = watch::channel(false); + let recovery_options = RecoveryOptions { + chunk_count, + concurrency_limit: 1, + events: Box::new(TestEventListener::new(1, stop_sender)), + }; + let snapshot = SnapshotParameters::new(&pool, &snapshot_recovery) + .await + .unwrap(); + assert!(tree + .recover(snapshot, recovery_options, &pool, &stop_receiver) + .await + .unwrap() + .is_none()); + + // Emulate a restart and recover 2 more chunks. + let mut tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; + assert_ne!(tree.root_hash().await, snapshot_recovery.l1_batch_root_hash); + let (stop_sender, stop_receiver) = watch::channel(false); + let recovery_options = RecoveryOptions { + chunk_count, + concurrency_limit: 1, + events: Box::new(TestEventListener::new(2, stop_sender).expect_recovered_chunks(1)), + }; + assert!(tree + .recover(snapshot, recovery_options, &pool, &stop_receiver) + .await + .unwrap() + .is_none()); + + // Emulate another restart and recover remaining chunks. 
+ let mut tree = create_tree_recovery(tree_path.clone(), L1BatchNumber(1)).await; + assert_ne!(tree.root_hash().await, snapshot_recovery.l1_batch_root_hash); + let (stop_sender, stop_receiver) = watch::channel(false); + let recovery_options = RecoveryOptions { + chunk_count, + concurrency_limit: 1, + events: Box::new( + TestEventListener::new(usize::MAX, stop_sender).expect_recovered_chunks(3), + ), + }; + let tree = tree + .recover(snapshot, recovery_options, &pool, &stop_receiver) + .await + .unwrap() + .expect("Tree recovery unexpectedly aborted"); + assert_eq!(tree.root_hash(), snapshot_recovery.l1_batch_root_hash); +} + +#[derive(Debug)] +enum RecoveryWorkflowCase { + Stop, + CreateBatch, +} + +impl RecoveryWorkflowCase { + const ALL: [Self; 2] = [Self::Stop, Self::CreateBatch]; +} + +#[test_casing(2, RecoveryWorkflowCase::ALL)] +#[tokio::test] +async fn entire_recovery_workflow(case: RecoveryWorkflowCase) { + let pool = ConnectionPool::test_pool().await; + // Emulate the recovered view of Postgres. Unlike with previous tests, we don't perform genesis. + let snapshot_logs = gen_storage_logs(100..300, 1).pop().unwrap(); + let mut storage = pool.access_storage().await.unwrap(); + let snapshot_recovery = prepare_clean_recovery_snapshot(&mut storage, &snapshot_logs).await; + + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let merkle_tree_config = MerkleTreeConfig { + path: temp_dir.path().to_str().unwrap().to_owned(), + ..MerkleTreeConfig::default() + }; + let calculator_config = MetadataCalculatorConfig::for_main_node( + &merkle_tree_config, + &OperationsManagerConfig { delay_interval: 50 }, + MetadataCalculatorModeConfig::Lightweight, + ); + let mut calculator = MetadataCalculator::new(&calculator_config).await; + let (delay_sx, mut delay_rx) = mpsc::unbounded_channel(); + calculator.delayer.delay_notifier = delay_sx; + + let (stop_sender, stop_receiver) = watch::channel(false); + let tree_reader = calculator.tree_reader(); + let calculator_task = tokio::spawn(calculator.run(pool.clone(), stop_receiver)); + + match case { + // Wait until the tree is fully initialized and stop the calculator. + RecoveryWorkflowCase::Stop => { + let tree_info = tree_reader.await.info().await; + assert_eq!(tree_info.root_hash, snapshot_recovery.l1_batch_root_hash); + assert_eq!(tree_info.leaf_count, 200); + assert_eq!( + tree_info.next_l1_batch_number, + snapshot_recovery.l1_batch_number + 1 + ); + } + + // Emulate state keeper adding a new L1 batch to Postgres. + RecoveryWorkflowCase::CreateBatch => { + tree_reader.await; + + let mut storage = storage.start_transaction().await.unwrap(); + let mut new_logs = gen_storage_logs(500..600, 1).pop().unwrap(); + // Logs must be sorted by `log.key` to match their enum index assignment + new_logs.sort_unstable_by_key(|log| log.key); + + extend_db_state_from_l1_batch( + &mut storage, + snapshot_recovery.l1_batch_number + 1, + [new_logs.clone()], + ) + .await; + storage.commit().await.unwrap(); + + // Wait until the inserted L1 batch is processed by the calculator. 
+ let new_root_hash = loop { + let (next_l1_batch, root_hash) = delay_rx.recv().await.unwrap(); + if next_l1_batch == snapshot_recovery.l1_batch_number + 2 { + break root_hash; + } + }; + + let all_tree_instructions: Vec<_> = snapshot_logs + .iter() + .chain(&new_logs) + .enumerate() + .map(|(i, log)| TreeInstruction::write(log.key, i as u64 + 1, log.value)) + .collect(); + let expected_new_root_hash = + ZkSyncTree::process_genesis_batch(&all_tree_instructions).root_hash; + assert_ne!(expected_new_root_hash, snapshot_recovery.l1_batch_root_hash); + assert_eq!(new_root_hash, expected_new_root_hash); + } + } + + stop_sender.send_replace(true); + calculator_task.await.expect("calculator panicked").unwrap(); +} + +/// Prepares a recovery snapshot without performing genesis. +async fn prepare_clean_recovery_snapshot( + storage: &mut StorageProcessor<'_>, + snapshot_logs: &[StorageLog], +) -> SnapshotRecoveryStatus { + let written_keys: Vec<_> = snapshot_logs.iter().map(|log| log.key).collect(); + let tree_instructions: Vec<_> = snapshot_logs + .iter() + .enumerate() + .map(|(i, log)| TreeInstruction::write(log.key, i as u64 + 1, log.value)) + .collect(); + let l1_batch_root_hash = ZkSyncTree::process_genesis_batch(&tree_instructions).root_hash; + + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(ProtocolVersion::default()) + .await; + // TODO (PLA-596): Don't insert L1 batches / miniblocks once the relevant foreign keys are removed + let miniblock = MiniblockHeader { + number: MiniblockNumber(23), + timestamp: 23, + hash: H256::zero(), + l1_tx_count: 0, + l2_tx_count: 0, + base_fee_per_gas: 100, + l1_gas_price: 100, + l2_fair_gas_price: 100, + base_system_contracts_hashes: Default::default(), + protocol_version: Some(ProtocolVersionId::latest()), + virtual_blocks: 0, + }; + storage + .blocks_dal() + .insert_miniblock(&miniblock) + .await + .unwrap(); + let l1_batch = L1BatchHeader::new( + L1BatchNumber(23), + 23, + Default::default(), + Default::default(), + ProtocolVersionId::latest(), + ); + storage + .blocks_dal() + .insert_l1_batch(&l1_batch, &[], Default::default(), &[], &[]) + .await + .unwrap(); + + storage + .storage_logs_dedup_dal() + .insert_initial_writes(l1_batch.number, &written_keys) + .await; + storage + .storage_logs_dal() + .insert_storage_logs(miniblock.number, &[(H256::zero(), snapshot_logs.to_vec())]) + .await; + + let snapshot_recovery = SnapshotRecoveryStatus { + l1_batch_number: l1_batch.number, + l1_batch_root_hash, + miniblock_number: miniblock.number, + miniblock_root_hash: H256::zero(), // not used + last_finished_chunk_id: None, + total_chunk_count: 100, + }; + storage + .snapshot_recovery_dal() + .set_applied_snapshot_status(&snapshot_recovery) + .await + .unwrap(); + snapshot_recovery +} diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 4e480ce1d99..8c8121fac65 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -1,7 +1,5 @@ //! Tests for the metadata calculator component life cycle. 
-// TODO (PLA-708): test full recovery life cycle
-
 use std::{future::Future, ops, panic, path::Path, time::Duration};
 
 use assert_matches::assert_matches;
@@ -91,9 +89,9 @@ async fn expected_tree_hash(pool: &ConnectionPool) -> H256 {
         .get_sealed_l1_batch_number()
         .await
         .unwrap()
-        .0;
+        .expect("No L1 batches in Postgres");
     let mut all_logs = vec![];
-    for i in 0..=sealed_l1_batch_number {
+    for i in 0..=sealed_l1_batch_number.0 {
         let logs = L1BatchWithLogs::new(&mut storage, L1BatchNumber(i)).await;
         let logs = logs.unwrap().storage_logs;
         all_logs.extend(logs);
@@ -384,7 +382,7 @@ async fn setup_lightweight_calculator(db_path: &Path, pool: &ConnectionPool) ->
 fn create_config(db_path: &Path) -> (MerkleTreeConfig, OperationsManagerConfig) {
     let db_config = MerkleTreeConfig {
         path: path_to_string(&db_path.join("new")),
-        ..Default::default()
+        ..MerkleTreeConfig::default()
     };
 
     let operation_config = OperationsManagerConfig {
@@ -472,16 +470,25 @@ pub(super) async fn extend_db_state(
     new_logs: impl IntoIterator<Item = Vec<StorageLog>>,
 ) {
     let mut storage = storage.start_transaction().await.unwrap();
-    let next_l1_batch = storage
+    let sealed_l1_batch = storage
         .blocks_dal()
        .get_sealed_l1_batch_number()
        .await
        .unwrap()
-        .0
-        + 1;
+        .expect("no L1 batches in Postgres");
+    extend_db_state_from_l1_batch(&mut storage, sealed_l1_batch + 1, new_logs).await;
+    storage.commit().await.unwrap();
+}
+
+pub(super) async fn extend_db_state_from_l1_batch(
+    storage: &mut StorageProcessor<'_>,
+    next_l1_batch: L1BatchNumber,
+    new_logs: impl IntoIterator<Item = Vec<StorageLog>>,
+) {
+    assert!(storage.in_transaction(), "must be called in DB transaction");
     let base_system_contracts = BaseSystemContracts::load_from_disk();
-    for (idx, batch_logs) in (next_l1_batch..).zip(new_logs) {
+    for (idx, batch_logs) in (next_l1_batch.0..).zip(new_logs) {
         let batch_number = L1BatchNumber(idx);
         let mut header = L1BatchHeader::new(
             batch_number,
@@ -528,9 +535,8 @@ pub(super) async fn extend_db_state(
             .mark_miniblocks_as_executed_in_l1_batch(batch_number)
             .await
             .unwrap();
-        insert_initial_writes_for_batch(&mut storage, batch_number).await;
+        insert_initial_writes_for_batch(storage, batch_number).await;
     }
-    storage.commit().await.unwrap();
 }
 
 async fn insert_initial_writes_for_batch(
@@ -609,7 +615,8 @@ async fn remove_l1_batches(
         .blocks_dal()
         .get_sealed_l1_batch_number()
         .await
-        .unwrap();
+        .unwrap()
+        .expect("no L1 batches in Postgres");
     assert!(sealed_l1_batch_number >= last_l1_batch_to_keep);
 
     let mut batch_headers = vec![];
diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs
index 575ba99bb28..1bed4cbbf08 100644
--- a/core/lib/zksync_core/src/metadata_calculator/updater.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs
@@ -239,11 +239,15 @@ impl TreeUpdater {
         mut storage: StorageProcessor<'_>,
         next_l1_batch_to_seal: &mut L1BatchNumber,
     ) {
-        let last_sealed_l1_batch = storage
+        let Some(last_sealed_l1_batch) = storage
             .blocks_dal()
             .get_sealed_l1_batch_number()
             .await
-            .unwrap();
+            .unwrap()
+        else {
+            tracing::trace!("No L1 batches to seal: Postgres storage is empty");
+            return;
+        };
         let last_requested_l1_batch =
             next_l1_batch_to_seal.0 + self.max_l1_batches_per_iter as u32 - 1;
         let last_requested_l1_batch = last_requested_l1_batch.min(last_sealed_l1_batch.0);
@@ -300,7 +304,7 @@ impl TreeUpdater {
         tracing::info!(
             "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \
-             Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch}, \
+             Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \
              last L1 batch with metadata: {last_l1_batch_with_metadata:?}",
             max_batches_per_iter = self.max_l1_batches_per_iter
         );
diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs
index d11e0f78333..5b36f3c039b 100644
--- a/core/lib/zksync_core/src/sync_layer/tests.rs
+++ b/core/lib/zksync_core/src/sync_layer/tests.rs
@@ -299,7 +299,7 @@ pub(super) async fn mock_l1_batch_hash_computation(pool: ConnectionPool, number:
             .get_sealed_l1_batch_number()
             .await
             .unwrap();
-        if last_l1_batch_number < L1BatchNumber(number) {
+        if last_l1_batch_number < Some(L1BatchNumber(number)) {
             tokio::time::sleep(POLL_INTERVAL).await;
             continue;
         }