From 31a1aee25c0644ea9dc5f3418a68a6655ed362fe Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Mon, 25 May 2026 11:22:55 +0100 Subject: [PATCH] fix spurious test failure --- crates/cold/tests/common/gated.rs | 38 +++++++++++++- crates/cold/tests/concurrency.rs | 85 ++++++++++++++++++++----------- 2 files changed, 92 insertions(+), 31 deletions(-) diff --git a/crates/cold/tests/common/gated.rs b/crates/cold/tests/common/gated.rs index 9ee957e..7ce4ad3 100644 --- a/crates/cold/tests/common/gated.rs +++ b/crates/cold/tests/common/gated.rs @@ -3,6 +3,7 @@ //! Use for tests that need to saturate the read pool deterministically. use alloy::primitives::BlockNumber; +use parking_lot::Mutex; use signet_cold::{ BlockData, ColdReceipt, ColdResult, ColdStorageBackend, ColdStorageRead, ColdStorageWrite, Confirmed, Filter, HeaderSpecifier, ReceiptSpecifier, RpcLog, SignetEventsSpecifier, @@ -12,16 +13,40 @@ use signet_storage_types::{DbSignetEvent, DbZenithHeader, RecoveredTx, SealedHea use std::{sync::Arc, time::Duration}; use tokio::sync::Semaphore; +/// Identifies a backend method that has been observed via +/// [`GatedBackend::events`]. +/// +/// Each variant corresponds to one method on the underlying backend +/// traits. New variants can be added freely as more tests start +/// inspecting the event log; existing tests pattern-match on the +/// variants they care about so unrelated additions stay non-breaking. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[allow(dead_code)] +pub enum BackendOp { + GetLatestBlock, + TruncateAbove, +} + /// Backend that parks all reads on a semaphore gate. /// /// Writes and stream production are ungated so tests can distinguish /// read-pool saturation from write/drain blocking. +/// +/// Records each instrumented method into an internal collection. +/// Reads are logged after passing the read gate (before the inner +/// call); writes are logged after the inner call returns. Tests use +/// this log to verify ordering guarantees that cannot be observed +/// reliably from caller-side wrappers - in particular, the +/// drain-barrier fairness test in `concurrency.rs`. Only methods that +/// a current test relies on are instrumented; extend [`BackendOp`] and +/// the relevant impl when adding tests that need more. #[derive(Clone)] pub struct GatedBackend { inner: MemColdBackend, gate: Arc, stream_gate: Arc, read_timeout: Option, + events: Arc>>, } impl GatedBackend { @@ -33,6 +58,7 @@ impl GatedBackend { // Streams are ungated by default: effectively unbounded permits. stream_gate: Arc::new(Semaphore::new(usize::MAX >> 4)), read_timeout: None, + events: Arc::new(Mutex::new(Vec::new())), } } @@ -72,6 +98,13 @@ impl GatedBackend { pub fn release_streams(&self, n: usize) { self.stream_gate.add_permits(n); } + + /// Snapshot of operations recorded against the inner backend, in + /// the order they entered the backend. + #[allow(dead_code)] + pub fn events(&self) -> Vec { + self.events.lock().clone() + } } impl std::fmt::Debug for GatedBackend { @@ -148,6 +181,7 @@ impl ColdStorageRead for GatedBackend { async fn get_latest_block(&self) -> ColdResult> { let _p = self.gate.clone().acquire_owned().await.ok(); + self.events.lock().push(BackendOp::GetLatestBlock); self.inner.get_latest_block().await } @@ -174,7 +208,9 @@ impl ColdStorageWrite for GatedBackend { } async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { - self.inner.truncate_above(block).await + let result = self.inner.truncate_above(block).await; + self.events.lock().push(BackendOp::TruncateAbove); + result } } diff --git a/crates/cold/tests/concurrency.rs b/crates/cold/tests/concurrency.rs index e0e8912..73ad66b 100644 --- a/crates/cold/tests/concurrency.rs +++ b/crates/cold/tests/concurrency.rs @@ -7,13 +7,12 @@ mod common; use alloy::rpc::types::Filter; -use common::gated::GatedBackend; +use common::gated::{BackendOp, GatedBackend}; use signet_cold::{ ColdStorage, ColdStorageError, HeaderSpecifier, conformance::make_test_block, mem::MemColdBackend, }; -use std::{sync::Arc, time::Duration}; -use tokio::sync::Notify; +use std::time::Duration; use tokio_util::sync::CancellationToken; /// 1. 256 concurrent reads against an ungated backend must all complete @@ -74,62 +73,88 @@ async fn write_after_saturating_reads_makes_progress() { /// 3. Fairness: a writer acquired after saturating readers must complete /// before readers queued *after* the writer. +/// +/// The invariant is observed inside the backend, not via caller-side +/// completion signals. `GetLatestBlock` is recorded after passing the +/// read gate; `TruncateAbove` is recorded after the inner write returns, +/// while the writer still holds the drain barrier. So when `TruncateAbove` +/// appears in the log, the write has completed and no later reader can +/// yet be running. The recording is therefore strictly ordered by the +/// semaphore, without racing the post-drain wake-up window where reader +/// and writer wrappers would otherwise compete to signal downstream. #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn fairness_write_serves_before_later_readers() { let backend = GatedBackend::closed(); let cs = ColdStorage::new(backend.clone(), CancellationToken::new()); // Saturate all 64 read permits behind the backend gate. + let mut saturating = Vec::with_capacity(64); for _ in 0..64 { let cs2 = cs.clone(); - tokio::spawn(async move { - let _ = cs2.get_latest_block().await; - }); + saturating.push(tokio::spawn(async move { cs2.get_latest_block().await })); } tokio::time::sleep(Duration::from_millis(100)).await; // Queue a writer. It holds `write_sem`, then blocks on the drain // barrier waiting for the 64 in-flight readers. - let writer_done = Arc::new(Notify::new()); - let wd = writer_done.clone(); let cs_w = cs.clone(); - tokio::spawn(async move { - let _ = cs_w.truncate_above(0).await; - wd.notify_one(); - }); + let writer = tokio::spawn(async move { cs_w.truncate_above(0).await }); tokio::time::sleep(Duration::from_millis(100)).await; // Queue 64 "later" readers. They park on `read_sem::acquire_owned` // because the writer's drain has claimed every permit. - let later_done = Arc::new(Notify::new()); - let mut later_count = 0usize; + let mut later = Vec::with_capacity(64); for _ in 0..64 { let cs2 = cs.clone(); - let ld = later_done.clone(); - tokio::spawn(async move { - let _ = cs2.get_latest_block().await; - ld.notify_one(); - }); - later_count += 1; + later.push(tokio::spawn(async move { cs2.get_latest_block().await })); } - assert_eq!(later_count, 64); tokio::time::sleep(Duration::from_millis(100)).await; // Release the backend gate so the 64 saturating readers complete, // which lets the drain barrier acquire and the writer run. backend.release(usize::MAX >> 4); - // The writer MUST resolve before any later reader. - tokio::select! { - biased; - () = writer_done.notified() => {} - () = later_done.notified() => panic!("later reader resolved before writer"), + // Drive everything to completion before inspecting the log. + for h in saturating { + tokio::time::timeout(Duration::from_secs(5), h) + .await + .expect("saturating reader hung") + .expect("saturating reader panicked") + .expect("saturating reader failed"); } - - // And the later readers still complete shortly after. - tokio::time::timeout(Duration::from_secs(5), later_done.notified()) + tokio::time::timeout(Duration::from_secs(5), writer) .await - .expect("later readers should complete after writer"); + .expect("writer hung") + .expect("writer panicked") + .expect("writer failed"); + for h in later { + tokio::time::timeout(Duration::from_secs(5), h) + .await + .expect("later reader hung") + .expect("later reader panicked") + .expect("later reader failed"); + } + + // Expect 64 saturating reads, then the write, then 64 later reads. + let events = backend.events(); + assert_eq!( + events.len(), + 129, + "expected 64 saturating reads + 1 write + 64 later reads, got {events:?}", + ); + assert!( + events[..64].iter().all(|op| *op == BackendOp::GetLatestBlock), + "first 64 events must be saturating reads: {events:?}", + ); + assert_eq!( + events[64], + BackendOp::TruncateAbove, + "write must follow the saturating reads: {events:?}", + ); + assert!( + events[65..].iter().all(|op| *op == BackendOp::GetLatestBlock), + "last 64 events must be later reads: {events:?}", + ); } /// 4. Cancel during reader backpressure: queued acquisitions fail fast.