Skip to content

Commit

Permalink
feat(en): consistency checker persistent cursor (#1466)
Browse files Browse the repository at this point in the history
## What ❔

Makes the L1 batch number cursor used by the consistency checker (i.e.,
the last checked L1 batch) persistent.

## Why ❔

Consistency checker seems to be the only component that has a logical L1
batch number cursor influencing pruning without persistence. (E.g.,
metadata calc, commitment generator and batch status updater all have
persistent cursors stored in Postgres.)

---------

Signed-off-by: tomg10 <lemures64@gmail.com>
  • Loading branch information
tomg10 committed Mar 26, 2024
1 parent e7583f4 commit 03496e6
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 1 deletion.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE consistency_checker_info;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE consistency_checker_info
(
last_processed_l1_batch BIGINT NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);

INSERT INTO consistency_checker_info(last_processed_l1_batch, created_at, updated_at)
VALUES (0, NOW(), NOW());
39 changes: 39 additions & 0 deletions core/lib/dal/src/blocks_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,45 @@ pub struct BlocksDal<'a, 'c> {
}

impl BlocksDal<'_, '_> {
pub async fn get_consistency_checker_last_processed_l1_batch(
&mut self,
) -> sqlx::Result<L1BatchNumber> {
let row = sqlx::query!(
r#"
SELECT
last_processed_l1_batch AS "last_processed_l1_batch!"
FROM
consistency_checker_info
"#
)
.instrument("get_consistency_checker_last_processed_l1_batch")
.report_latency()
.fetch_one(self.storage)
.await?;
Ok(L1BatchNumber(row.last_processed_l1_batch as u32))
}

pub async fn set_consistency_checker_last_processed_l1_batch(
&mut self,
l1_batch_number: L1BatchNumber,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
UPDATE consistency_checker_info
SET
last_processed_l1_batch = $1,
updated_at = NOW()
"#,
l1_batch_number.0 as i32,
)
.instrument("set_consistency_checker_last_processed_l1_batch")
.report_latency()
.with_arg("l1_batch_number", &l1_batch_number)
.execute(self.storage)
.await?;
Ok(())
}

pub async fn is_genesis_needed(&mut self) -> sqlx::Result<bool> {
let count = sqlx::query!(
r#"
Expand Down
16 changes: 15 additions & 1 deletion core/lib/zksync_core/src/consistency_checker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,20 @@ impl ConsistencyChecker {
.0
.saturating_sub(self.max_batches_to_recheck)
.into();

let last_processed_batch = self
.pool
.connection()
.await?
.blocks_dal()
.get_consistency_checker_last_processed_l1_batch()
.await?;

// We shouldn't check batches not present in the storage, and skip the genesis batch since
// it's not committed on L1.
let first_batch_to_check = first_batch_to_check
.max(earliest_l1_batch_number)
.max(L1BatchNumber(1));
.max(L1BatchNumber(last_processed_batch.0 + 1));
tracing::info!(
"Last committed L1 batch is #{last_committed_batch}; starting checks from L1 batch #{first_batch_to_check}"
);
Expand All @@ -534,6 +543,11 @@ impl ConsistencyChecker {

match self.check_commitments(batch_number, &local).await {
Ok(()) => {
let mut storage = self.pool.connection().await?;
storage
.blocks_dal()
.set_consistency_checker_last_processed_l1_batch(batch_number)
.await?;
self.event_handler.update_checked_batch(batch_number);
batch_number += 1;
}
Expand Down
6 changes: 6 additions & 0 deletions core/lib/zksync_core/src/consistency_checker/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ async fn checker_functions_after_snapshot_recovery(delay_batch_insertion: bool)
// Wait until the batch is checked.
let checked_batch = l1_batch_updates_receiver.recv().await.unwrap();
assert_eq!(checked_batch, l1_batch.header.number);
let last_reported_batch = storage
.blocks_dal()
.get_consistency_checker_last_processed_l1_batch()
.await
.unwrap();
assert_eq!(last_reported_batch, l1_batch.header.number);

stop_sender.send_replace(true);
checker_task.await.unwrap().unwrap();
Expand Down

0 comments on commit 03496e6

Please sign in to comment.