Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion crates/cold/tests/common/gated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Use for tests that need to saturate the read pool deterministically.

use alloy::primitives::BlockNumber;
use parking_lot::Mutex;
use signet_cold::{
BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageRead, ColdStorageWrite,
Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier,
Expand All @@ -12,16 +13,40 @@ use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHea
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;

/// Identifies a backend method that has been observed via
/// [`GatedBackend::events`].
///
/// Each variant corresponds to one method on the underlying backend
/// traits. New variants can be added freely as more tests start
/// inspecting the event log; existing tests pattern-match on the
/// variants they care about so unrelated additions stay non-breaking.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
// NOTE(review): presumably required because this shared test helper is
// compiled into test binaries that do not use every variant - confirm.
#[allow(dead_code)]
pub enum BackendOp {
/// Pushed by `GatedBackend::get_latest_block` once the read gate has
/// been passed, before the inner backend is called.
GetLatestBlock,
/// Pushed by `GatedBackend::truncate_above` after the inner backend
/// call has returned.
TruncateAbove,
}

/// Backend that parks all reads on a semaphore gate.
///
/// Writes and stream production are ungated so tests can distinguish
/// read-pool saturation from write/drain blocking.
///
/// Records each instrumented method into an internal collection.
/// Reads are logged after passing the read gate (before the inner
/// call); writes are logged after the inner call returns. Tests use
/// this log to verify ordering guarantees that cannot be observed
/// reliably from caller-side wrappers - in particular, the
/// drain-barrier fairness test in `concurrency.rs`. Only methods that
/// a current test relies on are instrumented; extend [`BackendOp`] and
/// the relevant impl when adding tests that need more.
///
/// The type is `Clone` and the log lives behind an `Arc`, so every clone
/// appends to, and snapshots, the same event log.
#[derive(Clone)]
pub struct GatedBackend {
// Wrapped backend that the instrumented methods forward to.
inner: MemColdBackend,
// Read gate: a read acquires a permit here before it is logged and
// forwarded to `inner`.
gate: Arc<Semaphore>,
// Separate semaphore for streams. The visible constructor seeds it with
// `usize::MAX >> 4` permits, i.e. streams are effectively ungated unless
// a test arranges otherwise.
stream_gate: Arc<Semaphore>,
// Optional timeout; the visible constructor leaves it `None`.
// NOTE(review): where this is applied is not visible in this hunk -
// presumably it bounds how long a read waits on `gate`; confirm.
read_timeout: Option<Duration>,
// Append-only log of instrumented operations in recording order;
// snapshotted by `events()`.
events: Arc<Mutex<Vec<BackendOp>>>,
}

impl GatedBackend {
Expand All @@ -33,6 +58,7 @@ impl GatedBackend {
// Streams are ungated by default: effectively unbounded permits.
stream_gate: Arc::new(Semaphore::new(usize::MAX >> 4)),
read_timeout: None,
events: Arc::new(Mutex::new(Vec::new())),
}
}

Expand Down Expand Up @@ -72,6 +98,13 @@ impl GatedBackend {
pub fn release_streams(&self, n: usize) {
// Hands `n` additional permits to the stream semaphore; the read gate
// (`gate`) is not touched by this call.
self.stream_gate.add_permits(n);
}

/// Returns a copy of the operation log as it stands right now.
///
/// Entries appear in the order they were recorded by the instrumented
/// methods. The returned vector is detached from the live log, so
/// operations recorded afterwards do not show up in it.
#[allow(dead_code)]
pub fn events(&self) -> Vec<BackendOp> {
    // Hold the lock only long enough to copy the entries out.
    let recorded = self.events.lock();
    recorded.to_vec()
}
}

impl std::fmt::Debug for GatedBackend {
Expand Down Expand Up @@ -148,6 +181,7 @@ impl ColdStorageRead for GatedBackend {

async fn get_latest_block(&self) -> ColdResult<Option<BlockNumber>> {
// Park on the read gate. `.ok()` discards an acquire error, so in that
// case the read carries on without a permit; otherwise `_p` keeps the
// permit alive until this method returns.
let _p = self.gate.clone().acquire_owned().await.ok();
// Record the read after the gate and before delegating, so the log
// reflects the order in which reads got through the gate. The lock guard
// is a temporary dropped at the end of this statement, so it is not held
// across the `.await` below.
self.events.lock().push(BackendOp::GetLatestBlock);
self.inner.get_latest_block().await
}

Expand All @@ -174,7 +208,9 @@ impl ColdStorageWrite for GatedBackend {
}

/// Forwards the truncation to the inner backend, then records it.
///
/// The write itself is ungated. `BackendOp::TruncateAbove` is pushed only
/// after the inner call has returned, so its position in the event log
/// marks the completion of the write rather than its start. The inner
/// result is returned unchanged, and the event is recorded whether the
/// inner call succeeded or failed.
async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
    // Delegate exactly once; the outcome is kept so that logging cannot
    // alter what the caller sees.
    let result = self.inner.truncate_above(block).await;
    // The lock guard is a temporary dropped at the end of this statement.
    self.events.lock().push(BackendOp::TruncateAbove);
    result
}
}

Expand Down
85 changes: 55 additions & 30 deletions crates/cold/tests/concurrency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
mod common;

use alloy::rpc::types::Filter;
use common::gated::GatedBackend;
use common::gated::{BackendOp, GatedBackend};
use signet_cold::{
ColdStorage, ColdStorageError, HeaderSpecifier, conformance::make_test_block,
mem::MemColdBackend,
};
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// 1. 256 concurrent reads against an ungated backend must all complete
Expand Down Expand Up @@ -74,62 +73,88 @@ async fn write_after_saturating_reads_makes_progress() {

/// 3. Fairness: a writer acquired after saturating readers must complete
/// before readers queued *after* the writer.
///
/// The invariant is observed inside the backend, not via caller-side
/// completion signals. `GetLatestBlock` is recorded after passing the
/// read gate; `TruncateAbove` is recorded after the inner write returns,
/// while the writer still holds the drain barrier. So when `TruncateAbove`
/// appears in the log, the write has completed and no later reader can
/// yet be running. The recording is therefore strictly ordered by the
/// semaphore, without racing the post-drain wake-up window where reader
/// and writer wrappers would otherwise compete to signal downstream.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense to me

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn fairness_write_serves_before_later_readers() {
let backend = GatedBackend::closed();
let cs = ColdStorage::new(backend.clone(), CancellationToken::new());

// Saturate all 64 read permits behind the backend gate.
let mut saturating = Vec::with_capacity(64);
for _ in 0..64 {
let cs2 = cs.clone();
tokio::spawn(async move {
let _ = cs2.get_latest_block().await;
});
saturating.push(tokio::spawn(async move { cs2.get_latest_block().await }));
}
tokio::time::sleep(Duration::from_millis(100)).await;

// Queue a writer. It holds `write_sem`, then blocks on the drain
// barrier waiting for the 64 in-flight readers.
let writer_done = Arc::new(Notify::new());
let wd = writer_done.clone();
let cs_w = cs.clone();
tokio::spawn(async move {
let _ = cs_w.truncate_above(0).await;
wd.notify_one();
});
let writer = tokio::spawn(async move { cs_w.truncate_above(0).await });
tokio::time::sleep(Duration::from_millis(100)).await;

// Queue 64 "later" readers. They park on `read_sem::acquire_owned`
// because the writer's drain has claimed every permit.
let later_done = Arc::new(Notify::new());
let mut later_count = 0usize;
let mut later = Vec::with_capacity(64);
for _ in 0..64 {
let cs2 = cs.clone();
let ld = later_done.clone();
tokio::spawn(async move {
let _ = cs2.get_latest_block().await;
ld.notify_one();
});
later_count += 1;
later.push(tokio::spawn(async move { cs2.get_latest_block().await }));
}
assert_eq!(later_count, 64);
tokio::time::sleep(Duration::from_millis(100)).await;

// Release the backend gate so the 64 saturating readers complete,
// which lets the drain barrier acquire and the writer run.
backend.release(usize::MAX >> 4);

// The writer MUST resolve before any later reader.
tokio::select! {
biased;
() = writer_done.notified() => {}
() = later_done.notified() => panic!("later reader resolved before writer"),
// Drive everything to completion before inspecting the log.
for h in saturating {
tokio::time::timeout(Duration::from_secs(5), h)
.await
.expect("saturating reader hung")
.expect("saturating reader panicked")
.expect("saturating reader failed");
}

// And the later readers still complete shortly after.
tokio::time::timeout(Duration::from_secs(5), later_done.notified())
tokio::time::timeout(Duration::from_secs(5), writer)
.await
.expect("later readers should complete after writer");
.expect("writer hung")
.expect("writer panicked")
.expect("writer failed");
for h in later {
tokio::time::timeout(Duration::from_secs(5), h)
.await
.expect("later reader hung")
.expect("later reader panicked")
.expect("later reader failed");
}

// Expect 64 saturating reads, then the write, then 64 later reads.
let events = backend.events();
assert_eq!(
events.len(),
129,
"expected 64 saturating reads + 1 write + 64 later reads, got {events:?}",
);
assert!(
events[..64].iter().all(|op| *op == BackendOp::GetLatestBlock),
"first 64 events must be saturating reads: {events:?}",
);
assert_eq!(
events[64],
BackendOp::TruncateAbove,
"write must follow the saturating reads: {events:?}",
);
assert!(
events[65..].iter().all(|op| *op == BackendOp::GetLatestBlock),
"last 64 events must be later reads: {events:?}",
);
}

/// 4. Cancel during reader backpressure: queued acquisitions fail fast.
Expand Down