Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Make snapshot syncing future-proof #1441

Merged
merged 8 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use zksync_object_store::ObjectStore;
use zksync_types::{
snapshots::{
uniform_hashed_keys_chunk, SnapshotFactoryDependencies, SnapshotFactoryDependency,
SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey,
SnapshotMetadata, SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, SnapshotVersion,
},
L1BatchNumber, MiniblockNumber,
};
Expand Down Expand Up @@ -307,6 +307,7 @@ impl SnapshotCreator {
master_conn
.snapshots_dal()
.add_snapshot(
SnapshotVersion::Version0,
progress.l1_batch_number,
progress.chunk_count,
&factory_deps_output_file,
Expand Down

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE snapshots
DROP COLUMN version;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE snapshots
ADD COLUMN version INT NOT NULL DEFAULT 0;
59 changes: 44 additions & 15 deletions core/lib/dal/src/snapshots_dal.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,43 @@
use zksync_db_connection::{connection::Connection, instrument::InstrumentExt};
use zksync_types::{
snapshots::{AllSnapshots, SnapshotMetadata},
snapshots::{AllSnapshots, SnapshotMetadata, SnapshotVersion},
L1BatchNumber,
};

use crate::Core;

#[derive(Debug, sqlx::FromRow)]
struct StorageSnapshotMetadata {
version: i32,
l1_batch_number: i64,
storage_logs_filepaths: Vec<String>,
factory_deps_filepath: String,
}

impl From<StorageSnapshotMetadata> for SnapshotMetadata {
fn from(row: StorageSnapshotMetadata) -> Self {
Self {
impl TryFrom<StorageSnapshotMetadata> for SnapshotMetadata {
type Error = sqlx::Error;

fn try_from(row: StorageSnapshotMetadata) -> Result<Self, Self::Error> {
let int_version = u16::try_from(row.version).map_err(|err| sqlx::Error::ColumnDecode {
index: "version".to_owned(),
source: err.into(),
})?;
let version =
SnapshotVersion::try_from(int_version).map_err(|err| sqlx::Error::ColumnDecode {
index: "version".to_owned(),
source: err.into(),
})?;

Ok(Self {
version,
l1_batch_number: L1BatchNumber(row.l1_batch_number as u32),
storage_logs_filepaths: row
.storage_logs_filepaths
.into_iter()
.map(|path| (!path.is_empty()).then_some(path))
.collect(),
factory_deps_filepath: row.factory_deps_filepath,
}
})
}
}

Expand All @@ -35,6 +49,7 @@ pub struct SnapshotsDal<'a, 'c> {
impl SnapshotsDal<'_, '_> {
pub async fn add_snapshot(
&mut self,
version: SnapshotVersion,
l1_batch_number: L1BatchNumber,
storage_logs_chunk_count: u64,
factory_deps_filepaths: &str,
Expand All @@ -43,15 +58,17 @@ impl SnapshotsDal<'_, '_> {
r#"
INSERT INTO
snapshots (
VERSION,
l1_batch_number,
storage_logs_filepaths,
factory_deps_filepath,
created_at,
updated_at
)
VALUES
($1, ARRAY_FILL(''::TEXT, ARRAY[$2::INTEGER]), $3, NOW(), NOW())
($1, $2, ARRAY_FILL(''::TEXT, ARRAY[$3::INTEGER]), $4, NOW(), NOW())
"#,
version as i32,
l1_batch_number.0 as i32,
storage_logs_chunk_count as i32,
factory_deps_filepaths,
Expand Down Expand Up @@ -121,6 +138,7 @@ impl SnapshotsDal<'_, '_> {
StorageSnapshotMetadata,
r#"
SELECT
VERSION,
l1_batch_number,
factory_deps_filepath,
storage_logs_filepaths
Expand All @@ -137,7 +155,7 @@ impl SnapshotsDal<'_, '_> {
.fetch_optional(self.storage)
.await?;

Ok(row.map(Into::into))
row.map(TryFrom::try_from).transpose()
}

pub async fn get_snapshot_metadata(
Expand All @@ -148,6 +166,7 @@ impl SnapshotsDal<'_, '_> {
StorageSnapshotMetadata,
r#"
SELECT
VERSION,
l1_batch_number,
factory_deps_filepath,
storage_logs_filepaths
Expand All @@ -163,13 +182,13 @@ impl SnapshotsDal<'_, '_> {
.fetch_optional(self.storage)
.await?;

Ok(row.map(Into::into))
row.map(TryFrom::try_from).transpose()
}
}

#[cfg(test)]
mod tests {
use zksync_types::L1BatchNumber;
use zksync_types::{snapshots::SnapshotVersion, L1BatchNumber};

use crate::{ConnectionPool, Core, CoreDal};

Expand All @@ -179,9 +198,14 @@ mod tests {
let mut conn = pool.connection().await.unwrap();
let mut dal = conn.snapshots_dal();
let l1_batch_number = L1BatchNumber(100);
dal.add_snapshot(l1_batch_number, 2, "gs:///bucket/factory_deps.bin")
.await
.expect("Failed to add snapshot");
dal.add_snapshot(
SnapshotVersion::Version0,
l1_batch_number,
2,
"gs:///bucket/factory_deps.bin",
)
.await
.expect("Failed to add snapshot");

let snapshots = dal
.get_all_complete_snapshots()
Expand Down Expand Up @@ -219,9 +243,14 @@ mod tests {
let mut conn = pool.connection().await.unwrap();
let mut dal = conn.snapshots_dal();
let l1_batch_number = L1BatchNumber(100);
dal.add_snapshot(l1_batch_number, 2, "gs:///bucket/factory_deps.bin")
.await
.expect("Failed to add snapshot");
dal.add_snapshot(
SnapshotVersion::Version0,
l1_batch_number,
2,
"gs:///bucket/factory_deps.bin",
)
.await
.expect("Failed to add snapshot");

let storage_log_filepaths = ["gs:///bucket/test_file1.bin", "gs:///bucket/test_file2.bin"];
dal.add_storage_logs_filepath_for_snapshot(l1_batch_number, 1, storage_log_filepaths[1])
Expand Down
6 changes: 0 additions & 6 deletions core/lib/db_connection/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,5 @@ mod tests {
let traced = traced.connections.lock().unwrap();
assert!(traced.is_empty());
}

let _connection = pool.connection_tagged("test").await.unwrap();
let err = format!("{:?}", pool.connection().await.unwrap_err());
// Matching strings in error messages is an anti-pattern, but we really want to test DevEx here.
assert!(err.contains("Active connections"), "{err}");
assert!(err.contains("requested by `test`"), "{err}");
}
}
20 changes: 18 additions & 2 deletions core/lib/snapshots_applier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use zksync_types::{
api::en::SyncBlock,
snapshots::{
SnapshotFactoryDependencies, SnapshotHeader, SnapshotRecoveryStatus, SnapshotStorageLog,
SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey,
SnapshotStorageLogsChunk, SnapshotStorageLogsStorageKey, SnapshotVersion,
},
tokens::TokenInfo,
web3::futures,
Expand Down Expand Up @@ -381,9 +381,11 @@ impl<'a> SnapshotsApplier<'a> {
let l1_batch_number = snapshot.l1_batch_number;
let miniblock_number = snapshot.miniblock_number;
tracing::info!(
"Found snapshot with data up to L1 batch #{l1_batch_number}, storage_logs are divided into {} chunk(s)",
"Found snapshot with data up to L1 batch #{l1_batch_number}, version {}, storage_logs are divided into {} chunk(s)",
snapshot.version,
snapshot.storage_logs_chunks.len()
);
Self::check_snapshot_version(snapshot.version)?;

let miniblock = main_node_client
.fetch_l2_block(miniblock_number)
Expand All @@ -409,6 +411,20 @@ impl<'a> SnapshotsApplier<'a> {
})
}

fn check_snapshot_version(raw_version: u16) -> anyhow::Result<()> {
let version = SnapshotVersion::try_from(raw_version).with_context(|| {
format!(
"Unrecognized snapshot version: {raw_version}; make sure you're running the latest version of the node"
)
})?;
anyhow::ensure!(
matches!(version, SnapshotVersion::Version0),
"Cannot recover from a snapshot with version {version:?}; the only supported version is {:?}",
SnapshotVersion::Version0
);
Ok(())
}

fn update_health(&self) {
let details = SnapshotsApplierHealthDetails {
snapshot_miniblock: self.applied_snapshot_status.miniblock_number,
Expand Down
Loading
Loading