perf(en): Monitor recovery latency by stage (#1725)
## What ❔

Logs the latency of each recovery stage (Postgres, Merkle tree, state
keeper) and exposes it as a gauge.
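
For reference, the per-stage gauge set in the diff below (`APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres]`) lives in the new `zksync_shared_metrics` dependency. As a minimal sketch, here is how such a metric family can be declared with the `vise` crate used throughout this codebase; the metric prefix, derives, and unit are assumptions for illustration, not copied from the PR:

```rust
use std::time::Duration;

use vise::{EncodeLabelSet, EncodeLabelValue, Family, Gauge, Metrics, Unit};

/// Recovery stage, used as the `stage` label on the gauge.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "stage", rename_all = "snake_case")]
pub enum SnapshotRecoveryStage {
    Postgres,
    Tree,
    StateKeeper,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "external_node")] // assumed prefix, for illustration only
pub struct AppMetrics {
    /// Latency of a snapshot recovery stage; set once when the stage completes.
    #[metrics(unit = Unit::Seconds)]
    pub snapshot_recovery_latency: Family<SnapshotRecoveryStage, Gauge<Duration>>,
}

#[vise::register]
pub static APP_METRICS: vise::Global<AppMetrics> = vise::Global::new();
```

A gauge (rather than a histogram) fits here because each stage runs at most once per node start, so the last-set value tells the whole story.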

## Why ❔

Necessary for monitoring recovery performance.

## Checklist

- [x] PR title corresponds to the body of the PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
slowli committed Apr 22, 2024
1 parent 945e43c commit d7efdd5
Showing 12 changed files with 208 additions and 71 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions core/bin/external_node/Cargo.toml
@@ -31,6 +31,7 @@ zksync_health_check.workspace = true
zksync_web3_decl.workspace = true
zksync_types.workspace = true
zksync_block_reverter.workspace = true
zksync_shared_metrics.workspace = true
vlog.workspace = true

zksync_concurrency.workspace = true
14 changes: 12 additions & 2 deletions core/bin/external_node/src/init.rs
@@ -1,11 +1,14 @@
//! EN initialization logic.

use std::time::Instant;

use anyhow::Context as _;
use zksync_basic_types::{L1BatchNumber, L2ChainId};
use zksync_core::sync_layer::genesis::perform_genesis_if_needed;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
use zksync_object_store::ObjectStoreFactory;
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
use zksync_web3_decl::client::BoxedL2Client;

@@ -96,11 +99,18 @@ pub(crate) async fn ensure_storage_initialized(
blob_store,
);
app_health.insert_component(snapshots_applier_task.health_check());
snapshots_applier_task

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
.run()
.await
.context("snapshot recovery failed")?;
tracing::info!("Snapshot recovery is complete");
if stats.done_work {
let latency = recovery_started_at.elapsed();
APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres]
.set(latency);
tracing::info!("Recovered Postgres from snapshot in {latency:?}");
}
}
}
Ok(())
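Note: the hunk above instruments only the Postgres stage. Per the PR description, the Merkle tree and state keeper stages (in files not rendered here) are timed the same way. Below is a hypothetical sketch of that recurring pattern; the helper name, stage variant, and elided body are illustrative, not from the PR:

```rust
use std::time::Instant;

use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};

// Hypothetical helper showing the recurring pattern: time the stage,
// record the latency in the per-stage gauge, and log it.
async fn recover_tree_from_snapshot() -> anyhow::Result<()> {
    let started_at = Instant::now();
    // ... recover the Merkle tree from snapshot data here ...
    let latency = started_at.elapsed();
    APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Tree].set(latency);
    tracing::info!("Recovered Merkle tree from snapshot in {latency:?}");
    Ok(())
}
```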
57 changes: 32 additions & 25 deletions core/lib/snapshots_applier/src/lib.rs
@@ -47,7 +47,7 @@ impl SnapshotsApplierHealthDetails {
fn done(status: &SnapshotRecoveryStatus) -> anyhow::Result<Self> {
if status.storage_logs_chunks_left_to_process() != 0 {
anyhow::bail!(
"Inconsistent Postgres state: there are miniblocks, but the snapshot recovery status \
"Inconsistent Postgres state: there are L2 blocks, but the snapshot recovery status \
contains unprocessed storage log chunks: {status:?}"
);
}
@@ -218,6 +218,13 @@ impl SnapshotsApplierConfig {
}
}

/// Stats returned by [`SnapshotsApplierTask::run()`].
#[derive(Debug)]
pub struct SnapshotApplierTaskStats {
/// Did the task do any work?
pub done_work: bool,
}

#[derive(Debug)]
pub struct SnapshotsApplierTask {
config: SnapshotsApplierConfig,
@@ -256,7 +263,7 @@ impl SnapshotsApplierTask {
/// or under any of the following conditions:
///
/// - There are no snapshots on the main node
pub async fn run(self) -> anyhow::Result<()> {
pub async fn run(self) -> anyhow::Result<SnapshotApplierTaskStats> {
tracing::info!("Starting snapshot recovery with config: {:?}", self.config);

let mut backoff = self.config.initial_retry_backoff;
@@ -272,14 +279,16 @@
.await;

match result {
Ok(final_status) => {
Ok((strategy, final_status)) => {
let health_details = SnapshotsApplierHealthDetails::done(&final_status)?;
self.health_updater
.update(Health::from(HealthStatus::Ready).with_details(health_details));
// Freeze the health check in the "ready" status, so that the snapshot recovery isn't marked
// as "shut down", which would lead to the app considered unhealthy.
self.health_updater.freeze();
return Ok(());
return Ok(SnapshotApplierTaskStats {
done_work: !matches!(strategy, SnapshotRecoveryStrategy::Completed),
});
}
Err(SnapshotsApplierError::Fatal(err)) => {
tracing::error!("Fatal error occurred during snapshots recovery: {err:?}");
@@ -305,21 +314,21 @@
}

/// Strategy determining how snapshot recovery should proceed.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
enum SnapshotRecoveryStrategy {
/// Snapshot recovery should proceed from scratch with the specified params.
New(SnapshotRecoveryStatus),
New,
/// Snapshot recovery should continue with the specified params.
Resumed(SnapshotRecoveryStatus),
Resumed,
/// Snapshot recovery has already been completed.
Completed(SnapshotRecoveryStatus),
Completed,
}

impl SnapshotRecoveryStrategy {
async fn new(
storage: &mut Connection<'_, Core>,
main_node_client: &dyn SnapshotsApplierMainNodeClient,
) -> Result<Self, SnapshotsApplierError> {
) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
let latency =
METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
let applied_snapshot_status = storage
@@ -328,14 +337,14 @@
.await?;

if let Some(applied_snapshot_status) = applied_snapshot_status {
let sealed_miniblock_number = storage.blocks_dal().get_sealed_l2_block_number().await?;
if sealed_miniblock_number.is_some() {
return Ok(Self::Completed(applied_snapshot_status));
let sealed_l2_block_number = storage.blocks_dal().get_sealed_l2_block_number().await?;
if sealed_l2_block_number.is_some() {
return Ok((Self::Completed, applied_snapshot_status));
}

let latency = latency.observe();
tracing::info!("Re-initialized snapshots applier after reset/failure in {latency:?}");
Ok(Self::Resumed(applied_snapshot_status))
Ok((Self::Resumed, applied_snapshot_status))
} else {
let is_genesis_needed = storage.blocks_dal().is_genesis_needed().await?;
if !is_genesis_needed {
@@ -362,7 +371,7 @@

let latency = latency.observe();
tracing::info!("Initialized fresh snapshots applier in {latency:?}");
Ok(Self::New(recovery_status))
Ok((Self::New, recovery_status))
}
}

@@ -376,7 +385,7 @@
let l1_batch_number = snapshot.l1_batch_number;
let l2_block_number = snapshot.l2_block_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, miniblock #{l2_block_number}, \
"Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
version {version}, storage logs are divided into {chunk_count} chunk(s)",
version = snapshot.version,
chunk_count = snapshot.storage_logs_chunks.len()
@@ -457,7 +466,7 @@ impl<'a> SnapshotsApplier<'a> {
blob_store: &'a dyn ObjectStore,
health_updater: &'a HealthUpdater,
max_concurrency: usize,
) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
// While the recovery is in progress, the node is healthy (no error has occurred),
// but is affected (its usual APIs don't work).
health_updater.update(HealthStatus::Affected.into());
@@ -467,15 +476,13 @@
.await?;
let mut storage_transaction = storage.start_transaction().await?;

let strategy =
let (strategy, applied_snapshot_status) =
SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?;
tracing::info!("Chosen snapshot recovery strategy: {strategy:?}");
let (applied_snapshot_status, created_from_scratch) = match strategy {
SnapshotRecoveryStrategy::Completed(status) => {
return Ok(status);
}
SnapshotRecoveryStrategy::New(status) => (status, true),
SnapshotRecoveryStrategy::Resumed(status) => (status, false),
tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
let created_from_scratch = match strategy {
SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)),
SnapshotRecoveryStrategy::New => true,
SnapshotRecoveryStrategy::Resumed => false,
};

let mut this = Self {
@@ -537,7 +544,7 @@ impl<'a> SnapshotsApplier<'a> {
this.recover_tokens().await?;
this.tokens_recovered = true;
this.update_health();
Ok(this.applied_snapshot_status)
Ok((strategy, this.applied_snapshot_status))
}

fn update_health(&self) {
58 changes: 34 additions & 24 deletions core/lib/snapshots_applier/src/tests/mod.rs
@@ -12,15 +12,15 @@ use zksync_health_check::CheckHealth;
use zksync_object_store::ObjectStoreFactory;
use zksync_types::{
api::{BlockDetails, L1BatchDetails},
block::{L1BatchHeader, L2BlockHeader},
get_code_key, Address, L1BatchNumber, ProtocolVersion, ProtocolVersionId,
block::L1BatchHeader,
get_code_key, L1BatchNumber, ProtocolVersion, ProtocolVersionId,
};

use self::utils::{
mock_recovery_status, prepare_clients, MockMainNodeClient, ObjectStoreWithErrors,
mock_l2_block_header, mock_recovery_status, mock_snapshot_header, mock_tokens, prepare_clients,
random_storage_logs, MockMainNodeClient, ObjectStoreWithErrors,
};
use super::*;
use crate::tests::utils::{mock_snapshot_header, mock_tokens, random_storage_logs};

mod utils;

@@ -65,7 +65,8 @@ async fn snapshots_creator_can_successfully_recover_db(
object_store.clone(),
);
let task_health = task.health_check();
task.run().await.unwrap();
let stats = task.run().await.unwrap();
assert!(stats.done_work);
assert_matches!(
task_health.check_health().await.status(),
HealthStatus::Ready
@@ -103,16 +104,39 @@ async fn snapshots_creator_can_successfully_recover_db(
assert_eq!(db_log.value, expected_log.value);
assert_eq!(db_log.l2_block_number, expected_status.l2_block_number);
}
drop(storage);

// Try recovering again.
let task = SnapshotsApplierTask::new(
SnapshotsApplierConfig::for_tests(),
pool,
pool.clone(),
Box::new(client.clone()),
object_store.clone(),
);
task.run().await.unwrap();
// Here, stats would unfortunately have `done_work: true` because work detection isn't smart enough.

// Emulate a node processing data after recovery.
storage
.protocol_versions_dal()
.save_protocol_version_with_tx(&ProtocolVersion::default())
.await
.unwrap();
let l2_block = mock_l2_block_header(expected_status.l2_block_number + 1);
storage
.blocks_dal()
.insert_l2_block(&l2_block)
.await
.unwrap();
drop(storage);

let task = SnapshotsApplierTask::new(
SnapshotsApplierConfig::for_tests(),
pool.clone(),
Box::new(client),
object_store,
);
task.run().await.unwrap();
let stats = task.run().await.unwrap();
assert!(!stats.done_work);
}

#[tokio::test]
@@ -145,7 +169,7 @@ async fn health_status_immediately_after_task_start() {

async fn fetch_tokens(
&self,
_at_miniblock: L2BlockNumber,
_at_l2_block: L2BlockNumber,
) -> EnrichedClientResult<Vec<TokenInfo>> {
self.0.wait().await;
future::pending().await
@@ -183,21 +207,7 @@ async fn applier_errors_after_genesis() {
.save_protocol_version_with_tx(&ProtocolVersion::default())
.await
.unwrap();
let genesis_l2_block = L2BlockHeader {
number: L2BlockNumber(0),
timestamp: 0,
hash: H256::zero(),
l1_tx_count: 0,
l2_tx_count: 0,
fee_account_address: Address::repeat_byte(1),
base_fee_per_gas: 1,
batch_fee_input: Default::default(),
gas_per_pubdata_limit: 2,
base_system_contracts_hashes: Default::default(),
protocol_version: Some(ProtocolVersionId::latest()),
virtual_blocks: 0,
gas_limit: 0,
};
let genesis_l2_block = mock_l2_block_header(L2BlockNumber(0));
storage
.blocks_dal()
.insert_l2_block(&genesis_l2_block)
19 changes: 19 additions & 0 deletions core/lib/snapshots_applier/src/tests/utils.rs
@@ -6,6 +6,7 @@ use async_trait::async_trait;
use zksync_object_store::{Bucket, ObjectStore, ObjectStoreError, ObjectStoreFactory};
use zksync_types::{
api,
block::L2BlockHeader,
snapshots::{
SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotHeader,
SnapshotRecoveryStatus, SnapshotStorageLog, SnapshotStorageLogsChunk,
@@ -105,6 +106,24 @@ impl ObjectStore for ObjectStoreWithErrors {
}
}

pub(super) fn mock_l2_block_header(l2_block_number: L2BlockNumber) -> L2BlockHeader {
L2BlockHeader {
number: l2_block_number,
timestamp: 0,
hash: H256::from_low_u64_be(u64::from(l2_block_number.0)),
l1_tx_count: 0,
l2_tx_count: 0,
fee_account_address: Address::repeat_byte(1),
base_fee_per_gas: 0,
gas_per_pubdata_limit: 0,
batch_fee_input: Default::default(),
base_system_contracts_hashes: Default::default(),
protocol_version: Some(Default::default()),
virtual_blocks: 0,
gas_limit: 0,
}
}

fn block_details_base(hash: H256) -> api::BlockDetailsBase {
api::BlockDetailsBase {
timestamp: 0,
(Diffs for the remaining changed files are not rendered here.)
