From 56446b939a2bf24e5d2a8089320428a5428e04a4 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Thu, 18 Apr 2024 12:33:14 +0300 Subject: [PATCH 1/3] Track overall recovery latency (Postgres) --- Cargo.lock | 1 + core/bin/external_node/Cargo.toml | 1 + core/bin/external_node/src/init.rs | 14 ++++- core/lib/snapshots_applier/src/lib.rs | 49 +++++++++------- core/lib/snapshots_applier/src/tests/mod.rs | 56 +++++++++++-------- core/lib/snapshots_applier/src/tests/utils.rs | 19 +++++++ core/node/shared_metrics/src/lib.rs | 15 ++++- 7 files changed, 108 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0be0c6f344e..2c83b0fc019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8673,6 +8673,7 @@ dependencies = [ "zksync_l1_contract_interface", "zksync_object_store", "zksync_protobuf_config", + "zksync_shared_metrics", "zksync_snapshots_applier", "zksync_state", "zksync_storage", diff --git a/core/bin/external_node/Cargo.toml b/core/bin/external_node/Cargo.toml index c06bd12881a..d368d318b1b 100644 --- a/core/bin/external_node/Cargo.toml +++ b/core/bin/external_node/Cargo.toml @@ -31,6 +31,7 @@ zksync_health_check.workspace = true zksync_web3_decl.workspace = true zksync_types.workspace = true zksync_block_reverter.workspace = true +zksync_shared_metrics.workspace = true vlog.workspace = true zksync_concurrency.workspace = true diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs index b1ddbbdae17..78846659ba1 100644 --- a/core/bin/external_node/src/init.rs +++ b/core/bin/external_node/src/init.rs @@ -1,11 +1,14 @@ //! EN initialization logic. +use std::time::Instant; + use anyhow::Context as _; use zksync_basic_types::{L1BatchNumber, L2ChainId}; use zksync_core::sync_layer::genesis::perform_genesis_if_needed; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_health_check::AppHealthCheck; use zksync_object_store::ObjectStoreFactory; +use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS}; use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask}; use zksync_web3_decl::client::BoxedL2Client; @@ -96,11 +99,18 @@ pub(crate) async fn ensure_storage_initialized( blob_store, ); app_health.insert_component(snapshots_applier_task.health_check()); - snapshots_applier_task + + let recovery_started_at = Instant::now(); + let stats = snapshots_applier_task .run() .await .context("snapshot recovery failed")?; - tracing::info!("Snapshot recovery is complete"); + if stats.done_work { + let latency = recovery_started_at.elapsed(); + APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres] + .set(latency); + tracing::info!("Snapshot recovery is complete in {latency:?}"); + } } } Ok(()) diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs index ca0d87d9967..7b3aa3f4453 100644 --- a/core/lib/snapshots_applier/src/lib.rs +++ b/core/lib/snapshots_applier/src/lib.rs @@ -218,6 +218,13 @@ impl SnapshotsApplierConfig { } } +/// Stats returned by [`SnapshotsApplierTask::run()`]. +#[derive(Debug)] +pub struct SnapshotApplierTaskStats { + /// Did the task do any work? 
+    pub done_work: bool,
+}
+
 #[derive(Debug)]
 pub struct SnapshotsApplierTask {
     config: SnapshotsApplierConfig,
@@ -256,7 +263,7 @@ impl SnapshotsApplierTask {
     /// or under any of the following conditions:
     ///
     /// - There are no snapshots on the main node
-    pub async fn run(self) -> anyhow::Result<()> {
+    pub async fn run(self) -> anyhow::Result<SnapshotApplierTaskStats> {
         tracing::info!("Starting snapshot recovery with config: {:?}", self.config);
         let mut backoff = self.config.initial_retry_backoff;
@@ -272,14 +279,16 @@ impl SnapshotsApplierTask {
             .await;
             match result {
-                Ok(final_status) => {
+                Ok((strategy, final_status)) => {
                     let health_details = SnapshotsApplierHealthDetails::done(&final_status)?;
                     self.health_updater
                         .update(Health::from(HealthStatus::Ready).with_details(health_details));
                     // Freeze the health check in the "ready" status, so that the snapshot recovery isn't marked
                     // as "shut down", which would lead to the app considered unhealthy.
                     self.health_updater.freeze();
-                    return Ok(());
+                    return Ok(SnapshotApplierTaskStats {
+                        done_work: !matches!(strategy, SnapshotRecoveryStrategy::Completed),
+                    });
                 }
                 Err(SnapshotsApplierError::Fatal(err)) => {
                     tracing::error!("Fatal error occurred during snapshots recovery: {err:?}");
@@ -305,21 +314,21 @@ impl SnapshotsApplierTask {
 }
 
 /// Strategy determining how snapshot recovery should proceed.
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy)]
 enum SnapshotRecoveryStrategy {
     /// Snapshot recovery should proceed from scratch with the specified params.
-    New(SnapshotRecoveryStatus),
+    New,
     /// Snapshot recovery should continue with the specified params.
-    Resumed(SnapshotRecoveryStatus),
+    Resumed,
     /// Snapshot recovery has already been completed.
-    Completed(SnapshotRecoveryStatus),
+    Completed,
 }
 
 impl SnapshotRecoveryStrategy {
     async fn new(
         storage: &mut Connection<'_, Core>,
         main_node_client: &dyn SnapshotsApplierMainNodeClient,
-    ) -> Result<Self, SnapshotsApplierError> {
+    ) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
         let latency =
             METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
         let applied_snapshot_status = storage
@@ -331,12 +340,12 @@ impl SnapshotRecoveryStrategy {
             let sealed_miniblock_number = storage.blocks_dal().get_sealed_miniblock_number().await?;
             if sealed_miniblock_number.is_some() {
-                return Ok(Self::Completed(applied_snapshot_status));
+                return Ok((Self::Completed, applied_snapshot_status));
             }
             let latency = latency.observe();
             tracing::info!("Re-initialized snapshots applier after reset/failure in {latency:?}");
-            Ok(Self::Resumed(applied_snapshot_status))
+            Ok((Self::Resumed, applied_snapshot_status))
         } else {
             let is_genesis_needed = storage.blocks_dal().is_genesis_needed().await?;
             if !is_genesis_needed {
@@ -363,7 +372,7 @@ impl SnapshotRecoveryStrategy {
             let latency = latency.observe();
             tracing::info!("Initialized fresh snapshots applier in {latency:?}");
-            Ok(Self::New(recovery_status))
+            Ok((Self::New, recovery_status))
         }
     }
 
@@ -458,7 +467,7 @@ impl<'a> SnapshotsApplier<'a> {
         blob_store: &'a dyn ObjectStore,
         health_updater: &'a HealthUpdater,
         max_concurrency: usize,
-    ) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
+    ) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
         // While the recovery is in progress, the node is healthy (no error has occurred),
         // but is affected (its usual APIs don't work).
health_updater.update(HealthStatus::Affected.into()); @@ -468,15 +477,13 @@ impl<'a> SnapshotsApplier<'a> { .await?; let mut storage_transaction = storage.start_transaction().await?; - let strategy = + let (strategy, applied_snapshot_status) = SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?; - tracing::info!("Chosen snapshot recovery strategy: {strategy:?}"); - let (applied_snapshot_status, created_from_scratch) = match strategy { - SnapshotRecoveryStrategy::Completed(status) => { - return Ok(status); - } - SnapshotRecoveryStrategy::New(status) => (status, true), - SnapshotRecoveryStrategy::Resumed(status) => (status, false), + tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}"); + let created_from_scratch = match strategy { + SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)), + SnapshotRecoveryStrategy::New => true, + SnapshotRecoveryStrategy::Resumed => false, }; let mut this = Self { @@ -538,7 +545,7 @@ impl<'a> SnapshotsApplier<'a> { this.recover_tokens().await?; this.tokens_recovered = true; this.update_health(); - Ok(this.applied_snapshot_status) + Ok((strategy, this.applied_snapshot_status)) } fn update_health(&self) { diff --git a/core/lib/snapshots_applier/src/tests/mod.rs b/core/lib/snapshots_applier/src/tests/mod.rs index 3776d42da1d..70b6f5e69f8 100644 --- a/core/lib/snapshots_applier/src/tests/mod.rs +++ b/core/lib/snapshots_applier/src/tests/mod.rs @@ -12,15 +12,15 @@ use zksync_health_check::CheckHealth; use zksync_object_store::ObjectStoreFactory; use zksync_types::{ api::{BlockDetails, L1BatchDetails}, - block::{L1BatchHeader, MiniblockHeader}, - get_code_key, Address, L1BatchNumber, ProtocolVersion, ProtocolVersionId, + block::L1BatchHeader, + get_code_key, L1BatchNumber, ProtocolVersion, ProtocolVersionId, }; use self::utils::{ - mock_recovery_status, prepare_clients, MockMainNodeClient, ObjectStoreWithErrors, + mock_miniblock_header, mock_recovery_status, mock_snapshot_header, mock_tokens, + prepare_clients, random_storage_logs, MockMainNodeClient, ObjectStoreWithErrors, }; use super::*; -use crate::tests::utils::{mock_snapshot_header, mock_tokens, random_storage_logs}; mod utils; @@ -65,7 +65,8 @@ async fn snapshots_creator_can_successfully_recover_db( object_store.clone(), ); let task_health = task.health_check(); - task.run().await.unwrap(); + let stats = task.run().await.unwrap(); + assert!(stats.done_work); assert_matches!( task_health.check_health().await.status(), HealthStatus::Ready @@ -103,16 +104,39 @@ async fn snapshots_creator_can_successfully_recover_db( assert_eq!(db_log.value, expected_log.value); assert_eq!(db_log.miniblock_number, expected_status.miniblock_number); } - drop(storage); // Try recovering again. let task = SnapshotsApplierTask::new( SnapshotsApplierConfig::for_tests(), - pool, + pool.clone(), + Box::new(client.clone()), + object_store.clone(), + ); + task.run().await.unwrap(); + // Here, stats would unfortunately have `done_work: true` because work detection isn't smart enough. + + // Emulate a node processing data after recovery. 
+ storage + .protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + let miniblock = mock_miniblock_header(expected_status.miniblock_number + 1); + storage + .blocks_dal() + .insert_miniblock(&miniblock) + .await + .unwrap(); + drop(storage); + + let task = SnapshotsApplierTask::new( + SnapshotsApplierConfig::for_tests(), + pool.clone(), Box::new(client), object_store, ); - task.run().await.unwrap(); + let stats = task.run().await.unwrap(); + assert!(!stats.done_work); } #[tokio::test] @@ -183,21 +207,7 @@ async fn applier_errors_after_genesis() { .save_protocol_version_with_tx(&ProtocolVersion::default()) .await .unwrap(); - let genesis_miniblock = MiniblockHeader { - number: MiniblockNumber(0), - timestamp: 0, - hash: H256::zero(), - l1_tx_count: 0, - l2_tx_count: 0, - fee_account_address: Address::repeat_byte(1), - base_fee_per_gas: 1, - batch_fee_input: Default::default(), - gas_per_pubdata_limit: 2, - base_system_contracts_hashes: Default::default(), - protocol_version: Some(ProtocolVersionId::latest()), - virtual_blocks: 0, - gas_limit: 0, - }; + let genesis_miniblock = mock_miniblock_header(MiniblockNumber(0)); storage .blocks_dal() .insert_miniblock(&genesis_miniblock) diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs index 509d55cfd67..b0f582d494b 100644 --- a/core/lib/snapshots_applier/src/tests/utils.rs +++ b/core/lib/snapshots_applier/src/tests/utils.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use zksync_object_store::{Bucket, ObjectStore, ObjectStoreError, ObjectStoreFactory}; use zksync_types::{ api, + block::MiniblockHeader, snapshots::{ SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotHeader, SnapshotRecoveryStatus, SnapshotStorageLog, SnapshotStorageLogsChunk, @@ -105,6 +106,24 @@ impl ObjectStore for ObjectStoreWithErrors { } } +pub(super) fn mock_miniblock_header(miniblock_number: MiniblockNumber) -> MiniblockHeader { + MiniblockHeader { + number: miniblock_number, + timestamp: 0, + hash: H256::from_low_u64_be(u64::from(miniblock_number.0)), + l1_tx_count: 0, + l2_tx_count: 0, + fee_account_address: Address::repeat_byte(1), + base_fee_per_gas: 0, + gas_per_pubdata_limit: 0, + batch_fee_input: Default::default(), + base_system_contracts_hashes: Default::default(), + protocol_version: Some(Default::default()), + virtual_blocks: 0, + gas_limit: 0, + } +} + fn block_details_base(hash: H256) -> api::BlockDetailsBase { api::BlockDetailsBase { timestamp: 0, diff --git a/core/node/shared_metrics/src/lib.rs b/core/node/shared_metrics/src/lib.rs index e549e34ad95..ae2ee564300 100644 --- a/core/node/shared_metrics/src/lib.rs +++ b/core/node/shared_metrics/src/lib.rs @@ -2,10 +2,20 @@ use std::{fmt, time::Duration}; -use vise::{Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics}; +use vise::{ + Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit, +}; use zksync_dal::transactions_dal::L2TxSubmissionResult; use zksync_types::aggregated_operations::AggregatedActionType; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub enum SnapshotRecoveryStage { + Postgres, + Tree, + StateKeeperCache, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "stage")] pub enum InitStage { @@ -127,6 +137,9 @@ pub enum MiniblockStage { #[derive(Debug, 
Metrics)]
#[metrics(prefix = "server")]
pub struct AppMetrics {
+    /// Latency to perform a certain stage of the snapshot recovery.
+    #[metrics(unit = Unit::Seconds)]
+    pub snapshot_recovery_latency: Family<SnapshotRecoveryStage, Gauge<Duration>>,
     /// Latency to initialize a specific server component.
     pub init_latency: Family<InitStage, Gauge<Duration>>,
     pub block_number: Family<BlockStage, Gauge<u64>>,

From 80e3d75eff80b2b7535814d41a2ba5c41ba84a0c Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Thu, 18 Apr 2024 13:30:10 +0300
Subject: [PATCH 2/3] Track overall recovery latency (RocksDB)

---
 core/lib/state/src/rocksdb/mod.rs                  | 33 +++++++++++++++++--
 core/lib/state/src/rocksdb/recovery.rs             | 15 ++++++---
 core/lib/state/src/rocksdb/tests.rs                | 25 ++++++++++++--
 .../src/metadata_calculator/recovery/mod.rs        | 14 ++++++--
 .../src/state_keeper/state_keeper_storage.rs       | 27 ++++++++++-----
 5 files changed, 95 insertions(+), 19 deletions(-)

diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs
index cb8c3bfd30c..60ca3f269ff 100644
--- a/core/lib/state/src/rocksdb/mod.rs
+++ b/core/lib/state/src/rocksdb/mod.rs
@@ -35,9 +35,9 @@ use zksync_storage::{db::NamedColumnFamily, RocksDB};
 use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256, U256};
 use zksync_utils::{h256_to_u256, u256_to_h256};
 
-use self::metrics::METRICS;
 #[cfg(test)]
 use self::tests::RocksdbStorageEventListener;
+use self::{metrics::METRICS, recovery::Strategy};
 use crate::{InMemoryStorage, ReadStorage};
 
 mod metrics;
@@ -168,6 +168,35 @@ impl RocksdbStorageBuilder {
         self.0.l1_batch_number().await
     }
 
+    /// Ensures that the storage is ready to process L1 batches (i.e., has completed snapshot recovery).
+    ///
+    /// # Return value
+    ///
+    /// Returns a flag indicating whether snapshot recovery was performed.
+    ///
+    /// # Errors
+    ///
+    /// Returns I/O RocksDB and Postgres errors.
+    pub async fn ensure_ready(
+        &mut self,
+        storage: &mut Connection<'_, Core>,
+        stop_receiver: &watch::Receiver<bool>,
+    ) -> anyhow::Result<bool> {
+        let ready_result = self
+            .0
+            .ensure_ready(
+                storage,
+                RocksdbStorage::DESIRED_LOG_CHUNK_SIZE,
+                stop_receiver,
+            )
+            .await;
+        match ready_result {
+            Ok((strategy, _)) => Ok(matches!(strategy, Strategy::Recovery)),
+            Err(RocksdbSyncError::Interrupted) => Ok(false),
+            Err(RocksdbSyncError::Internal(err)) => Err(err),
+        }
+    }
+
     /// Synchronizes this storage with Postgres using the provided connection.
     ///
     /// # Return value
@@ -248,7 +277,7 @@ impl RocksdbStorage {
         storage: &mut Connection<'_, Core>,
         stop_receiver: &watch::Receiver<bool>,
     ) -> Result<(), RocksdbSyncError> {
-        let mut current_l1_batch_number = self
+        let (_, mut current_l1_batch_number) = self
             .ensure_ready(storage, Self::DESIRED_LOG_CHUNK_SIZE, stop_receiver)
             .await?;
 
diff --git a/core/lib/state/src/rocksdb/recovery.rs b/core/lib/state/src/rocksdb/recovery.rs
index 5909ce84cfe..6686196bfdc 100644
--- a/core/lib/state/src/rocksdb/recovery.rs
+++ b/core/lib/state/src/rocksdb/recovery.rs
@@ -15,6 +15,13 @@ use super::{
     RocksdbStorage, RocksdbSyncError, StateValue,
 };
 
+#[derive(Debug)]
+pub(super) enum Strategy {
+    Complete,
+    Recovery,
+    Genesis,
+}
+
 #[derive(Debug)]
 struct KeyChunk {
     id: u64,
@@ -33,9 +40,9 @@ impl RocksdbStorage {
         storage: &mut Connection<'_, Core>,
         desired_log_chunk_size: u64,
         stop_receiver: &watch::Receiver<bool>,
-    ) -> Result<L1BatchNumber, RocksdbSyncError> {
+    ) -> Result<(Strategy, L1BatchNumber), RocksdbSyncError> {
         if let Some(number) = self.l1_batch_number().await {
-            return Ok(number);
+            return Ok((Strategy::Complete, number));
         }
 
         // Check whether we need to perform a snapshot migration.
@@ -52,10 +59,10 @@ impl RocksdbStorage {
                 stop_receiver,
             )
             .await?;
-            snapshot_recovery.l1_batch_number + 1
+            (Strategy::Recovery, snapshot_recovery.l1_batch_number + 1)
         } else {
             // No recovery snapshot; we're initializing the cache from the genesis
-            L1BatchNumber(0)
+            (Strategy::Genesis, L1BatchNumber(0))
         })
     }
 
diff --git a/core/lib/state/src/rocksdb/tests.rs b/core/lib/state/src/rocksdb/tests.rs
index d280b67e3c0..a31a0a96412 100644
--- a/core/lib/state/src/rocksdb/tests.rs
+++ b/core/lib/state/src/rocksdb/tests.rs
@@ -88,6 +88,24 @@ async fn sync_test_storage(dir: &TempDir, conn: &mut Connection<'_, Core>) -> RocksdbStorage {
         .expect("Storage synchronization unexpectedly stopped")
 }
 
+async fn sync_test_storage_and_check_recovery(
+    dir: &TempDir,
+    conn: &mut Connection<'_, Core>,
+    expect_recovery: bool,
+) -> RocksdbStorage {
+    let (_stop_sender, stop_receiver) = watch::channel(false);
+    let mut builder = RocksdbStorage::builder(dir.path())
+        .await
+        .expect("Failed initializing RocksDB");
+    let was_recovered = builder.ensure_ready(conn, &stop_receiver).await.unwrap();
+    assert_eq!(was_recovered, expect_recovery);
+    builder
+        .synchronize(conn, &stop_receiver)
+        .await
+        .unwrap()
+        .expect("Storage synchronization unexpectedly stopped")
+}
+
 #[tokio::test]
 async fn rocksdb_storage_syncing_with_postgres() {
     let pool = ConnectionPool::<Core>::test_pool().await;
@@ -336,7 +354,7 @@ async fn low_level_snapshot_recovery(log_chunk_size: u64) {
     let dir = TempDir::new().expect("cannot create temporary dir for state keeper");
     let mut storage = RocksdbStorage::new(dir.path().into()).await.unwrap();
     let (_stop_sender, stop_receiver) = watch::channel(false);
-    let next_l1_batch = storage
+    let (_, next_l1_batch) = storage
         .ensure_ready(&mut conn, log_chunk_size, &stop_receiver)
         .await
         .unwrap();
@@ -425,7 +443,7 @@ async fn recovering_from_snapshot_and_following_logs() {
     create_l1_batch(&mut conn, snapshot_recovery.l1_batch_number + 2, &[]).await;
 
     let dir = TempDir::new().expect("cannot create temporary dir for state keeper");
-    let mut storage = sync_test_storage(&dir, &mut conn).await;
+    let mut storage = sync_test_storage_and_check_recovery(&dir, &mut conn, true).await;
 
     for (i, log) in new_storage_logs.iter().enumerate() {
         assert_eq!(storage.read_value(&log.key), log.value);
@@ -450,6 +468,9 @@ async fn recovering_from_snapshot_and_following_logs() {
         );
         assert!(!storage.is_write_initial(&log.key));
     }
+
+    drop(storage);
+    sync_test_storage_and_check_recovery(&dir, &mut conn, false).await;
 }
 
 #[tokio::test]
diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
index b95d00804ba..b0b1b17d02f 100644
--- a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
@@ -38,6 +38,7 @@ use tokio::sync::{watch, Mutex, Semaphore};
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_health_check::HealthUpdater;
 use zksync_merkle_tree::TreeEntry;
+use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
 use zksync_types::{
     snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus},
     MiniblockNumber, H256,
@@ -168,6 +169,7 @@ impl GenericAsyncTree {
         stop_receiver: &watch::Receiver<bool>,
         health_updater: &HealthUpdater,
     ) -> anyhow::Result<Option<AsyncTree>> {
+        let started_at = Instant::now();
         let (tree, snapshot_recovery) = match self {
             Self::Ready(tree) => return Ok(Some(tree)),
             Self::Recovering(tree) => {
@@ -205,8 +207,16 @@ impl GenericAsyncTree {
            concurrency_limit: recovery_pool.max_size() as usize,
             events: Box::new(RecoveryHealthUpdater::new(health_updater)),
         };
-        tree.recover(snapshot, recovery_options, &recovery_pool, stop_receiver)
-            .await
+        let tree = tree
+            .recover(snapshot, recovery_options, &recovery_pool, stop_receiver)
+            .await?;
+        if tree.is_some() {
+            // Only report latency if recovery wasn't cancelled
+            let elapsed = started_at.elapsed();
+            APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Tree].set(elapsed);
+            tracing::info!("Recovered Merkle tree in {elapsed:?}");
+        }
+        Ok(tree)
     }
 }
 
diff --git a/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs b/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs
index 0cb6f2b0001..7e68fde7b57 100644
--- a/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs
+++ b/core/lib/zksync_core/src/state_keeper/state_keeper_storage.rs
@@ -1,10 +1,11 @@
-use std::{fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc, time::Instant};
 
 use anyhow::Context;
 use async_trait::async_trait;
 use once_cell::sync::OnceCell;
 use tokio::{runtime::Handle, sync::watch};
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
+use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
 use zksync_state::{
     PostgresStorage, ReadStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily,
 };
@@ -209,17 +210,25 @@ pub struct AsyncCatchupTask {
 
 impl AsyncCatchupTask {
     pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        let started_at = Instant::now();
         tracing::debug!("Catching up RocksDB asynchronously");
-        let mut rocksdb_builder: RocksdbStorageBuilder =
-            RocksdbStorage::builder(self.state_keeper_db_path.as_ref())
-                .await
-                .context("Failed initializing RocksDB storage")?;
+
+        let mut rocksdb_builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref())
+            .await
+            .context("Failed creating RocksDB storage builder")?;
         rocksdb_builder.enable_enum_index_migration(self.enum_index_migration_chunk_size);
-        let mut connection = self
-            .pool
-            .connection()
+        let mut connection = self.pool.connection().await?;
+        let was_recovered_from_snapshot = rocksdb_builder
+            .ensure_ready(&mut connection, &stop_receiver)
             .await
-            .context("Failed accessing Postgres storage")?;
+            .context("failed initializing state keeper RocksDB from snapshot or scratch")?;
+        if was_recovered_from_snapshot {
+            let elapsed = started_at.elapsed();
+            APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::StateKeeperCache]
+                .set(elapsed);
+            tracing::info!("Recovered state keeper RocksDB from snapshot in {elapsed:?}");
+        }
+
         let rocksdb = rocksdb_builder
             .synchronize(&mut connection, &stop_receiver)
             .await

From 9168a2d44394615fb6f4cdb42ff1eea0afdc1db2 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Thu, 18 Apr 2024 13:41:10 +0300
Subject: [PATCH 3/3] Unify log format

---
 core/bin/external_node/src/init.rs                            | 2 +-
 core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/bin/external_node/src/init.rs b/core/bin/external_node/src/init.rs
index 78846659ba1..9cb5fb81c18 100644
--- a/core/bin/external_node/src/init.rs
+++ b/core/bin/external_node/src/init.rs
@@ -109,7 +109,7 @@ pub(crate) async fn ensure_storage_initialized(
             let latency = recovery_started_at.elapsed();
             APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres]
                 .set(latency);
-            tracing::info!("Snapshot recovery is complete in {latency:?}");
+            tracing::info!("Recovered Postgres from snapshot in {latency:?}");
         }
     }
 
diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
index b0b1b17d02f..898c6da6fc6 100644
--- a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
+++ b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs
@@ -211,10 +211,10 @@ impl GenericAsyncTree {
             .recover(snapshot, recovery_options, &recovery_pool, stop_receiver)
             .await?;
         if tree.is_some() {
-            // Only report latency if recovery wasn't cancelled
+            // Only report latency if recovery wasn't canceled
             let elapsed = started_at.elapsed();
             APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Tree].set(elapsed);
-            tracing::info!("Recovered Merkle tree in {elapsed:?}");
+            tracing::info!("Recovered Merkle tree from snapshot in {elapsed:?}");
         }
         Ok(tree)
     }
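
Editor's note (not part of the patches): for readers skimming the series, below is a condensed, self-contained sketch of the latency-tracking pattern that all three patches apply — time a recovery stage, then report the per-stage gauge only when recovery actually ran. The names `RecoveryStage`, `RecoveryMetrics`, and `run_stage` are hypothetical; the `vise` derive attributes, the `Family`/`Gauge<Duration>` field shape, the indexing-by-label usage, and the `#[vise::register]`/`Global` registration mirror the patterns visible in the diffs above and elsewhere in the repo, but this is an illustrative sketch, not the actual implementation.

use std::time::{Duration, Instant};

use vise::{EncodeLabelSet, EncodeLabelValue, Family, Gauge, Metrics, Unit};

// Stage labels mirroring `SnapshotRecoveryStage` from the first patch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "stage", rename_all = "snake_case")]
enum RecoveryStage {
    Postgres,
    Tree,
    StateKeeperCache,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "server")]
struct RecoveryMetrics {
    /// Latency of one snapshot recovery stage; only set when the stage did real work.
    #[metrics(unit = Unit::Seconds)]
    snapshot_recovery_latency: Family<RecoveryStage, Gauge<Duration>>,
}

#[vise::register]
static METRICS: vise::Global<RecoveryMetrics> = vise::Global::new();

/// Times a recovery stage and reports its latency only if the stage actually recovered
/// anything, mirroring the `done_work` / `was_recovered_from_snapshot` checks above.
fn run_stage(stage: RecoveryStage, recover: impl FnOnce() -> bool) {
    let started_at = Instant::now();
    let did_work = recover(); // hypothetical stage body; returns whether recovery ran
    if did_work {
        let elapsed = started_at.elapsed();
        METRICS.snapshot_recovery_latency[&stage].set(elapsed);
        tracing::info!("Recovered {stage:?} from snapshot in {elapsed:?}");
    }
}

A caller would wrap each stage, e.g. `run_stage(RecoveryStage::Postgres, || { /* run the snapshots applier */ true })`. In the actual patches the equivalent "did work" checks are `stats.done_work` (Postgres), `tree.is_some()` (Merkle tree), and `was_recovered_from_snapshot` (state keeper cache).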