From a7122d7b3bf72345a6005019ccd8163c11145d1e Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Tue, 18 Nov 2025 13:51:55 -0500 Subject: [PATCH] feat: replace UpstreamChainFetcher with PeerNetworkInterface --- Cargo.lock | 49 +-- Cargo.toml | 1 - README.md | 2 +- modules/README.md | 2 +- modules/block_unpacker/README.md | 3 +- modules/mithril_snapshot_fetcher/README.md | 2 +- modules/upstream_chain_fetcher/Cargo.toml | 25 -- modules/upstream_chain_fetcher/NOTES.md | 40 --- modules/upstream_chain_fetcher/README.md | 113 ------- .../src/body_fetcher.rs | 238 -------------- .../src/upstream_chain_fetcher.rs | 294 ------------------ modules/upstream_chain_fetcher/src/utils.rs | 158 ---------- processes/omnibus/Cargo.toml | 1 - processes/omnibus/omnibus-sancho.toml | 4 +- processes/omnibus/omnibus.toml | 8 +- processes/omnibus/src/main.rs | 2 - processes/replayer/Cargo.toml | 2 +- processes/replayer/replayer-sancho.toml | 6 +- processes/replayer/replayer.toml | 8 +- processes/replayer/src/main.rs | 6 +- 20 files changed, 26 insertions(+), 938 deletions(-) delete mode 100644 modules/upstream_chain_fetcher/Cargo.toml delete mode 100644 modules/upstream_chain_fetcher/NOTES.md delete mode 100644 modules/upstream_chain_fetcher/README.md delete mode 100644 modules/upstream_chain_fetcher/src/body_fetcher.rs delete mode 100644 modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs delete mode 100644 modules/upstream_chain_fetcher/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index c19add57..c8c80009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,21 +444,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "acropolis_module_upstream_chain_fetcher" -version = "0.2.0" -dependencies = [ - "acropolis_common", - "anyhow", - "caryatid_sdk", - "config", - "crossbeam", - "pallas 0.33.0", - "serde", - "tokio", - "tracing", -] - [[package]] name = "acropolis_module_utxo_state" version = "0.1.0" @@ -503,7 +488,6 @@ dependencies = [ "acropolis_module_spo_state", "acropolis_module_stake_delta_filter", "acropolis_module_tx_unpacker", - "acropolis_module_upstream_chain_fetcher", "acropolis_module_utxo_state", "anyhow", "caryatid_module_clock", @@ -537,12 +521,12 @@ dependencies = [ "acropolis_module_governance_state", "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", + "acropolis_module_peer_network_interface", "acropolis_module_rest_blockfrost", "acropolis_module_spdd_state", "acropolis_module_spo_state", "acropolis_module_stake_delta_filter", "acropolis_module_tx_unpacker", - "acropolis_module_upstream_chain_fetcher", "acropolis_module_utxo_state", "anyhow", "caryatid_module_clock", @@ -1899,28 +1883,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" -dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-channel" -version = "0.5.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -1940,15 +1902,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-skiplist" version = "0.1.3" diff --git a/Cargo.toml b/Cargo.toml index 15a3a48a..b6068d11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,6 @@ members = [ "modules/genesis_bootstrapper", # Genesis bootstrap UTXOs "modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher "modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot - "modules/upstream_chain_fetcher", # Upstream chain fetcher "modules/peer_network_interface", # Multi-peer network interface "modules/block_unpacker", # Block to transaction unpacker "modules/tx_unpacker", # Tx to UTXO unpacker diff --git a/README.md b/README.md index a3a1c60d..41cc4182 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ graph TB ## Modules -- [Upstream Chain Fetcher](modules/upstream_chain_fetcher) - +- [Peer Network Interface](modules/peer_network_interface) - implementation of the Node-to-Node (N2N) client-side (initiator) protocol, allowing chain synchronisation and block fetching - [Mithril Snapshot Fetcher](modules/mithril_snapshot_fetcher) - diff --git a/modules/README.md b/modules/README.md index f66c87bc..3d288502 100644 --- a/modules/README.md +++ b/modules/README.md @@ -3,7 +3,7 @@ This directory holds microservice modules for a Caryatid framework which compose the Acropolis Architecture -* [Upstream Chain Fetcher](upstream_chain_fetcher) - +* [Peer Network Interface](peer_network_interface) - implementation of the Node-to-Node (N2N) client-side (initiator) protocol, allowing chain synchronisation and block fetching * [Mithril Snapshot Fetcher](mithril_snapshot_fetcher) - diff --git a/modules/block_unpacker/README.md b/modules/block_unpacker/README.md index bad3e4c1..a4784854 100644 --- a/modules/block_unpacker/README.md +++ b/modules/block_unpacker/README.md @@ -20,8 +20,7 @@ publish-topic = "cardano.txs" ## Messages The block unpacker subscribes for RawBlockMessages on -`cardano.block.proposed` (see the [Upstream Chain -Fetcher](../upstream_chain_fetcher) module for details). It unpacks +`cardano.block.proposed` (see the [Consensus](../consensus) module for details). It unpacks this into transactions, which it publishes as a single RawTxsMessage on `cardano.txs`, containing the block information and an ordered vector of raw transaction CBOR. This ensure the transactions are kept in order. diff --git a/modules/mithril_snapshot_fetcher/README.md b/modules/mithril_snapshot_fetcher/README.md index 65eb5da1..d09f6a2f 100644 --- a/modules/mithril_snapshot_fetcher/README.md +++ b/modules/mithril_snapshot_fetcher/README.md @@ -9,7 +9,7 @@ a startup event before beginning to allow the When it has finished it sends a snapshot complete event indicating the last block fetched, which is used by the -[Upstream Chain Fetcher](../upstream_chain_fetcher) to synchronize ongoing +[Peer Network Interface](../peer_network_interface) to synchronize ongoing fetches. ## Configuration diff --git a/modules/upstream_chain_fetcher/Cargo.toml b/modules/upstream_chain_fetcher/Cargo.toml deleted file mode 100644 index 7e87dcbb..00000000 --- a/modules/upstream_chain_fetcher/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -# Acropolis upstream chain fetcher module - -[package] -name = "acropolis_module_upstream_chain_fetcher" -version = "0.2.0" -edition = "2021" -authors = ["Paul Clark "] -description = "Upstream chain fetcher Caryatid module for Acropolis" -license = "Apache-2.0" - -[dependencies] -acropolis_common = { path = "../../common" } - -caryatid_sdk = { workspace = true } - -anyhow = { workspace = true } -config = { workspace = true } -crossbeam = "0.8.4" -pallas = { workspace = true } -serde = { workspace = true, features = ["rc"] } -tokio = { workspace = true } -tracing = { workspace = true } - -[lib] -path = "src/upstream_chain_fetcher.rs" diff --git a/modules/upstream_chain_fetcher/NOTES.md b/modules/upstream_chain_fetcher/NOTES.md deleted file mode 100644 index bad87b97..00000000 --- a/modules/upstream_chain_fetcher/NOTES.md +++ /dev/null @@ -1,40 +0,0 @@ -## Network - -By default, messages are fetched from mainnet. However, other networks -are available (testnet, sanchonet). - -### Sanchonet -https://sancho.cardanoconnect.io/ -Sanchonet has the following stats: -Epoch 750 -Block Height 3203471 - -100 (400) blocks per minute -32030 (8000) minutes to sync -500 (150) hours to sync - -## Protocol versions -SanchoNet has version 6.0 (Alonzo, 2nd version, after intra-era hardfork) in genesis. -In epoch 2 it upgrades to 7.0 (Babbage) -In epoch 3 it upgrades to 8.0 (Babbage, Valentine HF). -It stays 8.0 till epoch 492, where it upgrades to 9.0 (Conway, ChangHF). - -Version numbers are taken from CIP-0059: -https://github.com/cardano-foundation/CIPs/blob/master/CIP-0059/feature-table.md - -## header `variant` field from Pallas header parser (chain-sync protocol) - -It seems `variant` is TipInfo from Haskell node or something similar. -ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs -This is indirect observation, but the epochs are numbered like this in -multiple occurrences and used to distinguish block version: - -``` - pattern TagByron x = Z x - pattern TagShelley x = S (Z x) - pattern TagAllegra x = S (S (Z x)) - pattern TagMary x = S (S (S (Z x))) - pattern TagAlonzo x = S (S (S (S (Z x)))) - pattern TagBabbage x = S (S (S (S (S (Z x))))) - pattern TagConway x = S (S (S (S (S (S (Z x)))))) -``` diff --git a/modules/upstream_chain_fetcher/README.md b/modules/upstream_chain_fetcher/README.md deleted file mode 100644 index b526db54..00000000 --- a/modules/upstream_chain_fetcher/README.md +++ /dev/null @@ -1,113 +0,0 @@ -# Upstream chain fetcher module - -The upstream chain fetcher module provides a Ouroboros network client using -ChainSync and BlockFetch to fetch blocks from a single upstream source. - -It can either run independently, either from the origin or current tip, or -be triggered by a Mithril snapshot event (the default) where it starts from -where the snapshot left off, and follows the chain from there. - -Rollbacks are handled by signalling in the block data - it is downstream -subscribers' responsibility to deal with the effects of this. - -## Configuration - -The following is the default configuration - if the defaults are OK, -everything except the section header can be left out. - -```toml -[module.upstream-chain-fetcher] - -# Upstream node connection -node-address = "backbone.cardano.iog.io:3001" -magic-number = 764824073 - -# Initial sync point -sync-point = "snapshot" # or "origin", "tip", "cache" - -# Message topics -header-topic = "cardano.block.header" -body-topic = "cardano.block.body" -snapshot-complete-topic = "cardano.snapshot.complete" -``` - -### Sync point modes (`sync-point` parameter) - -Upstream fetching is very slow, so it may be an acceptable optimisation -to take the initial part of the blockchain (which is produced long time ago and -cannot be changed anymore) from another (off-chain) source. -In another words, fetching may start not from the origin but from a different -synchronisation point. Here are the possible variants: - -* Fetch from the origin (`origin`, `tip` modes). No optimisations. - -* Fetch after snapshot is replayed (`snapshot`). Snapshot is downloaded -by other module, and when all snapshot messages are processed, that module -sends `SnapshotComplete` message in `snapshot-complete-topic` topic. -The last snapshot message then serves as the synchronisation point, after -which fetching is started. - -* Fetch after cache is replayed (`cache`). Similar to `origin` mode, -but the received messages are saved on disk into directory, specified -in `cache-dir` parameter (`upload-cache` is the default value). -When the node is restarted, the cached messages are not downloaded -again, but are taken from the directory instead. The last message in -cache serves as the synchronisation point. - -## Messages - -When the chain rolls forward, it sends a BlockHeaderMessage on topic -`cardano.block.header`, containing the slot number, header number and -raw CBOR of the header: - -```rust -pub enum BlockStatus -{ - Bootstrap, // Pseudo-block from bootstrap data - Immutable, // Now immutable (more than 'k' blocks ago) - Volatile, // Volatile, in sequence - RolledBack, // Volatile, restarted after rollback -} - -pub struct BlockInfo { - /// Block status - pub status: BlockStatus, - - /// Slot number - pub slot: u64, - - /// Block number - pub number: u64, - - /// Block hash - pub hash: Vec, -} - -pub struct BlockHeaderMessage { - /// Block info - pub block: BlockInfo, - - /// Raw Data - pub raw: Vec, -} - -``` - -It then fetches the corresponding block body and sends this as a -BlockBodyMessage on topic `cardano.block.body`, containing the slot -number and raw CBOR of the body: - -```rust -pub struct BlockBodyMessage { - /// Block info - pub block: BlockInfo, - - /// Raw Data - pub raw: Vec, -} -``` - -Note that the chain fetcher currently assumes everything is volatile. -If it gets a RollBackward from the upstream, it will remember this and -the next header and body message generated on RollForward will be -tagged with status `RolledBack`. diff --git a/modules/upstream_chain_fetcher/src/body_fetcher.rs b/modules/upstream_chain_fetcher/src/body_fetcher.rs deleted file mode 100644 index 6b453b3c..00000000 --- a/modules/upstream_chain_fetcher/src/body_fetcher.rs +++ /dev/null @@ -1,238 +0,0 @@ -//! Acropolis Miniprotocols module for Caryatid -//! Multi-connection, block body fetching part of the client (in separate thread). - -use acropolis_common::{ - messages::RawBlockMessage, - upstream_cache::{UpstreamCache, UpstreamCacheRecord}, - BlockHash, BlockInfo, BlockStatus, Era, -}; -use anyhow::{bail, Result}; -use crossbeam::channel::{Receiver, TryRecvError}; -use pallas::{ - ledger::traverse::MultiEraHeader, - network::{ - facades::PeerClient, - miniprotocols::{blockfetch, chainsync::HeaderContent, Point}, - }, -}; -use std::{sync::Arc, time::Duration}; -use tokio::{sync::Mutex, time::sleep}; -use tracing::{debug, error, info}; - -use crate::{ - utils, - utils::{ - FetchResult, - FetchResult::{NetworkError, Success}, - FetcherConfig, - }, -}; - -pub struct BodyFetcher { - cfg: Arc, - peer: PeerClient, - cache: Option>>, - - prev_epoch: Option, -} - -impl BodyFetcher { - async fn new( - cfg: Arc, - cache: Option>>, - prev_epoch: Option, - ) -> Result> { - let peer_opt = utils::peer_connect(cfg.clone(), "body fetcher").await?; - - match peer_opt { - NetworkError => Ok(NetworkError), - Success(peer) => Ok(Success(BodyFetcher { - cfg: cfg.clone(), - peer, - cache, - prev_epoch, - })), - } - } - - async fn fetch_block(&mut self, point: Point) -> Result>>> { - // Fetch the block body - debug!("Requesting single block {point:?}"); - let body = self.peer.blockfetch().fetch_single(point.clone()).await; - - match body { - Ok(body) => Ok(Success(Arc::new(body))), - Err(blockfetch::ClientError::Plexer(e)) => { - error!("Can't fetch block at {point:?}: {e}, will try to restart"); - Ok(NetworkError) - } - Err(e) => bail!("Irrecoverable error in blockfetch.fetch_single at {point:?}: {e}",), - } - } - - fn make_era(header: &MultiEraHeader, variant: u8) -> Result> { - // It seems that `variant` field is 'TipInfo' from Haskell Node: - // ouroboros-consensus-cardano/Ouroboros/Consensus/Cardano/Block.hs - // TODO: should we parse protocol version from header? - match header { - MultiEraHeader::EpochBoundary(_) => Ok(None), // Ignore EBBs - MultiEraHeader::Byron(_) => Ok(Some(Era::Byron)), - MultiEraHeader::ShelleyCompatible(_) => match variant { - // TPraos eras - 1 => Ok(Some(Era::Shelley)), - 2 => Ok(Some(Era::Allegra)), - 3 => Ok(Some(Era::Mary)), - 4 => Ok(Some(Era::Alonzo)), - x => bail!("Impossible header variant {x} for ShelleyCompatible (TPraos)"), - }, - MultiEraHeader::BabbageCompatible(_) => match variant { - // Praos eras - 5 => Ok(Some(Era::Babbage)), - 6 => Ok(Some(Era::Conway)), - x => bail!("Impossible header variant {x} for BabbaageCompatible (Praos)"), - }, - } - } - - fn make_block_info( - &self, - rolled_back: bool, - last_epoch: Option, - era: Era, - header: &MultiEraHeader, - ) -> Result { - let slot = header.slot(); - let number = header.number(); - let hash = *header.hash(); - - let (epoch, epoch_slot) = self.cfg.slot_to_epoch(slot); - let new_epoch = match last_epoch { - Some(last_epoch) => epoch != last_epoch, - None => true, - }; - let timestamp = self.cfg.slot_to_timestamp(slot); - - Ok(BlockInfo { - status: if rolled_back { - BlockStatus::RolledBack - } else { - BlockStatus::Volatile - }, // TODO vary with 'k' - slot, - number, - hash: BlockHash::from(hash), - epoch, - epoch_slot, - new_epoch, - timestamp, - era, - }) - } - - /// Returns Ok(None) if block could not be retrieved due to network problems - async fn fetch_and_construct( - &mut self, - block_info: &BlockInfo, - h: HeaderContent, - ) -> Result> { - // Fetch the block itself - note we need to - // reconstruct a Point from the header because the one we get - // in the RollForward is the *tip*, not the next read point - let fetch_point = Point::Specific(block_info.slot, block_info.hash.to_vec()); - let raw_body = match self.fetch_block(fetch_point).await? { - Success(body) => body, - NetworkError => return Ok(NetworkError), - }; - - let message = Arc::new(RawBlockMessage { - header: h.cbor, - body: raw_body.to_vec(), - }); - let record = UpstreamCacheRecord { - id: block_info.clone(), - message: message.clone(), - }; - - Ok(Success(record)) - } - - // Returns block info of the message, if it was successfully published and cached. - async fn fetch_and_publish( - &mut self, - rolled_back: bool, - h: HeaderContent, - ) -> Result>> { - // Get Byron sub-tag if any - let hdr_tag = match h.byron_prefix { - Some((tag, _)) => Some(tag), - _ => None, - }; - let hdr_variant = h.variant; - - // Decode header - let header = MultiEraHeader::decode(hdr_variant, hdr_tag, &h.cbor)?; - let era = match Self::make_era(&header, hdr_variant)? { - Some(era) => era, - None => return Ok(Success(None)), - }; - - // Build block info - let blk = self.make_block_info(rolled_back, self.prev_epoch, era, &header)?; - self.prev_epoch = Some(blk.epoch); - - // Fetch block body and construct record for caching/publishing - match self.fetch_and_construct(&blk, h).await? { - Success(record) => { - if blk.new_epoch { - info!( - blk.epoch, - blk.number, blk.slot, hdr_variant, hdr_tag, "New epoch" - ); - } - - if record.id.number % 100 == 0 { - info!("Publishing message {}", record.id.number); - } - - // Publish block body and write it to cache (if it is enabled) - if let Some(cache_mutex) = &self.cache { - let mut cache = cache_mutex.lock().await; - cache.write_record(&record)?; - } - utils::publish_message(self.cfg.clone(), &record).await?; - - Ok(Success(Some(blk))) - } - NetworkError => Ok(NetworkError), - } - } - - pub async fn run( - cfg: Arc, - cache: Option>>, - last_epoch: Option, - receiver: Receiver<(bool, HeaderContent)>, - ) -> Result> { - let fetcher_opt = Self::new(cfg, cache, last_epoch).await?; - let mut fetcher = match fetcher_opt { - Success(f) => f, - NetworkError => return Ok(None), - }; - - let mut last_successful_block = None; - loop { - match receiver.try_recv() { - Ok((rolled_back, header)) => { - match fetcher.fetch_and_publish(rolled_back, header).await? { - Success(b @ Some(_)) => last_successful_block = b, - Success(None) => (), - NetworkError => break, - } - } - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => sleep(Duration::from_millis(1)).await, - } - } - Ok(last_successful_block) - } -} diff --git a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs b/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs deleted file mode 100644 index 6cbdca43..00000000 --- a/modules/upstream_chain_fetcher/src/upstream_chain_fetcher.rs +++ /dev/null @@ -1,294 +0,0 @@ -//! Acropolis Miniprotocols module for Caryatid -//! Multi-connection, multi-protocol client interface to the Cardano node - -use acropolis_common::{ - genesis_values::GenesisValues, - messages::{CardanoMessage, Message}, - upstream_cache::{UpstreamCache, UpstreamCacheRecord}, - BlockInfo, -}; -use anyhow::{anyhow, bail, Result}; -use caryatid_sdk::{module, Context, Module, Subscription}; -use config::Config; -use crossbeam::channel::{bounded, Sender, TrySendError}; -use pallas::network::facades::PeerClient; -use pallas::network::miniprotocols::chainsync::{ClientError, HeaderContent}; -use pallas::{ - ledger::traverse::MultiEraHeader, - network::miniprotocols::{ - chainsync::{NextResponse, Tip}, - Point, - }, -}; -use std::{sync::Arc, time::Duration}; -use tokio::{sync::Mutex, time::sleep}; -use tracing::{debug, error, info}; - -mod body_fetcher; -mod utils; - -use crate::utils::FetchResult; -use body_fetcher::BodyFetcher; -use utils::{FetcherConfig, SyncPoint}; - -const MAX_BODY_FETCHER_CHANNEL_LENGTH: usize = 100; - -/// Upstream chain fetcher module -/// Parameterised by the outer message enum used on the bus -#[module( - message_type(Message), - name = "upstream-chain-fetcher", - description = "Mini-protocol chain fetcher from an upstream Cardano node" -)] -pub struct UpstreamChainFetcher; - -impl UpstreamChainFetcher { - async fn sync_to_point_loop( - sender: Sender<(bool, HeaderContent)>, - start: Point, - my_peer: &mut PeerClient, - ) -> Result<()> { - // Loop fetching messages - let mut rolled_back = false; - let mut response_count = 0; - - loop { - response_count += 1; - let next = match my_peer.chainsync().request_or_await_next().await { - Err(ClientError::Plexer(e)) => { - error!("Connection error for chainsync: {e}, will try to restart"); - return Ok(()); - } - Err(e) => bail!("Connection error for chainsync: {e}, exiting"), - Ok(next) => next, - }; - - match next { - NextResponse::RollForward(h, Tip(tip_point, _)) => { - debug!("RollForward, tip is {tip_point:?}"); - - let tag = match h.byron_prefix { - Some((tag, _)) => Some(tag), - _ => None, - }; - - if response_count % 100 == 0 { - let header = MultiEraHeader::decode(h.variant, tag, &h.cbor)?; - let number = header.number(); - info!("Fetching header {}", number); - } - - let mut for_send = (rolled_back, h); - - 'sender: loop { - for_send = match sender.try_send(for_send) { - Ok(()) => break 'sender, - Err(TrySendError::Full(fs)) => fs, - Err(TrySendError::Disconnected(_)) => { - error!("BodyFetcher disconnected, will try to restart"); - return Ok(()); - } - }; - sleep(Duration::from_millis(100)).await; - } - - rolled_back = false; - } - - // TODO The first message after sync start always comes with 'RollBackward'. - // Here we suppress this status (since it says nothing about actual rollbacks, - // but about our sync restart). Can there arise any problems? - NextResponse::RollBackward(point, _) if start == point && response_count == 1 => (), - - // TODO Handle RollBackward, publish sync message - NextResponse::RollBackward(point, _) => { - info!("RollBackward to {point:?}"); - rolled_back = true; - } - - _ => debug!("Ignoring message: {next:?}"), - } - } - } - - /// ChainSync client loop - fetch headers and pass it to body fetching thread - /// Returns last read block, if there is a reason to restart the loop. - /// If the loop did not read any block, returns None. - async fn sync_to_point_impl( - cfg: Arc, - cache: Option>>, - start: Point, - ) -> Result> { - // Find intersect to given point - let slot = start.slot_or_default(); - info!("Synchronising to slot {slot}"); - - let peer = utils::peer_connect(cfg.clone(), "header fetcher").await?; - let mut my_peer = match peer { - FetchResult::NetworkError => return Ok(None), - FetchResult::Success(p) => p, - }; - - // TODO: check for lost connection in find_intersect; skipped now to keep code simpler - let (start, _) = my_peer.chainsync().find_intersect(vec![start]).await?; - let start = start.ok_or(anyhow!("Intersection for slot {slot} not found"))?; - - let last_epoch: Option = match slot { - 0 => None, // If we're starting from origin - _ => Some(cfg.slot_to_epoch(slot).0), // From slot of last block - }; - - let (sender, receiver) = bounded(MAX_BODY_FETCHER_CHANNEL_LENGTH); - - let body_fetcher_handle = tokio::spawn(async move { - info!("Starting BodyFetcher..."); - BodyFetcher::run(cfg, cache, last_epoch, receiver).await - }); - - Self::sync_to_point_loop(sender, start, &mut my_peer).await?; - - let outcome = body_fetcher_handle.await??; - Ok(outcome) - } - - async fn sync_to_point( - cfg: Arc, - cache: Option>>, - mut start: Point, - ) -> Result<()> { - loop { - let stops_at = - Self::sync_to_point_impl(cfg.clone(), cache.clone(), start.clone()).await?; - - if let Some(blk) = stops_at { - start = Point::new(blk.slot, blk.hash.to_vec()); - } - } - } - - async fn read_cache( - cfg: Arc, - cache: &mut UpstreamCache, - ) -> Result> { - let mut last_block = None; - cache.start_reading()?; - - while let Some(record) = cache.read_record()? { - last_block = Some(record.id.clone()); - utils::publish_message(cfg.clone(), &record).await?; - cache.next_record()?; - } - - Ok(last_block) - } - - async fn wait_genesis_completion( - subscription: &mut Box>, - ) -> Result { - let (_, message) = subscription.read().await?; - match message.as_ref() { - Message::Cardano((_, CardanoMessage::GenesisComplete(complete))) => { - Ok(complete.values.clone()) - } - msg => bail!("Unexpected message in genesis completion topic: {msg:?}"), - } - } - - async fn wait_snapshot_completion( - subscription: &mut Box>, - ) -> Result> { - let Ok((_, message)) = subscription.read().await else { - return Ok(None); - }; - - match message.as_ref() { - Message::Cardano((blk, CardanoMessage::SnapshotComplete)) => Ok(Some(blk.clone())), - msg => bail!("Unexpected message in completion topic: {msg:?}"), - } - } - - async fn run_chain_sync( - cfg: Arc, - snapshot_complete: &mut Option>>, - ) -> Result<()> { - match cfg.sync_point { - SyncPoint::Tip => { - // Ask for origin but get the tip as well - let mut peer = match utils::peer_connect(cfg.clone(), "tip fetcher").await? { - FetchResult::NetworkError => bail!("Cannot get tip: network error"), - FetchResult::Success(p) => p, - }; - - let (_, Tip(point, _)) = - peer.chainsync().find_intersect(vec![Point::Origin]).await?; - Self::sync_to_point(cfg, None, point).await?; - } - SyncPoint::Origin => { - Self::sync_to_point(cfg, None, Point::Origin).await?; - } - SyncPoint::Cache => { - let mut upstream_cache = UpstreamCache::new(&cfg.cache_dir)?; - let point = match Self::read_cache(cfg.clone(), &mut upstream_cache).await? { - None => Point::Origin, - Some(blk) => Point::Specific(blk.slot, blk.hash.to_vec()), - }; - - let upstream_cache_mutex = Arc::new(Mutex::new(upstream_cache)); - Self::sync_to_point(cfg, Some(upstream_cache_mutex), point).await?; - } - SyncPoint::Snapshot => { - info!( - "Waiting for snapshot completion on {}", - cfg.snapshot_completion_topic - ); - let completion_subscription = snapshot_complete - .as_mut() - .ok_or_else(|| anyhow!("Snapshot topic subscription missing"))?; - - match Self::wait_snapshot_completion(completion_subscription).await? { - Some(block) => { - info!( - "Notified snapshot complete at slot {} block number {}", - block.slot, block.number - ); - let point = Point::Specific(block.slot, block.hash.to_vec()); - Self::sync_to_point(cfg, None, point).await?; - } - None => info!("Completion not received. Exiting ..."), - } - } - } - Ok(()) - } - - /// Main init function - pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - let mut cfg = FetcherConfig::new(context.clone(), config)?; - let genesis_complete = if cfg.genesis_values.is_none() { - Some(cfg.context.subscribe(&cfg.genesis_completion_topic).await?) - } else { - None - }; - let mut snapshot_complete = match cfg.sync_point { - SyncPoint::Snapshot => { - Some(cfg.context.subscribe(&cfg.snapshot_completion_topic).await?) - } - _ => None, - }; - - context.clone().run(async move { - if let Some(mut genesis_complete) = genesis_complete { - let genesis = Self::wait_genesis_completion(&mut genesis_complete) - .await - .unwrap_or_else(|err| panic!("could not fetch genesis: {err}")); - cfg.genesis_values = Some(genesis); - } - let cfg = Arc::new(cfg); - Self::run_chain_sync(cfg, &mut snapshot_complete) - .await - .unwrap_or_else(|e| error!("Chain sync failed: {e}")); - }); - - Ok(()) - } -} diff --git a/modules/upstream_chain_fetcher/src/utils.rs b/modules/upstream_chain_fetcher/src/utils.rs deleted file mode 100644 index 14b5075d..00000000 --- a/modules/upstream_chain_fetcher/src/utils.rs +++ /dev/null @@ -1,158 +0,0 @@ -use crate::UpstreamCacheRecord; -use acropolis_common::genesis_values::GenesisValues; -use acropolis_common::messages::{CardanoMessage, Message}; -use acropolis_common::GenesisDelegates; -use anyhow::{anyhow, bail, Result}; -use caryatid_sdk::Context; -use config::Config; -use pallas::network::facades; -use pallas::network::facades::PeerClient; -use serde::Deserialize; -use std::sync::Arc; -use tracing::{error, info}; - -const DEFAULT_BLOCK_TOPIC: (&str, &str) = ("block-topic", "cardano.block.available"); -const DEFAULT_SNAPSHOT_COMPLETION_TOPIC: (&str, &str) = - ("snapshot-completion-topic", "cardano.snapshot.complete"); -const DEFAULT_GENESIS_COMPLETION_TOPIC: (&str, &str) = - ("genesis-completion-topic", "cardano.sequence.bootstrapped"); - -const DEFAULT_NODE_ADDRESS: (&str, &str) = ("node-address", "backbone.cardano.iog.io:3001"); -const DEFAULT_MAGIC_NUMBER: (&str, u64) = ("magic-number", 764824073); - -const DEFAULT_SYNC_POINT: (&str, SyncPoint) = ("sync-point", SyncPoint::Snapshot); -const DEFAULT_CACHE_DIR: (&str, &str) = ("cache-dir", "upstream-cache"); - -const BYRON_TIMESTAMP: &str = "byron-timestamp"; -const SHELLEY_EPOCH: &str = "shelley-epoch"; -const SHELLEY_EPOCH_LEN: &str = "shelley-epoch-len"; -const SHELLEY_GENESIS_HASH: &str = "shelley-genesis-hash"; - -#[derive(Clone, Debug, serde::Deserialize, PartialEq)] -pub enum SyncPoint { - #[serde(rename = "origin")] - Origin, - #[serde(rename = "tip")] - Tip, - #[serde(rename = "cache")] - Cache, - #[serde(rename = "snapshot")] - Snapshot, -} - -pub struct FetcherConfig { - pub context: Arc>, - pub block_topic: String, - pub sync_point: SyncPoint, - pub snapshot_completion_topic: String, - pub genesis_completion_topic: String, - pub node_address: String, - pub magic_number: u64, - pub cache_dir: String, - - pub genesis_values: Option, -} - -/// Custom option type --- for the purpose of code clarity. -/// Represents outcome of network operation. -pub enum FetchResult { - Success(T), - NetworkError, -} - -impl FetcherConfig { - fn conf(config: &Arc, keydef: (&str, &str)) -> String { - let actual = config.get_string(keydef.0).unwrap_or(keydef.1.to_string()); - info!("Parameter value '{}' for {}", actual, keydef.0); - actual - } - - fn conf_enum<'a, T: Deserialize<'a> + std::fmt::Debug>( - config: &Arc, - keydef: (&str, T), - ) -> Result { - let actual = if config.get_string(keydef.0).is_ok() { - config - .get::(keydef.0) - .map_err(|e| anyhow!("cannot parse {} value: {e}", keydef.0))? - } else { - keydef.1 - }; - info!("Parameter value '{actual:?}' for {}", keydef.0); - Ok(actual) - } - - fn conf_genesis(config: &Arc) -> Option { - let byron_timestamp = config.get(BYRON_TIMESTAMP).ok()?; - let shelley_epoch = config.get(SHELLEY_EPOCH).ok()?; - let shelley_epoch_len = config.get(SHELLEY_EPOCH_LEN).ok()?; - let shelley_genesis_hash = - config.get::(SHELLEY_GENESIS_HASH).ok()?.as_bytes().try_into().unwrap(); - Some(GenesisValues { - byron_timestamp, - shelley_epoch, - shelley_epoch_len, - shelley_genesis_hash, - // TODO: load genesis keys from config - genesis_delegs: GenesisDelegates::try_from(vec![]).unwrap(), - }) - } - - pub fn new(context: Arc>, config: Arc) -> Result { - Ok(Self { - context, - block_topic: Self::conf(&config, DEFAULT_BLOCK_TOPIC), - snapshot_completion_topic: Self::conf(&config, DEFAULT_SNAPSHOT_COMPLETION_TOPIC), - genesis_completion_topic: Self::conf(&config, DEFAULT_GENESIS_COMPLETION_TOPIC), - sync_point: Self::conf_enum::(&config, DEFAULT_SYNC_POINT)?, - magic_number: config - .get::(DEFAULT_MAGIC_NUMBER.0) - .unwrap_or(DEFAULT_MAGIC_NUMBER.1), - node_address: Self::conf(&config, DEFAULT_NODE_ADDRESS), - cache_dir: Self::conf(&config, DEFAULT_CACHE_DIR), - genesis_values: Self::conf_genesis(&config), - }) - } - - pub fn slot_to_epoch(&self, slot: u64) -> (u64, u64) { - self.genesis_values.as_ref().unwrap().slot_to_epoch(slot) - } - - pub fn slot_to_timestamp(&self, slot: u64) -> u64 { - self.genesis_values.as_ref().unwrap().slot_to_timestamp(slot) - } -} - -pub async fn publish_message(cfg: Arc, record: &UpstreamCacheRecord) -> Result<()> { - let message = Arc::new(Message::Cardano(( - record.id.clone(), - CardanoMessage::BlockAvailable((*record.message).clone()), - ))); - - cfg.context.message_bus.publish(&cfg.block_topic, message).await -} - -pub async fn peer_connect(cfg: Arc, role: &str) -> Result> { - info!( - "Connecting {role} to {} ({}) ...", - cfg.node_address, cfg.magic_number - ); - - match PeerClient::connect(cfg.node_address.clone(), cfg.magic_number).await { - Ok(peer) => { - info!("Connected"); - Ok(FetchResult::Success(peer)) - } - - Err(facades::Error::ConnectFailure(e)) => { - error!("Network error: {e}"); - Ok(FetchResult::NetworkError) - } - - Err(e) => bail!( - "Cannot connect {role} to {} ({}): {e}", - cfg.node_address, - cfg.magic_number - ), - } -} diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index bab79696..d5146674 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -11,7 +11,6 @@ license = "Apache-2.0" acropolis_common = { path = "../../common" } acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } acropolis_module_mithril_snapshot_fetcher = { path = "../../modules/mithril_snapshot_fetcher" } -acropolis_module_upstream_chain_fetcher = { path = "../../modules/upstream_chain_fetcher" } acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } acropolis_module_tx_unpacker = { path = "../../modules/tx_unpacker" } diff --git a/processes/omnibus/omnibus-sancho.toml b/processes/omnibus/omnibus-sancho.toml index eb7d9c6e..8ec08446 100644 --- a/processes/omnibus/omnibus-sancho.toml +++ b/processes/omnibus/omnibus-sancho.toml @@ -11,9 +11,9 @@ network-name = "sanchonet" # "sanchonet", "mainnet" [module.rest-blockfrost] -[module.upstream-chain-fetcher] +[module.peer-network-interface] sync-point = "cache" #"cache" # "origin", "tip", "snapshot" -node-address = "sancho-testnet.able-pool.io:6002" +node-addresses = ["sancho-testnet.able-pool.io:6002"] magic-number = 4 [module.block-unpacker] diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index b10f5ec0..ed8e16b2 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -10,9 +10,13 @@ download-max-age = "never" # Pause constraint E.g. "epoch:100", "block:1200" pause = "none" -[module.upstream-chain-fetcher] +[module.peer-network-interface] sync-point = "snapshot" -node-address = "backbone.cardano.iog.io:3001" +node-addresses = [ + "backbone.cardano.iog.io:3001", + "backbone.mainnet.cardanofoundation.org:3001", + "backbone.mainnet.emurgornd.com:3001", +] magic-number = 764824073 [module.consensus] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index 6bf7e1d2..9d28e392 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -30,7 +30,6 @@ use acropolis_module_spdd_state::SPDDState; use acropolis_module_spo_state::SPOState; use acropolis_module_stake_delta_filter::StakeDeltaFilter; use acropolis_module_tx_unpacker::TxUnpacker; -use acropolis_module_upstream_chain_fetcher::UpstreamChainFetcher; use acropolis_module_utxo_state::UTXOState; use caryatid_module_clock::Clock; @@ -103,7 +102,6 @@ pub async fn main() -> Result<()> { // Register modules GenesisBootstrapper::register(&mut process); MithrilSnapshotFetcher::register(&mut process); - UpstreamChainFetcher::register(&mut process); BlockUnpacker::register(&mut process); PeerNetworkInterface::register(&mut process); TxUnpacker::register(&mut process); diff --git a/processes/replayer/Cargo.toml b/processes/replayer/Cargo.toml index d34220c9..b59e97f5 100644 --- a/processes/replayer/Cargo.toml +++ b/processes/replayer/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" acropolis_common = { path = "../../common" } acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } acropolis_module_mithril_snapshot_fetcher = { path = "../../modules/mithril_snapshot_fetcher" } -acropolis_module_upstream_chain_fetcher = { path = "../../modules/upstream_chain_fetcher" } +acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } acropolis_module_tx_unpacker = { path = "../../modules/tx_unpacker" } acropolis_module_utxo_state = { path = "../../modules/utxo_state" } diff --git a/processes/replayer/replayer-sancho.toml b/processes/replayer/replayer-sancho.toml index 3a333983..f696065b 100644 --- a/processes/replayer/replayer-sancho.toml +++ b/processes/replayer/replayer-sancho.toml @@ -23,13 +23,13 @@ path = "governance-logs" [module.rest-blockfrost] -[module.upstream-chain-fetcher] +[module.peer-network-interface] sync-point = "cache" #"cache" # "origin", "tip", "snapshot" -node-address = "sancho-testnet.able-pool.io:6002" +node-addresses = ["sancho-testnet.able-pool.io:6002"] magic-number = 4 shelley-epoch = 0 shelley-epoch-len = 86400 -shelley-genesis-hash = f94457ec45a0c6773057a529533cf7ccf746cb44dabd56ae970e1dbfb55bfdb2 +shelley-genesis-hash = "f94457ec45a0c6773057a529533cf7ccf746cb44dabd56ae970e1dbfb55bfdb2" [module.block-unpacker] diff --git a/processes/replayer/replayer.toml b/processes/replayer/replayer.toml index 2e4044e7..54c1c371 100644 --- a/processes/replayer/replayer.toml +++ b/processes/replayer/replayer.toml @@ -23,9 +23,13 @@ download-max-age = "never" # Pause constraint E.g. "epoch:100", "block:1200" pause = "none" -[module.upstream-chain-fetcher] +[module.peer-network-interface] sync-point = "snapshot" -node-address = "backbone.cardano.iog.io:3001" +node-addresses = [ + "backbone.cardano.iog.io:3001", + "backbone.mainnet.cardanofoundation.org:3001", + "backbone.mainnet.emurgornd.com:3001", +] magic-number = 764824073 #[module.consensus] diff --git a/processes/replayer/src/main.rs b/processes/replayer/src/main.rs index e41d1f3e..a4576f7a 100644 --- a/processes/replayer/src/main.rs +++ b/processes/replayer/src/main.rs @@ -25,12 +25,12 @@ use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_governance_state::GovernanceState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; +use acropolis_module_peer_network_interface::PeerNetworkInterface; use acropolis_module_rest_blockfrost::BlockfrostREST; use acropolis_module_spdd_state::SPDDState; use acropolis_module_spo_state::SPOState; use acropolis_module_stake_delta_filter::StakeDeltaFilter; use acropolis_module_tx_unpacker::TxUnpacker; -use acropolis_module_upstream_chain_fetcher::UpstreamChainFetcher; use acropolis_module_utxo_state::UTXOState; use caryatid_module_clock::Clock; @@ -56,7 +56,7 @@ fn setup_governance_collect(process: &mut dyn ModuleRegistry) { tracing::info!("Collecting"); GenesisBootstrapper::register(process); MithrilSnapshotFetcher::register(process); - UpstreamChainFetcher::register(process); + PeerNetworkInterface::register(process); BlockUnpacker::register(process); TxUnpacker::register(process); UTXOState::register(process); @@ -83,7 +83,7 @@ fn setup_alonzo_governance_collect(process: &mut dyn ModuleRegistry) { tracing::info!("Collecting"); GenesisBootstrapper::register(process); MithrilSnapshotFetcher::register(process); - UpstreamChainFetcher::register(process); + PeerNetworkInterface::register(process); BlockUnpacker::register(process); TxUnpacker::register(process); /*