Skip to content

Commit

Permalink
fix(wallet): fix UTXO scanning (#3094)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler committed Jul 14, 2021
2 parents 7b1c29d + ba62545 commit 81422f1
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 51 deletions.
20 changes: 17 additions & 3 deletions base_layer/wallet/src/output_manager_service/service.rs
Expand Up @@ -1241,12 +1241,26 @@ where TBackend: OutputManagerBackend + 'static
let db_output =
DbUnblindedOutput::from_unblinded_output(rewound_output.clone(), &self.resources.factories)?;

rewound_outputs.push(rewound_output);
self.resources.db.add_unspent_output(db_output).await?;
let output_hex = output.commitment.to_hex();
match self.resources.db.add_unspent_output(db_output).await {
Ok(_) => {
rewound_outputs.push(rewound_output);
},
Err(OutputManagerStorageError::DuplicateOutput) => {
warn!(
target: LOG_TARGET,
"Attempt to add scanned output {} that already exists. Ignoring the output.",
output_hex
);
},
Err(err) => {
return Err(err.into());
},
}
trace!(
target: LOG_TARGET,
"One-sided payment Output {} with value {} recovered",
output.commitment.to_hex(),
output_hex,
rewound_result.committed_value,
);
}
Expand Down
103 changes: 57 additions & 46 deletions base_layer/wallet/src/utxo_scanner_service/utxo_scanning.rs
Expand Up @@ -35,7 +35,7 @@ use crate::{
WalletSqlite,
};
use chrono::Utc;
use futures::{pin_mut, FutureExt, StreamExt};
use futures::{pin_mut, StreamExt};
use log::*;
use serde::{Deserialize, Serialize};
use std::{
Expand All @@ -49,7 +49,7 @@ use std::{
use tari_comms::{
connectivity::ConnectivityRequester,
peer_manager::NodeId,
protocol::rpc::RpcStatus,
protocol::rpc::{RpcError, RpcStatus},
types::CommsPublicKey,
NodeIdentity,
PeerConnection,
Expand All @@ -69,7 +69,7 @@ use tari_core::{
};
use tari_service_framework::{reply_channel, reply_channel::SenderService};
use tari_shutdown::ShutdownSignal;
use tokio::{sync::broadcast, time::delay_for};
use tokio::{sync::broadcast, task, time};

pub const LOG_TARGET: &str = "wallet::utxo_scanning";

Expand Down Expand Up @@ -97,9 +97,7 @@ pub struct UtxoScannerServiceBuilder {
}

#[derive(Clone)]
struct UtxoScannerResources<TBackend>
where TBackend: WalletBackend + 'static
{
struct UtxoScannerResources<TBackend> {
pub db: WalletDatabase<TBackend>,
pub connectivity: ConnectivityRequester,
pub output_manager_service: OutputManagerHandle,
Expand Down Expand Up @@ -246,6 +244,10 @@ where TBackend: WalletBackend + 'static

async fn connect_to_peer(&mut self, peer: NodeId) -> Result<PeerConnection, UtxoScannerError> {
self.publish_event(UtxoScannerEvent::ConnectingToBaseNode(peer.clone()));
debug!(
target: LOG_TARGET,
"Attempting UTXO sync with seed peer {} ({})", self.peer_index, peer,
);
match self.resources.connectivity.dial_peer(peer.clone()).await {
Ok(conn) => Ok(conn),
Err(e) => {
Expand Down Expand Up @@ -340,12 +342,15 @@ where TBackend: WalletBackend + 'static
header_count: 1,
};
// this returns the index of the vec of hashes we sent it, that is the last hash it knows of.
if client.find_chain_split(request).await.is_ok() {
Ok(metadata.utxo_index)
} else {
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
Ok(0)
match client.find_chain_split(request).await {
Ok(_) => Ok(metadata.utxo_index),
Err(RpcError::RequestFailed(err)) if err.status_code().is_not_found() => {
warn!(target: LOG_TARGET, "Reorg detected: {}", err);
// The node does not know of the last hash we scanned, thus we had a chain split.
// We now start at 0 again.
Ok(0)
},
Err(err) => Err(err.into()),
}
}

Expand Down Expand Up @@ -391,7 +396,7 @@ where TBackend: WalletBackend + 'static
// if running is set to false, we know its been canceled upstream so lets exit the loop
return Ok(total_scanned as u64);
}
let (outputs, utxo_index) = convert_response_to_unblinded_outputs(response, last_utxo_index)?;
let (outputs, utxo_index) = convert_response_to_transaction_outputs(response, last_utxo_index)?;
last_utxo_index = utxo_index;
total_scanned += outputs.len();
iteration_count += 1;
Expand Down Expand Up @@ -685,7 +690,10 @@ where TBackend: WalletBackend + 'static
}

pub async fn run(mut self) -> Result<(), WalletError> {
info!(target: LOG_TARGET, "UTXO scanning service starting");
info!(
target: LOG_TARGET,
"UTXO scanning service starting (interval = {:.2?})", self.scan_for_utxo_interval
);

let request_stream = self
.request_stream
Expand All @@ -695,44 +703,47 @@ where TBackend: WalletBackend + 'static
pin_mut!(request_stream);

let mut shutdown = self.shutdown_signal.clone();
let mut delay_time = Duration::from_secs(1);
let start_at = Instant::now() + Duration::from_secs(1);
let mut work_interval = time::interval_at(start_at.into(), self.scan_for_utxo_interval).fuse();
loop {
let mut work_interval = delay_for(delay_time).fuse();

futures::select! {
_ = work_interval => {
debug!(target: LOG_TARGET, "UTXO scanning service starting scan for utxos");
let task = self.create_task();
let running_flag = self.is_running.clone();
tokio::task::spawn(async move {
let _ = task.run().await;
//we make sure the flag is set to false here
running_flag.store(false, Ordering::Relaxed);
});
delay_time = self.scan_for_utxo_interval;
_ = work_interval.select_next_some() => {
let running_flag = self.is_running.clone();
if !running_flag.load(Ordering::SeqCst) {
let task = self.create_task();
debug!(target: LOG_TARGET, "UTXO scanning service starting scan for utxos");
task::spawn(async move {
if let Err(err) = task.run().await {
error!(target: LOG_TARGET, "Error scanning UTXOs: {}", err);
}
//we make sure the flag is set to false here
running_flag.store(false, Ordering::Relaxed);
});
}
},
request_context = request_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Service API Request");
let (request, reply_tx) = request_context.split();
let response = self.handle_request(request).await.map_err(|e| {
warn!(target: LOG_TARGET, "Error handling request: {:?}", e);
e
});
let _ = reply_tx.send(response).map_err(|e| {
warn!(target: LOG_TARGET, "Failed to send reply");
e
});
},
_ = shutdown => {
// this will stop the task if its running, and let that thread exit gracefully
self.is_running.store(false, Ordering::Relaxed);
info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal");
return Ok(());
request_context = request_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Service API Request");
let (request, reply_tx) = request_context.split();
let response = self.handle_request(request).await.map_err(|e| {
warn!(target: LOG_TARGET, "Error handling request: {:?}", e);
e
});
let _ = reply_tx.send(response).map_err(|e| {
warn!(target: LOG_TARGET, "Failed to send reply");
e
});
},
_ = shutdown => {
// this will stop the task if its running, and let that thread exit gracefully
self.is_running.store(false, Ordering::Relaxed);
info!(target: LOG_TARGET, "UTXO scanning service shutting down because it received the shutdown signal");
return Ok(());
}
}

if self.mode == UtxoScannerMode::Recovery {
return Ok(());
};
}
}
}

Expand All @@ -748,7 +759,7 @@ where TBackend: WalletBackend + 'static
}
}

fn convert_response_to_unblinded_outputs(
fn convert_response_to_transaction_outputs(
response: Vec<Result<proto::base_node::SyncUtxosResponse, RpcStatus>>,
last_utxo_index: u64,
) -> Result<(Vec<TransactionOutput>, u64), UtxoScannerError> {
Expand Down
9 changes: 8 additions & 1 deletion base_layer/wallet/tests/transaction_service/service.rs
Expand Up @@ -922,10 +922,17 @@ fn recover_one_sided_transaction() {
.expect("Could not find completed one-sided tx");
let outputs = completed_tx.transaction.body.outputs().clone();

let unblinded = bob_oms.scan_outputs_for_one_sided_payments(outputs).await.unwrap();
let unblinded = bob_oms
.scan_outputs_for_one_sided_payments(outputs.clone())
.await
.unwrap();
// Bob should be able to claim 1 output.
assert_eq!(1, unblinded.len());
assert_eq!(value, unblinded[0].value);

// Should ignore already existing outputs
let unblinded = bob_oms.scan_outputs_for_one_sided_payments(outputs).await.unwrap();
assert!(unblinded.is_empty());
});
}

Expand Down
2 changes: 1 addition & 1 deletion common/src/configuration/utils.rs
Expand Up @@ -91,7 +91,7 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config {
)
.unwrap();
cfg.set_default("wallet.base_node_query_timeout", 60).unwrap();
// 60 sec * 60 mintues * 12 hours.
// 60 sec * 60 minutes * 12 hours.
cfg.set_default("wallet.scan_for_utxo_interval", 60 * 60 * 12).unwrap();
cfg.set_default("wallet.transaction_broadcast_monitoring_timeout", 60)
.unwrap();
Expand Down

0 comments on commit 81422f1

Please sign in to comment.