diff --git a/Cargo.lock b/Cargo.lock index 6f487492..4ba5bc6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11026,6 +11026,7 @@ dependencies = [ "alloy-eips", "alloy-primitives", "alloy-rpc-types-engine", + "alloy-rpc-types-eth", "arbitrary", "derive_more", "eyre", @@ -11670,6 +11671,7 @@ dependencies = [ "metrics", "metrics-derive", "rand 0.9.2", + "reth-tracing", "rollup-node-primitives", "scroll-alloy-consensus", "scroll-alloy-rpc-types-engine", diff --git a/crates/chain-orchestrator/src/consolidation.rs b/crates/chain-orchestrator/src/consolidation.rs index 9e38c82a..c3af0af0 100644 --- a/crates/chain-orchestrator/src/consolidation.rs +++ b/crates/chain-orchestrator/src/consolidation.rs @@ -1,7 +1,9 @@ use super::ChainOrchestratorError; use alloy_provider::Provider; use futures::{stream::FuturesOrdered, TryStreamExt}; -use rollup_node_primitives::{BatchConsolidationOutcome, BatchInfo, L2BlockInfoWithL1Messages}; +use rollup_node_primitives::{ + BatchConsolidationOutcome, BatchInfo, BatchStatus, L2BlockInfoWithL1Messages, +}; use scroll_alloy_network::Scroll; use scroll_derivation_pipeline::{BatchDerivationResult, DerivedAttributes}; use scroll_engine::{block_matches_attributes, ForkchoiceState}; @@ -53,7 +55,11 @@ pub(crate) async fn reconcile_batch>( } let actions: Vec = futures.try_collect().await?; - Ok(BatchReconciliationResult { batch_info: batch.batch_info, actions }) + Ok(BatchReconciliationResult { + batch_info: batch.batch_info, + actions, + target_status: batch.target_status, + }) } /// The result of reconciling a batch with the L2 chain. @@ -63,6 +69,8 @@ pub(crate) struct BatchReconciliationResult { pub batch_info: BatchInfo, /// The actions that must be performed on the L2 chain to consolidate the batch. pub actions: Vec, + /// The target status of the batch after consolidation. + pub target_status: BatchStatus, } impl BatchReconciliationResult { @@ -93,7 +101,8 @@ impl BatchReconciliationResult { self, reorg_results: Vec, ) -> Result { - let mut consolidate_chain = BatchConsolidationOutcome::new(self.batch_info); + let mut consolidate_chain = + BatchConsolidationOutcome::new(self.batch_info, self.target_status); // First append all non-reorg results to the consolidated chain. self.actions.into_iter().filter(|action| !action.is_reorg()).for_each(|action| { diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index 47bf267a..363ba3ce 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -42,11 +42,23 @@ pub enum ChainOrchestratorEvent { batch_info: BatchInfo, /// The L1 block number in which the batch was committed. l1_block_number: u64, - /// The safe L2 block info. - safe_head: Option, }, /// A batch has been finalized returning a list of finalized batches. - BatchFinalized(u64, Vec), + BatchFinalized { + /// The L1 block info at which the batch finalization event was received. + l1_block_info: BlockInfo, + /// The list of batches that have been triggered for the derivation pipeline. + triggered_batches: Vec, + /// The finalized block info after finalizing the consolidated batches. + finalized_block_info: Option, + }, + /// A batch has been reverted returning the batch info and the new safe head. + BatchReverted { + /// The latest batch info after the revert. + batch_info: BatchInfo, + /// The new safe head after the revert. + safe_head: BlockInfo, + }, /// A new L1 block has been received returning the L1 block number. NewL1Block(u64), /// An L1 block has been finalized returning the L1 block number and the list of finalized diff --git a/crates/chain-orchestrator/src/handle/command.rs b/crates/chain-orchestrator/src/handle/command.rs index 184aeff1..03ed97ac 100644 --- a/crates/chain-orchestrator/src/handle/command.rs +++ b/crates/chain-orchestrator/src/handle/command.rs @@ -30,6 +30,9 @@ pub enum ChainOrchestratorCommand)), + /// Returns a database handle for direct database access. + #[cfg(feature = "test-utils")] + DatabaseHandle(oneshot::Sender>), } /// The database queries that can be sent to the rollup manager. diff --git a/crates/chain-orchestrator/src/handle/mod.rs b/crates/chain-orchestrator/src/handle/mod.rs index c6f099fe..b62ee195 100644 --- a/crates/chain-orchestrator/src/handle/mod.rs +++ b/crates/chain-orchestrator/src/handle/mod.rs @@ -110,4 +110,14 @@ impl> ChainOrchestratorHand self.send_command(ChainOrchestratorCommand::SetGossip((enabled, tx))); rx.await } + + /// Sends a command to the rollup manager to get a database handle for direct database access. + #[cfg(feature = "test-utils")] + pub async fn get_database_handle( + &self, + ) -> Result, oneshot::error::RecvError> { + let (tx, rx) = oneshot::channel(); + self.send_command(ChainOrchestratorCommand::DatabaseHandle(tx)); + rx.await + } } diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 5344c399..761d80d6 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -14,7 +14,7 @@ use reth_scroll_primitives::ScrollBlock; use reth_tasks::shutdown::Shutdown; use reth_tokio_util::{EventSender, EventStream}; use rollup_node_primitives::{ - BatchCommitData, BatchInfo, BlockConsolidationOutcome, BlockInfo, ChainImport, + BatchCommitData, BatchInfo, BatchStatus, BlockConsolidationOutcome, BlockInfo, ChainImport, L1MessageEnvelope, L2BlockInfoWithL1Messages, }; use rollup_node_providers::L1MessageProvider; @@ -406,6 +406,10 @@ impl< self.network.handle().set_gossip(enabled).await; let _ = tx.send(()); } + #[cfg(feature = "test-utils")] + ChainOrchestratorCommand::DatabaseHandle(tx) => { + let _ = tx.send(self.database.clone()); + } } Ok(()) @@ -452,8 +456,12 @@ impl< } BlockConsolidationAction::UpdateSafeHead(block_info) => { tracing::info!(target: "scroll::chain_orchestrator", ?block_info, "Updating safe head to consolidated block"); + let finalized_block_info = batch_reconciliation_result + .target_status + .is_finalized() + .then_some(block_info.block_info); self.engine - .update_fcs(None, Some(block_info.block_info), Some(block_info.block_info)) + .update_fcs(None, Some(block_info.block_info), finalized_block_info) .await?; BlockConsolidationOutcome::UpdateFcs(block_info) } @@ -487,11 +495,15 @@ impl< } // Update the forkchoice state to the new head. + let finalized_block_info = batch_reconciliation_result + .target_status + .is_finalized() + .then_some(block_info.block_info); self.engine .update_fcs( Some(block_info.block_info), Some(block_info.block_info), - Some(block_info.block_info), + finalized_block_info, ) .await?; @@ -533,15 +545,29 @@ impl< self.consensus.update_config(update); Ok(None) } - L1Notification::NewBlock(block_number) => self.handle_l1_new_block(*block_number).await, - L1Notification::Finalized(block_number) => { - metered!(Task::L1Finalization, self, handle_l1_finalized(*block_number)) + L1Notification::NewBlock(block_info) => self.handle_l1_new_block(*block_info).await, + L1Notification::Finalized(block_info) => { + metered!(Task::L1Finalization, self, handle_l1_finalized(*block_info)) } - L1Notification::BatchCommit(batch) => { - metered!(Task::BatchCommit, self, handle_batch_commit(batch.clone())) + L1Notification::BatchCommit { block_info, data } => { + metered!(Task::BatchCommit, self, handle_batch_commit(*block_info, data.clone())) } - L1Notification::L1Message { message, block_number, block_timestamp: _ } => { - metered!(Task::L1Message, self, handle_l1_message(message.clone(), *block_number)) + L1Notification::BatchRevert { batch_info, block_info } => { + metered!( + Task::BatchRevert, + self, + handle_batch_revert(batch_info.index, batch_info.index, *block_info) + ) + } + L1Notification::BatchRevertRange { start, end, block_info } => { + metered!( + Task::BatchRevertRange, + self, + handle_batch_revert(*start, *end, *block_info) + ) + } + L1Notification::L1Message { message, block_info, block_timestamp: _ } => { + metered!(Task::L1Message, self, handle_l1_message(message.clone(), *block_info)) } L1Notification::Synced => { tracing::info!(target: "scroll::chain_orchestrator", "L1 is now synced"); @@ -552,11 +578,11 @@ impl< self.notify(ChainOrchestratorEvent::L1Synced); Ok(None) } - L1Notification::BatchFinalization { hash: _hash, index, block_number } => { + L1Notification::BatchFinalization { hash: _hash, index, block_info } => { metered!( Task::BatchFinalization, self, - handle_batch_finalization(*index, *block_number) + handle_batch_finalization(*index, *block_info) ) } } @@ -564,10 +590,10 @@ impl< async fn handle_l1_new_block( &self, - block_number: u64, + block_info: BlockInfo, ) -> Result, ChainOrchestratorError> { - self.database.set_latest_l1_block_number(block_number).await?; - Ok(Some(ChainOrchestratorEvent::NewL1Block(block_number))) + self.database.set_latest_l1_block_number(block_info.number).await?; + Ok(Some(ChainOrchestratorEvent::NewL1Block(block_info.number))) } /// Collects reverted L2 transactions in [from, to], excluding L1 messages. @@ -613,9 +639,8 @@ impl< &mut self, block_number: u64, ) -> Result, ChainOrchestratorError> { - let genesis_hash = self.config.chain_spec().genesis_hash(); let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = - self.database.unwind(genesis_hash, block_number).await?; + self.database.unwind(block_number).await?; let (l2_head_block_info, reverted_transactions) = if let Some(block_number) = l2_head_block_number { @@ -681,69 +706,79 @@ impl< /// the new finalized L2 chain block and the list of finalized batches. async fn handle_l1_finalized( &mut self, - block_number: u64, + block_info: BlockInfo, ) -> Result, ChainOrchestratorError> { - let finalized_batches = self + let (finalized_block_info, triggered_batches) = self .database .tx_mut(move |tx| async move { // Set the latest finalized L1 block in the database. - tx.set_finalized_l1_block_number(block_number).await?; + tx.set_finalized_l1_block_number(block_info.number).await?; + + // Finalize consolidated batches up to the finalized L1 block number. + let finalized_block_info = + tx.finalize_consolidated_batches(block_info.number).await?; // Get all unprocessed batches that have been finalized by this L1 block // finalization. - tx.fetch_and_update_unprocessed_finalized_batches(block_number).await + let triggered_batches = + tx.fetch_and_update_unprocessed_finalized_batches(block_info.number).await?; + + Ok::<_, ChainOrchestratorError>((finalized_block_info, triggered_batches)) }) .await?; - for batch in &finalized_batches { - self.derivation_pipeline.push_batch(Arc::new(*batch)).await; + if finalized_block_info.is_some() { + tracing::info!(target: "scroll::chain_orchestrator", ?finalized_block_info, "Updating FCS with new finalized block info from L1 finalization"); + self.engine.update_fcs(None, None, finalized_block_info).await?; } - Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_batches))) + for batch in &triggered_batches { + self.derivation_pipeline.push_batch(*batch, BatchStatus::Finalized).await; + } + + Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_info.number, triggered_batches))) } /// Handles a batch input by inserting it into the database. async fn handle_batch_commit( - &self, + &mut self, + block_info: BlockInfo, batch: BatchCommitData, ) -> Result, ChainOrchestratorError> { + let batch_info: BatchInfo = (&batch).into(); let event = self .database .tx_mut(move |tx| { - let batch_clone = batch.clone(); + let batch = batch.clone(); async move { - let prev_batch_index = batch_clone.index - 1; + let prev_batch_index = batch.index - 1; // Perform a consistency check to ensure the previous commit batch exists in the // database. if tx.get_batch_by_index(prev_batch_index).await?.is_none() { - return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)); + return Err(ChainOrchestratorError::BatchCommitGap(batch.index)); } - // remove any batches with an index greater than the previous batch. - let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; - - // handle the case of a batch revert. - let new_safe_head = if affected > 0 { - tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; - tx.get_highest_block_for_batch_index(prev_batch_index).await? - } else { - None - }; - let event = ChainOrchestratorEvent::BatchCommitIndexed { - batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), - l1_block_number: batch_clone.block_number, - safe_head: new_safe_head, + batch_info: BatchInfo::new(batch.index, batch.hash), + l1_block_number: batch.block_number, }; // insert the batch and commit the transaction. - tx.insert_batch(batch_clone).await?; + tx.insert_batch(batch).await?; + + // insert the L1 block info. + tx.insert_l1_block_info(block_info).await?; + Ok::<_, ChainOrchestratorError>(Some(event)) } }) .await?; + if self.sync_state.is_synced() { + self.derivation_pipeline.push_batch(batch_info, BatchStatus::Consolidated).await; + } + Ok(event) } @@ -751,46 +786,83 @@ impl< async fn handle_batch_finalization( &mut self, batch_index: u64, - block_number: u64, + l1_block_info: BlockInfo, ) -> Result, ChainOrchestratorError> { - let event = self + let (triggered_batches, finalized_block_info) = self .database .tx_mut(move |tx| async move { + // Insert the L1 block info. + tx.insert_l1_block_info(l1_block_info).await?; + // finalize all batches up to `batch_index`. - tx.finalize_batches_up_to_index(batch_index, block_number).await?; + tx.finalize_batches_up_to_index(batch_index, l1_block_info.number).await?; + let finalized_block_number = tx.get_finalized_l1_block_number().await?; + let finalized_block_info = + tx.finalize_consolidated_batches(finalized_block_number).await?; // Get all unprocessed batches that have been finalized by this L1 block // finalization. - let finalized_block_number = tx.get_finalized_l1_block_number().await?; - if finalized_block_number >= block_number { - let finalized_batches = tx - .fetch_and_update_unprocessed_finalized_batches(finalized_block_number) - .await?; - - return Ok(Some(ChainOrchestratorEvent::BatchFinalized( - block_number, - finalized_batches, - ))); - } + let triggered_batches = if finalized_block_number >= l1_block_info.number { + tx.fetch_and_update_unprocessed_finalized_batches(finalized_block_number) + .await? + } else { + vec![] + }; - Ok::<_, ChainOrchestratorError>(None) + Ok::<_, ChainOrchestratorError>((triggered_batches, finalized_block_info)) }) - .await; + .await?; - if let Ok(Some(ChainOrchestratorEvent::BatchFinalized(_, batches))) = &event { - for batch in batches { - self.derivation_pipeline.push_batch(Arc::new(*batch)).await; - } + if finalized_block_info.is_some() { + tracing::info!(target: "scroll::chain_orchestrator", ?finalized_block_info, "Updating FCS with new finalized block info from batch finalization"); + self.engine.update_fcs(None, None, finalized_block_info).await?; } - event + for batch in &triggered_batches { + self.derivation_pipeline.push_batch(*batch, BatchStatus::Finalized).await; + } + + Ok(Some(ChainOrchestratorEvent::BatchFinalized { + l1_block_info, + triggered_batches, + finalized_block_info, + })) + } + + /// Handles a batch revert event by updating the database. + async fn handle_batch_revert( + &mut self, + start_index: u64, + end_index: u64, + l1_block_info: BlockInfo, + ) -> Result, ChainOrchestratorError> { + let (safe_block_info, batch_info) = self + .database + .tx_mut(move |tx| async move { + tx.insert_l1_block_info(l1_block_info).await?; + tx.set_batch_revert_block_number_for_batch_range( + start_index, + end_index, + l1_block_info, + ) + .await?; + + // handle the case of a batch revert. + Ok::<_, ChainOrchestratorError>(tx.get_latest_safe_l2_info().await?) + }) + .await?; + + // Update the forkchoice state to the new safe block. + self.engine.update_fcs(None, Some(safe_block_info), None).await?; + + Ok(Some(ChainOrchestratorEvent::BatchReverted { batch_info, safe_head: safe_block_info })) } /// Handles an L1 message by inserting it into the database. async fn handle_l1_message( &self, l1_message: TxL1Message, - l1_block_number: u64, + l1_block_info: BlockInfo, ) -> Result, ChainOrchestratorError> { let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); let queue_hash = compute_l1_message_queue_hash( @@ -799,7 +871,7 @@ impl< self.config.l1_v2_message_queue_start_index(), ) .await?; - let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash); + let l1_message = L1MessageEnvelope::new(l1_message, l1_block_info.number, None, queue_hash); // Perform a consistency check to ensure the previous L1 message exists in the database. self.database @@ -822,6 +894,7 @@ impl< } tx.insert_l1_message(l1_message.clone()).await?; + tx.insert_l1_block_info(l1_block_info).await?; Ok::<_, ChainOrchestratorError>(()) } }) @@ -1138,7 +1211,7 @@ impl< /// /// This involves validating the L1 messages in the blocks against the expected L1 messages /// synced from L1. - async fn consolidate_chain(&self) -> Result<(), ChainOrchestratorError> { + async fn consolidate_chain(&mut self) -> Result<(), ChainOrchestratorError> { tracing::trace!(target: "scroll::chain_orchestrator", fcs = ?self.engine.fcs(), "Consolidating chain from safe to head"); let safe_block_number = self.engine.fcs().safe_block_info().number; @@ -1191,6 +1264,14 @@ impl< // transactions into the transaction pool. self.network.handle().inner().update_sync_state(RethSyncState::Idle); + // Fetch all unprocessed committed batches and push them to the derivation pipeline as + // consolidated. + let committed_batches = + self.database.fetch_and_update_unprocessed_committed_batches().await?; + for batch_commit in committed_batches { + self.derivation_pipeline.push_batch(batch_commit, BatchStatus::Consolidated).await; + } + self.notify(ChainOrchestratorEvent::ChainConsolidated { from: safe_block_number, to: head_block_number, diff --git a/crates/chain-orchestrator/src/metrics.rs b/crates/chain-orchestrator/src/metrics.rs index 1e5da569..8859712f 100644 --- a/crates/chain-orchestrator/src/metrics.rs +++ b/crates/chain-orchestrator/src/metrics.rs @@ -68,6 +68,10 @@ pub(crate) enum Task { BatchCommit, /// Batch finalization event handling. BatchFinalization, + /// Batch revert event handling. + BatchRevert, + /// Batch revert range event handling. + BatchRevertRange, } impl Task { @@ -79,6 +83,8 @@ impl Task { Self::L1Message => "l1_message", Self::BatchCommit => "batch_commit", Self::BatchFinalization => "batch_finalization", + Self::BatchRevert => "batch_revert", + Self::BatchRevertRange => "batch_revert_range", Self::BatchReconciliation => "batch_reconciliation", Self::ChainConsolidation => "chain_consolidation", Self::L2BlockImport => "l2_block_import", diff --git a/crates/database/db/Cargo.toml b/crates/database/db/Cargo.toml index 1f331462..2d7ddcaf 100644 --- a/crates/database/db/Cargo.toml +++ b/crates/database/db/Cargo.toml @@ -41,6 +41,9 @@ tracing.workspace = true scroll-migration.workspace = true rollup-node-primitives = { workspace = true, features = ["arbitrary"] } +# reth +reth-tracing.workspace = true + # misc arbitrary.workspace = true rand.workspace = true diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index d84f45ef..4bcffece 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -8,8 +8,8 @@ use crate::{ }; use alloy_primitives::{Signature, B256}; use rollup_node_primitives::{ - BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockInfo, L1MessageEnvelope, - L2BlockInfoWithL1Messages, + BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockInfo, L1BlockStartupInfo, + L1MessageEnvelope, L2BlockInfoWithL1Messages, }; use scroll_alloy_rpc_types_engine::BlockDataHint; use sea_orm::{ @@ -148,6 +148,111 @@ macro_rules! metered { #[async_trait::async_trait] impl DatabaseWriteOperations for Database { + async fn insert_l1_block_info(&self, block_info: BlockInfo) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::InsertL1BlockInfo, + self, + tx_mut(move |tx| { async move { tx.insert_l1_block_info(block_info).await } }) + ) + } + + async fn remove_l1_block_info_leq(&self, block_info: u64) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::RemoveL1BlockInfoLeq, + self, + tx_mut(move |tx| { async move { tx.remove_l1_block_info_leq(block_info).await } }) + ) + } + + async fn remove_l1_block_info_gt(&self, block_info: u64) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::RemoveL1BlockInfoGt, + self, + tx_mut(move |tx| { async move { tx.remove_l1_block_info_gt(block_info).await } }) + ) + } + + async fn delete_batch_finalization_gt_block_number( + &self, + block_number: u64, + ) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::DeleteBatchFinalizationGtBlockNumber, + self, + tx_mut(move |tx| { + async move { tx.delete_batch_finalization_gt_block_number(block_number).await } + }) + ) + } + + async fn set_batch_revert_block_number_for_batch_range( + &self, + start_index: u64, + end_index: u64, + block_info: BlockInfo, + ) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::SetBatchRevertBlockNumberForBatchRange, + self, + tx_mut(move |tx| { + async move { + tx.set_batch_revert_block_number_for_batch_range( + start_index, + end_index, + block_info, + ) + .await + } + }) + ) + } + + async fn delete_batch_revert_gt_block_number( + &self, + block_number: u64, + ) -> Result { + metered!( + DatabaseOperation::DeleteBatchRevertGtBlockNumber, + self, + tx_mut( + move |tx| async move { tx.delete_batch_revert_gt_block_number(block_number).await } + ) + ) + } + + async fn finalize_consolidated_batches( + &self, + finalized_l1_block_number: u64, + ) -> Result, DatabaseError> { + metered!( + DatabaseOperation::FinalizeConsolidatedBatches, + self, + tx_mut(move |tx| async move { + tx.finalize_consolidated_batches(finalized_l1_block_number).await + }) + ) + } + + async fn change_batch_processing_to_committed_status(&self) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::ChangeBatchProcessingToCommittedStatus, + self, + tx_mut(move |tx| async move { tx.change_batch_processing_to_committed_status().await }) + ) + } + + async fn update_batch_status( + &self, + batch_hash: B256, + status: rollup_node_primitives::BatchStatus, + ) -> Result<(), DatabaseError> { + metered!( + DatabaseOperation::UpdateBatchStatus, + self, + tx_mut(move |tx| async move { tx.update_batch_status(batch_hash, status).await }) + ) + } + async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> { metered!( DatabaseOperation::InsertBatch, @@ -218,6 +323,18 @@ impl DatabaseWriteOperations for Database { ) } + async fn fetch_and_update_unprocessed_committed_batches( + &self, + ) -> Result, DatabaseError> { + metered!( + DatabaseOperation::FetchAndUpdateUnprocessedCommittedBatches, + self, + tx_mut( + move |tx| async move { tx.fetch_and_update_unprocessed_committed_batches().await } + ) + ) + } + async fn delete_batches_gt_block_number( &self, block_number: u64, @@ -270,14 +387,11 @@ impl DatabaseWriteOperations for Database { ) } - async fn prepare_on_startup( - &self, - genesis_hash: B256, - ) -> Result<(Option, Option), DatabaseError> { + async fn prepare_l1_watcher_start_info(&self) -> Result { metered!( DatabaseOperation::PrepareOnStartup, self, - tx_mut(move |tx| async move { tx.prepare_on_startup(genesis_hash).await }) + tx_mut(move |tx| async move { tx.prepare_l1_watcher_start_info().await }) ) } @@ -383,15 +497,11 @@ impl DatabaseWriteOperations for Database { ) } - async fn unwind( - &self, - genesis_hash: B256, - l1_block_number: u64, - ) -> Result { + async fn unwind(&self, l1_block_number: u64) -> Result { metered!( DatabaseOperation::Unwind, self, - tx_mut(move |tx| async move { tx.unwind(genesis_hash, l1_block_number).await }) + tx_mut(move |tx| async move { tx.unwind(l1_block_number).await }) ) } @@ -421,6 +531,45 @@ impl DatabaseReadOperations for Database { ) } + async fn get_batch_by_hash( + &self, + batch_hash: B256, + ) -> Result, DatabaseError> { + metered!( + DatabaseOperation::GetBatchByHash, + self, + tx(move |tx| async move { tx.get_batch_by_hash(batch_hash).await }) + ) + } + + #[cfg(test)] + async fn get_batch_status_by_hash( + &self, + batch_hash: B256, + ) -> Result, DatabaseError> { + metered!( + DatabaseOperation::GetBatchStatusByHash, + self, + tx(move |tx| async move { tx.get_batch_status_by_hash(batch_hash).await }) + ) + } + + async fn get_latest_indexed_event_l1_block_number(&self) -> Result, DatabaseError> { + metered!( + DatabaseOperation::GetLatestIndexedEventL1BlockNumber, + self, + tx(|tx| async move { tx.get_latest_indexed_event_l1_block_number().await }) + ) + } + + async fn get_l1_block_info(&self) -> Result, DatabaseError> { + metered!( + DatabaseOperation::GetL1BlockInfo, + self, + tx(|tx| async move { tx.get_l1_block_info().await }) + ) + } + async fn get_latest_l1_block_number(&self) -> Result { metered!( DatabaseOperation::GetLatestL1BlockNumber, @@ -502,9 +651,7 @@ impl DatabaseReadOperations for Database { ) } - async fn get_latest_safe_l2_info( - &self, - ) -> Result, DatabaseError> { + async fn get_latest_safe_l2_info(&self) -> Result<(BlockInfo, BatchInfo), DatabaseError> { metered!( DatabaseOperation::GetLatestSafeL2Info, self, @@ -735,7 +882,7 @@ mod test { let mut u = Unstructured::new(&bytes); // Generate 10 finalized batches at L1 block 100. - for i in 0..10 { + for i in 1..10 { let batch_commit = BatchCommitData { index: i, calldata: Arc::new(vec![].into()), @@ -770,7 +917,9 @@ mod test { .await; for batch in batches { let batch = batch.unwrap(); - if batch.index < 10 { + if batch.index == 0 { + assert_eq!(batch.finalized_block_number, Some(0)); + } else if batch.index < 10 { assert_eq!(batch.finalized_block_number, Some(100)); } else if batch.index <= 15 { assert_eq!(batch.finalized_block_number, Some(200)); @@ -828,6 +977,8 @@ mod test { #[tokio::test] async fn test_derived_block_exists() { + reth_tracing::init_test_tracing(); + // Set up the test database. let db = setup_test_db().await; @@ -1155,7 +1306,7 @@ mod test { let mut u = Unstructured::new(&bytes); // Initially should return the genesis block and hash. - let (latest_safe_block, batch) = db.get_latest_safe_l2_info().await.unwrap().unwrap(); + let (latest_safe_block, batch) = db.get_latest_safe_l2_info().await.unwrap(); assert_eq!(latest_safe_block.number, 0); assert_eq!(batch.index, 0); @@ -1172,11 +1323,13 @@ mod test { // Should return the highest safe block (block 201) let latest_safe = db.get_latest_safe_l2_info().await.unwrap(); - assert_eq!(latest_safe, Some((safe_block_2, batch_info))); + assert_eq!(latest_safe, (safe_block_2, batch_info)); } #[tokio::test] async fn test_delete_l2_blocks_gt_block_number() { + reth_tracing::init_test_tracing(); + // Set up the test database. let db = setup_test_db().await; @@ -1217,7 +1370,7 @@ mod test { let db = setup_test_db().await; // Generate unstructured bytes. - let mut bytes = [0u8; 1024]; + let mut bytes = [0u8; 4096]; rand::rng().fill(bytes.as_mut_slice()); let mut u = Unstructured::new(&bytes); @@ -1310,6 +1463,8 @@ mod test { #[tokio::test] async fn test_insert_block_upsert_behavior() { + reth_tracing::init_test_tracing(); + // Set up the test database. let db = setup_test_db().await; @@ -1377,25 +1532,31 @@ mod test { let mut u = Unstructured::new(&bytes); // Insert batch 1 and associate it with two blocks in the database + let l1_block_info_1 = BlockInfo { number: 10, hash: B256::arbitrary(&mut u).unwrap() }; let batch_data_1 = BatchCommitData { index: 1, block_number: 10, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_1 = BlockInfo { number: 1, hash: B256::arbitrary(&mut u).unwrap() }; let block_2 = BlockInfo { number: 2, hash: B256::arbitrary(&mut u).unwrap() }; + db.insert_l1_block_info(l1_block_info_1).await.unwrap(); db.insert_batch(batch_data_1.clone()).await.unwrap(); db.insert_blocks(vec![block_1, block_2], batch_data_1.clone().into()).await.unwrap(); // Insert batch 2 and associate it with one block in the database + let l1_block_info_2 = BlockInfo { number: 20, hash: B256::arbitrary(&mut u).unwrap() }; let batch_data_2 = BatchCommitData { index: 2, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_3 = BlockInfo { number: 3, hash: B256::arbitrary(&mut u).unwrap() }; + db.insert_l1_block_info(l1_block_info_2).await.unwrap(); db.insert_batch(batch_data_2.clone()).await.unwrap(); db.insert_blocks(vec![block_3], batch_data_2.clone().into()).await.unwrap(); // Insert batch 3 produced at the same block number as batch 2 and associate it with one // block + let l1_block_info_3 = BlockInfo { number: 30, hash: B256::arbitrary(&mut u).unwrap() }; let batch_data_3 = - BatchCommitData { index: 3, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() }; + BatchCommitData { index: 3, block_number: 30, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_4 = BlockInfo { number: 4, hash: B256::arbitrary(&mut u).unwrap() }; + db.insert_l1_block_info(l1_block_info_3).await.unwrap(); db.insert_batch(batch_data_3.clone()).await.unwrap(); db.insert_blocks(vec![block_4], batch_data_3.clone().into()).await.unwrap(); @@ -1419,24 +1580,10 @@ mod test { assert_eq!(retried_block_4, block_4); // Call prepare_on_startup which should not error - let result = db.prepare_on_startup(Default::default()).await.unwrap(); + let result = db.prepare_l1_watcher_start_info().await.unwrap(); // verify the result - assert_eq!(result, (Some(block_2), Some(11))); - - // Verify that batches 2 and 3 are deleted - let batch_1 = db.get_batch_by_index(1).await.unwrap(); - let batch_2 = db.get_batch_by_index(2).await.unwrap(); - let batch_3 = db.get_batch_by_index(3).await.unwrap(); - assert!(batch_1.is_some()); - assert!(batch_2.is_none()); - assert!(batch_3.is_none()); - - // Verify that blocks 3 and 4 are deleted - let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap(); - let retried_block_4 = db.get_l2_block_info_by_number(4).await.unwrap(); - assert!(retried_block_3.is_none()); - assert!(retried_block_4.is_none()); + assert_eq!(result, L1BlockStartupInfo::UnsafeBlocks(vec![l1_block_info_3])); } #[tokio::test] diff --git a/crates/database/db/src/metrics.rs b/crates/database/db/src/metrics.rs index a352ea61..fda77bf3 100644 --- a/crates/database/db/src/metrics.rs +++ b/crates/database/db/src/metrics.rs @@ -17,6 +17,15 @@ pub(crate) struct DatabaseMetrics { #[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)] pub(crate) enum DatabaseOperation { // Write operations + InsertL1BlockInfo, + RemoveL1BlockInfoLeq, + DeleteBatchFinalizationGtBlockNumber, + SetBatchRevertBlockNumberForBatchRange, + DeleteBatchRevertGtBlockNumber, + FinalizeConsolidatedBatches, + ChangeBatchProcessingToCommittedStatus, + RemoveL1BlockInfoGt, + UpdateBatchStatus, InsertBatch, FinalizeBatchesUpToIndex, SetLatestL1BlockNumber, @@ -24,6 +33,7 @@ pub(crate) enum DatabaseOperation { SetProcessedL1BlockNumber, SetL2HeadBlockNumber, FetchAndUpdateUnprocessedFinalizedBatches, + FetchAndUpdateUnprocessedCommittedBatches, DeleteBatchesGtBlockNumber, DeleteBatchesGtBatchIndex, InsertL1Message, @@ -43,6 +53,10 @@ pub(crate) enum DatabaseOperation { InsertSignature, // Read operations GetBatchByIndex, + GetBatchByHash, + GetBatchStatusByHash, + GetLatestIndexedEventL1BlockNumber, + GetL1BlockInfo, GetLatestL1BlockNumber, GetFinalizedL1BlockNumber, GetProcessedL1BlockNumber, @@ -61,6 +75,21 @@ impl DatabaseOperation { /// Returns the str representation of the [`DatabaseOperation`]. pub(crate) const fn as_str(&self) -> &'static str { match self { + Self::InsertL1BlockInfo => "insert_l1_block_info", + Self::RemoveL1BlockInfoLeq => "remove_l1_block_info_leq", + Self::DeleteBatchFinalizationGtBlockNumber => { + "delete_batch_finalization_gt_block_number" + } + Self::SetBatchRevertBlockNumberForBatchRange => { + "set_batch_revert_block_number_for_batch_range" + } + Self::DeleteBatchRevertGtBlockNumber => "delete_batch_revert_gt_block_number", + Self::FinalizeConsolidatedBatches => "finalize_consolidated_batches", + Self::ChangeBatchProcessingToCommittedStatus => { + "change_batch_processing_to_committed_status" + } + Self::UpdateBatchStatus => "update_batch_status", + Self::RemoveL1BlockInfoGt => "remove_l1_block_info_gt", Self::InsertBatch => "insert_batch", Self::FinalizeBatchesUpToIndex => "finalize_batches_up_to_index", Self::SetLatestL1BlockNumber => "set_latest_l1_block_number", @@ -70,6 +99,9 @@ impl DatabaseOperation { Self::FetchAndUpdateUnprocessedFinalizedBatches => { "fetch_and_update_unprocessed_finalized_batches" } + Self::FetchAndUpdateUnprocessedCommittedBatches => { + "fetch_and_update_unprocessed_committed_batches" + } Self::DeleteBatchesGtBlockNumber => "delete_batches_gt_block_number", Self::DeleteBatchesGtBatchIndex => "delete_batches_gt_batch_index", Self::InsertL1Message => "insert_l1_message", @@ -88,6 +120,10 @@ impl DatabaseOperation { Self::Unwind => "unwind", Self::InsertSignature => "insert_signature", Self::GetBatchByIndex => "get_batch_by_index", + Self::GetBatchByHash => "get_batch_by_hash", + Self::GetBatchStatusByHash => "get_batch_status_by_hash", + Self::GetLatestIndexedEventL1BlockNumber => "get_latest_indexed_event_l1_block_number", + Self::GetL1BlockInfo => "get_l1_block_info", Self::GetLatestL1BlockNumber => "get_latest_l1_block_number", Self::GetFinalizedL1BlockNumber => "get_finalized_l1_block_number", Self::GetProcessedL1BlockNumber => "get_processed_l1_block_number", diff --git a/crates/database/db/src/models/batch_commit.rs b/crates/database/db/src/models/batch_commit.rs index b7a4b257..aa9aa4dd 100644 --- a/crates/database/db/src/models/batch_commit.rs +++ b/crates/database/db/src/models/batch_commit.rs @@ -7,15 +7,16 @@ use sea_orm::{entity::prelude::*, ActiveValue}; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] #[sea_orm(table_name = "batch_commit")] pub struct Model { - #[sea_orm(primary_key)] pub(crate) index: i64, + #[sea_orm(primary_key)] pub(crate) hash: Vec, block_number: i64, block_timestamp: i64, calldata: Vec, blob_hash: Option>, pub(crate) finalized_block_number: Option, - processed: bool, + reverted_block_number: Option, + status: String, } /// The relation for the batch input model. @@ -51,7 +52,8 @@ impl From for ActiveModel { calldata: ActiveValue::Set(batch_commit.calldata.0.to_vec()), blob_hash: ActiveValue::Set(batch_commit.blob_versioned_hash.map(|b| b.to_vec())), finalized_block_number: ActiveValue::Unchanged(None), - processed: ActiveValue::Unchanged(false), + reverted_block_number: ActiveValue::Unchanged(None), + status: ActiveValue::Set("committed".into()), } } } @@ -68,6 +70,7 @@ impl From for BatchCommitData { .blob_hash .map(|b| b.as_slice().try_into().expect("data persisted in database is valid")), finalized_block_number: value.finalized_block_number.map(|b| b as u64), + reverted_block_number: value.reverted_block_number.map(|b| b as u64), } } } diff --git a/crates/database/db/src/models/l1_block.rs b/crates/database/db/src/models/l1_block.rs new file mode 100644 index 00000000..313892d9 --- /dev/null +++ b/crates/database/db/src/models/l1_block.rs @@ -0,0 +1,36 @@ +use alloy_primitives::B256; +use rollup_node_primitives::BlockInfo; +use sea_orm::{entity::prelude::*, ActiveValue}; + +/// A database model that represents an L1 block. +#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] +#[sea_orm(table_name = "l1_block")] +pub struct Model { + #[sea_orm(primary_key)] + block_number: i64, + block_hash: Vec, +} + +/// The relation for the batch input model. +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +/// The active model behavior for the batch input model. +impl ActiveModelBehavior for ActiveModel {} + +impl From for ActiveModel { + fn from(block_info: BlockInfo) -> Self { + Self { + block_number: ActiveValue::Set( + block_info.number.try_into().expect("block number should fit in i64"), + ), + block_hash: ActiveValue::Set(block_info.hash.to_vec()), + } + } +} + +impl From for BlockInfo { + fn from(value: Model) -> Self { + Self { number: value.block_number as u64, hash: B256::from_slice(&value.block_hash) } + } +} diff --git a/crates/database/db/src/models/l2_block.rs b/crates/database/db/src/models/l2_block.rs index 21ac8ead..9e328e58 100644 --- a/crates/database/db/src/models/l2_block.rs +++ b/crates/database/db/src/models/l2_block.rs @@ -6,11 +6,12 @@ use sea_orm::{entity::prelude::*, ActiveValue}; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] #[sea_orm(table_name = "l2_block")] pub struct Model { - #[sea_orm(primary_key)] block_number: i64, + #[sea_orm(primary_key)] block_hash: Vec, batch_index: i64, batch_hash: Vec, + reverted: bool, } impl Model { @@ -57,6 +58,7 @@ impl From<(BlockInfo, BatchInfo)> for ActiveModel { batch_info.index.try_into().expect("index should fit in i64"), ), batch_hash: ActiveValue::Set(batch_info.hash.to_vec()), + reverted: ActiveValue::Set(false), } } } diff --git a/crates/database/db/src/models/mod.rs b/crates/database/db/src/models/mod.rs index f6dd71e9..1bb037ea 100644 --- a/crates/database/db/src/models/mod.rs +++ b/crates/database/db/src/models/mod.rs @@ -1,6 +1,9 @@ /// This module contains the batch commit database model. pub mod batch_commit; +/// This module contains the L1 block database model. +pub mod l1_block; + /// This module contains the derived block model. pub mod l2_block; diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 86d7a10e..d505d35f 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -3,8 +3,8 @@ use crate::{ReadConnectionProvider, WriteConnectionProvider}; use alloy_primitives::{Signature, B256}; use rollup_node_primitives::{ - BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockInfo, L1MessageEnvelope, - L2BlockInfoWithL1Messages, Metadata, + BatchCommitData, BatchConsolidationOutcome, BatchInfo, BatchStatus, BlockInfo, + L1BlockStartupInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, Metadata, }; use scroll_alloy_rpc_types_engine::BlockDataHint; use sea_orm::{ @@ -18,6 +18,17 @@ use std::fmt; #[async_trait::async_trait] #[auto_impl::auto_impl(Arc)] pub trait DatabaseWriteOperations { + /// Insert a [`BlockInfo`] representing an L1 block into the database. + async fn insert_l1_block_info(&self, block_info: BlockInfo) -> Result<(), DatabaseError>; + + /// Remove all [`BlockInfo`]s representing L1 blocks with block numbers less than or equal to + /// the provided block number. + async fn remove_l1_block_info_leq(&self, block_info: u64) -> Result<(), DatabaseError>; + + /// Remove all [`BlockInfo`]s representing L1 blocks with block numbers greater than the + /// provided block number. + async fn remove_l1_block_info_gt(&self, block_number: u64) -> Result<(), DatabaseError>; + /// Insert a [`BatchCommitData`] into the database. async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError>; @@ -42,16 +53,60 @@ pub trait DatabaseWriteOperations { async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError>; /// Fetches unprocessed batches up to the provided finalized L1 block number and updates their - /// status. + /// status to processing. async fn fetch_and_update_unprocessed_finalized_batches( &self, finalized_l1_block_number: u64, ) -> Result, DatabaseError>; + /// Fetches unprocessed committed batches and updates their status to processing. + async fn fetch_and_update_unprocessed_committed_batches( + &self, + ) -> Result, DatabaseError>; + /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number. async fn delete_batches_gt_block_number(&self, block_number: u64) -> Result; + /// Delete all effects of `BatchFinalization` events with a block number greater than the + /// provided block number. + async fn delete_batch_finalization_gt_block_number( + &self, + block_number: u64, + ) -> Result<(), DatabaseError>; + + /// Sets the L1 block number of the batch revert associated with the provided batch index range. + async fn set_batch_revert_block_number_for_batch_range( + &self, + start_index: u64, + end_index: u64, + block_info: BlockInfo, + ) -> Result<(), DatabaseError>; + + /// Delete all batch reverts with a block number greater than the provided block number and + /// returns the number of deleted reverts. + async fn delete_batch_revert_gt_block_number( + &self, + block_number: u64, + ) -> Result; + + /// Finalize consolidated batches by updating their status in the database and returning the new + /// finalized head. + async fn finalize_consolidated_batches( + &self, + finalized_l1_block_number: u64, + ) -> Result, DatabaseError>; + + /// Set batches with processing status to committed status. + async fn change_batch_processing_to_committed_status(&self) -> Result<(), DatabaseError>; + + /// Update the status of a batch identified by its hash. + async fn update_batch_status( + &self, + batch_hash: B256, + status: BatchStatus, + ) -> Result<(), DatabaseError>; + /// Delete all [`BatchCommitData`]s with a batch index greater than the provided index. async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result; @@ -68,19 +123,8 @@ pub trait DatabaseWriteOperations { l1_block_number: u64, ) -> Result, DatabaseError>; - /// Prepare the database on startup and return metadata used for other components in the - /// rollup-node. - /// - /// This method first unwinds the database to the finalized L1 block. It then fetches the batch - /// info for the latest safe L2 block. It takes note of the L1 block number at which - /// this batch was produced (currently the finalized block for the batch until we implement - /// issue #273). It then retrieves the latest block for the previous batch (i.e., the batch - /// before the latest safe block). It returns a tuple of this latest fetched block and the - /// L1 block number of the batch. - async fn prepare_on_startup( - &self, - genesis_hash: B256, - ) -> Result<(Option, Option), DatabaseError>; + /// Returns the L1 block info required to start the L1 watcher on startup. + async fn prepare_l1_watcher_start_info(&self) -> Result; /// Delete all L2 blocks with a block number greater than the provided block number. async fn delete_l2_blocks_gt_block_number( @@ -130,11 +174,7 @@ pub trait DatabaseWriteOperations { /// Unwinds the chain orchestrator by deleting all indexed data greater than the provided L1 /// block number. - async fn unwind( - &self, - genesis_hash: B256, - l1_block_number: u64, - ) -> Result; + async fn unwind(&self, l1_block_number: u64) -> Result; /// Store a block signature in the database. /// TODO: remove this once we deprecated l2geth. @@ -147,14 +187,52 @@ pub trait DatabaseWriteOperations { #[async_trait::async_trait] impl DatabaseWriteOperations for T { + async fn insert_l1_block_info(&self, block_info: BlockInfo) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", %block_info, "Inserting L1 block info into database."); + let finalized_l1_block_number = self.get_finalized_l1_block_number().await?; + if block_info.number <= finalized_l1_block_number { + tracing::trace!(target: "scroll::db", %block_info, %finalized_l1_block_number, "L1 block info is less than or equal to finalized L1 block number, skipping insertion."); + return Ok(()); + } + + let l1_block: models::l1_block::ActiveModel = block_info.into(); + Ok(models::l1_block::Entity::insert(l1_block) + .on_conflict( + OnConflict::column(models::l1_block::Column::BlockNumber) + .update_column(models::l1_block::Column::BlockHash) + .to_owned(), + ) + .exec(self.get_connection()) + .await + .map(|_| ())?) + } + + async fn remove_l1_block_info_leq(&self, block_number: u64) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", %block_number, "Removing L1 block info less than or equal to provided block number from database."); + Ok(models::l1_block::Entity::delete_many() + .filter(models::l1_block::Column::BlockNumber.lte(block_number as i64)) + .exec(self.get_connection()) + .await + .map(|_| ())?) + } + + async fn remove_l1_block_info_gt(&self, block_number: u64) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", block_number = block_number, "Removing L1 block info greater than provided block number from database."); + Ok(models::l1_block::Entity::delete_many() + .filter(models::l1_block::Column::BlockNumber.gt(block_number as i64)) + .exec(self.get_connection()) + .await + .map(|_| ())?) + } + async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database."); let batch_commit: models::batch_commit::ActiveModel = batch_commit.into(); Ok(models::batch_commit::Entity::insert(batch_commit) .on_conflict( - OnConflict::column(models::batch_commit::Column::Index) + OnConflict::column(models::batch_commit::Column::Hash) .update_columns(vec![ - models::batch_commit::Column::Hash, + models::batch_commit::Column::Index, models::batch_commit::Column::BlockNumber, models::batch_commit::Column::BlockTimestamp, models::batch_commit::Column::Calldata, @@ -163,11 +241,131 @@ impl DatabaseWriteOperations for T { ]) .to_owned(), ) - .exec(self.get_connection()) + .exec_without_returning(self.get_connection()) .await .map(|_| ())?) } + async fn delete_batch_finalization_gt_block_number( + &self, + block_number: u64, + ) -> Result<(), DatabaseError> { + tracing::trace!( + target: "scroll::db", + block_number, + "Deleting batch finalization effects greater than block number." + ); + + models::batch_commit::Entity::update_many() + .filter(models::batch_commit::Column::FinalizedBlockNumber.gt(block_number as i64)) + .col_expr(models::batch_commit::Column::FinalizedBlockNumber, Expr::value(None::)) + .exec(self.get_connection()) + .await + .map(|_| ()) + .map_err(Into::into) + } + + async fn set_batch_revert_block_number_for_batch_range( + &self, + start_index: u64, + end_index: u64, + block_info: BlockInfo, + ) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", ?start_index, ?end_index, %block_info, "Setting batch revert block number for batch index range in database."); + + // Define the filter to select the appropriate batches. + let filter = Condition::all() + .add(models::batch_commit::Column::Index.gte(start_index as i64)) + .add(models::batch_commit::Column::Index.lte(end_index as i64)) + .add(models::batch_commit::Column::RevertedBlockNumber.is_null()); + + // Fetch the batch hashes to update the + let batch_hashes = models::batch_commit::Entity::find() + .select_only() + .column(models::batch_commit::Column::Hash) + .filter(filter.clone()) + .into_tuple::>() + .all(self.get_connection()) + .await?; + + models::batch_commit::Entity::update_many() + .filter(models::batch_commit::Column::Hash.is_in(batch_hashes.iter().cloned())) + .col_expr( + models::batch_commit::Column::RevertedBlockNumber, + Expr::value(Some(block_info.number as i64)), + ) + .exec(self.get_connection()) + .await?; + + models::l2_block::Entity::update_many() + .filter(models::l2_block::Column::BatchHash.is_in(batch_hashes.iter().cloned())) + .col_expr(models::l2_block::Column::Reverted, Expr::value(true)) + .exec(self.get_connection()) + .await?; + + Ok(()) + } + + async fn delete_batch_revert_gt_block_number( + &self, + block_number: u64, + ) -> Result { + tracing::trace!( + target: "scroll::db", block_number, "Deleting batch reverts greater than block number."); + + let batch_hashes = models::batch_commit::Entity::find() + .select_only() + .column(models::batch_commit::Column::Hash) + .filter(models::batch_commit::Column::RevertedBlockNumber.gt(block_number as i64)) + .into_tuple::>() + .all(self.get_connection()) + .await?; + let num_batches = batch_hashes.len() as u64; + + models::batch_commit::Entity::update_many() + .filter(models::batch_commit::Column::Hash.is_in(batch_hashes.iter().cloned())) + .col_expr(models::batch_commit::Column::RevertedBlockNumber, Expr::value(None::)) + .col_expr(models::batch_commit::Column::Status, Expr::value("consolidated")) + .exec(self.get_connection()) + .await?; + + models::l2_block::Entity::update_many() + .filter(models::l2_block::Column::BatchHash.is_in(batch_hashes.iter().cloned())) + .col_expr(models::l2_block::Column::Reverted, Expr::value(false)) + .exec(self.get_connection()) + .await?; + + Ok(num_batches) + } + + async fn change_batch_processing_to_committed_status(&self) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", "Changing batch status from processing to committed in database."); + + models::batch_commit::Entity::update_many() + .filter(models::batch_commit::Column::Status.eq("processing")) + .col_expr(models::batch_commit::Column::Status, Expr::value("committed")) + .exec(self.get_connection()) + .await?; + + Ok(()) + } + + async fn update_batch_status( + &self, + batch_hash: B256, + status: BatchStatus, + ) -> Result<(), DatabaseError> { + tracing::trace!(target: "scroll::db", ?batch_hash, ?status, "Updating batch status in database."); + + models::batch_commit::Entity::update_many() + .filter(models::batch_commit::Column::Hash.eq(batch_hash.to_vec())) + .col_expr(models::batch_commit::Column::Status, Expr::value(status.to_string())) + .exec(self.get_connection()) + .await?; + + Ok(()) + } + async fn finalize_batches_up_to_index( &self, batch_index: u64, @@ -179,7 +377,8 @@ impl DatabaseWriteOperations for T { .filter( models::batch_commit::Column::Index .lte(batch_index) - .and(models::batch_commit::Column::FinalizedBlockNumber.is_null()), + .and(models::batch_commit::Column::FinalizedBlockNumber.is_null()) + .and(models::batch_commit::Column::RevertedBlockNumber.is_null()), ) .col_expr( models::batch_commit::Column::FinalizedBlockNumber, @@ -208,6 +407,11 @@ impl DatabaseWriteOperations for T { async fn set_finalized_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", block_number, "Updating the finalized L1 block number in the database."); + + // Remove all finalized L1 block infos less than or equal to the provided block number. + self.remove_l1_block_info_leq(block_number).await?; + + // Insert or update the finalized L1 block number in metadata. let metadata: models::metadata::ActiveModel = Metadata { key: "l1_finalized_block".to_string(), value: block_number.to_string() } .into(); @@ -253,6 +457,41 @@ impl DatabaseWriteOperations for T { .map(|_| ())?) } + async fn finalize_consolidated_batches( + &self, + finalized_l1_block_number: u64, + ) -> Result, DatabaseError> { + tracing::trace!(target: "scroll::db", finalized_l1_block_number, "Finalizing consolidated batches in the database."); + let filter = Condition::all() + .add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null()) + .add(models::batch_commit::Column::FinalizedBlockNumber.lte(finalized_l1_block_number)) + .add(models::batch_commit::Column::Status.eq("consolidated")); + let batch = models::batch_commit::Entity::find() + .filter(filter.clone()) + .order_by_desc(models::batch_commit::Column::Index) + .one(self.get_connection()) + .await?; + + if let Some(batch) = batch { + let finalized_block_info = models::l2_block::Entity::find() + .filter(models::l2_block::Column::BatchHash.eq(batch.hash.clone())) + .order_by_desc(models::l2_block::Column::BlockNumber) + .one(self.get_connection()) + .await? + .map(|block| block.block_info()) + .expect("Finalized batch must have at least one L2 block."); + models::batch_commit::Entity::update_many() + .filter(filter) + .col_expr(models::batch_commit::Column::Status, Expr::value("finalized")) + .exec(self.get_connection()) + .await?; + + Ok(Some(finalized_block_info)) + } else { + Ok(None) + } + } + async fn fetch_and_update_unprocessed_finalized_batches( &self, finalized_l1_block_number: u64, @@ -260,7 +499,7 @@ impl DatabaseWriteOperations for T { let conditions = Condition::all() .add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null()) .add(models::batch_commit::Column::FinalizedBlockNumber.lte(finalized_l1_block_number)) - .add(models::batch_commit::Column::Processed.eq(false)); + .add(models::batch_commit::Column::Status.eq("committed")); let batches = models::batch_commit::Entity::find() .filter(conditions.clone()) @@ -278,7 +517,36 @@ impl DatabaseWriteOperations for T { })?; models::batch_commit::Entity::update_many() - .col_expr(models::batch_commit::Column::Processed, Expr::value(true)) + .col_expr(models::batch_commit::Column::Status, Expr::value("processing")) + .filter(conditions) + .exec(self.get_connection()) + .await?; + + Ok(batches) + } + + async fn fetch_and_update_unprocessed_committed_batches( + &self, + ) -> Result, DatabaseError> { + let conditions = Condition::all().add(models::batch_commit::Column::Status.eq("committed")); + + let batches = models::batch_commit::Entity::find() + .filter(conditions.clone()) + .order_by_asc(models::batch_commit::Column::Index) + .select_only() + .column(models::batch_commit::Column::Index) + .column(models::batch_commit::Column::Hash) + .into_tuple::<(i64, Vec)>() + .all(self.get_connection()) + .await + .map(|x| { + x.into_iter() + .map(|(index, hash)| BatchInfo::new(index as u64, B256::from_slice(&hash))) + .collect() + })?; + + models::batch_commit::Entity::update_many() + .col_expr(models::batch_commit::Column::Status, Expr::value("processing")) .filter(conditions) .exec(self.get_connection()) .await?; @@ -356,35 +624,17 @@ impl DatabaseWriteOperations for T { Ok(removed_messages.into_iter().map(Into::into).collect()) } - async fn prepare_on_startup( - &self, - genesis_hash: B256, - ) -> Result<(Option, Option), DatabaseError> { + async fn prepare_l1_watcher_start_info(&self) -> Result { tracing::trace!(target: "scroll::db", "Fetching startup safe block from database."); - // Unwind the database to the last finalized L1 block saved in database. - let finalized_block_number = self.get_finalized_l1_block_number().await?; - self.unwind(genesis_hash, finalized_block_number).await?; + // set all batches with processing status back to committed + self.change_batch_processing_to_committed_status().await?; - // Delete all unprocessed batches from the database and return starting l2 safe head and l1 - // head. - if let Some(batch_info) = self - .get_latest_safe_l2_info() - .await? - .map(|(_, batch_info)| batch_info) - .filter(|b| b.index > 1) - { - let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist"); - self.delete_batches_gt_block_number(batch.block_number.saturating_sub(1)).await?; - }; + // Get all L1 block infos from the database. + let l1_block_infos = self.get_l1_block_info().await?; + let latest_l1_block_info = self.get_latest_indexed_event_l1_block_number().await?; - let Some((block_info, batch_info)) = - self.get_latest_safe_l2_info().await?.filter(|(block_info, _)| block_info.number > 0) - else { - return Ok((None, None)); - }; - let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist"); - Ok((Some(block_info), Some(batch.block_number.saturating_add(1)))) + Ok(L1BlockStartupInfo::new(l1_block_infos, latest_l1_block_info)) } async fn delete_l2_blocks_gt_block_number( @@ -432,16 +682,16 @@ impl DatabaseWriteOperations for T { blocks.into_iter().map(|b| (b, batch_info).into()).collect(); models::l2_block::Entity::insert_many(l2_blocks) .on_conflict( - OnConflict::column(models::l2_block::Column::BlockNumber) + OnConflict::column(models::l2_block::Column::BlockHash) .update_columns([ - models::l2_block::Column::BlockHash, + models::l2_block::Column::BlockNumber, models::l2_block::Column::BatchHash, models::l2_block::Column::BatchIndex, ]) .to_owned(), ) .on_empty_do_nothing() - .exec(self.get_connection()) + .exec_without_returning(self.get_connection()) .await?; Ok(()) @@ -536,9 +786,8 @@ impl DatabaseWriteOperations for T { let filter = if let Some(block_number) = block_number { models::l1_message::Column::L2BlockNumber.gte(block_number as i64) } else { - let safe_block_number = self.get_latest_safe_l2_info().await?; - models::l1_message::Column::L2BlockNumber - .gt(safe_block_number.map(|(block_info, _)| block_info.number as i64).unwrap_or(0)) + let (safe_block_info, _batch_info) = self.get_latest_safe_l2_info().await?; + models::l1_message::Column::L2BlockNumber.gt(safe_block_info.number as i64) }; models::l1_message::Entity::update_many() @@ -561,20 +810,24 @@ impl DatabaseWriteOperations for T { .await?; self.update_l1_messages_with_l2_blocks(outcome.blocks).await?; self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?; + self.update_batch_status(outcome.batch_info.hash, outcome.target_status).await?; Ok(()) } - async fn unwind( - &self, - genesis_hash: B256, - l1_block_number: u64, - ) -> Result { + async fn unwind(&self, l1_block_number: u64) -> Result { // Set the latest L1 block number self.set_latest_l1_block_number(l1_block_number).await?; - // delete batch inputs and l1 messages + // remove the L1 block infos greater than the provided l1 block number + self.remove_l1_block_info_gt(l1_block_number).await?; + + // delete batch commits, l1 messages and batch finalization effects greater than the + // provided l1 block number let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?; let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?; + self.delete_batch_finalization_gt_block_number(l1_block_number).await?; + let batch_reverts_removed: u64 = + self.delete_batch_revert_gt_block_number(l1_block_number).await?; // filter and sort the executed L1 messages let mut removed_executed_l1_messages: Vec<_> = @@ -596,12 +849,9 @@ impl DatabaseWriteOperations for T { }; // check if we need to reorg the L2 safe block - let l2_safe_block_info = if batches_removed > 0 { - if let Some(x) = self.get_latest_safe_l2_info().await? { - Some(x.0) - } else { - Some(BlockInfo::new(0, genesis_hash)) - } + let l2_safe_block_info = if batches_removed > 0 || batch_reverts_removed > 0 { + let (safe_block_info, _batch_info) = self.get_latest_safe_l2_info().await?; + Some(safe_block_info) } else { None }; @@ -651,6 +901,25 @@ pub trait DatabaseReadOperations { batch_index: u64, ) -> Result, DatabaseError>; + /// Get a [`BatchCommitData`] from the database by its batch hash. + async fn get_batch_by_hash( + &self, + batch_hash: B256, + ) -> Result, DatabaseError>; + + /// Get the status of a batch by its hash. + #[cfg(test)] + async fn get_batch_status_by_hash( + &self, + batch_hash: B256, + ) -> Result, DatabaseError>; + + /// Get all L1 block infos from the database ordered by block number ascending. + async fn get_l1_block_info(&self) -> Result, DatabaseError>; + + /// Get the latest indexed event L1 block number from the database. + async fn get_latest_indexed_event_l1_block_number(&self) -> Result, DatabaseError>; + /// Get the latest L1 block number from the database. async fn get_latest_l1_block_number(&self) -> Result; @@ -693,9 +962,7 @@ pub trait DatabaseReadOperations { /// Get the latest safe/finalized L2 ([`BlockInfo`], [`BatchInfo`]) from the database. Until we /// update the batch handling logic with issue #273, we don't differentiate between safe and /// finalized l2 blocks. - async fn get_latest_safe_l2_info( - &self, - ) -> Result, DatabaseError>; + async fn get_latest_safe_l2_info(&self) -> Result<(BlockInfo, BatchInfo), DatabaseError>; /// Returns the highest L2 block originating from the provided `batch_hash` or the highest block /// for the batch's index. @@ -722,12 +989,90 @@ impl DatabaseReadOperations for T { &self, batch_index: u64, ) -> Result, DatabaseError> { - Ok(models::batch_commit::Entity::find_by_id( - TryInto::::try_into(batch_index).expect("index should fit in i64"), - ) - .one(self.get_connection()) - .await - .map(|x| x.map(Into::into))?) + Ok(models::batch_commit::Entity::find() + .filter( + models::batch_commit::Column::Index + .eq(TryInto::::try_into(batch_index).expect("index should fit in i64")) + .and(models::batch_commit::Column::RevertedBlockNumber.is_null()), + ) + .one(self.get_connection()) + .await + .map(|x| x.map(Into::into))?) + } + + async fn get_batch_by_hash( + &self, + batch_hash: B256, + ) -> Result, DatabaseError> { + Ok(models::batch_commit::Entity::find() + .filter(models::batch_commit::Column::Hash.eq(batch_hash.to_vec())) + .one(self.get_connection()) + .await + .map(|x| x.map(Into::into))?) + } + + #[cfg(test)] + async fn get_batch_status_by_hash( + &self, + batch_hash: B256, + ) -> Result, DatabaseError> { + use std::str::FromStr; + + Ok(models::batch_commit::Entity::find() + .filter(models::batch_commit::Column::Hash.eq(batch_hash.to_vec())) + .select_only() + .column(models::batch_commit::Column::Status) + .into_tuple::() + .one(self.get_connection()) + .await? + .map(|status_str| { + rollup_node_primitives::BatchStatus::from_str(&status_str) + .expect("Invalid batch status in database") + })) + } + + async fn get_l1_block_info(&self) -> Result, DatabaseError> { + let l1_blocks = models::l1_block::Entity::find() + .order_by_asc(models::l1_block::Column::BlockNumber) + .all(self.get_connection()) + .await?; + + Ok(l1_blocks.into_iter().map(Into::into).collect()) + } + + async fn get_latest_indexed_event_l1_block_number(&self) -> Result, DatabaseError> { + let latest_l1_message = models::l1_message::Entity::find() + .select_only() + .column_as(models::l1_message::Column::L1BlockNumber.max(), "max_l1_block_number") + .into_tuple::>() + .one(self.get_connection()) + .await? + .flatten(); + + let latest_batch_event = models::batch_commit::Entity::find() + .select_only() + .filter(models::batch_commit::Column::Index.gt(0)) + .column_as( + Expr::col(models::batch_commit::Column::BlockNumber).max(), + "max_block_number", + ) + .column_as( + Expr::col(models::batch_commit::Column::FinalizedBlockNumber).max(), + "max_finalized_block_number", + ) + .column_as( + Expr::col(models::batch_commit::Column::RevertedBlockNumber).max(), + "max_reverted_block_number", + ) + .into_tuple::<(Option, Option, Option)>() + .one(self.get_connection()) + .await? + .and_then(|tuple| <[Option; 3]>::from(tuple).into_iter().flatten().max()); + + let latest_l1_block_number = + [latest_l1_message, latest_batch_event].into_iter().flatten().max(); + + Ok(latest_l1_block_number.map(|n| n as u64)) } async fn get_latest_l1_block_number(&self) -> Result { @@ -1006,16 +1351,20 @@ impl DatabaseReadOperations for T { })?) } - async fn get_latest_safe_l2_info( - &self, - ) -> Result, DatabaseError> { + async fn get_latest_safe_l2_info(&self) -> Result<(BlockInfo, BatchInfo), DatabaseError> { tracing::trace!(target: "scroll::db", "Fetching latest safe L2 block from database."); - Ok(models::l2_block::Entity::find() - .filter(models::l2_block::Column::BatchIndex.is_not_null()) + let filter = Condition::all() + .add(models::l2_block::Column::BatchIndex.is_not_null()) + .add(models::l2_block::Column::Reverted.eq(false)); + + let safe_block = models::l2_block::Entity::find() + .filter(filter) .order_by_desc(models::l2_block::Column::BlockNumber) .one(self.get_connection()) - .await - .map(|x| x.map(|x| (x.block_info(), x.batch_info())))?) + .await? + .expect("there should always be at least the genesis block in the database"); + + Ok((safe_block.block_info(), safe_block.batch_info())) } async fn get_highest_block_for_batch_hash( diff --git a/crates/database/migration/src/lib.rs b/crates/database/migration/src/lib.rs index c1e1ad4a..dedbeda4 100644 --- a/crates/database/migration/src/lib.rs +++ b/crates/database/migration/src/lib.rs @@ -8,16 +8,16 @@ mod m20250411_072004_add_l2_block; mod m20250616_223947_add_metadata; mod m20250825_093350_remove_unsafe_l2_blocks; mod m20250829_042803_add_table_indexes; -mod m20250901_102341_add_commit_batch_processed_column; mod m20250904_175949_block_signature; mod m20250923_135359_add_index_block_hash; mod m20250929_161536_add_additional_indexes; -mod m20251001_125444_add_index_processed; +mod m20251001_125444_add_index_status; mod m20251005_160938_add_initial_l1_block_numbers; mod m20251013_140946_add_initial_l1_processed_block_number; mod m20251021_070729_add_skipped_column; mod m20251021_144852_add_queue_index_index; mod m20251027_090416_add_table_statistics; +mod m20251028_110719_add_l1_block_table; mod migration_info; pub use migration_info::{ @@ -38,16 +38,16 @@ impl MigratorTrait for Migrator { Box::new(m20250616_223947_add_metadata::Migration), Box::new(m20250825_093350_remove_unsafe_l2_blocks::Migration), Box::new(m20250829_042803_add_table_indexes::Migration), - Box::new(m20250901_102341_add_commit_batch_processed_column::Migration), Box::new(m20250904_175949_block_signature::Migration), Box::new(m20250923_135359_add_index_block_hash::Migration), Box::new(m20250929_161536_add_additional_indexes::Migration), - Box::new(m20251001_125444_add_index_processed::Migration), + Box::new(m20251001_125444_add_index_status::Migration), Box::new(m20251005_160938_add_initial_l1_block_numbers::Migration), Box::new(m20251013_140946_add_initial_l1_processed_block_number::Migration), Box::new(m20251021_070729_add_skipped_column::Migration), Box::new(m20251021_144852_add_queue_index_index::Migration), Box::new(m20251027_090416_add_table_statistics::Migration), + Box::new(m20251028_110719_add_l1_block_table::Migration), ] } } diff --git a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs index 0f531461..5f34ee59 100644 --- a/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs +++ b/crates/database/migration/src/m20220101_000001_create_batch_commit_table.rs @@ -15,13 +15,15 @@ impl MigrationTrait for Migration { Table::create() .table(BatchCommit::Table) .if_not_exists() - .col(pk_auto(BatchCommit::Index)) - .col(binary_len(BatchCommit::Hash, HASH_LENGTH).unique_key()) + .col(big_unsigned(BatchCommit::Index)) + .col(binary_len(BatchCommit::Hash, HASH_LENGTH).primary_key()) .col(big_unsigned(BatchCommit::BlockNumber)) .col(big_unsigned(BatchCommit::BlockTimestamp)) .col(binary(BatchCommit::Calldata)) .col(binary_len_null(BatchCommit::BlobHash, HASH_LENGTH)) .col(big_unsigned_null(BatchCommit::FinalizedBlockNumber)) + .col(big_unsigned_null(BatchCommit::RevertedBlockNumber)) + .col(string(BatchCommit::Status).not_null()) .to_owned(), ) .await?; @@ -31,8 +33,8 @@ impl MigrationTrait for Migration { .execute(Statement::from_sql_and_values( manager.get_database_backend(), r#" - INSERT INTO batch_commit ("index", hash, block_number, block_timestamp, calldata, blob_hash, finalized_block_number) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO batch_commit ("index", hash, block_number, block_timestamp, calldata, blob_hash, finalized_block_number, reverted_block_number, status) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) "#, vec![ 0u64.into(), @@ -42,6 +44,8 @@ impl MigrationTrait for Migration { vec![].into(), None::>.into(), 0u64.into(), + None::.into(), + "finalized".into() ], )) .await?; @@ -64,5 +68,6 @@ pub(crate) enum BatchCommit { Calldata, BlobHash, FinalizedBlockNumber, - Processed, + RevertedBlockNumber, + Status, } diff --git a/crates/database/migration/src/m20250411_072004_add_l2_block.rs b/crates/database/migration/src/m20250411_072004_add_l2_block.rs index 1950399d..01d51d08 100644 --- a/crates/database/migration/src/m20250411_072004_add_l2_block.rs +++ b/crates/database/migration/src/m20250411_072004_add_l2_block.rs @@ -19,18 +19,11 @@ impl MigrationTrait for Migration { Table::create() .table(L2Block::Table) .if_not_exists() - .col(pk_auto(L2Block::BlockNumber)) - .col(binary_len(L2Block::BlockHash, 32)) - .col(big_unsigned(L2Block::BatchIndex)) - .col(binary_len(L2Block::BatchHash, 32)) - .foreign_key( - ForeignKey::create() - .name("fk_batch_index") - .from(L2Block::Table, L2Block::BatchIndex) - .to(BatchCommit::Table, BatchCommit::Index) - .on_delete(ForeignKeyAction::Cascade) - .on_update(ForeignKeyAction::Cascade), - ) + .col(big_unsigned(L2Block::BlockNumber).not_null()) + .col(binary_len(L2Block::BlockHash, 32).not_null().primary_key()) + .col(big_unsigned(L2Block::BatchIndex).not_null()) + .col(binary_len(L2Block::BatchHash, 32).not_null()) + .col(boolean(L2Block::Reverted).not_null().default(false)) .foreign_key( ForeignKey::create() .name("fk_batch_hash") @@ -73,4 +66,5 @@ pub(crate) enum L2Block { BatchHash, BlockNumber, BlockHash, + Reverted, } diff --git a/crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs b/crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs deleted file mode 100644 index 43e72107..00000000 --- a/crates/database/migration/src/m20250901_102341_add_commit_batch_processed_column.rs +++ /dev/null @@ -1,51 +0,0 @@ -use super::m20220101_000001_create_batch_commit_table::BatchCommit; -use sea_orm::Statement; -use sea_orm_migration::prelude::*; - -#[derive(DeriveMigrationName)] -pub struct Migration; - -#[async_trait::async_trait] -impl MigrationTrait for Migration { - async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { - // Add the processed column to the batch_commit table. - manager - .alter_table( - Table::alter() - .table(BatchCommit::Table) - .add_column( - ColumnDef::new(BatchCommit::Processed).boolean().not_null().default(false), - ) - .to_owned(), - ) - .await?; - - // Backfill the processed column using data sourced from the l2_block table. - manager - .get_connection() - .execute(Statement::from_sql_and_values( - manager.get_database_backend(), - r#" - UPDATE batch_commit - SET processed = 1 - WHERE hash IN (SELECT batch_hash FROM l2_block); - "#, - vec![], - )) - .await?; - - Ok(()) - } - - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { - // drop the processed column on the batch_commit table. - manager - .alter_table( - Table::alter() - .table(BatchCommit::Table) - .drop_column(BatchCommit::Processed) - .to_owned(), - ) - .await - } -} diff --git a/crates/database/migration/src/m20251001_125444_add_index_processed.rs b/crates/database/migration/src/m20251001_125444_add_index_status.rs similarity index 74% rename from crates/database/migration/src/m20251001_125444_add_index_processed.rs rename to crates/database/migration/src/m20251001_125444_add_index_status.rs index 5a3132ae..2f40e997 100644 --- a/crates/database/migration/src/m20251001_125444_add_index_processed.rs +++ b/crates/database/migration/src/m20251001_125444_add_index_status.rs @@ -12,8 +12,8 @@ impl MigrationTrait for Migration { manager .create_index( Index::create() - .name("idx_batch_commit_processed") - .col(BatchCommit::Processed) + .name("idx_batch_commit_status") + .col(BatchCommit::Status) .table(BatchCommit::Table) .to_owned(), ) @@ -26,10 +26,7 @@ impl MigrationTrait for Migration { // Drop index `processed` for the `batch_commit` table. manager .drop_index( - Index::drop() - .name("idx_batch_commit_processed") - .table(BatchCommit::Table) - .to_owned(), + Index::drop().name("idx_batch_commit_status").table(BatchCommit::Table).to_owned(), ) .await?; diff --git a/crates/database/migration/src/m20251028_110719_add_l1_block_table.rs b/crates/database/migration/src/m20251028_110719_add_l1_block_table.rs new file mode 100644 index 00000000..8649cfa3 --- /dev/null +++ b/crates/database/migration/src/m20251028_110719_add_l1_block_table.rs @@ -0,0 +1,53 @@ +use sea_orm_migration::{prelude::*, schema::*}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Create the l1_block table + manager + .create_table( + Table::create() + .table(L1Block::Table) + .if_not_exists() + .col(big_unsigned(L1Block::BlockNumber).not_null().primary_key()) + .col(binary_len(L1Block::BlockHash, 32).not_null().unique_key()) + .to_owned(), + ) + .await?; + + // Add explicit indexes for fast lookups + manager + .create_index( + Index::create() + .name("idx-l1_block-number") + .table(L1Block::Table) + .col(L1Block::BlockNumber) + .to_owned(), + ) + .await?; + + manager + .create_index( + Index::create() + .name("idx-l1_block-hash") + .table(L1Block::Table) + .col(L1Block::BlockHash) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager.drop_table(Table::drop().table(L1Block::Table).to_owned()).await + } +} + +#[derive(DeriveIden)] +enum L1Block { + Table, + BlockNumber, + BlockHash, +} diff --git a/crates/derivation-pipeline/benches/pipeline.rs b/crates/derivation-pipeline/benches/pipeline.rs index 4b510f25..048af0d5 100644 --- a/crates/derivation-pipeline/benches/pipeline.rs +++ b/crates/derivation-pipeline/benches/pipeline.rs @@ -5,7 +5,7 @@ use alloy_primitives::{Bytes, B256}; use criterion::{criterion_group, criterion_main, Criterion}; use futures::StreamExt; -use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope}; +use rollup_node_primitives::{BatchCommitData, BatchInfo, BatchStatus, L1MessageEnvelope}; use rollup_node_providers::{ test_utils::MockL1Provider, FullL1Provider, L1Provider, S3BlobProvider, }; @@ -87,6 +87,7 @@ async fn setup_pipeline( calldata: Arc::new(raw_calldata.into()), blob_versioned_hash: Some(hash), finalized_block_number: None, + reverted_block_number: None, }; db.insert_batch(batch_data).await.unwrap(); } @@ -126,7 +127,7 @@ fn benchmark_pipeline_derivation_in_file_blobs(c: &mut Criterion) { // commit 253 batches. for index in BATCHES_START_INDEX..=BATCHES_STOP_INDEX { let batch_info = BatchInfo { index, hash: Default::default() }; - pipeline.push_batch(batch_info.into()).await; + pipeline.push_batch(batch_info, BatchStatus::Committed).await; } tx.send(pipeline).unwrap(); @@ -162,7 +163,7 @@ fn benchmark_pipeline_derivation_s3_blobs(c: &mut Criterion) { // commit 15 batches. for index in BATCHES_START_INDEX..=BATCHES_START_INDEX + 15 { let batch_info = BatchInfo { index, hash: Default::default() }; - pipeline.push_batch(batch_info.into()).await; + pipeline.push_batch(batch_info, BatchStatus::Committed).await; } tx.send(pipeline).unwrap(); diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index e34f00a2..2c89a85b 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -7,7 +7,7 @@ use alloy_primitives::{Address, B256}; use alloy_rpc_types_engine::PayloadAttributes; use core::{fmt::Debug, future::Future, pin::Pin, task::Poll}; use futures::{stream::FuturesOrdered, Stream, StreamExt}; -use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope}; +use rollup_node_primitives::{BatchCommitData, BatchInfo, BatchStatus, L1MessageEnvelope}; use rollup_node_providers::L1Provider; use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes}; use scroll_codec::{decoding::payload::PayloadData, Codec}; @@ -34,7 +34,7 @@ use std::{boxed::Box, sync::Arc, time::Instant, vec::Vec}; #[derive(Debug)] pub struct DerivationPipeline { /// The sender for the pipeline used to push new batches to be processed. - batch_sender: UnboundedSender>, + batch_sender: UnboundedSender>, /// The receiver for the pipeline used to receive the results of the batch processing. result_receiver: UnboundedReceiver, /// The number of active batches being processed. @@ -58,9 +58,9 @@ impl DerivationPipeline { } /// Pushes a new batch info to the derivation pipeline. - pub async fn push_batch(&mut self, batch_info: Arc) { + pub async fn push_batch(&mut self, batch_info: BatchInfo, target_status: BatchStatus) { self.batch_sender - .send(batch_info) + .send(Arc::new(BatchDerivationRequest { batch_info, target_status })) .expect("Failed to send batch info to derivation pipeline"); self.len += 1; } @@ -101,7 +101,7 @@ const DERIVATION_PIPELINE_WORKER_CONCURRENCY: usize = 3; #[derive(Debug)] pub struct DerivationPipelineWorker

{ /// The receiver for the pipeline used to receive new batches to be derived. - batch_receiver: UnboundedReceiver>, + batch_receiver: UnboundedReceiver>, /// The sender for the pipeline used to send the results of the batch derivation. result_sender: UnboundedSender, /// The active batch derivation futures. @@ -122,7 +122,7 @@ impl

DerivationPipelineWorker

{ l1_provider: P, database: Arc, l1_v2_message_queue_start_index: u64, - batch_receiver: UnboundedReceiver>, + batch_receiver: UnboundedReceiver>, result_sender: UnboundedSender, ) -> Self { Self { @@ -146,7 +146,8 @@ where l1_provider: P, database: Arc, l1_v2_message_queue_start_index: u64, - ) -> (UnboundedSender>, UnboundedReceiver) { + ) -> (UnboundedSender>, UnboundedReceiver) + { let (batch_sender, batch_receiver) = tokio::sync::mpsc::unbounded_channel(); let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -175,8 +176,8 @@ where maybe_batch = self.batch_receiver.recv(), if self.futures.len() < DERIVATION_PIPELINE_WORKER_CONCURRENCY => { match maybe_batch { - Some(batch_info) => { - let fut = self.derivation_future(batch_info); + Some(request) => { + let fut = self.derivation_future(request); self.futures.push_back(fut); } None => { @@ -204,7 +205,7 @@ where } } - fn derivation_future(&self, batch_info: Arc) -> DerivationPipelineFuture { + fn derivation_future(&self, request: Arc) -> DerivationPipelineFuture { let db = self.database.clone(); let metrics = self.metrics.clone(); let provider = self.l1_provider.clone(); @@ -212,21 +213,24 @@ where Box::pin(async move { let derive_start = Instant::now(); + let batch_info = request.batch_info; + let target_status = request.target_status; // get the batch commit data. let batch = db .get_batch_by_index(batch_info.index) .await - .map_err(|err| (batch_info.clone(), err.into()))? + .map_err(|err| (request.clone(), err.into()))? .ok_or(( - batch_info.clone(), + request.clone(), DerivationPipelineError::UnknownBatch(batch_info.index), ))?; // derive the attributes and attach the corresponding batch info. - let result = derive(batch, provider, db, l1_v2_message_queue_start_index) - .await - .map_err(|err| (batch_info.clone(), err))?; + let result = + derive(batch, target_status, provider, db, l1_v2_message_queue_start_index) + .await + .map_err(|err| (request.clone(), err))?; // update metrics. metrics.derived_blocks.increment(result.attributes.len() as u64); @@ -237,6 +241,15 @@ where } } +/// The request to derive a batch. +#[derive(Debug)] +pub struct BatchDerivationRequest { + /// The batch info to derive. + pub batch_info: BatchInfo, + /// The target status of the batch after derivation. + pub target_status: BatchStatus, +} + /// The result of deriving a batch. #[derive(Debug)] pub struct BatchDerivationResult { @@ -246,6 +259,8 @@ pub struct BatchDerivationResult { pub batch_info: BatchInfo, /// The list of skipped L1 messages indexes. pub skipped_l1_messages: Vec, + /// The target status of the batch after derivation. + pub target_status: BatchStatus, } /// The derived attributes along with the block number they correspond to. @@ -261,7 +276,10 @@ pub struct DerivedAttributes { type DerivationPipelineFuture = Pin< Box< dyn Future< - Output = Result, DerivationPipelineError)>, + Output = Result< + BatchDerivationResult, + (Arc, DerivationPipelineError), + >, > + Send, >, >; @@ -270,6 +288,7 @@ type DerivationPipelineFuture = Pin< /// attributes for each L2 block in the batch. pub async fn derive( batch: BatchCommitData, + target_status: BatchStatus, l1_provider: L1P, db: DB, l1_v2_message_queue_start_index: u64, @@ -354,6 +373,7 @@ pub async fn derive( attributes, batch_info: BatchInfo { index: batch.index, hash: batch.hash }, skipped_l1_messages, + target_status, }) } @@ -481,6 +501,7 @@ mod tests { calldata: Arc::new(raw_calldata), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; db.insert_batch(batch_data).await?; @@ -492,7 +513,12 @@ mod tests { let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX).await; // as long as we don't call `push_batch`, pipeline should not return attributes. - pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }.into()).await; + pipeline + .push_batch( + BatchInfo { index: 12, hash: Default::default() }, + BatchStatus::Consolidated, + ) + .await; // wait for 5 seconds to ensure the pipeline is in a retry loop. tokio::select! { @@ -547,6 +573,7 @@ mod tests { calldata: Arc::new(raw_calldata), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; db.insert_batch(batch_data).await?; // load messages in db. @@ -560,7 +587,9 @@ mod tests { let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX).await; // as long as we don't call `push_batch`, pipeline should not return attributes. - pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }.into()).await; + pipeline + .push_batch(BatchInfo { index: 12, hash: Default::default() }, BatchStatus::Committed) + .await; // check the correctness of the last attribute. let mut attribute = ScrollPayloadAttributes::default(); @@ -601,6 +630,7 @@ mod tests { calldata: Arc::new(raw_calldata), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; let l1_messages = vec![L1_MESSAGE_INDEX_33, L1_MESSAGE_INDEX_34]; for message in l1_messages { @@ -609,7 +639,7 @@ mod tests { let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() }; - let result = derive(batch_data, l1_provider, db, u64::MAX).await?; + let result = derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?; let attribute = result .attributes .iter() @@ -658,6 +688,7 @@ mod tests { calldata: Arc::new(raw_calldata), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; // prepare the l1 messages. @@ -709,7 +740,8 @@ mod tests { let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() }; // derive attributes and extract l1 messages. - let attributes = derive(batch_data, l1_provider, db, u64::MAX).await?; + let attributes = + derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?; let derived_l1_messages: Vec<_> = attributes .attributes .into_iter() @@ -743,6 +775,7 @@ mod tests { calldata: Arc::new(raw_calldata), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; // prepare the l1 messages. @@ -762,7 +795,8 @@ mod tests { let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() }; // derive attributes and extract l1 messages. - let attributes = derive(batch_data, l1_provider, db, u64::MAX).await?; + let attributes = + derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?; let derived_l1_messages: Vec<_> = attributes .attributes .into_iter() @@ -808,6 +842,7 @@ mod tests { "013b3960a40175bd6436e8dfe07e6d80c125e12997fa1de004b1990e20dba1ee" )), finalized_block_number: None, + reverted_block_number: None, }; let l1_messages = vec![ L1MessageEnvelope { @@ -875,7 +910,7 @@ mod tests { )]), }; - let attributes = derive(batch_data, l1_provider, db, u64::MAX).await?; + let attributes = derive(batch_data, BatchStatus::Committed, l1_provider, db, u64::MAX).await?; let attribute = attributes.attributes.last().unwrap(); let expected = ScrollPayloadAttributes { diff --git a/crates/l1/src/abi/logs.rs b/crates/l1/src/abi/logs.rs index c6a85b05..429442c5 100644 --- a/crates/l1/src/abi/logs.rs +++ b/crates/l1/src/abi/logs.rs @@ -20,6 +20,14 @@ sol! { #[cfg_attr(feature = "test-utils", derive(arbitrary::Arbitrary))] #[derive(Debug)] event FinalizeBatch(uint256 indexed batch_index, bytes32 indexed batch_hash, bytes32 state_root, bytes32 withdraw_root); + + #[cfg_attr(feature = "test-utils", derive(arbitrary::Arbitrary))] + #[derive(Debug)] + event RevertBatch(uint256 indexed batchIndex, bytes32 indexed batchHash); + + #[cfg_attr(feature = "test-utils", derive(arbitrary::Arbitrary))] + #[derive(Debug)] + event RevertBatch(uint256 indexed startBatchIndex, uint256 indexed finishBatchIndex); } /// Tries to decode the provided log into the type T. diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index ea81ac2a..7fca258b 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -258,18 +258,16 @@ impl ScrollRollupNodeConfig { let mut fcs = ForkchoiceState::from_provider(&l2_provider).await.unwrap_or_else(chain_spec_fcs); - let genesis_hash = chain_spec.genesis_hash(); - let (l1_start_block_number, mut l2_head_block_number) = db + let (l1_block_startup_info, mut l2_head_block_number) = db .tx_mut(move |tx| async move { // On startup we replay the latest batch of blocks from the database as such we set // the safe block hash to the latest block hash associated with the // previous consolidated batch in the database. - let (_startup_safe_block, l1_start_block_number) = - tx.prepare_on_startup(genesis_hash).await?; + let l1_block_startup_info = tx.prepare_l1_watcher_start_info().await?; let l2_head_block_number = tx.get_l2_head_block_number().await?; - Ok::<_, DatabaseError>((l1_start_block_number, l2_head_block_number)) + Ok::<_, DatabaseError>((l1_block_startup_info, l2_head_block_number)) }) .await?; @@ -344,13 +342,13 @@ impl ScrollRollupNodeConfig { let (l1_notification_tx, l1_notification_rx): (Option>>, _) = if let Some(provider) = l1_provider.filter(|_| !self.test) { - tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher"); + tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher"); ( None, Some( L1Watcher::spawn( provider, - l1_start_block_number, + l1_block_startup_info, node_config, self.l1_provider_args.logs_query_block_range, ) diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 6f6e7f1b..251cf813 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -1,7 +1,7 @@ //! End-to-end tests for the rollup node. use alloy_eips::{eip2718::Encodable2718, BlockNumberOrTag}; -use alloy_primitives::{address, b256, Address, Bytes, Signature, B256, U256}; +use alloy_primitives::{address, b256, hex::FromHex, Address, Bytes, Signature, B256, U256}; use alloy_rpc_types_eth::Block; use alloy_signer::Signer; use alloy_signer_local::PrivateKeySigner; @@ -32,7 +32,9 @@ use rollup_node::{ ScrollRollupNode, ScrollRollupNodeConfig, SequencerArgs, }; use rollup_node_chain_orchestrator::ChainOrchestratorEvent; -use rollup_node_primitives::{sig_encode_hash, BatchCommitData, BlockInfo, ConsensusUpdate}; +use rollup_node_primitives::{ + sig_encode_hash, BatchCommitData, BatchInfo, BlockInfo, ConsensusUpdate, +}; use rollup_node_sequencer::L1MessageInclusionMode; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; @@ -91,6 +93,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> { // Send a notification to set the L1 to synced l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?; + let block_info = BlockInfo { number: 0, hash: B256::random() }; let l1_message = TxL1Message { queue_index: 0, gas_limit: 21000, @@ -102,7 +105,7 @@ async fn can_bridge_l1_messages() -> eyre::Result<()> { l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: l1_message.clone(), - block_number: 0, + block_info, block_timestamp: 1000, })) .await?; @@ -871,6 +874,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() l1_notification_tx.unwrap(); // Load test batches + let block_0_info = BlockInfo { number: 18318207, hash: B256::random() }; let raw_calldata_0 = read_to_bytes("./tests/testdata/batch_0_calldata.bin")?; let batch_0_data = BatchCommitData { hash: b256!("5AAEB6101A47FC16866E80D77FFE090B6A7B3CF7D988BE981646AB6AEDFA2C42"), @@ -880,7 +884,9 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() calldata: Arc::new(raw_calldata_0), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; + let block_1_info = BlockInfo { number: 18318215, hash: B256::random() }; let raw_calldata_1 = read_to_bytes("./tests/testdata/batch_1_calldata.bin")?; let batch_1_data = BatchCommitData { hash: b256!("AA8181F04F8E305328A6117FA6BC13FA2093A3C4C990C5281DF95A1CB85CA18F"), @@ -890,24 +896,26 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() calldata: Arc::new(raw_calldata_1), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; - println!("Sending first batch commit and finalization"); - // Send the first batch commit to the rollup node manager and finalize it. - l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_0_data.clone()))).await?; + l1_notification_tx + .send(Arc::new(L1Notification::BatchCommit { + block_info: block_0_info, + data: batch_0_data.clone(), + })) + .await?; l1_notification_tx .send(Arc::new(L1Notification::BatchFinalization { hash: batch_0_data.hash, index: batch_0_data.index, - block_number: batch_0_data.block_number, + block_info: block_0_info, })) .await?; // Lets finalize the first batch - l1_notification_tx.send(Arc::new(L1Notification::Finalized(batch_0_data.block_number))).await?; - - println!("First batch finalized, iterating until first batch is consolidated"); + l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_0_info))).await?; // Lets iterate over all blocks expected to be derived from the first batch commit. let consolidation_outcome = loop { @@ -919,22 +927,23 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() }; assert_eq!(consolidation_outcome.blocks.len(), 4, "Expected 4 blocks to be consolidated"); - println!("First batch consolidated, sending second batch commit and finalization"); - // Now we send the second batch commit and finalize it. - l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_1_data.clone()))).await?; + l1_notification_tx + .send(Arc::new(L1Notification::BatchCommit { + block_info: block_1_info, + data: batch_1_data.clone(), + })) + .await?; l1_notification_tx .send(Arc::new(L1Notification::BatchFinalization { hash: batch_1_data.hash, index: batch_1_data.index, - block_number: batch_1_data.block_number, + block_info: block_1_info, })) .await?; // Lets finalize the second batch. - l1_notification_tx.send(Arc::new(L1Notification::Finalized(batch_1_data.block_number))).await?; - - println!("Second batch finalized, iterating until block 40 is consolidated"); + l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_1_info))).await?; // The second batch commit contains 42 blocks (5-57), lets iterate until the rnm has // consolidated up to block 40. @@ -954,8 +963,6 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() i += 1; }; - println!("Block 40 consolidated, checking safe and head block hashes"); - // Fetch the safe and head block hashes from the EN. let rpc = node.rpc.inner.eth_api(); let safe_block_hash = @@ -1013,17 +1020,11 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() // Request an event stream from the rollup node manager. let mut rnm_events = handle.get_event_listener().await?; + println!("im here"); + // Send the second batch again to mimic the watcher behaviour. - l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_1_data.clone()))).await?; - l1_notification_tx - .send(Arc::new(L1Notification::BatchFinalization { - hash: batch_1_data.hash, - index: batch_1_data.index, - block_number: batch_1_data.block_number, - })) - .await?; - // Lets finalize the second batch. - l1_notification_tx.send(Arc::new(L1Notification::Finalized(batch_1_data.block_number))).await?; + let block_1_info = BlockInfo { number: 18318215, hash: B256::random() }; + l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_1_info))).await?; // Lets fetch the first consolidated block event - this should be the first block of the batch. let l2_block = loop { @@ -1207,8 +1208,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - } #[tokio::test] -#[ignore = "Enable once we implement issue #273"] -async fn can_handle_batch_revert() -> eyre::Result<()> { +async fn consolidates_committed_batches_after_chain_consolidation() -> eyre::Result<()> { reth_tracing::init_test_tracing(); let chain_spec = (*SCROLL_MAINNET).clone(); @@ -1225,6 +1225,7 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { let mut rnm_events = handle.get_event_listener().await?; // Load test batches + let batch_0_block_info = BlockInfo { number: 18318207, hash: B256::random() }; let raw_calldata_0 = read_to_bytes("./tests/testdata/batch_0_calldata.bin")?; let batch_0_data = BatchCommitData { hash: b256!("5AAEB6101A47FC16866E80D77FFE090B6A7B3CF7D988BE981646AB6AEDFA2C42"), @@ -1234,7 +1235,11 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { calldata: Arc::new(raw_calldata_0), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, }; + let batch_0_info = BatchInfo { index: batch_0_data.index, hash: batch_0_data.hash }; + let batch_0_finalization_block_info = BlockInfo { number: 18318210, hash: B256::random() }; + let batch_1_block_info = BlockInfo { number: 18318215, hash: B256::random() }; let raw_calldata_1 = read_to_bytes("./tests/testdata/batch_1_calldata.bin")?; let batch_1_data = BatchCommitData { hash: b256!("AA8181F04F8E305328A6117FA6BC13FA2093A3C4C990C5281DF95A1CB85CA18F"), @@ -1244,19 +1249,142 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { calldata: Arc::new(raw_calldata_1), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, + }; + let batch_1_info = BatchInfo { index: batch_1_data.index, hash: batch_1_data.hash }; + let batch_1_finalization_block_info = BlockInfo { number: 18318220, hash: B256::random() }; + + // Send the first batch. + l1_watcher_tx + .send(Arc::new(L1Notification::BatchCommit { + block_info: batch_0_block_info, + data: batch_0_data, + })) + .await?; + + // Send a batch finalization for the first batch. + l1_watcher_tx + .send(Arc::new(L1Notification::BatchFinalization { + hash: batch_0_info.hash, + index: batch_0_info.index, + block_info: batch_0_finalization_block_info, + })) + .await?; + // Send the L1 block finalized notification. + l1_watcher_tx + .send(Arc::new(L1Notification::Finalized(batch_0_finalization_block_info))) + .await?; + + wait_for_event_predicate_5s(&mut rnm_events, |event| { + matches!(event, ChainOrchestratorEvent::BatchConsolidated(_)) + }) + .await?; + + // Send the second batch. + l1_watcher_tx + .send(Arc::new(L1Notification::BatchCommit { + block_info: batch_1_block_info, + data: batch_1_data, + })) + .await?; + + // send the Synced notification to the chain orchestrator + l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?; + + wait_for_event_predicate_5s(&mut rnm_events, |event| { + matches!(event, ChainOrchestratorEvent::BatchConsolidated(_)) + }) + .await?; + + let status = handle.status().await?; + + assert_eq!(status.l2.fcs.safe_block_info().number, 57); + assert_eq!(status.l2.fcs.finalized_block_info().number, 4); + + // Now send the batch finalization event for the second batch and finalize the L1 block. + l1_watcher_tx + .send(Arc::new(L1Notification::BatchFinalization { + hash: batch_1_info.hash, + index: batch_1_info.index, + block_info: batch_1_finalization_block_info, + })) + .await?; + l1_watcher_tx + .send(Arc::new(L1Notification::Finalized(batch_1_finalization_block_info))) + .await?; + + wait_for_event_predicate_5s(&mut rnm_events, |event| { + matches!(event, ChainOrchestratorEvent::L1BlockFinalized(_, _)) + }) + .await?; + + let status = handle.status().await?; + + assert_eq!(status.l2.fcs.safe_block_info().number, 57); + assert_eq!(status.l2.fcs.finalized_block_info().number, 57); + + Ok(()) +} + +#[tokio::test] +async fn can_handle_batch_revert_with_reorg() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let chain_spec = (*SCROLL_MAINNET).clone(); + + // Launch a node + let (mut nodes, _tasks, _) = + setup_engine(default_test_scroll_rollup_node_config(), 1, chain_spec.clone(), false, false) + .await?; + let node = nodes.pop().unwrap(); + let handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); + let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + + // Request an event stream from the rollup node manager and manually poll rnm to process the + // event stream request from the handle. + let mut rnm_events = handle.get_event_listener().await?; + + // send a Synced notification to the chain orchestrator + l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + + // Load test batches + let batch_0_block_info = BlockInfo { number: 18318207, hash: B256::random() }; + let raw_calldata_0 = read_to_bytes("./tests/testdata/batch_0_calldata.bin")?; + let batch_0_data = BatchCommitData { + hash: b256!("5AAEB6101A47FC16866E80D77FFE090B6A7B3CF7D988BE981646AB6AEDFA2C42"), + index: 1, + block_number: 18318207, + block_timestamp: 1696935971, + calldata: Arc::new(raw_calldata_0), + blob_versioned_hash: None, + finalized_block_number: None, + reverted_block_number: None, }; - let revert_batch_data = BatchCommitData { - hash: B256::random(), + let batch_0_info = BatchInfo { index: batch_0_data.index, hash: batch_0_data.hash }; + let batch_1_block_info = BlockInfo { number: 18318215, hash: B256::random() }; + let raw_calldata_1 = read_to_bytes("./tests/testdata/batch_1_calldata.bin")?; + let batch_1_data = BatchCommitData { + hash: b256!("AA8181F04F8E305328A6117FA6BC13FA2093A3C4C990C5281DF95A1CB85CA18F"), index: 2, - block_number: 18318220, - block_timestamp: 1696936500, - calldata: Arc::new(Default::default()), + block_number: 18318215, + block_timestamp: 1696936000, + calldata: Arc::new(raw_calldata_1), blob_versioned_hash: None, finalized_block_number: None, + reverted_block_number: None, + }; + let batch_1_revert_block_info = BlockInfo { number: 18318216, hash: B256::random() }; + let batch_1_revert = L1Notification::BatchRevert { + batch_info: BatchInfo { index: batch_1_data.index, hash: batch_1_data.hash }, + block_info: batch_1_revert_block_info, }; // Send the first batch. - l1_watcher_tx.send(Arc::new(L1Notification::BatchCommit(batch_0_data))).await?; + l1_watcher_tx + .send(Arc::new(L1Notification::BatchCommit { + block_info: batch_0_block_info, + data: batch_0_data, + })) + .await?; // Read the first 4 blocks. loop { @@ -1270,7 +1398,12 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { } // Send the second batch. - l1_watcher_tx.send(Arc::new(L1Notification::BatchCommit(batch_1_data))).await?; + l1_watcher_tx + .send(Arc::new(L1Notification::BatchCommit { + block_info: batch_1_block_info, + data: batch_1_data, + })) + .await?; // Read the next 42 blocks. loop { @@ -1289,18 +1422,55 @@ async fn can_handle_batch_revert() -> eyre::Result<()> { assert!(status.l2.fcs.head_block_info().number > 4); assert!(status.l2.fcs.safe_block_info().number > 4); - // Send the third batch which should trigger the revert. - l1_watcher_tx.send(Arc::new(L1Notification::BatchCommit(revert_batch_data))).await?; - - // Wait for the third batch to be proceeded. - tokio::time::sleep(Duration::from_millis(300)).await; + // Send the revert for the second batch. + l1_watcher_tx.send(Arc::new(batch_1_revert)).await?; + wait_for_event( + &mut rnm_events, + ChainOrchestratorEvent::BatchReverted { + batch_info: batch_0_info, + safe_head: BlockInfo { + number: 4, + hash: B256::from_hex( + "30af93536b9f2899c2f5e77be24a4447a8e49c5683c74c4aab8c880c1508fdc5", + ) + .unwrap(), + }, + }, + Duration::from_secs(5), + ) + .await?; let status = handle.status().await?; // Assert the forkchoice state was reset to 4. - assert_eq!(status.l2.fcs.head_block_info().number, 4); + assert_eq!(status.l2.fcs.head_block_info().number, 57); assert_eq!(status.l2.fcs.safe_block_info().number, 4); + // Now lets reorg the L1 such that the batch revert should be reorged out. + l1_watcher_tx.send(Arc::new(L1Notification::Reorg(18318215))).await?; + wait_for_event( + &mut rnm_events, + ChainOrchestratorEvent::L1Reorg { + l1_block_number: 18318215, + queue_index: None, + l2_head_block_info: None, + l2_safe_block_info: Some(BlockInfo { + number: 57, + hash: B256::from_hex( + "88ab32bd52bdbab5dd148bad0de208c634d357570055a62bacc46e7a78b371dd", + ) + .unwrap(), + }), + }, + Duration::from_secs(5), + ) + .await?; + + let status = handle.status().await?; + + // Assert the forkchoice state safe block was reset to 57. + assert_eq!(status.l2.fcs.safe_block_info().number, 57); + Ok(()) } @@ -1343,6 +1513,7 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { wait_for_block_imported_5s(&mut node1_rnm_events, 10).await?; // Send a L1 message and wait for it to be indexed. + let block_10_block_info = BlockInfo { number: 10, hash: B256::random() }; let l1_message_notification = L1Notification::L1Message { message: TxL1Message { queue_index: 0, @@ -1352,19 +1523,19 @@ async fn can_handle_l1_message_reorg() -> eyre::Result<()> { sender: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), input: Default::default(), }, - block_number: 10, + block_info: block_10_block_info, block_timestamp: 0, }; // Send the L1 message to the sequencer node. node0_l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?; - node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(block_10_block_info))).await?; wait_for_event_5s(&mut node0_rnm_events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?; wait_for_event_5s(&mut node0_rnm_events, ChainOrchestratorEvent::NewL1Block(10)).await?; // Send L1 the L1 message to follower node. node1_l1_watcher_tx.send(Arc::new(l1_message_notification)).await?; - node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(block_10_block_info))).await?; wait_for_event_5s(&mut node1_rnm_events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?; wait_for_event_5s(&mut node1_rnm_events, ChainOrchestratorEvent::NewL1Block(10)).await?; @@ -1482,6 +1653,7 @@ async fn requeues_transactions_after_l1_reorg() -> eyre::Result<()> { } // Send a L1 message and wait for it to be indexed. + let block_2_info = BlockInfo { number: 2, hash: B256::random() }; let l1_message_notification = L1Notification::L1Message { message: TxL1Message { queue_index: 0, @@ -1491,13 +1663,13 @@ async fn requeues_transactions_after_l1_reorg() -> eyre::Result<()> { sender: Default::default(), input: Default::default(), }, - block_number: 2, + block_info: block_2_info, block_timestamp: 0, }; // Build a L2 block with L1 message, so we can revert it later. l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?; - l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(2))).await?; + l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(block_2_info))).await?; wait_for_event_5s(&mut events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?; wait_for_event_5s(&mut events, ChainOrchestratorEvent::NewL1Block(2)).await?; rnm_handle.build_block(); @@ -1864,6 +2036,7 @@ async fn can_reject_l2_block_with_unknown_l1_message() -> eyre::Result<()> { wait_for_block_imported_5s(&mut node1_rnm_events, 10).await?; // Send a L1 message and wait for it to be indexed. + let block_10_block_info = BlockInfo { number: 10, hash: B256::random() }; let l1_message_notification = L1Notification::L1Message { message: TxL1Message { queue_index: 0, @@ -1873,13 +2046,13 @@ async fn can_reject_l2_block_with_unknown_l1_message() -> eyre::Result<()> { sender: address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), input: Default::default(), }, - block_number: 10, + block_info: block_10_block_info, block_timestamp: 0, }; // Send the L1 message to the sequencer node but not to follower node. node0_l1_watcher_tx.send(Arc::new(l1_message_notification.clone())).await?; - node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + node0_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(block_10_block_info))).await?; wait_for_event_5s(&mut node0_rnm_events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?; wait_for_event_5s(&mut node0_rnm_events, ChainOrchestratorEvent::NewL1Block(10)).await?; @@ -1918,7 +2091,7 @@ async fn can_reject_l2_block_with_unknown_l1_message() -> eyre::Result<()> { // Finally send L1 the L1 message to follower node. node1_l1_watcher_tx.send(Arc::new(l1_message_notification)).await?; - node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(10))).await?; + node1_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(block_10_block_info))).await?; wait_for_event_5s(&mut node1_rnm_events, ChainOrchestratorEvent::L1MessageCommitted(0)).await?; wait_for_event_5s(&mut node1_rnm_events, ChainOrchestratorEvent::NewL1Block(10)).await?; diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 2d2686a2..9e5bf813 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -1,6 +1,6 @@ //! Contains tests related to RN and EN sync. -use alloy_primitives::{b256, Address, U256}; +use alloy_primitives::{b256, Address, B256, U256}; use alloy_provider::{Provider, ProviderBuilder}; use futures::StreamExt; use reqwest::Url; @@ -292,6 +292,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { // Create a sequence of L1 messages to be added to the sequencer node. const L1_MESSAGES_COUNT: usize = 200; let mut l1_messages = Vec::with_capacity(L1_MESSAGES_COUNT); + let mut l1_block_info = Vec::with_capacity(L1_MESSAGES_COUNT); for i in 0..L1_MESSAGES_COUNT as u64 { let l1_message = TxL1Message { queue_index: i, @@ -302,14 +303,16 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { input: Default::default(), }; l1_messages.push(l1_message); + let block_info = BlockInfo { number: i, hash: B256::random() }; + l1_block_info.push(block_info) } // Add the L1 messages to the sequencer node. - for (i, l1_message) in l1_messages.iter().enumerate() { + for (i, (l1_message, block_info)) in l1_messages.iter().zip(l1_block_info.iter()).enumerate() { sequencer_l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: l1_message.clone(), - block_number: i as u64, + block_info: *block_info, block_timestamp: i as u64 * 10, })) .await @@ -325,7 +328,10 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { 1, ) .await; - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(i as u64))).await.unwrap(); + sequencer_l1_watcher_tx + .send(Arc::new(L1Notification::NewBlock(*block_info))) + .await + .unwrap(); wait_n_events( &mut sequencer_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), @@ -361,11 +367,11 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; // Send all L1 messages to the unsynced node. - for (i, l1_message) in l1_messages.iter().enumerate() { + for (i, (l1_message, block_info)) in l1_messages.iter().zip(l1_block_info).enumerate() { follower_l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: l1_message.clone(), - block_number: i as u64, + block_info, block_timestamp: i as u64 * 10, })) .await @@ -404,6 +410,8 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { .await; // Now push a L1 message to the sequencer node and build a new block. + let block_info_200 = BlockInfo { number: 200, hash: B256::random() }; + let block_info_201 = BlockInfo { number: 201, hash: B256::random() }; sequencer_l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: TxL1Message { @@ -414,7 +422,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { value: U256::from(1), input: Default::default(), }, - block_number: 200, + block_info: block_info_200, block_timestamp: 2010, })) .await @@ -425,7 +433,7 @@ async fn test_should_consolidate_after_optimistic_sync() -> eyre::Result<()> { 1, ) .await; - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(201))).await.unwrap(); + sequencer_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(block_info_201))).await.unwrap(); wait_n_events(&mut sequencer_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), 1) .await; sequencer_handle.build_block(); @@ -507,6 +515,7 @@ async fn test_consolidation() -> eyre::Result<()> { sequencer.network.next_session_established().await; // Create a L1 message and send it to both nodes. + let block_info_0 = BlockInfo { number: 0, hash: B256::random() }; let l1_message = TxL1Message { queue_index: 0, gas_limit: 21000, @@ -518,7 +527,7 @@ async fn test_consolidation() -> eyre::Result<()> { sequencer_l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: l1_message.clone(), - block_number: 0, + block_info: block_info_0, block_timestamp: 0, })) .await @@ -529,12 +538,15 @@ async fn test_consolidation() -> eyre::Result<()> { 1, ) .await; - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(2))).await.unwrap(); + sequencer_l1_watcher_tx + .send(Arc::new(L1Notification::NewBlock(BlockInfo { number: 2, hash: B256::random() }))) + .await + .unwrap(); follower_l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: l1_message, - block_number: 0, + block_info: block_info_0, block_timestamp: 0, })) .await @@ -562,6 +574,7 @@ async fn test_consolidation() -> eyre::Result<()> { sequencer_handle.build_block(); // Now push a L1 message to the sequencer node and build a new block. + let block_info_1 = BlockInfo { number: 1, hash: B256::random() }; sequencer_l1_watcher_tx .send(Arc::new(L1Notification::L1Message { message: TxL1Message { @@ -572,7 +585,7 @@ async fn test_consolidation() -> eyre::Result<()> { value: U256::from(1), input: Default::default(), }, - block_number: 1, + block_info: block_info_1, block_timestamp: 10, })) .await @@ -584,7 +597,10 @@ async fn test_consolidation() -> eyre::Result<()> { ) .await; - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::NewBlock(5))).await.unwrap(); + sequencer_l1_watcher_tx + .send(Arc::new(L1Notification::NewBlock(BlockInfo { number: 5, hash: B256::random() }))) + .await + .unwrap(); wait_n_events(&mut sequencer_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), 1) .await; sequencer_handle.build_block(); @@ -845,6 +861,7 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { // Initially the sequencer should build 100 blocks with 1 message in each and the follower // should follow them for i in 0..100 { + let block_info = BlockInfo { number: i, hash: B256::random() }; let l1_message = Arc::new(L1Notification::L1Message { message: TxL1Message { queue_index: i, @@ -854,10 +871,10 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { value: U256::from(1), input: Default::default(), }, - block_number: i, + block_info, block_timestamp: i * 10, }); - let new_block = Arc::new(L1Notification::NewBlock(i)); + let new_block = Arc::new(L1Notification::NewBlock(block_info)); sequencer_l1_watcher_tx.send(l1_message.clone()).await.unwrap(); sequencer_l1_watcher_tx.send(new_block.clone()).await.unwrap(); wait_n_events( @@ -914,6 +931,7 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { // Have the sequencer build 20 new blocks, containing new L1 messages. let mut l1_notifications = vec![]; for i in 0..20 { + let block_info = BlockInfo { number: (51 + i), hash: B256::random() }; let l1_message = Arc::new(L1Notification::L1Message { message: TxL1Message { queue_index: 51 + i, @@ -923,10 +941,10 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { value: U256::from(1), input: Default::default(), }, - block_number: 51 + i, + block_info, block_timestamp: (51 + i) * 10, }); - let new_block = Arc::new(L1Notification::NewBlock(51 + i)); + let new_block = Arc::new(L1Notification::NewBlock(block_info)); l1_notifications.extend([l1_message.clone(), new_block.clone()]); sequencer_l1_watcher_tx.send(l1_message.clone()).await.unwrap(); sequencer_l1_watcher_tx.send(new_block.clone()).await.unwrap(); diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index 30f3de58..15b81984 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -16,6 +16,7 @@ alloy-consensus.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true alloy-rpc-types-engine.workspace = true +alloy-rpc-types-eth.workspace = true # scroll scroll-alloy-consensus.workspace = true @@ -51,6 +52,7 @@ std = [ "reth-chainspec/std", "reth-scroll-chainspec/std", "reth-network-peers/std", + "alloy-rpc-types-eth/std", ] arbitrary = [ "std", @@ -66,6 +68,7 @@ arbitrary = [ "alloy-rpc-types-engine/arbitrary", "alloy-chains/arbitrary", "reth-chainspec/arbitrary", + "alloy-rpc-types-eth/arbitrary", ] serde = [ "alloy-chains/serde", @@ -77,4 +80,5 @@ serde = [ "reth-scroll-primitives/serde", "scroll-alloy-consensus/serde", "scroll-alloy-rpc-types-engine/serde", + "alloy-rpc-types-eth/serde", ] diff --git a/crates/primitives/src/batch.rs b/crates/primitives/src/batch.rs index 54aa981e..ef635e5a 100644 --- a/crates/primitives/src/batch.rs +++ b/crates/primitives/src/batch.rs @@ -45,6 +45,8 @@ pub struct BatchCommitData { pub blob_versioned_hash: Option, /// The block number at which the batch finalized event was emitted. pub finalized_block_number: Option, + /// The block number at which the batch was reverted, if any. + pub reverted_block_number: Option, } impl From for BatchInfo { @@ -53,6 +55,61 @@ impl From for BatchInfo { } } +impl From<&BatchCommitData> for BatchInfo { + fn from(value: &BatchCommitData) -> Self { + Self { index: value.index, hash: value.hash } + } +} + +/// The status of a batch. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum BatchStatus { + /// The batch has been committed but not yet processed. + Committed, + /// The batch is currently being processed. + Processing, + /// The batch has been successfully consolidated with the L2 chain. + Consolidated, + /// The batch has been reverted. + Reverted, + /// The batch has been finalized. + Finalized, +} + +impl BatchStatus { + /// Returns true if the batch status is finalized. + pub const fn is_finalized(&self) -> bool { + matches!(self, Self::Finalized) + } +} + +impl core::fmt::Display for BatchStatus { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::Committed => write!(f, "committed"), + Self::Processing => write!(f, "processing"), + Self::Consolidated => write!(f, "consolidated"), + Self::Reverted => write!(f, "reverted"), + Self::Finalized => write!(f, "finalized"), + } + } +} + +impl core::str::FromStr for BatchStatus { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "committed" => Ok(Self::Committed), + "processing" => Ok(Self::Processing), + "consolidated" => Ok(Self::Consolidated), + "reverted" => Ok(Self::Reverted), + "finalized" => Ok(Self::Finalized), + _ => Err(()), + } + } +} + /// The outcome of consolidating a batch with the L2 chain. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BatchConsolidationOutcome { @@ -62,12 +119,14 @@ pub struct BatchConsolidationOutcome { pub blocks: Vec, /// The list of skipped L1 messages index. pub skipped_l1_messages: Vec, + /// The target status of the batch after consolidation. + pub target_status: BatchStatus, } impl BatchConsolidationOutcome { /// Creates a new empty batch consolidation outcome for the given batch info. - pub const fn new(batch_info: BatchInfo) -> Self { - Self { batch_info, blocks: Vec::new(), skipped_l1_messages: Vec::new() } + pub const fn new(batch_info: BatchInfo, target_status: BatchStatus) -> Self { + Self { batch_info, blocks: Vec::new(), skipped_l1_messages: Vec::new(), target_status } } /// Pushes a block consolidation outcome to the batch. @@ -143,6 +202,7 @@ mod arbitrary_impl { calldata: Arc::new(bytes), blob_versioned_hash: blob_hash, finalized_block_number: None, + reverted_block_number: None, }) } } diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index ea0849ad..de5fe7b0 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -25,6 +25,31 @@ pub struct BlockInfo { pub hash: B256, } +/// The startup configuration for the L1 watcher. +#[derive(Debug, PartialEq, Eq)] +pub enum L1BlockStartupInfo { + /// The L1 block infos of the unsafe blocks stored in the database. + UnsafeBlocks(Vec), + /// The finalized block number to start from. + FinalizedBlockNumber(u64), + /// No startup information available. + None, +} + +impl L1BlockStartupInfo { + /// Creates a new [`L1BlockStartupInfo`] from the given unsafe blocks and finalized block + /// number. + pub fn new(unsafe_blocks: Vec, finalized_block_number: Option) -> Self { + if !unsafe_blocks.is_empty() { + Self::UnsafeBlocks(unsafe_blocks) + } else if let Some(number) = finalized_block_number { + Self::FinalizedBlockNumber(number) + } else { + Self::None + } + } +} + impl PartialOrd for BlockInfo { fn partial_cmp(&self, other: &Self) -> Option { self.number.partial_cmp(&other.number) @@ -74,6 +99,12 @@ impl From

for BlockInfo { } } +impl From<&alloy_rpc_types_eth::Header> for BlockInfo { + fn from(value: &alloy_rpc_types_eth::Header) -> Self { + Self { number: value.number, hash: value.hash } + } +} + impl std::fmt::Display for BlockInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "BlockInfo {{ number: {}, hash: 0x{} }}", self.number, self.hash) diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index a095ea20..cf582101 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -9,12 +9,15 @@ pub use attributes::ScrollPayloadAttributesWithBatchInfo; mod block; pub use block::{ - BlockInfo, L2BlockInfoWithL1Messages, WithBatchInfo, WithBlockNumber, WithCommittedBatchInfo, - WithFinalizedBatchInfo, WithFinalizedBlockNumber, DEFAULT_BLOCK_DIFFICULTY, + BlockInfo, L1BlockStartupInfo, L2BlockInfoWithL1Messages, WithBatchInfo, WithBlockNumber, + WithCommittedBatchInfo, WithFinalizedBatchInfo, WithFinalizedBlockNumber, + DEFAULT_BLOCK_DIFFICULTY, }; mod batch; -pub use batch::{BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockConsolidationOutcome}; +pub use batch::{ + BatchCommitData, BatchConsolidationOutcome, BatchInfo, BatchStatus, BlockConsolidationOutcome, +}; mod bounded_vec; pub use bounded_vec::BoundedVec; diff --git a/crates/watcher/src/error.rs b/crates/watcher/src/error.rs index 75e8ca50..44d80811 100644 --- a/crates/watcher/src/error.rs +++ b/crates/watcher/src/error.rs @@ -46,6 +46,9 @@ pub enum FilterLogError { /// The log is missing a block number. #[error("missing block number for log")] MissingBlockNumber, + /// The log is missing a block hash. + #[error("missing block hash for log")] + MissingBlockHash, /// The log is missing a block timestamp. #[error("missing block timestamp for log")] MissingBlockTimestamp, diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 85b38e0b..16837152 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -17,7 +17,10 @@ use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, Log, TransactionTrait}; use alloy_sol_types::SolEvent; use error::L1WatcherResult; use itertools::Itertools; -use rollup_node_primitives::{BatchCommitData, BoundedVec, ConsensusUpdate, NodeConfig}; +use rollup_node_primitives::{ + BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ConsensusUpdate, L1BlockStartupInfo, + NodeConfig, +}; use rollup_node_providers::SystemContractProvider; use scroll_alloy_consensus::TxL1Message; use scroll_l1::abi::logs::{try_decode_log, CommitBatch, FinalizeBatch, QueueTransaction}; @@ -55,8 +58,8 @@ pub type Header = ::HeaderResponse; /// The state of the L1. #[derive(Debug, Default, Clone)] pub struct L1State { - head: u64, - finalized: u64, + head: BlockInfo, + finalized: BlockInfo, } /// The L1 watcher indexes L1 blocks, applying a first level of filtering via log filters. @@ -89,36 +92,57 @@ pub struct L1Watcher { /// The L1 notification type yielded by the [`L1Watcher`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum L1Notification { - /// A notification that the L1 watcher has processed up to a given block number. + /// A notification that the L1 watcher has processed up to a given block info. Processed(u64), /// A notification for a reorg of the L1 up to a given block number. Reorg(u64), /// A new batch has been committed on the L1 rollup contract. - BatchCommit(BatchCommitData), + BatchCommit { + /// The block info the batch was committed at. + block_info: BlockInfo, + /// The data of the committed batch. + data: BatchCommitData, + }, /// A new batch has been finalized on the L1 rollup contract. BatchFinalization { /// The hash of the finalized batch. hash: B256, /// The index of the finalized batch. index: u64, - /// The block number the batch was finalized at. - block_number: BlockNumber, + /// The block info the batch was finalized at. + block_info: BlockInfo, + }, + /// A batch has been reverted. + BatchRevert { + /// The batch info of the reverted batch. + batch_info: BatchInfo, + /// The L1 block info at which the Batch Revert occurred. + block_info: BlockInfo, + }, + /// A range of batches have been reverted. + BatchRevertRange { + /// The start index of the reverted batches. + start: u64, + /// The end index of the reverted batches. + end: u64, + /// The L1 block info at which the Batch Revert Range occurred. + block_info: BlockInfo, }, /// A new `L1Message` has been added to the L1 message queue. L1Message { /// The L1 message. message: TxL1Message, - /// The block number at which the L1 message was emitted. - block_number: u64, + /// The block info at which the L1 message was emitted. + block_info: BlockInfo, /// The timestamp at which the L1 message was emitted. block_timestamp: u64, }, /// The consensus config has been updated. Consensus(ConsensusUpdate), /// A new block has been added to the L1. - NewBlock(u64), + NewBlock(BlockInfo), /// A block has been finalized on the L1. - Finalized(u64), + Finalized(BlockInfo), /// A notification that the L1 watcher is synced to the L1 head. Synced, } @@ -128,17 +152,30 @@ impl Display for L1Notification { match self { Self::Processed(n) => write!(f, "Processed({n})"), Self::Reorg(n) => write!(f, "Reorg({n:?})"), - Self::BatchCommit(b) => { - write!(f, "BatchCommit {{ hash: {}, index: {} }}", b.hash, b.index) + Self::BatchCommit { block_info, data } => { + write!( + f, + "BatchCommit {{ block_info: {}, batch_index: {}, batch_hash: {} }}", + block_info, data.index, data.hash + ) + } + Self::BatchRevert { batch_info, block_info } => { + write!(f, "BatchRevert{{ batch_info: {batch_info}, block_info: {block_info} }}",) } - Self::BatchFinalization { hash, index, block_number } => write!( + Self::BatchRevertRange { start, end, block_info } => { + write!( + f, + "BatchRevertRange{{ start: {start}, end: {end}, block_info: {block_info} }}", + ) + } + Self::BatchFinalization { hash, index, block_info } => write!( f, - "BatchFinalization{{ hash: {hash}, index: {index}, block_number: {block_number} }}", + "BatchFinalization{{ hash: {hash}, index: {index}, block_info: {block_info} }}", ), - Self::L1Message { message, block_number, .. } => write!( + Self::L1Message { message, block_info, .. } => write!( f, - "L1Message{{ index: {}, block_number: {} }}", - message.queue_index, block_number + "L1Message{{ index: {}, block_info: {} }}", + message.queue_index, block_info ), Self::Consensus(u) => write!(f, "{u:?}"), Self::NewBlock(n) => write!(f, "NewBlock({n})"), @@ -156,15 +193,15 @@ where /// returning [`L1Notification`] in the returned channel. pub async fn spawn( execution_provider: EP, - start_block: Option, + l1_block_startup_info: L1BlockStartupInfo, config: Arc, log_query_block_range: u64, ) -> mpsc::Receiver> { - tracing::trace!(target: "scroll::watcher", ?start_block, ?config, "spawning L1 watcher"); + tracing::trace!(target: "scroll::watcher", ?l1_block_startup_info, ?config, "spawning L1 watcher"); let (tx, rx) = mpsc::channel(log_query_block_range as usize); - let fetch_block_number = async |tag: BlockNumberOrTag| { + let fetch_block_info = async |tag: BlockNumberOrTag| { let block = loop { match execution_provider.get_block(tag.into()).await { Err(err) => { @@ -174,20 +211,48 @@ where _ => unreachable!("should always be a {tag} block"), } }; - block.header.number + BlockInfo { number: block.header.number, hash: block.header.hash } }; // fetch l1 state. let l1_state = L1State { - head: fetch_block_number(BlockNumberOrTag::Latest).await, - finalized: fetch_block_number(BlockNumberOrTag::Finalized).await, + head: fetch_block_info(BlockNumberOrTag::Latest).await, + finalized: fetch_block_info(BlockNumberOrTag::Finalized).await, + }; + + let (reorg, start_block) = match l1_block_startup_info { + L1BlockStartupInfo::UnsafeBlocks(blocks) => { + let mut reorg = true; + let mut start_block = blocks.first().expect("at least one unsafe block").number; + for (i, block) in blocks.into_iter().rev().enumerate() { + let current_block = + fetch_block_info(BlockNumberOrTag::Number(block.number)).await; + if current_block.hash == block.hash { + tracing::info!(target: "scroll::watcher", ?block, "found reorg block from unsafe blocks"); + reorg = i != 0; + start_block = current_block.number; + break; + } + } + + (reorg, start_block) + } + L1BlockStartupInfo::FinalizedBlockNumber(number) => { + tracing::info!(target: "scroll::watcher", ?number, "starting from finalized block number"); + + (false, number) + } + L1BlockStartupInfo::None => { + tracing::info!(target: "scroll::watcher", "no L1 startup info, starting from config start block"); + (false, config.start_l1_block) + } }; // init the watcher. let watcher = Self { execution_provider, unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY), - current_block_number: start_block.unwrap_or(config.start_l1_block).saturating_sub(1), + current_block_number: start_block.saturating_sub(1), l1_state, sender: tx, config, @@ -197,6 +262,12 @@ where }; // notify at spawn. + if reorg { + watcher + .notify(L1Notification::Reorg(start_block)) + .await + .expect("channel is open in this context"); + } watcher .notify(L1Notification::Finalized(watcher.l1_state.finalized)) .await @@ -227,7 +298,7 @@ where // sleep if we are synced. if self.is_synced { tokio::time::sleep(SLOW_SYNC_INTERVAL).await; - } else if self.current_block_number == self.l1_state.head { + } else if self.current_block_number == self.l1_state.head.number { // if we have synced to the head of the L1, notify the channel and set the // `is_synced`` flag. if let Err(L1WatcherError::SendError(_)) = self.notify(L1Notification::Synced).await @@ -287,11 +358,11 @@ where )] async fn handle_finalized_block(&mut self, finalized: &Header) -> L1WatcherResult<()> { // update the state and notify on channel. - if self.l1_state.finalized < finalized.number { + if self.l1_state.finalized.number < finalized.number { tracing::trace!(target: "scroll::watcher", number = finalized.number, hash = ?finalized.hash, "new finalized block"); - self.l1_state.finalized = finalized.number; - self.notify(L1Notification::Finalized(finalized.number)).await?; + self.l1_state.finalized.number = finalized.number; + self.notify(L1Notification::Finalized(finalized.into())).await?; } // shortcircuit. @@ -359,7 +430,9 @@ where // update metrics. self.metrics.reorgs.increment(1); - self.metrics.reorg_depths.record(self.l1_state.head.saturating_sub(number) as f64); + self.metrics + .reorg_depths + .record(self.l1_state.head.number.saturating_sub(number) as f64); // reset the current block number to the reorged block number if // we have indexed passed the reorg. @@ -374,8 +447,8 @@ where // Update the state and notify on the channel. tracing::trace!(target: "scroll::watcher", number = ?latest.number, hash = ?latest.hash, "new block"); - self.l1_state.head = latest.number; - self.notify(L1Notification::NewBlock(latest.number)).await?; + self.l1_state.head = latest.into(); + self.notify(L1Notification::NewBlock(latest.into())).await?; Ok(()) } @@ -385,10 +458,10 @@ where async fn handle_l1_messages(&self, logs: &[Log]) -> L1WatcherResult> { let mut l1_messages = logs .iter() - .map(|l| (&l.inner, l.block_number, l.block_timestamp)) - .filter_map(|(log, bn, ts)| { + .map(|l| (&l.inner, l.block_number, l.block_hash, l.block_timestamp)) + .filter_map(|(log, bn, bh, ts)| { try_decode_log::(log) - .map(|log| (Into::::into(log.data), bn, ts)) + .map(|log| (Into::::into(log.data), bn, bh, ts)) }) .collect::>(); @@ -396,15 +469,16 @@ where let mut notifications = Vec::with_capacity(l1_messages.len()); // sort the message by index and group by block number. - l1_messages.sort_by(|(m1, _, _), (m2, _, _)| m1.queue_index.cmp(&m2.queue_index)); - let groups = l1_messages.into_iter().chunk_by(|(_, bn, _)| *bn); + l1_messages.sort_by(|(m1, _, _, _), (m2, _, _, _)| m1.queue_index.cmp(&m2.queue_index)); + let groups = l1_messages.into_iter().chunk_by(|(_, bn, bh, _)| (*bn, *bh)); let groups: Vec<_> = groups.into_iter().map(|(bn, group)| (bn, group.collect::>())).collect(); - for (bn, group) in groups { + for ((bn, bh), group) in groups { let block_number = bn.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = bh.ok_or(FilterLogError::MissingBlockHash)?; // fetch the timestamp if missing from the log. - let block_timestamp = if let Some(ts) = group.first().and_then(|(_, _, ts)| *ts) { + let block_timestamp = if let Some(ts) = group.first().and_then(|(_, _, _, ts)| *ts) { ts } else { self.execution_provider @@ -415,10 +489,10 @@ where }; // push notifications in vector. - for (msg, _, _) in group { + for (msg, _, _, _) in group { notifications.push(L1Notification::L1Message { message: msg, - block_number, + block_info: BlockInfo { number: block_number, hash: block_hash }, block_timestamp, }); } @@ -474,6 +548,7 @@ where for (raw_log, decoded_log, _) in group { let block_number = raw_log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = raw_log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; // if the log is missing the block timestamp, we need to fetch it. // the block timestamp is necessary in order to derive the beacon // slot and query the blobs. @@ -490,15 +565,19 @@ where decoded_log.batch_index.uint_try_to().expect("u256 to u64 conversion error"); // push in vector. - notifications.push(L1Notification::BatchCommit(BatchCommitData { - hash: decoded_log.batch_hash, - index: batch_index, - block_number, - block_timestamp, - calldata: input.clone(), - blob_versioned_hash: blob_versioned_hashes.next(), - finalized_block_number: None, - })); + notifications.push(L1Notification::BatchCommit { + block_info: BlockInfo { number: block_number, hash: block_hash }, + data: BatchCommitData { + hash: decoded_log.batch_hash, + index: batch_index, + block_number, + block_timestamp, + calldata: input.clone(), + blob_versioned_hash: blob_versioned_hashes.next(), + finalized_block_number: None, + reverted_block_number: None, + }, + }); } } Ok(notifications) @@ -512,21 +591,22 @@ where ) -> L1WatcherResult> { // filter finalize logs and skip genesis batch (batch_index == 0). logs.iter() - .map(|l| (l, l.block_number)) - .filter_map(|(log, bn)| { + .map(|l| (l, l.block_number, l.block_hash)) + .filter_map(|(log, bn, bh)| { try_decode_log::(&log.inner) .filter(|decoded| !decoded.data.batch_index.is_zero()) - .map(|decoded| (decoded.data, bn)) + .map(|decoded| (decoded.data, bn, bh)) }) - .map(|(decoded_log, maybe_block_number)| { + .map(|(decoded_log, maybe_block_number, maybe_block_hash)| { // fetch the finalize transaction. let block_number = maybe_block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = maybe_block_hash.ok_or(FilterLogError::MissingBlockHash)?; let index = decoded_log.batch_index.uint_try_to().expect("u256 to u64 conversion error"); Ok(L1Notification::BatchFinalization { hash: decoded_log.batch_hash, index, - block_number, + block_info: BlockInfo { number: block_number, hash: block_hash }, }) }) .collect() @@ -539,7 +619,7 @@ where latest_block: &Block, ) -> L1WatcherResult> { // refresh the signer every new block. - if latest_block.header.number != self.l1_state.head { + if latest_block.header.number != self.l1_state.head.number { let signer = self .execution_provider .authorized_signer(self.config.address_book.system_contract_address) @@ -712,7 +792,7 @@ mod tests { L1Watcher { execution_provider: provider, unfinalized_blocks: unfinalized_blocks.into(), - l1_state: L1State { head: 0, finalized: 0 }, + l1_state: L1State { head: Default::default(), finalized: Default::default() }, current_block_number: 0, sender: tx, config: Arc::new(NodeConfig::mainnet()), @@ -916,6 +996,7 @@ mod tests { queue_transaction.inner = inner_log; queue_transaction.block_number = Some(random!(u64)); queue_transaction.block_timestamp = Some(random!(u64)); + queue_transaction.block_hash = Some(random!(B256)); logs.push(queue_transaction); // When @@ -957,6 +1038,7 @@ mod tests { batch_commit.inner = inner_log; batch_commit.transaction_hash = Some(*tx.inner.tx_hash()); batch_commit.block_number = Some(random!(u64)); + batch_commit.block_hash = Some(random!(B256)); batch_commit.block_timestamp = Some(random!(u64)); logs.push(batch_commit); @@ -984,6 +1066,7 @@ mod tests { inner_log.data = batch.encode_log_data(); finalize_commit.inner = inner_log; finalize_commit.block_number = Some(random!(u64)); + finalize_commit.block_hash = Some(random!(B256)); logs.push(finalize_commit); // When diff --git a/crates/watcher/src/metrics.rs b/crates/watcher/src/metrics.rs index 20093598..4c0447f9 100644 --- a/crates/watcher/src/metrics.rs +++ b/crates/watcher/src/metrics.rs @@ -24,7 +24,7 @@ impl WatcherMetrics { pub fn process_l1_notification(&self, notification: &L1Notification) { match notification { L1Notification::L1Message { .. } => self.l1_messages.increment(1), - L1Notification::BatchCommit(_) => self.batch_commits.increment(1), + L1Notification::BatchCommit { .. } => self.batch_commits.increment(1), L1Notification::BatchFinalization { .. } => self.batch_finalizations.increment(1), _ => {} } diff --git a/crates/watcher/tests/indexing.rs b/crates/watcher/tests/indexing.rs index dc224a83..c1fdc040 100644 --- a/crates/watcher/tests/indexing.rs +++ b/crates/watcher/tests/indexing.rs @@ -4,7 +4,7 @@ use alloy_rpc_types_eth::Log; use alloy_sol_types::SolEvent; use arbitrary::Arbitrary; -use rollup_node_primitives::NodeConfig; +use rollup_node_primitives::{L1BlockStartupInfo, NodeConfig}; use rollup_node_watcher::{ random, test_utils::{chain, provider::MockProvider}, @@ -59,9 +59,14 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = - L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; - let mut prev_block_number = 0; + let mut l1_watcher = L1Watcher::spawn( + mock_provider, + L1BlockStartupInfo::None, + Arc::new(config), + LOGS_QUERY_BLOCK_RANGE, + ) + .await; + let mut prev_block_info = Default::default(); let mut ticker = tokio::time::interval(tokio::time::Duration::from_secs(2)); let _ = ticker.tick().await; @@ -69,9 +74,9 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> select! { notification = l1_watcher.recv() => { let notification = notification.map(|notif| (*notif).clone()); - if let Some(L1Notification::L1Message { block_number, .. }) = notification { - assert_ne!(prev_block_number, block_number, "indexed same block twice {block_number}"); - prev_block_number = block_number + if let Some(L1Notification::L1Message { block_info, .. }) = notification { + assert_ne!(prev_block_info, block_info, "indexed same block twice {block_info}"); + prev_block_info = block_info } } _ = ticker.tick() => break diff --git a/crates/watcher/tests/logs.rs b/crates/watcher/tests/logs.rs index 3a41ca05..0e689e4d 100644 --- a/crates/watcher/tests/logs.rs +++ b/crates/watcher/tests/logs.rs @@ -4,7 +4,7 @@ use alloy_rpc_types_eth::Log; use alloy_sol_types::SolEvent; use arbitrary::Arbitrary; -use rollup_node_primitives::NodeConfig; +use rollup_node_primitives::{L1BlockStartupInfo, NodeConfig}; use rollup_node_watcher::{ random, test_utils::{chain, chain_from, provider::MockProvider}, @@ -45,6 +45,7 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { queue_transaction.inner = inner_log; queue_transaction.block_number = Some(b.header.number); queue_transaction.block_timestamp = Some(b.header.timestamp); + queue_transaction.block_hash = Some(b.header.hash); queue_transaction }) .collect(); @@ -63,8 +64,13 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = - L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; + let mut l1_watcher = L1Watcher::spawn( + mock_provider, + L1BlockStartupInfo::None, + Arc::new(config), + LOGS_QUERY_BLOCK_RANGE, + ) + .await; let mut received_logs = Vec::new(); loop { let notification = l1_watcher.recv().await.map(|notif| (*notif).clone()); diff --git a/crates/watcher/tests/reorg.rs b/crates/watcher/tests/reorg.rs index fdb32c2f..af0d727d 100644 --- a/crates/watcher/tests/reorg.rs +++ b/crates/watcher/tests/reorg.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use alloy_rpc_types_eth::Header; use arbitrary::Arbitrary; use rand::Rng; -use rollup_node_primitives::NodeConfig; +use rollup_node_primitives::{L1BlockStartupInfo, NodeConfig}; use rollup_node_watcher::{ random, test_utils::provider::MockProvider, Block, L1Notification, L1Watcher, }; @@ -72,8 +72,13 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = - L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; + let mut l1_watcher = L1Watcher::spawn( + mock_provider, + L1BlockStartupInfo::None, + Arc::new(config), + LOGS_QUERY_BLOCK_RANGE, + ) + .await; // skip the first two events l1_watcher.recv().await.unwrap(); @@ -90,7 +95,10 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { if matches!(notification.as_ref(), L1Notification::Processed(_)) { notification = l1_watcher.recv().await.unwrap(); } - assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); + assert_eq!( + notification.as_ref(), + &L1Notification::Finalized((&finalized.header).into()) + ); } if latest_number == latest.header.number { @@ -114,9 +122,9 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { // reorg assert!(matches!(notification.as_ref(), L1Notification::Reorg(_))); let notification = l1_watcher.recv().await.unwrap(); - assert_eq!(notification.as_ref(), &L1Notification::NewBlock(latest.header.number)); + assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); } else { - assert_eq!(notification.as_ref(), &L1Notification::NewBlock(latest.header.number)); + assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); } // update finalized and latest. @@ -174,8 +182,13 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { ); // spawn the watcher and verify received notifications are consistent. - let mut l1_watcher = - L1Watcher::spawn(mock_provider, None, Arc::new(config), LOGS_QUERY_BLOCK_RANGE).await; + let mut l1_watcher = L1Watcher::spawn( + mock_provider, + L1BlockStartupInfo::None, + Arc::new(config), + LOGS_QUERY_BLOCK_RANGE, + ) + .await; // skip the first two events l1_watcher.recv().await.unwrap(); @@ -192,7 +205,10 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { if matches!(notification.as_ref(), L1Notification::Processed(_)) { notification = l1_watcher.recv().await.unwrap(); } - assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); + assert_eq!( + notification.as_ref(), + &L1Notification::Finalized((&finalized.header).into()) + ); } if latest_number == latest.header.number { @@ -211,7 +227,7 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { notification = l1_watcher.recv().await.unwrap(); } - assert_eq!(notification.as_ref(), &L1Notification::NewBlock(latest.header.number)); + assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); // update finalized and latest. finalized_number = finalized.header.number;