Skip to content

Commit

Permalink
perf: improve DAG crawling performance with better parallelisation
Browse files Browse the repository at this point in the history
  • Loading branch information
grumbach committed Feb 20, 2024
1 parent 37c252a commit 9a864de
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
6 changes: 3 additions & 3 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,15 +857,15 @@ impl Client {
/// let spend_address = SpendAddress::new(xorname);
/// // Here we get the spend address
/// let spend = client.get_spend_from_network(spend_address).await?;
/// // Example: We can use the spend address to get its unique public key:
/// // Example: We can use the spend to get its unique public key:
/// let unique_pubkey = spend.unique_pubkey();
/// # Ok(())
/// # }
/// ```
pub async fn get_spend_from_network(&self, address: SpendAddress) -> Result<SignedSpend> {
let key = NetworkAddress::from_spend_address(address).to_record_key();

trace!(
info!(
"Getting spend at {address:?} with record_key {:?}",
PrettyPrintRecordKey::from(&key)
);
Expand All @@ -887,7 +887,7 @@ impl Client {
"failed to get spend at {address:?}: {err:?}"
)),
})?;
debug!(
info!(
"For spend at {address:?} got record from the network, {:?}",
PrettyPrintRecordKey::from(&record.key)
);
Expand Down
66 changes: 45 additions & 21 deletions sn_client/src/audit/spend_dag_building.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ use crate::{Error, Result};
use futures::future::join_all;
use sn_transfers::{SignedSpend, SpendAddress, WalletError, WalletResult};
use std::collections::BTreeSet;
use tokio::task::JoinSet;

impl Client {
/// Builds a SpendDag from a given SpendAddress recursively following descendants all the way to UTxOs
/// Started from Genesis this gives the entire SpendDag of the Network at a certain point in time
/// Once the DAG collected, verifies all the transactions
pub async fn spend_dag_build_from(&self, spend_addr: SpendAddress) -> WalletResult<SpendDag> {
info!("Building spend DAG from {spend_addr:?}");
let mut dag = SpendDag::new();

// get first spend
let first_spend = match self.get_spend_from_network(spend_addr).await {
Ok(s) => s,
Err(Error::MissingSpendRecord(_)) => {
// the cashnote was not spent yet, so it's an UTXO
trace!("UTXO at {spend_addr:?}");
info!("UTXO at {spend_addr:?}");
return Ok(dag);
}
Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())),
Expand All @@ -41,35 +43,47 @@ impl Client {
while !txs_to_follow.is_empty() {
let mut next_gen_tx = BTreeSet::new();

// gather all descendants in parallel
let mut tasks = vec![];
let mut addrs = vec![];
for descendant_tx in txs_to_follow.iter() {
let descendant_tx_hash = descendant_tx.hash();
let descendant_keys = descendant_tx
.outputs
.iter()
.map(|output| output.unique_pubkey);
let addrs_to_follow = descendant_keys.map(|k| SpendAddress::from_unique_pubkey(&k));
debug!("Gen {gen} - Following descendant Tx : {descendant_tx_hash:?}");
info!("Gen {gen} - Following descendant Tx : {descendant_tx_hash:?}");

// get all descendant spends in parallel
let tasks: Vec<_> = addrs_to_follow
let tasks_for_this_descendant: Vec<_> = addrs_to_follow
.clone()
.map(|a| self.get_spend_from_network(a))
.collect();
let spends_res = join_all(tasks).await.into_iter().collect::<Vec<_>>();

// add spends to dag
for res in spends_res.iter().zip(addrs_to_follow) {
match res {
(Ok(spend), addr) => {
dag.insert(addr, spend.clone());
next_gen_tx.insert(spend.spend.spent_tx.clone());
}
(Err(Error::MissingSpendRecord(_)), addr) => {
trace!("Reached UTXO at {addr:?}");
}
(Err(err), addr) => {
error!("Could not verify transfer at {addr:?}: {err:?}");
}
tasks.extend(tasks_for_this_descendant);
addrs.extend(addrs_to_follow);
}

// wait for tasks to complete
info!(
"Gen {gen} - Getting {} spends from {} txs",
tasks.len(),
txs_to_follow.len()
);
let spends_res = join_all(tasks).await.into_iter().collect::<Vec<_>>();
info!("Gen {gen} - Got those {} spends", spends_res.len());

// insert spends in the dag
for res in spends_res.iter().zip(addrs) {
match res {
(Ok(spend), addr) => {
dag.insert(addr, spend.clone());
next_gen_tx.insert(spend.spend.spent_tx.clone());
}
(Err(Error::MissingSpendRecord(_)), addr) => {
info!("Reached UTXO at {addr:?}");
}
(Err(err), addr) => {
error!("Could not verify transfer at {addr:?}: {err:?}");
}
}
}
Expand Down Expand Up @@ -104,7 +118,7 @@ impl Client {
/// ```text
/// ... --
/// \
/// ... ---- ... --
/// ... ---- ... --
/// \ \
/// Spend0 -> Spend1 -----> Spend2 ---> Spend5 ---> Spend2 ---> Genesis
/// \ /
Expand Down Expand Up @@ -217,11 +231,21 @@ impl Client {
/// Extends an existing SpendDag starting from the utxos in this DAG
/// Covers the entirety of currently existing Spends if the DAG was built from Genesis
pub async fn spend_dag_continue_from_utxos(&self, dag: &mut SpendDag) -> WalletResult<()> {
info!("Gathering spend DAG from utxos...");
let utxos = dag.get_utxos();
let mut tasks = JoinSet::new();
for utxo in utxos {
let sub_dag = self.spend_dag_build_from(utxo).await?;
info!("Launching task to gather utxo: {:?}", utxo);
let self_clone = self.clone();
tasks.spawn(async move { self_clone.spend_dag_build_from(utxo).await });
}
while let Some(res) = tasks.join_next().await {
let sub_dag = res.map_err(|e| {
WalletError::FailedToGetSpend(format!("DAG gathering task failed: {e}"))
})??;
dag.merge(sub_dag);
}
info!("Done gathering spend DAG from utxos");
Ok(())
}
}

0 comments on commit 9a864de

Please sign in to comment.