Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/chain-orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ reth-scroll-forks.workspace = true
# reth
reth-eth-wire-types.workspace = true
reth-network-peers.workspace = true
reth-tracing.workspace = true

# misc
arbitrary.workspace = true
Expand Down
92 changes: 59 additions & 33 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use rollup_node_primitives::{
};
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
use scroll_alloy_hardforks::{ScrollHardfork, ScrollHardforks};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, UnwindResult};
use scroll_network::NewBlockWithPeer;
Expand Down Expand Up @@ -79,6 +79,8 @@ pub struct ChainOrchestrator<ChainSpec, BC, P> {
chain_buffer_size: usize,
/// A boolean to represent if the L1 has been synced.
l1_synced: bool,
/// The L1 message queue index at which the V2 L1 message queue was enabled.
l1_v2_message_queue_start_index: u64,
/// The waker to notify when the engine driver should be polled.
waker: AtomicWaker,
}
Expand All @@ -97,6 +99,7 @@ impl<
l2_client: P,
optimistic_sync_threshold: u64,
chain_buffer_size: usize,
l1_v2_message_queue_start_index: u64,
) -> Result<Self, ChainOrchestratorError> {
let chain = init_chain_from_db(&database, &l2_client, chain_buffer_size).await?;
Ok(Self {
Expand All @@ -117,6 +120,7 @@ impl<
optimistic_sync_threshold,
chain_buffer_size,
l1_synced: false,
l1_v2_message_queue_start_index,
waker: AtomicWaker::new(),
})
}
Expand Down Expand Up @@ -534,15 +538,14 @@ impl<
Box::pin(Self::handle_batch_commit(self.database.clone(), batch)),
))
}
L1Notification::L1Message { message, block_number, block_timestamp } => {
L1Notification::L1Message { message, block_number, block_timestamp: _ } => {
ChainOrchestratorFuture::HandleL1Message(self.handle_metered(
ChainOrchestratorItem::L1Message,
Box::pin(Self::handle_l1_message(
self.l1_v2_message_queue_start_index,
self.database.clone(),
self.chain_spec.clone(),
message,
block_number,
block_timestamp,
)),
))
}
Expand Down Expand Up @@ -623,33 +626,15 @@ impl<

/// Handles an L1 message by inserting it into the database.
async fn handle_l1_message(
l1_v2_message_queue_start_index: u64,
database: Arc<Database>,
chain_spec: Arc<ChainSpec>,
l1_message: TxL1Message,
l1_block_number: u64,
block_timestamp: u64,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index);

let queue_hash = if chain_spec
.scroll_fork_activation(ScrollHardfork::EuclidV2)
.active_at_timestamp_or_number(block_timestamp, l1_block_number) &&
l1_message.queue_index > 0
{
let index = l1_message.queue_index - 1;
let prev_queue_hash = database
.get_l1_message_by_index(index)
.await?
.map(|m| m.queue_hash)
.ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?;

let mut input = prev_queue_hash.unwrap_or_default().to_vec();
input.append(&mut l1_message.tx_hash().to_vec());
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
} else {
None
};

let queue_hash =
compute_l1_message_queue_hash(&database, &l1_message, l1_v2_message_queue_start_index)
.await?;
let l1_message = L1MessageEnvelope::new(l1_message, l1_block_number, None, queue_hash);
database.insert_l1_message(l1_message).await?;
Ok(Some(event))
Expand Down Expand Up @@ -700,6 +685,39 @@ impl<
}
}

/// Computes the queue hash by taking the previous queue hash and performing a 2-to-1 hash with the
/// current transaction hash using keccak. It then applies a mask to the last 32 bits as these bits
/// are used to store the timestamp at which the message was enqueued in the contract. For the first
/// message in the queue, the previous queue hash is zero. If the L1 message queue index is before
/// migration to `L1MessageQueueV2`, the queue hash will be None.
///
/// The solidity contract (`L1MessageQueueV2.sol`) implementation is defined here: <https://github.com/scroll-tech/scroll-contracts/blob/67c1bde19c1d3462abf8c175916a2bb3c89530e4/src/L1/rollup/L1MessageQueueV2.sol#L379-L403>
async fn compute_l1_message_queue_hash(
database: &Arc<Database>,
l1_message: &TxL1Message,
l1_v2_message_queue_start_index: u64,
) -> Result<Option<alloy_primitives::FixedBytes<32>>, ChainOrchestratorError> {
let queue_hash = if l1_message.queue_index == l1_v2_message_queue_start_index {
let mut input = B256::default().to_vec();
input.append(&mut l1_message.tx_hash().to_vec());
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
} else if l1_message.queue_index > l1_v2_message_queue_start_index {
let index = l1_message.queue_index - 1;
let mut input = database
.get_l1_message_by_index(index)
.await?
.map(|m| m.queue_hash)
.ok_or(DatabaseError::L1MessageNotFound(L1MessageStart::Index(index)))?
.unwrap_or_default()
.to_vec();
input.append(&mut l1_message.tx_hash().to_vec());
Some(keccak256(input) & L1_MESSAGE_QUEUE_HASH_MASK)
} else {
None
};
Ok(queue_hash)
}

async fn init_chain_from_db<P: Provider<Scroll> + 'static>(
database: &Arc<Database>,
l2_client: &P,
Expand Down Expand Up @@ -954,6 +972,7 @@ mod test {

const TEST_OPTIMISTIC_SYNC_THRESHOLD: u64 = 100;
const TEST_CHAIN_BUFFER_SIZE: usize = 2000;
const TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY: u64 = 953885;

/// A headers+bodies client that stores the headers and bodies in memory, with an artificial
/// soft bodies response limit that is set to 20 by default.
Expand Down Expand Up @@ -1105,6 +1124,7 @@ mod test {
.expect("Failed to parse mainnet genesis block");
assertor.push_success(&mainnet_genesis);
let provider = ProviderBuilder::<_, _, Scroll>::default().connect_mocked_client(assertor);

let db = Arc::new(setup_test_db().await);
(
ChainOrchestrator::new(
Expand All @@ -1114,6 +1134,7 @@ mod test {
provider,
TEST_OPTIMISTIC_SYNC_THRESHOLD,
TEST_CHAIN_BUFFER_SIZE,
TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY,
)
.await
.unwrap(),
Expand Down Expand Up @@ -1274,6 +1295,8 @@ mod test {

#[tokio::test]
async fn test_handle_l1_message() {
reth_tracing::init_test_tracing();

// Instantiate chain orchestrator and db
let (mut chain_orchestrator, db) = setup_test_chain_orchestrator().await;

Expand All @@ -1283,7 +1306,7 @@ mod test {
let mut u = Unstructured::new(&bytes);

let message = TxL1Message {
queue_index: i64::arbitrary(&mut u).unwrap().unsigned_abs(),
queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY - 1,
..Arbitrary::arbitrary(&mut u).unwrap()
};
let block_number = u64::arbitrary(&mut u).unwrap();
Expand All @@ -1309,15 +1332,18 @@ mod test {

// insert the previous L1 message in database.
chain_orchestrator.handle_l1_notification(L1Notification::L1Message {
message: TxL1Message { queue_index: 1062109, ..Default::default() },
message: TxL1Message {
queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY,
..Default::default()
},
block_number: 1475588,
block_timestamp: 1745305199,
});
let _ = chain_orchestrator.next().await.unwrap().unwrap();

// <https://sepolia.scrollscan.com/tx/0xd80cd61ac5d8665919da19128cc8c16d3647e1e2e278b931769e986d01c6b910>
let message = TxL1Message {
queue_index: 1062110,
queue_index: TEST_L1_MESSAGE_QUEUE_INDEX_BOUNDARY + 1,
gas_limit: 168000,
to: address!("Ba50f5340FB9F3Bd074bD638c9BE13eCB36E603d"),
value: U256::ZERO,
Expand All @@ -1336,7 +1362,7 @@ mod test {
db.get_l1_message_by_index(message.queue_index).await.unwrap().unwrap();

assert_eq!(
b256!("5e48ae1092c7f912849b9935f4e66870d2034b24fb2016f506e6754900000000"),
b256!("b2331b9010aac89f012d648fccc1f0a9aa5ef7b7b2afe21be297dd1a00000000"),
l1_message_result.queue_hash.unwrap()
);
}
Expand Down Expand Up @@ -1380,19 +1406,19 @@ mod test {
queue_hash: None,
l1_block_number: 1,
l2_block_number: None,
..Arbitrary::arbitrary(&mut u).unwrap()
transaction: TxL1Message { queue_index: 1, ..Arbitrary::arbitrary(&mut u).unwrap() },
};
let l1_message_block_20 = L1MessageEnvelope {
queue_hash: None,
l1_block_number: 20,
l2_block_number: None,
..Arbitrary::arbitrary(&mut u).unwrap()
transaction: TxL1Message { queue_index: 2, ..Arbitrary::arbitrary(&mut u).unwrap() },
};
let l1_message_block_30 = L1MessageEnvelope {
queue_hash: None,
l1_block_number: 30,
l2_block_number: None,
..Arbitrary::arbitrary(&mut u).unwrap()
transaction: TxL1Message { queue_index: 3, ..Arbitrary::arbitrary(&mut u).unwrap() },
};

// Index L1 messages
Expand Down
2 changes: 1 addition & 1 deletion crates/derivation-pipeline/benches/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn setup_pipeline(
// construct the pipeline.
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
DerivationPipeline::new(mock_l1_provider, db)
DerivationPipeline::new(mock_l1_provider, db, u64::MAX)
}

fn benchmark_pipeline_derivation(c: &mut Criterion) {
Expand Down
46 changes: 33 additions & 13 deletions crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct DerivationPipeline<P> {
database: Arc<Database>,
/// A L1 provider.
l1_provider: P,
/// The L1 message queue index at which the V2 L1 message queue was enabled.
l1_v2_message_queue_start_index: u64,
/// The queue of batches to handle.
batch_queue: VecDeque<WithFinalizedBlockNumber<Arc<BatchInfo>>>,
/// The queue of polled attributes.
Expand Down Expand Up @@ -90,10 +92,15 @@ where
P: L1Provider + Clone + Send + Sync + 'static,
{
/// Returns a new instance of the [`DerivationPipeline`].
pub fn new(l1_provider: P, database: Arc<Database>) -> Self {
pub fn new(
l1_provider: P,
database: Arc<Database>,
l1_v2_message_queue_start_index: u64,
) -> Self {
Self {
database,
l1_provider,
l1_v2_message_queue_start_index,
batch_queue: Default::default(),
pipeline_future: None,
attributes_queue: Default::default(),
Expand All @@ -119,6 +126,7 @@ where
let database = self.database.clone();
let metrics = self.metrics.clone();
let provider = self.l1_provider.clone();
let l1_v2_message_queue_start_index = self.l1_v2_message_queue_start_index;

if let Some(info) = self.batch_queue.pop_front() {
let block_number = info.number;
Expand All @@ -136,8 +144,9 @@ where
.ok_or((info.clone(), DerivationPipelineError::UnknownBatch(index)))?;

// derive the attributes and attach the corresponding batch info.
let attrs =
derive(batch, provider, database).await.map_err(|err| (info.clone(), err))?;
let attrs = derive(batch, provider, database, l1_v2_message_queue_start_index)
.await
.map_err(|err| (info.clone(), err))?;

// update metrics.
metrics.derived_blocks.increment(attrs.len() as u64);
Expand Down Expand Up @@ -245,6 +254,7 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
batch: BatchCommitData,
l1_provider: L1P,
l2_provider: L2P,
l1_v2_message_queue_start_index: u64,
) -> Result<Vec<ScrollPayloadAttributes>, DerivationPipelineError> {
// fetch the blob then decode the input batch.
let blob = if let Some(hash) = batch.blob_versioned_hash {
Expand All @@ -260,10 +270,18 @@ pub async fn derive<L1P: L1Provider + Sync + Send, L2P: BlockDataProvider + Sync
if let Some(index) = data.queue_index_start() {
l1_provider.set_queue_index_cursor(index);
} else if let Some(hash) = data.prev_l1_message_queue_hash() {
l1_provider.set_hash_cursor(*hash).await;
// we skip the first l1 message, as we are interested in the one starting after
// prev_l1_message_queue_hash.
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
// If the message queue hash is zero then we should use the V2 L1 message queue start index.
// We must apply this branch logic because we do not have a L1 message associated with a
// queue hash of ZERO (we only compute a queue hash for the first L1 message of the V2
// contract).
if hash == &B256::ZERO {
l1_provider.set_queue_index_cursor(l1_v2_message_queue_start_index);
} else {
l1_provider.set_hash_cursor(*hash).await;
// we skip the first l1 message, as we are interested in the one starting after
// prev_l1_message_queue_hash.
let _ = l1_provider.next_l1_message().await.map_err(Into::into)?;
}
} else {
return Err(DerivationPipelineError::MissingL1MessageQueueCursor)
}
Expand Down Expand Up @@ -411,6 +429,7 @@ mod tests {
)),
database: db,
l1_provider: mock_l1_provider,
l1_v2_message_queue_start_index: u64::MAX,
batch_queue: [
WithFinalizedBlockNumber::new(
0,
Expand Down Expand Up @@ -470,7 +489,7 @@ mod tests {
// construct the pipeline.
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone());
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db.clone(), u64::MAX);

// as long as we don't call `push_batch`, pipeline should not return attributes.
pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }, 0);
Expand Down Expand Up @@ -537,7 +556,7 @@ mod tests {
// construct the pipeline.
let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0);
let mock_l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db);
let mut pipeline = DerivationPipeline::new(mock_l1_provider, db, u64::MAX);

// as long as we don't call `push_batch`, pipeline should not return attributes.
pipeline.push_batch(BatchInfo { index: 12, hash: Default::default() }, 0);
Expand Down Expand Up @@ -596,7 +615,7 @@ mod tests {
let l1_provider = MockL1Provider { l1_messages_provider, blobs: HashMap::new() };
let l2_provider = MockL2Provider;

let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
let attribute =
attributes.iter().find(|a| a.payload_attributes.timestamp == 1696935384).unwrap();

Expand Down Expand Up @@ -695,7 +714,7 @@ mod tests {
let l2_provider = MockL2Provider;

// derive attributes and extract l1 messages.
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
let derived_l1_messages: Vec<_> = attributes
.into_iter()
.filter_map(|a| a.transactions)
Expand Down Expand Up @@ -749,7 +768,7 @@ mod tests {
let l2_provider = MockL2Provider;

// derive attributes and extract l1 messages.
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;
let derived_l1_messages: Vec<_> = attributes
.into_iter()
.filter_map(|a| a.transactions)
Expand Down Expand Up @@ -863,7 +882,7 @@ mod tests {
};
let l2_provider = MockL2Provider;

let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider).await?;
let attributes: Vec<_> = derive(batch_data, l1_provider, l2_provider, u64::MAX).await?;

let attribute = attributes.last().unwrap();
let expected = ScrollPayloadAttributes {
Expand Down Expand Up @@ -918,6 +937,7 @@ mod tests {
)),
database: db,
l1_provider: mock_l1_provider,
l1_v2_message_queue_start_index: u64::MAX,
batch_queue: batches.collect(),
attributes_queue: attributes,
waker: None,
Expand Down
Loading
Loading