diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index df57075270..4122f9acea 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -24,7 +24,7 @@ use std::{mem, time::Duration}; use log::*; use tari_comms::{ - connectivity::ConnectivityRequester, + connectivity::{ConnectivityError, ConnectivityRequester}, peer_manager::{NodeId, Peer}, protocol::rpc::{RpcClientLease, RpcClientPool}, PeerConnection, @@ -116,12 +116,21 @@ impl WalletConnectivityService { } async fn check_connection(&mut self) { - if let Some(pool) = self.pools.as_ref() { - if !pool.base_node_wallet_rpc_client.is_connected().await { - debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect..."); + debug!(target: LOG_TARGET, "HERE1"); + match self.pools.as_ref() { + Some(pool) => { + debug!(target: LOG_TARGET, "HERE2"); + if !pool.base_node_wallet_rpc_client.is_connected().await { + debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect..."); + self.set_online_status(OnlineStatus::Offline); + self.setup_base_node_connection().await; + } + }, + None => { + debug!(target: LOG_TARGET, "No connection. Attempting to connect..."); self.set_online_status(OnlineStatus::Offline); self.setup_base_node_connection().await; - } + }, } } @@ -216,10 +225,12 @@ impl WalletConnectivityService { } async fn disconnect_base_node(&mut self, node_id: NodeId) { - if let Ok(Some(connection)) = self.connectivity.get_connection(node_id.clone()).await { - if connection.clone().disconnect().await.is_ok() { - debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id); + if let Ok(Some(mut connection)) = self.connectivity.get_connection(node_id.clone()).await { + match connection.disconnect().await { + Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id), + Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e), } + self.pools = None; }; } @@ -248,20 +259,29 @@ impl WalletConnectivityService { break; }, Ok(false) => { - // Retry with updated peer - self.disconnect_base_node(node_id).await; + debug!( + target: LOG_TARGET, + "The peer has changed while connecting. Attempting to connect to new base node." + ); + continue; + }, + Err(WalletConnectivityError::ConnectivityError(ConnectivityError::DialCancelled)) => { + debug!( + target: LOG_TARGET, + "Dial was cancelled. Retrying after {}s ...", + self.config.base_node_monitor_refresh_interval.as_secs() + ); + self.set_online_status(OnlineStatus::Offline); time::sleep(self.config.base_node_monitor_refresh_interval).await; continue; }, Err(e) => { - if self.current_base_node() != Some(node_id.clone()) { - self.set_online_status(OnlineStatus::Connecting); - } else { + warn!(target: LOG_TARGET, "{}", e); + if self.current_base_node().as_ref() == Some(&node_id) { + self.disconnect_base_node(node_id).await; self.set_online_status(OnlineStatus::Offline); + time::sleep(self.config.base_node_monitor_refresh_interval).await; } - warn!(target: LOG_TARGET, "{}", e); - self.disconnect_base_node(node_id).await; - time::sleep(self.config.base_node_monitor_refresh_interval).await; continue; }, } diff --git a/comms/src/connectivity/manager.rs b/comms/src/connectivity/manager.rs index b0ae56ea5b..2872ff58cd 100644 --- a/comms/src/connectivity/manager.rs +++ b/comms/src/connectivity/manager.rs @@ -228,6 +228,21 @@ impl ConnectivityManagerActor { let span = span!(Level::TRACE, "handle_request"); span.follows_from(tracing_id); async move { + match self.peer_manager.is_peer_banned(&node_id).await { + Ok(true) => { + if let Some(reply) = reply_tx { + let _ = reply.send(Err(ConnectionManagerError::PeerBanned)); + } + return; + }, + Ok(false) => {}, + Err(err) => { + if let Some(reply) = reply_tx { + let _ = reply.send(Err(err.into())); + } + return; + }, + } match self.pool.get(&node_id) { Some(state) if state.is_connected() => { debug!( diff --git a/comms/src/peer_manager/manager.rs b/comms/src/peer_manager/manager.rs index e10b0be988..6dc1a3d5c5 100644 --- a/comms/src/peer_manager/manager.rs +++ b/comms/src/peer_manager/manager.rs @@ -264,6 +264,10 @@ impl PeerManager { .ban_peer_by_node_id(node_id, duration, reason) } + pub async fn is_peer_banned(&self, node_id: &NodeId) -> Result { + self.peer_storage.read().await.is_peer_banned(node_id) + } + /// Changes the offline flag bit of the peer. Return the previous offline state. pub async fn set_offline(&self, node_id: &NodeId, is_offline: bool) -> Result { self.peer_storage.write().await.set_offline(node_id, is_offline) diff --git a/comms/src/peer_manager/peer_storage.rs b/comms/src/peer_manager/peer_storage.rs index 0f9ca71ac3..75e6413705 100644 --- a/comms/src/peer_manager/peer_storage.rs +++ b/comms/src/peer_manager/peer_storage.rs @@ -498,6 +498,13 @@ where DS: KeyValueStore Ok(node_id) } + pub fn is_peer_banned(&self, node_id: &NodeId) -> Result { + let peer = self + .find_by_node_id(node_id)? + .ok_or(PeerManagerError::PeerNotFoundError)?; + Ok(peer.is_banned()) + } + /// Changes the OFFLINE flag bit of the peer. pub fn set_offline(&mut self, node_id: &NodeId, offline: bool) -> Result { let peer_key = *self