Skip to content

feat: add builder for blob extractor #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/blobber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ signet-zenith.workspace = true

reth.workspace = true
reth-chainspec.workspace = true
reth-transaction-pool = { workspace = true, optional = true }

smallvec.workspace = true
tokio.workspace = true
tracing.workspace = true
eyre.workspace = true
reqwest.workspace = true
url.workspace = true
foundry-blob-explorers.workspace = true
Expand All @@ -33,5 +33,9 @@ signet-constants = { workspace = true, features = ["test-utils"] }

reth-transaction-pool = { workspace = true, features = ["test-utils"] }

eyre.workspace = true
serde_json.workspace = true
tempfile.workspace = true
tempfile.workspace = true

[features]
test-utils = ["signet-constants/test-utils", "dep:reth-transaction-pool", "reth-transaction-pool?/test-utils"]
17 changes: 17 additions & 0 deletions crates/blobber/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Block Extractor

The [`BlockExtractor`] retrieves blobs from host chain blocks and parses them
into [`ZenithBlock`]s. It is used by the node during notification processing
when a [`Zenith::BlockSubmitted`] event is extracted from a host chain block.

## Data Sources

The following sources can be configured:

- The local EL node transaction pool.
- The local CL node via RPC.
- A blob explorer.
- Signet's Pylon blob storage system.

[`ZenithBlock`]: signet_zenith::ZenithBlock
[`Zenith::BlockSubmitted`]: signet_zenith::Zenith::BlockSubmitted
44 changes: 24 additions & 20 deletions crates/blobber/src/block_data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
error::UnrecoverableBlobError, shim::ExtractableChainShim, BlockExtractionError,
ExtractionResult,
BlockExtractorBuilder, ExtractionResult,
};
use alloy::{
consensus::{Blob, SidecarCoder, SimpleCoder},
Expand All @@ -15,7 +15,7 @@ use reth::{
use signet_extract::{ExtractedEvent, Extracts};
use signet_zenith::{Zenith::BlockSubmitted, ZenithBlock};
use smallvec::SmallVec;
use std::{borrow::Cow, ops::Deref, sync::Arc};
use std::{ops::Deref, sync::Arc};
use tokio::select;
use tracing::{error, instrument, trace};

Expand Down Expand Up @@ -94,7 +94,7 @@ impl From<Vec<Blob>> for Blobs {
/// queries an explorer if it can't find the blob. When Decoder does find a
/// blob, it decodes it and returns the decoded transactions.
#[derive(Debug)]
pub struct BlockExtractor<Pool: TransactionPool> {
pub struct BlockExtractor<Pool> {
pool: Pool,
explorer: foundry_blob_explorers::Client,
client: reqwest::Client,
Expand All @@ -103,26 +103,27 @@ pub struct BlockExtractor<Pool: TransactionPool> {
slot_calculator: SlotCalculator,
}

impl BlockExtractor<()> {
/// Returns a new [`BlockExtractorBuilder`].
pub fn builder() -> BlockExtractorBuilder<()> {
BlockExtractorBuilder::default()
}
}

impl<Pool> BlockExtractor<Pool>
where
Pool: TransactionPool,
{
/// new returns a new `Decoder` generic over a `Pool`
pub fn new(
pub const fn new(
pool: Pool,
explorer: foundry_blob_explorers::Client,
cl_client: reqwest::Client,
cl_url: Option<Cow<'static, str>>,
pylon_url: Option<Cow<'static, str>>,
cl_url: Option<url::Url>,
pylon_url: Option<url::Url>,
slot_calculator: SlotCalculator,
) -> Result<Self, url::ParseError> {
let cl_url =
if let Some(url) = cl_url { Some(url::Url::parse(url.as_ref())?) } else { None };

let pylon_url =
if let Some(url) = pylon_url { Some(url::Url::parse(url.as_ref())?) } else { None };

Ok(Self { pool, explorer, client: cl_client, cl_url, pylon_url, slot_calculator })
) -> Self {
Self { pool, explorer, client: cl_client, cl_url, pylon_url, slot_calculator }
}

/// Get blobs from either the pool or the network and decode them,
Expand Down Expand Up @@ -412,13 +413,16 @@ mod tests {
let constants: SignetSystemConstants = test.try_into().unwrap();
let calc = SlotCalculator::new(0, 0, 12);

let explorer_url = Cow::Borrowed("https://api.holesky.blobscan.com/");
let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
let explorer =
foundry_blob_explorers::Client::new_with_client(explorer_url.as_ref(), client.clone());
let explorer_url = "https://api.holesky.blobscan.com/";
let client = reqwest::Client::builder().use_rustls_tls();

let extractor =
BlockExtractor::new(pool.clone(), explorer, client.clone(), None, None, calc)?;
let extractor = BlockExtractor::builder()
.with_pool(pool.clone())
.with_explorer_url(explorer_url)
.with_client_builder(client)
.unwrap()
.with_slot_calculator(calc)
.build()?;

let tx = Transaction::Eip2930(TxEip2930 {
chain_id: 17001,
Expand Down
140 changes: 140 additions & 0 deletions crates/blobber/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use crate::block_data::BlockExtractor;
use init4_bin_base::utils::calc::SlotCalculator;
use reth::transaction_pool::TransactionPool;
use url::Url;

/// Errors that can occur while building the [`BlockExtractor`] with a
/// [`BlockExtractorBuilder`].
#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
/// The transaction pool was not provided.
#[error("transaction pool is required")]
MissingPool,
/// The explorer URL was not provided or could not be parsed.
#[error("explorer URL is required and must be valid")]
MissingExplorerUrl,
/// The URL provided was invalid.
#[error("invalid URL provided")]
Url(#[from] url::ParseError),
/// The client was not provided.
#[error("client is required")]
MissingClient,
/// The client failed to build.
#[error("failed to build client: {0}")]
Client(#[from] reqwest::Error),
/// The slot calculator was not provided.
#[error("slot calculator is required")]
MissingSlotCalculator,
}

/// Builder for the [`BlockExtractor`].
#[derive(Debug, Default, Clone)]
pub struct BlockExtractorBuilder<Pool> {
pool: Option<Pool>,
explorer_url: Option<String>,
client: Option<reqwest::Client>,
cl_url: Option<String>,
pylon_url: Option<String>,
slot_calculator: Option<SlotCalculator>,
}

impl<Pool> BlockExtractorBuilder<Pool> {
/// Set the transaction pool to use for the extractor.
pub fn with_pool<P2>(self, pool: P2) -> BlockExtractorBuilder<P2> {
BlockExtractorBuilder {
pool: Some(pool),
explorer_url: self.explorer_url,
client: self.client,
cl_url: self.cl_url,
pylon_url: self.pylon_url,
slot_calculator: self.slot_calculator,
}
}

/// Set the transaction pool to use a mock test pool.
#[cfg(feature = "test-utils")]
pub fn with_test_pool(
self,
) -> BlockExtractorBuilder<reth_transaction_pool::test_utils::TestPool> {
self.with_pool(reth_transaction_pool::test_utils::testing_pool())
}

/// Set the blob explorer URL to use for the extractor. This will be used
/// to construct a [`foundry_blob_explorers::Client`].
pub fn with_explorer_url(mut self, explorer_url: &str) -> Self {
self.explorer_url = Some(explorer_url.to_string());
self
}

/// Set the [`reqwest::Client`] to use for the extractor. This client will
/// be used to make requests to the blob explorer, and the CL and Pylon URLs
/// if provided.
pub fn with_client(mut self, client: reqwest::Client) -> Self {
self.client = Some(client);
self
}

/// Set the [`reqwest::Client`] via a [reqwest::ClientBuilder]. This
/// function will immediately build the client and return an error if it
/// fails.
///
/// This client will be used to make requests to the blob explorer, and the
/// CL and Pylon URLs if provided.
pub fn with_client_builder(self, client: reqwest::ClientBuilder) -> Result<Self, BuilderError> {
client.build().map(|client| self.with_client(client)).map_err(Into::into)
}

/// Set the CL URL to use for the extractor.
pub fn with_cl_url(mut self, cl_url: &str) -> Result<Self, BuilderError> {
self.cl_url = Some(cl_url.to_string());
Ok(self)
}

/// Set the Pylon URL to use for the extractor.
pub fn with_pylon_url(mut self, pylon_url: &str) -> Result<Self, BuilderError> {
self.pylon_url = Some(pylon_url.to_string());
Ok(self)
}

/// Set the slot calculator to use for the extractor.
pub const fn with_slot_calculator(
mut self,
slot_calculator: SlotCalculator,
) -> BlockExtractorBuilder<Pool> {
self.slot_calculator = Some(slot_calculator);
self
}

/// Set the slot calculator to use for the extractor, using the Pecornino
/// host configuration.
pub const fn with_pecornino_slots(mut self) -> BlockExtractorBuilder<Pool> {
self.slot_calculator = Some(SlotCalculator::pecorino_host());
self
}
}

impl<Pool: TransactionPool> BlockExtractorBuilder<Pool> {
/// Build the [`BlockExtractor`] with the provided parameters.
pub fn build(self) -> Result<BlockExtractor<Pool>, BuilderError> {
let pool = self.pool.ok_or(BuilderError::MissingPool)?;

let explorer_url = self.explorer_url.ok_or(BuilderError::MissingExplorerUrl)?;

let cl_url = self.cl_url.map(parse_url).transpose()?;

let pylon_url = self.pylon_url.map(parse_url).transpose()?;

let client = self.client.ok_or(BuilderError::MissingClient)?;

let explorer =
foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone());

let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?;

Ok(BlockExtractor::new(pool, explorer, client, cl_url, pylon_url, slot_calculator))
}
}

fn parse_url(url: String) -> Result<Url, BuilderError> {
Url::parse(url.as_ref()).map_err(BuilderError::Url)
}
4 changes: 2 additions & 2 deletions crates/blobber/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ pub enum BlockExtractionError {

impl BlockExtractionError {
/// Returns true if the error is ignorable
pub fn is_ignorable(&self) -> bool {
pub const fn is_ignorable(&self) -> bool {
matches!(self, Self::Ignorable(_))
}

/// Returns true if the error is unrecoverable
pub fn is_unrecoverable(&self) -> bool {
pub const fn is_unrecoverable(&self) -> bool {
matches!(self, Self::Unrecoverable(_))
}

Expand Down
16 changes: 15 additions & 1 deletion crates/blobber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
//! Contains logic for extracting data from host chain blocks.
#![doc = include_str!("../README.md")]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
unreachable_pub,
clippy::missing_const_for_fn,
rustdoc::all
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod block_data;
pub use block_data::{Blobs, BlockExtractor};

mod builder;
pub use builder::BlockExtractorBuilder;

mod error;
pub use error::{BlockExtractionError, ExtractionResult};

Expand Down
4 changes: 2 additions & 2 deletions crates/blobber/src/shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ pub struct ExtractableChainShim<'a> {

impl<'a> ExtractableChainShim<'a> {
/// Create a new shim around the given Reth chain.
pub fn new(chain: &'a Chain) -> Self {
pub const fn new(chain: &'a Chain) -> Self {
Self { chain }
}

/// Get a reference to the underlying Reth chain.
pub fn chain(&self) -> &'a Chain {
pub const fn chain(&self) -> &'a Chain {
self.chain
}
}
Expand Down
26 changes: 17 additions & 9 deletions crates/db/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,22 +138,26 @@ where
}

/// Inserts the zenith block into the database, always modifying the following tables:
/// * [`JournalHashes`](crate::db::JournalHashes)
/// * [`JournalHashes`]
/// * [`CanonicalHeaders`](tables::CanonicalHeaders)
/// * [`Headers`](tables::Headers)
/// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties)
/// * [`HeaderNumbers`](tables::HeaderNumbers)
/// * [`BlockBodyIndices`](tables::BlockBodyIndices) (through [`RuWriter::append_signet_block_body`])
/// * [`BlockBodyIndices`](tables::BlockBodyIndices) (through
/// [`RuWriter::append_signet_block_body`])
///
/// If there are transactions in the block, the following tables will be modified:
/// * [`Transactions`](tables::Transactions) (through [`RuWriter::append_signet_block_body`])
/// * [`TransactionBlocks`](tables::TransactionBlocks) (through [`RuWriter::append_signet_block_body`])
/// If there are transactions in the block, the following tables will be
/// modified:
/// * [`Transactions`](tables::Transactions) (through
/// [`RuWriter::append_signet_block_body`])
/// * [`TransactionBlocks`](tables::TransactionBlocks) (through
/// [`RuWriter::append_signet_block_body`])
///
/// If the provider has __not__ configured full sender pruning, this will modify
/// [`TransactionSenders`](tables::TransactionSenders).
/// If the provider has __not__ configured full sender pruning, this will
/// modify [`TransactionSenders`](tables::TransactionSenders).
///
/// If the provider has __not__ configured full transaction lookup pruning, this will modify
/// [`TransactionHashNumbers`](tables::TransactionHashNumbers).
/// If the provider has __not__ configured full transaction lookup pruning,
/// this will modify [`TransactionHashNumbers`](tables::TransactionHashNumbers).
///
/// Ommers and withdrawals are not inserted, as Signet does not use them.
fn insert_signet_block(
Expand Down Expand Up @@ -371,6 +375,8 @@ where

/// Get [`Passage::EnterToken`], [`Passage::Enter`] and
/// [`Transactor::Transact`] events.
///
/// [`Transactor::Transact`]: signet_zenith::Transactor::Transact
fn get_signet_events(
&self,
range: RangeInclusive<BlockNumber>,
Expand All @@ -394,6 +400,8 @@ where

/// Remove [`Passage::EnterToken`], [`Passage::Enter`] and
/// [`Transactor::Transact`] events above the specified height from the DB.
///
/// [`Transactor::Transact`]: signet_zenith::Transactor::Transact
fn remove_signet_events_above(
&self,
target: BlockNumber,
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ where
/// - underlying database error
/// - amount of matches exceeds configured limit
///
/// https://github.com/paradigmxyz/reth/blob/d01658e516abbf2a1a76855a26d7123286865f22/crates/rpc/rpc/src/eth/filter.rs#L506
// https://github.com/paradigmxyz/reth/blob/d01658e516abbf2a1a76855a26d7123286865f22/crates/rpc/rpc/src/eth/filter.rs#L506
async fn get_logs_in_block_range(
&self,
filter: &Filter,
Expand Down