Skip to content

Commit

Permalink
fix(wallet): fix aggressive disconnects in wallet connectivity (#3807)
Browse files Browse the repository at this point in the history
Description
---
- Disconnects the wallet peer only if the pool is unable to create a new RPC session.
- Clear the `pool` when disconnecting, so that requests for a client session will wait
- Immediately return an error if dialling a peer that is banned 

Motivation and Context
---
Observed the wallet continuously disconnecting and reconnecting to base node, resulting in many of these message:
`Outbound messaging protocol failed for peer xxxxx: IO Error: 5e5731a0/4: connection is closed`

This is not a confirmed fix for the issue, because it is difficult to reproduce, but I suspect the problem was not clearing the 
pool in wallet connectivity when disconnecting was causing disconnect to be called whenever a peer session was requested by the monitor etc., sometimes timed in such a way that it would disconnect the new connection that was just established, and that would continue indefinitely. 

How Has This Been Tested?
---
Manually, switching base nodes, stop base node while connected then restart
  • Loading branch information
sdbondi committed Feb 17, 2022
1 parent 8df9021 commit 86e0154
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 16 deletions.
52 changes: 36 additions & 16 deletions base_layer/wallet/src/connectivity_service/service.rs
Expand Up @@ -24,7 +24,7 @@ use std::{mem, time::Duration};

use log::*;
use tari_comms::{
connectivity::ConnectivityRequester,
connectivity::{ConnectivityError, ConnectivityRequester},
peer_manager::{NodeId, Peer},
protocol::rpc::{RpcClientLease, RpcClientPool},
PeerConnection,
Expand Down Expand Up @@ -116,12 +116,21 @@ impl WalletConnectivityService {
}

async fn check_connection(&mut self) {
if let Some(pool) = self.pools.as_ref() {
if !pool.base_node_wallet_rpc_client.is_connected().await {
debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect...");
debug!(target: LOG_TARGET, "HERE1");
match self.pools.as_ref() {
Some(pool) => {
debug!(target: LOG_TARGET, "HERE2");
if !pool.base_node_wallet_rpc_client.is_connected().await {
debug!(target: LOG_TARGET, "Peer connection lost. Attempting to reconnect...");
self.set_online_status(OnlineStatus::Offline);
self.setup_base_node_connection().await;
}
},
None => {
debug!(target: LOG_TARGET, "No connection. Attempting to connect...");
self.set_online_status(OnlineStatus::Offline);
self.setup_base_node_connection().await;
}
},
}
}

Expand Down Expand Up @@ -216,10 +225,12 @@ impl WalletConnectivityService {
}

async fn disconnect_base_node(&mut self, node_id: NodeId) {
if let Ok(Some(connection)) = self.connectivity.get_connection(node_id.clone()).await {
if connection.clone().disconnect().await.is_ok() {
debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id);
if let Ok(Some(mut connection)) = self.connectivity.get_connection(node_id.clone()).await {
match connection.disconnect().await {
Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id),
Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e),
}
self.pools = None;
};
}

Expand Down Expand Up @@ -248,20 +259,29 @@ impl WalletConnectivityService {
break;
},
Ok(false) => {
// Retry with updated peer
self.disconnect_base_node(node_id).await;
debug!(
target: LOG_TARGET,
"The peer has changed while connecting. Attempting to connect to new base node."
);
continue;
},
Err(WalletConnectivityError::ConnectivityError(ConnectivityError::DialCancelled)) => {
debug!(
target: LOG_TARGET,
"Dial was cancelled. Retrying after {}s ...",
self.config.base_node_monitor_refresh_interval.as_secs()
);
self.set_online_status(OnlineStatus::Offline);
time::sleep(self.config.base_node_monitor_refresh_interval).await;
continue;
},
Err(e) => {
if self.current_base_node() != Some(node_id.clone()) {
self.set_online_status(OnlineStatus::Connecting);
} else {
warn!(target: LOG_TARGET, "{}", e);
if self.current_base_node().as_ref() == Some(&node_id) {
self.disconnect_base_node(node_id).await;
self.set_online_status(OnlineStatus::Offline);
time::sleep(self.config.base_node_monitor_refresh_interval).await;
}
warn!(target: LOG_TARGET, "{}", e);
self.disconnect_base_node(node_id).await;
time::sleep(self.config.base_node_monitor_refresh_interval).await;
continue;
},
}
Expand Down
15 changes: 15 additions & 0 deletions comms/src/connectivity/manager.rs
Expand Up @@ -228,6 +228,21 @@ impl ConnectivityManagerActor {
let span = span!(Level::TRACE, "handle_request");
span.follows_from(tracing_id);
async move {
match self.peer_manager.is_peer_banned(&node_id).await {
Ok(true) => {
if let Some(reply) = reply_tx {
let _ = reply.send(Err(ConnectionManagerError::PeerBanned));
}
return;
},
Ok(false) => {},
Err(err) => {
if let Some(reply) = reply_tx {
let _ = reply.send(Err(err.into()));
}
return;
},
}
match self.pool.get(&node_id) {
Some(state) if state.is_connected() => {
debug!(
Expand Down
4 changes: 4 additions & 0 deletions comms/src/peer_manager/manager.rs
Expand Up @@ -264,6 +264,10 @@ impl PeerManager {
.ban_peer_by_node_id(node_id, duration, reason)
}

pub async fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
self.peer_storage.read().await.is_peer_banned(node_id)
}

/// Changes the offline flag bit of the peer. Return the previous offline state.
pub async fn set_offline(&self, node_id: &NodeId, is_offline: bool) -> Result<bool, PeerManagerError> {
self.peer_storage.write().await.set_offline(node_id, is_offline)
Expand Down
7 changes: 7 additions & 0 deletions comms/src/peer_manager/peer_storage.rs
Expand Up @@ -498,6 +498,13 @@ where DS: KeyValueStore<PeerId, Peer>
Ok(node_id)
}

pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
let peer = self
.find_by_node_id(node_id)?
.ok_or(PeerManagerError::PeerNotFoundError)?;
Ok(peer.is_banned())
}

/// Changes the OFFLINE flag bit of the peer.
pub fn set_offline(&mut self, node_id: &NodeId, offline: bool) -> Result<bool, PeerManagerError> {
let peer_key = *self
Expand Down

0 comments on commit 86e0154

Please sign in to comment.