Skip to content

Commit

Permalink
feat: improve console wallet responsiveness (#3304)
Browse files Browse the repository at this point in the history
Description
---
1. Improved console wallet responsiveness by introducing a [**edit** _debouncer with a_] cooldown period for calculating the balance, as this is a database call that grows linearly with the amount of UTXOs in the wallet and does not need to be repeated multiple times per second in certain scenarios. [**Edit** _All events that can change the balance and are captured in the wallet event monitor are followed by the `trigger_balance_refresh` command that will send an event to the debouncer, which in turn will collect all such events and only perform one balance update database query after the end of the cooldown period._] ~~A new base node state change (metadata) event will ensure the last request to calculate balance will be fulfilled.~~
2. Submit connectivity failed event if the metadata RPC requests time out so that 'Waiting for data...' will be displayed in the wallet.
3. Removed the RPC client ping call as a method when monitoring the base node to calculate latency and instead replaced it with the time measured to obtain metadata via the RPC client. 

Motivation and Context
---
1. Wallets with growing numbers of UTXOs become more and more sluggish due to repeated database calls to calculate the balance; this is much more pronounced when many broadcast monitoring protocols are running.
2. When the metadata RPC requests time out previous base node state would be displayed; this could carry on for 30 minutes or more with consecutive timeouts where the correct user feedback would be that the wallet is waiting for data.
3. The time measured for the ping call always follows the time it takes for the metadata call, which is evident as well when the base node gets busy and will often take longer than the metadata call. Having two longish RPC calls with no added value results in more RPC request timeouts than is necessary. See time measurement plots below:

   ![image](https://user-images.githubusercontent.com/39146854/132186041-5afd5e15-8a3b-4635-82f4-800b2e6b750a.png)
   ![image](https://user-images.githubusercontent.com/39146854/132186070-c9913a23-98e8-4461-8b86-bcd0ebbfb4f0.png)
   ![image](https://user-images.githubusercontent.com/39146854/132186089-fe4eff9b-bb22-4de2-b9fa-93f5accf3a8f.png)
   ![image](https://user-images.githubusercontent.com/39146854/132186110-12eec3af-e363-4802-a2e7-c9ce44062892.png)
   ![image](https://user-images.githubusercontent.com/39146854/132186520-f533078e-907e-4407-96f2-d5f64e4c55c4.png)

How Has This Been Tested?
---
System-level testing during a stressed coin split
  • Loading branch information
hansieodendaal committed Sep 21, 2021
1 parent e126033 commit 73017a4
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 19 deletions.
2 changes: 2 additions & 0 deletions applications/tari_console_wallet/src/ui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub fn run(app: App<CrosstermBackend<Stdout>>) -> Result<(), ExitCodes> {
app.app_state.refresh_contacts_state().await?;
trace!(target: LOG_TARGET, "Refreshing connected peers state");
app.app_state.refresh_connected_peers_state().await?;
trace!(target: LOG_TARGET, "Starting balance enquiry debouncer");
app.app_state.start_balance_enquiry_debouncer().await?;
trace!(target: LOG_TARGET, "Starting app state event monitor");
app.app_state.start_event_monitor(app.notifier.clone()).await;
Result::<_, UiError>::Ok(())
Expand Down
36 changes: 28 additions & 8 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::{
notifier::Notifier,
ui::{
state::{
debouncer::BalanceEnquiryDebouncer,
tasks::{send_one_sided_transaction_task, send_transaction_task},
wallet_event_monitor::WalletEventMonitor,
},
Expand All @@ -74,6 +75,7 @@ use crate::{
utils::db::{CUSTOM_BASE_NODE_ADDRESS_KEY, CUSTOM_BASE_NODE_PUBLIC_KEY_KEY},
wallet_modes::PeerConfig,
};
use tari_wallet::output_manager_service::handle::OutputManagerHandle;

const LOG_TARGET: &str = "wallet::console_wallet::app_state";

Expand All @@ -86,6 +88,8 @@ pub struct AppState {
node_config: GlobalConfig,
config: AppStateConfig,
wallet_connectivity: WalletConnectivityHandle,
output_manager_service: OutputManagerHandle,
balance_enquiry_debouncer: BalanceEnquiryDebouncer,
}

impl AppState {
Expand All @@ -98,25 +102,45 @@ impl AppState {
node_config: GlobalConfig,
) -> Self {
let wallet_connectivity = wallet.wallet_connectivity.clone();
let output_manager_service = wallet.output_manager_service.clone();
let inner = AppStateInner::new(node_identity, network, wallet, base_node_selected, base_node_config);
let cached_data = inner.data.clone();

let inner = Arc::new(RwLock::new(inner));
Self {
inner: Arc::new(RwLock::new(inner)),
inner: inner.clone(),
cached_data,
cache_update_cooldown: None,
completed_tx_filter: TransactionFilter::ABANDONED_COINBASES,
node_config,
node_config: node_config.clone(),
config: AppStateConfig::default(),
wallet_connectivity,
output_manager_service: output_manager_service.clone(),
balance_enquiry_debouncer: BalanceEnquiryDebouncer::new(
inner,
Duration::from_secs(node_config.wallet_balance_enquiry_cooldown_period),
output_manager_service,
),
}
}

pub async fn start_event_monitor(&self, notifier: Notifier) {
let event_monitor = WalletEventMonitor::new(self.inner.clone());
let balance_enquiry_debounce_tx = self.balance_enquiry_debouncer.clone().get_sender();
let event_monitor = WalletEventMonitor::new(self.inner.clone(), balance_enquiry_debounce_tx);
tokio::spawn(event_monitor.run(notifier));
}

pub async fn start_balance_enquiry_debouncer(&self) -> Result<(), UiError> {
tokio::spawn(self.balance_enquiry_debouncer.clone().run());
let _ = self
.balance_enquiry_debouncer
.clone()
.get_sender()
.send(())
.map_err(|e| UiError::SendError(e.to_string()));
Ok(())
}

pub async fn refresh_transaction_state(&mut self) -> Result<(), UiError> {
let mut inner = self.inner.write().await;
inner.refresh_full_transaction_state().await?;
Expand Down Expand Up @@ -525,7 +549,6 @@ impl AppStateInner {
});

self.data.completed_txs = completed_transactions;
self.refresh_balance().await?;
self.updated = true;
Ok(())
}
Expand Down Expand Up @@ -583,7 +606,6 @@ impl AppStateInner {
.partial_cmp(&a.timestamp)
.expect("Should be able to compare timestamps")
});
self.refresh_balance().await?;
self.updated = true;
return Ok(());
}
Expand All @@ -600,7 +622,6 @@ impl AppStateInner {
});
},
}
self.refresh_balance().await?;
self.updated = true;
Ok(())
}
Expand Down Expand Up @@ -642,8 +663,7 @@ impl AppStateInner {
Ok(())
}

pub async fn refresh_balance(&mut self) -> Result<(), UiError> {
let balance = self.wallet.output_manager_service.get_balance().await?;
pub async fn refresh_balance(&mut self, balance: Balance) -> Result<(), UiError> {
self.data.balance = balance;
self.updated = true;

Expand Down
139 changes: 139 additions & 0 deletions applications/tari_console_wallet/src/ui/state/debouncer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2020. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::ui::state::AppStateInner;
use log::*;
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tari_wallet::output_manager_service::handle::OutputManagerHandle;
use tokio::{
sync::{broadcast, RwLock},
time,
};

const LOG_TARGET: &str = "wallet::console_wallet::debouncer";

#[derive(Clone)]
pub(crate) struct BalanceEnquiryDebouncer {
app_state_inner: Arc<RwLock<AppStateInner>>,
output_manager_service: OutputManagerHandle,
balance_enquiry_cooldown_period: Duration,
tx: broadcast::Sender<()>,
}

impl BalanceEnquiryDebouncer {
pub fn new(
app_state_inner: Arc<RwLock<AppStateInner>>,
balance_enquiry_cooldown_period: Duration,
output_manager_service: OutputManagerHandle,
) -> Self {
// This channel must only be size 1; the debouncer will ensure that the balance is updated timeously
let (tx, _) = broadcast::channel(1);
Self {
app_state_inner,
output_manager_service,
balance_enquiry_cooldown_period,
tx,
}
}

pub async fn run(mut self) {
let balance_enquiry_events = &mut self.tx.subscribe();
let mut shutdown_signal = self.app_state_inner.read().await.get_shutdown_signal();
let delay = time::sleep(self.balance_enquiry_cooldown_period);
tokio::pin!(delay);

debug!(target: LOG_TARGET, "Balance enquiry debouncer starting");
if let Ok(balance) = self.output_manager_service.get_balance().await {
trace!(
target: LOG_TARGET,
"Initial balance: available {}, incoming {}, outgoing {}",
balance.available_balance,
balance.pending_incoming_balance,
balance.pending_outgoing_balance
);
let mut inner = self.app_state_inner.write().await;
if let Err(e) = inner.refresh_balance(balance).await {
warn!(target: LOG_TARGET, "Error refresh app_state: {}", e);
}
}
loop {
tokio::select! {
_ = &mut delay => {
if let Ok(result) = time::timeout(
self.balance_enquiry_cooldown_period,
balance_enquiry_events.recv()
).await {
match result {
Ok(_) => {
let start_time = Instant::now();
match self.output_manager_service.get_balance().await {
Ok(balance) => {
trace!(
target: LOG_TARGET,
"Updating balance ({} ms): available {}, incoming {}, outgoing {}",
start_time.elapsed().as_millis(),
balance.available_balance,
balance.pending_incoming_balance,
balance.pending_outgoing_balance
);
let mut inner = self.app_state_inner.write().await;
if let Err(e) = inner.refresh_balance(balance).await {
warn!(target: LOG_TARGET, "Error refresh app_state: {}", e);
}
}
Err(e) => {
warn!(target: LOG_TARGET, "Could not obtain balance ({})", e);
}
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
trace!(target: LOG_TARGET, "Balance enquiry debouncer lagged {} update requests", n);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
info!(
target: LOG_TARGET,
"Balance enquiry debouncer shutting down because the channel was closed"
);
break;
}
}
}
},
_ = shutdown_signal.wait() => {
info!(
target: LOG_TARGET,
"Balance enquiry debouncer shutting down because the shutdown signal was received"
);
break;
},
}
}
}

pub fn get_sender(self) -> broadcast::Sender<()> {
self.tx
}
}
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/ui/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod app_state;
mod debouncer;
mod tasks;
mod wallet_event_monitor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ const LOG_TARGET: &str = "wallet::console_wallet::wallet_event_monitor";

pub struct WalletEventMonitor {
app_state_inner: Arc<RwLock<AppStateInner>>,
balance_enquiry_debounce_tx: broadcast::Sender<()>,
}

impl WalletEventMonitor {
pub fn new(app_state_inner: Arc<RwLock<AppStateInner>>) -> Self {
Self { app_state_inner }
pub fn new(
app_state_inner: Arc<RwLock<AppStateInner>>,
balance_enquiry_debounce_tx: broadcast::Sender<()>,
) -> Self {
Self {
app_state_inner,
balance_enquiry_debounce_tx,
}
}

pub async fn run(mut self, notifier: Notifier) {
Expand Down Expand Up @@ -75,36 +82,43 @@ impl WalletEventMonitor {
match (*msg).clone() {
TransactionEvent::ReceivedFinalizedTransaction(tx_id) => {
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
notifier.transaction_received(tx_id);
},
TransactionEvent::TransactionMinedUnconfirmed(tx_id, confirmations) => {
self.trigger_confirmations_refresh(tx_id, confirmations).await;
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
notifier.transaction_mined_unconfirmed(tx_id, confirmations);
},
TransactionEvent::TransactionMined(tx_id) => {
self.trigger_confirmations_cleanup(tx_id).await;
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
notifier.transaction_mined(tx_id);
},
TransactionEvent::TransactionCancelled(tx_id) => {
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
notifier.transaction_cancelled(tx_id);
},
TransactionEvent::ReceivedTransaction(tx_id) |
TransactionEvent::ReceivedTransactionReply(tx_id) |
TransactionEvent::TransactionBroadcast(tx_id) |
TransactionEvent::TransactionMinedRequestTimedOut(tx_id) | TransactionEvent::TransactionImported(tx_id) => {
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
},
TransactionEvent::TransactionDirectSendResult(tx_id, true) |
TransactionEvent::TransactionStoreForwardSendResult(tx_id, true) |
TransactionEvent::TransactionCompletedImmediately(tx_id) => {
self.trigger_tx_state_refresh(tx_id).await;
self.trigger_balance_refresh();
notifier.transaction_sent(tx_id);
},
TransactionEvent::TransactionValidationSuccess(_) => {
self.trigger_full_tx_state_refresh().await;
self.trigger_balance_refresh();
},
// Only the above variants trigger state refresh
_ => (),
Expand Down Expand Up @@ -162,6 +176,7 @@ impl WalletEventMonitor {
}
BaseNodeEvent::BaseNodePeerSet(peer) => {
self.trigger_base_node_peer_refresh(*peer).await;
self.trigger_balance_refresh();
}
}
},
Expand All @@ -176,7 +191,7 @@ impl WalletEventMonitor {
Ok(msg) => {
trace!(target: LOG_TARGET, "Output Manager Service Callback Handler event {:?}", msg);
if let OutputManagerEvent::TxoValidationSuccess(_,_) = &*msg {
self.trigger_balance_refresh().await;
self.trigger_balance_refresh();
}
},
Err(broadcast::error::RecvError::Lagged(n)) => {
Expand Down Expand Up @@ -249,10 +264,8 @@ impl WalletEventMonitor {
}
}

async fn trigger_balance_refresh(&mut self) {
let mut inner = self.app_state_inner.write().await;

if let Err(e) = inner.refresh_balance().await {
fn trigger_balance_refresh(&mut self) {
if let Err(e) = self.balance_enquiry_debounce_tx.send(()) {
warn!(target: LOG_TARGET, "Error refresh app_state: {}", e);
}
}
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_console_wallet/src/ui/ui_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ pub enum UiError {
AddressParseError,
#[error("Peer did not include an address")]
NoAddressError,
#[error("Channel send error: `{0}`")]
SendError(String),
}
Loading

0 comments on commit 73017a4

Please sign in to comment.