From 4b5579d41b488d8e24b30c1ccee1169b7f6f6c4c Mon Sep 17 00:00:00 2001 From: William Hankins Date: Thu, 13 Nov 2025 23:11:10 +0000 Subject: [PATCH 1/5] feat: address utxos and transactions endpoints Signed-off-by: William Hankins --- common/src/queries/blocks.rs | 10 + modules/address_state/src/address_state.rs | 16 +- .../src/immutable_address_store.rs | 6 +- modules/chain_store/src/chain_store.rs | 133 ++++++++++-- .../rest_blockfrost/src/handlers/accounts.rs | 12 +- .../rest_blockfrost/src/handlers/addresses.rs | 200 ++++++++++++++++-- modules/rest_blockfrost/src/types.rs | 10 +- 7 files changed, 347 insertions(+), 40 deletions(-) diff --git a/common/src/queries/blocks.rs b/common/src/queries/blocks.rs index 26788bc7..0bda2960 100644 --- a/common/src/queries/blocks.rs +++ b/common/src/queries/blocks.rs @@ -72,6 +72,9 @@ pub enum BlocksStateQuery { GetUTxOHashes { utxo_ids: Vec, }, + GetTransactionHashesAndTimestamps { + tx_ids: Vec, + }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -96,6 +99,7 @@ pub enum BlocksStateQueryResponse { BlockHashes(BlockHashes), TransactionHashes(TransactionHashes), UTxOHashes(UTxOHashes), + TransactionHashesAndTimestamps(TransactionHashesAndTimeStamps), Error(QueryError), } @@ -240,3 +244,9 @@ pub struct UTxOHashes { pub block_hashes: Vec, pub tx_hashes: Vec, } + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionHashesAndTimeStamps { + pub tx_hashes: Vec, + pub timestamps: Vec, +} diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index ef07fed0..4ba7227b 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -209,9 +209,11 @@ impl AddressState { match state.get_address_utxos(address).await { Ok(Some(utxos)) => AddressStateQueryResponse::AddressUTxOs(utxos), Ok(None) => match address.to_string() { - Ok(addr_str) => AddressStateQueryResponse::Error( - QueryError::not_found(format!("Address {}", addr_str)), - ), + Ok(addr_str) => { + AddressStateQueryResponse::Error(QueryError::not_found( + format!("Address {} not found", addr_str), + )) + } Err(e) => { AddressStateQueryResponse::Error(QueryError::internal_error( format!("Could not convert address to string: {}", e), @@ -227,9 +229,11 @@ impl AddressState { match state.get_address_transactions(address).await { Ok(Some(txs)) => AddressStateQueryResponse::AddressTransactions(txs), Ok(None) => match address.to_string() { - Ok(addr_str) => AddressStateQueryResponse::Error( - QueryError::not_found(format!("Address {}", addr_str)), - ), + Ok(addr_str) => { + AddressStateQueryResponse::Error(QueryError::not_found( + format!("Address {} not found", addr_str), + )) + } Err(e) => { AddressStateQueryResponse::Error(QueryError::internal_error( format!("Could not convert address to string: {}", e), diff --git a/modules/address_state/src/immutable_address_store.rs b/modules/address_state/src/immutable_address_store.rs index b06084f4..dbc3b103 100644 --- a/modules/address_state/src/immutable_address_store.rs +++ b/modules/address_state/src/immutable_address_store.rs @@ -31,7 +31,11 @@ pub struct ImmutableAddressStore { impl ImmutableAddressStore { pub fn new(path: impl AsRef) -> Result { - let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true); + let path = path.as_ref(); + if path.exists() { + std::fs::remove_dir_all(path)?; + } + let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024); let keyspace = Keyspace::open(cfg)?; let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?; diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index f280fb4f..c4dc86aa 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -2,6 +2,7 @@ mod stores; use crate::stores::{fjall::FjallStore, Block, Store}; use acropolis_codec::{block::map_to_block_issuer, map_parameters}; +use acropolis_common::queries::blocks::TransactionHashesAndTimeStamps; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ crypto::keyhash_224, @@ -330,20 +331,59 @@ impl ChainStore { let mut block_hashes = Vec::with_capacity(utxo_ids.len()); for utxo in utxo_ids { - if let Ok(Some(block)) = store.get_block_by_number(utxo.block_number().into()) { - if let Ok(hash) = Self::get_block_hash(&block) { - if let Ok(tx_hashes_in_block) = - Self::to_block_transaction_hashes(&block) - { - if let Some(tx_hash) = - tx_hashes_in_block.get(utxo.tx_index() as usize) - { - tx_hashes.push(*tx_hash); - block_hashes.push(hash); - } - } + let block = match store.get_block_by_number(utxo.block_number().into()) { + Ok(Some(b)) => b, + Ok(None) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!("Block {} not found", utxo.block_number()), + ))) } - } + Err(e) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!("Failed to fetch block {}: {e}", utxo.block_number()), + ))) + } + }; + + let block_hash = match Self::get_block_hash(&block) { + Ok(h) => h, + Err(e) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!( + "Failed to extract block hash for block {}: {e}", + utxo.block_number() + ), + ))) + } + }; + + let tx_hashes_in_block = match Self::to_block_transaction_hashes(&block) { + Ok(h) => h, + Err(e) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!( + "Failed to extract tx list for block {}: {e}", + utxo.block_number() + ), + ))) + } + }; + + let tx_hash = match tx_hashes_in_block.get(utxo.tx_index() as usize) { + Some(h) => h, + None => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!( + "tx_index {} out of bounds for block {}", + utxo.tx_index(), + utxo.block_number() + ), + ))) + } + }; + + tx_hashes.push(*tx_hash); + block_hashes.push(block_hash); } Ok(BlocksStateQueryResponse::UTxOHashes(UTxOHashes { @@ -351,6 +391,73 @@ impl ChainStore { tx_hashes, })) } + BlocksStateQuery::GetTransactionHashesAndTimestamps { tx_ids } => { + let mut tx_hashes = Vec::with_capacity(tx_ids.len()); + let mut timestamps = Vec::with_capacity(tx_ids.len()); + + for tx in tx_ids { + let block = match store.get_block_by_number(tx.block_number().into()) { + Ok(Some(b)) => b, + Ok(None) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!("Block {} not found", tx.block_number()), + ))) + } + Err(e) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!("Failed to fetch block {}: {e}", tx.block_number()), + ))) + } + }; + + let hashes_in_block = match Self::to_block_transaction_hashes(&block) { + Ok(h) => h, + Err(e) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!( + "Failed to extract tx hashes for block {}: {e}", + tx.block_number() + ), + ))) + } + }; + + let tx_hash = match hashes_in_block.get(tx.tx_index() as usize) { + Some(h) => h, + None => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!( + "tx_index {} out of bounds for block {}", + tx.tx_index(), + tx.block_number() + ), + ))) + } + }; + + let block_info = match Self::to_block_info(block, store, state, false) { + Ok(info) => info, + Err(e) => { + return Ok(BlocksStateQueryResponse::Error(QueryError::not_found( + format!( + "Failed to build block info for block {}: {e}", + tx.block_number() + ), + ))) + } + }; + + tx_hashes.push(*tx_hash); + timestamps.push(block_info.timestamp); + } + + Ok(BlocksStateQueryResponse::TransactionHashesAndTimestamps( + TransactionHashesAndTimeStamps { + tx_hashes, + timestamps, + }, + )) + } } } diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 206187d9..69086c55 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use crate::handlers_config::HandlersConfig; use crate::types::{ - AccountAddressREST, AccountRewardREST, AccountTotalsREST, AccountUTxOREST, - AccountWithdrawalREST, AmountList, DelegationUpdateREST, RegistrationUpdateREST, + AccountAddressREST, AccountRewardREST, AccountTotalsREST, AccountWithdrawalREST, AmountList, + DelegationUpdateREST, RegistrationUpdateREST, UTxOREST, }; use acropolis_common::messages::{Message, RESTResponse, StateQuery, StateQueryResponse}; use acropolis_common::queries::accounts::{AccountsStateQuery, AccountsStateQueryResponse}; @@ -784,13 +784,13 @@ pub async fn handle_account_utxos_blockfrost( msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Blocks( - BlocksStateQueryResponse::UTxOHashes(utxos), - )) => Ok(utxos), + BlocksStateQueryResponse::UTxOHashes(hashes), + )) => Ok(hashes), Message::StateQueryResponse(StateQueryResponse::Blocks( BlocksStateQueryResponse::Error(e), )) => Err(e), _ => Err(QueryError::internal_error( - "Unexpected message type while retrieving account UTxOs", + "Unexpected message type while retrieving UTxO hashes", )), }, ) @@ -846,7 +846,7 @@ pub async fn handle_account_utxos_blockfrost( None => None, }; - rest_response.push(AccountUTxOREST { + rest_response.push(UTxOREST { address: entry.address.to_string()?, tx_hash, output_index, diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs index 5848380b..73901c6b 100644 --- a/modules/rest_blockfrost/src/handlers/addresses.rs +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -1,7 +1,8 @@ use std::sync::Arc; -use crate::types::AddressTotalsREST; +use crate::types::{AddressTotalsREST, TransactionInfoREST, UTxOREST}; use crate::{handlers_config::HandlersConfig, types::AddressInfoREST}; +use acropolis_common::queries::blocks::{BlocksStateQuery, BlocksStateQueryResponse}; use acropolis_common::queries::errors::QueryError; use acropolis_common::rest_error::RESTError; use acropolis_common::{ @@ -13,6 +14,8 @@ use acropolis_common::{ }, Address, Value, }; +use acropolis_common::{Datum, ReferenceScript}; +use blake2::{Blake2b512, Digest}; use caryatid_sdk::Context; /// Handle `/addresses/{address}` Blockfrost-compatible endpoint @@ -30,7 +33,7 @@ pub async fn handle_address_single_blockfrost( let address_type = address.kind().to_string(); let is_script = address.is_script(); - let address_query_msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + let msg = Arc::new(Message::StateQuery(StateQuery::Addresses( AddressStateQuery::GetAddressUTxOs { address: address.clone(), }, @@ -39,7 +42,7 @@ pub async fn handle_address_single_blockfrost( let utxo_identifiers = query_state( &context, &handlers_config.addresses_query_topic, - address_query_msg, + msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::Addresses( AddressStateQueryResponse::AddressUTxOs(utxo_identifiers), @@ -78,14 +81,14 @@ pub async fn handle_address_single_blockfrost( } }; - let utxos_query_msg = Arc::new(Message::StateQuery(StateQuery::UTxOs( + let msg = Arc::new(Message::StateQuery(StateQuery::UTxOs( UTxOStateQuery::GetUTxOsSum { utxo_identifiers }, ))); let address_balance = query_state( &context, &handlers_config.utxos_query_topic, - utxos_query_msg, + msg, |message| match message { Message::StateQueryResponse(StateQueryResponse::UTxOs( UTxOStateQueryResponse::UTxOsSum(balance), @@ -166,11 +169,123 @@ pub async fn handle_address_totals_blockfrost( /// Handle `/addresses/{address}/utxos` Blockfrost-compatible endpoint pub async fn handle_address_utxos_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Err(RESTError::not_implemented("Address UTxOs endpoint")) + let address = parse_address(¶ms)?; + let address_str = address.to_string()?; + + // Get utxos from address state + let msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + AddressStateQuery::GetAddressUTxOs { address }, + ))); + let utxo_identifiers = query_state( + &context, + &handlers_config.addresses_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::AddressUTxOs(utxos), + )) => Ok(utxos), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving address UTxOs", + )), + }, + ) + .await?; + + // Get TxHashes and BlockHashes from UTxOIdentifiers + let msg = Arc::new(Message::StateQuery(StateQuery::Blocks( + BlocksStateQuery::GetUTxOHashes { + utxo_ids: utxo_identifiers.clone(), + }, + ))); + let hashes = query_state( + &context, + &handlers_config.blocks_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::UTxOHashes(hashes), + )) => Ok(hashes), + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving UTxO hashes", + )), + }, + ) + .await?; + + // Get UTxO balances from utxo state + let msg = Arc::new(Message::StateQuery(StateQuery::UTxOs( + UTxOStateQuery::GetUTxOs { + utxo_identifiers: utxo_identifiers.clone(), + }, + ))); + let entries = query_state( + &context, + &handlers_config.utxos_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::UTxOs(utxos), + )) => Ok(utxos), + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving UTxO entries", + )), + }, + ) + .await?; + + let mut rest_response = Vec::with_capacity(entries.len()); + for (i, entry) in entries.into_iter().enumerate() { + let tx_hash = hex::encode(hashes.tx_hashes[i]); + let block_hash = hex::encode(hashes.block_hashes[i]); + let output_index = utxo_identifiers[i].output_index(); + let (data_hash, inline_datum) = match &entry.datum { + Some(Datum::Hash(h)) => (Some(hex::encode(h)), None), + Some(Datum::Inline(bytes)) => (None, Some(hex::encode(bytes))), + None => (None, None), + }; + let reference_script_hash = match &entry.reference_script { + Some(script) => { + let bytes = match script { + ReferenceScript::Native(b) + | ReferenceScript::PlutusV1(b) + | ReferenceScript::PlutusV2(b) + | ReferenceScript::PlutusV3(b) => b, + }; + let mut hasher = Blake2b512::new(); + hasher.update(bytes); + let result = hasher.finalize(); + Some(hex::encode(&result[..32])) + } + None => None, + }; + + rest_response.push(UTxOREST { + address: address_str.clone(), + tx_hash, + output_index, + amount: entry.value.into(), + block: block_hash, + data_hash, + inline_datum, + reference_script_hash, + }) + } + + let json = serde_json::to_string_pretty(&rest_response)?; + Ok(RESTResponse::with_json(200, &json)) } /// Handle `/addresses/{address}/utxos/{asset}` Blockfrost-compatible endpoint @@ -184,11 +299,70 @@ pub async fn handle_address_asset_utxos_blockfrost( /// Handle `/addresses/{address}/transactions` Blockfrost-compatible endpoint pub async fn handle_address_transactions_blockfrost( - _context: Arc>, - _params: Vec, - _handlers_config: Arc, + context: Arc>, + params: Vec, + handlers_config: Arc, ) -> Result { - Err(RESTError::not_implemented("Address transactions endpoint")) + let address = parse_address(¶ms)?; + + // Get tx identifiers from address state + let msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + AddressStateQuery::GetAddressTransactions { address }, + ))); + let tx_identifiers = query_state( + &context, + &handlers_config.addresses_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::AddressTransactions(txs), + )) => Ok(txs), + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving address transactions", + )), + }, + ) + .await?; + + // Get tx hashes and timestamps from chain store + let msg = Arc::new(Message::StateQuery(StateQuery::Blocks( + BlocksStateQuery::GetTransactionHashesAndTimestamps { + tx_ids: tx_identifiers.clone(), + }, + ))); + let tx_info = query_state( + &context, + &handlers_config.blocks_query_topic, + msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::TransactionHashesAndTimestamps(info), + )) => Ok(info), + Message::StateQueryResponse(StateQueryResponse::Blocks( + BlocksStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error( + "Unexpected message type while retrieving transaction hashes and timestamps", + )), + }, + ) + .await?; + + let mut rest_response = Vec::with_capacity(tx_identifiers.len()); + for (i, tx_id) in tx_identifiers.iter().enumerate() { + rest_response.push(TransactionInfoREST { + tx_hash: hex::encode(tx_info.tx_hashes[i]), + tx_index: tx_id.tx_index(), + block_height: tx_id.block_number(), + block_time: tx_info.timestamps[i], + }); + } + + let json = serde_json::to_string_pretty(&rest_response)?; + Ok(RESTResponse::with_json(200, &json)) } fn parse_address(params: &[String]) -> Result { diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index a8530a8e..3e809213 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -927,7 +927,7 @@ pub struct AccountAddressREST { } #[derive(serde::Serialize)] -pub struct AccountUTxOREST { +pub struct UTxOREST { pub address: String, pub tx_hash: String, pub output_index: u16, @@ -953,3 +953,11 @@ pub struct AddressTotalsREST { pub sent_sum: AmountList, pub tx_count: u64, } + +#[derive(Serialize)] +pub struct TransactionInfoREST { + pub tx_hash: String, + pub tx_index: u16, + pub block_height: u32, + pub block_time: u64, +} From 37ea0a9fe2ceb69ef0309aced29561add7218109 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 14 Nov 2025 17:26:00 +0000 Subject: [PATCH 2/5] fix: add tx_index field to UTxORest Signed-off-by: William Hankins --- modules/rest_blockfrost/src/handlers/accounts.rs | 2 ++ modules/rest_blockfrost/src/handlers/addresses.rs | 2 ++ modules/rest_blockfrost/src/types.rs | 1 + 3 files changed, 5 insertions(+) diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index 69086c55..d510585c 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -824,6 +824,7 @@ pub async fn handle_account_utxos_blockfrost( for (i, entry) in entries.into_iter().enumerate() { let tx_hash = hashes.tx_hashes.get(i).map(hex::encode).unwrap_or_default(); let block_hash = hashes.block_hashes.get(i).map(hex::encode).unwrap_or_default(); + let tx_index = utxo_identifiers[i].tx_index(); let output_index = utxo_identifiers.get(i).map(|id| id.output_index()).unwrap_or(0); let (data_hash, inline_datum) = match &entry.datum { Some(Datum::Hash(h)) => (Some(hex::encode(h)), None), @@ -849,6 +850,7 @@ pub async fn handle_account_utxos_blockfrost( rest_response.push(UTxOREST { address: entry.address.to_string()?, tx_hash, + tx_index, output_index, amount: entry.value.into(), block: block_hash, diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs index 73901c6b..2bd4f718 100644 --- a/modules/rest_blockfrost/src/handlers/addresses.rs +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -250,6 +250,7 @@ pub async fn handle_address_utxos_blockfrost( for (i, entry) in entries.into_iter().enumerate() { let tx_hash = hex::encode(hashes.tx_hashes[i]); let block_hash = hex::encode(hashes.block_hashes[i]); + let tx_index = utxo_identifiers[i].tx_index(); let output_index = utxo_identifiers[i].output_index(); let (data_hash, inline_datum) = match &entry.datum { Some(Datum::Hash(h)) => (Some(hex::encode(h)), None), @@ -275,6 +276,7 @@ pub async fn handle_address_utxos_blockfrost( rest_response.push(UTxOREST { address: address_str.clone(), tx_hash, + tx_index, output_index, amount: entry.value.into(), block: block_hash, diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 3e809213..cdb41214 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -930,6 +930,7 @@ pub struct AccountAddressREST { pub struct UTxOREST { pub address: String, pub tx_hash: String, + pub tx_index: u16, pub output_index: u16, pub amount: AmountList, pub block: String, From 9de57fe8592074231d471429add39512b2b3d798 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 14 Nov 2025 21:39:10 +0000 Subject: [PATCH 3/5] fix: return Some([]) for addresses with past UTxO activity to match Blockfrost Signed-off-by: William Hankins --- .../address_state/src/immutable_address_store.rs | 14 ++++++++++++-- modules/address_state/src/state.rs | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/modules/address_state/src/immutable_address_store.rs b/modules/address_state/src/immutable_address_store.rs index dbc3b103..65c6094f 100644 --- a/modules/address_state/src/immutable_address_store.rs +++ b/modules/address_state/src/immutable_address_store.rs @@ -162,13 +162,18 @@ impl ImmutableAddressStore { pub async fn get_utxos(&self, address: &Address) -> Result>> { let key = address.to_bytes_key()?; + let db_raw = self.utxos.get(&key)?; + let db_had_key = db_raw.is_some(); + let mut live: Vec = - self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default(); + db_raw.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default(); let pending = self.pending.lock().await; + let mut pending_touched = false; for block_map in pending.iter() { if let Some(entry) = block_map.get(address) { if let Some(deltas) = &entry.utxos { + pending_touched = true; for delta in deltas { match delta { UtxoDelta::Created(u) => live.push(*u), @@ -179,8 +184,13 @@ impl ImmutableAddressStore { } } + // Only return None if the address never existed if live.is_empty() { - Ok(None) + if db_had_key || pending_touched { + Ok(Some(vec![])) + } else { + Ok(None) + } } else { Ok(Some(live)) } diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 897153fe..90c62995 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -81,14 +81,20 @@ impl State { } let store = self.immutable.clone(); + let mut db_had_address = false; let mut combined: HashSet = match store.get_utxos(address).await? { - Some(db) => db.into_iter().collect(), + Some(db) => { + db_had_address = true; + db.into_iter().collect() + } None => HashSet::new(), }; + let mut pending_touched = false; for map in self.volatile.window.iter() { if let Some(entry) = map.get(address) { if let Some(deltas) = &entry.utxos { + pending_touched = true; for delta in deltas { match delta { UtxoDelta::Created(u) => { @@ -104,7 +110,11 @@ impl State { } if combined.is_empty() { - Ok(None) + if db_had_address || pending_touched { + Ok(Some(vec![])) + } else { + Ok(None) + } } else { Ok(Some(combined.into_iter().collect())) } From 74502fcf5707d31b1e1a4896de16924e2c00d876 Mon Sep 17 00:00:00 2001 From: William Hankins Date: Fri, 14 Nov 2025 23:50:46 +0000 Subject: [PATCH 4/5] fix: add config option for clearing address db on start Signed-off-by: William Hankins --- modules/address_state/src/address_state.rs | 2 + .../src/immutable_address_store.rs | 63 +++++++++++-------- modules/address_state/src/state.rs | 4 +- processes/omnibus/omnibus.toml | 2 + 4 files changed, 45 insertions(+), 26 deletions(-) diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs index 4ba7227b..8445756b 100644 --- a/modules/address_state/src/address_state.rs +++ b/modules/address_state/src/address_state.rs @@ -33,6 +33,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = // Configuration defaults const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./db"); +const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true); const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false); const DEFAULT_STORE_TOTALS: (&str, bool) = ("store-totals", false); const DEFAULT_STORE_TRANSACTIONS: (&str, bool) = ("store-transactions", false); @@ -178,6 +179,7 @@ impl AddressState { // Get configuration flags and query topic let storage_config = AddressStorageConfig { db_path: get_string_flag(&config, DEFAULT_ADDRESS_DB_PATH), + clear_on_start: get_bool_flag(&config, DEFAULT_CLEAR_ON_START), skip_until: None, store_info: get_bool_flag(&config, DEFAULT_STORE_INFO), store_totals: get_bool_flag(&config, DEFAULT_STORE_TOTALS), diff --git a/modules/address_state/src/immutable_address_store.rs b/modules/address_state/src/immutable_address_store.rs index 65c6094f..079518c7 100644 --- a/modules/address_state/src/immutable_address_store.rs +++ b/modules/address_state/src/immutable_address_store.rs @@ -30,9 +30,9 @@ pub struct ImmutableAddressStore { } impl ImmutableAddressStore { - pub fn new(path: impl AsRef) -> Result { + pub fn new(path: impl AsRef, clear_on_start: bool) -> Result { let path = path.as_ref(); - if path.exists() { + if path.exists() && clear_on_start { std::fs::remove_dir_all(path)?; } let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024); @@ -56,15 +56,36 @@ impl ImmutableAddressStore { /// for an entire epoch. Skips any partitions that have already stored the given epoch. /// All writes are batched and committed atomically, preventing on-disk corruption in case of failure. pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> { - let persist_utxos = config.store_info - && !self.epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch).await?; - let persist_txs = config.store_transactions - && !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?; - let persist_totals = config.store_totals - && !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?; + // Skip if all options disabled + if !(config.store_info || config.store_transactions || config.store_totals) { + debug!("no persistence needed for epoch {epoch} (all stores disabled)"); + return Ok(()); + } + + // Determine which partitions need persistence + let (persist_utxos, persist_txs, persist_totals) = if config.clear_on_start { + ( + config.store_info, + config.store_transactions, + config.store_totals, + ) + } else { + let utxos = config.store_info + && !self + .epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch) + .await?; + let txs = config.store_transactions + && !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?; + let totals = config.store_totals + && !self + .epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch) + .await?; + (utxos, txs, totals) + }; + // Skip if all partitions have already been persisted for the epoch if !(persist_utxos || persist_txs || persist_totals) { - debug!("no persistence needed for epoch {epoch} (already persisted or disabled)"); + debug!("no persistence needed for epoch {epoch}"); return Ok(()); } @@ -124,22 +145,14 @@ impl ImmutableAddressStore { } // Metadata markers - if persist_utxos { - batch.insert( - &self.utxos, - ADDRESS_UTXOS_EPOCH_COUNTER, - epoch.to_le_bytes(), - ); - } - if persist_txs { - batch.insert(&self.txs, ADDRESS_TXS_EPOCH_COUNTER, epoch.to_le_bytes()); - } - if persist_totals { - batch.insert( - &self.totals, - ADDRESS_TOTALS_EPOCH_COUNTER, - epoch.to_le_bytes(), - ); + for (enabled, part, key) in [ + (persist_utxos, &self.utxos, ADDRESS_UTXOS_EPOCH_COUNTER), + (persist_txs, &self.txs, ADDRESS_TXS_EPOCH_COUNTER), + (persist_totals, &self.totals, ADDRESS_TOTALS_EPOCH_COUNTER), + ] { + if enabled { + batch.insert(part, key, epoch.to_le_bytes()); + } } match batch.commit() { diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs index 90c62995..9c4f930b 100644 --- a/modules/address_state/src/state.rs +++ b/modules/address_state/src/state.rs @@ -17,6 +17,7 @@ use crate::{ #[derive(Debug, Default, Clone)] pub struct AddressStorageConfig { pub db_path: String, + pub clear_on_start: bool, pub skip_until: Option, pub store_info: bool, @@ -60,7 +61,7 @@ impl State { PathBuf::from(&config.db_path) }; - let store = Arc::new(ImmutableAddressStore::new(&db_path)?); + let store = Arc::new(ImmutableAddressStore::new(&db_path, config.clear_on_start)?); let mut config = config.clone(); config.skip_until = store.get_last_epoch_stored().await?; @@ -252,6 +253,7 @@ mod tests { let dir = tempdir().unwrap(); AddressStorageConfig { db_path: dir.path().to_string_lossy().into_owned(), + clear_on_start: true, skip_until: None, store_info: true, store_transactions: true, diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index c2f95e46..207c8b84 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -138,6 +138,8 @@ index-by-policy = false clear-on-start = true [module.address-state] +# Clear state on start up (default true) +clear-on-start = true # Enables /addresses/{address}, /addresses/{address}/extended, # /addresses/{address}/utxos/{asset}, and /addresses/{address}/utxos endpoints store-info = false From 56e8801038314e938e1683f977ea8ad266957abb Mon Sep 17 00:00:00 2001 From: William Hankins Date: Tue, 18 Nov 2025 23:05:33 +0000 Subject: [PATCH 5/5] refactor: use UTxOREST::new() constructor instead of inline struct building Signed-off-by: William Hankins --- .../rest_blockfrost/src/handlers/accounts.rs | 46 ++++-------------- .../rest_blockfrost/src/handlers/addresses.rs | 45 +++-------------- modules/rest_blockfrost/src/types.rs | 48 ++++++++++++++++++- 3 files changed, 61 insertions(+), 78 deletions(-) diff --git a/modules/rest_blockfrost/src/handlers/accounts.rs b/modules/rest_blockfrost/src/handlers/accounts.rs index d510585c..418b3b87 100644 --- a/modules/rest_blockfrost/src/handlers/accounts.rs +++ b/modules/rest_blockfrost/src/handlers/accounts.rs @@ -17,8 +17,7 @@ use acropolis_common::queries::utils::query_state; use acropolis_common::queries::utxos::{UTxOStateQuery, UTxOStateQueryResponse}; use acropolis_common::rest_error::RESTError; use acropolis_common::serialization::{Bech32Conversion, Bech32WithHrp}; -use acropolis_common::{DRepChoice, Datum, ReferenceScript, StakeAddress}; -use blake2::{Blake2b512, Digest}; +use acropolis_common::{DRepChoice, StakeAddress}; use caryatid_sdk::Context; #[derive(serde::Serialize)] @@ -822,42 +821,13 @@ pub async fn handle_account_utxos_blockfrost( let mut rest_response = Vec::with_capacity(entries.len()); for (i, entry) in entries.into_iter().enumerate() { - let tx_hash = hashes.tx_hashes.get(i).map(hex::encode).unwrap_or_default(); - let block_hash = hashes.block_hashes.get(i).map(hex::encode).unwrap_or_default(); - let tx_index = utxo_identifiers[i].tx_index(); - let output_index = utxo_identifiers.get(i).map(|id| id.output_index()).unwrap_or(0); - let (data_hash, inline_datum) = match &entry.datum { - Some(Datum::Hash(h)) => (Some(hex::encode(h)), None), - Some(Datum::Inline(bytes)) => (None, Some(hex::encode(bytes))), - None => (None, None), - }; - let reference_script_hash = match &entry.reference_script { - Some(script) => { - let bytes = match script { - ReferenceScript::Native(b) - | ReferenceScript::PlutusV1(b) - | ReferenceScript::PlutusV2(b) - | ReferenceScript::PlutusV3(b) => b, - }; - let mut hasher = Blake2b512::new(); - hasher.update(bytes); - let result = hasher.finalize(); - Some(hex::encode(&result[..32])) - } - None => None, - }; - - rest_response.push(UTxOREST { - address: entry.address.to_string()?, - tx_hash, - tx_index, - output_index, - amount: entry.value.into(), - block: block_hash, - data_hash, - inline_datum, - reference_script_hash, - }) + rest_response.push(UTxOREST::new( + entry.address.to_string()?, + &utxo_identifiers[i], + &entry, + hashes.tx_hashes[i].as_ref(), + hashes.block_hashes[i].as_ref(), + )); } let json = serde_json::to_string_pretty(&rest_response)?; diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs index 2bd4f718..3d800c64 100644 --- a/modules/rest_blockfrost/src/handlers/addresses.rs +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -14,8 +14,6 @@ use acropolis_common::{ }, Address, Value, }; -use acropolis_common::{Datum, ReferenceScript}; -use blake2::{Blake2b512, Digest}; use caryatid_sdk::Context; /// Handle `/addresses/{address}` Blockfrost-compatible endpoint @@ -248,42 +246,13 @@ pub async fn handle_address_utxos_blockfrost( let mut rest_response = Vec::with_capacity(entries.len()); for (i, entry) in entries.into_iter().enumerate() { - let tx_hash = hex::encode(hashes.tx_hashes[i]); - let block_hash = hex::encode(hashes.block_hashes[i]); - let tx_index = utxo_identifiers[i].tx_index(); - let output_index = utxo_identifiers[i].output_index(); - let (data_hash, inline_datum) = match &entry.datum { - Some(Datum::Hash(h)) => (Some(hex::encode(h)), None), - Some(Datum::Inline(bytes)) => (None, Some(hex::encode(bytes))), - None => (None, None), - }; - let reference_script_hash = match &entry.reference_script { - Some(script) => { - let bytes = match script { - ReferenceScript::Native(b) - | ReferenceScript::PlutusV1(b) - | ReferenceScript::PlutusV2(b) - | ReferenceScript::PlutusV3(b) => b, - }; - let mut hasher = Blake2b512::new(); - hasher.update(bytes); - let result = hasher.finalize(); - Some(hex::encode(&result[..32])) - } - None => None, - }; - - rest_response.push(UTxOREST { - address: address_str.clone(), - tx_hash, - tx_index, - output_index, - amount: entry.value.into(), - block: block_hash, - data_hash, - inline_datum, - reference_script_hash, - }) + rest_response.push(UTxOREST::new( + address_str.clone(), + &utxo_identifiers[i], + &entry, + hashes.tx_hashes[i].as_ref(), + hashes.block_hashes[i].as_ref(), + )) } let json = serde_json::to_string_pretty(&rest_response)?; diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index cdb41214..405a798c 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -5,10 +5,12 @@ use acropolis_common::{ queries::{accounts::AccountReward, blocks::BlockInfo, governance::DRepActionUpdate}, rest_helper::ToCheckedF64, serialization::{Bech32WithHrp, DisplayFromBech32, PoolPrefix}, - AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, KeyHash, PolicyAsset, - PoolEpochState, PoolId, PoolUpdateAction, Relay, TxHash, ValueMap, Vote, VrfKeyHash, + AssetAddressEntry, AssetMetadataStandard, AssetMintRecord, Datum, KeyHash, PolicyAsset, + PoolEpochState, PoolId, PoolUpdateAction, ReferenceScript, Relay, TxHash, UTXOValue, + UTxOIdentifier, ValueMap, Vote, VrfKeyHash, }; use anyhow::Result; +use blake2::{Blake2b512, Digest}; use num_traits::ToPrimitive; use rust_decimal::Decimal; use serde::Serialize; @@ -939,6 +941,48 @@ pub struct UTxOREST { pub reference_script_hash: Option, } +impl UTxOREST { + pub fn new( + address: String, + utxo_id: &UTxOIdentifier, + entry: &UTXOValue, + tx_hash: &[u8], + block_hash: &[u8], + ) -> Self { + let (data_hash, inline_datum) = match &entry.datum { + Some(Datum::Hash(h)) => (Some(hex::encode(h)), None), + Some(Datum::Inline(bytes)) => (None, Some(hex::encode(bytes))), + None => (None, None), + }; + + let reference_script_hash = entry.reference_script.as_ref().map(|script| { + let bytes = match script { + ReferenceScript::Native(b) + | ReferenceScript::PlutusV1(b) + | ReferenceScript::PlutusV2(b) + | ReferenceScript::PlutusV3(b) => b, + }; + + let mut hasher = Blake2b512::new(); + hasher.update(bytes); + let result = hasher.finalize(); + hex::encode(&result[..32]) + }); + + Self { + address, + tx_hash: hex::encode(tx_hash), + tx_index: utxo_id.tx_index(), + output_index: utxo_id.output_index(), + amount: entry.value.clone().into(), + block: hex::encode(block_hash), + data_hash, + inline_datum, + reference_script_hash, + } + } +} + #[derive(serde::Serialize)] pub struct AccountTotalsREST { pub stake_address: String,