
Conversation

frisitano (Collaborator)

No description provided.

@frisitano frisitano marked this pull request as draft October 7, 2025 07:10
@frisitano frisitano marked this pull request as ready for review October 8, 2025 17:15

codspeed-hq bot commented Oct 8, 2025

CodSpeed Performance Report

Merging #351 will degrade performance by 97.18%

Comparing refactor/rollup-node-refactor (4309fb1) with main (6852bf6)

Summary

⚡ 1 improvement
❌ 1 regression

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark                      | BASE        | HEAD     | Change
pipeline_derive_in_file_blobs  | 27.4 ms     | 971.5 ms | -97.18%
pipeline_derive_s3_blobs       | 16,914.8 ms | 104.1 ms | ×160

@jonastheis jonastheis left a comment (Contributor)

This PR is great! It simplifies the readability and the concepts in the flow of the code so much! IMO it's much easier to reason about the state of the node than before.

A few things:

  • we should add an in-depth description of the changes, new features, and simplifications -> this will also allow us to systematically evaluate whether we have everything tested or need to add some tests later, and it will help with reviewing
  • I left a bunch of comments inline.
  • I'm a bit concerned about performance in some cases but we need to evaluate with benchmarks
  • I think this PR addresses a few issues at once; we should link them in the description above and then close those issues accordingly:

impl<
    N: FullNetwork<Primitives = ScrollNetworkPrimitives>,
    CS: ScrollHardforks + EthChainSpec + Send + Sync + 'static,
> Stream for ScrollNetworkManager<N, CS>
Contributor:

Why change this from a Stream to a Future?

ChainOrchestrator, ChainOrchestratorConfig, ChainOrchestratorHandle, Consensus, NoopConsensus,
SystemContractConsensus,
};
// use rollup_node_manager::{
Contributor:

remove

number: 0,
});
}
// if let Some(block_info) = startup_safe_block {
Contributor:

why is this commented out?

self.sequencer_args.allow_empty_blocks,
);
let engine = Engine::new(Arc::new(engine_api), fcs);
// let engine = EngineDriver::new(
Contributor:

why commented?

.stream(self.get_connection())
.await?
.map(|res| Ok(res.map(Into::into)?)))
Some(L1MessageKey::BlockNumber(block_number)) => {
Contributor:

There is a lot happening in this function; it would be great to add some comments explaining, at a high level, what is happening in each branch and why.


return Err(ChainOrchestratorError::ChainInconsistency);
// /// Wraps a pending chain orchestrator future, metering the completion of it.
// pub fn handle_metered(
Contributor:

why is this commented out?

soft_limit: usize,
}
// If the block number is greater than the current head we attempt to extend the chain.
let mut new_headers = if received_block_number > self.engine.fcs().head_block_info().number
Contributor:

Suggested change:
- let mut new_headers = if received_block_number > self.engine.fcs().head_block_info().number
+ let mut new_headers = if received_block_number > current_head_number

.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(received_block_number))?;

if current_chain_block.header.hash_slow() == received_block_hash {
tracing::debug!(target: "scroll::chain_orchestrator", ?received_block_hash, ?received_block_number, "Received block from peer that is already in the chain");
Contributor:

sure we only want to log this in debug?

// Assert that we are not reorging below the safe head.
let current_safe_info = self.engine.fcs().safe_block_info();
if received_block_number <= current_safe_info.number {
tracing::debug!(target: "scroll::chain_orchestrator", ?received_block_hash, ?received_block_number, current_safe_info = ?self.engine.fcs().safe_block_info(), "Received block from peer that would reorg below the safe head - ignoring");
Contributor:

sure we only want to log this in debug?

let mut bytes = [0u8; 1024];
rand::rng().fill(bytes.as_mut_slice());
let mut u = Unstructured::new(&bytes);
// Check if the parent hash of the received block is in the chain.
Contributor:

Isn't this just a reorg of depth 1? Shouldn't this case also be handled by the reorg logic below? I think the code flow here could be a bit clearer about which conditions are met and which path is taken, especially in the reorg case and with the fork-choice condition if block_with_peer.block.header.timestamp <= current_head.header.timestamp.

@frisitano frisitano requested a review from greged93 October 9, 2025 05:36
// If the received block number has a block number greater than the current head by more
// than the optimistic sync threshold, we optimistically sync the chain.
if received_block_number > current_head_number + self.config.optimistic_sync_threshold() {
tracing::trace!(target: "scroll::chain_orchestrator", ?received_block_number, ?current_head_number, "Received new block from peer with block number greater than current head by more than the optimistic sync threshold");
Contributor:

here we start optimistic sync but also do the other consolidation. is that intended?

// Safe head should be the highest block from batch index <= 100
assert_eq!(safe_head, Some(block_1.block_info));
// Persist the mapping of L1 messages to L2 blocks such that we can react to L1 reorgs.
let blocks = chain.iter().map(|block| block.into()).collect::<Vec<_>>();
Contributor:

Is this a valid operation in optimistic sync mode? what if the L1 messages contained in the chain are garbage?


// If we were previously in L2 syncing mode and the FCS update resulted in a valid state, we
// transition the L2 sync state to synced and consolidate the chain.
if result.is_valid() && self.sync_state.l2().is_syncing() {
Contributor:

do we need to check if the result is valid? above we already check whether it is invalid and return

// Persist the signature for the block and notify the network manager of a successful
// import.
let tx = self.database.tx_mut().await?;
tx.insert_signature(chain_head_hash, block_with_peer.signature).await?;
Contributor:

don't we already persist the signature in handle_block_from_peer?


// If the received and expected L1 messages do not match return an error.
if message_hash != expected_hash {
self.notify(ChainOrchestratorEvent::L1MessageMismatch {
Contributor:

How do we currently react to this event?

@greged93 greged93 left a comment (Collaborator)

Great refactor, this is so much easier to read and nicer to go through than the previous state of the orchestrator, and even the node in general!

Left some inline comments and a small nit.

if block_matches_attributes(
    &attributes.attributes,
    &current_block,
    current_block.parent_hash,
Collaborator:

I think this can go; this check was used before to verify that the block we received from L2 was the child block of the safe head in the Engine Driver. Here all we are doing is checking block.parent_hash == block.parent_hash.

Comment on lines 416 to 423
BlockConsolidationOutcome::Consolidated(block_info) => {
    self.insert_block(block_info, outcome.batch_info).await?;
}
BlockConsolidationOutcome::Skipped(block_info) => {
    // No action needed, the block has already been previously consolidated however
    // we will insert it again defensively
    self.insert_block(block_info, outcome.batch_info).await?;
}
Collaborator:

nit: this can be collapsed into one arm
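A minimal sketch of that nit, using an or-pattern to merge the two arms. The enum shape, the u64 block info, and the insert_block helper here are stand-ins for the PR's actual types, just to illustrate the pattern:

// Hypothetical stand-in for the PR's BlockConsolidationOutcome; both variants carry
// the block info and end up taking the same action.
enum BlockConsolidationOutcome {
    Consolidated(u64),
    Skipped(u64),
}

fn insert_block(block_info: u64) {
    println!("inserting block {block_info}");
}

fn handle(outcome: BlockConsolidationOutcome) {
    match outcome {
        // One or-pattern arm covers both cases, since the action is identical.
        BlockConsolidationOutcome::Consolidated(block_info) |
        BlockConsolidationOutcome::Skipped(block_info) => insert_block(block_info),
    }
}

fn main() {
    handle(BlockConsolidationOutcome::Consolidated(416));
    handle(BlockConsolidationOutcome::Skipped(423));
}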

Comment on lines +139 to +140
let result =
    self.client.fork_choice_updated_v1(fcs.get_alloy_fcs(), Some(attributes)).await?;
Collaborator:

small note here: I think this works in the case of Reth because payloads built from attributes are automatically inserted here.

One concern we might have, which isn't handled here but is mentioned in the OP Stack docs, is the case where the data from the batch contains invalid transaction data and the execution node fails to build a payload. I believe in this case the result we get here would be valid, but trying to call get_payload(id) would return an error.
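A small self-contained sketch of that failure mode, with stub types standing in for the real engine client (these are not the PR's or Reth's actual API): the fork-choice update can report a valid status while the later payload fetch still fails, so the caller has to handle that error path.

// Illustrative stand-ins only.
struct PayloadId(u64);
struct Payload;

struct EngineClient;

impl EngineClient {
    // Stand-in for fork_choice_updated_v1 with attributes: reports VALID and hands
    // back a payload id.
    fn fork_choice_updated(&self) -> Result<PayloadId, String> {
        Ok(PayloadId(139))
    }

    // Stand-in for get_payload: this is where invalid batch transaction data could
    // surface as an error even though the FCU above was VALID.
    fn get_payload(&self, _id: PayloadId) -> Result<Payload, String> {
        Err("payload building failed: invalid transaction data".to_string())
    }
}

fn main() {
    let client = EngineClient;
    let id = client.fork_choice_updated().expect("FCU reported VALID");
    // The point of the comment: a VALID FCU does not guarantee a buildable payload,
    // so this error must be handled rather than unwrapped.
    match client.get_payload(id) {
        Ok(_) => println!("payload ready"),
        Err(err) => eprintln!("payload build failed, handle gracefully: {err}"),
    }
}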

Comment on lines +43 to +47
let messages = if let Some(stream) = tx.get_l1_messages(Some(start)).await? {
    stream.take(n as usize).try_collect().await?
} else {
    vec![]
};
Collaborator:

I'm worried this could lead to unwanted behavior: we might never include L1 messages because of a database issue, but we would never bubble up the error.
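A minimal sketch of the alternative being suggested: turn the "no stream" case into an error that bubbles up instead of silently yielding an empty vec. The error variant and function signatures are hypothetical, not the PR's.

#[derive(Debug)]
enum DbError {
    // Hypothetical variant: signals that the L1 message stream could not be opened.
    MissingL1MessageStream,
}

// Stand-in for the database call; `None` models the case the comment worries about.
fn get_l1_messages(start: usize, all: &[u64]) -> Option<Vec<u64>> {
    if start <= all.len() {
        Some(all[start..].to_vec())
    } else {
        None
    }
}

fn take_l1_messages(start: usize, n: usize, all: &[u64]) -> Result<Vec<u64>, DbError> {
    // `ok_or` makes the missing-stream case an explicit error the caller must handle,
    // instead of defaulting to an empty vec and silently skipping L1 messages.
    let messages = get_l1_messages(start, all).ok_or(DbError::MissingL1MessageStream)?;
    Ok(messages.into_iter().take(n).collect())
}

fn main() {
    assert_eq!(take_l1_messages(1, 2, &[10, 11, 12]).unwrap(), vec![11, 12]);
    assert!(take_l1_messages(5, 2, &[10, 11, 12]).is_err());
}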

Comment on lines +253 to +261
// If there is an inflight payload building job, poll it.
if let Some(payload_building_job) = this.payload_building_job.as_mut() {
    match payload_building_job.future.as_mut().poll(cx) {
        Poll::Ready(payload_id) => {
            this.payload_building_job = None;
            return Poll::Ready(Some(SequencerEvent::PayloadReady(payload_id)));
        }
        Poll::Pending => {}
    }
Collaborator:

Should the payload_building_job have higher priority in the polling order? If the payload is ready and the trigger is as well, the current order means we skip the next slot. If we invert them, we would return the payload to the chain orchestrator and catch the trigger on the next poll (might be a little late, but at least we wouldn't completely miss it).
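A stand-in sketch of the ordering suggested here: poll the in-flight payload job before checking the trigger, so a finished payload is returned first and the trigger is picked up on the next poll instead of the slot being skipped. All types and names below are illustrative, not the sequencer's actual code.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

enum SequencerEvent {
    PayloadReady(u64),
    NewSlot,
}

struct Sequencer {
    payload_building_job: Option<Pin<Box<dyn Future<Output = u64> + Send>>>,
    // Stand-in for the slot trigger (a timer/interval in the real code).
    trigger_ready: bool,
}

impl Sequencer {
    fn poll_next_event(&mut self, cx: &mut Context<'_>) -> Poll<Option<SequencerEvent>> {
        // 1. Poll the in-flight payload job first: a ready payload is handed to the
        //    chain orchestrator immediately.
        if let Some(job) = self.payload_building_job.as_mut() {
            if let Poll::Ready(payload_id) = job.as_mut().poll(cx) {
                self.payload_building_job = None;
                return Poll::Ready(Some(SequencerEvent::PayloadReady(payload_id)));
            }
        }
        // 2. Only then check the trigger; if both were ready, the trigger is caught on
        //    the next poll (slightly late) rather than the slot being skipped.
        if self.trigger_ready {
            self.trigger_ready = false;
            return Poll::Ready(Some(SequencerEvent::NewSlot));
        }
        Poll::Pending
    }
}

fn main() {
    // Construction only; driving the poll loop needs a runtime/waker, omitted here.
    let _sequencer = Sequencer { payload_building_job: None, trigger_ready: false };
}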
