Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/chain-orchestrator/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ pub enum ChainOrchestratorEvent {
/// A batch has been finalized returning an optional finalized L2 block. Also returns a
/// [`BatchInfo`] if the finalized event occurred in a finalized L1 block.
BatchFinalized(Option<WithFinalizedBlockNumber<BatchInfo>>, Option<BlockInfo>),
/// An L1 block has been finalized returning the L1 block number, the list of finalized batches
/// and an optional finalized L2 block.
L1BlockFinalized(u64, Vec<BatchInfo>, Option<BlockInfo>),
/// An L1 block has been finalized returning the L1 block number and the list of finalized
/// batches.
L1BlockFinalized(u64, Vec<BatchInfo>),
/// A `L1Message` event has been committed returning the message queue index.
L1MessageCommitted(u64),
/// A reorg has occurred on L1, returning the L1 block number of the new L1 head,
Expand Down
82 changes: 8 additions & 74 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_network_p2p::{BlockClient, BodiesClient};
use reth_scroll_primitives::ScrollBlock;
use rollup_node_primitives::{
BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ChainImport, L1MessageEnvelope,
L2BlockInfoWithL1Messages, WithBlockNumber,
L2BlockInfoWithL1Messages,
};
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
Expand All @@ -21,7 +21,6 @@ use scroll_db::{Database, DatabaseError, DatabaseOperations, L1MessageStart, Unw
use scroll_network::NewBlockWithPeer;
use std::{
collections::{HashMap, VecDeque},
ops::Add,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
Expand Down Expand Up @@ -67,8 +66,6 @@ pub struct ChainOrchestrator<ChainSpec, BC, P> {
pending_futures: VecDeque<ChainOrchestratorFuture>,
/// The block number of the L1 finalized block.
l1_finalized_block_number: Arc<AtomicU64>,
/// The block number of the L2 finalized block.
l2_finalized_block_number: Arc<AtomicU64>,
/// The chain specification for the chain orchestrator.
chain_spec: Arc<ChainSpec>,
/// The metrics for the chain orchestrator.
Expand Down Expand Up @@ -109,7 +106,6 @@ impl<
database,
pending_futures: Default::default(),
l1_finalized_block_number: Arc::new(AtomicU64::new(0)),
l2_finalized_block_number: Arc::new(AtomicU64::new(0)),
chain_spec,
metrics: ChainOrchestratorItem::iter()
.map(|i| {
Expand Down Expand Up @@ -529,7 +525,6 @@ impl<
self.database.clone(),
block_number,
self.l1_finalized_block_number.clone(),
self.l2_finalized_block_number.clone(),
)),
))
}
Expand All @@ -551,16 +546,13 @@ impl<
)),
))
}
L1Notification::BatchFinalization { hash, index, block_number } => {
L1Notification::BatchFinalization { hash: _hash, index, block_number } => {
ChainOrchestratorFuture::HandleBatchFinalization(self.handle_metered(
ChainOrchestratorItem::BatchFinalization,
Box::pin(Self::handle_batch_finalization(
self.database.clone(),
hash,
index,
block_number,
self.l1_finalized_block_number.clone(),
self.l2_finalized_block_number.clone(),
)),
))
}
Expand Down Expand Up @@ -615,35 +607,18 @@ impl<
database: Arc<Database>,
block_number: u64,
l1_block_number: Arc<AtomicU64>,
l2_block_number: Arc<AtomicU64>,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
// Set the latest finalized L1 block in the database.
database.set_latest_finalized_l1_block_number(block_number).await?;

// get the finalized batch infos.
// we add 1 to the low finalized l1 block number to avoid fetching the last finalized batch
// a second time.
let low_finalized_l1_block_number =
l1_block_number.load(Ordering::Relaxed).add(1).max(block_number);
let finalized_batches = database
.get_batches_by_finalized_block_range(low_finalized_l1_block_number, block_number)
.await?;

// get the finalized block for the batch.
let finalized_block = if let Some(info) = finalized_batches.last() {
Self::fetch_highest_finalized_block(database, info.hash, l2_block_number).await?
} else {
None
};
// Get all unprocessed batches that have been finalized by this L1 block finalization.
let finalized_batches =
database.fetch_and_update_unprocessed_finalized_batches(block_number).await?;

// update the chain orchestrator l1 block number.
// Update the chain orchestrator L1 block number.
l1_block_number.store(block_number, Ordering::Relaxed);

Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(
block_number,
finalized_batches,
finalized_block,
)))
Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_batches)))
}

/// Handles an L1 message by inserting it into the database.
Expand Down Expand Up @@ -715,54 +690,13 @@ impl<
/// Handles a batch finalization event by updating the batch input in the database.
async fn handle_batch_finalization(
database: Arc<Database>,
batch_hash: B256,
batch_index: u64,
block_number: u64,
l1_block_number: Arc<AtomicU64>,
l2_block_number: Arc<AtomicU64>,
) -> Result<Option<ChainOrchestratorEvent>, ChainOrchestratorError> {
// finalize all batches up to `batch_index`.
database.finalize_batches_up_to_index(batch_index, block_number).await?;

let mut finalized_block = None;
let mut finalized_batch = None;

// check if the block where the batch was finalized is finalized on L1.
let l1_block_number_value = l1_block_number.load(Ordering::Relaxed);
if l1_block_number_value >= block_number {
// fetch the finalized block.
finalized_block =
Self::fetch_highest_finalized_block(database, batch_hash, l2_block_number).await?;

// set the finalized batch info.
finalized_batch =
Some(WithBlockNumber::new(block_number, BatchInfo::new(batch_index, batch_hash)));
}

let event = ChainOrchestratorEvent::BatchFinalized(finalized_batch, finalized_block);
Ok(Some(event))
}

/// Returns the highest finalized block for the provided batch hash. Will return [`None`] if the
/// block number has already been seen by the chain orchestrator.
async fn fetch_highest_finalized_block(
database: Arc<Database>,
batch_hash: B256,
l2_block_number: Arc<AtomicU64>,
) -> Result<Option<BlockInfo>, ChainOrchestratorError> {
let finalized_block = database.get_highest_block_for_batch_hash(batch_hash).await?;

// only return the block if the chain orchestrator hasn't seen it.
// in which case also update the `l2_finalized_block_number` value.
Ok(finalized_block.filter(|info| {
let current_l2_block_number = l2_block_number.load(Ordering::Relaxed);
if info.number > current_l2_block_number {
l2_block_number.store(info.number, Ordering::Relaxed);
true
} else {
false
}
}))
Ok(None)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ mod test {

// Fetch the finalized batch for provided height and verify number.
let batch_infos = db
.get_batches_by_finalized_block_range(100, 110)
.fetch_and_update_unprocessed_finalized_batches(110)
.await
.unwrap()
.into_iter()
Expand Down
2 changes: 2 additions & 0 deletions crates/database/db/src/models/batch_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct Model {
calldata: Vec<u8>,
blob_hash: Option<Vec<u8>>,
pub(crate) finalized_block_number: Option<i64>,
processed: bool,
}

/// The relation for the batch input model.
Expand Down Expand Up @@ -50,6 +51,7 @@ impl From<BatchCommitData> for ActiveModel {
calldata: ActiveValue::Set(batch_commit.calldata.0.to_vec()),
blob_hash: ActiveValue::Set(batch_commit.blob_versioned_hash.map(|b| b.to_vec())),
finalized_block_number: ActiveValue::Unchanged(None),
processed: ActiveValue::Unchanged(false),
}
}
}
Expand Down
49 changes: 26 additions & 23 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,19 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
.map(|x| x.and_then(|x| x.parse::<u64>().ok()))?)
}

/// Get the finalized batches between the provided range \[low; high\].
async fn get_batches_by_finalized_block_range(
/// Fetches unprocessed batches up to the provided finalized L1 block number and updates their
/// status.
async fn fetch_and_update_unprocessed_finalized_batches(
&self,
low: u64,
high: u64,
finalized_l1_block_number: u64,
) -> Result<Vec<BatchInfo>, DatabaseError> {
Ok(models::batch_commit::Entity::find()
.filter(
Condition::all()
.add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null())
.add(models::batch_commit::Column::FinalizedBlockNumber.gte(low))
.add(models::batch_commit::Column::FinalizedBlockNumber.lte(high)),
)
let conditions = Condition::all()
.add(models::batch_commit::Column::FinalizedBlockNumber.is_not_null())
.add(models::batch_commit::Column::FinalizedBlockNumber.lte(finalized_l1_block_number))
.add(models::batch_commit::Column::Processed.eq(false));

let batches = models::batch_commit::Entity::find()
.filter(conditions.clone())
.order_by_asc(models::batch_commit::Column::Index)
.select_only()
.column(models::batch_commit::Column::Index)
Expand All @@ -131,7 +131,15 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
x.into_iter()
.map(|(index, hash)| BatchInfo::new(index as u64, B256::from_slice(&hash)))
.collect()
})?)
})?;

models::batch_commit::Entity::update_many()
.col_expr(models::batch_commit::Column::Processed, Expr::value(true))
.filter(conditions)
.exec(self.get_connection())
.await?;

Ok(batches)
}

/// Delete all [`BatchCommitData`]s with a block number greater than the provided block number.
Expand Down Expand Up @@ -358,27 +366,22 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
let finalized_block_number = self.get_finalized_l1_block_number().await?.unwrap_or(0);
self.unwind(genesis_hash, finalized_block_number).await?;

// Fetch the latest safe L2 block and the block number where its associated batch was
// finalized.
// Delete all unprocessed batches from the database and return starting l2 safe head and l1
// head.
let safe = if let Some(batch_info) = self
.get_latest_safe_l2_info()
.await?
.map(|(_, batch_info)| batch_info)
.filter(|b| b.index > 1)
{
let batch = self
.get_batch_by_index(batch_info.index)
.await?
.expect("Batch info must be present due to database query arguments");
let previous_batch_index = batch_info.index - 1;
let previous_batch = self
.get_batch_by_index(batch_info.index - 1)
.get_batch_by_index(previous_batch_index)
.await?
.expect("Batch info must be present due to database query arguments");
self.delete_batches_gt_batch_index(previous_batch_index).await?;
let l2_block = self.get_highest_block_for_batch_hash(previous_batch.hash).await?;
(
l2_block,
Some(batch.finalized_block_number.expect("All blocks in database are finalized")),
)
(l2_block, Some(previous_batch.block_number + 1))
} else {
(None, None)
};
Expand Down
2 changes: 2 additions & 0 deletions crates/database/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod m20250411_072004_add_l2_block;
mod m20250616_223947_add_metadata;
mod m20250825_093350_remove_unsafe_l2_blocks;
mod m20250829_042803_add_table_indexes;
mod m20250901_102341_add_commit_batch_processed_column;
mod migration_info;
pub use migration_info::{
MigrationInfo, ScrollDevMigrationInfo, ScrollMainnetMigrationInfo, ScrollSepoliaMigrationInfo,
Expand All @@ -27,6 +28,7 @@ impl<MI: MigrationInfo + Send + Sync + 'static> MigratorTrait for Migrator<MI> {
Box::new(m20250616_223947_add_metadata::Migration),
Box::new(m20250825_093350_remove_unsafe_l2_blocks::Migration),
Box::new(m20250829_042803_add_table_indexes::Migration),
Box::new(m20250901_102341_add_commit_batch_processed_column::Migration),
]
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ pub(crate) enum BatchCommit {
Calldata,
BlobHash,
FinalizedBlockNumber,
Processed,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use super::m20220101_000001_create_batch_commit_table::BatchCommit;
use sea_orm::Statement;
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// Add the processed column to the batch_commit table.
manager
.alter_table(
Table::alter()
.table(BatchCommit::Table)
.add_column(
ColumnDef::new(BatchCommit::Processed).boolean().not_null().default(false),
)
.to_owned(),
)
.await?;

// Backfill the processed column using data sourced from the l2_block table.
manager
.get_connection()
.execute(Statement::from_sql_and_values(
manager.get_database_backend(),
r#"
UPDATE batch_commit
SET processed = 1
WHERE hash IN (SELECT batch_hash FROM l2_block);
"#,
vec![],
))
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// drop the processed column on the batch_commit table.
manager
.alter_table(
Table::alter()
.table(BatchCommit::Table)
.drop_column(BatchCommit::Processed)
.to_owned(),
)
.await
}
}
3 changes: 2 additions & 1 deletion crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ impl ScrollRollupNodeConfig {
});
}

tracing::info!(target: "scroll::node::args", fcs = ?fcs, payload_building_duration = ?self.sequencer_args.payload_building_duration, "Starting engine driver");
let engine = EngineDriver::new(
Arc::new(engine_api),
chain_spec.clone(),
Expand All @@ -256,7 +257,7 @@ impl ScrollRollupNodeConfig {

let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
if let Some(provider) = l1_provider.filter(|_| !self.test) {
// Determine the start block number for the L1 watcher
tracing::info!(target: "scroll::node::args", ?l1_start_block_number, "Starting L1 watcher");
(None, Some(L1Watcher::spawn(provider, l1_start_block_number, node_config).await))
} else {
// Create a channel for L1 notifications that we can use to inject L1 messages for
Expand Down
8 changes: 0 additions & 8 deletions crates/node/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,15 +978,7 @@ async fn graceful_shutdown_consolidates_most_recent_batch_on_startup() -> eyre::
let mut rnm_events = handle.get_event_listener().await?;

// Send the second batch again to mimic the watcher behaviour.
l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_0_data.clone()))).await?;
l1_notification_tx.send(Arc::new(L1Notification::BatchCommit(batch_1_data.clone()))).await?;
l1_notification_tx
.send(Arc::new(L1Notification::BatchFinalization {
hash: batch_0_data.hash,
index: batch_0_data.index,
block_number: batch_0_data.block_number,
}))
.await?;
l1_notification_tx
.send(Arc::new(L1Notification::BatchFinalization {
hash: batch_1_data.hash,
Expand Down
Loading