Skip to content

Commit

Permalink
feat(en): Make snapshot syncing future-proof (#1441)
Browse files Browse the repository at this point in the history
## What ❔

Adds a snapshot version to snapshot metadata returned by the
corresponding endpoint. If the version of the returned snapshot is not
supported by the EN, it immediately errors with an informative message.

## Why ❔

We plan to change the snapshot format soon-ish, but it could happen
after the initial public EN release.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli committed Mar 26, 2024
1 parent 2e5549f commit 8c26a7a
Show file tree
Hide file tree
Showing 16 changed files with 201 additions and 99 deletions.
3 changes: 2 additions & 1 deletion core/bin/snapshots_creator/src/creator.rs
Expand Up @@ -10,7 +10,7 @@ use zksync_object_store::ObjectStore;
use zksync_types::{
snapshots::{
uniform_hashed_keys_chunk, SnapshotFactoryDependencies, SnapshotFactoryDependency,
SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey,
SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, SnapshotVersion,
},
L1BatchNumber, MiniblockNumber,
};
Expand Down Expand Up @@ -307,6 +307,7 @@ impl SnapshotCreator {
master_conn
.snapshots_dal()
.add_snapshot(
SnapshotVersion::Version0,
progress.l1_batch_number,
progress.chunk_count,
&factory_deps_output_file,
Expand Down

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

@@ -0,0 +1,2 @@
ALTER TABLE snapshots
DROP COLUMN version;
@@ -0,0 +1,2 @@
ALTER TABLE snapshots
ADD COLUMN version INT NOT NULL DEFAULT 0;
59 changes: 44 additions & 15 deletions core/lib/dal/src/snapshots_dal.rs
@@ -1,29 +1,43 @@
use zksync_db_connection::{connection::Connection, instrument::InstrumentExt};
use zksync_types::{
snapshots::{AllSnapshots, SnapshotMetadata},
snapshots::{AllSnapshots, SnapshotMetadata, SnapshotVersion},
L1BatchNumber,
};

use crate::Core;

#[derive(Debug, sqlx::FromRow)]
struct StorageSnapshotMetadata {
version: i32,
l1_batch_number: i64,
storage_logs_filepaths: Vec<String>,
factory_deps_filepath: String,
}

impl From<StorageSnapshotMetadata> for SnapshotMetadata {
fn from(row: StorageSnapshotMetadata) -> Self {
Self {
impl TryFrom<StorageSnapshotMetadata> for SnapshotMetadata {
type Error = sqlx::Error;

fn try_from(row: StorageSnapshotMetadata) -> Result<Self, Self::Error> {
let int_version = u16::try_from(row.version).map_err(|err| sqlx::Error::ColumnDecode {
index: "version".to_owned(),
source: err.into(),
})?;
let version =
SnapshotVersion::try_from(int_version).map_err(|err| sqlx::Error::ColumnDecode {
index: "version".to_owned(),
source: err.into(),
})?;

Ok(Self {
version,
l1_batch_number: L1BatchNumber(row.l1_batch_number as u32),
storage_logs_filepaths: row
.storage_logs_filepaths
.into_iter()
.map(|path| (!path.is_empty()).then_some(path))
.collect(),
factory_deps_filepath: row.factory_deps_filepath,
}
})
}
}

Expand All @@ -35,6 +49,7 @@ pub struct SnapshotsDal<'a, 'c> {
impl SnapshotsDal<'_, '_> {
pub async fn add_snapshot(
&mut self,
version: SnapshotVersion,
l1_batch_number: L1BatchNumber,
storage_logs_chunk_count: u64,
factory_deps_filepaths: &str,
Expand All @@ -43,15 +58,17 @@ impl SnapshotsDal<'_, '_> {
r#"
INSERT INTO
snapshots (
VERSION,
l1_batch_number,
storage_logs_filepaths,
factory_deps_filepath,
created_at,
updated_at
)
VALUES
($1, ARRAY_FILL(''::TEXT, ARRAY[$2::INTEGER]), $3, NOW(), NOW())
($1, $2, ARRAY_FILL(''::TEXT, ARRAY[$3::INTEGER]), $4, NOW(), NOW())
"#,
version as i32,
l1_batch_number.0 as i32,
storage_logs_chunk_count as i32,
factory_deps_filepaths,
Expand Down Expand Up @@ -121,6 +138,7 @@ impl SnapshotsDal<'_, '_> {
StorageSnapshotMetadata,
r#"
SELECT
VERSION,
l1_batch_number,
factory_deps_filepath,
storage_logs_filepaths
Expand All @@ -137,7 +155,7 @@ impl SnapshotsDal<'_, '_> {
.fetch_optional(self.storage)
.await?;

Ok(row.map(Into::into))
row.map(TryFrom::try_from).transpose()
}

pub async fn get_snapshot_metadata(
Expand All @@ -148,6 +166,7 @@ impl SnapshotsDal<'_, '_> {
StorageSnapshotMetadata,
r#"
SELECT
VERSION,
l1_batch_number,
factory_deps_filepath,
storage_logs_filepaths
Expand All @@ -163,13 +182,13 @@ impl SnapshotsDal<'_, '_> {
.fetch_optional(self.storage)
.await?;

Ok(row.map(Into::into))
row.map(TryFrom::try_from).transpose()
}
}

#[cfg(test)]
mod tests {
use zksync_types::L1BatchNumber;
use zksync_types::{snapshots::SnapshotVersion, L1BatchNumber};

use crate::{ConnectionPool, Core, CoreDal};

Expand All @@ -179,9 +198,14 @@ mod tests {
let mut conn = pool.connection().await.unwrap();
let mut dal = conn.snapshots_dal();
let l1_batch_number = L1BatchNumber(100);
dal.add_snapshot(l1_batch_number, 2, "gs:///bucket/factory_deps.bin")
.await
.expect("Failed to add snapshot");
dal.add_snapshot(
SnapshotVersion::Version0,
l1_batch_number,
2,
"gs:///bucket/factory_deps.bin",
)
.await
.expect("Failed to add snapshot");

let snapshots = dal
.get_all_complete_snapshots()
Expand Down Expand Up @@ -219,9 +243,14 @@ mod tests {
let mut conn = pool.connection().await.unwrap();
let mut dal = conn.snapshots_dal();
let l1_batch_number = L1BatchNumber(100);
dal.add_snapshot(l1_batch_number, 2, "gs:///bucket/factory_deps.bin")
.await
.expect("Failed to add snapshot");
dal.add_snapshot(
SnapshotVersion::Version0,
l1_batch_number,
2,
"gs:///bucket/factory_deps.bin",
)
.await
.expect("Failed to add snapshot");

let storage_log_filepaths = ["gs:///bucket/test_file1.bin", "gs:///bucket/test_file2.bin"];
dal.add_storage_logs_filepath_for_snapshot(l1_batch_number, 1, storage_log_filepaths[1])
Expand Down
6 changes: 0 additions & 6 deletions core/lib/db_connection/src/connection.rs
Expand Up @@ -254,11 +254,5 @@ mod tests {
let traced = traced.connections.lock().unwrap();
assert!(traced.is_empty());
}

let _connection = pool.connection_tagged("test").await.unwrap();
let err = format!("{:?}", pool.connection().await.unwrap_err());
// Matching strings in error messages is an anti-pattern, but we really want to test DevEx here.
assert!(err.contains("Active connections"), "{err}");
assert!(err.contains("requested by `test`"), "{err}");
}
}
20 changes: 18 additions & 2 deletions core/lib/snapshots_applier/src/lib.rs
Expand Up @@ -13,7 +13,7 @@ use zksync_types::{
api::en::SyncBlock,
snapshots::{
SnapshotFactoryDependencies, SnapshotHeader, SnapshotRecoveryStatus, SnapshotStorageLog,
SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey,
SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, SnapshotVersion,
},
tokens::TokenInfo,
web3::futures,
Expand Down Expand Up @@ -381,9 +381,11 @@ impl<'a> SnapshotsApplier<'a> {
let l1_batch_number = snapshot.l1_batch_number;
let miniblock_number = snapshot.miniblock_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, storage_logs are divided into {} chunk(s)",
"Found snapshot with data up to L1 batch #{l1_batch_number}, version {}, storage_logs are divided into {} chunk(s)",
snapshot.version,
snapshot.storage_logs_chunks.len()
);
Self::check_snapshot_version(snapshot.version)?;

let miniblock = main_node_client
.fetch_l2_block(miniblock_number)
Expand All @@ -409,6 +411,20 @@ impl<'a> SnapshotsApplier<'a> {
})
}

fn check_snapshot_version(raw_version: u16) -> anyhow::Result<()> {
let version = SnapshotVersion::try_from(raw_version).with_context(|| {
format!(
"Unrecognized snapshot version: {raw_version}; make sure you're running the latest version of the node"
)
})?;
anyhow::ensure!(
matches!(version, SnapshotVersion::Version0),
"Cannot recover from a snapshot with version {version:?}; the only supported version is {:?}",
SnapshotVersion::Version0
);
Ok(())
}

fn update_health(&self) {
let details = SnapshotsApplierHealthDetails {
snapshot_miniblock: self.applied_snapshot_status.miniblock_number,
Expand Down

0 comments on commit 8c26a7a

Please sign in to comment.