diff --git a/Cargo.lock b/Cargo.lock
index f5bc475c..a650f8e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10468,6 +10468,7 @@ dependencies = [
  "reth-scroll-chainspec",
  "reth-scroll-forks",
  "reth-scroll-primitives",
+ "reth-tracing",
  "rollup-node-primitives",
  "rollup-node-watcher",
  "scroll-alloy-consensus",
diff --git a/crates/chain-orchestrator/Cargo.toml b/crates/chain-orchestrator/Cargo.toml
index 55fc9522..16fa7a2a 100644
--- a/crates/chain-orchestrator/Cargo.toml
+++ b/crates/chain-orchestrator/Cargo.toml
@@ -63,6 +63,7 @@ reth-scroll-forks.workspace = true
 # reth
 reth-eth-wire-types.workspace = true
 reth-network-peers.workspace = true
+reth-tracing.workspace = true
 
 # misc
 arbitrary.workspace = true
diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs
index c611e600..f20f89dd 100644
--- a/crates/chain-orchestrator/src/lib.rs
+++ b/crates/chain-orchestrator/src/lib.rs
@@ -15,7 +15,7 @@ use rollup_node_primitives::{
 };
 use rollup_node_watcher::L1Notification;
 use scroll_alloy_consensus::TxL1Message;
-use scroll_alloy_hardforks::{ScrollHardfork, ScrollHardforks};
+use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_alloy_network::Scroll;
 use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, UnwindResult};
 use scroll_network::NewBlockWithPeer;
@@ -79,6 +79,8 @@ pub struct ChainOrchestrator {
     chain_buffer_size: usize,
     /// A boolean to represent if the L1 has been synced.
     l1_synced: bool,
+    /// The L1 message queue index at which the V2 L1 message queue was enabled.
+    l1_v2_message_queue_start_index: u64,
     /// The waker to notify when the engine driver should be polled.
     waker: AtomicWaker,
 }
@@ -97,6 +99,7 @@ impl<
         l2_client: P,
         optimistic_sync_threshold: u64,
         chain_buffer_size: usize,
+        l1_v2_message_queue_start_index: u64,
     ) -> Result<Self, ChainOrchestratorError> {
         let chain = init_chain_from_db(&database, &l2_client, chain_buffer_size).await?;
         Ok(Self {
@@ -117,6 +120,7 @@ impl<
             optimistic_sync_threshold,
             chain_buffer_size,
             l1_synced: false,
+            l1_v2_message_queue_start_index,
             waker: AtomicWaker::new(),
         })
     }
@@ -534,15 +538,14 @@ impl<
                    Box::pin(Self::handle_batch_commit(self.database.clone(), batch)),
                ))
            }
-            L1Notification::L1Message { message, block_number, block_timestamp } => {
+            L1Notification::L1Message { message, block_number, block_timestamp: _ } => {
                ChainOrchestratorFuture::HandleL1Message(self.handle_metered(
                    ChainOrchestratorItem::L1Message,
                    Box::pin(Self::handle_l1_message(
+                        self.l1_v2_message_queue_start_index,
                        self.database.clone(),
-                        self.chain_spec.clone(),
                        message,
                        block_number,
-                        block_timestamp,
                    )),
                ))
            }
@@ -623,33 +626,15 @@ impl<
 
     /// Handles an L1 message by inserting it into the database.
     async fn handle_l1_message(
+        l1_v2_message_queue_start_index: u64,
         database: Arc<Database>,
-        chain_spec: Arc<ChainSpec>,
         l1_message: TxL1Message,
         l1_block_number: u64,
-        block_timestamp: u64,
     ) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
         let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index);
-
-        let queue_hash = if chain_spec
-            .scroll_fork_activation(ScrollHardfork::EuclidV2)
-            .active_at_timestamp_or_number(block_timestamp, l1_block_number) &&
-            l1_message.queue_index > 0
-        {
-            let index = l1_message.queue_index - 1;
-            let prev_queue_hash = database
-                .get_l1_message_by_index(index)
-                .await?
-                .map(|m| m.queue_hash)
-                .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?;
-
-            let mut input = prev_queue_hash.unwrap_or_default().to_vec();
-            input.append(&mut l1_message.tx_hash().to_vec());
-            Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
-        } else {
-            None
-        };
-
+        let queue_hash =
+            compute_l1_message_queue_hash(&database, &l1_message, l1_v2_message_queue_start_index)
+                .await?;
         let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash);
         database.insert_l1_message(l1_message).await?;
         Ok(Some(event))
@@ -700,6 +685,39 @@ impl<
     }
 }
 
+/// Computes the queue hash by taking the previous queue hash and performing a 2-to-1 hash with the
+/// current transaction hash, using keccak256. It then zeroes the lowest 32 bits of the result, as
+/// the contract uses those bits to store the timestamp at which the message was enqueued. For the
+/// first message in the V2 queue, the previous queue hash is zero. If the message's queue index
+/// predates the migration to `L1MessageQueueV2`, the queue hash is `None`.
+///
+/// The Solidity contract (`L1MessageQueueV2.sol`) implementation is defined here:
+async fn compute_l1_message_queue_hash(
+    database: &Arc<Database>,
+    l1_message: &TxL1Message,
+    l1_v2_message_queue_start_index: u64,
+) -> Result<Option<B256>, ChainOrchestratorError> {
+    let queue_hash = if l1_message.queue_index == l1_v2_message_queue_start_index {
+        let mut input = B256::default().to_vec();
+        input.append(&mut l1_message.tx_hash().to_vec());
+        Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
+    } else if l1_message.queue_index > l1_v2_message_queue_start_index {
+        let index = l1_message.queue_index - 1;
+        let mut input = database
+            .get_l1_message_by_index(index)
+            .await?
+            .map(|m| m.queue_hash)
+            .ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?
+            .unwrap_or_default()
+            .to_vec();
+        input.append(&mut l1_message.tx_hash().to_vec());
+        Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
+    } else {
+        None
+    };
+    Ok(queue_hash)
+}
+
 async fn init_chain_from_db<P: Provider<Scroll> + 'static>(
     database: &Arc<Database>,
     l2_client: &P,
@@ -954,6 +972,7 @@ mod test {
 
     const TEST_OPTIMISTIC_SYNC_THRESHOLD: u64 = 100;
     const TEST_CHAIN_BUFFER_SIZE: usize = 2000;
+    const TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 953885;
 
     /// A headers+bodies client that stores the headers and bodies in memory, with an artificial
     /// soft bodies response limit that is set to 20 by default.
@@ -1105,6 +1124,7 @@ mod test {
            .expect("Failed to parse mainnet genesis block");
         assertor.push_success(&mainnet_genesis);
         let provider = ProviderBuilder::<_, _, Scroll>::default().connect_mocked_client(assertor);
+        let db = Arc::new(setup_test_db().await);
 
         (
            ChainOrchestrator::new(
@@ -1114,6 +1134,7 @@ mod test {
                provider,
                TEST_OPTIMISTIC_SYNC_THRESHOLD,
                TEST_CHAIN_BUFFER_SIZE,
+                TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY,
            )
            .await
            .unwrap(),
@@ -1274,6 +1295,8 @@ mod test {
 
     #[tokio::test]
     async fn test_handle_l1_message() {
+        reth_tracing::init_test_tracing();
+
         // Instantiate chain orchestrator and db
         let (mut chain_orchestrator, db) = setup_test_chain_orchestrator().await;
 
@@ -1283,7 +1306,7 @@ mod test {
         let mut u = Unstructured::new(&bytes);
 
         let message = TxL1Message {
-            queue_index: i64::arbitrary(&mut u).unwrap().unsigned_abs(),
+            queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - 1,
            ..Arbitrary::arbitrary(&mut u).unwrap()
         };
         let block_number = u64::arbitrary(&mut u).unwrap();
@@ -1309,7 +1332,10 @@ mod test {
 
         // insert the previous L1 message in database.
         chain_orchestrator.handle_l1_notification(L1Notification::L1Message {
-            message: TxL1Message { queue_index: 1062109, ..Default::default() },
+            message: TxL1Message {
+                queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY,
+                ..Default::default()
+            },
            block_number: 1475588,
            block_timestamp: 1745305199,
         });
@@ -1317,7 +1343,7 @@ mod test {
 
         //
         let message = TxL1Message {
-            queue_index: 1062110,
+            queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + 1,
            gas_limit: 168000,
            to: address!("Ba50f5340FB9F3Bd074bD638c9BE13eCB36E603d"),
            value: U256::ZERO,
@@ -1336,7 +1362,7 @@ mod test {
            db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap();
 
         assert_eq!(
-            b256!("5e48ae1092c7f912849b9935f4e66870d2034b24fb2016f506e6754900000000"),
+            b256!("b2331b9010aac89f012d648fccc1f0a9aa5ef7b7b2afe21be297dd1a00000000"),
            l1_message_result.queue_hash.unwrap()
         );
     }
@@ -1380,19 +1406,19 @@ mod test {
            queue_hash: None,
            l1_block_number: 1,
            l2_block_number: None,
-            ..Arbitrary::arbitrary(&mut u).unwrap()
+            transaction: TxL1Message { queue_index: 1, ..Arbitrary::arbitrary(&mut u).unwrap() },
         };
         let l1_message_block_20 = L1MessageEnvelope {
            queue_hash: None,
            l1_block_number: 20,
            l2_block_number: None,
-            ..Arbitrary::arbitrary(&mut u).unwrap()
+            transaction: TxL1Message { queue_index: 2, ..Arbitrary::arbitrary(&mut u).unwrap() },
         };
         let l1_message_block_30 = L1MessageEnvelope {
            queue_hash: None,
            l1_block_number: 30,
            l2_block_number: None,
-            ..Arbitrary::arbitrary(&mut u).unwrap()
+            transaction: TxL1Message { queue_index: 3, ..Arbitrary::arbitrary(&mut u).unwrap() },
         };
 
         // Index L1 messages
diff --git a/crates/derivation-pipeline/benches/pipeline.rs b/crates/derivation-pipeline/benches/pipeline.rs
index 127c650f..242ac273 100644
--- a/crates/derivation-pipeline/benches/pipeline.rs
+++ b/crates/derivation-pipeline/benches/pipeline.rs
@@ -67,7 +67,7 @@ async fn setup_pipeline(
     // construct the pipeline.
     let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
     let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
-    DerivationPipeline::new(mock_l1_provider, db)
+    DerivationPipeline::new(mock_l1_provider, db, u64::MAX)
 }
 
 fn benchmark_pipeline_derivation(c: &mut Criterion) {
diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs
index 93d3a27e..d7b8eb24 100644
--- a/crates/derivation-pipeline/src/lib.rs
+++ b/crates/derivation-pipeline/src/lib.rs
@@ -56,6 +56,8 @@ pub struct DerivationPipeline<P> {
     database: Arc<Database>,
     /// A L1 provider.
     l1_provider: P,
+    /// The L1 message queue index at which the V2 L1 message queue was enabled.
+    l1_v2_message_queue_start_index: u64,
     /// The queue of batches to handle.
     batch_queue: VecDeque>>,
     /// The queue of polled attributes.
@@ -90,10 +92,15 @@ where
     P: L1Provider + Clone + Send + Sync + 'static,
 {
     /// Returns a new instance of the [`DerivationPipeline`].
-    pub fn new(l1_provider: P, database: Arc<Database>) -> Self {
+    pub fn new(
+        l1_provider: P,
+        database: Arc<Database>,
+        l1_v2_message_queue_start_index: u64,
+    ) -> Self {
         Self {
            database,
            l1_provider,
+            l1_v2_message_queue_start_index,
            batch_queue: Default::default(),
            pipeline_future: None,
            attributes_queue: Default::default(),
@@ -119,6 +126,7 @@ where
         let database = self.database.clone();
         let metrics = self.metrics.clone();
         let provider = self.l1_provider.clone();
+        let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index;
 
         if let Some(info) = self.batch_queue.pop_front() {
            let block_number = info.number;
@@ -136,8 +144,9 @@ where
                .ok_or((info.clone(), DerivationPipelineError::UnknownBatch(index)))?;
 
                // derive the attributes and attach the corresponding batch info.
-                let attrs =
-                    derive(batch, provider, database).await.map_err(|err| (info.clone(), err))?;
+                let attrs = derive(batch, provider, database, l1_v2_message_queue_start_index)
+                    .await
+                    .map_err(|err| (info.clone(), err))?;
 
                // update metrics.
                metrics.derived_blocks.increment(attrs.len() as u64);
@@ -245,6 +254,7 @@ pub async fn derive
     batch: BatchCommitData,
     l1_provider: L1P,
     l2_provider: L2P,
+    l1_v2_message_queue_start_index: u64,
 ) -> Result<Vec<ScrollPayloadAttributesWithBatchInfo>, DerivationPipelineError> {
     // fetch the blob then decode the input batch.
     let blob = if let Some(hash) = batch.blob_versioned_hash {
@@ -260,10 +270,18 @@ pub async fn derive
-        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
+        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
 
         let attribute =
            attributes.iter().find(|a| a.payload_attributes.timestamp == 1696935384).unwrap();
@@ -695,7 +714,7 @@ mod tests {
         let l2_provider = MockL2Provider;
 
         // derive attributes and extract l1 messages.
-        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
+        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
         let derived_l1_messages: Vec<_> = attributes
            .into_iter()
            .filter_map(|a| a.transactions)
@@ -749,7 +768,7 @@ mod tests {
         let l2_provider = MockL2Provider;
 
         // derive attributes and extract l1 messages.
-        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
+        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
         let derived_l1_messages: Vec<_> = attributes
            .into_iter()
            .filter_map(|a| a.transactions)
@@ -863,7 +882,7 @@ mod tests {
         };
         let l2_provider = MockL2Provider;
 
-        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
+        let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
 
         let attribute = attributes.last().unwrap();
         let expected = ScrollPayloadAttributes {
@@ -918,6 +937,7 @@ mod tests {
            )),
            database: db,
            l1_provider: mock_l1_provider,
+            l1_v2_message_queue_start_index: u64::MAX,
            batch_queue: batches.collect(),
            attributes_queue: attributes,
            waker: None,
diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs
index cc405a70..6d376c63 100644
--- a/crates/manager/src/manager/mod.rs
+++ b/crates/manager/src/manager/mod.rs
@@ -176,9 +176,11 @@ where
         signer: Option,
         block_time: Option,
         chain_orchestrator: ChainOrchestrator::Client, P>,
+        l1_v2_message_queue_start_index: u64,
     ) -> (Self, RollupManagerHandle) {
         let (handle_tx, handle_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
-        let derivation_pipeline = DerivationPipeline::new(l1_provider, database);
+        let derivation_pipeline =
+            DerivationPipeline::new(l1_provider, database, l1_v2_message_queue_start_index);
         let rnm = Self {
            handle_rx,
            chain_spec,
diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs
index 24bb869f..4eee6666 100644
--- a/crates/node/src/args.rs
+++ b/crates/node/src/args.rs
@@ -320,6 +320,8 @@ impl ScrollRollupNodeConfig {
            .fetch_client()
            .await
            .expect("failed to fetch block client");
+        let l1_v2_message_queue_start_index =
+            l1_v2_message_queue_start_index(chain_spec.chain().named());
         let chain_orchestrator = ChainOrchestrator::new(
            db.clone(),
            chain_spec.clone(),
@@ -327,6 +329,7 @@ impl ScrollRollupNodeConfig {
            l2_provider,
            self.chain_orchestrator_args.optimistic_sync_trigger,
            self.chain_orchestrator_args.chain_buffer_size,
+            l1_v2_message_queue_start_index,
         )
         .await?;
 
@@ -343,6 +346,7 @@ impl ScrollRollupNodeConfig {
            signer,
            block_time,
            chain_orchestrator,
+            l1_v2_message_queue_start_index,
         )
         .await;
         Ok((rnm, handle, l1_notification_tx))
@@ -656,6 +660,15 @@ const fn td_constant(chain: NamedChain) -> U128 {
     }
 }
 
+/// The L1 message queue index at which the V2 L1 message queue was enabled for the given chain.
+const fn l1_v2_message_queue_start_index(chain: Option<NamedChain>) -> u64 {
+    match chain {
+        Some(NamedChain::Scroll) => constants::SCROLL_MAINNET_V2_MESSAGE_QUEUE_START_INDEX,
+        Some(NamedChain::ScrollSepolia) => constants::SCROLL_SEPOLIA_V2_MESSAGE_QUEUE_START_INDEX,
+        _ => 0,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/node/src/constants.rs b/crates/node/src/constants.rs
index 5dca72ca..8c0be884 100644
--- a/crates/node/src/constants.rs
+++ b/crates/node/src/constants.rs
@@ -40,3 +40,9 @@ pub(crate) const SCROLL_MAINNET_TD_CONSTANT: U128 = U128::from_limbs([14906960,
 /// The constant value that must be added to the block number to get the total difficulty for Scroll
 /// Sepolia.
 pub(crate) const SCROLL_SEPOLIA_TD_CONSTANT: U128 = U128::from_limbs([8484488, 0]);
+
+/// The L1 message queue index at which the V2 L1 message queue was enabled on mainnet.
+pub(crate) const SCROLL_MAINNET_V2_MESSAGE_QUEUE_START_INDEX: u64 = 953885;
+
+/// The L1 message queue index at which the V2 L1 message queue was enabled on Sepolia.
+pub(crate) const SCROLL_SEPOLIA_V2_MESSAGE_QUEUE_START_INDEX: u64 = 1062110;
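
Note on the hashing rule introduced by this diff: the V2 queue hash is a rolling keccak256 over the previous queue hash concatenated with the L1 message's transaction hash, with the lowest 32 bits zeroed because `L1MessageQueueV2` stores the enqueue timestamp in those bits. Messages below the V2 start index get no queue hash, and the message at the start index chains off a zero hash. The sketch below only illustrates that chaining rule; it is not code from this PR. It assumes `alloy_primitives` for `B256`/`keccak256`, the helper names (`mask_timestamp_bits`, `next_queue_hash`) are illustrative, and the assumption that exactly the low 4 bytes are masked mirrors what `L1_MESSAGE_QUEUE_HASH_MASK` is described as doing in the doc comment.

use alloy_primitives::{keccak256, B256};

/// Zero the lowest 32 bits (last 4 bytes) of a hash. The contract stores the enqueue
/// timestamp in those bits, so they are excluded from the rolling queue hash.
/// (Assumption: mirrors the described behaviour of L1_MESSAGE_QUEUE_HASH_MASK.)
fn mask_timestamp_bits(hash: B256) -> B256 {
    let mut out = hash;
    out.0[28..].fill(0);
    out
}

/// Rolling queue hash: keccak256(prev_queue_hash || tx_hash), then masked.
/// For the first message at the V2 start index, `prev_queue_hash` is zero.
fn next_queue_hash(prev_queue_hash: B256, tx_hash: B256) -> B256 {
    let mut input = [0u8; 64];
    input[..32].copy_from_slice(prev_queue_hash.as_slice());
    input[32..].copy_from_slice(tx_hash.as_slice());
    mask_timestamp_bits(keccak256(input))
}

fn main() {
    // First V2 message: chain off the zero hash.
    let first = next_queue_hash(B256::ZERO, B256::repeat_byte(0x11));
    // Every later message chains off the previous queue hash.
    let second = next_queue_hash(first, B256::repeat_byte(0x22));
    println!("{first}\n{second}");
}

The eight trailing hex zeros in the `b256!` value asserted in `test_handle_l1_message` are consistent with this masking of the low 32 bits.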