From bf8f7a19d994535230feedb4974989c2743fe4b6 Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 2 Sep 2025 19:54:45 +0800 Subject: [PATCH 1/7] fix l1 message indexing --- Cargo.lock | 1 + crates/chain-orchestrator/Cargo.toml | 1 + crates/chain-orchestrator/src/lib.rs | 42 ++++++++++++++++------------ crates/node/src/args.rs | 12 ++++++++ crates/node/src/constants.rs | 6 ++++ 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a924fae7..ae69a71f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10466,6 +10466,7 @@ dependencies = [ "reth-scroll-chainspec", "reth-scroll-forks", "reth-scroll-primitives", + "reth-tracing", "rollup-node-primitives", "rollup-node-watcher", "scroll-alloy-consensus", diff --git a/crates/chain-orchestrator/Cargo.toml b/crates/chain-orchestrator/Cargo.toml index 55fc9522..16fa7a2a 100644 --- a/crates/chain-orchestrator/Cargo.toml +++ b/crates/chain-orchestrator/Cargo.toml @@ -63,6 +63,7 @@ reth-scroll-forks.workspace = true # reth reth-eth-wire-types.workspace = true reth-network-peers.workspace = true +reth-tracing.workspace = true # misc arbitrary.workspace = true diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index c611e600..193d3f07 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -15,7 +15,7 @@ use rollup_node_primitives::{ }; use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; -use scroll_alloy_hardforks::{ScrollHardfork, ScrollHardforks}; +use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, UnwindResult}; use scroll_network::NewBlockWithPeer; @@ -79,6 +79,8 @@ pub struct ChainOrchestrator { chain_buffer_size: usize, /// A boolean to represent if the L1 has been synced. l1_synced: bool, + /// The L1 message index at which queue hashes should be computed. 
+ l1_message_queue_index_boundary: u64, /// The waker to notify when the engine driver should be polled. waker: AtomicWaker, } @@ -97,6 +99,7 @@ impl< l2_client: P, optimistic_sync_threshold: u64, chain_buffer_size: usize, + l1_message_queue_index_boundary: u64, ) -> Result { let chain = init_chain_from_db(&database, &l2_client, chain_buffer_size).await?; Ok(Self { @@ -117,6 +120,7 @@ impl< optimistic_sync_threshold, chain_buffer_size, l1_synced: false, + l1_message_queue_index_boundary, waker: AtomicWaker::new(), }) } @@ -534,15 +538,14 @@ impl< Box::pin(Self::handle_batch_commit(self.database.clone(), batch)), )) } - L1Notification::L1Message { message, block_number, block_timestamp } => { + L1Notification::L1Message { message, block_number, block_timestamp: _ } => { ChainOrchestratorFuture::HandleL1Message(self.handle_metered( ChainOrchestratorItem::L1Message, Box::pin(Self::handle_l1_message( self.database.clone(), - self.chain_spec.clone(), message, block_number, - block_timestamp, + self.l1_message_queue_index_boundary, )), )) } @@ -624,18 +627,13 @@ impl< /// Handles an L1 message by inserting it into the database. 
async fn handle_l1_message( database: Arc, - chain_spec: Arc, l1_message: TxL1Message, l1_block_number: u64, - block_timestamp: u64, + l1_message_queue_index_boundary: u64, ) -> Result, ChainOrchestratorError> { let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); - let queue_hash = if chain_spec - .scroll_fork_activation(ScrollHardfork::EuclidV2) - .active_at_timestamp_or_number(block_timestamp, l1_block_number) && - l1_message.queue_index > 0 - { + let queue_hash = if l1_message.queue_index > l1_message_queue_index_boundary { let index = l1_message.queue_index - 1; let prev_queue_hash = database .get_l1_message_by_index(index) @@ -954,6 +952,7 @@ mod test { const TEST_OPTIMISTIC_SYNC_THRESHOLD: u64 = 100; const TEST_CHAIN_BUFFER_SIZE: usize = 2000; + const TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 953885; /// A headers+bodies client that stores the headers and bodies in memory, with an artificial /// soft bodies response limit that is set to 20 by default. 
@@ -1105,6 +1104,7 @@ mod test { .expect("Failed to parse mainnet genesis block"); assertor.push_success(&mainnet_genesis); let provider = ProviderBuilder::<_, _, Scroll>::default().connect_mocked_client(assertor); + let db = Arc::new(setup_test_db().await); ( ChainOrchestrator::new( @@ -1114,6 +1114,7 @@ mod test { provider, TEST_OPTIMISTIC_SYNC_THRESHOLD, TEST_CHAIN_BUFFER_SIZE, + TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY, ) .await .unwrap(), @@ -1274,6 +1275,8 @@ mod test { #[tokio::test] async fn test_handle_l1_message() { + reth_tracing::init_test_tracing(); + // Instantiate chain orchestrator and db let (mut chain_orchestrator, db) = setup_test_chain_orchestrator().await; @@ -1283,7 +1286,7 @@ mod test { let mut u = Unstructured::new(&bytes); let message = TxL1Message { - queue_index: i64::arbitrary(&mut u).unwrap().unsigned_abs(), + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - 1, ..Arbitrary::arbitrary(&mut u).unwrap() }; let block_number = u64::arbitrary(&mut u).unwrap(); @@ -1309,7 +1312,10 @@ mod test { // insert the previous L1 message in database. 
chain_orchestrator.handle_l1_notification(L1Notification::L1Message { - message: TxL1Message { queue_index: 1062109, ..Default::default() }, + message: TxL1Message { + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY, + ..Default::default() + }, block_number: 1475588, block_timestamp: 1745305199, }); @@ -1317,7 +1323,7 @@ mod test { // let message = TxL1Message { - queue_index: 1062110, + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + 1, gas_limit: 168000, to: address!("Ba50f5340FB9F3Bd074bD638c9BE13eCB36E603d"), value: U256::ZERO, @@ -1336,7 +1342,7 @@ mod test { db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap(); assert_eq!( - b256!("5e48ae1092c7f912849b9935f4e66870d2034b24fb2016f506e6754900000000"), + b256!("322881db10fa96b7bfed5a51a24d5a1ab86ab8fc7e0dab1b4ee4146f00000000"), l1_message_result.queue_hash.unwrap() ); } @@ -1380,19 +1386,19 @@ mod test { queue_hash: None, l1_block_number: 1, l2_block_number: None, - ..Arbitrary::arbitrary(&mut u).unwrap() + transaction: TxL1Message { queue_index: 1, ..Arbitrary::arbitrary(&mut u).unwrap() }, }; let l1_message_block_20 = L1MessageEnvelope { queue_hash: None, l1_block_number: 20, l2_block_number: None, - ..Arbitrary::arbitrary(&mut u).unwrap() + transaction: TxL1Message { queue_index: 2, ..Arbitrary::arbitrary(&mut u).unwrap() }, }; let l1_message_block_30 = L1MessageEnvelope { queue_hash: None, l1_block_number: 30, l2_block_number: None, - ..Arbitrary::arbitrary(&mut u).unwrap() + transaction: TxL1Message { queue_index: 3, ..Arbitrary::arbitrary(&mut u).unwrap() }, }; // Index L1 messages diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 24bb869f..6fcaf08c 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -327,6 +327,7 @@ impl ScrollRollupNodeConfig { l2_provider, self.chain_orchestrator_args.optimistic_sync_trigger, self.chain_orchestrator_args.chain_buffer_size, + l1_message_queue_index_boundary(chain_spec.chain().named()), ) .await?; @@ -656,6 
+657,17 @@ const fn td_constant(chain: NamedChain) -> U128 { } } +/// The L1 message queue index at which queue hashes should be computed . +const fn l1_message_queue_index_boundary(chain: Option) -> u64 { + match chain { + Some(NamedChain::Scroll) => constants::SCROLL_MAINNET_L1_MESSAGE_QUEUE_INDEX_BOUNDARY, + Some(NamedChain::ScrollSepolia) => { + constants::SCROLL_SEPOLIA_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + } + _ => 0, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/node/src/constants.rs b/crates/node/src/constants.rs index 5dca72ca..0919aae2 100644 --- a/crates/node/src/constants.rs +++ b/crates/node/src/constants.rs @@ -40,3 +40,9 @@ pub(crate) const SCROLL_MAINNET_TD_CONSTANT: U128 = U128::from_limbs([14906960, /// The constant value that must be added to the block number to get the total difficulty for Scroll /// Sepolia. pub(crate) const SCROLL_SEPOLIA_TD_CONSTANT: U128 = U128::from_limbs([8484488, 0]); + +/// The L1 message queue index at which queue hashes should be computed for mainnet. +pub(crate) const SCROLL_MAINNET_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 953885; + +/// The L1 message queue index at which queue hashes should be computed for sepolia. 
+pub(crate) const SCROLL_SEPOLIA_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 1062110; From 8dc4899a04a67347efab8b09eba9a2b2fbf615dc Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 2 Sep 2025 20:40:31 +0800 Subject: [PATCH 2/7] update queue hash boundary logic --- crates/chain-orchestrator/src/lib.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 193d3f07..e36b5d10 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -633,15 +633,20 @@ impl< ) -> Result, ChainOrchestratorError> { let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); - let queue_hash = if l1_message.queue_index > l1_message_queue_index_boundary { - let index = l1_message.queue_index - 1; - let prev_queue_hash = database - .get_l1_message_by_index(index) - .await? - .map(|m| m.queue_hash) - .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?; + let queue_hash = if l1_message.queue_index >= l1_message_queue_index_boundary { + let mut input = if l1_message.queue_index == 0 { + B256::default().to_vec() + } else { + let index = l1_message.queue_index - 1; + database + .get_l1_message_by_index(index) + .await? + .map(|m| m.queue_hash) + .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? + .unwrap_or_default() + .to_vec() + }; - let mut input = prev_queue_hash.unwrap_or_default().to_vec(); input.append(&mut l1_message.tx_hash().to_vec()); Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else { @@ -1313,7 +1318,7 @@ mod test { // insert the previous L1 message in database. 
chain_orchestrator.handle_l1_notification(L1Notification::L1Message { message: TxL1Message { - queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY, + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - 1, ..Default::default() }, block_number: 1475588, @@ -1323,7 +1328,7 @@ mod test { // let message = TxL1Message { - queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + 1, + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY , gas_limit: 168000, to: address!("Ba50f5340FB9F3Bd074bD638c9BE13eCB36E603d"), value: U256::ZERO, @@ -1342,7 +1347,7 @@ mod test { db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap(); assert_eq!( - b256!("322881db10fa96b7bfed5a51a24d5a1ab86ab8fc7e0dab1b4ee4146f00000000"), + b256!("390cc9241304858dc5b0cf49049630ef65ac8473d3238faca853a5d700000000"), l1_message_result.queue_hash.unwrap() ); } From c6463091f757a396779d8077e2f292d8a3367613 Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 2 Sep 2025 22:57:40 +0800 Subject: [PATCH 3/7] update queue hash boundary logic --- crates/chain-orchestrator/src/lib.rs | 31 +++++++++++++--------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index e36b5d10..99bc28fb 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -633,20 +633,17 @@ impl< ) -> Result, ChainOrchestratorError> { let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); - let queue_hash = if l1_message.queue_index >= l1_message_queue_index_boundary { - let mut input = if l1_message.queue_index == 0 { - B256::default().to_vec() - } else { - let index = l1_message.queue_index - 1; - database - .get_l1_message_by_index(index) - .await? - .map(|m| m.queue_hash) - .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? 
- .unwrap_or_default() - .to_vec() - }; - + let queue_hash = if l1_message.queue_index == l1_message_queue_index_boundary { + Some(B256::default()) + } else if l1_message.queue_index > l1_message_queue_index_boundary { + let index = l1_message.queue_index - 1; + let mut input = database + .get_l1_message_by_index(index) + .await? + .map(|m| m.queue_hash) + .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? + .unwrap_or_default() + .to_vec(); input.append(&mut l1_message.tx_hash().to_vec()); Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else { @@ -1318,7 +1315,7 @@ mod test { // insert the previous L1 message in database. chain_orchestrator.handle_l1_notification(L1Notification::L1Message { message: TxL1Message { - queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - 1, + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY, ..Default::default() }, block_number: 1475588, @@ -1328,7 +1325,7 @@ mod test { // let message = TxL1Message { - queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY , + queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + 1, gas_limit: 168000, to: address!("Ba50f5340FB9F3Bd074bD638c9BE13eCB36E603d"), value: U256::ZERO, @@ -1347,7 +1344,7 @@ mod test { db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap(); assert_eq!( - b256!("390cc9241304858dc5b0cf49049630ef65ac8473d3238faca853a5d700000000"), + b256!("322881db10fa96b7bfed5a51a24d5a1ab86ab8fc7e0dab1b4ee4146f00000000"), l1_message_result.queue_hash.unwrap() ); } From 8ba0f44782522d29f06e2c1805e252700bedf6fe Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 3 Sep 2025 00:11:10 +0800 Subject: [PATCH 4/7] update queue hash boundary logic --- crates/chain-orchestrator/src/lib.rs | 4 +++- crates/derivation-pipeline/src/lib.rs | 15 +++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 99bc28fb..822d5ec9 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ 
b/crates/chain-orchestrator/src/lib.rs @@ -634,7 +634,9 @@ impl< let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); let queue_hash = if l1_message.queue_index == l1_message_queue_index_boundary { - Some(B256::default()) + let mut input = B256::default().to_vec(); + input.append(&mut l1_message.tx_hash().to_vec()); + Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) } else if l1_message.queue_index > l1_message_queue_index_boundary { let index = l1_message.queue_index - 1; let mut input = database diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 93d3a27e..fcd68590 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -260,10 +260,17 @@ pub async fn derive V2 + // message queue migration. + if hash != &B256::default() { + l1_provider.set_hash_cursor(*hash).await; + // we skip the first l1 message, as we are interested in the one starting after + // prev_l1_message_queue_hash. 
+ let _ = l1_provider.next_l1_message().await.map_err(Into::into)?; + } } else { return Err(DerivationPipelineError::MissingL1MessageQueueCursor) } From 98f338bf9a5f04881f944a49185a44e51e27b61c Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 3 Sep 2025 00:41:42 +0800 Subject: [PATCH 5/7] update queue hash boundary logic --- crates/chain-orchestrator/src/lib.rs | 18 ++++----- .../derivation-pipeline/benches/pipeline.rs | 2 +- crates/derivation-pipeline/src/lib.rs | 40 ++++++++++++------- crates/manager/src/manager/mod.rs | 4 +- crates/node/src/args.rs | 13 +++--- crates/node/src/constants.rs | 8 ++-- 6 files changed, 49 insertions(+), 36 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 822d5ec9..fa9a70b0 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -79,8 +79,8 @@ pub struct ChainOrchestrator { chain_buffer_size: usize, /// A boolean to represent if the L1 has been synced. l1_synced: bool, - /// The L1 message index at which queue hashes should be computed. - l1_message_queue_index_boundary: u64, + /// The L1 message queue index at which the V2 L1 message queue was enabled. + l1_v2_message_queue_start_index: u64, /// The waker to notify when the engine driver should be polled. 
waker: AtomicWaker, } @@ -99,7 +99,7 @@ impl< l2_client: P, optimistic_sync_threshold: u64, chain_buffer_size: usize, - l1_message_queue_index_boundary: u64, + l1_v2_message_queue_start_index: u64, ) -> Result { let chain = init_chain_from_db(&database, &l2_client, chain_buffer_size).await?; Ok(Self { @@ -120,7 +120,7 @@ impl< optimistic_sync_threshold, chain_buffer_size, l1_synced: false, - l1_message_queue_index_boundary, + l1_v2_message_queue_start_index, waker: AtomicWaker::new(), }) } @@ -545,7 +545,7 @@ impl< self.database.clone(), message, block_number, - self.l1_message_queue_index_boundary, + self.l1_v2_message_queue_start_index, )), )) } @@ -629,15 +629,15 @@ impl< database: Arc, l1_message: TxL1Message, l1_block_number: u64, - l1_message_queue_index_boundary: u64, + l1_v2_message_queue_start_index: u64, ) -> Result, ChainOrchestratorError> { let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); - let queue_hash = if l1_message.queue_index == l1_message_queue_index_boundary { + let queue_hash = if l1_message.queue_index == l1_v2_message_queue_start_index { let mut input = B256::default().to_vec(); input.append(&mut l1_message.tx_hash().to_vec()); Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) - } else if l1_message.queue_index > l1_message_queue_index_boundary { + } else if l1_message.queue_index > l1_v2_message_queue_start_index { let index = l1_message.queue_index - 1; let mut input = database .get_l1_message_by_index(index) @@ -1346,7 +1346,7 @@ mod test { db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap(); assert_eq!( - b256!("322881db10fa96b7bfed5a51a24d5a1ab86ab8fc7e0dab1b4ee4146f00000000"), + b256!("b2331b9010aac89f012d648fccc1f0a9aa5ef7b7b2afe21be297dd1a00000000"), l1_message_result.queue_hash.unwrap() ); } diff --git a/crates/derivation-pipeline/benches/pipeline.rs b/crates/derivation-pipeline/benches/pipeline.rs index 127c650f..242ac273 100644 --- 
a/crates/derivation-pipeline/benches/pipeline.rs +++ b/crates/derivation-pipeline/benches/pipeline.rs @@ -67,7 +67,7 @@ async fn setup_pipeline( // construct the pipeline. let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0); let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() }; - DerivationPipeline::new(mock_l1_provider, db) + DerivationPipeline::new(mock_l1_provider, db, u64::MAX) } fn benchmark_pipeline_derivation(c: &mut Criterion) { diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index fcd68590..95791dc7 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -56,6 +56,8 @@ pub struct DerivationPipeline

{ database: Arc, /// A L1 provider. l1_provider: P, + /// The L1 message queue index at which the V2 L1 message queue was enabled. + l1_v2_message_queue_start_index: u64, /// The queue of batches to handle. batch_queue: VecDeque>>, /// The queue of polled attributes. @@ -90,10 +92,15 @@ where P: L1Provider + Clone + Send + Sync + 'static, { /// Returns a new instance of the [`DerivationPipeline`]. - pub fn new(l1_provider: P, database: Arc) -> Self { + pub fn new( + l1_provider: P, + database: Arc, + l1_v2_message_queue_start_index: u64, + ) -> Self { Self { database, l1_provider, + l1_v2_message_queue_start_index, batch_queue: Default::default(), pipeline_future: None, attributes_queue: Default::default(), @@ -119,6 +126,7 @@ where let database = self.database.clone(); let metrics = self.metrics.clone(); let provider = self.l1_provider.clone(); + let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index; if let Some(info) = self.batch_queue.pop_front() { let block_number = info.number; @@ -136,8 +144,9 @@ where .ok_or((info.clone(), DerivationPipelineError::UnknownBatch(index)))?; // derive the attributes and attach the corresponding batch info. - let attrs = - derive(batch, provider, database).await.map_err(|err| (info.clone(), err))?; + let attrs = derive(batch, provider, database, l1_v2_message_queue_start_index) + .await + .map_err(|err| (info.clone(), err))?; // update metrics. metrics.derived_blocks.increment(attrs.len() as u64); @@ -245,6 +254,7 @@ pub async fn derive Result, DerivationPipelineError> { // fetch the blob then decode the input batch. let blob = if let Some(hash) = batch.blob_versioned_hash { @@ -260,12 +270,10 @@ pub async fn derive V2 - // message queue migration. - if hash != &B256::default() { + // If the message queue hash is zero then we should use the V2 L1 message queue start index. 
+ if hash == &B256::ZERO { + l1_provider.set_queue_index_cursor(l1_v2_message_queue_start_index); + } else { l1_provider.set_hash_cursor(*hash).await; // we skip the first l1 message, as we are interested in the one starting after // prev_l1_message_queue_hash. @@ -418,6 +426,7 @@ mod tests { )), database: db, l1_provider: mock_l1_provider, + l1_v2_message_queue_start_index: u64::MAX, batch_queue: [ WithFinalizedBlockNumber::new( 0, @@ -477,7 +486,7 @@ mod tests { // construct the pipeline. let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0); let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() }; - let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone()); + let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX); // as long as we don't call `push_batch`, pipeline should not return attributes. pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }, 0); @@ -544,7 +553,7 @@ mod tests { // construct the pipeline. let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0); let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() }; - let mut pipeline = DerivationPipeline::new(mock_l1_provider, db); + let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX); // as long as we don't call `push_batch`, pipeline should not return attributes. 
pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }, 0); @@ -603,7 +612,7 @@ mod tests { let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() }; let l2_provider = MockL2Provider; - let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?; + let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?; let attribute = attributes.iter().find(|a| a.payload_attributes.timestamp == 1696935384).unwrap(); @@ -702,7 +711,7 @@ mod tests { let l2_provider = MockL2Provider; // derive attributes and extract l1 messages. - let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?; + let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?; let derived_l1_messages: Vec<_> = attributes .into_iter() .filter_map(|a| a.transactions) @@ -756,7 +765,7 @@ mod tests { let l2_provider = MockL2Provider; // derive attributes and extract l1 messages. - let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?; + let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?; let derived_l1_messages: Vec<_> = attributes .into_iter() .filter_map(|a| a.transactions) @@ -870,7 +879,7 @@ mod tests { }; let l2_provider = MockL2Provider; - let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?; + let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?; let attribute = attributes.last().unwrap(); let expected = ScrollPayloadAttributes { @@ -925,6 +934,7 @@ mod tests { )), database: db, l1_provider: mock_l1_provider, + l1_v2_message_queue_start_index: u64::MAX, batch_queue: batches.collect(), attributes_queue: attributes, waker: None, diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index eabe7a4f..8441f5b7 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -173,9 +173,11 @@ where signer: Option, 
block_time: Option, chain_orchestrator: ChainOrchestrator::Client, P>, + l1_v2_message_queue_start_index: u64, ) -> (Self, RollupManagerHandle) { let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE); - let derivation_pipeline = DerivationPipeline::new(l1_provider, database); + let derivation_pipeline = + DerivationPipeline::new(l1_provider, database, l1_v2_message_queue_start_index); let rnm = Self { handle_rx, chain_spec, diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index 6fcaf08c..4eee6666 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -320,6 +320,8 @@ impl ScrollRollupNodeConfig { .fetch_client() .await .expect("failed to fetch block client"); + let l1_v2_message_queue_start_index = + l1_v2_message_queue_start_index(chain_spec.chain().named()); let chain_orchestrator = ChainOrchestrator::new( db.clone(), chain_spec.clone(), @@ -327,7 +329,7 @@ impl ScrollRollupNodeConfig { l2_provider, self.chain_orchestrator_args.optimistic_sync_trigger, self.chain_orchestrator_args.chain_buffer_size, - l1_message_queue_index_boundary(chain_spec.chain().named()), + l1_v2_message_queue_start_index, ) .await?; @@ -344,6 +346,7 @@ impl ScrollRollupNodeConfig { signer, block_time, chain_orchestrator, + l1_v2_message_queue_start_index, ) .await; Ok((rnm, handle, l1_notification_tx)) @@ -658,12 +661,10 @@ const fn td_constant(chain: NamedChain) -> U128 { } /// The L1 message queue index at which queue hashes should be computed . 
-const fn l1_message_queue_index_boundary(chain: Option) -> u64 { +const fn l1_v2_message_queue_start_index(chain: Option) -> u64 { match chain { - Some(NamedChain::Scroll) => constants::SCROLL_MAINNET_L1_MESSAGE_QUEUE_INDEX_BOUNDARY, - Some(NamedChain::ScrollSepolia) => { - constants::SCROLL_SEPOLIA_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - } + Some(NamedChain::Scroll) => constants::SCROLL_MAINNET_V2_MESSAGE_QUEUE_START_INDEX, + Some(NamedChain::ScrollSepolia) => constants::SCROLL_SEPOLIA_V2_MESSAGE_QUEUE_START_INDEX, _ => 0, } } diff --git a/crates/node/src/constants.rs b/crates/node/src/constants.rs index 0919aae2..8c0be884 100644 --- a/crates/node/src/constants.rs +++ b/crates/node/src/constants.rs @@ -41,8 +41,8 @@ pub(crate) const SCROLL_MAINNET_TD_CONSTANT: U128 = U128::from_limbs([14906960, /// Sepolia. pub(crate) const SCROLL_SEPOLIA_TD_CONSTANT: U128 = U128::from_limbs([8484488, 0]); -/// The L1 message queue index at which queue hashes should be computed for mainnet. -pub(crate) const SCROLL_MAINNET_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 953885; +/// The L1 message queue index at which the V2 L1 message queue was enabled on mainnet. +pub(crate) const SCROLL_MAINNET_V2_MESSAGE_QUEUE_START_INDEX: u64 = 953885; -/// The L1 message queue index at which queue hashes should be computed for sepolia. -pub(crate) const SCROLL_SEPOLIA_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 1062110; +/// The L1 message queue index at which the V2 L1 message queue was enabled on sepolia.
+pub(crate) const SCROLL_SEPOLIA_V2_MESSAGE_QUEUE_START_INDEX: u64 = 1062110; From 5aa46ccbd0825d73c5fe9cc12c3d3aa74c08c88c Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 3 Sep 2025 13:11:20 +0800 Subject: [PATCH 6/7] address comments --- crates/chain-orchestrator/src/lib.rs | 60 +++++++++++++++++---------- crates/derivation-pipeline/src/lib.rs | 3 ++ 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index fa9a70b0..c6106f97 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -542,10 +542,10 @@ impl< ChainOrchestratorFuture::HandleL1Message(self.handle_metered( ChainOrchestratorItem::L1Message, Box::pin(Self::handle_l1_message( + self.l1_v2_message_queue_start_index, self.database.clone(), message, block_number, - self.l1_v2_message_queue_start_index, )), )) } @@ -626,32 +626,15 @@ impl< /// Handles an L1 message by inserting it into the database. async fn handle_l1_message( + l1_v2_message_queue_start_index: u64, database: Arc, l1_message: TxL1Message, l1_block_number: u64, - l1_v2_message_queue_start_index: u64, ) -> Result, ChainOrchestratorError> { let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); - - let queue_hash = if l1_message.queue_index == l1_v2_message_queue_start_index { - let mut input = B256::default().to_vec(); - input.append(&mut l1_message.tx_hash().to_vec()); - Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) - } else if l1_message.queue_index > l1_v2_message_queue_start_index { - let index = l1_message.queue_index - 1; - let mut input = database - .get_l1_message_by_index(index) - .await? - .map(|m| m.queue_hash) - .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? 
- .unwrap_or_default() - .to_vec(); - input.append(&mut l1_message.tx_hash().to_vec()); - Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) - } else { - None - }; - + let queue_hash = + compute_l1_message_queue_hash(&database, &l1_message, l1_v2_message_queue_start_index) + .await?; let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash); database.insert_l1_message(l1_message).await?; Ok(Some(event)) @@ -702,6 +685,39 @@ impl< } } +/// Computes the queue hash by taking the previous queue hash and performing a 2-to-1 hash with the +/// current transaction hash using keccak. It then applies a mask which is required for the kzg +/// field used for blobs on ethereum. For the first message in the queue, the previous queue +/// hash is zero. If the L1 message queue index is before migration to `L1MessageQueueV2`, +/// the queue hash will be None. +/// +/// The solidity contract (`L1MessageQueueV2.sol`) implementation is defined here: +async fn compute_l1_message_queue_hash( + database: &Arc, + l1_message: &TxL1Message, + l1_v2_message_queue_start_index: u64, +) -> Result>, ChainOrchestratorError> { + let queue_hash = if l1_message.queue_index == l1_v2_message_queue_start_index { + let mut input = B256::default().to_vec(); + input.append(&mut l1_message.tx_hash().to_vec()); + Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) + } else if l1_message.queue_index > l1_v2_message_queue_start_index { + let index = l1_message.queue_index - 1; + let mut input = database + .get_l1_message_by_index(index) + .await? + .map(|m| m.queue_hash) + .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))? 
+ .unwrap_or_default() + .to_vec(); + input.append(&mut l1_message.tx_hash().to_vec()); + Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK) + } else { + None + }; + Ok(queue_hash) +} + async fn init_chain_from_db + 'static>( database: &Arc, l2_client: &P, diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 95791dc7..d7b8eb24 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -271,6 +271,9 @@ pub async fn derive Date: Wed, 3 Sep 2025 14:20:31 +0800 Subject: [PATCH 7/7] update comment --- crates/chain-orchestrator/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index c6106f97..f20f89dd 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -686,10 +686,10 @@ impl< } /// Computes the queue hash by taking the previous queue hash and performing a 2-to-1 hash with the -/// current transaction hash using keccak. It then applies a mask which is required for the kzg -/// field used for blobs on ethereum. For the first message in the queue, the previous queue -/// hash is zero. If the L1 message queue index is before migration to `L1MessageQueueV2`, -/// the queue hash will be None. +/// current transaction hash using keccak. It then applies a mask to the last 32 bits as these bits +/// are used to store the timestamp at which the message was enqueued in the contract. For the first +/// message in the queue, the previous queue hash is zero. If the L1 message queue index is before +/// migration to `L1MessageQueueV2`, the queue hash will be None. /// /// The solidity contract (`L1MessageQueueV2.sol`) implementation is defined here: async fn compute_l1_message_queue_hash(