From abe0e9fe43c2bca44c27c3707b2c0191f66a6f9a Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 19:52:03 +0100 Subject: [PATCH 01/18] feat: implement historical epochs state --- Cargo.lock | 17 ++ Cargo.toml | 1 + common/src/cbor.rs | 33 ++ common/src/lib.rs | 1 + common/src/messages.rs | 18 +- common/src/protocol_params.rs | 29 +- common/src/queries/epochs.rs | 7 + modules/accounts_state/src/accounts_state.rs | 6 +- modules/epochs_state/src/epochs_history.rs | 200 ------------- modules/epochs_state/src/epochs_state.rs | 101 +------ modules/epochs_state/src/store_config.rs | 27 -- modules/historical_epochs_state/.gitignore | 1 + modules/historical_epochs_state/Cargo.toml | 26 ++ .../src/historical_epochs_state.rs | 282 ++++++++++++++++++ .../src/immutable_historical_epochs_state.rs | 103 +++++++ modules/historical_epochs_state/src/state.rs | 106 +++++++ .../src/volatile_historical_epochs_state.rs | 58 ++++ .../rest_blockfrost/src/handlers/epochs.rs | 117 ++++++-- .../rest_blockfrost/src/handlers_config.rs | 8 +- processes/omnibus/Cargo.toml | 1 + processes/omnibus/src/main.rs | 2 + 21 files changed, 793 insertions(+), 351 deletions(-) create mode 100644 common/src/cbor.rs delete mode 100644 modules/epochs_state/src/epochs_history.rs delete mode 100644 modules/epochs_state/src/store_config.rs create mode 100644 modules/historical_epochs_state/.gitignore create mode 100644 modules/historical_epochs_state/Cargo.toml create mode 100644 modules/historical_epochs_state/src/historical_epochs_state.rs create mode 100644 modules/historical_epochs_state/src/immutable_historical_epochs_state.rs create mode 100644 modules/historical_epochs_state/src/state.rs create mode 100644 modules/historical_epochs_state/src/volatile_historical_epochs_state.rs diff --git a/Cargo.lock b/Cargo.lock index a1746701..f86371b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,6 +264,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_historical_epochs_state" 
+version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "fjall", + "hex", + "minicbor 0.26.5", + "rayon", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_mithril_snapshot_fetcher" version = "0.1.0" @@ -477,6 +493,7 @@ dependencies = [ "acropolis_module_genesis_bootstrapper", "acropolis_module_governance_state", "acropolis_module_historical_accounts_state", + "acropolis_module_historical_epochs_state", "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", "acropolis_module_peer_network_interface", diff --git a/Cargo.toml b/Cargo.toml index 5f6407a2..8d543215 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "modules/accounts_state", # Tracks stake and reward accounts "modules/assets_state", # Tracks native asset mints and burns "modules/historical_accounts_state", # Tracks historical account information + "modules/historical_epochs_state", # Tracks historical epochs information "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs "modules/tx_submitter", # Submits TXs to peers diff --git a/common/src/cbor.rs b/common/src/cbor.rs new file mode 100644 index 00000000..f8b8a287 --- /dev/null +++ b/common/src/cbor.rs @@ -0,0 +1,33 @@ +// Custom codec module for u128 (similar to minicbor::bytes pattern) +// CBOR doesn't natively support 128-bit integers, so we encode as 16 bytes +pub mod u128_cbor_codec { + use minicbor::{Decoder, Encoder}; + + /// Encode u128 as 16 bytes in big-endian format + /// For use with `#[cbor(with = "u128_cbor_codec")]` + pub fn encode( + v: &u128, + e: &mut Encoder, + _ctx: &mut C, + ) -> Result<(), minicbor::encode::Error> { + e.bytes(&v.to_be_bytes())?; + Ok(()) + } + + /// Decode u128 from 16 bytes in big-endian format + /// For use with `#[cbor(with = "u128_cbor_codec")]` + pub fn decode<'b, C>( + d: &mut Decoder<'b>, + _ctx: &mut C, + ) 
-> Result { + let bytes = d.bytes()?; + if bytes.len() != 16 { + return Err(minicbor::decode::Error::message( + "Expected 16 bytes for u128", + )); + } + let mut arr = [0u8; 16]; + arr.copy_from_slice(bytes); + Ok(u128::from_be_bytes(arr)) + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index 57889a9e..daa6dda7 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,6 +2,7 @@ pub mod address; pub mod calculations; +pub mod cbor; pub mod cip19; pub mod commands; pub mod crypto; diff --git a/common/src/messages.rs b/common/src/messages.rs index a7f9d1c7..f0c59948 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -26,6 +26,7 @@ use crate::queries::{ transactions::{TransactionsStateQuery, TransactionsStateQueryResponse}, }; +use crate::cbor::u128_cbor_codec; use crate::types::*; use crate::validation::ValidationStatus; @@ -141,47 +142,62 @@ pub struct BlockTxsMessage { } /// Epoch activity - sent at end of epoch -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, +)] pub struct EpochActivityMessage { /// Epoch which has ended + #[n(0)] pub epoch: u64, /// Epoch start time /// UNIX timestamp + #[n(1)] pub epoch_start_time: u64, /// Epoch end time /// UNIX timestamp + #[n(2)] pub epoch_end_time: u64, /// When first block of this epoch was created + #[n(3)] pub first_block_time: u64, /// Block height of first block of this epoch + #[n(4)] pub first_block_height: u64, /// When last block of this epoch was created + #[n(5)] pub last_block_time: u64, /// Block height of last block of this epoch + #[n(6)] pub last_block_height: u64, /// Total blocks in this epoch + #[n(7)] pub total_blocks: usize, /// Total txs in this epoch + #[n(8)] pub total_txs: u64, /// Total outputs of all txs in this epoch + #[cbor(n(9), with = "u128_cbor_codec")] pub total_outputs: u128, /// Total fees in this epoch + #[n(10)] pub total_fees: u64, /// Map of 
SPO IDs to blocks produced + #[n(11)] pub spo_blocks: Vec<(PoolId, usize)>, /// Nonce + #[n(12)] pub nonce: Option, } diff --git a/common/src/protocol_params.rs b/common/src/protocol_params.rs index e713d38c..cfc07d10 100644 --- a/common/src/protocol_params.rs +++ b/common/src/protocol_params.rs @@ -254,21 +254,46 @@ impl ProtocolVersion { } #[derive( - Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize, + Default, + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, )] #[serde(rename_all = "PascalCase")] pub enum NonceVariant { + #[n(0)] #[default] NeutralNonce, + #[n(1)] Nonce, } pub type NonceHash = [u8; 32]; -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, +)] #[serde(rename_all = "camelCase")] pub struct Nonce { + #[n(0)] pub tag: NonceVariant, + #[n(1)] pub hash: Option, } diff --git a/common/src/queries/epochs.rs b/common/src/queries/epochs.rs index c3348049..ca463dee 100644 --- a/common/src/queries/epochs.rs +++ b/common/src/queries/epochs.rs @@ -4,9 +4,16 @@ use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, Poo pub const DEFAULT_EPOCHS_QUERY_TOPIC: (&str, &str) = ("epochs-state-query-topic", "cardano.query.epochs"); +pub const DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC: (&str, &str) = ( + "historical-epochs-state-query-topic", + "cardano.query.historical.epochs", +); + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum EpochsStateQuery { GetLatestEpoch, + + // Served from historical epochs state GetEpochInfo { epoch_number: u64 }, GetNextEpochs { epoch_number: u64 }, GetPreviousEpochs { epoch_number: u64 }, diff --git a/modules/accounts_state/src/accounts_state.rs 
b/modules/accounts_state/src/accounts_state.rs index e4a6b7dd..89a4632f 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -403,24 +403,28 @@ impl AccountsState { let parameters_topic = config .get_string("protocol-parameters-topic") .unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_TOPIC.to_string()); + info!("Creating protocol parameters subscriber on '{parameters_topic}'"); // Publishing topics let drep_distribution_topic = config .get_string("publish-drep-distribution-topic") .unwrap_or(DEFAULT_DREP_DISTRIBUTION_TOPIC.to_string()); + info!("Creating DRep distribution publisher on '{drep_distribution_topic}'"); let spo_distribution_topic = config .get_string("publish-spo-distribution-topic") .unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string()); + info!("Creating SPO distribution publisher on '{spo_distribution_topic}'"); let spo_rewards_topic = config .get_string("publish-spo-rewards-topic") .unwrap_or(DEFAULT_SPO_REWARDS_TOPIC.to_string()); + info!("Creating SPO rewards publisher on '{spo_rewards_topic}'"); let stake_reward_deltas_topic = config .get_string("publish-stake-reward-deltas-topic") .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string()); - info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'"); + info!("Creating stake reward deltas publisher on '{stake_reward_deltas_topic}'"); let spdd_db_path = config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string()); diff --git a/modules/epochs_state/src/epochs_history.rs b/modules/epochs_state/src/epochs_history.rs deleted file mode 100644 index dc37b56a..00000000 --- a/modules/epochs_state/src/epochs_history.rs +++ /dev/null @@ -1,200 +0,0 @@ -use acropolis_common::messages::EpochActivityMessage; -use acropolis_common::BlockInfo; -use anyhow::Result; -use dashmap::DashMap; -use std::sync::Arc; - -use crate::store_config::StoreConfig; - -#[derive(Debug, Clone)] -pub struct EpochsHistoryState { - 
epochs_history: Option>>, -} - -impl EpochsHistoryState { - pub fn new(store_config: &StoreConfig) -> Self { - Self { - epochs_history: if store_config.store_history { - Some(Arc::new(DashMap::new())) - } else { - None - }, - } - } - - /// Get Epoch Activity Message for certain pool operator at certain epoch - pub fn get_historical_epoch(&self, epoch: u64) -> Result> { - if let Some(epochs_history) = self.epochs_history.as_ref() { - Ok(epochs_history.get(&epoch).map(|e| e.clone())) - } else { - Err(anyhow::anyhow!("Historical epoch storage is disabled")) - } - } - - /// Get Epoch Activity Messages for epochs following a specific epoch. (exclusive) - pub fn get_next_epochs(&self, epoch: u64) -> Result> { - if let Some(epochs_history) = self.epochs_history.as_ref() { - let mut epochs: Vec = epochs_history - .iter() - .filter(|entry| *entry.key() > epoch) - .map(|e| e.value().clone()) - .collect(); - epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); - Ok(epochs) - } else { - Err(anyhow::anyhow!("Historical epoch storage is disabled")) - } - } - - /// Get Epoch Activity Messages for epochs following a specific epoch. (exclusive) - pub fn get_previous_epochs(&self, epoch: u64) -> Result> { - if let Some(epochs_history) = self.epochs_history.as_ref() { - let mut epochs: Vec = epochs_history - .iter() - .filter(|entry| *entry.key() < epoch) - .map(|e| e.value().clone()) - .collect(); - epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); - Ok(epochs) - } else { - Err(anyhow::anyhow!("Historical epoch storage is disabled")) - } - } - - /// Handle Epoch Activity - pub fn handle_epoch_activity( - &self, - _block_info: &BlockInfo, - epoch_activity_message: &EpochActivityMessage, - ) { - let Some(epochs_history) = self.epochs_history.as_ref() else { - return; - }; - let EpochActivityMessage { epoch, .. 
} = epoch_activity_message; - epochs_history.insert(*epoch, epoch_activity_message.clone()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use acropolis_common::{BlockHash, BlockStatus, Era}; - - fn make_block(epoch: u64) -> BlockInfo { - BlockInfo { - status: BlockStatus::Immutable, - slot: 99, - number: 42, - hash: BlockHash::default(), - epoch, - epoch_slot: 99, - new_epoch: false, - timestamp: 99999, - era: Era::Conway, - } - } - - #[test] - fn epochs_history_is_none_when_store_history_is_false() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false)); - assert!(epochs_history.epochs_history.is_none()); - } - - #[test] - fn epochs_history_is_some_when_store_history_is_true() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); - assert!(epochs_history.epochs_history.is_some()); - } - - #[test] - fn handle_epoch_activity_saves_history() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); - let block = make_block(200); - epochs_history.handle_epoch_activity( - &block, - &EpochActivityMessage { - epoch: 199, - epoch_start_time: 0, - epoch_end_time: 0, - first_block_time: 0, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, - total_blocks: 1, - total_txs: 1, - total_outputs: 100, - total_fees: 50, - spo_blocks: vec![], - nonce: None, - }, - ); - - // Use the public API method - let history = epochs_history - .get_historical_epoch(199) - .expect("history disabled in test") - .expect("epoch history missing"); - assert_eq!(history.total_blocks, 1); - assert_eq!(history.total_fees, 50); - } - - #[test] - fn get_next_previous_epochs_sorts_epochs() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); - let block = make_block(200); - epochs_history.handle_epoch_activity( - &block, - &EpochActivityMessage { - epoch: 199, - epoch_start_time: 0, - epoch_end_time: 0, - first_block_time: 0, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, - 
total_blocks: 1, - total_txs: 1, - total_outputs: 100, - total_fees: 50, - spo_blocks: vec![], - nonce: None, - }, - ); - - let block = make_block(201); - epochs_history.handle_epoch_activity( - &block, - &EpochActivityMessage { - epoch: 200, - epoch_start_time: 0, - epoch_end_time: 0, - first_block_time: 0, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, - total_blocks: 1, - total_txs: 1, - total_outputs: 100, - total_fees: 50, - spo_blocks: vec![], - nonce: None, - }, - ); - - let next_epochs = epochs_history.get_next_epochs(199).expect("history disabled in test"); - assert_eq!(next_epochs.len(), 1); - assert_eq!(next_epochs[0].epoch, 200); - - let previous_epochs = - epochs_history.get_previous_epochs(201).expect("history disabled in test"); - assert_eq!(previous_epochs.len(), 2); - assert_eq!(previous_epochs[0].epoch, 199); - - let next_epochs = epochs_history.get_next_epochs(200).expect("history disabled in test"); - assert_eq!(next_epochs.len(), 0); - - let previous_epochs = - epochs_history.get_previous_epochs(199).expect("history disabled in test"); - assert_eq!(previous_epochs.len(), 0); - } -} diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index af8a5990..6045d62e 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -1,15 +1,14 @@ //! Acropolis epochs state module for Caryatid //! 
Unpacks block bodies to get transaction fees -use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::epochs::{ - EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, NextEpochs, - PreviousEpochs, DEFAULT_EPOCHS_QUERY_TOPIC, + EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, DEFAULT_EPOCHS_QUERY_TOPIC, }, + queries::errors::QueryError, state_history::{StateHistory, StateHistoryStore}, - BlockInfo, BlockStatus, Era, + BlockInfo, BlockStatus, }; use anyhow::Result; use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; @@ -19,13 +18,8 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span}; mod epoch_activity_publisher; -mod epochs_history; mod state; -mod store_config; -use crate::{ - epoch_activity_publisher::EpochActivityPublisher, epochs_history::EpochsHistoryState, - store_config::StoreConfig, -}; +use crate::epoch_activity_publisher::EpochActivityPublisher; use state::State; const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( @@ -56,7 +50,6 @@ impl EpochsState { /// Run loop async fn run( history: Arc>>, - epochs_history: EpochsHistoryState, mut bootstrapped_subscription: Box>, mut blocks_subscription: Box>, mut block_txs_subscription: Box>, @@ -105,21 +98,14 @@ impl EpochsState { } } - // decode header - // Derive the variant from the era - just enough to make - // MultiEraHeader::decode() work. 
- let variant = match block_info.era { - Era::Byron => 0, - Era::Shelley => 1, - Era::Allegra => 2, - Era::Mary => 3, - Era::Alonzo => 4, - _ => 5, - }; let span = info_span!("epochs_state.decode_header", block = block_info.number); let mut header = None; span.in_scope(|| { - header = match MultiEraHeader::decode(variant, None, &block_msg.header) { + header = match MultiEraHeader::decode( + block_info.era as u8, + None, + &block_msg.header, + ) { Ok(header) => Some(header), Err(e) => { error!("Can't decode header {}: {e}", block_info.slot); @@ -139,8 +125,6 @@ impl EpochsState { if is_new_epoch { let ea = state.end_epoch(block_info); - // update epochs history - epochs_history.handle_epoch_activity(block_info, &ea); // publish epoch activity message epoch_activity_publisher.publish(block_info, ea).await.unwrap_or_else( |e| error!("Failed to publish epoch activity messages: {e}"), @@ -217,9 +201,6 @@ impl EpochsState { .unwrap_or(DEFAULT_EPOCHS_QUERY_TOPIC.1.to_string()); info!("Creating query handler on '{}'", epochs_query_topic); - // store config - let store_config = StoreConfig::from(config.clone()); - // state history let history = Arc::new(Mutex::new(StateHistory::::new( "epochs_state", @@ -227,10 +208,6 @@ impl EpochsState { ))); let history_query = history.clone(); - // epochs history - let epochs_history = EpochsHistoryState::new(&store_config); - let epochs_history_query = epochs_history.clone(); - // Subscribe let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; @@ -245,7 +222,6 @@ impl EpochsState { // handle epochs query context.handle(&epochs_query_topic, move |message| { let history = history_query.clone(); - let epochs_history = epochs_history_query.clone(); async move { let Message::StateQuery(StateQuery::Epochs(query)) = message.as_ref() else { @@ -264,64 +240,6 @@ impl EpochsState { }) } - EpochsStateQuery::GetEpochInfo { epoch_number } 
=> { - match epochs_history.get_historical_epoch(*epoch_number) { - Ok(Some(epoch_info)) => { - EpochsStateQueryResponse::EpochInfo(EpochInfo { epoch: epoch_info }) - } - Ok(None) => EpochsStateQueryResponse::Error(QueryError::not_found( - format!("Epoch {}", epoch_number), - )), - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), - } - } - - EpochsStateQuery::GetNextEpochs { epoch_number } => { - let current_epoch = state.get_epoch_info(); - if *epoch_number > current_epoch.epoch { - EpochsStateQueryResponse::Error(QueryError::not_found(format!( - "Epoch {} is in the future", - epoch_number - ))) - } else { - match epochs_history.get_next_epochs(*epoch_number) { - Ok(mut epochs) => { - // check the current epoch also - if current_epoch.epoch > *epoch_number { - epochs.push(current_epoch); - } - EpochsStateQueryResponse::NextEpochs(NextEpochs { epochs }) - } - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), - } - } - } - - EpochsStateQuery::GetPreviousEpochs { epoch_number } => { - let current_epoch = state.get_epoch_info(); - if *epoch_number > current_epoch.epoch { - EpochsStateQueryResponse::Error(QueryError::not_found(format!( - "Epoch {} is in the future", - epoch_number - ))) - } else { - match epochs_history.get_previous_epochs(*epoch_number) { - Ok(epochs) => { - EpochsStateQueryResponse::PreviousEpochs(PreviousEpochs { - epochs, - }) - } - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), - } - } - } - EpochsStateQuery::GetLatestEpochBlocksMintedByPool { spo_id } => { EpochsStateQueryResponse::LatestEpochBlocksMintedByPool( state.get_latest_epoch_blocks_minted_by_pool(spo_id), @@ -342,7 +260,6 @@ impl EpochsState { context.run(async move { Self::run( history, - epochs_history, bootstrapped_subscription, blocks_subscription, block_txs_subscription, diff --git a/modules/epochs_state/src/store_config.rs 
b/modules/epochs_state/src/store_config.rs deleted file mode 100644 index d0ef3f33..00000000 --- a/modules/epochs_state/src/store_config.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::sync::Arc; - -use config::Config; - -const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false); - -#[derive(Default, Debug, Clone)] -pub struct StoreConfig { - pub store_history: bool, -} - -impl StoreConfig { - #[allow(dead_code)] - pub fn new(store_history: bool) -> Self { - Self { store_history } - } -} - -impl From> for StoreConfig { - fn from(config: Arc) -> Self { - Self { - store_history: config - .get_bool(DEFAULT_STORE_HISTORY.0) - .unwrap_or(DEFAULT_STORE_HISTORY.1), - } - } -} diff --git a/modules/historical_epochs_state/.gitignore b/modules/historical_epochs_state/.gitignore new file mode 100644 index 00000000..0078d059 --- /dev/null +++ b/modules/historical_epochs_state/.gitignore @@ -0,0 +1 @@ +/db \ No newline at end of file diff --git a/modules/historical_epochs_state/Cargo.toml b/modules/historical_epochs_state/Cargo.toml new file mode 100644 index 00000000..4e72e9ae --- /dev/null +++ b/modules/historical_epochs_state/Cargo.toml @@ -0,0 +1,26 @@ +# Acropolis historical epochs state module + +[package] +name = "acropolis_module_historical_epochs_state" +version = "0.1.0" +edition = "2021" +authors = ["Golddy "] +description = "Historical stake epochs tracker" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +minicbor = { version = "0.26.0", features = ["std", "derive"] } +hex = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +fjall = "2.11.2" +rayon = "1.10.0" + +[lib] +path = "src/historical_epochs_state.rs" diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs new file mode 100644 index 
00000000..31a38bb1 --- /dev/null +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -0,0 +1,282 @@ +//! Acropolis historical epochs state module for Caryatid +//! Manages optional state data needed for Blockfrost alignment + +use crate::immutable_historical_epochs_state::ImmutableHistoricalEpochsState; +use crate::state::{HistoricalEpochsStateConfig, State}; +use acropolis_common::messages::StateQuery; +use acropolis_common::queries::epochs::{ + EpochInfo, EpochsStateQuery, NextEpochs, PreviousEpochs, DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC, +}; +use acropolis_common::{ + messages::{CardanoMessage, Message, StateQueryResponse}, + queries::epochs::EpochsStateQueryResponse, + queries::errors::QueryError, + BlockInfo, BlockStatus, +}; +use anyhow::Result; +use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; +use config::Config; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; +use tracing::{error, info, info_span, Instrument}; +mod immutable_historical_epochs_state; +mod state; +mod volatile_historical_epochs_state; + +// Configuration defaults +const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = + ("blocks-subscribe-topic", "cardano.block.proposed"); +const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = + ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); +const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = + ("parameters-subscribe-topic", "cardano.protocol.parameters"); + +const DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH: (&str, &str) = ("db-path", "./db"); + +/// Historical Epochs State module +#[module( + message_type(Message), + name = "historical-epochs-state", + description = "Historical epochs state for Blockfrost compatibility" +)] +pub struct HistoricalEpochsState; + +impl HistoricalEpochsState { + /// Async run loop + async fn run( + state_mutex: Arc>, + mut blocks_subscription: Box>, + mut epoch_activity_subscription: Box>, + mut params_subscription: Box>, + ) -> Result<()> { + let _ = 
params_subscription.read().await?; + info!("Consumed initial genesis params from params_subscription"); + + // Background task to persist epochs sequentially + const MAX_PENDING_PERSISTS: usize = 1; + let (persist_tx, mut persist_rx) = + mpsc::channel::<(u64, Arc)>(MAX_PENDING_PERSISTS); + tokio::spawn(async move { + while let Some((epoch, store)) = persist_rx.recv().await { + if let Err(e) = store.persist_epoch(epoch).await { + error!("failed to persist epoch {epoch}: {e}"); + } + } + }); + + // Main loop of synchronised messages + loop { + let mut current_block: Option = None; + + // Use blocks_message as the synchroniser + let (_, blocks_message) = blocks_subscription.read().await?; + let new_epoch = match blocks_message.as_ref() { + Message::Cardano((block_info, _)) => { + // Handle rollbacks on this topic only + let mut state = state_mutex.lock().await; + if block_info.status == BlockStatus::RolledBack { + state.volatile.rollback_before(block_info.number); + } + + current_block = Some(block_info.clone()); + block_info.new_epoch && block_info.epoch > 0 + } + _ => false, + }; + + // Read from epoch-boundary messages only when it's a new epoch + if new_epoch { + let params_message_f = params_subscription.read(); + let epoch_activity_message_f = epoch_activity_subscription.read(); + + let (_, params_msg) = params_message_f.await?; + match params_msg.as_ref() { + Message::Cardano((block_info, CardanoMessage::ProtocolParams(params))) => { + let span = info_span!( + "historical_epochs_state.handle_params", + epoch = block_info.epoch + ); + async { + Self::check_sync(&current_block, block_info); + let mut state = state_mutex.lock().await; + if let Some(shelley) = &params.params.shelley { + state.volatile.update_k(shelley.security_param); + } + } + .instrument(span) + .await; + } + _ => error!("Unexpected message type: {params_msg:?}"), + } + + let (_, epoch_activity_msg) = epoch_activity_message_f.await?; + match epoch_activity_msg.as_ref() { + Message::Cardano((block_info, 
CardanoMessage::EpochActivity(ea))) => { + let span = info_span!( + "historical_epochs_state.handle_epoch_activity", + epoch = block_info.epoch + ); + async { + Self::check_sync(¤t_block, block_info); + let mut state = state_mutex.lock().await; + state.volatile.handle_new_epoch(block_info, ea); + } + .instrument(span) + .await; + } + _ => error!("Unexpected message type: {epoch_activity_msg:?}"), + } + } + + // Prune volatile and persist if needed + if let Some(current_block) = current_block { + let should_prune = { + let state = state_mutex.lock().await; + state.ready_to_prune(¤t_block) + }; + + if should_prune { + let immutable = { + let mut state = state_mutex.lock().await; + state.prune_volatile().await; + state.immutable.clone() + }; + + if let Err(e) = persist_tx.send((current_block.epoch - 1, immutable)).await { + panic!("persistence worker crashed: {e}"); + } + } + } + } + } + + /// Check for synchronisation + fn check_sync(expected: &Option, actual: &BlockInfo) { + if let Some(ref block) = expected { + if block.number != actual.number { + error!( + expected = block.number, + actual = actual.number, + "Messages out of sync" + ); + } + } + } + + /// Async initialisation + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // Get configuration + + // Subscription topics + let blocks_subscribe_topic = config + .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating blocks subscriber on '{blocks_subscribe_topic}'"); + + let epoch_activity_subscribe_topic = config + .get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating epoch activity subscriber on '{epoch_activity_subscribe_topic}'"); + + let params_subscribe_topic = config + .get_string(DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating parameters subscriber on 
'{params_subscribe_topic}'"); + + // Query topic + let historical_epochs_query_topic = config + .get_string(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.1.to_string()); + info!("Creating query handler on '{historical_epochs_query_topic}'"); + + // Configuration + let config = HistoricalEpochsStateConfig { + db_path: config + .get_string(DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH.0) + .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH.1.to_string()), + }; + + // Initalize state + let state = State::new(&config).await?; + let state_mutex = Arc::new(Mutex::new(state)); + let state_query = state_mutex.clone(); + + context.handle(&historical_epochs_query_topic, move |message| { + let state = state_query.clone(); + async move { + let Message::StateQuery(StateQuery::Epochs(query)) = message.as_ref() else { + return Arc::new(Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(QueryError::internal_error( + "Invalid message for epochs-state", + )), + ))); + }; + + let response = match query { + EpochsStateQuery::GetEpochInfo { epoch_number } => { + match state.lock().await.get_historical_epoch(*epoch_number) { + Ok(Some(epoch_info)) => { + EpochsStateQueryResponse::EpochInfo(EpochInfo { epoch: epoch_info }) + } + Ok(None) => EpochsStateQueryResponse::Error(QueryError::not_found( + format!("Epoch {}", epoch_number), + )), + Err(_) => EpochsStateQueryResponse::Error( + QueryError::storage_disabled("historical epoch"), + ), + } + } + + EpochsStateQuery::GetNextEpochs { epoch_number } => { + match state.lock().await.get_next_epochs(*epoch_number) { + Ok(epochs) => { + EpochsStateQueryResponse::NextEpochs(NextEpochs { epochs }) + } + Err(_) => EpochsStateQueryResponse::Error( + QueryError::storage_disabled("historical epoch"), + ), + } + } + + EpochsStateQuery::GetPreviousEpochs { epoch_number } => { + match state.lock().await.get_previous_epochs(*epoch_number) { + Ok(epochs) => { + 
EpochsStateQueryResponse::PreviousEpochs(PreviousEpochs { epochs }) + } + Err(_) => EpochsStateQueryResponse::Error( + QueryError::storage_disabled("historical epoch"), + ), + } + } + + _ => EpochsStateQueryResponse::Error(QueryError::not_implemented(format!( + "Unimplemented query variant: {query:?}" + ))), + }; + Arc::new(Message::StateQueryResponse(StateQueryResponse::Epochs( + response, + ))) + } + }); + + // Subscribe + let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; + let epoch_activity_subscription = + context.subscribe(&epoch_activity_subscribe_topic).await?; + let params_subscription = context.subscribe(¶ms_subscribe_topic).await?; + + // Start run task + context.run(async move { + Self::run( + state_mutex, + blocks_subscription, + epoch_activity_subscription, + params_subscription, + ) + .await + .unwrap_or_else(|e| error!("Failed: {e}")); + }); + + Ok(()) + } +} diff --git a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs new file mode 100644 index 00000000..9b077ed7 --- /dev/null +++ b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs @@ -0,0 +1,103 @@ +use std::path::Path; + +use acropolis_common::messages::EpochActivityMessage; +use anyhow::Result; +use fjall::{Keyspace, Partition, PartitionCreateOptions, PersistMode}; +use minicbor::{decode, to_vec}; +use tokio::sync::Mutex; +use tracing::{error, info}; + +pub struct ImmutableHistoricalEpochsState { + epochs_history: Partition, + keyspace: Keyspace, + pub pending: Mutex>, +} + +impl ImmutableHistoricalEpochsState { + pub fn new(path: impl AsRef) -> Result { + let cfg = fjall::Config::new(path) + // 4MB write buffer since EpochActivityMessage is not that big + .max_write_buffer_size(4 * 1024 * 1024) + .temporary(true) + // Enable manual control of flushing + // We store EpochActivityMessage only every 5 days (need manual Flush) + 
.manual_journal_persist(true); + let keyspace = Keyspace::open(cfg)?; + + let epochs_history = + keyspace.open_partition("epochs_history", PartitionCreateOptions::default())?; + + Ok(Self { + epochs_history, + keyspace, + pending: Mutex::new(Vec::new()), + }) + } + + pub async fn update_immutable(&self, ea: EpochActivityMessage) { + let mut pending = self.pending.lock().await; + pending.push(ea); + } + + /// Persists pending EpochActivityMessages for epoch N + /// Returns the number of persisted EpochActivityMessages + /// Errors if the batch commit or persist fails + pub async fn persist_epoch(&self, epoch: u64) -> Result { + let drained_epochs = { + let mut pending = self.pending.lock().await; + std::mem::take(&mut *pending) + }; + + let mut batch = self.keyspace.batch(); + let mut persisted_epochs: u32 = 0; + + for ea in drained_epochs { + let epoch_key = Self::make_epoch_key(ea.epoch); + batch.insert(&self.epochs_history, epoch_key, to_vec(&ea)?); + persisted_epochs += 1; + } + + if let Err(e) = batch.commit() { + error!("batch commit failed for epoch {epoch}: {e}"); + return Err(e.into()); + } + + if let Err(e) = self.keyspace.persist(PersistMode::Buffer) { + error!("persist failed for epoch {epoch}: {e}"); + return Err(e.into()); + } + + info!("persisted {persisted_epochs} epochs for epoch {epoch}"); + Ok(persisted_epochs) + } + + pub fn get_historical_epoch(&self, epoch: u64) -> Result> { + let epoch_key = Self::make_epoch_key(epoch); + let slice = self.epochs_history.get(epoch_key)?; + if let Some(slice) = slice.as_ref() { + let decoded: EpochActivityMessage = decode(slice)?; + Ok(Some(decoded)) + } else { + Ok(None) + } + } + + pub fn get_epochs( + &self, + range: std::ops::RangeInclusive, + ) -> Result> { + let mut epochs: Vec = Vec::new(); + for epoch in range { + let slice = self.epochs_history.get(Self::make_epoch_key(epoch))?; + if let Some(slice) = slice.as_ref() { + let decoded: EpochActivityMessage = decode(slice)?; + epochs.push(decoded); + } + } 
+ Ok(epochs) + } + + fn make_epoch_key(epoch: u64) -> [u8; 8] { + epoch.to_be_bytes() + } +} diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs new file mode 100644 index 00000000..35f8f9c0 --- /dev/null +++ b/modules/historical_epochs_state/src/state.rs @@ -0,0 +1,106 @@ +use crate::{ + immutable_historical_epochs_state::ImmutableHistoricalEpochsState, + volatile_historical_epochs_state::VolatileHistoricalEpochsState, +}; +use acropolis_common::{messages::EpochActivityMessage, BlockInfo}; +use anyhow::Result; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; + +#[derive(Debug, Clone)] +pub struct HistoricalEpochsStateConfig { + pub db_path: String, +} + +/// Overall state - stored per epoch +#[derive(Clone)] +pub struct State { + pub immutable: Arc, + pub volatile: VolatileHistoricalEpochsState, +} + +impl State { + pub async fn new(config: &HistoricalEpochsStateConfig) -> Result { + let db_path = if Path::new(&config.db_path).is_relative() { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) + } else { + PathBuf::from(&config.db_path) + }; + + let immutable = Arc::new(ImmutableHistoricalEpochsState::new(&db_path)?); + + Ok(Self { + volatile: VolatileHistoricalEpochsState::new(), + immutable, + }) + } + + pub async fn prune_volatile(&mut self) { + let drained = self.volatile.prune_volatile(); + if let Some(ea) = drained { + self.immutable.update_immutable(ea).await; + } + } + + pub fn ready_to_prune(&self, block_info: &BlockInfo) -> bool { + block_info.epoch > 0 + && Some(block_info.epoch - 1) != self.volatile.last_persisted_epoch + && block_info.number > self.volatile.block_number + self.volatile.security_param_k + } + + pub fn get_historical_epoch(&self, epoch: u64) -> Result> { + if let Some(last_persisted_epoch) = self.volatile.last_persisted_epoch { + if epoch <= last_persisted_epoch { + return self.immutable.get_historical_epoch(epoch); + } + } + + 
Ok(self.volatile.get_volatile_epoch(epoch)) + } + + pub fn get_next_epochs(&self, epoch: u64) -> Result> { + let mut epochs = vec![]; + let immutable_epochs_rage = + self.volatile.last_persisted_epoch.and_then(|last_persisted_epoch| { + if last_persisted_epoch > epoch { + Some(epoch + 1..=last_persisted_epoch) + } else { + None + } + }); + + if let Some(immutable_epochs_rage) = immutable_epochs_rage { + epochs.extend(self.immutable.get_epochs(immutable_epochs_rage)?); + } + + if let Some(volatile_epoch) = self.volatile.get_volatile_epoch(epoch) { + if volatile_epoch.epoch > epoch { + epochs.push(volatile_epoch); + } + } + epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); + Ok(epochs) + } + + pub fn get_previous_epochs(&self, epoch: u64) -> Result> { + let mut epochs = vec![]; + let immutable_epochs_rage = self + .volatile + .last_persisted_epoch + .map(|last_persisted_epoch| 0..=last_persisted_epoch.min(epoch - 1)); + + if let Some(immutable_epochs_rage) = immutable_epochs_rage { + epochs.extend(self.immutable.get_epochs(immutable_epochs_rage)?); + } + + if let Some(volatile_epoch) = self.volatile.get_volatile_epoch(epoch) { + if volatile_epoch.epoch < epoch { + epochs.push(volatile_epoch); + } + } + epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); + Ok(epochs) + } +} diff --git a/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs b/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs new file mode 100644 index 00000000..ecc3c1e8 --- /dev/null +++ b/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs @@ -0,0 +1,58 @@ +use acropolis_common::{messages::EpochActivityMessage, BlockInfo}; + +#[derive(Default, Debug, Clone)] +pub struct VolatileHistoricalEpochsState { + pub block_number: u64, + + pub volatile_ea: Option, + + pub last_persisted_epoch: Option, + + pub security_param_k: u64, +} + +impl VolatileHistoricalEpochsState { + pub fn new() -> Self { + Self { + block_number: 0, + volatile_ea: None, + 
last_persisted_epoch: None, + security_param_k: 0, + } + } + + pub fn rollback_before(&mut self, rollbacked_block: u64) -> Option { + if self.block_number >= rollbacked_block { + std::mem::take(&mut self.volatile_ea) + } else { + None + } + } + + pub fn handle_new_epoch(&mut self, block_info: &BlockInfo, ea: &EpochActivityMessage) { + self.block_number = block_info.number; + self.volatile_ea = Some(ea.clone()); + } + + pub fn prune_volatile(&mut self) -> Option { + if let Some(ea) = self.volatile_ea.as_ref() { + self.last_persisted_epoch = Some(ea.epoch); + std::mem::take(&mut self.volatile_ea) + } else { + None + } + } + + pub fn update_k(&mut self, k: u32) { + self.security_param_k = k as u64; + } + + pub fn get_volatile_epoch(&self, epoch: u64) -> Option { + if let Some(ea) = self.volatile_ea.as_ref() { + if ea.epoch == epoch { + return Some(ea.clone()); + } + } + None + } +} diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index b62b6791..cd0efadd 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -35,31 +35,38 @@ pub async fn handle_epoch_info_blockfrost( let param = ¶ms[0]; // query to get latest epoch or epoch info - let query = if param == "latest" { - EpochsStateQuery::GetLatestEpoch + let (query_topic, query) = if param == "latest" { + ( + handlers_config.epochs_query_topic.as_ref(), + EpochsStateQuery::GetLatestEpoch, + ) } else { let parsed = param .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; - EpochsStateQuery::GetEpochInfo { - epoch_number: parsed, - } + ( + handlers_config.historical_epochs_query_topic.as_ref(), + EpochsStateQuery::GetEpochInfo { + epoch_number: parsed, + }, + ) }; // Get the current epoch number from epochs-state let epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(query))); - let epoch_info_response = query_state( - &context, - 
&handlers_config.epochs_query_topic, - epoch_info_msg, - |message| match message { - Message::StateQueryResponse(StateQueryResponse::Epochs(response)) => Ok(response), - _ => Err(QueryError::internal_error( - "Unexpected message type while retrieving latest epoch", - )), - }, - ) - .await?; + let epoch_info_response = + query_state( + &context, + query_topic, + epoch_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs(response)) => Ok(response), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; let ea_message = match epoch_info_response { EpochsStateQueryResponse::LatestEpoch(response) => response.epoch, @@ -246,22 +253,51 @@ pub async fn handle_epoch_next_blockfrost( .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; + let lastest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let lastest_epoch = query_state( + &context, + &handlers_config.epochs_query_topic, + lastest_epoch_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; + + if parsed > lastest_epoch.epoch { + return Err(RESTError::not_found( + format!("Epoch {parsed} is in the future").as_str(), + )); + } + + if parsed == lastest_epoch.epoch { + return Ok(RESTResponse::with_json(200, "[]")); + } + let next_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetNextEpochs { epoch_number: parsed, }, ))); - let next_epochs = query_state( + + let mut next_epochs = query_state( &context, - &handlers_config.epochs_query_topic, + 
&handlers_config.historical_epochs_query_topic, next_epochs_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::NextEpochs(response), )) => Ok(response.epochs.into_iter().map(EpochActivityRest::from).collect::>()), - Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::Error(QueryError::NotFound { .. }), - )) => Err(QueryError::not_found("Epoch")), Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::Error(e), )) => Err(e), @@ -271,6 +307,7 @@ pub async fn handle_epoch_next_blockfrost( }, ) .await?; + next_epochs.push(EpochActivityRest::from(lastest_epoch)); let json = serde_json::to_string_pretty(&next_epochs)?; Ok(RESTResponse::with_json(200, &json)) @@ -292,22 +329,44 @@ pub async fn handle_epoch_previous_blockfrost( .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; + let lastest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let lastest_epoch = query_state( + &context, + &handlers_config.epochs_query_topic, + lastest_epoch_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; + + if parsed > lastest_epoch.epoch { + return Err(RESTError::not_found("Epoch not found")); + } + let previous_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetPreviousEpochs { epoch_number: parsed, }, ))); - let previous_epochs = query_state( + let mut previous_epochs = query_state( &context, - &handlers_config.epochs_query_topic, + &handlers_config.historical_epochs_query_topic, previous_epochs_msg, 
|message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::PreviousEpochs(response), )) => Ok(response.epochs.into_iter().map(EpochActivityRest::from).collect::>()), - Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::Error(QueryError::NotFound { .. }), - )) => Err(QueryError::not_found("Epoch")), Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::Error(e), )) => Err(e), @@ -318,6 +377,10 @@ pub async fn handle_epoch_previous_blockfrost( ) .await?; + if lastest_epoch.epoch < parsed { + previous_epochs.push(EpochActivityRest::from(lastest_epoch)); + } + let json = serde_json::to_string_pretty(&previous_epochs)?; Ok(RESTResponse::with_json(200, &json)) } diff --git a/modules/rest_blockfrost/src/handlers_config.rs b/modules/rest_blockfrost/src/handlers_config.rs index 837bfb06..4a49c715 100644 --- a/modules/rest_blockfrost/src/handlers_config.rs +++ b/modules/rest_blockfrost/src/handlers_config.rs @@ -5,7 +5,7 @@ use acropolis_common::queries::{ addresses::DEFAULT_ADDRESS_QUERY_TOPIC, assets::{DEFAULT_ASSETS_QUERY_TOPIC, DEFAULT_OFFCHAIN_TOKEN_REGISTRY_URL}, blocks::DEFAULT_BLOCKS_QUERY_TOPIC, - epochs::DEFAULT_EPOCHS_QUERY_TOPIC, + epochs::{DEFAULT_EPOCHS_QUERY_TOPIC, DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC}, governance::{DEFAULT_DREPS_QUERY_TOPIC, DEFAULT_GOVERNANCE_QUERY_TOPIC}, parameters::DEFAULT_PARAMETERS_QUERY_TOPIC, pools::DEFAULT_POOLS_QUERY_TOPIC, @@ -27,6 +27,7 @@ pub struct HandlersConfig { pub dreps_query_topic: String, pub governance_query_topic: String, pub epochs_query_topic: String, + pub historical_epochs_query_topic: String, pub spdd_query_topic: String, pub parameters_query_topic: String, pub utxos_query_topic: String, @@ -72,6 +73,10 @@ impl From> for HandlersConfig { .get_string(DEFAULT_EPOCHS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_EPOCHS_QUERY_TOPIC.1.to_string()); + let historical_epochs_query_topic = config + 
.get_string(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.1.to_string()); + let parameters_query_topic = config .get_string(DEFAULT_PARAMETERS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_PARAMETERS_QUERY_TOPIC.1.to_string()); @@ -102,6 +107,7 @@ impl From> for HandlersConfig { dreps_query_topic, governance_query_topic, epochs_query_topic, + historical_epochs_query_topic, spdd_query_topic, parameters_query_topic, utxos_query_topic, diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 1013fe22..8accbbea 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -31,6 +31,7 @@ acropolis_module_chain_store = { path = "../../modules/chain_store" } acropolis_module_address_state = { path = "../../modules/address_state" } acropolis_module_consensus = { path = "../../modules/consensus" } acropolis_module_historical_accounts_state = { path = "../../modules/historical_accounts_state" } +acropolis_module_historical_epochs_state = { path = "../../modules/historical_epochs_state" } acropolis_module_block_vrf_validator = { path = "../../modules/block_vrf_validator" } caryatid_process = { workspace = true } diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index 4cc6104e..2efa8675 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -21,6 +21,7 @@ use acropolis_module_epochs_state::EpochsState; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_governance_state::GovernanceState; use acropolis_module_historical_accounts_state::HistoricalAccountsState; +use acropolis_module_historical_epochs_state::HistoricalEpochsState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; use acropolis_module_peer_network_interface::PeerNetworkInterface; @@ -117,6 +118,7 @@ pub async fn main() -> Result<()> { AddressState::register(&mut process); 
AssetsState::register(&mut process); HistoricalAccountsState::register(&mut process); + HistoricalEpochsState::register(&mut process); BlockfrostREST::register(&mut process); SPDDState::register(&mut process); DRDDState::register(&mut process); From acd725cf38ad572b25b70bbaabd5c8624766bd06 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 20:10:37 +0100 Subject: [PATCH 02/18] fix: handle mint for byron blocks --- modules/epochs_state/src/epochs_state.rs | 4 +-- modules/epochs_state/src/state.rs | 29 ++++++++++--------- .../src/historical_epochs_state.rs | 4 +-- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index 6045d62e..11ec826a 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -134,9 +134,7 @@ impl EpochsState { let span = info_span!("epochs_state.handle_mint", block = block_info.number); span.in_scope(|| { if let Some(header) = header.as_ref() { - if let Some(issuer_vkey) = header.issuer_vkey() { - state.handle_mint(block_info, issuer_vkey); - } + state.handle_mint(block_info, header.issuer_vkey()); } }); } diff --git a/modules/epochs_state/src/state.rs b/modules/epochs_state/src/state.rs index 08002465..802b4179 100644 --- a/modules/epochs_state/src/state.rs +++ b/modules/epochs_state/src/state.rs @@ -183,14 +183,15 @@ impl State { // Handle mint // This will update last block time - pub fn handle_mint(&mut self, block_info: &BlockInfo, issuer_vkey: &[u8]) { + pub fn handle_mint(&mut self, block_info: &BlockInfo, issuer_vkey: Option<&[u8]>) { self.last_block_time = block_info.timestamp; self.last_block_height = block_info.number; self.epoch_blocks += 1; - let spo_id = PoolId::from(keyhash_224(issuer_vkey)); - - // Count one on this hash - *(self.blocks_minted.entry(spo_id).or_insert(0)) += 1; + if let Some(issuer_vkey) = issuer_vkey { + let spo_id = PoolId::from(keyhash_224(issuer_vkey)); + 
// Count one on this hash + *(self.blocks_minted.entry(spo_id).or_insert(0)) += 1; + } } // Handle Block Txs @@ -329,9 +330,9 @@ mod tests { let mut state = State::new(&GenesisValues::mainnet()); let issuer = b"issuer_key"; let mut block = make_block(100); - state.handle_mint(&block, issuer); + state.handle_mint(&block, Some(issuer)); block.number += 1; - state.handle_mint(&block, issuer); + state.handle_mint(&block, Some(issuer)); assert_eq!(state.epoch_blocks, 2); assert_eq!(state.blocks_minted.len(), 1); @@ -345,11 +346,11 @@ mod tests { fn handle_mint_multiple_issuer_records_counts() { let mut state = State::new(&GenesisValues::mainnet()); let mut block = make_block(100); - state.handle_mint(&block, b"issuer_1"); + state.handle_mint(&block, Some(b"issuer_1")); block.number += 1; - state.handle_mint(&block, b"issuer_2"); + state.handle_mint(&block, Some(b"issuer_2")); block.number += 1; - state.handle_mint(&block, b"issuer_2"); + state.handle_mint(&block, Some(b"issuer_2")); assert_eq!(state.epoch_blocks, 3); assert_eq!(state.blocks_minted.len(), 2); @@ -408,7 +409,7 @@ mod tests { let genesis = GenesisValues::mainnet(); let mut state = State::new(&genesis); let block = make_block(1); - state.handle_mint(&block, b"issuer_1"); + state.handle_mint(&block, Some(b"issuer_1")); state.handle_block_txs( &block, &BlockTxsMessage { @@ -464,7 +465,7 @@ mod tests { ))); let mut state = history.lock().await.get_current_state(); let mut block = make_block(1); - state.handle_mint(&block, b"issuer_1"); + state.handle_mint(&block, Some(b"issuer_1")); state.handle_block_txs( &block, &BlockTxsMessage { @@ -477,7 +478,7 @@ mod tests { let mut state = history.lock().await.get_current_state(); block.number += 1; - state.handle_mint(&block, b"issuer_1"); + state.handle_mint(&block, Some(b"issuer_1")); state.handle_block_txs( &block, &BlockTxsMessage { @@ -494,7 +495,7 @@ mod tests { block = make_rolled_back_block(0); let mut state = 
history.lock().await.get_rolled_back_state(block.number); - state.handle_mint(&block, b"issuer_2"); + state.handle_mint(&block, Some(b"issuer_2")); state.handle_block_txs( &block, &BlockTxsMessage { diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index 31a38bb1..87f83bd0 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -215,8 +215,8 @@ impl HistoricalEpochsState { let response = match query { EpochsStateQuery::GetEpochInfo { epoch_number } => { match state.lock().await.get_historical_epoch(*epoch_number) { - Ok(Some(epoch_info)) => { - EpochsStateQueryResponse::EpochInfo(EpochInfo { epoch: epoch_info }) + Ok(Some(epoch)) => { + EpochsStateQueryResponse::EpochInfo(EpochInfo { epoch }) } Ok(None) => EpochsStateQueryResponse::Error(QueryError::not_found( format!("Epoch {}", epoch_number), From 276ade5f9dd7e409e9ce4b1e0b94e2475bf355e9 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 20:20:04 +0100 Subject: [PATCH 03/18] fix: publish epoch activity message before evolving nonce --- modules/epochs_state/src/epochs_state.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index 11ec826a..128c058c 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -96,6 +96,12 @@ impl EpochsState { { state.handle_protocol_parameters(params); } + + let ea = state.end_epoch(block_info); + // publish epoch activity message + epoch_activity_publisher.publish(block_info, ea).await.unwrap_or_else( + |e| error!("Failed to publish epoch activity messages: {e}"), + ); } let span = info_span!("epochs_state.decode_header", block = block_info.number); @@ -123,14 +129,6 @@ impl EpochsState { } }); - if is_new_epoch { - let ea = 
state.end_epoch(block_info); - // publish epoch activity message - epoch_activity_publisher.publish(block_info, ea).await.unwrap_or_else( - |e| error!("Failed to publish epoch activity messages: {e}"), - ); - } - let span = info_span!("epochs_state.handle_mint", block = block_info.number); span.in_scope(|| { if let Some(header) = header.as_ref() { From f3df74d6db6e8248f557912cd7e3d87e0b6a27a9 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 20:48:25 +0100 Subject: [PATCH 04/18] fix: epoch info endpoint to return latest epoch even when param is not latest but number --- .../rest_blockfrost/src/handlers/epochs.rs | 106 ++++++++++-------- 1 file changed, 60 insertions(+), 46 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index cd0efadd..fdf3e224 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -34,60 +34,74 @@ pub async fn handle_epoch_info_blockfrost( } let param = ¶ms[0]; - // query to get latest epoch or epoch info - let (query_topic, query) = if param == "latest" { - ( - handlers_config.epochs_query_topic.as_ref(), - EpochsStateQuery::GetLatestEpoch, - ) + let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch = query_state( + &context, + &handlers_config.epochs_query_topic, + latest_epoch_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; + + let (is_latest, mut response) = if param == "latest" { + (true, EpochActivityRest::from(latest_epoch)) } else { let parsed = param .parse::() 
.map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; - ( - handlers_config.historical_epochs_query_topic.as_ref(), - EpochsStateQuery::GetEpochInfo { - epoch_number: parsed, - }, - ) - }; - // Get the current epoch number from epochs-state - let epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(query))); - let epoch_info_response = - query_state( - &context, - query_topic, - epoch_info_msg, - |message| match message { - Message::StateQueryResponse(StateQueryResponse::Epochs(response)) => Ok(response), - _ => Err(QueryError::internal_error( - "Unexpected message type while retrieving latest epoch", - )), - }, - ) - .await?; - - let ea_message = match epoch_info_response { - EpochsStateQueryResponse::LatestEpoch(response) => response.epoch, - EpochsStateQueryResponse::EpochInfo(response) => response.epoch, - EpochsStateQueryResponse::Error(QueryError::NotFound { .. }) => { + if parsed > latest_epoch.epoch { return Err(RESTError::not_found("Epoch not found")); } - EpochsStateQueryResponse::Error(e) => { - return Err(e.into()); - } - _ => { - return Err(RESTError::unexpected_response( - "Unexpected message type while retrieving epoch info", - )); + + if parsed == latest_epoch.epoch { + (true, EpochActivityRest::from(latest_epoch)) + } else { + let epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetEpochInfo { + epoch_number: parsed, + }, + ))); + let epoch_info = query_state( + &context, + &handlers_config.historical_epochs_query_topic, + epoch_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::EpochInfo(response), + )) => Ok(EpochActivityRest::from(response.epoch)), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(QueryError::NotFound { .. 
}), + )) => Err(QueryError::not_found("Epoch not found")), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving epoch info", + )), + }, + ) + .await?; + (false, epoch_info) } }; - let epoch_number = ea_message.epoch; // For the latest epoch, query accounts-state for the stake pool delegation distribution (SPDD) // Otherwise, fall back to SPDD module to fetch historical epoch totals - let total_active_stakes: u64 = if param == "latest" { + let epoch_number = response.epoch; + let total_active_stakes: u64 = if is_latest { let total_active_stakes_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetActiveStakes {}, ))); @@ -134,8 +148,6 @@ pub async fn handle_epoch_info_blockfrost( .await? }; - let mut response = EpochActivityRest::from(ea_message); - if total_active_stakes == 0 { response.active_stake = None; } else { @@ -276,7 +288,7 @@ pub async fn handle_epoch_next_blockfrost( if parsed > lastest_epoch.epoch { return Err(RESTError::not_found( - format!("Epoch {parsed} is in the future").as_str(), + format!("Epoch {parsed} not found").as_str(), )); } @@ -351,7 +363,9 @@ pub async fn handle_epoch_previous_blockfrost( .await?; if parsed > lastest_epoch.epoch { - return Err(RESTError::not_found("Epoch not found")); + return Err(RESTError::not_found( + format!("Epoch {parsed} not found").as_str(), + )); } let previous_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( From 9403dddc3de93a790b9d7f07b899efdc1db290cb Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 20:52:31 +0100 Subject: [PATCH 05/18] fix: cargo shear --- Cargo.lock | 3 --- modules/epochs_state/Cargo.toml | 1 - modules/historical_epochs_state/Cargo.toml | 2 -- processes/omnibus/omnibus.toml | 2 ++ 4 files changed, 2 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f86371b5..7962c566 100644 
--- a/Cargo.lock +++ b/Cargo.lock @@ -206,7 +206,6 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", - "dashmap", "hex", "imbl", "pallas 0.33.0", @@ -273,9 +272,7 @@ dependencies = [ "caryatid_sdk", "config", "fjall", - "hex", "minicbor 0.26.5", - "rayon", "tokio", "tracing", ] diff --git a/modules/epochs_state/Cargo.toml b/modules/epochs_state/Cargo.toml index cbba7d29..d6340659 100644 --- a/modules/epochs_state/Cargo.toml +++ b/modules/epochs_state/Cargo.toml @@ -15,7 +15,6 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } -dashmap = { workspace = true } hex = { workspace = true } imbl = { workspace = true } pallas = { workspace = true } diff --git a/modules/historical_epochs_state/Cargo.toml b/modules/historical_epochs_state/Cargo.toml index 4e72e9ae..58a8d0f8 100644 --- a/modules/historical_epochs_state/Cargo.toml +++ b/modules/historical_epochs_state/Cargo.toml @@ -16,11 +16,9 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } minicbor = { version = "0.26.0", features = ["std", "derive"] } -hex = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } fjall = "2.11.2" -rayon = "1.10.0" [lib] path = "src/historical_epochs_state.rs" diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 7de217e1..87fba395 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -73,6 +73,8 @@ store-mir-history = false store-withdrawal-history = false store-addresses = false +[module.historical-epochs-state] + [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) store-info = false From 99c8a1e7c6b1aba49e165e1c7f586e7eb0c5738b Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 20:58:47 +0100 Subject: [PATCH 06/18] fix: space indent --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml 
b/Cargo.toml index 8d543215..43cd7f58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ members = [ "modules/accounts_state", # Tracks stake and reward accounts "modules/assets_state", # Tracks native asset mints and burns "modules/historical_accounts_state", # Tracks historical account information - "modules/historical_epochs_state", # Tracks historical epochs information + "modules/historical_epochs_state", # Tracks historical epochs information "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs "modules/tx_submitter", # Submits TXs to peers From 8514496829ae3a755d6570158cb71cc6697700b2 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Sun, 16 Nov 2025 21:02:26 +0100 Subject: [PATCH 07/18] fix: typo --- modules/historical_epochs_state/src/state.rs | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs index 35f8f9c0..817f5c9c 100644 --- a/modules/historical_epochs_state/src/state.rs +++ b/modules/historical_epochs_state/src/state.rs @@ -62,7 +62,7 @@ impl State { pub fn get_next_epochs(&self, epoch: u64) -> Result> { let mut epochs = vec![]; - let immutable_epochs_rage = + let immutable_epochs_range = self.volatile.last_persisted_epoch.and_then(|last_persisted_epoch| { if last_persisted_epoch > epoch { Some(epoch + 1..=last_persisted_epoch) @@ -71,13 +71,13 @@ impl State { } }); - if let Some(immutable_epochs_rage) = immutable_epochs_rage { - epochs.extend(self.immutable.get_epochs(immutable_epochs_rage)?); + if let Some(immutable_epochs_range) = immutable_epochs_range { + epochs.extend(self.immutable.get_epochs(immutable_epochs_range)?); } - if let Some(volatile_epoch) = self.volatile.get_volatile_epoch(epoch) { - if volatile_epoch.epoch > epoch { - epochs.push(volatile_epoch); + if let Some(volatile_ea) = 
self.volatile.volatile_ea.as_ref() { + if volatile_ea.epoch > epoch { + epochs.push(volatile_ea.clone()); } } epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); @@ -86,18 +86,18 @@ impl State { pub fn get_previous_epochs(&self, epoch: u64) -> Result> { let mut epochs = vec![]; - let immutable_epochs_rage = self + let immutable_epochs_range = self .volatile .last_persisted_epoch .map(|last_persisted_epoch| 0..=last_persisted_epoch.min(epoch - 1)); - if let Some(immutable_epochs_rage) = immutable_epochs_rage { - epochs.extend(self.immutable.get_epochs(immutable_epochs_rage)?); + if let Some(immutable_epochs_range) = immutable_epochs_range { + epochs.extend(self.immutable.get_epochs(immutable_epochs_range)?); } - if let Some(volatile_epoch) = self.volatile.get_volatile_epoch(epoch) { - if volatile_epoch.epoch < epoch { - epochs.push(volatile_epoch); + if let Some(volatile_ea) = self.volatile.volatile_ea.as_ref() { + if volatile_ea.epoch < epoch { + epochs.push(volatile_ea.clone()); } } epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); From 91f5f1fe04711bea99a9bf3424da2ca64b446aa2 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 17 Nov 2025 15:22:38 +0100 Subject: [PATCH 08/18] tests: add test cases for historical epochs state and add rc features to common serde crate --- common/Cargo.toml | 2 +- common/src/messages.rs | 8 +- modules/historical_epochs_state/.gitignore | 3 +- .../src/historical_epochs_state.rs | 2 +- modules/historical_epochs_state/src/state.rs | 100 +++++++++++++++++- .../src/volatile_historical_epochs_state.rs | 48 +++++++++ 6 files changed, 158 insertions(+), 5 deletions(-) diff --git a/common/Cargo.toml b/common/Cargo.toml index 4f44f198..fc5e948d 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -26,7 +26,7 @@ hex = { workspace = true } memmap2 = "0.9" num-rational = { version = "0.4.2", features = ["serde"] } regex = "1" -serde = { workspace = true } +serde = { workspace = true, features = ["rc"] } serde_json = { workspace = true } serde_with = 
{ workspace = true, features = ["base64"] } tempfile = "3" diff --git a/common/src/messages.rs b/common/src/messages.rs index f0c59948..d8108bf3 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -143,7 +143,13 @@ pub struct BlockTxsMessage { /// Epoch activity - sent at end of epoch #[derive( - Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, + Debug, + Clone, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, + PartialEq, )] pub struct EpochActivityMessage { /// Epoch which has ended diff --git a/modules/historical_epochs_state/.gitignore b/modules/historical_epochs_state/.gitignore index 0078d059..38f607bc 100644 --- a/modules/historical_epochs_state/.gitignore +++ b/modules/historical_epochs_state/.gitignore @@ -1 +1,2 @@ -/db \ No newline at end of file +db/ +test_db/ diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index 87f83bd0..1e75acba 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -197,7 +197,7 @@ impl HistoricalEpochsState { }; // Initalize state - let state = State::new(&config).await?; + let state = State::new(&config)?; let state_mutex = Arc::new(Mutex::new(state)); let state_query = state_mutex.clone(); diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs index 817f5c9c..8465b0ed 100644 --- a/modules/historical_epochs_state/src/state.rs +++ b/modules/historical_epochs_state/src/state.rs @@ -22,7 +22,7 @@ pub struct State { } impl State { - pub async fn new(config: &HistoricalEpochsStateConfig) -> Result { + pub fn new(config: &HistoricalEpochsStateConfig) -> Result { let db_path = if Path::new(&config.db_path).is_relative() { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) } else { @@ -104,3 +104,101 @@ impl 
State { Ok(epochs) } } + +#[cfg(test)] +mod tests { + use super::*; + use acropolis_common::{BlockHash, BlockStatus, Era, PoolId}; + + fn make_ea(epoch: u64) -> EpochActivityMessage { + EpochActivityMessage { + epoch, + epoch_start_time: epoch * 10, + epoch_end_time: epoch * 10 + 10, + first_block_time: epoch * 10, + first_block_height: epoch * 10, + last_block_time: epoch * 10 + 100, + last_block_height: epoch * 10 + 100, + total_blocks: 100, + total_txs: 100, + total_outputs: 100000, + total_fees: 10000, + spo_blocks: vec![(PoolId::default(), 100)], + nonce: None, + } + } + + fn make_block_info(epoch: u64, new_epoch: bool) -> BlockInfo { + BlockInfo { + status: BlockStatus::Immutable, + slot: 0, + hash: BlockHash::default(), + epoch_slot: 0, + new_epoch, + era: Era::Shelley, + number: epoch * 10 + 100, + epoch, + timestamp: epoch * 10, + } + } + + #[test] + fn test_get_historical_epoch() { + let config = HistoricalEpochsStateConfig { + db_path: "test_db".to_string(), + }; + let mut state = State::new(&config).unwrap(); + + let block_info = make_block_info(1, true); + let ea = make_ea(0); + state.volatile.handle_new_epoch(&block_info, &ea); + + let historical_epoch = state.get_historical_epoch(0).unwrap().unwrap(); + assert_eq!(historical_epoch, ea); + + let next_epochs = state.get_next_epochs(0).unwrap(); + assert_eq!(next_epochs, vec![]); + + let previous_epochs = state.get_previous_epochs(1).unwrap(); + assert_eq!(previous_epochs, vec![ea.clone()]); + } + + #[tokio::test] + async fn test_persist_epochs() { + let config = HistoricalEpochsStateConfig { + db_path: "test_db".to_string(), + }; + let mut state = State::new(&config).unwrap(); + + let block_info = make_block_info(1, true); + let ea_0 = make_ea(0); + state.volatile.handle_new_epoch(&block_info, &ea_0); + let mut block_info = make_block_info(1, false); + block_info.number += 1; + assert!(state.ready_to_prune(&block_info)); + + state.prune_volatile().await; + 
state.immutable.persist_epoch(0).await.unwrap(); + + let block_info = make_block_info(2, true); + let ea_1 = make_ea(1); + state.volatile.handle_new_epoch(&block_info, &ea_1); + state.volatile.update_k(20); + let mut block_info = make_block_info(2, false); + block_info.number += 1; + assert!(!state.ready_to_prune(&block_info)); + block_info.number += 20; + assert!(state.ready_to_prune(&block_info)); + + state.prune_volatile().await; + state.immutable.persist_epoch(1).await.unwrap(); + + let historical_epoch = state.immutable.get_historical_epoch(0).unwrap().unwrap(); + assert_eq!(historical_epoch, ea_0); + let historical_epoch = state.immutable.get_historical_epoch(1).unwrap().unwrap(); + assert_eq!(historical_epoch, ea_1); + + let epochs = state.immutable.get_epochs(0..=1).unwrap(); + assert_eq!(epochs, vec![ea_0, ea_1]); + } +} diff --git a/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs b/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs index ecc3c1e8..c0872e4e 100644 --- a/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs @@ -56,3 +56,51 @@ impl VolatileHistoricalEpochsState { None } } + +#[cfg(test)] +mod tests { + use acropolis_common::{BlockHash, BlockStatus, Era}; + + use super::*; + + #[test] + fn test_rollback_before() { + let mut state = VolatileHistoricalEpochsState::new(); + let block_info = BlockInfo { + number: 1, + epoch: 1, + status: BlockStatus::Volatile, + slot: 1, + hash: BlockHash::default(), + epoch_slot: 1, + new_epoch: false, + timestamp: 1, + era: Era::Shelley, + }; + let ea = EpochActivityMessage { + epoch: 1, + epoch_start_time: 1, + epoch_end_time: 2, + total_blocks: 100, + total_txs: 100, + total_outputs: 100, + total_fees: 100, + spo_blocks: vec![], + nonce: None, + first_block_time: 1, + first_block_height: 1, + last_block_time: 1, + last_block_height: 1, + }; + 
state.handle_new_epoch(&block_info, &ea); + assert!(state.get_volatile_epoch(1).unwrap().eq(&ea)); + assert_eq!(state.get_volatile_epoch(2), None); + + let rollbacked = state.rollback_before(2); + assert_eq!(rollbacked, None); + + let rollbacked = state.rollback_before(1); + assert!(rollbacked.unwrap().eq(&ea)); + assert_eq!(state.get_volatile_epoch(1), None); + } +} From a5f74c905b5e94bcb357d72e71d128a8a0ed4f9c Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 17 Nov 2025 16:05:31 +0100 Subject: [PATCH 09/18] fix: typo, add max pending to historical epochs state pending, add error traces --- common/src/queries/epochs.rs | 1 + .../src/historical_epochs_state.rs | 31 ++++++++++++------- .../src/immutable_historical_epochs_state.rs | 18 +++++++---- .../rest_blockfrost/src/handlers/epochs.rs | 26 +++++++--------- 4 files changed, 44 insertions(+), 32 deletions(-) diff --git a/common/src/queries/epochs.rs b/common/src/queries/epochs.rs index ca463dee..686aca71 100644 --- a/common/src/queries/epochs.rs +++ b/common/src/queries/epochs.rs @@ -17,6 +17,7 @@ pub enum EpochsStateQuery { GetEpochInfo { epoch_number: u64 }, GetNextEpochs { epoch_number: u64 }, GetPreviousEpochs { epoch_number: u64 }, + GetEpochStakeDistribution { epoch_number: u64 }, GetEpochStakeDistributionByPool { epoch_number: u64 }, GetLatestEpochBlocksMintedByPool { spo_id: PoolId }, diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index 1e75acba..c5ea7efa 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -18,7 +18,7 @@ use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; use config::Config; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; -use tracing::{error, info, info_span, Instrument}; +use tracing::{error, info, info_span, warn, Instrument}; mod immutable_historical_epochs_state; mod 
state; mod volatile_historical_epochs_state; @@ -143,7 +143,7 @@ impl HistoricalEpochsState { }; if let Err(e) = persist_tx.send((current_block.epoch - 1, immutable)).await { - panic!("persistence worker crashed: {e}"); + error!("persistence worker crashed: {e}"); } } } @@ -221,9 +221,12 @@ impl HistoricalEpochsState { Ok(None) => EpochsStateQueryResponse::Error(QueryError::not_found( format!("Epoch {}", epoch_number), )), - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), + Err(e) => { + warn!("failed to get epoch info: {e}"); + EpochsStateQueryResponse::Error(QueryError::internal_error( + "historical epoch info", + )) + } } } @@ -232,9 +235,12 @@ impl HistoricalEpochsState { Ok(epochs) => { EpochsStateQueryResponse::NextEpochs(NextEpochs { epochs }) } - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), + Err(e) => { + warn!("failed to get next epochs: {e}"); + EpochsStateQueryResponse::Error(QueryError::internal_error( + "historical next epochs", + )) + } } } @@ -243,9 +249,12 @@ impl HistoricalEpochsState { Ok(epochs) => { EpochsStateQueryResponse::PreviousEpochs(PreviousEpochs { epochs }) } - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), + Err(e) => { + warn!("failed to get previous epochs: {e}"); + EpochsStateQueryResponse::Error(QueryError::internal_error( + "historical previous epochs", + )) + } } } diff --git a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs index 9b077ed7..d2e21f6d 100644 --- a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs @@ -1,16 +1,17 @@ -use std::path::Path; +use std::{collections::VecDeque, path::Path}; use acropolis_common::messages::EpochActivityMessage; use anyhow::Result; use 
fjall::{Keyspace, Partition, PartitionCreateOptions, PersistMode}; use minicbor::{decode, to_vec}; use tokio::sync::Mutex; -use tracing::{error, info}; +use tracing::{error, info, warn}; pub struct ImmutableHistoricalEpochsState { epochs_history: Partition, keyspace: Keyspace, - pub pending: Mutex>, + pub pending: Mutex>, + pub max_pending: usize, } impl ImmutableHistoricalEpochsState { @@ -18,7 +19,6 @@ impl ImmutableHistoricalEpochsState { let cfg = fjall::Config::new(path) // 4MB write buffer since EpochActivityMessage is not that big .max_write_buffer_size(4 * 1024 * 1024) - .temporary(true) // Enable manual control of flushing // We store EpochActivityMessage only every 5 days (need manual Flush) .manual_journal_persist(true); @@ -30,16 +30,22 @@ impl ImmutableHistoricalEpochsState { Ok(Self { epochs_history, keyspace, - pending: Mutex::new(Vec::new()), + pending: Mutex::new(VecDeque::new()), + max_pending: 5, }) } pub async fn update_immutable(&self, ea: EpochActivityMessage) { let mut pending = self.pending.lock().await; - pending.push(ea); + if pending.len() >= self.max_pending { + warn!("historical epochs state pending buffer full, dropping oldest"); + pending.pop_front(); + } + pending.push_back(ea); } /// Persists pending EpochActivityMessages for epoch N + /// There should be only one EpochActivityMessage for each epoch /// Returns the number of persisted EpochActivityMessages /// Errors if the batch commit or persist fails pub async fn persist_epoch(&self, epoch: u64) -> Result { diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index fdf3e224..ec441f74 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -265,13 +265,13 @@ pub async fn handle_epoch_next_blockfrost( .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; - let lastest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + let 
latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetLatestEpoch, ))); - let lastest_epoch = query_state( + let latest_epoch = query_state( &context, &handlers_config.epochs_query_topic, - lastest_epoch_msg, + latest_epoch_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::LatestEpoch(res), @@ -286,13 +286,13 @@ pub async fn handle_epoch_next_blockfrost( ) .await?; - if parsed > lastest_epoch.epoch { + if parsed > latest_epoch.epoch { return Err(RESTError::not_found( format!("Epoch {parsed} not found").as_str(), )); } - if parsed == lastest_epoch.epoch { + if parsed == latest_epoch.epoch { return Ok(RESTResponse::with_json(200, "[]")); } @@ -319,7 +319,7 @@ pub async fn handle_epoch_next_blockfrost( }, ) .await?; - next_epochs.push(EpochActivityRest::from(lastest_epoch)); + next_epochs.push(EpochActivityRest::from(latest_epoch)); let json = serde_json::to_string_pretty(&next_epochs)?; Ok(RESTResponse::with_json(200, &json)) @@ -341,13 +341,13 @@ pub async fn handle_epoch_previous_blockfrost( .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; - let lastest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetLatestEpoch, ))); - let lastest_epoch = query_state( + let latest_epoch = query_state( &context, &handlers_config.epochs_query_topic, - lastest_epoch_msg, + latest_epoch_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::LatestEpoch(res), @@ -362,7 +362,7 @@ pub async fn handle_epoch_previous_blockfrost( ) .await?; - if parsed > lastest_epoch.epoch { + if parsed > latest_epoch.epoch { return Err(RESTError::not_found( format!("Epoch {parsed} not found").as_str(), )); @@ -373,7 +373,7 @@ pub async fn handle_epoch_previous_blockfrost( epoch_number: parsed, }, ))); - let mut 
previous_epochs = query_state( + let previous_epochs = query_state( &context, &handlers_config.historical_epochs_query_topic, previous_epochs_msg, @@ -391,10 +391,6 @@ pub async fn handle_epoch_previous_blockfrost( ) .await?; - if lastest_epoch.epoch < parsed { - previous_epochs.push(EpochActivityRest::from(lastest_epoch)); - } - let json = serde_json::to_string_pretty(&previous_epochs)?; Ok(RESTResponse::with_json(200, &json)) } From ae44ba6a11e398089fa6e55c2709486d5ce541ec Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Mon, 17 Nov 2025 16:15:57 +0100 Subject: [PATCH 10/18] refactor: u128 cbor encode and add test cases --- common/src/cbor.rs | 80 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 9 deletions(-) diff --git a/common/src/cbor.rs b/common/src/cbor.rs index f8b8a287..953873ab 100644 --- a/common/src/cbor.rs +++ b/common/src/cbor.rs @@ -1,33 +1,95 @@ -// Custom codec module for u128 (similar to minicbor::bytes pattern) -// CBOR doesn't natively support 128-bit integers, so we encode as 16 bytes +// Custom codec module for u128 using CBOR bignum encoding pub mod u128_cbor_codec { use minicbor::{Decoder, Encoder}; - /// Encode u128 as 16 bytes in big-endian format + /// Encode u128 as CBOR Tag 2 (positive bignum) /// For use with `#[cbor(with = "u128_cbor_codec")]` pub fn encode( v: &u128, e: &mut Encoder, _ctx: &mut C, ) -> Result<(), minicbor::encode::Error> { - e.bytes(&v.to_be_bytes())?; + // Tag 2 = positive bignum + e.tag(minicbor::data::Tag::new(2))?; + + // Optimize: only encode non-zero leading bytes + let bytes = v.to_be_bytes(); + let first_nonzero = bytes.iter().position(|&b| b != 0).unwrap_or(15); + e.bytes(&bytes[first_nonzero..])?; Ok(()) } - /// Decode u128 from 16 bytes in big-endian format - /// For use with `#[cbor(with = "u128_cbor_codec")]` + /// Decode u128 from CBOR Tag 2 (positive bignum) pub fn decode<'b, C>( d: &mut Decoder<'b>, _ctx: &mut C, ) -> Result { + // Expect Tag 2 + let tag = d.tag()?; + 
if tag != minicbor::data::Tag::new(2) { + return Err(minicbor::decode::Error::message( + "Expected CBOR Tag 2 (positive bignum) for u128", + )); + } + let bytes = d.bytes()?; - if bytes.len() != 16 { + if bytes.len() > 16 { return Err(minicbor::decode::Error::message( - "Expected 16 bytes for u128", + "Bignum too large for u128 (max 16 bytes)", )); } + + // Pad with leading zeros to make 16 bytes (big-endian) let mut arr = [0u8; 16]; - arr.copy_from_slice(bytes); + arr[16 - bytes.len()..].copy_from_slice(bytes); Ok(u128::from_be_bytes(arr)) } } + +#[cfg(test)] +mod tests { + use super::u128_cbor_codec; + use minicbor::{Decode, Encode}; + + #[derive(Debug, PartialEq, Encode, Decode)] + struct TestStruct { + #[cbor(n(0), with = "u128_cbor_codec")] + value: u128, + } + + #[test] + fn test_u128_zero() { + let original = TestStruct { value: 0 }; + let encoded = minicbor::to_vec(&original).unwrap(); + let decoded: TestStruct = minicbor::decode(&encoded).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_u128_max() { + let original = TestStruct { value: u128::MAX }; + let encoded = minicbor::to_vec(&original).unwrap(); + let decoded: TestStruct = minicbor::decode(&encoded).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_u128_boundary_values() { + let test_values = [ + 0u128, + 1, + 127, // Max 1-byte value + u64::MAX as u128, // 18446744073709551615 + (u64::MAX as u128) + 1, // First value needing >64 bits + u128::MAX - 1, // Near max + u128::MAX, // Maximum u128 value + ]; + + for &val in &test_values { + let original = TestStruct { value: val }; + let encoded = minicbor::to_vec(&original).unwrap(); + let decoded: TestStruct = minicbor::decode(&encoded).unwrap(); + assert_eq!(original, decoded, "Failed for value {}", val); + } + } +} From 6a951d1e0b90c4142fafa9eb00fbdd614a6fa232 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 18 Nov 2025 06:36:23 +0100 Subject: [PATCH 11/18] refactor: use fjall's range instead of manual get 
several times --- .../src/immutable_historical_epochs_state.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs index d2e21f6d..d20c0f3c 100644 --- a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs @@ -92,13 +92,14 @@ impl ImmutableHistoricalEpochsState { &self, range: std::ops::RangeInclusive, ) -> Result> { - let mut epochs: Vec = Vec::new(); - for epoch in range { - let slice = self.epochs_history.get(Self::make_epoch_key(epoch))?; - if let Some(slice) = slice.as_ref() { - let decoded: EpochActivityMessage = decode(slice)?; - epochs.push(decoded); - } + let mut epochs = Vec::new(); + let start_key = Self::make_epoch_key(*range.start()); + let end_key = Self::make_epoch_key(*range.end()); + + for result in self.epochs_history.range(start_key..=end_key) { + let (_, slice) = result?; + let decoded: EpochActivityMessage = decode(&slice)?; + epochs.push(decoded); } Ok(epochs) } From 28d9e0b99a1bd7192e87f4385467e8451ddacb89 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Tue, 18 Nov 2025 07:03:54 +0100 Subject: [PATCH 12/18] refactor: use relative path for historical storage and prefix their path with fjall and update gitignore files to ignore all db related files --- Cargo.lock | 1 + modules/accounts_state/src/accounts_state.rs | 2 +- modules/historical_accounts_state/.gitignore | 3 ++- .../src/historical_accounts_state.rs | 2 +- .../historical_accounts_state/src/state.rs | 15 +++---------- modules/historical_epochs_state/.gitignore | 4 ++-- modules/historical_epochs_state/Cargo.toml | 3 +++ .../src/historical_epochs_state.rs | 2 +- modules/historical_epochs_state/src/state.rs | 21 +++++++------------ processes/omnibus/.gitignore | 6 ++---- processes/omnibus/omnibus.toml | 2 +- 
processes/replayer/.gitignore | 6 ++++-- processes/replayer/replayer.toml | 2 +- 13 files changed, 30 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7962c566..fab676bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,6 +273,7 @@ dependencies = [ "config", "fjall", "minicbor 0.26.5", + "tempfile", "tokio", "tracing", ] diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 89a4632f..98e1bcb4 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -51,7 +51,7 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; -const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db"); +const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./fjall-spdd"); const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0); /// Accounts State module diff --git a/modules/historical_accounts_state/.gitignore b/modules/historical_accounts_state/.gitignore index 0078d059..6aed6c1a 100644 --- a/modules/historical_accounts_state/.gitignore +++ b/modules/historical_accounts_state/.gitignore @@ -1 +1,2 @@ -/db \ No newline at end of file +# fjall immutable db +fjall-*/ diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 4ee95710..e1578fd4 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -35,7 +35,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); // Configuration defaults -const DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH: (&str, &str) = ("db-path", "./db"); +const 
DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH: (&str, &str) = ("db-path", "./fjall-accounts"); const DEFAULT_STORE_REWARDS_HISTORY: (&str, bool) = ("store-rewards-history", false); const DEFAULT_STORE_ACTIVE_STAKE_HISTORY: (&str, bool) = ("store-active-stake-history", false); const DEFAULT_STORE_REGISTRATION_HISTORY: (&str, bool) = ("store-registration-history", false); diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index bb5c7eac..7b8542e1 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{collections::HashSet, path::Path, sync::Arc}; use acropolis_common::{ messages::{ @@ -80,13 +76,8 @@ pub struct State { impl State { pub async fn new(config: HistoricalAccountsConfig) -> Result { - let db_path = if Path::new(&config.db_path).is_relative() { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) - } else { - PathBuf::from(&config.db_path) - }; - - let store = Arc::new(ImmutableHistoricalAccountStore::new(&db_path)?); + let db_path = Path::new(&config.db_path); + let store = Arc::new(ImmutableHistoricalAccountStore::new(db_path)?); Ok(Self { config, diff --git a/modules/historical_epochs_state/.gitignore b/modules/historical_epochs_state/.gitignore index 38f607bc..6aed6c1a 100644 --- a/modules/historical_epochs_state/.gitignore +++ b/modules/historical_epochs_state/.gitignore @@ -1,2 +1,2 @@ -db/ -test_db/ +# fjall immutable db +fjall-*/ diff --git a/modules/historical_epochs_state/Cargo.toml b/modules/historical_epochs_state/Cargo.toml index 58a8d0f8..d3d7dc57 100644 --- a/modules/historical_epochs_state/Cargo.toml +++ b/modules/historical_epochs_state/Cargo.toml @@ -20,5 +20,8 @@ tokio = { workspace = true } tracing = { workspace = true } fjall = "2.11.2" +[dev-dependencies] +tempfile = "3" + [lib] path = 
"src/historical_epochs_state.rs" diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index c5ea7efa..87d00a7c 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -31,7 +31,7 @@ const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); -const DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH: (&str, &str) = ("db-path", "./db"); +const DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH: (&str, &str) = ("db-path", "./fjall-epochs"); /// Historical Epochs State module #[module( diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs index 8465b0ed..bd1910c4 100644 --- a/modules/historical_epochs_state/src/state.rs +++ b/modules/historical_epochs_state/src/state.rs @@ -4,10 +4,7 @@ use crate::{ }; use acropolis_common::{messages::EpochActivityMessage, BlockInfo}; use anyhow::Result; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{path::Path, sync::Arc}; #[derive(Debug, Clone)] pub struct HistoricalEpochsStateConfig { @@ -23,13 +20,8 @@ pub struct State { impl State { pub fn new(config: &HistoricalEpochsStateConfig) -> Result { - let db_path = if Path::new(&config.db_path).is_relative() { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) - } else { - PathBuf::from(&config.db_path) - }; - - let immutable = Arc::new(ImmutableHistoricalEpochsState::new(&db_path)?); + let db_path = Path::new(&config.db_path); + let immutable = Arc::new(ImmutableHistoricalEpochsState::new(db_path)?); Ok(Self { volatile: VolatileHistoricalEpochsState::new(), @@ -109,6 +101,7 @@ impl State { mod tests { use super::*; use acropolis_common::{BlockHash, BlockStatus, Era, PoolId}; + use tempfile::TempDir; fn make_ea(epoch: u64) -> 
EpochActivityMessage { EpochActivityMessage { @@ -144,8 +137,9 @@ mod tests { #[test] fn test_get_historical_epoch() { + let temp_dir = TempDir::new().unwrap(); let config = HistoricalEpochsStateConfig { - db_path: "test_db".to_string(), + db_path: temp_dir.path().to_string_lossy().into_owned(), }; let mut state = State::new(&config).unwrap(); @@ -165,8 +159,9 @@ mod tests { #[tokio::test] async fn test_persist_epochs() { + let temp_dir = TempDir::new().unwrap(); let config = HistoricalEpochsStateConfig { - db_path: "test_db".to_string(), + db_path: temp_dir.path().to_string_lossy().into_owned(), }; let mut state = State::new(&config).unwrap(); diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore index ce63bc1e..f7cf0f86 100644 --- a/processes/omnibus/.gitignore +++ b/processes/omnibus/.gitignore @@ -1,9 +1,7 @@ downloads -sled-immutable-utxos -fjall-blocks -fjall-immutable-utxos cache upstream-cache # DB files -*_db +fjall-*/ +sled-*/ diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 87fba395..efaede6e 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -106,7 +106,7 @@ store-history = false [module.accounts-state] # Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints spdd-retention-epochs = 0 -spdd-db-path = "./spdd_db" +spdd-db-path = "./fjall-spdd" # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" diff --git a/processes/replayer/.gitignore b/processes/replayer/.gitignore index 5ca16a7f..456bfd04 100644 --- a/processes/replayer/.gitignore +++ b/processes/replayer/.gitignore @@ -1,3 +1,5 @@ downloads -sled-immutable-utxos -fjall-immutable-utxos \ No newline at end of file + +# DB files +fjall-*/ +sled-*/ diff --git a/processes/replayer/replayer.toml b/processes/replayer/replayer.toml index 2e4044e7..06263731 
100644 --- a/processes/replayer/replayer.toml +++ b/processes/replayer/replayer.toml @@ -110,7 +110,7 @@ store-history = false [module.accounts-state] # Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints spdd-retention-epochs = 0 -spdd-db-path = "./spdd_db" +spdd-db-path = "./fjall-spdd" # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" From 7611dc0568b0a6849e79d3393cc99835868b4afb Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 19 Nov 2025 13:44:15 +0100 Subject: [PATCH 13/18] refactor: update address state's db path to have fjall prefix --- modules/address_state/.gitignore | 3 ++- modules/address_state/src/address_state.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/modules/address_state/.gitignore b/modules/address_state/.gitignore index 9f4c740d..6aed6c1a 100644 --- a/modules/address_state/.gitignore +++ b/modules/address_state/.gitignore @@ -1 +1,2 @@ -db/ \ No newline at end of file +# fjall immutable db +fjall-*/ diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index 8445756b..57a82774 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -32,7 +32,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); // Configuration defaults -const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./db"); +const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./fjall-addresses"); const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false); const DEFAULT_STORE_TOTALS: (&str, bool) = ("store-totals", false); From 58e284c8ba2db38b75fa9587a1364bc54e0d766c Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 19 Nov 2025 
13:48:22 +0100 Subject: [PATCH 14/18] refactor: add clear on start option to historical epochs state, and remove store-history option from epochs state --- .../src/historical_epochs_state.rs | 4 ++++ .../src/immutable_historical_epochs_state.rs | 7 ++++++- modules/historical_epochs_state/src/state.rs | 8 +++++++- processes/omnibus/omnibus.toml | 4 ++-- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index 87d00a7c..345e4783 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -32,6 +32,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); const DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH: (&str, &str) = ("db-path", "./fjall-epochs"); +const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); /// Historical Epochs State module #[module( @@ -194,6 +195,9 @@ impl HistoricalEpochsState { db_path: config .get_string(DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH.0) .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH.1.to_string()), + clear_on_start: config + .get_bool(DEFAULT_CLEAR_ON_START.0) + .unwrap_or(DEFAULT_CLEAR_ON_START.1), }; // Initalize state diff --git a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs index d20c0f3c..7f245833 100644 --- a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs @@ -15,7 +15,12 @@ pub struct ImmutableHistoricalEpochsState { } impl ImmutableHistoricalEpochsState { - pub fn new(path: impl AsRef) -> Result { + pub fn new(path: impl AsRef, clear_on_start: bool) -> Result { + let path = path.as_ref(); + if clear_on_start && 
path.exists() { + std::fs::remove_dir_all(path)?; + } + let cfg = fjall::Config::new(path) // 4MB write buffer since EpochActivityMessage is not that big .max_write_buffer_size(4 * 1024 * 1024) diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs index bd1910c4..5feaea42 100644 --- a/modules/historical_epochs_state/src/state.rs +++ b/modules/historical_epochs_state/src/state.rs @@ -9,6 +9,7 @@ use std::{path::Path, sync::Arc}; #[derive(Debug, Clone)] pub struct HistoricalEpochsStateConfig { pub db_path: String, + pub clear_on_start: bool, } /// Overall state - stored per epoch @@ -21,7 +22,10 @@ pub struct State { impl State { pub fn new(config: &HistoricalEpochsStateConfig) -> Result { let db_path = Path::new(&config.db_path); - let immutable = Arc::new(ImmutableHistoricalEpochsState::new(db_path)?); + let immutable = Arc::new(ImmutableHistoricalEpochsState::new( + db_path, + config.clear_on_start, + )?); Ok(Self { volatile: VolatileHistoricalEpochsState::new(), @@ -140,6 +144,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let config = HistoricalEpochsStateConfig { db_path: temp_dir.path().to_string_lossy().into_owned(), + clear_on_start: true, }; let mut state = State::new(&config).unwrap(); @@ -162,6 +167,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let config = HistoricalEpochsStateConfig { db_path: temp_dir.path().to_string_lossy().into_owned(), + clear_on_start: true, }; let mut state = State::new(&config).unwrap(); diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 883c4129..76b345be 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -83,6 +83,8 @@ store-addresses = false store-tx-count = false [module.historical-epochs-state] +# Clear state on start up (default true) +clear-on-start = true [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) @@ -109,8 +111,6 @@ 
cache-mode = "predefined" # "predefined", "read", "write", "write-if-absent" write-full-cache = "false" [module.epochs-state] -# Enables /epochs/{number} endpoint (for historical epochs) -store-history = false [module.accounts-state] # Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints From 04b5c29896495c78bc19a4ee14a3693b1d5cb1a9 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 19 Nov 2025 13:50:19 +0100 Subject: [PATCH 15/18] refactor: add clear on start option to historical accounts state --- .../src/historical_accounts_state.rs | 4 ++++ .../src/immutable_historical_account_store.rs | 7 ++++++- modules/historical_accounts_state/src/state.rs | 6 +++++- processes/omnibus/omnibus.toml | 2 ++ 4 files changed, 17 insertions(+), 2 deletions(-) diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index aa19757e..16b25d45 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -36,6 +36,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = // Configuration defaults const DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH: (&str, &str) = ("db-path", "./fjall-accounts"); +const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); const DEFAULT_STORE_REWARDS_HISTORY: (&str, bool) = ("store-rewards-history", false); const DEFAULT_STORE_ACTIVE_STAKE_HISTORY: (&str, bool) = ("store-active-stake-history", false); const DEFAULT_STORE_REGISTRATION_HISTORY: (&str, bool) = ("store-registration-history", false); @@ -270,6 +271,9 @@ impl HistoricalAccountsState { db_path: config .get_string(DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH.0) .unwrap_or(DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH.1.to_string()), + clear_on_start: config + .get_bool(DEFAULT_CLEAR_ON_START.0) + .unwrap_or(DEFAULT_CLEAR_ON_START.1), store_rewards_history: config 
.get_bool(DEFAULT_STORE_REWARDS_HISTORY.0) .unwrap_or(DEFAULT_STORE_REWARDS_HISTORY.1), diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 6c16e177..0453d0f9 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -31,7 +31,12 @@ pub struct ImmutableHistoricalAccountStore { } impl ImmutableHistoricalAccountStore { - pub fn new(path: impl AsRef) -> Result { + pub fn new(path: impl AsRef, clear_on_start: bool) -> Result { + let path = path.as_ref(); + if clear_on_start && path.exists() { + std::fs::remove_dir_all(path)?; + } + let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true); let keyspace = Keyspace::open(cfg)?; diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 6d2ca79a..3ae834b4 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -45,6 +45,7 @@ pub struct ActiveStakeHistory { #[derive(Debug, Clone)] pub struct HistoricalAccountsConfig { pub db_path: String, + pub clear_on_start: bool, pub store_rewards_history: bool, pub store_active_stake_history: bool, @@ -80,7 +81,10 @@ pub struct State { impl State { pub async fn new(config: HistoricalAccountsConfig) -> Result { let db_path = Path::new(&config.db_path); - let store = Arc::new(ImmutableHistoricalAccountStore::new(db_path)?); + let store = Arc::new(ImmutableHistoricalAccountStore::new( + db_path, + config.clear_on_start, + )?); Ok(Self { config, diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 76b345be..a3a694b2 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -65,6 +65,8 @@ store-stake-addresses = false store-spdd = false 
[module.historical-accounts-state] +# Clear state on start up (default true) +clear-on-start = true # Enables /accounts/{stake_address}/rewards endpoint store-rewards-history = false # Enables /accounts/{stake_address}/history endpoint From cacbaa9e8f8f27fbb1b19070ea8d0dadc297e375 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 19 Nov 2025 13:54:18 +0100 Subject: [PATCH 16/18] fix: return historical epochs info even when store-spdd is disabled --- modules/rest_blockfrost/src/handlers/epochs.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index ec441f74..e2eef801 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -100,8 +100,9 @@ pub async fn handle_epoch_info_blockfrost( // For the latest epoch, query accounts-state for the stake pool delegation distribution (SPDD) // Otherwise, fall back to SPDD module to fetch historical epoch totals + // if spdd_storage is not enabled, return NULL for active_stakes let epoch_number = response.epoch; - let total_active_stakes: u64 = if is_latest { + let total_active_stakes = if is_latest { let total_active_stakes_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetActiveStakes {}, ))); @@ -112,10 +113,10 @@ pub async fn handle_epoch_info_blockfrost( |message| match message { Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::ActiveStakes(total_active_stake), - )) => Ok(total_active_stake), + )) => Ok(Some(total_active_stake)), Message::StateQueryResponse(StateQueryResponse::Accounts( - AccountsStateQueryResponse::Error(e), - )) => Err(e), + AccountsStateQueryResponse::Error(_), + )) => Ok(None), _ => Err(QueryError::internal_error( "Unexpected message type while retrieving the latest total active stakes", )), @@ -136,17 +137,17 @@ pub async fn 
handle_epoch_info_blockfrost( |message| match message { Message::StateQueryResponse(StateQueryResponse::SPDD( SPDDStateQueryResponse::EpochTotalActiveStakes(total_active_stakes), - )) => Ok(total_active_stakes), + )) => Ok(Some(total_active_stakes)), Message::StateQueryResponse(StateQueryResponse::SPDD( - SPDDStateQueryResponse::Error(e), - )) => Err(e), + SPDDStateQueryResponse::Error(_), + )) => Ok(None), _ => Err(QueryError::internal_error( format!("Unexpected message type while retrieving total active stakes for epoch: {epoch_number}"), )), }, ) .await? - }; + }.unwrap_or(0); if total_active_stakes == 0 { response.active_stake = None; From 3fd30a2657999fd4a6e1f8483b97d034830da8c1 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 19 Nov 2025 14:01:02 +0100 Subject: [PATCH 17/18] fix: use relative path for address state --- modules/address_state/src/state.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 9c4f930b..350f4e13 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{collections::HashSet, path::Path, sync::Arc}; use acropolis_common::{ Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, TxTotals, @@ -55,13 +51,8 @@ pub struct State { impl State { pub async fn new(config: &AddressStorageConfig) -> Result { - let db_path = if Path::new(&config.db_path).is_relative() { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) - } else { - PathBuf::from(&config.db_path) - }; - - let store = Arc::new(ImmutableAddressStore::new(&db_path, config.clear_on_start)?); + let db_path = Path::new(&config.db_path); + let store = Arc::new(ImmutableAddressStore::new(db_path, config.clear_on_start)?); let mut config = config.clone(); config.skip_until = 
store.get_last_epoch_stored().await?; From 45ef59fa090620f9518dd0381bbbd4db695f3e36 Mon Sep 17 00:00:00 2001 From: Golddy Dev Date: Wed, 19 Nov 2025 14:08:31 +0100 Subject: [PATCH 18/18] refactor: add comments for persist_epoch and should_prune in historical_epochs_state --- .../src/historical_epochs_state.rs | 2 +- .../src/immutable_historical_epochs_state.rs | 10 ++++++---- modules/historical_epochs_state/src/state.rs | 7 +++++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs index 345e4783..5aa182f5 100644 --- a/modules/historical_epochs_state/src/historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -143,7 +143,7 @@ impl HistoricalEpochsState { state.immutable.clone() }; - if let Err(e) = persist_tx.send((current_block.epoch - 1, immutable)).await { + if let Err(e) = persist_tx.send((current_block.epoch, immutable)).await { error!("persistence worker crashed: {e}"); } } diff --git a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs index 7f245833..0a70001d 100644 --- a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs +++ b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs @@ -49,11 +49,13 @@ impl ImmutableHistoricalEpochsState { pending.push_back(ea); } - /// Persists pending EpochActivityMessages for epoch N + /// Persists pending EpochActivityMessages for Epoch N - 1 + /// at the first block of Epoch N /// There should be only one EpochActivityMessage for each epoch /// Returns the number of persisted EpochActivityMessages /// Errors if the batch commit or persist fails pub async fn persist_epoch(&self, epoch: u64) -> Result { + let saving_epoch = epoch - 1; let drained_epochs = { let mut pending = self.pending.lock().await; 
std::mem::take(&mut *pending) @@ -69,16 +71,16 @@ impl ImmutableHistoricalEpochsState { } if let Err(e) = batch.commit() { - error!("batch commit failed for epoch {epoch}: {e}"); + error!("batch commit failed for epoch {saving_epoch}: {e}"); return Err(e.into()); } if let Err(e) = self.keyspace.persist(PersistMode::Buffer) { - error!("persist failed for epoch {epoch}: {e}"); + error!("persist failed for epoch {saving_epoch}: {e}"); return Err(e.into()); } - info!("persisted {persisted_epochs} epochs for epoch {epoch}"); + info!("persisted {persisted_epochs} epochs for epoch {saving_epoch}"); Ok(persisted_epochs) } diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs index 5feaea42..7af155d7 100644 --- a/modules/historical_epochs_state/src/state.rs +++ b/modules/historical_epochs_state/src/state.rs @@ -40,6 +40,9 @@ impl State { } } + /// block_info is the first block of Epoch N (epoch param) + /// And at that point, we are saving EpochActivityMessage for Epoch N - 1 + /// That is why we check Some(block_info.epoch - 1) != self.volatile.last_persisted_epoch pub fn ready_to_prune(&self, block_info: &BlockInfo) -> bool { block_info.epoch > 0 && Some(block_info.epoch - 1) != self.volatile.last_persisted_epoch @@ -179,7 +182,7 @@ mod tests { assert!(state.ready_to_prune(&block_info)); state.prune_volatile().await; - state.immutable.persist_epoch(0).await.unwrap(); + state.immutable.persist_epoch(block_info.epoch).await.unwrap(); let block_info = make_block_info(2, true); let ea_1 = make_ea(1); @@ -192,7 +195,7 @@ mod tests { assert!(state.ready_to_prune(&block_info)); state.prune_volatile().await; - state.immutable.persist_epoch(1).await.unwrap(); + state.immutable.persist_epoch(block_info.epoch).await.unwrap(); let historical_epoch = state.immutable.get_historical_epoch(0).unwrap().unwrap(); assert_eq!(historical_epoch, ea_0);