Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): consistency checker persistent cursor #1466

Merged
merged 6 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE consistency_checker_info;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE consistency_checker_info
tomg10 marked this conversation as resolved.
Show resolved Hide resolved
(
last_processed_l1_batch BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);

INSERT INTO consistency_checker_info(last_processed_l1_batch, created_at, updated_at)
VALUES (0, NOW(), NOW());
39 changes: 39 additions & 0 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,45 @@ pub struct BlocksDal<'a, 'c> {
}

impl BlocksDal<'_, '_> {
pub async fn get_consistency_checker_last_processed_l1_batch(
&mut self,
) -> sqlx::Result<L1BatchNumber> {
let row = sqlx::query!(
r#"
SELECT
last_processed_l1_batch AS "last_processed_l1_batch!"
FROM
consistency_checker_info
"#
)
.instrument("get_consistency_checker_last_processed_l1_batch")
.report_latency()
.fetch_one(self.storage)
.await?;
Ok(L1BatchNumber(row.last_processed_l1_batch as u32))
}

pub async fn set_consistency_checker_last_processed_l1_batch(
&mut self,
l1_batch_number: L1BatchNumber,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
UPDATE consistency_checker_info
SET
last_processed_l1_batch = $1,
updated_at = NOW()
"#,
l1_batch_number.0 as i32,
)
.instrument("set_consistency_checker_last_processed_l1_batch")
.report_latency()
.with_arg("l1_batch_number", &l1_batch_number)
.execute(self.storage)
.await?;
Ok(())
}

pub async fn is_genesis_needed(&mut self) -> sqlx::Result<bool> {
let count = sqlx::query!(
r#"
Expand Down
16 changes: 15 additions & 1 deletion core/lib/zksync_core/src/consistency_checker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,20 @@ impl ConsistencyChecker {
.0
.saturating_sub(self.max_batches_to_recheck)
.into();

let last_processed_batch = self
.pool
.connection()
.await?
.blocks_dal()
.get_consistency_checker_last_processed_l1_batch()
.await?;

// We shouldn't check batches not present in the storage, and skip the genesis batch since
// it's not committed on L1.
let first_batch_to_check = first_batch_to_check
.max(earliest_l1_batch_number)
.max(L1BatchNumber(1));
.max(L1BatchNumber(last_processed_batch.0 + 1));
tomg10 marked this conversation as resolved.
Show resolved Hide resolved
tracing::info!(
"Last committed L1 batch is #{last_committed_batch}; starting checks from L1 batch #{first_batch_to_check}"
);
Expand All @@ -534,6 +543,11 @@ impl ConsistencyChecker {

match self.check_commitments(batch_number, &local).await {
Ok(()) => {
let mut storage = self.pool.connection().await?;
storage
.blocks_dal()
.set_consistency_checker_last_processed_l1_batch(batch_number)
.await?;
self.event_handler.update_checked_batch(batch_number);
batch_number += 1;
}
Expand Down
6 changes: 6 additions & 0 deletions core/lib/zksync_core/src/consistency_checker/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ async fn checker_functions_after_snapshot_recovery(delay_batch_insertion: bool)
// Wait until the batch is checked.
let checked_batch = l1_batch_updates_receiver.recv().await.unwrap();
assert_eq!(checked_batch, l1_batch.header.number);
let last_reported_batch = storage
.blocks_dal()
.get_consistency_checker_last_processed_l1_batch()
.await
.unwrap();
assert_eq!(last_reported_batch, l1_batch.header.number);

stop_sender.send_replace(true);
checker_task.await.unwrap().unwrap();
Expand Down