Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 26 additions & 28 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_rpc_types_engine::ExecutionPayloadV1;
use futures::StreamExt;
use reth_chainspec::EthChainSpec;
use reth_network_api::{BlockDownloaderProvider, FullNetwork};
use reth_network_p2p::FullBlockClient;
use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient};
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_scroll_primitives::ScrollBlock;
use reth_tasks::shutdown::Shutdown;
Expand Down Expand Up @@ -1081,36 +1081,34 @@ impl<

if head_block_number == safe_block_number {
tracing::trace!(target: "scroll::chain_orchestrator", "No unsafe blocks to consolidate");
} else {
let start_block_number = safe_block_number + 1;
// TODO: Make fetching parallel but ensure concurrency limits are respected.
let mut blocks_to_validate = vec![];
for block_number in start_block_number..=head_block_number {
let block = self
.l2_client
.get_block_by_number(block_number.into())
.full()
.await?
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))?
.into_consensus()
.map_transactions(|tx| tx.inner.into_inner());
blocks_to_validate.push(block);
}

self.notify(ChainOrchestratorEvent::ChainConsolidated {
from: safe_block_number,
to: head_block_number,
});
return Ok(());
}
self.validate_l1_messages(&blocks_to_validate).await?;

let start_block_number = safe_block_number + 1;
// TODO: Make fetching parallel but ensure concurrency limits are respected.
let mut blocks_to_validate = vec![];
for block_number in start_block_number..=head_block_number {
let block = self
.l2_client
.get_block_by_number(block_number.into())
.full()
.await?
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))?
.into_consensus()
.map_transactions(|tx| tx.inner.into_inner());
blocks_to_validate.push(block);
}

self.validate_l1_messages(&blocks_to_validate).await?;
self.database
.update_l1_messages_from_l2_blocks(
blocks_to_validate.into_iter().map(|b| (&b).into()).collect(),
)
.await?;
};

self.database
.update_l1_messages_from_l2_blocks(
blocks_to_validate.into_iter().map(|b| (&b).into()).collect(),
)
.await?;
// send a notification to the network that the chain is synced such that it accepts
// transactions into the transaction pool.
self.network.handle().inner().update_sync_state(RethSyncState::Idle);

self.notify(ChainOrchestratorEvent::ChainConsolidated {
from: safe_block_number,
Expand Down
64 changes: 63 additions & 1 deletion crates/node/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_tokio_util::EventStream;
use rollup_node::{
test_utils::{
default_sequencer_test_scroll_rollup_node_config, default_test_scroll_rollup_node_config,
setup_engine,
generate_tx, setup_engine,
},
BlobProviderArgs, ChainOrchestratorArgs, ConsensusArgs, EngineDriverArgs, L1ProviderArgs,
RollupNodeDatabaseArgs, RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, RpcArgs,
Expand Down Expand Up @@ -98,6 +98,68 @@ async fn test_should_consolidate_to_block_15k() -> eyre::Result<()> {
Ok(())
}

#[allow(clippy::large_stack_frames)]
#[tokio::test]
async fn test_node_produces_block_on_startup() -> eyre::Result<()> {
reth_tracing::init_test_tracing();

let mut sequencer_node_config = default_sequencer_test_scroll_rollup_node_config();
sequencer_node_config.sequencer_args.auto_start = true;
sequencer_node_config.sequencer_args.allow_empty_blocks = false;

let (mut nodes, _tasks, wallet) =
setup_engine(sequencer_node_config, 2, (*SCROLL_DEV).clone(), false, false).await?;

let follower = nodes.pop().unwrap();
let mut follower_events =
follower.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
let follower_l1_watcher_tx = follower.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();

let sequencer = nodes.pop().unwrap();
let mut sequencer_events =
sequencer.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?;
let sequencer_l1_watcher_tx = sequencer.inner.add_ons_handle.l1_watcher_tx.clone().unwrap();

// Send a notification to the sequencer and follower nodes that the L1 watcher is synced.
sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap();
follower_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap();

// wait for both nodes to be synced.
wait_n_events(
&mut sequencer_events,
|e| matches!(e, ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ }),
1,
)
.await;
wait_n_events(
&mut follower_events,
|e| matches!(e, ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ }),
1,
)
.await;

// construct a transaction and send it to the follower node.
let wallet = Arc::new(tokio::sync::Mutex::new(wallet));
let handle = tokio::spawn(async move {
loop {
let tx = generate_tx(wallet.clone()).await;
follower.rpc.inject_tx(tx).await.unwrap();
}
});

// Assert that the follower node receives the new block.
wait_n_events(
&mut follower_events,
|e| matches!(e, ChainOrchestratorEvent::ChainExtended(_)),
1,
)
.await;

drop(handle);

Ok(())
}

/// We test if the syncing of the RN is correctly triggered and released when the EN reaches sync.
#[allow(clippy::large_stack_frames)]
#[tokio::test]
Expand Down
Loading