diff --git a/common/src/messages.rs b/common/src/messages.rs index 976cc87f..c3b367a0 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -129,7 +129,7 @@ pub struct GovernanceProceduresMessage { pub proposal_procedures: Vec, /// Voting - pub voting_procedures: Vec<(DataHash, VotingProcedures)>, + pub voting_procedures: Vec<([u8; 32], VotingProcedures)>, /// Alonzo-compatible (from Shelley) and Babbage updates pub alonzo_babbage_updates: Vec, diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 8d3f6525..ab057eea 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,5 +1,10 @@ +use std::collections::HashMap; + use crate::{DRepChoice, KeyHash}; +pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = + ("accounts-state-query-topic", "cardano.query.accounts"); + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AccountsStateQuery { GetAccountInfo { stake_key: Vec }, @@ -16,6 +21,9 @@ pub enum AccountsStateQuery { // Pools related queries GetPoolsLiveStakes { pools_operators: Vec> }, + + // Dreps related queries + GetAccountsDrepDelegationsMap { stake_keys: Vec> }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -35,6 +43,9 @@ pub enum AccountsStateQueryResponse { // Pools related responses PoolsLiveStakes(PoolsLiveStakes), + // DReps related responses + AccountsDrepDelegationsMap(HashMap, Option>), + NotFound, Error(String), } diff --git a/common/src/queries/mod.rs b/common/src/queries/mod.rs index 25052fb1..c68f7014 100644 --- a/common/src/queries/mod.rs +++ b/common/src/queries/mod.rs @@ -1,3 +1,7 @@ +use crate::messages::Message; +use caryatid_sdk::Context; +use std::sync::Arc; + pub mod accounts; pub mod addresses; pub mod assets; @@ -12,3 +16,7 @@ pub mod pools; pub mod scripts; pub mod transactions; pub mod utils; + +pub fn get_query_topic(context: Arc>, topic: (&str, &str)) -> String { + context.config.get_string(topic.0).unwrap_or_else(|_| topic.1.to_string()) +} diff --git a/common/src/state_history.rs b/common/src/state_history.rs index 8e5842cd..d72b1484 100644 --- a/common/src/state_history.rs +++ b/common/src/state_history.rs @@ -89,3 +89,17 @@ impl StateHistory { }); } } + +/// Helper that lets callers initialize the first state with custom config. +impl StateHistory { + pub fn get_or_init_with(&mut self, init: F) -> S + where + F: FnOnce() -> S, + { + if let Some(current) = self.history.back() { + current.state.clone() + } else { + init() + } + } +} diff --git a/common/src/types.rs b/common/src/types.rs index b642605b..c378c20c 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -658,6 +658,13 @@ pub struct DRepRegistration { pub anchor: Option, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepRegistrationWithPos { + pub reg: DRepRegistration, + pub tx_hash: [u8; 32], + pub cert_index: u64, +} + /// DRep Deregistration = unreg_drep_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepDeregistration { @@ -668,6 +675,13 @@ pub struct DRepDeregistration { pub refund: Lovelace, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepDeregistrationWithPos { + pub reg: DRepDeregistration, + pub tx_hash: [u8; 32], + pub cert_index: u64, +} + /// DRep Update = update_drep_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepUpdate { @@ -678,6 +692,13 @@ pub struct DRepUpdate { pub anchor: Option, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DRepUpdateWithPos { + pub reg: DRepUpdate, + pub tx_hash: [u8; 32], + pub cert_index: u64, +} + pub type CommitteeCredential = Credential; /// Authorise a committee hot credential @@ -1209,6 +1230,7 @@ pub enum Vote { pub struct VotingProcedure { pub vote: Vote, pub anchor: Option, + pub vote_index: u32, } #[serde_as] @@ -1375,13 +1397,13 @@ pub enum TxCertificate { ResignCommitteeCold(ResignCommitteeCold), /// DRep registration - DRepRegistration(DRepRegistration), + DRepRegistration(DRepRegistrationWithPos), /// DRep deregistration - DRepDeregistration(DRepDeregistration), + DRepDeregistration(DRepDeregistrationWithPos), /// DRep update - DRepUpdate(DRepUpdate), + DRepUpdate(DRepUpdateWithPos), } #[cfg(test)] @@ -1440,6 +1462,7 @@ mod tests { VotingProcedure { anchor: None, vote: Vote::Abstain, + vote_index: 0, }, ); voting.votes.insert( diff --git a/modules/accounts_state/src/accounts_state.rs b/modules/accounts_state/src/accounts_state.rs index 8e3c0dab..9509fadf 100644 --- a/modules/accounts_state/src/accounts_state.rs +++ b/modules/accounts_state/src/accounts_state.rs @@ -439,6 +439,15 @@ impl AccountsState { }) } + AccountsStateQuery::GetAccountsDrepDelegationsMap { stake_keys } => match state + .get_drep_delegations_map(stake_keys) + { + Some(map) => AccountsStateQueryResponse::AccountsDrepDelegationsMap(map), + None => AccountsStateQueryResponse::Error( + "Error retrieving DRep delegations map".to_string(), + ), + }, + _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 2b2d5c69..4f7b7563 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -133,6 +133,24 @@ impl State { .collect() } + /// Map stake_keys to their delegated DRep + pub fn get_drep_delegations_map( + &self, + stake_keys: &[Vec], + ) -> Option, Option>> { + let accounts = self.stake_addresses.lock().ok()?; // If lock fails, return None + + let mut map = HashMap::new(); + + for stake_key in stake_keys { + let account = accounts.get(stake_key)?; + let maybe_drep = account.delegated_drep.clone(); + map.insert(stake_key.clone(), maybe_drep); + } + + Some(map) + } + /// Log statistics fn log_stats(&self) { info!(num_stake_addresses = self.stake_addresses.lock().unwrap().keys().len(),); diff --git a/modules/drep_state/src/drep_state.rs b/modules/drep_state/src/drep_state.rs index 7b201aec..860026d0 100644 --- a/modules/drep_state/src/drep_state.rs +++ b/modules/drep_state/src/drep_state.rs @@ -2,13 +2,15 @@ //! Accepts certificate events and derives the DRep State in memory use acropolis_common::{ - messages::{CardanoMessage, DRepStateMessage, Message, StateQuery, StateQueryResponse}, + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, queries::governance::{ DRepInfo, DRepsList, GovernanceStateQuery, GovernanceStateQueryResponse, }, + state_history::StateHistory, + BlockInfo, BlockStatus, }; use anyhow::Result; -use caryatid_sdk::{module, Context, Module}; +use caryatid_sdk::{module, Context, Module, Subscription}; use config::Config; use std::sync::Arc; use tokio::sync::Mutex; @@ -16,10 +18,32 @@ use tracing::{error, info, info_span, Instrument}; mod state; use state::State; +mod drep_state_publisher; +use drep_state_publisher::DRepStatePublisher; -const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.certificates"; +use crate::state::DRepStorageConfig; + +// Subscription topics +const DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC: (&str, &str) = + ("certificates-subscribe-topic", "cardano.certificates"); +const DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC: (&str, &str) = + ("governance-subscribe-topic", "cardano.governance"); +const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = + ("parameters-subscribe-topic", "cardano.protocol.parameters"); + +// Publisher topic const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state"; +// Query topic +const DEFAULT_DREPS_QUERY_TOPIC: (&str, &str) = ("dreps-state-query-topic", "cardano.query.dreps"); + +// Configuration defaults +const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false); +const DEFAULT_STORE_DELEGATORS: (&str, bool) = ("store-delegators", false); +const DEFAULT_STORE_METADATA: (&str, bool) = ("store-metadata", false); +const DEFAULT_STORE_UPDATES: (&str, bool) = ("store-updates", false); +const DEFAULT_STORE_VOTES: (&str, bool) = ("store-votes", false); + /// DRep State module #[module( message_type(Message), @@ -29,54 +53,120 @@ const DEFAULT_DREP_STATE_TOPIC: &str = "cardano.drep.state"; pub struct DRepState; impl DRepState { - pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - // Get configuration - let subscribe_topic = - config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); - info!("Creating subscriber on '{subscribe_topic}'"); + async fn run( + history: Arc>>, + mut certs_subscription: Box>, + mut gov_subscription: Option>>, + mut params_subscription: Option>>, + mut drep_state_publisher: DRepStatePublisher, + context: Arc>, + storage_config: DRepStorageConfig, + ) -> Result<()> { + if storage_config.store_info { + if let Some(sub) = params_subscription.as_mut() { + let _ = sub.read().await?; + info!("Consumed initial genesis params from params_subscription"); + } + } + // Main loop of synchronised messages + loop { + // Get the current state snapshot + let mut state = { + let mut h = history.lock().await; + h.get_or_init_with(|| State::new(storage_config)) + }; + let mut current_block: Option = None; - let drep_state_topic = config - .get_string("publish-drep-state-topic") - .unwrap_or(DEFAULT_DREP_STATE_TOPIC.to_string()); - info!("Creating DRep state publisher on '{drep_state_topic}'"); + // Read per-block messages in parallel + let certs_message_f = certs_subscription.read(); - let state = Arc::new(Mutex::new(State::new())); + // Certificates are the synchroniser + let (_, certs_message) = certs_message_f.await?; + let new_epoch = match certs_message.as_ref() { + Message::Cardano((ref block_info, _)) => { + // rollback only on certs + if block_info.status == BlockStatus::RolledBack { + state = history.lock().await.get_rolled_back_state(&block_info); + } + current_block = Some(block_info.clone()); + block_info.new_epoch && block_info.epoch > 0 + } + _ => false, + }; - // Subscribe for certificate messages - let state1 = state.clone(); - let mut subscription = context.subscribe(&subscribe_topic).await?; - let context_subscribe = context.clone(); - context.run(async move { - loop { - let Ok((_, message)) = subscription.read().await else { - return; - }; + // Read from epoch-boundary messages only when it's a new epoch + if new_epoch { + // Read params subscription if store-info is enabled to obtain DRep expiration param. Update expirations on epoch transition + if let Some(sub) = params_subscription.as_mut() { + let (_, message) = sub.read().await?; + match message.as_ref() { + Message::Cardano(( + ref block_info, + CardanoMessage::ProtocolParams(params), + )) => { + Self::check_sync(¤t_block, &block_info, "params"); + if let Some(conway) = ¶ms.params.conway { + state + .update_drep_expirations( + block_info.epoch, + conway.d_rep_activity, + ) + .inspect_err(|e| error!("Param update error: {e:#}")) + .ok(); + } + } + _ => error!("Unexpected params message: {message:?}"), + } + } + + // Publish DRep state at the end of the epoch + if let Some(ref block) = current_block { + let dreps = state.active_drep_list(); + drep_state_publisher.publish_drep_state(&block, dreps).await?; + } + } + + // Handle cert message + match certs_message.as_ref() { + Message::Cardano(( + ref block_info, + CardanoMessage::TxCertificates(tx_certs_msg), + )) => { + let span = info_span!("drep_state.handle_certs", block = block_info.number); + async { + Self::check_sync(¤t_block, &block_info, "certs"); + state + .process_certificates( + context.clone(), + &tx_certs_msg.certificates, + block_info.epoch, + ) + .await + .inspect_err(|e| error!("Certificates handling error: {e:#}")) + .ok(); + } + .instrument(span) + .await; + } + + _ => error!("Unexpected message type: {certs_message:?}"), + } + + // Handle governance message + if let Some(sub) = gov_subscription.as_mut() { + let (_, message) = sub.read().await?; match message.as_ref() { - Message::Cardano((block_info, CardanoMessage::TxCertificates(tx_cert_msg))) => { - let span = info_span!("drep_state.handle", block = block_info.number); + Message::Cardano(( + block_info, + CardanoMessage::GovernanceProcedures(gov_msg), + )) => { + let span = info_span!("drep_state.handle_votes", block = block_info.number); async { - let mut state = state1.lock().await; + Self::check_sync(¤t_block, &block_info, "gov"); state - .handle(&tx_cert_msg) - .await - .inspect_err(|e| error!("Messaging handling error: {e}")) + .process_votes(&gov_msg.voting_procedures) + .inspect_err(|e| error!("Votes handling error: {e:#}")) .ok(); - - if block_info.new_epoch && block_info.epoch > 0 { - // publish DRep state at end of epoch - let dreps = state.active_drep_list(); - let message = Message::Cardano(( - block_info.clone(), - CardanoMessage::DRepState(DRepStateMessage { - epoch: block_info.epoch, - dreps, - }), - )); - context_subscribe - .publish(&drep_state_topic, Arc::new(message)) - .await - .unwrap_or_else(|e| error!("Failed to publish: {e}")); - } } .instrument(span) .await; @@ -85,11 +175,89 @@ impl DRepState { _ => error!("Unexpected message type: {message:?}"), } } - }); - let query_state = state.clone(); - context.handle("drep-state", move |message| { - let state_handle = query_state.clone(); // your shared Arc> + // Commit the new state + if let Some(block_info) = current_block { + history.lock().await.commit(&block_info, state); + } + } + } + + /// Check for synchronisation + fn check_sync(expected: &Option, actual: &BlockInfo, source: &str) { + if let Some(ref block) = expected { + if block.number != actual.number { + error!( + expected = block.number, + actual = actual.number, + source = source, + "Messages out of sync (expected certs block {}, got {} from {})", + block.number, + actual.number, + source, + ); + panic!( + "Message streams diverged: certs at {} vs {} from {}", + block.number, actual.number, source + ); + } + } + } + + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + fn get_bool_flag(config: &Config, key: (&str, bool)) -> bool { + config.get_bool(key.0).unwrap_or(key.1) + } + + fn get_string_flag(config: &Config, key: (&str, &str)) -> String { + config.get_string(key.0).unwrap_or_else(|_| key.1.to_string()) + } + + // Get configuration flags and topis + let storage_config = DRepStorageConfig { + store_info: get_bool_flag(&config, DEFAULT_STORE_INFO), + store_delegators: get_bool_flag(&config, DEFAULT_STORE_DELEGATORS), + store_metadata: get_bool_flag(&config, DEFAULT_STORE_METADATA), + store_updates: get_bool_flag(&config, DEFAULT_STORE_UPDATES), + store_votes: get_bool_flag(&config, DEFAULT_STORE_VOTES), + }; + + let certificates_subscribe_topic = + get_string_flag(&config, DEFAULT_CERTIFICATES_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{certificates_subscribe_topic}'"); + + let mut governance_subscribe_topic = String::new(); + if storage_config.store_votes { + governance_subscribe_topic = + get_string_flag(&config, DEFAULT_GOVERNANCE_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{governance_subscribe_topic}'"); + } + + let mut parameters_subscribe_topic = String::new(); + if storage_config.store_info { + parameters_subscribe_topic = + get_string_flag(&config, DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{parameters_subscribe_topic}'"); + } + + let drep_state_topic = config + .get_string("publish-drep-state-topic") + .unwrap_or(DEFAULT_DREP_STATE_TOPIC.to_string()); + info!("Creating DRep state publisher on '{drep_state_topic}'"); + + let drep_query_topic = get_string_flag(&config, DEFAULT_DREPS_QUERY_TOPIC); + info!("Creating DRep query handler on '{drep_query_topic}'"); + + // Initalize state history + let history = Arc::new(Mutex::new(StateHistory::::new("DRepState"))); + let history_run = history.clone(); + let query_history = history.clone(); + let ticker_history = history.clone(); + let ctx_run = context.clone(); + + // Query handler + context.handle(&drep_query_topic, move |message| { + let history = query_history.clone(); async move { let Message::StateQuery(StateQuery::Governance(query)) = message.as_ref() else { return Arc::new(Message::StateQueryResponse(StateQueryResponse::Governance( @@ -99,15 +267,15 @@ impl DRepState { ))); }; - let locked = state_handle.lock().await; + let state = history.lock().await.get_current_state(); let response = match query { GovernanceStateQuery::GetDRepsList => { - let dreps = locked.list(); + let dreps = state.list(); GovernanceStateQueryResponse::DRepsList(DRepsList { dreps }) } GovernanceStateQuery::GetDRepInfo { drep_credential } => { - match locked.get_drep(&drep_credential) { + match state.get_drep(&drep_credential) { Some(record) => GovernanceStateQueryResponse::DRepInfo(DRepInfo { deposit: record.deposit, anchor: record.anchor.clone(), @@ -127,8 +295,7 @@ impl DRepState { }); // Ticker to log stats - let mut subscription = context.subscribe(&subscribe_topic).await?; - let state2 = state.clone(); + let mut subscription = context.subscribe("clock.tick").await?; context.run(async move { loop { let Ok((_, message)) = subscription.read().await else { @@ -138,9 +305,10 @@ impl DRepState { if (message.number % 60) == 0 { let span = info_span!("drep_state.tick", number = message.number); async { - state2 + ticker_history .lock() .await + .get_current_state() .tick() .await .inspect_err(|e| error!("Tick error: {e}")) @@ -153,6 +321,39 @@ impl DRepState { } }); + // Publisher for DRep State + let drep_state_publisher = DRepStatePublisher::new(context.clone(), drep_state_topic); + + // Subscribe to enabled topics + let certs_sub = context.subscribe(&certificates_subscribe_topic).await?; + + let gov_sub = if storage_config.store_votes { + Some(context.subscribe(&governance_subscribe_topic).await?) + } else { + None + }; + + let params_sub = if storage_config.store_info { + Some(context.subscribe(¶meters_subscribe_topic).await?) + } else { + None + }; + + // Start run task + context.run(async move { + Self::run( + history_run, + certs_sub, + gov_sub, + params_sub, + drep_state_publisher, + ctx_run, + storage_config, + ) + .await + .unwrap_or_else(|e| error!("Failed: {e}")); + }); + Ok(()) } } diff --git a/modules/drep_state/src/drep_state_publisher.rs b/modules/drep_state/src/drep_state_publisher.rs new file mode 100644 index 00000000..7a94a099 --- /dev/null +++ b/modules/drep_state/src/drep_state_publisher.rs @@ -0,0 +1,43 @@ +use acropolis_common::{ + messages::{CardanoMessage, DRepStateMessage, Message}, + BlockInfo, Credential, +}; +use caryatid_sdk::Context; +use std::sync::Arc; + +/// Message publisher for DRep State +pub struct DRepStatePublisher { + /// Module context + context: Arc>, + + /// Topic to publish on + topic: String, +} + +impl DRepStatePublisher { + /// Construct with context and topic to publish on + pub fn new(context: Arc>, topic: String) -> Self { + Self { context, topic } + } + + /// Publish the DRep state + pub async fn publish_drep_state( + &mut self, + block: &BlockInfo, + dreps: Vec<(Credential, u64)>, + ) -> anyhow::Result<()> { + self.context + .message_bus + .publish( + &self.topic, + Arc::new(Message::Cardano(( + block.clone(), + CardanoMessage::DRepState(DRepStateMessage { + epoch: block.epoch, + dreps, + }), + ))), + ) + .await + } +} diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index 9c574030..5cc2a8e9 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -1,12 +1,19 @@ //! Acropolis DRepState: State storage use acropolis_common::{ - messages::TxCertificatesMessage, Anchor, DRepCredential, Lovelace, TxCertificate, + messages::{Message, StateQuery, StateQueryResponse}, + queries::{ + accounts::{AccountsStateQuery, AccountsStateQueryResponse, DEFAULT_ACCOUNTS_QUERY_TOPIC}, + get_query_topic, + }, + Anchor, Credential, DRepChoice, DRepCredential, Lovelace, StakeCredential, TxCertificate, Vote, + Voter, VotingProcedures, }; use anyhow::{anyhow, Result}; +use caryatid_sdk::Context; use serde_with::serde_as; -use std::collections::HashMap; -use tracing::info; +use std::{collections::HashMap, sync::Arc}; +use tracing::{error, info}; #[serde_as] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -15,20 +22,111 @@ pub struct DRepRecord { pub anchor: Option, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct HistoricalDRepState { + // Populated from the reg field in: + // - DRepRegistration + // - DRepDeregistration + // - DRepUpdate + pub info: Option, + pub updates: Option>, + pub metadata: Option>, + + // Populated from the drep and credential fields in: + // - VoteDelegation + // - StakeAndVoteDelegation + // - StakeRegistrationAndVoteDelegation + // - StakeRegistrationAndStakeAndVoteDelegation + pub delegators: Option>, + + // Populated from voting_procedures in GovernanceProceduresMessage + pub votes: Option>, +} + +impl HistoricalDRepState { + pub fn from_config(cfg: &DRepStorageConfig) -> Self { + Self { + info: cfg.store_info.then(DRepRecordExtended::default), + updates: cfg.store_updates.then(Vec::new), + metadata: cfg.store_metadata.then_some(None), + delegators: cfg.store_delegators.then(Vec::new), + votes: cfg.store_votes.then(Vec::new), + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +pub struct DRepRecordExtended { + pub deposit: Lovelace, + pub expired: bool, + pub retired: bool, + pub active_epoch: Option, + pub last_active_epoch: u64, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct DRepUpdateEvent { + pub tx_hash: [u8; 32], + pub cert_index: u64, + pub action: DRepActionUpdate, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub enum DRepActionUpdate { + Registered, + Updated, + Deregistered, +} + +#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)] +pub struct VoteRecord { + pub tx_hash: [u8; 32], + pub vote_index: u32, + pub vote: Vote, +} + impl DRepRecord { pub fn new(deposit: Lovelace, anchor: Option) -> Self { Self { deposit, anchor } } } +#[derive(Debug, Copy, Clone, Default)] +pub struct DRepStorageConfig { + pub store_info: bool, + pub store_delegators: bool, + pub store_metadata: bool, + pub store_updates: bool, + pub store_votes: bool, +} + +impl DRepStorageConfig { + fn any_enabled(&self) -> bool { + self.store_info + || self.store_delegators + || self.store_metadata + || self.store_updates + || self.store_votes + } +} + +#[derive(Debug, Default, Clone)] pub struct State { - dreps: HashMap, + pub config: DRepStorageConfig, + pub dreps: HashMap, + pub historical_dreps: Option>, } impl State { - pub fn new() -> Self { + pub fn new(config: DRepStorageConfig) -> Self { Self { + config, dreps: HashMap::new(), + historical_dreps: if config.any_enabled() { + Some(HashMap::new()) + } else { + None + }, } } @@ -56,49 +154,100 @@ impl State { } impl State { - fn process_one_certificate(&mut self, tx_cert: &TxCertificate) -> Result { - match tx_cert { - TxCertificate::DRepRegistration(reg) => match self.dreps.get_mut(®.credential) { - Some(ref mut drep) => { - if reg.deposit != 0 { - Err(anyhow!("DRep registration {:?}: replacement requires deposit = 0, instead of {}", - reg.credential, reg.deposit - )) - } else { - drep.anchor = reg.anchor.clone(); - Ok(false) - } + pub async fn process_certificates( + &mut self, + context: Arc>, + tx_certs: &Vec, + epoch: u64, + ) -> Result<()> { + let mut batched_delegators = Vec::new(); + let store_delegators = self.config.store_delegators; + + for tx_cert in tx_certs { + if store_delegators { + if let Some((cred, drep)) = Self::extract_delegation_fields(tx_cert) { + batched_delegators.push((cred, drep)); + continue; } - None => { - self.dreps.insert( - reg.credential.clone(), - DRepRecord::new(reg.deposit, reg.anchor.clone()), - ); - Ok(true) + } + + if let Err(e) = self.process_one_cert(tx_cert, epoch) { + tracing::error!("Error processing tx_cert: {e}"); + } + } + + // Batched delegations to reduce redundant queries to accounts_state + if store_delegators && !batched_delegators.is_empty() { + if let Err(e) = self.update_delegators(&context, batched_delegators).await { + tracing::error!("Error processing batched delegators: {e}"); + } + } + + Ok(()) + } + + pub fn process_votes( + &mut self, + voting_procedures: &[([u8; 32], VotingProcedures)], + ) -> Result<()> { + let Some(hist_map) = self.historical_dreps.as_mut() else { + return Ok(()); + }; + + let cfg = self.config.clone(); + for (tx_hash, voting_procedures) in voting_procedures { + for (voter, single_votes) in &voting_procedures.votes { + let drep_cred = match voter { + Voter::DRepKey(k) => DRepCredential::AddrKeyHash(k.to_vec()), + Voter::DRepScript(s) => DRepCredential::ScriptHash(s.to_vec()), + _ => continue, + }; + + let entry = hist_map + .entry(drep_cred) + .or_insert_with(|| HistoricalDRepState::from_config(&cfg)); + + // ensure votes vec exists if we created from a config that didn’t set it before + if entry.votes.is_none() { + entry.votes = Some(Vec::new()); } - }, - TxCertificate::DRepDeregistration(reg) => { - if self.dreps.remove(®.credential).is_none() { - Err(anyhow!( - "DRep registration {:?}: internal error, credential not found", - reg.credential - )) - } else { - Ok(true) + let votes = entry.votes.as_mut().unwrap(); + + for (_, vp) in &single_votes.voting_procedures { + votes.push(VoteRecord { + tx_hash: tx_hash.clone(), + vote_index: vp.vote_index, + vote: vp.vote.clone(), + }); } } - TxCertificate::DRepUpdate(reg) => match self.dreps.get_mut(®.credential) { - Some(ref mut drep) => { - drep.anchor = reg.anchor.clone(); - Ok(false) + } + Ok(()) + } + + pub fn update_drep_expirations( + &mut self, + current_epoch: u64, + expired_epoch_param: u32, + ) -> Result<()> { + let expired_offset = expired_epoch_param as u64; + + // If historical storage isn’t enabled, nothing to do. + let Some(historical_dreps) = self.historical_dreps.as_mut() else { + return Ok(()); + }; + + for (_cred, drep_record) in historical_dreps.iter_mut() { + if let Some(info) = drep_record.info.as_mut() { + if let (Some(active_epoch), false) = (info.active_epoch, info.expired) { + if active_epoch + expired_offset <= current_epoch { + info.expired = true; + } } - None => Err(anyhow!( - "DRep registration {:?}: internal error, credential not found", - reg.credential - )), - }, - _ => Ok(false), + } } + + Ok(()) } pub fn active_drep_list(&self) -> Vec<(DRepCredential, Lovelace)> { @@ -109,22 +258,255 @@ impl State { distribution } - pub async fn handle(&mut self, tx_cert_msg: &TxCertificatesMessage) -> Result<()> { - for tx_cert in tx_cert_msg.certificates.iter() { - if let Err(e) = self.process_one_certificate(tx_cert) { - tracing::error!("Error processing tx_cert {}", e); + fn process_one_cert(&mut self, tx_cert: &TxCertificate, epoch: u64) -> Result { + match tx_cert { + TxCertificate::DRepRegistration(reg) => { + let new = match self.dreps.get_mut(®.reg.credential) { + Some(drep) => { + if reg.reg.deposit != 0 { + return Err(anyhow!( + "DRep registration {:?}: replacement requires deposit = 0, got {}", + reg.reg.credential, + reg.reg.deposit + )); + } + drep.anchor = reg.reg.anchor.clone(); + false + } + None => { + self.dreps.insert( + reg.reg.credential.clone(), + DRepRecord::new(reg.reg.deposit, reg.reg.anchor.clone()), + ); + true + } + }; + + if self.historical_dreps.is_some() { + if let Err(err) = self.update_historical(®.reg.credential, true, |entry| { + if let Some(info) = entry.info.as_mut() { + info.deposit = reg.reg.deposit; + info.expired = false; + info.retired = false; + info.active_epoch = Some(epoch); + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Registered, + }); + } + if let Some(anchor) = ®.reg.anchor { + if let Some(inner) = entry.metadata.as_mut() { + *inner = Some(anchor.clone()); + } + } + }) { + return Err(anyhow!("Failed to update DRep on registration: {err}")); + } + } + + Ok(new) + } + + TxCertificate::DRepDeregistration(reg) => { + // Update live state + if self.dreps.remove(®.reg.credential).is_none() { + return Err(anyhow!( + "DRep deregistration {:?}: credential not found", + reg.reg.credential + )); + } + + // Update history if enabled + if self.historical_dreps.is_some() { + if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Some(info) = entry.info.as_mut() { + info.deposit = 0; + info.expired = false; + info.retired = true; + info.active_epoch = None; + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Deregistered, + }); + } + }) { + return Err(anyhow!("Failed to update DRep on deregistration: {err}")); + } + } + + Ok(true) + } + + TxCertificate::DRepUpdate(reg) => { + // Update live state + let drep = self.dreps.get_mut(®.reg.credential).ok_or_else(|| { + anyhow!("DRep update {:?}: credential not found", reg.reg.credential) + })?; + drep.anchor = reg.reg.anchor.clone(); + + // Update history if enabled + if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Some(info) = entry.info.as_mut() { + info.expired = false; + info.retired = false; + info.last_active_epoch = epoch; + } + if let Some(updates) = entry.updates.as_mut() { + updates.push(DRepUpdateEvent { + tx_hash: reg.tx_hash, + cert_index: reg.cert_index, + action: DRepActionUpdate::Updated, + }); + } + if let Some(anchor) = ®.reg.anchor { + if let Some(inner) = entry.metadata.as_mut() { + *inner = Some(anchor.clone()); + } + } + }) { + error!("Historical update failed: {err}"); + } + + Ok(false) + } + + _ => Ok(false), + } + } + + fn update_historical( + &mut self, + credential: &DRepCredential, + create_if_missing: bool, + f: F, + ) -> Result<()> + where + F: FnOnce(&mut HistoricalDRepState), + { + let hist = self + .historical_dreps + .as_mut() + .ok_or_else(|| anyhow!("No historical map configured"))?; + + if create_if_missing { + let cfg = self.config.clone(); + let entry = hist + .entry(credential.clone()) + .or_insert_with(|| HistoricalDRepState::from_config(&cfg)); + f(entry); + } else if let Some(entry) = hist.get_mut(credential) { + f(entry); + } else { + error!("Tried to update unknown DRep credential: {:?}", credential); + } + + Ok(()) + } + + async fn update_delegators( + &mut self, + context: &Arc>, + delegators: Vec<(&StakeCredential, &DRepChoice)>, + ) -> Result<()> { + let stake_keys: Vec<_> = delegators.iter().map(|(sc, _)| sc.get_hash()).collect(); + let stake_key_to_input: HashMap<_, _> = delegators + .iter() + .zip(&stake_keys) + .map(|((sc, drep), key)| (key.clone(), (*sc, *drep))) + .collect(); + + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountsDrepDelegationsMap { stake_keys }, + ))); + + let accounts_query_topic = get_query_topic(context.clone(), DEFAULT_ACCOUNTS_QUERY_TOPIC); + let response = context.message_bus.request(&accounts_query_topic, msg).await?; + let message = Arc::try_unwrap(response).unwrap_or_else(|arc| (*arc).clone()); + + // TODO: Ensure AccountsStateQueryResponse is for the correct block + let result_map = match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountsDrepDelegationsMap(map), + )) => map, + _ => { + return Err(anyhow!("Unexpected accounts-state response")); + } + }; + + for (stake_key, old_drep_opt) in result_map { + let (delegator, new_drep_choice) = match stake_key_to_input.get(&stake_key) { + Some(pair) => *pair, + None => continue, + }; + + let new_drep_cred = match drep_choice_to_credential(new_drep_choice) { + Some(c) => c, + None => continue, + }; + + if let Some(old_drep) = old_drep_opt { + if let Some(old_drep_cred) = drep_choice_to_credential(&old_drep) { + if old_drep_cred != new_drep_cred { + self.update_historical(&old_drep_cred, false, |entry| { + if let Some(delegators) = entry.delegators.as_mut() { + delegators.retain(|s| s != delegator); + } + })?; + } + } + } + + // Add delegator to new DRep + match self.update_historical(&new_drep_cred, true, |entry| { + if let Some(delegators) = entry.delegators.as_mut() { + if !delegators.contains(delegator) { + delegators.push(delegator.clone()); + } + } + }) { + Ok(_) => {} + Err(err) => return Err(anyhow!("Failed to update new delegator: {err}")), } } Ok(()) } + + fn extract_delegation_fields(cert: &TxCertificate) -> Option<(&StakeCredential, &DRepChoice)> { + match cert { + TxCertificate::VoteDelegation(d) => Some((&d.credential, &d.drep)), + TxCertificate::StakeAndVoteDelegation(d) => Some((&d.credential, &d.drep)), + TxCertificate::StakeRegistrationAndVoteDelegation(d) => Some((&d.credential, &d.drep)), + TxCertificate::StakeRegistrationAndStakeAndVoteDelegation(d) => { + Some((&d.credential, &d.drep)) + } + _ => None, + } + } +} + +fn drep_choice_to_credential(choice: &DRepChoice) -> Option { + match choice { + DRepChoice::Key(k) => Some(DRepCredential::AddrKeyHash(k.clone())), + DRepChoice::Script(k) => Some(DRepCredential::ScriptHash(k.clone())), + _ => None, + } } #[cfg(test)] mod tests { - use crate::state::{DRepRecord, State}; + use crate::state::{DRepRecord, DRepStorageConfig, State}; use acropolis_common::{ - Anchor, Credential, DRepDeregistration, DRepRegistration, DRepUpdate, TxCertificate, + Anchor, Credential, DRepDeregistration, DRepDeregistrationWithPos, DRepRegistration, + DRepRegistrationWithPos, DRepUpdate, DRepUpdateWithPos, TxCertificate, }; const CRED_1: [u8; 28] = [ @@ -139,13 +521,17 @@ mod tests { #[test] fn test_drep_process_one_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); assert_eq!(state.get_count(), 1); let tx_cert_record = DRepRecord { deposit: 500000000, @@ -160,20 +546,28 @@ mod tests { #[test] fn test_drep_do_not_replace_existing_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); - - let bad_tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 600000000, - anchor: None, + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); + + let bad_tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 600000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - assert!(state.process_one_certificate(&bad_tx_cert).is_err()); + assert!(state.process_one_cert(&bad_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); let tx_cert_record = DRepRecord { @@ -189,25 +583,33 @@ mod tests { #[test] fn test_drep_update_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); let anchor = Anchor { url: "https://poop.bike".into(), data_hash: vec![0x13, 0x37], }; - let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdate { - credential: tx_cred.clone(), - anchor: Some(anchor.clone()), + let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdateWithPos { + reg: DRepUpdate { + credential: tx_cred.clone(), + anchor: Some(anchor.clone()), + }, + tx_hash: [0u8; 32], + cert_index: 1, }); assert_eq!( - state.process_one_certificate(&update_anchor_tx_cert).unwrap(), + state.process_one_cert(&update_anchor_tx_cert, 1).unwrap(), false ); @@ -225,24 +627,31 @@ mod tests { #[test] fn test_drep_do_not_update_nonexistent_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); let anchor = Anchor { url: "https://poop.bike".into(), data_hash: vec![0x13, 0x37], }; - let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdate { - credential: Credential::AddrKeyHash(CRED_2.to_vec()), - anchor: Some(anchor.clone()), + let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdateWithPos { + reg: DRepUpdate { + credential: Credential::AddrKeyHash(CRED_2.to_vec()), + anchor: Some(anchor.clone()), + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - - assert!(state.process_one_certificate(&update_anchor_tx_cert).is_err()); + assert!(state.process_one_cert(&update_anchor_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); let tx_cert_record = DRepRecord { @@ -258,20 +667,28 @@ mod tests { #[test] fn test_drep_deregister() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(acropolis_common::DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistration { - credential: tx_cred.clone(), - refund: 500000000, + let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { + reg: DRepDeregistration { + credential: tx_cred.clone(), + refund: 500000000, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); assert_eq!( - state.process_one_certificate(&unregister_tx_cert).unwrap(), + state.process_one_cert(&unregister_tx_cert, 1).unwrap(), true ); assert_eq!(state.get_count(), 0); @@ -281,19 +698,27 @@ mod tests { #[test] fn test_drep_do_not_deregister_nonexistent_cert() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistration { - credential: tx_cred.clone(), - deposit: 500000000, - anchor: None, + let tx_cert = TxCertificate::DRepRegistration(acropolis_common::DRepRegistrationWithPos { + reg: DRepRegistration { + credential: tx_cred.clone(), + deposit: 500000000, + anchor: None, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - let mut state = State::new(); - assert_eq!(state.process_one_certificate(&tx_cert).unwrap(), true); + let mut state = State::new(DRepStorageConfig::default()); + assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistration { - credential: Credential::AddrKeyHash(CRED_2.to_vec()), - refund: 500000000, + let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { + reg: DRepDeregistration { + credential: Credential::AddrKeyHash(CRED_2.to_vec()), + refund: 500000000, + }, + tx_hash: [0u8; 32], + cert_index: 1, }); - assert!(state.process_one_certificate(&unregister_tx_cert).is_err()); + assert!(state.process_one_cert(&unregister_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); assert_eq!(state.get_drep(&tx_cred).unwrap().deposit, 500000000); } diff --git a/modules/governance_state/src/state.rs b/modules/governance_state/src/state.rs index 437b357b..fc57a6cb 100644 --- a/modules/governance_state/src/state.rs +++ b/modules/governance_state/src/state.rs @@ -170,14 +170,15 @@ impl State { fn insert_voting_procedure( &mut self, voter: &Voter, - transaction: &DataHash, + transaction: &[u8; 32], voter_votes: &SingleVoterVotes, ) -> Result<()> { for (action_id, procedure) in voter_votes.voting_procedures.iter() { let votes = self.votes.entry(action_id.clone()).or_insert_with(|| HashMap::new()); - if let Some((prev_trans, prev_vote)) = - votes.insert(voter.clone(), (transaction.clone(), procedure.clone())) - { + if let Some((prev_trans, prev_vote)) = votes.insert( + voter.clone(), + (transaction.clone().to_vec(), procedure.clone()), + ) { // Re-voting is allowed; new vote must be treated as the proper one, // older is to be discarded. if tracing::enabled!(tracing::Level::DEBUG) { diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index bae3e41f..4e4b31c7 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -17,7 +17,7 @@ pub async fn handle_dreps_list_blockfrost( let msg = Arc::new(Message::StateQuery(StateQuery::Governance( GovernanceStateQuery::GetDRepsList, ))); - let raw = context.message_bus.request("drep-state", msg).await?; + let raw = context.message_bus.request("cardano.query.dreps", msg).await?; let message = Arc::try_unwrap(raw).unwrap_or_else(|arc| (*arc).clone()); match message { Message::StateQueryResponse(StateQueryResponse::Governance( @@ -68,7 +68,7 @@ pub async fn handle_single_drep_blockfrost( }, ))); - let raw = context.message_bus.request("drep-state", msg).await?; + let raw = context.message_bus.request("cardano.query.dreps", msg).await?; let message = Arc::try_unwrap(raw).unwrap_or_else(|arc| (*arc).clone()); match message { diff --git a/modules/tx_unpacker/src/map_parameters.rs b/modules/tx_unpacker/src/map_parameters.rs index c17354d8..0d33bfb1 100644 --- a/modules/tx_unpacker/src/map_parameters.rs +++ b/modules/tx_unpacker/src/map_parameters.rs @@ -200,6 +200,7 @@ fn map_relay(relay: &PallasRelay) -> Relay { /// Derive our TxCertificate from a Pallas Certificate pub fn map_certificate( cert: &MultiEraCert, + tx_hash: [u8; 32], tx_index: usize, cert_index: usize, ) -> Result { @@ -425,24 +426,36 @@ pub fn map_certificate( } conway::Certificate::RegDRepCert(cred, coin, anchor) => { - Ok(TxCertificate::DRepRegistration(DRepRegistration { - credential: map_stake_credential(cred), - deposit: *coin, - anchor: map_nullable_anchor(&anchor), + Ok(TxCertificate::DRepRegistration(DRepRegistrationWithPos { + reg: DRepRegistration { + credential: map_stake_credential(cred), + deposit: *coin, + anchor: map_nullable_anchor(&anchor), + }, + tx_hash: tx_hash, + cert_index: cert_index as u64, })) } - conway::Certificate::UnRegDRepCert(cred, coin) => { - Ok(TxCertificate::DRepDeregistration(DRepDeregistration { - credential: map_stake_credential(cred), - refund: *coin, - })) - } + conway::Certificate::UnRegDRepCert(cred, coin) => Ok( + TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { + reg: DRepDeregistration { + credential: map_stake_credential(cred), + refund: *coin, + }, + tx_hash: tx_hash, + cert_index: cert_index as u64, + }), + ), conway::Certificate::UpdateDRepCert(cred, anchor) => { - Ok(TxCertificate::DRepUpdate(DRepUpdate { - credential: map_stake_credential(cred), - anchor: map_nullable_anchor(&anchor), + Ok(TxCertificate::DRepUpdate(DRepUpdateWithPos { + reg: DRepUpdate { + credential: map_stake_credential(cred), + anchor: map_nullable_anchor(&anchor), + }, + tx_hash: tx_hash, + cert_index: cert_index as u64, })) } } @@ -797,10 +810,14 @@ fn map_vote(vote: &conway::Vote) -> Vote { } } -fn map_single_governance_voting_procedure(proc: &conway::VotingProcedure) -> VotingProcedure { +fn map_single_governance_voting_procedure( + vote_index: u32, + proc: &conway::VotingProcedure, +) -> VotingProcedure { VotingProcedure { vote: map_vote(&proc.vote), anchor: map_nullable_anchor(&proc.anchor), + vote_index, } } @@ -823,9 +840,12 @@ pub fn map_all_governance_voting_procedures( .get_mut(&voter) .ok_or_else(|| anyhow!("Cannot find voter {:?}, which must present", voter))?; - for (pallas_action_id, pallas_voting_procedure) in pallas_pair.iter() { + for (vote_index, (pallas_action_id, pallas_voting_procedure)) in + pallas_pair.iter().enumerate() + { let action_id = map_gov_action_id(pallas_action_id)?; - let vp = map_single_governance_voting_procedure(&pallas_voting_procedure); + let vp = + map_single_governance_voting_procedure(vote_index as u32, &pallas_voting_procedure); single_voter.voting_procedures.insert(action_id, vp); } } diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index 2328e208..6c1419a9 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -212,10 +212,11 @@ impl TxUnpacker { } if publish_certificates_topic.is_some() { + let tx_hash = tx.hash(); for ( cert_index, cert) in certs.iter().enumerate() { - match map_parameters::map_certificate(&cert, tx_index, cert_index) { + match map_parameters::map_certificate(&cert, *tx_hash, tx_index, cert_index) { Ok(tx_cert) => { - certificates.push(tx_cert); + certificates.push( tx_cert); }, Err(_e) => { // TODO error unexpected @@ -257,7 +258,7 @@ impl TxUnpacker { if let Some(pallas_vp) = votes { // Nonempty set -- governance_message.voting_procedures will not be empty match map_parameters::map_all_governance_voting_procedures(pallas_vp) { - Ok(vp) => voting_procedures.push((tx.hash().to_vec(), vp)), + Ok(vp) => voting_procedures.push((*tx.hash(), vp)), Err(e) => error!("Cannot decode governance voting procedures in slot {}: {e}", block.slot) } } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 04d5a952..88181615 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -33,6 +33,16 @@ address-delta-topic = "cardano.address.delta" [module.spo-state] [module.drep-state] +# Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled) +store-info = false +# Enables /governance/dreps/{drep_id}/delegators endpoint +store-delegators = false +# Enables /governance/dreps/{drep_id}/metadata endpoint +store-metadata = false +# Enables /governance/dreps/{drep_id}/updates endpoint +store-updates = false +# Enables /governance/dreps/{drep_id}/votes endpoint +store-votes = false [module.governance-state]