diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index b6a22556..15300bb1 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -26,8 +26,9 @@ jobs: with: tool: cargo-codspeed - name: Build the benchmark target(s) - run: cargo codspeed build -p scroll-derivation-pipeline + run: cargo codspeed build -p scroll-derivation-pipeline --profile profiling - name: Run the benchmarks - uses: CodSpeedHQ/action@v3 + uses: CodSpeedHQ/action@v4 with: + mode: instrumentation run: cargo codspeed run --workspace diff --git a/Cargo.lock b/Cargo.lock index 3dee6eb3..220d9f33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11612,7 +11612,6 @@ dependencies = [ name = "scroll-db" version = "0.0.1" dependencies = [ - "alloy-eips 1.0.30", "alloy-primitives", "arbitrary", "async-trait", @@ -11640,7 +11639,6 @@ dependencies = [ "alloy-eips 1.0.30", "alloy-primitives", "alloy-rpc-types-engine 1.0.30", - "async-trait", "codspeed-criterion-compat", "eyre", "futures", diff --git a/Cargo.toml b/Cargo.toml index 754d2d3e..afec22e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,6 +115,13 @@ significant_drop_tightening = "allow" too_long_first_doc_paragraph = "allow" large_enum_variant = "allow" +# Use the `--profile profiling` flag to show symbols in release mode. +# e.g. `cargo build --profile profiling` +[profile.profiling] +inherits = "release" +debug = "full" +strip = "none" + [workspace.dependencies] # alloy alloy-chains = { version = "0.2.5", default-features = false } diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs index 4b47dd32..504daaba 100644 --- a/crates/chain-orchestrator/src/error.rs +++ b/crates/chain-orchestrator/src/error.rs @@ -4,7 +4,7 @@ use alloy_transport::TransportErrorKind; use rollup_node_primitives::{BatchInfo, BlockInfo}; use rollup_node_sequencer::SequencerError; use rollup_node_signer::SignerError; -use scroll_db::{DatabaseError, L1MessageKey}; +use scroll_db::{CanRetry, DatabaseError, L1MessageKey}; use scroll_engine::EngineError; /// A type that represents an error that occurred in the chain orchestrator. 
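The `CanRetry` import added above comes from `scroll-db`: retry classification moves out of the chain orchestrator and into the database layer (the orchestrator-local `retry.rs` is deleted later in this diff). A minimal sketch of the classification pattern the next hunk implements, using hypothetical stand-in error types rather than the real `DatabaseError`/`ChainOrchestratorError` variants:

    // Leaf errors decide whether they are transient; wrapper errors delegate
    // to their cause, as the `ChainOrchestratorError` impl in the next hunk does.
    trait CanRetry {
        fn can_retry(&self) -> bool;
    }

    #[derive(Debug)]
    enum DbError {
        Busy,    // transient: the retry layer may re-run the operation
        Corrupt, // permanent: retrying cannot help
    }

    impl CanRetry for DbError {
        fn can_retry(&self) -> bool {
            matches!(self, Self::Busy)
        }
    }

    #[derive(Debug)]
    enum OrchestratorError {
        Database(DbError),
        InvalidBlock,
    }

    impl CanRetry for OrchestratorError {
        fn can_retry(&self) -> bool {
            match self {
                Self::Database(err) => err.can_retry(), // delegate to the cause
                _ => false,
            }
        }
    }

    fn main() {
        assert!(OrchestratorError::Database(DbError::Busy).can_retry());
        assert!(!OrchestratorError::Database(DbError::Corrupt).can_retry());
        assert!(!OrchestratorError::InvalidBlock.can_retry());
    }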
@@ -93,3 +93,12 @@ pub enum ChainOrchestratorError { #[error("An error occurred while handling rollup node primitives: {0}")] RollupNodePrimitiveError(rollup_node_primitives::RollupNodePrimitiveError), } + +impl CanRetry for ChainOrchestratorError { + fn can_retry(&self) -> bool { + match &self { + Self::DatabaseError(err) => err.can_retry(), + _ => false, + } + } +} diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 38a3a5a4..62e3ecbb 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -26,8 +26,8 @@ use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_alloy_provider::ScrollEngineApi; use scroll_db::{ - Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, - DatabaseWriteOperations, L1MessageKey, UnwindResult, + Database, DatabaseError, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, + UnwindResult, }; use scroll_derivation_pipeline::{BatchDerivationResult, DerivationPipeline}; use scroll_engine::Engine; @@ -64,9 +64,6 @@ pub use handle::{ChainOrchestratorCommand, ChainOrchestratorHandle}; mod metrics; pub use metrics::{ChainOrchestratorItem, ChainOrchestratorMetrics}; -mod retry; -pub use retry::Retry; - mod sync; pub use sync::{SyncMode, SyncState}; @@ -103,7 +100,7 @@ pub struct ChainOrchestrator< block_client: Arc::Client>>, /// The L2 client that is used to interact with the L2 chain. l2_client: Arc, - /// A reference to the database used to persist the indexed data. + /// A reference to the database. database: Arc<Database>, /// The metrics for the chain orchestrator. metrics: HashMap, @@ -248,15 +245,18 @@ impl< /// Handles an event from the signer. async fn handle_signer_event( &self, - event: rollup_node_signer::SignerEvent, + event: SignerEvent, ) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> { tracing::info!(target: "scroll::chain_orchestrator", ?event, "Handling signer event"); match event { SignerEvent::SignedBlock { block, signature } => { - let tx = self.database.tx_mut().await?; - tx.set_l2_head_block_number(block.header.number).await?; - tx.insert_signature(block.hash_slow(), signature).await?; - tx.commit().await?; + let hash = block.hash_slow(); + self.database + .tx_mut(move |tx| async move { + tx.set_l2_head_block_number(block.header.number).await?; + tx.insert_signature(hash, signature).await + }) + .await?; self.network.handle().announce_block(block.clone(), signature); Ok(Some(ChainOrchestratorEvent::SignedBlock { block, signature })) } @@ -292,15 +292,15 @@ impl< .finalize_payload_building(payload_id, &mut self.engine) .await?
{ - let tx = self.database.tx_mut().await?; let block_info: L2BlockInfoWithL1Messages = (&block).into(); - tx.update_l1_messages_from_l2_blocks(vec![block_info.clone()]).await?; - tx.commit().await?; + self.database + .update_l1_messages_from_l2_blocks(vec![block_info.clone()]) + .await?; self.signer .as_mut() .expect("signer must be present") .sign_block(block.clone())?; - return Ok(Some(ChainOrchestratorEvent::BlockSequenced(block))) + return Ok(Some(ChainOrchestratorEvent::BlockSequenced(block))); } } } @@ -326,10 +326,15 @@ impl< let _ = tx.send(self.event_listener()); } ChainOrchestratorCommand::Status(tx) => { - let db_tx = self.database.tx().await?; - let l1_latest = db_tx.get_latest_l1_block_number().await?; - let l1_finalized = db_tx.get_finalized_l1_block_number().await?; - let l1_processed = db_tx.get_processed_l1_block_number().await?; + let (l1_latest, l1_finalized, l1_processed) = self + .database + .tx(|tx| async move { + let l1_latest = tx.get_latest_l1_block_number().await?; + let l1_finalized = tx.get_finalized_l1_block_number().await?; + let l1_processed = tx.get_processed_l1_block_number().await?; + Ok::<_, ChainOrchestratorError>((l1_latest, l1_finalized, l1_processed)) + }) + .await?; let status = ChainOrchestratorStatus::new( &self.sync_state, l1_latest, @@ -344,10 +349,12 @@ impl< } ChainOrchestratorCommand::UpdateFcsHead((head, sender)) => { self.engine.update_fcs(Some(head), None, None).await?; - let tx = self.database.tx_mut().await?; - tx.purge_l1_message_to_l2_block_mappings(Some(head.number + 1)).await?; - tx.set_l2_head_block_number(head.number).await?; - tx.commit().await?; + self.database + .tx_mut(move |tx| async move { + tx.purge_l1_message_to_l2_block_mappings(Some(head.number + 1)).await?; + tx.set_l2_head_block_number(head.number).await + }) + .await?; self.notify(ChainOrchestratorEvent::FcsHeadUpdated(head)); let _ = sender.send(()); } @@ -474,9 +481,8 @@ impl< batch_reconciliation_result.into_batch_consolidation_outcome(reorg_results).await?; // Insert the batch consolidation outcome into the database. 
- let tx = self.database.tx_mut().await?; - tx.insert_batch_consolidation_outcome(batch_consolidation_outcome.clone()).await?; - tx.commit().await?; + let consolidation_outcome = batch_consolidation_outcome.clone(); + self.database.insert_batch_consolidation_outcome(consolidation_outcome).await?; Ok(Some(ChainOrchestratorEvent::BatchConsolidated(batch_consolidation_outcome))) } @@ -488,9 +494,8 @@ impl< ) -> Result, ChainOrchestratorError> { match &*notification { L1Notification::Processed(block_number) => { - let tx = self.database.tx_mut().await?; - tx.set_processed_l1_block_number(*block_number).await?; - tx.commit().await?; + let block_number = *block_number; + self.database.set_processed_l1_block_number(block_number).await?; Ok(None) } L1Notification::Reorg(block_number) => self.handle_l1_reorg(*block_number).await, @@ -525,15 +530,7 @@ impl< &self, block_number: u64, ) -> Result, ChainOrchestratorError> { - Retry::default() - .retry("handle_new_block", || async { - let tx = self.database.tx_mut().await?; - tx.set_latest_l1_block_number(block_number).await?; - tx.commit().await?; - Ok::<_, ChainOrchestratorError>(()) - }) - .await?; - + self.database.set_latest_l1_block_number(block_number).await?; Ok(Some(ChainOrchestratorEvent::NewL1Block(block_number))) } @@ -545,16 +542,9 @@ impl< ) -> Result, ChainOrchestratorError> { let metric = self.metrics.get(&ChainOrchestratorItem::L1Reorg).expect("metric exists"); let now = Instant::now(); + let genesis_hash = self.config.chain_spec().genesis_hash(); let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = - Retry::default() - .retry("unwind", || async { - let txn = self.database.tx_mut().await?; - let unwind_result = - txn.unwind(self.config.chain_spec().genesis_hash(), block_number).await?; - txn.commit().await?; - Ok::<_, ChainOrchestratorError>(unwind_result) - }) - .await?; + self.database.unwind(genesis_hash, block_number).await?; let l2_head_block_info = if let Some(block_number) = l2_head_block_number { // Fetch the block hash of the new L2 head block. @@ -616,21 +606,15 @@ impl< self.metrics.get(&ChainOrchestratorItem::L1Finalization).expect("metric exists"); let now = Instant::now(); - let finalized_batches = Retry::default() - .retry("handle_finalized", || async { - let tx = self.database.tx_mut().await?; - + let finalized_batches = self + .database + .tx_mut(move |tx| async move { // Set the latest finalized L1 block in the database. tx.set_finalized_l1_block_number(block_number).await?; // Get all unprocessed batches that have been finalized by this L1 block // finalization. - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; - - tx.commit().await?; - - Ok::<_, ChainOrchestratorError>(finalized_batches) + tx.fetch_and_update_unprocessed_finalized_batches(block_number).await }) .await?; @@ -651,40 +635,40 @@ impl< let metric = self.metrics.get(&ChainOrchestratorItem::BatchCommit).expect("metric exists"); let now = Instant::now(); - let event = Retry::default() - .retry("handle_batch_commit", || async { - let tx = self.database.tx_mut().await?; + let event = self + .database + .tx_mut(move |tx| { let batch_clone = batch.clone(); - let prev_batch_index = batch_clone.clone().index - 1; - - // Perform a consistency check to ensure the previous commit batch exists in the - // database. 
- if tx.get_batch_by_index(prev_batch_index).await?.is_none() { - return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)) - } - - // remove any batches with an index greater than the previous batch. - let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; + async move { + let prev_batch_index = batch_clone.index - 1; - // handle the case of a batch revert. - let new_safe_head = if affected > 0 { - tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; - tx.get_highest_block_for_batch_index(prev_batch_index).await? - } else { - None - }; - - let event = ChainOrchestratorEvent::BatchCommitIndexed { - batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), - l1_block_number: batch.block_number, - safe_head: new_safe_head, - }; + // Perform a consistency check to ensure the previous commit batch exists in the + // database. + if tx.get_batch_by_index(prev_batch_index).await?.is_none() { + return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)); + } - // insert the batch and commit the transaction. - tx.insert_batch(batch_clone).await?; - tx.commit().await?; + // remove any batches with an index greater than the previous batch. + let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; - Ok::<_, ChainOrchestratorError>(Some(event)) + // handle the case of a batch revert. + let new_safe_head = if affected > 0 { + tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; + tx.get_highest_block_for_batch_index(prev_batch_index).await? + } else { + None + }; + + let event = ChainOrchestratorEvent::BatchCommitIndexed { + batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), + l1_block_number: batch_clone.block_number, + safe_head: new_safe_head, + }; + + // insert the batch and commit the transaction. + tx.insert_batch(batch_clone).await?; + Ok::<_, ChainOrchestratorError>(Some(event)) + } }) .await?; @@ -699,10 +683,9 @@ impl< batch_index: u64, block_number: u64, ) -> Result, ChainOrchestratorError> { - let event = Retry::default() - .retry("handle_batch_finalization", || async { - let tx = self.database.tx_mut().await?; - + let event = self + .database + .tx_mut(move |tx| async move { // finalize all batches up to `batch_index`. tx.finalize_batches_up_to_index(batch_index, block_number).await?; @@ -713,15 +696,13 @@ impl< let finalized_batches = tx .fetch_and_update_unprocessed_finalized_batches(finalized_block_number) .await?; - tx.commit().await?; return Ok(Some(ChainOrchestratorEvent::BatchFinalized( block_number, finalized_batches, - ))) + ))); } - tx.commit().await?; Ok::<_, ChainOrchestratorError>(None) }) .await; @@ -754,22 +735,28 @@ impl< let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash); // Perform a consistency check to ensure the previous L1 message exists in the database. - Retry::default() - .retry("handle_l1_message", || async { - let tx = self.database.tx_mut().await?; - if l1_message.transaction.queue_index > 0 && - tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1) + self.database + .tx_mut(move |tx| { + let l1_message = l1_message.clone(); + async move { + if l1_message.transaction.queue_index > 0 && + tx.get_n_l1_messages( + Some(L1MessageKey::from_queue_index( + l1_message.transaction.queue_index - 1, + )), + 1, + ) .await? 
- .is_none() - { - return Err(ChainOrchestratorError::L1MessageQueueGap( - l1_message.transaction.queue_index, - )) - } + .is_empty() + { + return Err(ChainOrchestratorError::L1MessageQueueGap( + l1_message.transaction.queue_index, + )); + } - tx.insert_l1_message(l1_message.clone()).await?; - tx.commit().await?; - Ok::<_, ChainOrchestratorError>(()) + tx.insert_l1_message(l1_message.clone()).await?; + Ok::<_, ChainOrchestratorError>(()) + } }) .await?; @@ -829,10 +816,8 @@ impl< } // We optimistically persist the signature upon passing consensus checks. - let tx = self.database.tx_mut().await?; - tx.insert_signature(block_with_peer.block.header.hash_slow(), block_with_peer.signature) - .await?; - tx.commit().await?; + let block_hash = block_with_peer.block.header.hash_slow(); + self.database.insert_signature(block_hash, block_with_peer.signature).await?; let received_block_number = block_with_peer.block.number; let received_block_hash = block_with_peer.block.header.hash_slow(); @@ -855,11 +840,9 @@ impl< // Purge all L1 message to L2 block mappings as they may be invalid after an // optimistic sync. - let tx = self.database.tx_mut().await?; - tx.purge_l1_message_to_l2_block_mappings(None).await?; - tx.commit().await?; + self.database.purge_l1_message_to_l2_block_mappings(None).await?; - return Ok(Some(ChainOrchestratorEvent::OptimisticSync(block_info))) + return Ok(Some(ChainOrchestratorEvent::OptimisticSync(block_info))); } // If the block number is greater than the current head we attempt to extend the chain. @@ -932,7 +915,7 @@ impl< headers: vec![block_with_peer.block.header], peer_id: block_with_peer.peer_id, signature: block_with_peer.signature, - })) + })); } // Check if the parent hash of the received block is in the chain. @@ -1056,7 +1039,7 @@ impl< // If the FCS update resulted in an invalid state, we return an error. if result.is_invalid() { tracing::warn!(target: "scroll::chain_orchestrator", ?chain_head_hash, ?chain_head_number, ?result, "Failed to update FCS after importing new chain from peer"); - return Err(ChainOrchestratorError::InvalidBlock) + return Err(ChainOrchestratorError::InvalidBlock); } // If we were previously in L2 syncing mode and the FCS update resulted in a valid state, we @@ -1077,10 +1060,15 @@ impl< // result is valid. 
if self.sync_state.is_synced() && result.is_valid() { let blocks = chain.iter().map(|block| block.into()).collect::>(); - let tx = self.database.tx_mut().await?; - tx.update_l1_messages_from_l2_blocks(blocks).await?; - tx.set_l2_head_block_number(block_with_peer.block.header.number).await?; - tx.commit().await?; + self.database + .tx_mut(move |tx| { + let blocks = blocks.clone(); + async move { + tx.update_l1_messages_from_l2_blocks(blocks).await?; + tx.set_l2_head_block_number(block_with_peer.block.header.number).await + } + }) + .await?; self.network.handle().block_import_outcome(BlockImportOutcome::valid_block( block_with_peer.peer_id, @@ -1114,7 +1102,7 @@ impl< from: safe_block_number, to: head_block_number, }); - return Ok(()) + return Ok(()); } let start_block_number = safe_block_number + 1; @@ -1134,12 +1122,11 @@ impl< self.validate_l1_messages(&blocks_to_validate).await?; - let tx = self.database.tx_mut().await?; - tx.update_l1_messages_from_l2_blocks( - blocks_to_validate.into_iter().map(|b| (&b).into()).collect(), - ) - .await?; - tx.commit().await?; + self.database + .update_l1_messages_from_l2_blocks( + blocks_to_validate.into_iter().map(|b| (&b).into()).collect(), + ) + .await?; self.notify(ChainOrchestratorEvent::ChainConsolidated { from: safe_block_number, @@ -1176,24 +1163,18 @@ impl< let first_block_number = blocks.first().expect("at least one block exists because we have l1 messages").number; - let tx = self.database.tx().await?; - let mut database_messages = if let Some(database_messages) = - tx.get_l1_messages(Some(L1MessageKey::block_number(first_block_number))).await? - { - database_messages - } else { - return Err(ChainOrchestratorError::L1MessageNotFound(L1MessageKey::TransactionHash( - *l1_message_hashes.first().expect("at least one message exists"), - ))) - }; + let count = l1_message_hashes.len(); + let mut database_messages = self + .database + .get_n_l1_messages(Some(L1MessageKey::block_number(first_block_number)), count) + .await? + .into_iter(); for message_hash in l1_message_hashes { // Get the expected L1 message from the database. let expected_hash = database_messages .next() - .await - .map(|m| m.map(|msg| msg.transaction.tx_hash())) - .transpose()? + .map(|m| m.transaction.tx_hash()) .ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageKey::TransactionHash( message_hash, ))) @@ -1238,17 +1219,15 @@ async fn compute_l1_message_queue_hash( Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else if l1_message.queue_index > l1_v2_message_queue_start_index { let index = l1_message.queue_index - 1; - let mut input = Retry::default() - .retry("get_l1_message_by_index", || async { - let tx = database.tx().await?; - let input = tx.get_l1_message_by_index(index).await?; - Ok::<_, ChainOrchestratorError>(input) - }) + let mut input = database + .get_n_l1_messages(Some(L1MessageKey::from_queue_index(index)), 1) .await? + .first() .map(|m| m.queue_hash) .ok_or(DatabaseError::L1MessageNotFound(L1MessageKey::QueueIndex(index)))? .unwrap_or_default() .to_vec(); + input.append(&mut l1_message.tx_hash().to_vec()); Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else { diff --git a/crates/chain-orchestrator/src/retry.rs b/crates/chain-orchestrator/src/retry.rs deleted file mode 100644 index b48b9518..00000000 --- a/crates/chain-orchestrator/src/retry.rs +++ /dev/null @@ -1,193 +0,0 @@ -//! Configurable retry mechanism for database, network, and other fallible operations. 
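The module deleted below is the orchestrator-local retry helper; equivalent retrying now happens behind the `Database` wrapper in `crates/database/db` (see the `Retry` field introduced there). For reference, the deleted helper's backoff schedule reduces to the following; a runnable distillation using the defaults from the deleted code (50 ms initial delay, optional exponential doubling):

    use std::time::Duration;

    // Delay before retry attempt `attempt` (1-based): constant by default,
    // doubling per attempt when exponential backoff is enabled.
    fn retry_delay(initial_delay_ms: u64, exponential_backoff: bool, attempt: u32) -> Duration {
        let delay_ms = if exponential_backoff {
            initial_delay_ms * 2u64.pow(attempt - 1)
        } else {
            initial_delay_ms
        };
        Duration::from_millis(delay_ms)
    }

    fn main() {
        // With the default 50 ms base: 50, 100, 200, 400 ms for attempts 1..=4.
        for attempt in 1..=4 {
            println!("attempt {attempt}: {:?}", retry_delay(50, true, attempt));
        }
    }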
- -use std::time::Duration; - -/// A type used for retrying transient failures in operations. -#[derive(Debug, Clone)] -pub struct Retry { - /// Maximum number of retry attempts. None means infinite retries - pub max_retries: Option, - /// Initial delay between retries in milliseconds - pub initial_delay_ms: u64, - /// Whether to use exponential backoff - pub exponential_backoff: bool, -} - -impl Default for Retry { - fn default() -> Self { - Self { max_retries: None, initial_delay_ms: 50, exponential_backoff: false } - } -} - -impl Retry { - /// Creates a new [`Retry`] with the specified parameters. - pub const fn new( - max_retries: Option, - initial_delay_ms: u64, - exponential_backoff: bool, - ) -> Self { - Self { max_retries, initial_delay_ms, exponential_backoff } - } - - /// Retry an asynchronous operation with the configured retry strategy. - pub async fn retry(&self, operation_name: &str, operation: F) -> Result - where - F: Fn() -> Fut, - Fut: std::future::Future>, - E: std::fmt::Debug + CanRetry, - { - let mut attempt: usize = 0; - - loop { - match operation().await { - Ok(result) => return Ok(result), - Err(error) => { - // If the error is not retryable, return immediately. - if !error.can_retry() { - return Err(error); - } - - if let Some(max_retries) = self.max_retries { - if attempt >= max_retries { - return Err(error); - } - } - - attempt += 1; - tracing::debug!( - target: "scroll::chain_orchestrator", - operation = operation_name, - error = ?error, - attempt = attempt, - "Retrying operation" - ); - - // Calculate delay for next retry - let delay_ms = if self.exponential_backoff { - self.initial_delay_ms * 2_u64.pow(attempt as u32 - 1) - } else { - self.initial_delay_ms - }; - - tokio::time::sleep(Duration::from_millis(delay_ms)).await; - } - } - } - } -} - -/// A trait for errors that can indicate whether an operation can be retried. 
-pub trait CanRetry { - fn can_retry(&self) -> bool; -} - -// Centralized retry classification impls -impl CanRetry for scroll_db::DatabaseError { - fn can_retry(&self) -> bool { - matches!(self, Self::DatabaseError(_) | Self::SqlxError(_)) - } -} - -impl CanRetry for crate::error::ChainOrchestratorError { - fn can_retry(&self) -> bool { - match self { - Self::DatabaseError(db) => db.can_retry(), - Self::NetworkRequestError(_) | Self::RpcError(_) => true, - _ => false, - } - } -} - -impl CanRetry for scroll_engine::EngineError { - fn can_retry(&self) -> bool { - matches!(self, Self::TransportError(_)) - } -} - -#[cfg(test)] -mod tests { - use super::{CanRetry, Retry}; - use std::cell::RefCell; - - #[derive(Debug, Clone, Copy, PartialEq, Eq)] - struct TestErr; - impl CanRetry for TestErr { - fn can_retry(&self) -> bool { - true - } - } - - #[tokio::test] - async fn test_retry_success_on_first_attempt() { - let attempt = RefCell::new(0); - let retry = Retry::new(Some(3), 10, false); - let result = retry - .retry("test_operation", || { - *attempt.borrow_mut() += 1; - async move { Ok::(42) } - }) - .await; - - assert_eq!(result, Ok(42)); - assert_eq!(*attempt.borrow(), 1); - } - - #[tokio::test] - async fn test_retry_success_after_failures() { - let attempt = RefCell::new(0); - let retry = Retry::new(Some(5), 10, false); - let result = retry - .retry("test_operation", || { - *attempt.borrow_mut() += 1; - let current_attempt = *attempt.borrow(); - async move { - if current_attempt < 3 { - Err::(TestErr) - } else { - Ok(42) - } - } - }) - .await; - - assert_eq!(result, Ok(42)); - assert_eq!(*attempt.borrow(), 3); - } - - #[tokio::test] - async fn test_retry_exhausted() { - let attempt = RefCell::new(0); - let retry = Retry::new(Some(2), 10, false); - let result = retry - .retry("test_operation", || { - *attempt.borrow_mut() += 1; - async move { Err::(TestErr) } - }) - .await; - - assert_eq!(result, Err(TestErr)); - assert_eq!(*attempt.borrow(), 3); // 1 initial + 2 retries - } - - #[tokio::test] - async fn test_retry_with_defaults() { - let attempt = RefCell::new(0); - let retry = Retry::default(); - let result = retry - .retry("test_retry_with_defaults", || { - *attempt.borrow_mut() += 1; - let current_attempt = *attempt.borrow(); - async move { - if current_attempt < 2 { - Err::(TestErr) - } else { - Ok(42) - } - } - }) - .await; - - assert_eq!(result, Ok(42)); - assert_eq!(*attempt.borrow(), 2); - } -} diff --git a/crates/database/db/Cargo.toml b/crates/database/db/Cargo.toml index c75f0458..31717829 100644 --- a/crates/database/db/Cargo.toml +++ b/crates/database/db/Cargo.toml @@ -11,7 +11,6 @@ workspace = true [dependencies] # alloy -alloy-eips.workspace = true alloy-primitives.workspace = true # scroll-alloy diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index f7413a7c..1ac5f401 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -1,17 +1,25 @@ -use std::{str::FromStr, time::Duration}; - use super::transaction::{DatabaseTransactionProvider, TXMut, TX}; -use crate::{error::DatabaseError, metrics::DatabaseMetrics, DatabaseConnectionProvider}; - +use crate::{ + error::DatabaseError, + metrics::DatabaseMetrics, + service::{query::DatabaseQuery, retry::Retry, DatabaseService, DatabaseServiceError}, + DatabaseConnectionProvider, DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, + UnwindResult, +}; +use alloy_primitives::{Signature, B256}; +use rollup_node_primitives::{ + BatchCommitData, BatchConsolidationOutcome, BatchInfo, 
BlockInfo, L1MessageEnvelope, + L2BlockInfoWithL1Messages, +}; +use scroll_alloy_rpc_types_engine::BlockDataHint; use sea_orm::{ sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}, DatabaseConnection, SqlxSqliteConnector, TransactionTrait, }; -use std::sync::Arc; +use std::{fmt::Debug, future::Future, str::FromStr, sync::Arc, time::Duration}; use tokio::sync::{Mutex, Semaphore}; // TODO: make these configurable via CLI. - /// The timeout duration for database busy errors. const BUSY_TIMEOUT_SECS: u64 = 5; @@ -24,14 +32,371 @@ const MIN_CONNECTIONS: u32 = 5; /// The timeout for acquiring a connection from the pool. const ACQUIRE_TIMEOUT_SECS: u64 = 5; -/// The [`Database`] struct is responsible for interacting with the database. +/// A wrapper around `DatabaseInner` which provides retry features. +#[derive(Debug)] +pub struct Database { + database: Retry>, +} + +impl Database { + /// Creates a new [`Database`] instance associated with the provided database URL. + pub async fn new(database_url: &str) -> Result { + let db = Arc::new(DatabaseInner::new(database_url).await?); + Ok(Self { database: Retry::new_with_default_config(db) }) + } + + /// Creates a new [`Database`] instance with SQLite-specific optimizations and custom pool + /// settings. + pub async fn new_sqlite_with_pool_options( + database_url: &str, + max_connections: u32, + min_connections: u32, + acquire_timeout_secs: u64, + busy_timeout_secs: u64, + ) -> Result { + let db = Arc::new( + DatabaseInner::new_sqlite_with_pool_options( + database_url, + max_connections, + min_connections, + acquire_timeout_secs, + busy_timeout_secs, + ) + .await?, + ); + Ok(Self { database: Retry::new_with_default_config(db) }) + } + + /// Creates a new [`Database`] instance for testing purposes, using the provided temporary + /// directory to store the database files. + #[cfg(feature = "test-utils")] + pub async fn test(dir: tempfile::TempDir) -> Result { + let db = Arc::new(DatabaseInner::test(dir).await?); + Ok(Self { database: Retry::new_with_default_config(db) }) + } + + /// Returns a reference to the database tmp dir. + #[cfg(feature = "test-utils")] + pub fn tmp_dir(&self) -> Option<&tempfile::TempDir> { + self.database.inner.tmp_dir() + } + + /// Returns a reference to the inner database structure. + pub fn inner(&self) -> Arc { + self.database.inner.clone() + } + + /// Initiates a read operation to the underlying database layer. + pub async fn tx( + &self, + call: F, + ) -> Result + where + F: Fn(Arc) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let request = DatabaseQuery::read(call); + self.database.call(request).await + } + + /// Initiates a write operation to the underlying database layer. 
+ pub async fn tx_mut( + &self, + call: F, + ) -> Result + where + F: Fn(Arc) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let request = DatabaseQuery::write(call); + self.database.call(request).await + } +} + +#[async_trait::async_trait] +impl DatabaseWriteOperations for Database { + async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let batch_commit = batch_commit.clone(); + async move { tx.insert_batch(batch_commit).await } + }) + .await + } + + async fn finalize_batches_up_to_index( + &self, + batch_index: u64, + block_number: u64, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { + tx.finalize_batches_up_to_index(batch_index, block_number).await + }) + .await + } + + async fn set_latest_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.set_latest_l1_block_number(block_number).await }) + .await + } + + async fn set_finalized_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.set_finalized_l1_block_number(block_number).await }) + .await + } + + async fn set_processed_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.set_processed_l1_block_number(block_number).await }) + .await + } + + async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.set_l2_head_block_number(number).await }).await + } + + async fn fetch_and_update_unprocessed_finalized_batches( + &self, + finalized_l1_block_number: u64, + ) -> Result, DatabaseError> { + self.tx_mut(move |tx| async move { + tx.fetch_and_update_unprocessed_finalized_batches(finalized_l1_block_number).await + }) + .await + } + + async fn delete_batches_gt_block_number( + &self, + block_number: u64, + ) -> Result { + self.tx_mut( + move |tx| async move { tx.delete_l2_blocks_gt_block_number(block_number).await }, + ) + .await + } + + async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result { + self.tx_mut(move |tx| async move { tx.delete_batches_gt_batch_index(batch_index).await }) + .await + } + + async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let l1_message = l1_message.clone(); + async move { tx.insert_l1_message(l1_message).await } + }) + .await + } + + async fn delete_l1_messages_gt( + &self, + l1_block_number: u64, + ) -> Result, DatabaseError> { + self.tx_mut(move |tx| async move { tx.delete_l1_messages_gt(l1_block_number).await }).await + } + + async fn prepare_on_startup( + &self, + genesis_hash: B256, + ) -> Result<(Option, Option), DatabaseError> { + self.tx_mut(move |tx| async move { tx.prepare_on_startup(genesis_hash).await }).await + } + + async fn delete_l2_blocks_gt_block_number( + &self, + block_number: u64, + ) -> Result { + self.tx_mut( + move |tx| async move { tx.delete_l2_blocks_gt_block_number(block_number).await }, + ) + .await + } + + async fn delete_l2_blocks_gt_batch_index( + &self, + batch_index: u64, + ) -> Result { + self.tx_mut(move |tx| async move { tx.delete_l2_blocks_gt_batch_index(batch_index).await }) + .await + } + + async fn insert_blocks( + &self, + blocks: Vec, + batch_info: BatchInfo, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let blocks = blocks.clone(); + async move { tx.insert_blocks(blocks, batch_info).await } + }) + .await + } + + 
async fn insert_block( + &self, + block_info: BlockInfo, + batch_info: BatchInfo, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.insert_block(block_info, batch_info).await }).await + } + + async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.insert_genesis_block(genesis_hash).await }).await + } + + async fn update_l1_messages_from_l2_blocks( + &self, + blocks: Vec, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let blocks = blocks.clone(); + async move { tx.update_l1_messages_from_l2_blocks(blocks).await } + }) + .await + } + + async fn update_l1_messages_with_l2_block( + &self, + block_info: L2BlockInfoWithL1Messages, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let block_info = block_info.clone(); + async move { tx.update_l1_messages_with_l2_block(block_info).await } + }) + .await + } + + async fn purge_l1_message_to_l2_block_mappings( + &self, + block_number: Option, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { + tx.purge_l1_message_to_l2_block_mappings(block_number).await + }) + .await + } + + async fn insert_batch_consolidation_outcome( + &self, + outcome: BatchConsolidationOutcome, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| { + let outcome = outcome.clone(); + async move { tx.insert_batch_consolidation_outcome(outcome).await } + }) + .await + } + + async fn unwind( + &self, + genesis_hash: B256, + l1_block_number: u64, + ) -> Result { + self.tx_mut(move |tx| async move { tx.unwind(genesis_hash, l1_block_number).await }).await + } + + async fn insert_signature( + &self, + block_hash: B256, + signature: Signature, + ) -> Result<(), DatabaseError> { + self.tx_mut(move |tx| async move { tx.insert_signature(block_hash, signature).await }).await + } +} + +#[async_trait::async_trait] +impl DatabaseReadOperations for Database { + async fn get_batch_by_index( + &self, + batch_index: u64, + ) -> Result, DatabaseError> { + self.tx(move |tx| async move { tx.get_batch_by_index(batch_index).await }).await + } + + async fn get_latest_l1_block_number(&self) -> Result { + self.tx(|tx| async move { tx.get_latest_l1_block_number().await }).await + } + + async fn get_finalized_l1_block_number(&self) -> Result { + self.tx(|tx| async move { tx.get_finalized_l1_block_number().await }).await + } + + async fn get_processed_l1_block_number(&self) -> Result { + self.tx(|tx| async move { tx.get_processed_l1_block_number().await }).await + } + + async fn get_l2_head_block_number(&self) -> Result { + self.tx(|tx| async move { tx.get_l2_head_block_number().await }).await + } + + async fn get_n_l1_messages( + &self, + start: Option, + n: usize, + ) -> Result, DatabaseError> { + self.tx(move |tx| { + let start = start.clone(); + async move { tx.get_n_l1_messages(start, n).await } + }) + .await + } + + async fn get_n_l2_block_data_hint( + &self, + block_number: u64, + n: usize, + ) -> Result, DatabaseError> { + self.tx(move |tx| async move { tx.get_n_l2_block_data_hint(block_number, n).await }).await + } + + async fn get_l2_block_and_batch_info_by_hash( + &self, + block_hash: B256, + ) -> Result, DatabaseError> { + self.tx(move |tx| async move { tx.get_l2_block_and_batch_info_by_hash(block_hash).await }) + .await + } + + async fn get_l2_block_info_by_number( + &self, + block_number: u64, + ) -> Result, DatabaseError> { + self.tx(move |tx| async move { tx.get_l2_block_info_by_number(block_number).await }).await + } + + async fn 
get_latest_safe_l2_info( + &self, + ) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> { + self.tx(|tx| async move { tx.get_latest_safe_l2_info().await }).await + } + + async fn get_highest_block_for_batch_hash( + &self, + batch_hash: B256, + ) -> Result<Option<BlockInfo>, DatabaseError> { + self.tx(move |tx| async move { tx.get_highest_block_for_batch_hash(batch_hash).await }) + .await + } + + async fn get_highest_block_for_batch_index( + &self, + batch_index: u64, + ) -> Result<Option<BlockInfo>, DatabaseError> { + self.tx(move |tx| async move { tx.get_highest_block_for_batch_index(batch_index).await }) + .await + } + + async fn get_signature(&self, block_hash: B256) -> Result<Option<Signature>, DatabaseError> { + self.tx(move |tx| async move { tx.get_signature(block_hash).await }).await + } +} + +/// The [`DatabaseInner`] struct is responsible for interacting with the database. /// -/// The [`Database`] type hold a connection pool and a write lock. It implements the +/// The [`DatabaseInner`] type holds a connection pool and a write lock. It implements the /// [`DatabaseTransactionProvider`] trait to provide methods for creating read-only and read-write /// transactions. It allows multiple concurrent read-only transactions, but ensures that only one /// read-write transaction is active at any given time using a mutex. #[derive(Debug)] -pub struct Database { +pub struct DatabaseInner { /// The underlying database connection. connection: DatabaseConnection, /// A mutex to ensure that only one mutable transaction is active at a time. @@ -46,9 +411,9 @@ pub struct Database { tmp_dir: Option<tempfile::TempDir>, } -impl Database { +impl DatabaseInner { /// Creates a new [`Database`] instance associated with the provided database URL. - pub async fn new(database_url: &str) -> Result<Self, DatabaseError> { + async fn new(database_url: &str) -> Result<Self, DatabaseError> { Self::new_sqlite_with_pool_options( database_url, MAX_CONNECTIONS, @@ -61,7 +426,7 @@ impl Database { /// Creates a new [`Database`] instance with SQLite-specific optimizations and custom pool /// settings. - pub async fn new_sqlite_with_pool_options( + async fn new_sqlite_with_pool_options( database_url: &str, max_connections: u32, min_connections: u32, @@ -98,7 +463,7 @@ impl Database { /// Creates a new [`Database`] instance for testing purposes, using the provided temporary /// directory to store the database files. #[cfg(feature = "test-utils")] - pub async fn test(dir: tempfile::TempDir) -> Result<Self, DatabaseError> { + async fn test(dir: tempfile::TempDir) -> Result<Self, DatabaseError> { let path = dir.path().join("test.db"); let mut db = Self::new(path.to_str().unwrap()).await?; db.tmp_dir = Some(dir); @@ -107,13 +472,13 @@ impl Database { /// Returns a reference to the database tmp dir. #[cfg(feature = "test-utils")] - pub const fn tmp_dir(&self) -> Option<&tempfile::TempDir> { + const fn tmp_dir(&self) -> Option<&tempfile::TempDir> { self.tmp_dir.as_ref() } } #[async_trait::async_trait] -impl DatabaseTransactionProvider for Database { +impl DatabaseTransactionProvider for DatabaseInner { /// Creates a new [`TX`] which can be used for read-only operations.
async fn tx(&self) -> Result { self.as_ref().tx().await @@ -147,7 +512,7 @@ impl DatabaseTransactionProvider for Arc { } } -impl DatabaseConnectionProvider for Database { +impl DatabaseConnectionProvider for DatabaseInner { type Connection = DatabaseConnection; fn get_connection(&self) -> &Self::Connection { @@ -173,13 +538,13 @@ mod test { use rollup_node_primitives::{ BatchCommitData, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, }; + use scroll_alloy_consensus::TxL1Message; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; #[tokio::test] async fn test_database_round_trip_batch_commit() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 1024]; @@ -190,11 +555,10 @@ mod test { let batch_commit = BatchCommitData::arbitrary(&mut u).unwrap(); // Round trip the BatchCommitData through the database. - tx.insert_batch(batch_commit.clone()).await.unwrap(); + db.insert_batch(batch_commit.clone()).await.unwrap(); let batch_commit_from_db = - tx.get_batch_by_index(batch_commit.index).await.unwrap().unwrap(); + db.get_batch_by_index(batch_commit.index).await.unwrap().unwrap(); - tx.commit().await.unwrap(); assert_eq!(batch_commit, batch_commit_from_db); } @@ -202,7 +566,6 @@ mod test { async fn test_database_finalize_batch_commits() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 2048]; @@ -217,10 +580,10 @@ mod test { finalized_block_number: Some(100), ..Arbitrary::arbitrary(&mut u).unwrap() }; - tx.insert_batch(batch_commit.clone()).await.unwrap(); + db.insert_batch(batch_commit.clone()).await.unwrap(); } // Finalize all batches below batch index 10. - tx.finalize_batches_up_to_index(10, 100).await.unwrap(); + db.finalize_batches_up_to_index(10, 100).await.unwrap(); // Generate 10 commit batches not finalized. for i in 10..20 { @@ -230,14 +593,19 @@ mod test { finalized_block_number: None, ..Arbitrary::arbitrary(&mut u).unwrap() }; - tx.insert_batch(batch_commit.clone()).await.unwrap(); + db.insert_batch(batch_commit.clone()).await.unwrap(); } // Finalize all batches below batch index 15. - tx.finalize_batches_up_to_index(15, 200).await.unwrap(); + db.finalize_batches_up_to_index(15, 200).await.unwrap(); // Verify the finalized_block_number is correctly updated. - let batches = tx.get_batches().await.unwrap().collect::>>().await; + let batches = models::batch_commit::Entity::find() + .stream(db.inner().get_connection()) + .await + .unwrap() + .collect::>>() + .await; for batch in batches { let batch = batch.unwrap(); if batch.index < 10 { @@ -248,15 +616,12 @@ mod test { assert_eq!(batch.finalized_block_number, None); } } - - tx.commit().await.unwrap(); } #[tokio::test] async fn test_database_round_trip_l1_message() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 1024]; @@ -267,15 +632,24 @@ mod test { let l1_message = L1MessageEnvelope::arbitrary(&mut u).unwrap(); // Round trip the L1Message through the database. 
- tx.insert_l1_message(l1_message.clone()).await.unwrap(); - let l1_message_from_db_index = - tx.get_l1_message_by_index(l1_message.transaction.queue_index).await.unwrap().unwrap(); - let l1_message_from_db_hash = - tx.get_l1_message_by_hash(l1_message.queue_hash.unwrap()).await.unwrap().unwrap(); + db.insert_l1_message(l1_message.clone()).await.unwrap(); + let l1_message_from_db_index = db + .get_n_l1_messages( + Some(L1MessageKey::from_queue_index(l1_message.transaction.queue_index)), + 1, + ) + .await + .unwrap(); + let l1_message_from_db_hash = db + .get_n_l1_messages( + Some(L1MessageKey::from_queue_hash(l1_message.queue_hash.unwrap())), + 1, + ) + .await + .unwrap(); - tx.commit().await.unwrap(); - assert_eq!(l1_message, l1_message_from_db_index); - assert_eq!(l1_message, l1_message_from_db_hash); + assert_eq!(l1_message, l1_message_from_db_index[0]); + assert_eq!(l1_message, l1_message_from_db_hash[0]); } #[tokio::test] @@ -283,19 +657,17 @@ mod test { async fn test_database_block_data_seed() { // Setup the test database. let db = setup_test_db().await; - let tx = db.tx().await.unwrap(); // db should contain the seeded data after migration. - let data = tx.get_l2_block_data_hint(0).await.unwrap(); + let data = db.get_n_l2_block_data_hint(0, 1).await.unwrap(); - assert!(data.is_some()); + assert_eq!(data.len(), 1); } #[tokio::test] async fn test_derived_block_exists() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 1024]; @@ -306,23 +678,23 @@ mod test { let mut block_number = 100; let data = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() }; let batch_info: BatchInfo = data.clone().into(); - tx.insert_batch(data).await.unwrap(); + db.insert_batch(data).await.unwrap(); for _ in 0..10 { let block_info = BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_block(block_info, batch_info).await.unwrap(); + db.insert_block(block_info, batch_info).await.unwrap(); block_number += 1; } // Fetch the highest block for the batch hash and verify number. let highest_block_info = - tx.get_highest_block_for_batch_hash(batch_info.hash).await.unwrap().unwrap(); + db.get_highest_block_for_batch_hash(batch_info.hash).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); // Fetch the highest block for the batch and verify number. let highest_block_info = - tx.get_highest_block_for_batch_index(batch_info.index).await.unwrap().unwrap(); + db.get_highest_block_for_batch_index(batch_info.index).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); } @@ -330,7 +702,6 @@ mod test { async fn test_derived_block_missing() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. 
let mut bytes = [0u8; 1024]; @@ -345,26 +716,25 @@ mod test { let second_batch = BatchCommitData { index: 250, ..Arbitrary::arbitrary(&mut u).unwrap() }; let second_batch_info: BatchInfo = second_batch.clone().into(); - tx.insert_batch(first_batch).await.unwrap(); - tx.insert_batch(second_batch).await.unwrap(); + db.insert_batch(first_batch).await.unwrap(); + db.insert_batch(second_batch).await.unwrap(); for _ in 0..10 { let block_info = BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_block(block_info, first_batch_info).await.unwrap(); + db.insert_block(block_info, first_batch_info).await.unwrap(); block_number += 1; } // Fetch the highest block for the batch hash and verify number. let highest_block_info = - tx.get_highest_block_for_batch_hash(second_batch_info.hash).await.unwrap().unwrap(); + db.get_highest_block_for_batch_hash(second_batch_info.hash).await.unwrap().unwrap(); assert_eq!(highest_block_info.number, block_number - 1); // Fetch the highest block for the batch index and verify number. let highest_block_info = - tx.get_highest_block_for_batch_index(second_batch_info.index).await.unwrap().unwrap(); + db.get_highest_block_for_batch_index(second_batch_info.index).await.unwrap().unwrap(); - tx.commit().await.unwrap(); assert_eq!(highest_block_info.number, block_number - 1); } @@ -372,7 +742,6 @@ mod test { async fn test_database_batches_by_finalized_block_range() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 2048]; @@ -391,12 +760,12 @@ mod test { ..Arbitrary::arbitrary(&mut u).unwrap() }; let hash = data.hash; - tx.insert_batch(data).await.unwrap(); + db.insert_batch(data).await.unwrap(); // Finalize batch up to block number 110. if block_number <= 110 { finalized_batches_hashes.push(hash); - tx.finalize_batches_up_to_index(batch_index, block_number).await.unwrap(); + db.finalize_batches_up_to_index(batch_index, block_number).await.unwrap(); } block_number += 1; @@ -404,7 +773,7 @@ mod test { } // Fetch the finalized batch for provided height and verify number. - let batch_infos = tx + let batch_infos = db .fetch_and_update_unprocessed_finalized_batches(110) .await .unwrap() @@ -429,26 +798,27 @@ mod test { let l1_message_2 = L1MessageEnvelope::arbitrary(&mut u).unwrap(); // Insert the L1Messages into the database in a transaction. - let tx = db.tx_mut().await.unwrap(); - tx.insert_l1_message(l1_message_1.clone()).await.unwrap(); - tx.insert_l1_message(l1_message_2.clone()).await.unwrap(); - tx.commit().await.unwrap(); + db.insert_l1_message(l1_message_1.clone()).await.unwrap(); + db.insert_l1_message(l1_message_2.clone()).await.unwrap(); // Check that the L1Messages are in the database. 
- let tx = db.tx().await.unwrap(); - let l1_message_1_from_db = tx - .get_l1_message_by_index(l1_message_1.transaction.queue_index) + let l1_message_1_from_db = db + .get_n_l1_messages( + Some(L1MessageKey::from_queue_index(l1_message_1.transaction.queue_index)), + 1, + ) .await - .unwrap() .unwrap(); - assert_eq!(l1_message_1, l1_message_1_from_db); - let l1_message_2_from_db = tx - .get_l1_message_by_index(l1_message_2.transaction.queue_index) + assert_eq!(l1_message_1, l1_message_1_from_db[0]); + let l1_message_2_from_db = db + .get_n_l1_messages( + Some(L1MessageKey::from_queue_index(l1_message_2.transaction.queue_index)), + 1, + ) .await - .unwrap() .unwrap(); - assert_eq!(l1_message_2, l1_message_2_from_db); + assert_eq!(l1_message_2, l1_message_2_from_db[0]); } #[tokio::test] @@ -467,71 +837,67 @@ mod test { let l1_message_2 = L1MessageEnvelope::arbitrary(&mut u).unwrap(); // Insert the L1Messages into the database in a transaction. - let tx = db.tx_mut().await.unwrap(); - tx.insert_l1_message(l1_message_1.clone()).await.unwrap(); - tx.insert_l1_message(l1_message_2.clone()).await.unwrap(); + db.insert_l1_message(l1_message_1.clone()).await.unwrap(); + db.insert_l1_message(l1_message_2.clone()).await.unwrap(); // Modify l1_block_number of l1_message_1 and attempt to insert again l1_message_1.l1_block_number = 1000; - tx.insert_l1_message(l1_message_1.clone()).await.unwrap(); - tx.commit().await.unwrap(); + db.insert_l1_message(l1_message_1.clone()).await.unwrap(); // Check that the L1Messages are in the database. - let tx = db.tx().await.unwrap(); - let l1_message_1_from_db = tx - .get_l1_message_by_index(l1_message_1.transaction.queue_index) + let l1_message_1_from_db = db + .get_n_l1_messages( + Some(L1MessageKey::from_queue_index(l1_message_1.transaction.queue_index)), + 1, + ) .await - .unwrap() .unwrap(); - assert_eq!(original_l1_message_1, l1_message_1_from_db); - let l1_message_2_from_db = tx - .get_l1_message_by_index(l1_message_2.transaction.queue_index) + assert_eq!(original_l1_message_1, l1_message_1_from_db[0]); + let l1_message_2_from_db = db + .get_n_l1_messages( + Some(L1MessageKey::from_queue_index(l1_message_2.transaction.queue_index)), + 1, + ) .await - .unwrap() .unwrap(); - assert_eq!(l1_message_2, l1_message_2_from_db); + + assert_eq!(l1_message_2, l1_message_2_from_db[0]); } #[tokio::test] - async fn test_database_iterator() { + async fn test_database_get_l1_messages() { // Setup the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 2048]; rand::rng().fill(bytes.as_mut_slice()); let mut u = Unstructured::new(&bytes); - // Generate 2 random L1Messages. - let l1_message_1 = L1MessageEnvelope::arbitrary(&mut u).unwrap(); - let l1_message_2 = L1MessageEnvelope::arbitrary(&mut u).unwrap(); - - // Insert the L1Messages into the database. - tx.insert_l1_message(l1_message_1.clone()).await.unwrap(); - tx.insert_l1_message(l1_message_2.clone()).await.unwrap(); - tx.commit().await.unwrap(); + // Generate 10 random L1Messages. 
+ let mut l1_messages = Vec::new(); + for i in 0..10 { + let l1_message = L1MessageEnvelope { + transaction: TxL1Message { + queue_index: i, + ..TxL1Message::arbitrary(&mut u).unwrap() + }, + ..L1MessageEnvelope::arbitrary(&mut u).unwrap() + }; + db.insert_l1_message(l1_message.clone()).await.unwrap(); + l1_messages.push(l1_message); + } // collect the L1Messages - let tx = db.tx().await.unwrap(); - let l1_messages = tx - .get_l1_messages(None) - .await - .unwrap() - .unwrap() - .map(|res| res.unwrap()) - .collect::>() - .await; + let db_l1_messages = db.get_n_l1_messages(None, 5).await.unwrap(); // Apply the assertions. - assert!(l1_messages.contains(&l1_message_1)); - assert!(l1_messages.contains(&l1_message_2)); + assert_eq!(db_l1_messages, l1_messages[..5]); } #[tokio::test] async fn test_delete_l1_messages_gt() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 1024]; @@ -543,11 +909,11 @@ mod test { let mut l1_message = L1MessageEnvelope::arbitrary(&mut u).unwrap(); l1_message.l1_block_number = 100 + i; // block number: 100-109 l1_message.transaction.queue_index = i; // queue index: 0-9 - tx.insert_l1_message(l1_message).await.unwrap(); + db.insert_l1_message(l1_message).await.unwrap(); } // Delete messages with L1 block number > 105 - let deleted_messages = tx.delete_l1_messages_gt(105).await.unwrap(); + let deleted_messages = db.delete_l1_messages_gt(105).await.unwrap(); // Verify that 4 messages were deleted (block numbers 106, 107, 108, 109) assert_eq!(deleted_messages.len(), 4); @@ -559,15 +925,21 @@ mod test { // Verify remaining messages are still in database (queue indices 0-5) for queue_idx in 0..=5 { - let msg = tx.get_l1_message_by_index(queue_idx).await.unwrap(); - assert!(msg.is_some()); - assert!(msg.unwrap().l1_block_number <= 105); + let msgs = db + .get_n_l1_messages(Some(L1MessageKey::from_queue_index(queue_idx)), 1) + .await + .unwrap(); + assert!(!msgs.is_empty()); + assert!(msgs.first().unwrap().l1_block_number <= 105); } // Verify deleted messages are no longer in database (queue indices 6-9) for queue_idx in 6..10 { - let msg = tx.get_l1_message_by_index(queue_idx).await.unwrap(); - assert!(msg.is_none()); + let msgs = db + .get_n_l1_messages(Some(L1MessageKey::from_queue_index(queue_idx)), 1) + .await + .unwrap(); + assert!(msgs.is_empty()); } } @@ -575,7 +947,6 @@ mod test { async fn test_get_l2_block_info_by_number() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. 
let mut bytes = [0u8; 1024]; @@ -585,7 +956,7 @@ mod test { // Generate and insert a batch let batch_data = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() }; let batch_info: BatchInfo = batch_data.clone().into(); - tx.insert_batch(batch_data).await.unwrap(); + db.insert_batch(batch_data).await.unwrap(); // Generate and insert multiple L2 blocks let mut block_infos = Vec::new(); @@ -593,18 +964,18 @@ mod test { let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() }; let l2_block = block_info; block_infos.push(block_info); - tx.insert_block(l2_block, batch_info).await.unwrap(); + db.insert_block(l2_block, batch_info).await.unwrap(); } // Test getting existing blocks for expected_block in block_infos { let retrieved_block = - tx.get_l2_block_info_by_number(expected_block.number).await.unwrap(); + db.get_l2_block_info_by_number(expected_block.number).await.unwrap(); assert_eq!(retrieved_block, Some(expected_block)) } // Test getting non-existent block - let non_existent_block = tx.get_l2_block_info_by_number(999).await.unwrap(); + let non_existent_block = db.get_l2_block_info_by_number(999).await.unwrap(); assert!(non_existent_block.is_none()); } @@ -612,7 +983,6 @@ mod test { async fn test_get_latest_safe_l2_block() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 1024]; @@ -620,25 +990,25 @@ mod test { let mut u = Unstructured::new(&bytes); // Initially should return the genesis block and hash. - let (latest_safe_block, batch) = tx.get_latest_safe_l2_info().await.unwrap().unwrap(); + let (latest_safe_block, batch) = db.get_latest_safe_l2_info().await.unwrap().unwrap(); assert_eq!(latest_safe_block.number, 0); assert_eq!(batch.index, 0); // Generate and insert a batch let batch_data = BatchCommitData { index: 100, ..Arbitrary::arbitrary(&mut u).unwrap() }; let batch_info: BatchInfo = batch_data.clone().into(); - tx.insert_batch(batch_data).await.unwrap(); + db.insert_batch(batch_data).await.unwrap(); // Insert blocks with batch info (safe blocks) let safe_block_1 = BlockInfo { number: 200, hash: B256::arbitrary(&mut u).unwrap() }; let safe_block_2 = BlockInfo { number: 201, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_block(safe_block_1, batch_info).await.unwrap(); + db.insert_block(safe_block_1, batch_info).await.unwrap(); - tx.insert_block(safe_block_2, batch_info).await.unwrap(); + db.insert_block(safe_block_2, batch_info).await.unwrap(); // Should return the highest safe block (block 201) - let latest_safe = tx.get_latest_safe_l2_info().await.unwrap(); + let latest_safe = db.get_latest_safe_l2_info().await.unwrap(); assert_eq!(latest_safe, Some((safe_block_2, batch_info))); } @@ -646,7 +1016,6 @@ mod test { async fn test_delete_l2_blocks_gt_block_number() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. 
let mut bytes = [0u8; 1024]; @@ -658,22 +1027,22 @@ mod test { for i in 400..410 { let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_block(block_info, batch_info).await.unwrap(); + db.insert_block(block_info, batch_info).await.unwrap(); } // Delete blocks with number > 405 - let deleted_count = tx.delete_l2_blocks_gt_block_number(405).await.unwrap(); + let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap(); assert_eq!(deleted_count, 4); // Blocks 406, 407, 408, 409 // Verify remaining blocks still exist for i in 400..=405 { - let block = tx.get_l2_block_info_by_number(i).await.unwrap(); + let block = db.get_l2_block_info_by_number(i).await.unwrap(); assert!(block.is_some()); } // Verify deleted blocks no longer exist for i in 406..410 { - let block = tx.get_l2_block_info_by_number(i).await.unwrap(); + let block = db.get_l2_block_info_by_number(i).await.unwrap(); assert!(block.is_none()); } } @@ -682,7 +1051,6 @@ mod test { async fn test_delete_l2_blocks_gt_batch_index() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 1024]; @@ -696,38 +1064,38 @@ mod test { calldata: Arc::new(vec![].into()), ..Arbitrary::arbitrary(&mut u).unwrap() }; - tx.insert_batch(batch_data).await.unwrap(); + db.insert_batch(batch_data).await.unwrap(); } // Insert L2 blocks with different batch indices for i in 100..110 { - let batch_data = tx.get_batch_by_index(i).await.unwrap().unwrap(); + let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap(); let batch_info: BatchInfo = batch_data.into(); let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_block(block_info, batch_info).await.unwrap(); + db.insert_block(block_info, batch_info).await.unwrap(); } // Delete L2 blocks with batch index > 105 - let deleted_count = tx.delete_l2_blocks_gt_batch_index(105).await.unwrap(); + let deleted_count = db.delete_l2_blocks_gt_batch_index(105).await.unwrap(); assert_eq!(deleted_count, 4); // Blocks with batch indices 106, 107, 108, 109 // Verify remaining blocks with batch index <= 105 still exist for i in 100..=105 { - let block = tx.get_l2_block_info_by_number(500 + i).await.unwrap(); + let block = db.get_l2_block_info_by_number(500 + i).await.unwrap(); assert!(block.is_some()); } // Verify deleted blocks with batch index > 105 no longer exist for i in 106..110 { - let block = tx.get_l2_block_info_by_number(500 + i).await.unwrap(); + let block = db.get_l2_block_info_by_number(500 + i).await.unwrap(); assert!(block.is_none()); } // Verify blocks without batch index are still there (not affected by batch index filter) for i in 0..3 { - let block = tx.get_l2_block_info_by_number(600 + i).await.unwrap(); + let block = db.get_l2_block_info_by_number(600 + i).await.unwrap(); assert!(block.is_some()); } } @@ -736,7 +1104,6 @@ mod test { async fn test_insert_block_with_l1_messages() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. 
let mut bytes = [0u8; 1024]; @@ -746,7 +1113,7 @@ mod test { // Generate and insert batch let batch_data = BatchCommitData { index: 10, ..Arbitrary::arbitrary(&mut u).unwrap() }; let batch_info: BatchInfo = batch_data.clone().into(); - tx.insert_batch(batch_data).await.unwrap(); + db.insert_batch(batch_data).await.unwrap(); // Generate and insert L1 messages let mut l1_message_hashes = Vec::new(); @@ -754,7 +1121,7 @@ mod test { let mut l1_message = L1MessageEnvelope::arbitrary(&mut u).unwrap(); l1_message.transaction.queue_index = i; l1_message_hashes.push(l1_message.transaction.tx_hash()); - tx.insert_l1_message(l1_message).await.unwrap(); + db.insert_l1_message(l1_message).await.unwrap(); } // Create block with L1 messages @@ -763,17 +1130,18 @@ mod test { L2BlockInfoWithL1Messages { block_info, l1_messages: l1_message_hashes.clone() }; // Insert block - tx.insert_block(l2_block.block_info, batch_info).await.unwrap(); - tx.update_l1_messages_with_l2_block(l2_block).await.unwrap(); + db.insert_block(l2_block.block_info, batch_info).await.unwrap(); + db.update_l1_messages_with_l2_block(l2_block).await.unwrap(); // Verify block was inserted - let retrieved_block = tx.get_l2_block_info_by_number(500).await.unwrap(); + let retrieved_block = db.get_l2_block_info_by_number(500).await.unwrap(); assert_eq!(retrieved_block, Some(block_info)); // Verify L1 messages were updated with L2 block number for i in 100..103 { - let l1_message = tx.get_l1_message_by_index(i).await.unwrap().unwrap(); - assert_eq!(l1_message.l2_block_number, Some(500)); + let l1_message = + db.get_n_l1_messages(Some(L1MessageKey::from_queue_index(i)), 1).await.unwrap(); + assert_eq!(l1_message.first().unwrap().l2_block_number, Some(500)); } } @@ -781,7 +1149,6 @@ mod test { async fn test_insert_block_upsert_behavior() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. 
let mut bytes = [0u8; 1024]; @@ -794,22 +1161,21 @@ mod test { let batch_data_2 = BatchCommitData { index: 200, ..Arbitrary::arbitrary(&mut u).unwrap() }; let batch_info_2: BatchInfo = batch_data_2.clone().into(); - tx.insert_batch(batch_data_1).await.unwrap(); - tx.insert_batch(batch_data_2).await.unwrap(); + db.insert_batch(batch_data_1).await.unwrap(); + db.insert_batch(batch_data_2).await.unwrap(); // Insert initial block let block_info = BlockInfo { number: 600, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_block(block_info, batch_info_1).await.unwrap(); + db.insert_block(block_info, batch_info_1).await.unwrap(); // Verify initial insertion - let retrieved_block = tx.get_l2_block_info_by_number(600).await.unwrap(); - tx.commit().await.unwrap(); + let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap(); assert_eq!(retrieved_block, Some(block_info)); // Verify initial batch association using model conversion let initial_l2_block_model = models::l2_block::Entity::find() .filter(models::l2_block::Column::BlockNumber.eq(600)) - .one(db.get_connection()) + .one(db.inner().get_connection()) .await .unwrap() .unwrap(); @@ -819,20 +1185,16 @@ mod test { assert_eq!(initial_batch_info, batch_info_1); // Update the same block with different batch info (upsert) - let tx = db.tx_mut().await.unwrap(); - tx.insert_block(block_info, batch_info_2).await.unwrap(); - tx.commit().await.unwrap(); + db.insert_block(block_info, batch_info_2).await.unwrap(); // Verify the block still exists and was updated - let tx = db.tx().await.unwrap(); - let retrieved_block = tx.get_l2_block_info_by_number(600).await.unwrap().unwrap(); - drop(tx); + let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap().unwrap(); assert_eq!(retrieved_block, block_info); // Verify batch association was updated using model conversion let updated_l2_block_model = models::l2_block::Entity::find() .filter(models::l2_block::Column::BlockNumber.eq(600)) - .one(db.get_connection()) + .one(db.inner().get_connection()) .await .unwrap() .unwrap(); @@ -845,7 +1207,6 @@ mod test { #[tokio::test] async fn test_prepare_on_startup() { let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. 
let mut bytes = [0u8; 1024]; @@ -857,38 +1218,35 @@ mod test { BatchCommitData { index: 1, block_number: 10, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_1 = BlockInfo { number: 1, hash: B256::arbitrary(&mut u).unwrap() }; let block_2 = BlockInfo { number: 2, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_batch(batch_data_1.clone()).await.unwrap(); - tx.insert_block(block_1, batch_data_1.clone().into()).await.unwrap(); - tx.insert_block(block_2, batch_data_1.clone().into()).await.unwrap(); + db.insert_batch(batch_data_1.clone()).await.unwrap(); + db.insert_block(block_1, batch_data_1.clone().into()).await.unwrap(); + db.insert_block(block_2, batch_data_1.clone().into()).await.unwrap(); // Insert batch 2 and associate it with one block in the database let batch_data_2 = BatchCommitData { index: 2, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_3 = BlockInfo { number: 3, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_batch(batch_data_2.clone()).await.unwrap(); - tx.insert_block(block_3, batch_data_2.clone().into()).await.unwrap(); + db.insert_batch(batch_data_2.clone()).await.unwrap(); + db.insert_block(block_3, batch_data_2.clone().into()).await.unwrap(); // Insert batch 3 produced at the same block number as batch 2 and associate it with one // block let batch_data_3 = BatchCommitData { index: 3, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_4 = BlockInfo { number: 4, hash: B256::arbitrary(&mut u).unwrap() }; - tx.insert_batch(batch_data_3.clone()).await.unwrap(); - tx.insert_block(block_4, batch_data_3.clone().into()).await.unwrap(); + db.insert_batch(batch_data_3.clone()).await.unwrap(); + db.insert_block(block_4, batch_data_3.clone().into()).await.unwrap(); - tx.set_finalized_l1_block_number(21).await.unwrap(); - tx.commit().await.unwrap(); + db.set_finalized_l1_block_number(21).await.unwrap(); // Verify the batches and blocks were inserted correctly - let tx = db.tx().await.unwrap(); - let retrieved_batch_1 = tx.get_batch_by_index(1).await.unwrap().unwrap(); - let retrieved_batch_2 = tx.get_batch_by_index(2).await.unwrap().unwrap(); - let retrieved_batch_3 = tx.get_batch_by_index(3).await.unwrap().unwrap(); - let retried_block_1 = tx.get_l2_block_info_by_number(1).await.unwrap().unwrap(); - let retried_block_2 = tx.get_l2_block_info_by_number(2).await.unwrap().unwrap(); - let retried_block_3 = tx.get_l2_block_info_by_number(3).await.unwrap().unwrap(); - let retried_block_4 = tx.get_l2_block_info_by_number(4).await.unwrap().unwrap(); - drop(tx); + let retrieved_batch_1 = db.get_batch_by_index(1).await.unwrap().unwrap(); + let retrieved_batch_2 = db.get_batch_by_index(2).await.unwrap().unwrap(); + let retrieved_batch_3 = db.get_batch_by_index(3).await.unwrap().unwrap(); + let retried_block_1 = db.get_l2_block_info_by_number(1).await.unwrap().unwrap(); + let retried_block_2 = db.get_l2_block_info_by_number(2).await.unwrap().unwrap(); + let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap().unwrap(); + let retried_block_4 = db.get_l2_block_info_by_number(4).await.unwrap().unwrap(); assert_eq!(retrieved_batch_1, batch_data_1); assert_eq!(retrieved_batch_2, batch_data_2); @@ -899,25 +1257,22 @@ mod test { assert_eq!(retried_block_4, block_4); // Call prepare_on_startup which should not error - let tx = db.tx_mut().await.unwrap(); - let result = tx.prepare_on_startup(Default::default()).await.unwrap(); - tx.commit().await.unwrap(); + let result = db.prepare_on_startup(Default::default()).await.unwrap(); // 
verify the result assert_eq!(result, (Some(block_2), Some(11))); // Verify that batches 2 and 3 are deleted - let tx = db.tx().await.unwrap(); - let batch_1 = tx.get_batch_by_index(1).await.unwrap(); - let batch_2 = tx.get_batch_by_index(2).await.unwrap(); - let batch_3 = tx.get_batch_by_index(3).await.unwrap(); + let batch_1 = db.get_batch_by_index(1).await.unwrap(); + let batch_2 = db.get_batch_by_index(2).await.unwrap(); + let batch_3 = db.get_batch_by_index(3).await.unwrap(); assert!(batch_1.is_some()); assert!(batch_2.is_none()); assert!(batch_3.is_none()); // Verify that blocks 3 and 4 are deleted - let retried_block_3 = tx.get_l2_block_info_by_number(3).await.unwrap(); - let retried_block_4 = tx.get_l2_block_info_by_number(4).await.unwrap(); + let retried_block_3 = db.get_l2_block_info_by_number(3).await.unwrap(); + let retried_block_4 = db.get_l2_block_info_by_number(4).await.unwrap(); assert!(retried_block_3.is_none()); assert!(retried_block_4.is_none()); } @@ -926,7 +1281,6 @@ mod test { async fn test_l2_block_head_roundtrip() { // Set up the test database. let db = setup_test_db().await; - let tx = db.tx_mut().await.unwrap(); // Generate unstructured bytes. let mut bytes = [0u8; 40]; rand::rng().fill(bytes.as_mut_slice()); let mut u = Unstructured::new(&bytes); // Generate and insert a block info as the head. let block_info = BlockInfo::arbitrary(&mut u).unwrap(); - tx.set_l2_head_block_number(block_info.number).await.unwrap(); - tx.commit().await.unwrap(); + db.set_l2_head_block_number(block_info.number).await.unwrap(); // Retrieve and verify the head block info. - let tx = db.tx().await.unwrap(); - let head_block_info = tx.get_l2_head_block_number().await.unwrap(); + let head_block_info = db.get_l2_head_block_number().await.unwrap(); assert_eq!(head_block_info, block_info.number); } diff --git a/crates/database/db/src/error.rs b/crates/database/db/src/error.rs index f7bf3f32..d77a1031 100644 --- a/crates/database/db/src/error.rs +++ b/crates/database/db/src/error.rs @@ -19,4 +19,10 @@ pub enum DatabaseError { /// The L1 message was not found in database. #[error("L1 message at key [{0}] not found in database")] L1MessageNotFound(L1MessageKey), + /// Failed to commit the transaction to the database. + #[error("TXMut commit failed")] + CommitFailed, + /// Failed to roll back the transaction. + #[error("TXMut rollback failed")] + RollbackFailed, } diff --git a/crates/database/db/src/lib.rs b/crates/database/db/src/lib.rs index 6470f5f4..1da3ff4b 100644 --- a/crates/database/db/src/lib.rs +++ b/crates/database/db/src/lib.rs @@ -19,6 +19,11 @@ pub use operations::{ DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, NotIncludedStart, UnwindResult, }; +pub use sea_orm::EntityTrait; + +mod service; +pub use service::{CanRetry, DatabaseServiceError}; + mod transaction; pub use transaction::{DatabaseTransactionProvider, TXMut, TX}; diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index a40178f1..09ec3ed0 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -2,7 +2,6 @@ use super::{models, DatabaseError}; use crate::{ReadConnectionProvider, WriteConnectionProvider}; use alloy_primitives::{Signature, B256}; -use futures::{Stream, StreamExt}; use rollup_node_primitives::{ BatchCommitData, BatchConsolidationOutcome, BatchInfo, BlockInfo, L1MessageEnvelope, L2BlockInfoWithL1Messages, Metadata, }; @@ -17,8 +16,141 @@ use std::fmt; /// The [`DatabaseWriteOperations`] trait provides write methods for interacting with the /// database.
#[async_trait::async_trait] -pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperations { +#[auto_impl::auto_impl(Arc)] +pub trait DatabaseWriteOperations { /// Insert a [`BatchCommitData`] into the database. + async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError>; + + /// Finalizes all [`BatchCommitData`] up to the provided `batch_index` by setting their + /// finalized block number to the provided block number. + async fn finalize_batches_up_to_index( + &self, + batch_index: u64, + block_number: u64, + ) -> Result<(), DatabaseError>; + + /// Set the latest L1 block number. + async fn set_latest_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError>; + + /// Set the finalized L1 block number. + async fn set_finalized_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError>; + + /// Set the processed L1 block number. + async fn set_processed_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError>; + + /// Set the L2 head block number. + async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError>; + + /// Fetches unprocessed batches up to the provided finalized L1 block number and updates their + /// status. + async fn fetch_and_update_unprocessed_finalized_batches( + &self, + finalized_l1_block_number: u64, + ) -> Result<Vec<BatchCommitData>, DatabaseError>; + + /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number. + async fn delete_batches_gt_block_number(&self, block_number: u64) + -> Result<u64, DatabaseError>; + + /// Delete all [`BatchCommitData`]s with a batch index greater than the provided index. + async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError>; + + /// Insert an [`L1MessageEnvelope`] into the database. + async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError>; + + /// Delete all [`L1MessageEnvelope`]s with a block number greater than the provided block + /// number and return them. + async fn delete_l1_messages_gt( + &self, + l1_block_number: u64, + ) -> Result<Vec<L1MessageEnvelope>, DatabaseError>; + + /// Prepare the database on startup and return metadata used for other components in the + /// rollup-node. + /// + /// This method first unwinds the database to the finalized L1 block. It then fetches the batch + /// info for the latest safe L2 block. It takes note of the L1 block number at which + /// this batch was produced (currently the finalized block for the batch until we implement + /// issue #273). It then retrieves the latest block for the previous batch (i.e., the batch + /// before the latest safe block). It returns a tuple of this latest fetched block and the + /// L1 block number of the batch. + async fn prepare_on_startup( + &self, + genesis_hash: B256, + ) -> Result<(Option<BlockInfo>, Option<u64>), DatabaseError>; + + /// Delete all L2 blocks with a block number greater than the provided block number. + async fn delete_l2_blocks_gt_block_number( + &self, + block_number: u64, + ) -> Result<u64, DatabaseError>; + + /// Delete all L2 blocks with a batch index greater than the provided batch index. + async fn delete_l2_blocks_gt_batch_index(&self, batch_index: u64) + -> Result<u64, DatabaseError>; + + /// Insert multiple blocks into the database. + async fn insert_blocks( + &self, + blocks: Vec<BlockInfo>, + batch_info: BatchInfo, + ) -> Result<(), DatabaseError>; + + /// Insert a new block in the database. + async fn insert_block( + &self, + block_info: BlockInfo, + batch_info: BatchInfo, + ) -> Result<(), DatabaseError>; + + /// Insert the genesis block into the database.
+ async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError>; + + /// Update the executed L1 messages from the provided L2 blocks in the database. + async fn update_l1_messages_from_l2_blocks( + &self, + blocks: Vec<L2BlockInfoWithL1Messages>, + ) -> Result<(), DatabaseError>; + + /// Update the executed L1 messages with the provided L2 block number in the database. + async fn update_l1_messages_with_l2_block( + &self, + block_info: L2BlockInfoWithL1Messages, + ) -> Result<(), DatabaseError>; + + /// Purge all L1 message to L2 block mappings from the database for blocks greater than or + /// equal to the provided block number. If no block number is provided, purge mappings for all + /// unsafe blocks. + async fn purge_l1_message_to_l2_block_mappings( + &self, + block_number: Option<u64>, + ) -> Result<(), DatabaseError>; + + /// Insert the outcome of a batch consolidation into the database. + async fn insert_batch_consolidation_outcome( + &self, + outcome: BatchConsolidationOutcome, + ) -> Result<(), DatabaseError>; + + /// Unwinds the chain orchestrator by deleting all indexed data greater than the provided L1 + /// block number. + async fn unwind( + &self, + genesis_hash: B256, + l1_block_number: u64, + ) -> Result<UnwindResult, DatabaseError>; + + /// Store a block signature in the database. + /// TODO: remove this once we have deprecated l2geth. + async fn insert_signature( + &self, + block_hash: B256, + signature: Signature, + ) -> Result<(), DatabaseError>; +} + +#[async_trait::async_trait] +impl<T: WriteConnectionProvider + DatabaseReadOperations + ?Sized + Sync> DatabaseWriteOperations for T { async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database."); let batch_commit: models::batch_commit::ActiveModel = batch_commit.into(); @@ -40,8 +172,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|_| ())?) } - /// Finalizes all [`BatchCommitData`] up to the provided `batch_index` by setting their - /// finalized block number to the provided block number. async fn finalize_batches_up_to_index( &self, batch_index: u64, @@ -65,7 +195,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Set the latest L1 block number. async fn set_latest_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", block_number, "Updating the latest L1 block number in the database."); let metadata: models::metadata::ActiveModel = @@ -81,7 +210,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|_| ())?) } - /// Set the finalized L1 block number. async fn set_finalized_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", block_number, "Updating the finalized L1 block number in the database."); let metadata: models::metadata::ActiveModel = @@ -98,7 +226,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|_| ())?) } - /// Set the processed L1 block number. async fn set_processed_l1_block_number(&self, block_number: u64) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", block_number, "Updating the processed L1 block number in the database."); let metadata: models::metadata::ActiveModel = @@ -115,7 +242,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|_| ())?) } - /// Set the L2 head block number.
async fn set_l2_head_block_number(&self, number: u64) -> Result<(), DatabaseError> { tracing::trace!(target: "scroll::db", ?number, "Updating the L2 head block number in the database."); let metadata: models::metadata::ActiveModel = @@ -131,8 +257,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|_| ())?) } - /// Fetches unprocessed batches up to the provided finalized L1 block number and updates their - /// status. async fn fetch_and_update_unprocessed_finalized_batches( &self, finalized_l1_block_number: u64, @@ -166,7 +290,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(batches) } - /// Delete all [`BatchCommitData`]s with a block number greater than the provided block number. async fn delete_batches_gt_block_number( &self, block_number: u64, @@ -179,7 +302,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|x| x.rows_affected)?) } - /// Delete all [`BatchCommitData`]s with a batch index greater than the provided index. async fn delete_batches_gt_batch_index(&self, batch_index: u64) -> Result<u64, DatabaseError> { tracing::trace!(target: "scroll::db", batch_index, "Deleting batch inputs greater than batch index."); Ok(models::batch_commit::Entity::delete_many() @@ -189,7 +311,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|x| x.rows_affected)?) } - /// Insert an [`L1MessageEnvelope`] into the database. async fn insert_l1_message(&self, l1_message: L1MessageEnvelope) -> Result<(), DatabaseError> { let l1_index = l1_message.transaction.queue_index; tracing::trace!(target: "scroll::db", queue_index = l1_index, "Inserting L1 message into database."); @@ -208,8 +329,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati } } - /// Delete all [`L1MessageEnvelope`]s with a block number greater than the provided block - /// number and return them. async fn delete_l1_messages_gt( &self, l1_block_number: u64, @@ -232,15 +351,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(removed_messages.into_iter().map(Into::into).collect()) } - /// Prepare the database on startup and return metadata used for other components in the - /// rollup-node. - /// - /// This method first unwinds the database to the finalized L1 block. It then fetches the batch - /// info for the latest safe L2 block. It takes note of the L1 block number at which - /// this batch was produced (currently the finalized block for the batch until we implement - /// issue #273). It then retrieves the latest block for the previous batch (i.e., the batch - /// before the latest safe block). It returns a tuple of this latest fetched block and the - /// L1 block number of the batch. async fn prepare_on_startup( &self, genesis_hash: B256, @@ -272,7 +382,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok((Some(block_info), Some(batch.block_number.saturating_add(1)))) } - /// Delete all L2 blocks with a block number greater than the provided block number. async fn delete_l2_blocks_gt_block_number( &self, block_number: u64, @@ -285,7 +394,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|x| x.rows_affected)?) } - /// Delete all L2 blocks with a batch index greater than the batch index.
async fn delete_l2_blocks_gt_batch_index( &self, batch_index: u64, @@ -302,7 +410,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati .map(|x| x.rows_affected)?) } - /// Insert multiple blocks into the database. async fn insert_blocks( &self, blocks: Vec<BlockInfo>, batch_info: BatchInfo, @@ -314,7 +421,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Insert a new block in the database. async fn insert_block( &self, block_info: BlockInfo, @@ -346,14 +452,12 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Insert the genesis block into the database. async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> { let genesis_block = BlockInfo::new(0, genesis_hash); let genesis_batch = BatchInfo::new(0, B256::ZERO); self.insert_block(genesis_block, genesis_batch).await } - /// Update the executed L1 messages from the provided L2 blocks in the database. async fn update_l1_messages_from_l2_blocks( &self, blocks: Vec<L2BlockInfoWithL1Messages>, @@ -371,7 +475,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Update the executed L1 messages with the provided L2 block number in the database. async fn update_l1_messages_with_l2_block( &self, block_info: L2BlockInfoWithL1Messages, @@ -397,9 +500,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Purge all L1 message to L2 block mappings from the database for blocks greater or equal to - /// the provided block number. If the no block number is provided, purge mappings for all - /// unsafe blocks. async fn purge_l1_message_to_l2_block_mappings( &self, block_number: Option<u64>, @@ -423,7 +523,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Insert the outcome of a batch consolidation into the database. async fn insert_batch_consolidation_outcome( &self, outcome: BatchConsolidationOutcome, @@ -435,8 +534,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(()) } - /// Unwinds the chain orchestrator by deleting all indexed data greater than the provided L1 - /// block number. async fn unwind( &self, genesis_hash: B256, @@ -490,8 +587,6 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati Ok(UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info }) } - /// Store a block signature in the database. - /// TODO: remove this once we deprecated l2geth. async fn insert_signature( &self, block_hash: B256, @@ -518,8 +613,81 @@ pub trait DatabaseWriteOperations: WriteConnectionProvider + DatabaseReadOperati /// The [`DatabaseReadOperations`] trait provides read-only methods for interacting with the /// database. #[async_trait::async_trait] -pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { +#[auto_impl::auto_impl(Arc)] +pub trait DatabaseReadOperations { /// Get a [`BatchCommitData`] from the database by its batch index. + async fn get_batch_by_index( + &self, + batch_index: u64, + ) -> Result<Option<BatchCommitData>, DatabaseError>; + + /// Get the latest L1 block number from the database. + async fn get_latest_l1_block_number(&self) -> Result<u64, DatabaseError>; + + /// Get the finalized L1 block number from the database. + async fn get_finalized_l1_block_number(&self) -> Result<u64, DatabaseError>; + + /// Get the processed L1 block number from the database. + async fn get_processed_l1_block_number(&self) -> Result<u64, DatabaseError>; + + /// Get the latest L2 head block info.
+ async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError>; + + /// Get a vector of n [`L1MessageEnvelope`]s in the database starting from the provided `start` + /// point. + async fn get_n_l1_messages( + &self, + start: Option<L1MessageKey>, + n: usize, + ) -> Result<Vec<L1MessageEnvelope>, DatabaseError>; + + /// Get the extra data for n blocks, starting at the provided block number. + async fn get_n_l2_block_data_hint( + &self, + block_number: u64, + n: usize, + ) -> Result<Vec<BlockDataHint>, DatabaseError>; + + /// Get the [`BlockInfo`] and optional [`BatchInfo`] for the provided block hash. + async fn get_l2_block_and_batch_info_by_hash( + &self, + block_hash: B256, + ) -> Result<Option<(BlockInfo, Option<BatchInfo>)>, DatabaseError>; + + /// Get a [`BlockInfo`] from the database by its block number. + async fn get_l2_block_info_by_number( + &self, + block_number: u64, + ) -> Result<Option<BlockInfo>, DatabaseError>; + + /// Get the latest safe/finalized L2 ([`BlockInfo`], [`BatchInfo`]) from the database. Until we + /// update the batch handling logic with issue #273, we don't differentiate between safe and + /// finalized l2 blocks. + async fn get_latest_safe_l2_info( + &self, + ) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError>; + + /// Returns the highest L2 block originating from the provided `batch_hash` or the highest block + /// for the batch's index. + async fn get_highest_block_for_batch_hash( + &self, + batch_hash: B256, + ) -> Result<Option<BlockInfo>, DatabaseError>; + + /// Returns the highest L2 block originating from the provided `batch_index` or the highest + /// block for the batch's index. + async fn get_highest_block_for_batch_index( + &self, + batch_index: u64, + ) -> Result<Option<BlockInfo>, DatabaseError>; + + /// Get a block signature from the database by block hash. + /// TODO: remove this once we have deprecated l2geth. + async fn get_signature(&self, block_hash: B256) -> Result<Option<Signature>, DatabaseError>; +} + +#[async_trait::async_trait] +impl<T: ReadConnectionProvider + ?Sized + Sync> DatabaseReadOperations for T { async fn get_batch_by_index( &self, batch_index: u64, @@ -532,7 +700,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .map(|x| x.map(Into::into))?) } - /// Get the latest L1 block number from the database. async fn get_latest_l1_block_number(&self) -> Result<u64, DatabaseError> { Ok(models::metadata::Entity::find() .filter(models::metadata::Column::Key.eq("l1_latest_block")) @@ -546,7 +713,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .expect("l1_latest_block should always be a valid u64")) } - /// Get the finalized L1 block number from the database. async fn get_finalized_l1_block_number(&self) -> Result<u64, DatabaseError> { Ok(models::metadata::Entity::find() .filter(models::metadata::Column::Key.eq("l1_finalized_block")) @@ -560,7 +726,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .expect("l1_finalized_block should always be a valid u64")) } - /// Get the processed L1 block number from the database. async fn get_processed_l1_block_number(&self) -> Result<u64, DatabaseError> { Ok(models::metadata::Entity::find() .filter(models::metadata::Column::Key.eq("l1_processed_block")) @@ -574,7 +739,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .expect("l1_processed_block should always be a valid u64")) } - /// Get the latest L2 head block info. async fn get_l2_head_block_number(&self) -> Result<u64, DatabaseError> { Ok(models::metadata::Entity::find() .filter(models::metadata::Column::Key.eq("l2_head_block")) @@ -588,79 +752,23 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .expect("l2_head_block should always be a valid u64")) } - /// Get an iterator over all [`BatchCommitData`]s in the database.
- async fn get_batches<'a>( - &'a self, - ) -> Result<impl Stream<Item = Result<BatchCommitData, DatabaseError>> + 'a, DatabaseError> - { - Ok(models::batch_commit::Entity::find() - .stream(self.get_connection()) - .await? - .map(|res| Ok(res.map(Into::into)?))) - } - - /// Get a [`L1MessageEnvelope`] from the database by its message queue index. - async fn get_l1_message_by_index( - &self, - queue_index: u64, - ) -> Result<Option<L1MessageEnvelope>, DatabaseError> { - Ok(models::l1_message::Entity::find_by_id(queue_index as i64) - .one(self.get_connection()) - .await - .map(|x| x.map(Into::into))?) - } - - /// Get a [`L1MessageEnvelope`] from the database by its message queue hash. - async fn get_l1_message_by_hash( - &self, - queue_hash: B256, - ) -> Result<Option<L1MessageEnvelope>, DatabaseError> { - Ok(models::l1_message::Entity::find() - .filter( - Condition::all() - .add(models::l1_message::Column::QueueHash.is_not_null()) - .add(models::l1_message::Column::QueueHash.eq(queue_hash.to_vec())), - ) - .one(self.get_connection()) - .await - .map(|x| x.map(Into::into))?) - } - - /// Gets the latest L1 messages which has an associated L2 block number if any. - async fn get_latest_executed_l1_message( + async fn get_n_l1_messages( &self, - ) -> Result<Option<L1MessageEnvelope>, DatabaseError> { - Ok(models::l1_message::Entity::find() - .filter(models::l1_message::Column::L2BlockNumber.is_not_null()) - .order_by_desc(models::l1_message::Column::L2BlockNumber) - .order_by_desc(models::l1_message::Column::QueueIndex) - .one(self.get_connection()) - .await? - .map(Into::into)) - } - - /// Get an iterator over all [`L1MessageEnvelope`]s in the database starting from the provided - /// `start` point. - async fn get_l1_messages<'a>( - &'a self, start: Option<L1MessageKey>, - ) -> Result< - Option<impl Stream<Item = Result<L1MessageEnvelope, DatabaseError>> + 'a>, - DatabaseError, - > { + n: usize, + ) -> Result<Vec<L1MessageEnvelope>, DatabaseError> { match start { - // Provides an stream over all L1 messages with increasing queue index starting from the - // provided queue index. - Some(L1MessageKey::QueueIndex(queue_index)) => Ok(Some( - models::l1_message::Entity::find() - .filter(models::l1_message::Column::QueueIndex.gte(queue_index)) - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )), - // Provides a stream over all L1 messages with increasing queue index starting from the - // message with the provided transaction hash. + // Provides n L1 messages with increasing queue index starting from the provided queue + // index. + Some(L1MessageKey::QueueIndex(queue_index)) => Ok(models::l1_message::Entity::find() + .filter(models::l1_message::Column::QueueIndex.gte(queue_index)) + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?), + // Provides n L1 messages with increasing queue index starting from the message with the + // provided transaction hash. Some(L1MessageKey::TransactionHash(ref h)) => { // Lookup message by hash to get its queue index. let record = models::l1_message::Entity::find() @@ -670,18 +778,17 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .ok_or_else(|| { DatabaseError::L1MessageNotFound(L1MessageKey::TransactionHash(*h)) })?; - // Yield a stream of messages starting from the found queue index. - Ok(Some( - models::l1_message::Entity::find() - .filter(models::l1_message::Column::QueueIndex.gte(record.queue_index)) - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )) + // Yield n messages starting from the found queue index.
+ Ok(models::l1_message::Entity::find() + .filter(models::l1_message::Column::QueueIndex.gte(record.queue_index)) + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?) } - // Provides a stream over all L1 messages with increasing queue index starting from the - // message with the provided queue hash. + // Provides n L1 messages with increasing queue index starting from the message with + // the provided queue hash. Some(L1MessageKey::QueueHash(ref h)) => { // Lookup message by queue hash. let record = models::l1_message::Entity::find() @@ -693,20 +800,19 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .one(self.get_connection()) .await? .ok_or_else(|| DatabaseError::L1MessageNotFound(L1MessageKey::QueueHash(*h)))?; - // Yield a stream of messages starting from the found queue index. - Ok(Some( - models::l1_message::Entity::find() - .filter(models::l1_message::Column::QueueIndex.gte(record.queue_index)) - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )) + // Yield n messages starting from the found queue index. + Ok(models::l1_message::Entity::find() + .filter(models::l1_message::Column::QueueIndex.gte(record.queue_index)) + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?) } - // Provides a stream over all L1 messages with increasing queue index starting from the - // message included in the provided L2 block number. + // Provides n L1 messages with increasing queue index starting from the message included + // in the provided L2 block number. Some(L1MessageKey::BlockNumber(block_number)) => { - // Lookup the the latest message included in a block with a block number less than + // Lookup the latest message included in a block with a block number less than // the provided block number. This is achieved by filtering for messages with a // block number less than the provided block number and ordering by block number and // queue index in descending order. This ensures that we get the latest message @@ -718,30 +824,28 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .one(self.get_connection()) .await? { - // Yield a stream of messages starting from the found queue index + 1. - Ok(Some( - models::l1_message::Entity::find() - .filter( - // We add 1 to the queue index to constrain across block boundaries - models::l1_message::Column::QueueIndex.gte(record.queue_index + 1), - ) - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )) + // Yield n messages starting from the found queue index + 1. + Ok(models::l1_message::Entity::find() + .filter( + // We add 1 to the queue index to constrain across block boundaries + models::l1_message::Column::QueueIndex.gte(record.queue_index + 1), + ) + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?) } // If no messages have been found then it suggests that no messages have been - // included in blocks yet and as such we should return a stream of all messages with - // increasing queue index starting from the beginning. + // included in blocks yet and as such we should return n messages with increasing + // queue index starting from the beginning.
else { - Ok(Some( - models::l1_message::Entity::find() - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )) + Ok(models::l1_message::Entity::find() + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?) } } // Provides a stream over all L1 messages with increasing queue index starting that have @@ -753,12 +857,12 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { // Calculate the target block number by subtracting the depth from the finalized // block number. If the depth is greater than the finalized block number, we return - // None as there are no messages that satisfy the condition. + // an empty vector as there are no messages that satisfy the condition. let target_block_number = if let Some(target_block_number) = finalized_block_number.checked_sub(depth) { target_block_number } else { - return Ok(None); + return Ok(vec![]); }; // Create a filter condition for messages that have an L1 block number less than or @@ -767,19 +871,17 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { let condition = Condition::all() .add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64)) .add(models::l1_message::Column::L2BlockNumber.is_null()); - // Yield a stream of messages matching the condition ordered by increasing queue - // index. - Ok(Some( - models::l1_message::Entity::find() - .filter(condition) - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )) + // Yield n messages matching the condition ordered by increasing queue index. + Ok(models::l1_message::Entity::find() + .filter(condition) + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?) } - // Provides a stream over all L1 messages with increasing queue index starting that have - // not been included in an L2 block and have a block number less than or equal to the + // Provides n L1 messages with increasing queue index that have not been + // included in an L2 block and have a block number less than or equal to the // latest L1 block number minus the provided depth (they have been sufficiently deep // on L1 to be considered safe to include - reorg risk is low). Some(L1MessageKey::NotIncluded(NotIncludedStart::BlockDepth(depth))) => { // Get the latest L1 block number from the database. let latest_block_number = self.get_latest_l1_block_number().await?; // Calculate the target block number by subtracting the depth from the latest block - // number. If the depth is greater than the latest block number, we return None as - // there are no messages that satisfy the condition. + // number. If the depth is greater than the latest block number, we return an empty + // vector as there are no messages that satisfy the condition.
let target_block_number = if let Some(target_block_number) = latest_block_number.checked_sub(depth) { target_block_number } else { - return Ok(None); + return Ok(vec![]); }; // Create a filter condition for messages that have an L1 block number less than // or equal to the target block number and have not been included in an L2 block @@ -801,42 +903,40 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { let condition = Condition::all() .add(models::l1_message::Column::L1BlockNumber.lte(target_block_number as i64)) .add(models::l1_message::Column::L2BlockNumber.is_null()); - // Yield a stream of messages matching the condition ordered by increasing - // queue index. - Ok(Some( - models::l1_message::Entity::find() - .filter(condition) - .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )) - } - // Provides a stream over all L1 messages with increasing queue index starting from the - // beginning. - None => Ok(Some( - models::l1_message::Entity::find() + // Yield n messages matching the condition ordered by increasing queue index. + Ok(models::l1_message::Entity::find() + .filter(condition) .order_by_asc(models::l1_message::Column::QueueIndex) - .stream(self.get_connection()) - .await? - .map(map_l1_message_result), - )), + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?) + } + // Provides n L1 messages with increasing queue index starting from the beginning. + None => Ok(models::l1_message::Entity::find() + .order_by_asc(models::l1_message::Column::QueueIndex) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await + .map(map_l1_message_result)?), } } - /// Get the extra data for the provided block number. - async fn get_l2_block_data_hint( + async fn get_n_l2_block_data_hint( &self, block_number: u64, - ) -> Result<Option<BlockDataHint>, DatabaseError> { + n: usize, + ) -> Result<Vec<BlockDataHint>, DatabaseError> { Ok(models::block_data::Entity::find() - .filter(models::block_data::Column::Number.eq(block_number as i64)) - .one(self.get_connection()) - .await - .map(|x| x.map(Into::into))?) + .filter(models::block_data::Column::Number.gte(block_number as i64)) + .limit(Some(n as u64)) + .all(self.get_connection()) + .await? + .into_iter() + .map(Into::into) + .collect()) } - /// Get the [`BlockInfo`] and optional [`BatchInfo`] for the provided block hash. async fn get_l2_block_and_batch_info_by_hash( &self, block_hash: B256, @@ -854,7 +954,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { })?) } - /// Get a [`BlockInfo`] from the database by its block number. async fn get_l2_block_info_by_number( &self, block_number: u64, @@ -871,9 +970,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { })?) } - /// Get the latest safe/finalized L2 ([`BlockInfo`], [`BatchInfo`]) from the database. Until we - /// update the batch handling logic with issue #273, we don't differentiate between safe and - /// finalized l2 blocks. async fn get_latest_safe_l2_info( &self, ) -> Result<Option<(BlockInfo, BatchInfo)>, DatabaseError> { @@ -886,20 +982,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .map(|x| x.map(|x| (x.block_info(), x.batch_info())))?) } - /// Get an iterator over all L2 blocks in the database starting from the most recent one.
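// --- Editor's aside: illustrative sketch, not part of the patch. ---
// With the streaming accessors removed, callers page through L1 messages via the
// batched `get_n_l1_messages` above. The helper below is hypothetical; it assumes
// only the `DatabaseReadOperations` trait and `L1MessageKey` declared in this file.
async fn collect_from_queue_index(
    db: &impl DatabaseReadOperations,
    mut start: u64,
) -> Result<Vec<L1MessageEnvelope>, DatabaseError> {
    const PAGE_SIZE: usize = 64;
    let mut messages = Vec::new();
    loop {
        // Each call returns at most PAGE_SIZE messages, ordered by queue index.
        let page =
            db.get_n_l1_messages(Some(L1MessageKey::QueueIndex(start)), PAGE_SIZE).await?;
        let page_len = page.len();
        if let Some(last) = page.last() {
            // Resume after the last queue index seen so far.
            start = last.transaction.queue_index + 1;
        }
        messages.extend(page);
        // A short page means the table is exhausted.
        if page_len < PAGE_SIZE {
            break;
        }
    }
    Ok(messages)
}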
- async fn get_l2_blocks<'a>( - &'a self, - ) -> Result<impl Stream<Item = Result<BlockInfo, DatabaseError>> + 'a, DatabaseError> { - tracing::trace!(target: "scroll::db", "Fetching L2 blocks from database."); - Ok(models::l2_block::Entity::find() - .order_by_desc(models::l2_block::Column::BlockNumber) - .stream(self.get_connection()) - .await? - .map(|res| Ok(res.map(|res| res.block_info())?))) - } - - /// Returns the highest L2 block originating from the provided `batch_hash` or the highest block - /// for the batch's index. async fn get_highest_block_for_batch_hash( &self, batch_hash: B256, @@ -924,8 +1006,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { } } - /// Returns the highest L2 block originating from the provided `batch_index` or the highest - /// block for the batch's index. async fn get_highest_block_for_batch_index( &self, batch_index: u64, @@ -938,8 +1018,6 @@ pub trait DatabaseReadOperations: ReadConnectionProvider + Sync { .map(|model| model.block_info())) } - /// Get a block signature from the database by block hash. - /// TODO: remove this once we deprecated l2geth. async fn get_signature(&self, block_hash: B256) -> Result<Option<Signature>, DatabaseError> { tracing::trace!(target: "scroll::db", block_hash = ?block_hash, "Retrieving block signature from database."); @@ -1010,10 +1088,8 @@ pub enum NotIncludedStart { BlockDepth(u64), } -fn map_l1_message_result( - res: Result<models::l1_message::Model, sea_orm::DbErr>, -) -> Result<L1MessageEnvelope, DatabaseError> { - Ok(res.map(Into::into)?) +fn map_l1_message_result(res: Vec<models::l1_message::Model>) -> Vec<L1MessageEnvelope> { + res.into_iter().map(Into::into).collect() } impl fmt::Display for L1MessageKey { @@ -1048,7 +1124,3 @@ pub struct UnwindResult { /// The L2 safe block info after the unwind. This is only populated if the L2 safe has reorged. pub l2_safe_block_info: Option<BlockInfo>, } - -impl<T> DatabaseReadOperations for T where T: ReadConnectionProvider + ?Sized + Sync {} - -impl<T> DatabaseWriteOperations for T where T: WriteConnectionProvider + ?Sized + Sync {} diff --git a/crates/database/db/src/service/mod.rs b/crates/database/db/src/service/mod.rs new file mode 100644 index 00000000..30267710 --- /dev/null +++ b/crates/database/db/src/service/mod.rs @@ -0,0 +1,57 @@ +use crate::{ + db::DatabaseInner, service::query::DatabaseQuery, DatabaseError, DatabaseTransactionProvider, +}; +use std::{fmt::Debug, sync::Arc}; + +pub(crate) mod query; + +pub(crate) mod retry; +pub use retry::CanRetry; + +/// Error type for database service operations that can be converted from [`DatabaseError`], +/// supports retry logic, and is thread-safe for async contexts. +pub trait DatabaseServiceError: From<DatabaseError> + CanRetry + Debug + Send + 'static {} +impl<T> DatabaseServiceError for T where T: From<DatabaseError> + CanRetry + Debug + Send + 'static {} + +/// An implementer of the trait can make queries to the database. This trait is preferred over the +/// `tower::Service` because it doesn't require a mutable reference. +#[async_trait::async_trait] +#[auto_impl::auto_impl(&, Arc)] +pub(crate) trait DatabaseService: Clone + Send + Sync + 'static { + /// Call the database.
+ async fn call<T: Send + 'static, Err: DatabaseServiceError>( + &self, + req: DatabaseQuery<T, Err>, + ) -> Result<T, Err>; +} + +#[async_trait::async_trait] +impl DatabaseService for Arc<DatabaseInner> { + async fn call<T: Send + 'static, Err: DatabaseServiceError>( + &self, + req: DatabaseQuery<T, Err>, + ) -> Result<T, Err> { + let db = self.clone(); + match req { + DatabaseQuery::Read(f) => { + let tx = Arc::new(db.tx().await?); + f(tx).await + } + DatabaseQuery::Write(f) => { + let tx = Arc::new(db.tx_mut().await?); + let res = f(tx.clone()).await; + + // The `WriteQuery` closure must not retain a clone of the atomic reference to + // the transaction, otherwise the `Arc::try_unwrap` below fails and we won't be + // able to commit/rollback the transaction. + let tx = Arc::try_unwrap(tx); + + if res.is_ok() { + tx.map(|tx| tx.commit()).map_err(|_| DatabaseError::CommitFailed)?.await?; + } else { + tx.map(|tx| tx.rollback()).map_err(|_| DatabaseError::RollbackFailed)?.await?; + } + res + } + } + } +} diff --git a/crates/database/db/src/service/query.rs b/crates/database/db/src/service/query.rs new file mode 100644 index 00000000..1b303e25 --- /dev/null +++ b/crates/database/db/src/service/query.rs @@ -0,0 +1,67 @@ +use crate::{TXMut, TX}; + +use std::{ + fmt::{Debug, Formatter}, + future::Future, + pin::Pin, + sync::Arc, +}; + +/// A boxed future which returns a database query result. +pub(crate) type BoxedFuture<T, Err> = Pin<Box<dyn Future<Output = Result<T, Err>> + Send>>; + +/// A read query that uses a [`TX`] for the call. +pub(crate) type ReadQuery<T, Err> = Arc<dyn Fn(Arc<TX>) -> BoxedFuture<T, Err> + Send + Sync>; + +/// A write query that uses a [`TXMut`] for the call. +pub(crate) type WriteQuery<T, Err> = Arc<dyn Fn(Arc<TXMut>) -> BoxedFuture<T, Err> + Send + Sync>; + +/// A query to the database. +pub(crate) enum DatabaseQuery<T, Err> { + /// A read query to the database. + Read(ReadQuery<T, Err>), + /// A write query to the database. + Write(WriteQuery<T, Err>), +} + +impl<T, Err> Clone for DatabaseQuery<T, Err> { + fn clone(&self) -> Self { + match self { + Self::Read(f) => Self::Read(f.clone()), + Self::Write(f) => Self::Write(f.clone()), + } + } +} + +impl<T, Err> Debug for DatabaseQuery<T, Err> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Read(_) => write!(f, "DatabaseQuery::Read"), + Self::Write(_) => write!(f, "DatabaseQuery::Write"), + } + } +} + +impl<T, Err> DatabaseQuery<T, Err> +where + T: Send + 'static, + Err: Send + 'static, +{ + /// Create a new read database query. + pub(crate) fn read<F, Fut>(f: F) -> Self + where + F: Fn(Arc<TX>) -> Fut + Send + Sync + 'static, + Fut: Future<Output = Result<T, Err>> + Send + 'static, + { + Self::Read(Arc::new(move |tx| Box::pin(f(tx)))) + } + + /// Create a new write database query. + pub(crate) fn write<F, Fut>(f: F) -> Self + where + F: Fn(Arc<TXMut>) -> Fut + Send + Sync + 'static, + Fut: Future<Output = Result<T, Err>> + Send + 'static, + { + Self::Write(Arc::new(move |tx| Box::pin(f(tx)))) + } +} diff --git a/crates/database/db/src/service/retry.rs b/crates/database/db/src/service/retry.rs new file mode 100644 index 00000000..9ea94f14 --- /dev/null +++ b/crates/database/db/src/service/retry.rs @@ -0,0 +1,113 @@ +use crate::{ + service::{query::DatabaseQuery, DatabaseService, DatabaseServiceError}, + DatabaseError, +}; +use metrics::Histogram; +use metrics_derive::Metrics; +use std::{fmt::Debug, time::Duration}; + +/// A type used for retrying transient failures in operations. +#[derive(Debug, Clone)] +pub(crate) struct Retry<S> { + /// The inner service. + pub(crate) inner: S, + /// Maximum number of retry attempts. None means infinite retries. + pub max_retries: Option<usize>, + /// Initial delay between retries in milliseconds. + pub initial_delay_ms: u64, + /// Whether to use exponential backoff. + pub exponential_backoff: bool, + /// Retry metrics. + metrics: RetryMetrics, +} + +/// Metrics for the retry service.
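// --- Editor's aside: illustrative sketch, not part of the patch. ---
// Crate-internal shape of a write flowing through the service: the query closure
// receives the only other Arc<TXMut>; if it stashed a clone past its await, the
// Arc::try_unwrap in the impl above would fail and the call would surface
// DatabaseError::CommitFailed (or RollbackFailed on the error path). The helper
// below is hypothetical and assumes a `db: Arc<DatabaseInner>` as in that impl.
async fn set_head_via_service(
    db: Arc<DatabaseInner>,
    number: u64,
) -> Result<(), DatabaseError> {
    let query = DatabaseQuery::write(move |tx: Arc<TXMut>| async move {
        // Everything inside this closure shares one transaction:
        // Ok commits, Err rolls back.
        tx.set_l2_head_block_number(number).await
    });
    db.call(query).await
}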
+#[derive(Metrics, Clone)] +#[metrics(scope = "database_retry")] +struct RetryMetrics { + /// Number of database query attempts before a successful result. + #[metrics(describe = "Number of attempts before successful database query result")] + pub attempts_before_query_success: Histogram, +} + +impl<S> Retry<S> { + /// Creates a new [`Retry`] with the specified parameters. + pub(crate) fn new( + inner: S, + max_retries: Option<usize>, + initial_delay_ms: u64, + exponential_backoff: bool, + ) -> Self { + Self { + inner, + max_retries, + initial_delay_ms, + exponential_backoff, + metrics: RetryMetrics::default(), + } + } + + /// Creates a new [`Retry`] with default retry parameters. + pub(crate) fn new_with_default_config(inner: S) -> Self { + Self::new(inner, None, 50, false) + } +} + +#[async_trait::async_trait] +impl<S: DatabaseService> DatabaseService for Retry<S> { + async fn call<T: Send + 'static, Err: DatabaseServiceError>( + &self, + req: DatabaseQuery<T, Err>, + ) -> Result<T, Err> { + let inner = self.inner.clone(); + let this = self.clone(); + + let mut attempt: usize = 0; + + loop { + match inner.call(req.clone()).await { + Ok(result) => { + this.metrics.attempts_before_query_success.record(attempt as f64); + return Ok(result) + } + Err(error) => { + // If the error is not retryable, return immediately. + if !error.can_retry() { + return Err(error); + } + + if let Some(max_retries) = this.max_retries { + if attempt >= max_retries { + return Err(error); + } + } + + // Calculate the delay for the next retry; `attempt` still counts the + // retries completed so far, so the first retry waits the initial delay. + let delay_ms = if this.exponential_backoff { + this.initial_delay_ms * 2_u64.pow(attempt as u32) + } else { + this.initial_delay_ms + }; + + attempt += 1; + tracing::debug!(target: "scroll::chain_orchestrator", ?error, attempt, delay_ms, "Retrying database query"); + + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + } + } + } +} + +/// A trait for errors that can indicate whether an operation can be retried. +pub trait CanRetry { + /// Returns true if the implementer can be retried.
+ fn can_retry(&self) -> bool; +} + +// Centralized retry classification impls +impl CanRetry for DatabaseError { + fn can_retry(&self) -> bool { + matches!(self, Self::DatabaseError(_) | Self::SqlxError(_)) + } +} diff --git a/crates/database/db/src/test_utils.rs b/crates/database/db/src/test_utils.rs index 488a604e..5b6250ba 100644 --- a/crates/database/db/src/test_utils.rs +++ b/crates/database/db/src/test_utils.rs @@ -14,6 +14,6 @@ pub async fn setup_test_db() -> Database { .tempdir() .expect("failed to create temp dir"); let db = Database::test(dir).await.unwrap(); - Migrator::::up(db.get_connection(), None).await.unwrap(); + Migrator::::up(db.inner().get_connection(), None).await.unwrap(); db } diff --git a/crates/derivation-pipeline/Cargo.toml b/crates/derivation-pipeline/Cargo.toml index ad20ce2a..0d28f167 100644 --- a/crates/derivation-pipeline/Cargo.toml +++ b/crates/derivation-pipeline/Cargo.toml @@ -26,7 +26,6 @@ scroll-codec.workspace = true scroll-db.workspace = true # misc -async-trait = { workspace = true, optional = true } futures.workspace = true metrics.workspace = true metrics-derive.workspace = true @@ -35,7 +34,6 @@ thiserror.workspace = true tracing.workspace = true [dev-dependencies] -async-trait.workspace = true alloy-primitives = { workspace = true, features = ["getrandom"] } criterion = { package = "codspeed-criterion-compat", version = "4.0.2", features = ["async", "async_tokio"] } eyre.workspace = true diff --git a/crates/derivation-pipeline/benches/pipeline.rs b/crates/derivation-pipeline/benches/pipeline.rs index 92d06df4..4b510f25 100644 --- a/crates/derivation-pipeline/benches/pipeline.rs +++ b/crates/derivation-pipeline/benches/pipeline.rs @@ -11,8 +11,8 @@ use rollup_node_providers::{ }; use scroll_alloy_consensus::TxL1Message; use scroll_db::{ - test_utils::setup_test_db, Database, DatabaseReadOperations, DatabaseTransactionProvider, - DatabaseWriteOperations, + test_utils::setup_test_db, Database, DatabaseConnectionProvider, DatabaseWriteOperations, + EntityTrait, }; use scroll_derivation_pipeline::DerivationPipeline; use std::{collections::HashMap, future::Future, path::PathBuf, pin::Pin, sync::Arc}; @@ -26,11 +26,15 @@ fn setup_mock_provider( db: Arc<Database>, ) -> Pin<Box<dyn Future<Output = MockL1Provider<Arc<Database>>> + Send>> { Box::pin(async { - let tx = db.tx().await.expect("failed to get tx"); let mut blobs = HashMap::new(); - let mut batches = tx.get_batches().await.expect("failed to get batches stream"); + let db_inner = db.inner(); + let mut batches = scroll_db::batch_commit::Entity::find() + .stream(db_inner.get_connection()) + .await + .expect("failed to get batches stream"); while let Some(Ok(batch)) = batches.next().await { + let batch = Into::<BatchCommitData>::into(batch); if let Some(blob_hash) = batch.blob_versioned_hash { blobs.insert( blob_hash, @@ -39,7 +43,7 @@ fn setup_mock_provider( } } - MockL1Provider { l1_messages_provider: db, blobs } + MockL1Provider { db, blobs } }) } @@ -71,7 +75,6 @@ async fn setup_pipeline( ) .unwrap(); - let tx = db.tx_mut().await.unwrap(); for (index, hash) in (BATCHES_START_INDEX..=BATCHES_STOP_INDEX).zip(blob_hashes.into_iter()) { let raw_calldata = std::fs::read(format!("./benches/testdata/calldata/calldata_batch_{index}.bin")) .unwrap(); let batch_data = BatchCommitData { @@ -85,7 +88,7 @@ async fn setup_pipeline( blob_versioned_hash: Some(hash), finalized_block_number: None, }; - tx.insert_batch(batch_data).await.unwrap(); + db.insert_batch(batch_data).await.unwrap(); } // load messages in db.
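// --- Editor's aside: illustrative sketch, not part of the patch. ---
// Crate-internal wiring of the retry middleware: any DatabaseService can be
// wrapped in Retry, and only errors whose CanRetry::can_retry() returns true are
// retried; everything else fails fast. The helper is hypothetical; the arguments
// mirror Retry::new as declared in retry.rs above.
fn with_bounded_retries<S: DatabaseService>(inner: S) -> Retry<S> {
    // At most 5 retries, starting at 50ms and doubling on each subsequent retry
    // (50ms, 100ms, 200ms, ...).
    Retry::new(inner, Some(5), 50, true)
}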
@@ -99,9 +102,8 @@ async fn setup_pipeline( .collect(); for message in l1_messages { - tx.insert_l1_message(message).await.unwrap(); + db.insert_l1_message(message).await.unwrap(); } - tx.commit().await.unwrap(); // construct the pipeline. let l1_provider = factory(db.clone()).await; diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 06d950f4..2bd4c8d8 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -8,10 +8,10 @@ use alloy_rpc_types_engine::PayloadAttributes; use core::{fmt::Debug, future::Future, pin::Pin, task::Poll}; use futures::{stream::FuturesOrdered, Stream, StreamExt}; use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope}; -use rollup_node_providers::{BlockDataProvider, L1Provider}; +use rollup_node_providers::L1Provider; use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes}; use scroll_codec::{decoding::payload::PayloadData, Codec}; -use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey}; +use scroll_db::{Database, DatabaseReadOperations, L1MessageKey}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; mod data_source; @@ -205,7 +205,7 @@ where } fn derivation_future(&self, batch_info: Arc<BatchInfo>) -> DerivationPipelineFuture { - let database = self.database.clone(); + let db = self.database.clone(); let metrics = self.metrics.clone(); let provider = self.l1_provider.clone(); let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index; @@ -214,8 +214,7 @@ where let derive_start = Instant::now(); // get the batch commit data. - let tx = database.tx().await.map_err(|e| (batch_info.clone(), e.into()))?; - let batch = tx + let batch = db .get_batch_by_index(batch_info.index) .await .map_err(|err| (batch_info.clone(), err.into()))? @@ -225,7 +224,7 @@ where ))?; // derive the attributes and attach the corresponding batch info. - let result = derive(batch, provider, tx, l1_v2_message_queue_start_index) + let result = derive(batch, provider, db, l1_v2_message_queue_start_index) .await .map_err(|err| (batch_info.clone(), err))?; @@ -267,10 +266,10 @@ type DerivationPipelineFuture = Pin< /// Returns a [`BatchDerivationResult`] from the [`BatchCommitData`] by deriving the payload /// attributes for each L2 block in the batch. -pub async fn derive<L1P: L1Provider, L2P: BlockDataProvider>( +pub async fn derive<L1P: L1Provider, DB: DatabaseReadOperations>( batch: BatchCommitData, l1_provider: L1P, - l2_provider: L2P, + db: DB, l1_v2_message_queue_start_index: u64, ) -> Result<BatchDerivationResult, DerivationPipelineError> { // fetch the blob then decode the input batch. @@ -293,7 +292,14 @@ pub async fn derive<L1P: L1Provider, DB: DatabaseReadOperations>( ) -> Result<Option<BlockDataHint>, Self::Error> { - Ok(None) - } - } const L1_MESSAGE_INDEX_33: L1MessageEnvelope = L1MessageEnvelope { l1_block_number: 717, l2_block_number: None, @@ -486,15 +474,13 @@ mod tests { blob_versioned_hash: None, finalized_block_number: None, }; - let tx = db.tx_mut().await?; - tx.insert_batch(batch_data).await?; + db.insert_batch(batch_data).await?; + // load message in db, leaving an L1 message missing. - tx.insert_l1_message(L1_MESSAGE_INDEX_33).await?; - tx.commit().await?; + db.insert_l1_message(L1_MESSAGE_INDEX_33).await?; // construct the pipeline.
diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs
index 06d950f4..2bd4c8d8 100644
--- a/crates/derivation-pipeline/src/lib.rs
+++ b/crates/derivation-pipeline/src/lib.rs
@@ -8,10 +8,10 @@ use alloy_rpc_types_engine::PayloadAttributes;
 use core::{fmt::Debug, future::Future, pin::Pin, task::Poll};
 use futures::{stream::FuturesOrdered, Stream, StreamExt};
 use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope};
-use rollup_node_providers::{BlockDataProvider, L1Provider};
+use rollup_node_providers::L1Provider;
 use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
 use scroll_codec::{decoding::payload::PayloadData, Codec};
-use scroll_db::{Database, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey};
+use scroll_db::{Database, DatabaseReadOperations, L1MessageKey};
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 
 mod data_source;
@@ -205,7 +205,7 @@ where
     }
 
     fn derivation_future(&self, batch_info: Arc<BatchInfo>) -> DerivationPipelineFuture {
-        let database = self.database.clone();
+        let db = self.database.clone();
         let metrics = self.metrics.clone();
         let provider = self.l1_provider.clone();
         let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index;
@@ -214,8 +214,7 @@ where
             let derive_start = Instant::now();
 
             // get the batch commit data.
-            let tx = database.tx().await.map_err(|e| (batch_info.clone(), e.into()))?;
-            let batch = tx
+            let batch = db
                 .get_batch_by_index(batch_info.index)
                 .await
                 .map_err(|err| (batch_info.clone(), err.into()))?
@@ -225,7 +224,7 @@
                 ))?;
 
             // derive the attributes and attach the corresponding batch info.
-            let result = derive(batch, provider, tx, l1_v2_message_queue_start_index)
+            let result = derive(batch, provider, db, l1_v2_message_queue_start_index)
                 .await
                 .map_err(|err| (batch_info.clone(), err))?;
@@ -267,10 +266,10 @@ type DerivationPipelineFuture = Pin<
 
 /// Returns a [`BatchDerivationResult`] from the [`BatchCommitData`] by deriving the payload
 /// attributes for each L2 block in the batch.
-pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync + Send>(
+pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations + Sync + Send>(
     batch: BatchCommitData,
     l1_provider: L1P,
-    l2_provider: L2P,
+    db: DB,
     l1_v2_message_queue_start_index: u64,
 ) -> Result<BatchDerivationResult, DerivationPipelineError> {
     // fetch the blob then decode the input batch.
@@ -293,7 +292,14 @@ pub async fn derive
-        ) -> Result<Option<BlockDataHint>, Self::Error> {
-            Ok(None)
-        }
-    }
 
     const L1_MESSAGE_INDEX_33: L1MessageEnvelope = L1MessageEnvelope {
         l1_block_number: 717,
         l2_block_number: None,
@@ -486,15 +474,13 @@
             blob_versioned_hash: None,
             finalized_block_number: None,
         };
-        let tx = db.tx_mut().await?;
-        tx.insert_batch(batch_data).await?;
+        db.insert_batch(batch_data).await?;
+
         // load message in db, leaving a l1 message missing.
-        tx.insert_l1_message(L1_MESSAGE_INDEX_33).await?;
-        tx.commit().await?;
+        db.insert_l1_message(L1_MESSAGE_INDEX_33).await?;
 
         // construct the pipeline.
-        let l1_messages_provider = db.clone();
-        let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
+        let mock_l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
         let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX).await;
 
         // as long as we don't call `push_batch`, pipeline should not return attributes.
@@ -509,9 +495,7 @@
         // in a separate task, add the second l1 message.
         tokio::task::spawn(async move {
             tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
-            let tx = db.tx_mut().await.unwrap();
-            tx.insert_l1_message(L1_MESSAGE_INDEX_34).await.unwrap();
-            tx.commit().await.unwrap();
+            db.insert_l1_message(L1_MESSAGE_INDEX_34).await.unwrap();
         });
 
         // check the correctness of the last attribute.
@@ -556,18 +540,15 @@
             blob_versioned_hash: None,
             finalized_block_number: None,
         };
-        let tx = db.tx_mut().await?;
-        tx.insert_batch(batch_data).await?;
+        db.insert_batch(batch_data).await?;
 
         // load messages in db.
         let l1_messages = vec![L1_MESSAGE_INDEX_33, L1_MESSAGE_INDEX_34];
         for message in l1_messages {
-            tx.insert_l1_message(message).await?;
+            db.insert_l1_message(message).await?;
         }
-        tx.commit().await?;
 
         // construct the pipeline.
-        let l1_messages_provider = db.clone();
-        let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
+        let mock_l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
         let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX).await;
 
         // as long as we don't call `push_batch`, pipeline should not return attributes.
@@ -614,17 +595,13 @@
             finalized_block_number: None,
         };
         let l1_messages = vec![L1_MESSAGE_INDEX_33, L1_MESSAGE_INDEX_34];
-        let tx = db.tx_mut().await?;
         for message in l1_messages {
-            tx.insert_l1_message(message).await?;
+            db.insert_l1_message(message).await?;
         }
-        tx.commit().await?;
 
-        let l1_messages_provider = db.clone();
-        let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
-        let l2_provider = MockL2Provider;
+        let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
 
-        let result = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
+        let result = derive(batch_data, l1_provider, db, u64::MAX).await?;
         let attribute = result
             .attributes
             .iter()
@@ -717,18 +694,14 @@
                 },
             },
         ];
-        let tx = db.tx_mut().await?;
         for message in l1_messages.clone() {
-            tx.insert_l1_message(message).await?;
+            db.insert_l1_message(message).await?;
        }
-        tx.commit().await?;
 
-        let l1_messages_provider = db.clone();
-        let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
-        let l2_provider = MockL2Provider;
+        let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
 
         // derive attributes and extract l1 messages.
-        let attributes = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
+        let attributes = derive(batch_data, l1_provider, db, u64::MAX).await?;
         let derived_l1_messages: Vec<_> = attributes
             .attributes
             .into_iter()
@@ -774,18 +747,14 @@
             })
             .collect::<Vec<_>>();
 
-        let tx = db.tx_mut().await?;
         for message in l1_messages.clone() {
-            tx.insert_l1_message(message).await?;
+            db.insert_l1_message(message).await?;
         }
-        tx.commit().await?;
 
-        let l1_messages_provider = db.clone();
-        let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
-        let l2_provider = MockL2Provider;
+        let l1_provider = MockL1Provider { db: db.clone(), blobs: HashMap::new() };
 
         // derive attributes and extract l1 messages.
-        let attributes = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
+        let attributes = derive(batch_data, l1_provider, db, u64::MAX).await?;
         let derived_l1_messages: Vec<_> = attributes
             .attributes
             .into_iter()
@@ -886,23 +855,19 @@
                 },
             },
         ];
-        let tx = db.tx_mut().await?;
         for message in l1_messages {
-            tx.insert_l1_message(message).await?;
+            db.insert_l1_message(message).await?;
         }
-        tx.commit().await?;
 
-        let l1_messages_provider = db.clone();
         let l1_provider = MockL1Provider {
-            l1_messages_provider,
+            db: db.clone(),
             blobs: HashMap::from([(
                 batch_data.blob_versioned_hash.unwrap(),
                 blob_path
            )]),
         };
-        let l2_provider = MockL2Provider;
 
-        let attributes = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
+        let attributes = derive(batch_data, l1_provider, db, u64::MAX).await?;
 
         let attribute = attributes.attributes.last().unwrap();
         let expected = ScrollPayloadAttributes {
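The `derive` signature change collapses the former `BlockDataProvider` parameter into the database bound: one `DB: DatabaseReadOperations` handle now serves both batch lookups and block-data hints. A schematic of that signature shape, using illustrative stand-in traits (names and methods are assumptions, not the crate's API):

```rust
use std::future::Future;

/// Illustrative stand-ins; names and methods are assumptions, not the crate's API.
trait L1Blobs {
    fn blob(&self, hash: [u8; 32]) -> impl Future<Output = Option<Vec<u8>>> + Send;
}

trait ReadOps {
    fn block_data_hint(&self, block: u64) -> impl Future<Output = Option<u64>> + Send;
}

/// Shape of the new signature: the separate L2 provider parameter is gone and
/// the database handle satisfies the read-side bound directly.
async fn derive_sketch<L1P, DB>(l1: L1P, db: DB, block: u64) -> Option<u64>
where
    L1P: L1Blobs + Send + Sync,
    DB: ReadOps + Send + Sync,
{
    let _blob = l1.blob([0u8; 32]).await?;
    db.block_data_hint(block).await
}
```

Dropping the second generic also removes the `MockL2Provider` scaffolding from every test call site, as the hunks above show.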
diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs
index 98339fa8..be3d48ac 100644
--- a/crates/node/src/args.rs
+++ b/crates/node/src/args.rs
@@ -43,7 +43,7 @@ use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_alloy_network::Scroll;
 use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
 use scroll_db::{
-    Database, DatabaseConnectionProvider, DatabaseReadOperations, DatabaseTransactionProvider,
+    Database, DatabaseConnectionProvider, DatabaseError, DatabaseReadOperations,
     DatabaseWriteOperations,
 };
 use scroll_derivation_pipeline::DerivationPipeline;
@@ -226,7 +226,7 @@ impl ScrollRollupNodeConfig {
         // Run the database migrations
         if let Some(named) = chain_spec.chain().named() {
             named
-                .migrate(db.get_connection(), self.test)
+                .migrate(db.inner().get_connection(), self.test)
                 .await
                 .expect("failed to perform migration");
         } else {
@@ -236,7 +236,7 @@ impl ScrollRollupNodeConfig {
             // This is a workaround due to the fact that sea orm migrations are static.
             // See https://github.com/scroll-tech/rollup-node/issues/297 for more details.
             scroll_migration::Migrator::::up(
-                db.get_connection(),
+                db.inner().get_connection(),
                 None,
             )
             .await
@@ -244,11 +244,9 @@
 
             // insert the custom chain genesis hash into the database
             let genesis_hash = chain_spec.genesis_hash();
-            let tx = db.tx_mut().await?;
-            tx.insert_genesis_block(genesis_hash)
+            db.insert_genesis_block(genesis_hash)
                 .await
                 .expect("failed to insert genesis block (custom chain)");
-            tx.commit().await?;
             tracing::info!(target: "scroll::node::args", ?genesis_hash, "Overwriting genesis hash for custom chain");
         }
 
@@ -260,18 +258,23 @@
         let mut fcs =
             ForkchoiceState::from_provider(&l2_provider).await.unwrap_or_else(chain_spec_fcs);
 
-        // On startup we replay the latest batch of blocks from the database as such we set the safe
-        // block hash to the latest block hash associated with the previous consolidated
-        // batch in the database.
-        let tx = db.tx_mut().await?;
-        let (_startup_safe_block, l1_start_block_number) =
-            tx.prepare_on_startup(chain_spec.genesis_hash()).await?;
-        let l2_head_block_number = tx.get_l2_head_block_number().await?;
-        tx.purge_l1_message_to_l2_block_mappings(Some(l2_head_block_number + 1)).await?;
-        tx.commit().await?;
+        let genesis_hash = chain_spec.genesis_hash();
+        let (l1_start_block_number, l2_head_block_number) = db
+            .tx_mut(move |tx| async move {
+                // On startup we replay the latest batch of blocks from the database as such we set
+                // the safe block hash to the latest block hash associated with the
+                // previous consolidated batch in the database.
+                let (_startup_safe_block, l1_start_block_number) =
+                    tx.prepare_on_startup(genesis_hash).await?;
+
+                let l2_head_block_number = tx.get_l2_head_block_number().await?;
+                tx.purge_l1_message_to_l2_block_mappings(Some(l2_head_block_number + 1)).await?;
+
+                Ok::<_, DatabaseError>((l1_start_block_number, l2_head_block_number))
+            })
+            .await?;
 
         // Update the head block info if available and ahead of finalized.
-        let l2_head_block_number = db.tx().await?.get_l2_head_block_number().await?;
         if l2_head_block_number > fcs.finalized_block_info().number {
             let block = l2_provider
                 .get_block(l2_head_block_number.into())
@@ -413,7 +416,7 @@
             .await;
 
         let (chain_orchestrator, handle) = ChainOrchestrator::new(
-            db.clone(),
+            db,
             config,
             Arc::new(block_client),
             l2_provider,
diff --git a/crates/node/src/builder/network.rs b/crates/node/src/builder/network.rs
index 02b56dd0..23fa983c 100644
--- a/crates/node/src/builder/network.rs
+++ b/crates/node/src/builder/network.rs
@@ -17,9 +17,7 @@ use reth_transaction_pool::{PoolTransaction, TransactionPool};
 use rollup_node_primitives::sig_encode_hash;
 use rollup_node_signer::SignatureAsBytes;
 use scroll_alloy_hardforks::ScrollHardforks;
-use scroll_db::{
-    Database, DatabaseReadOperations, DatabaseTransactionProvider, DatabaseWriteOperations,
-};
+use scroll_db::{Database, DatabaseReadOperations, DatabaseWriteOperations};
 use std::{fmt, fmt::Debug, sync::Arc};
 use tracing::{debug, info, trace, warn};
 
@@ -237,29 +235,16 @@ impl
-        let signature = match self.db.tx().await {
-            Ok(tx) => tx
-                .get_signature(hash)
-                .await
-                .inspect_err(|e| {
-                    warn!(target: "scroll::network::request_header_transform",
+        let signature = self.db.get_signature(hash).await.inspect_err(|e| {
+            warn!(target: "scroll::network::request_header_transform",
                 "Failed to get block signature from database, block number: {:?}, header hash: {:?}, error: {}",
                 header.number(),
                 hash,
                 HeaderTransformError::DatabaseError(e.to_string())
             )
-                })
-                .ok()
-                .flatten(),
-            Err(e) => {
-                warn!(target: "scroll::network::request_header_transform",
-                    "Failed to create database transaction to get block signature, header hash: {:?}, error: {}",
-                    header.hash_slow(),
-                    HeaderTransformError::DatabaseError(e.to_string())
-                );
-                return header;
-            }
-        };
+        })
+        .ok()
+        .flatten();
 
         // If we have a signature in the database and it matches configured signer then add it
         // to the extra data field
@@ -330,18 +315,8 @@ fn persist_signature(db: Arc<Database>, hash: B256, signature: Signature) {
             hash,
             signature.to_string()
         );
-        match db.tx_mut().await {
-            Ok(tx) => {
-                if let Err(e) = tx.insert_signature(hash, signature).await {
-                    warn!(target: "scroll::network::header_transform", "Failed to store signature in database: {:?}", e);
-                }
-                let _ = tx.commit().await.inspect_err(|e| {
-                    warn!(target: "scroll::network::header_transform", "Failed to commit signature to database: {:?}", e);
-                });
-            }
-            Err(e) => {
-                warn!(target: "scroll::network::header_transform", "Failed to create database transaction to store signature: {:?}", e);
-            }
+        if let Err(e) = db.insert_signature(hash, signature).await {
+            warn!(target: "scroll::network::header_transform", "Failed to store signature in database: {:?}", e);
         }
     });
 }
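Both files above follow the same refactor: explicit `tx_mut()`/`commit()` pairs become a closure handed to the database, which owns the commit-or-rollback decision. A minimal sketch of such a closure-scoped API, assuming a simplified `Database`/`Tx` pair (the real `scroll-db` types differ):

```rust
use std::future::Future;

/// Hypothetical stand-ins; the real `scroll-db` types differ.
struct Tx;

impl Tx {
    async fn set_l2_head_block_number(&self, _n: u64) -> Result<(), String> {
        Ok(())
    }
}

struct Database;

impl Database {
    /// Closure-scoped write transaction: the closure receives the transaction
    /// and the commit-or-rollback decision is made here, exactly once, so a
    /// caller can no longer forget `commit()` or leak a transaction on `?`.
    async fn tx_mut<F, Fut, T, E>(&self, f: F) -> Result<T, E>
    where
        F: FnOnce(Tx) -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        let tx = Tx; // begin the transaction
        let out = f(tx).await;
        match &out {
            Ok(_) => { /* commit here */ }
            Err(_) => { /* roll back here */ }
        }
        out
    }
}

/// Mirrors the call shape used in this diff: `move |tx| async move { ... }`.
async fn example(db: &Database) -> Result<(), String> {
    db.tx_mut(move |tx| async move { tx.set_l2_head_block_number(42).await }).await
}
```

Because the transaction never escapes the closure, the `Err`-arm boilerplate deleted from `network.rs` above simply has no equivalent in the new shape.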
-    async fn block_data(&self, block_number: u64) -> Result<Option<BlockDataHint>, Self::Error>;
-}
-
-#[async_trait::async_trait]
-impl<T> BlockDataProvider for T
-where
-    T: ReadConnectionProvider + Sync,
-{
-    type Error = DatabaseError;
-
-    async fn block_data(&self, block_number: u64) -> Result<Option<BlockDataHint>, Self::Error> {
-        self.get_l2_block_data_hint(block_number).await
-    }
-}
diff --git a/crates/providers/src/l1/message.rs b/crates/providers/src/l1/message.rs
index c594fc0f..19acef16 100644
--- a/crates/providers/src/l1/message.rs
+++ b/crates/providers/src/l1/message.rs
@@ -1,8 +1,7 @@
 use crate::L1ProviderError;
 
-use futures::{StreamExt, TryStreamExt};
 use rollup_node_primitives::L1MessageEnvelope;
-use scroll_db::{DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, L1MessageKey};
+use scroll_db::{DatabaseError, DatabaseReadOperations, L1MessageKey};
 
 /// An instance of the trait can provide L1 messages iterators.
 #[async_trait::async_trait]
@@ -30,7 +29,7 @@ pub trait L1MessageProvider: Send + Sync {
 #[async_trait::async_trait]
 impl<T> L1MessageProvider for T
 where
-    T: DatabaseTransactionProvider + Send + Sync,
+    T: DatabaseReadOperations + Send + Sync,
 {
     type Error = DatabaseError;
 
@@ -39,12 +38,6 @@ where
         start: L1MessageKey,
         n: u64,
     ) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
-        let tx = self.tx().await?;
-        let messages = if let Some(stream) = tx.get_l1_messages(Some(start)).await? {
-            stream.take(n as usize).try_collect().await?
-        } else {
-            vec![]
-        };
-        Ok(messages)
+        self.get_n_l1_messages(Some(start), n as usize).await
     }
 }
diff --git a/crates/providers/src/lib.rs b/crates/providers/src/lib.rs
index 19b85eab..644f93fc 100644
--- a/crates/providers/src/lib.rs
+++ b/crates/providers/src/lib.rs
@@ -4,9 +4,6 @@
 use alloy_provider::RootProvider;
 use scroll_alloy_network::Scroll;
 
-mod block;
-pub use block::BlockDataProvider;
-
 mod l1;
 pub use l1::{
     blob::{
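The `L1MessageProvider` blanket impl now keys off `DatabaseReadOperations` rather than a transaction provider, so any read-capable handle is a message provider for free. A synchronous, dependency-free sketch of the blanket-impl pattern (both trait names here are stand-ins):

```rust
/// Illustrative read-side trait standing in for `DatabaseReadOperations`.
trait ReadOps {
    fn message_at(&self, index: u64) -> Option<u64>;
}

/// Downstream trait standing in for `L1MessageProvider`.
trait MessageProvider {
    fn get_n_messages(&self, start: u64, n: u64) -> Vec<u64>;
}

// Blanket impl: anything that can read the database is automatically a
// message provider, so the concrete `Database`, transaction handles, and
// test mocks all qualify without per-type boilerplate.
impl<T: ReadOps + Send + Sync> MessageProvider for T {
    fn get_n_messages(&self, start: u64, n: u64) -> Vec<u64> {
        (start..start + n).filter_map(|i| self.message_at(i)).collect()
    }
}
```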

diff --git a/crates/providers/src/test_utils.rs b/crates/providers/src/test_utils.rs
index 6f6dde42..4a61fa61 100644
--- a/crates/providers/src/test_utils.rs
+++ b/crates/providers/src/test_utils.rs
@@ -4,20 +4,20 @@ use crate::{BlobProvider, L1MessageProvider, L1ProviderError};
 use alloy_eips::eip4844::Blob;
 use alloy_primitives::B256;
 use rollup_node_primitives::L1MessageEnvelope;
-use scroll_db::L1MessageKey;
+use scroll_db::{DatabaseError, DatabaseReadOperations, L1MessageKey};
 use std::{collections::HashMap, path::PathBuf, sync::Arc};
 
 /// Implementation of the [`crate::L1Provider`] that returns blobs from a file.
 #[derive(Clone, Default, Debug)]
-pub struct MockL1Provider<P> {
-    /// L1 message provider.
-    pub l1_messages_provider: P,
+pub struct MockL1Provider<DB> {
+    /// Database.
+    pub db: DB,
     /// File blobs.
     pub blobs: HashMap<B256, PathBuf>,
 }
 
 #[async_trait::async_trait]
-impl<P: L1MessageProvider + Sync> BlobProvider for MockL1Provider<P> {
+impl<DB: DatabaseReadOperations + Send + Sync> BlobProvider for MockL1Provider<DB> {
     async fn blob(
         &self,
         _block_timestamp: u64,
@@ -36,14 +36,14 @@ impl BlobProvider for MockL1Provider<P> {
 }
 
 #[async_trait::async_trait]
-impl<P: L1MessageProvider + Sync> L1MessageProvider for MockL1Provider<P> {
-    type Error = P::Error;
+impl<DB: DatabaseReadOperations + Send + Sync> L1MessageProvider for MockL1Provider<DB> {
+    type Error = DatabaseError;
 
     async fn get_n_messages(
         &self,
         start: L1MessageKey,
         n: u64,
     ) -> Result<Vec<L1MessageEnvelope>, Self::Error> {
-        self.l1_messages_provider.get_n_messages(start, n).await
+        self.db.get_n_messages(start, n).await
     }
 }
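With the blanket impl in place, `MockL1Provider` only needs to carry a database handle and delegate. A compact, synchronous sketch of that delegation shape, with simplified stand-in types (the real mock is async and also serves blobs from files):

```rust
use std::{collections::HashMap, path::PathBuf};

/// Stand-in read trait and database; the real bound is `DatabaseReadOperations`.
trait ReadOps {
    fn get_n_messages(&self, start: u64, n: u64) -> Vec<u64>;
}

struct Database;

impl ReadOps for Database {
    fn get_n_messages(&self, start: u64, n: u64) -> Vec<u64> {
        (start..start + n).collect()
    }
}

/// Mirror of the mock's new shape: a database handle plus file-backed blobs.
struct MockL1Provider<DB> {
    db: DB,
    blobs: HashMap<[u8; 32], PathBuf>,
}

impl<DB: ReadOps> MockL1Provider<DB> {
    /// Message queries delegate straight to the database handle.
    fn get_n_messages(&self, start: u64, n: u64) -> Vec<u64> {
        self.db.get_n_messages(start, n)
    }
}

fn build_mock(db: Database) -> MockL1Provider<Database> {
    MockL1Provider { db, blobs: HashMap::new() }
}
```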
diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs
index 345ac0f7..0b8db65f 100644
--- a/crates/sequencer/tests/e2e.rs
+++ b/crates/sequencer/tests/e2e.rs
@@ -22,7 +22,7 @@ use rollup_node_sequencer::{
 use rollup_node_watcher::L1Notification;
 use scroll_alloy_consensus::TxL1Message;
 use scroll_alloy_provider::ScrollAuthApiEngineClient;
-use scroll_db::{test_utils::setup_test_db, DatabaseTransactionProvider, DatabaseWriteOperations};
+use scroll_db::{test_utils::setup_test_db, DatabaseWriteOperations};
 use scroll_engine::{Engine, ForkchoiceState};
 use std::{io::Write, path::PathBuf, sync::Arc};
 use tempfile::NamedTempFile;
@@ -57,9 +57,7 @@ async fn skip_block_with_no_transactions() {
     let provider = database.clone();
 
     // Set the latest block number
-    let tx = provider.tx_mut().await.unwrap();
-    tx.set_latest_l1_block_number(0).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_latest_l1_block_number(0).await.unwrap();
 
     // create a sequencer
     let config = SequencerConfig {
@@ -114,9 +112,7 @@ async fn can_build_blocks() {
     let provider = database.clone();
 
     // Set the latest block number
-    let tx = provider.tx_mut().await.unwrap();
-    tx.set_latest_l1_block_number(5).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_latest_l1_block_number(5).await.unwrap();
 
     // create a sequencer
     let config = SequencerConfig {
@@ -185,9 +181,7 @@ async fn can_build_blocks() {
     };
     drop(wallet_lock);
     let l1_message_hash = l1_message.transaction.tx_hash();
-    let tx = database.tx_mut().await.unwrap();
-    tx.insert_l1_message(l1_message).await.unwrap();
-    tx.commit().await.unwrap();
+    database.insert_l1_message(l1_message).await.unwrap();
 
     // sleep 2 seconds (ethereum header timestamp has granularity of seconds and proceeding header
     // must have a greater timestamp than the last)
@@ -244,9 +238,7 @@ async fn can_build_blocks_with_delayed_l1_messages() {
     let provider = database.clone();
 
     // Set the latest block number
-    let tx = provider.tx_mut().await.unwrap();
-    tx.set_latest_l1_block_number(1).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_latest_l1_block_number(1).await.unwrap();
 
     // create a sequencer
     let config = SequencerConfig {
@@ -282,9 +274,7 @@ async fn can_build_blocks_with_delayed_l1_messages() {
     };
     drop(wallet_lock);
     let l1_message_hash = l1_message.transaction.tx_hash();
-    let tx = database.tx_mut().await.unwrap();
-    tx.insert_l1_message(l1_message).await.unwrap();
-    tx.commit().await.unwrap();
+    database.insert_l1_message(l1_message).await.unwrap();
 
     // add a transaction to the pool
     let mut wallet_lock = wallet.lock().await;
@@ -322,9 +312,7 @@ async fn can_build_blocks_with_delayed_l1_messages() {
     tokio::time::sleep(std::time::Duration::from_secs(2)).await;
 
     // set the l1 block number to 3
-    let tx = database.tx_mut().await.unwrap();
-    tx.set_latest_l1_block_number(3).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_latest_l1_block_number(3).await.unwrap();
 
     // send a new block request this block should include the L1 message
     sequencer.start_payload_building(&mut engine).await.unwrap();
@@ -373,9 +361,7 @@ async fn can_build_blocks_with_finalized_l1_messages() {
     let database = Arc::new(setup_test_db().await);
     let provider = database.clone();
 
-    let tx = provider.tx_mut().await.unwrap();
-    tx.set_latest_l1_block_number(5).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_latest_l1_block_number(5).await.unwrap();
 
     // create a sequencer
     let config = SequencerConfig {
@@ -394,9 +380,7 @@ async fn can_build_blocks_with_finalized_l1_messages() {
     let mut sequencer = Sequencer::new(provider, config);
 
     // set L1 finalized block number to 2
-    let tx = database.tx_mut().await.unwrap();
-    tx.set_finalized_l1_block_number(2).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_finalized_l1_block_number(2).await.unwrap();
 
     // add L1 messages to database
     let wallet_lock = wallet.lock().await;
@@ -435,10 +419,8 @@ async fn can_build_blocks_with_finalized_l1_messages() {
     let finalized_message_hash = finalized_l1_message.transaction.tx_hash();
     let unfinalized_message_hash = unfinalized_l1_message.transaction.tx_hash();
 
-    let tx = database.tx_mut().await.unwrap();
-    tx.insert_l1_message(finalized_l1_message).await.unwrap();
-    tx.insert_l1_message(unfinalized_l1_message).await.unwrap();
-    tx.commit().await.unwrap();
+    database.insert_l1_message(finalized_l1_message).await.unwrap();
+    database.insert_l1_message(unfinalized_l1_message).await.unwrap();
 
     // build payload, should only include finalized message
    sequencer.start_payload_building(&mut engine).await.unwrap();
@@ -458,14 +440,10 @@ async fn can_build_blocks_with_finalized_l1_messages() {
     assert!(!block.body.transactions.iter().any(|tx| tx.tx_hash() == &unfinalized_message_hash));
 
     // Handle the build block with the sequencer in order to update L1 message queue index.
-    let tx = database.tx_mut().await.unwrap();
-    tx.update_l1_messages_with_l2_block((&block).into()).await.unwrap();
-    tx.commit().await.unwrap();
+    database.update_l1_messages_with_l2_block((&block).into()).await.unwrap();
 
     // update finalized block number to 3, now both messages should be available
-    let tx = database.tx_mut().await.unwrap();
-    tx.set_finalized_l1_block_number(3).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_finalized_l1_block_number(3).await.unwrap();
 
     // sleep 2 seconds (ethereum header timestamp has granularity of seconds and proceeding header
     // must have a greater timestamp than the last)
@@ -893,10 +871,8 @@ async fn should_limit_l1_message_cumulative_gas() {
     let provider = database.clone();
 
     // Set the latest and finalized block number
-    let tx = provider.tx_mut().await.unwrap();
-    tx.set_latest_l1_block_number(5).await.unwrap();
-    tx.set_finalized_l1_block_number(1).await.unwrap();
-    tx.commit().await.unwrap();
+    database.set_latest_l1_block_number(5).await.unwrap();
+    database.set_finalized_l1_block_number(1).await.unwrap();
 
     // create a sequencer
     let config = SequencerConfig {
@@ -944,11 +920,9 @@ async fn should_limit_l1_message_cumulative_gas() {
         },
     ];
-    let tx = database.tx_mut().await.unwrap();
     for l1_message in l1_messages {
-        tx.insert_l1_message(l1_message).await.unwrap();
+        database.insert_l1_message(l1_message).await.unwrap();
     }
-    tx.commit().await.unwrap();
 
     // build payload, should only include first l1 message
     sequencer.start_payload_building(&mut engine).await.unwrap();