Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 23 additions & 35 deletions crates/database/db/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,18 +320,6 @@ impl DatabaseWriteOperations for Database {
)
}

async fn insert_block(
&self,
block_info: BlockInfo,
batch_info: BatchInfo,
) -> Result<(), DatabaseError> {
metered!(
DatabaseOperation::InsertBlock,
self,
tx_mut(move |tx| async move { tx.insert_block(block_info, batch_info).await })
)
}

async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> {
metered!(
DatabaseOperation::InsertGenesisBlock,
Expand All @@ -354,16 +342,16 @@ impl DatabaseWriteOperations for Database {
)
}

async fn update_l1_messages_with_l2_block(
async fn update_l1_messages_with_l2_blocks(
&self,
block_info: L2BlockInfoWithL1Messages,
blocks: Vec<L2BlockInfoWithL1Messages>,
) -> Result<(), DatabaseError> {
metered!(
DatabaseOperation::UpdateL1MessagesWithL2Block,
self,
tx_mut(move |tx| {
let block_info = block_info.clone();
async move { tx.update_l1_messages_with_l2_block(block_info).await }
let blocks = blocks.clone();
async move { tx.update_l1_messages_with_l2_blocks(blocks).await }
})
)
}
Expand Down Expand Up @@ -854,12 +842,14 @@ mod test {
let batch_info: BatchInfo = data.clone().into();
db.insert_batch(data).await.unwrap();

let mut blocks = Vec::new();
for _ in 0..10 {
let block_info =
BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() };
db.insert_block(block_info, batch_info).await.unwrap();
block_number += 1;
blocks.push(block_info);
}
db.insert_blocks(blocks, batch_info).await.unwrap();

// Fetch the highest block for the batch hash and verify number.
let highest_block_info =
Expand Down Expand Up @@ -893,12 +883,14 @@ mod test {
db.insert_batch(first_batch).await.unwrap();
db.insert_batch(second_batch).await.unwrap();

let mut blocks = Vec::new();
for _ in 0..10 {
let block_info =
BlockInfo { number: block_number, hash: B256::arbitrary(&mut u).unwrap() };
db.insert_block(block_info, first_batch_info).await.unwrap();
block_number += 1;
blocks.push(block_info);
}
db.insert_blocks(blocks, first_batch_info).await.unwrap();

// Fetch the highest block for the batch hash and verify number.
let highest_block_info =
Expand Down Expand Up @@ -1136,10 +1128,9 @@ mod test {
let mut block_infos = Vec::new();
for i in 200..205 {
let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() };
let l2_block = block_info;
block_infos.push(block_info);
db.insert_block(l2_block, batch_info).await.unwrap();
}
db.insert_blocks(block_infos.clone(), batch_info).await.unwrap();

// Test getting existing blocks
for expected_block in block_infos {
Expand Down Expand Up @@ -1177,9 +1168,7 @@ mod test {
let safe_block_1 = BlockInfo { number: 200, hash: B256::arbitrary(&mut u).unwrap() };
let safe_block_2 = BlockInfo { number: 201, hash: B256::arbitrary(&mut u).unwrap() };

db.insert_block(safe_block_1, batch_info).await.unwrap();

db.insert_block(safe_block_2, batch_info).await.unwrap();
db.insert_blocks(vec![safe_block_1, safe_block_2], batch_info).await.unwrap();

// Should return the highest safe block (block 201)
let latest_safe = db.get_latest_safe_l2_info().await.unwrap();
Expand All @@ -1198,11 +1187,12 @@ mod test {

// Insert multiple L2 blocks with batch info
let batch_info = BatchInfo { index: 0, hash: B256::default() };
let mut blocks = Vec::new();
for i in 400..410 {
let block_info = BlockInfo { number: i, hash: B256::arbitrary(&mut u).unwrap() };

db.insert_block(block_info, batch_info).await.unwrap();
blocks.push(block_info);
}
db.insert_blocks(blocks, batch_info).await.unwrap();

// Delete blocks with number > 405
let deleted_count = db.delete_l2_blocks_gt_block_number(405).await.unwrap();
Expand Down Expand Up @@ -1245,10 +1235,9 @@ mod test {
for i in 100..110 {
let batch_data = db.get_batch_by_index(i).await.unwrap().unwrap();
let batch_info: BatchInfo = batch_data.into();

let block_info = BlockInfo { number: 500 + i, hash: B256::arbitrary(&mut u).unwrap() };

db.insert_block(block_info, batch_info).await.unwrap();
db.insert_blocks(vec![block_info], batch_info).await.unwrap();
}

// Delete L2 blocks with batch index > 105
Expand Down Expand Up @@ -1304,8 +1293,8 @@ mod test {
L2BlockInfoWithL1Messages { block_info, l1_messages: l1_message_hashes.clone() };

// Insert block
db.insert_block(l2_block.block_info, batch_info).await.unwrap();
db.update_l1_messages_with_l2_block(l2_block).await.unwrap();
db.insert_blocks(vec![l2_block.block_info], batch_info).await.unwrap();
db.update_l1_messages_with_l2_blocks(vec![l2_block]).await.unwrap();

// Verify block was inserted
let retrieved_block = db.get_l2_block_info_by_number(500).await.unwrap();
Expand Down Expand Up @@ -1340,7 +1329,7 @@ mod test {

// Insert initial block
let block_info = BlockInfo { number: 600, hash: B256::arbitrary(&mut u).unwrap() };
db.insert_block(block_info, batch_info_1).await.unwrap();
db.insert_blocks(vec![block_info], batch_info_1).await.unwrap();

// Verify initial insertion
let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap();
Expand All @@ -1359,7 +1348,7 @@ mod test {
assert_eq!(initial_batch_info, batch_info_1);

// Update the same block with different batch info (upsert)
db.insert_block(block_info, batch_info_2).await.unwrap();
db.insert_blocks(vec![block_info], batch_info_2).await.unwrap();

// Verify the block still exists and was updated
let retrieved_block = db.get_l2_block_info_by_number(600).await.unwrap().unwrap();
Expand Down Expand Up @@ -1393,23 +1382,22 @@ mod test {
let block_1 = BlockInfo { number: 1, hash: B256::arbitrary(&mut u).unwrap() };
let block_2 = BlockInfo { number: 2, hash: B256::arbitrary(&mut u).unwrap() };
db.insert_batch(batch_data_1.clone()).await.unwrap();
db.insert_block(block_1, batch_data_1.clone().into()).await.unwrap();
db.insert_block(block_2, batch_data_1.clone().into()).await.unwrap();
db.insert_blocks(vec![block_1, block_2], batch_data_1.clone().into()).await.unwrap();

// Insert batch 2 and associate it with one block in the database
let batch_data_2 =
BatchCommitData { index: 2, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() };
let block_3 = BlockInfo { number: 3, hash: B256::arbitrary(&mut u).unwrap() };
db.insert_batch(batch_data_2.clone()).await.unwrap();
db.insert_block(block_3, batch_data_2.clone().into()).await.unwrap();
db.insert_blocks(vec![block_3], batch_data_2.clone().into()).await.unwrap();

// Insert batch 3 produced at the same block number as batch 2 and associate it with one
// block
let batch_data_3 =
BatchCommitData { index: 3, block_number: 20, ..Arbitrary::arbitrary(&mut u).unwrap() };
let block_4 = BlockInfo { number: 4, hash: B256::arbitrary(&mut u).unwrap() };
db.insert_batch(batch_data_3.clone()).await.unwrap();
db.insert_block(block_4, batch_data_3.clone().into()).await.unwrap();
db.insert_blocks(vec![block_4], batch_data_3.clone().into()).await.unwrap();

db.set_finalized_l1_block_number(21).await.unwrap();

Expand Down
117 changes: 67 additions & 50 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rollup_node_primitives::{
};
use scroll_alloy_rpc_types_engine::BlockDataHint;
use sea_orm::{
sea_query::{Expr, OnConflict},
sea_query::{CaseStatement, Expr, OnConflict},
ColumnTrait, Condition, DbErr, EntityTrait, QueryFilter, QueryOrder, QuerySelect,
};
use std::fmt;
Expand Down Expand Up @@ -99,13 +99,6 @@ pub trait DatabaseWriteOperations {
batch_info: BatchInfo,
) -> Result<(), DatabaseError>;

/// Insert a new block in the database.
async fn insert_block(
&self,
block_info: BlockInfo,
batch_info: BatchInfo,
) -> Result<(), DatabaseError>;

/// Insert the genesis block into the database.
async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError>;

Expand All @@ -116,9 +109,9 @@ pub trait DatabaseWriteOperations {
) -> Result<(), DatabaseError>;

/// Update the executed L1 messages with the provided L2 block number in the database.
async fn update_l1_messages_with_l2_block(
async fn update_l1_messages_with_l2_blocks(
&self,
block_info: L2BlockInfoWithL1Messages,
block_info: Vec<L2BlockInfoWithL1Messages>,
) -> Result<(), DatabaseError>;

/// Purge all L1 message to L2 block mappings from the database for blocks greater or equal to
Expand Down Expand Up @@ -426,29 +419,18 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
&self,
blocks: Vec<BlockInfo>,
batch_info: BatchInfo,
) -> Result<(), DatabaseError> {
for block in blocks {
self.insert_block(block, batch_info).await?;
}
Ok(())
}

async fn insert_block(
&self,
block_info: BlockInfo,
batch_info: BatchInfo,
) -> Result<(), DatabaseError> {
// We only insert safe blocks into the database, we do not persist unsafe blocks.
tracing::trace!(
target: "scroll::db",
batch_hash = ?batch_info.hash,
batch_index = batch_info.index,
block_number = block_info.number,
block_hash = ?block_info.hash,
"Inserting block into database."
blocks = ?blocks,
"Inserting blocks into database."
);
let l2_block: models::l2_block::ActiveModel = (block_info, batch_info).into();
models::l2_block::Entity::insert(l2_block)
let l2_blocks: Vec<models::l2_block::ActiveModel> =
blocks.into_iter().map(|b| (b, batch_info).into()).collect();
models::l2_block::Entity::insert_many(l2_blocks)
.on_conflict(
OnConflict::column(models::l2_block::Column::BlockNumber)
.update_columns([
Expand All @@ -458,6 +440,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
])
.to_owned(),
)
.on_empty_do_nothing()
.exec(self.get_connection())
.await?;

Expand All @@ -467,7 +450,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
async fn insert_genesis_block(&self, genesis_hash: B256) -> Result<(), DatabaseError> {
let genesis_block = BlockInfo::new(0, genesis_hash);
let genesis_batch = BatchInfo::new(0, B256::ZERO);
self.insert_block(genesis_block, genesis_batch).await
self.insert_blocks(vec![genesis_block], genesis_batch).await
}

async fn update_l1_messages_from_l2_blocks(
Expand All @@ -481,31 +464,63 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
.await?;

// Then, update the executed L1 messages for each block.
for block in blocks {
self.update_l1_messages_with_l2_block(block).await?;
}
self.update_l1_messages_with_l2_blocks(blocks).await?;

Ok(())
}

async fn update_l1_messages_with_l2_block(
async fn update_l1_messages_with_l2_blocks(
&self,
block_info: L2BlockInfoWithL1Messages,
blocks: Vec<L2BlockInfoWithL1Messages>,
) -> Result<(), DatabaseError> {
tracing::trace!(
target: "scroll::db",
block_number = block_info.block_info.number,
l1_messages = ?block_info.l1_messages,
"Updating executed L1 messages from block with L2 block number in the database."
);
models::l1_message::Entity::update_many()
.col_expr(
models::l1_message::Column::L2BlockNumber,
if blocks.is_empty() {
return Ok(());
}
let start = blocks.first().unwrap().block_info.number;
let end = blocks.last().unwrap().block_info.number;
tracing::trace!(target: "scroll::db", start_block = start, end_block = end, "Updating executed L1 messages from blocks with L2 block number in the database.");

let mut case = CaseStatement::new();
let mut all_hashes = Vec::new();

for block_info in blocks {
if block_info.l1_messages.is_empty() {
continue;
}

tracing::trace!(
target: "scroll::db",
block_number = block_info.block_info.number,
l1_messages = ?block_info.l1_messages,
"Including L1 messages from block in batch update."
);

let hashes: Vec<Vec<u8>> = block_info.l1_messages.iter().map(|x| x.to_vec()).collect();

case = case.case(
models::l1_message::Column::Hash.is_in(hashes.clone()),
Expr::value(block_info.block_info.number as i64),
)
.filter(
models::l1_message::Column::Hash
.is_in(block_info.l1_messages.iter().map(|x| x.to_vec())),
)
);

all_hashes.extend(hashes);
}

if all_hashes.is_empty() {
return Ok(());
}

// query translates to the following sql:
// UPDATE l1_message
// SET l2_block_number = CASE
// WHEN hash IN (block1_hashes) THEN block1_number
// WHEN hash IN (block2_hashes) THEN block2_number
// WHEN hash IN (block3_hashes) THEN block3_number
// ELSE 0
// END
// WHERE hash IN (all_hashes)
models::l1_message::Entity::update_many()
.col_expr(models::l1_message::Column::L2BlockNumber, case.into())
.filter(models::l1_message::Column::Hash.is_in(all_hashes))
.exec(self.get_connection())
.await?;

Expand Down Expand Up @@ -539,10 +554,12 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
&self,
outcome: BatchConsolidationOutcome,
) -> Result<(), DatabaseError> {
for block in outcome.blocks {
self.insert_block(block.block_info, outcome.batch_info).await?;
self.update_l1_messages_with_l2_block(block).await?;
}
self.insert_blocks(
outcome.blocks.iter().map(|b| b.block_info).collect(),
outcome.batch_info,
)
.await?;
self.update_l1_messages_with_l2_blocks(outcome.blocks).await?;
self.update_skipped_l1_messages(outcome.skipped_l1_messages).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sequencer/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ async fn can_build_blocks_with_finalized_l1_messages() {
assert!(!block.body.transactions.iter().any(|tx| tx.tx_hash() == &unfinalized_message_hash));

// Handle the build block with the sequencer in order to update L1 message queue index.
database.update_l1_messages_with_l2_block((&block).into()).await.unwrap();
database.update_l1_messages_with_l2_blocks(vec![(&block).into()]).await.unwrap();

// update finalized block number to 3, now both messages should be available
database.set_finalized_l1_block_number(3).await.unwrap();
Expand Down