Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum Codec {

impl Codec {
/// Decodes the input data and returns the decoded [`Batch`].
pub fn decode<T: CommitDataSource>(input: &T) -> Result<Batch, CodecError> {
pub fn decode<T: CommitDataSource>(input: T) -> Result<Batch, CodecError> {
let calldata = input.calldata();
let version = get_codec_version(calldata)?;

Expand Down
17 changes: 9 additions & 8 deletions crates/derivation-pipeline/src/data_source.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use alloy_eips::eip4844::Blob;
use alloy_primitives::Bytes;
use scroll_codec::CommitDataSource;

/// Holds the data for the codec.
pub(crate) struct CodecDataSource<'a> {
pub(crate) calldata: &'a Bytes,
pub(crate) blob: Option<&'a Blob>,
pub(crate) struct CodecDataSource<Calldata, Blob> {
pub(crate) calldata: Calldata,
pub(crate) blob: Option<Blob>,
}

impl<'a> CommitDataSource for CodecDataSource<'a> {
impl<Calldata: AsRef<Bytes>, Blob: AsRef<alloy_eips::eip4844::Blob>> CommitDataSource
for CodecDataSource<Calldata, Blob>
{
fn calldata(&self) -> &Bytes {
self.calldata
self.calldata.as_ref()
}

fn blob(&self) -> Option<&Blob> {
self.blob
fn blob(&self) -> Option<&alloy_eips::eip4844::Blob> {
self.blob.as_ref().map(|b| b.as_ref())
}
}
13 changes: 10 additions & 3 deletions crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{stream::FuturesOrdered, Stream, StreamExt};
use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope};
use rollup_node_providers::L1Provider;
use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
use scroll_codec::{decoding::payload::PayloadData, Codec};
use scroll_codec::{decoding::payload::PayloadData, Codec, CodecError, DecodingError};
use scroll_db::{Database, DatabaseReadOperations, L1MessageKey};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

Expand Down Expand Up @@ -280,8 +280,15 @@ pub async fn derive<L1P: L1Provider + Sync + Send, DB: DatabaseReadOperations>(
} else {
None
};
let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_deref() };
let decoded = Codec::decode(&data)?;

let data = CodecDataSource { calldata: batch.calldata.clone(), blob };

let decoded =
tokio::task::spawn_blocking(move || Codec::decode(data)).await.map_err(|err| {
DerivationPipelineError::Codec(CodecError::Decoding(DecodingError::Other(Box::new(
err,
))))
})??;

// set the cursor for the l1 provider.
let payload_data = &decoded.data;
Expand Down
3 changes: 3 additions & 0 deletions crates/watcher/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,7 @@ pub enum FilterLogError {
/// The log is missing a transaction hash.
#[error("unknown transaction hash for log")]
MissingTransactionHash,
/// Invalid extracted notification length.
#[error("expected {0} notifications, got {1}")]
InvalidNotificationCount(usize, usize),
}
7 changes: 7 additions & 0 deletions crates/watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ where
notifications.push(system_contract_update);
}

if logs.len() != notifications.len() {
return Err(L1WatcherError::Logs(FilterLogError::InvalidNotificationCount(
logs.len(),
notifications.len(),
)))
}

// send all notifications on the channel.
self.notify_all(notifications).await?;

Expand Down
Loading