Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/src/queries/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ pub enum BlocksStateQuery {
GetUTxOHashes {
utxo_ids: Vec<UTxOIdentifier>,
},
GetTransactionHashesAndTimestamps {
tx_ids: Vec<TxIdentifier>,
},
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand All @@ -96,6 +99,7 @@ pub enum BlocksStateQueryResponse {
BlockHashes(BlockHashes),
TransactionHashes(TransactionHashes),
UTxOHashes(UTxOHashes),
TransactionHashesAndTimestamps(TransactionHashesAndTimeStamps),
Error(QueryError),
}

Expand Down Expand Up @@ -240,3 +244,9 @@ pub struct UTxOHashes {
pub block_hashes: Vec<BlockHash>,
pub tx_hashes: Vec<TxHash>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TransactionHashesAndTimeStamps {
pub tx_hashes: Vec<TxHash>,
pub timestamps: Vec<u64>,
}
18 changes: 12 additions & 6 deletions modules/address_state/src/address_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) =

// Configuration defaults
const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./db");
const DEFAULT_CLEAR_ON_START: (&str, bool) = ("clear-on-start", true);
const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false);
const DEFAULT_STORE_TOTALS: (&str, bool) = ("store-totals", false);
const DEFAULT_STORE_TRANSACTIONS: (&str, bool) = ("store-transactions", false);
Expand Down Expand Up @@ -178,6 +179,7 @@ impl AddressState {
// Get configuration flags and query topic
let storage_config = AddressStorageConfig {
db_path: get_string_flag(&config, DEFAULT_ADDRESS_DB_PATH),
clear_on_start: get_bool_flag(&config, DEFAULT_CLEAR_ON_START),
skip_until: None,
store_info: get_bool_flag(&config, DEFAULT_STORE_INFO),
store_totals: get_bool_flag(&config, DEFAULT_STORE_TOTALS),
Expand Down Expand Up @@ -209,9 +211,11 @@ impl AddressState {
match state.get_address_utxos(address).await {
Ok(Some(utxos)) => AddressStateQueryResponse::AddressUTxOs(utxos),
Ok(None) => match address.to_string() {
Ok(addr_str) => AddressStateQueryResponse::Error(
QueryError::not_found(format!("Address {}", addr_str)),
),
Ok(addr_str) => {
AddressStateQueryResponse::Error(QueryError::not_found(
format!("Address {} not found", addr_str),
))
}
Err(e) => {
AddressStateQueryResponse::Error(QueryError::internal_error(
format!("Could not convert address to string: {}", e),
Expand All @@ -227,9 +231,11 @@ impl AddressState {
match state.get_address_transactions(address).await {
Ok(Some(txs)) => AddressStateQueryResponse::AddressTransactions(txs),
Ok(None) => match address.to_string() {
Ok(addr_str) => AddressStateQueryResponse::Error(
QueryError::not_found(format!("Address {}", addr_str)),
),
Ok(addr_str) => {
AddressStateQueryResponse::Error(QueryError::not_found(
format!("Address {} not found", addr_str),
))
}
Err(e) => {
AddressStateQueryResponse::Error(QueryError::internal_error(
format!("Could not convert address to string: {}", e),
Expand Down
81 changes: 54 additions & 27 deletions modules/address_state/src/immutable_address_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ pub struct ImmutableAddressStore {
}

impl ImmutableAddressStore {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024).temporary(true);
pub fn new(path: impl AsRef<Path>, clear_on_start: bool) -> Result<Self> {
let path = path.as_ref();
if path.exists() && clear_on_start {
std::fs::remove_dir_all(path)?;
}
let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024);
let keyspace = Keyspace::open(cfg)?;

let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?;
Expand All @@ -52,15 +56,36 @@ impl ImmutableAddressStore {
/// for an entire epoch. Skips any partitions that have already stored the given epoch.
/// All writes are batched and committed atomically, preventing on-disk corruption in case of failure.
pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> {
let persist_utxos = config.store_info
&& !self.epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch).await?;
let persist_txs = config.store_transactions
&& !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?;
let persist_totals = config.store_totals
&& !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?;
// Skip if all options disabled
if !(config.store_info || config.store_transactions || config.store_totals) {
debug!("no persistence needed for epoch {epoch} (all stores disabled)");
return Ok(());
}

// Determine which partitions need persistence
let (persist_utxos, persist_txs, persist_totals) = if config.clear_on_start {
(
config.store_info,
config.store_transactions,
config.store_totals,
)
} else {
let utxos = config.store_info
&& !self
.epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch)
.await?;
let txs = config.store_transactions
&& !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?;
let totals = config.store_totals
&& !self
.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch)
.await?;
(utxos, txs, totals)
};

// Skip if all partitions have already been persisted for the epoch
if !(persist_utxos || persist_txs || persist_totals) {
debug!("no persistence needed for epoch {epoch} (already persisted or disabled)");
debug!("no persistence needed for epoch {epoch}");
return Ok(());
}

Expand Down Expand Up @@ -120,22 +145,14 @@ impl ImmutableAddressStore {
}

// Metadata markers
if persist_utxos {
batch.insert(
&self.utxos,
ADDRESS_UTXOS_EPOCH_COUNTER,
epoch.to_le_bytes(),
);
}
if persist_txs {
batch.insert(&self.txs, ADDRESS_TXS_EPOCH_COUNTER, epoch.to_le_bytes());
}
if persist_totals {
batch.insert(
&self.totals,
ADDRESS_TOTALS_EPOCH_COUNTER,
epoch.to_le_bytes(),
);
for (enabled, part, key) in [
(persist_utxos, &self.utxos, ADDRESS_UTXOS_EPOCH_COUNTER),
(persist_txs, &self.txs, ADDRESS_TXS_EPOCH_COUNTER),
(persist_totals, &self.totals, ADDRESS_TOTALS_EPOCH_COUNTER),
] {
if enabled {
batch.insert(part, key, epoch.to_le_bytes());
}
}

match batch.commit() {
Expand All @@ -158,13 +175,18 @@ impl ImmutableAddressStore {
pub async fn get_utxos(&self, address: &Address) -> Result<Option<Vec<UTxOIdentifier>>> {
let key = address.to_bytes_key()?;

let db_raw = self.utxos.get(&key)?;
let db_had_key = db_raw.is_some();

let mut live: Vec<UTxOIdentifier> =
self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default();
db_raw.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default();

let pending = self.pending.lock().await;
let mut pending_touched = false;
for block_map in pending.iter() {
if let Some(entry) = block_map.get(address) {
if let Some(deltas) = &entry.utxos {
pending_touched = true;
for delta in deltas {
match delta {
UtxoDelta::Created(u) => live.push(*u),
Expand All @@ -175,8 +197,13 @@ impl ImmutableAddressStore {
}
}

// Only return None if the address never existed
if live.is_empty() {
Ok(None)
if db_had_key || pending_touched {
Ok(Some(vec![]))
} else {
Ok(None)
}
} else {
Ok(Some(live))
}
Expand Down
18 changes: 15 additions & 3 deletions modules/address_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
#[derive(Debug, Default, Clone)]
pub struct AddressStorageConfig {
pub db_path: String,
pub clear_on_start: bool,
pub skip_until: Option<u64>,

pub store_info: bool,
Expand Down Expand Up @@ -60,7 +61,7 @@ impl State {
PathBuf::from(&config.db_path)
};

let store = Arc::new(ImmutableAddressStore::new(&db_path)?);
let store = Arc::new(ImmutableAddressStore::new(&db_path, config.clear_on_start)?);

let mut config = config.clone();
config.skip_until = store.get_last_epoch_stored().await?;
Expand All @@ -81,14 +82,20 @@ impl State {
}

let store = self.immutable.clone();
let mut db_had_address = false;
let mut combined: HashSet<UTxOIdentifier> = match store.get_utxos(address).await? {
Some(db) => db.into_iter().collect(),
Some(db) => {
db_had_address = true;
db.into_iter().collect()
}
None => HashSet::new(),
};

let mut pending_touched = false;
for map in self.volatile.window.iter() {
if let Some(entry) = map.get(address) {
if let Some(deltas) = &entry.utxos {
pending_touched = true;
for delta in deltas {
match delta {
UtxoDelta::Created(u) => {
Expand All @@ -104,7 +111,11 @@ impl State {
}

if combined.is_empty() {
Ok(None)
if db_had_address || pending_touched {
Ok(Some(vec![]))
} else {
Ok(None)
}
} else {
Ok(Some(combined.into_iter().collect()))
}
Expand Down Expand Up @@ -242,6 +253,7 @@ mod tests {
let dir = tempdir().unwrap();
AddressStorageConfig {
db_path: dir.path().to_string_lossy().into_owned(),
clear_on_start: true,
skip_until: None,
store_info: true,
store_transactions: true,
Expand Down
Loading