Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve DAG crawling performance with better parallelisation #1316

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,15 +857,15 @@ impl Client {
/// let spend_address = SpendAddress::new(xorname);
/// // Here we get the spend address
/// let spend = client.get_spend_from_network(spend_address).await?;
/// // Example: We can use the spend address to get its unique public key:
/// // Example: We can use the spend to get its unique public key:
/// let unique_pubkey = spend.unique_pubkey();
/// # Ok(())
/// # }
/// ```
pub async fn get_spend_from_network(&self, address: SpendAddress) -> Result<SignedSpend> {
let key = NetworkAddress::from_spend_address(address).to_record_key();

trace!(
info!(
"Getting spend at {address:?} with record_key {:?}",
PrettyPrintRecordKey::from(&key)
);
Expand All @@ -887,7 +887,7 @@ impl Client {
"failed to get spend at {address:?}: {err:?}"
)),
})?;
debug!(
info!(
"For spend at {address:?} got record from the network, {:?}",
PrettyPrintRecordKey::from(&record.key)
);
Expand Down
66 changes: 45 additions & 21 deletions sn_client/src/audit/spend_dag_building.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ use crate::{Error, Result};
use futures::future::join_all;
use sn_transfers::{SignedSpend, SpendAddress, WalletError, WalletResult};
use std::collections::BTreeSet;
use tokio::task::JoinSet;

impl Client {
/// Builds a SpendDag from a given SpendAddress recursively following descendants all the way to UTxOs
/// Started from Genesis this gives the entire SpendDag of the Network at a certain point in time
/// Once the DAG collected, verifies all the transactions
pub async fn spend_dag_build_from(&self, spend_addr: SpendAddress) -> WalletResult<SpendDag> {
info!("Building spend DAG from {spend_addr:?}");
let mut dag = SpendDag::new();

// get first spend
let first_spend = match self.get_spend_from_network(spend_addr).await {
Ok(s) => s,
Err(Error::MissingSpendRecord(_)) => {
// the cashnote was not spent yet, so it's an UTXO
trace!("UTXO at {spend_addr:?}");
info!("UTXO at {spend_addr:?}");
return Ok(dag);
}
Err(e) => return Err(WalletError::FailedToGetSpend(e.to_string())),
Expand All @@ -41,35 +43,47 @@ impl Client {
while !txs_to_follow.is_empty() {
let mut next_gen_tx = BTreeSet::new();

// gather all descendants in parallel
let mut tasks = vec![];
let mut addrs = vec![];
for descendant_tx in txs_to_follow.iter() {
let descendant_tx_hash = descendant_tx.hash();
let descendant_keys = descendant_tx
.outputs
.iter()
.map(|output| output.unique_pubkey);
let addrs_to_follow = descendant_keys.map(|k| SpendAddress::from_unique_pubkey(&k));
debug!("Gen {gen} - Following descendant Tx : {descendant_tx_hash:?}");
info!("Gen {gen} - Following descendant Tx : {descendant_tx_hash:?}");

// get all descendant spends in parallel
let tasks: Vec<_> = addrs_to_follow
let tasks_for_this_descendant: Vec<_> = addrs_to_follow
.clone()
.map(|a| self.get_spend_from_network(a))
.collect();
let spends_res = join_all(tasks).await.into_iter().collect::<Vec<_>>();

// add spends to dag
for res in spends_res.iter().zip(addrs_to_follow) {
match res {
(Ok(spend), addr) => {
dag.insert(addr, spend.clone());
next_gen_tx.insert(spend.spend.spent_tx.clone());
}
(Err(Error::MissingSpendRecord(_)), addr) => {
trace!("Reached UTXO at {addr:?}");
}
(Err(err), addr) => {
error!("Could not verify transfer at {addr:?}: {err:?}");
}
tasks.extend(tasks_for_this_descendant);
addrs.extend(addrs_to_follow);
}

// wait for tasks to complete
info!(
"Gen {gen} - Getting {} spends from {} txs",
tasks.len(),
txs_to_follow.len()
);
let spends_res = join_all(tasks).await.into_iter().collect::<Vec<_>>();
info!("Gen {gen} - Got those {} spends", spends_res.len());

// insert spends in the dag
for res in spends_res.iter().zip(addrs) {
match res {
(Ok(spend), addr) => {
dag.insert(addr, spend.clone());
next_gen_tx.insert(spend.spend.spent_tx.clone());
}
(Err(Error::MissingSpendRecord(_)), addr) => {
info!("Reached UTXO at {addr:?}");
}
(Err(err), addr) => {
error!("Could not verify transfer at {addr:?}: {err:?}");
}
}
}
Expand Down Expand Up @@ -104,7 +118,7 @@ impl Client {
/// ```text
/// ... --
/// \
/// ... ---- ... --
/// ... ---- ... --
/// \ \
/// Spend0 -> Spend1 -----> Spend2 ---> Spend5 ---> Spend2 ---> Genesis
/// \ /
Expand Down Expand Up @@ -217,11 +231,21 @@ impl Client {
/// Extends an existing SpendDag starting from the utxos in this DAG
/// Covers the entirety of currently existing Spends if the DAG was built from Genesis
pub async fn spend_dag_continue_from_utxos(&self, dag: &mut SpendDag) -> WalletResult<()> {
info!("Gathering spend DAG from utxos...");
let utxos = dag.get_utxos();
let mut tasks = JoinSet::new();
for utxo in utxos {
let sub_dag = self.spend_dag_build_from(utxo).await?;
info!("Launching task to gather utxo: {:?}", utxo);
let self_clone = self.clone();
tasks.spawn(async move { self_clone.spend_dag_build_from(utxo).await });
}
while let Some(res) = tasks.join_next().await {
let sub_dag = res.map_err(|e| {
WalletError::FailedToGetSpend(format!("DAG gathering task failed: {e}"))
})??;
dag.merge(sub_dag);
}
info!("Done gathering spend DAG from utxos");
Ok(())
}
}