diff --git a/core/lib/dal/.sqlx/query-ea682d41219feda3514336f4c1ae6ecbe96475e83dfed603bd305e72460f52c2.json b/core/lib/dal/.sqlx/query-ea682d41219feda3514336f4c1ae6ecbe96475e83dfed603bd305e72460f52c2.json new file mode 100644 index 00000000000..96a71d8f234 --- /dev/null +++ b/core/lib/dal/.sqlx/query-ea682d41219feda3514336f4c1ae6ecbe96475e83dfed603bd305e72460f52c2.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE consistency_checker_info\n SET\n last_processed_l1_batch = $1,\n updated_at = NOW()\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "ea682d41219feda3514336f4c1ae6ecbe96475e83dfed603bd305e72460f52c2" +} diff --git a/core/lib/dal/.sqlx/query-f5854ce2c37bc66d38b05c9fb985618fd95722a77cc44a2e0519f3740191dc75.json b/core/lib/dal/.sqlx/query-f5854ce2c37bc66d38b05c9fb985618fd95722a77cc44a2e0519f3740191dc75.json new file mode 100644 index 00000000000..b6d90da9f58 --- /dev/null +++ b/core/lib/dal/.sqlx/query-f5854ce2c37bc66d38b05c9fb985618fd95722a77cc44a2e0519f3740191dc75.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n last_processed_l1_batch AS \"last_processed_l1_batch!\"\n FROM\n consistency_checker_info\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "last_processed_l1_batch!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + }, + "hash": "f5854ce2c37bc66d38b05c9fb985618fd95722a77cc44a2e0519f3740191dc75" +} diff --git a/core/lib/dal/migrations/20240320173835_consistency-checker-cursor.down.sql b/core/lib/dal/migrations/20240320173835_consistency-checker-cursor.down.sql new file mode 100644 index 00000000000..9d0b59df98c --- /dev/null +++ b/core/lib/dal/migrations/20240320173835_consistency-checker-cursor.down.sql @@ -0,0 +1 @@ +DROP TABLE consistency_checker_info; diff --git a/core/lib/dal/migrations/20240320173835_consistency-checker-cursor.up.sql b/core/lib/dal/migrations/20240320173835_consistency-checker-cursor.up.sql new file mode 100644 index 00000000000..f19cd95fe8e --- /dev/null +++ b/core/lib/dal/migrations/20240320173835_consistency-checker-cursor.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE consistency_checker_info +( + last_processed_l1_batch BIGINT NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL +); + +INSERT INTO consistency_checker_info(last_processed_l1_batch, created_at, updated_at) +VALUES (0, NOW(), NOW()); diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index fbe6a259222..7be67ab88d8 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -30,6 +30,45 @@ pub struct BlocksDal<'a, 'c> { } impl BlocksDal<'_, '_> { + pub async fn get_consistency_checker_last_processed_l1_batch( + &mut self, + ) -> sqlx::Result { + let row = sqlx::query!( + r#" + SELECT + last_processed_l1_batch AS "last_processed_l1_batch!" + FROM + consistency_checker_info + "# + ) + .instrument("get_consistency_checker_last_processed_l1_batch") + .report_latency() + .fetch_one(self.storage) + .await?; + Ok(L1BatchNumber(row.last_processed_l1_batch as u32)) + } + + pub async fn set_consistency_checker_last_processed_l1_batch( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> sqlx::Result<()> { + sqlx::query!( + r#" + UPDATE consistency_checker_info + SET + last_processed_l1_batch = $1, + updated_at = NOW() + "#, + l1_batch_number.0 as i32, + ) + .instrument("set_consistency_checker_last_processed_l1_batch") + .report_latency() + .with_arg("l1_batch_number", &l1_batch_number) + .execute(self.storage) + .await?; + Ok(()) + } + pub async fn is_genesis_needed(&mut self) -> sqlx::Result { let count = sqlx::query!( r#" diff --git a/core/lib/zksync_core/src/consistency_checker/mod.rs b/core/lib/zksync_core/src/consistency_checker/mod.rs index 5a2ac3d14d9..717a5e1737f 100644 --- a/core/lib/zksync_core/src/consistency_checker/mod.rs +++ b/core/lib/zksync_core/src/consistency_checker/mod.rs @@ -504,11 +504,20 @@ impl ConsistencyChecker { .0 .saturating_sub(self.max_batches_to_recheck) .into(); + + let last_processed_batch = self + .pool + .connection() + .await? + .blocks_dal() + .get_consistency_checker_last_processed_l1_batch() + .await?; + // We shouldn't check batches not present in the storage, and skip the genesis batch since // it's not committed on L1. let first_batch_to_check = first_batch_to_check .max(earliest_l1_batch_number) - .max(L1BatchNumber(1)); + .max(L1BatchNumber(last_processed_batch.0 + 1)); tracing::info!( "Last committed L1 batch is #{last_committed_batch}; starting checks from L1 batch #{first_batch_to_check}" ); @@ -534,6 +543,11 @@ impl ConsistencyChecker { match self.check_commitments(batch_number, &local).await { Ok(()) => { + let mut storage = self.pool.connection().await?; + storage + .blocks_dal() + .set_consistency_checker_last_processed_l1_batch(batch_number) + .await?; self.event_handler.update_checked_batch(batch_number); batch_number += 1; } diff --git a/core/lib/zksync_core/src/consistency_checker/tests/mod.rs b/core/lib/zksync_core/src/consistency_checker/tests/mod.rs index 1e01a498a50..a5cace1334b 100644 --- a/core/lib/zksync_core/src/consistency_checker/tests/mod.rs +++ b/core/lib/zksync_core/src/consistency_checker/tests/mod.rs @@ -567,6 +567,12 @@ async fn checker_functions_after_snapshot_recovery(delay_batch_insertion: bool) // Wait until the batch is checked. let checked_batch = l1_batch_updates_receiver.recv().await.unwrap(); assert_eq!(checked_batch, l1_batch.header.number); + let last_reported_batch = storage + .blocks_dal() + .get_consistency_checker_last_processed_l1_batch() + .await + .unwrap(); + assert_eq!(last_reported_batch, l1_batch.header.number); stop_sender.send_replace(true); checker_task.await.unwrap().unwrap();