Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Restore state keeper storage from snapshot #885

Merged
merged 18 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ async fn init_tasks(
memtable_capacity: config.optional.merkle_tree_memtable_capacity(),
stalled_writes_timeout: config.optional.merkle_tree_stalled_writes_timeout(),
};
let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None).await;
let metadata_calculator = MetadataCalculator::new(metadata_calculator_config, None)
.await
.context("failed initializing metadata calculator")?;
healthchecks.push(Box::new(metadata_calculator.tree_health_check()));

let consistency_checker = ConsistencyChecker::new(
Expand Down
2 changes: 1 addition & 1 deletion core/bin/merkle_tree_consistency_checker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Cli {
let db_path = &config.merkle_tree.path;
tracing::info!("Verifying consistency of Merkle tree at {db_path}");
let start = Instant::now();
let db = RocksDB::new(Path::new(db_path));
let db = RocksDB::new(Path::new(db_path)).unwrap();
let tree = ZkSyncTree::new_lightweight(db.into());

let l1_batch_number = if let Some(number) = self.l1_batch {
Expand Down
1 change: 0 additions & 1 deletion core/bin/snapshots_creator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ prometheus_exporter = { path = "../../lib/prometheus_exporter" }
zksync_config = { path = "../../lib/config" }
zksync_dal = { path = "../../lib/dal" }
zksync_env_config = { path = "../../lib/env_config" }
zksync_utils = { path = "../../lib/utils" }
zksync_types = { path = "../../lib/types" }
zksync_object_store = { path = "../../lib/object_store" }
vlog = { path = "../../lib/vlog" }
Expand Down
69 changes: 0 additions & 69 deletions core/bin/snapshots_creator/src/chunking.rs

This file was deleted.

23 changes: 13 additions & 10 deletions core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,15 @@ use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_object_store::ObjectStore;
use zksync_types::{
snapshots::{
SnapshotFactoryDependencies, SnapshotMetadata, SnapshotStorageLogsChunk,
SnapshotStorageLogsStorageKey,
uniform_hashed_keys_chunk, SnapshotFactoryDependencies, SnapshotFactoryDependency,
SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey,
},
L1BatchNumber, MiniblockNumber,
};
use zksync_utils::ceil_div;

use crate::metrics::{FactoryDepsStage, StorageChunkStage, METRICS};
#[cfg(test)]
use crate::tests::HandleEvent;
use crate::{
chunking::get_chunk_hashed_keys_range,
metrics::{FactoryDepsStage, StorageChunkStage, METRICS},
};

/// Encapsulates progress of creating a particular storage snapshot.
#[derive(Debug)]
Expand Down Expand Up @@ -91,7 +87,7 @@ impl SnapshotCreator {
return Ok(());
}

let hashed_keys_range = get_chunk_hashed_keys_range(chunk_id, chunk_count);
let hashed_keys_range = uniform_hashed_keys_chunk(chunk_id, chunk_count);
let mut conn = self.connect_to_replica().await?;

let latency =
Expand Down Expand Up @@ -166,6 +162,12 @@ impl SnapshotCreator {
tracing::info!("Saving factory deps to GCS...");
let latency =
METRICS.factory_deps_processing_duration[&FactoryDepsStage::SaveToGcs].start();
let factory_deps = factory_deps
.into_iter()
.map(|(_, bytecode)| SnapshotFactoryDependency {
bytecode: bytecode.into(),
})
.collect();
let factory_deps = SnapshotFactoryDependencies { factory_deps };
let filename = self
.blob_store
Expand Down Expand Up @@ -216,8 +218,9 @@ impl SnapshotCreator {
.await?;
let chunk_size = config.storage_logs_chunk_size;
// We force the minimum number of chunks to avoid situations where only one chunk is created in tests.
let chunk_count =
ceil_div(distinct_storage_logs_keys_count, chunk_size).max(min_chunk_count);
let chunk_count = distinct_storage_logs_keys_count
.div_ceil(chunk_size)
.max(min_chunk_count);

tracing::info!(
"Selected storage logs chunking for L1 batch {l1_batch_number}: \
Expand Down
1 change: 0 additions & 1 deletion core/bin/snapshots_creator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use zksync_object_store::ObjectStoreFactory;

use crate::creator::SnapshotCreator;

mod chunking;
mod creator;
mod metrics;
#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion core/bin/snapshots_creator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ async fn prepare_postgres(
let expected_l1_batches_and_indices = conn
.storage_logs_dal()
.get_l1_batches_and_indices_for_initial_writes(&hashed_keys)
.await;
.await
.unwrap();

let logs = logs.into_iter().map(|log| {
let (l1_batch_number_of_initial_write, enumeration_index) =
Expand Down
3 changes: 2 additions & 1 deletion core/bin/storage_logs_dedup_migration/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ async fn main() {
let values_for_missing_keys: HashMap<_, _> = connection
.storage_logs_dal()
.get_storage_values(&missing_keys, miniblock_number - 1)
.await;
.await
.expect("failed getting storage values for missing keys");

in_memory_prev_values_iter
.chain(
Expand Down

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions core/lib/dal/src/models/storage_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ impl From<DBStorageLog> for StorageLog {

// We don't want to rely on the Merkle tree crate to import a single type, so we duplicate `TreeEntry` here.
#[derive(Debug, Clone, Copy)]
pub struct StorageTreeEntry {
pub key: U256,
pub struct StorageRecoveryLogEntry {
pub key: H256,
pub value: H256,
pub leaf_index: u64,
}

impl StorageRecoveryLogEntry {
/// Converts `key` to the format used by the Merkle tree (little-endian [`U256`]).
pub fn tree_key(&self) -> U256 {
U256::from_little_endian(&self.key.0)
}
}
12 changes: 6 additions & 6 deletions core/lib/dal/src/snapshots_creator_dal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use zksync_types::{
snapshots::{SnapshotFactoryDependency, SnapshotStorageLog},
AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, H256,
snapshots::SnapshotStorageLog, AccountTreeId, Address, L1BatchNumber, MiniblockNumber,
StorageKey, H256,
};

use crate::{instrument::InstrumentExt, StorageProcessor};
Expand Down Expand Up @@ -99,13 +99,15 @@ impl SnapshotsCreatorDal<'_, '_> {
Ok(storage_logs)
}

/// Returns all factory dependencies up to and including the specified `miniblock_number`.
pub async fn get_all_factory_deps(
&mut self,
miniblock_number: MiniblockNumber,
) -> sqlx::Result<Vec<SnapshotFactoryDependency>> {
) -> sqlx::Result<Vec<(H256, Vec<u8>)>> {
let rows = sqlx::query!(
r#"
SELECT
bytecode_hash,
bytecode
FROM
factory_deps
Expand All @@ -121,9 +123,7 @@ impl SnapshotsCreatorDal<'_, '_> {

Ok(rows
.into_iter()
.map(|row| SnapshotFactoryDependency {
bytecode: row.bytecode.into(),
})
.map(|row| (H256::from_slice(&row.bytecode_hash), row.bytecode))
.collect())
}
}
9 changes: 4 additions & 5 deletions core/lib/dal/src/storage_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl StorageDal<'_, '_> {
pub async fn get_factory_deps_for_revert(
&mut self,
block_number: MiniblockNumber,
) -> Vec<H256> {
sqlx::query!(
) -> sqlx::Result<Vec<H256>> {
Ok(sqlx::query!(
r#"
SELECT
bytecode_hash
Expand All @@ -147,11 +147,10 @@ impl StorageDal<'_, '_> {
block_number.0 as i64
)
.fetch_all(self.storage.conn())
.await
.unwrap()
.await?
.into_iter()
.map(|row| H256::from_slice(&row.bytecode_hash))
.collect()
.collect())
}

/// Applies the specified storage logs for a miniblock. Returns the map of unique storage updates.
Expand Down
Loading