From 9d656092c3030afecad1b44c26fb73fed9a18407 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 02:55:08 +0800 Subject: [PATCH 1/9] feat: database support retry --- crates/chain-orchestrator/src/lib.rs | 203 +++++++++++++++------------ crates/database/db/src/db.rs | 2 +- crates/database/db/src/lib.rs | 3 + crates/database/db/src/retry.rs | 168 ++++++++++++++++++++++ 4 files changed, 288 insertions(+), 88 deletions(-) create mode 100644 crates/database/db/src/retry.rs diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index c898163a..490c52d8 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -18,8 +18,7 @@ use scroll_alloy_consensus::TxL1Message; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_db::{ - Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, - DatabaseWriteOperations, L1MessageStart, UnwindResult, + retry_with_defaults, Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, DatabaseWriteOperations, L1MessageStart, UnwindResult }; use scroll_network::NewBlockWithPeer; use std::{ @@ -211,9 +210,12 @@ impl< let database = ctx.database.clone(); let block_info: L2BlockInfoWithL1Messages = (&block_with_peer.block).into(); Self::do_handle_block_from_peer(ctx, block_with_peer).await?; - let tx = database.tx_mut().await?; - tx.update_l1_messages_with_l2_block(block_info.clone()).await?; - tx.commit().await?; + retry_with_defaults("update_l1_messages_with_l2_block", || async { + let tx = database.tx_mut().await?; + tx.update_l1_messages_with_l2_block(block_info.clone()).await?; + tx.commit().await?; + Ok::<_, DatabaseError>(()) + }).await?; Ok(ChainOrchestratorEvent::L2ChainCommitted(block_info, None, true)) } @@ -292,10 +294,12 @@ impl< let mut received_chain_headers = VecDeque::from(vec![received_block.header.clone()]); // We should never have a re-org that is deeper than the current safe head. - let tx = database.tx().await?; - let (latest_safe_block, _) = - tx.get_latest_safe_l2_info().await?.expect("safe block must exist"); - drop(tx); + let (latest_safe_block, _) = retry_with_defaults("get_latest_safe_l2_info", || async { + let tx = database.tx().await?; + let (latest_safe_block, batch_info) = + tx.get_latest_safe_l2_info().await?.expect("safe block must exist"); + Ok::<_, DatabaseError>((latest_safe_block, batch_info)) + }).await?; // We search for the re-org index in the in-memory chain. const BATCH_FETCH_SIZE: usize = 50; @@ -455,11 +459,14 @@ impl< ChainOrchestratorItem::InsertConsolidatedL2Blocks, Box::pin(async move { let head = block_infos.last().expect("block info must not be empty").clone(); - let tx = database.tx_mut().await?; - for block in block_infos { - tx.insert_block(block, batch_info).await?; - } - tx.commit().await?; + retry_with_defaults("insert_block", || async { + let tx = database.tx_mut().await?; + for block in block_infos.clone() { + tx.insert_block(block, batch_info).await?; + } + tx.commit().await?; + Ok::<_, DatabaseError>(()) + }).await?; Result::<_, ChainOrchestratorError>::Ok(Some( ChainOrchestratorEvent::L2ConsolidatedBlockCommitted(head), )) @@ -503,9 +510,12 @@ impl< // Insert the blocks into the database. let head = block_info.last().expect("block info must not be empty").clone(); - let tx = database.tx_mut().await?; - tx.update_l1_messages_from_l2_blocks(block_info).await?; - tx.commit().await?; + retry_with_defaults("update_l1_messages_from_l2_blocks", || async { + let tx = database.tx_mut().await?; + tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?; + tx.commit().await?; + Ok::<_, DatabaseError>(()) + }).await?; Result::<_, ChainOrchestratorError>::Ok(Some( ChainOrchestratorEvent::L2ChainCommitted(head, None, consolidated), @@ -589,10 +599,13 @@ impl< l2_client: Arc

, current_chain: Arc>, ) -> Result, ChainOrchestratorError> { - let txn = database.tx_mut().await?; - let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = - txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; - txn.commit().await?; + let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = retry_with_defaults("unwind", || async { + let txn = database.tx_mut().await?; + let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = + txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; + txn.commit().await?; + Ok::<_, DatabaseError>(UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info }) + }).await?; let l2_head_block_info = if let Some(block_number) = l2_head_block_number { // Fetch the block hash of the new L2 head block. let block_hash = l2_client @@ -623,16 +636,21 @@ impl< block_number: u64, l1_block_number: Arc, ) -> Result, ChainOrchestratorError> { - let tx = database.tx_mut().await?; + let finalized_batches = retry_with_defaults("set_latest_finalized_l1_block_number", || async { + let tx = database.tx_mut().await?; - // Set the latest finalized L1 block in the database. - tx.set_latest_finalized_l1_block_number(block_number).await?; + // Set the latest finalized L1 block in the database. + tx.set_latest_finalized_l1_block_number(block_number).await?; + + // Get all unprocessed batches that have been finalized by this L1 block finalization. + let finalized_batches = + tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; + + tx.commit().await?; - // Get all unprocessed batches that have been finalized by this L1 block finalization. - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; + Ok::<_, DatabaseError>(finalized_batches) + }).await?; - tx.commit().await?; // Update the chain orchestrator L1 block number. l1_block_number.store(block_number, Ordering::Relaxed); @@ -654,17 +672,20 @@ impl< let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash); // Perform a consistency check to ensure the previous L1 message exists in the database. - let tx = database.tx_mut().await?; - if l1_message.transaction.queue_index > 0 && - tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1).await?.is_none() - { - return Err(ChainOrchestratorError::L1MessageQueueGap( - l1_message.transaction.queue_index, - )) - } + let _ = retry_with_defaults("handle_l1_message", || async { + let tx = database.tx_mut().await?; + if l1_message.transaction.queue_index > 0 && + tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1).await?.is_none() + { + return Err(ChainOrchestratorError::L1MessageQueueGap( + l1_message.transaction.queue_index, + )) + } - tx.insert_l1_message(l1_message).await?; - tx.commit().await?; + tx.insert_l1_message(l1_message.clone()).await?; + tx.commit().await?; + Ok::<_, ChainOrchestratorError>(()) + }).await; Ok(Some(event)) } @@ -673,36 +694,41 @@ impl< database: Arc, batch: BatchCommitData, ) -> Result, ChainOrchestratorError> { - let tx = database.tx_mut().await?; - let prev_batch_index = batch.index - 1; - - // Perform a consistency check to ensure the previous commit batch exists in the database. - if tx.get_batch_by_index(prev_batch_index).await?.is_none() { - return Err(ChainOrchestratorError::BatchCommitGap(batch.index)) - } - - // remove any batches with an index greater than the previous batch. - let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; - - // handle the case of a batch revert. - let new_safe_head = if affected > 0 { - tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; - tx.get_highest_block_for_batch_index(prev_batch_index).await? - } else { - None - }; - - let event = ChainOrchestratorEvent::BatchCommitIndexed { - batch_info: BatchInfo::new(batch.index, batch.hash), - l1_block_number: batch.block_number, - safe_head: new_safe_head, - }; + let event = retry_with_defaults("handle_batch_commit", || async { + let tx = database.tx_mut().await?; + let batch_clone = batch.clone(); + let prev_batch_index = batch_clone.clone().index - 1; + + // Perform a consistency check to ensure the previous commit batch exists in the database. + if tx.get_batch_by_index(prev_batch_index).await?.is_none() { + return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)) + } + + // remove any batches with an index greater than the previous batch. + let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; + + // handle the case of a batch revert. + let new_safe_head = if affected > 0 { + tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; + tx.get_highest_block_for_batch_index(prev_batch_index).await? + } else { + None + }; + + let event = ChainOrchestratorEvent::BatchCommitIndexed { + batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), + l1_block_number: batch.block_number, + safe_head: new_safe_head, + }; + + // insert the batch and commit the transaction. + tx.insert_batch(batch_clone).await?; + tx.commit().await?; - // insert the batch and commit the transaction. - tx.insert_batch(batch).await?; - tx.commit().await?; + Ok::<_, ChainOrchestratorError>(Some(event)) + }).await?; - Ok(Some(event)) + Ok(event) } /// Handles a batch finalization event by updating the batch input in the database. @@ -712,22 +738,24 @@ impl< block_number: u64, finalized_block_number: Arc, ) -> Result, ChainOrchestratorError> { - let tx = database.tx_mut().await?; + retry_with_defaults("handle_batch_finalization", || async { + let tx = database.tx_mut().await?; - // finalize all batches up to `batch_index`. - tx.finalize_batches_up_to_index(batch_index, block_number).await?; + // finalize all batches up to `batch_index`. + tx.finalize_batches_up_to_index(batch_index, block_number).await?; - // Get all unprocessed batches that have been finalized by this L1 block finalization. - let finalized_block_number = finalized_block_number.load(Ordering::Relaxed); - if finalized_block_number >= block_number { - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(finalized_block_number).await?; - tx.commit().await?; - return Ok(Some(ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches))) - } + // Get all unprocessed batches that have been finalized by this L1 block finalization. + let finalized_block_number = finalized_block_number.load(Ordering::Relaxed); + if finalized_block_number >= block_number { + let finalized_batches = + tx.fetch_and_update_unprocessed_finalized_batches(finalized_block_number).await?; + tx.commit().await?; + return Ok(Some(ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches))) + } - tx.commit().await?; - Ok(None) + tx.commit().await?; + Ok::<_, ChainOrchestratorError>(None) + }).await } } @@ -772,8 +800,8 @@ async fn init_chain_from_db + 'static>( ) -> Result, ChainOrchestratorError> { let blocks = { let mut blocks = Vec::with_capacity(chain_buffer_size); - let tx = database.tx().await?; - let blocks_stream = tx.get_l2_blocks().await?.take(chain_buffer_size); + let tx = retry_with_defaults("get_l2_blocks_new_tx", || database.tx()).await?; + let blocks_stream = retry_with_defaults("get_l2_blocks", || tx.get_l2_blocks()).await?.take(chain_buffer_size); pin_mut!(blocks_stream); while let Some(block_info) = blocks_stream.try_next().await? { let header = l2_client @@ -869,9 +897,11 @@ async fn consolidate_chain + 'static>( // Fetch the safe head from the database. We use this as a trust anchor to reconcile the chain // back to. - let tx = database.tx().await?; - let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0; - drop(tx); + let safe_head = retry_with_defaults("get_latest_safe_l2_info", || async { + let tx = database.tx().await?; + let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0; + Ok::<_, DatabaseError>(safe_head) + }).await?; // If the in-memory chain contains the safe head, we check if the safe hash from the // database (L1 consolidation) matches the in-memory value. If it does not match, we return an @@ -970,9 +1000,8 @@ async fn validate_l1_messages( // TODO: instead of using `l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))` to // determine the start of the L1 message stream, we should use a more robust method to determine // the start of the L1 message stream. - let tx = database.tx().await?; - let l1_message_stream = - tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))).await?; + let tx = retry_with_defaults("get_l1_messages_new_tx", || database.tx()).await?; + let l1_message_stream = retry_with_defaults("get_l1_messages", || tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx)))).await?; pin_mut!(l1_message_stream); for message_hash in l1_message_hashes { diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 26db30a1..5d13cc9c 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -136,7 +136,7 @@ impl From for Database { } } -#[cfg(test)] +#[cfg(all(test, feature = "test-utils"))] mod test { use super::*; use crate::{ diff --git a/crates/database/db/src/lib.rs b/crates/database/db/src/lib.rs index 80325faa..73ca1920 100644 --- a/crates/database/db/src/lib.rs +++ b/crates/database/db/src/lib.rs @@ -19,6 +19,9 @@ pub use operations::{ DatabaseReadOperations, DatabaseWriteOperations, L1MessageStart, UnwindResult, }; +mod retry; +pub use retry::{RetryConfig, retry_config, retry_operation_with_name, retry_with_defaults}; + mod transaction; pub use transaction::{DatabaseTransactionProvider, TXMut, TX}; diff --git a/crates/database/db/src/retry.rs b/crates/database/db/src/retry.rs new file mode 100644 index 00000000..5f86e5e8 --- /dev/null +++ b/crates/database/db/src/retry.rs @@ -0,0 +1,168 @@ +//! Retry mechanism for database operations + +use std::time::Duration; + +/// Configuration for retry behavior +#[derive(Debug, Clone)] +pub struct RetryConfig { + /// Maximum number of retry attempts. None means infinite retries + pub max_retries: Option, + /// Initial delay between retries in milliseconds + pub initial_delay_ms: u64, + /// Whether to use exponential backoff + pub exponential_backoff: bool, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_retries: None, + initial_delay_ms: 50, + exponential_backoff: false, + } + } +} + +/// Helper function to create a retry configuration +pub const fn retry_config(max_retries: usize, initial_delay_ms: u64, exponential_backoff: bool) -> RetryConfig { + RetryConfig { + max_retries: Some(max_retries), + initial_delay_ms, + exponential_backoff, + } +} + +/// Retry a database operation with operation name for better logging +pub async fn retry_operation_with_name( + operation_name: &str, + operation: F, + config: RetryConfig, +) -> Result +where + F: Fn() -> Fut, + Fut: std::future::Future>, + E: std::fmt::Debug, +{ + let mut attempt = 0; + + loop { + match operation().await { + Ok(result) => return Ok(result), + Err(error) => { + if let Some(max_retries) = config.max_retries { + if attempt >= max_retries { + return Err(error); + } + } + + attempt += 1; + tracing::debug!( + target: "scroll::db", + operation = operation_name, + error = ?error, + attempt = attempt, + "Retrying database operation" + ); + + // Calculate delay for next retry + let delay_ms = if config.exponential_backoff { + config.initial_delay_ms * 2_u64.pow(attempt as u32 - 1) + } else { + config.initial_delay_ms + }; + + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } + } + } +} + +/// Convenience function for retrying with default configuration +pub async fn retry_with_defaults(operation_name: &str, operation: F) -> Result +where + F: Fn() -> Fut, + Fut: std::future::Future>, + E: std::fmt::Debug, +{ + retry_operation_with_name(operation_name, operation, RetryConfig::default()).await +} + +#[cfg(test)] +mod tests { + use crate::{retry_config, retry_operation_with_name, retry_with_defaults}; + use std::cell::RefCell; + + #[tokio::test] + async fn test_retry_success_on_first_attempt() { + let attempt = RefCell::new(0); + let result = retry_operation_with_name( + "test_operation", + || { + *attempt.borrow_mut() += 1; + async move { Ok::(42) } + }, + retry_config(3, 10, false), + ).await; + + assert_eq!(result, Ok(42)); + assert_eq!(*attempt.borrow(), 1); + } + + #[tokio::test] + async fn test_retry_success_after_failures() { + let attempt = RefCell::new(0); + let result = retry_operation_with_name( + "test_operation", + || { + *attempt.borrow_mut() += 1; + let current_attempt = *attempt.borrow(); + async move { + if current_attempt < 3 { + Err::("failed") + } else { + Ok(42) + } + } + }, + retry_config(5, 10, false), + ).await; + + assert_eq!(result, Ok(42)); + assert_eq!(*attempt.borrow(), 3); + } + + #[tokio::test] + async fn test_retry_exhausted() { + let attempt = RefCell::new(0); + let result = retry_operation_with_name( + "test_operation", + || { + *attempt.borrow_mut() += 1; + async move { Err::("always fails") } + }, + retry_config(2, 10, false), + ).await; + + assert_eq!(result, Err("always fails")); + assert_eq!(*attempt.borrow(), 3); // 1 initial + 2 retries + } + + #[tokio::test] + async fn test_retry_with_defaults() { + let attempt = RefCell::new(0); + let result = retry_with_defaults("test_retry_with_defaults", || { + *attempt.borrow_mut() += 1; + let current_attempt = *attempt.borrow(); + async move { + if current_attempt < 2 { + Err::("failed") + } else { + Ok(42) + } + } + }).await; + + assert_eq!(result, Ok(42)); + assert_eq!(*attempt.borrow(), 2); + } +} From bca98031468997146ac77f6978092447724d9ac3 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 03:02:30 +0800 Subject: [PATCH 2/9] fmt --- crates/chain-orchestrator/src/lib.rs | 118 +++++++++++++++++---------- crates/database/db/src/lib.rs | 2 +- crates/database/db/src/retry.rs | 38 ++++----- 3 files changed, 96 insertions(+), 62 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 490c52d8..ba4ea826 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -18,7 +18,8 @@ use scroll_alloy_consensus::TxL1Message; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_db::{ - retry_with_defaults, Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, DatabaseWriteOperations, L1MessageStart, UnwindResult + retry_with_defaults, Database, DatabaseError, DatabaseReadOperations, + DatabaseTransactionProvider, DatabaseWriteOperations, L1MessageStart, UnwindResult, }; use scroll_network::NewBlockWithPeer; use std::{ @@ -215,7 +216,8 @@ impl< tx.update_l1_messages_with_l2_block(block_info.clone()).await?; tx.commit().await?; Ok::<_, DatabaseError>(()) - }).await?; + }) + .await?; Ok(ChainOrchestratorEvent::L2ChainCommitted(block_info, None, true)) } @@ -294,12 +296,13 @@ impl< let mut received_chain_headers = VecDeque::from(vec![received_block.header.clone()]); // We should never have a re-org that is deeper than the current safe head. - let (latest_safe_block, _) = retry_with_defaults("get_latest_safe_l2_info", || async { + let (latest_safe_block, _) = retry_with_defaults("get_latest_safe_l2_info", || async { let tx = database.tx().await?; let (latest_safe_block, batch_info) = tx.get_latest_safe_l2_info().await?.expect("safe block must exist"); - Ok::<_, DatabaseError>((latest_safe_block, batch_info)) - }).await?; + Ok::<_, DatabaseError>((latest_safe_block, batch_info)) + }) + .await?; // We search for the re-org index in the in-memory chain. const BATCH_FETCH_SIZE: usize = 50; @@ -466,7 +469,8 @@ impl< } tx.commit().await?; Ok::<_, DatabaseError>(()) - }).await?; + }) + .await?; Result::<_, ChainOrchestratorError>::Ok(Some( ChainOrchestratorEvent::L2ConsolidatedBlockCommitted(head), )) @@ -515,7 +519,8 @@ impl< tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?; tx.commit().await?; Ok::<_, DatabaseError>(()) - }).await?; + }) + .await?; Result::<_, ChainOrchestratorError>::Ok(Some( ChainOrchestratorEvent::L2ChainCommitted(head, None, consolidated), @@ -599,13 +604,24 @@ impl< l2_client: Arc

, current_chain: Arc>, ) -> Result, ChainOrchestratorError> { - let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = retry_with_defaults("unwind", || async { - let txn = database.tx_mut().await?; - let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = - txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; - txn.commit().await?; - Ok::<_, DatabaseError>(UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info }) - }).await?; + let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = + retry_with_defaults("unwind", || async { + let txn = database.tx_mut().await?; + let UnwindResult { + l1_block_number, + queue_index, + l2_head_block_number, + l2_safe_block_info, + } = txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; + txn.commit().await?; + Ok::<_, DatabaseError>(UnwindResult { + l1_block_number, + queue_index, + l2_head_block_number, + l2_safe_block_info, + }) + }) + .await?; let l2_head_block_info = if let Some(block_number) = l2_head_block_number { // Fetch the block hash of the new L2 head block. let block_hash = l2_client @@ -636,21 +652,23 @@ impl< block_number: u64, l1_block_number: Arc, ) -> Result, ChainOrchestratorError> { - let finalized_batches = retry_with_defaults("set_latest_finalized_l1_block_number", || async { - let tx = database.tx_mut().await?; + let finalized_batches = + retry_with_defaults("set_latest_finalized_l1_block_number", || async { + let tx = database.tx_mut().await?; - // Set the latest finalized L1 block in the database. - tx.set_latest_finalized_l1_block_number(block_number).await?; - - // Get all unprocessed batches that have been finalized by this L1 block finalization. - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; - - tx.commit().await?; + // Set the latest finalized L1 block in the database. + tx.set_latest_finalized_l1_block_number(block_number).await?; - Ok::<_, DatabaseError>(finalized_batches) - }).await?; + // Get all unprocessed batches that have been finalized by this L1 block + // finalization. + let finalized_batches = + tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; + tx.commit().await?; + + Ok::<_, DatabaseError>(finalized_batches) + }) + .await?; // Update the chain orchestrator L1 block number. l1_block_number.store(block_number, Ordering::Relaxed); @@ -675,7 +693,9 @@ impl< let _ = retry_with_defaults("handle_l1_message", || async { let tx = database.tx_mut().await?; if l1_message.transaction.queue_index > 0 && - tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1).await?.is_none() + tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1) + .await? + .is_none() { return Err(ChainOrchestratorError::L1MessageQueueGap( l1_message.transaction.queue_index, @@ -685,7 +705,8 @@ impl< tx.insert_l1_message(l1_message.clone()).await?; tx.commit().await?; Ok::<_, ChainOrchestratorError>(()) - }).await; + }) + .await; Ok(Some(event)) } @@ -698,15 +719,16 @@ impl< let tx = database.tx_mut().await?; let batch_clone = batch.clone(); let prev_batch_index = batch_clone.clone().index - 1; - - // Perform a consistency check to ensure the previous commit batch exists in the database. + + // Perform a consistency check to ensure the previous commit batch exists in the + // database. if tx.get_batch_by_index(prev_batch_index).await?.is_none() { return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)) } - + // remove any batches with an index greater than the previous batch. let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; - + // handle the case of a batch revert. let new_safe_head = if affected > 0 { tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; @@ -714,19 +736,20 @@ impl< } else { None }; - + let event = ChainOrchestratorEvent::BatchCommitIndexed { batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), l1_block_number: batch.block_number, safe_head: new_safe_head, }; - + // insert the batch and commit the transaction. tx.insert_batch(batch_clone).await?; tx.commit().await?; Ok::<_, ChainOrchestratorError>(Some(event)) - }).await?; + }) + .await?; Ok(event) } @@ -747,15 +770,20 @@ impl< // Get all unprocessed batches that have been finalized by this L1 block finalization. let finalized_block_number = finalized_block_number.load(Ordering::Relaxed); if finalized_block_number >= block_number { - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(finalized_block_number).await?; + let finalized_batches = tx + .fetch_and_update_unprocessed_finalized_batches(finalized_block_number) + .await?; tx.commit().await?; - return Ok(Some(ChainOrchestratorEvent::BatchFinalized(block_number, finalized_batches))) + return Ok(Some(ChainOrchestratorEvent::BatchFinalized( + block_number, + finalized_batches, + ))) } tx.commit().await?; Ok::<_, ChainOrchestratorError>(None) - }).await + }) + .await } } @@ -801,7 +829,9 @@ async fn init_chain_from_db + 'static>( let blocks = { let mut blocks = Vec::with_capacity(chain_buffer_size); let tx = retry_with_defaults("get_l2_blocks_new_tx", || database.tx()).await?; - let blocks_stream = retry_with_defaults("get_l2_blocks", || tx.get_l2_blocks()).await?.take(chain_buffer_size); + let blocks_stream = retry_with_defaults("get_l2_blocks", || tx.get_l2_blocks()) + .await? + .take(chain_buffer_size); pin_mut!(blocks_stream); while let Some(block_info) = blocks_stream.try_next().await? { let header = l2_client @@ -901,7 +931,8 @@ async fn consolidate_chain + 'static>( let tx = database.tx().await?; let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0; Ok::<_, DatabaseError>(safe_head) - }).await?; + }) + .await?; // If the in-memory chain contains the safe head, we check if the safe hash from the // database (L1 consolidation) matches the in-memory value. If it does not match, we return an @@ -1001,7 +1032,10 @@ async fn validate_l1_messages( // determine the start of the L1 message stream, we should use a more robust method to determine // the start of the L1 message stream. let tx = retry_with_defaults("get_l1_messages_new_tx", || database.tx()).await?; - let l1_message_stream = retry_with_defaults("get_l1_messages", || tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx)))).await?; + let l1_message_stream = retry_with_defaults("get_l1_messages", || { + tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) + }) + .await?; pin_mut!(l1_message_stream); for message_hash in l1_message_hashes { diff --git a/crates/database/db/src/lib.rs b/crates/database/db/src/lib.rs index 73ca1920..dcebf39d 100644 --- a/crates/database/db/src/lib.rs +++ b/crates/database/db/src/lib.rs @@ -20,7 +20,7 @@ pub use operations::{ }; mod retry; -pub use retry::{RetryConfig, retry_config, retry_operation_with_name, retry_with_defaults}; +pub use retry::{retry_config, retry_operation_with_name, retry_with_defaults, RetryConfig}; mod transaction; pub use transaction::{DatabaseTransactionProvider, TXMut, TX}; diff --git a/crates/database/db/src/retry.rs b/crates/database/db/src/retry.rs index 5f86e5e8..7c4931d3 100644 --- a/crates/database/db/src/retry.rs +++ b/crates/database/db/src/retry.rs @@ -15,21 +15,17 @@ pub struct RetryConfig { impl Default for RetryConfig { fn default() -> Self { - Self { - max_retries: None, - initial_delay_ms: 50, - exponential_backoff: false, - } + Self { max_retries: None, initial_delay_ms: 50, exponential_backoff: false } } } /// Helper function to create a retry configuration -pub const fn retry_config(max_retries: usize, initial_delay_ms: u64, exponential_backoff: bool) -> RetryConfig { - RetryConfig { - max_retries: Some(max_retries), - initial_delay_ms, - exponential_backoff, - } +pub const fn retry_config( + max_retries: usize, + initial_delay_ms: u64, + exponential_backoff: bool, +) -> RetryConfig { + RetryConfig { max_retries: Some(max_retries), initial_delay_ms, exponential_backoff } } /// Retry a database operation with operation name for better logging @@ -44,7 +40,7 @@ where E: std::fmt::Debug, { let mut attempt = 0; - + loop { match operation().await { Ok(result) => return Ok(result), @@ -54,7 +50,7 @@ where return Err(error); } } - + attempt += 1; tracing::debug!( target: "scroll::db", @@ -63,14 +59,14 @@ where attempt = attempt, "Retrying database operation" ); - + // Calculate delay for next retry let delay_ms = if config.exponential_backoff { config.initial_delay_ms * 2_u64.pow(attempt as u32 - 1) } else { config.initial_delay_ms }; - + tokio::time::sleep(Duration::from_millis(delay_ms)).await; } } @@ -102,7 +98,8 @@ mod tests { async move { Ok::(42) } }, retry_config(3, 10, false), - ).await; + ) + .await; assert_eq!(result, Ok(42)); assert_eq!(*attempt.borrow(), 1); @@ -125,7 +122,8 @@ mod tests { } }, retry_config(5, 10, false), - ).await; + ) + .await; assert_eq!(result, Ok(42)); assert_eq!(*attempt.borrow(), 3); @@ -141,7 +139,8 @@ mod tests { async move { Err::("always fails") } }, retry_config(2, 10, false), - ).await; + ) + .await; assert_eq!(result, Err("always fails")); assert_eq!(*attempt.borrow(), 3); // 1 initial + 2 retries @@ -160,7 +159,8 @@ mod tests { Ok(42) } } - }).await; + }) + .await; assert_eq!(result, Ok(42)); assert_eq!(*attempt.borrow(), 2); From d3c8a8f5d0d6fc3557bfb3b70212a68c51ff566f Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 03:41:40 +0800 Subject: [PATCH 3/9] rename --- crates/chain-orchestrator/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index ba4ea826..7e77a478 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -653,7 +653,7 @@ impl< l1_block_number: Arc, ) -> Result, ChainOrchestratorError> { let finalized_batches = - retry_with_defaults("set_latest_finalized_l1_block_number", || async { + retry_with_defaults("handle_finalized", || async { let tx = database.tx_mut().await?; // Set the latest finalized L1 block in the database. From aabc09aa9ab6f682f653a596314ce7d9898181b2 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 12:49:38 +0800 Subject: [PATCH 4/9] feat: l2 el provider retry --- crates/chain-orchestrator/src/lib.rs | 25 ++++++++++++------------- crates/node/src/args.rs | 28 +++++++++++++++++++--------- crates/node/src/constants.rs | 10 ++++++++-- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 7e77a478..78d3c84d 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -652,23 +652,22 @@ impl< block_number: u64, l1_block_number: Arc, ) -> Result, ChainOrchestratorError> { - let finalized_batches = - retry_with_defaults("handle_finalized", || async { - let tx = database.tx_mut().await?; + let finalized_batches = retry_with_defaults("handle_finalized", || async { + let tx = database.tx_mut().await?; - // Set the latest finalized L1 block in the database. - tx.set_latest_finalized_l1_block_number(block_number).await?; + // Set the latest finalized L1 block in the database. + tx.set_latest_finalized_l1_block_number(block_number).await?; - // Get all unprocessed batches that have been finalized by this L1 block - // finalization. - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; + // Get all unprocessed batches that have been finalized by this L1 block + // finalization. + let finalized_batches = + tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; - tx.commit().await?; + tx.commit().await?; - Ok::<_, DatabaseError>(finalized_batches) - }) - .await?; + Ok::<_, DatabaseError>(finalized_batches) + }) + .await?; // Update the chain orchestrator L1 block number. l1_block_number.store(block_number, Ordering::Relaxed); diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 3b828f58..ab386b92 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -199,11 +199,21 @@ impl ScrollRollupNodeConfig { ProviderBuilder::new().connect_client(client) }); - // Get a provider to the execution layer. - let l2_provider = rpc_server_handles - .rpc - .new_http_provider_for() - .expect("failed to create payload provider"); + // Init a retry provider to the execution layer. + let retry_layer = RetryBackoffLayer::new( + constants::L2_PROVIDER_MAX_RETRIES, + constants::L2_PROVIDER_INITIAL_BACKOFF, + constants::PROVIDER_COMPUTE_UNITS_PER_SECOND, + ); + let client = RpcClient::builder().layer(retry_layer).http( + rpc_server_handles + .rpc + .http_url() + .expect("failed to get l2 rpc url") + .parse() + .expect("invalid l2 rpc url"), + ); + let l2_provider = ProviderBuilder::<_, _, Scroll>::default().connect_client(client); let l2_provider = Arc::new(l2_provider); // Fetch the database from the hydrated config. @@ -578,10 +588,10 @@ pub struct L1ProviderArgs { #[arg(long = "l1.cups", id = "l1_compute_units_per_second", value_name = "L1_COMPUTE_UNITS_PER_SECOND", default_value_t = constants::PROVIDER_COMPUTE_UNITS_PER_SECOND)] pub compute_units_per_second: u64, /// The max amount of retries for the provider. - #[arg(long = "l1.max-retries", id = "l1_max_retries", value_name = "L1_MAX_RETRIES", default_value_t = constants::PROVIDER_MAX_RETRIES)] + #[arg(long = "l1.max-retries", id = "l1_max_retries", value_name = "L1_MAX_RETRIES", default_value_t = constants::L1_PROVIDER_MAX_RETRIES)] pub max_retries: u32, /// The initial backoff for the provider. - #[arg(long = "l1.initial-backoff", id = "l1_initial_backoff", value_name = "L1_INITIAL_BACKOFF", default_value_t = constants::PROVIDER_INITIAL_BACKOFF)] + #[arg(long = "l1.initial-backoff", id = "l1_initial_backoff", value_name = "L1_INITIAL_BACKOFF", default_value_t = constants::L1_PROVIDER_INITIAL_BACKOFF)] pub initial_backoff: u64, } @@ -608,10 +618,10 @@ pub struct BlobProviderArgs { #[arg(long = "blob.cups", id = "blob_compute_units_per_second", value_name = "BLOB_COMPUTE_UNITS_PER_SECOND", default_value_t = constants::PROVIDER_COMPUTE_UNITS_PER_SECOND)] pub compute_units_per_second: u64, /// The max amount of retries for the provider. - #[arg(long = "blob.max-retries", id = "blob_max_retries", value_name = "BLOB_MAX_RETRIES", default_value_t = constants::PROVIDER_MAX_RETRIES)] + #[arg(long = "blob.max-retries", id = "blob_max_retries", value_name = "BLOB_MAX_RETRIES", default_value_t = constants::L1_PROVIDER_MAX_RETRIES)] pub max_retries: u32, /// The initial backoff for the provider. - #[arg(long = "blob.initial-backoff", id = "blob_initial_backoff", value_name = "BLOB_INITIAL_BACKOFF", default_value_t = constants::PROVIDER_INITIAL_BACKOFF)] + #[arg(long = "blob.initial-backoff", id = "blob_initial_backoff", value_name = "BLOB_INITIAL_BACKOFF", default_value_t = constants::L1_PROVIDER_INITIAL_BACKOFF)] pub initial_backoff: u64, } diff --git a/crates/node/src/constants.rs b/crates/node/src/constants.rs index 3611aaa3..48425bbf 100644 --- a/crates/node/src/constants.rs +++ b/crates/node/src/constants.rs @@ -3,10 +3,16 @@ use alloy_primitives::{address, Address, U128}; /// The max retries for the L1 provider. -pub(crate) const PROVIDER_MAX_RETRIES: u32 = 10; +pub(crate) const L1_PROVIDER_MAX_RETRIES: u32 = 10; /// The initial backoff for the L1 provider. -pub(crate) const PROVIDER_INITIAL_BACKOFF: u64 = 100; +pub(crate) const L1_PROVIDER_INITIAL_BACKOFF: u64 = 100; + +/// The max retries for the L2 provider. +pub(crate) const L2_PROVIDER_MAX_RETRIES: u32 = u32::MAX; + +/// The initial backoff for the L2 provider. +pub(crate) const L2_PROVIDER_INITIAL_BACKOFF: u64 = 50; /// The default provider compute units per second. pub(crate) const PROVIDER_COMPUTE_UNITS_PER_SECOND: u64 = 10000; From 6cba505d1652f58ff8bf62b83638ca4932cd9515 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 13:58:23 +0800 Subject: [PATCH 5/9] fix: db operation retry --- crates/chain-orchestrator/src/lib.rs | 18 ++++++++++-------- crates/database/db/src/db.rs | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 78d3c84d..e00fd1b8 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -804,14 +804,16 @@ async fn compute_l1_message_queue_hash( Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else if l1_message.queue_index > l1_v2_message_queue_start_index { let index = l1_message.queue_index - 1; - let tx = database.tx().await?; - let mut input = tx - .get_l1_message_by_index(index) - .await? - .map(|m| m.queue_hash) - .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? - .unwrap_or_default() - .to_vec(); + let mut input = retry_with_defaults("get_l1_message_by_index", || async { + let tx = database.tx().await?; + let input = tx.get_l1_message_by_index(index).await?; + Ok::<_, DatabaseError>(input) + }) + .await? + .map(|m| m.queue_hash) + .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? + .unwrap_or_default() + .to_vec(); input.append(&mut l1_message.tx_hash().to_vec()); Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else { diff --git a/crates/database/db/src/db.rs b/crates/database/db/src/db.rs index 5d13cc9c..26db30a1 100644 --- a/crates/database/db/src/db.rs +++ b/crates/database/db/src/db.rs @@ -136,7 +136,7 @@ impl From for Database { } } -#[cfg(all(test, feature = "test-utils"))] +#[cfg(test)] mod test { use super::*; use crate::{ From b2fbf7167fd29a7724f01ccef49b23a71c69c788 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 14:26:20 +0800 Subject: [PATCH 6/9] feat: network_client retry --- crates/chain-orchestrator/src/lib.rs | 33 ++++++++++++------- .../db => chain-orchestrator}/src/retry.rs | 8 ++--- crates/database/db/src/lib.rs | 3 -- 3 files changed, 26 insertions(+), 18 deletions(-) rename crates/{database/db => chain-orchestrator}/src/retry.rs (95%) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index e00fd1b8..f452b819 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -18,8 +18,8 @@ use scroll_alloy_consensus::TxL1Message; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_db::{ - retry_with_defaults, Database, DatabaseError, DatabaseReadOperations, - DatabaseTransactionProvider, DatabaseWriteOperations, L1MessageStart, UnwindResult, + Database, DatabaseError, DatabaseReadOperations, DatabaseTransactionProvider, + DatabaseWriteOperations, L1MessageStart, UnwindResult, }; use scroll_network::NewBlockWithPeer; use std::{ @@ -47,6 +47,9 @@ pub use error::ChainOrchestratorError; mod metrics; pub use metrics::{ChainOrchestratorItem, ChainOrchestratorMetrics}; +mod retry; +pub use retry::{retry_config, retry_operation_with_name, retry_with_defaults, RetryConfig}; + /// The mask used to mask the L1 message queue hash. const L1_MESSAGE_QUEUE_HASH_MASK: B256 = b256!("ffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000"); @@ -257,9 +260,13 @@ impl< tracing::trace!(target: "scroll::chain_orchestrator", number = ?(optimistic_headers.first().expect("chain can not be empty").number - 1), "fetching block"); let parent_hash = optimistic_headers.first().expect("chain can not be empty").parent_hash; - let header = network_client - .get_header(BlockHashOrNumber::Hash(parent_hash)) - .await? + let header = retry_with_defaults("network_client_get_header", || async { + let header = + network_client.get_header(BlockHashOrNumber::Hash(parent_hash)).await?; + Ok::<_, ChainOrchestratorError>(header) + }) + .await?; + let header = header .into_data() .ok_or(ChainOrchestratorError::MissingBlockHeader { hash: parent_hash })?; optimistic_headers.push_front(header); @@ -390,12 +397,16 @@ impl< } tracing::trace!(target: "scroll::chain_orchestrator", number = ?(received_chain_headers.front().expect("chain can not be empty").number - 1), "fetching block"); - if let Some(header) = network_client - .get_header(BlockHashOrNumber::Hash( - received_chain_headers.front().expect("chain can not be empty").parent_hash, - )) - .await? - .into_data() + if let Some(header) = retry_with_defaults("network_client_get_header", || async { + let header = network_client + .get_header(BlockHashOrNumber::Hash( + received_chain_headers.front().expect("chain can not be empty").parent_hash, + )) + .await? + .into_data(); + Ok::<_, ChainOrchestratorError>(header) + }) + .await? { received_chain_headers.push_front(header.clone()); } else { diff --git a/crates/database/db/src/retry.rs b/crates/chain-orchestrator/src/retry.rs similarity index 95% rename from crates/database/db/src/retry.rs rename to crates/chain-orchestrator/src/retry.rs index 7c4931d3..93aea627 100644 --- a/crates/database/db/src/retry.rs +++ b/crates/chain-orchestrator/src/retry.rs @@ -1,4 +1,4 @@ -//! Retry mechanism for database operations +//! Configurable retry mechanism for database, network, and other fallible operations. use std::time::Duration; @@ -28,7 +28,7 @@ pub const fn retry_config( RetryConfig { max_retries: Some(max_retries), initial_delay_ms, exponential_backoff } } -/// Retry a database operation with operation name for better logging +/// Retry a operation with operation name for better logging pub async fn retry_operation_with_name( operation_name: &str, operation: F, @@ -53,11 +53,11 @@ where attempt += 1; tracing::debug!( - target: "scroll::db", + target: "scroll::chain_orchestrator", operation = operation_name, error = ?error, attempt = attempt, - "Retrying database operation" + "Retrying operation" ); // Calculate delay for next retry diff --git a/crates/database/db/src/lib.rs b/crates/database/db/src/lib.rs index dcebf39d..80325faa 100644 --- a/crates/database/db/src/lib.rs +++ b/crates/database/db/src/lib.rs @@ -19,9 +19,6 @@ pub use operations::{ DatabaseReadOperations, DatabaseWriteOperations, L1MessageStart, UnwindResult, }; -mod retry; -pub use retry::{retry_config, retry_operation_with_name, retry_with_defaults, RetryConfig}; - mod transaction; pub use transaction::{DatabaseTransactionProvider, TXMut, TX}; From 06ef2538c1d01c94c83be449bc7f4be19a55f657 Mon Sep 17 00:00:00 2001 From: Morty Date: Mon, 22 Sep 2025 23:31:46 +0800 Subject: [PATCH 7/9] fix: refactor --- crates/chain-orchestrator/src/lib.rs | 349 +++++++++++++------------ crates/chain-orchestrator/src/retry.rs | 168 ++++++------ 2 files changed, 260 insertions(+), 257 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index f452b819..86cd4098 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -48,7 +48,7 @@ mod metrics; pub use metrics::{ChainOrchestratorItem, ChainOrchestratorMetrics}; mod retry; -pub use retry::{retry_config, retry_operation_with_name, retry_with_defaults, RetryConfig}; +pub use retry::Retry; /// The mask used to mask the L1 message queue hash. const L1_MESSAGE_QUEUE_HASH_MASK: B256 = @@ -214,13 +214,14 @@ impl< let database = ctx.database.clone(); let block_info: L2BlockInfoWithL1Messages = (&block_with_peer.block).into(); Self::do_handle_block_from_peer(ctx, block_with_peer).await?; - retry_with_defaults("update_l1_messages_with_l2_block", || async { - let tx = database.tx_mut().await?; - tx.update_l1_messages_with_l2_block(block_info.clone()).await?; - tx.commit().await?; - Ok::<_, DatabaseError>(()) - }) - .await?; + Retry::default() + .retry("update_l1_messages_with_l2_block", || async { + let tx = database.tx_mut().await?; + tx.update_l1_messages_with_l2_block(block_info.clone()).await?; + tx.commit().await?; + Ok::<_, DatabaseError>(()) + }) + .await?; Ok(ChainOrchestratorEvent::L2ChainCommitted(block_info, None, true)) } @@ -260,12 +261,13 @@ impl< tracing::trace!(target: "scroll::chain_orchestrator", number = ?(optimistic_headers.first().expect("chain can not be empty").number - 1), "fetching block"); let parent_hash = optimistic_headers.first().expect("chain can not be empty").parent_hash; - let header = retry_with_defaults("network_client_get_header", || async { - let header = - network_client.get_header(BlockHashOrNumber::Hash(parent_hash)).await?; - Ok::<_, ChainOrchestratorError>(header) - }) - .await?; + let header = Retry::default() + .retry("network_client_get_header", || async { + let header = + network_client.get_header(BlockHashOrNumber::Hash(parent_hash)).await?; + Ok::<_, ChainOrchestratorError>(header) + }) + .await?; let header = header .into_data() .ok_or(ChainOrchestratorError::MissingBlockHeader { hash: parent_hash })?; @@ -303,13 +305,14 @@ impl< let mut received_chain_headers = VecDeque::from(vec![received_block.header.clone()]); // We should never have a re-org that is deeper than the current safe head. - let (latest_safe_block, _) = retry_with_defaults("get_latest_safe_l2_info", || async { - let tx = database.tx().await?; - let (latest_safe_block, batch_info) = - tx.get_latest_safe_l2_info().await?.expect("safe block must exist"); - Ok::<_, DatabaseError>((latest_safe_block, batch_info)) - }) - .await?; + let (latest_safe_block, _) = Retry::default() + .retry("get_latest_safe_l2_info", || async { + let tx = database.tx().await?; + let (latest_safe_block, batch_info) = + tx.get_latest_safe_l2_info().await?.expect("safe block must exist"); + Ok::<_, DatabaseError>((latest_safe_block, batch_info)) + }) + .await?; // We search for the re-org index in the in-memory chain. const BATCH_FETCH_SIZE: usize = 50; @@ -397,16 +400,20 @@ impl< } tracing::trace!(target: "scroll::chain_orchestrator", number = ?(received_chain_headers.front().expect("chain can not be empty").number - 1), "fetching block"); - if let Some(header) = retry_with_defaults("network_client_get_header", || async { - let header = network_client - .get_header(BlockHashOrNumber::Hash( - received_chain_headers.front().expect("chain can not be empty").parent_hash, - )) - .await? - .into_data(); - Ok::<_, ChainOrchestratorError>(header) - }) - .await? + if let Some(header) = Retry::default() + .retry("network_client_get_header", || async { + let header = network_client + .get_header(BlockHashOrNumber::Hash( + received_chain_headers + .front() + .expect("chain can not be empty") + .parent_hash, + )) + .await? + .into_data(); + Ok::<_, ChainOrchestratorError>(header) + }) + .await? { received_chain_headers.push_front(header.clone()); } else { @@ -473,15 +480,16 @@ impl< ChainOrchestratorItem::InsertConsolidatedL2Blocks, Box::pin(async move { let head = block_infos.last().expect("block info must not be empty").clone(); - retry_with_defaults("insert_block", || async { - let tx = database.tx_mut().await?; - for block in block_infos.clone() { - tx.insert_block(block, batch_info).await?; - } - tx.commit().await?; - Ok::<_, DatabaseError>(()) - }) - .await?; + Retry::default() + .retry("insert_block", || async { + let tx = database.tx_mut().await?; + for block in block_infos.clone() { + tx.insert_block(block, batch_info).await?; + } + tx.commit().await?; + Ok::<_, DatabaseError>(()) + }) + .await?; Result::<_, ChainOrchestratorError>::Ok(Some( ChainOrchestratorEvent::L2ConsolidatedBlockCommitted(head), )) @@ -525,13 +533,14 @@ impl< // Insert the blocks into the database. let head = block_info.last().expect("block info must not be empty").clone(); - retry_with_defaults("update_l1_messages_from_l2_blocks", || async { - let tx = database.tx_mut().await?; - tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?; - tx.commit().await?; - Ok::<_, DatabaseError>(()) - }) - .await?; + Retry::default() + .retry("update_l1_messages_from_l2_blocks", || async { + let tx = database.tx_mut().await?; + tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?; + tx.commit().await?; + Ok::<_, DatabaseError>(()) + }) + .await?; Result::<_, ChainOrchestratorError>::Ok(Some( ChainOrchestratorEvent::L2ChainCommitted(head, None, consolidated), @@ -616,23 +625,24 @@ impl< current_chain: Arc>, ) -> Result, ChainOrchestratorError> { let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = - retry_with_defaults("unwind", || async { - let txn = database.tx_mut().await?; - let UnwindResult { - l1_block_number, - queue_index, - l2_head_block_number, - l2_safe_block_info, - } = txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; - txn.commit().await?; - Ok::<_, DatabaseError>(UnwindResult { - l1_block_number, - queue_index, - l2_head_block_number, - l2_safe_block_info, + Retry::default() + .retry("unwind", || async { + let txn = database.tx_mut().await?; + let UnwindResult { + l1_block_number, + queue_index, + l2_head_block_number, + l2_safe_block_info, + } = txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; + txn.commit().await?; + Ok::<_, DatabaseError>(UnwindResult { + l1_block_number, + queue_index, + l2_head_block_number, + l2_safe_block_info, + }) }) - }) - .await?; + .await?; let l2_head_block_info = if let Some(block_number) = l2_head_block_number { // Fetch the block hash of the new L2 head block. let block_hash = l2_client @@ -663,22 +673,23 @@ impl< block_number: u64, l1_block_number: Arc, ) -> Result, ChainOrchestratorError> { - let finalized_batches = retry_with_defaults("handle_finalized", || async { - let tx = database.tx_mut().await?; + let finalized_batches = Retry::default() + .retry("handle_finalized", || async { + let tx = database.tx_mut().await?; - // Set the latest finalized L1 block in the database. - tx.set_latest_finalized_l1_block_number(block_number).await?; + // Set the latest finalized L1 block in the database. + tx.set_latest_finalized_l1_block_number(block_number).await?; - // Get all unprocessed batches that have been finalized by this L1 block - // finalization. - let finalized_batches = - tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; + // Get all unprocessed batches that have been finalized by this L1 block + // finalization. + let finalized_batches = + tx.fetch_and_update_unprocessed_finalized_batches(block_number).await?; - tx.commit().await?; + tx.commit().await?; - Ok::<_, DatabaseError>(finalized_batches) - }) - .await?; + Ok::<_, DatabaseError>(finalized_batches) + }) + .await?; // Update the chain orchestrator L1 block number. l1_block_number.store(block_number, Ordering::Relaxed); @@ -700,23 +711,24 @@ impl< let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash); // Perform a consistency check to ensure the previous L1 message exists in the database. - let _ = retry_with_defaults("handle_l1_message", || async { - let tx = database.tx_mut().await?; - if l1_message.transaction.queue_index > 0 && - tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1) - .await? - .is_none() - { - return Err(ChainOrchestratorError::L1MessageQueueGap( - l1_message.transaction.queue_index, - )) - } + let _ = Retry::default() + .retry("handle_l1_message", || async { + let tx = database.tx_mut().await?; + if l1_message.transaction.queue_index > 0 && + tx.get_l1_message_by_index(l1_message.transaction.queue_index - 1) + .await? + .is_none() + { + return Err(ChainOrchestratorError::L1MessageQueueGap( + l1_message.transaction.queue_index, + )) + } - tx.insert_l1_message(l1_message.clone()).await?; - tx.commit().await?; - Ok::<_, ChainOrchestratorError>(()) - }) - .await; + tx.insert_l1_message(l1_message.clone()).await?; + tx.commit().await?; + Ok::<_, ChainOrchestratorError>(()) + }) + .await; Ok(Some(event)) } @@ -725,41 +737,42 @@ impl< database: Arc, batch: BatchCommitData, ) -> Result, ChainOrchestratorError> { - let event = retry_with_defaults("handle_batch_commit", || async { - let tx = database.tx_mut().await?; - let batch_clone = batch.clone(); - let prev_batch_index = batch_clone.clone().index - 1; - - // Perform a consistency check to ensure the previous commit batch exists in the - // database. - if tx.get_batch_by_index(prev_batch_index).await?.is_none() { - return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)) - } + let event = Retry::default() + .retry("handle_batch_commit", || async { + let tx = database.tx_mut().await?; + let batch_clone = batch.clone(); + let prev_batch_index = batch_clone.clone().index - 1; + + // Perform a consistency check to ensure the previous commit batch exists in the + // database. + if tx.get_batch_by_index(prev_batch_index).await?.is_none() { + return Err(ChainOrchestratorError::BatchCommitGap(batch_clone.index)) + } - // remove any batches with an index greater than the previous batch. - let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; + // remove any batches with an index greater than the previous batch. + let affected = tx.delete_batches_gt_batch_index(prev_batch_index).await?; - // handle the case of a batch revert. - let new_safe_head = if affected > 0 { - tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; - tx.get_highest_block_for_batch_index(prev_batch_index).await? - } else { - None - }; + // handle the case of a batch revert. + let new_safe_head = if affected > 0 { + tx.delete_l2_blocks_gt_batch_index(prev_batch_index).await?; + tx.get_highest_block_for_batch_index(prev_batch_index).await? + } else { + None + }; - let event = ChainOrchestratorEvent::BatchCommitIndexed { - batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), - l1_block_number: batch.block_number, - safe_head: new_safe_head, - }; + let event = ChainOrchestratorEvent::BatchCommitIndexed { + batch_info: BatchInfo::new(batch_clone.index, batch_clone.hash), + l1_block_number: batch.block_number, + safe_head: new_safe_head, + }; - // insert the batch and commit the transaction. - tx.insert_batch(batch_clone).await?; - tx.commit().await?; + // insert the batch and commit the transaction. + tx.insert_batch(batch_clone).await?; + tx.commit().await?; - Ok::<_, ChainOrchestratorError>(Some(event)) - }) - .await?; + Ok::<_, ChainOrchestratorError>(Some(event)) + }) + .await?; Ok(event) } @@ -771,29 +784,31 @@ impl< block_number: u64, finalized_block_number: Arc, ) -> Result, ChainOrchestratorError> { - retry_with_defaults("handle_batch_finalization", || async { - let tx = database.tx_mut().await?; - - // finalize all batches up to `batch_index`. - tx.finalize_batches_up_to_index(batch_index, block_number).await?; + Retry::default() + .retry("handle_batch_finalization", || async { + let tx = database.tx_mut().await?; + + // finalize all batches up to `batch_index`. + tx.finalize_batches_up_to_index(batch_index, block_number).await?; + + // Get all unprocessed batches that have been finalized by this L1 block + // finalization. + let finalized_block_number = finalized_block_number.load(Ordering::Relaxed); + if finalized_block_number >= block_number { + let finalized_batches = tx + .fetch_and_update_unprocessed_finalized_batches(finalized_block_number) + .await?; + tx.commit().await?; + return Ok(Some(ChainOrchestratorEvent::BatchFinalized( + block_number, + finalized_batches, + ))) + } - // Get all unprocessed batches that have been finalized by this L1 block finalization. - let finalized_block_number = finalized_block_number.load(Ordering::Relaxed); - if finalized_block_number >= block_number { - let finalized_batches = tx - .fetch_and_update_unprocessed_finalized_batches(finalized_block_number) - .await?; tx.commit().await?; - return Ok(Some(ChainOrchestratorEvent::BatchFinalized( - block_number, - finalized_batches, - ))) - } - - tx.commit().await?; - Ok::<_, ChainOrchestratorError>(None) - }) - .await + Ok::<_, ChainOrchestratorError>(None) + }) + .await } } @@ -815,16 +830,17 @@ async fn compute_l1_message_queue_hash( Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else if l1_message.queue_index > l1_v2_message_queue_start_index { let index = l1_message.queue_index - 1; - let mut input = retry_with_defaults("get_l1_message_by_index", || async { - let tx = database.tx().await?; - let input = tx.get_l1_message_by_index(index).await?; - Ok::<_, DatabaseError>(input) - }) - .await? - .map(|m| m.queue_hash) - .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? - .unwrap_or_default() - .to_vec(); + let mut input = Retry::default() + .retry("get_l1_message_by_index", || async { + let tx = database.tx().await?; + let input = tx.get_l1_message_by_index(index).await?; + Ok::<_, DatabaseError>(input) + }) + .await? + .map(|m| m.queue_hash) + .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? + .unwrap_or_default() + .to_vec(); input.append(&mut l1_message.tx_hash().to_vec()); Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else { @@ -840,8 +856,9 @@ async fn init_chain_from_db + 'static>( ) -> Result, ChainOrchestratorError> { let blocks = { let mut blocks = Vec::with_capacity(chain_buffer_size); - let tx = retry_with_defaults("get_l2_blocks_new_tx", || database.tx()).await?; - let blocks_stream = retry_with_defaults("get_l2_blocks", || tx.get_l2_blocks()) + let tx = Retry::default().retry("get_l2_blocks_new_tx", || database.tx()).await?; + let blocks_stream = Retry::default() + .retry("get_l2_blocks", || tx.get_l2_blocks()) .await? .take(chain_buffer_size); pin_mut!(blocks_stream); @@ -939,12 +956,13 @@ async fn consolidate_chain + 'static>( // Fetch the safe head from the database. We use this as a trust anchor to reconcile the chain // back to. - let safe_head = retry_with_defaults("get_latest_safe_l2_info", || async { - let tx = database.tx().await?; - let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0; - Ok::<_, DatabaseError>(safe_head) - }) - .await?; + let safe_head = Retry::default() + .retry("get_latest_safe_l2_info", || async { + let tx = database.tx().await?; + let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0; + Ok::<_, DatabaseError>(safe_head) + }) + .await?; // If the in-memory chain contains the safe head, we check if the safe hash from the // database (L1 consolidation) matches the in-memory value. If it does not match, we return an @@ -1043,11 +1061,12 @@ async fn validate_l1_messages( // TODO: instead of using `l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))` to // determine the start of the L1 message stream, we should use a more robust method to determine // the start of the L1 message stream. - let tx = retry_with_defaults("get_l1_messages_new_tx", || database.tx()).await?; - let l1_message_stream = retry_with_defaults("get_l1_messages", || { - tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) - }) - .await?; + let tx = Retry::default().retry("get_l1_messages_new_tx", || database.tx()).await?; + let l1_message_stream = Retry::default() + .retry("get_l1_messages", || { + tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) + }) + .await?; pin_mut!(l1_message_stream); for message_hash in l1_message_hashes { diff --git a/crates/chain-orchestrator/src/retry.rs b/crates/chain-orchestrator/src/retry.rs index 93aea627..581a5c3c 100644 --- a/crates/chain-orchestrator/src/retry.rs +++ b/crates/chain-orchestrator/src/retry.rs @@ -2,9 +2,9 @@ use std::time::Duration; -/// Configuration for retry behavior +/// A type used for retrying transient failures in operations. #[derive(Debug, Clone)] -pub struct RetryConfig { +pub struct Retry { /// Maximum number of retry attempts. None means infinite retries pub max_retries: Option, /// Initial delay between retries in milliseconds @@ -13,93 +13,79 @@ pub struct RetryConfig { pub exponential_backoff: bool, } -impl Default for RetryConfig { +impl Default for Retry { fn default() -> Self { Self { max_retries: None, initial_delay_ms: 50, exponential_backoff: false } } } -/// Helper function to create a retry configuration -pub const fn retry_config( - max_retries: usize, - initial_delay_ms: u64, - exponential_backoff: bool, -) -> RetryConfig { - RetryConfig { max_retries: Some(max_retries), initial_delay_ms, exponential_backoff } -} +impl Retry { + /// Creates a new [`Retry`] with the specified parameters. + pub const fn new( + max_retries: Option, + initial_delay_ms: u64, + exponential_backoff: bool, + ) -> Self { + Self { max_retries, initial_delay_ms, exponential_backoff } + } -/// Retry a operation with operation name for better logging -pub async fn retry_operation_with_name( - operation_name: &str, - operation: F, - config: RetryConfig, -) -> Result -where - F: Fn() -> Fut, - Fut: std::future::Future>, - E: std::fmt::Debug, -{ - let mut attempt = 0; - - loop { - match operation().await { - Ok(result) => return Ok(result), - Err(error) => { - if let Some(max_retries) = config.max_retries { - if attempt >= max_retries { - return Err(error); + /// Retry an asynchronous operation with the configured retry strategy. + pub async fn retry(&self, operation_name: &str, operation: F) -> Result + where + F: Fn() -> Fut, + Fut: std::future::Future>, + E: std::fmt::Debug, + { + let mut attempt: usize = 0; + + loop { + match operation().await { + Ok(result) => return Ok(result), + Err(error) => { + if let Some(max_retries) = self.max_retries { + if attempt >= max_retries { + return Err(error); + } } - } - attempt += 1; - tracing::debug!( - target: "scroll::chain_orchestrator", - operation = operation_name, - error = ?error, - attempt = attempt, - "Retrying operation" - ); - - // Calculate delay for next retry - let delay_ms = if config.exponential_backoff { - config.initial_delay_ms * 2_u64.pow(attempt as u32 - 1) - } else { - config.initial_delay_ms - }; - - tokio::time::sleep(Duration::from_millis(delay_ms)).await; + attempt += 1; + tracing::debug!( + target: "scroll::chain_orchestrator", + operation = operation_name, + error = ?error, + attempt = attempt, + "Retrying operation" + ); + + // Calculate delay for next retry + let delay_ms = if self.exponential_backoff { + self.initial_delay_ms * 2_u64.pow(attempt as u32 - 1) + } else { + self.initial_delay_ms + }; + + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + } } } } } -/// Convenience function for retrying with default configuration -pub async fn retry_with_defaults(operation_name: &str, operation: F) -> Result -where - F: Fn() -> Fut, - Fut: std::future::Future>, - E: std::fmt::Debug, -{ - retry_operation_with_name(operation_name, operation, RetryConfig::default()).await -} - #[cfg(test)] mod tests { - use crate::{retry_config, retry_operation_with_name, retry_with_defaults}; + use super::Retry; use std::cell::RefCell; #[tokio::test] async fn test_retry_success_on_first_attempt() { let attempt = RefCell::new(0); - let result = retry_operation_with_name( - "test_operation", - || { + let retry = Retry::new(Some(3), 10, false); + let result = retry + .retry("test_operation", || { *attempt.borrow_mut() += 1; async move { Ok::(42) } - }, - retry_config(3, 10, false), - ) - .await; + }) + .await; assert_eq!(result, Ok(42)); assert_eq!(*attempt.borrow(), 1); @@ -108,9 +94,9 @@ mod tests { #[tokio::test] async fn test_retry_success_after_failures() { let attempt = RefCell::new(0); - let result = retry_operation_with_name( - "test_operation", - || { + let retry = Retry::new(Some(5), 10, false); + let result = retry + .retry("test_operation", || { *attempt.borrow_mut() += 1; let current_attempt = *attempt.borrow(); async move { @@ -120,10 +106,8 @@ mod tests { Ok(42) } } - }, - retry_config(5, 10, false), - ) - .await; + }) + .await; assert_eq!(result, Ok(42)); assert_eq!(*attempt.borrow(), 3); @@ -132,15 +116,13 @@ mod tests { #[tokio::test] async fn test_retry_exhausted() { let attempt = RefCell::new(0); - let result = retry_operation_with_name( - "test_operation", - || { + let retry = Retry::new(Some(2), 10, false); + let result = retry + .retry("test_operation", || { *attempt.borrow_mut() += 1; async move { Err::("always fails") } - }, - retry_config(2, 10, false), - ) - .await; + }) + .await; assert_eq!(result, Err("always fails")); assert_eq!(*attempt.borrow(), 3); // 1 initial + 2 retries @@ -149,18 +131,20 @@ mod tests { #[tokio::test] async fn test_retry_with_defaults() { let attempt = RefCell::new(0); - let result = retry_with_defaults("test_retry_with_defaults", || { - *attempt.borrow_mut() += 1; - let current_attempt = *attempt.borrow(); - async move { - if current_attempt < 2 { - Err::("failed") - } else { - Ok(42) + let retry = Retry::default(); + let result = retry + .retry("test_retry_with_defaults", || { + *attempt.borrow_mut() += 1; + let current_attempt = *attempt.borrow(); + async move { + if current_attempt < 2 { + Err::("failed") + } else { + Ok(42) + } } - } - }) - .await; + }) + .await; assert_eq!(result, Ok(42)); assert_eq!(*attempt.borrow(), 2); From c78d0bb57ee87bbcd0134a90598e36ad814d97d6 Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 23 Sep 2025 15:43:01 +0800 Subject: [PATCH 8/9] address comment --- crates/chain-orchestrator/src/error.rs | 22 +++++++++++ crates/chain-orchestrator/src/lib.rs | 52 ++++++++++++++++++-------- crates/chain-orchestrator/src/retry.rs | 32 ++++++++++++---- crates/database/db/src/error.rs | 14 ++----- 4 files changed, 87 insertions(+), 33 deletions(-) diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs index e4338bbc..8c7576c2 100644 --- a/crates/chain-orchestrator/src/error.rs +++ b/crates/chain-orchestrator/src/error.rs @@ -3,6 +3,8 @@ use alloy_primitives::B256; use alloy_transport::TransportErrorKind; use scroll_db::{DatabaseError, L1MessageStart}; +use crate::retry::CanRetry; + /// A type that represents an error that occurred in the chain orchestrator. #[derive(Debug, thiserror::Error)] pub enum ChainOrchestratorError { @@ -52,3 +54,23 @@ pub enum ChainOrchestratorError { #[error("An error occurred while making a JSON-RPC request to the EN: {0}")] RpcError(#[from] RpcError), } + +// Implement the local CanRetry trait for the external DatabaseError type so we can delegate. +impl CanRetry for DatabaseError { + fn can_retry(&self) -> bool { + matches!(self, Self::DatabaseError(_) | Self::SqlxError(_)) + } +} + +impl CanRetry for ChainOrchestratorError { + fn can_retry(&self) -> bool { + match self { + // Delegate to DatabaseError's classification + Self::DatabaseError(db) => db.can_retry(), + // Network and RPC errors are generally transient + Self::NetworkRequestError(_) | Self::RpcError(_) => true, + // All others are logical/state errors: do not retry + _ => false, + } + } +} diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 86cd4098..f603c05a 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -219,7 +219,7 @@ impl< let tx = database.tx_mut().await?; tx.update_l1_messages_with_l2_block(block_info.clone()).await?; tx.commit().await?; - Ok::<_, DatabaseError>(()) + Ok::<_, ChainOrchestratorError>(()) }) .await?; Ok(ChainOrchestratorEvent::L2ChainCommitted(block_info, None, true)) @@ -310,7 +310,7 @@ impl< let tx = database.tx().await?; let (latest_safe_block, batch_info) = tx.get_latest_safe_l2_info().await?.expect("safe block must exist"); - Ok::<_, DatabaseError>((latest_safe_block, batch_info)) + Ok::<_, ChainOrchestratorError>((latest_safe_block, batch_info)) }) .await?; @@ -487,7 +487,7 @@ impl< tx.insert_block(block, batch_info).await?; } tx.commit().await?; - Ok::<_, DatabaseError>(()) + Ok::<_, ChainOrchestratorError>(()) }) .await?; Result::<_, ChainOrchestratorError>::Ok(Some( @@ -538,7 +538,7 @@ impl< let tx = database.tx_mut().await?; tx.update_l1_messages_from_l2_blocks(block_info.clone()).await?; tx.commit().await?; - Ok::<_, DatabaseError>(()) + Ok::<_, ChainOrchestratorError>(()) }) .await?; @@ -635,7 +635,7 @@ impl< l2_safe_block_info, } = txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?; txn.commit().await?; - Ok::<_, DatabaseError>(UnwindResult { + Ok::<_, ChainOrchestratorError>(UnwindResult { l1_block_number, queue_index, l2_head_block_number, @@ -687,7 +687,7 @@ impl< tx.commit().await?; - Ok::<_, DatabaseError>(finalized_batches) + Ok::<_, ChainOrchestratorError>(finalized_batches) }) .await?; @@ -834,7 +834,7 @@ async fn compute_l1_message_queue_hash( .retry("get_l1_message_by_index", || async { let tx = database.tx().await?; let input = tx.get_l1_message_by_index(index).await?; - Ok::<_, DatabaseError>(input) + Ok::<_, ChainOrchestratorError>(input) }) .await? .map(|m| m.queue_hash) @@ -856,13 +856,21 @@ async fn init_chain_from_db + 'static>( ) -> Result, ChainOrchestratorError> { let blocks = { let mut blocks = Vec::with_capacity(chain_buffer_size); - let tx = Retry::default().retry("get_l2_blocks_new_tx", || database.tx()).await?; + let tx = Retry::default() + .retry("get_l2_blocks_new_tx", || async { + let tx = database.tx().await?; + Ok::<_, ChainOrchestratorError>(tx) + }) + .await?; let blocks_stream = Retry::default() - .retry("get_l2_blocks", || tx.get_l2_blocks()) + .retry("get_l2_blocks", || async { + let stream = tx.get_l2_blocks().await?; + Ok::<_, ChainOrchestratorError>(stream) + }) .await? .take(chain_buffer_size); pin_mut!(blocks_stream); - while let Some(block_info) = blocks_stream.try_next().await? { + while let Some(block_info) = blocks_stream.as_mut().try_next().await? { let header = l2_client .get_block_by_hash(block_info.hash) .await? @@ -960,7 +968,7 @@ async fn consolidate_chain + 'static>( .retry("get_latest_safe_l2_info", || async { let tx = database.tx().await?; let safe_head = tx.get_latest_safe_l2_info().await?.expect("safe head must exist").0; - Ok::<_, DatabaseError>(safe_head) + Ok::<_, ChainOrchestratorError>(safe_head) }) .await?; @@ -1061,17 +1069,31 @@ async fn validate_l1_messages( // TODO: instead of using `l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))` to // determine the start of the L1 message stream, we should use a more robust method to determine // the start of the L1 message stream. - let tx = Retry::default().retry("get_l1_messages_new_tx", || database.tx()).await?; + let tx = Retry::default() + .retry("get_l1_messages_new_tx", || async { + let tx = database.tx().await?; + Ok::<_, ChainOrchestratorError>(tx) + }) + .await?; let l1_message_stream = Retry::default() - .retry("get_l1_messages", || { - tx.get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) + .retry("get_l1_messages", || async { + let messages = tx + .get_l1_messages(l1_message_hashes.first().map(|tx| L1MessageStart::Hash(*tx))) + .await?; + Ok::<_, ChainOrchestratorError>(messages) }) .await?; pin_mut!(l1_message_stream); for message_hash in l1_message_hashes { // Get the expected L1 message from the database. - let expected_hash = l1_message_stream.next().await.unwrap().unwrap().transaction.tx_hash(); + let expected_hash = l1_message_stream + .as_mut() + .next() + .await + .map(|m| m.map(|msg| msg.transaction.tx_hash())) + .transpose()? + .ok_or(ChainOrchestratorError::L1MessageNotFound(L1MessageStart::Hash(message_hash)))?; // If the received and expected L1 messages do not match return an error. if message_hash != expected_hash { diff --git a/crates/chain-orchestrator/src/retry.rs b/crates/chain-orchestrator/src/retry.rs index 581a5c3c..0077dddd 100644 --- a/crates/chain-orchestrator/src/retry.rs +++ b/crates/chain-orchestrator/src/retry.rs @@ -19,6 +19,11 @@ impl Default for Retry { } } +/// A trait for errors that can indicate whether an operation can be retried. +pub trait CanRetry { + fn can_retry(&self) -> bool; +} + impl Retry { /// Creates a new [`Retry`] with the specified parameters. pub const fn new( @@ -34,7 +39,7 @@ impl Retry { where F: Fn() -> Fut, Fut: std::future::Future>, - E: std::fmt::Debug, + E: std::fmt::Debug + CanRetry, { let mut attempt: usize = 0; @@ -42,6 +47,11 @@ impl Retry { match operation().await { Ok(result) => return Ok(result), Err(error) => { + // If the error is not retryable, return immediately. + if !error.can_retry() { + return Err(error); + } + if let Some(max_retries) = self.max_retries { if attempt >= max_retries { return Err(error); @@ -73,9 +83,17 @@ impl Retry { #[cfg(test)] mod tests { - use super::Retry; + use super::{CanRetry, Retry}; use std::cell::RefCell; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + struct TestErr; + impl CanRetry for TestErr { + fn can_retry(&self) -> bool { + true + } + } + #[tokio::test] async fn test_retry_success_on_first_attempt() { let attempt = RefCell::new(0); @@ -83,7 +101,7 @@ mod tests { let result = retry .retry("test_operation", || { *attempt.borrow_mut() += 1; - async move { Ok::(42) } + async move { Ok::(42) } }) .await; @@ -101,7 +119,7 @@ mod tests { let current_attempt = *attempt.borrow(); async move { if current_attempt < 3 { - Err::("failed") + Err::(TestErr) } else { Ok(42) } @@ -120,11 +138,11 @@ mod tests { let result = retry .retry("test_operation", || { *attempt.borrow_mut() += 1; - async move { Err::("always fails") } + async move { Err::(TestErr) } }) .await; - assert_eq!(result, Err("always fails")); + assert_eq!(result, Err(TestErr)); assert_eq!(*attempt.borrow(), 3); // 1 initial + 2 retries } @@ -138,7 +156,7 @@ mod tests { let current_attempt = *attempt.borrow(); async move { if current_attempt < 2 { - Err::("failed") + Err::(TestErr) } else { Ok(42) } diff --git a/crates/database/db/src/error.rs b/crates/database/db/src/error.rs index 127ea0bb..0320a03f 100644 --- a/crates/database/db/src/error.rs +++ b/crates/database/db/src/error.rs @@ -1,6 +1,4 @@ use super::L1MessageStart; -use alloy_eips::BlockId; -use alloy_primitives::B256; use sea_orm::sqlx::Error as SqlxError; /// The error type for database operations. @@ -9,19 +7,13 @@ pub enum DatabaseError { /// A database error occurred. #[error("database error: {0}")] DatabaseError(#[from] sea_orm::DbErr), - /// A batch was not found in the database. - #[error("batch with hash [{0}] not found in database")] - BatchNotFound(B256), - /// The block was not found in database. - #[error("no block for id {0}")] - BlockNotFound(BlockId), + /// An error occurred at the sqlx level. + #[error("A sqlx error occurred: {0}")] + SqlxError(#[from] SqlxError), /// A generic error occurred. #[error("parse signature error: {0}")] ParseSignatureError(String), /// The L1 message was not found in database. #[error("L1 message at index [{0}] not found in database")] L1MessageNotFound(L1MessageStart), - /// An error occurred at the sqlx level. - #[error("A sqlx error occurred: {0}")] - SqlxError(#[from] SqlxError), } From df3922b99b2f07406d1247176d5ca309db4d39e5 Mon Sep 17 00:00:00 2001 From: Morty Date: Tue, 23 Sep 2025 17:26:16 +0800 Subject: [PATCH 9/9] address comment --- crates/chain-orchestrator/src/error.rs | 22 --------------------- crates/chain-orchestrator/src/lib.rs | 4 ++-- crates/chain-orchestrator/src/retry.rs | 27 +++++++++++++++++++++----- 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/crates/chain-orchestrator/src/error.rs b/crates/chain-orchestrator/src/error.rs index 8c7576c2..e4338bbc 100644 --- a/crates/chain-orchestrator/src/error.rs +++ b/crates/chain-orchestrator/src/error.rs @@ -3,8 +3,6 @@ use alloy_primitives::B256; use alloy_transport::TransportErrorKind; use scroll_db::{DatabaseError, L1MessageStart}; -use crate::retry::CanRetry; - /// A type that represents an error that occurred in the chain orchestrator. #[derive(Debug, thiserror::Error)] pub enum ChainOrchestratorError { @@ -54,23 +52,3 @@ pub enum ChainOrchestratorError { #[error("An error occurred while making a JSON-RPC request to the EN: {0}")] RpcError(#[from] RpcError), } - -// Implement the local CanRetry trait for the external DatabaseError type so we can delegate. -impl CanRetry for DatabaseError { - fn can_retry(&self) -> bool { - matches!(self, Self::DatabaseError(_) | Self::SqlxError(_)) - } -} - -impl CanRetry for ChainOrchestratorError { - fn can_retry(&self) -> bool { - match self { - // Delegate to DatabaseError's classification - Self::DatabaseError(db) => db.can_retry(), - // Network and RPC errors are generally transient - Self::NetworkRequestError(_) | Self::RpcError(_) => true, - // All others are logical/state errors: do not retry - _ => false, - } - } -} diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index f603c05a..99d59d3c 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -711,7 +711,7 @@ impl< let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash); // Perform a consistency check to ensure the previous L1 message exists in the database. - let _ = Retry::default() + Retry::default() .retry("handle_l1_message", || async { let tx = database.tx_mut().await?; if l1_message.transaction.queue_index > 0 && @@ -728,7 +728,7 @@ impl< tx.commit().await?; Ok::<_, ChainOrchestratorError>(()) }) - .await; + .await?; Ok(Some(event)) } diff --git a/crates/chain-orchestrator/src/retry.rs b/crates/chain-orchestrator/src/retry.rs index 0077dddd..35a7fa9d 100644 --- a/crates/chain-orchestrator/src/retry.rs +++ b/crates/chain-orchestrator/src/retry.rs @@ -19,11 +19,6 @@ impl Default for Retry { } } -/// A trait for errors that can indicate whether an operation can be retried. -pub trait CanRetry { - fn can_retry(&self) -> bool; -} - impl Retry { /// Creates a new [`Retry`] with the specified parameters. pub const fn new( @@ -81,6 +76,28 @@ impl Retry { } } +/// A trait for errors that can indicate whether an operation can be retried. +pub trait CanRetry { + fn can_retry(&self) -> bool; +} + +// Centralized retry classification impls +impl CanRetry for scroll_db::DatabaseError { + fn can_retry(&self) -> bool { + matches!(self, Self::DatabaseError(_) | Self::SqlxError(_)) + } +} + +impl CanRetry for crate::error::ChainOrchestratorError { + fn can_retry(&self) -> bool { + match self { + Self::DatabaseError(db) => db.can_retry(), + Self::NetworkRequestError(_) | Self::RpcError(_) => true, + _ => false, + } + } +} + #[cfg(test)] mod tests { use super::{CanRetry, Retry};