Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions crates/cold-mdbx/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::{
};
use alloy::{consensus::transaction::Recovered, primitives::BlockNumber};
use signet_cold::{
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
ZenithHeaderSpecifier,
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
};
use signet_hot::{
KeySer, MAX_KEY_SIZE, ValSer,
Expand Down Expand Up @@ -191,6 +191,7 @@ fn produce_log_stream_blocking(
/// This backend stores historical blockchain data in an MDBX database.
/// It implements the [`ColdStorage`] trait for use with the cold storage
/// task runner.
#[derive(Clone)]
pub struct MdbxColdBackend {
/// The MDBX environment.
env: DatabaseEnv,
Expand Down Expand Up @@ -646,7 +647,7 @@ impl MdbxColdBackend {
}
}

impl ColdStorage for MdbxColdBackend {
impl ColdStorageRead for MdbxColdBackend {
async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
Ok(self.get_header_inner(spec)?)
}
Expand Down Expand Up @@ -757,20 +758,24 @@ impl ColdStorage for MdbxColdBackend {
.map_err(MdbxColdError::from)?;
Ok(latest)
}
}

async fn append_block(&self, data: BlockData) -> ColdResult<()> {
impl ColdStorageWrite for MdbxColdBackend {
async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
Ok(self.append_block_inner(data)?)
}

async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
Ok(self.append_blocks_inner(data)?)
}

async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
Ok(self.truncate_above_inner(block)?)
}
}

async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
impl ColdStorage for MdbxColdBackend {
async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
Ok(self.drain_above_inner(block)?)
}
}
Expand Down
18 changes: 11 additions & 7 deletions crates/cold-sql/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use alloy::{
},
};
use signet_cold::{
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
ZenithHeaderSpecifier,
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
};
use signet_storage_types::{
ConfirmationMeta, DbSignetEvent, DbZenithHeader, IndexedReceipt, RecoveredTx, SealedHeader,
Expand Down Expand Up @@ -971,7 +971,7 @@ fn build_log_filter_clause(filter: &Filter, start_idx: u32) -> (String, Vec<&[u8
// ColdStorage implementation
// ============================================================================

impl ColdStorage for SqlColdBackend {
impl ColdStorageRead for SqlColdBackend {
async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
let Some(block_num) = self.resolve_header_spec(spec).await? else {
return Ok(None);
Expand Down Expand Up @@ -1364,12 +1364,14 @@ impl ColdStorage for SqlColdBackend {
.map_err(SqlColdError::from)?;
Ok(row.get::<Option<i64>, _>(COL_MAX_BN).map(from_i64))
}
}

async fn append_block(&self, data: BlockData) -> ColdResult<()> {
impl ColdStorageWrite for SqlColdBackend {
async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
self.insert_block(data).await.map_err(ColdStorageError::from)
}

async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;
for block_data in data {
write_block_to_tx(&mut tx, block_data).await.map_err(ColdStorageError::from)?;
Expand All @@ -1378,7 +1380,7 @@ impl ColdStorage for SqlColdBackend {
Ok(())
}

async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
let bn = to_i64(block);
let mut tx = self.pool.begin().await.map_err(SqlColdError::from)?;

Expand All @@ -1398,6 +1400,8 @@ impl ColdStorage for SqlColdBackend {
}
}

impl ColdStorage for SqlColdBackend {}

#[cfg(all(test, feature = "test-utils"))]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/cold/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ pub use cold_receipt::ColdReceipt;
mod stream;
pub use stream::{StreamParams, produce_log_stream_default};
mod traits;
pub use traits::{BlockData, ColdStorage, LogStream};
pub use traits::{BlockData, ColdStorage, ColdStorageRead, ColdStorageWrite, LogStream};

pub mod connect;
pub use connect::ColdConnect;
Expand Down
21 changes: 13 additions & 8 deletions crates/cold/src/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
//! It is primarily intended for testing and development.

use crate::{
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, Confirmed, Filter,
HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, TransactionSpecifier,
ZenithHeaderSpecifier,
BlockData, ColdReceipt, ColdResult, ColdStorage, ColdStorageError, ColdStorageRead,
ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog,
SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier,
};
use alloy::primitives::{B256, BlockNumber};
use signet_storage_types::{
Expand Down Expand Up @@ -47,6 +47,7 @@ struct MemColdBackendInner {
///
/// This backend is thread-safe and suitable for concurrent access.
/// All operations are protected by an async read-write lock.
#[derive(Clone)]
pub struct MemColdBackend {
inner: Arc<RwLock<MemColdBackendInner>>,
}
Expand Down Expand Up @@ -100,7 +101,7 @@ impl MemColdBackendInner {
}
}

impl ColdStorage for MemColdBackend {
impl ColdStorageRead for MemColdBackend {
async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
let inner = self.inner.read().await;
match spec {
Expand Down Expand Up @@ -274,8 +275,10 @@ impl ColdStorage for MemColdBackend {
let inner = self.inner.read().await;
Ok(inner.headers.last_key_value().map(|(k, _)| *k))
}
}

async fn append_block(&self, data: BlockData) -> ColdResult<()> {
impl ColdStorageWrite for MemColdBackend {
async fn append_block(&mut self, data: BlockData) -> ColdResult<()> {
let mut inner = self.inner.write().await;

let block = data.block_number();
Expand Down Expand Up @@ -323,20 +326,22 @@ impl ColdStorage for MemColdBackend {
Ok(())
}

async fn append_blocks(&self, data: Vec<BlockData>) -> ColdResult<()> {
async fn append_blocks(&mut self, data: Vec<BlockData>) -> ColdResult<()> {
for block_data in data {
self.append_block(block_data).await?;
}
Ok(())
}

async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
async fn truncate_above(&mut self, block: BlockNumber) -> ColdResult<()> {
let mut inner = self.inner.write().await;
inner.truncate_above(block);
Ok(())
}
}

async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
impl ColdStorage for MemColdBackend {
async fn drain_above(&mut self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
let mut inner = self.inner.write().await;

// Collect receipts for blocks above `block` in ascending order
Expand Down
12 changes: 6 additions & 6 deletions crates/cold/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Log-streaming helper for backends without snapshot semantics.

use crate::{ColdResult, ColdStorage, ColdStorageError, Filter, HeaderSpecifier, RpcLog};
use crate::{ColdResult, ColdStorageError, ColdStorageRead, Filter, HeaderSpecifier, RpcLog};
use alloy::{primitives::BlockNumber, rpc::types::FilterBlockOption};
use tokio::sync::mpsc;

/// Parameters for a log-streaming request.
///
/// Bundles the block range, limits, channel, and deadline that every
/// [`ColdStorage::produce_log_stream`] implementation needs.
/// [`ColdStorageRead::produce_log_stream`] implementation needs.
#[derive(Debug)]
pub struct StreamParams {
/// First block in range (inclusive).
Expand All @@ -28,13 +28,13 @@ pub struct StreamParams {
///
/// Captures an anchor hash from the `to` block at the start and
/// re-checks it before each block to detect reorgs. Uses
/// [`ColdStorage::get_header`] for anchor checks and
/// [`ColdStorage::get_logs`] with single-block filters per block.
/// [`ColdStorageRead::get_header`] for anchor checks and
/// [`ColdStorageRead::get_logs`] with single-block filters per block.
///
/// Backends that hold a consistent read snapshot (MDBX, PostgreSQL
/// with REPEATABLE READ) should provide their own
/// [`ColdStorage::produce_log_stream`] implementation instead.
pub async fn produce_log_stream_default<B: ColdStorage + ?Sized>(
/// [`ColdStorageRead::produce_log_stream`] implementation instead.
pub async fn produce_log_stream_default<B: ColdStorageRead>(
backend: &B,
filter: &Filter,
params: StreamParams,
Expand Down
Loading