Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/chain-orchestrator/src/handle/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use rollup_node_primitives::{BlockInfo, L1MessageEnvelope};
use scroll_db::L1MessageKey;
use scroll_network::ScrollNetworkHandle;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -35,5 +36,5 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
#[derive(Debug)]
pub enum DatabaseQuery {
/// Get L1 message by its index.
GetL1MessageByIndex(u64, oneshot::Sender<Option<L1MessageEnvelope>>),
GetL1MessageByKey(L1MessageKey, oneshot::Sender<Option<L1MessageEnvelope>>),
}
7 changes: 4 additions & 3 deletions crates/chain-orchestrator/src/handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use rollup_node_primitives::{BlockInfo, L1MessageEnvelope};
use scroll_db::L1MessageKey;
use scroll_network::ScrollNetworkHandle;
use tokio::sync::{mpsc, oneshot};
use tracing::error;
Expand Down Expand Up @@ -91,13 +92,13 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
}

/// Get an L1 message by its index.
pub async fn get_l1_message_by_index(
pub async fn get_l1_message_by_key(
&self,
index: u64,
key: L1MessageKey,
) -> Result<Option<L1MessageEnvelope>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
self.send_command(ChainOrchestratorCommand::DatabaseQuery(
DatabaseQuery::GetL1MessageByIndex(index, tx),
DatabaseQuery::GetL1MessageByKey(key, tx),
));
rx.await
}
Expand Down
9 changes: 3 additions & 6 deletions crates/chain-orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,12 +379,9 @@ impl<
}
}
ChainOrchestratorCommand::DatabaseQuery(query) => match query {
DatabaseQuery::GetL1MessageByIndex(index, sender) => {
let l1_message = self
.database
.get_n_l1_messages(Some(L1MessageKey::from_queue_index(index)), 1)
.await?
.pop();
DatabaseQuery::GetL1MessageByKey(l1_message_key, sender) => {
let l1_message =
self.database.get_n_l1_messages(Some(l1_message_key), 1).await?.pop();
let _ = sender.send(l1_message);
}
},
Expand Down
1 change: 1 addition & 0 deletions crates/database/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures.workspace = true
metrics.workspace = true
metrics-derive.workspace = true
sea-orm = { workspace = true, features = ["sqlx-sqlite", "runtime-tokio-native-tls", "macros"] }
serde.workspace = true
serde_json.workspace = true
tempfile = { version = "3.20.0", optional = true }
thiserror.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/database/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use models::*;

mod operations;
pub use operations::{
DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, NotIncludedStart, UnwindResult,
DatabaseReadOperations, DatabaseWriteOperations, L1MessageKey, NotIncludedKey, UnwindResult,
};

pub use sea_orm::EntityTrait;
Expand Down
89 changes: 80 additions & 9 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl<T: WriteConnectionProvider + ?Sized + Sync> DatabaseWriteOperations for T {
let Some((block_info, batch_info)) =
self.get_latest_safe_l2_info().await?.filter(|(block_info, _)| block_info.number > 0)
else {
return Ok((None, None))
return Ok((None, None));
};
let batch = self.get_batch_by_index(batch_info.index).await?.expect("batch must exist");
Ok((Some(block_info), Some(batch.block_number.saturating_add(1))))
Expand Down Expand Up @@ -864,7 +864,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
// Provides a stream over all L1 messages with increasing queue index starting that have
// not been included in an L2 block and have a block number less than or equal to the
// finalized L1 block number (they have been finalized on L1).
Some(L1MessageKey::NotIncluded(NotIncludedStart::FinalizedWithBlockDepth(depth))) => {
Some(L1MessageKey::NotIncluded(NotIncludedKey::FinalizedWithBlockDepth(depth))) => {
// Lookup the finalized L1 block number.
let finalized_block_number = self.get_finalized_l1_block_number().await?;

Expand Down Expand Up @@ -898,7 +898,7 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
// included in an L2 block and have a block number less than or equal to the
// latest L1 block number minus the provided depth (they have been sufficiently deep
// on L1 to be considered safe to include - reorg risk is low).
Some(L1MessageKey::NotIncluded(NotIncludedStart::BlockDepth(depth))) => {
Some(L1MessageKey::NotIncluded(NotIncludedKey::BlockDepth(depth))) => {
// Lookup the latest L1 block number.
let latest_block_number = self.get_latest_l1_block_number().await?;

Expand Down Expand Up @@ -1055,7 +1055,8 @@ impl<T: ReadConnectionProvider + Sync + ?Sized> DatabaseReadOperations for T {
/// A key for an L1 message stored in the database.
///
/// It can either be the queue index, queue hash or the transaction hash.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum L1MessageKey {
/// The queue index of the message.
QueueIndex(u64),
Expand All @@ -1066,7 +1067,7 @@ pub enum L1MessageKey {
/// Start from the first message for the provided block number.
BlockNumber(u64),
/// Start from messages that have not been included in a block yet.
NotIncluded(NotIncludedStart),
NotIncluded(NotIncludedKey),
}

impl L1MessageKey {
Expand All @@ -1093,13 +1094,15 @@ impl L1MessageKey {

/// This type defines where to start when fetching L1 messages that have not been included in a
/// block yet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NotIncludedStart {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum NotIncludedKey {
/// Start from finalized messages that have not been included in a block yet and have a L1
/// block number that is a specified number of blocks below the current finalized L1 block
/// number.
#[serde(rename = "notIncludedFinalizedWithBlockDepth")]
FinalizedWithBlockDepth(u64),
/// Start from unfinalized messages that are included in L1 blocks at a specific depth.
#[serde(rename = "notIncludedBlockDepth")]
BlockDepth(u64),
}

Expand All @@ -1115,10 +1118,10 @@ impl fmt::Display for L1MessageKey {
Self::TransactionHash(hash) => write!(f, "TransactionHash({hash:#x})"),
Self::BlockNumber(number) => write!(f, "BlockNumber({number})"),
Self::NotIncluded(start) => match start {
NotIncludedStart::FinalizedWithBlockDepth(depth) => {
NotIncludedKey::FinalizedWithBlockDepth(depth) => {
write!(f, "NotIncluded(Finalized:{depth})")
}
NotIncludedStart::BlockDepth(depth) => {
NotIncludedKey::BlockDepth(depth) => {
write!(f, "NotIncluded(BlockDepth({depth}))")
}
},
Expand All @@ -1139,3 +1142,71 @@ pub struct UnwindResult {
/// The L2 safe block info after the unwind. This is only populated if the L2 safe has reorged.
pub l2_safe_block_info: Option<BlockInfo>,
}

mod tests {

#[test]
fn test_l1_message_key_serialization() {
use crate::{L1MessageKey, NotIncludedKey};
use alloy_primitives::B256;
use std::str::FromStr;

// Test for `L1MessageKey::QueueIndex`
let key = L1MessageKey::QueueIndex(42);
let json = serde_json::to_string(&key).unwrap();
let decoded: L1MessageKey = serde_json::from_str(&json).unwrap();
assert_eq!(key, decoded);

// Test for `L1MessageKey::TransactionHash`
let key = L1MessageKey::TransactionHash(
B256::from_str("0xa46f0b1dbe17b3d0d86fa70cef4a23dca5efcd35858998cc8c53140d01429746")
.unwrap(),
);
let json = serde_json::to_string(&key).unwrap();
let decoded: L1MessageKey = serde_json::from_str(&json).unwrap();
assert_eq!(key, decoded);

// Test for `L1MessageKey::NotIncluded`
let key = L1MessageKey::NotIncluded(NotIncludedKey::FinalizedWithBlockDepth(100));
let json = serde_json::to_string(&key).unwrap();
let decoded: L1MessageKey = serde_json::from_str(&json).unwrap();
assert_eq!(key, decoded);
}

#[test]
fn test_l1_message_key_manual_serialization() {
use crate::{L1MessageKey, NotIncludedKey};
use alloy_primitives::B256;
use std::str::FromStr;

// Test for `L1MessageKey::QueueIndex`
let json_string_queue_index = r#"{"queueIndex":42}"#;
let decoded_queue_index: L1MessageKey =
serde_json::from_str(json_string_queue_index).unwrap();
assert_eq!(decoded_queue_index, L1MessageKey::QueueIndex(42));

// Test for `L1MessageKey::TransactionHash`
let json_string_transaction_hash = r#"{"transactionHash":"0xa46f0b1dbe17b3d0d86fa70cef4a23dca5efcd35858998cc8c53140d01429746"}"#;
let decoded_transaction_hash: L1MessageKey =
serde_json::from_str(json_string_transaction_hash).unwrap();
assert_eq!(
decoded_transaction_hash,
L1MessageKey::TransactionHash(
B256::from_str(
"0xa46f0b1dbe17b3d0d86fa70cef4a23dca5efcd35858998cc8c53140d01429746"
)
.unwrap()
)
);

// Test for `L1MessageKey::NotIncluded`
let json_string_not_included_key =
r#"{"notIncluded":{"notIncludedFinalizedWithBlockDepth":100}}"#;
let decoded_not_included_key: L1MessageKey =
serde_json::from_str(json_string_not_included_key).unwrap();
assert_eq!(
decoded_not_included_key,
L1MessageKey::NotIncluded(NotIncludedKey::FinalizedWithBlockDepth(100))
);
}
}
31 changes: 30 additions & 1 deletion crates/node/src/add_ons/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use rollup_node_chain_orchestrator::{ChainOrchestratorHandle, ChainOrchestratorStatus};
use rollup_node_primitives::L1MessageEnvelope;
use scroll_db::L1MessageKey;
use tokio::sync::{oneshot, Mutex, OnceCell};

/// RPC extension for rollup node management operations.
Expand Down Expand Up @@ -84,6 +85,13 @@ pub trait RollupNodeExtApi {
/// Returns the L1 message by index.
#[method(name = "getL1MessageByIndex")]
async fn get_l1_message_by_index(&self, index: u64) -> RpcResult<Option<L1MessageEnvelope>>;

/// Returns the L1 message by key.
#[method(name = "getL1MessageByKey")]
async fn get_l1_message_by_key(
&self,
l1_message_key: L1MessageKey,
) -> RpcResult<Option<L1MessageEnvelope>>;
}

#[async_trait]
Expand Down Expand Up @@ -154,12 +162,33 @@ where
)
})?;

handle.get_l1_message_by_index(index).await.map_err(|e| {
handle.get_l1_message_by_key(L1MessageKey::from_queue_index(index)).await.map_err(|e| {
ErrorObjectOwned::owned(
error::INTERNAL_ERROR_CODE,
format!("Failed to get L1 message by index: {}", e),
None::<()>,
)
})
}

async fn get_l1_message_by_key(
&self,
l1_message_key: L1MessageKey,
) -> RpcResult<Option<L1MessageEnvelope>> {
let handle = self.rollup_manager_handle().await.map_err(|e| {
ErrorObjectOwned::owned(
error::INTERNAL_ERROR_CODE,
format!("Failed to get rollup manager handle: {}", e),
None::<()>,
)
})?;

handle.get_l1_message_by_key(l1_message_key).await.map_err(|e| {
ErrorObjectOwned::owned(
error::INTERNAL_ERROR_CODE,
format!("Failed to get L1 message by key: {}", e),
None::<()>,
)
})
}
}
6 changes: 3 additions & 3 deletions crates/sequencer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloy_primitives::Address;
use scroll_db::{L1MessageKey, NotIncludedStart};
use scroll_db::{L1MessageKey, NotIncludedKey};
use std::{fmt, str::FromStr, sync::Arc};

/// Configuration for the sequencer.
Expand Down Expand Up @@ -82,10 +82,10 @@ impl From<L1MessageInclusionMode> for L1MessageKey {
fn from(mode: L1MessageInclusionMode) -> Self {
match mode {
L1MessageInclusionMode::FinalizedWithBlockDepth(depth) => {
Self::NotIncluded(NotIncludedStart::FinalizedWithBlockDepth(depth))
Self::NotIncluded(NotIncludedKey::FinalizedWithBlockDepth(depth))
}
L1MessageInclusionMode::BlockDepth(depth) => {
Self::NotIncluded(NotIncludedStart::BlockDepth(depth))
Self::NotIncluded(NotIncludedKey::BlockDepth(depth))
}
}
}
Expand Down