Skip to content

Commit

Permalink
fix: add periodic connection check to wallet connectivity service (#3237
Browse files Browse the repository at this point in the history
)


Description
---
- Adds a periodic check of the connection status and attempts a
  reconnect if no longer connected. Previously it was assumed that this
  can be done lazily because some caller will always call
  `obtain_base_node_wallet_rpc_client`, but this may not be the case. A
  periodic check is added.
- Clean up some state checking to use the wallet connectivity service.

Motivation and Context
---
Improves snappiness of the connectivity and chain state updates in the wallet

How Has This Been Tested?
---
Manually on the console wallet + existing tests
  • Loading branch information
sdbondi committed Aug 30, 2021
1 parent 8d82fd0 commit 8c7066b
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 90 deletions.
Expand Up @@ -42,9 +42,9 @@ impl BaseNode {
impl<B: Backend> Component<B> for BaseNode {
fn draw(&mut self, f: &mut Frame<B>, area: Rect, app_state: &AppState)
where B: Backend {
let base_node_state = app_state.get_base_node_state();
let current_online_status = app_state.get_wallet_connectivity().get_connectivity_status();

let chain_info = match base_node_state.online {
let chain_info = match current_online_status {
OnlineStatus::Connecting => Spans::from(vec![
Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)),
Span::raw(" "),
Expand All @@ -56,6 +56,7 @@ impl<B: Backend> Component<B> for BaseNode {
Span::styled("Offline", Style::default().fg(Color::Red)),
]),
OnlineStatus::Online => {
let base_node_state = app_state.get_base_node_state();
if let Some(metadata) = base_node_state.clone().chain_metadata {
let tip = metadata.height_of_longest_chain();

Expand Down Expand Up @@ -92,7 +93,7 @@ impl<B: Backend> Component<B> for BaseNode {
Spans::from(vec![
Span::styled("Chain Tip:", Style::default().fg(Color::Magenta)),
Span::raw(" "),
Span::styled("Error", Style::default().fg(Color::Red)),
Span::styled("Waiting for data...", Style::default().fg(Color::White)),
])
}
},
Expand Down
7 changes: 7 additions & 0 deletions applications/tari_console_wallet/src/ui/state/app_state.rs
Expand Up @@ -84,6 +84,7 @@ pub struct AppState {
completed_tx_filter: TransactionFilter,
node_config: GlobalConfig,
config: AppStateConfig,
wallet_connectivity: WalletConnectivityHandle,
}

impl AppState {
Expand All @@ -95,6 +96,7 @@ impl AppState {
base_node_config: PeerConfig,
node_config: GlobalConfig,
) -> Self {
let wallet_connectivity = wallet.wallet_connectivity.clone();
let inner = AppStateInner::new(node_identity, network, wallet, base_node_selected, base_node_config);
let cached_data = inner.data.clone();

Expand All @@ -105,6 +107,7 @@ impl AppState {
completed_tx_filter: TransactionFilter::ABANDONED_COINBASES,
node_config,
config: AppStateConfig::default(),
wallet_connectivity,
}
}

Expand Down Expand Up @@ -352,6 +355,10 @@ impl AppState {
&self.cached_data.base_node_state
}

pub fn get_wallet_connectivity(&self) -> WalletConnectivityHandle {
self.wallet_connectivity.clone()
}

pub fn get_selected_base_node(&self) -> &Peer {
&self.cached_data.base_node_selected
}
Expand Down
21 changes: 8 additions & 13 deletions base_layer/wallet/src/base_node_service/mock_base_node_service.rs
Expand Up @@ -20,13 +20,10 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{
base_node_service::{
error::BaseNodeServiceError,
handle::{BaseNodeServiceRequest, BaseNodeServiceResponse},
service::BaseNodeState,
},
connectivity_service::OnlineStatus,
use crate::base_node_service::{
error::BaseNodeServiceError,
handle::{BaseNodeServiceRequest, BaseNodeServiceResponse},
service::BaseNodeState,
};
use futures::StreamExt;
use tari_common_types::chain_metadata::ChainMetadata;
Expand Down Expand Up @@ -81,30 +78,28 @@ impl MockBaseNodeService {

/// Set the mock server state, either online and synced to a specific height, or offline with None
pub fn set_base_node_state(&mut self, height: Option<u64>) {
let (chain_metadata, is_synced, online) = match height {
let (chain_metadata, is_synced) = match height {
Some(height) => {
let metadata = ChainMetadata::new(height, Vec::new(), 0, 0, 0);
(Some(metadata), Some(true), OnlineStatus::Online)
(Some(metadata), Some(true))
},
None => (None, None, OnlineStatus::Offline),
None => (None, None),
};
self.state = BaseNodeState {
chain_metadata,
is_synced,
updated: None,
latency: None,
online,
}
}

pub fn set_default_base_node_state(&mut self) {
let metadata = ChainMetadata::new(std::u64::MAX, Vec::new(), 0, 0, 0);
let metadata = ChainMetadata::new(u64::MAX, Vec::new(), 0, 0, 0);
self.state = BaseNodeState {
chain_metadata: Some(metadata),
is_synced: Some(true),
updated: None,
latency: None,
online: OnlineStatus::Online,
}
}

Expand Down
57 changes: 8 additions & 49 deletions base_layer/wallet/src/base_node_service/monitor.rs
Expand Up @@ -25,15 +25,15 @@ use crate::{
handle::{BaseNodeEvent, BaseNodeEventSender},
service::BaseNodeState,
},
connectivity_service::{OnlineStatus, WalletConnectivityHandle},
connectivity_service::WalletConnectivityHandle,
error::WalletStorageError,
storage::database::{WalletBackend, WalletDatabase},
};
use chrono::Utc;
use log::*;
use std::{convert::TryFrom, sync::Arc, time::Duration};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcError};
use tari_comms::protocol::rpc::RpcError;
use tokio::{sync::RwLock, time};

const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";
Expand Down Expand Up @@ -78,9 +78,6 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
},
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
debug!(target: LOG_TARGET, "Setting as OFFLINE and retrying...",);

self.set_offline().await;
continue;
},
Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) |
Expand All @@ -96,34 +93,19 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
);
}

async fn update_connectivity_status(&self) -> NodeId {
let mut watcher = self.wallet_connectivity.get_connectivity_status_watch();
loop {
use OnlineStatus::*;
match watcher.recv().await.unwrap_or(Offline) {
Online => match self.wallet_connectivity.get_current_base_node_id() {
Some(node_id) => return node_id,
_ => continue,
},
Connecting => {
self.set_connecting().await;
},
Offline => {
self.set_offline().await;
},
}
}
}

async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> {
loop {
let peer_node_id = self.update_connectivity_status().await;
let mut client = self
.wallet_connectivity
.obtain_base_node_wallet_rpc_client()
.await
.ok_or(BaseNodeMonitorError::NodeShuttingDown)?;

let base_node_id = match self.wallet_connectivity.get_current_base_node_id() {
Some(n) => n,
None => continue,
};

let tip_info = client.get_tip_info().await?;

let chain_metadata = tip_info
Expand All @@ -138,7 +120,7 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
debug!(
target: LOG_TARGET,
"Base node {} Tip: {} ({}) Latency: {} ms",
peer_node_id,
base_node_id,
chain_metadata.height_of_longest_chain(),
if is_synced { "Synced" } else { "Syncing..." },
latency.as_millis()
Expand All @@ -151,7 +133,6 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
is_synced: Some(is_synced),
updated: Some(Utc::now().naive_utc()),
latency: Some(latency),
online: OnlineStatus::Online,
})
.await;

Expand All @@ -163,28 +144,6 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
Ok(())
}

async fn set_connecting(&self) {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Connecting,
})
.await;
}

async fn set_offline(&self) {
self.map_state(|_| BaseNodeState {
chain_metadata: None,
is_synced: None,
updated: Some(Utc::now().naive_utc()),
latency: None,
online: OnlineStatus::Offline,
})
.await;
}

async fn map_state<F>(&self, transform: F)
where F: FnOnce(&BaseNodeState) -> BaseNodeState {
let new_state = {
Expand Down
5 changes: 1 addition & 4 deletions base_layer/wallet/src/base_node_service/service.rs
Expand Up @@ -27,7 +27,7 @@ use super::{
};
use crate::{
base_node_service::monitor::BaseNodeMonitor,
connectivity_service::{OnlineStatus, WalletConnectivityHandle},
connectivity_service::WalletConnectivityHandle,
storage::database::{WalletBackend, WalletDatabase},
};
use chrono::NaiveDateTime;
Expand All @@ -49,8 +49,6 @@ pub struct BaseNodeState {
pub is_synced: Option<bool>,
pub updated: Option<NaiveDateTime>,
pub latency: Option<Duration>,
pub online: OnlineStatus,
// pub base_node_peer: Option<Peer>,
}

impl Default for BaseNodeState {
Expand All @@ -60,7 +58,6 @@ impl Default for BaseNodeState {
is_synced: None,
updated: None,
latency: None,
online: OnlineStatus::Connecting,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/connectivity_service/handle.rs
Expand Up @@ -102,8 +102,8 @@ impl WalletConnectivityHandle {
reply_rx.await.ok()
}

pub async fn get_connectivity_status(&mut self) -> OnlineStatus {
self.online_status_rx.recv().await.unwrap_or(OnlineStatus::Offline)
pub fn get_connectivity_status(&mut self) -> OnlineStatus {
*self.online_status_rx.borrow()
}

pub fn get_connectivity_status_watch(&self) -> watch::Receiver<OnlineStatus> {
Expand Down
35 changes: 16 additions & 19 deletions base_layer/wallet/src/connectivity_service/service.rs
Expand Up @@ -30,6 +30,7 @@ use futures::{
future,
future::Either,
stream::Fuse,
FutureExt,
StreamExt,
};
use log::*;
Expand All @@ -40,7 +41,7 @@ use tari_comms::{
PeerConnection,
};
use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient};
use tokio::time;
use tokio::{time, time::Duration};

const LOG_TARGET: &str = "wallet::connectivity";

Expand Down Expand Up @@ -90,6 +91,7 @@ impl WalletConnectivityService {
debug!(target: LOG_TARGET, "Wallet connectivity service has started.");
let mut base_node_watch_rx = self.base_node_watch.get_receiver().fuse();
loop {
let mut check_connection = time::delay_for(Duration::from_secs(1)).fuse();
futures::select! {
req = self.request_stream.select_next_some() => {
self.handle_request(req).await;
Expand All @@ -99,11 +101,23 @@ impl WalletConnectivityService {
// This will block the rest until the connection is established. This is what we want.
self.setup_base_node_connection().await;
}
},
_ = check_connection => {
self.check_connection().await;
}
}
}
}

async fn check_connection(&mut self) {
if let Some(pool) = self.pools.as_ref() {
if !pool.base_node_wallet_rpc_client.is_connected().await {
debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect...");
self.setup_base_node_connection().await;
}
}
}

async fn handle_request(&mut self, request: WalletConnectivityRequest) {
use WalletConnectivityRequest::*;
match request {
Expand Down Expand Up @@ -138,7 +152,6 @@ impl WalletConnectivityService {
target: LOG_TARGET,
"Base node connection failed: {}. Reconnecting...", e
);
self.trigger_reconnect();
self.pending_requests.push(reply.into());
},
},
Expand Down Expand Up @@ -169,7 +182,6 @@ impl WalletConnectivityService {
target: LOG_TARGET,
"Base node connection failed: {}. Reconnecting...", e
);
self.trigger_reconnect();
self.pending_requests.push(reply.into());
},
},
Expand All @@ -186,21 +198,6 @@ impl WalletConnectivityService {
}
}

fn trigger_reconnect(&mut self) {
let peer = self
.base_node_watch
.borrow()
.clone()
.expect("trigger_reconnect called before base node is set");
// Trigger the watch so that a peer connection is reinitiated
self.set_base_node_peer(peer);
}

fn set_base_node_peer(&mut self, peer: Peer) {
self.pools = None;
self.base_node_watch.broadcast(Some(peer));
}

fn current_base_node(&self) -> Option<NodeId> {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}
Expand Down Expand Up @@ -236,7 +233,7 @@ impl WalletConnectivityService {
} else {
self.set_online_status(OnlineStatus::Offline);
}
error!(target: LOG_TARGET, "{}", e);
warn!(target: LOG_TARGET, "{}", e);
time::delay_for(self.config.base_node_monitor_refresh_interval).await;
continue;
},
Expand Down
9 changes: 9 additions & 0 deletions comms/src/protocol/rpc/client_pool.rs
Expand Up @@ -61,6 +61,11 @@ where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
let mut pool = self.pool.lock().await;
pool.get_least_used_or_connect().await
}

pub async fn is_connected(&self) -> bool {
let pool = self.pool.lock().await;
pool.is_connected()
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -111,6 +116,10 @@ where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
}
}

pub fn is_connected(&self) -> bool {
self.connection.is_connected()
}

pub(super) fn refresh_num_active_connections(&mut self) -> usize {
self.prune();
self.clients.len()
Expand Down

0 comments on commit 8c7066b

Please sign in to comment.