Skip to content

Commit

Permalink
fix(en): Fix miscellaneous snapshot recovery nits (#1701)
Browse files Browse the repository at this point in the history
## What ❔

- Allows configuring the concurrency factor used during DB recovery and restricts it to
10 by default.
- Sets the "affected" health status while the recovery is in progress.

## Why ❔

- DB recovery is very I/O-heavy, so using a concurrency factor lower than the DB
pool size can utilize node resources more efficiently.
- "Affected" health status is more idiomatic; the node with such a
status still passes readiness checks, but it signals that it isn't fully
functional.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli committed Apr 18, 2024
1 parent d28d45d commit 13bfecc
Show file tree
Hide file tree
Showing 33 changed files with 535 additions and 233 deletions.
25 changes: 24 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -87,6 +87,7 @@ clap = "4.2.2"
codegen = "0.2.0"
criterion = "0.4.0"
ctrlc = "3.1"
derive_more = "=1.0.0-beta.6"
envy = "0.4"
ethabi = "18.0.0"
flate2 = "1.0.28"
Expand Down
10 changes: 10 additions & 0 deletions core/bin/external_node/src/config/mod.rs
Expand Up @@ -26,6 +26,7 @@ use zksync_core::{
#[cfg(test)]
use zksync_dal::{ConnectionPool, Core};
use zksync_protobuf_config::proto;
use zksync_snapshots_applier::SnapshotsApplierConfig;
use zksync_types::{api::BridgeAddresses, fee_model::FeeParams};
use zksync_web3_decl::{
client::L2Client,
Expand Down Expand Up @@ -320,6 +321,11 @@ pub(crate) struct OptionalENConfig {
/// This is an experimental and incomplete feature; do not use unless you know what you're doing.
#[serde(default)]
pub snapshots_recovery_enabled: bool,
/// Maximum concurrency factor for the concurrent parts of snapshot recovery for Postgres. It may be useful to
/// reduce this factor to about 5 if snapshot recovery overloads I/O capacity of the node. Conversely,
/// if I/O capacity of your infra is high, you may increase concurrency to speed up Postgres recovery.
#[serde(default = "OptionalENConfig::default_snapshots_recovery_postgres_max_concurrency")]
pub snapshots_recovery_postgres_max_concurrency: NonZeroUsize,

#[serde(default = "OptionalENConfig::default_pruning_chunk_size")]
pub pruning_chunk_size: u32,
Expand Down Expand Up @@ -461,6 +467,10 @@ impl OptionalENConfig {
L1BatchCommitDataGeneratorMode::Rollup
}

fn default_snapshots_recovery_postgres_max_concurrency() -> NonZeroUsize {
SnapshotsApplierConfig::default().max_concurrency
}

const fn default_pruning_chunk_size() -> u32 {
10
}
Expand Down
20 changes: 11 additions & 9 deletions core/bin/external_node/src/init.rs
Expand Up @@ -6,7 +6,7 @@ use zksync_core::sync_layer::genesis::perform_genesis_if_needed;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_health_check::AppHealthCheck;
use zksync_object_store::ObjectStoreFactory;
use zksync_snapshots_applier::SnapshotsApplierConfig;
use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
use zksync_web3_decl::client::BoxedL2Client;

use crate::config::read_snapshots_recovery_config;
Expand All @@ -20,7 +20,7 @@ enum InitDecision {
}

pub(crate) async fn ensure_storage_initialized(
pool: &ConnectionPool<Core>,
pool: ConnectionPool<Core>,
main_node_client: BoxedL2Client,
app_health: &AppHealthCheck,
l2_chain_id: L2ChainId,
Expand Down Expand Up @@ -89,13 +89,15 @@ pub(crate) async fn ensure_storage_initialized(
.await;

let config = SnapshotsApplierConfig::default();
app_health.insert_component(config.health_check());
config
.run(
pool,
&main_node_client.for_component("snapshot_recovery"),
&blob_store,
)
let snapshots_applier_task = SnapshotsApplierTask::new(
config,
pool,
Box::new(main_node_client.for_component("snapshot_recovery")),
blob_store,
);
app_health.insert_component(snapshots_applier_task.health_check());
snapshots_applier_task
.run()
.await
.context("snapshot recovery failed")?;
tracing::info!("Snapshot recovery is complete");
Expand Down
28 changes: 23 additions & 5 deletions core/bin/external_node/src/main.rs
Expand Up @@ -147,9 +147,27 @@ async fn run_tree(
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
};
let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None)
.await
.context("failed initializing metadata calculator")?;

let max_concurrency = config
.optional
.snapshots_recovery_postgres_max_concurrency
.get();
let max_concurrency = u32::try_from(max_concurrency).with_context(|| {
format!("snapshot recovery max concurrency ({max_concurrency}) is too large")
})?;
let recovery_pool = ConnectionPool::builder(
tree_pool.database_url(),
max_concurrency.min(config.postgres.max_connections),
)
.build()
.await
.context("failed creating DB pool for Merkle tree recovery")?;

let metadata_calculator =
MetadataCalculator::new(metadata_calculator_config, None, tree_pool, recovery_pool)
.await
.context("failed initializing metadata calculator")?;

let tree_reader = Arc::new(metadata_calculator.tree_reader());
app_health.insert_component(metadata_calculator.tree_health_check());

Expand All @@ -166,7 +184,7 @@ async fn run_tree(
}));
}

let tree_handle = task::spawn(metadata_calculator.run(tree_pool, stop_receiver));
let tree_handle = task::spawn(metadata_calculator.run(stop_receiver));

task_futures.push(tree_handle);
Ok(tree_reader)
Expand Down Expand Up @@ -909,7 +927,7 @@ async fn run_node(

// Make sure that the node storage is initialized either via genesis or snapshot recovery.
ensure_storage_initialized(
&connection_pool,
connection_pool.clone(),
main_node_client.clone(),
&app_health,
config.remote.l2_chain_id,
Expand Down
2 changes: 1 addition & 1 deletion core/lib/dal/src/lib.rs
Expand Up @@ -7,7 +7,7 @@ pub use sqlx::{types::BigDecimal, Error as SqlxError};
use zksync_db_connection::connection::DbMarker;
pub use zksync_db_connection::{
connection::Connection,
connection_pool::ConnectionPool,
connection_pool::{ConnectionPool, ConnectionPoolBuilder},
error::{DalError, DalResult},
};

Expand Down
1 change: 1 addition & 0 deletions core/lib/snapshots_applier/Cargo.toml
Expand Up @@ -28,4 +28,5 @@ thiserror.workspace = true
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
assert_matches.workspace = true
test-casing.workspace = true

0 comments on commit 13bfecc

Please sign in to comment.