Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ members = [
"processes/replayer", # All-inclusive process to replay messages
"processes/golden_tests", # All-inclusive golden tests process
"processes/tx_submitter_cli", # CLI wrapper for TX submitter
"processes/indexer", # Minimal example indexer
]
resolver = "2"

Expand Down
4 changes: 3 additions & 1 deletion common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ regex = "1"
serde = { workspace = true, features = ["rc"] }
serde_json = { workspace = true }
serde_with = { workspace = true, features = ["base64"] }
tempfile = "3"
tokio = { workspace = true }
tracing = { workspace = true }
futures = "0.3.31"
Expand All @@ -40,8 +39,11 @@ rayon = "1.11.0"
cryptoxide = "0.5.1"
thiserror = "2.0.17"
sha2 = "0.10.8"

[dev-dependencies]
caryatid_process = { workspace = true }
config = { workspace = true }
tempfile = "3"

[lib]
crate-type = ["rlib"]
Expand Down
6 changes: 6 additions & 0 deletions common/src/commands/chain_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use crate::{BlockHash, Slot};

/// Runtime commands that control chain synchronisation.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ChainSyncCommand {
    /// Request that syncing (re)start from the chain point identified by
    /// `slot` and `hash` — the point the consumer wants to intersect at.
    FindIntersect { slot: Slot, hash: BlockHash },
}
1 change: 1 addition & 0 deletions common/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod chain_sync;
pub mod transactions;
2 changes: 2 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// We don't use these messages in the acropolis_common crate itself
#![allow(dead_code)]

use crate::commands::chain_sync::ChainSyncCommand;
use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse};
use crate::genesis_values::GenesisValues;
use crate::ledger_state::SPOState;
Expand Down Expand Up @@ -453,6 +454,7 @@ pub enum StateQueryResponse {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Command {
Transactions(TransactionsCommand),
ChainSync(ChainSyncCommand),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down
22 changes: 22 additions & 0 deletions modules/indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Acropolis indexer module

[package]
name = "acropolis_module_indexer"
version = "0.1.0"
edition = "2021"
authors = ["William Hankins <william@sundae.fi>"]
description = "Core indexer logic"
license = "Apache-2.0"

[dependencies]
acropolis_common = { path = "../../common" }

caryatid_sdk = { workspace = true }

anyhow = { workspace = true }
config = { workspace = true }
serde = { workspace = true, features = ["rc"] }
tracing = { workspace = true }

[lib]
path = "src/indexer.rs"
2 changes: 2 additions & 0 deletions modules/indexer/config.default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# The topic to publish sync commands on
sync-command-topic = "cardano.sync.command"
21 changes: 21 additions & 0 deletions modules/indexer/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use anyhow::Result;
use config::Config;

/// Configuration for the indexer module, deserialised from kebab-case
/// keys (e.g. `sync-command-topic`) layered over `config.default.toml`.
#[derive(serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct IndexerConfig {
    // Message-bus topic on which chain-sync commands are published.
    pub sync_command_topic: String,
}

impl IndexerConfig {
    /// Build the effective configuration: start from the defaults baked
    /// in from `config.default.toml`, then layer the caller-supplied
    /// settings on top, and deserialise the merged result.
    pub fn try_load(config: &Config) -> Result<Self> {
        let defaults = config::File::from_str(
            include_str!("../config.default.toml"),
            config::FileFormat::Toml,
        );
        let merged = Config::builder()
            .add_source(defaults)
            .add_source(config.clone())
            .build()?;
        let parsed = merged.try_deserialize()?;
        Ok(parsed)
    }
}
67 changes: 67 additions & 0 deletions modules/indexer/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Acropolis indexer module for Caryatid
mod configuration;

use acropolis_common::{
commands::chain_sync::ChainSyncCommand,
hash::Hash,
messages::{Command, Message},
};
use anyhow::Result;
use caryatid_sdk::{module, Context};
use config::Config;
use std::{str::FromStr, sync::Arc};
use tracing::info;

use crate::configuration::IndexerConfig;

/// Indexer module: publishes chain-sync commands on the configured topic.
#[module(
    message_type(Message),
    name = "indexer",
    description = "Core indexer module for indexer process"
)]
pub struct Indexer;

impl Indexer {
    /// Async initialisation: load the module configuration and spawn a task
    /// that publishes placeholder chain-sync commands on the configured topic.
    ///
    /// # Errors
    /// Returns an error if the module configuration cannot be loaded.
    pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
        let cfg = IndexerConfig::try_load(&config)?;
        info!(
            "Creating sync command publisher on '{}'",
            cfg.sync_command_topic
        );

        let ctx = context.clone();

        // This is a placeholder to test dynamic sync
        context.run(async move {
            // Hard-coded example sync point.
            // NOTE(review): presumably a well-known mainnet point — confirm.
            let example = ChainSyncCommand::FindIntersect {
                slot: 4492799,
                hash: Hash::from_str(
                    "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457",
                )
                .expect("Valid hash"),
            };

            // Wrap the command once; Arc::clone makes the second publish a
            // refcount bump instead of a full clone of the command.
            let msg = Arc::new(Message::Command(Command::ChainSync(example)));

            // Initial sync message (This will be read from config for first sync and from DB on subsequent runs)
            ctx.message_bus
                .publish(&cfg.sync_command_topic, Arc::clone(&msg))
                .await
                .expect("failed to publish initial sync command");

            // Simulate a later sync command to reset sync point to where we started
            ctx.message_bus
                .publish(&cfg.sync_command_topic, msg)
                .await
                .expect("failed to publish follow-up sync command");
        });
        Ok(())
    }
}
3 changes: 3 additions & 0 deletions modules/peer_network_interface/config.default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ block-topic = "cardano.block.available"
snapshot-completion-topic = "cardano.snapshot.complete"
# The topic to wait for when listening for genesis values from another module
genesis-completion-topic = "cardano.sequence.bootstrapped"
# The topic to listen on for runtime sync commands
sync-command-topic = "cardano.sync.command"

# Upstream node connections
node-addresses = [
Expand All @@ -19,6 +21,7 @@ magic-number = 764824073
# - "tip": sync from the very end of the chain
# - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache.
# - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot.
# - "dynamic": wait for a sync command before fetching blocks; the sync point can be changed at runtime.
sync-point = "snapshot"
# The cache dir to use when sync-point is "cache"
cache-dir = "upstream-cache"
61 changes: 61 additions & 0 deletions modules/peer_network_interface/src/chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,67 @@ mod tests {
state.handle_block_published();
}

#[test]
fn should_ignore_irrelevant_block_fetch_after_rollback() {
    let mut state = ChainState::new();
    let p1 = PeerId(0);
    state.handle_new_preferred_upstream(p1);

    let (h1, b1) = make_block(0, "first block");
    let (h2a, b2a) = make_block(1, "second block pre-rollback");
    let (h3a, b3a) = make_block(2, "third block pre-rollback");
    let (h2b, b2b) = make_block(1, "second block post-rollback");
    // Fixed: the post-rollback *third* block must come after h2b (slot 1),
    // mirroring the pre-rollback chain at slots 0,1,2 — it was previously
    // created at slot 1, colliding with h2b.
    let (h3b, b3b) = make_block(2, "third block post-rollback");

    // publish the first block
    assert_eq!(state.handle_roll_forward(p1, h1.clone()), vec![p1]);
    state.handle_body_fetched(h1.slot, h1.hash, b1.clone());
    assert_eq!(
        state.next_unpublished_block(),
        Some((&h1, b1.as_slice(), false))
    );
    state.handle_block_published();

    // publish the second block
    assert_eq!(state.handle_roll_forward(p1, h2a.clone()), vec![p1]);
    state.handle_body_fetched(h2a.slot, h2a.hash, b2a.clone());
    assert_eq!(
        state.next_unpublished_block(),
        Some((&h2a, b2a.as_slice(), false))
    );
    state.handle_block_published();
    assert_eq!(state.next_unpublished_block(), None);

    // roll forward to the third block, but don't receive the body yet
    assert_eq!(state.handle_roll_forward(p1, h3a.clone()), vec![p1]);

    // now, roll the chain back to the first block
    state.handle_roll_backward(p1, Point::Specific(h1.slot, h1.hash.to_vec()));
    assert_eq!(state.next_unpublished_block(), None);

    // and when we advance to the new second block, the system should report it as a rollback
    assert_eq!(state.handle_roll_forward(p1, h2b.clone()), vec![p1]);
    state.handle_body_fetched(h2b.slot, h2b.hash, b2b.clone());
    assert_eq!(
        state.next_unpublished_block(),
        Some((&h2b, b2b.as_slice(), true))
    );
    state.handle_block_published();

    // we should not take any action on receiving the original third block
    state.handle_body_fetched(h3a.slot, h3a.hash, b3a);
    assert_eq!(state.next_unpublished_block(), None);

    // and the new third block should not be a rollback
    assert_eq!(state.handle_roll_forward(p1, h3b.clone()), vec![p1]);
    state.handle_body_fetched(h3b.slot, h3b.hash, b3b.clone());
    assert_eq!(
        state.next_unpublished_block(),
        Some((&h3b, b3b.as_slice(), false))
    );
    state.handle_block_published();
}

#[test]
fn should_not_report_rollback_for_unpublished_portion_of_chain() {
let mut state = ChainState::new();
Expand Down
2 changes: 2 additions & 0 deletions modules/peer_network_interface/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub enum SyncPoint {
Tip,
Cache,
Snapshot,
Dynamic,
}

#[derive(serde::Deserialize)]
Expand All @@ -20,6 +21,7 @@ pub struct InterfaceConfig {
pub sync_point: SyncPoint,
pub snapshot_completion_topic: String,
pub genesis_completion_topic: String,
pub sync_command_topic: String,
pub node_addresses: Vec<String>,
pub magic_number: u64,
pub cache_dir: PathBuf,
Expand Down
32 changes: 27 additions & 5 deletions modules/peer_network_interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,35 @@ impl NetworkManager {

pub async fn run(mut self) -> Result<()> {
while let Some(event) = self.events.recv().await {
match event {
NetworkEvent::PeerUpdate { peer, event } => {
self.handle_peer_update(peer, event);
self.publish_blocks().await?;
self.on_network_event(event).await?;
}

Ok(())
}

async fn on_network_event(&mut self, event: NetworkEvent) -> Result<()> {
match event {
NetworkEvent::PeerUpdate { peer, event } => {
self.handle_peer_update(peer, event);
self.publish_blocks().await?;
}
NetworkEvent::SyncPointUpdate { point } => {
self.chain = ChainState::new();

for peer in self.peers.values_mut() {
peer.reqs.clear();
}

if let Point::Specific(slot, _) = point {
let (epoch, _) = self.block_sink.genesis_values.slot_to_epoch(slot);
self.block_sink.last_epoch = Some(epoch);
}

self.sync_to_point(point);
}
}
bail!("event sink closed")

Ok(())
}

pub fn handle_new_connection(&mut self, address: String, delay: Duration) {
Expand Down Expand Up @@ -235,6 +256,7 @@ impl NetworkManager {

pub enum NetworkEvent {
PeerUpdate { peer: PeerId, event: PeerEvent },
SyncPointUpdate { point: Point },
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
Expand Down
Loading