diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index f3bed75ba2..bfd3b23146 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -4870,11 +4870,13 @@ pub unsafe extern "C" fn comms_config_create( ..Default::default() }, network_discovery: NetworkDiscoveryConfig { + min_desired_peers: 16, initial_peer_sync_delay: Some(Duration::from_secs(25)), ..Default::default() }, connectivity: DhtConnectivityConfig { - update_interval: Duration::from_secs(180), + update_interval: Duration::from_secs(5 * 60), + minimum_desired_tcpv4_node_ratio: 0.0, ..Default::default() }, ..Default::default() diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index 7f91bee5b1..6f2ae82e14 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -262,9 +262,9 @@ database_url = "data/wallet/dht.db" # The size of the buffer (channel) which holds pending outbound message requests. Default: 20 #outbound_buffer_size = 20 # The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8 -#num_neighbouring_nodes = 8 +num_neighbouring_nodes = 6 # Number of random peers to include. Default: 4 -#num_random_nodes = 4 +num_random_nodes = 2 # Connections above the configured number of neighbouring and random nodes will be removed (default: false) minimize_connections = true # Send to this many peers when using the broadcast strategy. Default: 8 @@ -313,7 +313,7 @@ minimize_connections = true #join_cooldown_interval = 120 # 10 * 60 # The interval to update the neighbouring and random pools, if necessary. Default: 2 minutes -#connectivity.update_interval = 120 # 2 * 60 +connectivity.update_interval = 300 # 2 * 60 # The interval to change the random pool peers. Default = 2 hours #connectivity.random_pool_refresh_interval = 7_200 # 2 * 60 * 60 # Length of cooldown when high connection failure rates are encountered. Default: 45s @@ -321,13 +321,13 @@ minimize_connections = true # The minimum desired ratio of TCPv4 to Tor connections. TCPv4 addresses have some significant cost to create, # making sybil attacks costly. This setting does not guarantee this ratio is maintained. # Currently, it only emits a warning if the ratio is below this setting. Default: 0.1 (10%) -#connectivity.minimum_desired_tcpv4_node_ratio = 0.1 +connectivity.minimum_desired_tcpv4_node_ratio = 0.0 # True to enable network discovery, false to disable it. Default: true #network_discovery.enabled = true # A threshold for the minimum number of peers this node should ideally be aware of. If below this threshold a # more "aggressive" strategy is employed. Default: 50 -network_discovery.min_desired_peers = 8 +network_discovery.min_desired_peers = 16 # The period to wait once the number of rounds given by `idle_after_num_rounds` has completed. Default: 30 mins #network_discovery.idle_period = 1_800 # 30 * 60 # The minimum number of network discovery rounds to perform before idling (going to sleep). If there are less @@ -341,7 +341,7 @@ network_discovery.min_desired_peers = 8 # The maximum number of peers we allow per round of sync. (Default: 500) #network_discovery.max_peers_to_sync_per_round = 500 # Initial refresh sync peers delay period, when a configured connection needs preference. (Default: Disabled) -network_discovery.initial_peer_sync_delay = 15 +network_discovery.initial_peer_sync_delay = 25 # Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/comms/core/src/connection_manager/dial_state.rs b/comms/core/src/connection_manager/dial_state.rs index 3651b22449..a6002c1088 100644 --- a/comms/core/src/connection_manager/dial_state.rs +++ b/comms/core/src/connection_manager/dial_state.rs @@ -38,6 +38,8 @@ pub struct DialState { cancel_signal: ShutdownSignal, /// Reply channel for a connection result reply_tx: Option>>, + /// Whether to minimize connections + minimize_connections: bool, } impl DialState { @@ -46,12 +48,14 @@ impl DialState { peer: Box, reply_tx: Option>>, cancel_signal: ShutdownSignal, + minimize_connections: bool, ) -> Self { Self { peer, attempts: 0, reply_tx, cancel_signal, + minimize_connections, } } @@ -66,6 +70,11 @@ impl DialState { self } + /// Returns true if the connections should be minimized + pub fn get_minimize_connections(&self) -> bool { + self.minimize_connections + } + /// The number of attempts pub fn num_attempts(&self) -> usize { self.attempts diff --git a/comms/core/src/connection_manager/dialer.rs b/comms/core/src/connection_manager/dialer.rs index e502b9bd4e..34d0fba886 100644 --- a/comms/core/src/connection_manager/dialer.rs +++ b/comms/core/src/connection_manager/dialer.rs @@ -81,6 +81,7 @@ pub(crate) enum DialerRequest { Dial( Box, Option>>, + bool, // minimize_connections ), CancelPendingDial(NodeId), NotifyNewInboundConnection(Box), @@ -175,8 +176,8 @@ where use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection}; debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request); match request { - Dial(peer, reply_tx) => { - self.handle_dial_peer_request(pending_dials, peer, reply_tx); + Dial(peer, reply_tx, minimize_connections) => { + self.handle_dial_peer_request(pending_dials, peer, reply_tx, minimize_connections); }, CancelPendingDial(peer_id) => { self.cancel_dial(&peer_id); @@ -317,6 +318,7 @@ where pending_dials: &mut DialFuturesUnordered, peer: Box, reply_tx: Option>>, + minimize_connections: bool, ) { if self.is_pending_dial(&peer.node_id) { debug!( @@ -337,7 +339,7 @@ where let backoff = Arc::clone(&self.backoff); - let dial_state = DialState::new(peer, reply_tx, cancel_signal); + let dial_state = DialState::new(peer, reply_tx, cancel_signal, minimize_connections); let node_identity = Arc::clone(&self.node_identity); let conn_man_notifier = self.conn_man_notifier.clone(); let supported_protocols = self.our_supported_protocols.clone(); @@ -539,7 +541,7 @@ where } } - /// Attempts to dial a peer sequentially on all addresses. + /// Attempts to dial a peer sequentially on all addresses; if connections are to be minimized only. /// Returns ownership of the given `DialState` and a success or failure result for the dial, /// or None if the dial was cancelled inflight async fn dial_peer( @@ -551,7 +553,30 @@ where DialState, Result<(NoiseSocket, Multiaddr), ConnectionManagerError>, ) { - let addresses = dial_state.peer().addresses.clone().into_vec(); + let addresses = if dial_state.get_minimize_connections() { + dial_state + .peer() + .addresses + .clone() + .iter() + .filter(|addr| addr.last_failed_reason().is_none()) + .map(|addr| addr.address().clone()) + .collect::>() + } else { + dial_state.peer().addresses.clone().into_vec() + }; + if addresses.is_empty() { + let node_id_hex = dial_state.peer().node_id.clone().to_hex(); + debug!( + target: LOG_TARGET, + "No more contactable addresses for peer '{}'", + node_id_hex + ); + return ( + dial_state, + Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)), + ); + } let cancel_signal = dial_state.get_cancel_signal(); for address in addresses { debug!( diff --git a/comms/core/src/connection_manager/error.rs b/comms/core/src/connection_manager/error.rs index 80cddff827..7d2bb4af76 100644 --- a/comms/core/src/connection_manager/error.rs +++ b/comms/core/src/connection_manager/error.rs @@ -74,7 +74,7 @@ pub enum ConnectionManagerError { IdentityProtocolError(#[from] IdentityProtocolError), #[error("The dial was cancelled")] DialCancelled, - #[error("Invalid multiaddr: {0}")] + #[error("Invalid multiaddr")] InvalidMultiaddr(String), #[error("Failed to send wire format byte")] WireFormatSendFailed, @@ -82,6 +82,8 @@ pub enum ConnectionManagerError { ListenerOneshotCancelled, #[error("Peer validation error: {0}")] PeerValidationError(#[from] PeerValidatorError), + #[error("No contactable addresses for peer {0} left")] + NoContactableAddressesForPeer(String), } impl From for ConnectionManagerError { diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index 82b174b87e..10e70ac088 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -378,11 +378,17 @@ where use ConnectionManagerRequest::{CancelDial, DialPeer, NotifyListening}; trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request); match request { - DialPeer { node_id, reply_tx } => { + DialPeer { + node_id, + reply_tx, + minimize_connections, + } => { let tracing_id = tracing::Span::current().id(); let span = span!(Level::TRACE, "connection_manager::handle_request"); span.follows_from(tracing_id); - self.dial_peer(node_id, reply_tx).instrument(span).await + self.dial_peer(node_id, reply_tx, minimize_connections) + .instrument(span) + .await }, CancelDial(node_id) => { if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await { @@ -493,10 +499,11 @@ where &mut self, node_id: NodeId, reply: Option>>, + minimize_connections: bool, ) { match self.peer_manager.find_by_node_id(&node_id).await { Ok(Some(peer)) => { - self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply)) + self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply, minimize_connections)) .await; }, Ok(None) => { diff --git a/comms/core/src/connection_manager/requester.rs b/comms/core/src/connection_manager/requester.rs index 40a09da7f9..41c61bdd1f 100644 --- a/comms/core/src/connection_manager/requester.rs +++ b/comms/core/src/connection_manager/requester.rs @@ -37,6 +37,7 @@ pub enum ConnectionManagerRequest { DialPeer { node_id: NodeId, reply_tx: Option>>, + minimize_connections: bool, }, /// Cancels a pending dial if one exists CancelDial(NodeId), @@ -75,9 +76,14 @@ impl ConnectionManagerRequester { } /// Attempt to connect to a remote peer - pub async fn dial_peer(&mut self, node_id: NodeId) -> Result { + pub async fn dial_peer( + &mut self, + node_id: NodeId, + minimize_connections: bool, + ) -> Result { let (reply_tx, reply_rx) = oneshot::channel(); - self.send_dial_peer(node_id, Some(reply_tx)).await?; + self.send_dial_peer(node_id, Some(reply_tx), minimize_connections) + .await?; reply_rx .await .map_err(|_| ConnectionManagerError::ActorRequestCanceled)? @@ -97,9 +103,14 @@ impl ConnectionManagerRequester { &mut self, node_id: NodeId, reply_tx: Option>>, + minimize_connections: bool, ) -> Result<(), ConnectionManagerError> { self.sender - .send(ConnectionManagerRequest::DialPeer { node_id, reply_tx }) + .send(ConnectionManagerRequest::DialPeer { + node_id, + reply_tx, + minimize_connections, + }) .await .map_err(|_| ConnectionManagerError::SendToActorFailed)?; Ok(()) diff --git a/comms/core/src/connection_manager/tests/listener_dialer.rs b/comms/core/src/connection_manager/tests/listener_dialer.rs index a9ff5d4b12..40a3859811 100644 --- a/comms/core/src/connection_manager/tests/listener_dialer.rs +++ b/comms/core/src/connection_manager/tests/listener_dialer.rs @@ -129,7 +129,7 @@ async fn smoke() { let (reply_tx, reply_rx) = oneshot::channel(); request_tx - .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx))) + .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx), false)) .await .unwrap(); @@ -237,7 +237,7 @@ async fn banned() { let (reply_tx, reply_rx) = oneshot::channel(); request_tx - .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx))) + .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx), false)) .await .unwrap(); diff --git a/comms/core/src/connection_manager/tests/manager.rs b/comms/core/src/connection_manager/tests/manager.rs index 1abde14e21..270b172702 100644 --- a/comms/core/src/connection_manager/tests/manager.rs +++ b/comms/core/src/connection_manager/tests/manager.rs @@ -76,7 +76,7 @@ async fn connect_to_nonexistent_peer() { rt_handle.spawn(connection_manager.run()); - let err = requester.dial_peer(NodeId::default()).await.unwrap_err(); + let err = requester.dial_peer(NodeId::default(), false).await.unwrap_err(); unpack_enum!(ConnectionManagerError::PeerManagerError(PeerManagerError::PeerNotFoundError) = err); shutdown.trigger(); @@ -150,7 +150,10 @@ async fn dial_success() { .await .unwrap(); - let mut conn_out = conn_man1.dial_peer(node_identity2.node_id().clone()).await.unwrap(); + let mut conn_out = conn_man1 + .dial_peer(node_identity2.node_id().clone(), false) + .await + .unwrap(); assert_eq!(conn_out.peer_node_id(), node_identity2.node_id()); let peer2 = peer_manager1 .find_by_node_id(conn_out.peer_node_id()) @@ -272,7 +275,10 @@ async fn dial_success_aux_tcp_listener() { ); conn_man2.wait_until_listening().await.unwrap(); - let mut connection = conn_man2.dial_peer(node_identity1.node_id().clone()).await.unwrap(); + let mut connection = conn_man2 + .dial_peer(node_identity1.node_id().clone(), false) + .await + .unwrap(); assert_eq!(connection.peer_node_id(), node_identity1.node_id()); let mut substream_out = connection.open_substream(&TEST_PROTO).await.unwrap(); @@ -356,8 +362,8 @@ async fn simultaneous_dial_events() { // Dial at the same time let (result1, result2) = future::join( - conn_man1.dial_peer(node_identities[1].node_id().clone()), - conn_man2.dial_peer(node_identities[0].node_id().clone()), + conn_man1.dial_peer(node_identities[1].node_id().clone(), false), + conn_man2.dial_peer(node_identities[0].node_id().clone(), false), ) .await; @@ -420,7 +426,7 @@ async fn dial_cancelled() { let node_id = node_identity2.node_id().clone(); async move { ready_tx.send(()).unwrap(); - cm.dial_peer(node_id).await + cm.dial_peer(node_id, false).await } }); diff --git a/comms/core/src/connectivity/manager.rs b/comms/core/src/connectivity/manager.rs index ed61e8ad8f..30e650ac14 100644 --- a/comms/core/src/connectivity/manager.rs +++ b/comms/core/src/connectivity/manager.rs @@ -164,6 +164,10 @@ impl ConnectivityManagerActor { }) } + fn minimize_connections(&self) -> bool { + self.config.maintain_n_closest_connections_only.is_some() + } + pub async fn run(mut self) { debug!(target: LOG_TARGET, "ConnectivityManager started"); @@ -335,7 +339,11 @@ impl ConnectivityManagerActor { }, } - if let Err(err) = self.connection_manager.send_dial_peer(node_id, reply_tx).await { + if let Err(err) = self + .connection_manager + .send_dial_peer(node_id, reply_tx, self.minimize_connections()) + .await + { error!( target: LOG_TARGET, "Failed to send dial request to connection manager: {:?}", err diff --git a/comms/core/src/peer_manager/peer.rs b/comms/core/src/peer_manager/peer.rs index f4786d57ce..657722deb4 100644 --- a/comms/core/src/peer_manager/peer.rs +++ b/comms/core/src/peer_manager/peer.rs @@ -208,6 +208,11 @@ impl Peer { self.addresses.last_seen() } + /// Provides info about the failure status of all addresses + pub fn all_addresses_failed(&self) -> bool { + self.addresses.iter().all(|a| a.last_failed_reason().is_some()) + } + /// Provides that length of time since the last successful interaction with the peer pub fn last_seen_since(&self) -> Option { self.last_seen() diff --git a/comms/core/src/test_utils/mocks/connection_manager.rs b/comms/core/src/test_utils/mocks/connection_manager.rs index a84a2a65f6..6a3c441632 100644 --- a/comms/core/src/test_utils/mocks/connection_manager.rs +++ b/comms/core/src/test_utils/mocks/connection_manager.rs @@ -131,7 +131,9 @@ impl ConnectionManagerMock { self.state.inc_call_count(); self.state.add_call(format!("{:?}", req)).await; match req { - DialPeer { node_id, mut reply_tx } => { + DialPeer { + node_id, mut reply_tx, .. + } => { // Send Ok(&mut conn) if we have an active connection, otherwise Err(DialConnectFailedAllAddresses) let result = self .state diff --git a/comms/dht/src/connectivity/mod.rs b/comms/dht/src/connectivity/mod.rs index 6aceaf04f4..a388cf10d3 100644 --- a/comms/dht/src/connectivity/mod.rs +++ b/comms/dht/src/connectivity/mod.rs @@ -828,7 +828,7 @@ impl DhtConnectivity { return false; } // we have tried to connect to this peer, and we have never made a successful attempt at connection - if peer.last_connect_attempt().is_some() && peer.last_seen().is_none() { + if peer.all_addresses_failed() { return false; }