diff --git a/common/src/messages.rs b/common/src/messages.rs index a7f9d1c7..0322e722 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -73,7 +73,7 @@ pub struct GenesisUTxOsMessage { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct UTXODeltasMessage { /// Ordered set of deltas - pub deltas: Vec, + pub deltas: Vec, } /// Message encapsulating multiple asset deltas diff --git a/common/src/queries/accounts.rs b/common/src/queries/accounts.rs index 5537cb55..3a311de3 100644 --- a/common/src/queries/accounts.rs +++ b/common/src/queries/accounts.rs @@ -29,6 +29,7 @@ pub enum AccountsStateQuery { GetAccountMIRHistory { account: StakeAddress }, GetAccountWithdrawalHistory { account: StakeAddress }, GetAccountAssociatedAddresses { account: StakeAddress }, + GetAccountTotalTxCount { account: StakeAddress }, // Epochs-related queries GetActiveStakes {}, @@ -62,6 +63,7 @@ pub enum AccountsStateQueryResponse { AccountMIRHistory(Vec), AccountWithdrawalHistory(Vec), AccountAssociatedAddresses(Vec), + AccountTotalTxCount(u32), // Epochs-related responses ActiveStakes(u64), diff --git a/common/src/stake_addresses.rs b/common/src/stake_addresses.rs index 260006ca..b871284b 100644 --- a/common/src/stake_addresses.rs +++ b/common/src/stake_addresses.rs @@ -605,6 +605,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 100, }); @@ -682,6 +683,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 100, }); @@ -741,6 +743,7 @@ mod tests { let delta = StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 42, }; stake_addresses.process_stake_delta(&delta); @@ -760,12 +763,14 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 100, }); stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: -30, }); @@ -782,6 +787,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 50, }); @@ -789,6 +795,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: -100, }); @@ -810,6 +817,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 42, }); assert_eq!(stake_addresses.get(&stake_address).unwrap().utxo_value, 42); @@ -1021,6 +1029,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); @@ -1028,6 +1037,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); stake_addresses.add_to_reward(&addr2, 100); @@ -1055,11 +1065,13 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1079,6 +1091,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); @@ -1105,6 +1118,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); @@ -1112,6 +1126,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); stake_addresses.add_to_reward(&addr2, 100); @@ -1119,6 +1134,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr3.clone(), addresses: Vec::new(), + tx_count: 1, delta: 3000, }); stake_addresses.add_to_reward(&addr3, 150); @@ -1159,6 +1175,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); @@ -1166,6 +1183,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); stake_addresses.add_to_reward(&addr2, 100); @@ -1192,11 +1210,13 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1221,6 +1241,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 50); @@ -1228,6 +1249,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1255,6 +1277,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 500); @@ -1262,6 +1285,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1283,6 +1307,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); @@ -1314,6 +1339,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 500); @@ -1321,6 +1347,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1340,6 +1367,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); @@ -1375,6 +1403,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 100); @@ -1382,6 +1411,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1403,6 +1433,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); @@ -1434,6 +1465,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.add_to_reward(&addr1, 100); @@ -1441,6 +1473,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); @@ -1460,6 +1493,7 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); @@ -1550,16 +1584,19 @@ mod tests { stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 1000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 2000, }); stake_addresses.process_stake_delta(&StakeAddressDelta { stake_address: addr3.clone(), addresses: Vec::new(), + tx_count: 1, delta: 3000, }); diff --git a/common/src/types.rs b/common/src/types.rs index ff5fd9dd..d7c90545 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -187,17 +187,33 @@ impl PartialOrd for BlockInfo { } } +// Individual transaction UTxO deltas +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TxUTxODeltas { + // Transaction in which delta occured + pub tx_identifier: TxIdentifier, + + // Created and spent UTxOs + pub inputs: Vec, + pub outputs: Vec, +} + /// Individual address balance change #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct AddressDelta { - /// Address + // Address involved in delta pub address: Address, - /// UTxO causing address delta - pub utxo: UTxOIdentifier, + // Transaction in which delta occured + pub tx_identifier: TxIdentifier, - /// Balance change - pub value: ValueDelta, + // Address impacted UTxOs + pub spent_utxos: Vec, + pub created_utxos: Vec, + + // Sums of spent and created UTxOs + pub sent: Value, + pub received: Value, } /// Stake balance change @@ -209,6 +225,9 @@ pub struct StakeAddressDelta { /// Shelley addresses contributing to the delta pub addresses: Vec, + /// The number of transactions contributing to the delta + pub tx_count: u32, + /// Balance change pub delta: i64, } @@ -339,7 +358,7 @@ pub enum ReferenceScript { } /// Value (lovelace + multiasset) -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] pub struct Value { pub lovelace: u64, pub assets: NativeAssets, @@ -403,59 +422,66 @@ impl AddAssign for ValueMap { } } -/// Hashmap representation of ValueDelta (lovelace + multiasset) -pub struct ValueDeltaMap { - pub lovelace: i64, - pub assets: NativeAssetsDeltaMap, -} - -impl From for ValueDeltaMap { - fn from(value: ValueDelta) -> Self { - let mut assets = HashMap::new(); +impl ValueMap { + pub fn add_value(&mut self, other: &Value) { + // Handle lovelace + self.lovelace = self.lovelace.saturating_add(other.lovelace); - for (policy, asset_list) in value.assets { - let policy_entry = assets.entry(policy).or_insert_with(HashMap::new); - for asset in asset_list { - *policy_entry.entry(asset.name).or_insert(0) += asset.amount; + // Handle multi-assets + for (policy, assets) in &other.assets { + let policy_entry = self.assets.entry(*policy).or_default(); + for asset in assets { + *policy_entry.entry(asset.name).or_default() = policy_entry + .get(&asset.name) + .copied() + .unwrap_or(0) + .saturating_add(asset.amount); } } - - ValueDeltaMap { - lovelace: value.lovelace, - assets, - } } } -impl AddAssign for ValueDeltaMap { - fn add_assign(&mut self, delta: ValueDelta) { - self.lovelace += delta.lovelace; - - for (policy, assets) in delta.assets { - let policy_entry = self.assets.entry(policy).or_default(); - for asset in assets { - *policy_entry.entry(asset.name).or_insert(0) += asset.amount; - } +impl From for Value { + fn from(map: ValueMap) -> Self { + Self { + lovelace: map.lovelace, + assets: map + .assets + .into_iter() + .map(|(policy, assets)| { + ( + policy, + assets + .into_iter() + .map(|(name, amount)| NativeAsset { name, amount }) + .collect(), + ) + }) + .collect(), } } } -impl From for ValueDelta { - fn from(map: ValueDeltaMap) -> Self { - let mut assets_vec = Vec::with_capacity(map.assets.len()); - - for (policy, asset_map) in map.assets { - let inner_assets = asset_map +impl From for ValueDelta { + fn from(map: ValueMap) -> Self { + Self { + lovelace: map.lovelace as i64, + assets: map + .assets .into_iter() - .map(|(name, amount)| NativeAssetDelta { name, amount }) - .collect(); - - assets_vec.push((policy, inner_assets)); - } - - ValueDelta { - lovelace: map.lovelace, - assets: assets_vec, + .map(|(policy, assets)| { + ( + policy, + assets + .into_iter() + .map(|(name, amount)| NativeAssetDelta { + name, + amount: amount as i64, + }) + .collect(), + ) + }) + .collect(), } } } @@ -553,27 +579,6 @@ pub struct TxOutput { pub reference_script: Option, } -/// Transaction input (UTXO reference) -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TxInput { - /// Identifer of the referenced UTxO - pub utxo_identifier: UTxOIdentifier, -} - -/// Option of either TxOutput or TxInput -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub enum UTXODelta { - None(()), - Output(TxOutput), - Input(TxInput), -} - -impl Default for UTXODelta { - fn default() -> Self { - Self::None(()) - } -} - /// Key hash pub type KeyHash = Hash<28>; @@ -2149,6 +2154,12 @@ pub struct AssetAddressEntry { pub quantity: u64, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TxTotals { + pub sent: Value, + pub received: Value, +} + #[derive( Debug, Default, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, )] @@ -2170,25 +2181,19 @@ impl AddAssign for AddressTotals { } impl AddressTotals { - pub fn apply_delta(&mut self, delta: &ValueDelta) { - if delta.lovelace > 0 { - self.received.lovelace += delta.lovelace as u64; - } else if delta.lovelace < 0 { - self.sent.lovelace += (-delta.lovelace) as u64; + pub fn apply_delta(&mut self, delta: &TxTotals) { + self.received.lovelace += delta.received.lovelace; + self.sent.lovelace += delta.sent.lovelace; + + for (policy, assets) in &delta.received.assets { + for asset in assets { + Self::apply_asset(&mut self.received.assets, *policy, asset.name, asset.amount); + } } - for (policy, assets) in &delta.assets { - for a in assets { - if a.amount > 0 { - Self::apply_asset(&mut self.received.assets, *policy, a.name, a.amount as u64); - } else if a.amount < 0 { - Self::apply_asset( - &mut self.sent.assets, - *policy, - a.name, - a.amount.unsigned_abs(), - ); - } + for (policy, assets) in &delta.sent.assets { + for asset in assets { + Self::apply_asset(&mut self.sent.assets, *policy, asset.name, asset.amount); } } diff --git a/modules/accounts_state/src/state.rs b/modules/accounts_state/src/state.rs index e2fa28f7..cb757e47 100644 --- a/modules/accounts_state/src/state.rs +++ b/modules/accounts_state/src/state.rs @@ -1012,6 +1012,7 @@ mod tests { deltas: vec![StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 42, }], }; @@ -1103,6 +1104,7 @@ mod tests { deltas: vec![StakeAddressDelta { stake_address: addr1.clone(), addresses: Vec::new(), + tx_count: 1, delta: 42, }], }; @@ -1113,6 +1115,7 @@ mod tests { deltas: vec![StakeAddressDelta { stake_address: addr2.clone(), addresses: Vec::new(), + tx_count: 1, delta: 21, }], }; @@ -1212,6 +1215,7 @@ mod tests { deltas: vec![StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 99, }], }; @@ -1259,6 +1263,7 @@ mod tests { deltas: vec![StakeAddressDelta { stake_address: stake_address.clone(), addresses: Vec::new(), + tx_count: 1, delta: 99, }], }; @@ -1425,21 +1430,25 @@ mod tests { StakeAddressDelta { stake_address: spo1, addresses: Vec::new(), + tx_count: 1, delta: 100, }, StakeAddressDelta { stake_address: spo2, addresses: Vec::new(), + tx_count: 1, delta: 1_000, }, StakeAddressDelta { stake_address: spo3, addresses: Vec::new(), + tx_count: 1, delta: 10_000, }, StakeAddressDelta { stake_address: spo4, addresses: Vec::new(), + tx_count: 1, delta: 100_000, }, ]; diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index dc84d068..897153fe 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -1,12 +1,12 @@ use std::{ - collections::{HashMap, HashSet}, + collections::HashSet, path::{Path, PathBuf}, sync::Arc, }; use acropolis_common::{ - Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, UTxOIdentifier, - ValueDelta, ValueDeltaMap, + Address, AddressDelta, AddressTotals, BlockInfo, ShelleyAddress, TxIdentifier, TxTotals, + UTxOIdentifier, }; use anyhow::Result; @@ -42,7 +42,7 @@ pub enum UtxoDelta { pub struct AddressEntry { pub utxos: Option>, pub transactions: Option>, - pub totals: Option>, + pub totals: Option>, } #[derive(Clone)] @@ -175,47 +175,29 @@ impl State { pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) { let addresses = self.volatile.window.back_mut().expect("window should never be empty"); - // Keeps track seen txs to avoid overcounting totals tx count and duplicating tx identifiers - let mut seen: HashMap> = HashMap::new(); - for delta in deltas { - let tx_id = TxIdentifier::from(delta.utxo); let entry = addresses.entry(delta.address.clone()).or_default(); if self.config.store_info { let utxos = entry.utxos.get_or_insert(Vec::new()); - if delta.value.lovelace > 0 { - utxos.push(UtxoDelta::Created(delta.utxo)); - } else { - utxos.push(UtxoDelta::Spent(delta.utxo)); + for spent_utxo in &delta.spent_utxos { + utxos.push(UtxoDelta::Spent(*spent_utxo)) + } + for created_utxo in &delta.created_utxos { + utxos.push(UtxoDelta::Created(*created_utxo)) } } - if self.config.store_transactions || self.config.store_totals { - let seen_for_addr = seen.entry(delta.address.clone()).or_default(); + if self.config.store_transactions { + entry.transactions.get_or_insert(Vec::new()).push(delta.tx_identifier); + } + if self.config.store_totals { + let totals = entry.totals.get_or_insert(Vec::new()); - if self.config.store_transactions { - let txs = entry.transactions.get_or_insert(Vec::new()); - if !seen_for_addr.contains(&tx_id) { - txs.push(tx_id); - } - } - if self.config.store_totals { - let totals = entry.totals.get_or_insert(Vec::new()); - - if seen_for_addr.contains(&tx_id) { - if let Some(last_total) = totals.last_mut() { - // Create temporary map for summing same tx deltas efficiently - // TODO: Potentially move upstream to address deltas publisher - let mut map = ValueDeltaMap::from(last_total.clone()); - map += delta.value.clone(); - *last_total = ValueDelta::from(map); - } - } else { - totals.push(delta.value.clone()); - } - } - seen_for_addr.insert(tx_id); + totals.push(TxTotals { + sent: delta.sent.clone(), + received: delta.received.clone(), + }) } } } @@ -249,7 +231,7 @@ impl State { #[cfg(test)] mod tests { use super::*; - use acropolis_common::{Address, AddressDelta, UTxOIdentifier, ValueDelta}; + use acropolis_common::{Address, AddressDelta, UTxOIdentifier, Value}; use tempfile::tempdir; fn dummy_address() -> Address { @@ -274,14 +256,21 @@ mod tests { Ok(state) } - fn delta(addr: &Address, utxo: &UTxOIdentifier, lovelace: i64) -> AddressDelta { + fn delta( + addr: &Address, + tx_id: TxIdentifier, + spent_utxos: Vec, + created_utxos: Vec, + lovelace_sent: u64, + lovelace_received: u64, + ) -> AddressDelta { AddressDelta { address: addr.clone(), - utxo: *utxo, - value: ValueDelta { - lovelace, - assets: Vec::new(), - }, + tx_identifier: tx_id, + spent_utxos, + created_utxos, + sent: Value::new(lovelace_sent, Vec::new()), + received: Value::new(lovelace_received, Vec::new()), } } @@ -293,7 +282,8 @@ mod tests { let addr = dummy_address(); let utxo = UTxOIdentifier::new(0, 0, 0); - let deltas = vec![delta(&addr, &utxo, 1)]; + let tx_id = TxIdentifier::new(0, 0); + let deltas = vec![delta(&addr, tx_id, vec![], vec![utxo], 0, 1)]; // Apply deltas state.apply_address_deltas(&deltas); @@ -334,9 +324,11 @@ mod tests { let addr = dummy_address(); let utxo = UTxOIdentifier::new(0, 0, 0); + let tx_id_create = TxIdentifier::new(0, 0); + let tx_id_spend = TxIdentifier::new(1, 0); - let created = vec![delta(&addr, &utxo, 1)]; - + let created = vec![delta(&addr, tx_id_create, vec![], vec![utxo], 0, 1)]; + let spent = vec![delta(&addr, tx_id_spend, vec![utxo], vec![], 1, 0)]; // Apply delta to volatile state.apply_address_deltas(&created); @@ -352,7 +344,7 @@ mod tests { assert_eq!(after_persist.as_ref().unwrap(), &[utxo]); state.volatile.next_block(); - state.apply_address_deltas(&[delta(&addr, &utxo, -1)]); + state.apply_address_deltas(&spent); // Verify UTxO was removed while in volatile let after_spend_volatile = state.get_address_utxos(&addr).await?; @@ -384,12 +376,21 @@ mod tests { let addr = dummy_address(); let utxo_old = UTxOIdentifier::new(0, 0, 0); let utxo_new = UTxOIdentifier::new(0, 1, 0); + let tx_id_create_old = TxIdentifier::new(0, 0); + let tx_id_spend_old_create_new = TxIdentifier::new(1, 0); state.volatile.epoch_start_block = 1; - state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)]); + state.apply_address_deltas(&[delta(&addr, tx_id_create_old, vec![], vec![utxo_old], 0, 1)]); state.volatile.next_block(); - state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)]); + state.apply_address_deltas(&[delta( + &addr, + tx_id_spend_old_create_new, + vec![utxo_old], + vec![utxo_new], + 1, + 1, + )]); // Verify Create and spend both in volatile is not included in address utxos let volatile = state.get_address_utxos(&addr).await?; @@ -419,67 +420,4 @@ mod tests { Ok(()) } - - #[tokio::test] - async fn test_same_tx_deltas_sums_totals_in_volatile() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - - let mut state = setup_state_and_store().await?; - - let addr = dummy_address(); - let delta_1 = UTxOIdentifier::new(0, 1, 0); - let delta_2 = UTxOIdentifier::new(0, 1, 1); - - state.volatile.epoch_start_block = 1; - - state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]); - - // Verify only 1 totals entry with delta of 2 - let volatile = state - .volatile - .window - .back() - .expect("Window should have a delta") - .get(&addr) - .expect("Entry should be populated") - .totals - .as_ref() - .expect("Totals should be populated"); - - assert_eq!(volatile.len(), 1); - assert_eq!(volatile.first().expect("Should be populated").lovelace, 2); - - Ok(()) - } - - #[tokio::test] - async fn test_same_tx_deltas_prevents_duplicate_identifier_in_volatile() -> Result<()> { - let _ = tracing_subscriber::fmt::try_init(); - - let mut state = setup_state_and_store().await?; - - let addr = dummy_address(); - let delta_1 = UTxOIdentifier::new(0, 1, 0); - let delta_2 = UTxOIdentifier::new(0, 1, 1); - - state.volatile.epoch_start_block = 1; - - state.apply_address_deltas(&[delta(&addr, &delta_1, 1), delta(&addr, &delta_2, 1)]); - - // Verify only 1 transactions entry - let volatile = state - .volatile - .window - .back() - .expect("Window should have a delta") - .get(&addr) - .expect("Entry should be populated") - .transactions - .as_ref() - .expect("Transactions should be populated"); - - assert_eq!(volatile.len(), 1); - - Ok(()) - } } diff --git a/modules/assets_state/src/state.rs b/modules/assets_state/src/state.rs index f07f7687..e8f8adc1 100644 --- a/modules/assets_state/src/state.rs +++ b/modules/assets_state/src/state.rs @@ -1,12 +1,13 @@ //! Acropolis AssetsState: State storage +use std::collections::HashSet; + use crate::asset_registry::{AssetId, AssetRegistry}; use acropolis_common::{ - math::update_value_with_delta, queries::assets::{AssetHistory, PolicyAssets}, Address, AddressDelta, AssetAddressEntry, AssetInfoRecord, AssetMetadataStandard, AssetMintRecord, AssetName, Datum, Lovelace, NativeAssetsDelta, PolicyAsset, PolicyId, - ShelleyAddress, TxIdentifier, UTXODelta, + ShelleyAddress, TxIdentifier, TxUTxODeltas, }; use anyhow::Result; use imbl::{HashMap, Vector}; @@ -355,7 +356,7 @@ impl State { pub fn handle_transactions( &self, - deltas: &[UTXODelta], + deltas: &[TxUTxODeltas], registry: &AssetRegistry, ) -> Result { let mut new_txs = self.transactions.clone(); @@ -369,27 +370,28 @@ impl State { let store_cfg = self.config.store_transactions; - for delta in deltas { - let UTXODelta::Output(output) = delta else { - continue; - }; - - let tx_identifier = TxIdentifier::from(output.utxo_identifier); - for (policy_id, assets) in &output.value.assets { - for asset in assets { - if let Some(asset_id) = registry.lookup_id(policy_id, &asset.name) { - let entry = txs_map.entry(asset_id).or_default(); + for tx in deltas { + let mut tx_asset_ids = HashSet::new(); + for output in &tx.outputs { + for (policy_id, assets) in &output.value.assets { + for asset in assets { + if let Some(asset_id) = registry.lookup_id(policy_id, &asset.name) { + tx_asset_ids.insert(asset_id); + } + } + } + } - let should_push = entry.back() != Some(&tx_identifier); + for asset_id in &tx_asset_ids { + let entry = txs_map.entry(*asset_id).or_default(); - if should_push { - entry.push_back(tx_identifier); + let last = entry.back().copied(); + if last != Some(tx.tx_identifier) { + entry.push_back(tx.tx_identifier); - if let StoreTransactions::Last(max) = store_cfg { - if entry.len() as u64 > max { - entry.pop_front(); - } - } + if let StoreTransactions::Last(max) = store_cfg { + if entry.len() as u64 > max { + entry.pop_front(); } } } @@ -418,30 +420,31 @@ impl State { for address_delta in deltas { if let Address::Shelley(shelley_addr) = &address_delta.address { - for (policy_id, asset_deltas) in &address_delta.value.assets { - for asset_delta in asset_deltas { - if let Some(asset_id) = registry.lookup_id(policy_id, &asset_delta.name) { + for (policy_id, assets) in &address_delta.sent.assets { + for asset in assets { + if let Some(asset_id) = registry.lookup_id(policy_id, &asset.name) { if let Some(holders) = addr_map.get_mut(&asset_id) { - let new_balance = { - let current = holders.entry(shelley_addr.clone()).or_insert(0); - if let Err(e) = - update_value_with_delta(current, asset_delta.amount) - { - error!( - "Address balance update error for {:?}: {e}", - address_delta.address - ); - 0 - } else { - *current - } - }; - - if new_balance == 0 { + let current = holders.entry(shelley_addr.clone()).or_insert(0); + *current = current.saturating_sub(asset.amount); + + if *current == 0 { holders.remove(shelley_addr); } } else { - error!("Address delta for unknown asset_id: {:?}", asset_id); + error!("Sent delta for unknown asset_id: {:?}", asset_id); + } + } + } + } + + for (policy_id, assets) in &address_delta.received.assets { + for asset in assets { + if let Some(asset_id) = registry.lookup_id(policy_id, &asset.name) { + if let Some(holders) = addr_map.get_mut(&asset_id) { + let current = holders.entry(shelley_addr.clone()).or_insert(0); + *current = current.saturating_add(asset.amount); + } else { + error!("Received delta for unknown asset_id: {:?}", asset_id); } } } @@ -540,42 +543,41 @@ impl State { pub fn handle_cip68_metadata( &self, - deltas: &[UTXODelta], + deltas: &[TxUTxODeltas], registry: &AssetRegistry, ) -> Result { let mut new_info = self.info.clone(); - for delta in deltas { - let UTXODelta::Output(output) = delta else { - continue; - }; - let Some(Datum::Inline(blob)) = &output.datum else { - continue; - }; + for tx in deltas { + for output in &tx.outputs { + let Some(Datum::Inline(blob)) = &output.datum else { + continue; + }; - for (policy_id, native_assets) in &output.value.assets { - for asset in native_assets { - let name = &asset.name; + for (policy_id, native_assets) in &output.value.assets { + for asset in native_assets { + let name = &asset.name; - if !name.as_slice().starts_with(&CIP68_LABEL_100) { - continue; - } + if !name.as_slice().starts_with(&CIP68_LABEL_100) { + continue; + } - // NOTE: CIP68 metadata version is included in the blob and is decoded in REST handler - match registry.lookup_id(policy_id, name) { - Some(asset_id) => { - if let Some(record) = - new_info.as_mut().and_then(|m| m.get_mut(&asset_id)) - { - record.onchain_metadata = Some(blob.clone()); + // NOTE: CIP68 metadata version is included in the blob and is decoded in REST handler + match registry.lookup_id(policy_id, name) { + Some(asset_id) => { + if let Some(record) = + new_info.as_mut().and_then(|m| m.get_mut(&asset_id)) + { + record.onchain_metadata = Some(blob.clone()); + } + } + None => { + error!( + "Got CIP-68 datum for unknown asset: {}.{}", + hex::encode(policy_id), + hex::encode(name.as_slice()) + ); } - } - None => { - error!( - "Got CIP-68 datum for unknown asset: {}.{}", - hex::encode(policy_id), - hex::encode(name.as_slice()) - ); } } } @@ -633,8 +635,8 @@ mod tests { }; use acropolis_common::{ Address, AddressDelta, AssetInfoRecord, AssetMetadataStandard, AssetName, Datum, - NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxIdentifier, TxInput, TxOutput, - UTXODelta, UTxOIdentifier, Value, ValueDelta, + NativeAsset, NativeAssetDelta, PolicyId, ShelleyAddress, TxIdentifier, TxOutput, + TxUTxODeltas, UTxOIdentifier, Value, }; fn dummy_policy(byte: u8) -> PolicyId { @@ -732,16 +734,47 @@ mod tests { serde_cbor::to_vec(&serde_cbor::Value::Map(policy_map)).unwrap() } - fn make_address_delta(policy_id: PolicyId, name: AssetName, amount: i64) -> AddressDelta { + fn make_address_delta( + policy_id: PolicyId, + name: AssetName, + sent_amount: u64, + received_amount: u64, + ) -> AddressDelta { AddressDelta { address: dummy_address(), - utxo: UTxOIdentifier::new(0, 0, 0), - value: ValueDelta { - lovelace: 0, - assets: vec![(policy_id, vec![NativeAssetDelta { name, amount }])] - .into_iter() - .collect(), - }, + tx_identifier: TxIdentifier::new(0, 0), + spent_utxos: Vec::new(), + created_utxos: Vec::new(), + + sent: Value::new( + 0, + if sent_amount > 0 { + vec![( + policy_id, + vec![NativeAsset { + name, + amount: sent_amount, + }], + )] + } else { + vec![] + }, + ), + + received: Value::new( + 0, + if received_amount > 0 { + vec![( + policy_id, + vec![NativeAsset { + name, + amount: received_amount, + }], + )] + } else { + vec![] + }, + ), } } @@ -1080,8 +1113,13 @@ mod tests { let datum_blob = vec![1, 2, 3, 4]; let output = make_output(policy_id, reference_name, Some(datum_blob.clone())); - let new_state = - state.handle_cip68_metadata(&[UTXODelta::Output(output)], ®istry).unwrap(); + let tx_deltas = TxUTxODeltas { + tx_identifier: TxIdentifier::new(0, 0), + inputs: Vec::new(), + outputs: vec![output], + }; + + let new_state = state.handle_cip68_metadata(&[tx_deltas], ®istry).unwrap(); let info = new_state.info.expect("info should be Some"); let record = info.get(&reference_id).expect("record should exist"); @@ -1106,8 +1144,13 @@ mod tests { let datum_blob = vec![1, 2, 3, 4]; let output = make_output(policy_id, normal_name, Some(datum_blob.clone())); - let delta = UTXODelta::Output(output); - let new_state = state.handle_cip68_metadata(&[delta], ®istry).unwrap(); + let tx_deltas = TxUTxODeltas { + tx_identifier: TxIdentifier::new(0, 0), + inputs: Vec::new(), + outputs: vec![output], + }; + + let new_state = state.handle_cip68_metadata(&[tx_deltas], ®istry).unwrap(); let info = new_state.info.expect("info should be Some"); let record = info.get(&normal_id).expect("non reference asset should exist"); @@ -1133,8 +1176,13 @@ mod tests { let datum_blob = vec![1, 2, 3, 4]; let output = make_output(policy_id, name, Some(datum_blob)); - let delta = UTXODelta::Output(output); - let new_state = state.handle_cip68_metadata(&[delta], ®istry).unwrap(); + let tx_deltas = TxUTxODeltas { + tx_identifier: TxIdentifier::new(0, 0), + inputs: Vec::new(), + outputs: vec![output], + }; + + let new_state = state.handle_cip68_metadata(&[tx_deltas], ®istry).unwrap(); let info = new_state.info.expect("info should be Some"); @@ -1159,14 +1207,16 @@ mod tests { StoreTransactions::None, ); - let input_delta = UTXODelta::Input(TxInput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), - }); + let input = UTxOIdentifier::new(0, 0, 0); let output = make_output(policy_id, name, None); - let output_delta = UTXODelta::Output(output); - let new_state = - state.handle_cip68_metadata(&[input_delta, output_delta], ®istry).unwrap(); + let tx_deltas = TxUTxODeltas { + tx_identifier: TxIdentifier::new(0, 0), + inputs: vec![input], + outputs: vec![output], + }; + + let new_state = state.handle_cip68_metadata(&[tx_deltas], ®istry).unwrap(); let info = new_state.info.expect("info should be Some"); let record = info.get(&asset_id).unwrap(); @@ -1278,16 +1328,26 @@ mod tests { let output = make_output(policy_id, asset_name, None); - let delta1 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), - ..output.clone() - }); - let delta2 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 1), - ..output - }); + let tx_identifier = TxIdentifier::new(0, 0); + + let tx1 = TxUTxODeltas { + tx_identifier, + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(0, 0, 0), + ..output.clone() + }], + }; + let tx2 = TxUTxODeltas { + tx_identifier, + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(0, 0, 1), + ..output + }], + }; - let new_state = state.handle_transactions(&[delta1, delta2], ®istry).unwrap(); + let new_state = state.handle_transactions(&[tx1, tx2], ®istry).unwrap(); let txs = new_state.transactions.expect("transactions should exist"); let entry = txs.get(&asset_id).expect("asset_id should be present"); @@ -1312,16 +1372,24 @@ mod tests { let out1 = make_output(policy_id, asset_name, None); let out2 = make_output(policy_id, asset_name, None); - let delta1 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(9, 0, 0), - ..out1 - }); - let delta2 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(10, 0, 0), - ..out2 - }); + let tx1 = TxUTxODeltas { + tx_identifier: TxIdentifier::new(9, 0), + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(9, 0, 0), + ..out1 + }], + }; + let tx2 = TxUTxODeltas { + tx_identifier: TxIdentifier::new(10, 0), + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(10, 0, 0), + ..out2 + }], + }; - let new_state = state.handle_transactions(&[delta1, delta2], ®istry).unwrap(); + let new_state = state.handle_transactions(&[tx1, tx2], ®istry).unwrap(); let txs = new_state.transactions.expect("transactions should exist"); let entry = txs.get(&asset_id).expect("asset_id should be present"); @@ -1347,20 +1415,32 @@ mod tests { ); let base_output = make_output(policy_id, asset_name, None); - let delta1 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(9, 0, 0), - ..base_output.clone() - }); - let delta2 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(8, 0, 0), - ..base_output.clone() - }); - let delta3 = UTXODelta::Output(acropolis_common::TxOutput { - utxo_identifier: UTxOIdentifier::new(7, 0, 0), - ..base_output - }); + let tx1 = TxUTxODeltas { + tx_identifier: TxIdentifier::new(9, 0), + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(9, 0, 0), + ..base_output.clone() + }], + }; + let tx2 = TxUTxODeltas { + tx_identifier: TxIdentifier::new(8, 0), + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(8, 0, 0), + ..base_output.clone() + }], + }; + let tx3 = TxUTxODeltas { + tx_identifier: TxIdentifier::new(7, 0), + inputs: Vec::new(), + outputs: vec![TxOutput { + utxo_identifier: UTxOIdentifier::new(7, 0, 0), + ..base_output + }], + }; - let new_state = state.handle_transactions(&[delta1, delta2, delta3], ®istry).unwrap(); + let new_state = state.handle_transactions(&[tx1, tx2, tx3], ®istry).unwrap(); let txs = new_state.transactions.expect("transactions should exist"); let entry = txs.get(&asset_id).expect("asset_id should be present"); @@ -1384,8 +1464,8 @@ mod tests { StoreTransactions::None, ); - let delta1 = make_address_delta(policy_id, asset_name, 10); - let delta2 = make_address_delta(policy_id, asset_name, 15); + let delta1 = make_address_delta(policy_id, asset_name, 0, 10); + let delta2 = make_address_delta(policy_id, asset_name, 0, 15); let new_state = state.handle_address_deltas(&[delta1, delta2], ®istry).unwrap(); let addr_map = new_state.addresses.unwrap(); @@ -1417,7 +1497,7 @@ mod tests { StoreTransactions::None, ); - let add_delta = make_address_delta(policy_id, asset_name, 10); + let add_delta = make_address_delta(policy_id, asset_name, 0, 10); let state_after_add = state.handle_address_deltas(&[add_delta], ®istry).unwrap(); let addr_map = state_after_add.addresses.as_ref().unwrap(); let holders = addr_map.get(&asset_id).unwrap(); @@ -1433,7 +1513,7 @@ mod tests { 10 ); - let remove_delta = make_address_delta(policy_id, asset_name, -10); + let remove_delta = make_address_delta(policy_id, asset_name, 10, 0); let state_after_remove = state_after_add.handle_address_deltas(&[remove_delta], ®istry).unwrap(); let addr_map = state_after_remove.addresses.as_ref().unwrap(); diff --git a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs index c81ebc73..1cdcc111 100644 --- a/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs +++ b/modules/genesis_bootstrapper/src/genesis_bootstrapper.rs @@ -9,7 +9,7 @@ use acropolis_common::{ UTXODeltasMessage, }, Address, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, GenesisDelegates, Lovelace, - LovelaceDelta, Pot, PotDelta, TxHash, TxIdentifier, TxOutRef, TxOutput, UTXODelta, + LovelaceDelta, Pot, PotDelta, TxHash, TxIdentifier, TxOutRef, TxOutput, TxUTxODeltas, UTxOIdentifier, Value, }; use anyhow::Result; @@ -157,7 +157,11 @@ impl GenesisBootstrapper { reference_script: None, }; - utxo_deltas_message.deltas.push(UTXODelta::Output(tx_output)); + utxo_deltas_message.deltas.push(TxUTxODeltas { + tx_identifier, + inputs: Vec::new(), + outputs: vec![tx_output], + }); total_allocated += amount; } diff --git a/modules/historical_accounts_state/src/historical_accounts_state.rs b/modules/historical_accounts_state/src/historical_accounts_state.rs index 4ee95710..68f950e1 100644 --- a/modules/historical_accounts_state/src/historical_accounts_state.rs +++ b/modules/historical_accounts_state/src/historical_accounts_state.rs @@ -43,6 +43,7 @@ const DEFAULT_STORE_DELEGATION_HISTORY: (&str, bool) = ("store-delegation-histor const DEFAULT_STORE_MIR_HISTORY: (&str, bool) = ("store-mir-history", false); const DEFAULT_STORE_WITHDRAWAL_HISTORY: (&str, bool) = ("store-withdrawal-history", false); const DEFAULT_STORE_ADDRESSES: (&str, bool) = ("store-addresses", false); +const DEFAULT_STORE_TX_COUNT: (&str, bool) = ("store-tx-count", false); /// Historical Accounts State module #[module( @@ -290,6 +291,9 @@ impl HistoricalAccountsState { store_addresses: config .get_bool(DEFAULT_STORE_ADDRESSES.0) .unwrap_or(DEFAULT_STORE_ADDRESSES.1), + store_tx_count: config + .get_bool(DEFAULT_STORE_TX_COUNT.0) + .unwrap_or(DEFAULT_STORE_TX_COUNT.1), }; // Initalize state @@ -387,6 +391,14 @@ impl HistoricalAccountsState { ), } } + AccountsStateQuery::GetAccountTotalTxCount { account } => { + match state.lock().await.get_total_tx_count(account).await { + Ok(count) => AccountsStateQueryResponse::AccountTotalTxCount(count), + Err(e) => AccountsStateQueryResponse::Error( + QueryError::internal_error(e.to_string()), + ), + } + } _ => AccountsStateQueryResponse::Error(QueryError::not_implemented(format!( "Unimplemented query variant: {:?}", query diff --git a/modules/historical_accounts_state/src/immutable_historical_account_store.rs b/modules/historical_accounts_state/src/immutable_historical_account_store.rs index 03ffbf80..6c16e177 100644 --- a/modules/historical_accounts_state/src/immutable_historical_account_store.rs +++ b/modules/historical_accounts_state/src/immutable_historical_account_store.rs @@ -25,6 +25,7 @@ pub struct ImmutableHistoricalAccountStore { mir_history: Partition, withdrawal_history: Partition, addresses: Partition, + tx_count: Partition, keyspace: Keyspace, pub pending: Mutex>>, } @@ -47,6 +48,7 @@ impl ImmutableHistoricalAccountStore { let mir_history = keyspace.open_partition("mir_history", PartitionCreateOptions::default())?; let addresses = keyspace.open_partition("addresses", PartitionCreateOptions::default())?; + let tx_count = keyspace.open_partition("tx_count", PartitionCreateOptions::default())?; Ok(Self { rewards_history, @@ -56,6 +58,7 @@ impl ImmutableHistoricalAccountStore { withdrawal_history, mir_history, addresses, + tx_count, keyspace, pending: Mutex::new(Vec::new()), }) @@ -141,6 +144,13 @@ impl ImmutableHistoricalAccountStore { } } } + + // Persist new tx count + if config.store_tx_count { + if let Some(count) = &entry.tx_count { + batch.insert(&self.tx_count, epoch_key, count.to_le_bytes()); + } + } } match batch.commit() { @@ -297,6 +307,26 @@ impl ImmutableHistoricalAccountStore { Ok((!addresses.is_empty()).then_some(addresses)) } + pub async fn get_tx_count(&self, account: &StakeAddress) -> Result> { + let mut total_count = 0; + + for result in self.tx_count.prefix(account.get_hash().as_ref()) { + let (_, bytes) = result?; + let epoch_count = u32::from_le_bytes(bytes[..4].try_into()?); + total_count += epoch_count; + } + + for block_map in self.pending.lock().await.iter() { + if let Some(entry) = block_map.get(account) { + if let Some(block_count) = &entry.tx_count { + total_count += block_count; + } + } + } + + Ok((total_count != 0).then_some(total_count)) + } + fn merge_block_deltas( block_deltas: Vec>, ) -> HashMap { @@ -317,6 +347,9 @@ impl ImmutableHistoricalAccountStore { Self::extend_opt_vec(&mut agg_entry.withdrawal_history, entry.withdrawal_history); Self::extend_opt_vec(&mut agg_entry.mir_history, entry.mir_history); Self::extend_opt_vec_ordered(&mut agg_entry.addresses, entry.addresses); + if let Some(count) = entry.tx_count { + agg_entry.tx_count = Some(agg_entry.tx_count.unwrap_or(0) + count); + } } acc }) diff --git a/modules/historical_accounts_state/src/state.rs b/modules/historical_accounts_state/src/state.rs index bb5c7eac..254e5f89 100644 --- a/modules/historical_accounts_state/src/state.rs +++ b/modules/historical_accounts_state/src/state.rs @@ -33,6 +33,7 @@ pub struct AccountEntry { pub withdrawal_history: Option>, pub mir_history: Option>, pub addresses: Option>, + pub tx_count: Option, } #[derive(Debug, Clone, minicbor::Decode, minicbor::Encode)] @@ -56,6 +57,7 @@ pub struct HistoricalAccountsConfig { pub store_mir_history: bool, pub store_withdrawal_history: bool, pub store_addresses: bool, + pub store_tx_count: bool, } impl HistoricalAccountsConfig { @@ -67,6 +69,7 @@ impl HistoricalAccountsConfig { || self.store_mir_history || self.store_withdrawal_history || self.store_addresses + || self.store_tx_count } } @@ -228,12 +231,11 @@ impl State { pub fn handle_address_deltas(&mut self, stake_address_deltas: &StakeAddressDeltasMessage) { let window = self.volatile.window.back_mut().expect("window should never be empty"); for delta in stake_address_deltas.deltas.iter() { - window - .entry(delta.stake_address.clone()) - .or_default() - .addresses - .get_or_insert_with(Vec::new) - .extend(delta.addresses.clone()); + let entry = window.entry(delta.stake_address.clone()).or_default(); + + entry.addresses.get_or_insert_with(Vec::new).extend(delta.addresses.clone()); + + *entry.tx_count.get_or_insert(0) += delta.tx_count; } } @@ -376,6 +378,20 @@ impl State { Ok((!addresses.is_empty()).then_some(addresses)) } + pub async fn get_total_tx_count(&self, account: &StakeAddress) -> Result { + let mut total_count = self.immutable.get_tx_count(account).await?.unwrap_or_default(); + + for block_map in self.volatile.window.iter() { + if let Some(entry) = block_map.get(account) { + if let Some(block_tx_count) = &entry.tx_count { + total_count += block_tx_count; + } + } + } + + Ok(total_count) + } + fn handle_stake_registration_change( &mut self, account: &StakeAddress, diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 5ac14472..206187d9 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -680,15 +680,35 @@ pub async fn handle_account_totals_blockfrost( ) .await?; - // TODO: Query historical accounts state to retrieve account tx count instead of - // using the addresses totals as the addresses totals does not deduplicate - // for multi-address transactions, overstating count + // Get account tx count from historical accounts state + let msg = Arc::new(Message::StateQuery(StateQuery::Accounts( + AccountsStateQuery::GetAccountTotalTxCount { + account: account.clone(), + }, + ))); + let tx_count = query_state( + &context, + &handlers_config.historical_accounts_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Accounts( + AccountsStateQueryResponse::AccountTotalTxCount(count), + )) => Ok(count), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving account tx count", + )), + }, + ) + .await?; let rest_response = AccountTotalsREST { stake_address: account.to_string()?, received_sum: totals.received.into(), sent_sum: totals.sent.into(), - tx_count: totals.tx_count, + tx_count, }; let json = serde_json::to_string_pretty(&rest_response)?; diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 8fc576fd..a8530a8e 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -943,7 +943,7 @@ pub struct AccountTotalsREST { pub stake_address: String, pub received_sum: AmountList, pub sent_sum: AmountList, - pub tx_count: u64, + pub tx_count: u32, } #[derive(serde::Serialize)] diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index b5c6f109..c839a8b7 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -1,7 +1,7 @@ use acropolis_common::{ messages::{AddressDeltasMessage, StakeAddressDeltasMessage}, Address, AddressDelta, BlockInfo, Era, ShelleyAddress, ShelleyAddressDelegationPart, - ShelleyAddressPointer, StakeAddress, StakeAddressDelta, StakeCredential, + ShelleyAddressPointer, StakeAddress, StakeAddressDelta, StakeCredential, TxIdentifier, }; use anyhow::{anyhow, Result}; use serde_with::serde_as; @@ -275,13 +275,15 @@ impl Tracker { .map(|a| a.to_string()) .unwrap_or(Ok("(none)".to_owned())) .unwrap_or("(???)".to_owned()); - delta += event.address_delta.value.lovelace; + let lovelace_delta = (event.address_delta.received.lovelace as i64) + - (event.address_delta.sent.lovelace as i64); + delta += lovelace_delta; chunk.push(format!( " blk {}, {}: {} ({:?}) => {} ({:?})", event.block.number, src_addr, - event.address_delta.value.lovelace, + lovelace_delta, event.address_delta.address, dst_addr, event.stake_address @@ -315,7 +317,8 @@ impl Tracker { struct StakeEntry { delta: i64, addresses: Vec, - seen: HashSet, + addresses_seen: HashSet, + txs_seen: HashSet, } /// Iterates through all address deltas in `delta`, leaves only stake addresses @@ -399,15 +402,17 @@ pub fn process_message( let entry = grouped.entry(stake_address).or_insert_with(|| StakeEntry { delta: 0, addresses: Vec::new(), - seen: HashSet::new(), + addresses_seen: HashSet::new(), + txs_seen: HashSet::new(), }); - entry.delta += d.value.lovelace; + entry.delta += (d.received.lovelace as i64) - (d.sent.lovelace as i64); if let Some(shelley) = shelley_opt { - if entry.seen.insert(shelley.clone()) { + if entry.addresses_seen.insert(shelley.clone()) { entry.addresses.push(shelley.clone()); } } + entry.txs_seen.insert(d.tx_identifier); } let deltas = grouped @@ -415,6 +420,7 @@ pub fn process_message( .map(|(stake_address, entry)| StakeAddressDelta { stake_address, addresses: entry.addresses, + tx_count: entry.txs_seen.len() as u32, delta: entry.delta, }) .collect(); @@ -429,8 +435,9 @@ mod test { use acropolis_common::{ messages::AddressDeltasMessage, Address, AddressDelta, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, ShelleyAddress, ShelleyAddressDelegationPart, ShelleyAddressPaymentPart, - ShelleyAddressPointer, StakeAddress, StakeCredential, UTxOIdentifier, ValueDelta, + ShelleyAddressPointer, StakeAddress, StakeCredential, }; + use acropolis_common::{TxIdentifier, Value}; use bech32::{Bech32, Hrp}; use pallas::ledger::addresses::{PaymentKeyHash, ScriptHash, StakeKeyHash}; @@ -438,8 +445,11 @@ mod test { let a = pallas::ledger::addresses::Address::from_bech32(s)?; Ok(AddressDelta { address: map_address(&a)?, - utxo: UTxOIdentifier::new(0, 0, 0), - value: ValueDelta::new(1, Vec::new()), + tx_identifier: TxIdentifier::default(), + spent_utxos: Vec::new(), + created_utxos: Vec::new(), + sent: Value::default(), + received: Value::default(), }) } diff --git a/modules/tx_unpacker/src/tx_unpacker.rs b/modules/tx_unpacker/src/tx_unpacker.rs index 5a06d4ee..95b083e6 100644 --- a/modules/tx_unpacker/src/tx_unpacker.rs +++ b/modules/tx_unpacker/src/tx_unpacker.rs @@ -186,22 +186,23 @@ impl TxUnpacker { } if publish_utxo_deltas_topic.is_some() { - // Add all the inputs - for input in inputs { // MultiEraInput - // Lookup and remove UTxOIdentifier from registry + // Group deltas by tx + let mut tx_utxo_deltas = TxUTxODeltas {tx_identifier, inputs: Vec::new(), outputs: Vec::new()}; + + // Remove inputs from UTxORegistry and push to UTxOIdentifiers to delta + for input in inputs { let oref = input.output_ref(); let tx_ref = TxOutRef::new(TxHash::from(**oref.hash()), oref.index() as u16); match utxo_registry.consume(&tx_ref) { Ok(tx_identifier) => { - // Add TxInput to utxo_deltas - utxo_deltas.push(UTXODelta::Input(TxInput { - utxo_identifier: UTxOIdentifier::new( + tx_utxo_deltas.inputs.push( + UTxOIdentifier::new( tx_identifier.block_number(), tx_identifier.tx_index(), tx_ref.output_index, ), - })); + ); } Err(e) => { error!("Failed to consume input {}: {e}", tx_ref.output_index); @@ -209,9 +210,8 @@ impl TxUnpacker { } } - // Add all the outputs + // Add outputs to UTxORegistry and push TxOutputs to delta for (index, output) in outputs { - // Add TxOutRef to registry match utxo_registry.add( block_number, tx_index, @@ -224,14 +224,13 @@ impl TxUnpacker { match output.address() { Ok(pallas_address) => match map_parameters::map_address(&pallas_address) { Ok(address) => { - // Add TxOutput to utxo_deltas - utxo_deltas.push(UTXODelta::Output(TxOutput { + tx_utxo_deltas.outputs.push(TxOutput { utxo_identifier: utxo_id, address, value: map_parameters::map_value(&output.value()), datum: map_parameters::map_datum(&output.datum()), reference_script: map_parameters::map_reference_script(&output.script_ref()) - })); + }); // catch all output lovelaces total_output += output.value().coin() as u128; @@ -246,6 +245,7 @@ impl TxUnpacker { } } } + utxo_deltas.push(tx_utxo_deltas); } if publish_asset_deltas_topic.is_some() { diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index f498abdb..f9c24c47 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -1,10 +1,9 @@ //! Acropolis UTXOState: State storage use crate::volatile_index::VolatileIndex; use acropolis_common::{ - messages::UTXODeltasMessage, params::SECURITY_PARAMETER_K, BlockInfo, BlockStatus, TxInput, - TxOutput, UTXODelta, + messages::UTXODeltasMessage, params::SECURITY_PARAMETER_K, BlockInfo, BlockStatus, TxOutput, }; -use acropolis_common::{AddressDelta, UTXOValue, UTxOIdentifier, Value, ValueDelta}; +use acropolis_common::{Address, AddressDelta, UTXOValue, UTxOIdentifier, Value, ValueMap}; use anyhow::Result; use async_trait::async_trait; use std::collections::HashMap; @@ -145,39 +144,13 @@ impl State { ); // Delete all UTXOs created in or after this block - let utxos = self.volatile_created.prune_on_or_after(block.number); - for key in utxos { - if let Some(utxo) = self.volatile_utxos.remove(&key) { - // Tell the observer to debit it - if let Some(observer) = self.address_delta_observer.as_ref() { - observer - .observe_delta(&AddressDelta { - address: utxo.address.clone(), - utxo: key, - value: -ValueDelta::from(&utxo.value), - }) - .await; - } - } + for key in self.volatile_created.prune_on_or_after(block.number) { + self.volatile_utxos.remove(&key); } // Any remaining (which were necessarily created before this block) // that were spent in or after this block can be reinstated - let utxos = self.volatile_spent.prune_on_or_after(block.number); - for key in utxos { - if let Some(utxo) = self.volatile_utxos.get(&key) { - // Tell the observer to recredit it - if let Some(observer) = self.address_delta_observer.as_ref() { - observer - .observe_delta(&AddressDelta { - address: utxo.address.clone(), - utxo: key, - value: ValueDelta::from(&utxo.value), - }) - .await; - } - } - } + self.volatile_spent.prune_on_or_after(block.number); // Let the pruner compress the map } @@ -200,15 +173,13 @@ impl State { } /// Observe an input UTXO spend - pub async fn observe_input(&mut self, input: &TxInput, block: &BlockInfo) -> Result<()> { - let key = input.utxo_identifier; - + pub async fn observe_input(&mut self, input: &UTxOIdentifier, block: &BlockInfo) -> Result<()> { if tracing::enabled!(tracing::Level::DEBUG) { - debug!("UTXO << {}", key); + debug!("UTXO << {}", input); } // UTXO exists? - match self.lookup_utxo(&key).await? { + match self.lookup_utxo(input).await? { Some(utxo) => { if tracing::enabled!(tracing::Level::DEBUG) { debug!( @@ -218,33 +189,23 @@ impl State { ); } - // Tell the observer it's spent - if let Some(obs) = &self.address_delta_observer { - obs.observe_delta(&AddressDelta { - address: utxo.address.clone(), - utxo: key, - value: -ValueDelta::from(&utxo.value), - }) - .await; - } - match block.status { BlockStatus::Volatile | BlockStatus::RolledBack => { // Add to volatile spent index - self.volatile_spent.add_utxo(&key); + self.volatile_spent.add_utxo(input); } BlockStatus::Bootstrap | BlockStatus::Immutable => { // Immutable - we can delete it immediately - self.immutable_utxos.delete_utxo(&key).await?; + self.immutable_utxos.delete_utxo(input).await?; } } } _ => { error!( "UTXO output {} unknown in transaction {} of block {}", - &key.output_index(), - key.tx_index(), - key.block_number() + &input.output_index(), + input.tx_index(), + input.block_number() ); } } @@ -294,16 +255,6 @@ impl State { } }; - // Tell the observer - if let Some(obs) = &self.address_delta_observer { - obs.observe_delta(&AddressDelta { - address: output.address.clone(), - utxo: output.utxo_identifier, - value: ValueDelta::from(&output.value), - }) - .await; - } - Ok(()) } @@ -379,19 +330,45 @@ impl State { self.observe_block(block).await?; // Process the deltas - for delta in &deltas.deltas { - // UTXODelta + for tx in &deltas.deltas { + // Temporary map to sum UTxO deltas efficiently + let mut address_map: HashMap = HashMap::new(); - match delta { - UTXODelta::Input(tx_input) => { - self.observe_input(tx_input, block).await?; - } + for input in &tx.inputs { + if let Some(utxo) = self.lookup_utxo(input).await? { + // Remove or mark spent + self.observe_input(input, block).await?; + + let addr = utxo.address.clone(); + let entry = address_map.entry(addr.clone()).or_default(); - UTXODelta::Output(tx_output) => { - self.observe_output(tx_output, block).await?; + entry.spent_utxos.push(*input); + entry.sent.add_value(&utxo.value); } + } + + for output in &tx.outputs { + self.observe_output(output, block).await?; + + let addr = output.address.clone(); + let entry = address_map.entry(addr.clone()).or_default(); - _ => {} + entry.created_utxos.push(output.utxo_identifier); + entry.received.add_value(&output.value); + } + + for (addr, entry) in address_map { + let delta = AddressDelta { + address: addr, + tx_identifier: tx.tx_identifier, + spent_utxos: entry.spent_utxos, + created_utxos: entry.created_utxos, + sent: Value::from(entry.sent), + received: Value::from(entry.received), + }; + if let Some(observer) = self.address_delta_observer.as_ref() { + observer.observe_delta(&delta).await; + } } } @@ -404,6 +381,15 @@ impl State { } } +/// Internal helper used during `handle` aggregation for summing UTxO deltas. +#[derive(Default)] +struct AddressTxMap { + sent: ValueMap, + received: ValueMap, + spent_utxos: Vec, + created_utxos: Vec, +} + // -- Tests -- #[cfg(test)] mod tests { @@ -411,7 +397,7 @@ mod tests { use crate::InMemoryImmutableUTXOStore; use acropolis_common::{ Address, AssetName, BlockHash, ByronAddress, Datum, Era, NativeAsset, ReferenceScript, - Value, + TxUTxODeltas, Value, }; use config::Config; use tokio::sync::Mutex; @@ -482,7 +468,16 @@ mod tests { }; let block = create_block(BlockStatus::Immutable, 1, 1); - state.observe_output(&output, &block).await.unwrap(); + + let deltas = UTXODeltasMessage { + deltas: vec![TxUTxODeltas { + tx_identifier: Default::default(), + inputs: vec![], + outputs: vec![output.clone()], + }], + }; + + state.handle(&block, &deltas).await.unwrap(); assert_eq!(1, state.immutable_utxos.len().await.unwrap()); assert_eq!(1, state.count_valid_utxos().await); @@ -551,13 +546,7 @@ mod tests { assert_eq!(1, state.immutable_utxos.len().await.unwrap()); assert_eq!(1, state.count_valid_utxos().await); - let input = TxInput { - utxo_identifier: UTxOIdentifier::new( - output.utxo_identifier.block_number(), - output.utxo_identifier.tx_index(), - output.utxo_identifier.output_index(), - ), - }; + let input = output.utxo_identifier; let block2 = create_block(BlockStatus::Immutable, 2, 2); state.observe_input(&input, &block2).await.unwrap(); @@ -639,13 +628,7 @@ mod tests { assert_eq!(1, state.count_valid_utxos().await); // Spend it in block 11 - let input = TxInput { - utxo_identifier: UTxOIdentifier::new( - output.utxo_identifier.block_number(), - output.utxo_identifier.tx_index(), - output.utxo_identifier.output_index(), - ), - }; + let input = output.utxo_identifier; let block11 = create_block(BlockStatus::Volatile, 11, 11); state.observe_block(&block11).await.unwrap(); @@ -742,13 +725,7 @@ mod tests { assert_eq!(1, state.volatile_utxos.len()); assert_eq!(1, state.count_valid_utxos().await); - let input = TxInput { - utxo_identifier: UTxOIdentifier::new( - output.utxo_identifier.block_number(), - output.utxo_identifier.tx_index(), - output.utxo_identifier.output_index(), - ), - }; + let input = output.utxo_identifier; let block2 = create_block(BlockStatus::Volatile, 2, 2); state.observe_block(&block2).await.unwrap(); @@ -792,23 +769,37 @@ mod tests { &delta.address, Address::Byron(ByronAddress { payload }) if payload[0] == 99 )); - assert!(delta.value.lovelace == 42 || delta.value.lovelace == -42); + let lovelace_net = (delta.received.lovelace as i64) - (delta.sent.lovelace as i64); + assert!(lovelace_net == 42 || lovelace_net == -42); let mut balance = self.balance.lock().await; - *balance += delta.value.lovelace; + *balance += lovelace_net; let mut asset_balances = self.asset_balances.lock().await; - for (policy, assets) in &delta.value.assets { + + for (policy, assets) in &delta.received.assets { assert_eq!([1u8; 28], *policy); for asset in assets { assert!( - (asset.name == AssetName::new(b"TEST").unwrap() - && (asset.amount == 100 || asset.amount == -100)) + (asset.name == AssetName::new(b"TEST").unwrap() && asset.amount == 100) || (asset.name == AssetName::new(b"FOO").unwrap() - && (asset.amount == 200 || asset.amount == -200)) + && asset.amount == 200) ); let key = (*policy, asset.name); - *asset_balances.entry(key).or_insert(0) += asset.amount; + *asset_balances.entry(key).or_insert(0) += asset.amount as i64; + } + } + + for (policy, assets) in &delta.sent.assets { + assert_eq!([1u8; 28], *policy); + for asset in assets { + assert!( + (asset.name == AssetName::new(b"TEST").unwrap() && asset.amount == 100) + || (asset.name == AssetName::new(b"FOO").unwrap() + && asset.amount == 200) + ); + let key = (*policy, asset.name); + *asset_balances.entry(key).or_insert(0) -= asset.amount as i64; } } } @@ -846,78 +837,35 @@ mod tests { }; let block1 = create_block(BlockStatus::Immutable, 1, 1); - state.observe_output(&output, &block1).await.unwrap(); + let deltas1 = UTXODeltasMessage { + deltas: vec![TxUTxODeltas { + tx_identifier: Default::default(), + inputs: vec![], + outputs: vec![output.clone()], + }], + }; + + state.handle(&block1, &deltas1).await.unwrap(); assert_eq!(1, state.immutable_utxos.len().await.unwrap()); assert_eq!(1, state.count_valid_utxos().await); assert_eq!(42, *observer.balance.lock().await); - let input = TxInput { - utxo_identifier: UTxOIdentifier::new( - output.utxo_identifier.block_number(), - output.utxo_identifier.tx_index(), - output.utxo_identifier.output_index(), - ), - }; + let input = output.utxo_identifier; let block2 = create_block(BlockStatus::Immutable, 2, 2); - state.observe_input(&input, &block2).await.unwrap(); - assert_eq!(0, state.immutable_utxos.len().await.unwrap()); - assert_eq!(0, state.count_valid_utxos().await); - assert_eq!(0, *observer.balance.lock().await); - let ab = observer.asset_balances.lock().await; - assert_eq!( - *ab.get(&([1u8; 28], AssetName::new(b"TEST").unwrap())).unwrap(), - 0 - ); - assert_eq!( - *ab.get(&([1u8; 28], AssetName::new(b"FOO").unwrap())).unwrap(), - 0 - ); - } - - #[tokio::test] - async fn observe_rollback_notifies_balance_debit_on_future_created_utxos() { - let mut state = new_state(); - let observer = Arc::new(TestDeltaObserver::new()); - state.register_address_delta_observer(observer.clone()); - - let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), - address: create_address(99), - value: Value::new( - 42, - vec![( - [1u8; 28], - vec![ - NativeAsset { - name: AssetName::new(b"TEST").unwrap(), - amount: 100, - }, - NativeAsset { - name: AssetName::new(b"FOO").unwrap(), - amount: 200, - }, - ], - )], - ), - datum: None, - reference_script: None, + let deltas2 = UTXODeltasMessage { + deltas: vec![TxUTxODeltas { + tx_identifier: Default::default(), + inputs: vec![input], + outputs: vec![], + }], }; - let block10 = create_block(BlockStatus::Volatile, 10, 10); - state.observe_block(&block10).await.unwrap(); - state.observe_output(&output, &block10).await.unwrap(); - assert_eq!(1, state.volatile_utxos.len()); - assert_eq!(1, state.count_valid_utxos().await); - assert_eq!(42, *observer.balance.lock().await); - - let block9 = create_block(BlockStatus::RolledBack, 200, 9); - state.observe_block(&block9).await.unwrap(); + state.handle(&block2, &deltas2).await.unwrap(); - assert_eq!(0, state.volatile_utxos.len()); + assert_eq!(0, state.immutable_utxos.len().await.unwrap()); assert_eq!(0, state.count_valid_utxos().await); assert_eq!(0, *observer.balance.lock().await); - let ab = observer.asset_balances.lock().await; assert_eq!( *ab.get(&([1u8; 28], AssetName::new(b"TEST").unwrap())).unwrap(), @@ -928,77 +876,4 @@ mod tests { 0 ); } - - #[tokio::test] - async fn observe_rollback_notifies_balance_credit_on_future_spent_utxos() { - let mut state = new_state(); - let observer = Arc::new(TestDeltaObserver::new()); - state.register_address_delta_observer(observer.clone()); - - // Create the UTXO in block 10 - let output = TxOutput { - utxo_identifier: UTxOIdentifier::new(0, 0, 0), - address: create_address(99), - value: Value::new( - 42, - vec![( - [1u8; 28], - vec![ - NativeAsset { - name: AssetName::new(b"TEST").unwrap(), - amount: 100, - }, - NativeAsset { - name: AssetName::new(b"FOO").unwrap(), - amount: 200, - }, - ], - )], - ), - datum: None, - reference_script: None, - }; - - let block10 = create_block(BlockStatus::Volatile, 10, 10); - state.observe_block(&block10).await.unwrap(); - state.observe_output(&output, &block10).await.unwrap(); - assert_eq!(1, state.volatile_utxos.len()); - assert_eq!(1, state.count_valid_utxos().await); - assert_eq!(42, *observer.balance.lock().await); - - // Spend it in block 11 - let input = TxInput { - utxo_identifier: UTxOIdentifier::new( - output.utxo_identifier.block_number(), - output.utxo_identifier.tx_index(), - output.utxo_identifier.output_index(), - ), - }; - - let block11 = create_block(BlockStatus::Volatile, 11, 11); - state.observe_block(&block11).await.unwrap(); - state.observe_input(&input, &block11).await.unwrap(); - assert_eq!(1, state.volatile_utxos.len()); - assert_eq!(0, state.count_valid_utxos().await); - assert_eq!(0, *observer.balance.lock().await); - - // Roll back to 11 - let block11_2 = create_block(BlockStatus::RolledBack, 200, 11); - state.observe_block(&block11_2).await.unwrap(); - - // Should be reinstated - assert_eq!(1, state.volatile_utxos.len()); - assert_eq!(1, state.count_valid_utxos().await); - assert_eq!(42, *observer.balance.lock().await); - - let ab = observer.asset_balances.lock().await; - assert_eq!( - *ab.get(&([1u8; 28], AssetName::new(b"TEST").unwrap())).unwrap(), - 100 - ); - assert_eq!( - *ab.get(&([1u8; 28], AssetName::new(b"FOO").unwrap())).unwrap(), - 200 - ); - } } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 7de217e1..c2f95e46 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -65,13 +65,22 @@ store-stake-addresses = false store-spdd = false [module.historical-accounts-state] +# Enables /accounts/{stake_address}/rewards endpoint store-rewards-history = false +# Enables /accounts/{stake_address}/history endpoint store-active-stake-history = false +# Enables /accounts/{stake_address}/registrations endpoint store-registration-history = false +# Enables /accounts/{stake_address}/delegations endpoint store-delegation-history = false +# Enables /accounts/{stake_address}/mirs endpoint store-mir-history = false +# Enables /accounts/{stake_address}/withdrawals endpoint store-withdrawal-history = false +# Enables /accounts/{stake_address}/addresses endpoint store-addresses = false +# Enables /accounts/{stake_address}/addresses/total endpoint (Requires store-addresses to be enabled) +store-tx-count = false [module.drep-state] # Enables /governance/dreps/{drep_id} endpoint (Requires store-delegators to be enabled)