From 489a71379d80a29eeb6b30c2bce08f0d96c8cc4d Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 21 Aug 2025 00:16:48 +0000 Subject: [PATCH 1/2] feat: Expanded drep_state with historical state and rollback support Signed-off-by: William Hankins --- common/src/messages.rs | 4 +- common/src/queries/accounts.rs | 11 + common/src/queries/mod.rs | 8 + common/src/state_history.rs | 14 + common/src/types.rs | 47 +- modules/accounts_state/src/accounts_state.rs | 9 + modules/accounts_state/src/state.rs | 35 +- modules/drep_state/src/drep_state.rs | 317 +++++++-- .../drep_state/src/drep_state_publisher.rs | 43 ++ modules/drep_state/src/state.rs | 637 +++++++++++++++--- modules/governance_state/src/state.rs | 90 +-- .../src/handlers/governance.rs | 4 +- modules/tx_unpacker/src/map_parameters.rs | 160 +++-- modules/tx_unpacker/src/tx_unpacker.rs | 19 +- processes/omnibus/omnibus.toml | 10 + 15 files changed, 1106 insertions(+), 302 deletions(-) create mode 100644 modules/drep_state/src/drep_state_publisher.rs diff --git a/common/src/messages.rs b/common/src/messages.rs index 68641ee2..c3b367a0 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -129,10 +129,10 @@ pub struct GovernanceProceduresMessage { pub proposal_procedures: Vec, /// Voting - pub voting_procedures: Vec<(DataHash, VotingProcedures)>, + pub voting_procedures: Vec<([u8; 32], VotingProcedures)>, /// Alonzo-compatible (from Shelley) and Babbage updates - pub alonzo_babbage_updates: Vec + pub alonzo_babbage_updates: Vec, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 8d3f6525..ab057eea 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,5 +1,10 @@ +use std::collections::HashMap; + use crate::{DRepChoice, KeyHash}; +pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = + ("accounts-state-query-topic", "cardano.query.accounts"); + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQuery { GetAccountInfo { stake_key: Vec }, @@ -16,6 +21,9 @@ pub enum AccountsStateQuery { // Pools related queries GetPoolsLiveStakes { pools_operators: Vec> }, + + // Dreps related queries + GetAccountsDrepDelegationsMap { stake_keys: Vec> }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -35,6 +43,9 @@ pub enum AccountsStateQueryResponse { // Pools related responses PoolsLiveStakes(PoolsLiveStakes), + // DReps related responses + AccountsDrepDelegationsMap(HashMap, Option>), + NotFound, Error(String), } diff --git a/common/src/queries/mod.rs b/common/src/queries/mod.rs index 25052fb1..c68f7014 100644 --- a/common/src/queries/mod.rs +++ b/common/src/queries/mod.rs @@ -1,3 +1,7 @@ +use crate::messages::Message; +use caryatid_sdk::Context; +use std::sync::Arc; + pub mod accounts; pub mod addresses; pub mod assets; @@ -12,3 +16,7 @@ pub mod pools; pub mod scripts; pub mod transactions; pub mod utils; + +pub fn get_query_topic(context: Arc>, topic: (&str, &str)) -> String { + context.config.get_string(topic.0).unwrap_or_else(|_| topic.1.to_string()) +} diff --git a/common/src/state_history.rs b/common/src/state_history.rs index 8e5842cd..d72b1484 100644 --- a/common/src/state_history.rs +++ b/common/src/state_history.rs @@ -89,3 +89,17 @@ impl StateHistory { }); } } + +/// Helper that lets callers initialize the first state with custom config. +impl StateHistory { + pub fn get_or_init_with(&mut self, init: F) -> S + where + F: FnOnce() -> S, + { + if let Some(current) = self.history.back() { + current.state.clone() + } else { + init() + } + } +} diff --git a/common/src/types.rs b/common/src/types.rs index 7d21eda6..c378c20c 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -4,8 +4,8 @@ use crate::{ address::{Address, StakeAddress}, + protocol_params, rational_number::RationalNumber, - protocol_params }; use anyhow::{anyhow, bail, Error, Result}; use bech32::{Bech32, Hrp}; @@ -658,6 +658,13 @@ pub struct DRepRegistration { pub anchor: Option, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepRegistrationWithPos { + pub reg: DRepRegistration, + pub tx_hash: [u8; 32], + pub cert_index: u64, +} + /// DRep Deregistration = unreg_drep_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepDeregistration { @@ -668,6 +675,13 @@ pub struct DRepDeregistration { pub refund: Lovelace, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepDeregistrationWithPos { + pub reg: DRepDeregistration, + pub tx_hash: [u8; 32], + pub cert_index: u64, +} + /// DRep Update = update_drep_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepUpdate { @@ -678,6 +692,13 @@ pub struct DRepUpdate { pub anchor: Option, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepUpdateWithPos { + pub reg: DRepUpdate, + pub tx_hash: [u8; 32], + pub cert_index: u64, +} + pub type CommitteeCredential = Credential; /// Authorise a committee hot credential @@ -774,12 +795,16 @@ impl Display for GovActionId { } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct CostModel (Vec); +pub struct CostModel(Vec); impl CostModel { - pub fn new(m: Vec) -> Self { CostModel(m) } + pub fn new(m: Vec) -> Self { + CostModel(m) + } - pub fn as_vec(&self) -> &Vec { &self.0 } + pub fn as_vec(&self) -> &Vec { + &self.0 + } } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] @@ -1079,13 +1104,13 @@ pub struct ProtocolParamUpdate { /// (Shelley) #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] - pub protocol_version: Option + pub protocol_version: Option, } #[derive(Serialize, PartialEq, Deserialize, Debug, Clone)] pub struct AlonzoBabbageUpdateProposal { pub proposals: Vec<(GenesisKeyhash, Box)>, - pub enactment_epoch: u64 + pub enactment_epoch: u64, } #[derive(Serialize, PartialEq, Deserialize, Debug, Clone)] @@ -1205,6 +1230,7 @@ pub enum Vote { pub struct VotingProcedure { pub vote: Vote, pub anchor: Option, + pub vote_index: u32, } #[serde_as] @@ -1293,7 +1319,7 @@ pub struct AlonzoBabbageVotingOutcome { pub voting: Vec, pub votes_threshold: u64, pub accepted: bool, - pub parameter_update: Box + pub parameter_update: Box, } /// The structure has info about outcome of a single governance action. @@ -1371,13 +1397,13 @@ pub enum TxCertificate { ResignCommitteeCold(ResignCommitteeCold), /// DRep registration - DRepRegistration(DRepRegistration), + DRepRegistration(DRepRegistrationWithPos), /// DRep deregistration - DRepDeregistration(DRepDeregistration), + DRepDeregistration(DRepDeregistrationWithPos), /// DRep update - DRepUpdate(DRepUpdate), + DRepUpdate(DRepUpdateWithPos), } #[cfg(test)] @@ -1436,6 +1462,7 @@ mod tests { VotingProcedure { anchor: None, vote: Vote::Abstain, + vote_index: 0, }, ); voting.votes.insert( diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 8e3c0dab..9509fadf 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -439,6 +439,15 @@ impl AccountsState { }) } + AccountsStateQuery::GetAccountsDrepDelegationsMap { stake_keys } => match state + .get_drep_delegations_map(stake_keys) + { + Some(map) => AccountsStateQueryResponse::AccountsDrepDelegationsMap(map), + None => AccountsStateQueryResponse::Error( + "Error retrieving DRep delegations map".to_string(), + ), + }, + _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 6e46f7b3..4f7b7563 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -133,6 +133,24 @@ impl State { .collect() } + /// Map stake_keys to their delegated DRep + pub fn get_drep_delegations_map( + &self, + stake_keys: &[Vec], + ) -> Option, Option>> { + let accounts = self.stake_addresses.lock().ok()?; // If lock fails, return None + + let mut map = HashMap::new(); + + for stake_key in stake_keys { + let account = accounts.get(stake_key)?; + let maybe_drep = account.delegated_drep.clone(); + map.insert(stake_key.clone(), maybe_drep); + } + + Some(map) + } + /// Log statistics fn log_stats(&self) { info!(num_stake_addresses = self.stake_addresses.lock().unwrap().keys().len(),); @@ -798,11 +816,14 @@ impl State { // Zero withdrawals are expected, as a way to validate stake addresses (per Pi) if withdrawal.value != 0 { let mut sas = sas.clone(); - if let Err(e) = Self::update_value_with_delta(&mut sas.rewards, - -(withdrawal.value as i64)) { - error!("Withdrawing from stake address {} hash {}: {e}", - withdrawal.address.to_string().unwrap_or("???".to_string()), - hex::encode(hash)); + if let Err(e) = + Self::update_value_with_delta(&mut sas.rewards, -(withdrawal.value as i64)) + { + error!( + "Withdrawing from stake address {} hash {}: {e}", + withdrawal.address.to_string().unwrap_or("???".to_string()), + hex::encode(hash) + ); } else { // Update the stake address stake_addresses.insert(hash.to_vec(), sas); @@ -869,8 +890,8 @@ mod tests { use super::*; use acropolis_common::{ rational_number::RationalNumber, AddressNetwork, Anchor, Committee, Constitution, - CostModel, ConwayParams, Credential, DRepVotingThresholds, PoolVotingThresholds, - Pot, PotDelta, ProtocolParams, Ratio, Registration, StakeAddress, StakeAddressDelta, + ConwayParams, CostModel, Credential, DRepVotingThresholds, PoolVotingThresholds, Pot, + PotDelta, ProtocolParams, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAddressPayload, StakeAndVoteDelegation, StakeRegistrationAndStakeAndVoteDelegation, StakeRegistrationAndVoteDelegation, VoteDelegation, Withdrawal, }; diff --git a/modules/drep_state/src/drep_state.rs b/modules/drep_state/src/drep_state.rs index a1973747..860026d0 100644 --- a/modules/drep_state/src/drep_state.rs +++ b/modules/drep_state/src/drep_state.rs @@ -2,13 +2,15 @@ //! Accepts certificate events and derives the DRep State in memory use acropolis_common::{ - messages::{CardanoMessage, DRepStateMessage, Message, StateQuery, StateQueryResponse}, + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::governance::{ DRepInfo, DRepsList, GovernanceStateQuery, GovernanceStateQueryResponse, }, + state_history::StateHistory, + BlockInfo, BlockStatus, }; use anyhow::Result; -use caryatid_sdk::{module, Context, Module}; +use caryatid_sdk::{module, Context, Module, Subscription}; use config::Config; use std::sync::Arc; use tokio::sync::Mutex; @@ -16,10 +18,32 @@ use tracing::{error, info, info_span, Instrument}; mod state; use state::State; +mod drep_state_publisher; +use drep_state_publisher::DRepStatePublisher; -const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.certificates"; +use crate::state::DRepStorageConfig; + +// Subscription topics +const DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC: (&str, &str) = + ("certificates-subscribe-topic", "cardano.certificates"); +const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = + ("governance-subscribe-topic", "cardano.governance"); +const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = + ("parameters-subscribe-topic", "cardano.protocol.parameters"); + +// Publisher topic const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state"; +// Query topic +const DEFAULT_DREPS_QUERY_TOPIC: (&str, &str) = ("dreps-state-query-topic", "cardano.query.dreps"); + +// Configuration defaults +const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false); +const DEFAULT_STORE_DELEGATORS: (&str, bool) = ("store-delegators", false); +const DEFAULT_STORE_METADATA: (&str, bool) = ("store-metadata", false); +const DEFAULT_STORE_UPDATES: (&str, bool) = ("store-updates", false); +const DEFAULT_STORE_VOTES: (&str, bool) = ("store-votes", false); + /// DRep State module #[module( message_type(Message), @@ -29,65 +53,211 @@ const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state"; pub struct DRepState; impl DRepState { - pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - // Get configuration - let subscribe_topic = - config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); - info!("Creating subscriber on '{subscribe_topic}'"); + async fn run( + history: Arc>>, + mut certs_subscription: Box>, + mut gov_subscription: Option>>, + mut params_subscription: Option>>, + mut drep_state_publisher: DRepStatePublisher, + context: Arc>, + storage_config: DRepStorageConfig, + ) -> Result<()> { + if storage_config.store_info { + if let Some(sub) = params_subscription.as_mut() { + let _ = sub.read().await?; + info!("Consumed initial genesis params from params_subscription"); + } + } + // Main loop of synchronised messages + loop { + // Get the current state snapshot + let mut state = { + let mut h = history.lock().await; + h.get_or_init_with(|| State::new(storage_config)) + }; + let mut current_block: Option = None; - let drep_state_topic = config - .get_string("publish-drep-state-topic") - .unwrap_or(DEFAULT_DREP_STATE_TOPIC.to_string()); - info!("Creating DRep state publisher on '{drep_state_topic}'"); + // Read per-block messages in parallel + let certs_message_f = certs_subscription.read(); - let state = Arc::new(Mutex::new(State::new())); + // Certificates are the synchroniser + let (_, certs_message) = certs_message_f.await?; + let new_epoch = match certs_message.as_ref() { + Message::Cardano((ref block_info, _)) => { + // rollback only on certs + if block_info.status == BlockStatus::RolledBack { + state = history.lock().await.get_rolled_back_state(&block_info); + } + current_block = Some(block_info.clone()); + block_info.new_epoch && block_info.epoch > 0 + } + _ => false, + }; - // Subscribe for certificate messages - let state1 = state.clone(); - let mut subscription = context.subscribe(&subscribe_topic).await?; - let context_subscribe = context.clone(); - context.run(async move { - loop { - let Ok((_, message)) = subscription.read().await else { - return; - }; + // Read from epoch-boundary messages only when it's a new epoch + if new_epoch { + // Read params subscription if store-info is enabled to obtain DRep expiration param. Update expirations on epoch transition + if let Some(sub) = params_subscription.as_mut() { + let (_, message) = sub.read().await?; + match message.as_ref() { + Message::Cardano(( + ref block_info, + CardanoMessage::ProtocolParams(params), + )) => { + Self::check_sync(¤t_block, &block_info, "params"); + if let Some(conway) = ¶ms.params.conway { + state + .update_drep_expirations( + block_info.epoch, + conway.d_rep_activity, + ) + .inspect_err(|e| error!("Param update error: {e:#}")) + .ok(); + } + } + _ => error!("Unexpected params message: {message:?}"), + } + } + + // Publish DRep state at the end of the epoch + if let Some(ref block) = current_block { + let dreps = state.active_drep_list(); + drep_state_publisher.publish_drep_state(&block, dreps).await?; + } + } + + // Handle cert message + match certs_message.as_ref() { + Message::Cardano(( + ref block_info, + CardanoMessage::TxCertificates(tx_certs_msg), + )) => { + let span = info_span!("drep_state.handle_certs", block = block_info.number); + async { + Self::check_sync(¤t_block, &block_info, "certs"); + state + .process_certificates( + context.clone(), + &tx_certs_msg.certificates, + block_info.epoch, + ) + .await + .inspect_err(|e| error!("Certificates handling error: {e:#}")) + .ok(); + } + .instrument(span) + .await; + } + + _ => error!("Unexpected message type: {certs_message:?}"), + } + + // Handle governance message + if let Some(sub) = gov_subscription.as_mut() { + let (_, message) = sub.read().await?; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_cert_msg))) => { - let span = info_span!("drep_state.handle", block = block_info.number); + Message::Cardano(( + block_info, + CardanoMessage::GovernanceProcedures(gov_msg), + )) => { + let span = info_span!("drep_state.handle_votes", block = block_info.number); async { - let mut state = state1.lock().await; + Self::check_sync(¤t_block, &block_info, "gov"); state - .handle(&tx_cert_msg) - .await - .inspect_err(|e| error!("Messaging handling error: {e}")) + .process_votes(&gov_msg.voting_procedures) + .inspect_err(|e| error!("Votes handling error: {e:#}")) .ok(); - - if block_info.new_epoch && block_info.epoch > 0 { - // publish DRep state at end of epoch - let dreps = state.active_drep_list(); - let message = Message::Cardano(( - block_info.clone(), - CardanoMessage::DRepState(DRepStateMessage { - epoch: block_info.epoch, - dreps, - }), - )); - context_subscribe - .publish(&drep_state_topic, Arc::new(message)) - .await - .unwrap_or_else(|e| error!("Failed to publish: {e}")); - } - }.instrument(span).await; + } + .instrument(span) + .await; } _ => error!("Unexpected message type: {message:?}"), } } - }); - let query_state = state.clone(); - context.handle("drep-state", move |message| { - let state_handle = query_state.clone(); // your shared Arc> + // Commit the new state + if let Some(block_info) = current_block { + history.lock().await.commit(&block_info, state); + } + } + } + + /// Check for synchronisation + fn check_sync(expected: &Option, actual: &BlockInfo, source: &str) { + if let Some(ref block) = expected { + if block.number != actual.number { + error!( + expected = block.number, + actual = actual.number, + source = source, + "Messages out of sync (expected certs block {}, got {} from {})", + block.number, + actual.number, + source, + ); + panic!( + "Message streams diverged: certs at {} vs {} from {}", + block.number, actual.number, source + ); + } + } + } + + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + fn get_bool_flag(config: &Config, key: (&str, bool)) -> bool { + config.get_bool(key.0).unwrap_or(key.1) + } + + fn get_string_flag(config: &Config, key: (&str, &str)) -> String { + config.get_string(key.0).unwrap_or_else(|_| key.1.to_string()) + } + + // Get configuration flags and topis + let storage_config = DRepStorageConfig { + store_info: get_bool_flag(&config, DEFAULT_STORE_INFO), + store_delegators: get_bool_flag(&config, DEFAULT_STORE_DELEGATORS), + store_metadata: get_bool_flag(&config, DEFAULT_STORE_METADATA), + store_updates: get_bool_flag(&config, DEFAULT_STORE_UPDATES), + store_votes: get_bool_flag(&config, DEFAULT_STORE_VOTES), + }; + + let certificates_subscribe_topic = + get_string_flag(&config, DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{certificates_subscribe_topic}'"); + + let mut governance_subscribe_topic = String::new(); + if storage_config.store_votes { + governance_subscribe_topic = + get_string_flag(&config, DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{governance_subscribe_topic}'"); + } + + let mut parameters_subscribe_topic = String::new(); + if storage_config.store_info { + parameters_subscribe_topic = + get_string_flag(&config, DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{parameters_subscribe_topic}'"); + } + + let drep_state_topic = config + .get_string("publish-drep-state-topic") + .unwrap_or(DEFAULT_DREP_STATE_TOPIC.to_string()); + info!("Creating DRep state publisher on '{drep_state_topic}'"); + + let drep_query_topic = get_string_flag(&config, DEFAULT_DREPS_QUERY_TOPIC); + info!("Creating DRep query handler on '{drep_query_topic}'"); + + // Initalize state history + let history = Arc::new(Mutex::new(StateHistory::::new("DRepState"))); + let history_run = history.clone(); + let query_history = history.clone(); + let ticker_history = history.clone(); + let ctx_run = context.clone(); + + // Query handler + context.handle(&drep_query_topic, move |message| { + let history = query_history.clone(); async move { let Message::StateQuery(StateQuery::Governance(query)) = message.as_ref() else { return Arc::new(Message::StateQueryResponse(StateQueryResponse::Governance( @@ -97,15 +267,15 @@ impl DRepState { ))); }; - let locked = state_handle.lock().await; + let state = history.lock().await.get_current_state(); let response = match query { GovernanceStateQuery::GetDRepsList => { - let dreps = locked.list(); + let dreps = state.list(); GovernanceStateQueryResponse::DRepsList(DRepsList { dreps }) } GovernanceStateQuery::GetDRepInfo { drep_credential } => { - match locked.get_drep(&drep_credential) { + match state.get_drep(&drep_credential) { Some(record) => GovernanceStateQueryResponse::DRepInfo(DRepInfo { deposit: record.deposit, anchor: record.anchor.clone(), @@ -125,8 +295,7 @@ impl DRepState { }); // Ticker to log stats - let mut subscription = context.subscribe(&subscribe_topic).await?; - let state2 = state.clone(); + let mut subscription = context.subscribe("clock.tick").await?; context.run(async move { loop { let Ok((_, message)) = subscription.read().await else { @@ -136,19 +305,55 @@ impl DRepState { if (message.number % 60) == 0 { let span = info_span!("drep_state.tick", number = message.number); async { - state2 + ticker_history .lock() .await + .get_current_state() .tick() .await .inspect_err(|e| error!("Tick error: {e}")) .ok(); - }.instrument(span).await; + } + .instrument(span) + .await; } } } }); + // Publisher for DRep State + let drep_state_publisher = DRepStatePublisher::new(context.clone(), drep_state_topic); + + // Subscribe to enabled topics + let certs_sub = context.subscribe(&certificates_subscribe_topic).await?; + + let gov_sub = if storage_config.store_votes { + Some(context.subscribe(&governance_subscribe_topic).await?) + } else { + None + }; + + let params_sub = if storage_config.store_info { + Some(context.subscribe(¶meters_subscribe_topic).await?) + } else { + None + }; + + // Start run task + context.run(async move { + Self::run( + history_run, + certs_sub, + gov_sub, + params_sub, + drep_state_publisher, + ctx_run, + storage_config, + ) + .await + .unwrap_or_else(|e| error!("Failed: {e}")); + }); + Ok(()) } } diff --git a/modules/drep_state/src/drep_state_publisher.rs b/modules/drep_state/src/drep_state_publisher.rs new file mode 100644 index 00000000..7a94a099 --- /dev/null +++ b/modules/drep_state/src/drep_state_publisher.rs @@ -0,0 +1,43 @@ +use acropolis_common::{ + messages::{CardanoMessage, DRepStateMessage, Message}, + BlockInfo, Credential, +}; +use caryatid_sdk::Context; +use std::sync::Arc; + +/// Message publisher for DRep State +pub struct DRepStatePublisher { + /// Module context + context: Arc>, + + /// Topic to publish on + topic: String, +} + +impl DRepStatePublisher { + /// Construct with context and topic to publish on + pub fn new(context: Arc>, topic: String) -> Self { + Self { context, topic } + } + + /// Publish the DRep state + pub async fn publish_drep_state( + &mut self, + block: &BlockInfo, + dreps: Vec<(Credential, u64)>, + ) -> anyhow::Result<()> { + self.context + .message_bus + .publish( + &self.topic, + Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::DRepState(DRepStateMessage { + epoch: block.epoch, + dreps, + }), + ))), + ) + .await + } +} diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index 9c574030..4ea0f0ca 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -1,12 +1,19 @@ //! Acropolis DRepState: State storage use acropolis_common::{ - messages::TxCertificatesMessage, Anchor, DRepCredential, Lovelace, TxCertificate, + messages::{Message, StateQuery, StateQueryResponse}, + queries::{ + accounts::{AccountsStateQuery, AccountsStateQueryResponse, DEFAULT_ACCOUNTS_QUERY_TOPIC}, + get_query_topic, + }, + Anchor, Credential, DRepChoice, DRepCredential, Lovelace, StakeCredential, TxCertificate, Vote, + Voter, VotingProcedures, }; use anyhow::{anyhow, Result}; +use caryatid_sdk::Context; use serde_with::serde_as; -use std::collections::HashMap; -use tracing::info; +use std::{collections::HashMap, sync::Arc}; +use tracing::{error, info}; #[serde_as] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -15,20 +22,111 @@ pub struct DRepRecord { pub anchor: Option, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct HistoricalDRepState { + // Populated from the reg field in: + // - DRepRegistration + // - DRepDeregistration + // - DRepUpdate + pub info: Option, + pub updates: Option>, + pub metadata: Option>, + + // Populated from the drep and credential fields in: + // - VoteDelegation + // - StakeAndVoteDelegation + // - StakeRegistrationAndVoteDelegation + // - StakeRegistrationAndStakeAndVoteDelegation + pub delegators: Option>, + + // Populated from voting_procedures in GovernanceProceduresMessage + pub votes: Option>, +} + +impl HistoricalDRepState { + pub fn from_config(cfg: &DRepStorageConfig) -> Self { + Self { + info: cfg.store_info.then(DRepRecordExtended::default), + updates: cfg.store_updates.then(Vec::new), + metadata: cfg.store_metadata.then_some(None), + delegators: cfg.store_delegators.then(Vec::new), + votes: cfg.store_votes.then(Vec::new), + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +pub struct DRepRecordExtended { + pub deposit: Lovelace, + pub expired: bool, + pub retired: bool, + pub active_epoch: Option, + pub last_active_epoch: u64, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct DRepUpdateEvent { + pub tx_hash: [u8; 32], + pub cert_index: u64, + pub action: DRepActionUpdate, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub enum DRepActionUpdate { + Registered, + Updated, + Deregistered, +} + +#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)] +pub struct VoteRecord { + pub tx_hash: [u8; 32], + pub vote_index: u32, + pub vote: Vote, +} + impl DRepRecord { pub fn new(deposit: Lovelace, anchor: Option) -> Self { Self { deposit, anchor } } } +#[derive(Debug, Copy, Clone, Default)] +pub struct DRepStorageConfig { + pub store_info: bool, + pub store_delegators: bool, + pub store_metadata: bool, + pub store_updates: bool, + pub store_votes: bool, +} + +impl DRepStorageConfig { + fn any_enabled(&self) -> bool { + self.store_info + || self.store_delegators + || self.store_metadata + || self.store_updates + || self.store_votes + } +} + +#[derive(Debug, Default, Clone)] pub struct State { - dreps: HashMap, + pub config: DRepStorageConfig, + pub dreps: HashMap, + pub historical_dreps: Option>, } impl State { - pub fn new() -> Self { + pub fn new(config: DRepStorageConfig) -> Self { Self { + config, dreps: HashMap::new(), + historical_dreps: if config.any_enabled() { + Some(HashMap::new()) + } else { + None + }, } } @@ -56,49 +154,100 @@ impl State { } impl State { - fn process_one_certificate(&mut self, tx_cert: &TxCertificate) -> Result { - match tx_cert { - TxCertificate::DRepRegistration(reg) => match self.dreps.get_mut(®.credential) { - Some(ref mut drep) => { - if reg.deposit != 0 { - Err(anyhow!("DRep registration {:?}: replacement requires deposit = 0, instead of {}", - reg.credential, reg.deposit - )) - } else { - drep.anchor = reg.anchor.clone(); - Ok(false) - } + pub async fn process_certificates( + &mut self, + context: Arc>, + tx_certs: &Vec, + epoch: u64, + ) -> Result<()> { + let mut batched_delegators = Vec::new(); + let store_delegators = self.config.store_delegators; + + for tx_cert in tx_certs { + if store_delegators { + if let Some((cred, drep)) = Self::extract_delegation_fields(tx_cert) { + batched_delegators.push((cred, drep)); + continue; } - None => { - self.dreps.insert( - reg.credential.clone(), - DRepRecord::new(reg.deposit, reg.anchor.clone()), - ); - Ok(true) + } + + if let Err(e) = self.process_one_cert(tx_cert, epoch) { + tracing::error!("Error processing tx_cert: {e}"); + } + } + + // Batched delegations to reduce redundant queries to accounts_state + if store_delegators && !batched_delegators.is_empty() { + if let Err(e) = self.update_delegators(&context, batched_delegators).await { + tracing::error!("Error processing batched delegators: {e}"); + } + } + + Ok(()) + } + + pub fn process_votes( + &mut self, + voting_procedures: &[([u8; 32], VotingProcedures)], + ) -> Result<()> { + let Some(hist_map) = self.historical_dreps.as_mut() else { + return Ok(()); + }; + + let cfg = self.config.clone(); + for (tx_hash, voting_procedures) in voting_procedures { + for (voter, single_votes) in &voting_procedures.votes { + let drep_cred = match voter { + Voter::DRepKey(k) => DRepCredential::AddrKeyHash(k.to_vec()), + Voter::DRepScript(s) => DRepCredential::ScriptHash(s.to_vec()), + _ => continue, + }; + + let entry = hist_map + .entry(drep_cred) + .or_insert_with(|| HistoricalDRepState::from_config(&cfg)); + + // ensure votes vec exists if we created from a config that didn’t set it before + if entry.votes.is_none() { + entry.votes = Some(Vec::new()); } - }, - TxCertificate::DRepDeregistration(reg) => { - if self.dreps.remove(®.credential).is_none() { - Err(anyhow!( - "DRep registration {:?}: internal error, credential not found", - reg.credential - )) - } else { - Ok(true) + let votes = entry.votes.as_mut().unwrap(); + + for (_, vp) in &single_votes.voting_procedures { + votes.push(VoteRecord { + tx_hash: tx_hash.clone(), + vote_index: vp.vote_index, + vote: vp.vote.clone(), + }); } } - TxCertificate::DRepUpdate(reg) => match self.dreps.get_mut(®.credential) { - Some(ref mut drep) => { - drep.anchor = reg.anchor.clone(); - Ok(false) + } + Ok(()) + } + + pub fn update_drep_expirations( + &mut self, + current_epoch: u64, + expired_epoch_param: u32, + ) -> Result<()> { + let expired_offset = expired_epoch_param as u64; + + // If historical storage isn’t enabled, nothing to do. + let Some(historical_dreps) = self.historical_dreps.as_mut() else { + return Ok(()); + }; + + for (_cred, drep_record) in historical_dreps.iter_mut() { + if let Some(info) = drep_record.info.as_mut() { + if let (Some(active_epoch), false) = (info.active_epoch, info.expired) { + if active_epoch + expired_offset <= current_epoch { + info.expired = true; + } } - None => Err(anyhow!( - "DRep registration {:?}: internal error, credential not found", - reg.credential - )), - }, - _ => Ok(false), + } } + + Ok(()) } pub fn active_drep_list(&self) -> Vec<(DRepCredential, Lovelace)> { @@ -109,22 +258,253 @@ impl State { distribution } - pub async fn handle(&mut self, tx_cert_msg: &TxCertificatesMessage) -> Result<()> { - for tx_cert in tx_cert_msg.certificates.iter() { - if let Err(e) = self.process_one_certificate(tx_cert) { - tracing::error!("Error processing tx_cert {}", e); + fn process_one_cert(&mut self, tx_cert: &TxCertificate, epoch: u64) -> Result { + match tx_cert { + TxCertificate::DRepRegistration(reg) => { + let new = match self.dreps.get_mut(®.reg.credential) { + Some(drep) => { + if reg.reg.deposit != 0 { + return Err(anyhow!( + "DRep registration {:?}: replacement requires deposit = 0, got {}", + reg.reg.credential, + reg.reg.deposit + )); + } + drep.anchor = reg.reg.anchor.clone(); + false + } + None => { + self.dreps.insert( + reg.reg.credential.clone(), + DRepRecord::new(reg.reg.deposit, reg.reg.anchor.clone()), + ); + true + } + }; + + if self.historical_dreps.is_some() { + if let Err(err) = self.update_historical(®.reg.credential, true, |entry| { + if let Some(info) = entry.info.as_mut() { + info.deposit = reg.reg.deposit; + info.expired = false; + info.retired = false; + info.active_epoch = Some(epoch); + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Registered, + }); + } + if let Some(anchor) = ®.reg.anchor { + if let Some(inner) = entry.metadata.as_mut() { + *inner = Some(anchor.clone()); + } + } + }) { + return Err(anyhow!("Failed to update DRep on registration: {err}")); + } + } + + Ok(new) + } + + TxCertificate::DRepDeregistration(reg) => { + // Update live state + if self.dreps.remove(®.reg.credential).is_none() { + return Err(anyhow!( + "DRep deregistration {:?}: credential not found", + reg.reg.credential + )); + } + + // Update history if enabled + if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Some(info) = entry.info.as_mut() { + info.deposit = 0; + info.expired = false; + info.retired = true; + info.active_epoch = None; + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Deregistered, + }); + } + }) { + return Err(anyhow!("Failed to update DRep on deregistration: {err}")); + } + + Ok(true) + } + + TxCertificate::DRepUpdate(reg) => { + // Update live state + let drep = self.dreps.get_mut(®.reg.credential).ok_or_else(|| { + anyhow!("DRep update {:?}: credential not found", reg.reg.credential) + })?; + drep.anchor = reg.reg.anchor.clone(); + + // Update history if enabled + if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Some(info) = entry.info.as_mut() { + info.expired = false; + info.retired = false; + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Updated, + }); + } + if let Some(anchor) = ®.reg.anchor { + if let Some(inner) = entry.metadata.as_mut() { + *inner = Some(anchor.clone()); + } + } + }) { + error!("Historical update failed: {err}"); + } + + Ok(false) + } + + _ => Ok(false), + } + } + + fn update_historical( + &mut self, + credential: &DRepCredential, + create_if_missing: bool, + f: F, + ) -> Result<()> + where + F: FnOnce(&mut HistoricalDRepState), + { + let hist = self + .historical_dreps + .as_mut() + .ok_or_else(|| anyhow!("No historical map configured"))?; + + if create_if_missing { + let cfg = self.config.clone(); + let entry = hist + .entry(credential.clone()) + .or_insert_with(|| HistoricalDRepState::from_config(&cfg)); + f(entry); + } else if let Some(entry) = hist.get_mut(credential) { + f(entry); + } else { + error!("Tried to update unknown DRep credential: {:?}", credential); + } + + Ok(()) + } + + async fn update_delegators( + &mut self, + context: &Arc>, + delegators: Vec<(&StakeCredential, &DRepChoice)>, + ) -> Result<()> { + let stake_keys: Vec<_> = delegators.iter().map(|(sc, _)| sc.get_hash()).collect(); + let stake_key_to_input: HashMap<_, _> = delegators + .iter() + .zip(&stake_keys) + .map(|((sc, drep), key)| (key.clone(), (*sc, *drep))) + .collect(); + + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountsDrepDelegationsMap { stake_keys }, + ))); + + let accounts_query_topic = get_query_topic(context.clone(), DEFAULT_ACCOUNTS_QUERY_TOPIC); + let response = context.message_bus.request(&accounts_query_topic, msg).await?; + let message = Arc::try_unwrap(response).unwrap_or_else(|arc| (*arc).clone()); + + // TODO: Ensure AccountsStateQueryResponse is for the correct block + let result_map = match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountsDrepDelegationsMap(map), + )) => map, + _ => { + return Err(anyhow!("Unexpected accounts-state response")); + } + }; + + for (stake_key, old_drep_opt) in result_map { + let (delegator, new_drep_choice) = match stake_key_to_input.get(&stake_key) { + Some(pair) => *pair, + None => continue, + }; + + let new_drep_cred = match drep_choice_to_credential(new_drep_choice) { + Some(c) => c, + None => continue, + }; + + if let Some(old_drep) = old_drep_opt { + if let Some(old_drep_cred) = drep_choice_to_credential(&old_drep) { + if old_drep_cred != new_drep_cred { + self.update_historical(&old_drep_cred, false, |entry| { + if let Some(delegators) = entry.delegators.as_mut() { + delegators.retain(|s| s != delegator); + } + })?; + } + } + } + + // Add delegator to new DRep + match self.update_historical(&new_drep_cred, true, |entry| { + if let Some(delegators) = entry.delegators.as_mut() { + if !delegators.contains(delegator) { + delegators.push(delegator.clone()); + } + } + }) { + Ok(_) => {} + Err(err) => return Err(anyhow!("Failed to update new delegator: {err}")), } } Ok(()) } + + fn extract_delegation_fields(cert: &TxCertificate) -> Option<(&StakeCredential, &DRepChoice)> { + match cert { + TxCertificate::VoteDelegation(d) => Some((&d.credential, &d.drep)), + TxCertificate::StakeAndVoteDelegation(d) => Some((&d.credential, &d.drep)), + TxCertificate::StakeRegistrationAndVoteDelegation(d) => Some((&d.credential, &d.drep)), + TxCertificate::StakeRegistrationAndStakeAndVoteDelegation(d) => { + Some((&d.credential, &d.drep)) + } + _ => None, + } + } +} + +fn drep_choice_to_credential(choice: &DRepChoice) -> Option { + match choice { + DRepChoice::Key(k) => Some(DRepCredential::AddrKeyHash(k.clone())), + DRepChoice::Script(k) => Some(DRepCredential::ScriptHash(k.clone())), + _ => None, + } } #[cfg(test)] mod tests { - use crate::state::{DRepRecord, State}; + use crate::state::{DRepRecord, DRepStorageConfig, State}; use acropolis_common::{ - Anchor, Credential, DRepDeregistration, DRepRegistration, DRepUpdate, TxCertificate, + Anchor, Credential, DRepDeregistration, DRepDeregistrationWithPos, DRepRegistration, + DRepRegistrationWithPos, DRepUpdate, DRepUpdateWithPos, TxCertificate, }; const CRED_1: [u8; 28] = [ @@ -139,13 +519,17 @@ mod tests { #[test] fn test_drep_process_one_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); assert_eq!(state.get_count(), 1); let tx_cert_record = DRepRecord { deposit: 500000000, @@ -160,20 +544,28 @@ mod tests { #[test] fn test_drep_do_not_replace_existing_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); - - let bad_tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 600000000, - anchor: None, + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); + + let bad_tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 600000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - assert!(state.process_one_certificate(&bad_tx_cert).is_err()); + assert!(state.process_one_cert(&bad_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); let tx_cert_record = DRepRecord { @@ -189,25 +581,33 @@ mod tests { #[test] fn test_drep_update_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); let anchor = Anchor { url: "https://poop.bike".into(), data_hash: vec![0x13, 0x37], }; - let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdate { - credential: tx_cred.clone(), - anchor: Some(anchor.clone()), + let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdateWithPos { + reg: DRepUpdate { + credential: tx_cred.clone(), + anchor: Some(anchor.clone()), + }, + tx_hash: [0u8; 32], + cert_index: 1, }); assert_eq!( - state.process_one_certificate(&update_anchor_tx_cert).unwrap(), + state.process_one_cert(&update_anchor_tx_cert, 1).unwrap(), false ); @@ -225,24 +625,31 @@ mod tests { #[test] fn test_drep_do_not_update_nonexistent_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); let anchor = Anchor { url: "https://poop.bike".into(), data_hash: vec![0x13, 0x37], }; - let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdate { - credential: Credential::AddrKeyHash(CRED_2.to_vec()), - anchor: Some(anchor.clone()), + let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdateWithPos { + reg: DRepUpdate { + credential: Credential::AddrKeyHash(CRED_2.to_vec()), + anchor: Some(anchor.clone()), + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - - assert!(state.process_one_certificate(&update_anchor_tx_cert).is_err()); + assert!(state.process_one_cert(&update_anchor_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); let tx_cert_record = DRepRecord { @@ -258,20 +665,28 @@ mod tests { #[test] fn test_drep_deregister() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(acropolis_common::DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistration { - credential: tx_cred.clone(), - refund: 500000000, + let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { + reg: DRepDeregistration { + credential: tx_cred.clone(), + refund: 500000000, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); assert_eq!( - state.process_one_certificate(&unregister_tx_cert).unwrap(), + state.process_one_cert(&unregister_tx_cert, 1).unwrap(), true ); assert_eq!(state.get_count(), 0); @@ -281,19 +696,27 @@ mod tests { #[test] fn test_drep_do_not_deregister_nonexistent_cert() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(acropolis_common::DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistration { - credential: Credential::AddrKeyHash(CRED_2.to_vec()), - refund: 500000000, + let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { + reg: DRepDeregistration { + credential: Credential::AddrKeyHash(CRED_2.to_vec()), + refund: 500000000, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - assert!(state.process_one_certificate(&unregister_tx_cert).is_err()); + assert!(state.process_one_cert(&unregister_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); assert_eq!(state.get_drep(&tx_cred).unwrap().deposit, 500000000); } diff --git a/modules/governance_state/src/state.rs b/modules/governance_state/src/state.rs index b50e7ad7..fc57a6cb 100644 --- a/modules/governance_state/src/state.rs +++ b/modules/governance_state/src/state.rs @@ -1,19 +1,17 @@ //! Acropolis Governance State: State storage +use crate::alonzo_babbage_voting::AlonzoBabbageVoting; +use crate::VotingRegistrationState; use acropolis_common::{ messages::{ - CardanoMessage, DRepStakeDistributionMessage, SPOStakeDistributionMessage, - GovernanceOutcomesMessage, - GovernanceProceduresMessage, Message, ProtocolParamsMessage, + CardanoMessage, DRepStakeDistributionMessage, GovernanceOutcomesMessage, + GovernanceProceduresMessage, Message, ProtocolParamsMessage, SPOStakeDistributionMessage, }, - BlockInfo, ConwayParams, DelegatedStake, DRepCredential, DataHash, - EnactStateElem, Era, GovActionId, - GovernanceAction, GovernanceOutcome, GovernanceOutcomeVariant, - KeyHash, Lovelace, ProposalProcedure, SingleVoterVotes, - TreasuryWithdrawalsAction, Voter, VotesCount, VotingOutcome, VotingProcedure + BlockInfo, ConwayParams, DRepCredential, DataHash, DelegatedStake, EnactStateElem, Era, + GovActionId, GovernanceAction, GovernanceOutcome, GovernanceOutcomeVariant, KeyHash, Lovelace, + ProposalProcedure, SingleVoterVotes, TreasuryWithdrawalsAction, Voter, VotesCount, + VotingOutcome, VotingProcedure, }; -use crate::alonzo_babbage_voting::AlonzoBabbageVoting; -use crate::VotingRegistrationState; use anyhow::{anyhow, bail, Result}; use caryatid_sdk::Context; use hex::ToHex; @@ -88,7 +86,7 @@ impl State { pub async fn handle_drep_stake( &mut self, drep_message: &DRepStakeDistributionMessage, - spo_message: &SPOStakeDistributionMessage + spo_message: &SPOStakeDistributionMessage, ) -> Result<()> { self.drep_stake_messages_count += 1; self.drep_stake = HashMap::from_iter(drep_message.dreps.iter().cloned()); @@ -104,17 +102,17 @@ impl State { governance_message: &GovernanceProceduresMessage, ) -> Result<()> { if block.era < Era::Conway { - if !(governance_message.proposal_procedures.is_empty() && - governance_message.voting_procedures.is_empty()) + if !(governance_message.proposal_procedures.is_empty() + && governance_message.voting_procedures.is_empty()) { bail!("Non-empty governance message for pre-conway block {block:?}"); } - } - else { + } else { if !governance_message.alonzo_babbage_updates.is_empty() { - if let Err(e) = self.alonzo_babbage_voting.process_update_proposals( - block, &governance_message.alonzo_babbage_updates - ) { + if let Err(e) = self + .alonzo_babbage_voting + .process_update_proposals(block, &governance_message.alonzo_babbage_updates) + { error!("Error handling governance_message: '{e}'"); } } @@ -172,14 +170,15 @@ impl State { fn insert_voting_procedure( &mut self, voter: &Voter, - transaction: &DataHash, + transaction: &[u8; 32], voter_votes: &SingleVoterVotes, ) -> Result<()> { for (action_id, procedure) in voter_votes.voting_procedures.iter() { let votes = self.votes.entry(action_id.clone()).or_insert_with(|| HashMap::new()); - if let Some((prev_trans, prev_vote)) = - votes.insert(voter.clone(), (transaction.clone(), procedure.clone())) - { + if let Some((prev_trans, prev_vote)) = votes.insert( + voter.clone(), + (transaction.clone().to_vec(), procedure.clone()), + ) { // Re-voting is allowed; new vote must be treated as the proper one, // older is to be discarded. if tracing::enabled!(tracing::Level::DEBUG) { @@ -193,7 +192,11 @@ impl State { } /// Checks whether action_id can be considered finally accepted - fn is_finally_accepted(&self, voting_state: &VotingRegistrationState, action_id: &GovActionId) -> Result { + fn is_finally_accepted( + &self, + voting_state: &VotingRegistrationState, + action_id: &GovActionId, + ) -> Result { let (_epoch, proposal) = self .proposals .get(action_id) @@ -312,7 +315,9 @@ impl State { } fn finalize_conway_voting( - &mut self, new_block: &BlockInfo, voting_state: &VotingRegistrationState + &mut self, + new_block: &BlockInfo, + voting_state: &VotingRegistrationState, ) -> Result> { let mut outcome = Vec::::new(); let actions = self.proposals.keys().map(|a| a.clone()).collect::>(); @@ -336,12 +341,10 @@ impl State { action_to_perform, }) } - Ok(Some(out)) => { - outcome.push(GovernanceOutcome { - voting: out, - action_to_perform: GovernanceOutcomeVariant::NoAction, - }) - } + Ok(Some(out)) => outcome.push(GovernanceOutcome { + voting: out, + action_to_perform: GovernanceOutcomeVariant::NoAction, + }), } } @@ -349,24 +352,29 @@ impl State { } fn recalculate_voting_state(&self) -> Result { - let drep_stake = self.drep_stake.iter().map(|(_dr,lov)| lov).sum(); + let drep_stake = self.drep_stake.iter().map(|(_dr, lov)| lov).sum(); let committee_usize = self.get_conway_params()?.committee.members.len(); - let committee = committee_usize.try_into().or_else( - |e| Err(anyhow!("Commitee size: conversion usize -> u64 failed, {e}")) - )?; + let committee = committee_usize.try_into().or_else(|e| { + Err(anyhow!( + "Commitee size: conversion usize -> u64 failed, {e}" + )) + })?; - let spo_stake = self.spo_stake.iter().map(|(_sp,ds)| ds.live).sum(); + let spo_stake = self.spo_stake.iter().map(|(_sp, ds)| ds.live).sum(); - Ok(VotingRegistrationState::new(spo_stake, spo_stake, drep_stake, committee)) + Ok(VotingRegistrationState::new( + spo_stake, spo_stake, drep_stake, committee, + )) } /// Loops through all actions and checks their status for the new_epoch /// All incoming data (parameters for the epoch, drep distribution, etc) /// should already be actual at this moment. - pub fn process_new_epoch(&mut self, new_block: &BlockInfo) - -> Result - { + pub fn process_new_epoch( + &mut self, + new_block: &BlockInfo, + ) -> Result { let mut output = GovernanceOutcomesMessage::default(); output.alonzo_babbage_outcomes = self.alonzo_babbage_voting.finalize_voting(new_block)?; @@ -377,7 +385,9 @@ impl State { info!( "Conway voting, epoch {} ({}): {voting_state}, total {} actions, {acc} accepted", - new_block.epoch, new_block.era, outcome.len() + new_block.epoch, + new_block.era, + outcome.len() ); output.conway_outcomes = outcome; } diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index bae3e41f..4e4b31c7 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -17,7 +17,7 @@ pub async fn handle_dreps_list_blockfrost( let msg = Arc::new(Message::StateQuery(StateQuery::Governance( GovernanceStateQuery::GetDRepsList, ))); - let raw = context.message_bus.request("drep-state", msg).await?; + let raw = context.message_bus.request("cardano.query.dreps", msg).await?; let message = Arc::try_unwrap(raw).unwrap_or_else(|arc| (*arc).clone()); match message { Message::StateQueryResponse(StateQueryResponse::Governance( @@ -68,7 +68,7 @@ pub async fn handle_single_drep_blockfrost( }, ))); - let raw = context.message_bus.request("drep-state", msg).await?; + let raw = context.message_bus.request("cardano.query.dreps", msg).await?; let message = Arc::try_unwrap(raw).unwrap_or_else(|arc| (*arc).clone()); match message { diff --git a/modules/tx_unpacker/src/map_parameters.rs b/modules/tx_unpacker/src/map_parameters.rs index e05c7688..0d33bfb1 100644 --- a/modules/tx_unpacker/src/map_parameters.rs +++ b/modules/tx_unpacker/src/map_parameters.rs @@ -4,17 +4,17 @@ use anyhow::{anyhow, bail, Result}; use pallas::ledger::{ primitives::{ - ExUnitPrices as PallasExUnitPrices, - ProtocolVersion as PallasProtocolVersion, - alonzo, babbage, conway, Nullable, Relay as PallasRelay, ScriptHash, + alonzo, babbage, conway, ExUnitPrices as PallasExUnitPrices, Nullable, + ProtocolVersion as PallasProtocolVersion, Relay as PallasRelay, ScriptHash, StakeCredential as PallasStakeCredential, }, - traverse::MultiEraCert, * + traverse::MultiEraCert, + *, }; use acropolis_common::{ - rational_number::RationalNumber, protocol_params::{Nonce, NonceVariant, ProtocolVersion}, + rational_number::RationalNumber, *, }; use std::collections::{HashMap, HashSet}; @@ -166,22 +166,20 @@ fn map_constitution(constitution: &conway::Constitution) -> Constitution { /// Map a Pallas Relay to ours fn map_relay(relay: &PallasRelay) -> Relay { match relay { - PallasRelay::SingleHostAddr(port, ipv4, ipv6) => { - Relay::SingleHostAddr(SingleHostAddr { - port: match port { - Nullable::Some(port) => Some(*port as u16), - _ => None, - }, - ipv4: match ipv4 { - Nullable::Some(ipv4) => ipv4.try_into().ok(), - _ => None, - }, - ipv6: match ipv6 { - Nullable::Some(ipv6) => ipv6.try_into().ok(), - _ => None, - }, - }) - } + PallasRelay::SingleHostAddr(port, ipv4, ipv6) => Relay::SingleHostAddr(SingleHostAddr { + port: match port { + Nullable::Some(port) => Some(*port as u16), + _ => None, + }, + ipv4: match ipv4 { + Nullable::Some(ipv4) => ipv4.try_into().ok(), + _ => None, + }, + ipv6: match ipv6 { + Nullable::Some(ipv6) => ipv6.try_into().ok(), + _ => None, + }, + }), PallasRelay::SingleHostName(port, dns_name) => Relay::SingleHostName(SingleHostName { port: match port { Nullable::Some(port) => Some(*port as u16), @@ -195,13 +193,14 @@ fn map_relay(relay: &PallasRelay) -> Relay { } } -// +// // Certificates // /// Derive our TxCertificate from a Pallas Certificate pub fn map_certificate( cert: &MultiEraCert, + tx_hash: [u8; 32], tx_index: usize, cert_index: usize, ) -> Result { @@ -383,15 +382,13 @@ pub fn map_certificate( }), ), - conway::Certificate::StakeRegDeleg(cred, pool_key_hash, coin) => { - Ok(TxCertificate::StakeRegistrationAndDelegation( - StakeRegistrationAndDelegation { - credential: map_stake_credential(cred), - operator: pool_key_hash.to_vec(), - deposit: *coin, - }, - )) - } + conway::Certificate::StakeRegDeleg(cred, pool_key_hash, coin) => Ok( + TxCertificate::StakeRegistrationAndDelegation(StakeRegistrationAndDelegation { + credential: map_stake_credential(cred), + operator: pool_key_hash.to_vec(), + deposit: *coin, + }), + ), conway::Certificate::VoteRegDeleg(cred, drep, coin) => { Ok(TxCertificate::StakeRegistrationAndVoteDelegation( @@ -429,24 +426,36 @@ pub fn map_certificate( } conway::Certificate::RegDRepCert(cred, coin, anchor) => { - Ok(TxCertificate::DRepRegistration(DRepRegistration { - credential: map_stake_credential(cred), - deposit: *coin, - anchor: map_nullable_anchor(&anchor), + Ok(TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: map_stake_credential(cred), + deposit: *coin, + anchor: map_nullable_anchor(&anchor), + }, + tx_hash: tx_hash, + cert_index: cert_index as u64, })) } - conway::Certificate::UnRegDRepCert(cred, coin) => { - Ok(TxCertificate::DRepDeregistration(DRepDeregistration { - credential: map_stake_credential(cred), - refund: *coin, - })) - } + conway::Certificate::UnRegDRepCert(cred, coin) => Ok( + TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { + reg: DRepDeregistration { + credential: map_stake_credential(cred), + refund: *coin, + }, + tx_hash: tx_hash, + cert_index: cert_index as u64, + }), + ), conway::Certificate::UpdateDRepCert(cred, anchor) => { - Ok(TxCertificate::DRepUpdate(DRepUpdate { - credential: map_stake_credential(cred), - anchor: map_nullable_anchor(&anchor), + Ok(TxCertificate::DRepUpdate(DRepUpdateWithPos { + reg: DRepUpdate { + credential: map_stake_credential(cred), + anchor: map_nullable_anchor(&anchor), + }, + tx_hash: tx_hash, + cert_index: cert_index as u64, })) } } @@ -495,17 +504,20 @@ fn map_alonzo_nonce(e: &alonzo::Nonce) -> Nonce { alonzo::NonceVariant::NeutralNonce => NonceVariant::NeutralNonce, alonzo::NonceVariant::Nonce => NonceVariant::Nonce, }, - hash: e.hash.map(|v| v.to_vec()) + hash: e.hash.map(|v| v.to_vec()), } } - fn map_alonzo_single_model(model: &alonzo::CostModel) -> Option { Some(CostModel::new(model.clone())) } fn map_alonzo_cost_models(pallas_cost_models: &alonzo::CostModels) -> Result { - let mut res = CostModels { plutus_v1: None, plutus_v2: None, plutus_v3: None }; + let mut res = CostModels { + plutus_v1: None, + plutus_v2: None, + plutus_v3: None, + }; for (lang, mdl) in pallas_cost_models.iter() { if *lang == alonzo::Language::PlutusV1 { res.plutus_v1 = map_alonzo_single_model(mdl); @@ -519,7 +531,7 @@ fn map_alonzo_cost_models(pallas_cost_models: &alonzo::CostModels) -> Result ProtocolVersion { ProtocolVersion { minor: *minor, - major: *major + major: *major, } } @@ -566,7 +578,8 @@ fn map_conway_protocol_param_update(p: &conway::ProtocolParamUpdate) -> Box Box) -> Option { n.as_ref().map(|x| *x as u64) } -pub fn map_alonzo_protocol_param_update(p: &alonzo::ProtocolParamUpdate) - -> Result> -{ +pub fn map_alonzo_protocol_param_update( + p: &alonzo::ProtocolParamUpdate, +) -> Result> { Ok(Box::new(ProtocolParamUpdate { // Fields, common for Conway and Alonzo-compatible minfee_a: map_u32_to_u64(p.minfee_a), @@ -673,8 +688,11 @@ pub fn map_alonzo_protocol_param_update(p: &alonzo::ProtocolParamUpdate) treasury_growth_rate: p.treasury_growth_rate.as_ref().map(&map_unit_interval), min_pool_cost: p.min_pool_cost.clone(), ada_per_utxo_byte: p.ada_per_utxo_byte.clone(), - cost_models_for_script_languages: - p.cost_models_for_script_languages.as_ref().map(&map_alonzo_cost_models).transpose()?, + cost_models_for_script_languages: p + .cost_models_for_script_languages + .as_ref() + .map(&map_alonzo_cost_models) + .transpose()?, execution_costs: p.execution_costs.as_ref().map(&map_execution_costs), max_tx_ex_units: p.max_tx_ex_units.as_ref().map(&map_ex_units), max_block_ex_units: p.max_block_ex_units.as_ref().map(&map_ex_units), @@ -704,13 +722,13 @@ fn map_babbage_cost_models(cost_models: &babbage::CostModels) -> CostModels { CostModels { plutus_v1: cost_models.plutus_v1.as_ref().map(|p| CostModel::new(p.clone())), plutus_v2: cost_models.plutus_v2.as_ref().map(|p| CostModel::new(p.clone())), - plutus_v3: None + plutus_v3: None, } } -pub fn map_babbage_protocol_param_update(p: &babbage::ProtocolParamUpdate) - -> Result> -{ +pub fn map_babbage_protocol_param_update( + p: &babbage::ProtocolParamUpdate, +) -> Result> { Ok(Box::new(ProtocolParamUpdate { // Fields, common for Conway and Alonzo-compatible minfee_a: map_u32_to_u64(p.minfee_a), @@ -727,8 +745,10 @@ pub fn map_babbage_protocol_param_update(p: &babbage::ProtocolParamUpdate) treasury_growth_rate: p.treasury_growth_rate.as_ref().map(&map_unit_interval), min_pool_cost: p.min_pool_cost.clone(), ada_per_utxo_byte: p.ada_per_utxo_byte.clone(), - cost_models_for_script_languages: - p.cost_models_for_script_languages.as_ref().map(&map_babbage_cost_models), + cost_models_for_script_languages: p + .cost_models_for_script_languages + .as_ref() + .map(&map_babbage_cost_models), execution_costs: p.execution_costs.as_ref().map(&map_execution_costs), max_tx_ex_units: p.max_tx_ex_units.as_ref().map(&map_ex_units), max_block_ex_units: p.max_block_ex_units.as_ref().map(&map_ex_units), @@ -790,10 +810,14 @@ fn map_vote(vote: &conway::Vote) -> Vote { } } -fn map_single_governance_voting_procedure(proc: &conway::VotingProcedure) -> VotingProcedure { +fn map_single_governance_voting_procedure( + vote_index: u32, + proc: &conway::VotingProcedure, +) -> VotingProcedure { VotingProcedure { vote: map_vote(&proc.vote), anchor: map_nullable_anchor(&proc.anchor), + vote_index, } } @@ -808,9 +832,7 @@ pub fn map_all_governance_voting_procedures( let voter = map_voter(pallas_voter); if let Some(existing) = procs.votes.insert(voter.clone(), SingleVoterVotes::default()) { - bail!( - "Duplicate voter {voter:?}: procedure {vote_procs:?}, existing {existing:?}" - ); + bail!("Duplicate voter {voter:?}: procedure {vote_procs:?}, existing {existing:?}"); } let single_voter = procs @@ -818,13 +840,15 @@ pub fn map_all_governance_voting_procedures( .get_mut(&voter) .ok_or_else(|| anyhow!("Cannot find voter {:?}, which must present", voter))?; - for (pallas_action_id, pallas_voting_procedure) in pallas_pair.iter() { + for (vote_index, (pallas_action_id, pallas_voting_procedure)) in + pallas_pair.iter().enumerate() + { let action_id = map_gov_action_id(pallas_action_id)?; - let vp = map_single_governance_voting_procedure(&pallas_voting_procedure); + let vp = + map_single_governance_voting_procedure(vote_index as u32, &pallas_voting_procedure); single_voter.voting_procedures.insert(action_id, vp); } } Ok(procs) } - diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index 627e93da..6c1419a9 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -9,13 +9,13 @@ use acropolis_common::{ *, }; use caryatid_sdk::{module, Context, Module}; -use std::{fmt::Debug, clone::Clone, sync::Arc}; +use std::{clone::Clone, fmt::Debug, sync::Arc}; use anyhow::Result; use config::Config; use futures::future::join_all; -use pallas::ledger::{traverse::MultiEraTx, primitives, traverse}; use pallas::ledger::primitives::KeyValuePairs; +use pallas::ledger::{primitives, traverse, traverse::MultiEraTx}; use tracing::{debug, error, info, info_span, Instrument}; mod map_parameters; @@ -40,15 +40,13 @@ impl TxUnpacker { ) { let mut update = AlonzoBabbageUpdateProposal { proposals: Vec::new(), - enactment_epoch: epoch + enactment_epoch: epoch, }; for (hash, vote) in proposals.iter() { match map(vote) { - Ok(upd) => - update.proposals.push((hash.to_vec(), upd)), - Err(e) => - error!("Cannot convert alonzo protocol param update {vote:?}: {e}") + Ok(upd) => update.proposals.push((hash.to_vec(), upd)), + Err(e) => error!("Cannot convert alonzo protocol param update {vote:?}: {e}"), } } @@ -214,10 +212,11 @@ impl TxUnpacker { } if publish_certificates_topic.is_some() { + let tx_hash = tx.hash(); for ( cert_index, cert) in certs.iter().enumerate() { - match map_parameters::map_certificate(&cert, tx_index, cert_index) { + match map_parameters::map_certificate(&cert, *tx_hash, tx_index, cert_index) { Ok(tx_cert) => { - certificates.push(tx_cert); + certificates.push( tx_cert); }, Err(_e) => { // TODO error unexpected @@ -259,7 +258,7 @@ impl TxUnpacker { if let Some(pallas_vp) = votes { // Nonempty set -- governance_message.voting_procedures will not be empty match map_parameters::map_all_governance_voting_procedures(pallas_vp) { - Ok(vp) => voting_procedures.push((tx.hash().to_vec(), vp)), + Ok(vp) => voting_procedures.push((*tx.hash(), vp)), Err(e) => error!("Cannot decode governance voting procedures in slot {}: {e}", block.slot) } } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index a2deedaf..a90d0360 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -33,6 +33,16 @@ address-delta-topic = "cardano.address.delta" [module.spo-state] [module.drep-state] +# Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) +store-info = true +# Enables /governance/dreps/{drep_id}/delegators endpoint +store-delegators = true +# Enables /governance/dreps/{drep_id}/metadata endpoint +store-metadata = true +# Enables /governance/dreps/{drep_id}/updates endpoint +store-updates = true +# Enables /governance/dreps/{drep_id}/votes endpoint +store-votes = true [module.governance-state] From baa72d14d80d88e9039c09ef3a85e20945952f97 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 21 Aug 2025 01:46:12 +0000 Subject: [PATCH 2/2] fix: gate historical updates during deregistration cert processing Signed-off-by: William Hankins --- modules/drep_state/src/state.rs | 34 +++++++++++++++++---------------- processes/omnibus/omnibus.toml | 10 +++++----- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index 4ea0f0ca..5cc2a8e9 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -321,23 +321,25 @@ impl State { } // Update history if enabled - if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { - if let Some(info) = entry.info.as_mut() { - info.deposit = 0; - info.expired = false; - info.retired = true; - info.active_epoch = None; - info.last_active_epoch = epoch; - } - if let Some(updates) = entry.updates.as_mut() { - updates.push(DRepUpdateEvent { - tx_hash: reg.tx_hash, - cert_index: reg.cert_index, - action: DRepActionUpdate::Deregistered, - }); + if self.historical_dreps.is_some() { + if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Some(info) = entry.info.as_mut() { + info.deposit = 0; + info.expired = false; + info.retired = true; + info.active_epoch = None; + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Deregistered, + }); + } + }) { + return Err(anyhow!("Failed to update DRep on deregistration: {err}")); } - }) { - return Err(anyhow!("Failed to update DRep on deregistration: {err}")); } Ok(true) diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index a90d0360..a23cae07 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -34,15 +34,15 @@ address-delta-topic = "cardano.address.delta" [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) -store-info = true +store-info = false # Enables /governance/dreps/{drep_id}/delegators endpoint -store-delegators = true +store-delegators = false # Enables /governance/dreps/{drep_id}/metadata endpoint -store-metadata = true +store-metadata = false # Enables /governance/dreps/{drep_id}/updates endpoint -store-updates = true +store-updates = false # Enables /governance/dreps/{drep_id}/votes endpoint -store-votes = true +store-votes = false [module.governance-state]