diff --git a/Cargo.lock b/Cargo.lock index c19add57..7a5a1adc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -223,7 +223,6 @@ dependencies = [ "anyhow", "caryatid_sdk", "config", - "dashmap", "hex", "imbl", "pallas 0.33.0", @@ -281,6 +280,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_historical_epochs_state" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "fjall", + "minicbor 0.26.5", + "tempfile", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_mithril_snapshot_fetcher" version = "0.1.0" @@ -495,6 +509,7 @@ dependencies = [ "acropolis_module_genesis_bootstrapper", "acropolis_module_governance_state", "acropolis_module_historical_accounts_state", + "acropolis_module_historical_epochs_state", "acropolis_module_mithril_snapshot_fetcher", "acropolis_module_parameters_state", "acropolis_module_peer_network_interface", diff --git a/Cargo.toml b/Cargo.toml index 15a3a48a..a758b221 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "modules/accounts_state", # Tracks stake and reward accounts "modules/assets_state", # Tracks native asset mints and burns "modules/historical_accounts_state", # Tracks historical account information + "modules/historical_epochs_state", # Tracks historical epochs information "modules/consensus", # Chooses favoured chain across multiple options "modules/chain_store", # Tracks historical information about blocks and TXs "modules/tx_submitter", # Submits TXs to peers diff --git a/common/Cargo.toml b/common/Cargo.toml index 4f44f198..fc5e948d 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -26,7 +26,7 @@ hex = { workspace = true } memmap2 = "0.9" num-rational = { version = "0.4.2", features = ["serde"] } regex = "1" -serde = { workspace = true } +serde = { workspace = true, features = ["rc"] } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } tempfile = "3" diff --git a/common/src/cbor.rs b/common/src/cbor.rs new file mode 100644 index 00000000..953873ab --- /dev/null +++ b/common/src/cbor.rs @@ -0,0 +1,95 @@ +// Custom codec module for u128 using CBOR bignum encoding +pub mod u128_cbor_codec { + use minicbor::{Decoder, Encoder}; + + /// Encode u128 as CBOR Tag 2 (positive bignum) + /// For use with `#[cbor(with = "u128_cbor_codec")]` + pub fn encode( + v: &u128, + e: &mut Encoder, + _ctx: &mut C, + ) -> Result<(), minicbor::encode::Error> { + // Tag 2 = positive bignum + e.tag(minicbor::data::Tag::new(2))?; + + // Optimize: only encode non-zero leading bytes + let bytes = v.to_be_bytes(); + let first_nonzero = bytes.iter().position(|&b| b != 0).unwrap_or(15); + e.bytes(&bytes[first_nonzero..])?; + Ok(()) + } + + /// Decode u128 from CBOR Tag 2 (positive bignum) + pub fn decode<'b, C>( + d: &mut Decoder<'b>, + _ctx: &mut C, + ) -> Result { + // Expect Tag 2 + let tag = d.tag()?; + if tag != minicbor::data::Tag::new(2) { + return Err(minicbor::decode::Error::message( + "Expected CBOR Tag 2 (positive bignum) for u128", + )); + } + + let bytes = d.bytes()?; + if bytes.len() > 16 { + return Err(minicbor::decode::Error::message( + "Bignum too large for u128 (max 16 bytes)", + )); + } + + // Pad with leading zeros to make 16 bytes (big-endian) + let mut arr = [0u8; 16]; + arr[16 - bytes.len()..].copy_from_slice(bytes); + Ok(u128::from_be_bytes(arr)) + } +} + +#[cfg(test)] +mod tests { + use super::u128_cbor_codec; + use minicbor::{Decode, Encode}; + + #[derive(Debug, PartialEq, Encode, Decode)] + struct TestStruct { + #[cbor(n(0), with = "u128_cbor_codec")] + value: u128, + } + + #[test] + fn test_u128_zero() { + let original = TestStruct { value: 0 }; + let encoded = minicbor::to_vec(&original).unwrap(); + let decoded: TestStruct = minicbor::decode(&encoded).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_u128_max() { + let original = TestStruct { value: u128::MAX }; + let encoded = minicbor::to_vec(&original).unwrap(); + let decoded: TestStruct = minicbor::decode(&encoded).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_u128_boundary_values() { + let test_values = [ + 0u128, + 1, + 127, // Max 1-byte value + u64::MAX as u128, // 18446744073709551615 + (u64::MAX as u128) + 1, // First value needing >64 bits + u128::MAX - 1, // Near max + u128::MAX, // Maximum u128 value + ]; + + for &val in &test_values { + let original = TestStruct { value: val }; + let encoded = minicbor::to_vec(&original).unwrap(); + let decoded: TestStruct = minicbor::decode(&encoded).unwrap(); + assert_eq!(original, decoded, "Failed for value {}", val); + } + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index 57889a9e..daa6dda7 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -2,6 +2,7 @@ pub mod address; pub mod calculations; +pub mod cbor; pub mod cip19; pub mod commands; pub mod crypto; diff --git a/common/src/messages.rs b/common/src/messages.rs index 0322e722..674187dc 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -26,6 +26,7 @@ use crate::queries::{ transactions::{TransactionsStateQuery, TransactionsStateQueryResponse}, }; +use crate::cbor::u128_cbor_codec; use crate::types::*; use crate::validation::ValidationStatus; @@ -141,47 +142,68 @@ pub struct BlockTxsMessage { } /// Epoch activity - sent at end of epoch -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, + Clone, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, + PartialEq, +)] pub struct EpochActivityMessage { /// Epoch which has ended + #[n(0)] pub epoch: u64, /// Epoch start time /// UNIX timestamp + #[n(1)] pub epoch_start_time: u64, /// Epoch end time /// UNIX timestamp + #[n(2)] pub epoch_end_time: u64, /// When first block of this epoch was created + #[n(3)] pub first_block_time: u64, /// Block height of first block of this epoch + #[n(4)] pub first_block_height: u64, /// When last block of this epoch was created + #[n(5)] pub last_block_time: u64, /// Block height of last block of this epoch + #[n(6)] pub last_block_height: u64, /// Total blocks in this epoch + #[n(7)] pub total_blocks: usize, /// Total txs in this epoch + #[n(8)] pub total_txs: u64, /// Total outputs of all txs in this epoch + #[cbor(n(9), with = "u128_cbor_codec")] pub total_outputs: u128, /// Total fees in this epoch + #[n(10)] pub total_fees: u64, /// Map of SPO IDs to blocks produced + #[n(11)] pub spo_blocks: Vec<(PoolId, usize)>, /// Nonce + #[n(12)] pub nonce: Option, } diff --git a/common/src/protocol_params.rs b/common/src/protocol_params.rs index e713d38c..cfc07d10 100644 --- a/common/src/protocol_params.rs +++ b/common/src/protocol_params.rs @@ -254,21 +254,46 @@ impl ProtocolVersion { } #[derive( - Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize, + Default, + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, )] #[serde(rename_all = "PascalCase")] pub enum NonceVariant { + #[n(0)] #[default] NeutralNonce, + #[n(1)] Nonce, } pub type NonceHash = [u8; 32]; -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, +)] #[serde(rename_all = "camelCase")] pub struct Nonce { + #[n(0)] pub tag: NonceVariant, + #[n(1)] pub hash: Option, } diff --git a/common/src/queries/epochs.rs b/common/src/queries/epochs.rs index c3348049..686aca71 100644 --- a/common/src/queries/epochs.rs +++ b/common/src/queries/epochs.rs @@ -4,12 +4,20 @@ use crate::{messages::EpochActivityMessage, protocol_params::ProtocolParams, Poo pub const DEFAULT_EPOCHS_QUERY_TOPIC: (&str, &str) = ("epochs-state-query-topic", "cardano.query.epochs"); +pub const DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC: (&str, &str) = ( + "historical-epochs-state-query-topic", + "cardano.query.historical.epochs", +); + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum EpochsStateQuery { GetLatestEpoch, + + // Served from historical epochs state GetEpochInfo { epoch_number: u64 }, GetNextEpochs { epoch_number: u64 }, GetPreviousEpochs { epoch_number: u64 }, + GetEpochStakeDistribution { epoch_number: u64 }, GetEpochStakeDistributionByPool { epoch_number: u64 }, GetLatestEpochBlocksMintedByPool { spo_id: PoolId }, diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index e4a6b7dd..98e1bcb4 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -51,7 +51,7 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards"; const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters"; const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas"; -const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db"); +const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./fjall-spdd"); const DEFAULT_SPDD_RETENTION_EPOCHS: (&str, u64) = ("spdd-retention-epochs", 0); /// Accounts State module @@ -403,24 +403,28 @@ impl AccountsState { let parameters_topic = config .get_string("protocol-parameters-topic") .unwrap_or(DEFAULT_PROTOCOL_PARAMETERS_TOPIC.to_string()); + info!("Creating protocol parameters subscriber on '{parameters_topic}'"); // Publishing topics let drep_distribution_topic = config .get_string("publish-drep-distribution-topic") .unwrap_or(DEFAULT_DREP_DISTRIBUTION_TOPIC.to_string()); + info!("Creating DRep distribution publisher on '{drep_distribution_topic}'"); let spo_distribution_topic = config .get_string("publish-spo-distribution-topic") .unwrap_or(DEFAULT_SPO_DISTRIBUTION_TOPIC.to_string()); + info!("Creating SPO distribution publisher on '{spo_distribution_topic}'"); let spo_rewards_topic = config .get_string("publish-spo-rewards-topic") .unwrap_or(DEFAULT_SPO_REWARDS_TOPIC.to_string()); + info!("Creating SPO rewards publisher on '{spo_rewards_topic}'"); let stake_reward_deltas_topic = config .get_string("publish-stake-reward-deltas-topic") .unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string()); - info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'"); + info!("Creating stake reward deltas publisher on '{stake_reward_deltas_topic}'"); let spdd_db_path = config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string()); diff --git a/modules/address_state/.gitignore b/modules/address_state/.gitignore index 9f4c740d..6aed6c1a 100644 --- a/modules/address_state/.gitignore +++ b/modules/address_state/.gitignore @@ -1 +1,2 @@ -db/ \ No newline at end of file +# fjall immutable db +fjall-*/ diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index 8445756b..57a82774 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -32,7 +32,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); // Configuration defaults -const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./db"); +const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./fjall-addresses"); const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false); const DEFAULT_STORE_TOTALS: (&str, bool) = ("store-totals", false); diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 9c4f930b..350f4e13 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{collections::HashSet, path::Path, sync::Arc}; use acropolis_common::{ Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, TxTotals, @@ -55,13 +51,8 @@ pub struct State { impl State { pub async fn new(config: &AddressStorageConfig) -> Result { - let db_path = if Path::new(&config.db_path).is_relative() { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) - } else { - PathBuf::from(&config.db_path) - }; - - let store = Arc::new(ImmutableAddressStore::new(&db_path, config.clear_on_start)?); + let db_path = Path::new(&config.db_path); + let store = Arc::new(ImmutableAddressStore::new(db_path, config.clear_on_start)?); let mut config = config.clone(); config.skip_until = store.get_last_epoch_stored().await?; diff --git a/modules/epochs_state/Cargo.toml b/modules/epochs_state/Cargo.toml index cbba7d29..d6340659 100644 --- a/modules/epochs_state/Cargo.toml +++ b/modules/epochs_state/Cargo.toml @@ -15,7 +15,6 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } -dashmap = { workspace = true } hex = { workspace = true } imbl = { workspace = true } pallas = { workspace = true } diff --git a/modules/epochs_state/src/epochs_history.rs b/modules/epochs_state/src/epochs_history.rs deleted file mode 100644 index dc37b56a..00000000 --- a/modules/epochs_state/src/epochs_history.rs +++ /dev/null @@ -1,200 +0,0 @@ -use acropolis_common::messages::EpochActivityMessage; -use acropolis_common::BlockInfo; -use anyhow::Result; -use dashmap::DashMap; -use std::sync::Arc; - -use crate::store_config::StoreConfig; - -#[derive(Debug, Clone)] -pub struct EpochsHistoryState { - epochs_history: Option>>, -} - -impl EpochsHistoryState { - pub fn new(store_config: &StoreConfig) -> Self { - Self { - epochs_history: if store_config.store_history { - Some(Arc::new(DashMap::new())) - } else { - None - }, - } - } - - /// Get Epoch Activity Message for certain pool operator at certain epoch - pub fn get_historical_epoch(&self, epoch: u64) -> Result> { - if let Some(epochs_history) = self.epochs_history.as_ref() { - Ok(epochs_history.get(&epoch).map(|e| e.clone())) - } else { - Err(anyhow::anyhow!("Historical epoch storage is disabled")) - } - } - - /// Get Epoch Activity Messages for epochs following a specific epoch. (exclusive) - pub fn get_next_epochs(&self, epoch: u64) -> Result> { - if let Some(epochs_history) = self.epochs_history.as_ref() { - let mut epochs: Vec = epochs_history - .iter() - .filter(|entry| *entry.key() > epoch) - .map(|e| e.value().clone()) - .collect(); - epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); - Ok(epochs) - } else { - Err(anyhow::anyhow!("Historical epoch storage is disabled")) - } - } - - /// Get Epoch Activity Messages for epochs following a specific epoch. (exclusive) - pub fn get_previous_epochs(&self, epoch: u64) -> Result> { - if let Some(epochs_history) = self.epochs_history.as_ref() { - let mut epochs: Vec = epochs_history - .iter() - .filter(|entry| *entry.key() < epoch) - .map(|e| e.value().clone()) - .collect(); - epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); - Ok(epochs) - } else { - Err(anyhow::anyhow!("Historical epoch storage is disabled")) - } - } - - /// Handle Epoch Activity - pub fn handle_epoch_activity( - &self, - _block_info: &BlockInfo, - epoch_activity_message: &EpochActivityMessage, - ) { - let Some(epochs_history) = self.epochs_history.as_ref() else { - return; - }; - let EpochActivityMessage { epoch, .. } = epoch_activity_message; - epochs_history.insert(*epoch, epoch_activity_message.clone()); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use acropolis_common::{BlockHash, BlockStatus, Era}; - - fn make_block(epoch: u64) -> BlockInfo { - BlockInfo { - status: BlockStatus::Immutable, - slot: 99, - number: 42, - hash: BlockHash::default(), - epoch, - epoch_slot: 99, - new_epoch: false, - timestamp: 99999, - era: Era::Conway, - } - } - - #[test] - fn epochs_history_is_none_when_store_history_is_false() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(false)); - assert!(epochs_history.epochs_history.is_none()); - } - - #[test] - fn epochs_history_is_some_when_store_history_is_true() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); - assert!(epochs_history.epochs_history.is_some()); - } - - #[test] - fn handle_epoch_activity_saves_history() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); - let block = make_block(200); - epochs_history.handle_epoch_activity( - &block, - &EpochActivityMessage { - epoch: 199, - epoch_start_time: 0, - epoch_end_time: 0, - first_block_time: 0, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, - total_blocks: 1, - total_txs: 1, - total_outputs: 100, - total_fees: 50, - spo_blocks: vec![], - nonce: None, - }, - ); - - // Use the public API method - let history = epochs_history - .get_historical_epoch(199) - .expect("history disabled in test") - .expect("epoch history missing"); - assert_eq!(history.total_blocks, 1); - assert_eq!(history.total_fees, 50); - } - - #[test] - fn get_next_previous_epochs_sorts_epochs() { - let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true)); - let block = make_block(200); - epochs_history.handle_epoch_activity( - &block, - &EpochActivityMessage { - epoch: 199, - epoch_start_time: 0, - epoch_end_time: 0, - first_block_time: 0, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, - total_blocks: 1, - total_txs: 1, - total_outputs: 100, - total_fees: 50, - spo_blocks: vec![], - nonce: None, - }, - ); - - let block = make_block(201); - epochs_history.handle_epoch_activity( - &block, - &EpochActivityMessage { - epoch: 200, - epoch_start_time: 0, - epoch_end_time: 0, - first_block_time: 0, - first_block_height: 0, - last_block_time: 0, - last_block_height: 0, - total_blocks: 1, - total_txs: 1, - total_outputs: 100, - total_fees: 50, - spo_blocks: vec![], - nonce: None, - }, - ); - - let next_epochs = epochs_history.get_next_epochs(199).expect("history disabled in test"); - assert_eq!(next_epochs.len(), 1); - assert_eq!(next_epochs[0].epoch, 200); - - let previous_epochs = - epochs_history.get_previous_epochs(201).expect("history disabled in test"); - assert_eq!(previous_epochs.len(), 2); - assert_eq!(previous_epochs[0].epoch, 199); - - let next_epochs = epochs_history.get_next_epochs(200).expect("history disabled in test"); - assert_eq!(next_epochs.len(), 0); - - let previous_epochs = - epochs_history.get_previous_epochs(199).expect("history disabled in test"); - assert_eq!(previous_epochs.len(), 0); - } -} diff --git a/modules/epochs_state/src/epochs_state.rs b/modules/epochs_state/src/epochs_state.rs index e0f3ce4e..128c058c 100644 --- a/modules/epochs_state/src/epochs_state.rs +++ b/modules/epochs_state/src/epochs_state.rs @@ -1,15 +1,14 @@ //! Acropolis epochs state module for Caryatid //! Unpacks block bodies to get transaction fees -use acropolis_common::queries::errors::QueryError; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::epochs::{ - EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, NextEpochs, - PreviousEpochs, DEFAULT_EPOCHS_QUERY_TOPIC, + EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, DEFAULT_EPOCHS_QUERY_TOPIC, }, + queries::errors::QueryError, state_history::{StateHistory, StateHistoryStore}, - BlockInfo, BlockStatus, Era, + BlockInfo, BlockStatus, }; use anyhow::Result; use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; @@ -19,13 +18,8 @@ use std::sync::Arc; use tokio::sync::Mutex; use tracing::{error, info, info_span}; mod epoch_activity_publisher; -mod epochs_history; mod state; -mod store_config; -use crate::{ - epoch_activity_publisher::EpochActivityPublisher, epochs_history::EpochsHistoryState, - store_config::StoreConfig, -}; +use crate::epoch_activity_publisher::EpochActivityPublisher; use state::State; const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = ( @@ -56,7 +50,6 @@ impl EpochsState { /// Run loop async fn run( history: Arc>>, - epochs_history: EpochsHistoryState, mut bootstrapped_subscription: Box>, mut blocks_subscription: Box>, mut block_txs_subscription: Box>, @@ -103,23 +96,22 @@ impl EpochsState { { state.handle_protocol_parameters(params); } + + let ea = state.end_epoch(block_info); + // publish epoch activity message + epoch_activity_publisher.publish(block_info, ea).await.unwrap_or_else( + |e| error!("Failed to publish epoch activity messages: {e}"), + ); } - // decode header - // Derive the variant from the era - just enough to make - // MultiEraHeader::decode() work. - let variant = match block_info.era { - Era::Byron => 0, - Era::Shelley => 1, - Era::Allegra => 2, - Era::Mary => 3, - Era::Alonzo => 4, - _ => 5, - }; let span = info_span!("epochs_state.decode_header", block = block_info.number); let mut header = None; span.in_scope(|| { - header = match MultiEraHeader::decode(variant, None, &block_msg.header) { + header = match MultiEraHeader::decode( + block_info.era as u8, + None, + &block_msg.header, + ) { Ok(header) => Some(header), Err(e) => { error!("Can't decode header {}: {e}", block_info.slot); @@ -137,16 +129,6 @@ impl EpochsState { } }); - if is_new_epoch { - let ea = state.end_epoch(block_info); - // update epochs history - epochs_history.handle_epoch_activity(block_info, &ea); - // publish epoch activity message - epoch_activity_publisher.publish(block_info, ea).await.unwrap_or_else( - |e| error!("Failed to publish epoch activity messages: {e}"), - ); - } - let span = info_span!("epochs_state.handle_mint", block = block_info.number); span.in_scope(|| { if let Some(header) = header.as_ref() { @@ -215,9 +197,6 @@ impl EpochsState { .unwrap_or(DEFAULT_EPOCHS_QUERY_TOPIC.1.to_string()); info!("Creating query handler on '{}'", epochs_query_topic); - // store config - let store_config = StoreConfig::from(config.clone()); - // state history let history = Arc::new(Mutex::new(StateHistory::::new( "epochs_state", @@ -225,10 +204,6 @@ impl EpochsState { ))); let history_query = history.clone(); - // epochs history - let epochs_history = EpochsHistoryState::new(&store_config); - let epochs_history_query = epochs_history.clone(); - // Subscribe let bootstrapped_subscription = context.subscribe(&bootstrapped_subscribe_topic).await?; let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; @@ -243,7 +218,6 @@ impl EpochsState { // handle epochs query context.handle(&epochs_query_topic, move |message| { let history = history_query.clone(); - let epochs_history = epochs_history_query.clone(); async move { let Message::StateQuery(StateQuery::Epochs(query)) = message.as_ref() else { @@ -262,64 +236,6 @@ impl EpochsState { }) } - EpochsStateQuery::GetEpochInfo { epoch_number } => { - match epochs_history.get_historical_epoch(*epoch_number) { - Ok(Some(epoch_info)) => { - EpochsStateQueryResponse::EpochInfo(EpochInfo { epoch: epoch_info }) - } - Ok(None) => EpochsStateQueryResponse::Error(QueryError::not_found( - format!("Epoch {}", epoch_number), - )), - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), - } - } - - EpochsStateQuery::GetNextEpochs { epoch_number } => { - let current_epoch = state.get_epoch_info(); - if *epoch_number > current_epoch.epoch { - EpochsStateQueryResponse::Error(QueryError::not_found(format!( - "Epoch {} is in the future", - epoch_number - ))) - } else { - match epochs_history.get_next_epochs(*epoch_number) { - Ok(mut epochs) => { - // check the current epoch also - if current_epoch.epoch > *epoch_number { - epochs.push(current_epoch); - } - EpochsStateQueryResponse::NextEpochs(NextEpochs { epochs }) - } - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), - } - } - } - - EpochsStateQuery::GetPreviousEpochs { epoch_number } => { - let current_epoch = state.get_epoch_info(); - if *epoch_number > current_epoch.epoch { - EpochsStateQueryResponse::Error(QueryError::not_found(format!( - "Epoch {} is in the future", - epoch_number - ))) - } else { - match epochs_history.get_previous_epochs(*epoch_number) { - Ok(epochs) => { - EpochsStateQueryResponse::PreviousEpochs(PreviousEpochs { - epochs, - }) - } - Err(_) => EpochsStateQueryResponse::Error( - QueryError::storage_disabled("historical epoch"), - ), - } - } - } - EpochsStateQuery::GetLatestEpochBlocksMintedByPool { spo_id } => { EpochsStateQueryResponse::LatestEpochBlocksMintedByPool( state.get_latest_epoch_blocks_minted_by_pool(spo_id), @@ -340,7 +256,6 @@ impl EpochsState { context.run(async move { Self::run( history, - epochs_history, bootstrapped_subscription, blocks_subscription, block_txs_subscription, diff --git a/modules/epochs_state/src/store_config.rs b/modules/epochs_state/src/store_config.rs deleted file mode 100644 index d0ef3f33..00000000 --- a/modules/epochs_state/src/store_config.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::sync::Arc; - -use config::Config; - -const DEFAULT_STORE_HISTORY: (&str, bool) = ("store-history", false); - -#[derive(Default, Debug, Clone)] -pub struct StoreConfig { - pub store_history: bool, -} - -impl StoreConfig { - #[allow(dead_code)] - pub fn new(store_history: bool) -> Self { - Self { store_history } - } -} - -impl From> for StoreConfig { - fn from(config: Arc) -> Self { - Self { - store_history: config - .get_bool(DEFAULT_STORE_HISTORY.0) - .unwrap_or(DEFAULT_STORE_HISTORY.1), - } - } -} diff --git a/modules/historical_accounts_state/.gitignore b/modules/historical_accounts_state/.gitignore index 0078d059..6aed6c1a 100644 --- a/modules/historical_accounts_state/.gitignore +++ b/modules/historical_accounts_state/.gitignore @@ -1 +1,2 @@ -/db \ No newline at end of file +# fjall immutable db +fjall-*/ diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 68f950e1..16b25d45 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -35,7 +35,8 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = ("parameters-subscribe-topic", "cardano.protocol.parameters"); // Configuration defaults -const DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH: (&str, &str) = ("db-path", "./db"); +const DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH: (&str, &str) = ("db-path", "./fjall-accounts"); +const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); const DEFAULT_STORE_REWARDS_HISTORY: (&str, bool) = ("store-rewards-history", false); const DEFAULT_STORE_ACTIVE_STAKE_HISTORY: (&str, bool) = ("store-active-stake-history", false); const DEFAULT_STORE_REGISTRATION_HISTORY: (&str, bool) = ("store-registration-history", false); @@ -270,6 +271,9 @@ impl HistoricalAccountsState { db_path: config .get_string(DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH.0) .unwrap_or(DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH.1.to_string()), + clear_on_start: config + .get_bool(DEFAULT_CLEAR_ON_START.0) + .unwrap_or(DEFAULT_CLEAR_ON_START.1), store_rewards_history: config .get_bool(DEFAULT_STORE_REWARDS_HISTORY.0) .unwrap_or(DEFAULT_STORE_REWARDS_HISTORY.1), diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 6c16e177..0453d0f9 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -31,7 +31,12 @@ pub struct ImmutableHistoricalAccountStore { } impl ImmutableHistoricalAccountStore { - pub fn new(path: impl AsRef) -> Result { + pub fn new(path: impl AsRef, clear_on_start: bool) -> Result { + let path = path.as_ref(); + if clear_on_start && path.exists() { + std::fs::remove_dir_all(path)?; + } + let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true); let keyspace = Keyspace::open(cfg)?; diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index 254e5f89..3ae834b4 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{collections::HashSet, path::Path, sync::Arc}; use acropolis_common::{ messages::{ @@ -49,6 +45,7 @@ pub struct ActiveStakeHistory { #[derive(Debug, Clone)] pub struct HistoricalAccountsConfig { pub db_path: String, + pub clear_on_start: bool, pub store_rewards_history: bool, pub store_active_stake_history: bool, @@ -83,13 +80,11 @@ pub struct State { impl State { pub async fn new(config: HistoricalAccountsConfig) -> Result { - let db_path = if Path::new(&config.db_path).is_relative() { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) - } else { - PathBuf::from(&config.db_path) - }; - - let store = Arc::new(ImmutableHistoricalAccountStore::new(&db_path)?); + let db_path = Path::new(&config.db_path); + let store = Arc::new(ImmutableHistoricalAccountStore::new( + db_path, + config.clear_on_start, + )?); Ok(Self { config, diff --git a/modules/historical_epochs_state/.gitignore b/modules/historical_epochs_state/.gitignore new file mode 100644 index 00000000..6aed6c1a --- /dev/null +++ b/modules/historical_epochs_state/.gitignore @@ -0,0 +1,2 @@ +# fjall immutable db +fjall-*/ diff --git a/modules/historical_epochs_state/Cargo.toml b/modules/historical_epochs_state/Cargo.toml new file mode 100644 index 00000000..d3d7dc57 --- /dev/null +++ b/modules/historical_epochs_state/Cargo.toml @@ -0,0 +1,27 @@ +# Acropolis historical epochs state module + +[package] +name = "acropolis_module_historical_epochs_state" +version = "0.1.0" +edition = "2021" +authors = ["Golddy "] +description = "Historical stake epochs tracker" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +minicbor = { version = "0.26.0", features = ["std", "derive"] } +tokio = { workspace = true } +tracing = { workspace = true } +fjall = "2.11.2" + +[dev-dependencies] +tempfile = "3" + +[lib] +path = "src/historical_epochs_state.rs" diff --git a/modules/historical_epochs_state/src/historical_epochs_state.rs b/modules/historical_epochs_state/src/historical_epochs_state.rs new file mode 100644 index 00000000..5aa182f5 --- /dev/null +++ b/modules/historical_epochs_state/src/historical_epochs_state.rs @@ -0,0 +1,295 @@ +//! Acropolis historical epochs state module for Caryatid +//! Manages optional state data needed for Blockfrost alignment + +use crate::immutable_historical_epochs_state::ImmutableHistoricalEpochsState; +use crate::state::{HistoricalEpochsStateConfig, State}; +use acropolis_common::messages::StateQuery; +use acropolis_common::queries::epochs::{ + EpochInfo, EpochsStateQuery, NextEpochs, PreviousEpochs, DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC, +}; +use acropolis_common::{ + messages::{CardanoMessage, Message, StateQueryResponse}, + queries::epochs::EpochsStateQueryResponse, + queries::errors::QueryError, + BlockInfo, BlockStatus, +}; +use anyhow::Result; +use caryatid_sdk::{message_bus::Subscription, module, Context, Module}; +use config::Config; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; +use tracing::{error, info, info_span, warn, Instrument}; +mod immutable_historical_epochs_state; +mod state; +mod volatile_historical_epochs_state; + +// Configuration defaults +const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC: (&str, &str) = + ("blocks-subscribe-topic", "cardano.block.proposed"); +const DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC: (&str, &str) = + ("epoch-activity-subscribe-topic", "cardano.epoch.activity"); +const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = + ("parameters-subscribe-topic", "cardano.protocol.parameters"); + +const DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH: (&str, &str) = ("db-path", "./fjall-epochs"); +const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); + +/// Historical Epochs State module +#[module( + message_type(Message), + name = "historical-epochs-state", + description = "Historical epochs state for Blockfrost compatibility" +)] +pub struct HistoricalEpochsState; + +impl HistoricalEpochsState { + /// Async run loop + async fn run( + state_mutex: Arc>, + mut blocks_subscription: Box>, + mut epoch_activity_subscription: Box>, + mut params_subscription: Box>, + ) -> Result<()> { + let _ = params_subscription.read().await?; + info!("Consumed initial genesis params from params_subscription"); + + // Background task to persist epoch sequentially + const MAX_PENDING_PERSISTS: usize = 1; + let (persist_tx, mut persist_rx) = + mpsc::channel::<(u64, Arc)>(MAX_PENDING_PERSISTS); + tokio::spawn(async move { + while let Some((epoch, store)) = persist_rx.recv().await { + if let Err(e) = store.persist_epoch(epoch).await { + error!("failed to persist epoch {epoch}: {e}"); + } + } + }); + + // Main loop of synchronised messages + loop { + let mut current_block: Option = None; + + // Use certs_message as the synchroniser + let (_, blocks_message) = blocks_subscription.read().await?; + let new_epoch = match blocks_message.as_ref() { + Message::Cardano((block_info, _)) => { + // Handle rollbacks on this topic only + let mut state = state_mutex.lock().await; + if block_info.status == BlockStatus::RolledBack { + state.volatile.rollback_before(block_info.number); + } + + current_block = Some(block_info.clone()); + block_info.new_epoch && block_info.epoch > 0 + } + _ => false, + }; + + // Read from epoch-boundary messages only when it's a new epoch + if new_epoch { + let params_message_f = params_subscription.read(); + let epoch_activity_message_f = epoch_activity_subscription.read(); + + let (_, params_msg) = params_message_f.await?; + match params_msg.as_ref() { + Message::Cardano((block_info, CardanoMessage::ProtocolParams(params))) => { + let span = info_span!( + "historical_epochs_state.handle_params", + epoch = block_info.epoch + ); + async { + Self::check_sync(¤t_block, block_info); + let mut state = state_mutex.lock().await; + if let Some(shelley) = ¶ms.params.shelley { + state.volatile.update_k(shelley.security_param); + } + } + .instrument(span) + .await; + } + _ => error!("Unexpected message type: {params_msg:?}"), + } + + let (_, epoch_activity_msg) = epoch_activity_message_f.await?; + match epoch_activity_msg.as_ref() { + Message::Cardano((block_info, CardanoMessage::EpochActivity(ea))) => { + let span = info_span!( + "historical_epochs_state.handle_epoch_activity", + epoch = block_info.epoch + ); + async { + Self::check_sync(¤t_block, block_info); + let mut state = state_mutex.lock().await; + state.volatile.handle_new_epoch(block_info, ea); + } + .instrument(span) + .await; + } + _ => error!("Unexpected message type: {epoch_activity_msg:?}"), + } + } + + // Prune volatile and persist if needed + if let Some(current_block) = current_block { + let should_prune = { + let state = state_mutex.lock().await; + state.ready_to_prune(¤t_block) + }; + + if should_prune { + let immutable = { + let mut state = state_mutex.lock().await; + state.prune_volatile().await; + state.immutable.clone() + }; + + if let Err(e) = persist_tx.send((current_block.epoch, immutable)).await { + error!("persistence worker crashed: {e}"); + } + } + } + } + } + + /// Check for synchronisation + fn check_sync(expected: &Option, actual: &BlockInfo) { + if let Some(ref block) = expected { + if block.number != actual.number { + error!( + expected = block.number, + actual = actual.number, + "Messages out of sync" + ); + } + } + } + + /// Async initialisation + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + // Get configuration + + // Subscription topics + let blocks_subscribe_topic = config + .get_string(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_BLOCKS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating blocks subscriber on '{blocks_subscribe_topic}'"); + + let epoch_activity_subscribe_topic = config + .get_string(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_EPOCH_ACTIVITY_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating epoch activity subscriber on '{epoch_activity_subscribe_topic}'"); + + let params_subscribe_topic = config + .get_string(DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC.0) + .unwrap_or(DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC.1.to_string()); + info!("Creating parameters subscriber on '{params_subscribe_topic}'"); + + // Query topic + let historical_epochs_query_topic = config + .get_string(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.1.to_string()); + info!("Creating query handler on '{historical_epochs_query_topic}'"); + + // Configuration + let config = HistoricalEpochsStateConfig { + db_path: config + .get_string(DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH.0) + .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_STATE_DB_PATH.1.to_string()), + clear_on_start: config + .get_bool(DEFAULT_CLEAR_ON_START.0) + .unwrap_or(DEFAULT_CLEAR_ON_START.1), + }; + + // Initalize state + let state = State::new(&config)?; + let state_mutex = Arc::new(Mutex::new(state)); + let state_query = state_mutex.clone(); + + context.handle(&historical_epochs_query_topic, move |message| { + let state = state_query.clone(); + async move { + let Message::StateQuery(StateQuery::Epochs(query)) = message.as_ref() else { + return Arc::new(Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(QueryError::internal_error( + "Invalid message for epochs-state", + )), + ))); + }; + + let response = match query { + EpochsStateQuery::GetEpochInfo { epoch_number } => { + match state.lock().await.get_historical_epoch(*epoch_number) { + Ok(Some(epoch)) => { + EpochsStateQueryResponse::EpochInfo(EpochInfo { epoch }) + } + Ok(None) => EpochsStateQueryResponse::Error(QueryError::not_found( + format!("Epoch {}", epoch_number), + )), + Err(e) => { + warn!("failed to get epoch info: {e}"); + EpochsStateQueryResponse::Error(QueryError::internal_error( + "historical epoch info", + )) + } + } + } + + EpochsStateQuery::GetNextEpochs { epoch_number } => { + match state.lock().await.get_next_epochs(*epoch_number) { + Ok(epochs) => { + EpochsStateQueryResponse::NextEpochs(NextEpochs { epochs }) + } + Err(e) => { + warn!("failed to get next epochs: {e}"); + EpochsStateQueryResponse::Error(QueryError::internal_error( + "historical next epochs", + )) + } + } + } + + EpochsStateQuery::GetPreviousEpochs { epoch_number } => { + match state.lock().await.get_previous_epochs(*epoch_number) { + Ok(epochs) => { + EpochsStateQueryResponse::PreviousEpochs(PreviousEpochs { epochs }) + } + Err(e) => { + warn!("failed to get previous epochs: {e}"); + EpochsStateQueryResponse::Error(QueryError::internal_error( + "historical previous epochs", + )) + } + } + } + + _ => EpochsStateQueryResponse::Error(QueryError::not_implemented(format!( + "Unimplemented query variant: {query:?}" + ))), + }; + Arc::new(Message::StateQueryResponse(StateQueryResponse::Epochs( + response, + ))) + } + }); + + // Subscribe + let blocks_subscription = context.subscribe(&blocks_subscribe_topic).await?; + let epoch_activity_subscription = + context.subscribe(&epoch_activity_subscribe_topic).await?; + let params_subscription = context.subscribe(¶ms_subscribe_topic).await?; + + // Start run task + context.run(async move { + Self::run( + state_mutex, + blocks_subscription, + epoch_activity_subscription, + params_subscription, + ) + .await + .unwrap_or_else(|e| error!("Failed: {e}")); + }); + + Ok(()) + } +} diff --git a/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs new file mode 100644 index 00000000..0a70001d --- /dev/null +++ b/modules/historical_epochs_state/src/immutable_historical_epochs_state.rs @@ -0,0 +1,117 @@ +use std::{collections::VecDeque, path::Path}; + +use acropolis_common::messages::EpochActivityMessage; +use anyhow::Result; +use fjall::{Keyspace, Partition, PartitionCreateOptions, PersistMode}; +use minicbor::{decode, to_vec}; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +pub struct ImmutableHistoricalEpochsState { + epochs_history: Partition, + keyspace: Keyspace, + pub pending: Mutex>, + pub max_pending: usize, +} + +impl ImmutableHistoricalEpochsState { + pub fn new(path: impl AsRef, clear_on_start: bool) -> Result { + let path = path.as_ref(); + if clear_on_start && path.exists() { + std::fs::remove_dir_all(path)?; + } + + let cfg = fjall::Config::new(path) + // 4MB write buffer since EpochActivityMessage is not that big + .max_write_buffer_size(4 * 1024 * 1024) + // Enable manual control of flushing + // We store EpochActivityMessage only every 5 days (need manual Flush) + .manual_journal_persist(true); + let keyspace = Keyspace::open(cfg)?; + + let epochs_history = + keyspace.open_partition("epochs_history", PartitionCreateOptions::default())?; + + Ok(Self { + epochs_history, + keyspace, + pending: Mutex::new(VecDeque::new()), + max_pending: 5, + }) + } + + pub async fn update_immutable(&self, ea: EpochActivityMessage) { + let mut pending = self.pending.lock().await; + if pending.len() >= self.max_pending { + warn!("historical epochs state pending buffer full, dropping oldest"); + pending.pop_front(); + } + pending.push_back(ea); + } + + /// Persists pending EpochActivityMessages for Epoch N - 1 + /// at the first block of Epoch N + /// There should be only one EpochActivityMessage for each epoch + /// Returns the number of persisted EpochActivityMessages + /// Errors if the batch commit or persist fails + pub async fn persist_epoch(&self, epoch: u64) -> Result { + let saving_epoch = epoch - 1; + let drained_epochs = { + let mut pending = self.pending.lock().await; + std::mem::take(&mut *pending) + }; + + let mut batch = self.keyspace.batch(); + let mut persisted_epochs: u32 = 0; + + for ea in drained_epochs { + let epoch_key = Self::make_epoch_key(ea.epoch); + batch.insert(&self.epochs_history, epoch_key, to_vec(&ea)?); + persisted_epochs += 1; + } + + if let Err(e) = batch.commit() { + error!("batch commit failed for epoch {saving_epoch}: {e}"); + return Err(e.into()); + } + + if let Err(e) = self.keyspace.persist(PersistMode::Buffer) { + error!("persist failed for epoch {saving_epoch}: {e}"); + return Err(e.into()); + } + + info!("persisted {persisted_epochs} epochs for epoch {saving_epoch}"); + Ok(persisted_epochs) + } + + pub fn get_historical_epoch(&self, epoch: u64) -> Result> { + let epoch_key = Self::make_epoch_key(epoch); + let slice = self.epochs_history.get(epoch_key)?; + if let Some(slice) = slice.as_ref() { + let decoded: EpochActivityMessage = decode(slice)?; + Ok(Some(decoded)) + } else { + Ok(None) + } + } + + pub fn get_epochs( + &self, + range: std::ops::RangeInclusive, + ) -> Result> { + let mut epochs = Vec::new(); + let start_key = Self::make_epoch_key(*range.start()); + let end_key = Self::make_epoch_key(*range.end()); + + for result in self.epochs_history.range(start_key..=end_key) { + let (_, slice) = result?; + let decoded: EpochActivityMessage = decode(&slice)?; + epochs.push(decoded); + } + Ok(epochs) + } + + fn make_epoch_key(epoch: u64) -> [u8; 8] { + epoch.to_be_bytes() + } +} diff --git a/modules/historical_epochs_state/src/state.rs b/modules/historical_epochs_state/src/state.rs new file mode 100644 index 00000000..7af155d7 --- /dev/null +++ b/modules/historical_epochs_state/src/state.rs @@ -0,0 +1,208 @@ +use crate::{ + immutable_historical_epochs_state::ImmutableHistoricalEpochsState, + volatile_historical_epochs_state::VolatileHistoricalEpochsState, +}; +use acropolis_common::{messages::EpochActivityMessage, BlockInfo}; +use anyhow::Result; +use std::{path::Path, sync::Arc}; + +#[derive(Debug, Clone)] +pub struct HistoricalEpochsStateConfig { + pub db_path: String, + pub clear_on_start: bool, +} + +/// Overall state - stored per epoch +#[derive(Clone)] +pub struct State { + pub immutable: Arc, + pub volatile: VolatileHistoricalEpochsState, +} + +impl State { + pub fn new(config: &HistoricalEpochsStateConfig) -> Result { + let db_path = Path::new(&config.db_path); + let immutable = Arc::new(ImmutableHistoricalEpochsState::new( + db_path, + config.clear_on_start, + )?); + + Ok(Self { + volatile: VolatileHistoricalEpochsState::new(), + immutable, + }) + } + + pub async fn prune_volatile(&mut self) { + let drained = self.volatile.prune_volatile(); + if let Some(ea) = drained { + self.immutable.update_immutable(ea).await; + } + } + + /// block_info is the first block of Epoch N (epoch param) + /// And at that point, we are saving EpochActivityMessage for Epoch N - 1 + /// That is why we check Some(block_info.epoch - 1) != self.volatile.last_persisted_epoch + pub fn ready_to_prune(&self, block_info: &BlockInfo) -> bool { + block_info.epoch > 0 + && Some(block_info.epoch - 1) != self.volatile.last_persisted_epoch + && block_info.number > self.volatile.block_number + self.volatile.security_param_k + } + + pub fn get_historical_epoch(&self, epoch: u64) -> Result> { + if let Some(last_persisted_epoch) = self.volatile.last_persisted_epoch { + if epoch <= last_persisted_epoch { + return self.immutable.get_historical_epoch(epoch); + } + } + + Ok(self.volatile.get_volatile_epoch(epoch)) + } + + pub fn get_next_epochs(&self, epoch: u64) -> Result> { + let mut epochs = vec![]; + let immutable_epochs_range = + self.volatile.last_persisted_epoch.and_then(|last_persisted_epoch| { + if last_persisted_epoch > epoch { + Some(epoch + 1..=last_persisted_epoch) + } else { + None + } + }); + + if let Some(immutable_epochs_range) = immutable_epochs_range { + epochs.extend(self.immutable.get_epochs(immutable_epochs_range)?); + } + + if let Some(volatile_ea) = self.volatile.volatile_ea.as_ref() { + if volatile_ea.epoch > epoch { + epochs.push(volatile_ea.clone()); + } + } + epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); + Ok(epochs) + } + + pub fn get_previous_epochs(&self, epoch: u64) -> Result> { + let mut epochs = vec![]; + let immutable_epochs_range = self + .volatile + .last_persisted_epoch + .map(|last_persisted_epoch| 0..=last_persisted_epoch.min(epoch - 1)); + + if let Some(immutable_epochs_range) = immutable_epochs_range { + epochs.extend(self.immutable.get_epochs(immutable_epochs_range)?); + } + + if let Some(volatile_ea) = self.volatile.volatile_ea.as_ref() { + if volatile_ea.epoch < epoch { + epochs.push(volatile_ea.clone()); + } + } + epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch)); + Ok(epochs) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use acropolis_common::{BlockHash, BlockStatus, Era, PoolId}; + use tempfile::TempDir; + + fn make_ea(epoch: u64) -> EpochActivityMessage { + EpochActivityMessage { + epoch, + epoch_start_time: epoch * 10, + epoch_end_time: epoch * 10 + 10, + first_block_time: epoch * 10, + first_block_height: epoch * 10, + last_block_time: epoch * 10 + 100, + last_block_height: epoch * 10 + 100, + total_blocks: 100, + total_txs: 100, + total_outputs: 100000, + total_fees: 10000, + spo_blocks: vec![(PoolId::default(), 100)], + nonce: None, + } + } + + fn make_block_info(epoch: u64, new_epoch: bool) -> BlockInfo { + BlockInfo { + status: BlockStatus::Immutable, + slot: 0, + hash: BlockHash::default(), + epoch_slot: 0, + new_epoch, + era: Era::Shelley, + number: epoch * 10 + 100, + epoch, + timestamp: epoch * 10, + } + } + + #[test] + fn test_get_historical_epoch() { + let temp_dir = TempDir::new().unwrap(); + let config = HistoricalEpochsStateConfig { + db_path: temp_dir.path().to_string_lossy().into_owned(), + clear_on_start: true, + }; + let mut state = State::new(&config).unwrap(); + + let block_info = make_block_info(1, true); + let ea = make_ea(0); + state.volatile.handle_new_epoch(&block_info, &ea); + + let historical_epoch = state.get_historical_epoch(0).unwrap().unwrap(); + assert_eq!(historical_epoch, ea); + + let next_epochs = state.get_next_epochs(0).unwrap(); + assert_eq!(next_epochs, vec![]); + + let previous_epochs = state.get_previous_epochs(1).unwrap(); + assert_eq!(previous_epochs, vec![ea.clone()]); + } + + #[tokio::test] + async fn test_persist_epochs() { + let temp_dir = TempDir::new().unwrap(); + let config = HistoricalEpochsStateConfig { + db_path: temp_dir.path().to_string_lossy().into_owned(), + clear_on_start: true, + }; + let mut state = State::new(&config).unwrap(); + + let block_info = make_block_info(1, true); + let ea_0 = make_ea(0); + state.volatile.handle_new_epoch(&block_info, &ea_0); + let mut block_info = make_block_info(1, false); + block_info.number += 1; + assert!(state.ready_to_prune(&block_info)); + + state.prune_volatile().await; + state.immutable.persist_epoch(block_info.epoch).await.unwrap(); + + let block_info = make_block_info(2, true); + let ea_1 = make_ea(1); + state.volatile.handle_new_epoch(&block_info, &ea_1); + state.volatile.update_k(20); + let mut block_info = make_block_info(2, false); + block_info.number += 1; + assert!(!state.ready_to_prune(&block_info)); + block_info.number += 20; + assert!(state.ready_to_prune(&block_info)); + + state.prune_volatile().await; + state.immutable.persist_epoch(block_info.epoch).await.unwrap(); + + let historical_epoch = state.immutable.get_historical_epoch(0).unwrap().unwrap(); + assert_eq!(historical_epoch, ea_0); + let historical_epoch = state.immutable.get_historical_epoch(1).unwrap().unwrap(); + assert_eq!(historical_epoch, ea_1); + + let epochs = state.immutable.get_epochs(0..=1).unwrap(); + assert_eq!(epochs, vec![ea_0, ea_1]); + } +} diff --git a/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs b/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs new file mode 100644 index 00000000..c0872e4e --- /dev/null +++ b/modules/historical_epochs_state/src/volatile_historical_epochs_state.rs @@ -0,0 +1,106 @@ +use acropolis_common::{messages::EpochActivityMessage, BlockInfo}; + +#[derive(Default, Debug, Clone)] +pub struct VolatileHistoricalEpochsState { + pub block_number: u64, + + pub volatile_ea: Option, + + pub last_persisted_epoch: Option, + + pub security_param_k: u64, +} + +impl VolatileHistoricalEpochsState { + pub fn new() -> Self { + Self { + block_number: 0, + volatile_ea: None, + last_persisted_epoch: None, + security_param_k: 0, + } + } + + pub fn rollback_before(&mut self, rollbacked_block: u64) -> Option { + if self.block_number >= rollbacked_block { + std::mem::take(&mut self.volatile_ea) + } else { + None + } + } + + pub fn handle_new_epoch(&mut self, block_info: &BlockInfo, ea: &EpochActivityMessage) { + self.block_number = block_info.number; + self.volatile_ea = Some(ea.clone()); + } + + pub fn prune_volatile(&mut self) -> Option { + if let Some(ea) = self.volatile_ea.as_ref() { + self.last_persisted_epoch = Some(ea.epoch); + std::mem::take(&mut self.volatile_ea) + } else { + None + } + } + + pub fn update_k(&mut self, k: u32) { + self.security_param_k = k as u64; + } + + pub fn get_volatile_epoch(&self, epoch: u64) -> Option { + if let Some(ea) = self.volatile_ea.as_ref() { + if ea.epoch == epoch { + return Some(ea.clone()); + } + } + None + } +} + +#[cfg(test)] +mod tests { + use acropolis_common::{BlockHash, BlockStatus, Era}; + + use super::*; + + #[test] + fn test_rollback_before() { + let mut state = VolatileHistoricalEpochsState::new(); + let block_info = BlockInfo { + number: 1, + epoch: 1, + status: BlockStatus::Volatile, + slot: 1, + hash: BlockHash::default(), + epoch_slot: 1, + new_epoch: false, + timestamp: 1, + era: Era::Shelley, + }; + let ea = EpochActivityMessage { + epoch: 1, + epoch_start_time: 1, + epoch_end_time: 2, + total_blocks: 100, + total_txs: 100, + total_outputs: 100, + total_fees: 100, + spo_blocks: vec![], + nonce: None, + first_block_time: 1, + first_block_height: 1, + last_block_time: 1, + last_block_height: 1, + }; + state.handle_new_epoch(&block_info, &ea); + assert!(state.get_volatile_epoch(1).unwrap().eq(&ea)); + assert_eq!(state.get_volatile_epoch(2), None); + + let rollbacked = state.rollback_before(2); + assert_eq!(rollbacked, None); + + let rollbacked = state.rollback_before(1); + assert!(rollbacked.unwrap().eq(&ea)); + assert_eq!(state.get_volatile_epoch(1), None); + } +} diff --git a/modules/rest_blockfrost/src/handlers/epochs.rs b/modules/rest_blockfrost/src/handlers/epochs.rs index b62b6791..e2eef801 100644 --- a/modules/rest_blockfrost/src/handlers/epochs.rs +++ b/modules/rest_blockfrost/src/handlers/epochs.rs @@ -34,26 +34,20 @@ pub async fn handle_epoch_info_blockfrost( } let param = ¶ms[0]; - // query to get latest epoch or epoch info - let query = if param == "latest" { - EpochsStateQuery::GetLatestEpoch - } else { - let parsed = param - .parse::() - .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; - EpochsStateQuery::GetEpochInfo { - epoch_number: parsed, - } - }; - - // Get the current epoch number from epochs-state - let epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(query))); - let epoch_info_response = query_state( + let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch = query_state( &context, &handlers_config.epochs_query_topic, - epoch_info_msg, + latest_epoch_msg, |message| match message { - Message::StateQueryResponse(StateQueryResponse::Epochs(response)) => Ok(response), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), _ => Err(QueryError::internal_error( "Unexpected message type while retrieving latest epoch", )), @@ -61,26 +55,54 @@ pub async fn handle_epoch_info_blockfrost( ) .await?; - let ea_message = match epoch_info_response { - EpochsStateQueryResponse::LatestEpoch(response) => response.epoch, - EpochsStateQueryResponse::EpochInfo(response) => response.epoch, - EpochsStateQueryResponse::Error(QueryError::NotFound { .. }) => { + let (is_latest, mut response) = if param == "latest" { + (true, EpochActivityRest::from(latest_epoch)) + } else { + let parsed = param + .parse::() + .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; + + if parsed > latest_epoch.epoch { return Err(RESTError::not_found("Epoch not found")); } - EpochsStateQueryResponse::Error(e) => { - return Err(e.into()); - } - _ => { - return Err(RESTError::unexpected_response( - "Unexpected message type while retrieving epoch info", - )); + + if parsed == latest_epoch.epoch { + (true, EpochActivityRest::from(latest_epoch)) + } else { + let epoch_info_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetEpochInfo { + epoch_number: parsed, + }, + ))); + let epoch_info = query_state( + &context, + &handlers_config.historical_epochs_query_topic, + epoch_info_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::EpochInfo(response), + )) => Ok(EpochActivityRest::from(response.epoch)), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(QueryError::NotFound { .. }), + )) => Err(QueryError::not_found("Epoch not found")), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving epoch info", + )), + }, + ) + .await?; + (false, epoch_info) } }; - let epoch_number = ea_message.epoch; // For the latest epoch, query accounts-state for the stake pool delegation distribution (SPDD) // Otherwise, fall back to SPDD module to fetch historical epoch totals - let total_active_stakes: u64 = if param == "latest" { + // if spdd_storage is not enabled, return NULL for active_stakes + let epoch_number = response.epoch; + let total_active_stakes = if is_latest { let total_active_stakes_msg = Arc::new(Message::StateQuery(StateQuery::Accounts( AccountsStateQuery::GetActiveStakes {}, ))); @@ -91,10 +113,10 @@ pub async fn handle_epoch_info_blockfrost( |message| match message { Message::StateQueryResponse(StateQueryResponse::Accounts( AccountsStateQueryResponse::ActiveStakes(total_active_stake), - )) => Ok(total_active_stake), + )) => Ok(Some(total_active_stake)), Message::StateQueryResponse(StateQueryResponse::Accounts( - AccountsStateQueryResponse::Error(e), - )) => Err(e), + AccountsStateQueryResponse::Error(_), + )) => Ok(None), _ => Err(QueryError::internal_error( "Unexpected message type while retrieving the latest total active stakes", )), @@ -115,19 +137,17 @@ pub async fn handle_epoch_info_blockfrost( |message| match message { Message::StateQueryResponse(StateQueryResponse::SPDD( SPDDStateQueryResponse::EpochTotalActiveStakes(total_active_stakes), - )) => Ok(total_active_stakes), + )) => Ok(Some(total_active_stakes)), Message::StateQueryResponse(StateQueryResponse::SPDD( - SPDDStateQueryResponse::Error(e), - )) => Err(e), + SPDDStateQueryResponse::Error(_), + )) => Ok(None), _ => Err(QueryError::internal_error( format!("Unexpected message type while retrieving total active stakes for epoch: {epoch_number}"), )), }, ) .await? - }; - - let mut response = EpochActivityRest::from(ea_message); + }.unwrap_or(0); if total_active_stakes == 0 { response.active_stake = None; @@ -246,22 +266,51 @@ pub async fn handle_epoch_next_blockfrost( .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; + let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch = query_state( + &context, + &handlers_config.epochs_query_topic, + latest_epoch_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; + + if parsed > latest_epoch.epoch { + return Err(RESTError::not_found( + format!("Epoch {parsed} not found").as_str(), + )); + } + + if parsed == latest_epoch.epoch { + return Ok(RESTResponse::with_json(200, "[]")); + } + let next_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetNextEpochs { epoch_number: parsed, }, ))); - let next_epochs = query_state( + + let mut next_epochs = query_state( &context, - &handlers_config.epochs_query_topic, + &handlers_config.historical_epochs_query_topic, next_epochs_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::NextEpochs(response), )) => Ok(response.epochs.into_iter().map(EpochActivityRest::from).collect::>()), - Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::Error(QueryError::NotFound { .. }), - )) => Err(QueryError::not_found("Epoch")), Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::Error(e), )) => Err(e), @@ -271,6 +320,7 @@ pub async fn handle_epoch_next_blockfrost( }, ) .await?; + next_epochs.push(EpochActivityRest::from(latest_epoch)); let json = serde_json::to_string_pretty(&next_epochs)?; Ok(RESTResponse::with_json(200, &json)) @@ -292,6 +342,33 @@ pub async fn handle_epoch_previous_blockfrost( .parse::() .map_err(|_| RESTError::invalid_param("epoch", "invalid epoch number"))?; + let latest_epoch_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( + EpochsStateQuery::GetLatestEpoch, + ))); + let latest_epoch = query_state( + &context, + &handlers_config.epochs_query_topic, + latest_epoch_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::LatestEpoch(res), + )) => Ok(res.epoch), + Message::StateQueryResponse(StateQueryResponse::Epochs( + EpochsStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving latest epoch", + )), + }, + ) + .await?; + + if parsed > latest_epoch.epoch { + return Err(RESTError::not_found( + format!("Epoch {parsed} not found").as_str(), + )); + } + let previous_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs( EpochsStateQuery::GetPreviousEpochs { epoch_number: parsed, @@ -299,15 +376,12 @@ pub async fn handle_epoch_previous_blockfrost( ))); let previous_epochs = query_state( &context, - &handlers_config.epochs_query_topic, + &handlers_config.historical_epochs_query_topic, previous_epochs_msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::PreviousEpochs(response), )) => Ok(response.epochs.into_iter().map(EpochActivityRest::from).collect::>()), - Message::StateQueryResponse(StateQueryResponse::Epochs( - EpochsStateQueryResponse::Error(QueryError::NotFound { .. }), - )) => Err(QueryError::not_found("Epoch")), Message::StateQueryResponse(StateQueryResponse::Epochs( EpochsStateQueryResponse::Error(e), )) => Err(e), diff --git a/modules/rest_blockfrost/src/handlers_config.rs b/modules/rest_blockfrost/src/handlers_config.rs index 837bfb06..4a49c715 100644 --- a/modules/rest_blockfrost/src/handlers_config.rs +++ b/modules/rest_blockfrost/src/handlers_config.rs @@ -5,7 +5,7 @@ use acropolis_common::queries::{ addresses::DEFAULT_ADDRESS_QUERY_TOPIC, assets::{DEFAULT_ASSETS_QUERY_TOPIC, DEFAULT_OFFCHAIN_TOKEN_REGISTRY_URL}, blocks::DEFAULT_BLOCKS_QUERY_TOPIC, - epochs::DEFAULT_EPOCHS_QUERY_TOPIC, + epochs::{DEFAULT_EPOCHS_QUERY_TOPIC, DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC}, governance::{DEFAULT_DREPS_QUERY_TOPIC, DEFAULT_GOVERNANCE_QUERY_TOPIC}, parameters::DEFAULT_PARAMETERS_QUERY_TOPIC, pools::DEFAULT_POOLS_QUERY_TOPIC, @@ -27,6 +27,7 @@ pub struct HandlersConfig { pub dreps_query_topic: String, pub governance_query_topic: String, pub epochs_query_topic: String, + pub historical_epochs_query_topic: String, pub spdd_query_topic: String, pub parameters_query_topic: String, pub utxos_query_topic: String, @@ -72,6 +73,10 @@ impl From> for HandlersConfig { .get_string(DEFAULT_EPOCHS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_EPOCHS_QUERY_TOPIC.1.to_string()); + let historical_epochs_query_topic = config + .get_string(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_HISTORICAL_EPOCHS_QUERY_TOPIC.1.to_string()); + let parameters_query_topic = config .get_string(DEFAULT_PARAMETERS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_PARAMETERS_QUERY_TOPIC.1.to_string()); @@ -102,6 +107,7 @@ impl From> for HandlersConfig { dreps_query_topic, governance_query_topic, epochs_query_topic, + historical_epochs_query_topic, spdd_query_topic, parameters_query_topic, utxos_query_topic, diff --git a/processes/omnibus/.gitignore b/processes/omnibus/.gitignore index ce63bc1e..f7cf0f86 100644 --- a/processes/omnibus/.gitignore +++ b/processes/omnibus/.gitignore @@ -1,9 +1,7 @@ downloads -sled-immutable-utxos -fjall-blocks -fjall-immutable-utxos cache upstream-cache # DB files -*_db +fjall-*/ +sled-*/ diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index bab79696..eb70d33d 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -31,6 +31,7 @@ acropolis_module_chain_store = { path = "../../modules/chain_store" } acropolis_module_address_state = { path = "../../modules/address_state" } acropolis_module_consensus = { path = "../../modules/consensus" } acropolis_module_historical_accounts_state = { path = "../../modules/historical_accounts_state" } +acropolis_module_historical_epochs_state = { path = "../../modules/historical_epochs_state" } acropolis_module_block_vrf_validator = { path = "../../modules/block_vrf_validator" } acropolis_module_block_kes_validator = { path = "../../modules/block_kes_validator" } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 2062c48e..a3a694b2 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -65,6 +65,8 @@ store-stake-addresses = false store-spdd = false [module.historical-accounts-state] +# Clear state on start up (default true) +clear-on-start = true # Enables /accounts/{stake_address}/rewards endpoint store-rewards-history = false # Enables /accounts/{stake_address}/history endpoint @@ -82,6 +84,10 @@ store-addresses = false # Enables /accounts/{stake_address}/addresses/total endpoint (Requires store-addresses to be enabled) store-tx-count = false +[module.historical-epochs-state] +# Clear state on start up (default true) +clear-on-start = true + [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) store-info = false @@ -107,13 +113,11 @@ cache-mode = "predefined" # "predefined", "read", "write", "write-if-absent" write-full-cache = "false" [module.epochs-state] -# Enables /epochs/{number} endpoint (for historical epochs) -store-history = false [module.accounts-state] # Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints spdd-retention-epochs = 0 -spdd-db-path = "./spdd_db" +spdd-db-path = "./fjall-spdd" # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv" diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index 6bf7e1d2..a018a6ab 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -22,6 +22,7 @@ use acropolis_module_epochs_state::EpochsState; use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; use acropolis_module_governance_state::GovernanceState; use acropolis_module_historical_accounts_state::HistoricalAccountsState; +use acropolis_module_historical_epochs_state::HistoricalEpochsState; use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; use acropolis_module_peer_network_interface::PeerNetworkInterface; @@ -118,6 +119,7 @@ pub async fn main() -> Result<()> { AddressState::register(&mut process); AssetsState::register(&mut process); HistoricalAccountsState::register(&mut process); + HistoricalEpochsState::register(&mut process); BlockfrostREST::register(&mut process); SPDDState::register(&mut process); DRDDState::register(&mut process); diff --git a/processes/replayer/.gitignore b/processes/replayer/.gitignore index 5ca16a7f..456bfd04 100644 --- a/processes/replayer/.gitignore +++ b/processes/replayer/.gitignore @@ -1,3 +1,5 @@ downloads -sled-immutable-utxos -fjall-immutable-utxos \ No newline at end of file + +# DB files +fjall-*/ +sled-*/ diff --git a/processes/replayer/replayer.toml b/processes/replayer/replayer.toml index 2e4044e7..06263731 100644 --- a/processes/replayer/replayer.toml +++ b/processes/replayer/replayer.toml @@ -110,7 +110,7 @@ store-history = false [module.accounts-state] # Enable /epochs/{number}/stakes & /epochs/{number}/stakes/{pool_id} endpoints spdd-retention-epochs = 0 -spdd-db-path = "./spdd_db" +spdd-db-path = "./fjall-spdd" # Verify against captured CSV verify-pots-file = "../../modules/accounts_state/test-data/pots.mainnet.csv" verify-rewards-files = "../../modules/accounts_state/test-data/rewards.mainnet.{}.csv"