Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions common/src/queries/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub enum AccountsStateQuery {

// Epochs-related queries
GetActiveStakes {},
GetSPDDByEpoch { epoch: u64 },
GetSPDDByEpochAndPool { epoch: u64, pool_id: KeyHash },

// Pools related queries
GetOptimalPoolSizing,
Expand Down Expand Up @@ -57,6 +59,10 @@ pub enum AccountsStateQueryResponse {

// Epochs-related responses
ActiveStakes(u64),
/// Vec<(PoolId, StakeKey, ActiveStakeAmount)>
SPDDByEpoch(Vec<(KeyHash, KeyHash, u64)>),
/// Vec<(StakeKey, ActiveStakeAmount)>
SPDDByEpochAndPool(Vec<(KeyHash, u64)>),

// Pools-related responses
OptimalPoolSizing(Option<OptimalPoolSizing>),
Expand Down
3 changes: 3 additions & 0 deletions common/src/queries/parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ pub const DEFAULT_PARAMETERS_QUERY_TOPIC: (&str, &str) =
pub enum ParametersStateQuery {
GetLatestEpochParameters,
GetEpochParameters { epoch_number: u64 },
GetNetworkName,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ParametersStateQueryResponse {
LatestEpochParameters(ProtocolParams),
EpochParameters(ProtocolParams),
NetworkName(String),

NotFound,
Error(String),
}
Expand Down
61 changes: 59 additions & 2 deletions common/src/serialization.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::marker::PhantomData;

use anyhow::anyhow;
use bech32::{Bech32, Hrp};
use serde::{ser::SerializeMap, Serializer};
use serde_with::{ser::SerializeAsWrap, SerializeAs};
use serde::{ser::SerializeMap, Deserialize, Serializer};
use serde_with::{ser::SerializeAsWrap, DeserializeAs, SerializeAs};

pub struct SerializeMapAs<KAs, VAs>(std::marker::PhantomData<(KAs, VAs)>);

Expand All @@ -26,6 +28,61 @@ where
}
}

// Marker types for different HRP prefixes
pub struct PoolPrefix;
pub struct StakePrefix;
pub struct AddrPrefix;

// Trait to get HRP string from marker types
pub trait HrpPrefix {
const HRP: &'static str;
}

impl HrpPrefix for PoolPrefix {
const HRP: &'static str = "pool";
}

impl HrpPrefix for StakePrefix {
const HRP: &'static str = "stake";
}

impl HrpPrefix for AddrPrefix {
const HRP: &'static str = "addr";
}

// Generic Bech32 converter with HRP parameter
pub struct DisplayFromBech32<PREFIX: HrpPrefix>(PhantomData<PREFIX>);

// Serialization implementation
impl<PREFIX> SerializeAs<Vec<u8>> for DisplayFromBech32<PREFIX>
where
PREFIX: HrpPrefix,
{
fn serialize_as<S>(source: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let bech32_string =
source.to_bech32_with_hrp(PREFIX::HRP).map_err(serde::ser::Error::custom)?;

serializer.serialize_str(&bech32_string)
}
}

// Deserialization implementation
impl<'de, PREFIX> DeserializeAs<'de, Vec<u8>> for DisplayFromBech32<PREFIX>
where
PREFIX: HrpPrefix,
{
fn deserialize_as<D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
Vec::<u8>::from_bech32_with_hrp(&s, PREFIX::HRP).map_err(serde::de::Error::custom)
}
}

pub trait Bech32WithHrp {
fn to_bech32_with_hrp(&self, hrp: &str) -> Result<String, anyhow::Error>;
fn from_bech32_with_hrp(s: &str, expected_hrp: &str) -> Result<Vec<u8>, anyhow::Error>;
Expand Down
20 changes: 19 additions & 1 deletion common/src/stake_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl StakeAddressMap {
/// Derive the Stake Pool Delegation Distribution (SPDD) - a map of total stake values
/// (both with and without rewards) for each active SPO
/// And Stake Pool Reward State (rewards and delegators_count for each pool)
/// Key of returned map is the SPO 'operator' ID
/// <PoolId -> DelegatedStake>
pub fn generate_spdd(&self) -> BTreeMap<KeyHash, DelegatedStake> {
// Shareable Dashmap with referenced keys
let spo_stakes = DashMap::<KeyHash, DelegatedStake>::new();
Expand Down Expand Up @@ -317,6 +317,24 @@ impl StakeAddressMap {
spo_stakes.iter().map(|entry| (entry.key().clone(), entry.value().clone())).collect()
}

/// Dump current Stake Pool Delegation Distribution State
/// <PoolId -> (Stake Key, Active Stakes Amount)>
pub fn dump_spdd_state(&self) -> HashMap<KeyHash, Vec<(KeyHash, u64)>> {
let entries: Vec<_> = self
.inner
.par_iter()
.filter_map(|(key, sas)| {
sas.delegated_spo.as_ref().map(|spo| (spo.clone(), (key.clone(), sas.utxo_value)))
})
.collect();

let mut result: HashMap<KeyHash, Vec<(KeyHash, u64)>> = HashMap::new();
for (spo, entry) in entries {
result.entry(spo).or_default().push(entry);
}
result
}

/// Derive the DRep Delegation Distribution (DRDD) - the total amount
/// delegated to each DRep, including the special "abstain" and "no confidence" dreps.
pub fn generate_drdd(
Expand Down
2 changes: 2 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ impl Default for UTXODelta {
/// Key hash used for pool IDs etc.
pub type KeyHash = Vec<u8>;

pub type PoolId = Vec<u8>;

/// Script identifier
pub type ScriptHash = KeyHash;

Expand Down
1 change: 1 addition & 0 deletions modules/accounts_state/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*_db
1 change: 1 addition & 0 deletions modules/accounts_state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde_json = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
fjall = "2.11.2"
rayon = "1.10.0"
csv = "1.3.1"
itertools = "0.14.0"
Expand Down
109 changes: 106 additions & 3 deletions modules/accounts_state/src/accounts_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use acropolis_common::queries::accounts::{
};
use verifier::Verifier;

use crate::spo_distribution_store::SPDDStore;
mod spo_distribution_store;

const DEFAULT_SPO_STATE_TOPIC: &str = "cardano.spo.state";
const DEFAULT_EPOCH_ACTIVITY_TOPIC: &str = "cardano.epoch.activity";
const DEFAULT_TX_CERTIFICATES_TOPIC: &str = "cardano.certificates";
Expand All @@ -46,6 +49,10 @@ const DEFAULT_SPO_REWARDS_TOPIC: &str = "cardano.spo.rewards";
const DEFAULT_PROTOCOL_PARAMETERS_TOPIC: &str = "cardano.protocol.parameters";
const DEFAULT_STAKE_REWARD_DELTAS_TOPIC: &str = "cardano.stake.reward.deltas";

const DEFAULT_STORE_SPDD_HISTORY: (&str, bool) = ("store-spdd-history", false);
const DEFAULT_SPDD_DB_PATH: (&str, &str) = ("spdd-db-path", "./spdd_db");
const DEFAULT_SPDD_RETENTION_EPOCHS: &str = "spdd-retention-epochs";

/// Accounts State module
#[module(
message_type(Message),
Expand All @@ -58,6 +65,7 @@ impl AccountsState {
/// Async run loop
async fn run(
history: Arc<Mutex<StateHistory<State>>>,
spdd_store: Option<Arc<Mutex<SPDDStore>>>,
mut drep_publisher: DRepDistributionPublisher,
mut spo_publisher: SPODistributionPublisher,
mut spo_rewards_publisher: SPORewardsPublisher,
Expand Down Expand Up @@ -144,6 +152,11 @@ impl AccountsState {
let ea_message_f = ea_subscription.read();
let params_message_f = parameters_subscription.read();

let spdd_store_guard = match spdd_store.as_ref() {
Some(s) => Some(s.lock().await),
None => None,
};

// Handle DRep
let (_, message) = dreps_message_f.await?;
match message.as_ref() {
Expand Down Expand Up @@ -185,6 +198,17 @@ impl AccountsState {
if let Err(e) = spo_publisher.publish_spdd(block_info, spdd).await {
error!("Error publishing SPO stake distribution: {e:#}")
}

// if we store spdd history
let spdd_state = state.dump_spdd_state();
if let Some(mut spdd_store) = spdd_store_guard {
// active stakes taken at beginning of epoch i is for epoch + 1
if let Err(e) =
spdd_store.store_spdd(block_info.epoch + 1, spdd_state)
{
error!("Error storing SPDD state: {e:#}")
}
}
}
.instrument(span)
.await;
Expand Down Expand Up @@ -396,6 +420,42 @@ impl AccountsState {
.unwrap_or(DEFAULT_STAKE_REWARD_DELTAS_TOPIC.to_string());
info!("Creating stake reward deltas subscriber on '{stake_reward_deltas_topic}'");

// store spdd history config
let store_spdd_history =
config.get_bool(DEFAULT_STORE_SPDD_HISTORY.0).unwrap_or(DEFAULT_STORE_SPDD_HISTORY.1);
info!("Store SPDD history: {}", store_spdd_history);

let spdd_db_path =
config.get_string(DEFAULT_SPDD_DB_PATH.0).unwrap_or(DEFAULT_SPDD_DB_PATH.1.to_string());

// Convert to absolute path if relative
let spdd_db_path = if std::path::Path::new(&spdd_db_path).is_absolute() {
spdd_db_path
} else {
let current_dir = std::env::current_dir()
.map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?;
current_dir.join(&spdd_db_path).to_string_lossy().to_string()
};

// Get retention epochs configuration (None = unlimited)
let spdd_retention_epochs = config
.get_int(DEFAULT_SPDD_RETENTION_EPOCHS)
.ok()
.and_then(|v| if v > 0 { Some(v as u64) } else { None });
info!("SPDD retention epochs: {:?}", spdd_retention_epochs);

if store_spdd_history {
info!("SPDD database path: {}", spdd_db_path);
match spdd_retention_epochs {
Some(epochs) => info!(
"SPDD retention: {} epochs (~{} GB max)",
epochs,
(epochs as f64 * 0.12).ceil()
),
None => info!("SPDD retention: unlimited (no automatic pruning)"),
}
}

// Query topics
let accounts_query_topic = config
.get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0)
Expand All @@ -415,16 +475,28 @@ impl AccountsState {
verifier.read_rewards(&verify_rewards_files);
}

// Create history
// History
let history = Arc::new(Mutex::new(StateHistory::<State>::new(
"AccountsState",
StateHistoryStore::default_block_store(),
)));
let history_account_state = history.clone();
let history_query = history.clone();
let history_tick = history.clone();

// Spdd store
let spdd_store = if store_spdd_history {
Some(Arc::new(Mutex::new(SPDDStore::load(
std::path::Path::new(&spdd_db_path),
spdd_retention_epochs,
)?)))
} else {
None
};
let spdd_store_query = spdd_store.clone();

context.handle(&accounts_query_topic, move |message| {
let history = history_account_state.clone();
let history = history_query.clone();
let spdd_store = spdd_store_query.clone();
async move {
let Message::StateQuery(StateQuery::Accounts(query)) = message.as_ref() else {
return Arc::new(Message::StateQueryResponse(StateQueryResponse::Accounts(
Expand All @@ -435,6 +507,10 @@ impl AccountsState {
};

let guard = history.lock().await;
let spdd_store_guard = match spdd_store.as_ref() {
Some(s) => Some(s.lock().await),
None => None,
};
let state = match guard.current() {
Some(s) => s,
None => {
Expand Down Expand Up @@ -539,6 +615,32 @@ impl AccountsState {
}
}

AccountsStateQuery::GetSPDDByEpoch { epoch } => match spdd_store_guard {
Some(spdd_store) => match spdd_store.query_by_epoch(*epoch) {
Ok(result) => AccountsStateQueryResponse::SPDDByEpoch(result),
Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
},
None => AccountsStateQueryResponse::Error(
"SPDD store is not enabled".to_string(),
),
},

AccountsStateQuery::GetSPDDByEpochAndPool { epoch, pool_id } => {
match spdd_store_guard {
Some(spdd_store) => {
match spdd_store.query_by_epoch_and_pool(*epoch, pool_id) {
Ok(result) => {
AccountsStateQueryResponse::SPDDByEpochAndPool(result)
}
Err(e) => AccountsStateQueryResponse::Error(e.to_string()),
}
}
None => AccountsStateQueryResponse::Error(
"SPDD store is not enabled".to_string(),
),
}
}

_ => AccountsStateQueryResponse::Error(format!(
"Unimplemented query variant: {:?}",
query
Expand Down Expand Up @@ -595,6 +697,7 @@ impl AccountsState {
context.run(async move {
Self::run(
history,
spdd_store,
drep_publisher,
spo_publisher,
spo_rewards_publisher,
Expand Down
Loading