From 555753fa270a0b5d0514e3a03f93079f34d46a79 Mon Sep 17 00:00:00 2001 From: frisitano Date: Mon, 1 Sep 2025 20:45:16 +0800 Subject: [PATCH 1/2] update batch processing logic --- crates/chain-orchestrator/src/event.rs | 2 +- crates/chain-orchestrator/src/lib.rs | 82 ++----------------- crates/database/db/src/db.rs | 2 +- crates/database/db/src/models/batch_commit.rs | 2 + crates/database/db/src/operations.rs | 49 +++++------ crates/database/migration/src/lib.rs | 2 + ...220101_000001_create_batch_commit_table.rs | 1 + ...02341_add_commit_batch_processed_column.rs | 51 ++++++++++++ crates/node/src/args.rs | 3 +- crates/node/tests/e2e.rs | 8 -- 10 files changed, 94 insertions(+), 108 deletions(-) create mode 100644 crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index 562e81c2..edbf0afa 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -45,7 +45,7 @@ pub enum ChainOrchestratorEvent { BatchFinalized(Option>, Option), /// An L1 block has been finalized returning the L1 block number, the list of finalized batches /// and an optional finalized L2 block. - L1BlockFinalized(u64, Vec, Option), + L1BlockFinalized(u64, Vec), /// A `L1Message` event has been committed returning the message queue index. L1MessageCommitted(u64), /// A reorg has occurred on L1, returning the L1 block number of the new L1 head, diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index b94f5163..c611e600 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -11,7 +11,7 @@ use reth_network_p2p::{BlockClient, BodiesClient}; use reth_scroll_primitives::ScrollBlock; use rollup_node_primitives::{ BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ChainImport, L1MessageEnvelope, - L2BlockInfoWithL1Messages, WithBlockNumber, + L2BlockInfoWithL1Messages, }; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; @@ -21,7 +21,6 @@ use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, Unw use scroll_network::NewBlockWithPeer; use std::{ collections::{HashMap, VecDeque}, - ops::Add, pin::Pin, sync::{ atomic::{AtomicU64, Ordering}, @@ -67,8 +66,6 @@ pub struct ChainOrchestrator { pending_futures: VecDeque, /// The block number of the L1 finalized block. l1_finalized_block_number: Arc, - /// The block number of the L2 finalized block. - l2_finalized_block_number: Arc, /// The chain specification for the chain orchestrator. chain_spec: Arc, /// The metrics for the chain orchestrator. @@ -109,7 +106,6 @@ impl< database, pending_futures: Default::default(), l1_finalized_block_number: Arc::new(AtomicU64::new(0)), - l2_finalized_block_number: Arc::new(AtomicU64::new(0)), chain_spec, metrics: ChainOrchestratorItem::iter() .map(|i| { @@ -529,7 +525,6 @@ impl< self.database.clone(), block_number, self.l1_finalized_block_number.clone(), - self.l2_finalized_block_number.clone(), )), )) } @@ -551,16 +546,13 @@ impl< )), )) } - L1Notification::BatchFinalization { hash, index, block_number } => { + L1Notification::BatchFinalization { hash: _hash, index, block_number } => { ChainOrchestratorFuture::HandleBatchFinalization(self.handle_metered( ChainOrchestratorItem::BatchFinalization, Box::pin(Self::handle_batch_finalization( self.database.clone(), - hash, index, block_number, - self.l1_finalized_block_number.clone(), - self.l2_finalized_block_number.clone(), )), )) } @@ -615,35 +607,18 @@ impl< database: Arc, block_number: u64, l1_block_number: Arc, - l2_block_number: Arc, ) -> Result, ChainOrchestratorError> { // Set the latest finalized L1 block in the database. database.set_latest_finalized_l1_block_number(block_number).await?; - // get the finalized batch infos. - // we add 1 to the low finalized l1 block number to avoid fetching the last finalized batch - // a second time. - let low_finalized_l1_block_number = - l1_block_number.load(Ordering::Relaxed).add(1).max(block_number); - let finalized_batches = database - .get_batches_by_finalized_block_range(low_finalized_l1_block_number, block_number) - .await?; - - // get the finalized block for the batch. - let finalized_block = if let Some(info) = finalized_batches.last() { - Self::fetch_highest_finalized_block(database, info.hash, l2_block_number).await? - } else { - None - }; + // Get all unprocessed batches that have been finalized by this L1 block finalization. + let finalized_batches = + database.fetch_and_update_unprocessed_finalized_batches(block_number).await?; - // update the chain orchestrator l1 block number. + // Update the chain orchestrator L1 block number. l1_block_number.store(block_number, Ordering::Relaxed); - Ok(Some(ChainOrchestratorEvent::L1BlockFinalized( - block_number, - finalized_batches, - finalized_block, - ))) + Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_batches))) } /// Handles an L1 message by inserting it into the database. @@ -715,54 +690,13 @@ impl< /// Handles a batch finalization event by updating the batch input in the database. async fn handle_batch_finalization( database: Arc, - batch_hash: B256, batch_index: u64, block_number: u64, - l1_block_number: Arc, - l2_block_number: Arc, ) -> Result, ChainOrchestratorError> { // finalize all batches up to `batch_index`. database.finalize_batches_up_to_index(batch_index, block_number).await?; - let mut finalized_block = None; - let mut finalized_batch = None; - - // check if the block where the batch was finalized is finalized on L1. - let l1_block_number_value = l1_block_number.load(Ordering::Relaxed); - if l1_block_number_value >= block_number { - // fetch the finalized block. - finalized_block = - Self::fetch_highest_finalized_block(database, batch_hash, l2_block_number).await?; - - // set the finalized batch info. - finalized_batch = - Some(WithBlockNumber::new(block_number, BatchInfo::new(batch_index, batch_hash))); - } - - let event = ChainOrchestratorEvent::BatchFinalized(finalized_batch, finalized_block); - Ok(Some(event)) - } - - /// Returns the highest finalized block for the provided batch hash. Will return [`None`] if the - /// block number has already been seen by the chain orchestrator. - async fn fetch_highest_finalized_block( - database: Arc, - batch_hash: B256, - l2_block_number: Arc, - ) -> Result, ChainOrchestratorError> { - let finalized_block = database.get_highest_block_for_batch_hash(batch_hash).await?; - - // only return the block if the chain orchestrator hasn't seen it. - // in which case also update the `l2_finalized_block_number` value. - Ok(finalized_block.filter(|info| { - let current_l2_block_number = l2_block_number.load(Ordering::Relaxed); - if info.number > current_l2_block_number { - l2_block_number.store(info.number, Ordering::Relaxed); - true - } else { - false - } - })) + Ok(None) } } diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 3292e646..3216351b 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -311,7 +311,7 @@ mod test { // Fetch the finalized batch for provided height and verify number. let batch_infos = db - .get_batches_by_finalized_block_range(100, 110) + .fetch_and_update_unprocessed_finalized_batches(110) .await .unwrap() .into_iter() diff --git a/crates/database/db/src/models/batch_commit.rs b/crates/database/db/src/models/batch_commit.rs index 47ed8bf1..b7a4b257 100644 --- a/crates/database/db/src/models/batch_commit.rs +++ b/crates/database/db/src/models/batch_commit.rs @@ -15,6 +15,7 @@ pub struct Model { calldata: Vec, blob_hash: Option>, pub(crate) finalized_block_number: Option, + processed: bool, } /// The relation for the batch input model. @@ -50,6 +51,7 @@ impl From for ActiveModel { calldata: ActiveValue::Set(batch_commit.calldata.0.to_vec()), blob_hash: ActiveValue::Set(batch_commit.blob_versioned_hash.map(|b| b.to_vec())), finalized_block_number: ActiveValue::Unchanged(None), + processed: ActiveValue::Unchanged(false), } } } diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 319a052d..6e803b0d 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -107,19 +107,19 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { .map(|x| x.and_then(|x| x.parse::().ok()))?) } - /// Get the finalized batches between the provided range \[low; high\]. - async fn get_batches_by_finalized_block_range( + /// Fetches unprocessed batches up to the provided finalized L1 block number and updates their + /// status. + async fn fetch_and_update_unprocessed_finalized_batches( &self, - low: u64, - high: u64, + finalized_l1_block_number: u64, ) -> Result, DatabaseError> { - Ok(models::batch_commit::Entity::find() - .filter( - Condition::all() - .add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null()) - .add(models::batch_commit::Column::FinalizedBlockNumber.gte(low)) - .add(models::batch_commit::Column::FinalizedBlockNumber.lte(high)), - ) + let conditions = Condition::all() + .add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null()) + .add(models::batch_commit::Column::FinalizedBlockNumber.lte(finalized_l1_block_number)) + .add(models::batch_commit::Column::Processed.eq(false)); + + let batches = models::batch_commit::Entity::find() + .filter(conditions.clone()) .order_by_asc(models::batch_commit::Column::Index) .select_only() .column(models::batch_commit::Column::Index) @@ -131,7 +131,15 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { x.into_iter() .map(|(index, hash)| BatchInfo::new(index as u64, B256::from_slice(&hash))) .collect() - })?) + })?; + + models::batch_commit::Entity::update_many() + .col_expr(models::batch_commit::Column::Processed, Expr::value(true)) + .filter(conditions) + .exec(self.get_connection()) + .await?; + + Ok(batches) } /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number. @@ -358,27 +366,22 @@ pub trait DatabaseOperations: DatabaseConnectionProvider { let finalized_block_number = self.get_finalized_l1_block_number().await?.unwrap_or(0); self.unwind(genesis_hash, finalized_block_number).await?; - // Fetch the latest safe L2 block and the block number where its associated batch was - // finalized. + // Delete all unprocessed batches from the database and return starting l2 safe head and l1 + // head. let safe = if let Some(batch_info) = self .get_latest_safe_l2_info() .await? .map(|(_, batch_info)| batch_info) .filter(|b| b.index > 1) { - let batch = self - .get_batch_by_index(batch_info.index) - .await? - .expect("Batch info must be present due to database query arguments"); + let previous_batch_index = batch_info.index - 1; let previous_batch = self - .get_batch_by_index(batch_info.index - 1) + .get_batch_by_index(previous_batch_index) .await? .expect("Batch info must be present due to database query arguments"); + self.delete_batches_gt_batch_index(previous_batch_index).await?; let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?; - ( - l2_block, - Some(batch.finalized_block_number.expect("All blocks in database are finalized")), - ) + (l2_block, Some(previous_batch.block_number + 1)) } else { (None, None) }; diff --git a/crates/database/migration/src/lib.rs b/crates/database/migration/src/lib.rs index 1f22c2c0..91ecd611 100644 --- a/crates/database/migration/src/lib.rs +++ b/crates/database/migration/src/lib.rs @@ -8,6 +8,7 @@ mod m20250411_072004_add_l2_block; mod m20250616_223947_add_metadata; mod m20250825_093350_remove_unsafe_l2_blocks; mod m20250829_042803_add_table_indexes; +mod m20250901_102341_add_commit_batch_processed_column; mod migration_info; pub use migration_info::{ MigrationInfo, ScrollDevMigrationInfo, ScrollMainnetMigrationInfo, ScrollSepoliaMigrationInfo, @@ -27,6 +28,7 @@ impl MigratorTrait for Migrator { Box::new(m20250616_223947_add_metadata::Migration), Box::new(m20250825_093350_remove_unsafe_l2_blocks::Migration), Box::new(m20250829_042803_add_table_indexes::Migration), + Box::new(m20250901_102341_add_commit_batch_processed_column::Migration), ] } } diff --git a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs index e85a9d07..0f531461 100644 --- a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs +++ b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs @@ -64,4 +64,5 @@ pub(crate) enum BatchCommit { Calldata, BlobHash, FinalizedBlockNumber, + Processed, } diff --git a/crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs b/crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs new file mode 100644 index 00000000..43e72107 --- /dev/null +++ b/crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs @@ -0,0 +1,51 @@ +use super::m20220101_000001_create_batch_commit_table::BatchCommit; +use sea_orm::Statement; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Add the processed column to the batch_commit table. + manager + .alter_table( + Table::alter() + .table(BatchCommit::Table) + .add_column( + ColumnDef::new(BatchCommit::Processed).boolean().not_null().default(false), + ) + .to_owned(), + ) + .await?; + + // Backfill the processed column using data sourced from the l2_block table. + manager + .get_connection() + .execute(Statement::from_sql_and_values( + manager.get_database_backend(), + r#" + UPDATE batch_commit + SET processed = 1 + WHERE hash IN (SELECT batch_hash FROM l2_block); + "#, + vec![], + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // drop the processed column on the batch_commit table. + manager + .alter_table( + Table::alter() + .table(BatchCommit::Table) + .drop_column(BatchCommit::Processed) + .to_owned(), + ) + .await + } +} diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 82f3d47d..24bb869f 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -233,6 +233,7 @@ impl ScrollRollupNodeConfig { }); } + tracing::info!(target: "scroll::node::args", fcs = ?fcs, payload_building_duration = ?self.sequencer_args.payload_building_duration, "Starting engine driver"); let engine = EngineDriver::new( Arc::new(engine_api), chain_spec.clone(), @@ -256,7 +257,7 @@ impl ScrollRollupNodeConfig { let (l1_notification_tx, l1_notification_rx): (Option>>, _) = if let Some(provider) = l1_provider.filter(|_| !self.test) { - // Determine the start block number for the L1 watcher + tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher"); (None, Some(L1Watcher::spawn(provider, l1_start_block_number, node_config).await)) } else { // Create a channel for L1 notifications that we can use to inject L1 messages for diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index e398fcec..730b4a2a 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -977,15 +977,7 @@ async fn graceful_shutdown_consolidates_most_recent_batch_on_startup() -> eyre:: let mut rnm_events = handle.get_event_listener().await?; // Send the second batch again to mimic the watcher behaviour. - l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_0_data.clone()))).await?; l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_1_data.clone()))).await?; - l1_notification_tx - .send(Arc::new(L1Notification::BatchFinalization { - hash: batch_0_data.hash, - index: batch_0_data.index, - block_number: batch_0_data.block_number, - })) - .await?; l1_notification_tx .send(Arc::new(L1Notification::BatchFinalization { hash: batch_1_data.hash, From b15026818f45158f787d06661bbc1db1245d14ec Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 2 Sep 2025 12:19:44 +0800 Subject: [PATCH 2/2] update comment --- crates/chain-orchestrator/src/event.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index edbf0afa..caf94579 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -43,8 +43,8 @@ pub enum ChainOrchestratorEvent { /// A batch has been finalized returning an optional finalized L2 block. Also returns a /// [`BatchInfo`] if the finalized event occurred in a finalized L1 block. BatchFinalized(Option>, Option), - /// An L1 block has been finalized returning the L1 block number, the list of finalized batches - /// and an optional finalized L2 block. + /// An L1 block has been finalized returning the L1 block number and the list of finalized + /// batches. L1BlockFinalized(u64, Vec), /// A `L1Message` event has been committed returning the message queue index. L1MessageCommitted(u64),