From a1545e5d0db9b279bce98b0a03541cf175d90dec Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Fri, 29 May 2026 15:49:12 +0100 Subject: [PATCH] complete journal integration into node --- Cargo.toml | 2 + crates/host-reth/src/notifier.rs | 12 +- crates/host-rpc/src/notifier.rs | 4 + crates/node-config/Cargo.toml | 1 + crates/node-config/src/journal.rs | 229 ++++++++- crates/node-config/src/lib.rs | 2 +- crates/node-config/src/test_utils.rs | 8 +- crates/node-tests/Cargo.toml | 4 + crates/node-tests/src/context.rs | 60 ++- crates/node-tests/tests/db.rs | 5 +- crates/node-tests/tests/journal_sync.rs | 564 +++++++++++++++++++++++ crates/node-types/src/notifier.rs | 16 + crates/node/Cargo.toml | 6 + crates/node/src/builder.rs | 14 + crates/node/src/journal_sync/ingestor.rs | 199 ++++++++ crates/node/src/journal_sync/mod.rs | 11 + crates/node/src/journal_sync/runtime.rs | 500 ++++++++++++++++++++ crates/node/src/lib.rs | 2 + crates/node/src/node.rs | 212 +++++++-- 19 files changed, 1787 insertions(+), 64 deletions(-) create mode 100644 crates/node-tests/tests/journal_sync.rs create mode 100644 crates/node/src/journal_sync/ingestor.rs create mode 100644 crates/node/src/journal_sync/mod.rs create mode 100644 crates/node/src/journal_sync/runtime.rs diff --git a/Cargo.toml b/Cargo.toml index 218d0757..5597bbc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ signet-types = "0.19" signet-zenith = "0.19" signet-journal = "0.19" signet-journal-chain = "0.1" +signet-journal-client = "0.1" signet-storage = "0.10" signet-cold = "0.10" signet-hot = "0.10" @@ -129,6 +130,7 @@ tempfile = "3.17.0" [patch.crates-io] signet-journal-chain = { git = "https://github.com/init4tech/journal-service", branch = "main" } +signet-journal-client = { git = "https://github.com/init4tech/journal-service", branch = "main" } signet-storage = { git = "https://github.com/init4tech/storage.git", branch = "fraser/eng-2017/journal-hashes" } 
signet-cold = { git = "https://github.com/init4tech/storage.git", branch = "fraser/eng-2017/journal-hashes" } signet-cold-sql = { git = "https://github.com/init4tech/storage.git", branch = "fraser/eng-2017/journal-hashes" } diff --git a/crates/host-reth/src/notifier.rs b/crates/host-reth/src/notifier.rs index c8874379..24c821c2 100644 --- a/crates/host-reth/src/notifier.rs +++ b/crates/host-reth/src/notifier.rs @@ -9,7 +9,7 @@ use futures_util::StreamExt; use reth::{ chainspec::EthChainSpec, primitives::{EthPrimitives, Receipt}, - providers::{BlockIdReader, BlockReader, HeaderProvider, ReceiptProvider}, + providers::{BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ReceiptProvider}, }; use reth_exex::{ExExContext, ExExEvent, ExExNotifications, ExExNotificationsStream}; use reth_node_api::{FullNodeComponents, NodeTypes}; @@ -261,4 +261,14 @@ where self.events.send(ExExEvent::FinishedHeight(BlockNumHash { number: block_number, hash }))?; Ok(()) } + + async fn host_tip(&self) -> Result { + Ok(self.provider.best_block_number()?) + } + + // A reth ExEx shares the host's notification pipeline; unconsumed notifications fill reth's + // buffer and stall its pipeline, so a journal-syncing node must drain them. + fn backpressures_host(&self) -> bool { + true + } } diff --git a/crates/host-rpc/src/notifier.rs b/crates/host-rpc/src/notifier.rs index a439ba4d..4ba3edb5 100644 --- a/crates/host-rpc/src/notifier.rs +++ b/crates/host-rpc/src/notifier.rs @@ -711,4 +711,8 @@ where // No-op: no ExEx to notify for an RPC follower. Ok(()) } + + async fn host_tip(&self) -> Result { + Ok(self.provider.get_block_number().await?) 
+ } } diff --git a/crates/node-config/Cargo.toml b/crates/node-config/Cargo.toml index b6a3e336..720609fb 100644 --- a/crates/node-config/Cargo.toml +++ b/crates/node-config/Cargo.toml @@ -25,6 +25,7 @@ alloy.workspace = true eyre.workspace = true reqwest.workspace = true serde.workspace = true +thiserror.workspace = true tokio-util.workspace = true tracing.workspace = true signet-genesis.workspace = true diff --git a/crates/node-config/src/journal.rs b/crates/node-config/src/journal.rs index 3d2aad7e..39f70918 100644 --- a/crates/node-config/src/journal.rs +++ b/crates/node-config/src/journal.rs @@ -1,8 +1,54 @@ -use core::num::NonZeroU64; -use init4_bin_base::utils::from_env::FromEnv; +use core::{num::NonZeroU64, str::FromStr, time::Duration}; +use init4_bin_base::utils::from_env::{FromEnv, FromEnvErr, FromEnvVar}; use signet_journal_chain::SAFETY_MARGIN; use tracing::warn; +/// How a node sources rollup state. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum SyncStrategy { + /// Execute host blocks to derive state (the current, default behaviour). + #[default] + Blocks, + /// Apply pre-computed journals from upstream sources without executing blocks. + Journals, +} + +impl FromStr for SyncStrategy { + type Err = ParseSyncStrategyError; + + fn from_str(input: &str) -> Result { + match input.trim().to_ascii_lowercase().as_str() { + "blocks" => Ok(Self::Blocks), + "journals" => Ok(Self::Journals), + other => Err(ParseSyncStrategyError(other.to_owned())), + } + } +} + +impl FromEnvVar for SyncStrategy { + fn from_env_var(env_var: &str) -> Result { + let raw = String::from_env_var(env_var)?; + raw.parse().map_err(|error| FromEnvErr::parse_error(env_var, error)) + } +} + +/// Error parsing a [`SyncStrategy`] from a string. 
+#[derive(Debug, Clone, thiserror::Error)] +#[error("invalid journal sync strategy '{0}', expected 'blocks' or 'journals'")] +pub struct ParseSyncStrategyError(String); + +/// Error returned by [`JournalConfig::validate`]. +#[derive(Debug, Clone, Copy, thiserror::Error)] +pub enum JournalConfigError { + /// `sync_strategy` is [`SyncStrategy::Journals`] but no upstream sources were configured. + #[error( + "journal sync strategy is 'journals' but no upstream sources were configured \ + (set SIGNET_JOURNAL_SOURCES)" + )] + MissingSources, +} + /// Default maximum total byte size of the journal ring buffer (64 MiB). pub const DEFAULT_RING_BUFFER_MAX_BYTES: u64 = 64 * 1024 * 1024; @@ -24,9 +70,45 @@ pub const DEFAULT_MAX_SUBSCRIBER_LAG: u64 = 100; /// All fields are optional. When unset, [`JournalConfig`] returns the /// constants above via its accessors. Configurable via environment variables /// (`SIGNET_JOURNAL_*`) or via serde for file-based config. -#[derive(Debug, Clone, Copy, Default, serde::Deserialize, FromEnv)] +#[derive(Debug, Clone, Default, serde::Deserialize, FromEnv)] #[serde(rename_all = "camelCase", default)] pub struct JournalConfig { + /// Sync strategy: execute host blocks (`blocks`, default) or apply journals + /// from upstream sources (`journals`). + #[from_env( + var = "SIGNET_JOURNAL_SYNC_STRATEGY", + desc = "Journal sync strategy: 'blocks' or 'journals' [default: blocks]", + optional + )] + sync_strategy: Option, + + /// Prioritised upstream journal WebSocket source URLs (comma-separated). + /// Required when `sync_strategy` is `journals`. + #[from_env( + var = "SIGNET_JOURNAL_SOURCES", + desc = "Comma-separated upstream journal WebSocket URLs (required for journals strategy)", + optional + )] + sources: Option>, + + /// Per-source stall timeout in milliseconds for the journal client. Falls + /// back to the client default (60s) when unset. 
+ #[from_env( + var = "SIGNET_JOURNAL_CLIENT_SOURCE_STALL_TIMEOUT_MS", + desc = "Journal client per-source stall timeout in ms [default: 60000]", + optional + )] + client_source_stall_timeout_ms: Option, + + /// Faulty-source backoff in milliseconds for the journal client. Falls back + /// to the client default (30s) when unset. + #[from_env( + var = "SIGNET_JOURNAL_CLIENT_SOURCE_BACKOFF_MS", + desc = "Journal client faulty-source backoff in ms [default: 30000]", + optional + )] + client_source_backoff_ms: Option, + /// Maximum total byte size of the journal ring buffer. #[from_env( var = "SIGNET_JOURNAL_RING_BUFFER_MAX_BYTES", @@ -86,11 +168,37 @@ impl JournalConfig { NonZeroU64::new(value).expect("DEFAULT_MAX_SUBSCRIBER_LAG is non-zero") } + /// The configured sync strategy, defaulting to [`SyncStrategy::Blocks`]. + pub fn sync_strategy(&self) -> SyncStrategy { + self.sync_strategy.unwrap_or_default() + } + + /// Upstream journal WebSocket source URLs (as raw strings). Empty when none + /// are configured. Required when [`Self::sync_strategy`] is + /// [`SyncStrategy::Journals`]. + pub fn sources(&self) -> &[String] { + self.sources.as_deref().unwrap_or(&[]) + } + + /// Per-source stall timeout for the journal client, when overridden. `None` + /// lets the client's own default (60s) stand. + pub fn client_source_stall_timeout(&self) -> Option { + self.client_source_stall_timeout_ms.map(Duration::from_millis) + } + + /// Faulty-source backoff for the journal client, when overridden. `None` + /// lets the client's own default (30s) stand. + pub fn client_source_backoff(&self) -> Option { + self.client_source_backoff_ms.map(Duration::from_millis) + } + /// Emit a warning for any field that is explicitly set to a value the /// journal chain will silently normalize. Covers a zero /// `max_subscriber_lag` (which the chain rejects, so the default is /// substituted) and a `ring_buffer_max_count` below [`SAFETY_MARGIN`] - /// (which the chain clamps up). 
Intended to be called once at startup. + /// (which the chain clamps up). Also warns when journal-client-only or + /// `journals`-strategy-only options are set but the strategy will ignore + /// them. Intended to be called once at startup. pub fn warn_on_misconfiguration(&self) { if self.max_subscriber_lag == Some(0) { warn!( @@ -109,5 +217,118 @@ impl JournalConfig { margin and will be clamped up" ); } + // The journal-sync inputs (sources and client tuning knobs) are only consulted under + // the `journals` strategy. If they are set while the node will execute blocks, they are + // dead config - surface that rather than silently ignoring them. + if self.sync_strategy() != SyncStrategy::Journals { + if !self.sources().is_empty() { + warn!( + "SIGNET_JOURNAL_SOURCES is set but the sync strategy is not 'journals'; \ + the configured sources will be ignored" + ); + } + if self.client_source_stall_timeout_ms.is_some() + || self.client_source_backoff_ms.is_some() + { + warn!( + "journal client tuning knobs are set but the sync strategy is not \ + 'journals'; they will be ignored" + ); + } + } + } + + /// Validate cross-field invariants. Intended to be called once at startup, + /// after [`Self::warn_on_misconfiguration`]. + /// + /// # Errors + /// + /// Returns [`JournalConfigError::MissingSources`] when the strategy is + /// [`SyncStrategy::Journals`] but no upstream sources are configured. + pub fn validate(&self) -> Result<(), JournalConfigError> { + if self.sync_strategy() == SyncStrategy::Journals && self.sources().is_empty() { + return Err(JournalConfigError::MissingSources); + } + Ok(()) + } + + /// Construct a journal-sync configuration ([`SyncStrategy::Journals`]) pointing at the given + /// upstream sources. The client stall timeout is deliberately generous so a working but + /// idle source (e.g. a test that has served all its journals and is waiting to be torn + /// down) is never mistaken for a dead one and exhausted mid-test. 
All other fields take + /// their defaults. Use [`Self::journal_sync_for_test_fail_fast`] to exercise exhaustion. + #[cfg(any(test, feature = "test_utils"))] + pub fn journal_sync_for_test(sources: Vec) -> Self { + Self { + sync_strategy: Some(SyncStrategy::Journals), + sources: Some(sources), + client_source_stall_timeout_ms: Some(30_000), + client_source_backoff_ms: Some(100), + ..Default::default() + } + } + + /// Like [`Self::journal_sync_for_test`] but with short client timeouts so a node pointed at + /// dead sources exhausts them quickly. Only for tests that assert on source exhaustion; + /// other tests should use [`Self::journal_sync_for_test`] to avoid a spurious exhaustion + /// racing test teardown. + #[cfg(any(test, feature = "test_utils"))] + pub fn journal_sync_for_test_fail_fast(sources: Vec) -> Self { + Self { + sync_strategy: Some(SyncStrategy::Journals), + sources: Some(sources), + client_source_stall_timeout_ms: Some(200), + client_source_backoff_ms: Some(50), + ..Default::default() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sync_strategy_parses_case_insensitively() { + assert_eq!("blocks".parse::().unwrap(), SyncStrategy::Blocks); + assert_eq!("Journals".parse::().unwrap(), SyncStrategy::Journals); + assert_eq!(" JOURNALS ".parse::().unwrap(), SyncStrategy::Journals); + "neither".parse::().unwrap_err(); + } + + #[test] + fn default_strategy_is_blocks() { + assert_eq!(JournalConfig::default().sync_strategy(), SyncStrategy::Blocks); + } + + #[test] + fn validate_requires_sources_for_journals() { + let config = + JournalConfig { sync_strategy: Some(SyncStrategy::Journals), ..Default::default() }; + config.validate().unwrap_err(); + + let config = JournalConfig { + sync_strategy: Some(SyncStrategy::Journals), + sources: Some(vec!["ws://host:9545".to_owned()]), + ..Default::default() + }; + config.validate().unwrap(); + } + + #[test] + fn validate_allows_blocks_without_sources() { + 
JournalConfig::default().validate().unwrap(); + } + + #[test] + fn client_timeouts_convert_from_millis() { + let config = JournalConfig { + client_source_stall_timeout_ms: Some(1500), + client_source_backoff_ms: Some(250), + ..Default::default() + }; + assert_eq!(config.client_source_stall_timeout(), Some(Duration::from_millis(1500))); + assert_eq!(config.client_source_backoff(), Some(Duration::from_millis(250))); + assert_eq!(JournalConfig::default().client_source_stall_timeout(), None); } } diff --git a/crates/node-config/src/lib.rs b/crates/node-config/src/lib.rs index 05117727..ae92eb33 100644 --- a/crates/node-config/src/lib.rs +++ b/crates/node-config/src/lib.rs @@ -28,7 +28,7 @@ pub use core::SignetNodeConfig; mod journal; pub use journal::{ DEFAULT_MAX_SUBSCRIBER_LAG, DEFAULT_RING_BUFFER_MAX_BYTES, DEFAULT_RING_BUFFER_MAX_COUNT, - JournalConfig, + JournalConfig, JournalConfigError, ParseSyncStrategyError, SyncStrategy, }; mod storage; diff --git a/crates/node-config/src/test_utils.rs b/crates/node-config/src/test_utils.rs index b727e5f7..e3fddbe5 100644 --- a/crates/node-config/src/test_utils.rs +++ b/crates/node-config/src/test_utils.rs @@ -7,11 +7,17 @@ use std::borrow::Cow; /// Make a test config. pub fn test_config() -> SignetNodeConfig { + test_config_with_journal(JournalConfig::default()) +} + +/// Make a test config with a caller-supplied [`JournalConfig`]. Used by journal-sync tests to +/// point the node at an upstream WebSocket source. 
+pub const fn test_config_with_journal(journal: JournalConfig) -> SignetNodeConfig { SignetNodeConfig::new( BlobFetcherConfig::new(Cow::Borrowed("")), StorageConfig::new(Cow::Borrowed("NOP"), Cow::Borrowed("NOP")), None, - JournalConfig::default(), + journal, GenesisSpec::Known(KnownChains::Test), SlotCalculator::new(0, 0, 12), ) diff --git a/crates/node-tests/Cargo.toml b/crates/node-tests/Cargo.toml index 3f65228c..955daea3 100644 --- a/crates/node-tests/Cargo.toml +++ b/crates/node-tests/Cargo.toml @@ -37,6 +37,10 @@ tracing.workspace = true tracing-subscriber.workspace = true [dev-dependencies] +bytes.workspace = true serde_json.workspace = true serial_test = "3.2.0" +signet-journal-chain = { workspace = true, features = ["test-utils", "signet-extract"] } signet-journal.workspace = true +tokio-util.workspace = true +trevm.workspace = true diff --git a/crates/node-tests/src/context.rs b/crates/node-tests/src/context.rs index 5505e63d..6875b256 100644 --- a/crates/node-tests/src/context.rs +++ b/crates/node-tests/src/context.rs @@ -45,12 +45,37 @@ use tracing::instrument; /// to the signet node's main loop. pub struct TestHostNotifier { receiver: mpsc::UnboundedReceiver>, + /// Shared host-tip counter. The harness updates it as it produces and + /// reverts blocks, so [`HostNotifier::host_tip`] reflects the test's view + /// of the host chain. Shared with [`SignetTestContext::height`]. + host_tip: Arc, + /// Emulates a backend that backpressures the host (a reth ExEx): when set, + /// [`HostNotifier::backpressures_host`] returns `true` so a journal-syncing + /// node drains notifications, and every drained notification is tallied into + /// [`Self::drained`]. Used by journal-sync drain tests. + backpressures_host: bool, + /// Count of notifications consumed by [`HostNotifier::next_notification`], + /// observed by drain tests. `None` outside those tests. + drained: Option>, } impl TestHostNotifier { - /// Create a new test notifier from a receiver. 
- pub const fn new(receiver: mpsc::UnboundedReceiver>) -> Self { - Self { receiver } + /// Create a new test notifier from a receiver and a shared host-tip counter. + pub const fn new( + receiver: mpsc::UnboundedReceiver>, + host_tip: Arc, + ) -> Self { + Self { receiver, host_tip, backpressures_host: false, drained: None } + } + + /// Emulate a host-backpressuring backend (a reth ExEx): report + /// [`HostNotifier::backpressures_host`] as `true` and tally each drained + /// notification into `drained`. Lets journal-sync tests exercise the + /// drain-and-discard path. + pub fn with_backpressure(mut self, drained: Arc) -> Self { + self.backpressures_host = true; + self.drained = Some(drained); + self } } @@ -67,7 +92,13 @@ impl HostNotifier for TestHostNotifier { async fn next_notification( &mut self, ) -> Option, Self::Error>> { - self.receiver.recv().await.map(Ok) + let notification = self.receiver.recv().await; + if notification.is_some() + && let Some(drained) = &self.drained + { + drained.fetch_add(1, Ordering::SeqCst); + } + notification.map(Ok) } fn set_head(&mut self, _block_number: u64) {} @@ -77,6 +108,14 @@ impl HostNotifier for TestHostNotifier { fn send_finished_height(&self, _block_number: u64) -> Result<(), Self::Error> { Ok(()) } + + async fn host_tip(&self) -> Result { + Ok(self.host_tip.load(Ordering::SeqCst)) + } + + fn backpressures_host(&self) -> bool { + self.backpressures_host + } } /// Signet Node test context @@ -111,8 +150,9 @@ pub struct SignetTestContext { /// The system constants for the Signet Node instance. pub constants: SignetSystemConstants, - /// The current host block height - pub height: AtomicU64, + /// The current host block height. Shared with the [`TestHostNotifier`] so + /// its reported host tip tracks the blocks the harness has produced. + pub height: Arc, /// The alias oracle used by the Signet Node instance. 
pub alias_oracle: Arc>>, @@ -194,9 +234,12 @@ impl SignetTestContext { let alias_oracle: Arc>> = Arc::new(Mutex::new(HashSet::default())); + // Shared host-tip counter, seeded at the host deploy height and shared with the notifier. + let height = Arc::new(AtomicU64::new(cfg.constants().unwrap().host_deploy_height())); + // Create the test host notifier channel let (sender, receiver) = mpsc::unbounded_channel(); - let notifier = TestHostNotifier { receiver }; + let notifier = TestHostNotifier::new(receiver, Arc::clone(&height)); // Build the blob cacher from the in-memory blob source let blob_cacher = signet_blobber::BlobFetcher::builder() @@ -222,6 +265,7 @@ impl SignetTestContext { .with_blob_cacher(blob_cacher) .with_serve_config(serve_config) .with_rpc_config(StorageRpcConfig::default()) + .with_cancellation_token(cancel_token.clone()) .build() .await .unwrap(); @@ -255,7 +299,7 @@ impl SignetTestContext { storage, alloy_provider, constants, - height: AtomicU64::new(cfg.constants().unwrap().host_deploy_height()), + height, alias_oracle, addresses, cancel_token, diff --git a/crates/node-tests/tests/db.rs b/crates/node-tests/tests/db.rs index 46b01159..a6930ddc 100644 --- a/crates/node-tests/tests/db.rs +++ b/crates/node-tests/tests/db.rs @@ -11,7 +11,7 @@ use signet_node_config::test_utils::test_config; use signet_node_tests::{HostBlockSpec, TestHostNotifier, run_test}; use signet_rpc::{ServeConfig, StorageRpcConfig}; use signet_storage::{CancellationToken, HistoryRead, HistoryWrite, HotKv, UnifiedStorage}; -use std::sync::Arc; +use std::sync::{Arc, atomic::AtomicU64}; use tokio::sync::mpsc; #[serial] @@ -35,7 +35,7 @@ async fn test_genesis() { // Create a dummy notifier (not used, we only check genesis loading) let (_sender, receiver) = mpsc::unbounded_channel(); - let notifier = TestHostNotifier::new(receiver); + let notifier = TestHostNotifier::new(receiver, Arc::new(AtomicU64::new(0))); // Build a dummy blob cacher let blob_cacher = 
signet_blobber::BlobFetcher::builder() @@ -58,6 +58,7 @@ async fn test_genesis() { ipc: None, }) .with_rpc_config(StorageRpcConfig::default()) + .with_cancellation_token(cancel_token.clone()) .build() .await .unwrap(); diff --git a/crates/node-tests/tests/journal_sync.rs b/crates/node-tests/tests/journal_sync.rs new file mode 100644 index 00000000..13070e53 --- /dev/null +++ b/crates/node-tests/tests/journal_sync.rs @@ -0,0 +1,564 @@ +//! End-to-end tests for the `journals` sync strategy: a syncing [`SignetNode`] subscribes to an +//! in-process journal WebSocket source via the real [`signet_journal_client::JournalClient`], +//! applies the journals through its ingestor, and (when caught up to the host tip) hands off to +//! block execution. + +use alloy::{ + consensus::Header, + primitives::{Address, B256, keccak256, map::HashSet}, +}; +use bytes::Bytes; +use serial_test::serial; +use signet_cold::mem::MemColdBackend; +use signet_hot::{ + db::{HistoryWrite, HotDbRead, UnsafeDbWrite}, + mem::MemKv, +}; +use signet_journal::{GENESIS_JOURNAL_HASH, HostJournal, Journal, JournalMeta}; +use signet_journal_chain::{Checkpoint, extract_signet_metadata, test_utils::TestServer}; +use signet_node::{NodeStatus, SignetNodeBuilder}; +use signet_node_config::{JournalConfig, test_utils::test_config_with_journal}; +use signet_node_tests::{ + HostBlockSpec, NotificationWithSidecars, TestHostNotifier, convert::to_host_notification, +}; +use signet_rpc::{ServeConfig, StorageRpcConfig}; +use signet_storage::{CancellationToken, HistoryRead, HotKv, UnifiedStorage}; +use std::{ + borrow::Cow, + sync::{ + Arc, Mutex, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; +use tokio::sync::mpsc; +use trevm::journal::{BundleStateIndex, JournalEncode}; + +/// Upper bound on how long any test waits for the node to make progress. 
In-process journal +/// application is sub-second; the slowest path is source exhaustion (a ~400ms window), so this +/// comfortably exceeds the real timings while keeping a genuine hang quick to surface. +const TIMEOUT: Duration = Duration::from_secs(5); + +/// A cloneable builder for chained `Journal::V1` blobs. Two chains advance in lockstep: +/// +/// - The **journal-hash chain** (`previous_journal_hash`), seeded at [`GENESIS_JOURNAL_HASH`] so +/// the first journal validates against the node's genesis checkpoint, then chaining off the +/// keccak256 of each prior journal blob. +/// - The **block-header chain** (`parent_hash`), seeded at the genesis header hash so storage's +/// `append_blocks` continuity check passes, then chaining off each prior header's hash. +/// +/// Cloning snapshots the state at a height, which a reorg test uses to grow a divergent fork from +/// a common ancestor. +#[derive(Clone)] +struct JournalChainGen { + previous_journal_hash: B256, + parent_hash: B256, + next_height: u64, +} + +impl JournalChainGen { + fn new() -> Self { + Self { + previous_journal_hash: GENESIS_JOURNAL_HASH, + parent_hash: genesis_header_hash(), + next_height: 1, + } + } + + /// Append a journal at the next height. `salt` perturbs the header (via its timestamp) so two + /// forks at the same height produce distinct header and journal hashes. Returns the wire blob + /// and its journal hash (the value persisted into the `JournalHashes` table). 
+ fn push(&mut self, salt: u64) -> (Bytes, B256) { + let header = Header { + number: self.next_height, + parent_hash: self.parent_hash, + timestamp: salt, + ..Header::default() + }; + self.parent_hash = header.hash_slow(); + self.next_height += 1; + let meta = JournalMeta::new(0, self.previous_journal_hash, Cow::Owned(header)); + let journal = Journal::V1(HostJournal::new(meta, BundleStateIndex::default())); + let bytes = Bytes::from(journal.encoded().to_vec()); + let journal_hash = keccak256(&bytes); + self.previous_journal_hash = journal_hash; + (bytes, journal_hash) + } +} + +/// Build `count` chained journals (salt 0 throughout). Returns the blobs and their journal hashes. +fn build_signet_journals(count: u64) -> (Vec, Vec) { + let mut generator = JournalChainGen::new(); + let mut journals = Vec::with_capacity(count as usize); + let mut journal_hashes = Vec::with_capacity(count as usize); + for _ in 0..count { + let (bytes, hash) = generator.push(0); + journals.push(bytes); + journal_hashes.push(hash); + } + (journals, journal_hashes) +} + +/// Load the test genesis into a throwaway hot store and return its block-0 header hash, so test +/// journals can chain their `parent_hash` onto the same genesis the node will load. +fn genesis_header_hash() -> B256 { + let cfg = test_config_with_journal(JournalConfig::default()); + let hot = MemKv::new(); + let hardforks = signet_genesis::genesis_hardforks(cfg.genesis()); + let writer = hot.writer().unwrap(); + writer.load_genesis(cfg.genesis(), &hardforks).unwrap(); + writer.commit().unwrap(); + let reader = hot.reader().unwrap(); + reader.get_header(0).unwrap().expect("missing genesis header").hash_slow() +} + +/// Build and spawn a journal-syncing node over the given `storage` and `cancel_token`, pointed at +/// `server_url` with the host tip fixed at `host_tip`. Genesis is loaded by the builder's prebuild +/// if absent, so the same storage can be handed to a second node to exercise restart-resume. 
+async fn build_journal_sync_node( + storage: Arc>, + cancel_token: CancellationToken, + server_url: &str, + host_tip: u64, +) -> (tokio::task::JoinHandle>, tokio::sync::watch::Receiver) { + // The notifier is held but never polled until the node transitions to block execution; its + // shared tip drives the catch-up decision. + let (_sender, receiver) = mpsc::unbounded_channel(); + let notifier = TestHostNotifier::new(receiver, Arc::new(AtomicU64::new(host_tip))); + let journal = JournalConfig::journal_sync_for_test(vec![server_url.to_owned()]); + build_node_with_notifier(storage, cancel_token, journal, notifier).await +} + +/// Build and spawn a journal-syncing node with a caller-supplied journal config and notifier. +/// Used directly by the drain test (to install a host-backpressuring notifier whose notification +/// sender it retains) and the exhaustion test (to inject a fail-fast journal config). +async fn build_node_with_notifier( + storage: Arc>, + cancel_token: CancellationToken, + journal: JournalConfig, + notifier: TestHostNotifier, +) -> (tokio::task::JoinHandle>, tokio::sync::watch::Receiver) { + let cfg = test_config_with_journal(journal); + + let blob_cacher = signet_blobber::BlobFetcher::builder() + .with_source(signet_blobber::MemoryBlobSource::new()) + .build_cache() + .spawn(); + + let (node, status) = SignetNodeBuilder::new(cfg) + .with_notifier(notifier) + .with_storage(storage) + .with_alias_oracle(Arc::new(Mutex::new(HashSet::
::default()))) + .with_blob_cacher(blob_cacher) + .with_serve_config(ServeConfig { + http: vec![], + http_cors: None, + ws: vec![], + ws_cors: None, + ipc: None, + }) + .with_rpc_config(StorageRpcConfig::default()) + .with_cancellation_token(cancel_token) + .build() + .await + .unwrap(); + + (tokio::spawn(node.start()), status) +} + +/// Stand up a journal-syncing node with fresh in-memory storage, pointed at `server_url` with the +/// host tip fixed at `host_tip`. Returns the storage handle, the (shared storage + node) +/// cancellation token, the node's join handle, and a status receiver. +async fn spawn_journal_sync_node( + server_url: &str, + host_tip: u64, +) -> ( + Arc>, + CancellationToken, + tokio::task::JoinHandle>, + tokio::sync::watch::Receiver, +) { + let cancel_token = CancellationToken::new(); + let storage = Arc::new(UnifiedStorage::spawn_erased( + MemKv::new(), + MemColdBackend::new(), + cancel_token.clone(), + )); + let (handle, status) = + build_journal_sync_node(Arc::clone(&storage), cancel_token.clone(), server_url, host_tip) + .await; + (storage, cancel_token, handle, status) +} + +/// Wait until the node status reports at least `height`. Races the node's join `handle`: if the +/// node exits early it panics with the node's result (a clear diagnostic) rather than letting the +/// caller fall through to a misleading assertion. Panics on timeout. +async fn wait_for_height( + status: &mut tokio::sync::watch::Receiver, + handle: &mut tokio::task::JoinHandle>, + height: u64, +) { + let wait = async { + loop { + if let NodeStatus::AtHeight(current) = *status.borrow_and_update() + && current >= height + { + return; + } + tokio::select! 
{ + biased; + result = &mut *handle => panic!("node exited before reaching height {height}: {result:?}"), + _ = status.changed() => {} + } + } + }; + tokio::time::timeout(TIMEOUT, wait).await.expect("timed out waiting for node to reach height"); +} + +/// Poll storage until the journal hash at `height` equals `expected`, or panic after a timeout. +/// Used to observe a reorg replacing a height's journal with a divergent one. +async fn wait_for_journal_hash(storage: &UnifiedStorage, height: u64, expected: B256) { + let wait = async { + loop { + let actual = storage.reader().unwrap().get_journal_hash(height).unwrap(); + if actual == Some(expected) { + return; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }; + tokio::time::timeout(TIMEOUT, wait) + .await + .expect("timed out waiting for journal hash to update"); +} + +#[serial] +#[tokio::test(flavor = "multi_thread")] +async fn journal_sync_applies_journals_to_storage() { + let (journals, hashes) = build_signet_journals(5); + let server = TestServer::spawn_with( + extract_signet_metadata, + Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH }, + &journals, + ) + .await; + + // Pin the host tip far ahead so the node never decides it has caught up: it stays in journal + // sync and we can observe every applied journal in storage. 
+ let (storage, cancel_token, mut handle, mut status) = + spawn_journal_sync_node(server.url.as_str(), 1_000_000).await; + + wait_for_height(&mut status, &mut handle, 5).await; + + let reader = storage.reader().unwrap(); + assert_eq!(reader.last_block_number().unwrap(), Some(5)); + for (index, expected_hash) in hashes.iter().enumerate() { + let height = index as u64 + 1; + assert!(reader.get_header(height).unwrap().is_some(), "missing header at height {height}"); + assert_eq!( + reader.get_journal_hash(height).unwrap(), + Some(*expected_hash), + "journal hash mismatch at height {height}", + ); + } + drop(reader); + + // Graceful shutdown: cancelling drives the cooperative wind-down and the node exits cleanly. + cancel_token.cancel(); + let result = tokio::time::timeout(TIMEOUT, handle) + .await + .expect("node did not shut down before timeout") + .expect("node task panicked"); + result.expect("node should shut down cleanly"); + + drop(server.shutdown_sender); +} + +/// Serve `journals`, spawn a journal-sync node at `host_tip`, and wait for it to transition to +/// block execution and exit cleanly. The node's host-notifier channel has no live sender, so once +/// it hands off to block execution `next_notification` yields `None` and it shuts down on its own; +/// a self-exit with `Ok(())` (no cancellation) is the signal the transition happened - journal +/// sync alone would keep running. Returns storage so the caller can assert how far it synced. 
+async fn run_until_transition(journals: &[Bytes], host_tip: u64) -> Arc<UnifiedStorage<MemKv>> {
+    let server = TestServer::spawn_with(
+        extract_signet_metadata,
+        Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+        journals,
+    )
+    .await;
+    let (storage, _cancel_token, handle, _status) =
+        spawn_journal_sync_node(server.url.as_str(), host_tip).await;
+
+    let result = tokio::time::timeout(TIMEOUT, handle)
+        .await
+        .expect("node did not transition and exit before timeout")
+        .expect("node task panicked");
+    result.expect("node should transition to block execution and exit cleanly");
+
+    drop(server.shutdown_sender);
+    storage
+}
+
+/// A host tip at the genesis-paired height transitions immediately; storage stays at genesis.
+#[serial]
+#[tokio::test(flavor = "multi_thread")]
+async fn journal_sync_transitions_at_startup_when_already_at_tip() {
+    let constants = test_config_with_journal(JournalConfig::default()).constants().unwrap();
+    // Boundary case: a host tip at the genesis-paired height means the startup catch-up check
+    // passes before the client is even spawned, so no journal is ever applied (the upstream is
+    // never connected), the node transitions straight to block execution, and storage stays at
+    // genesis.
+    let (journals, _) = build_signet_journals(3);
+    let storage = run_until_transition(&journals, constants.host_deploy_height()).await;
+    assert_eq!(storage.reader().unwrap().last_block_number().unwrap(), Some(0));
+}
+
+/// A host tip paired with rollup height 5 makes the node apply journals before it transitions.
+#[serial]
+#[tokio::test(flavor = "multi_thread")]
+async fn journal_sync_transitions_after_catching_up() {
+    let constants = test_config_with_journal(JournalConfig::default()).constants().unwrap();
+    // A host tip paired with rollup height 5 forces the node to apply journals first: the catch-up
+    // check only trips once the applied tip is within the transition margin of 5. It then hands
+    // off to block execution somewhere in [5 - margin, 5].
+    let (journals, _) = build_signet_journals(5);
+    let storage = run_until_transition(&journals, constants.pair_ru(5).host).await;
+
+    let tip = storage
+        .reader()
+        .unwrap()
+        .last_block_number()
+        .unwrap()
+        .expect("node should have synced at least one block before transitioning");
+    assert!((3..=5).contains(&tip), "expected transition after syncing 3-5 journals, got {tip}");
+}
+
+/// With its only journal source unreachable, the node task returns an "exhausted all sources" error.
+#[serial]
+#[tokio::test(flavor = "multi_thread")]
+async fn journal_sync_fatal_when_all_sources_exhausted() {
+    // Point the node at a source that never accepts a journal connection. The fail-fast config
+    // uses short client timeouts, so the client fails the source, exhausts its only option, and
+    // the node bails well within `TIMEOUT`.
+    let cancel_token = CancellationToken::new();
+    let storage = Arc::new(UnifiedStorage::spawn_erased(
+        MemKv::new(),
+        MemColdBackend::new(),
+        cancel_token.clone(),
+    ));
+    let notifier =
+        TestHostNotifier::new(mpsc::unbounded_channel().1, Arc::new(AtomicU64::new(1_000_000)));
+    let journal =
+        JournalConfig::journal_sync_for_test_fail_fast(vec!["ws://127.0.0.1:1".to_owned()]);
+    let (handle, _status) =
+        build_node_with_notifier(storage, cancel_token, journal, notifier).await;
+
+    let result = tokio::time::timeout(TIMEOUT, handle)
+        .await
+        .expect("node did not bail before timeout")
+        .expect("node task panicked");
+    let error = result.expect_err("node should bail when all journal sources are exhausted");
+    assert!(format!("{error:#}").contains("exhausted all sources"), "unexpected error: {error:#}");
+}
+
+/// Syncs chain A to height 5, then pushes a divergent chain B and expects heights 3-5 replaced.
+#[serial]
+#[tokio::test(flavor = "multi_thread")]
+async fn journal_sync_handles_reorg() {
+    // Common prefix: heights 1-2. Snapshot the generator there, then grow two divergent forks.
+    let mut generator = JournalChainGen::new();
+    let prefix: Vec<Bytes> = (0..2).map(|_| generator.push(0).0).collect();
+    let mut fork = generator.clone();
+
+    // Chain A (salt 0): heights 3-5, served initially.
+    let (a3, a3_hash) = generator.push(0);
+    let (a4, _) = generator.push(0);
+    let (a5, _) = generator.push(0);
+    let chain_a: Vec<Bytes> = prefix.iter().cloned().chain([a3, a4, a5]).collect();
+
+    // Chain B (salt 1): divergent heights 3-5, pushed live to trigger the reorg.
+    let (b3, b3_hash) = fork.push(1);
+    let (b4, b4_hash) = fork.push(1);
+    let (b5, b5_hash) = fork.push(1);
+
+    let server = TestServer::spawn_with(
+        extract_signet_metadata,
+        Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+        &chain_a,
+    )
+    .await;
+
+    let (storage, cancel_token, mut handle, mut status) =
+        spawn_journal_sync_node(server.url.as_str(), 1_000_000).await;
+
+    wait_for_height(&mut status, &mut handle, 5).await;
+    assert_eq!(
+        storage.reader().unwrap().get_journal_hash(3).unwrap(),
+        Some(a3_hash),
+        "expected chain A journal at height 3 before the reorg",
+    );
+
+    // Push the divergent fork. The server detects the reorg at height 3, truncates, and rebroadcasts
+    // the replacements; the node's chain sees the hash mismatch and the ingestor unwinds + replays.
+    for journal in [b3, b4, b5] {
+        server.journal_sender.send(journal).await.expect("server chain channel open");
+    }
+
+    wait_for_journal_hash(&storage, 5, b5_hash).await;
+    let reader = storage.reader().unwrap();
+    assert_eq!(reader.last_block_number().unwrap(), Some(5), "tip should be back at 5 post-reorg");
+    assert_eq!(reader.get_journal_hash(3).unwrap(), Some(b3_hash), "height 3 not replaced");
+    assert_eq!(reader.get_journal_hash(4).unwrap(), Some(b4_hash), "height 4 not replaced");
+    drop(reader);
+
+    cancel_token.cancel();
+    let result = tokio::time::timeout(TIMEOUT, handle)
+        .await
+        .expect("node did not shut down before timeout")
+        .expect("node task panicked");
+    result.expect("node should shut down cleanly");
+
+    drop(server.shutdown_sender);
+}
+
+/// Runs two nodes back to back over one storage; the second must resume from height 3.
+#[serial]
+#[tokio::test(flavor = "multi_thread")]
+async fn journal_sync_resumes_from_persisted_checkpoint() {
+    let (journals, hashes) = build_signet_journals(5);
+
+    // The storage outlives both node runs; its own cancellation token is held until the very end
+    // so its background tasks survive the first node's shutdown.
+    let storage_cancel = CancellationToken::new();
+    let storage = Arc::new(UnifiedStorage::spawn_erased(
+        MemKv::new(),
+        MemColdBackend::new(),
+        storage_cancel.clone(),
+    ));
+
+    // Phase 1: sync heights 1-3 against an upstream that only holds those, then shut the node down.
+    // Storage retains the `JournalHashes` entries the next node will seed from.
+    {
+        let server = TestServer::spawn_with(
+            extract_signet_metadata,
+            Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+            &journals[..3],
+        )
+        .await;
+        let node_cancel = CancellationToken::new();
+        let (mut handle, mut status) = build_journal_sync_node(
+            Arc::clone(&storage),
+            node_cancel.clone(),
+            server.url.as_str(),
+            1_000_000,
+        )
+        .await;
+        wait_for_height(&mut status, &mut handle, 3).await;
+        node_cancel.cancel();
+        tokio::time::timeout(TIMEOUT, handle)
+            .await
+            .expect("first node did not shut down before timeout")
+            .expect("first node task panicked")
+            .expect("first node should shut down cleanly");
+        drop(server.shutdown_sender);
+    }
+
+    assert_eq!(storage.reader().unwrap().last_block_number().unwrap(), Some(3));
+    assert_eq!(storage.reader().unwrap().get_journal_hash(3).unwrap(), Some(hashes[2]));
+
+    // Phase 2: a fresh node over the same storage must seed its client checkpoint from height 3
+    // (not genesis) and resume from height 4. The upstream serves the whole 1-5 chain; a node that
+    // wrongly re-bootstrapped from genesis would receive height 1 first and fail to append it onto
+    // a storage already at tip 3 - so simply reaching height 5 proves the resume.
+    {
+        let server = TestServer::spawn_with(
+            extract_signet_metadata,
+            Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+            &journals,
+        )
+        .await;
+        let node_cancel = CancellationToken::new();
+        let (mut handle, mut status) = build_journal_sync_node(
+            Arc::clone(&storage),
+            node_cancel.clone(),
+            server.url.as_str(),
+            1_000_000,
+        )
+        .await;
+        wait_for_height(&mut status, &mut handle, 5).await;
+        let reader = storage.reader().unwrap();
+        assert_eq!(reader.last_block_number().unwrap(), Some(5));
+        assert_eq!(reader.get_journal_hash(5).unwrap(), Some(hashes[4]));
+        drop(reader);
+        node_cancel.cancel();
+        tokio::time::timeout(TIMEOUT, handle)
+            .await
+            .expect("second node did not shut down before timeout")
+            .expect("second node task panicked")
+            .expect("second node should shut down cleanly");
+        drop(server.shutdown_sender);
+    }
+
+    storage_cancel.cancel();
+}
+
+/// Poll `counter` until it reaches at least `target`, or panic after [`TIMEOUT`].
+async fn wait_for_count(counter: &AtomicU64, target: u64) {
+    let wait = async {
+        while counter.load(Ordering::SeqCst) < target {
+            tokio::time::sleep(Duration::from_millis(5)).await;
+        }
+    };
+    tokio::time::timeout(TIMEOUT, wait).await.expect("timed out waiting for drained count");
+}
+
+/// A backpressuring notifier must see all three fed host notifications drained during journal sync.
+#[serial]
+#[tokio::test(flavor = "multi_thread")]
+async fn journal_sync_drains_host_notifications() {
+    let (journals, _) = build_signet_journals(5);
+    let server = TestServer::spawn_with(
+        extract_signet_metadata,
+        Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+        &journals,
+    )
+    .await;
+
+    let cancel_token = CancellationToken::new();
+    let storage = Arc::new(UnifiedStorage::spawn_erased(
+        MemKv::new(),
+        MemColdBackend::new(),
+        cancel_token.clone(),
+    ));
+
+    // Pin the host tip far ahead so the node never transitions: it stays in journal sync, keeping
+    // the drain arm active. The notifier reports `backpressures_host` and tallies every drained
+    // notification, emulating a reth ExEx.
+    let drained = Arc::new(AtomicU64::new(0));
+    let (host_sender, host_receiver) = mpsc::unbounded_channel();
+    let notifier = TestHostNotifier::new(host_receiver, Arc::new(AtomicU64::new(1_000_000)))
+        .with_backpressure(Arc::clone(&drained));
+
+    let journal = JournalConfig::journal_sync_for_test(vec![server.url.as_str().to_owned()]);
+    let (mut handle, mut status) =
+        build_node_with_notifier(Arc::clone(&storage), cancel_token.clone(), journal, notifier)
+            .await;
+
+    // Feed host notifications the node must drain and discard - it applies journals, not blocks.
+    let constants = test_config_with_journal(JournalConfig::default()).constants().unwrap();
+    for height in 1..=3u64 {
+        let block = HostBlockSpec::new(constants.clone());
+        block.set_block_number(height);
+        let notification = NotificationWithSidecars::commit_single_block(block);
+        host_sender.send(to_host_notification(&notification.notification)).unwrap();
+    }
+
+    // Journals still apply while the host notifications are concurrently drained, and every fed
+    // notification is consumed rather than left to pile up (which would stall a real host).
+    wait_for_height(&mut status, &mut handle, 5).await;
+    wait_for_count(&drained, 3).await;
+    assert_eq!(storage.reader().unwrap().last_block_number().unwrap(), Some(5));
+
+    cancel_token.cancel();
+    let result = tokio::time::timeout(TIMEOUT, handle)
+        .await
+        .expect("node did not shut down before timeout")
+        .expect("node task panicked");
+    result.expect("node should shut down cleanly");
+
+    drop(server.shutdown_sender);
+}
diff --git a/crates/node-types/src/notifier.rs b/crates/node-types/src/notifier.rs
index 49bfe6a5..4d5bd3db 100644
--- a/crates/node-types/src/notifier.rs
+++ b/crates/node-types/src/notifier.rs
@@ -59,4 +59,20 @@ pub trait HostNotifier: Send + Sync {
     /// Signal that processing is complete up to this host block number.
     /// The backend resolves the block number to a block hash internally.
fn send_finished_height(&self, block_number: u64) -> Result<(), Self::Error>; + + /// Query the current host-chain tip block number. Used by a syncing node to + /// decide when its applied state has caught up to the host and it can hand + /// off to live block execution. + fn host_tip(&self) -> impl Future> + Send; + + /// Whether leaving notifications unconsumed backpressures - and can stall - the host. + /// + /// A reth ExEx shares the host node's notification pipeline: if it stops consuming, reth's + /// notification buffer fills and reth's pipeline stalls. A journal-syncing node never + /// consumes host notifications (it derives state from the upstream journal feed), so for such + /// a backend it must drain and discard them to keep the host moving. Pull-based followers + /// that poll a remote host are unaffected and use the default `false`. + fn backpressures_host(&self) -> bool { + false + } } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index f8b5a22e..a93f19ed 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -19,6 +19,7 @@ signet-genesis.workspace = true signet-hot.workspace = true signet-journal.workspace = true signet-journal-chain = { workspace = true, features = ["serve", "signet-extract"] } +signet-journal-client.workspace = true signet-node-config.workspace = true signet-node-types.workspace = true signet-rpc.workspace = true @@ -34,4 +35,9 @@ eyre.workspace = true metrics.workspace = true reqwest.workspace = true tokio.workspace = true +tokio-util.workspace = true tracing.workspace = true +url.workspace = true + +[dev-dependencies] +signet-node-config = { workspace = true, features = ["test_utils"] } diff --git a/crates/node/src/builder.rs b/crates/node/src/builder.rs index 32198b81..5759e608 100644 --- a/crates/node/src/builder.rs +++ b/crates/node/src/builder.rs @@ -11,6 +11,7 @@ use signet_node_types::HostNotifier; use signet_rpc::{ServeConfig, StorageRpcConfig}; use signet_storage::{HistoryRead, HistoryWrite, 
HotKv, HotKvRead, UnifiedStorage}; use std::sync::Arc; +use tokio_util::sync::CancellationToken; use tracing::info; use trevm::revm::database::DBErrorMarker; @@ -65,6 +66,7 @@ pub struct SignetNodeBuilder, serve_config: Option, rpc_config: Option, + cancellation_token: Option, } impl core::fmt::Debug for SignetNodeBuilder { @@ -85,6 +87,7 @@ impl SignetNodeBuilder { blob_cacher: None, serve_config: None, rpc_config: None, + cancellation_token: None, } } } @@ -104,6 +107,7 @@ impl SignetNodeBuilder { blob_cacher: self.blob_cacher, serve_config: self.serve_config, rpc_config: self.rpc_config, + cancellation_token: self.cancellation_token, } } @@ -118,6 +122,7 @@ impl SignetNodeBuilder { blob_cacher: self.blob_cacher, serve_config: self.serve_config, rpc_config: self.rpc_config, + cancellation_token: self.cancellation_token, } } @@ -135,6 +140,7 @@ impl SignetNodeBuilder { blob_cacher: self.blob_cacher, serve_config: self.serve_config, rpc_config: self.rpc_config, + cancellation_token: self.cancellation_token, } } @@ -161,6 +167,13 @@ impl SignetNodeBuilder { self.rpc_config = Some(rpc_config); self } + + /// Set the cancellation token used for graceful shutdown. Required - the + /// builder fails at `build()` if this is not set. 
+    pub fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
+        self.cancellation_token = Some(cancellation_token);
+        self
+    }
 }
 
 impl SignetNodeBuilder>, Aof>
@@ -227,6 +240,7 @@ where
             self.blob_cacher.ok_or_eyre("blob cacher must be set")?,
             self.serve_config.ok_or_eyre("serve config must be set")?,
             self.rpc_config.ok_or_eyre("rpc config must be set")?,
+            self.cancellation_token.ok_or_eyre("cancellation token must be set")?,
         )
     }
 }
diff --git a/crates/node/src/journal_sync/ingestor.rs b/crates/node/src/journal_sync/ingestor.rs
new file mode 100644
index 00000000..a0a68b3b
--- /dev/null
+++ b/crates/node/src/journal_sync/ingestor.rs
@@ -0,0 +1,199 @@
+use crate::NodeStatus;
+use alloy::{
+    consensus::BlockHeader,
+    primitives::{Sealable, keccak256},
+};
+use bytes::Bytes;
+use eyre::{Context, bail, eyre};
+use signet_journal::Journal;
+use signet_journal_chain::JournalChainEvent;
+use signet_rpc::{ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification};
+use signet_storage::{DrainedBlock, ExecutedBlockBuilder, HotKv, HotKvRead, UnifiedStorage};
+use std::sync::Arc;
+use tokio::sync::{mpsc, watch};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, instrument};
+use trevm::{
+    journal::JournalDecode,
+    revm::database::{BundleState, DBErrorMarker},
+};
+
+/// Applies [`JournalChainEvent`]s from a local journal chain to storage on a
+/// syncing node.
+///
+/// The ingestor is the consumer end of the chain's backpressured sender: the
+/// chain awaits on the sender after each event, so a slow ingestor throttles
+/// ingestion all the way back to the upstream source. Each event is applied
+/// synchronously before the next is taken, keeping hot storage's tip in
+/// lockstep with the chain's.
+///
+/// - [`JournalChainEvent::Reorg`] drains storage above the fork point, rewinds
+///   the block tags, and broadcasts a reorg notification.
+/// - [`JournalChainEvent::Journal`] decodes the journal, applies its state diff
+///   via `append_blocks` (cold storage receives the header with empty
+///   transaction/receipt/event lists), and broadcasts a new-block notification.
+#[derive(Debug)]
+pub(crate) struct JournalIngestor {
+    /// Storage that journal-derived blocks are appended to and that reorgs drain from.
+    storage: Arc>,
+    /// Source of the block tags and the new-block / reorg broadcasts sent after each event.
+    chain: ChainNotifier,
+    /// Node status channel; set to `NodeStatus::AtHeight` after every applied event.
+    status: watch::Sender,
+    /// Receiving end of the event channel fed by the journal chain.
+    event_rx: mpsc::Receiver,
+    /// Checked between events; once it fires the loop stops and buffered events are drained.
+    cancellation_token: CancellationToken,
+    /// Latest applied rollup height, published after every event so the
+    /// run-loop driver can query `host_tip` exactly when progress occurs.
+    applied_rollup_height: watch::Sender,
+}
+
+impl JournalIngestor
+where
+    H: HotKv + Clone + Send + Sync + 'static,
+    ::Error: DBErrorMarker,
+{
+    /// Create a new ingestor consuming from the chain's backpressured receiver.
+    pub(crate) const fn new(
+        storage: Arc>,
+        chain: ChainNotifier,
+        status: watch::Sender,
+        event_rx: mpsc::Receiver,
+        cancellation_token: CancellationToken,
+        applied_rollup_height: watch::Sender,
+    ) -> Self {
+        Self { storage, chain, status, event_rx, cancellation_token, applied_rollup_height }
+    }
+
+    /// Run the ingestion loop until the cancellation token fires, the chain
+    /// drops its sender, or a fatal error occurs. The cancellation check only
+    /// runs between events, so an in-flight `apply_journal` always runs to
+    /// completion before the loop exits.
+    ///
+    /// On cancellation, any events the chain has already emitted are applied
+    /// before returning. At a journals->blocks handoff this lets hot storage
+    /// catch up to the chain's tip, so the chain does not lead storage when the
+    /// node starts executing blocks (see [`crate::SignetNode::run_journal_sync`]).
+    pub(crate) async fn run(mut self) -> eyre::Result<()> {
+        loop {
+            tokio::select! {
+                // `biased` polls the arms in order, so a fired token wins over a ready event.
+                biased;
+                () = self.cancellation_token.cancelled() => break,
+                event = self.event_rx.recv() => match event {
+                    Some(event) => self.apply_event(event).await?,
+                    // The sender side was dropped: return directly, skipping the drain below.
+                    None => return Ok(()),
+                },
+            }
+        }
+        // Cancelled. Drain events the chain has already emitted so storage reaches the chain's
+        // emitted tip. Best-effort: `try_recv` takes what is buffered now. The chain may emit a
+        // little more from input it had not yet processed; block execution reconciles that on
+        // re-derivation (the chain treats the re-derived journals as duplicates).
+        while let Ok(event) = self.event_rx.try_recv() {
+            self.apply_event(event).await?;
+        }
+        Ok(())
+    }
+
+    /// Apply a single chain event to storage.
+    async fn apply_event(&self, event: JournalChainEvent) -> eyre::Result<()> {
+        match event {
+            JournalChainEvent::Reorg(height) => self.apply_reorg(height).await,
+            JournalChainEvent::Journal { height, data } => self.apply_journal(height, data).await,
+        }
+    }
+
+    /// Unwind storage and tags in response to a reorg at `height`. All state at
+    /// heights `>= height` is removed, so the surviving tip is `height - 1`.
+    #[instrument(skip(self))]
+    async fn apply_reorg(&self, height: u64) -> eyre::Result<()> {
+        // `saturating_sub` keeps a reorg reported at height 0 from underflowing; the ancestor
+        // is then 0.
+        let common_ancestor = height.saturating_sub(1);
+        let drained = self
+            .storage
+            .drain_above(common_ancestor)
+            .await
+            .wrap_err("failed to drain storage during journal-sync reorg")?;
+
+        // Cap tags before broadcasting so RPC consumers never observe a tag
+        // pointing above the post-reorg tip.
+        self.chain.tags().rewind_to(common_ancestor);
+
+        // Nothing was removed, so there is nothing to tell subscribers about.
+        if !drained.is_empty() {
+            self.notify_reorg(drained, common_ancestor);
+        }
+        self.status.send_modify(|status| *status = NodeStatus::AtHeight(common_ancestor));
+        self.applied_rollup_height.send_replace(common_ancestor);
+        debug!(height, common_ancestor, "applied journal-sync reorg");
+        Ok(())
+    }
+
+    /// Decode a journal, apply its state diff to storage, and broadcast the new
+    /// block. The decoded header is re-sealed (recomputing the canonical block
+    /// hash) and the keccak256 of the wire bytes is stamped onto the block so
+    /// `append_blocks` records it in the `JournalHashes` table for restart-safe
+    /// checkpoint seeding.
+    #[instrument(skip(self, data), fields(len = data.len()))]
+    async fn apply_journal(&self, height: u64, data: Bytes) -> eyre::Result<()> {
+        // Hash the raw wire bytes before decoding advances the slice cursor.
+        let journal_hash = keccak256(&data);
+        let mut slice = data.as_ref();
+        let journal = Journal::decode(&mut slice)
+            .map_err(|error| eyre!("failed to decode journal at height {height}: {error}"))?;
+        let Journal::V1(host_journal) = journal else {
+            bail!("unsupported journal version at height {height}");
+        };
+
+        let (meta, bundle_index) = host_journal.into_parts();
+        // Only the header is used here; the host height and previous journal hash are dropped.
+        let (_host_height, _prev_journal_hash, header) = meta.into_parts();
+        let bundle = BundleState::from(bundle_index);
+
+        let executed = ExecutedBlockBuilder::new()
+            .header(header.seal_slow())
+            .bundle(bundle)
+            .journal_hash(journal_hash)
+            .build()
+            .wrap_err("failed to build executed block from journal")?;
+
+        // The journal chain indexes by rollup height (the header number), so a
+        // mismatch here means the chain and the decoded payload disagree - a
+        // bug in the chain or the metadata extractor, not bad upstream data.
+        let block_number = executed.block_number();
+        if block_number != height {
+            bail!("journal-sync height {height} does not match header height {block_number}");
+        }
+
+        // Build the broadcast payload before `append_blocks` consumes the block.
+        // Journal sync carries no transactions or receipts, so both lists are
+        // empty; `newHeads` subscribers still see the header.
+        let notification = NewBlockNotification {
+            header: executed.header.inner().clone(),
+            transactions: Vec::new(),
+            receipts: Vec::new(),
+        };
+
+        self.storage
+            .append_blocks(vec![executed])
+            .await
+            .wrap_err("failed to append journal-synced block to storage")?;
+
+        // Tags, status and the progress channel are only updated after the append succeeded.
+        self.chain.tags().set_latest(height);
+        // Best-effort broadcast to RPC push-subscribers; an error just means none are connected.
+        let _ = self.chain.send_new_block(notification);
+        self.status.send_modify(|status| *status = NodeStatus::AtHeight(height));
+        self.applied_rollup_height.send_replace(height);
+        info!(height, "applied journal");
+        Ok(())
+    }
+
+    /// Broadcast a reorg notification built from the drained blocks.
+    fn notify_reorg(&self, drained: Vec, common_ancestor: u64) {
+        // Each drained block contributes its number, hash, timestamp and flattened receipt logs.
+        let removed_blocks = drained
+            .into_iter()
+            .map(|block| {
+                let number = block.header.number();
+                let hash = block.header.hash();
+                let timestamp = block.header.timestamp();
+                let logs = block.receipts.into_iter().flat_map(|receipt| receipt.receipt.logs);
+                RemovedBlock { number, hash, timestamp, logs: logs.collect() }
+            })
+            .collect();
+        // `send_reorg` records the reorg in the authoritative ring buffer before broadcasting,
+        // so an error here just means no RPC push-subscribers are connected.
+        let _ = self.chain.send_reorg(ReorgNotification { common_ancestor, removed_blocks });
+    }
+}
diff --git a/crates/node/src/journal_sync/mod.rs b/crates/node/src/journal_sync/mod.rs
new file mode 100644
index 00000000..eb871e68
--- /dev/null
+++ b/crates/node/src/journal_sync/mod.rs
@@ -0,0 +1,11 @@
+//! Journal-based sync: applying journals received from upstream sources to
+//! local hot + cold storage, as an alternative to executing host blocks.
+
+mod ingestor;
+pub(crate) use ingestor::JournalIngestor;
+mod runtime;
+pub(crate) use runtime::{
+    JOURNAL_SYNC_BACKPRESSURE_CAPACITY, JournalExitKind, JournalSyncSetup, RunningJournalSync,
+    SyncOutcome, build_journal_client, collapse_sync_failure, is_caught_up, journal_sync_loop,
+    journal_task_result, seed_journal_checkpoints,
+};
diff --git a/crates/node/src/journal_sync/runtime.rs b/crates/node/src/journal_sync/runtime.rs
new file mode 100644
index 00000000..18514c0a
--- /dev/null
+++ b/crates/node/src/journal_sync/runtime.rs
@@ -0,0 +1,500 @@
+//! Setup, running state, and run-loop helpers for the `journals` sync strategy.
+//!
+//! Everything in here is consumed by [`crate::SignetNode::run_journal_sync`] and is
+//! deliberately decoupled from the node struct so the orchestrator method stays a thin shell.
+
+use crate::journal_sync::JournalIngestor;
+use bytes::Bytes;
+use eyre::{Context, OptionExt, Report, eyre};
+use signet_journal::GENESIS_JOURNAL_HASH;
+#[cfg(doc)]
+use signet_journal_chain::JournalChainEvent;
+use signet_journal_chain::{Checkpoint, JournalChainError, SAFETY_MARGIN, extract_signet_metadata};
+use signet_journal_client::{JournalClient, JournalClientConfig};
+use signet_node_config::JournalConfig;
+use signet_node_types::HostNotifier;
+use signet_storage::{HistoryRead, HotDbRead, HotKv, HotKvRead, UnifiedStorage};
+use signet_types::constants::SignetSystemConstants;
+use tokio::{
+    sync::{mpsc, watch},
+    task::{JoinError, JoinHandle, JoinSet},
+};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, warn};
+use trevm::revm::database::DBErrorMarker;
+use url::Url;
+
+/// Capacity of the bounded channel carrying [`JournalChainEvent`]s from the journal chain to the
+/// ingestor on a syncing node. Small: it exists only to let the chain run a few journals ahead of
+/// the ingestor; once full, the chain's `blocking_send` backpressures all the way to the upstream
+/// source.
+pub(crate) const JOURNAL_SYNC_BACKPRESSURE_CAPACITY: usize = 16;
+
+/// Host-block margin allowed between the journal-synced rollup tip and the host tip when deciding
+/// the node has caught up. A persistent one-block lag is normal in steady state (the host has
+/// usually ticked once between the time we apply a journal and the time we query the tip), so a
+/// strict equality check would never converge. `DbBackfill` closes any actual gap when
+/// `run_block_sync` calls `set_head`.
+pub(crate) const JOURNAL_SYNC_TRANSITION_MARGIN: u64 = 2;
+
+/// Whether the journal-chain ingestion task was expected to have exited at the point of awaiting
+/// its join handle.
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum JournalExitKind {
+    /// Awaited during shutdown after closing the input channel; a clean exit is success.
+    Expected,
+    /// Awaited while still expecting to feed journals; any exit, clean or otherwise, is fatal.
+    Unexpected,
+}
+
+/// Translate a chain task's join result into an `eyre::Result`, accounting for whether a clean
+/// exit was anticipated.
+pub(crate) fn journal_task_result(
+    result: Result, JoinError>,
+    kind: JournalExitKind,
+) -> eyre::Result<()> {
+    match result {
+        // The task returned cleanly; whether that counts as success depends on `kind`.
+        Ok(Ok(())) => match kind {
+            JournalExitKind::Expected => Ok(()),
+            JournalExitKind::Unexpected => {
+                Err(eyre!("journal chain ingestion task exited unexpectedly"))
+            }
+        },
+        // The task returned its own error: always a failure, regardless of `kind`.
+        Ok(Err(error)) => Err(Report::new(error).wrap_err("journal chain ingestion task failed")),
+        // The join itself failed (panic or abort): always a failure, regardless of `kind`.
+        Err(error) => {
+            Err(Report::new(error).wrap_err("journal chain ingestion task panicked or was aborted"))
+        }
+    }
+}
+
+/// The journal client and ingestor for a syncing node, built at construction and spawned once the
+/// RPC server is up.
+pub(crate) struct JournalSyncSetup {
+    /// Connects to upstream sources and forwards raw journal bytes into the journal chain's input
+    /// channel.
+    pub(crate) client: JournalClient,
+    /// Consumes [`JournalChainEvent`]s from the chain's backpressured sender and applies them to
+    /// storage.
+    pub(crate) ingestor: JournalIngestor,
+    /// Child of the node's cancellation token; cancelling it stops the client and ingestor
+    /// cooperatively without touching the chain. Fires either on transition to block execution
+    /// or via cascade on graceful shutdown.
+    pub(crate) sync_token: CancellationToken,
+    /// Updated by the ingestor after every event; the run-loop awakes on each change to query the
+    /// host tip and decide whether to transition.
+    pub(crate) applied_rollup_height: watch::Receiver,
+}
+
+/// Owns the running journal client and ingestor tasks plus the signals the run-loop reacts to.
+/// Cancelling the embedded token stops both tasks cooperatively without affecting the chain
+/// task (which the node owns separately and keeps alive across the handoff to block execution).
+pub(crate) struct RunningJournalSync {
+    /// The two spawned tasks: the journal client and the ingestor.
+    pub(crate) sync_tasks: JoinSet>,
+    /// Token taken from the setup; a clone of it stops the client task.
+    pub(crate) sync_token: CancellationToken,
+    /// Progress channel taken from the setup, carrying the latest applied rollup height.
+    pub(crate) applied_rollup_height: watch::Receiver,
+}
+
+impl RunningJournalSync {
+    /// Spawn the client and ingestor tasks from `setup`. The chain task is owned separately by
+    /// the node.
+    pub(crate) fn start(setup: JournalSyncSetup, journal_sender: mpsc::Sender) -> Self
+    where
+        H: HotKv + Clone + Send + Sync + 'static,
+        ::Error: DBErrorMarker,
+    {
+        let JournalSyncSetup { client, ingestor, sync_token, applied_rollup_height } = setup;
+        let client_token = sync_token.clone();
+        let mut sync_tasks: JoinSet> = JoinSet::new();
+        // Client task: races the subscription against the token, so cancellation drops the
+        // in-flight `subscribe` future and ends the task with `Ok(())`.
+        sync_tasks.spawn(async move {
+            tokio::select! {
+                biased;
+                () = client_token.cancelled() => Ok(()),
+                result = client.subscribe(journal_sender) => match result {
+                    Ok(()) => Ok(()),
+                    Err(error) => {
+                        Err(Report::new(error).wrap_err("journal client exhausted all sources"))
+                    }
+                },
+            }
+        });
+        // Ingestor task: runs the ingestor's own loop and only adds context to its error.
+        sync_tasks
+            .spawn(async move { ingestor.run().await.wrap_err("journal ingestor task failed") });
+        Self { sync_tasks, sync_token, applied_rollup_height }
+    }
+
+    /// Cancel the sync token (idempotent) and drain both tasks cooperatively, logging late errors
+    /// without overriding the caller's primary result.
+    pub(crate) async fn cancel_and_drain(mut self) {
+        self.sync_token.cancel();
+        // Join every remaining task; failures are only logged because this returns nothing.
+        while let Some(joined) = self.sync_tasks.join_next().await {
+            match joined {
+                Ok(Ok(())) => {}
+                Ok(Err(error)) => {
+                    // `{:#}` renders the whole error chain on one line for the log field.
+                    let error = format!("{error:#}");
+                    error!(error, "journal sync task errored during shutdown");
+                }
+                Err(join_error) => {
+                    error!(error = ?join_error, "journal sync task panicked during shutdown");
+                }
+            }
+        }
+    }
+}
+
+/// First terminal event observed by [`journal_sync_loop`].
+pub(crate) enum SyncOutcome {
+    /// The journal-synced tip reached the host tip and the node should hand off to block
+    /// execution.
+    CaughtUp,
+    /// The node's cancellation token fired; perform a graceful shutdown.
+    Shutdown,
+    /// A task ended in an unexpected or fatal way; propagate the failure.
+    Failure(SyncFailure),
+}
+
+/// How a journal-sync task or the chain task ended unexpectedly.
+pub(crate) enum SyncFailure {
+    /// The chain task exited while the sync was still running (the node still held its journal
+    /// sender, so a clean exit here is itself unexpected). Carries the task's raw join result.
+    ChainExited(Result, JoinError>),
+    /// A sync task returned an error.
+    SyncTaskFailed(Report),
+    /// A sync task panicked or was aborted.
+    SyncTaskPanicked(JoinError),
+    /// A sync task returned `Ok(())` while the run-loop was still active. The tasks are meant to
+    /// reach `Ok(())` only through the cooperative teardown the caller starts after the loop
+    /// exits, so a clean exit seen by the loop (for example the client's `subscribe` returning
+    /// `Ok(())`, or the ingestor's event channel closing) is reported as a failure.
+    SyncTaskExitedPrematurely,
+}
+
+/// React to progress published by the ingestor, the chain task's exit, sync task failures, and
+/// cancellation; return the first terminal event.
+///
+/// Takes `&mut notifier` so it can drain and discard host notifications for backends that
+/// [backpressure the host](HostNotifier::backpressures_host) when notifications go unconsumed
+/// (a reth ExEx). Journal sync derives state from the upstream feed, not these notifications, so
+/// they are discarded; draining only keeps the host's pipeline from stalling. `FinishedHeight` is
+/// deliberately never signalled, so the host retains the blocks the post-catch-up handoff
+/// backfills via `set_head`.
+pub(crate) async fn journal_sync_loop(
+    notifier: &mut N,
+    storage: &UnifiedStorage,
+    constants: &SignetSystemConstants,
+    cancellation_token: &CancellationToken,
+    journal_task: &mut JoinHandle>,
+    sync: &mut RunningJournalSync,
+) -> SyncOutcome
+where
+    N: HostNotifier,
+    H: HotKv + Clone + Send + Sync + 'static,
+    ::Error: DBErrorMarker,
+{
+    // Decided once up front; cleared for good when the notification stream errors or closes.
+    let mut drain_host = notifier.backpressures_host();
+
+    loop {
+        // Reset each iteration; set only by the progress arm at the bottom of the `select!`.
+        let mut rollup_progressed = false;
+        tokio::select! {
+            // `biased` polls the arms top to bottom, so shutdown and task exits take priority
+            // over draining and progress.
+            biased;
+            () = cancellation_token.cancelled() => return SyncOutcome::Shutdown,
+            chain_result = &mut *journal_task => {
+                return SyncOutcome::Failure(SyncFailure::ChainExited(chain_result));
+            }
+            joined = sync.sync_tasks.join_next() => match joined {
+                Some(Ok(Ok(()))) => {
+                    return SyncOutcome::Failure(SyncFailure::SyncTaskExitedPrematurely);
+                }
+                Some(Ok(Err(error))) => {
+                    return SyncOutcome::Failure(SyncFailure::SyncTaskFailed(error));
+                }
+                Some(Err(join_error)) => {
+                    return SyncOutcome::Failure(SyncFailure::SyncTaskPanicked(join_error));
+                }
+                // `join_next` only returns `None` for an empty set; both sync tasks are in flight
+                // here, so the variants above always cover the completion.
+                None => unreachable!("sync task set started non-empty"),
+            },
+            // Ordered above the progress arm so draining stays prompt under a host backfill flood
+            // (when we are not caught up anyway); the progress arm is only starved while the host
+            // races ahead, which is exactly when no transition is due.
+            drained = notifier.next_notification(), if drain_host => match drained {
+                // Discard: consumed only to keep the host's notification pipeline draining.
+                Some(Ok(_)) => {}
+                Some(Err(error)) => {
+                    warn!(%error, "host notification drain failed; disabling drain");
+                    drain_host = false;
+                }
+                None => {
+                    debug!("host notification stream closed; disabling drain");
+                    drain_host = false;
+                }
+            },
+            change = sync.applied_rollup_height.changed() => {
+                // `Err` means the ingestor dropped its sender (task ended); leave
+                // `rollup_progressed` false and let the next iteration fire the `sync_tasks` arm
+                // with the actual exit reason.
+                rollup_progressed = change.is_ok();
+            }
+        }
+        // Re-checked outside the `select!` so it never borrows `notifier` while the drain arm
+        // holds `&mut *notifier`. Only rollup progress can newly satisfy the catch-up condition
+        // (the host tip only rises), so drain and spurious wakeups skip the check.
+        if rollup_progressed && is_caught_up(&*notifier, storage, constants).await {
+            return SyncOutcome::CaughtUp;
+        }
+    }
+}
+
+/// Whether the journal-synced rollup tip's paired host height is within
+/// [`JOURNAL_SYNC_TRANSITION_MARGIN`] blocks of the host tip. Transient query failures
+/// (e.g. a flaky RPC) are logged and treated as "not caught up" so the run-loop keeps syncing;
+/// `DbBackfill` closes any actual gap once `run_block_sync` calls `set_head`.
+pub(crate) async fn is_caught_up(
+    notifier: &N,
+    storage: &UnifiedStorage,
+    constants: &SignetSystemConstants,
+) -> bool
+where
+    N: HostNotifier,
+    H: HotKv,
+    ::Error: DBErrorMarker,
+{
+    // NOTE(review): the generic argument lists in this signature look stripped in this copy of
+    // the patch (`is_caught_up(`, `UnifiedStorage`, `::Error`) - restore them from the
+    // repository.
+
+    // Both lookups live in one fallible future so that every failure, whichever side it comes
+    // from, funnels into the single logging branch below.
+    let probe = async {
+        let host_tip = notifier
+            .host_tip()
+            .await
+            .map_err(|source| Report::new(source).wrap_err("failed to query host tip"))?;
+        // An empty store reports no last block; treat that as rollup height 0 (genesis).
+        let rollup_tip = storage.reader()?.last_block_number()?.unwrap_or(0);
+        eyre::Ok(rollup_caught_up(rollup_tip, host_tip, constants))
+    };
+
+    match probe.await {
+        Ok(caught_up) => caught_up,
+        Err(error) => {
+            // Render the whole error chain on one line for the log field.
+            let error = format!("{error:#}");
+            warn!(error, "host tip check failed, treating as not caught up");
+            false
+        }
+    }
+}
+
+/// Pure catch-up predicate: `true` when the host height paired with `rollup_tip`, plus
+/// [`JOURNAL_SYNC_TRANSITION_MARGIN`] (saturating), is at or beyond `host_tip`.
+///
+/// Kept separate from [`is_caught_up`] so the margin and pairing arithmetic can be unit-tested
+/// without a notifier or storage. A `rollup_tip` of `0` (genesis) has no rollup block to pair,
+/// so the host deploy height stands in for it.
+const fn rollup_caught_up(
+    rollup_tip: u64,
+    host_tip: u64,
+    constants: &SignetSystemConstants,
+) -> bool {
+    let paired_host = match rollup_tip {
+        0 => constants.host_deploy_height(),
+        height => constants.pair_ru(height).host,
+    };
+    host_tip <= paired_host.saturating_add(JOURNAL_SYNC_TRANSITION_MARGIN)
+}
+
+/// Turn a [`SyncFailure`] into the `eyre::Result` the node returns.
+///
+/// A `ChainExited` failure already carries the chain task's join result, so that result is
+/// judged directly (any exit counts as unexpected) and `journal_task` is never awaited. Every
+/// other variant first waits for the chain task to finish, discarding its result, and then
+/// reports the sync-task failure itself.
+pub(crate) async fn collapse_sync_failure(
+    failure: SyncFailure,
+    journal_task: JoinHandle>,
+) -> eyre::Result<()> {
+    // NOTE(review): `JoinHandle>` has lost its type argument in this copy of the patch.
+    let report = match failure {
+        // The chain task already completed inside the select; awaiting its handle a second
+        // time would panic, so use the carried result and leave the handle alone.
+        SyncFailure::ChainExited(chain_result) => {
+            return journal_task_result(chain_result, JournalExitKind::Unexpected);
+        }
+        SyncFailure::SyncTaskFailed(error) => error,
+        SyncFailure::SyncTaskPanicked(join_error) => {
+            Report::new(join_error).wrap_err("journal sync task panicked or was aborted")
+        }
+        SyncFailure::SyncTaskExitedPrematurely => {
+            eyre!("journal sync task exited unexpectedly before cancellation")
+        }
+    };
+
+    // Cooperative cleanup: let the chain task run to completion. Its outcome is deliberately
+    // ignored because the sync failure is what gets reported.
+    let _ = journal_task_result(journal_task.await, JournalExitKind::Expected);
+    Err(report)
+}
+
+/// The pair of checkpoints handed to a [`JournalClient`] when it is constructed.
+#[derive(Clone, Copy, Debug)]
+pub(crate) struct JournalCheckpoints {
+    /// The last known-good point; the client subscribes from `primary.height + 1`.
+    pub(crate) primary: Checkpoint,
+    /// A point `SAFETY_MARGIN` blocks behind the primary (saturating to genesis), used as the
+    /// reconnect anchor when a reorg spanned downtime.
+    pub(crate) fallback: Checkpoint,
+}
+
+/// Seed the [`JournalClient`] checkpoints from the node's `JournalHashes` table.
+///
+/// On a fresh database both checkpoints are the genesis journal hash. Otherwise the primary is
+/// the storage tip and the fallback is `SAFETY_MARGIN` blocks behind it (saturating to genesis),
+/// so the client can resubscribe from a known-good point after a reorg that spanned downtime
+/// instead of re-bootstrapping from genesis.
+pub(crate) fn seed_journal_checkpoints(
+    storage: &UnifiedStorage,
+) -> eyre::Result
+where
+    H: HotKv,
+    ::Error: DBErrorMarker,
+{
+    // NOTE(review): the generic argument lists in this signature look stripped in this copy of
+    // the patch (`UnifiedStorage`, `eyre::Result`, `::Error`) - restore them from the
+    // repository.
+    let reader = storage.reader()?;
+
+    // No stored block, or only block 0: start both checkpoints at the genesis journal hash.
+    let Some(tip) = reader.last_block_number()?.filter(|&height| height != 0) else {
+        let genesis = Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH };
+        return Ok(JournalCheckpoints { primary: genesis, fallback: genesis });
+    };
+
+    // The primary is read first, so a missing entry at the tip is the error reported when both
+    // heights lack a hash.
+    let primary = Checkpoint {
+        height: tip,
+        hash: reader.get_journal_hash(tip)?.ok_or_eyre(
+            "journal sync requires the JournalHashes table to be populated at the storage tip; \
+             either it was disabled previously or storage is corrupt",
+        )?,
+    };
+
+    // Step back by the safety margin; landing on height 0 means the genesis hash applies and
+    // no table lookup is made.
+    let fallback = match tip.saturating_sub(SAFETY_MARGIN) {
+        0 => Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+        height => Checkpoint {
+            height,
+            hash: reader
+                .get_journal_hash(height)?
+                .ok_or_eyre("journal sync requires a JournalHashes entry at the fallback height")?,
+        },
+    };
+
+    Ok(JournalCheckpoints { primary, fallback })
+}
+
+/// Build a [`JournalClient`] from the journal configuration and seeded checkpoints. Source URLs
+/// are parsed here; the client further validates their scheme and shape (see
+/// [`JournalClient::new`]).
+pub(crate) fn build_journal_client(
+    config: &JournalConfig,
+    checkpoints: JournalCheckpoints,
+) -> eyre::Result {
+    // NOTE(review): the return type and the `collect` turbofish below have lost their type
+    // arguments in this copy of the patch - restore them from the repository.
+
+    // Parse every configured source up front; the first unparsable entry aborts construction
+    // with an error that names the offending string.
+    let sources = config
+        .sources()
+        .iter()
+        .map(|source| {
+            Url::parse(source).wrap_err_with(|| format!("invalid journal source URL: {source}"))
+        })
+        .collect::>>()?;
+
+    // Start from the client's own defaults and override only the settings the node
+    // configuration actually provides.
+    let mut client_config = JournalClientConfig::new(sources);
+    if let Some(timeout) = config.client_source_stall_timeout() {
+        client_config.source_stall_timeout = timeout;
+    }
+    if let Some(backoff) = config.client_source_backoff() {
+        client_config.source_backoff = backoff;
+    }
+
+    JournalClient::new(
+        extract_signet_metadata,
+        checkpoints.primary,
+        checkpoints.fallback,
+        client_config,
+    )
+    .wrap_err("failed to construct journal client")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use signet_node_config::{JournalConfig, test_utils::test_config_with_journal};
+
+    /// A chain task that has already completed cleanly, for `collapse_sync_failure` cases that
+    /// drain (rather than inspect) the chain handle.
+    fn finished_chain_task() -> JoinHandle> {
+        tokio::spawn(async { Ok(()) })
+    }
+
+    /// Pins the inclusive boundary of the transition margin, at genesis and at a non-zero tip.
+    #[test]
+    fn rollup_caught_up_respects_transition_margin() {
+        let constants = test_config_with_journal(JournalConfig::default()).constants().unwrap();
+
+        // At genesis (rollup tip 0) the paired host height is the deploy height.
+        let deploy = constants.host_deploy_height();
+        assert!(rollup_caught_up(0, deploy, &constants));
+        // Caught up while the host sits within the margin ahead...
+        assert!(rollup_caught_up(0, deploy + JOURNAL_SYNC_TRANSITION_MARGIN, &constants));
+        // ...but not once it is one block past the margin.
+        assert!(!rollup_caught_up(0, deploy + JOURNAL_SYNC_TRANSITION_MARGIN + 1, &constants));
+
+        // At a non-zero rollup tip the paired host height comes from the pairing.
+        let rollup_tip = 5;
+        let paired = constants.pair_ru(rollup_tip).host;
+        assert!(rollup_caught_up(rollup_tip, paired, &constants));
+        assert!(rollup_caught_up(rollup_tip, paired + JOURNAL_SYNC_TRANSITION_MARGIN, &constants));
+        assert!(!rollup_caught_up(
+            rollup_tip,
+            paired + JOURNAL_SYNC_TRANSITION_MARGIN + 1,
+            &constants
+        ));
+    }
+
+    /// A failed sync task's own error must survive the collapse unchanged.
+    #[tokio::test]
+    async fn collapse_sync_failure_propagates_task_error() {
+        let failure = SyncFailure::SyncTaskFailed(eyre!("ingestor blew up"));
+        let error = collapse_sync_failure(failure, finished_chain_task()).await.unwrap_err();
+        assert!(format!("{error:#}").contains("ingestor blew up"), "unexpected error: {error:#}");
+    }
+
+    /// A sync task that returned `Ok` before cancellation is still reported as an error.
+    #[tokio::test]
+    async fn collapse_sync_failure_reports_premature_exit() {
+        let error =
+            collapse_sync_failure(SyncFailure::SyncTaskExitedPrematurely, finished_chain_task())
+                .await
+                .unwrap_err();
+        assert!(
+            format!("{error:#}").contains("exited unexpectedly"),
+            "unexpected error: {error:#}"
+        );
+    }
+
+    /// A join error from a sync task is wrapped with the "panicked or was aborted" context.
+    #[tokio::test]
+    async fn collapse_sync_failure_reports_panic() {
+        // Produce a real `JoinError` by awaiting a task that panicked.
+        let panicked = tokio::spawn(async { panic!("boom") }).await.unwrap_err();
+        let error =
+            collapse_sync_failure(SyncFailure::SyncTaskPanicked(panicked), finished_chain_task())
+                .await
+                .unwrap_err();
+        assert!(
+            format!("{error:#}").contains("panicked or was aborted"),
+            "unexpected error: {error:#}"
+        );
+    }
+
+    /// A clean chain exit carried in `ChainExited` is fatal, and the handle is left unawaited.
+    #[tokio::test]
+    async fn collapse_sync_failure_chain_exit_is_unexpected() {
+        // A `ChainExited(Ok(Ok(())))` is a clean chain exit while we still expected to feed it -
+        // treated as fatal. The carried result is inspected directly, so a second never-resolving
+        // handle is passed to prove it is not awaited.
+        let never = tokio::spawn(async {
+            std::future::pending::<()>().await;
+            Ok(())
+        });
+        let error =
+            collapse_sync_failure(SyncFailure::ChainExited(Ok(Ok(()))), never).await.unwrap_err();
+        assert!(
+            format!("{error:#}").contains("exited unexpectedly"),
+            "unexpected error: {error:#}"
+        );
+    }
+
+    /// An unparsable source string is rejected here, with the URL-specific context message.
+    #[test]
+    fn build_journal_client_rejects_bad_url() {
+        let config = JournalConfig::journal_sync_for_test(vec!["not a url".to_owned()]);
+        let checkpoints = JournalCheckpoints {
+            primary: Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+            fallback: Checkpoint { height: 0, hash: GENESIS_JOURNAL_HASH },
+        };
+        let error = build_journal_client(&config, checkpoints).unwrap_err();
+        assert!(
+            format!("{error:#}").contains("invalid journal source URL"),
+            "unexpected error: {error:#}"
+        );
+    }
+}
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 4af31105..b2d06a2e 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -14,6 +14,8 @@ mod builder; pub use builder::SignetNodeBuilder; +mod journal_sync; + mod metrics; mod node; diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 1247700e..b6c1df5d 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1,4 +1,12 @@ -use crate::{NodeStatus, metrics}; +use crate::{ + NodeStatus, + journal_sync::{ + JOURNAL_SYNC_BACKPRESSURE_CAPACITY, JournalExitKind, JournalIngestor, JournalSyncSetup, + RunningJournalSync, SyncOutcome, build_journal_client, collapse_sync_failure, is_caught_up, + journal_sync_loop, journal_task_result, seed_journal_checkpoints, + }, + metrics, +}; use alloy::{ consensus::BlockHeader, primitives::{B256, keccak256}, @@ -11,10 +19,10 @@ use signet_evm::EthereumHardfork; use signet_extract::{Extractable, Extractor}; use signet_journal::{GENESIS_JOURNAL_HASH, HostJournal, Journal, JournalMeta}; use signet_journal_chain::{ - JournalChainBuilder, JournalChainConfig, JournalChainError, JournalChainHandle, - JournalChainParts, RingBufferConfig,
extract_signet_metadata, + JournalChainBuilder, JournalChainConfig, JournalChainError, JournalChainEvent, + JournalChainHandle, JournalChainParts, RingBufferConfig, extract_signet_metadata, }; -use signet_node_config::{JournalConfig, SignetNodeConfig}; +use signet_node_config::{JournalConfig, SignetNodeConfig, SyncStrategy}; use signet_node_types::{HostNotification, HostNotifier, RevertRange}; use signet_rpc::{ ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard, @@ -27,8 +35,9 @@ use signet_types::{PairedHeights, constants::SignetSystemConstants}; use std::{borrow::Cow, fmt, sync::Arc}; use tokio::{ sync::{mpsc, watch}, - task::{JoinError, JoinHandle}, + task::JoinHandle, }; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; use trevm::{ journal::{BundleStateIndex, JournalEncode}, @@ -91,6 +100,17 @@ where /// Join handle for the journal chain's ingestion task. journal_task: Option>>, + + /// The configured sync strategy. Selects the startup path in + /// [`Self::start_inner`]: block execution or journal application. + sync_strategy: SyncStrategy, + + /// Journal-sync machinery, present only under [`SyncStrategy::Journals`]. + /// Taken and spawned by [`Self::run_journal_sync`]. + journal_sync: Option>, + + /// Cancellation token for graceful shutdown. 
+ cancellation_token: CancellationToken, } impl fmt::Debug for SignetNode @@ -124,7 +144,7 @@ where /// [`SignetNodeBuilder`]: crate::builder::SignetNodeBuilder #[doc(hidden)] #[allow(clippy::too_many_arguments)] - pub fn new_unsafe( + pub(crate) fn new_unsafe( notifier: N, config: SignetNodeConfig, storage: Arc>, @@ -133,20 +153,57 @@ where blob_cacher: CacheHandle, serve_config: ServeConfig, rpc_config: StorageRpcConfig, + cancellation_token: CancellationToken, ) -> eyre::Result<(Self, watch::Receiver)> { let constants = config.constants().wrap_err("failed to load signet constants from genesis")?; + let sync_strategy = config.journal().sync_strategy(); + config.journal().warn_on_misconfiguration(); + config.journal().validate().wrap_err("invalid journal configuration")?; + let (status, receiver) = watch::channel(NodeStatus::Booting); let chain = ChainNotifier::new(128); + // Under the journals strategy the chain feeds an ingestor through a backpressured sender; + // under the blocks strategy there is no such consumer and the producer drives the chain. + let backpressured = match sync_strategy { + SyncStrategy::Journals => Some(mpsc::channel(JOURNAL_SYNC_BACKPRESSURE_CAPACITY)), + SyncStrategy::Blocks => None, + }; + let (backpressured_sender, backpressured_receiver) = match backpressured { + Some((sender, receiver)) => (Some(sender), Some(receiver)), + None => (None, None), + }; + let JournalChainParts { chain: journal_chain, handle: journal_chain_handle, journal_sender, - } = build_journal_chain(config.journal())?; + } = build_journal_chain(config.journal(), backpressured_sender)?; let journal_task = journal_chain.run(); + // Build the syncing-node machinery up front (it needs storage to seed checkpoints); it is + // spawned later, once the RPC server is up. `backpressured_receiver` is `Some` exactly + // when the strategy is journals. 
+ let journal_sync = backpressured_receiver + .map(|event_rx| { + let checkpoints = seed_journal_checkpoints(storage.as_ref())?; + let client = build_journal_client(config.journal(), checkpoints)?; + let sync_token = cancellation_token.child_token(); + let (height_tx, applied_rollup_height) = watch::channel(0u64); + let ingestor = JournalIngestor::new( + Arc::clone(&storage), + chain.clone(), + status.clone(), + event_rx, + sync_token.clone(), + height_tx, + ); + eyre::Ok(JournalSyncSetup { client, ingestor, sync_token, applied_rollup_height }) + }) + .transpose()?; + let this = Self { config: config.into(), notifier, @@ -163,6 +220,9 @@ where journal_chain_handle, journal_sender, journal_task: Some(journal_task), + sync_strategy, + journal_sync, + cancellation_token, }; Ok((this, receiver)) } @@ -224,6 +284,14 @@ where // Update the node status channel with last block height self.status.send_modify(|s| *s = NodeStatus::AtHeight(last_rollup_block)); + match self.sync_strategy { + SyncStrategy::Journals => self.run_journal_sync().await, + SyncStrategy::Blocks => self.run_block_sync(last_rollup_block).await, + } + } + + /// Drive the node by executing host blocks (the `blocks` strategy). + async fn run_block_sync(mut self, last_rollup_block: u64) -> eyre::Result<()> { // Set the head position and backfill thresholds on the notifier let host_height = match last_rollup_block { 0 => self.constants.host_deploy_height(), @@ -249,6 +317,10 @@ where let main_result: eyre::Result<()> = loop { tokio::select! 
{ biased; + () = self.cancellation_token.cancelled() => { + info!("cancellation requested, shutting down block sync"); + break Ok(()); + } result = &mut journal_task => { return journal_task_result(result, JournalExitKind::Unexpected); }, @@ -281,6 +353,75 @@ where main_result.and(journal_result) } + /// Drive the node by applying journals from upstream sources (the `journals` + /// strategy): spawn the client and ingestor alongside the chain task, react + /// when the ingestor publishes progress, and either hand off to block + /// execution once the applied state reaches the host tip or wind everything + /// down on shutdown. + /// + /// A node that boots already at the host tip short-circuits straight to block + /// execution without connecting to any upstream source. + async fn run_journal_sync(mut self) -> eyre::Result<()> { + let mut journal_task = self + .journal_task + .take() + .expect("journal task should be set by new_unsafe and only taken here"); + + if is_caught_up(&self.notifier, self.storage.as_ref(), &self.constants).await { + info!("already caught up to host tip at startup, starting block execution"); + self.journal_task = Some(journal_task); + let rollup_tip = self.last_rollup_block()?; + return self.run_block_sync(rollup_tip).await; + } + + let setup = + self.journal_sync.take().expect("journal_sync must be set under the journals strategy"); + let mut sync = RunningJournalSync::start(setup, self.journal_sender.clone()); + info!("journal sync started"); + + let outcome = journal_sync_loop( + &mut self.notifier, + self.storage.as_ref(), + &self.constants, + &self.cancellation_token, + &mut journal_task, + &mut sync, + ) + .await; + + match outcome { + SyncOutcome::CaughtUp => { + info!("journal sync caught up to host tip, transitioning to block execution"); + // Stop the client and ingestor cooperatively; keep the chain alive so the + // producer path can keep pushing through `self.journal_sender` after the handoff. 
+ // `cancel_and_drain` lets the ingestor apply any in-flight journals first, so + // storage catches up to the chain's tip. Any residual lead the chain still holds + // (from input buffered beyond what the ingestor drained, bounded by how far the + // upstream ran past storage at catch-up - at most the transition margin) is + // reconciled when block execution re-derives those heights: the chain sees + // identical journals and treats them as duplicates. + sync.cancel_and_drain().await; + let rollup_tip = self.last_rollup_block()?; + self.journal_task = Some(journal_task); + self.run_block_sync(rollup_tip).await + } + SyncOutcome::Shutdown => { + info!("cancellation requested, shutting down journal sync"); + // Closing the chain input lets the chain finish; `cancel_and_drain` finishes off + // the sync tasks. The parent token already cascaded, so the cancel inside the + // drain is just defensive. + drop(self.journal_sender); + sync.cancel_and_drain().await; + journal_task_result(journal_task.await, JournalExitKind::Expected) + } + SyncOutcome::Failure(failure) => { + drop(self.journal_sender); + sync.cancel_and_drain().await; + collapse_sync_failure(failure, journal_task).await + } + } + } + /// Runs on any notification received from the host. /// /// Drives the full per-notification pipeline: revert (if any), committed chain (if any), @@ -415,10 +556,9 @@ where /// /// Returns [`GENESIS_JOURNAL_HASH`] when the database is empty or only contains the genesis /// block, and also when the storage tip has no recorded journal hash - the persistence-off - /// startup path of `DESIGN.md` §5.5, also covering an upgrade from a pre-`JournalHashes` - /// build. In that fallback the next emit presents as the initial journal of a fresh chain; - /// downstream `/journal` consumers with cached checkpoints will fail validation and must - /// re-bootstrap. + /// startup path, also covering an upgrade from a pre-`JournalHashes` build.
In that fallback + /// the next emit presents as the initial journal of a fresh chain; downstream `/journal` + /// consumers with cached checkpoints will fail validation and must re-bootstrap. /// /// The fallback is only sound while the in-process journal chain is itself fresh /// (`tip = None`); if the chain already holds a tip, emitting with `previous_hash = @@ -650,43 +790,19 @@ impl RevertOutcome { } } -/// Whether the journal chain ingestion task was expected to have exited at -/// the point of awaiting its join handle. -#[derive(Debug, Clone, Copy)] -enum JournalExitKind { - /// Awaited during shutdown after closing the input channel; a clean - /// exit is success. - Expected, - /// Awaited while still expecting to feed journals; any exit, clean or - /// otherwise, is fatal. - Unexpected, -} - -fn journal_task_result( - result: Result, JoinError>, - kind: JournalExitKind, -) -> eyre::Result<()> { - match result { - Ok(Ok(())) => match kind { - JournalExitKind::Expected => Ok(()), - JournalExitKind::Unexpected => { - Err(eyre!("journal chain ingestion task exited unexpectedly")) - } - }, - Ok(Err(error)) => Err(Report::new(error).wrap_err("journal chain ingestion task failed")), - Err(error) => { - Err(Report::new(error).wrap_err("journal chain ingestion task panicked or was aborted")) - } - } -} - -/// Build a store-less journal chain from the producer-side configuration. +/// Build a store-less journal chain. /// /// [`extract_signet_metadata`] is the parser the chain calls on every /// incoming journal to pull out the version tag, previous-journal hash, /// and block height it needs to validate continuity and index the entry. -fn build_journal_chain(config: &JournalConfig) -> eyre::Result { - config.warn_on_misconfiguration(); +/// +/// `backpressured_sender` is supplied only by a syncing node, so the chain +/// throttles ingestion to the ingestor's pace; a producing node leaves it +/// `None` and the chain drives only the broadcast channel. 
+fn build_journal_chain( + config: &JournalConfig, + backpressured_sender: Option>, +) -> eyre::Result { let chain_config = JournalChainConfig { ring_buffer: RingBufferConfig { max_bytes: config.ring_buffer_max_bytes(), @@ -695,9 +811,11 @@ fn build_journal_chain(config: &JournalConfig) -> eyre::Result