From 8c7066bc48f5f0b5494626e5f1da42656e92f217 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 30 Aug 2021 16:22:20 +0400 Subject: [PATCH] fix: add periodic connection check to wallet connectivity service (#3237) Description --- - Adds a periodic check of the connection status and attempts a reconnect if no longer connected. Previously it was assumed that this can be done lazily because some caller will always call `obtain_base_node_wallet_rpc_client`, but this may not be the case. A periodic check is added. - Clean up some state checking to use the wallet connectivity service. Motivation and Context --- Improves snappiness of the connectivity and chain state updates in the wallet How Has This Been Tested? --- Manually on the console wallet + existing tests --- .../src/ui/components/base_node.rs | 7 ++- .../src/ui/state/app_state.rs | 7 +++ .../mock_base_node_service.rs | 21 +++---- .../wallet/src/base_node_service/monitor.rs | 57 +++---------------- .../wallet/src/base_node_service/service.rs | 5 +- .../wallet/src/connectivity_service/handle.rs | 4 +- .../src/connectivity_service/service.rs | 35 ++++++------ comms/src/protocol/rpc/client_pool.rs | 9 +++ 8 files changed, 55 insertions(+), 90 deletions(-) diff --git a/applications/tari_console_wallet/src/ui/components/base_node.rs b/applications/tari_console_wallet/src/ui/components/base_node.rs index d9a271e291..c51421d89f 100644 --- a/applications/tari_console_wallet/src/ui/components/base_node.rs +++ b/applications/tari_console_wallet/src/ui/components/base_node.rs @@ -42,9 +42,9 @@ impl BaseNode { impl Component for BaseNode { fn draw(&mut self, f: &mut Frame, area: Rect, app_state: &AppState) where B: Backend { - let base_node_state = app_state.get_base_node_state(); + let current_online_status = app_state.get_wallet_connectivity().get_connectivity_status(); - let chain_info = match base_node_state.online { + let chain_info = match current_online_status { OnlineStatus::Connecting => Spans::from(vec![ Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)), Span::raw(" "), @@ -56,6 +56,7 @@ impl Component for BaseNode { Span::styled("Offline", Style::default().fg(Color::Red)), ]), OnlineStatus::Online => { + let base_node_state = app_state.get_base_node_state(); if let Some(metadata) = base_node_state.clone().chain_metadata { let tip = metadata.height_of_longest_chain(); @@ -92,7 +93,7 @@ impl Component for BaseNode { Spans::from(vec![ Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)), Span::raw(" "), - Span::styled("Error", Style::default().fg(Color::Red)), + Span::styled("Waiting for data...", Style::default().fg(Color::White)), ]) } }, diff --git a/applications/tari_console_wallet/src/ui/state/app_state.rs b/applications/tari_console_wallet/src/ui/state/app_state.rs index 6d9293ef47..4736151b9d 100644 --- a/applications/tari_console_wallet/src/ui/state/app_state.rs +++ b/applications/tari_console_wallet/src/ui/state/app_state.rs @@ -84,6 +84,7 @@ pub struct AppState { completed_tx_filter: TransactionFilter, node_config: GlobalConfig, config: AppStateConfig, + wallet_connectivity: WalletConnectivityHandle, } impl AppState { @@ -95,6 +96,7 @@ impl AppState { base_node_config: PeerConfig, node_config: GlobalConfig, ) -> Self { + let wallet_connectivity = wallet.wallet_connectivity.clone(); let inner = AppStateInner::new(node_identity, network, wallet, base_node_selected, base_node_config); let cached_data = inner.data.clone(); @@ -105,6 +107,7 @@ impl AppState { completed_tx_filter: TransactionFilter::ABANDONED_COINBASES, node_config, config: AppStateConfig::default(), + wallet_connectivity, } } @@ -352,6 +355,10 @@ impl AppState { &self.cached_data.base_node_state } + pub fn get_wallet_connectivity(&self) -> WalletConnectivityHandle { + self.wallet_connectivity.clone() + } + pub fn get_selected_base_node(&self) -> &Peer { &self.cached_data.base_node_selected } diff --git a/base_layer/wallet/src/base_node_service/mock_base_node_service.rs b/base_layer/wallet/src/base_node_service/mock_base_node_service.rs index 1bc57ed9d2..9aa981150d 100644 --- a/base_layer/wallet/src/base_node_service/mock_base_node_service.rs +++ b/base_layer/wallet/src/base_node_service/mock_base_node_service.rs @@ -20,13 +20,10 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{ - base_node_service::{ - error::BaseNodeServiceError, - handle::{BaseNodeServiceRequest, BaseNodeServiceResponse}, - service::BaseNodeState, - }, - connectivity_service::OnlineStatus, +use crate::base_node_service::{ + error::BaseNodeServiceError, + handle::{BaseNodeServiceRequest, BaseNodeServiceResponse}, + service::BaseNodeState, }; use futures::StreamExt; use tari_common_types::chain_metadata::ChainMetadata; @@ -81,30 +78,28 @@ impl MockBaseNodeService { /// Set the mock server state, either online and synced to a specific height, or offline with None pub fn set_base_node_state(&mut self, height: Option) { - let (chain_metadata, is_synced, online) = match height { + let (chain_metadata, is_synced) = match height { Some(height) => { let metadata = ChainMetadata::new(height, Vec::new(), 0, 0, 0); - (Some(metadata), Some(true), OnlineStatus::Online) + (Some(metadata), Some(true)) }, - None => (None, None, OnlineStatus::Offline), + None => (None, None), }; self.state = BaseNodeState { chain_metadata, is_synced, updated: None, latency: None, - online, } } pub fn set_default_base_node_state(&mut self) { - let metadata = ChainMetadata::new(std::u64::MAX, Vec::new(), 0, 0, 0); + let metadata = ChainMetadata::new(u64::MAX, Vec::new(), 0, 0, 0); self.state = BaseNodeState { chain_metadata: Some(metadata), is_synced: Some(true), updated: None, latency: None, - online: OnlineStatus::Online, } } diff --git a/base_layer/wallet/src/base_node_service/monitor.rs b/base_layer/wallet/src/base_node_service/monitor.rs index 5a2c3a7e76..0bd1180c3e 100644 --- a/base_layer/wallet/src/base_node_service/monitor.rs +++ b/base_layer/wallet/src/base_node_service/monitor.rs @@ -25,7 +25,7 @@ use crate::{ handle::{BaseNodeEvent, BaseNodeEventSender}, service::BaseNodeState, }, - connectivity_service::{OnlineStatus, WalletConnectivityHandle}, + connectivity_service::WalletConnectivityHandle, error::WalletStorageError, storage::database::{WalletBackend, WalletDatabase}, }; @@ -33,7 +33,7 @@ use chrono::Utc; use log::*; use std::{convert::TryFrom, sync::Arc, time::Duration}; use tari_common_types::chain_metadata::ChainMetadata; -use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcError}; +use tari_comms::protocol::rpc::RpcError; use tokio::{sync::RwLock, time}; const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor"; @@ -78,9 +78,6 @@ impl BaseNodeMonitor { }, Err(e @ BaseNodeMonitorError::RpcFailed(_)) => { warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e); - debug!(target: LOG_TARGET, "Setting as OFFLINE and retrying...",); - - self.set_offline().await; continue; }, Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) | @@ -96,34 +93,19 @@ impl BaseNodeMonitor { ); } - async fn update_connectivity_status(&self) -> NodeId { - let mut watcher = self.wallet_connectivity.get_connectivity_status_watch(); - loop { - use OnlineStatus::*; - match watcher.recv().await.unwrap_or(Offline) { - Online => match self.wallet_connectivity.get_current_base_node_id() { - Some(node_id) => return node_id, - _ => continue, - }, - Connecting => { - self.set_connecting().await; - }, - Offline => { - self.set_offline().await; - }, - } - } - } - async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> { loop { - let peer_node_id = self.update_connectivity_status().await; let mut client = self .wallet_connectivity .obtain_base_node_wallet_rpc_client() .await .ok_or(BaseNodeMonitorError::NodeShuttingDown)?; + let base_node_id = match self.wallet_connectivity.get_current_base_node_id() { + Some(n) => n, + None => continue, + }; + let tip_info = client.get_tip_info().await?; let chain_metadata = tip_info @@ -138,7 +120,7 @@ impl BaseNodeMonitor { debug!( target: LOG_TARGET, "Base node {} Tip: {} ({}) Latency: {} ms", - peer_node_id, + base_node_id, chain_metadata.height_of_longest_chain(), if is_synced { "Synced" } else { "Syncing..." }, latency.as_millis() @@ -151,7 +133,6 @@ impl BaseNodeMonitor { is_synced: Some(is_synced), updated: Some(Utc::now().naive_utc()), latency: Some(latency), - online: OnlineStatus::Online, }) .await; @@ -163,28 +144,6 @@ impl BaseNodeMonitor { Ok(()) } - async fn set_connecting(&self) { - self.map_state(|_| BaseNodeState { - chain_metadata: None, - is_synced: None, - updated: Some(Utc::now().naive_utc()), - latency: None, - online: OnlineStatus::Connecting, - }) - .await; - } - - async fn set_offline(&self) { - self.map_state(|_| BaseNodeState { - chain_metadata: None, - is_synced: None, - updated: Some(Utc::now().naive_utc()), - latency: None, - online: OnlineStatus::Offline, - }) - .await; - } - async fn map_state(&self, transform: F) where F: FnOnce(&BaseNodeState) -> BaseNodeState { let new_state = { diff --git a/base_layer/wallet/src/base_node_service/service.rs b/base_layer/wallet/src/base_node_service/service.rs index 3da987c8b1..eb2b91ebda 100644 --- a/base_layer/wallet/src/base_node_service/service.rs +++ b/base_layer/wallet/src/base_node_service/service.rs @@ -27,7 +27,7 @@ use super::{ }; use crate::{ base_node_service::monitor::BaseNodeMonitor, - connectivity_service::{OnlineStatus, WalletConnectivityHandle}, + connectivity_service::WalletConnectivityHandle, storage::database::{WalletBackend, WalletDatabase}, }; use chrono::NaiveDateTime; @@ -49,8 +49,6 @@ pub struct BaseNodeState { pub is_synced: Option, pub updated: Option, pub latency: Option, - pub online: OnlineStatus, - // pub base_node_peer: Option, } impl Default for BaseNodeState { @@ -60,7 +58,6 @@ impl Default for BaseNodeState { is_synced: None, updated: None, latency: None, - online: OnlineStatus::Connecting, } } } diff --git a/base_layer/wallet/src/connectivity_service/handle.rs b/base_layer/wallet/src/connectivity_service/handle.rs index ac218edc5e..ac17805edb 100644 --- a/base_layer/wallet/src/connectivity_service/handle.rs +++ b/base_layer/wallet/src/connectivity_service/handle.rs @@ -102,8 +102,8 @@ impl WalletConnectivityHandle { reply_rx.await.ok() } - pub async fn get_connectivity_status(&mut self) -> OnlineStatus { - self.online_status_rx.recv().await.unwrap_or(OnlineStatus::Offline) + pub fn get_connectivity_status(&mut self) -> OnlineStatus { + *self.online_status_rx.borrow() } pub fn get_connectivity_status_watch(&self) -> watch::Receiver { diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index c0cf474b96..cf0901339f 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -30,6 +30,7 @@ use futures::{ future, future::Either, stream::Fuse, + FutureExt, StreamExt, }; use log::*; @@ -40,7 +41,7 @@ use tari_comms::{ PeerConnection, }; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; -use tokio::time; +use tokio::{time, time::Duration}; const LOG_TARGET: &str = "wallet::connectivity"; @@ -90,6 +91,7 @@ impl WalletConnectivityService { debug!(target: LOG_TARGET, "Wallet connectivity service has started."); let mut base_node_watch_rx = self.base_node_watch.get_receiver().fuse(); loop { + let mut check_connection = time::delay_for(Duration::from_secs(1)).fuse(); futures::select! { req = self.request_stream.select_next_some() => { self.handle_request(req).await; @@ -99,11 +101,23 @@ impl WalletConnectivityService { // This will block the rest until the connection is established. This is what we want. self.setup_base_node_connection().await; } + }, + _ = check_connection => { + self.check_connection().await; } } } } + async fn check_connection(&mut self) { + if let Some(pool) = self.pools.as_ref() { + if !pool.base_node_wallet_rpc_client.is_connected().await { + debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect..."); + self.setup_base_node_connection().await; + } + } + } + async fn handle_request(&mut self, request: WalletConnectivityRequest) { use WalletConnectivityRequest::*; match request { @@ -138,7 +152,6 @@ impl WalletConnectivityService { target: LOG_TARGET, "Base node connection failed: {}. Reconnecting...", e ); - self.trigger_reconnect(); self.pending_requests.push(reply.into()); }, }, @@ -169,7 +182,6 @@ impl WalletConnectivityService { target: LOG_TARGET, "Base node connection failed: {}. Reconnecting...", e ); - self.trigger_reconnect(); self.pending_requests.push(reply.into()); }, }, @@ -186,21 +198,6 @@ impl WalletConnectivityService { } } - fn trigger_reconnect(&mut self) { - let peer = self - .base_node_watch - .borrow() - .clone() - .expect("trigger_reconnect called before base node is set"); - // Trigger the watch so that a peer connection is reinitiated - self.set_base_node_peer(peer); - } - - fn set_base_node_peer(&mut self, peer: Peer) { - self.pools = None; - self.base_node_watch.broadcast(Some(peer)); - } - fn current_base_node(&self) -> Option { self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone()) } @@ -236,7 +233,7 @@ impl WalletConnectivityService { } else { self.set_online_status(OnlineStatus::Offline); } - error!(target: LOG_TARGET, "{}", e); + warn!(target: LOG_TARGET, "{}", e); time::delay_for(self.config.base_node_monitor_refresh_interval).await; continue; }, diff --git a/comms/src/protocol/rpc/client_pool.rs b/comms/src/protocol/rpc/client_pool.rs index 6829b41265..7cf99ed419 100644 --- a/comms/src/protocol/rpc/client_pool.rs +++ b/comms/src/protocol/rpc/client_pool.rs @@ -61,6 +61,11 @@ where T: RpcPoolClient + From + NamedProtocolService + Clone let mut pool = self.pool.lock().await; pool.get_least_used_or_connect().await } + + pub async fn is_connected(&self) -> bool { + let pool = self.pool.lock().await; + pool.is_connected() + } } #[derive(Clone)] @@ -111,6 +116,10 @@ where T: RpcPoolClient + From + NamedProtocolService + Clone } } + pub fn is_connected(&self) -> bool { + self.connection.is_connected() + } + pub(super) fn refresh_num_active_connections(&mut self) -> usize { self.prune(); self.clients.len()