diff --git a/crates/codec/src/lib.rs b/crates/codec/src/lib.rs
index aeba8397..7798a00f 100644
--- a/crates/codec/src/lib.rs
+++ b/crates/codec/src/lib.rs
@@ -41,7 +41,7 @@ pub enum Codec {
 impl Codec {
     /// Decodes the input data and returns the decoded [`Batch`].
-    pub fn decode<T: CommitDataSource>(input: &T) -> Result<Batch, CodecError> {
+    pub fn decode<T: CommitDataSource>(input: T) -> Result<Batch, CodecError> {
         let calldata = input.calldata();
         let version = get_codec_version(calldata)?;
diff --git a/crates/derivation-pipeline/src/data_source.rs b/crates/derivation-pipeline/src/data_source.rs
index 4b1d4e1a..b32a66a2 100644
--- a/crates/derivation-pipeline/src/data_source.rs
+++ b/crates/derivation-pipeline/src/data_source.rs
@@ -1,19 +1,20 @@
-use alloy_eips::eip4844::Blob;
 use alloy_primitives::Bytes;
 use scroll_codec::CommitDataSource;
 
 /// Holds the data for the codec.
-pub(crate) struct CodecDataSource<'a> {
-    pub(crate) calldata: &'a Bytes,
-    pub(crate) blob: Option<&'a Blob>,
+pub(crate) struct CodecDataSource<Calldata, Blob> {
+    pub(crate) calldata: Calldata,
+    pub(crate) blob: Option<Blob>,
 }
 
-impl<'a> CommitDataSource for CodecDataSource<'a> {
+impl<Calldata: AsRef<Bytes>, Blob: AsRef<alloy_eips::eip4844::Blob>> CommitDataSource
+    for CodecDataSource<Calldata, Blob>
+{
     fn calldata(&self) -> &Bytes {
-        self.calldata
+        self.calldata.as_ref()
     }
 
-    fn blob(&self) -> Option<&Blob> {
-        self.blob
+    fn blob(&self) -> Option<&alloy_eips::eip4844::Blob> {
+        self.blob.as_ref().map(|b| b.as_ref())
     }
 }
diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs
index e34f00a2..40430f93 100644
--- a/crates/derivation-pipeline/src/lib.rs
+++ b/crates/derivation-pipeline/src/lib.rs
@@ -10,7 +10,7 @@ use futures::{stream::FuturesOrdered, Stream, StreamExt};
 use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope};
 use rollup_node_providers::L1Provider;
 use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes};
-use scroll_codec::{decoding::payload::PayloadData, Codec};
+use scroll_codec::{decoding::payload::PayloadData, Codec, CodecError, DecodingError};
 use scroll_db::{Database, DatabaseReadOperations, L1MessageKey};
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 
@@ -280,8 +280,15 @@ pub async fn derive(
     } else {
         None
     };
-    let data = CodecDataSource { calldata: batch.calldata.as_ref(), blob: blob.as_deref() };
-    let decoded = Codec::decode(&data)?;
+
+    let data = CodecDataSource { calldata: batch.calldata.clone(), blob };
+
+    let decoded =
+        tokio::task::spawn_blocking(move || Codec::decode(data)).await.map_err(|err| {
+            DerivationPipelineError::Codec(CodecError::Decoding(DecodingError::Other(Box::new(
+                err,
+            ))))
+        })??;
 
     // set the cursor for the l1 provider.
     let payload_data = &decoded.data;
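The hunks above are the core of the change: batch decoding now runs on Tokio's blocking thread pool, which is why `Codec::decode` takes its input by value and why `CodecDataSource` holds owned, `AsRef`-bounded buffers instead of `'a` borrows (the closure handed to `spawn_blocking` must be `'static`). Below is a minimal, self-contained sketch of that offloading pattern; `decode_batch` and `Payload` are hypothetical stand-ins for `Codec::decode` and `Batch`, not the crate's actual API, and the sketch assumes a `tokio` dependency with the "full" feature.

```rust
// Minimal sketch (not this repository's API): move owned data into
// `tokio::task::spawn_blocking` so a CPU-heavy decode step does not stall
// the async runtime. `decode_batch` and `Payload` are hypothetical stand-ins.

struct Payload(Vec<u8>);

// Stand-in for a CPU-bound decode (e.g. decompressing and parsing a batch).
fn decode_batch(raw: Vec<u8>) -> Result<Payload, String> {
    if raw.is_empty() {
        return Err("empty input".to_string());
    }
    Ok(Payload(raw))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let raw = vec![1u8, 2, 3];

    // The closure takes ownership of `raw`, mirroring `Codec::decode(data)` now
    // taking its input by value so it can be sent to the blocking thread pool.
    let decoded = tokio::task::spawn_blocking(move || decode_batch(raw))
        .await // outer error: the blocking task panicked or was cancelled
        .map_err(|join_err| join_err.to_string())??;

    println!("decoded {} bytes", decoded.0.len());
    Ok(())
}
```

In the diff itself, the `JoinError` from the blocking task is folded into the existing codec error path via `DecodingError::Other`; the sketch simply stringifies it.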
+ #[error("expected {0} notifications, got {1}")] + InvalidNotificationCount(usize, usize), } diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 85b38e0b..e3f12e34 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -267,6 +267,13 @@ where notifications.push(system_contract_update); } + if logs.len() != notifications.len() { + return Err(L1WatcherError::Logs(FilterLogError::InvalidNotificationCount( + logs.len(), + notifications.len(), + ))) + } + // send all notifications on the channel. self.notify_all(notifications).await?;