From 237e8d2c33f564504616701f2089571184cfd48f Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 00:31:52 +0000 Subject: [PATCH 01/14] feat: add SyncCommand channel to peer network interface Signed-off-by: William Hankins --- common/src/commands/mod.rs | 1 + common/src/commands/sync.rs | 13 +++++++ common/src/messages.rs | 3 ++ .../config.default.toml | 2 + .../src/configuration.rs | 1 + modules/peer_network_interface/src/network.rs | 5 ++- .../src/peer_network_interface.rs | 38 +++++++++++++++---- 7 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 common/src/commands/sync.rs diff --git a/common/src/commands/mod.rs b/common/src/commands/mod.rs index 0824d7a9..14763b59 100644 --- a/common/src/commands/mod.rs +++ b/common/src/commands/mod.rs @@ -1 +1,2 @@ +pub mod sync; pub mod transactions; diff --git a/common/src/commands/sync.rs b/common/src/commands/sync.rs new file mode 100644 index 00000000..ef281238 --- /dev/null +++ b/common/src/commands/sync.rs @@ -0,0 +1,13 @@ +use crate::{BlockHash, Slot}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct SyncCommand { + slot: Slot, + hash: BlockHash, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum SyncCommandResponse { + Success, + Error(String), +} diff --git a/common/src/messages.rs b/common/src/messages.rs index 674187dc..c87cc15d 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -3,6 +3,7 @@ // We don't use these messages in the acropolis_common crate itself #![allow(dead_code)] +use crate::commands::sync::{SyncCommand, SyncCommandResponse}; use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; use crate::ledger_state::SPOState; @@ -453,9 +454,11 @@ pub enum StateQueryResponse { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum Command { Transactions(TransactionsCommand), + Sync(SyncCommand), } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum CommandResponse { Transactions(TransactionsCommandResponse), + Sync(SyncCommandResponse), } diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml index c48d02dc..2212c7de 100644 --- a/modules/peer_network_interface/config.default.toml +++ b/modules/peer_network_interface/config.default.toml @@ -4,6 +4,8 @@ block-topic = "cardano.block.available" snapshot-completion-topic = "cardano.snapshot.complete" # The topic to wait for when listening for genesis values from another module genesis-completion-topic = "cardano.sequence.bootstrapped" +# The topic to listen on for runtime sync commands +sync-command-topic = "cardano.sync.command" # Upstream node connections node-addresses = [ diff --git a/modules/peer_network_interface/src/configuration.rs b/modules/peer_network_interface/src/configuration.rs index 9f02ebb0..35affc28 100644 --- a/modules/peer_network_interface/src/configuration.rs +++ b/modules/peer_network_interface/src/configuration.rs @@ -20,6 +20,7 @@ pub struct InterfaceConfig { pub sync_point: SyncPoint, pub snapshot_completion_topic: String, pub genesis_completion_topic: String, + pub sync_command_topic: String, pub node_addresses: Vec, pub magic_number: u64, pub cache_dir: PathBuf, diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 450eaa97..6b0a4158 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -5,7 +5,7 @@ use crate::{ chain_state::ChainState, connection::{PeerChainSyncEvent, PeerConnection, PeerEvent}, }; -use acropolis_common::BlockHash; +use acropolis_common::{BlockHash, commands::sync::SyncCommand}; use anyhow::{Context as _, Result, bail}; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; @@ -55,6 +55,7 @@ pub struct NetworkManager { events_sender: mpsc::Sender, block_sink: BlockSink, published_blocks: u64, + cmd_rx: mpsc::Receiver, } impl NetworkManager { @@ -63,6 +64,7 @@ impl NetworkManager { events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, + cmd_rx: mpsc::Receiver, ) -> Self { Self { network_magic, @@ -73,6 +75,7 @@ impl NetworkManager { events_sender, block_sink, published_blocks: 0, + cmd_rx, } } diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index bea47e78..4a3ee05a 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -5,8 +5,9 @@ mod network; use acropolis_common::{ BlockInfo, BlockStatus, + commands::sync::{SyncCommand, SyncCommandResponse}, genesis_values::GenesisValues, - messages::{CardanoMessage, Message, RawBlockMessage}, + messages::{CardanoMessage, Command, CommandResponse, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord}, }; use anyhow::{Result, bail}; @@ -43,6 +44,24 @@ impl PeerNetworkInterface { SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?), _ => None, }; + let (cmd_tx, cmd_rx) = mpsc::channel::(32); + + context.handle(&cfg.sync_command_topic, move |message| { + let cmd_tx = cmd_tx.clone(); + async move { + let Message::Command(Command::Sync(point)) = message.as_ref() else { + return Arc::new(Message::CommandResponse(CommandResponse::Sync( + SyncCommandResponse::Error("Invalid message for sync command".to_string()), + ))); + }; + + let _ = cmd_tx.send(point.clone()).await; + + Arc::new(Message::CommandResponse(CommandResponse::Sync( + SyncCommandResponse::Success, + ))) + } + }); context.clone().run(async move { let genesis_values = if let Some(mut sub) = genesis_complete { @@ -82,12 +101,12 @@ impl PeerNetworkInterface { let manager = match cfg.sync_point { SyncPoint::Origin => { - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, cmd_rx); manager.sync_to_point(Point::Origin); manager } SyncPoint::Tip => { - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, cmd_rx); if let Err(error) = manager.sync_to_tip().await { warn!("could not sync to tip: {error:#}"); return; @@ -95,7 +114,7 @@ impl PeerNetworkInterface { manager } SyncPoint::Cache => { - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, cmd_rx); manager.sync_to_point(cache_sync_point); manager } @@ -108,7 +127,7 @@ impl PeerNetworkInterface { let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); sink.last_epoch = Some(epoch); } - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, cmd_rx); manager.sync_to_point(point); manager } @@ -128,9 +147,14 @@ impl PeerNetworkInterface { Ok(()) } - fn init_manager(cfg: InterfaceConfig, sink: BlockSink) -> NetworkManager { + fn init_manager( + cfg: InterfaceConfig, + sink: BlockSink, + cmd_rx: mpsc::Receiver, + ) -> NetworkManager { let (events_sender, events) = mpsc::channel(1024); - let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); + let mut manager = + NetworkManager::new(cfg.magic_number, events, events_sender, sink, cmd_rx); for address in cfg.node_addresses { manager.handle_new_connection(address, Duration::ZERO); } From d85e915e2b943d6f483e2988e3347e3f0f611011 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 22:07:33 +0000 Subject: [PATCH 02/14] refactor: await inital sync command before initializing network manager Signed-off-by: William Hankins --- common/src/commands/sync.rs | 11 +- common/src/messages.rs | 3 +- .../config.default.toml | 1 + .../src/configuration.rs | 1 + modules/peer_network_interface/src/network.rs | 55 ++++++++-- .../src/peer_network_interface.rs | 100 +++++++++++++----- 6 files changed, 126 insertions(+), 45 deletions(-) diff --git a/common/src/commands/sync.rs b/common/src/commands/sync.rs index ef281238..761aae7e 100644 --- a/common/src/commands/sync.rs +++ b/common/src/commands/sync.rs @@ -1,13 +1,6 @@ use crate::{BlockHash, Slot}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct SyncCommand { - slot: Slot, - hash: BlockHash, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum SyncCommandResponse { - Success, - Error(String), +pub enum SyncCommand { + ChangeSyncPoint { slot: Slot, hash: BlockHash }, } diff --git a/common/src/messages.rs b/common/src/messages.rs index c87cc15d..df28e3ca 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -3,7 +3,7 @@ // We don't use these messages in the acropolis_common crate itself #![allow(dead_code)] -use crate::commands::sync::{SyncCommand, SyncCommandResponse}; +use crate::commands::sync::SyncCommand; use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; use crate::ledger_state::SPOState; @@ -460,5 +460,4 @@ pub enum Command { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum CommandResponse { Transactions(TransactionsCommandResponse), - Sync(SyncCommandResponse), } diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml index 2212c7de..5babba18 100644 --- a/modules/peer_network_interface/config.default.toml +++ b/modules/peer_network_interface/config.default.toml @@ -21,6 +21,7 @@ magic-number = 764824073 # - "tip": sync from the very end of the chain # - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache. # - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot. +# - "dynamic": awaits a sync command to begin fetching blocks, can change sync point at runtime. sync-point = "snapshot" # The cache dir to use when sync-point is "cache" cache-dir = "upstream-cache" \ No newline at end of file diff --git a/modules/peer_network_interface/src/configuration.rs b/modules/peer_network_interface/src/configuration.rs index 35affc28..40461a45 100644 --- a/modules/peer_network_interface/src/configuration.rs +++ b/modules/peer_network_interface/src/configuration.rs @@ -11,6 +11,7 @@ pub enum SyncPoint { Tip, Cache, Snapshot, + Dynamic, } #[derive(serde::Deserialize)] diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 6b0a4158..d64e4929 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -5,7 +5,7 @@ use crate::{ chain_state::ChainState, connection::{PeerChainSyncEvent, PeerConnection, PeerEvent}, }; -use acropolis_common::{BlockHash, commands::sync::SyncCommand}; +use acropolis_common::BlockHash; use anyhow::{Context as _, Result, bail}; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; @@ -55,7 +55,7 @@ pub struct NetworkManager { events_sender: mpsc::Sender, block_sink: BlockSink, published_blocks: u64, - cmd_rx: mpsc::Receiver, + cmd_rx: Option>, } impl NetworkManager { @@ -64,7 +64,7 @@ impl NetworkManager { events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, - cmd_rx: mpsc::Receiver, + cmd_rx: Option>, ) -> Self { Self { network_magic, @@ -80,15 +80,52 @@ impl NetworkManager { } pub async fn run(mut self) -> Result<()> { - while let Some(event) = self.events.recv().await { - match event { - NetworkEvent::PeerUpdate { peer, event } => { - self.handle_peer_update(peer, event); - self.publish_blocks().await?; + loop { + tokio::select! { + cmd = async { + if let Some(rx) = &mut self.cmd_rx { + rx.recv().await + } else { + std::future::pending().await + } + } => { + self.on_sync_cmd(cmd).await?; + }, + + event = self.events.recv() => { + self.on_network_event(event).await?; } } } - bail!("event sink closed") + } + + async fn on_sync_cmd(&mut self, cmd: Option) -> Result<()> { + let Some(cmd) = cmd else { + return Ok(()); + }; + + self.handle_sync_command(cmd).await + } + + async fn on_network_event(&mut self, event: Option) -> Result<()> { + let Some(NetworkEvent::PeerUpdate { peer, event }) = event else { + bail!("event sink closed"); + }; + + self.handle_peer_update(peer, event); + self.publish_blocks().await?; + Ok(()) + } + + pub async fn handle_sync_command(&mut self, point: Point) -> Result<()> { + self.chain = ChainState::new(); + + for peer in self.peers.values_mut() { + peer.reqs.clear(); + peer.find_intersect(vec![point.clone()]); + } + + Ok(()) } pub fn handle_new_connection(&mut self, address: String, delay: Duration) { diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 4a3ee05a..11c8540f 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -5,9 +5,9 @@ mod network; use acropolis_common::{ BlockInfo, BlockStatus, - commands::sync::{SyncCommand, SyncCommandResponse}, + commands::sync::SyncCommand, genesis_values::GenesisValues, - messages::{CardanoMessage, Command, CommandResponse, Message, RawBlockMessage}, + messages::{CardanoMessage, Command, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord}, }; use anyhow::{Result, bail}; @@ -15,7 +15,7 @@ use caryatid_sdk::{Context, Module, Subscription, module}; use config::Config; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use std::{path::Path, sync::Arc, time::Duration}; @@ -44,24 +44,13 @@ impl PeerNetworkInterface { SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?), _ => None, }; - let (cmd_tx, cmd_rx) = mpsc::channel::(32); - context.handle(&cfg.sync_command_topic, move |message| { - let cmd_tx = cmd_tx.clone(); - async move { - let Message::Command(Command::Sync(point)) = message.as_ref() else { - return Arc::new(Message::CommandResponse(CommandResponse::Sync( - SyncCommandResponse::Error("Invalid message for sync command".to_string()), - ))); - }; - - let _ = cmd_tx.send(point.clone()).await; - - Arc::new(Message::CommandResponse(CommandResponse::Sync( - SyncCommandResponse::Success, - ))) - } - }); + // Create background task to foward sync commands to NetworkManager + let mut cmd_rx = if cfg.sync_point == SyncPoint::Dynamic { + Some(Self::spawn_command_forwarder(context.clone(), &cfg.sync_command_topic).await?) + } else { + None + }; context.clone().run(async move { let genesis_values = if let Some(mut sub) = genesis_complete { @@ -101,12 +90,12 @@ impl PeerNetworkInterface { let manager = match cfg.sync_point { SyncPoint::Origin => { - let mut manager = Self::init_manager(cfg, sink, cmd_rx); + let mut manager = Self::init_manager(cfg, sink, None); manager.sync_to_point(Point::Origin); manager } SyncPoint::Tip => { - let mut manager = Self::init_manager(cfg, sink, cmd_rx); + let mut manager = Self::init_manager(cfg, sink, None); if let Err(error) = manager.sync_to_tip().await { warn!("could not sync to tip: {error:#}"); return; @@ -114,7 +103,7 @@ impl PeerNetworkInterface { manager } SyncPoint::Cache => { - let mut manager = Self::init_manager(cfg, sink, cmd_rx); + let mut manager = Self::init_manager(cfg, sink, None); manager.sync_to_point(cache_sync_point); manager } @@ -127,7 +116,7 @@ impl PeerNetworkInterface { let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); sink.last_epoch = Some(epoch); } - let mut manager = Self::init_manager(cfg, sink, cmd_rx); + let mut manager = Self::init_manager(cfg, sink, None); manager.sync_to_point(point); manager } @@ -137,6 +126,36 @@ impl PeerNetworkInterface { } } } + SyncPoint::Dynamic => { + let mut rx = match cmd_rx.take() { + Some(rx) => rx, + None => { + warn!("Dynamic mode configured but cmd_rx is missing"); + return; + } + }; + + let point = match Self::wait_sync_command(&mut rx).await { + Ok(Point::Specific(slot, hash)) => { + let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); + sink.last_epoch = Some(epoch); + info!("Dynamic sync starting at slot {} (epoch {})", slot, epoch); + Point::Specific(slot, hash) + } + Ok(Point::Origin) => { + warn!("Dynamic sync received Point::Origin; ignoring"); + return; + } + Err(err) => { + warn!("Failed to receive initial sync command: {err:#}"); + return; + } + }; + + let mut manager = Self::init_manager(cfg, sink, Some(rx)); + manager.sync_to_point(point); + manager + } }; if let Err(err) = manager.run().await { @@ -150,7 +169,7 @@ impl PeerNetworkInterface { fn init_manager( cfg: InterfaceConfig, sink: BlockSink, - cmd_rx: mpsc::Receiver, + cmd_rx: Option>, ) -> NetworkManager { let (events_sender, events) = mpsc::channel(1024); let mut manager = @@ -207,6 +226,37 @@ impl PeerNetworkInterface { msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"), } } + + async fn wait_sync_command(rx: &mut mpsc::Receiver) -> Result { + match rx.recv().await { + Some(point) => Ok(point), + None => Err(anyhow::anyhow!( + "Channel closed before receiving a start point" + )), + } + } + + async fn spawn_command_forwarder( + context: Arc>, + topic: &str, + ) -> Result> { + let (tx, rx) = mpsc::channel::(32); + + let mut sub = context.subscribe(topic).await?; + tokio::spawn(async move { + while let Ok((_, msg)) = sub.read().await { + if let Message::Command(Command::Sync(SyncCommand::ChangeSyncPoint { + slot, + hash, + })) = msg.as_ref() + { + let _ = tx.send(Point::new(*slot, hash.to_vec())).await; + } + } + }); + + Ok(rx) + } } struct BlockSink { From 64a302f62b2dc67a4bfabfa8026b14d6172c158e Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 22:08:10 +0000 Subject: [PATCH 03/14] feat: minimal indexer process and module Signed-off-by: William Hankins --- Cargo.lock | 30 +++++++++++++++ Cargo.toml | 1 + modules/indexer/Cargo.toml | 22 +++++++++++ modules/indexer/src/indexer.rs | 69 ++++++++++++++++++++++++++++++++++ processes/indexer/Cargo.toml | 25 ++++++++++++ processes/indexer/indexer.toml | 32 ++++++++++++++++ processes/indexer/src/main.rs | 42 +++++++++++++++++++++ 7 files changed, 221 insertions(+) create mode 100644 modules/indexer/Cargo.toml create mode 100644 modules/indexer/src/indexer.rs create mode 100644 processes/indexer/Cargo.toml create mode 100644 processes/indexer/indexer.toml create mode 100644 processes/indexer/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 223f5bd9..bb3a9ef2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_indexer" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_mithril_snapshot_fetcher" version = "0.1.0" @@ -475,6 +487,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_process_indexer" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "acropolis_module_block_unpacker", + "acropolis_module_genesis_bootstrapper", + "acropolis_module_indexer", + "acropolis_module_peer_network_interface", + "anyhow", + "caryatid_process", + "clap 4.5.51", + "config", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "acropolis_process_omnibus" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index f5b46041..20329a4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "processes/replayer", # All-inclusive process to replay messages "processes/golden_tests", # All-inclusive golden tests process "processes/tx_submitter_cli", # CLI wrapper for TX submitter + "processes/indexer", # Minimal example indexer ] resolver = "2" diff --git a/modules/indexer/Cargo.toml b/modules/indexer/Cargo.toml new file mode 100644 index 00000000..5cc4e0cd --- /dev/null +++ b/modules/indexer/Cargo.toml @@ -0,0 +1,22 @@ +# Acropolis indexer module + +[package] +name = "acropolis_module_indexer" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "Core indexer logic" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/indexer.rs" diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs new file mode 100644 index 00000000..82bb34e2 --- /dev/null +++ b/modules/indexer/src/indexer.rs @@ -0,0 +1,69 @@ +//! Acropolis indexer module for Caryatid + +use acropolis_common::{ + commands::sync::SyncCommand, + hash::Hash, + messages::{Command, Message}, +}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module}; +use config::Config; +use std::{str::FromStr, sync::Arc}; +use tracing::info; + +// Configuration defaults +const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) = + ("dynamic-sync-publisher-topic", "cardano.sync.command"); + +/// Historical Epochs State module +#[module( + message_type(Message), + name = "indexer", + description = "Core indexer module for indexer process" +)] +pub struct Indexer; + +impl Indexer { + /// Async initialisation + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // Get configuration + let dynamic_sync_publisher_topic = config + .get_string(DEFAULT_DYNAMIC_SYNC_TOPIC.0) + .unwrap_or(DEFAULT_DYNAMIC_SYNC_TOPIC.1.to_string()); + info!("Creating dynamic sync publisher on '{dynamic_sync_publisher_topic}'"); + + let ctx = context.clone(); + + // This is a placeholder to test dynamic sync + context.run(async move { + let example = SyncCommand::ChangeSyncPoint { + slot: 4492799, + hash: Hash::from_str( + "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", + ) + .expect("Valid hash"), + }; + + // Inital sync message (This will be read from config for first sync and from DB on subsequent runs) + ctx.message_bus + .publish( + &dynamic_sync_publisher_topic, + Arc::new(Message::Command(Command::Sync(example.clone()))), + ) + .await + .unwrap(); + + // Simulate a later sync command to reset sync point to where we started + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + ctx.message_bus + .publish( + &dynamic_sync_publisher_topic, + Arc::new(Message::Command(Command::Sync(example))), + ) + .await + .unwrap(); + }); + Ok(()) + } +} diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml new file mode 100644 index 00000000..2e99700b --- /dev/null +++ b/processes/indexer/Cargo.toml @@ -0,0 +1,25 @@ +# Acropolis indexer process +[package] +name = "acropolis_process_indexer" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "Acropolis indexer process containing core modules" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } +acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } +acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } +acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } +acropolis_module_indexer = { path = "../../modules/indexer" } + +caryatid_process = { workspace = true } + +anyhow = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } +tokio = { workspace = true } + diff --git a/processes/indexer/indexer.toml b/processes/indexer/indexer.toml new file mode 100644 index 00000000..013c8dac --- /dev/null +++ b/processes/indexer/indexer.toml @@ -0,0 +1,32 @@ +# Top-level configuration for Acropolis indexer process + +[module.genesis-bootstrapper] + +[module.peer-network-interface] +sync-point = "dynamic" +node-addresses = [ + "backbone.cardano.iog.io:3001", + "backbone.mainnet.cardanofoundation.org:3001", + "backbone.mainnet.emurgornd.com:3001", +] +magic-number = 764824073 + +[module.block-unpacker] + +[module.indexer] + +[startup] +topic = "cardano.sequence.start" + +[message-bus.internal] +class = "in-memory" +workers = 50 +dispatch-queue-size = 1000 +worker-queue-size = 100 +bulk-block-capacity = 50 +bulk-resume-capacity = 75 + +# Message routing +[[message-router.route]] # Everything is internal only +pattern = "#" +bus = "internal" diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs new file mode 100644 index 00000000..50a3b458 --- /dev/null +++ b/processes/indexer/src/main.rs @@ -0,0 +1,42 @@ +use acropolis_common::messages::Message; +use acropolis_module_indexer::Indexer; +use anyhow::Result; +use caryatid_process::Process; +use clap::Parser; +use config::{Config, Environment, File}; +use std::sync::Arc; + +use acropolis_module_block_unpacker::BlockUnpacker; +use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; +use acropolis_module_peer_network_interface::PeerNetworkInterface; + +#[derive(Debug, clap::Parser)] +struct Args { + #[arg(long, value_name = "PATH", default_value = "indexer.toml")] + config: String, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + tracing_subscriber::fmt().with_env_filter("info").init(); + + let config = Arc::new( + Config::builder() + .add_source(File::with_name(&args.config)) + .add_source(Environment::with_prefix("ACROPOLIS")) + .build() + .unwrap(), + ); + + let mut process = Process::::create(config).await; + + GenesisBootstrapper::register(&mut process); + BlockUnpacker::register(&mut process); + PeerNetworkInterface::register(&mut process); + Indexer::register(&mut process); + + process.run().await?; + Ok(()) +} From 65125b3b90189cc308dfe7f8aba498daeedbe68f Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 22:42:21 +0000 Subject: [PATCH 04/14] fix: move handle_sync_command logic inside on_sync_cmd Signed-off-by: William Hankins --- modules/peer_network_interface/src/network.rs | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index d64e4929..6f3d7306 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -99,12 +99,19 @@ impl NetworkManager { } } - async fn on_sync_cmd(&mut self, cmd: Option) -> Result<()> { - let Some(cmd) = cmd else { + async fn on_sync_cmd(&mut self, point: Option) -> Result<()> { + let Some(point) = point else { return Ok(()); }; - self.handle_sync_command(cmd).await + self.chain = ChainState::new(); + + for peer in self.peers.values_mut() { + peer.reqs.clear(); + peer.find_intersect(vec![point.clone()]); + } + + Ok(()) } async fn on_network_event(&mut self, event: Option) -> Result<()> { @@ -117,17 +124,6 @@ impl NetworkManager { Ok(()) } - pub async fn handle_sync_command(&mut self, point: Point) -> Result<()> { - self.chain = ChainState::new(); - - for peer in self.peers.values_mut() { - peer.reqs.clear(); - peer.find_intersect(vec![point.clone()]); - } - - Ok(()) - } - pub fn handle_new_connection(&mut self, address: String, delay: Duration) { let id = PeerId(self.next_id); self.next_id += 1; From c1ae6819369a6e6382c7d35ea0ad2f2ddf389dd6 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 22:46:08 +0000 Subject: [PATCH 05/14] fix: comments Signed-off-by: William Hankins --- modules/indexer/src/indexer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs index 82bb34e2..9b18a931 100644 --- a/modules/indexer/src/indexer.rs +++ b/modules/indexer/src/indexer.rs @@ -15,7 +15,7 @@ use tracing::info; const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) = ("dynamic-sync-publisher-topic", "cardano.sync.command"); -/// Historical Epochs State module +/// Indexer module #[module( message_type(Message), name = "indexer", @@ -44,7 +44,7 @@ impl Indexer { .expect("Valid hash"), }; - // Inital sync message (This will be read from config for first sync and from DB on subsequent runs) + // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) ctx.message_bus .publish( &dynamic_sync_publisher_topic, From e385823d0fa7f09b3087a2c96ec650c2dd1f09b7 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 22:52:06 +0000 Subject: [PATCH 06/14] fix: cargo shear Signed-off-by: William Hankins --- Cargo.lock | 1 - modules/indexer/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb3a9ef2..af03d760 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,7 +304,6 @@ dependencies = [ "caryatid_sdk", "config", "tokio", - "tracing", ] [[package]] diff --git a/modules/indexer/Cargo.toml b/modules/indexer/Cargo.toml index 5cc4e0cd..eee370e4 100644 --- a/modules/indexer/Cargo.toml +++ b/modules/indexer/Cargo.toml @@ -16,7 +16,6 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } tokio = { workspace = true } -tracing = { workspace = true } [lib] path = "src/indexer.rs" From 9668f143edb68053d6f7e1f12ecb26592b11f9a4 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 22:54:20 +0000 Subject: [PATCH 07/14] fix: cargo shear (again) Signed-off-by: William Hankins --- Cargo.lock | 2 +- modules/indexer/Cargo.toml | 1 + processes/indexer/Cargo.toml | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af03d760..11c677bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,6 +304,7 @@ dependencies = [ "caryatid_sdk", "config", "tokio", + "tracing", ] [[package]] @@ -500,7 +501,6 @@ dependencies = [ "clap 4.5.51", "config", "tokio", - "tracing", "tracing-subscriber", ] diff --git a/modules/indexer/Cargo.toml b/modules/indexer/Cargo.toml index eee370e4..5cc4e0cd 100644 --- a/modules/indexer/Cargo.toml +++ b/modules/indexer/Cargo.toml @@ -16,6 +16,7 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } [lib] path = "src/indexer.rs" diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml index 2e99700b..3a368efb 100644 --- a/processes/indexer/Cargo.toml +++ b/processes/indexer/Cargo.toml @@ -19,7 +19,6 @@ caryatid_process = { workspace = true } anyhow = { workspace = true } clap = { workspace = true } config = { workspace = true } -tracing = { workspace = true } tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } tokio = { workspace = true } From 76cd5fb4417c3b8e5ab11e5ed59cd6f0072c9d36 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 20 Nov 2025 23:26:35 +0000 Subject: [PATCH 08/14] refactor: rename SyncCommand to ChainSyncCommand Signed-off-by: William Hankins --- common/src/commands/{sync.rs => chain_sync.rs} | 2 +- common/src/commands/mod.rs | 2 +- common/src/messages.rs | 4 ++-- modules/indexer/src/indexer.rs | 8 ++++---- .../peer_network_interface/src/peer_network_interface.rs | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) rename common/src/commands/{sync.rs => chain_sync.rs} (84%) diff --git a/common/src/commands/sync.rs b/common/src/commands/chain_sync.rs similarity index 84% rename from common/src/commands/sync.rs rename to common/src/commands/chain_sync.rs index 761aae7e..2584783c 100644 --- a/common/src/commands/sync.rs +++ b/common/src/commands/chain_sync.rs @@ -1,6 +1,6 @@ use crate::{BlockHash, Slot}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum SyncCommand { +pub enum ChainSyncCommand { ChangeSyncPoint { slot: Slot, hash: BlockHash }, } diff --git a/common/src/commands/mod.rs b/common/src/commands/mod.rs index 14763b59..5747153a 100644 --- a/common/src/commands/mod.rs +++ b/common/src/commands/mod.rs @@ -1,2 +1,2 @@ -pub mod sync; +pub mod chain_sync; pub mod transactions; diff --git a/common/src/messages.rs b/common/src/messages.rs index df28e3ca..61b14e49 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -3,7 +3,7 @@ // We don't use these messages in the acropolis_common crate itself #![allow(dead_code)] -use crate::commands::sync::SyncCommand; +use crate::commands::chain_sync::ChainSyncCommand; use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; use crate::ledger_state::SPOState; @@ -454,7 +454,7 @@ pub enum StateQueryResponse { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum Command { Transactions(TransactionsCommand), - Sync(SyncCommand), + ChainSync(ChainSyncCommand), } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs index 9b18a931..25abfa12 100644 --- a/modules/indexer/src/indexer.rs +++ b/modules/indexer/src/indexer.rs @@ -1,7 +1,7 @@ //! Acropolis indexer module for Caryatid use acropolis_common::{ - commands::sync::SyncCommand, + commands::chain_sync::ChainSyncCommand, hash::Hash, messages::{Command, Message}, }; @@ -36,7 +36,7 @@ impl Indexer { // This is a placeholder to test dynamic sync context.run(async move { - let example = SyncCommand::ChangeSyncPoint { + let example = ChainSyncCommand::ChangeSyncPoint { slot: 4492799, hash: Hash::from_str( "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", @@ -48,7 +48,7 @@ impl Indexer { ctx.message_bus .publish( &dynamic_sync_publisher_topic, - Arc::new(Message::Command(Command::Sync(example.clone()))), + Arc::new(Message::Command(Command::ChainSync(example.clone()))), ) .await .unwrap(); @@ -59,7 +59,7 @@ impl Indexer { ctx.message_bus .publish( &dynamic_sync_publisher_topic, - Arc::new(Message::Command(Command::Sync(example))), + Arc::new(Message::Command(Command::ChainSync(example))), ) .await .unwrap(); diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 11c8540f..341eeea1 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -5,7 +5,7 @@ mod network; use acropolis_common::{ BlockInfo, BlockStatus, - commands::sync::SyncCommand, + commands::chain_sync::ChainSyncCommand, genesis_values::GenesisValues, messages::{CardanoMessage, Command, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord}, @@ -245,7 +245,7 @@ impl PeerNetworkInterface { let mut sub = context.subscribe(topic).await?; tokio::spawn(async move { while let Ok((_, msg)) = sub.read().await { - if let Message::Command(Command::Sync(SyncCommand::ChangeSyncPoint { + if let Message::Command(Command::ChainSync(ChainSyncCommand::ChangeSyncPoint { slot, hash, })) = msg.as_ref() From d9110c11cc031727a87676d8c73363ca6c6e6842 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 21 Nov 2025 17:25:02 +0000 Subject: [PATCH 09/14] refactor: route sync commands through NetworkManager event queue Signed-off-by: William Hankins --- modules/peer_network_interface/src/network.rs | 60 ++++------ .../src/peer_network_interface.rs | 112 ++++++------------ 2 files changed, 58 insertions(+), 114 deletions(-) diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 6f3d7306..f6c4039e 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -55,7 +55,6 @@ pub struct NetworkManager { events_sender: mpsc::Sender, block_sink: BlockSink, published_blocks: u64, - cmd_rx: Option>, } impl NetworkManager { @@ -64,7 +63,6 @@ impl NetworkManager { events: mpsc::Receiver, events_sender: mpsc::Sender, block_sink: BlockSink, - cmd_rx: Option>, ) -> Self { Self { network_magic, @@ -75,52 +73,39 @@ impl NetworkManager { events_sender, block_sink, published_blocks: 0, - cmd_rx, } } pub async fn run(mut self) -> Result<()> { - loop { - tokio::select! { - cmd = async { - if let Some(rx) = &mut self.cmd_rx { - rx.recv().await - } else { - std::future::pending().await - } - } => { - self.on_sync_cmd(cmd).await?; - }, - - event = self.events.recv() => { - self.on_network_event(event).await?; - } - } + while let Some(event) = self.events.recv().await { + self.on_network_event(event).await?; } - } - async fn on_sync_cmd(&mut self, point: Option) -> Result<()> { - let Some(point) = point else { - return Ok(()); - }; + Ok(()) + } - self.chain = ChainState::new(); + async fn on_network_event(&mut self, event: NetworkEvent) -> Result<()> { + match event { + NetworkEvent::PeerUpdate { peer, event } => { + self.handle_peer_update(peer, event); + self.publish_blocks().await?; + } + NetworkEvent::SyncPointUpdate { point } => { + self.chain = ChainState::new(); - for peer in self.peers.values_mut() { - peer.reqs.clear(); - peer.find_intersect(vec![point.clone()]); - } + for peer in self.peers.values_mut() { + peer.reqs.clear(); + } - Ok(()) - } + if let Point::Specific(slot, _) = point { + let (epoch, _) = self.block_sink.genesis_values.slot_to_epoch(slot); + self.block_sink.last_epoch = Some(epoch); + } - async fn on_network_event(&mut self, event: Option) -> Result<()> { - let Some(NetworkEvent::PeerUpdate { peer, event }) = event else { - bail!("event sink closed"); - }; + self.sync_to_point(point); + } + } - self.handle_peer_update(peer, event); - self.publish_blocks().await?; Ok(()) } @@ -271,6 +256,7 @@ impl NetworkManager { pub enum NetworkEvent { PeerUpdate { peer: PeerId, event: PeerEvent }, + SyncPointUpdate { point: Point }, } #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 341eeea1..5e1cdb87 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -15,14 +15,14 @@ use caryatid_sdk::{Context, Module, Subscription, module}; use config::Config; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; -use tracing::{error, info, warn}; +use tracing::{error, warn}; use std::{path::Path, sync::Arc, time::Duration}; use crate::{ configuration::{InterfaceConfig, SyncPoint}, connection::Header, - network::NetworkManager, + network::{NetworkEvent, NetworkManager}, }; #[module( @@ -44,13 +44,7 @@ impl PeerNetworkInterface { SyncPoint::Snapshot => Some(context.subscribe(&cfg.snapshot_completion_topic).await?), _ => None, }; - - // Create background task to foward sync commands to NetworkManager - let mut cmd_rx = if cfg.sync_point == SyncPoint::Dynamic { - Some(Self::spawn_command_forwarder(context.clone(), &cfg.sync_command_topic).await?) - } else { - None - }; + let command_subscription = context.subscribe(&cfg.sync_command_topic).await?; context.clone().run(async move { let genesis_values = if let Some(mut sub) = genesis_complete { @@ -90,12 +84,12 @@ impl PeerNetworkInterface { let manager = match cfg.sync_point { SyncPoint::Origin => { - let mut manager = Self::init_manager(cfg, sink, None); + let mut manager = Self::init_manager(cfg, sink, command_subscription); manager.sync_to_point(Point::Origin); manager } SyncPoint::Tip => { - let mut manager = Self::init_manager(cfg, sink, None); + let mut manager = Self::init_manager(cfg, sink, command_subscription); if let Err(error) = manager.sync_to_tip().await { warn!("could not sync to tip: {error:#}"); return; @@ -103,7 +97,7 @@ impl PeerNetworkInterface { manager } SyncPoint::Cache => { - let mut manager = Self::init_manager(cfg, sink, None); + let mut manager = Self::init_manager(cfg, sink, command_subscription); manager.sync_to_point(cache_sync_point); manager } @@ -116,7 +110,7 @@ impl PeerNetworkInterface { let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); sink.last_epoch = Some(epoch); } - let mut manager = Self::init_manager(cfg, sink, None); + let mut manager = Self::init_manager(cfg, sink, command_subscription); manager.sync_to_point(point); manager } @@ -126,36 +120,7 @@ impl PeerNetworkInterface { } } } - SyncPoint::Dynamic => { - let mut rx = match cmd_rx.take() { - Some(rx) => rx, - None => { - warn!("Dynamic mode configured but cmd_rx is missing"); - return; - } - }; - - let point = match Self::wait_sync_command(&mut rx).await { - Ok(Point::Specific(slot, hash)) => { - let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); - sink.last_epoch = Some(epoch); - info!("Dynamic sync starting at slot {} (epoch {})", slot, epoch); - Point::Specific(slot, hash) - } - Ok(Point::Origin) => { - warn!("Dynamic sync received Point::Origin; ignoring"); - return; - } - Err(err) => { - warn!("Failed to receive initial sync command: {err:#}"); - return; - } - }; - - let mut manager = Self::init_manager(cfg, sink, Some(rx)); - manager.sync_to_point(point); - manager - } + SyncPoint::Dynamic => Self::init_manager(cfg, sink, command_subscription), }; if let Err(err) = manager.run().await { @@ -169,17 +134,41 @@ impl PeerNetworkInterface { fn init_manager( cfg: InterfaceConfig, sink: BlockSink, - cmd_rx: Option>, + command_subscription: Box>, ) -> NetworkManager { let (events_sender, events) = mpsc::channel(1024); - let mut manager = - NetworkManager::new(cfg.magic_number, events, events_sender, sink, cmd_rx); + tokio::spawn(Self::forward_commands_to_events( + command_subscription, + events_sender.clone(), + )); + let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); for address in cfg.node_addresses { manager.handle_new_connection(address, Duration::ZERO); } manager } + async fn forward_commands_to_events( + mut subscription: Box>, + events_sender: mpsc::Sender, + ) -> Result<()> { + while let Ok((_, msg)) = subscription.read().await { + if let Message::Command(Command::ChainSync(ChainSyncCommand::ChangeSyncPoint { + slot, + hash, + })) = msg.as_ref() + { + let point = Point::new(*slot, hash.to_vec()); + + if events_sender.send(NetworkEvent::SyncPointUpdate { point }).await.is_err() { + bail!("event channel closed"); + } + } + } + + bail!("subscription closed"); + } + async fn init_cache( cache_dir: &Path, block_topic: &str, @@ -226,37 +215,6 @@ impl PeerNetworkInterface { msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"), } } - - async fn wait_sync_command(rx: &mut mpsc::Receiver) -> Result { - match rx.recv().await { - Some(point) => Ok(point), - None => Err(anyhow::anyhow!( - "Channel closed before receiving a start point" - )), - } - } - - async fn spawn_command_forwarder( - context: Arc>, - topic: &str, - ) -> Result> { - let (tx, rx) = mpsc::channel::(32); - - let mut sub = context.subscribe(topic).await?; - tokio::spawn(async move { - while let Ok((_, msg)) = sub.read().await { - if let Message::Command(Command::ChainSync(ChainSyncCommand::ChangeSyncPoint { - slot, - hash, - })) = msg.as_ref() - { - let _ = tx.send(Point::new(*slot, hash.to_vec())).await; - } - } - }); - - Ok(rx) - } } struct BlockSink { From 3dfdb78a50134bde3d392eb6f11d5b3c424062a1 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 21 Nov 2025 17:55:16 +0000 Subject: [PATCH 10/14] refactor: use strongly-typed config pattern Signed-off-by: William Hankins --- Cargo.lock | 1 + common/src/commands/chain_sync.rs | 2 +- modules/indexer/Cargo.toml | 1 + modules/indexer/config.default.toml | 2 ++ modules/indexer/src/configuration.rs | 21 ++++++++++++++++++ modules/indexer/src/indexer.rs | 22 +++++++++---------- .../src/peer_network_interface.rs | 2 +- 7 files changed, 37 insertions(+), 14 deletions(-) create mode 100644 modules/indexer/config.default.toml create mode 100644 modules/indexer/src/configuration.rs diff --git a/Cargo.lock b/Cargo.lock index 11c677bf..6e6c463d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -303,6 +303,7 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", + "serde", "tokio", "tracing", ] diff --git a/common/src/commands/chain_sync.rs b/common/src/commands/chain_sync.rs index 2584783c..77a55f12 100644 --- a/common/src/commands/chain_sync.rs +++ b/common/src/commands/chain_sync.rs @@ -2,5 +2,5 @@ use crate::{BlockHash, Slot}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum ChainSyncCommand { - ChangeSyncPoint { slot: Slot, hash: BlockHash }, + FindIntersect { slot: Slot, hash: BlockHash }, } diff --git a/modules/indexer/Cargo.toml b/modules/indexer/Cargo.toml index 5cc4e0cd..fca7c465 100644 --- a/modules/indexer/Cargo.toml +++ b/modules/indexer/Cargo.toml @@ -15,6 +15,7 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } +serde = { workspace = true, features = ["rc"] } tokio = { workspace = true } tracing = { workspace = true } diff --git a/modules/indexer/config.default.toml b/modules/indexer/config.default.toml new file mode 100644 index 00000000..e8549e76 --- /dev/null +++ b/modules/indexer/config.default.toml @@ -0,0 +1,2 @@ +# The topic to publish sync commands on +sync-command-topic = "cardano.sync.command" \ No newline at end of file diff --git a/modules/indexer/src/configuration.rs b/modules/indexer/src/configuration.rs new file mode 100644 index 00000000..6b2b5731 --- /dev/null +++ b/modules/indexer/src/configuration.rs @@ -0,0 +1,21 @@ +use anyhow::Result; +use config::Config; + +#[derive(serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub struct IndexerConfig { + pub sync_command_topic: String, +} + +impl IndexerConfig { + pub fn try_load(config: &Config) -> Result { + let full_config = Config::builder() + .add_source(config::File::from_str( + include_str!("../config.default.toml"), + config::FileFormat::Toml, + )) + .add_source(config.clone()) + .build()?; + Ok(full_config.try_deserialize()?) + } +} diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs index 25abfa12..050eee04 100644 --- a/modules/indexer/src/indexer.rs +++ b/modules/indexer/src/indexer.rs @@ -1,4 +1,5 @@ //! Acropolis indexer module for Caryatid +mod configuration; use acropolis_common::{ commands::chain_sync::ChainSyncCommand, @@ -11,9 +12,7 @@ use config::Config; use std::{str::FromStr, sync::Arc}; use tracing::info; -// Configuration defaults -const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) = - ("dynamic-sync-publisher-topic", "cardano.sync.command"); +use crate::configuration::IndexerConfig; /// Indexer module #[module( @@ -26,17 +25,17 @@ pub struct Indexer; impl Indexer { /// Async initialisation pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - // Get configuration - let dynamic_sync_publisher_topic = config - .get_string(DEFAULT_DYNAMIC_SYNC_TOPIC.0) - .unwrap_or(DEFAULT_DYNAMIC_SYNC_TOPIC.1.to_string()); - info!("Creating dynamic sync publisher on '{dynamic_sync_publisher_topic}'"); + let cfg = IndexerConfig::try_load(&config)?; + info!( + "Creating sync command publisher on '{}'", + cfg.sync_command_topic + ); let ctx = context.clone(); // This is a placeholder to test dynamic sync context.run(async move { - let example = ChainSyncCommand::ChangeSyncPoint { + let example = ChainSyncCommand::FindIntersect { slot: 4492799, hash: Hash::from_str( "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", @@ -47,18 +46,17 @@ impl Indexer { // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) ctx.message_bus .publish( - &dynamic_sync_publisher_topic, + &cfg.sync_command_topic, Arc::new(Message::Command(Command::ChainSync(example.clone()))), ) .await .unwrap(); // Simulate a later sync command to reset sync point to where we started - tokio::time::sleep(std::time::Duration::from_secs(5)).await; ctx.message_bus .publish( - &dynamic_sync_publisher_topic, + &cfg.sync_command_topic, Arc::new(Message::Command(Command::ChainSync(example))), ) .await diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index 5e1cdb87..cbf1c77a 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -153,7 +153,7 @@ impl PeerNetworkInterface { events_sender: mpsc::Sender, ) -> Result<()> { while let Ok((_, msg)) = subscription.read().await { - if let Message::Command(Command::ChainSync(ChainSyncCommand::ChangeSyncPoint { + if let Message::Command(Command::ChainSync(ChainSyncCommand::FindIntersect { slot, hash, })) = msg.as_ref() From 26809f0f4d39b008b863d0a5f3d8658969303968 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 21 Nov 2025 18:12:45 +0000 Subject: [PATCH 11/14] fix: clippy Signed-off-by: William Hankins --- modules/indexer/src/indexer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs index 050eee04..8877c6d5 100644 --- a/modules/indexer/src/indexer.rs +++ b/modules/indexer/src/indexer.rs @@ -7,7 +7,7 @@ use acropolis_common::{ messages::{Command, Message}, }; use anyhow::Result; -use caryatid_sdk::{module, Context, Module}; +use caryatid_sdk::{module, Context}; use config::Config; use std::{str::FromStr, sync::Arc}; use tracing::info; From 6e12ea6c59afad8413920a167c37bc810e2ddfbf Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 21 Nov 2025 18:50:35 +0000 Subject: [PATCH 12/14] fix: shear Signed-off-by: William Hankins --- Cargo.lock | 1 - modules/indexer/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 576553af..f34c3e52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -304,7 +304,6 @@ dependencies = [ "caryatid_sdk", "config", "serde", - "tokio", "tracing", ] diff --git a/modules/indexer/Cargo.toml b/modules/indexer/Cargo.toml index fca7c465..aef2fcd7 100644 --- a/modules/indexer/Cargo.toml +++ b/modules/indexer/Cargo.toml @@ -16,7 +16,6 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } serde = { workspace = true, features = ["rc"] } -tokio = { workspace = true } tracing = { workspace = true } [lib] From 6038416a601ddc5c8eca6e1097bde57c88a7829f Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 24 Nov 2025 16:16:12 -0500 Subject: [PATCH 13/14] test: add test that blocks coming in post-rollback are ignored --- .../peer_network_interface/src/chain_state.rs | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/modules/peer_network_interface/src/chain_state.rs b/modules/peer_network_interface/src/chain_state.rs index b432d5fb..5c86b430 100644 --- a/modules/peer_network_interface/src/chain_state.rs +++ b/modules/peer_network_interface/src/chain_state.rs @@ -397,6 +397,67 @@ mod tests { state.handle_block_published(); } + #[test] + fn should_ignore_irrelevant_block_fetch_after_rollback() { + let mut state = ChainState::new(); + let p1 = PeerId(0); + state.handle_new_preferred_upstream(p1); + + let (h1, b1) = make_block(0, "first block"); + let (h2a, b2a) = make_block(1, "second block pre-rollback"); + let (h3a, b3a) = make_block(2, "third block pre-rollback"); + let (h2b, b2b) = make_block(1, "second block post-rollback"); + let (h3b, b3b) = make_block(1, "third block post-rollback"); + + // publish the first block + assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]); + state.handle_body_fetched(h1.slot, h1.hash, b1.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h1, b1.as_slice(), false)) + ); + state.handle_block_published(); + + // publish the second block + assert_eq!(state.handle_roll_forward(p1, h2a.clone()), vec![p1]); + state.handle_body_fetched(h2a.slot, h2a.hash, b2a.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h2a, b2a.as_slice(), false)) + ); + state.handle_block_published(); + assert_eq!(state.next_unpublished_block(), None); + + // roll forward to the third block, but don't receive the body yet + assert_eq!(state.handle_roll_forward(p1, h3a.clone()), vec![p1]); + + // now, roll the chain back to the first block + state.handle_roll_backward(p1, Point::Specific(h1.slot, h1.hash.to_vec())); + assert_eq!(state.next_unpublished_block(), None); + + // and when we advance to the new second block, the system should report it as a rollback + assert_eq!(state.handle_roll_forward(p1, h2b.clone()), vec![p1]); + state.handle_body_fetched(h2b.slot, h2b.hash, b2b.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h2b, b2b.as_slice(), true)) + ); + state.handle_block_published(); + + // we should not take any action on receiving the original third block + state.handle_body_fetched(h3a.slot, h3a.hash, b3a); + assert_eq!(state.next_unpublished_block(), None); + + // and the new third block should not be a rollback + assert_eq!(state.handle_roll_forward(p1, h3b.clone()), vec![p1]); + state.handle_body_fetched(h3b.slot, h3b.hash, b3b.clone()); + assert_eq!( + state.next_unpublished_block(), + Some((&h3b, b3b.as_slice(), false)) + ); + state.handle_block_published(); + } + #[test] fn should_not_report_rollback_for_unpublished_portion_of_chain() { let mut state = ChainState::new(); From 725a0216d07fb507c8174812fc0c970dad30ae31 Mon Sep 17 00:00:00 2001 From: Simon Gellis Date: Mon, 24 Nov 2025 16:26:36 -0500 Subject: [PATCH 14/14] fix: shear --- common/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/Cargo.toml b/common/Cargo.toml index fc5e948d..1c23f39c 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -29,7 +29,6 @@ regex = "1" serde = { workspace = true, features = ["rc"] } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } -tempfile = "3" tokio = { workspace = true } tracing = { workspace = true } futures = "0.3.31" @@ -40,8 +39,11 @@ rayon = "1.11.0" cryptoxide = "0.5.1" thiserror = "2.0.17" sha2 = "0.10.8" + +[dev-dependencies] caryatid_process = { workspace = true } config = { workspace = true } +tempfile = "3" [lib] crate-type = ["rlib"]