Skip to content

Commit

Permalink
feat(en): Allow recovery from specific snapshot (#2137)
Browse files Browse the repository at this point in the history
## What ❔

Allows recovering a node from a specific snapshot specified at the start
of recovery.

## Why ❔

Useful at least for testing recovery and pruning end-to-end on the
testnet. There, L1 batches are produced very slowly, so it makes sense
to recover from an earlier snapshot in order to meaningfully test
pruning.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli committed Jun 5, 2024
1 parent 0329ed6 commit ac61fed
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 57 deletions.
27 changes: 10 additions & 17 deletions core/bin/external_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use zksync_node_api_server::{
use zksync_protobuf_config::proto;
use zksync_snapshots_applier::SnapshotsApplierConfig;
use zksync_types::{
api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address, L1ChainId,
L2ChainId, ETHEREUM_ADDRESS,
api::BridgeAddresses, commitment::L1BatchCommitmentMode, url::SensitiveUrl, Address,
L1BatchNumber, L1ChainId, L2ChainId, ETHEREUM_ADDRESS,
};
use zksync_web3_decl::{
client::{DynClient, L2},
Expand Down Expand Up @@ -746,6 +746,8 @@ pub(crate) struct ExperimentalENConfig {
pub state_keeper_db_max_open_files: Option<NonZeroU32>,

// Snapshot recovery
/// L1 batch number of the snapshot to use during recovery. Specifying this parameter is mostly useful for testing.
pub snapshots_recovery_l1_batch: Option<L1BatchNumber>,
/// Approximate chunk size (measured in the number of entries) to recover in a single iteration.
/// Reasonable values are order of 100,000 (meaning an iteration takes several seconds).
///
Expand Down Expand Up @@ -775,6 +777,7 @@ impl ExperimentalENConfig {
state_keeper_db_block_cache_capacity_mb:
Self::default_state_keeper_db_block_cache_capacity_mb(),
state_keeper_db_max_open_files: None,
snapshots_recovery_l1_batch: None,
snapshots_recovery_tree_chunk_size: Self::default_snapshots_recovery_tree_chunk_size(),
commitment_generator_max_parallelism: None,
}
Expand Down Expand Up @@ -807,21 +810,11 @@ pub(crate) fn read_consensus_config() -> anyhow::Result<Option<ConsensusConfig>>
))
}

/// Configuration for snapshot recovery. Loaded optionally, only if snapshot recovery is enabled.
#[derive(Debug)]
pub(crate) struct SnapshotsRecoveryConfig {
pub snapshots_object_store: ObjectStoreConfig,
}

impl SnapshotsRecoveryConfig {
pub fn new() -> anyhow::Result<Self> {
let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
.from_env::<ObjectStoreConfig>()
.context("failed loading snapshot object store config from env variables")?;
Ok(Self {
snapshots_object_store,
})
}
/// Configuration for snapshot recovery. Should be loaded optionally, only if snapshot recovery is enabled.
pub(crate) fn snapshot_recovery_object_store_config() -> anyhow::Result<ObjectStoreConfig> {
envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
.from_env::<ObjectStoreConfig>()
.context("failed loading snapshot object store config from env variables")
}

#[derive(Debug, Deserialize)]
Expand Down
30 changes: 21 additions & 9 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
use zksync_types::{L1BatchNumber, L2ChainId};
use zksync_web3_decl::client::{DynClient, L2};

use crate::config::SnapshotsRecoveryConfig;
use crate::config::snapshot_recovery_object_store_config;

#[derive(Debug)]
pub(crate) struct SnapshotRecoveryConfig {
/// If not specified, the latest snapshot will be used.
pub snapshot_l1_batch_override: Option<L1BatchNumber>,
}

#[derive(Debug)]
enum InitDecision {
Expand All @@ -27,7 +33,7 @@ pub(crate) async fn ensure_storage_initialized(
main_node_client: Box<DynClient<L2>>,
app_health: &AppHealthCheck,
l2_chain_id: L2ChainId,
consider_snapshot_recovery: bool,
recovery_config: Option<SnapshotRecoveryConfig>,
) -> anyhow::Result<()> {
let mut storage = pool.connection_tagged("en").await?;
let genesis_l1_batch = storage
Expand Down Expand Up @@ -57,7 +63,7 @@ pub(crate) async fn ensure_storage_initialized(
}
(None, None) => {
tracing::info!("Node has neither genesis L1 batch, nor snapshot recovery info");
if consider_snapshot_recovery {
if recovery_config.is_some() {
InitDecision::SnapshotRecovery
} else {
InitDecision::Genesis
Expand All @@ -78,25 +84,31 @@ pub(crate) async fn ensure_storage_initialized(
.context("performing genesis failed")?;
}
InitDecision::SnapshotRecovery => {
anyhow::ensure!(
consider_snapshot_recovery,
let recovery_config = recovery_config.context(
"Snapshot recovery is required to proceed, but it is not enabled. Enable by setting \
`EN_SNAPSHOTS_RECOVERY_ENABLED=true` env variable to the node binary, or use a Postgres dump for recovery"
);
)?;

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
let recovery_config = SnapshotsRecoveryConfig::new()?;
let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
let object_store_config = snapshot_recovery_object_store_config()?;
let blob_store = ObjectStoreFactory::new(object_store_config)
.create_store()
.await;

let config = SnapshotsApplierConfig::default();
let snapshots_applier_task = SnapshotsApplierTask::new(
let mut snapshots_applier_task = SnapshotsApplierTask::new(
config,
pool,
Box::new(main_node_client.for_component("snapshot_recovery")),
blob_store,
);
if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch_override {
tracing::info!(
"Using a specific snapshot with L1 batch #{snapshot_l1_batch}; this may not work \
if the snapshot is too old (order of several weeks old) or non-existent"
);
snapshots_applier_task.set_snapshot_l1_batch(snapshot_l1_batch);
}
app_health.insert_component(snapshots_applier_task.health_check())?;

let recovery_started_at = Instant::now();
Expand Down
11 changes: 9 additions & 2 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use zksync_web3_decl::{
use crate::{
config::ExternalNodeConfig,
helpers::{MainNodeHealthCheck, ValidateChainIdsTask},
init::ensure_storage_initialized,
init::{ensure_storage_initialized, SnapshotRecoveryConfig},
metrics::RUST_METRICS,
};

Expand Down Expand Up @@ -908,12 +908,19 @@ async fn run_node(
task_handles.extend(prometheus_task);

// Make sure that the node storage is initialized either via genesis or snapshot recovery.
let recovery_config =
config
.optional
.snapshots_recovery_enabled
.then_some(SnapshotRecoveryConfig {
snapshot_l1_batch_override: config.experimental.snapshots_recovery_l1_batch,
});
ensure_storage_initialized(
connection_pool.clone(),
main_node_client.clone(),
&app_health,
config.required.l2_chain_id,
config.optional.snapshots_recovery_enabled,
recovery_config,
)
.await?;
let sigint_receiver = env.setup_sigint_handler();
Expand Down
66 changes: 51 additions & 15 deletions core/lib/snapshots_applier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,14 @@ pub trait SnapshotsApplierMainNodeClient: fmt::Debug + Send + Sync {
number: L2BlockNumber,
) -> EnrichedClientResult<Option<api::BlockDetails>>;

async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>>;
async fn fetch_newest_snapshot_l1_batch_number(
&self,
) -> EnrichedClientResult<Option<L1BatchNumber>>;

async fn fetch_snapshot(
&self,
l1_batch_number: L1BatchNumber,
) -> EnrichedClientResult<Option<SnapshotHeader>>;

async fn fetch_tokens(
&self,
Expand Down Expand Up @@ -153,17 +160,23 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
.await
}

async fn fetch_newest_snapshot(&self) -> EnrichedClientResult<Option<SnapshotHeader>> {
async fn fetch_newest_snapshot_l1_batch_number(
&self,
) -> EnrichedClientResult<Option<L1BatchNumber>> {
let snapshots = self
.get_all_snapshots()
.rpc_context("get_all_snapshots")
.await?;
let Some(newest_snapshot) = snapshots.snapshots_l1_batch_numbers.first() else {
return Ok(None);
};
self.get_snapshot_by_l1_batch_number(*newest_snapshot)
Ok(snapshots.snapshots_l1_batch_numbers.first().copied())
}

async fn fetch_snapshot(
&self,
l1_batch_number: L1BatchNumber,
) -> EnrichedClientResult<Option<SnapshotHeader>> {
self.get_snapshot_by_l1_batch_number(l1_batch_number)
.rpc_context("get_snapshot_by_l1_batch_number")
.with_arg("number", newest_snapshot)
.with_arg("number", &l1_batch_number)
.await
}

Expand All @@ -179,7 +192,7 @@ impl SnapshotsApplierMainNodeClient for Box<DynClient<L2>> {
}

/// Snapshot applier configuration options.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SnapshotsApplierConfig {
/// Number of retries for transient errors before giving up on recovery (i.e., returning an error
/// from [`Self::run()`]).
Expand Down Expand Up @@ -223,6 +236,7 @@ pub struct SnapshotApplierTaskStats {

#[derive(Debug)]
pub struct SnapshotsApplierTask {
snapshot_l1_batch: Option<L1BatchNumber>,
config: SnapshotsApplierConfig,
health_updater: HealthUpdater,
connection_pool: ConnectionPool<Core>,
Expand All @@ -238,6 +252,7 @@ impl SnapshotsApplierTask {
blob_store: Arc<dyn ObjectStore>,
) -> Self {
Self {
snapshot_l1_batch: None,
config,
health_updater: ReactiveHealthCheck::new("snapshot_recovery").1,
connection_pool,
Expand All @@ -246,6 +261,11 @@ impl SnapshotsApplierTask {
}
}

/// Specifies the L1 batch to recover from. This setting is ignored if recovery is complete or resumed.
pub fn set_snapshot_l1_batch(&mut self, number: L1BatchNumber) {
self.snapshot_l1_batch = Some(number);
}

/// Returns the health check for snapshot recovery.
pub fn health_check(&self) -> ReactiveHealthCheck {
self.health_updater.subscribe()
Expand All @@ -270,6 +290,7 @@ impl SnapshotsApplierTask {
self.main_node_client.as_ref(),
&self.blob_store,
&self.health_updater,
self.snapshot_l1_batch,
self.config.max_concurrency.get(),
)
.await;
Expand Down Expand Up @@ -324,6 +345,7 @@ impl SnapshotRecoveryStrategy {
async fn new(
storage: &mut Connection<'_, Core>,
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_l1_batch: Option<L1BatchNumber>,
) -> Result<(Self, SnapshotRecoveryStatus), SnapshotsApplierError> {
let latency =
METRICS.initial_stage_duration[&InitialStage::FetchMetadataFromMainNode].start();
Expand All @@ -350,7 +372,8 @@ impl SnapshotRecoveryStrategy {
return Err(SnapshotsApplierError::Fatal(err));
}

let recovery_status = Self::create_fresh_recovery_status(main_node_client).await?;
let recovery_status =
Self::create_fresh_recovery_status(main_node_client, snapshot_l1_batch).await?;

let storage_logs_count = storage
.storage_logs_dal()
Expand All @@ -373,12 +396,20 @@ impl SnapshotRecoveryStrategy {

async fn create_fresh_recovery_status(
main_node_client: &dyn SnapshotsApplierMainNodeClient,
snapshot_l1_batch: Option<L1BatchNumber>,
) -> Result<SnapshotRecoveryStatus, SnapshotsApplierError> {
let snapshot_response = main_node_client.fetch_newest_snapshot().await?;
let l1_batch_number = match snapshot_l1_batch {
Some(num) => num,
None => main_node_client
.fetch_newest_snapshot_l1_batch_number()
.await?
.context("no snapshots on main node; snapshot recovery is impossible")?,
};
let snapshot_response = main_node_client.fetch_snapshot(l1_batch_number).await?;

let snapshot = snapshot_response
.context("no snapshots on main node; snapshot recovery is impossible")?;
let l1_batch_number = snapshot.l1_batch_number;
let snapshot = snapshot_response.with_context(|| {
format!("snapshot for L1 batch #{l1_batch_number} is not present on main node")
})?;
let l2_block_number = snapshot.l2_block_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, L2 block #{l2_block_number}, \
Expand Down Expand Up @@ -461,6 +492,7 @@ impl<'a> SnapshotsApplier<'a> {
main_node_client: &'a dyn SnapshotsApplierMainNodeClient,
blob_store: &'a dyn ObjectStore,
health_updater: &'a HealthUpdater,
snapshot_l1_batch: Option<L1BatchNumber>,
max_concurrency: usize,
) -> Result<(SnapshotRecoveryStrategy, SnapshotRecoveryStatus), SnapshotsApplierError> {
// While the recovery is in progress, the node is healthy (no error has occurred),
Expand All @@ -472,8 +504,12 @@ impl<'a> SnapshotsApplier<'a> {
.await?;
let mut storage_transaction = storage.start_transaction().await?;

let (strategy, applied_snapshot_status) =
SnapshotRecoveryStrategy::new(&mut storage_transaction, main_node_client).await?;
let (strategy, applied_snapshot_status) = SnapshotRecoveryStrategy::new(
&mut storage_transaction,
main_node_client,
snapshot_l1_batch,
)
.await?;
tracing::info!("Chosen snapshot recovery strategy: {strategy:?} with status: {applied_snapshot_status:?}");
let created_from_scratch = match strategy {
SnapshotRecoveryStrategy::Completed => return Ok((strategy, applied_snapshot_status)),
Expand Down
Loading

0 comments on commit ac61fed

Please sign in to comment.