Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(en): Monitor recovery latency by stage #1725

Merged
merged 5 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/bin/external_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ zksync_health_check.workspace = true
zksync_web3_decl.workspace = true
zksync_types.workspace = true
zksync_block_reverter.workspace = true
zksync_shared_metrics.workspace = true
vlog.workspace = true

zksync_concurrency.workspace = true
Expand Down
14 changes: 12 additions & 2 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//! EN initialization logic.

use std::time::Instant;

use anyhow::Context as _;
use zksync_basic_types::{L1BatchNumber, L2ChainId};
use zksync_core::sync_layer::genesis::perform_genesis_if_needed;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
use zksync_object_store::ObjectStoreFactory;
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
use zksync_web3_decl::client::BoxedL2Client;

Expand Down Expand Up @@ -96,11 +99,18 @@ pub(crate) async fn ensure_storage_initialized(
blob_store,
);
app_health.insert_component(snapshots_applier_task.health_check());
snapshots_applier_task

let recovery_started_at = Instant::now();
let stats = snapshots_applier_task
.run()
.await
.context("snapshot recovery failed")?;
tracing::info!("Snapshot recovery is complete");
if stats.done_work {
let latency = recovery_started_at.elapsed();
APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::Postgres]
.set(latency);
tracing::info!("Recovered Postgres from snapshot in {latency:?}");
}
}
}
Ok(())
Expand Down
57 changes: 32 additions & 25 deletions core/lib/snapshots_applier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl SnapshotsApplierHealthDetails {
fn done(status: &SnapshotRecoveryStatus) -> anyhow::Result<Self> {
if status.storage_logs_chunks_left_to_process() != 0 {
anyhow::bail!(
"Inconsistent Postgres state: there are miniblocks, but the snapshot recovery status \
"Inconsistent Postgres state: there are L2 blocks, but the snapshot recovery status \
contains unprocessed storage log chunks: {status:?}"
);
}
Expand Down Expand Up @@ -218,6 +218,13 @@ impl SnapshotsApplierConfig {
}
}

/// Stats returned by [`SnapshotsApplierTask::run()`].
#[derive(Debug)]
pub struct SnapshotApplierTaskStats {
/// Did the task do any work?
///
/// `run()` sets this to `false` only when the chosen recovery strategy is `Completed`
/// (i.e., snapshot recovery has already been completed); for the new and resumed
/// strategies it is `true`.
pub done_work: bool,
}

#[derive(Debug)]
pub struct SnapshotsApplierTask {
config: SnapshotsApplierConfig,
Expand Down Expand Up @@ -256,7 +263,7 @@ impl SnapshotsApplierTask {
/// or under any of the following conditions:
///
/// - There are no snapshots on the main node
pub async fn run(self) -> anyhow::Result<()> {
pub async fn run(self) -> anyhow::Result<SnapshotApplierTaskStats> {
tracing::info!("Starting snapshot recovery with config: {:?}", self.config);

let mut backoff = self.config.initial_retry_backoff;
Expand All @@ -272,14 +279,16 @@ impl SnapshotsApplierTask {
.await;

match result {
Ok(final_status) => {
Ok((strategy, final_status)) => {
let health_details = SnapshotsApplierHealthDetails::done(&final_status)?;
self.health_updater
.update(Health::from(HealthStatus::Ready).with_details(health_details));
// Freeze the health check in the "ready" status, so that the snapshot recovery isn't marked
// as "shut down", which would lead to the app being considered unhealthy.
self.health_updater.freeze();
return Ok(());
return Ok(SnapshotApplierTaskStats {
done_work: !matches!(strategy, SnapshotRecoveryStrategy::Completed),
});
}
Err(SnapshotsApplierError::Fatal(err)) => {
tracing::error!("Fatal error occurred during snapshots recovery: {err:?}");
Expand All @@ -305,21 +314,21 @@ impl SnapshotsApplierTask {
}

/// Strategy determining how snapshot recovery should proceed.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
enum SnapshotRecoveryStrategy {
/// Snapshot recovery should proceed from scratch with the specified params.
New(SnapshotRecoveryStatus),
New,
/// Snapshot recovery should continue with the specified params.
Resumed(SnapshotRecoveryStatus),
Resumed,
/// Snapshot recovery has already been completed.
Completed(SnapshotRecoveryStatus),
Completed,
}

impl SnapshotRecoveryStrategy {
async fn new(
storage: &mut Connection<'_, Core>,
main_node_client: &dyn SnapshotsApplierMainNodeClient,
) -> Result<Self, SnapshotsApplierError> {
) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
let latency =
METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
let applied_snapshot_status = storage
Expand All @@ -328,14 +337,14 @@ impl SnapshotRecoveryStrategy {
.await?;

if let Some(applied_snapshot_status) = applied_snapshot_status {
let sealed_miniblock_number = storage.blocks_dal().get_sealed_l2_block_number().await?;
if sealed_miniblock_number.is_some() {
return Ok(Self::Completed(applied_snapshot_status));
let sealed_l2_block_number = storage.blocks_dal().get_sealed_l2_block_number().await?;
if sealed_l2_block_number.is_some() {
return Ok((Self::Completed, applied_snapshot_status));
}

let latency = latency.observe();
tracing::info!("Re-initialized snapshots applier after reset/failure in {latency:?}");
Ok(Self::Resumed(applied_snapshot_status))
Ok((Self::Resumed, applied_snapshot_status))
} else {
let is_genesis_needed = storage.blocks_dal().is_genesis_needed().await?;
if !is_genesis_needed {
Expand All @@ -362,7 +371,7 @@ impl SnapshotRecoveryStrategy {

let latency = latency.observe();
tracing::info!("Initialized fresh snapshots applier in {latency:?}");
Ok(Self::New(recovery_status))
Ok((Self::New, recovery_status))
}
}

Expand All @@ -376,7 +385,7 @@ impl SnapshotRecoveryStrategy {
let l1_batch_number = snapshot.l1_batch_number;
let l2_block_number = snapshot.l2_block_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, miniblock #{l2_block_number}, \
"Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
version {version}, storage logs are divided into {chunk_count} chunk(s)",
version = snapshot.version,
chunk_count = snapshot.storage_logs_chunks.len()
Expand Down Expand Up @@ -457,7 +466,7 @@ impl<'a> SnapshotsApplier<'a> {
blob_store: &'a dyn ObjectStore,
health_updater: &'a HealthUpdater,
max_concurrency: usize,
) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
// While the recovery is in progress, the node is healthy (no error has occurred),
// but is affected (its usual APIs don't work).
health_updater.update(HealthStatus::Affected.into());
Expand All @@ -467,15 +476,13 @@ impl<'a> SnapshotsApplier<'a> {
.await?;
let mut storage_transaction = storage.start_transaction().await?;

let strategy =
let (strategy, applied_snapshot_status) =
SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?;
tracing::info!("Chosen snapshot recovery strategy: {strategy:?}");
let (applied_snapshot_status, created_from_scratch) = match strategy {
SnapshotRecoveryStrategy::Completed(status) => {
return Ok(status);
}
SnapshotRecoveryStrategy::New(status) => (status, true),
SnapshotRecoveryStrategy::Resumed(status) => (status, false),
tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
let created_from_scratch = match strategy {
SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)),
SnapshotRecoveryStrategy::New => true,
SnapshotRecoveryStrategy::Resumed => false,
};

let mut this = Self {
Expand Down Expand Up @@ -537,7 +544,7 @@ impl<'a> SnapshotsApplier<'a> {
this.recover_tokens().await?;
this.tokens_recovered = true;
this.update_health();
Ok(this.applied_snapshot_status)
Ok((strategy, this.applied_snapshot_status))
}

fn update_health(&self) {
Expand Down
58 changes: 34 additions & 24 deletions core/lib/snapshots_applier/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ use zksync_health_check::CheckHealth;
use zksync_object_store::ObjectStoreFactory;
use zksync_types::{
api::{BlockDetails, L1BatchDetails},
block::{L1BatchHeader, L2BlockHeader},
get_code_key, Address, L1BatchNumber, ProtocolVersion, ProtocolVersionId,
block::L1BatchHeader,
get_code_key, L1BatchNumber, ProtocolVersion, ProtocolVersionId,
};

use self::utils::{
mock_recovery_status, prepare_clients, MockMainNodeClient, ObjectStoreWithErrors,
mock_l2_block_header, mock_recovery_status, mock_snapshot_header, mock_tokens, prepare_clients,
random_storage_logs, MockMainNodeClient, ObjectStoreWithErrors,
};
use super::*;
use crate::tests::utils::{mock_snapshot_header, mock_tokens, random_storage_logs};

mod utils;

Expand Down Expand Up @@ -65,7 +65,8 @@ async fn snapshots_creator_can_successfully_recover_db(
object_store.clone(),
);
let task_health = task.health_check();
task.run().await.unwrap();
let stats = task.run().await.unwrap();
assert!(stats.done_work);
assert_matches!(
task_health.check_health().await.status(),
HealthStatus::Ready
Expand Down Expand Up @@ -103,16 +104,39 @@ async fn snapshots_creator_can_successfully_recover_db(
assert_eq!(db_log.value, expected_log.value);
assert_eq!(db_log.l2_block_number, expected_status.l2_block_number);
}
drop(storage);

// Try recovering again.
let task = SnapshotsApplierTask::new(
SnapshotsApplierConfig::for_tests(),
pool,
pool.clone(),
Box::new(client.clone()),
object_store.clone(),
);
task.run().await.unwrap();
// Here, stats would unfortunately have `done_work: true` because work detection isn't smart enough.

// Emulate a node processing data after recovery.
storage
.protocol_versions_dal()
.save_protocol_version_with_tx(&ProtocolVersion::default())
.await
.unwrap();
let l2_block = mock_l2_block_header(expected_status.l2_block_number + 1);
storage
.blocks_dal()
.insert_l2_block(&l2_block)
.await
.unwrap();
drop(storage);

let task = SnapshotsApplierTask::new(
SnapshotsApplierConfig::for_tests(),
pool.clone(),
Box::new(client),
object_store,
);
task.run().await.unwrap();
let stats = task.run().await.unwrap();
assert!(!stats.done_work);
}

#[tokio::test]
Expand Down Expand Up @@ -145,7 +169,7 @@ async fn health_status_immediately_after_task_start() {

async fn fetch_tokens(
&self,
_at_miniblock: L2BlockNumber,
_at_l2_block: L2BlockNumber,
) -> EnrichedClientResult<Vec<TokenInfo>> {
self.0.wait().await;
future::pending().await
Expand Down Expand Up @@ -183,21 +207,7 @@ async fn applier_errors_after_genesis() {
.save_protocol_version_with_tx(&ProtocolVersion::default())
.await
.unwrap();
let genesis_l2_block = L2BlockHeader {
number: L2BlockNumber(0),
timestamp: 0,
hash: H256::zero(),
l1_tx_count: 0,
l2_tx_count: 0,
fee_account_address: Address::repeat_byte(1),
base_fee_per_gas: 1,
batch_fee_input: Default::default(),
gas_per_pubdata_limit: 2,
base_system_contracts_hashes: Default::default(),
protocol_version: Some(ProtocolVersionId::latest()),
virtual_blocks: 0,
gas_limit: 0,
};
let genesis_l2_block = mock_l2_block_header(L2BlockNumber(0));
storage
.blocks_dal()
.insert_l2_block(&genesis_l2_block)
Expand Down
19 changes: 19 additions & 0 deletions core/lib/snapshots_applier/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_trait::async_trait;
use zksync_object_store::{Bucket, ObjectStore, ObjectStoreError, ObjectStoreFactory};
use zksync_types::{
api,
block::L2BlockHeader,
snapshots::{
SnapshotFactoryDependencies, SnapshotFactoryDependency, SnapshotHeader,
SnapshotRecoveryStatus, SnapshotStorageLog, SnapshotStorageLogsChunk,
Expand Down Expand Up @@ -105,6 +106,24 @@ impl ObjectStore for ObjectStoreWithErrors {
}
}

/// Creates a mock [`L2BlockHeader`] with the given number for use in tests.
///
/// The block hash is derived from the block number; the fee account address is filled
/// with `1` bytes; every remaining field is zero or its `Default` value.
pub(super) fn mock_l2_block_header(l2_block_number: L2BlockNumber) -> L2BlockHeader {
    // Derive the hash from the block number.
    let hash = H256::from_low_u64_be(u64::from(l2_block_number.0));
    let fee_account_address = Address::repeat_byte(1);
    let protocol_version = Some(Default::default());

    L2BlockHeader {
        number: l2_block_number,
        hash,
        fee_account_address,
        protocol_version,
        timestamp: 0,
        l1_tx_count: 0,
        l2_tx_count: 0,
        base_fee_per_gas: 0,
        gas_per_pubdata_limit: 0,
        virtual_blocks: 0,
        gas_limit: 0,
        batch_fee_input: Default::default(),
        base_system_contracts_hashes: Default::default(),
    }
}

fn block_details_base(hash: H256) -> api::BlockDetailsBase {
api::BlockDetailsBase {
timestamp: 0,
Expand Down