Skip to content

Commit

Permalink
feat: only trigger UTXO scanning when a new block event is received (#…
Browse files Browse the repository at this point in the history
…3620)

Description
---
Previously the UTXO scanner task was triggered on an interval. This PR updates this process to rather be triggered only when an existing task is not running and new Block event is received from the wallet Base Node monitoring service. It is only when this event is received when we know there will be new block data to scan.

A new event is added to the Base Node Monitoring Service that only fires when a new block is detected to be used in the above functionality

The Utxo Scanner is also refactored into separate files. To help in the review I have highlighted the new code below

An unrelated bug is also fixed with handling of the wallet public address in the Console Wallet that broke the wallet.

How Has This Been Tested?
---
manually and by CI
  • Loading branch information
philipr-za committed Nov 26, 2021
1 parent 2ac0757 commit df1be7e
Show file tree
Hide file tree
Showing 20 changed files with 411 additions and 370 deletions.
2 changes: 1 addition & 1 deletion applications/daily_tests/cron_jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async function runWalletRecoveryTest(instances) {
});

notify(
`🙌 Wallet (Pubkey: ${identity.public_key} ) recovered to a block height of ${numScanned}, completed in ${timeDiffMinutes} minutes (${scannedRate} blocks/min). ${recoveredAmount} µT recovered for ${instances} instance(s).`
`🙌 Wallet (Pubkey: ${identity.public_key} ) recovered scanned ${numScanned} UTXO's, completed in ${timeDiffMinutes} minutes (${scannedRate} UTXOs/min). ${recoveredAmount} µT recovered for ${instances} instance(s).`
);
} catch (err) {
console.error(err);
Expand Down
7 changes: 2 additions & 5 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use rustyline::Editor;
use tari_app_utilities::utilities::create_transport_type;
use tari_common::{exit_codes::ExitCodes, ConfigBootstrap, GlobalConfig};
use tari_comms::{
multiaddr::Multiaddr,
peer_manager::{Peer, PeerFeatures},
types::CommsSecretKey,
NodeIdentity,
Expand Down Expand Up @@ -299,10 +300,7 @@ pub async fn init_wallet(
);

let node_address = match wallet_db.get_node_address().await? {
None => config
.public_address
.clone()
.ok_or_else(|| ExitCodes::ConfigError("node public address error".to_string()))?,
None => config.public_address.clone().unwrap_or_else(Multiaddr::empty),
Some(a) => a,
};

Expand Down Expand Up @@ -403,7 +401,6 @@ pub async fn init_wallet(
config.buffer_size_console_wallet,
)),
Some(config.buffer_rate_limit_console_wallet),
Some(config.scan_for_utxo_interval),
Some(updater_config),
config.autoupdate_check_interval,
);
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_console_wallet/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tari_key_manager::mnemonic::Mnemonic;
use tari_shutdown::Shutdown;
use tari_wallet::{
storage::sqlite_db::WalletSqliteDatabase,
utxo_scanner_service::{handle::UtxoScannerEvent, utxo_scanning::UtxoScannerService},
utxo_scanner_service::{handle::UtxoScannerEvent, service::UtxoScannerService},
WalletSqlite,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,8 @@ impl WalletEventMonitor {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received base node event {:?}", msg);
match (*msg).clone() {
BaseNodeEvent::BaseNodeStateChanged(state) => {
if let BaseNodeEvent::BaseNodeStateChanged(state) = (*msg).clone() {
self.trigger_base_node_state_refresh(state).await;
}
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/base_node_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum BaseNodeServiceResponse {
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum BaseNodeEvent {
BaseNodeStateChanged(BaseNodeState),
NewBlockDetected(u64),
}

/// The Base Node Service Handle is a struct that contains the interfaces used to communicate with a running
Expand Down
32 changes: 21 additions & 11 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
},
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
self.map_state(move |_| BaseNodeState {
self.update_state(BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: None,
Expand Down Expand Up @@ -134,7 +134,7 @@ where
let tip_info = match interrupt(base_node_watch.changed(), client.get_tip_info()).await {
Some(tip_info) => tip_info?,
None => {
self.map_state(|_| Default::default()).await;
self.update_state(Default::default()).await;
continue;
},
};
Expand Down Expand Up @@ -165,7 +165,8 @@ where

let is_synced = tip_info.is_synced;
let height_of_longest_chain = chain_metadata.height_of_longest_chain();
self.map_state(move |_| BaseNodeState {

self.update_state(BaseNodeState {
chain_metadata: Some(chain_metadata),
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
Expand All @@ -184,7 +185,7 @@ where

let delay = time::sleep(self.interval.saturating_sub(latency));
if interrupt(base_node_watch.changed(), delay).await.is_none() {
self.map_state(|_| Default::default()).await;
self.update_state(Default::default()).await;
}
}

Expand All @@ -193,14 +194,23 @@ where
Ok(())
}

async fn map_state<F>(&self, transform: F)
where F: FnOnce(&BaseNodeState) -> BaseNodeState {
let new_state = {
let mut lock = self.state.write().await;
let new_state = transform(&*lock);
*lock = new_state.clone();
new_state
async fn update_state(&self, new_state: BaseNodeState) {
let mut lock = self.state.write().await;
let (new_block_detected, height) = match (new_state.chain_metadata.clone(), (*lock).chain_metadata.clone()) {
(Some(new_metadata), Some(old_metadata)) => (
new_metadata.height_of_longest_chain() != old_metadata.height_of_longest_chain(),
new_metadata.height_of_longest_chain(),
),
(Some(new_metadata), _) => (true, new_metadata.height_of_longest_chain()),
(None, _) => (false, 0),
};

if new_block_detected {
self.publish_event(BaseNodeEvent::NewBlockDetected(height));
}

*lock = new_state.clone();

self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
}

Expand Down
3 changes: 0 additions & 3 deletions base_layer/wallet/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub struct WalletConfig {
pub rate_limit: usize,
pub network: NetworkConsensus,
pub base_node_service_config: BaseNodeServiceConfig,
pub scan_for_utxo_interval: Duration,
pub updater_config: Option<AutoUpdateConfig>,
pub autoupdate_check_interval: Option<Duration>,
}
Expand All @@ -59,7 +58,6 @@ impl WalletConfig {
base_node_service_config: Option<BaseNodeServiceConfig>,
buffer_size: Option<usize>,
rate_limit: Option<usize>,
scan_for_utxo_interval: Option<Duration>,
updater_config: Option<AutoUpdateConfig>,
autoupdate_check_interval: Option<Duration>,
) -> Self {
Expand All @@ -72,7 +70,6 @@ impl WalletConfig {
rate_limit: rate_limit.unwrap_or(50),
network,
base_node_service_config: base_node_service_config.unwrap_or_default(),
scan_for_utxo_interval: scan_for_utxo_interval.unwrap_or_else(|| Duration::from_secs(43200)),
updater_config,
autoupdate_check_interval,
}
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ where
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
3 changes: 2 additions & 1 deletion base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::{
},
types::HashDigest,
util::watch::Watch,
utxo_scanner_service::utxo_scanning::RECOVERY_KEY,
utxo_scanner_service::RECOVERY_KEY,
};
use chrono::{NaiveDateTime, Utc};
use digest::Digest;
Expand Down Expand Up @@ -708,6 +708,7 @@ where
}
self.last_seen_tip_height = state.chain_metadata.map(|cm| cm.height_of_longest_chain());
},
BaseNodeEvent::NewBlockDetected(_) => {},
}
}

Expand Down
47 changes: 12 additions & 35 deletions base_layer/wallet/src/utxo_scanner_service/mod.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,34 @@
// Copyright 2021. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
base_node_service::handle::BaseNodeServiceHandle,
connectivity_service::{WalletConnectivityHandle, WalletConnectivityInterface},
output_manager_service::handle::OutputManagerHandle,
storage::database::{WalletBackend, WalletDatabase},
transaction_service::handle::TransactionServiceHandle,
utxo_scanner_service::{
handle::UtxoScannerHandle,
utxo_scanning::{UtxoScannerMode, UtxoScannerService},
service::UtxoScannerService,
uxto_scanner_service_builder::UtxoScannerMode,
},
};
use futures::future;
use log::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use tari_comms::{connectivity::ConnectivityRequester, NodeIdentity};
use tari_core::transactions::CryptoFactories;
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::broadcast;

pub mod error;
pub mod handle;
pub mod utxo_scanning;
pub mod service;
mod utxo_scanner_task;
mod uxto_scanner_service_builder;

pub use utxo_scanner_task::RECOVERY_KEY;

const LOG_TARGET: &str = "wallet::utxo_scanner_service::initializer";

pub struct UtxoScannerServiceInitializer<T> {
interval: Duration,
backend: Option<WalletDatabase<T>>,
factories: CryptoFactories,
node_identity: Arc<NodeIdentity>,
Expand All @@ -54,14 +37,8 @@ pub struct UtxoScannerServiceInitializer<T> {
impl<T> UtxoScannerServiceInitializer<T>
where T: WalletBackend + 'static
{
pub fn new(
interval: Duration,
backend: WalletDatabase<T>,
factories: CryptoFactories,
node_identity: Arc<NodeIdentity>,
) -> Self {
pub fn new(backend: WalletDatabase<T>, factories: CryptoFactories, node_identity: Arc<NodeIdentity>) -> Self {
Self {
interval,
backend: Some(backend),
factories,
node_identity,
Expand All @@ -87,19 +64,18 @@ where T: WalletBackend + 'static
.take()
.expect("Cannot start Utxo scanner service without setting a storage backend");
let factories = self.factories.clone();
let interval = self.interval;
let node_identity = self.node_identity.clone();

context.spawn_when_ready(move |handles| async move {
let transaction_service = handles.expect_handle::<TransactionServiceHandle>();
let output_manager_service = handles.expect_handle::<OutputManagerHandle>();
let comms_connectivity = handles.expect_handle::<ConnectivityRequester>();
let wallet_connectivity = handles.expect_handle::<WalletConnectivityHandle>();
let base_node_service_handle = handles.expect_handle::<BaseNodeServiceHandle>();

let scanning_service = UtxoScannerService::<T>::builder()
.with_peers(vec![])
.with_retry_limit(2)
.with_scanning_interval(interval)
.with_mode(UtxoScannerMode::Scanning)
.build_with_resources(
backend,
Expand All @@ -111,6 +87,7 @@ where T: WalletBackend + 'static
factories,
handles.get_shutdown_signal(),
event_sender,
base_node_service_handle,
)
.run();

Expand Down

0 comments on commit df1be7e

Please sign in to comment.