diff --git a/codec/src/map_parameters.rs b/codec/src/map_parameters.rs index b46d5c69..ecae6d64 100644 --- a/codec/src/map_parameters.rs +++ b/codec/src/map_parameters.rs @@ -210,31 +210,32 @@ fn map_relay(relay: &PallasRelay) -> Relay { /// Derive our TxCertificate from a Pallas Certificate pub fn map_certificate( cert: &MultiEraCert, - tx_hash: TxHash, - tx_index: u16, + tx_identifier: TxIdentifier, cert_index: usize, network_id: NetworkId, -) -> Result { +) -> Result { match cert { MultiEraCert::NotApplicable => Err(anyhow!("Not applicable cert!")), MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { - alonzo::Certificate::StakeRegistration(cred) => { - Ok(TxCertificate::StakeRegistration(StakeAddressWithPos { - stake_address: map_stake_address(cred, network_id), - tx_index: tx_index.into(), - cert_index: cert_index.try_into().unwrap(), - })) - } - alonzo::Certificate::StakeDeregistration(cred) => Ok( - TxCertificate::StakeDeregistration(map_stake_address(cred, network_id)), - ), - alonzo::Certificate::StakeDelegation(cred, pool_key_hash) => { - Ok(TxCertificate::StakeDelegation(StakeDelegation { + alonzo::Certificate::StakeRegistration(cred) => Ok(TxCertificateWithPos { + cert: TxCertificate::StakeRegistration(map_stake_address(cred, network_id)), + tx_identifier, + cert_index: cert_index.try_into().unwrap(), + }), + alonzo::Certificate::StakeDeregistration(cred) => Ok(TxCertificateWithPos { + cert: TxCertificate::StakeDeregistration(map_stake_address(cred, network_id)), + tx_identifier, + cert_index: cert_index.try_into().unwrap(), + }), + alonzo::Certificate::StakeDelegation(cred, pool_key_hash) => Ok(TxCertificateWithPos { + cert: TxCertificate::StakeDelegation(StakeDelegation { stake_address: map_stake_address(cred, network_id), operator: pool_key_hash.to_vec(), - })) - } + }), + tx_identifier, + cert_index: cert_index.try_into().unwrap(), + }), alonzo::Certificate::PoolRegistration { operator, vrf_keyhash, @@ -245,61 +246,61 @@ pub fn map_certificate( pool_owners, relays, pool_metadata, - } => Ok(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: PoolRegistration { - operator: operator.to_vec(), - vrf_key_hash: vrf_keyhash.to_vec(), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, - reward_account: StakeAddress::from_binary(reward_account)?, - pool_owners: pool_owners - .iter() - .map(|v| { - StakeAddress::new( - StakeCredential::AddrKeyHash(v.to_vec()), - network_id.clone().into(), - ) - }) - .collect(), - relays: relays.iter().map(map_relay).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, - }, + } => Ok(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(PoolRegistration { + operator: operator.to_vec(), + vrf_key_hash: vrf_keyhash.to_vec(), + pledge: *pledge, + cost: *cost, + margin: Ratio { + numerator: margin.numerator, + denominator: margin.denominator, }, - tx_hash, - cert_index: cert_index as u64, - }, - )), - alonzo::Certificate::PoolRetirement(pool_key_hash, epoch) => Ok( - TxCertificate::PoolRetirementWithPos(PoolRetirementWithPos { - ret: PoolRetirement { - operator: pool_key_hash.to_vec(), - epoch: *epoch, + reward_account: StakeAddress::from_binary(reward_account)?, + pool_owners: pool_owners + .iter() + .map(|v| { + StakeAddress::new( + StakeCredential::AddrKeyHash(v.to_vec()), + network_id.clone().into(), + ) + }) + .collect(), + relays: relays.iter().map(map_relay).collect(), + pool_metadata: match pool_metadata { + Nullable::Some(md) => Some(PoolMetadata { + url: md.url.clone(), + hash: md.hash.to_vec(), + }), + _ => None, }, - tx_hash, - cert_index: cert_index as u64, }), - ), + tx_identifier, + cert_index: cert_index as u64, + }), + alonzo::Certificate::PoolRetirement(pool_key_hash, epoch) => Ok(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: pool_key_hash.to_vec(), + epoch: *epoch, + }), + tx_identifier, + cert_index: cert_index as u64, + }), alonzo::Certificate::GenesisKeyDelegation( genesis_hash, genesis_delegate_hash, vrf_key_hash, - ) => Ok(TxCertificate::GenesisKeyDelegation(GenesisKeyDelegation { - genesis_hash: genesis_hash.to_vec(), - genesis_delegate_hash: genesis_delegate_hash.to_vec(), - vrf_key_hash: vrf_key_hash.to_vec(), - })), - alonzo::Certificate::MoveInstantaneousRewardsCert(mir) => Ok( - TxCertificate::MoveInstantaneousReward(MoveInstantaneousReward { + ) => Ok(TxCertificateWithPos { + cert: TxCertificate::GenesisKeyDelegation(GenesisKeyDelegation { + genesis_hash: genesis_hash.to_vec(), + genesis_delegate_hash: genesis_delegate_hash.to_vec(), + vrf_key_hash: vrf_key_hash.to_vec(), + }), + tx_identifier, + cert_index: cert_index as u64, + }), + alonzo::Certificate::MoveInstantaneousRewardsCert(mir) => Ok(TxCertificateWithPos { + cert: TxCertificate::MoveInstantaneousReward(MoveInstantaneousReward { source: match mir.source { alonzo::InstantaneousRewardSource::Reserves => { InstantaneousRewardSource::Reserves @@ -322,28 +323,38 @@ pub fn map_certificate( } }, }), - ), + tx_identifier, + cert_index: cert_index as u64, + }), }, // Now repeated for a different type! MultiEraCert::Conway(cert) => { match cert.as_ref().as_ref() { - conway::Certificate::StakeRegistration(cred) => { - Ok(TxCertificate::StakeRegistration(StakeAddressWithPos { - stake_address: map_stake_address(cred, network_id), - tx_index: tx_index.into(), - cert_index: cert_index.try_into().unwrap(), - })) - } - conway::Certificate::StakeDeregistration(cred) => Ok( - TxCertificate::StakeDeregistration(map_stake_address(cred, network_id)), - ), + conway::Certificate::StakeRegistration(cred) => Ok(TxCertificateWithPos { + cert: TxCertificate::StakeRegistration(map_stake_address(cred, network_id)), + tx_identifier, + + cert_index: cert_index.try_into().unwrap(), + }), + + conway::Certificate::StakeDeregistration(cred) => Ok(TxCertificateWithPos { + cert: TxCertificate::StakeDeregistration(map_stake_address(cred, network_id)), + tx_identifier, + cert_index: cert_index.try_into().unwrap(), + }), + conway::Certificate::StakeDelegation(cred, pool_key_hash) => { - Ok(TxCertificate::StakeDelegation(StakeDelegation { - stake_address: map_stake_address(cred, network_id), - operator: pool_key_hash.to_vec(), - })) + Ok(TxCertificateWithPos { + cert: TxCertificate::StakeDelegation(StakeDelegation { + stake_address: map_stake_address(cred, network_id), + operator: pool_key_hash.to_vec(), + }), + tx_identifier, + cert_index: cert_index.try_into().unwrap(), + }) } + conway::Certificate::PoolRegistration { // TODO relays, pool_metadata operator, @@ -355,156 +366,178 @@ pub fn map_certificate( pool_owners, relays, pool_metadata, - } => Ok(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: PoolRegistration { - operator: operator.to_vec(), - vrf_key_hash: vrf_keyhash.to_vec(), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, - reward_account: StakeAddress::from_binary(reward_account)?, - pool_owners: pool_owners - .into_iter() - .map(|v| { - StakeAddress::new( - StakeCredential::AddrKeyHash(v.to_vec()), - network_id.clone().into(), - ) - }) - .collect(), - relays: relays.iter().map(map_relay).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, - }, + } => Ok(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(PoolRegistration { + operator: operator.to_vec(), + vrf_key_hash: vrf_keyhash.to_vec(), + pledge: *pledge, + cost: *cost, + margin: Ratio { + numerator: margin.numerator, + denominator: margin.denominator, }, - tx_hash, - cert_index: cert_index as u64, - }, - )), - conway::Certificate::PoolRetirement(pool_key_hash, epoch) => Ok( - TxCertificate::PoolRetirementWithPos(PoolRetirementWithPos { - ret: PoolRetirement { + reward_account: StakeAddress::from_binary(reward_account)?, + pool_owners: pool_owners + .into_iter() + .map(|v| { + StakeAddress::new( + StakeCredential::AddrKeyHash(v.to_vec()), + network_id.clone().into(), + ) + }) + .collect(), + relays: relays.iter().map(map_relay).collect(), + pool_metadata: match pool_metadata { + Nullable::Some(md) => Some(PoolMetadata { + url: md.url.clone(), + hash: md.hash.to_vec(), + }), + _ => None, + }, + }), + tx_identifier, + cert_index: cert_index as u64, + }), + conway::Certificate::PoolRetirement(pool_key_hash, epoch) => { + Ok(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { operator: pool_key_hash.to_vec(), epoch: *epoch, - }, - tx_hash, + }), + tx_identifier, cert_index: cert_index as u64, - }), - ), + }) + } - conway::Certificate::Reg(cred, coin) => { - Ok(TxCertificate::Registration(Registration { + conway::Certificate::Reg(cred, coin) => Ok(TxCertificateWithPos { + cert: TxCertificate::Registration(Registration { stake_address: map_stake_address(cred, network_id), deposit: *coin, - })) - } + }), + tx_identifier, + cert_index: cert_index as u64, + }), - conway::Certificate::UnReg(cred, coin) => { - Ok(TxCertificate::Deregistration(Deregistration { + conway::Certificate::UnReg(cred, coin) => Ok(TxCertificateWithPos { + cert: TxCertificate::Deregistration(Deregistration { stake_address: map_stake_address(cred, network_id), refund: *coin, - })) - } - - conway::Certificate::VoteDeleg(cred, drep) => { - Ok(TxCertificate::VoteDelegation(VoteDelegation { - stake_address: map_stake_address(cred, network_id), - drep: map_drep(drep), - })) - } - - conway::Certificate::StakeVoteDeleg(cred, pool_key_hash, drep) => Ok( - TxCertificate::StakeAndVoteDelegation(StakeAndVoteDelegation { - stake_address: map_stake_address(cred, network_id), - operator: pool_key_hash.to_vec(), - drep: map_drep(drep), }), - ), + tx_identifier, + cert_index: cert_index as u64, + }), - conway::Certificate::StakeRegDeleg(cred, pool_key_hash, coin) => Ok( - TxCertificate::StakeRegistrationAndDelegation(StakeRegistrationAndDelegation { + conway::Certificate::VoteDeleg(cred, drep) => Ok(TxCertificateWithPos { + cert: TxCertificate::VoteDelegation(VoteDelegation { stake_address: map_stake_address(cred, network_id), - operator: pool_key_hash.to_vec(), - deposit: *coin, + drep: map_drep(drep), }), - ), + tx_identifier, + cert_index: cert_index as u64, + }), - conway::Certificate::VoteRegDeleg(cred, drep, coin) => { - Ok(TxCertificate::StakeRegistrationAndVoteDelegation( - StakeRegistrationAndVoteDelegation { + conway::Certificate::StakeVoteDeleg(cred, pool_key_hash, drep) => { + Ok(TxCertificateWithPos { + cert: TxCertificate::StakeAndVoteDelegation(StakeAndVoteDelegation { stake_address: map_stake_address(cred, network_id), + operator: pool_key_hash.to_vec(), drep: map_drep(drep), - deposit: *coin, - }, - )) + }), + tx_identifier, + cert_index: cert_index as u64, + }) } - conway::Certificate::StakeVoteRegDeleg(cred, pool_key_hash, drep, coin) => { - Ok(TxCertificate::StakeRegistrationAndStakeAndVoteDelegation( - StakeRegistrationAndStakeAndVoteDelegation { + conway::Certificate::StakeRegDeleg(cred, pool_key_hash, coin) => { + Ok(TxCertificateWithPos { + cert: TxCertificate::StakeRegistrationAndDelegation( + StakeRegistrationAndDelegation { + stake_address: map_stake_address(cred, network_id), + operator: pool_key_hash.to_vec(), + deposit: *coin, + }, + ), + tx_identifier, + cert_index: cert_index as u64, + }) + } + + conway::Certificate::VoteRegDeleg(cred, drep, coin) => Ok(TxCertificateWithPos { + cert: TxCertificate::StakeRegistrationAndVoteDelegation( + StakeRegistrationAndVoteDelegation { stake_address: map_stake_address(cred, network_id), - operator: pool_key_hash.to_vec(), drep: map_drep(drep), deposit: *coin, }, - )) + ), + tx_identifier, + cert_index: cert_index as u64, + }), + + conway::Certificate::StakeVoteRegDeleg(cred, pool_key_hash, drep, coin) => { + Ok(TxCertificateWithPos { + cert: TxCertificate::StakeRegistrationAndStakeAndVoteDelegation( + StakeRegistrationAndStakeAndVoteDelegation { + stake_address: map_stake_address(cred, network_id), + operator: pool_key_hash.to_vec(), + drep: map_drep(drep), + deposit: *coin, + }, + ), + tx_identifier, + cert_index: cert_index as u64, + }) } conway::Certificate::AuthCommitteeHot(cold_cred, hot_cred) => { - Ok(TxCertificate::AuthCommitteeHot(AuthCommitteeHot { - cold_credential: map_stake_credential(cold_cred), - hot_credential: map_stake_credential(hot_cred), - })) + Ok(TxCertificateWithPos { + cert: TxCertificate::AuthCommitteeHot(AuthCommitteeHot { + cold_credential: map_stake_credential(cold_cred), + hot_credential: map_stake_credential(hot_cred), + }), + tx_identifier, + cert_index: cert_index as u64, + }) } conway::Certificate::ResignCommitteeCold(cold_cred, anchor) => { - Ok(TxCertificate::ResignCommitteeCold(ResignCommitteeCold { - cold_credential: map_stake_credential(cold_cred), - anchor: map_nullable_anchor(anchor), - })) - } - - conway::Certificate::RegDRepCert(cred, coin, anchor) => { - Ok(TxCertificate::DRepRegistration(DRepRegistrationWithPos { - reg: DRepRegistration { - credential: map_stake_credential(cred), - deposit: *coin, + Ok(TxCertificateWithPos { + cert: TxCertificate::ResignCommitteeCold(ResignCommitteeCold { + cold_credential: map_stake_credential(cold_cred), anchor: map_nullable_anchor(anchor), - }, - tx_hash, + }), + tx_identifier, cert_index: cert_index as u64, - })) + }) } - conway::Certificate::UnRegDRepCert(cred, coin) => Ok( - TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { - reg: DRepDeregistration { - credential: map_stake_credential(cred), - refund: *coin, - }, - tx_hash, - cert_index: cert_index as u64, + conway::Certificate::RegDRepCert(cred, coin, anchor) => Ok(TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { + credential: map_stake_credential(cred), + deposit: *coin, + anchor: map_nullable_anchor(anchor), }), - ), + tx_identifier, + cert_index: cert_index as u64, + }), - conway::Certificate::UpdateDRepCert(cred, anchor) => { - Ok(TxCertificate::DRepUpdate(DRepUpdateWithPos { - reg: DRepUpdate { - credential: map_stake_credential(cred), - anchor: map_nullable_anchor(anchor), - }, - tx_hash, - cert_index: cert_index as u64, - })) - } + conway::Certificate::UnRegDRepCert(cred, coin) => Ok(TxCertificateWithPos { + cert: TxCertificate::DRepDeregistration(DRepDeregistration { + credential: map_stake_credential(cred), + refund: *coin, + }), + tx_identifier, + cert_index: cert_index as u64, + }), + + conway::Certificate::UpdateDRepCert(cred, anchor) => Ok(TxCertificateWithPos { + cert: TxCertificate::DRepUpdate(DRepUpdate { + credential: map_stake_credential(cred), + anchor: map_nullable_anchor(anchor), + }), + tx_identifier, + cert_index: cert_index as u64, + }), } } diff --git a/common/src/messages.rs b/common/src/messages.rs index 8ed9856e..a214977d 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -91,7 +91,7 @@ pub struct AssetDeltasMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct TxCertificatesMessage { /// Ordered set of certificates - pub certificates: Vec, + pub certificates: Vec, } /// Address deltas message diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 31394cfe..979425ce 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::{DRepChoice, KeyHash, PoolLiveStakeInfo, StakeAddress}; +use crate::{DRepChoice, KeyHash, PoolId, PoolLiveStakeInfo, StakeAddress, TxIdentifier}; pub const DEFAULT_ACCOUNTS_QUERY_TOPIC: (&str, &str) = ("accounts-state-query-topic", "cardano.query.accounts"); @@ -15,10 +15,10 @@ pub enum AccountsStateQuery { GetAccountInfo { stake_address: StakeAddress }, GetAccountRewardHistory { stake_key: Vec }, GetAccountHistory { stake_key: Vec }, - GetAccountDelegationHistory { stake_key: Vec }, - GetAccountRegistrationHistory { stake_key: Vec }, + GetAccountRegistrationHistory { account: StakeAddress }, + GetAccountDelegationHistory { account: StakeAddress }, + GetAccountMIRHistory { account: StakeAddress }, GetAccountWithdrawalHistory { stake_key: Vec }, - GetAccountMIRHistory { stake_key: Vec }, GetAccountAssociatedAddresses { stake_key: Vec }, GetAccountAssets { stake_key: Vec }, GetAccountAssetsTotals { stake_key: Vec }, @@ -49,10 +49,10 @@ pub enum AccountsStateQueryResponse { AccountInfo(AccountInfo), AccountRewardHistory(AccountRewardHistory), AccountHistory(AccountHistory), - AccountDelegationHistory(AccountDelegationHistory), - AccountRegistrationHistory(AccountRegistrationHistory), + AccountRegistrationHistory(Vec), + AccountDelegationHistory(Vec), + AccountMIRHistory(Vec), AccountWithdrawalHistory(AccountWithdrawalHistory), - AccountMIRHistory(AccountMIRHistory), AccountAssociatedAddresses(AccountAssociatedAddresses), AccountAssets(AccountAssets), AccountAssetsTotals(AccountAssetsTotals), @@ -97,17 +97,52 @@ pub struct AccountRewardHistory {} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountHistory {} -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AccountDelegationHistory {} +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Decode, minicbor::Encode, +)] +pub struct DelegationUpdate { + #[n(0)] + pub active_epoch: u32, + #[n(1)] + pub tx_identifier: TxIdentifier, + #[n(2)] + pub amount: u64, + #[n(3)] + pub pool: PoolId, +} -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AccountRegistrationHistory {} +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Decode, minicbor::Encode, +)] +pub struct RegistrationUpdate { + #[n(0)] + pub tx_identifier: TxIdentifier, + #[n(1)] + pub status: RegistrationStatus, +} -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AccountWithdrawalHistory {} +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Decode, minicbor::Encode, +)] +pub enum RegistrationStatus { + #[n(0)] + Registered, + #[n(1)] + Deregistered, +} + +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Decode, minicbor::Encode, +)] +pub struct AccountWithdrawal { + #[n(0)] + pub tx_identifier: TxIdentifier, + #[n(1)] + pub amount: u64, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AccountMIRHistory {} +pub struct AccountWithdrawalHistory {} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AccountAssociatedAddresses {} diff --git a/common/src/queries/governance.rs b/common/src/queries/governance.rs index 56104285..074ae38b 100644 --- a/common/src/queries/governance.rs +++ b/common/src/queries/governance.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; use crate::{ - Anchor, DRepCredential, GovActionId, Lovelace, ProposalProcedure, StakeAddress, TxHash, Vote, - Voter, VotingProcedure, + Anchor, DRepCredential, GovActionId, Lovelace, ProposalProcedure, StakeAddress, TxHash, + TxIdentifier, Vote, Voter, VotingProcedure, }; pub const DEFAULT_DREPS_QUERY_TOPIC: (&str, &str) = @@ -77,7 +77,7 @@ pub struct DRepUpdates { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepUpdateEvent { - pub tx_hash: TxHash, + pub tx_identifier: TxIdentifier, pub cert_index: u64, pub action: DRepActionUpdate, } diff --git a/common/src/types.rs b/common/src/types.rs index 5030fcbd..3b6267d2 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -808,14 +808,6 @@ pub struct PoolMetadata { pub hash: DataHash, } -/// Pool registration with position -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolRegistrationWithPos { - pub reg: PoolRegistration, - pub tx_hash: TxHash, - pub cert_index: u64, -} - /// Pool registration data #[serde_as] #[derive( @@ -869,14 +861,6 @@ pub struct PoolRegistration { pub pool_metadata: Option, } -// Pool Retirment with position -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct PoolRetirementWithPos { - pub ret: PoolRetirement, - pub tx_hash: TxHash, - pub cert_index: u64, -} - /// Pool retirement data #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PoolRetirement { @@ -897,23 +881,23 @@ pub enum PoolUpdateAction { /// Pool Update Event #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PoolUpdateEvent { - pub tx_hash: TxHash, + pub tx_identifier: TxIdentifier, pub cert_index: u64, pub action: PoolUpdateAction, } impl PoolUpdateEvent { - pub fn register_event(tx_hash: TxHash, cert_index: u64) -> Self { + pub fn register_event(tx_identifier: TxIdentifier, cert_index: u64) -> Self { Self { - tx_hash, + tx_identifier, cert_index, action: PoolUpdateAction::Registered, } } - pub fn retire_event(tx_hash: TxHash, cert_index: u64) -> Self { + pub fn retire_event(tx_identifier: TxIdentifier, cert_index: u64) -> Self { Self { - tx_hash, + tx_identifier, cert_index, action: PoolUpdateAction::Deregistered, } @@ -1140,13 +1124,6 @@ pub struct DRepRegistration { pub anchor: Option, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepRegistrationWithPos { - pub reg: DRepRegistration, - pub tx_hash: TxHash, - pub cert_index: u64, -} - /// DRep Deregistration = unreg_drep_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepDeregistration { @@ -1157,13 +1134,6 @@ pub struct DRepDeregistration { pub refund: Lovelace, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepDeregistrationWithPos { - pub reg: DRepDeregistration, - pub tx_hash: TxHash, - pub cert_index: u64, -} - /// DRep Update = update_drep_cert #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct DRepUpdate { @@ -1174,13 +1144,6 @@ pub struct DRepUpdate { pub anchor: Option, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct DRepUpdateWithPos { - pub reg: DRepUpdate, - pub tx_hash: TxHash, - pub cert_index: u64, -} - pub type CommitteeCredential = Credential; /// Authorise a committee hot credential @@ -1794,13 +1757,6 @@ pub struct GovernanceOutcome { pub action_to_perform: GovernanceOutcomeVariant, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct StakeAddressWithPos { - pub stake_address: StakeAddress, - pub tx_index: u64, - pub cert_index: u64, -} - /// Certificate in a transaction #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum TxCertificate { @@ -1808,7 +1764,7 @@ pub enum TxCertificate { None(()), /// Stake registration - StakeRegistration(StakeAddressWithPos), + StakeRegistration(StakeAddress), /// Stake de-registration StakeDeregistration(StakeAddress), @@ -1817,10 +1773,10 @@ pub enum TxCertificate { StakeDelegation(StakeDelegation), /// Pool registration - PoolRegistrationWithPos(PoolRegistrationWithPos), + PoolRegistration(PoolRegistration), /// Pool retirement - PoolRetirementWithPos(PoolRetirementWithPos), + PoolRetirement(PoolRetirement), /// Genesis key delegation GenesisKeyDelegation(GenesisKeyDelegation), @@ -1856,13 +1812,20 @@ pub enum TxCertificate { ResignCommitteeCold(ResignCommitteeCold), /// DRep registration - DRepRegistration(DRepRegistrationWithPos), + DRepRegistration(DRepRegistration), /// DRep deregistration - DRepDeregistration(DRepDeregistrationWithPos), + DRepDeregistration(DRepDeregistration), /// DRep update - DRepUpdate(DRepUpdateWithPos), + DRepUpdate(DRepUpdate), +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TxCertificateWithPos { + pub cert: TxCertificate, + pub tx_identifier: TxIdentifier, + pub cert_index: u64, } #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index 79c03c40..9e7a1451 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -875,13 +875,13 @@ impl State { pub fn handle_tx_certificates(&mut self, tx_certs_msg: &TxCertificatesMessage) -> Result<()> { // Handle certificates for tx_cert in tx_certs_msg.certificates.iter() { - match tx_cert { - TxCertificate::StakeRegistration(stake_address_with_pos) => { - self.register_stake_address(&stake_address_with_pos.stake_address, None); + match &tx_cert.cert { + TxCertificate::StakeRegistration(reg) => { + self.register_stake_address(®, None); } - TxCertificate::StakeDeregistration(stake_address) => { - self.deregister_stake_address(&stake_address, None); + TxCertificate::StakeDeregistration(dreg) => { + self.deregister_stake_address(&dreg, None); } TxCertificate::MoveInstantaneousReward(mir) => { @@ -994,7 +994,8 @@ mod tests { Constitution, CostModel, DRepVotingThresholds, NetworkId, PoolVotingThresholds, Pot, PotDelta, Ratio, Registration, StakeAddress, StakeAddressDelta, StakeAndVoteDelegation, StakeCredential, StakeRegistrationAndStakeAndVoteDelegation, - StakeRegistrationAndVoteDelegation, VoteDelegation, Withdrawal, + StakeRegistrationAndVoteDelegation, TxCertificateWithPos, TxIdentifier, VoteDelegation, + Withdrawal, }; // Helper to create a StakeAddress from a byte slice @@ -1367,38 +1368,66 @@ mod tests { let spo3 = create_address(&[0x03]); let spo4 = create_address(&[0x04]); + let tx_identifier = TxIdentifier::default(); + let certificates = vec![ // register the first two SPOs separately from their delegation - TxCertificate::Registration(Registration { - stake_address: spo1.clone(), - deposit: 1, - }), - TxCertificate::Registration(Registration { - stake_address: spo2.clone(), - deposit: 1, - }), - TxCertificate::VoteDelegation(VoteDelegation { - stake_address: spo1.clone(), - drep: DRepChoice::Key(DREP_HASH.to_vec()), - }), - TxCertificate::StakeAndVoteDelegation(StakeAndVoteDelegation { - stake_address: spo2.clone(), - operator: spo1.get_hash().to_vec(), - drep: DRepChoice::Script(DREP_HASH.to_vec()), - }), - TxCertificate::StakeRegistrationAndVoteDelegation(StakeRegistrationAndVoteDelegation { - stake_address: spo3.clone(), - drep: DRepChoice::Abstain, - deposit: 1, - }), - TxCertificate::StakeRegistrationAndStakeAndVoteDelegation( - StakeRegistrationAndStakeAndVoteDelegation { - stake_address: spo4.clone(), - operator: spo1.get_hash().to_vec(), - drep: DRepChoice::NoConfidence, + TxCertificateWithPos { + cert: TxCertificate::Registration(Registration { + stake_address: spo1.clone(), deposit: 1, - }, - ), + }), + tx_identifier, + cert_index: 0, + }, + TxCertificateWithPos { + cert: TxCertificate::Registration(Registration { + stake_address: spo2.clone(), + deposit: 1, + }), + tx_identifier, + cert_index: 0, + }, + TxCertificateWithPos { + cert: TxCertificate::VoteDelegation(VoteDelegation { + stake_address: spo1.clone(), + drep: DRepChoice::Key(DREP_HASH.to_vec()), + }), + tx_identifier, + cert_index: 0, + }, + TxCertificateWithPos { + cert: TxCertificate::StakeAndVoteDelegation(StakeAndVoteDelegation { + stake_address: spo2.clone(), + operator: spo1.get_hash().to_vec(), + drep: DRepChoice::Script(DREP_HASH.to_vec()), + }), + tx_identifier, + cert_index: 0, + }, + TxCertificateWithPos { + cert: TxCertificate::StakeRegistrationAndVoteDelegation( + StakeRegistrationAndVoteDelegation { + stake_address: spo3.clone(), + drep: DRepChoice::Abstain, + deposit: 1, + }, + ), + tx_identifier, + cert_index: 0, + }, + TxCertificateWithPos { + cert: TxCertificate::StakeRegistrationAndStakeAndVoteDelegation( + StakeRegistrationAndStakeAndVoteDelegation { + stake_address: spo4.clone(), + operator: spo1.get_hash().to_vec(), + drep: DRepChoice::NoConfidence, + deposit: 1, + }, + ), + tx_identifier, + cert_index: 0, + }, ]; state.handle_tx_certificates(&TxCertificatesMessage { certificates })?; diff --git a/modules/drep_state/src/state.rs b/modules/drep_state/src/state.rs index 740e0403..4d561e1a 100644 --- a/modules/drep_state/src/state.rs +++ b/modules/drep_state/src/state.rs @@ -7,8 +7,8 @@ use acropolis_common::{ get_query_topic, governance::{DRepActionUpdate, DRepUpdateEvent, VoteRecord}, }, - Anchor, DRepChoice, DRepCredential, Lovelace, StakeAddress, TxCertificate, TxHash, Voter, - VotingProcedures, + Anchor, DRepChoice, DRepCredential, Lovelace, StakeAddress, TxCertificate, + TxCertificateWithPos, TxHash, Voter, VotingProcedures, }; use anyhow::{anyhow, Result}; use caryatid_sdk::Context; @@ -220,7 +220,7 @@ impl State { pub async fn process_certificates( &mut self, context: Arc>, - tx_certs: &Vec, + tx_certs: &Vec, epoch: u64, ) -> Result<()> { let mut batched_delegators = Vec::new(); @@ -228,7 +228,7 @@ impl State { for tx_cert in tx_certs { if store_delegators { - if let Some((delegator, drep)) = Self::extract_delegation_fields(tx_cert) { + if let Some((delegator, drep)) = Self::extract_delegation_fields(&tx_cert.cert) { batched_delegators.push((delegator, drep)); continue; } @@ -317,34 +317,34 @@ impl State { distribution } - fn process_one_cert(&mut self, tx_cert: &TxCertificate, epoch: u64) -> Result { - match tx_cert { + fn process_one_cert(&mut self, tx_cert: &TxCertificateWithPos, epoch: u64) -> Result { + match &tx_cert.cert { TxCertificate::DRepRegistration(reg) => { - let new = match self.dreps.get_mut(®.reg.credential) { + let new = match self.dreps.get_mut(®.credential) { Some(drep) => { - if reg.reg.deposit != 0 { + if reg.deposit != 0 { return Err(anyhow!( "DRep registration {:?}: replacement requires deposit = 0, got {}", - reg.reg.credential, - reg.reg.deposit + reg.credential, + reg.deposit )); } - drep.anchor = reg.reg.anchor.clone(); + drep.anchor = reg.anchor.clone(); false } None => { self.dreps.insert( - reg.reg.credential.clone(), - DRepRecord::new(reg.reg.deposit, reg.reg.anchor.clone()), + reg.credential.clone(), + DRepRecord::new(reg.deposit, reg.anchor.clone()), ); true } }; if self.historical_dreps.is_some() { - if let Err(err) = self.update_historical(®.reg.credential, true, |entry| { + if let Err(err) = self.update_historical(®.credential, true, |entry| { if let Some(info) = entry.info.as_mut() { - info.deposit = reg.reg.deposit; + info.deposit = reg.deposit; info.expired = false; info.retired = false; info.active_epoch = Some(epoch); @@ -352,12 +352,12 @@ impl State { } if let Some(updates) = entry.updates.as_mut() { updates.push(DRepUpdateEvent { - tx_hash: reg.tx_hash, - cert_index: reg.cert_index, + tx_identifier: tx_cert.tx_identifier, + cert_index: tx_cert.cert_index, action: DRepActionUpdate::Registered, }); } - if let Some(anchor) = ®.reg.anchor { + if let Some(anchor) = ®.anchor { if let Some(inner) = entry.metadata.as_mut() { *inner = Some(anchor.clone()); } @@ -372,16 +372,16 @@ impl State { TxCertificate::DRepDeregistration(reg) => { // Update live state - if self.dreps.remove(®.reg.credential).is_none() { + if self.dreps.remove(®.credential).is_none() { return Err(anyhow!( "DRep deregistration {:?}: credential not found", - reg.reg.credential + reg.credential )); } // Update history if enabled if self.historical_dreps.is_some() { - if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Err(err) = self.update_historical(®.credential, false, |entry| { if let Some(info) = entry.info.as_mut() { info.deposit = 0; info.expired = false; @@ -391,8 +391,8 @@ impl State { } if let Some(updates) = entry.updates.as_mut() { updates.push(DRepUpdateEvent { - tx_hash: reg.tx_hash, - cert_index: reg.cert_index, + tx_identifier: tx_cert.tx_identifier, + cert_index: tx_cert.cert_index, action: DRepActionUpdate::Deregistered, }); } @@ -406,13 +406,13 @@ impl State { TxCertificate::DRepUpdate(reg) => { // Update live state - let drep = self.dreps.get_mut(®.reg.credential).ok_or_else(|| { - anyhow!("DRep update {:?}: credential not found", reg.reg.credential) + let drep = self.dreps.get_mut(®.credential).ok_or_else(|| { + anyhow!("DRep update {:?}: credential not found", reg.credential) })?; - drep.anchor = reg.reg.anchor.clone(); + drep.anchor = reg.anchor.clone(); // Update history if enabled - if let Err(err) = self.update_historical(®.reg.credential, false, |entry| { + if let Err(err) = self.update_historical(®.credential, false, |entry| { if let Some(info) = entry.info.as_mut() { info.expired = false; info.retired = false; @@ -420,12 +420,12 @@ impl State { } if let Some(updates) = entry.updates.as_mut() { updates.push(DRepUpdateEvent { - tx_hash: reg.tx_hash, - cert_index: reg.cert_index, + tx_identifier: tx_cert.tx_identifier, + cert_index: tx_cert.cert_index, action: DRepActionUpdate::Updated, }); } - if let Some(anchor) = ®.reg.anchor { + if let Some(anchor) = ®.anchor { if let Some(inner) = entry.metadata.as_mut() { *inner = Some(anchor.clone()); } @@ -569,8 +569,8 @@ fn drep_choice_to_credential(choice: &DRepChoice) -> Option { mod tests { use crate::state::{DRepRecord, DRepStorageConfig, State}; use acropolis_common::{ - Anchor, Credential, DRepDeregistration, DRepDeregistrationWithPos, DRepRegistration, - DRepRegistrationWithPos, DRepUpdate, DRepUpdateWithPos, TxCertificate, TxHash, + Anchor, Credential, DRepDeregistration, DRepRegistration, DRepUpdate, TxCertificate, + TxCertificateWithPos, TxIdentifier, }; const CRED_1: [u8; 28] = [ @@ -585,15 +585,15 @@ mod tests { #[test] fn test_drep_process_one_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { - reg: DRepRegistration { + let tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 500000000, anchor: None, - }, - tx_hash: TxHash::default(), - cert_index: 1, - }); + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }; let mut state = State::new(DRepStorageConfig::default()); assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); assert_eq!(state.get_count(), 1); @@ -610,27 +610,28 @@ mod tests { #[test] fn test_drep_do_not_replace_existing_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { - reg: DRepRegistration { + let tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 500000000, anchor: None, - }, - tx_hash: TxHash::default(), - cert_index: 1, - }); + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }; + let mut state = State::new(DRepStorageConfig::default()); assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let bad_tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { - reg: DRepRegistration { + let bad_tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 600000000, anchor: None, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; assert!(state.process_one_cert(&bad_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); @@ -647,15 +648,15 @@ mod tests { #[test] fn test_drep_update_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { - reg: DRepRegistration { + let tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 500000000, anchor: None, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; let mut state = State::new(DRepStorageConfig::default()); assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); @@ -663,14 +664,14 @@ mod tests { url: "https://poop.bike".into(), data_hash: vec![0x13, 0x37], }; - let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdateWithPos { - reg: DRepUpdate { + let update_anchor_tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepUpdate(DRepUpdate { credential: tx_cred.clone(), anchor: Some(anchor.clone()), - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; assert_eq!( state.process_one_cert(&update_anchor_tx_cert, 1).unwrap(), @@ -691,15 +692,15 @@ mod tests { #[test] fn test_drep_do_not_update_nonexistent_certificate() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(DRepRegistrationWithPos { - reg: DRepRegistration { + let tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 500000000, anchor: None, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; let mut state = State::new(DRepStorageConfig::default()); assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); @@ -707,14 +708,14 @@ mod tests { url: "https://poop.bike".into(), data_hash: vec![0x13, 0x37], }; - let update_anchor_tx_cert = TxCertificate::DRepUpdate(DRepUpdateWithPos { - reg: DRepUpdate { + let update_anchor_tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepUpdate(DRepUpdate { credential: Credential::AddrKeyHash(CRED_2.to_vec()), anchor: Some(anchor.clone()), - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; assert!(state.process_one_cert(&update_anchor_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); @@ -731,26 +732,26 @@ mod tests { #[test] fn test_drep_deregister() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(acropolis_common::DRepRegistrationWithPos { - reg: DRepRegistration { + let tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 500000000, anchor: None, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; let mut state = State::new(DRepStorageConfig::default()); assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { - reg: DRepDeregistration { + let unregister_tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepDeregistration(DRepDeregistration { credential: tx_cred.clone(), refund: 500000000, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; assert_eq!( state.process_one_cert(&unregister_tx_cert, 1).unwrap(), true @@ -762,26 +763,26 @@ mod tests { #[test] fn test_drep_do_not_deregister_nonexistent_cert() { let tx_cred = Credential::AddrKeyHash(CRED_1.to_vec()); - let tx_cert = TxCertificate::DRepRegistration(acropolis_common::DRepRegistrationWithPos { - reg: DRepRegistration { + let tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepRegistration(DRepRegistration { credential: tx_cred.clone(), deposit: 500000000, anchor: None, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; let mut state = State::new(DRepStorageConfig::default()); assert_eq!(state.process_one_cert(&tx_cert, 1).unwrap(), true); - let unregister_tx_cert = TxCertificate::DRepDeregistration(DRepDeregistrationWithPos { - reg: DRepDeregistration { + let unregister_tx_cert = TxCertificateWithPos { + cert: TxCertificate::DRepDeregistration(DRepDeregistration { credential: Credential::AddrKeyHash(CRED_2.to_vec()), refund: 500000000, - }, - tx_hash: TxHash::default(), + }), + tx_identifier: TxIdentifier::default(), cert_index: 1, - }); + }; assert!(state.process_one_cert(&unregister_tx_cert, 1).is_err()); assert_eq!(state.get_count(), 1); assert_eq!(state.get_drep(&tx_cred).unwrap().deposit, 500000000); diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 1a8557de..fb957d60 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -2,7 +2,7 @@ //! Manages optional state data needed for Blockfrost alignment use acropolis_common::queries::accounts::{ - AccountsStateQueryResponse, DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC, + AccountsStateQuery, AccountsStateQueryResponse, DEFAULT_HISTORICAL_ACCOUNTS_QUERY_TOPIC, }; use acropolis_common::{ messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, @@ -35,10 +35,10 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = const DEFAULT_HISTORICAL_ACCOUNTS_DB_PATH: (&str, &str) = ("db-path", "./db"); const DEFAULT_STORE_REWARDS_HISTORY: (&str, bool) = ("store-rewards-history", false); const DEFAULT_STORE_ACTIVE_STAKE_HISTORY: (&str, bool) = ("store-active-stake-history", false); -const DEFAULT_STORE_DELEGATION_HISTORY: (&str, bool) = ("store-delegation-history", false); const DEFAULT_STORE_REGISTRATION_HISTORY: (&str, bool) = ("store-registration-history", false); -const DEFAULT_STORE_WITHDRAWAL_HISTORY: (&str, bool) = ("store-withdrawal-history", false); +const DEFAULT_STORE_DELEGATION_HISTORY: (&str, bool) = ("store-delegation-history", false); const DEFAULT_STORE_MIR_HISTORY: (&str, bool) = ("store-mir-history", false); +const DEFAULT_STORE_WITHDRAWAL_HISTORY: (&str, bool) = ("store-withdrawal-history", false); const DEFAULT_STORE_ADDRESSES: (&str, bool) = ("store-addresses", false); /// Historical Accounts State module @@ -145,7 +145,7 @@ impl HistoricalAccountsState { Self::check_sync(¤t_block, &block_info); let mut state = state_mutex.lock().await; state - .handle_tx_certificates(tx_certs_msg) + .handle_tx_certificates(tx_certs_msg, block_info.epoch as u32) .inspect_err(|e| error!("TxCertificates handling error: {e:#}")) .ok(); } @@ -308,7 +308,7 @@ impl HistoricalAccountsState { let state_query = state_mutex.clone(); context.handle(&historical_accounts_query_topic, move |message| { - let _state = state_query.clone(); + let state = state_query.clone(); async move { let Message::StateQuery(StateQuery::Accounts(query)) = message.as_ref() else { return Arc::new(Message::StateQueryResponse(StateQueryResponse::Accounts( @@ -319,6 +319,31 @@ impl HistoricalAccountsState { }; let response = match query { + AccountsStateQuery::GetAccountRegistrationHistory { account } => { + match state.lock().await.get_registration_history(&account).await { + Ok(registrations) => { + AccountsStateQueryResponse::AccountRegistrationHistory( + registrations, + ) + } + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } + AccountsStateQuery::GetAccountDelegationHistory { account } => { + match state.lock().await.get_delegation_history(&account).await { + Ok(delegations) => { + AccountsStateQueryResponse::AccountDelegationHistory(delegations) + } + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } + AccountsStateQuery::GetAccountMIRHistory { account } => { + match state.lock().await.get_mir_history(&account).await { + Ok(mirs) => AccountsStateQueryResponse::AccountMIRHistory(mirs), + Err(e) => AccountsStateQueryResponse::Error(e.to_string()), + } + } + _ => AccountsStateQueryResponse::Error(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index bae28a5c..bea59143 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -1,6 +1,9 @@ use std::{collections::HashMap, path::Path}; -use acropolis_common::{ShelleyAddress, StakeCredential}; +use acropolis_common::{ + queries::accounts::{AccountWithdrawal, DelegationUpdate, RegistrationUpdate}, + ShelleyAddress, StakeAddress, +}; use anyhow::Result; use fjall::{Keyspace, Partition, PartitionCreateOptions}; use minicbor::{decode, to_vec}; @@ -8,21 +11,18 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use tokio::sync::Mutex; use tracing::{debug, error, info}; -use crate::state::{ - AccountEntry, AccountWithdrawal, ActiveStakeHistory, DelegationUpdate, - HistoricalAccountsConfig, RegistrationUpdate, RewardHistory, -}; +use crate::state::{AccountEntry, ActiveStakeHistory, HistoricalAccountsConfig, RewardHistory}; pub struct ImmutableHistoricalAccountStore { rewards_history: Partition, active_stake_history: Partition, delegation_history: Partition, registration_history: Partition, - withdrawal_history: Partition, mir_history: Partition, + withdrawal_history: Partition, addresses: Partition, keyspace: Keyspace, - pub pending: Mutex>>, + pub pending: Mutex>>, } impl ImmutableHistoricalAccountStore { @@ -146,14 +146,14 @@ impl ImmutableHistoricalAccountStore { } } - pub async fn update_immutable(&self, drained: Vec>) { + pub async fn update_immutable(&self, drained: Vec>) { let mut pending = self.pending.lock().await; pending.extend(drained); } pub async fn _get_rewards_history( &self, - account: &StakeCredential, + account: &StakeAddress, ) -> Result>> { let mut immutable_rewards = self.collect_partition::(&self.rewards_history, &account.get_hash())?; @@ -170,7 +170,7 @@ impl ImmutableHistoricalAccountStore { pub async fn _get_active_stake_history( &self, - account: &StakeCredential, + account: &StakeAddress, ) -> Result>> { let mut immutable_active_stake = self.collect_partition::( &self.active_stake_history, @@ -187,9 +187,28 @@ impl ImmutableHistoricalAccountStore { Ok((!immutable_active_stake.is_empty()).then_some(immutable_active_stake)) } - pub async fn _get_delegation_history( + pub async fn get_registration_history( + &self, + account: &StakeAddress, + ) -> Result>> { + let mut immutable_registrations = self.collect_partition::( + &self.registration_history, + &account.get_hash(), + )?; + + self.merge_pending( + account, + |e| e.registration_history.as_ref(), + &mut immutable_registrations, + ) + .await; + + Ok((!immutable_registrations.is_empty()).then_some(immutable_registrations)) + } + + pub async fn get_delegation_history( &self, - account: &StakeCredential, + account: &StakeAddress, ) -> Result>> { let mut immutable_delegations = self .collect_partition::(&self.delegation_history, &account.get_hash())?; @@ -204,28 +223,21 @@ impl ImmutableHistoricalAccountStore { Ok((!immutable_delegations.is_empty()).then_some(immutable_delegations)) } - pub async fn _get_registration_history( + pub async fn get_mir_history( &self, - account: &StakeCredential, - ) -> Result>> { - let mut immutable_registrations = self.collect_partition::( - &self.registration_history, - &account.get_hash(), - )?; + account: &StakeAddress, + ) -> Result>> { + let mut immutable_mirs = + self.collect_partition::(&self.mir_history, &account.get_hash())?; - self.merge_pending( - account, - |e| e.registration_history.as_ref(), - &mut immutable_registrations, - ) - .await; + self.merge_pending(account, |e| e.mir_history.as_ref(), &mut immutable_mirs).await; - Ok((!immutable_registrations.is_empty()).then_some(immutable_registrations)) + Ok((!immutable_mirs.is_empty()).then_some(immutable_mirs)) } pub async fn _get_withdrawal_history( &self, - account: &StakeCredential, + account: &StakeAddress, ) -> Result>> { let mut immutable_withdrawals = self.collect_partition::( &self.withdrawal_history, @@ -242,21 +254,9 @@ impl ImmutableHistoricalAccountStore { Ok((!immutable_withdrawals.is_empty()).then_some(immutable_withdrawals)) } - pub async fn _get_mir_history( - &self, - account: &StakeCredential, - ) -> Result>> { - let mut immutable_mirs = - self.collect_partition::(&self.mir_history, &account.get_hash())?; - - self.merge_pending(account, |e| e.mir_history.as_ref(), &mut immutable_mirs).await; - - Ok((!immutable_mirs.is_empty()).then_some(immutable_mirs)) - } - pub async fn _get_addresses( &self, - account: &StakeCredential, + account: &StakeAddress, ) -> Result>> { let mut immutable_addresses = self.collect_partition::(&self.addresses, &account.get_hash())?; @@ -267,11 +267,11 @@ impl ImmutableHistoricalAccountStore { } fn merge_block_deltas( - block_deltas: Vec>, - ) -> HashMap { + block_deltas: Vec>, + ) -> HashMap { block_deltas.into_par_iter().reduce(HashMap::new, |mut acc, block_map| { - for (cred, entry) in block_map { - let agg_entry = acc.entry(cred).or_default(); + for (account, entry) in block_map { + let agg_entry = acc.entry(account).or_default(); Self::extend_opt_vec(&mut agg_entry.reward_history, entry.reward_history); Self::extend_opt_vec( @@ -306,7 +306,7 @@ impl ImmutableHistoricalAccountStore { } #[allow(dead_code)] - async fn merge_pending(&self, account: &StakeCredential, f: F, out: &mut Vec) + async fn merge_pending(&self, account: &StakeAddress, f: F, out: &mut Vec) where F: Fn(&AccountEntry) -> Option<&Vec>, T: Clone, @@ -320,9 +320,9 @@ impl ImmutableHistoricalAccountStore { } } - fn make_epoch_key(account: &StakeCredential, epoch: u32) -> [u8; 32] { + fn make_epoch_key(account: &StakeAddress, epoch: u32) -> [u8; 32] { let mut key = [0u8; 32]; - key[..28].copy_from_slice(&account.get_hash()); + key[..28].copy_from_slice(&account.get_credential().get_hash()); key[28..32].copy_from_slice(&epoch.to_be_bytes()); key } diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index e0bbb547..96f1fbdf 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -7,8 +7,13 @@ use acropolis_common::{ messages::{ AddressDeltasMessage, StakeRewardDeltasMessage, TxCertificatesMessage, WithdrawalsMessage, }, - BlockInfo, PoolId, ShelleyAddress, StakeAddress, StakeCredential, TxIdentifier, + queries::accounts::{ + AccountWithdrawal, DelegationUpdate, RegistrationStatus, RegistrationUpdate, + }, + BlockInfo, InstantaneousRewardTarget, PoolId, ShelleyAddress, StakeAddress, StakeCredential, + TxCertificate, TxIdentifier, }; +use tracing::warn; use crate::{ immutable_historical_account_store::ImmutableHistoricalAccountStore, @@ -50,34 +55,6 @@ pub struct ActiveStakeHistory { pub pool: PoolId, } -#[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] -pub struct DelegationUpdate { - #[n(0)] - pub active_epoch: u32, - #[n(1)] - pub tx_identifier: TxIdentifier, - #[n(2)] - pub amount: u64, - #[n(3)] - pub pool: PoolId, -} - -#[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] -pub struct RegistrationUpdate { - #[n(0)] - pub tx_identifier: TxIdentifier, - #[n(1)] - pub deregistered: bool, -} - -#[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] -pub struct AccountWithdrawal { - #[n(0)] - pub tx_identifier: TxIdentifier, - #[n(1)] - pub amount: u64, -} - #[derive(Debug, Clone)] pub struct HistoricalAccountsConfig { pub db_path: String, @@ -86,8 +63,8 @@ pub struct HistoricalAccountsConfig { pub store_active_stake_history: bool, pub store_delegation_history: bool, pub store_registration_history: bool, - pub store_withdrawal_history: bool, pub store_mir_history: bool, + pub store_withdrawal_history: bool, pub store_addresses: bool, } @@ -97,8 +74,8 @@ impl HistoricalAccountsConfig { || self.store_active_stake_history || self.store_delegation_history || self.store_registration_history - || self.store_withdrawal_history || self.store_mir_history + || self.store_withdrawal_history || self.store_addresses } } @@ -143,7 +120,107 @@ impl State { Ok(()) } - pub fn handle_tx_certificates(&mut self, _tx_certs: &TxCertificatesMessage) -> Result<()> { + pub fn handle_tx_certificates( + &mut self, + tx_certs: &TxCertificatesMessage, + epoch: u32, + ) -> Result<()> { + // Handle certificates + for tx_cert in tx_certs.certificates.iter() { + match &tx_cert.cert { + // Pre-Conway stake registration/deregistration certs + TxCertificate::StakeRegistration(stake_address) => { + self.handle_stake_registration_change( + &stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Registered, + ); + } + TxCertificate::StakeDeregistration(stake_address) => { + self.handle_stake_registration_change( + &stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Deregistered, + ); + } + + // Post-Conway stake registration/deregistration certs + TxCertificate::Registration(reg) => { + self.handle_stake_registration_change( + ®.stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Registered, + ); + } + TxCertificate::Deregistration(dreg) => { + self.handle_stake_registration_change( + &dreg.stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Deregistered, + ); + } + + // Registration and delegation certs + TxCertificate::StakeRegistrationAndStakeAndVoteDelegation(delegation) => { + self.handle_stake_registration_change( + &delegation.stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Registered, + ); + self.handle_stake_delegation( + &delegation.stake_address, + &delegation.operator, + &tx_cert.tx_identifier, + epoch, + ); + } + TxCertificate::StakeRegistrationAndDelegation(delegation) => { + self.handle_stake_registration_change( + &delegation.stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Registered, + ); + self.handle_stake_delegation( + &delegation.stake_address, + &delegation.operator, + &tx_cert.tx_identifier, + epoch, + ); + } + + // Delegation certs + TxCertificate::StakeDelegation(delegation) => { + self.handle_stake_delegation( + &delegation.stake_address, + &delegation.operator, + &tx_cert.tx_identifier, + epoch, + ); + } + TxCertificate::StakeAndVoteDelegation(delegation) => { + self.handle_stake_delegation( + &delegation.stake_address, + &delegation.operator, + &tx_cert.tx_identifier, + epoch, + ); + } + TxCertificate::StakeRegistrationAndVoteDelegation(delegation) => { + self.handle_stake_registration_change( + &delegation.stake_address, + &tx_cert.tx_identifier, + RegistrationStatus::Registered, + ); + } + + // MIR certs + TxCertificate::MoveInstantaneousReward(mir) => { + self.handle_mir(&mir.target, &tx_cert.tx_identifier); + } + + _ => (), + }; + } Ok(()) } @@ -155,43 +232,139 @@ impl State { Ok(()) } - pub fn _get_reward_history(&self, _account: StakeAddress) -> Result> { + pub async fn _get_reward_history( + &self, + _account: &StakeCredential, + ) -> Result> { Ok(Vec::new()) } - pub fn _get_active_stake_history( + pub async fn _get_active_stake_history( &self, - _account: StakeCredential, + _account: &StakeAddress, ) -> Result> { Ok(Vec::new()) } - pub fn _get_delegation_history( + pub async fn get_registration_history( &self, - _account: StakeCredential, - ) -> Result> { - Ok(Vec::new()) + account: &StakeAddress, + ) -> Result> { + let mut registrations = + self.immutable.get_registration_history(&account).await?.unwrap_or_default(); + + self.merge_volatile_history( + &account, + |e| e.registration_history.as_ref(), + &mut registrations, + ); + + Ok(registrations) } - pub fn _get_registration_history( + pub async fn get_delegation_history( &self, - _account: StakeCredential, - ) -> Result> { - Ok(Vec::new()) + account: &StakeAddress, + ) -> Result> { + let mut delegations = + self.immutable.get_delegation_history(&account).await?.unwrap_or_default(); + + self.merge_volatile_history( + &account, + |e| e.delegation_history.as_ref(), + &mut delegations, + ); + + Ok(delegations) } - pub fn _get_withdrawal_history( + pub async fn get_mir_history(&self, account: &StakeAddress) -> Result> { + let mut mirs = self.immutable.get_mir_history(&account).await?.unwrap_or_default(); + + self.merge_volatile_history(&account, |e| e.mir_history.as_ref(), &mut mirs); + + Ok(mirs) + } + + pub async fn _get_withdrawal_history( &self, - _account: StakeCredential, + _account: &StakeAddress, ) -> Result> { Ok(Vec::new()) } - pub fn _get_mir_history(&self, _account: StakeCredential) -> Result> { + pub async fn _get_addresses(&self, _account: StakeAddress) -> Result> { Ok(Vec::new()) } - pub fn _get_addresses(&self, _account: StakeCredential) -> Result> { - Ok(Vec::new()) + fn handle_stake_registration_change( + &mut self, + account: &StakeAddress, + tx_identifier: &TxIdentifier, + status: RegistrationStatus, + ) { + let volatile = self.volatile.window.back_mut().expect("window should never be empty"); + let entry = volatile.entry(account.clone()).or_default(); + let update = RegistrationUpdate { + tx_identifier: *tx_identifier, + status, + }; + entry.registration_history.get_or_insert_with(Vec::new).push(update); + } + + fn handle_stake_delegation( + &mut self, + account: &StakeAddress, + pool: &PoolId, + tx_identifier: &TxIdentifier, + epoch: u32, + ) { + let volatile = self.volatile.window.back_mut().expect("window should never be empty"); + let entry = volatile.entry(account.clone()).or_default(); + let update = DelegationUpdate { + active_epoch: epoch.saturating_add(2), + tx_identifier: *tx_identifier, + amount: 0, // Amount is set during persistence when active stake is known + pool: pool.clone(), + }; + entry.delegation_history.get_or_insert_with(Vec::new).push(update); + } + + fn handle_mir(&mut self, mir: &InstantaneousRewardTarget, tx_identifier: &TxIdentifier) { + let volatile = self.volatile.window.back_mut().expect("window should never be empty"); + + if let InstantaneousRewardTarget::StakeAddresses(payments) = mir { + for (account, amount) in payments { + if *amount <= 0 { + warn!( + "Ignoring invalid MIR (negative or zero) for stake credential {}", + hex::encode(account.get_hash()) + ); + continue; + } + + let entry = volatile.entry(account.clone()).or_default(); + let update = AccountWithdrawal { + tx_identifier: *tx_identifier, + amount: *amount as u64, + }; + + entry.mir_history.get_or_insert_with(Vec::new).push(update); + } + } + } + + fn merge_volatile_history(&self, account: &StakeAddress, f: F, out: &mut Vec) + where + F: Fn(&AccountEntry) -> Option<&Vec>, + T: Clone, + { + for block_map in self.volatile.window.iter() { + if let Some(entry) = block_map.get(account) { + if let Some(pending) = f(entry) { + out.extend(pending.iter().cloned()); + } + } + } } } diff --git a/modules/historical_accounts_state/src/volatile_historical_accounts.rs b/modules/historical_accounts_state/src/volatile_historical_accounts.rs index 41bf846f..d476627b 100644 --- a/modules/historical_accounts_state/src/volatile_historical_accounts.rs +++ b/modules/historical_accounts_state/src/volatile_historical_accounts.rs @@ -1,12 +1,12 @@ use std::collections::{HashMap, VecDeque}; -use acropolis_common::StakeCredential; +use acropolis_common::StakeAddress; use crate::state::AccountEntry; #[derive(Debug, Clone)] pub struct VolatileHistoricalAccounts { - pub window: VecDeque>, + pub window: VecDeque>, pub start_block: u64, pub epoch_start_block: u64, pub last_persisted_epoch: Option, @@ -45,7 +45,7 @@ impl VolatileHistoricalAccounts { self.epoch_start_block = block_number; } - pub fn rollback_before(&mut self, block: u64) -> Vec<(StakeCredential, AccountEntry)> { + pub fn rollback_before(&mut self, block: u64) -> Vec<(StakeAddress, AccountEntry)> { let mut out = Vec::new(); while self.start_block + self.window.len() as u64 >= block { @@ -58,7 +58,7 @@ impl VolatileHistoricalAccounts { out } - pub fn prune_volatile(&mut self) -> Vec> { + pub fn prune_volatile(&mut self) -> Vec> { let epoch = self.last_persisted_epoch.map(|e| e + 1).unwrap_or(0); let blocks_to_drain = (self.epoch_start_block - self.start_block) as usize; diff --git a/modules/rest_blockfrost/src/handlers/governance.rs b/modules/rest_blockfrost/src/handlers/governance.rs index 13beff49..3e8716c7 100644 --- a/modules/rest_blockfrost/src/handlers/governance.rs +++ b/modules/rest_blockfrost/src/handlers/governance.rs @@ -410,7 +410,7 @@ pub async fn handle_drep_updates_blockfrost( .updates .iter() .map(|event| DRepUpdateREST { - tx_hash: hex::encode(event.tx_hash), + tx_hash: "TxHash lookup not yet implemented".to_string(), cert_index: event.cert_index, action: event.action.clone(), }) diff --git a/modules/rest_blockfrost/src/handlers/pools.rs b/modules/rest_blockfrost/src/handlers/pools.rs index 17eb6b15..cba7a279 100644 --- a/modules/rest_blockfrost/src/handlers/pools.rs +++ b/modules/rest_blockfrost/src/handlers/pools.rs @@ -9,7 +9,7 @@ use acropolis_common::{ }, rest_helper::ToCheckedF64, serialization::Bech32WithHrp, - PoolRetirement, PoolUpdateAction, StakeCredential, TxHash, + PoolRetirement, PoolUpdateAction, StakeCredential, TxIdentifier, }; use anyhow::Result; use caryatid_sdk::Context; @@ -595,24 +595,25 @@ async fn handle_pools_spo_blockfrost( return Ok(RESTResponse::with_json(404, "Pool Not Found")); }; let pool_updates = pool_updates?; - let registrations: Option> = pool_updates.as_ref().map(|updates| { + // TODO: Query TxHash from chainstore module for registrations and retirements + let _registrations: Option> = pool_updates.as_ref().map(|updates| { updates .iter() .filter_map(|update| { if update.action == PoolUpdateAction::Registered { - Some(update.tx_hash) + Some(update.tx_identifier) } else { None } }) .collect() }); - let retirements: Option> = pool_updates.as_ref().map(|updates| { + let _retirements: Option> = pool_updates.as_ref().map(|updates| { updates .iter() .filter_map(|update| { if update.action == PoolUpdateAction::Deregistered { - Some(update.tx_hash) + Some(update.tx_identifier) } else { None } @@ -727,8 +728,8 @@ async fn handle_pools_spo_blockfrost( fixed_cost: pool_info.cost, reward_account, pool_owners, - registration: registrations, - retirement: retirements, + registration: "TxHash lookup not yet implemented".to_string(), + retirement: "TxHash lookup not yet implemented".to_string(), }; match serde_json::to_string(&pool_info_rest) { @@ -1143,7 +1144,7 @@ pub async fn handle_pool_updates_blockfrost( let pool_updates_rest = pool_updates .into_iter() .map(|u| PoolUpdateEventRest { - tx_hash: u.tx_hash, + tx_hash: "TxHash lookup not yet implemented".to_string(), cert_index: u.cert_index, action: u.action, }) diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 480b644a..b4cd3364 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -408,8 +408,7 @@ impl From for PoolRelayRest { #[serde_as] #[derive(Serialize)] pub struct PoolUpdateEventRest { - #[serde_as(as = "Hex")] - pub tx_hash: TxHash, + pub tx_hash: String, pub cert_index: u64, pub action: PoolUpdateAction, } @@ -452,10 +451,9 @@ pub struct PoolInfoRest { pub fixed_cost: u64, pub reward_account: String, pub pool_owners: Vec, - #[serde_as(as = "Option>")] - pub registration: Option>, - #[serde_as(as = "Option>")] - pub retirement: Option>, + // TODO: Query chain store module to retrieve TxHash from TxIdentifier + pub registration: String, + pub retirement: String, } // REST response structure for protocol params diff --git a/modules/spo_state/src/state.rs b/modules/spo_state/src/state.rs index 195edf9d..b1ab81dc 100644 --- a/modules/spo_state/src/state.rs +++ b/modules/spo_state/src/state.rs @@ -10,9 +10,8 @@ use acropolis_common::{ params::TECHNICAL_PARAMETER_POOL_RETIRE_MAX_EPOCH, queries::governance::VoteRecord, stake_addresses::StakeAddressMap, - BlockInfo, KeyHash, PoolMetadata, PoolRegistration, PoolRegistrationWithPos, PoolRetirement, - PoolRetirementWithPos, PoolUpdateEvent, Relay, StakeAddress, TxCertificate, TxHash, Voter, - VotingProcedures, + BlockInfo, KeyHash, PoolMetadata, PoolRegistration, PoolRetirement, PoolUpdateEvent, Relay, + StakeAddress, TxCertificate, TxHash, TxIdentifier, Voter, VotingProcedures, }; use anyhow::Result; use imbl::HashMap; @@ -325,14 +324,10 @@ impl State { fn handle_pool_registration( &mut self, block: &BlockInfo, - reg_with_pos: &PoolRegistrationWithPos, + reg: &PoolRegistration, + tx_identifier: &TxIdentifier, + cert_index: &u64, ) { - let PoolRegistrationWithPos { - reg, - tx_hash, - cert_index, - } = reg_with_pos; - if self.spos.contains_key(®.operator) { debug!( block = block.number, @@ -373,18 +368,19 @@ impl State { .or_insert_with(|| HistoricalSPOState::new(&self.store_config)); historical_spo.add_pool_registration(reg); historical_spo.add_pool_updates(PoolUpdateEvent::register_event( - tx_hash.clone(), + tx_identifier.clone(), *cert_index, )); } } - fn handle_pool_retirement(&mut self, block: &BlockInfo, ret_with_pos: &PoolRetirementWithPos) { - let PoolRetirementWithPos { - ret, - tx_hash, - cert_index, - } = ret_with_pos; + fn handle_pool_retirement( + &mut self, + block: &BlockInfo, + ret: &PoolRetirement, + tx_identifier: &TxIdentifier, + cert_index: &u64, + ) { debug!( "SPO {} wants to retire at the end of epoch {} (cert in block number {})", hex::encode(&ret.operator), @@ -427,7 +423,7 @@ impl State { if let Some(historical_spos) = self.historical_spos.as_mut() { if let Some(historical_spo) = historical_spos.get_mut(&ret.operator) { historical_spo - .add_pool_updates(PoolUpdateEvent::retire_event(tx_hash.clone(), *cert_index)); + .add_pool_updates(PoolUpdateEvent::retire_event(*tx_identifier, *cert_index)); } else { error!( "Historical SPO for {} not registered when try to retire it", @@ -538,18 +534,28 @@ impl State { } // Handle certificates for tx_cert in tx_certs_msg.certificates.iter() { - match tx_cert { + match &tx_cert.cert { // for spo_state - TxCertificate::PoolRegistrationWithPos(reg_with_pos) => { - self.handle_pool_registration(block, reg_with_pos); + TxCertificate::PoolRegistration(reg) => { + self.handle_pool_registration( + block, + ®, + &tx_cert.tx_identifier, + &tx_cert.cert_index, + ); } - TxCertificate::PoolRetirementWithPos(ret_with_pos) => { - self.handle_pool_retirement(block, ret_with_pos); + TxCertificate::PoolRetirement(ret) => { + self.handle_pool_retirement( + block, + &ret, + &tx_cert.tx_identifier, + &tx_cert.cert_index, + ); } // for stake addresses - TxCertificate::StakeRegistration(stake_address_with_pos) => { - self.register_stake_address(&stake_address_with_pos.stake_address); + TxCertificate::StakeRegistration(stake_address) => { + self.register_stake_address(&stake_address); } TxCertificate::StakeDeregistration(stake_address) => { self.deregister_stake_address(&stake_address); @@ -681,7 +687,7 @@ mod tests { use crate::test_utils::*; use acropolis_common::{ state_history::{StateHistory, StateHistoryStore}, - PoolRetirement, Ratio, StakeAddress, TxCertificate, TxHash, + PoolRetirement, Ratio, StakeAddress, TxCertificate, TxCertificateWithPos, TxIdentifier, }; use tokio::sync::Mutex; @@ -730,13 +736,11 @@ mod tests { async fn spo_gets_registered() { let mut state = State::default(); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: default_pool_registration(vec![0], None), - tx_hash: TxHash::default(), - cert_index: 1, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(default_pool_registration(vec![0], None)), + tx_identifier: TxIdentifier::default(), + cert_index: 1, + }); let block = new_block(1); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.spos.len()); @@ -748,16 +752,14 @@ mod tests { async fn pending_deregistration_gets_queued() { let mut state = State::default(); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![0], - epoch: 1, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![0], + epoch: 1, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); let block = new_block(0); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.pending_deregistrations.len()); @@ -774,30 +776,26 @@ mod tests { let mut state = State::default(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![0], - epoch: 2, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![0], + epoch: 2, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![1], - epoch: 2, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![1], + epoch: 2, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.pending_deregistrations.len()); @@ -819,32 +817,28 @@ mod tests { let mut state = history.lock().await.get_current_state(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![0], - epoch: 2, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![0], + epoch: 2, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); history.lock().await.commit(block.number, state); let mut state = history.lock().await.get_current_state(); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![1], - epoch: 2, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![1], + epoch: 2, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); history.lock().await.commit(block.number, state); @@ -866,13 +860,11 @@ mod tests { let mut state = State::default(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: default_pool_registration(vec![0], None), - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(default_pool_registration(vec![0], None)), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.spos.len()); @@ -881,16 +873,14 @@ mod tests { block.number = 1; let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![0], - epoch: 1, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![0], + epoch: 1, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block.epoch = 1; // SPO get retired at the start of the epoch it requests @@ -909,13 +899,11 @@ mod tests { let mut state = history.lock().await.get_current_state(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: default_pool_registration(vec![0], None), - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(default_pool_registration(vec![0], None)), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); assert_eq!(1, state.spos.len()); let spo = state.spos.get(&vec![0u8]); @@ -925,16 +913,14 @@ mod tests { let mut state = history.lock().await.get_current_state(); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![0], - epoch: 1, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![0], + epoch: 1, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); history.lock().await.commit(block.number, state); @@ -967,30 +953,26 @@ mod tests { let mut state = State::default(); let mut block = new_block(0); let mut msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![0], - epoch: 2, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![0], + epoch: 2, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block.number = 1; msg = new_certs_msg(); - msg.certificates.push(TxCertificate::PoolRetirementWithPos( - PoolRetirementWithPos { - ret: PoolRetirement { - operator: vec![1], - epoch: 3, - }, - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRetirement(PoolRetirement { + operator: vec![1], + epoch: 3, + }), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); let mut retiring_pools = state.get_retiring_pools(); retiring_pools.sort_by_key(|p| p.epoch); @@ -1014,13 +996,11 @@ mod tests { let mut block = new_block(0); let mut msg = new_certs_msg(); let spo_id = keyhash_224(&vec![1 as u8]); - msg.certificates.push(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: default_pool_registration(spo_id.clone(), None), - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(default_pool_registration(spo_id.clone(), None)), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block = new_block(2); @@ -1051,13 +1031,11 @@ mod tests { let mut block = new_block(0); let mut msg = new_certs_msg(); let spo_id = keyhash_224(&vec![1 as u8]); - msg.certificates.push(TxCertificate::PoolRegistrationWithPos( - PoolRegistrationWithPos { - reg: default_pool_registration(spo_id.clone(), None), - tx_hash: TxHash::default(), - cert_index: 0, - }, - )); + msg.certificates.push(TxCertificateWithPos { + cert: TxCertificate::PoolRegistration(default_pool_registration(spo_id.clone(), None)), + tx_identifier: TxIdentifier::default(), + cert_index: 0, + }); assert!(state.handle_tx_certs(&block, &msg).is_ok()); block = new_block(2); assert_eq!(true, state.handle_mint(&block, &vec![1])); // Note raw issuer_vkey diff --git a/modules/spo_state/src/test_utils.rs b/modules/spo_state/src/test_utils.rs index 2e8772d8..8babf02f 100644 --- a/modules/spo_state/src/test_utils.rs +++ b/modules/spo_state/src/test_utils.rs @@ -2,7 +2,7 @@ use acropolis_common::{ messages::{ EpochActivityMessage, SPORewardsMessage, SPOStakeDistributionMessage, TxCertificatesMessage, }, - BlockHash, BlockInfo, BlockStatus, Era, TxCertificate, + BlockHash, BlockInfo, BlockStatus, Era, TxCertificateWithPos, }; use crate::store_config::StoreConfig; @@ -75,7 +75,7 @@ pub fn new_block(epoch: u64) -> BlockInfo { pub fn new_certs_msg() -> TxCertificatesMessage { TxCertificatesMessage { - certificates: Vec::::new(), + certificates: Vec::::new(), } } diff --git a/modules/stake_delta_filter/src/state.rs b/modules/stake_delta_filter/src/state.rs index f048e6ee..c898ba0a 100644 --- a/modules/stake_delta_filter/src/state.rs +++ b/modules/stake_delta_filter/src/state.rs @@ -83,16 +83,16 @@ impl State { block: &BlockInfo, msg: &TxCertificatesMessage, ) -> Result<()> { - for cert in msg.certificates.iter() { - if let TxCertificate::StakeRegistration(reg) = cert { + for tx_cert in msg.certificates.iter() { + if let TxCertificate::StakeRegistration(stake_address) = &tx_cert.cert { let ptr = ShelleyAddressPointer { slot: block.slot, - tx_index: reg.tx_index, - cert_index: reg.cert_index, + tx_index: tx_cert.tx_identifier.tx_index() as u64, + cert_index: tx_cert.cert_index, }; // Sets pointer; updates max processed slot - self.pointer_cache.set_pointer(ptr, reg.stake_address.clone(), block.slot); + self.pointer_cache.set_pointer(ptr, stake_address.clone(), block.slot); } } Ok(()) diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index b63c09f3..7111ca3c 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -268,9 +268,8 @@ impl TxUnpacker { } if publish_certificates_topic.is_some() { - let tx_hash = tx.hash(); for ( cert_index, cert) in certs.iter().enumerate() { - match map_parameters::map_certificate(&cert, TxHash(*tx_hash), tx_index, cert_index, network_id.clone()) { + match map_parameters::map_certificate(&cert, tx_identifier, cert_index, network_id.clone()) { Ok(tx_cert) => { certificates.push(tx_cert); }, diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 20bccd9a..6922217e 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -65,6 +65,13 @@ store-stake-addresses = false store-spdd = false [module.historical-accounts-state] +store-rewards-history = false +store-active-stake-history = false +store-registration-history = false +store-delegation-history = false +store-mir-history = false +store-withdrawal-history = false +store-addresses = false [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled)