From 0b94bcbcb4eababa07be1bee8621da50e99fead8 Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 22 Apr 2024 12:54:02 +0200 Subject: [PATCH] Limit wallet peer connections Added functionality to limit the number of base node peer connections that a wallet can have, based on a config setting. The furthest nodes will be disconnected, but nodes on the allow list (e.g. the connected base node) will be ignored. --- .../src/grpc/wallet_grpc_server.rs | 2 +- .../src/ui/state/wallet_event_monitor.rs | 2 +- base_layer/chat_ffi/src/byte_vector.rs | 2 +- .../contacts/src/contacts_service/service.rs | 2 +- .../src/chain_storage/blockchain_database.rs | 2 +- .../core/src/chain_storage/lmdb_db/lmdb_db.rs | 4 +- .../priority/prioritized_transaction.rs | 2 +- .../core/src/transactions/crypto_factories.rs | 2 +- .../transaction_components/encrypted_data.rs | 2 +- .../transaction_protocol/sender.rs | 2 +- base_layer/mmr/src/backend.rs | 2 +- .../mmr/src/sparse_merkle_tree/proofs.rs | 2 +- base_layer/p2p/src/initialization.rs | 5 + .../p2p/src/services/liveness/service.rs | 3 +- base_layer/p2p/src/transport.rs | 2 +- .../src/connectivity_service/service.rs | 3 +- .../wallet/src/connectivity_service/test.rs | 3 +- .../src/output_manager_service/service.rs | 4 +- .../utxo_scanner_service/utxo_scanner_task.rs | 3 +- base_layer/wallet_ffi/src/lib.rs | 31 +- common/config/presets/c_base_node_c.toml | 6 +- common/config/presets/d_console_wallet.toml | 16 +- comms/core/src/builder/mod.rs | 14 + comms/core/src/connection_manager/dialer.rs | 22 +- comms/core/src/connection_manager/error.rs | 4 +- comms/core/src/connection_manager/manager.rs | 7 +- .../src/connection_manager/peer_connection.rs | 20 +- .../tests/listener_dialer.rs | 3 +- comms/core/src/connectivity/config.rs | 4 + .../core/src/connectivity/connection_pool.rs | 11 +- comms/core/src/connectivity/manager.rs | 106 +++++- comms/core/src/connectivity/requester.rs | 39 ++- comms/core/src/connectivity/test.rs | 13 +- comms/core/src/lib.rs | 7 + .../src/net_address/multiaddr_with_stats.rs | 1 + .../net_address/mutliaddresses_with_stats.rs | 6 + comms/core/src/peer_manager/peer.rs | 5 + comms/core/src/peer_manager/peer_query.rs | 20 +- comms/core/src/protocol/rpc/client/tests.rs | 7 +- .../test_utils/mocks/connection_manager.rs | 4 +- .../test_utils/mocks/connectivity_manager.rs | 5 + .../src/test_utils/mocks/peer_connection.rs | 2 +- comms/dht/examples/memory_net/utilities.rs | 9 +- comms/dht/src/config.rs | 4 + comms/dht/src/connectivity/mod.rs | 326 +++++++++++++----- comms/dht/src/connectivity/test.rs | 10 +- .../src/network_discovery/state_machine.rs | 2 +- comms/dht/src/store_forward/error.rs | 3 + .../dht/src/store_forward/saf_handler/task.rs | 3 +- comms/dht/src/store_forward/service.rs | 74 +++- 50 files changed, 646 insertions(+), 187 deletions(-) diff --git a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs index f74f793664..563bb004e7 100644 --- a/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs +++ b/applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs @@ -160,7 +160,7 @@ impl WalletGrpcServer { fn get_consensus_constants(&self) -> Result<&ConsensusConstants, WalletStorageError> { // If we don't have the chain metadata, we hope that VNReg consensus constants did not change - worst case, we - // spend more than we need to or the the transaction is rejected. + // spend more than we need to or the transaction is rejected. 
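The mechanism, in brief: when `minimize_connections` is enabled in the DHT settings, the comms stack is capped at `num_neighbouring_nodes + num_random_nodes` connections, and the connectivity manager periodically disconnects the furthest peers above that cap while skipping allow-listed peers. Below is a minimal sketch of the cap derivation as it is wired through `P2pInitializer` later in this patch; the `connection_limit` helper and `main` are illustrative only, not part of the change:

```rust
/// Illustrative helper (not in the patch): derives the connection cap the same
/// way P2pInitializer feeds it to CommsBuilder::with_minimize_connections.
fn connection_limit(num_neighbouring_nodes: usize, num_random_nodes: usize, minimize: bool) -> Option<usize> {
    // `None` leaves connectivity uncapped; `Some(n)` keeps only the n closest peers.
    minimize.then(|| num_neighbouring_nodes + num_random_nodes)
}

fn main() {
    // Wallet defaults set by this patch: 5 neighbouring + 1 random peers => cap of 6.
    assert_eq!(connection_limit(5, 1, true), Some(6));
    // The base node preset keeps minimize_connections = false => no cap.
    assert_eq!(connection_limit(8, 4, false), None);
}
```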
let height = self .wallet .db diff --git a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs index 768d9c3b17..91cdca75a8 100644 --- a/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs +++ b/applications/minotari_console_wallet/src/ui/state/wallet_event_monitor.rs @@ -200,7 +200,7 @@ impl WalletEventMonitor { ); match msg { ConnectivityEvent::PeerConnected(_) | - ConnectivityEvent::PeerDisconnected(_) => { + ConnectivityEvent::PeerDisconnected(..) => { self.trigger_peer_state_refresh().await; }, // Only the above variants trigger state refresh diff --git a/base_layer/chat_ffi/src/byte_vector.rs b/base_layer/chat_ffi/src/byte_vector.rs index 233840c66d..cc666adbb5 100644 --- a/base_layer/chat_ffi/src/byte_vector.rs +++ b/base_layer/chat_ffi/src/byte_vector.rs @@ -100,7 +100,7 @@ pub unsafe extern "C" fn chat_byte_vector_destroy(bytes: *mut ChatByteVector) { /// /// # Safety /// None -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn chat_byte_vector_get_at( diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index c09e13fa32..47acbbf4ea 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -564,7 +564,7 @@ where T: ContactsBackend + 'static fn handle_connectivity_event(&mut self, event: ConnectivityEvent) { use ConnectivityEvent::{PeerBanned, PeerDisconnected}; match event { - PeerDisconnected(node_id) | PeerBanned(node_id) => { + PeerDisconnected(node_id, _) | PeerBanned(node_id) => { if let Some(pos) = self.liveness_data.iter().position(|p| *p.node_id() == node_id) { debug!( target: LOG_TARGET, diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs index 3439e072d1..f869ce5a77 100644 --- a/base_layer/core/src/chain_storage/blockchain_database.rs +++ b/base_layer/core/src/chain_storage/blockchain_database.rs @@ -2407,7 +2407,7 @@ fn get_previous_timestamps( Ok(timestamps) } -/// Gets all blocks ordered from the the block that connects (via prev_hash) to the main chain, to the orphan tip. +/// Gets all blocks ordered from the block that connects (via prev_hash) to the main chain, to the orphan tip. 
#[allow(clippy::ptr_arg)] fn get_orphan_link_main_chain( db: &T, diff --git a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs index 6f8e4bd159..0db74da2fe 100644 --- a/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs +++ b/base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs @@ -2530,9 +2530,9 @@ impl BlockchainBackend for LMDBDatabase { } trace!( target: LOG_TARGET, - "Finished calculating new smt (size: {}), took: #{}s", + "Finished calculating new smt (size: {}), took: {:.2?}", smt.size(), - start.elapsed().as_millis() + start.elapsed() ); Ok(smt) } diff --git a/base_layer/core/src/mempool/priority/prioritized_transaction.rs b/base_layer/core/src/mempool/priority/prioritized_transaction.rs index 0c78db88b7..e3fb1664eb 100644 --- a/base_layer/core/src/mempool/priority/prioritized_transaction.rs +++ b/base_layer/core/src/mempool/priority/prioritized_transaction.rs @@ -35,7 +35,7 @@ use crate::transactions::{ }; /// Create a unique unspent transaction priority based on the transaction fee, maturity of the oldest input UTXO and the -/// excess_sig. The excess_sig is included to ensure the the priority key unique so it can be used with a BTreeMap. +/// excess_sig. The excess_sig is included to ensure the priority key unique so it can be used with a BTreeMap. /// Normally, duplicate keys will be overwritten in a BTreeMap. #[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)] pub struct FeePriority(Vec); diff --git a/base_layer/core/src/transactions/crypto_factories.rs b/base_layer/core/src/transactions/crypto_factories.rs index c2cc0f9841..7b1e560f50 100644 --- a/base_layer/core/src/transactions/crypto_factories.rs +++ b/base_layer/core/src/transactions/crypto_factories.rs @@ -31,7 +31,7 @@ impl CryptoFactories { /// /// ## Parameters /// - /// * `max_proof_range`: Sets the the maximum value in range proofs, where `max = 2^max_proof_range` + /// * `max_proof_range`: Sets the maximum value in range proofs, where `max = 2^max_proof_range` pub fn new(max_proof_range: usize) -> Self { Self { commitment: Arc::new(CommitmentFactory::default()), diff --git a/base_layer/core/src/transactions/transaction_components/encrypted_data.rs b/base_layer/core/src/transactions/transaction_components/encrypted_data.rs index 273fd14740..f3df4a9ef2 100644 --- a/base_layer/core/src/transactions/transaction_components/encrypted_data.rs +++ b/base_layer/core/src/transactions/transaction_components/encrypted_data.rs @@ -23,7 +23,7 @@ // Portions of this file were originally copyrighted (c) 2018 The Grin Developers, issued under the Apache License, // Version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0. -//! Encrypted data using the the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce. +//! Encrypted data using the extended-nonce variant XChaCha20-Poly1305 encryption with secure random nonce. 
use std::mem::size_of; diff --git a/base_layer/core/src/transactions/transaction_protocol/sender.rs b/base_layer/core/src/transactions/transaction_protocol/sender.rs index 2e6ae88294..01a15de043 100644 --- a/base_layer/core/src/transactions/transaction_protocol/sender.rs +++ b/base_layer/core/src/transactions/transaction_protocol/sender.rs @@ -472,7 +472,7 @@ impl SenderTransactionProtocol { Ok((public_nonce, public_excess)) } - /// Add partial signatures, add the the recipient info to sender state and move to the Finalizing state + /// Add partial signatures, add the recipient info to sender state and move to the Finalizing state pub async fn add_single_recipient_info( &mut self, mut rec: RecipientSignedMessage, diff --git a/base_layer/mmr/src/backend.rs b/base_layer/mmr/src/backend.rs index 69235daf01..fe5943420b 100644 --- a/base_layer/mmr/src/backend.rs +++ b/base_layer/mmr/src/backend.rs @@ -41,7 +41,7 @@ pub trait ArrayLike { /// Return the item at the given index fn get(&self, index: usize) -> Result, Self::Error>; - /// Remove all stored items from the the backend. + /// Remove all stored items from the backend. fn clear(&mut self) -> Result<(), Self::Error>; /// Finds the index of the specified stored item, it will return None if the object could not be found. diff --git a/base_layer/mmr/src/sparse_merkle_tree/proofs.rs b/base_layer/mmr/src/sparse_merkle_tree/proofs.rs index cf7b3405fe..0cd10a9c04 100644 --- a/base_layer/mmr/src/sparse_merkle_tree/proofs.rs +++ b/base_layer/mmr/src/sparse_merkle_tree/proofs.rs @@ -98,7 +98,7 @@ pub struct InclusionProof { /// ``` pub struct ExclusionProof { siblings: Vec, - // The terminal node of the tree proof, or `None` if the the node is `Empty`. + // The terminal node of the tree proof, or `None` if the node is `Empty`. leaf: Option>, phantom: std::marker::PhantomData, } diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index 5a67de59ae..75465d69d6 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -562,6 +562,11 @@ impl ServiceInitializer for P2pInitializer { network_byte: self.network.as_byte(), user_agent: self.user_agent.clone(), }) + .with_minimize_connections(if self.config.dht.minimize_connections { + Some(self.config.dht.num_neighbouring_nodes + self.config.dht.num_random_nodes) + } else { + None + }) .set_self_liveness_check(config.listener_self_liveness_check_interval); if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses { diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index dc4797b60a..033b0d0bb0 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -28,6 +28,7 @@ use tari_comms::{ connectivity::{ConnectivityRequester, ConnectivitySelection}, peer_manager::NodeId, types::CommsPublicKey, + Minimized, PeerManager, }; use tari_comms_dht::{ @@ -360,7 +361,7 @@ where target: LOG_TARGET, "Disconnecting peer {} that failed {} rounds of pings", node_id, max_allowed_ping_failures ); - conn.disconnect().await?; + conn.disconnect(Minimized::No).await?; } } self.state.clear_failed_pings(); diff --git a/base_layer/p2p/src/transport.rs b/base_layer/p2p/src/transport.rs index a220fa9d0e..939a96329e 100644 --- a/base_layer/p2p/src/transport.rs +++ b/base_layer/p2p/src/transport.rs @@ -147,7 +147,7 @@ pub struct TorTransportConfig { /// When set to true, outbound TCP connections bypass the tor proxy. 
Defaults to false for better privacy, setting /// to true may improve network performance for TCP nodes. pub proxy_bypass_for_outbound_tcp: bool, - /// If set, instructs tor to forward traffic the the provided address. Otherwise, an OS-assigned port on 127.0.0.1 + /// If set, instructs tor to forward traffic the provided address. Otherwise, an OS-assigned port on 127.0.0.1 /// is used. pub forward_address: Option, /// If set, the listener will bind to this address instead of the forward_address. diff --git a/base_layer/wallet/src/connectivity_service/service.rs b/base_layer/wallet/src/connectivity_service/service.rs index bb845a53f9..f880739bfb 100644 --- a/base_layer/wallet/src/connectivity_service/service.rs +++ b/base_layer/wallet/src/connectivity_service/service.rs @@ -27,6 +27,7 @@ use tari_comms::{ connectivity::{ConnectivityError, ConnectivityRequester}, peer_manager::{NodeId, Peer}, protocol::rpc::{RpcClientLease, RpcClientPool}, + Minimized, PeerConnection, }; use tari_core::base_node::{rpc::BaseNodeWalletRpcClient, sync::rpc::BaseNodeSyncRpcClient}; @@ -225,7 +226,7 @@ impl WalletConnectivityService { async fn disconnect_base_node(&mut self, node_id: NodeId) { if let Ok(Some(mut connection)) = self.connectivity.get_connection(node_id.clone()).await { - match connection.disconnect().await { + match connection.disconnect(Minimized::No).await { Ok(_) => debug!(target: LOG_TARGET, "Disconnected base node peer {}", node_id), Err(e) => error!(target: LOG_TARGET, "Failed to disconnect base node: {}", e), } diff --git a/base_layer/wallet/src/connectivity_service/test.rs b/base_layer/wallet/src/connectivity_service/test.rs index ed4c021835..a0da32e40d 100644 --- a/base_layer/wallet/src/connectivity_service/test.rs +++ b/base_layer/wallet/src/connectivity_service/test.rs @@ -34,6 +34,7 @@ use tari_comms::{ mocks::{create_connectivity_mock, ConnectivityManagerMockState}, node_identity::build_node_identity, }, + Minimized, }; use tari_shutdown::Shutdown; use tari_test_utils::runtime::spawn_until_shutdown; @@ -177,7 +178,7 @@ async fn it_gracefully_handles_connect_fail_reconnect() { mock_state.add_active_connection(conn.clone()).await; // Empty out all the calls let _result = mock_state.take_calls().await; - conn.disconnect().await.unwrap(); + conn.disconnect(Minimized::No).await.unwrap(); let barrier = Arc::new(Barrier::new(2)); let pending_request = task::spawn({ diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 98ec0f2e20..a24deb7df6 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -1293,7 +1293,7 @@ where let uo_len = uo.len(); trace!( target: LOG_TARGET, - "select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {})", + "select_utxos profile - fetch_unspent_outputs_for_spending: {} outputs, {} ms (at {} ms)", uo_len, start_new.elapsed().as_millis(), start.elapsed().as_millis(), @@ -1362,7 +1362,7 @@ where let enough_spendable = utxos_total_value > amount + fee_with_change; trace!( target: LOG_TARGET, - "select_utxos profile - final_selection: {} outputs from {}, {} ms (at {})", + "select_utxos profile - final_selection: {} outputs from {}, {} ms (at {} ms)", utxos.len(), uo_len, start_new.elapsed().as_millis(), diff --git a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs index ebd3e6f544..593eedb24f 100644 --- 
a/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs +++ b/base_layer/wallet/src/utxo_scanner_service/utxo_scanner_task.rs @@ -38,6 +38,7 @@ use tari_comms::{ protocol::rpc::RpcClientLease, traits::OrOptional, types::CommsPublicKey, + Minimized, PeerConnection, }; use tari_core::{ @@ -193,7 +194,7 @@ where }); if let Ok(Some(connection)) = self.resources.comms_connectivity.get_connection(peer.clone()).await { - if connection.clone().disconnect().await.is_ok() { + if connection.clone().disconnect(Minimized::No).await.is_ok() { debug!(target: LOG_TARGET, "Disconnected base node peer {}", peer); } } diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index c8f79f3452..a5fff1dadc 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -27,7 +27,7 @@ //! becoming a `CompletedTransaction` with the `Completed` status. This means that the transaction has been //! negotiated between the parties and is now ready to be broadcast to the Base Layer. The funds are still encumbered //! as pending because the transaction has not been mined yet. -//! 3. The finalized `CompletedTransaction` will be sent back to the the receiver so that they have a copy. +//! 3. The finalized `CompletedTransaction` will be sent back to the receiver so that they have a copy. //! 4. The wallet will broadcast the `CompletedTransaction` to a Base Node to be added to the mempool. Its status will //! move from `Completed` to `Broadcast`. //! 5. Wait until the transaction is mined. The `CompleteTransaction` status will then move from `Broadcast` to `Mined` @@ -131,7 +131,13 @@ use tari_comms::{ transports::MemoryTransport, types::CommsPublicKey, }; -use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig, NetworkDiscoveryConfig}; +use tari_comms_dht::{ + store_forward::SafConfig, + DbConnectionUrl, + DhtConfig, + DhtConnectivityConfig, + NetworkDiscoveryConfig, +}; use tari_contacts::contacts_service::{handle::ContactsServiceHandle, types::Contact}; use tari_core::{ borsh::FromBytes, @@ -818,7 +824,7 @@ pub unsafe extern "C" fn byte_vector_destroy(bytes: *mut ByteVector) { /// /// # Safety /// None -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn byte_vector_get_at(ptr: *mut ByteVector, position: c_uint, error_out: *mut c_int) -> c_uchar { @@ -1778,7 +1784,7 @@ pub unsafe extern "C" fn unblinded_outputs_get_length( /// /// # Safety /// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn unblinded_outputs_get_at( @@ -2884,7 +2890,7 @@ pub unsafe extern "C" fn contacts_get_length(contacts: *mut TariContacts, error_ /// /// # Safety /// The ```contact_destroy``` method must be called when finished with a TariContact to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn contacts_get_at( @@ -3185,7 +3191,7 @@ pub unsafe extern "C" fn completed_transactions_get_length( /// # Safety /// 
The ```completed_transaction_destroy``` method must be called when finished with a TariCompletedTransaction to /// prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn completed_transactions_get_at( @@ -3278,7 +3284,7 @@ pub unsafe extern "C" fn pending_outbound_transactions_get_length( /// # Safety /// The ```pending_outbound_transaction_destroy``` method must be called when finished with a /// TariPendingOutboundTransaction to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn pending_outbound_transactions_get_at( @@ -3370,7 +3376,7 @@ pub unsafe extern "C" fn pending_inbound_transactions_get_length( /// # Safety /// The ```pending_inbound_transaction_destroy``` method must be called when finished with a /// TariPendingOutboundTransaction to prevent a memory leak -// converting between here is fine as its used to clamp the the array to length +// converting between here is fine as its used to clamp the array to length #[allow(clippy::cast_possible_wrap)] #[no_mangle] pub unsafe extern "C" fn pending_inbound_transactions_get_at( @@ -4851,6 +4857,9 @@ pub unsafe extern "C" fn comms_config_create( max_concurrent_inbound_tasks: 25, max_concurrent_outbound_tasks: 50, dht: DhtConfig { + num_neighbouring_nodes: 5, + num_random_nodes: 1, + minimize_connections: true, discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs), database_url: DbConnectionUrl::File(dht_database_path), auto_join: true, @@ -4861,9 +4870,15 @@ pub unsafe extern "C" fn comms_config_create( ..Default::default() }, network_discovery: NetworkDiscoveryConfig { + min_desired_peers: 16, initial_peer_sync_delay: Some(Duration::from_secs(25)), ..Default::default() }, + connectivity: DhtConnectivityConfig { + update_interval: Duration::from_secs(5 * 60), + minimum_desired_tcpv4_node_ratio: 0.0, + ..Default::default() + }, ..Default::default() }, allow_test_addresses: true, diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index c086b27e99..c125d4e4cd 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -189,7 +189,7 @@ listener_self_liveness_check_interval = 15 # When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for # better privacy #tor.proxy_bypass_for_outbound_tcp = false -# If set, instructs tor to forward traffic the the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port) +# If set, instructs tor to forward traffic the provided address. (e.g. "/dns4/my-base-node/tcp/32123") (default = OS-assigned port) #tor.forward_address = # If set, the listener will bind to this address instead of the forward_address. You need to make sure that this listener is connectable from the forward_address. #tor.listener_address_override = @@ -213,7 +213,9 @@ database_url = "data/base_node/dht.db" # The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8 #num_neighbouring_nodes = 8 # Number of random peers to include. 
Default: 4 -#num_random_nodes= 4 +#num_random_nodes = 4 +# Connections above the configured number of neighbouring and random nodes will be removed (default: false) +#minimize_connections = false # Send to this many peers when using the broadcast strategy. Default: 8 #broadcast_factor = 8 # Send to this many peers when using the propagate strategy. Default: 4 diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index 7d076434bf..950aca3b0e 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -242,7 +242,7 @@ event_channel_size = 3500 # When using the tor transport and set to true, outbound TCP connections bypass the tor proxy. Defaults to false for # better privacy #tor.proxy_bypass_for_outbound_tcp = false -# If set, instructs tor to forward traffic the the provided address. (e.g. "/ip4/127.0.0.1/tcp/0") (default = ) +# If set, instructs tor to forward traffic the provided address. (e.g. "/ip4/127.0.0.1/tcp/0") (default = ) #tor.forward_address = # Use a SOCKS5 proxy transport. This transport recognises any addresses supported by the proxy. @@ -262,9 +262,11 @@ database_url = "data/wallet/dht.db" # The size of the buffer (channel) which holds pending outbound message requests. Default: 20 #outbound_buffer_size = 20 # The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour. Default: 8 -#num_neighbouring_nodes = 8 num_neighbouring_nodes = 5 # Number of random peers to include. Default: 4 -#num_random_nodes= 4 num_random_nodes = 1 +# Connections above the configured number of neighbouring and random nodes will be removed (default: false) minimize_connections = true # Send to this many peers when using the broadcast strategy. Default: 8 #broadcast_factor = 8 # Send to this many peers when using the propagate strategy. Default: 4 @@ -311,7 +313,7 @@ database_url = "data/wallet/dht.db" #join_cooldown_interval = 120 # 10 * 60 # The interval to update the neighbouring and random pools, if necessary. Default: 2 minutes -#connectivity.update_interval = 120 # 2 * 60 connectivity.update_interval = 300 # 5 * 60 # The interval to change the random pool peers. Default = 2 hours #connectivity.random_pool_refresh_interval = 7_200 # 2 * 60 * 60 # Length of cooldown when high connection failure rates are encountered. Default: 45s @@ -319,13 +321,13 @@ database_url = "data/wallet/dht.db" # The minimum desired ratio of TCPv4 to Tor connections. TCPv4 addresses have some significant cost to create, # making sybil attacks costly. This setting does not guarantee this ratio is maintained. # Currently, it only emits a warning if the ratio is below this setting. Default: 0.1 (10%) -#connectivity.minimum_desired_tcpv4_node_ratio = 0.1 connectivity.minimum_desired_tcpv4_node_ratio = 0.0 # True to enable network discovery, false to disable it. Default: true #network_discovery.enabled = true # A threshold for the minimum number of peers this node should ideally be aware of. If below this threshold a # more "aggressive" strategy is employed. Default: 50 -network_discovery.min_desired_peers = 8 network_discovery.min_desired_peers = 16 # The period to wait once the number of rounds given by `idle_after_num_rounds` has completed. Default: 30 mins #network_discovery.idle_period = 1_800 # 30 * 60 # The minimum number of network discovery rounds to perform before idling (going to sleep). 
If there are less # rounds, but idle_after_num_rounds has been reached, then the referenced period is used. Default: 10 #network_discovery.idle_after_num_rounds = 10 # The maximum number of peers we allow per round of sync. (Default: 500) #network_discovery.max_peers_to_sync_per_round = 500 # Initial refresh sync peers delay period, when a configured connection needs preference. (Default: Disabled) -network_discovery.initial_peer_sync_delay = 15 network_discovery.initial_peer_sync_delay = 25 # Length of time to ban a peer if the peer misbehaves at the DHT-level. Default: 6 hrs #ban_duration = 21_600 # 6 * 60 * 60 diff --git a/comms/core/src/builder/mod.rs b/comms/core/src/builder/mod.rs index 26f4cc503a..43b78874b0 100644 --- a/comms/core/src/builder/mod.rs +++ b/comms/core/src/builder/mod.rs @@ -70,6 +70,7 @@ use crate::{ /// # #[tokio::main] /// # async fn main() { /// use std::env::temp_dir; +/// use tari_comms::connectivity::ConnectivityConfig; /// /// use tari_storage::{ /// lmdb_store::{LMDBBuilder, LMDBConfig}, @@ -126,6 +127,7 @@ pub struct CommsBuilder { connection_manager_config: ConnectionManagerConfig, connectivity_config: ConnectivityConfig, shutdown_signal: Option<ShutdownSignal>, + maintain_n_closest_connections_only: Option<usize>, } impl Default for CommsBuilder { @@ -139,6 +141,7 @@ impl Default for CommsBuilder { connection_manager_config: ConnectionManagerConfig::default(), connectivity_config: ConnectivityConfig::default(), shutdown_signal: None, + maintain_n_closest_connections_only: None, } } } @@ -292,6 +295,17 @@ impl CommsBuilder { self } + /// The number of closest peer connections to maintain; connections above the threshold will be removed + pub fn with_minimize_connections(mut self, connections: Option<usize>) -> Self { + self.maintain_n_closest_connections_only = connections; + self.connectivity_config.maintain_n_closest_connections_only = connections; + if let Some(val) = connections { + self.connectivity_config.reaper_min_connection_threshold = val; + } + self.connectivity_config.connection_pool_refresh_interval = Duration::from_secs(180); + self + } + fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> { let file_lock = self.peer_storage_file_lock.take(); diff --git a/comms/core/src/connection_manager/dialer.rs b/comms/core/src/connection_manager/dialer.rs index 8226eab9d5..f4ad9a2b70 100644 --- a/comms/core/src/connection_manager/dialer.rs +++ b/comms/core/src/connection_manager/dialer.rs @@ -491,7 +491,11 @@ where config: &ConnectionManagerConfig, ) -> (DialState, DialResult) { // Container for dial - let mut dial_state = Some(dial_state); + let mut dial_state = { + let mut temp_state = dial_state; + temp_state.peer_mut().addresses.reset_connection_attempts(); + Some(temp_state) + }; let mut transport = Some(transport); loop { @@ -539,7 +543,7 @@ where } } - /// Attempts to dial a peer sequentially on all addresses. + /// Attempts to dial a peer sequentially on all addresses, failing fast when the peer has no contactable addresses left. 
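A usage note on the `with_minimize_connections` hook added to `CommsBuilder` above: it composes like any other builder option. A hedged sketch follows (the rest of the builder chain is the pre-existing API and is elided; `apply_connection_cap` is a hypothetical helper, not part of the patch):

```rust
use tari_comms::CommsBuilder;

// Sketch: cap a node's connections at its neighbouring + random peer count.
// Passing `None` instead preserves the previous, unrestricted behaviour.
fn apply_connection_cap(builder: CommsBuilder, neighbouring: usize, random: usize) -> CommsBuilder {
    builder.with_minimize_connections(Some(neighbouring + random))
}
```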
/// Returns ownership of the given `DialState` and a success or failure result for the dial, /// or None if the dial was cancelled inflight async fn dial_peer( @@ -552,6 +556,18 @@ where Result<(NoiseSocket, Multiaddr), ConnectionManagerError>, ) { let addresses = dial_state.peer().addresses.clone().into_vec(); + if addresses.is_empty() { + let node_id_hex = dial_state.peer().node_id.clone().to_hex(); + debug!( + target: LOG_TARGET, + "No more contactable addresses for peer '{}'", + node_id_hex + ); + return ( + dial_state, + Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)), + ); + } let cancel_signal = dial_state.get_cancel_signal(); for address in addresses { debug!( @@ -598,7 +614,7 @@ where let noise_upgrade_time = timer.elapsed(); debug!( - "Dial - upgraded noise: {} on address: {} on tcp after: {}", + "Dial - upgraded noise: {} on address: {} on tcp after: {} ms", node_id.short_str(), moved_address, timer.elapsed().as_millis() diff --git a/comms/core/src/connection_manager/error.rs b/comms/core/src/connection_manager/error.rs index 80cddff827..7d2bb4af76 100644 --- a/comms/core/src/connection_manager/error.rs +++ b/comms/core/src/connection_manager/error.rs @@ -74,7 +74,7 @@ pub enum ConnectionManagerError { IdentityProtocolError(#[from] IdentityProtocolError), #[error("The dial was cancelled")] DialCancelled, - #[error("Invalid multiaddr: {0}")] + #[error("Invalid multiaddr")] InvalidMultiaddr(String), #[error("Failed to send wire format byte")] WireFormatSendFailed, @@ -82,6 +82,8 @@ pub enum ConnectionManagerError { ListenerOneshotCancelled, #[error("Peer validation error: {0}")] PeerValidationError(#[from] PeerValidatorError), + #[error("No contactable addresses for peer {0} left")] + NoContactableAddressesForPeer(String), } impl From for ConnectionManagerError { diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index 82b174b87e..ff88f17501 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -54,6 +54,7 @@ use crate::{ peer_validator::PeerValidatorConfig, protocol::{NodeNetworkInfo, ProtocolEvent, ProtocolId, Protocols}, transports::{TcpTransport, Transport}, + Minimized, PeerManager, }; @@ -67,7 +68,7 @@ const DIALER_REQUEST_CHANNEL_SIZE: usize = 32; pub enum ConnectionManagerEvent { // Peer connection PeerConnected(Box), - PeerDisconnected(ConnectionId, NodeId), + PeerDisconnected(ConnectionId, NodeId, Minimized), PeerConnectFailed(NodeId, ConnectionManagerError), PeerInboundConnectFailed(ConnectionManagerError), @@ -84,7 +85,9 @@ impl fmt::Display for ConnectionManagerEvent { use ConnectionManagerEvent::*; match self { PeerConnected(conn) => write!(f, "PeerConnected({})", conn), - PeerDisconnected(id, node_id) => write!(f, "PeerDisconnected({}, {})", id, node_id.short_str()), + PeerDisconnected(id, node_id, minimized) => { + write!(f, "PeerDisconnected({}, {}, {:?})", id, node_id.short_str(), minimized) + }, PeerConnectFailed(node_id, err) => write!(f, "PeerConnectFailed({}, {:?})", node_id.short_str(), err), PeerInboundConnectFailed(err) => write!(f, "PeerInboundConnectFailed({:?})", err), NewInboundSubstream(node_id, protocol, _) => write!( diff --git a/comms/core/src/connection_manager/peer_connection.rs b/comms/core/src/connection_manager/peer_connection.rs index c8e86f2394..4c23a7ef1f 100644 --- a/comms/core/src/connection_manager/peer_connection.rs +++ b/comms/core/src/connection_manager/peer_connection.rs @@ -58,6 +58,7 @@ use 
crate::{ peer_manager::{NodeId, PeerFeatures}, protocol::{ProtocolId, ProtocolNegotiation}, utils::atomic_ref_counter::AtomicRefCounter, + Minimized, }; const LOG_TARGET: &str = "comms::connection_manager::peer_connection"; @@ -118,7 +119,7 @@ pub enum PeerConnectionRequest { reply_tx: oneshot::Sender, PeerConnectionError>>, }, /// Disconnect all substreams and close the transport connection - Disconnect(bool, oneshot::Sender>), + Disconnect(bool, oneshot::Sender>, Minimized), } /// ID type for peer connections @@ -276,20 +277,20 @@ impl PeerConnection { /// Immediately disconnects the peer connection. This can only fail if the peer connection worker /// is shut down (and the peer is already disconnected) - pub async fn disconnect(&mut self) -> Result<(), PeerConnectionError> { + pub async fn disconnect(&mut self, minimized: Minimized) -> Result<(), PeerConnectionError> { let (reply_tx, reply_rx) = oneshot::channel(); self.request_tx - .send(PeerConnectionRequest::Disconnect(false, reply_tx)) + .send(PeerConnectionRequest::Disconnect(false, reply_tx, minimized)) .await?; reply_rx .await .map_err(|_| PeerConnectionError::InternalReplyCancelled)? } - pub(crate) async fn disconnect_silent(&mut self) -> Result<(), PeerConnectionError> { + pub(crate) async fn disconnect_silent(&mut self, minimized: Minimized) -> Result<(), PeerConnectionError> { let (reply_tx, reply_rx) = oneshot::channel(); self.request_tx - .send(PeerConnectionRequest::Disconnect(true, reply_tx)) + .send(PeerConnectionRequest::Disconnect(true, reply_tx, minimized)) .await?; reply_rx .await @@ -388,7 +389,7 @@ impl PeerConnectionActor { } } - if let Err(err) = self.disconnect(false).await { + if let Err(err) = self.disconnect(false, Minimized::No).await { warn!( target: LOG_TARGET, "[{}] Failed to politely close connection to peer '{}' because '{}'", @@ -413,7 +414,7 @@ impl PeerConnectionActor { "Reply oneshot closed when sending reply", ); }, - Disconnect(silent, reply_tx) => { + Disconnect(silent, reply_tx, minimized) => { debug!( target: LOG_TARGET, "[{}] Disconnect{}requested for {} connection to peer '{}'", @@ -422,7 +423,7 @@ impl PeerConnectionActor { self.direction, self.peer_node_id.short_str() ); - let _result = reply_tx.send(self.disconnect(silent).await); + let _result = reply_tx.send(self.disconnect(silent, minimized).await); }, } } @@ -518,7 +519,7 @@ impl PeerConnectionActor { /// # Arguments /// /// silent - true to suppress the PeerDisconnected event, false to publish the event - async fn disconnect(&mut self, silent: bool) -> Result<(), PeerConnectionError> { + async fn disconnect(&mut self, silent: bool, minimized: Minimized) -> Result<(), PeerConnectionError> { self.request_rx.close(); match self.control.close().await { Err(yamux::ConnectionError::Closed) => { @@ -536,6 +537,7 @@ impl PeerConnectionActor { self.notify_event(ConnectionManagerEvent::PeerDisconnected( self.id, self.peer_node_id.clone(), + minimized, )) .await; } diff --git a/comms/core/src/connection_manager/tests/listener_dialer.rs b/comms/core/src/connection_manager/tests/listener_dialer.rs index a9ff5d4b12..6db482ac33 100644 --- a/comms/core/src/connection_manager/tests/listener_dialer.rs +++ b/comms/core/src/connection_manager/tests/listener_dialer.rs @@ -46,6 +46,7 @@ use crate::{ protocol::ProtocolId, test_utils::{build_peer_manager, node_identity::build_node_identity}, transports::MemoryTransport, + Minimized, }; #[tokio::test] @@ -161,7 +162,7 @@ async fn smoke() { assert_eq!(buf, *b"HELLO"); } - conn1.disconnect().await.unwrap(); + 
conn1.disconnect(Minimized::No).await.unwrap(); shutdown.trigger(); diff --git a/comms/core/src/connectivity/config.rs b/comms/core/src/connectivity/config.rs index 2ebc47fe91..02a65b3c7d 100644 --- a/comms/core/src/connectivity/config.rs +++ b/comms/core/src/connectivity/config.rs @@ -49,6 +49,9 @@ pub struct ConnectivityConfig { /// next connection attempt. /// Default: 24 hours pub expire_peer_last_seen_duration: Duration, + /// The number of closest peer connections to maintain; connections above the threshold will be removed + /// (default: disabled) + pub maintain_n_closest_connections_only: Option<usize>, } impl Default for ConnectivityConfig { @@ -62,6 +65,7 @@ impl Default for ConnectivityConfig { max_failures_mark_offline: 1, connection_tie_break_linger: Duration::from_secs(2), expire_peer_last_seen_duration: Duration::from_secs(24 * 60 * 60), + maintain_n_closest_connections_only: None, } } } diff --git a/comms/core/src/connectivity/connection_pool.rs b/comms/core/src/connectivity/connection_pool.rs index fb8fe017c5..ee06f64d17 100644 --- a/comms/core/src/connectivity/connection_pool.rs +++ b/comms/core/src/connectivity/connection_pool.rs @@ -24,7 +24,7 @@ use std::{collections::HashMap, fmt, time::Duration}; use nom::lib::std::collections::hash_map::Entry; -use crate::{peer_manager::NodeId, PeerConnection}; +use crate::{peer_manager::NodeId, Minimized, PeerConnection}; /// Status type for connections #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ConnectionStatus { @@ -34,7 +34,7 @@ pub enum ConnectionStatus { Connected, Retrying, Failed, - Disconnected, + Disconnected(Minimized), } impl fmt::Display for ConnectionStatus { @@ -124,7 +124,7 @@ impl ConnectionPool { entry_mut.status = if conn.is_connected() { ConnectionStatus::Connected } else { - ConnectionStatus::Disconnected + ConnectionStatus::Disconnected(Minimized::No) }; entry_mut.set_connection(conn); entry_mut.status @@ -237,7 +237,10 @@ impl ConnectionPool { } pub fn count_disconnected(&self) -> usize { - self.count_filtered(|c| c.status() == ConnectionStatus::Disconnected) + self.count_filtered(|c| { + c.status() == ConnectionStatus::Disconnected(Minimized::Yes) || + c.status() == ConnectionStatus::Disconnected(Minimized::No) + }) } pub fn count_entries(&self) -> usize { diff --git a/comms/core/src/connectivity/manager.rs b/comms/core/src/connectivity/manager.rs index 1e9b7d18e3..a385ddbb8f 100644 --- a/comms/core/src/connectivity/manager.rs +++ b/comms/core/src/connectivity/manager.rs @@ -55,6 +55,7 @@ use crate::{ }, peer_manager::NodeId, utils::datetime::format_duration, + Minimized, NodeIdentity, PeerConnection, PeerManager, }; @@ -226,7 +227,7 @@ impl ConnectivityManagerActor { let tracing_id = tracing::Span::current().id(); let span = span!(Level::TRACE, "handle_dial_peer"); span.follows_from(tracing_id); - self.handle_dial_peer(node_id, reply_tx).instrument(span).await; + self.handle_dial_peer(node_id.clone(), reply_tx).instrument(span).await; }, SelectConnections(selection, reply) => { let _result = reply.send(self.select_connections(selection).await); @@ -255,6 +256,10 @@ impl ConnectivityManagerActor { let states = self.pool.all().into_iter().cloned().collect(); let _result = reply.send(states); }, + GetMinimizeConnectionsThreshold(reply) => { + let minimize_connections_threshold = self.config.maintain_n_closest_connections_only; + let _result = reply.send(minimize_connections_threshold); + }, BanPeer(node_id, duration, reason) => { if self.allow_list.contains(&node_id) { info!( 
AddPeerToAllowList(node_id) => { if !self.allow_list.contains(&node_id) { - self.allow_list.push(node_id) + self.allow_list.push(node_id.clone()); } }, RemovePeerFromAllowList(node_id) => { @@ -277,6 +282,10 @@ impl ConnectivityManagerActor { self.allow_list.remove(index); } }, + GetAllowList(reply) => { + let allow_list = self.allow_list.clone(); + let _result = reply.send(allow_list); + }, GetActiveConnections(reply) => { let _result = reply.send( self.pool @@ -286,6 +295,10 @@ impl ConnectivityManagerActor { .collect(), ); }, + GetNodeIdentity(reply) => { + let identity = self.node_identity.as_ref(); + let _result = reply.send(identity.clone()); + }, } } @@ -352,7 +365,7 @@ impl ConnectivityManagerActor { if !conn.is_connected() { continue; } - match conn.disconnect_silent().await { + match conn.disconnect_silent(Minimized::No).await { Ok(_) => { node_ids.push(conn.peer_node_id().clone()); }, @@ -369,7 +382,7 @@ impl ConnectivityManagerActor { } for node_id in node_ids { - self.publish_event(ConnectivityEvent::PeerDisconnected(node_id)); + self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, Minimized::No)); } } @@ -389,11 +402,67 @@ impl ConnectivityManagerActor { if self.config.is_connection_reaping_enabled { self.reap_inactive_connections().await; } + if let Some(threshold) = self.config.maintain_n_closest_connections_only { + self.maintain_n_closest_peer_connections_only(threshold).await; + } self.update_connectivity_status(); self.update_connectivity_metrics(); Ok(()) } + async fn maintain_n_closest_peer_connections_only(&mut self, threshold: usize) { + // Select all active peer connections (that are communication nodes) + let mut connections = match self + .select_connections(ConnectivitySelection::closest_to( + self.node_identity.node_id().clone(), + self.pool.count_connected_nodes(), + vec![], + )) + .await + { + Ok(peers) => peers, + Err(e) => { + warn!( + target: LOG_TARGET, + "Connectivity error trying to maintain {} closest peers ({:?})", + threshold, + e + ); + return; + }, + }; + let num_connections = connections.len(); + + // Remove peers that are on the allow list + connections.retain(|conn| !self.allow_list.contains(conn.peer_node_id())); + debug!( + target: LOG_TARGET, + "minimize_connections: Filtered peers: {}, Handles: {}", + connections.len(), + num_connections, + ); + + // Disconnect all remaining peers above the threshold + for conn in connections.iter_mut().skip(threshold) { + debug!( + target: LOG_TARGET, + "minimize_connections: Disconnecting '{}' because the node is not among the {} closest peers", + conn.peer_node_id(), + threshold + ); + if let Err(err) = conn.disconnect(Minimized::Yes).await { + // Already disconnected + debug!( + target: LOG_TARGET, + "Peer '{}' already disconnected. 
Error: {:?}", + conn.peer_node_id().short_str(), + err + ); + } + self.pool.remove(conn.peer_node_id()); + } + } + async fn reap_inactive_connections(&mut self) { let excess_connections = self .pool @@ -418,7 +487,7 @@ impl ConnectivityManagerActor { conn.peer_node_id().short_str(), conn.handle_count() ); - if let Err(err) = conn.disconnect().await { + if let Err(err) = conn.disconnect(Minimized::Yes).await { // Already disconnected debug!( target: LOG_TARGET, @@ -432,7 +501,9 @@ impl ConnectivityManagerActor { fn clean_connection_pool(&mut self) { let cleared_states = self.pool.filter_drain(|state| { - state.status() == ConnectionStatus::Failed || state.status() == ConnectionStatus::Disconnected + state.status() == ConnectionStatus::Failed || + state.status() == ConnectionStatus::Disconnected(Minimized::Yes) || + state.status() == ConnectionStatus::Disconnected(Minimized::No) }); if !cleared_states.is_empty() { @@ -560,7 +631,7 @@ impl ConnectivityManagerActor { TieBreak::UseNew | TieBreak::None => {}, } }, - PeerDisconnected(id, node_id) => { + PeerDisconnected(id, node_id, _minimized) => { if let Some(conn) = self.pool.get_connection(node_id) { if conn.id() != *id { debug!( @@ -586,7 +657,7 @@ impl ConnectivityManagerActor { } let (node_id, mut new_status, connection) = match event { - PeerDisconnected(_, node_id) => (node_id, ConnectionStatus::Disconnected, None), + PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None), PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())), PeerConnectFailed(node_id, ConnectionManagerError::DialCancelled) => { @@ -632,7 +703,7 @@ impl ConnectivityManagerActor { use ConnectionStatus::{Connected, Disconnected, Failed}; match (old_status, new_status) { - (_, Connected) => match self.pool.get_connection(&node_id).cloned() { + (_, Connected) => match self.pool.get_connection_mut(&node_id).cloned() { Some(conn) => { self.mark_connection_success(conn.peer_node_id().clone()); self.publish_event(ConnectivityEvent::PeerConnected(conn.into())); @@ -642,11 +713,14 @@ impl ConnectivityManagerActor { ConnectionPool::get_connection is Some" ), }, - (Connected, Disconnected) => { - self.publish_event(ConnectivityEvent::PeerDisconnected(node_id)); + (Connected, Disconnected(..)) => { + self.publish_event(ConnectivityEvent::PeerDisconnected(node_id, match new_status { + ConnectionStatus::Disconnected(reason) => reason, + _ => Minimized::No, + })); }, // Was not connected so don't broadcast event - (_, Disconnected) => {}, + (_, Disconnected(..)) => {}, (_, Failed) => { self.publish_event(ConnectivityEvent::PeerConnectFailed(node_id)); }, @@ -692,7 +766,7 @@ impl ConnectivityManagerActor { existing_conn.direction(), ); - let _result = existing_conn.disconnect_silent().await; + let _result = existing_conn.disconnect_silent(Minimized::Yes).await; self.pool.remove(existing_conn.peer_node_id()); TieBreak::UseNew } else { @@ -708,7 +782,7 @@ impl ConnectivityManagerActor { existing_conn.direction(), ); - let _result = new_conn.clone().disconnect_silent().await; + let _result = new_conn.clone().disconnect_silent(Minimized::Yes).await; TieBreak::KeepExisting } }, @@ -887,7 +961,7 @@ impl ConnectivityManagerActor { self.publish_event(ConnectivityEvent::PeerBanned(node_id.clone())); if let Some(conn) = self.pool.get_connection_mut(node_id) { - conn.disconnect().await?; + conn.disconnect(Minimized::Yes).await?; let status = self.pool.get_connection_status(node_id); debug!( target: LOG_TARGET, 
@@ -903,7 +977,7 @@ impl ConnectivityManagerActor { let status = self.pool.get_connection_status(node_id); if matches!( status, - ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected + ConnectionStatus::NotConnected | ConnectionStatus::Failed | ConnectionStatus::Disconnected(_) ) { to_remove.push(node_id.clone()); } diff --git a/comms/core/src/connectivity/requester.rs b/comms/core/src/connectivity/requester.rs index b2eff5f35e..4b5bbf34c1 100644 --- a/comms/core/src/connectivity/requester.rs +++ b/comms/core/src/connectivity/requester.rs @@ -41,6 +41,8 @@ use super::{ use crate::{ connection_manager::ConnectionManagerError, peer_manager::{NodeId, Peer}, + Minimized, + NodeIdentity, PeerConnection, }; @@ -54,7 +56,7 @@ pub type ConnectivityEventTx = broadcast::Sender; /// Node connectivity events emitted by the ConnectivityManager. #[derive(Debug, Clone)] pub enum ConnectivityEvent { - PeerDisconnected(NodeId), + PeerDisconnected(NodeId, Minimized), PeerConnected(Box), PeerConnectFailed(NodeId), PeerBanned(NodeId), @@ -69,7 +71,7 @@ impl fmt::Display for ConnectivityEvent { #[allow(clippy::enum_glob_use)] use ConnectivityEvent::*; match self { - PeerDisconnected(node_id) => write!(f, "PeerDisconnected({})", node_id), + PeerDisconnected(node_id, minimized) => write!(f, "PeerDisconnected({}, {:?})", node_id, minimized), PeerConnected(node_id) => write!(f, "PeerConnected({})", node_id), PeerConnectFailed(node_id) => write!(f, "PeerConnectFailed({})", node_id), PeerBanned(node_id) => write!(f, "PeerBanned({})", node_id), @@ -96,11 +98,14 @@ pub enum ConnectivityRequest { ), GetConnection(NodeId, oneshot::Sender>), GetAllConnectionStates(oneshot::Sender>), + GetMinimizeConnectionsThreshold(oneshot::Sender>), GetActiveConnections(oneshot::Sender>), BanPeer(NodeId, Duration, String), AddPeerToAllowList(NodeId), RemovePeerFromAllowList(NodeId), + GetAllowList(oneshot::Sender>), GetPeerStats(NodeId, oneshot::Sender>), + GetNodeIdentity(oneshot::Sender), } /// Handle to make requests and read events from the ConnectivityManager actor. @@ -233,6 +238,16 @@ impl ConnectivityRequester { reply_rx.await.map_err(|_| ConnectivityError::ActorResponseCancelled) } + /// Get the optional minimize connections setting. + pub async fn get_minimize_connections_threshold(&mut self) -> Result, ConnectivityError> { + let (reply_tx, reply_rx) = oneshot::channel(); + self.sender + .send(ConnectivityRequest::GetMinimizeConnectionsThreshold(reply_tx)) + .await + .map_err(|_| ConnectivityError::ActorDisconnected)?; + reply_rx.await.map_err(|_| ConnectivityError::ActorResponseCancelled) + } + /// Get all currently connection [PeerConnection](crate::PeerConnection]s. pub async fn get_active_connections(&mut self) -> Result, ConnectivityError> { let (reply_tx, reply_rx) = oneshot::channel(); @@ -272,6 +287,26 @@ impl ConnectivityRequester { Ok(()) } + /// Retrieve self's allow list. + pub async fn get_allow_list(&mut self) -> Result, ConnectivityError> { + let (reply_tx, reply_rx) = oneshot::channel(); + self.sender + .send(ConnectivityRequest::GetAllowList(reply_tx)) + .await + .map_err(|_| ConnectivityError::ActorDisconnected)?; + reply_rx.await.map_err(|_| ConnectivityError::ActorResponseCancelled) + } + + /// Retrieve self's node identity. 
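The new accessors here (`get_minimize_connections_threshold`, `get_allow_list`, and `get_node_identity` just below) all follow the same actor request/reply idiom as the rest of this file. A stripped-down sketch of that idiom, using stand-in types rather than the real `ConnectivityRequest` enum:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in request type; the real enum is ConnectivityRequest.
enum Request {
    GetAllowList(oneshot::Sender<Vec<String>>),
}

// Send a oneshot reply channel to the actor over its mpsc queue, then await
// the response. The two errors mirror ActorDisconnected / ActorResponseCancelled.
async fn get_allow_list(sender: &mpsc::Sender<Request>) -> Result<Vec<String>, &'static str> {
    let (reply_tx, reply_rx) = oneshot::channel();
    sender
        .send(Request::GetAllowList(reply_tx))
        .await
        .map_err(|_| "actor disconnected")?;
    reply_rx.await.map_err(|_| "actor response cancelled")
}
```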
+ pub async fn get_node_identity(&mut self) -> Result { + let (reply_tx, reply_rx) = oneshot::channel(); + self.sender + .send(ConnectivityRequest::GetNodeIdentity(reply_tx)) + .await + .map_err(|_| ConnectivityError::ActorDisconnected)?; + reply_rx.await.map_err(|_| ConnectivityError::ActorResponseCancelled) + } + /// Removes a peer from an allow list that prevents it from being banned. pub async fn remove_peer_from_allow_list(&mut self, node_id: NodeId) -> Result<(), ConnectivityError> { self.sender diff --git a/comms/core/src/connectivity/test.rs b/comms/core/src/connectivity/test.rs index ef5dd65d9f..1a0723fa37 100644 --- a/comms/core/src/connectivity/test.rs +++ b/comms/core/src/connectivity/test.rs @@ -43,6 +43,7 @@ use crate::{ mocks::{create_connection_manager_mock, create_peer_connection_mock_pair, ConnectionManagerMockState}, node_identity::{build_many_node_identities, build_node_identity}, }, + Minimized, NodeIdentity, PeerManager, }; @@ -203,6 +204,7 @@ async fn online_then_offline_then_online() { cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( conn.id(), conn.peer_node_id().clone(), + Minimized::No, )); } @@ -224,6 +226,7 @@ async fn online_then_offline_then_online() { cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( conn.id(), conn.peer_node_id().clone(), + Minimized::No, )); } @@ -420,10 +423,11 @@ async fn pool_management() { if conn != important_connection { assert_eq!(conn.handle_count(), 2); // The peer connection mock does not "automatically" publish event to connectivity manager - conn.disconnect().await.unwrap(); + conn.disconnect(Minimized::No).await.unwrap(); cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( conn.id(), conn.peer_node_id().clone(), + Minimized::No, )); } } @@ -432,7 +436,7 @@ async fn pool_management() { let events = collect_try_recv!(event_stream, take = 9, timeout = Duration::from_secs(10)); for event in events { - unpack_enum!(ConnectivityEvent::PeerDisconnected(_) = event); + unpack_enum!(ConnectivityEvent::PeerDisconnected(..) = event); } assert_eq!(important_connection.handle_count(), 2); @@ -440,15 +444,16 @@ async fn pool_management() { let conns = connectivity.get_active_connections().await.unwrap(); assert_eq!(conns.len(), 1); - important_connection.disconnect().await.unwrap(); + important_connection.disconnect(Minimized::No).await.unwrap(); cm_mock_state.publish_event(ConnectionManagerEvent::PeerDisconnected( important_connection.id(), important_connection.peer_node_id().clone(), + Minimized::No, )); drop(important_connection); let mut events = collect_try_recv!(event_stream, take = 1, timeout = Duration::from_secs(10)); - unpack_enum!(ConnectivityEvent::PeerDisconnected(_) = events.remove(0)); + unpack_enum!(ConnectivityEvent::PeerDisconnected(..) 
= events.remove(0)); let conns = connectivity.get_active_connections().await.unwrap(); assert!(conns.is_empty()); } diff --git a/comms/core/src/lib.rs b/comms/core/src/lib.rs index b9c493a09a..face0f13c5 100644 --- a/comms/core/src/lib.rs +++ b/comms/core/src/lib.rs @@ -64,3 +64,10 @@ pub use async_trait::async_trait; pub use bytes::{Buf, BufMut, Bytes, BytesMut}; #[cfg(feature = "rpc")] pub use tower::make::MakeService; + +/// Was the connection closed due to minimize_connections +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum Minimized { + Yes, + No, +} diff --git a/comms/core/src/net_address/multiaddr_with_stats.rs b/comms/core/src/net_address/multiaddr_with_stats.rs index 29e1fcfb96..fc5d3345e5 100644 --- a/comms/core/src/net_address/multiaddr_with_stats.rs +++ b/comms/core/src/net_address/multiaddr_with_stats.rs @@ -183,6 +183,7 @@ impl MultiaddrWithStats { /// Reset the connection attempts on this net address for a later session of retries pub fn reset_connection_attempts(&mut self) { self.connection_attempts = 0; + self.last_failed_reason = None; } /// Mark that a connection could not be established with this net address diff --git a/comms/core/src/net_address/mutliaddresses_with_stats.rs b/comms/core/src/net_address/mutliaddresses_with_stats.rs index 8b9627e00c..6aad909315 100644 --- a/comms/core/src/net_address/mutliaddresses_with_stats.rs +++ b/comms/core/src/net_address/mutliaddresses_with_stats.rs @@ -467,9 +467,15 @@ mod test { assert_eq!(net_addresses.addresses[0].connection_attempts(), 2); assert_eq!(net_addresses.addresses[1].connection_attempts(), 1); assert_eq!(net_addresses.addresses[2].connection_attempts(), 1); + assert!(net_addresses.addresses[0].last_failed_reason().is_some()); + assert!(net_addresses.addresses[1].last_failed_reason().is_some()); + assert!(net_addresses.addresses[2].last_failed_reason().is_some()); net_addresses.reset_connection_attempts(); assert_eq!(net_addresses.addresses[0].connection_attempts(), 0); assert_eq!(net_addresses.addresses[1].connection_attempts(), 0); assert_eq!(net_addresses.addresses[2].connection_attempts(), 0); + assert!(net_addresses.addresses[0].last_failed_reason().is_none()); + assert!(net_addresses.addresses[1].last_failed_reason().is_none()); + assert!(net_addresses.addresses[2].last_failed_reason().is_none()); } } diff --git a/comms/core/src/peer_manager/peer.rs b/comms/core/src/peer_manager/peer.rs index f4786d57ce..657722deb4 100644 --- a/comms/core/src/peer_manager/peer.rs +++ b/comms/core/src/peer_manager/peer.rs @@ -208,6 +208,11 @@ impl Peer { self.addresses.last_seen() } + /// Provides info about the failure status of all addresses + pub fn all_addresses_failed(&self) -> bool { + self.addresses.iter().all(|a| a.last_failed_reason().is_some()) + } + /// Provides that length of time since the last successful interaction with the peer pub fn last_seen_since(&self) -> Option { self.last_seen() diff --git a/comms/core/src/peer_manager/peer_query.rs b/comms/core/src/peer_manager/peer_query.rs index 71f56b3fd7..fe049e7e37 100644 --- a/comms/core/src/peer_manager/peer_query.rs +++ b/comms/core/src/peer_manager/peer_query.rs @@ -246,7 +246,7 @@ mod test { #[test] fn limit_query() { - // Create 20 peers were the 1st and last one is bad + // Create some good peers let db = HashmapDatabase::new(); let mut id_counter = 0; @@ -262,11 +262,7 @@ mod test { #[test] fn select_where_query() { - // Create peer manager with random peers - let mut sample_peers = Vec::new(); - // Create 20 peers were the 1st and last one is bad - 
let _rng = rand::rngs::OsRng; - sample_peers.push(create_test_peer(true)); + // Create some good and bad peers let db = HashmapDatabase::new(); let mut id_counter = 0; @@ -292,11 +288,7 @@ mod test { #[test] fn select_where_limit_query() { - // Create peer manager with random peers - let mut sample_peers = Vec::new(); - // Create 20 peers were the 1st and last one is bad - let _rng = rand::rngs::OsRng; - sample_peers.push(create_test_peer(true)); + // Create some good and bad peers let db = HashmapDatabase::new(); let mut id_counter = 0; @@ -333,11 +325,7 @@ mod test { #[test] fn sort_by_query() { - // Create peer manager with random peers - let mut sample_peers = Vec::new(); - // Create 20 peers were the 1st and last one is bad - let _rng = rand::rngs::OsRng; - sample_peers.push(create_test_peer(true)); + // Create some good and bad peers let db = HashmapDatabase::new(); let mut id_counter = 0; diff --git a/comms/core/src/protocol/rpc/client/tests.rs b/comms/core/src/protocol/rpc/client/tests.rs index ecd484f3dd..4f8d0a1491 100644 --- a/comms/core/src/protocol/rpc/client/tests.rs +++ b/comms/core/src/protocol/rpc/client/tests.rs @@ -76,7 +76,10 @@ async fn setup(num_concurrent_sessions: usize) -> (PeerConnection, PeerConnectio mod lazy_pool { use super::*; - use crate::protocol::rpc::client::pool::{LazyPool, RpcClientPoolError}; + use crate::{ + protocol::rpc::client::pool::{LazyPool, RpcClientPoolError}, + Minimized, + }; #[tokio::test] async fn it_connects_lazily() { @@ -168,7 +171,7 @@ mod lazy_pool { let (mut peer_conn, _, _shutdown) = setup(2).await; let mut pool = LazyPool::::new(peer_conn.clone(), 2, Default::default()); let mut _conn1 = pool.get_least_used_or_connect().await.unwrap(); - peer_conn.disconnect().await.unwrap(); + peer_conn.disconnect(Minimized::No).await.unwrap(); let err = pool.get_least_used_or_connect().await.unwrap_err(); unpack_enum!(RpcClientPoolError::PeerConnectionDropped { .. } = err); } diff --git a/comms/core/src/test_utils/mocks/connection_manager.rs b/comms/core/src/test_utils/mocks/connection_manager.rs index a84a2a65f6..6a3c441632 100644 --- a/comms/core/src/test_utils/mocks/connection_manager.rs +++ b/comms/core/src/test_utils/mocks/connection_manager.rs @@ -131,7 +131,9 @@ impl ConnectionManagerMock { self.state.inc_call_count(); self.state.add_call(format!("{:?}", req)).await; match req { - DialPeer { node_id, mut reply_tx } => { + DialPeer { + node_id, mut reply_tx, .. 
diff --git a/comms/core/src/test_utils/mocks/connection_manager.rs b/comms/core/src/test_utils/mocks/connection_manager.rs
index a84a2a65f6..6a3c441632 100644
--- a/comms/core/src/test_utils/mocks/connection_manager.rs
+++ b/comms/core/src/test_utils/mocks/connection_manager.rs
@@ -131,7 +131,9 @@ impl ConnectionManagerMock {
         self.state.inc_call_count();
         self.state.add_call(format!("{:?}", req)).await;
         match req {
-            DialPeer { node_id, mut reply_tx } => {
+            DialPeer {
+                node_id, mut reply_tx, ..
+            } => {
                 // Send Ok(&mut conn) if we have an active connection, otherwise Err(DialConnectFailedAllAddresses)
                 let result = self
                     .state
diff --git a/comms/core/src/test_utils/mocks/connectivity_manager.rs b/comms/core/src/test_utils/mocks/connectivity_manager.rs
index ae81bc05db..52e88f60b9 100644
--- a/comms/core/src/test_utils/mocks/connectivity_manager.rs
+++ b/comms/core/src/test_utils/mocks/connectivity_manager.rs
@@ -293,6 +293,11 @@ impl ConnectivityManagerMock {
                     .await;
             },
             WaitStarted(reply) => reply.send(()).unwrap(),
+            GetNodeIdentity(_) => unimplemented!(),
+            GetAllowList(reply) => {
+                let _result = reply.send(vec![]);
+            },
+            GetMinimizeConnectionsThreshold(_) => unimplemented!(),
         }
     }
 }
diff --git a/comms/core/src/test_utils/mocks/peer_connection.rs b/comms/core/src/test_utils/mocks/peer_connection.rs
index dbd67f5045..10ca0d2469 100644
--- a/comms/core/src/test_utils/mocks/peer_connection.rs
+++ b/comms/core/src/test_utils/mocks/peer_connection.rs
@@ -221,7 +221,7 @@ impl PeerConnectionMock {
                     reply_tx.send(Err(err)).unwrap();
                 },
             },
-            Disconnect(_, reply_tx) => {
+            Disconnect(_, reply_tx, _minimized) => {
                 self.receiver.close();
                 reply_tx.send(self.state.disconnect().await).unwrap();
             },
diff --git a/comms/dht/examples/memory_net/utilities.rs b/comms/dht/examples/memory_net/utilities.rs
index 3d07a3edd5..7812a53e8a 100644
--- a/comms/dht/examples/memory_net/utilities.rs
+++ b/comms/dht/examples/memory_net/utilities.rs
@@ -631,8 +631,13 @@ fn connection_manager_logger(
                     println!("'{}' connected to '{}'", node_name, get_name(conn.peer_node_id()),);
                 },
             },
-            PeerDisconnected(_, node_id) => {
-                println!("'{}' disconnected from '{}'", get_name(node_id), node_name);
+            PeerDisconnected(_, node_id, minimized) => {
+                println!(
+                    "'{}' disconnected from '{}', {:?}",
+                    get_name(node_id),
+                    node_name,
+                    minimized
+                );
             },
             PeerConnectFailed(node_id, err) => {
                 println!(
diff --git a/comms/dht/src/config.rs b/comms/dht/src/config.rs
index 4937eaacb4..ec502499bc 100644
--- a/comms/dht/src/config.rs
+++ b/comms/dht/src/config.rs
@@ -50,6 +50,9 @@ pub struct DhtConfig {
     /// Number of random peers to include
     /// Default: 4
     pub num_random_nodes: usize,
+    /// Connections above the configured number of neighbouring and random nodes will be removed
+    /// (default: false)
+    pub minimize_connections: bool,
     /// Send to this many peers when using the broadcast strategy
     /// Default: 8
     pub broadcast_factor: usize,
@@ -169,6 +172,7 @@ impl Default for DhtConfig {
             protocol_version: DhtProtocolVersion::latest(),
             num_neighbouring_nodes: 8,
             num_random_nodes: 4,
+            minimize_connections: false,
             propagation_factor: 4,
             broadcast_factor: 8,
             outbound_buffer_size: 20,
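For reference, the new flag defaults to off and composes with the existing pool sizes; a tiny sketch of constructing a DhtConfig with minimization enabled (field names exactly as declared above):

    // Sketch: enable connection minimization; all other knobs keep their defaults.
    let config = DhtConfig {
        minimize_connections: true,
        ..Default::default()
    };
    // With the defaults above this caps managed pool connections at 8 + 4.
    assert_eq!(config.num_neighbouring_nodes + config.num_random_nodes, 12);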
diff --git a/comms/dht/src/connectivity/mod.rs b/comms/dht/src/connectivity/mod.rs
index 9094396392..c0d18c7332 100644
--- a/comms/dht/src/connectivity/mod.rs
+++ b/comms/dht/src/connectivity/mod.rs
@@ -47,7 +48,8 @@ use tari_comms::{
         ConnectivitySelection,
     },
     multiaddr,
-    peer_manager::{NodeDistance, NodeId, PeerManagerError, PeerQuery, PeerQuerySortBy},
+    peer_manager::{NodeDistance, NodeId, Peer, PeerManagerError, PeerQuery, PeerQuerySortBy},
+    Minimized,
     NodeIdentity,
     PeerConnection,
     PeerManager,
@@ -84,6 +85,8 @@ pub(crate) struct DhtConnectivity {
     neighbours: Vec<NodeId>,
     /// A randomly-selected set of peers, excluding neighbouring peers.
     random_pool: Vec<NodeId>,
+    /// The random pool history.
+    previous_random: Vec<NodeId>,
     /// Used to track when the random peer pool was last refreshed
     random_pool_last_refresh: Option<Instant>,
     /// Holds references to peer connections that should be kept alive
@@ -121,6 +124,7 @@ impl DhtConnectivity {
             dht_events,
             cooldown_in_effect: None,
             shutdown_signal,
+            previous_random: vec![],
         }
     }
 
@@ -356,11 +360,12 @@ impl DhtConnectivity {
             .count()
     }
 
-    fn connected_peers_iter(&self) -> impl Iterator<Item = &NodeId> {
+    fn connected_pool_peers_iter(&self) -> impl Iterator<Item = &NodeId> {
         self.connection_handles.iter().map(|c| c.peer_node_id())
     }
 
     async fn refresh_neighbour_pool(&mut self) -> Result<(), DhtConnectivityError> {
+        self.remove_allow_list_peers_from_pools().await?;
         let mut new_neighbours = self
             .fetch_neighbouring_peers(self.config.num_neighbouring_nodes, &[])
             .await?;
@@ -385,14 +390,9 @@ impl DhtConnectivity {
 
         debug!(
             target: LOG_TARGET,
-            "Adding {} neighbouring peer(s), removing {} peers",
-            new_neighbours.len(),
-            difference.len()
-        );
-        debug!(
-            target: LOG_TARGET,
-            "Adding {} peer(s) to DHT connectivity manager: {}",
+            "Adding {} neighbouring peer(s), removing {} peers: {}",
             new_neighbours.len(),
+            difference.len(),
             new_neighbours
                 .iter()
                 .map(ToString::to_string)
@@ -401,14 +401,17 @@ impl DhtConnectivity {
         );
 
         new_neighbours.iter().cloned().for_each(|peer| {
-            self.insert_neighbour(peer);
+            self.insert_neighbour_ordered_by_distance(peer);
         });
+        self.dial_multiple_peers(&new_neighbours).await?;
 
-        if !new_neighbours.is_empty() {
-            self.connectivity.request_many_dials(new_neighbours).await?;
-        }
+        Ok(())
+    }
 
-        self.redial_neighbours_as_required().await?;
+    async fn dial_multiple_peers(&self, peers_to_dial: &[NodeId]) -> Result<(), DhtConnectivityError> {
+        if !peers_to_dial.is_empty() {
+            self.connectivity.request_many_dials(peers_to_dial.to_vec()).await?;
+        }
         Ok(())
     }
 
@@ -432,7 +435,7 @@ impl DhtConnectivity {
                 "Redialling {} disconnected peer(s)",
                 to_redial.len()
             );
-            self.connectivity.request_many_dials(to_redial).await?;
+            self.dial_multiple_peers(&to_redial).await?;
         }
 
         Ok(())
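With these changes the two pools behave as bounded, distance-sorted sets: after any refresh, neighbours holds at most num_neighbouring_nodes entries and random_pool at most num_random_nodes, each ordered by XOR distance from our own node id. A property-style sketch of that invariant (is_sorted_by_distance is a hypothetical test helper, not part of this patch):

    // Sketch: invariants a test could assert after refresh_peer_pools().
    assert!(dht.neighbours.len() <= dht.config.num_neighbouring_nodes);
    assert!(dht.random_pool.len() <= dht.config.num_random_nodes);
    assert!(is_sorted_by_distance(&dht.neighbours, dht.node_identity.node_id()));
    assert!(is_sorted_by_distance(&dht.random_pool, dht.node_identity.node_id()));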
@@ -451,9 +454,12 @@ impl DhtConnectivity {
     }
 
     async fn refresh_random_pool(&mut self) -> Result<(), DhtConnectivityError> {
-        let mut random_peers = self
-            .fetch_random_peers(self.config.num_random_nodes, &self.neighbours)
-            .await?;
+        self.remove_allow_list_peers_from_pools().await?;
+        let mut exclude = self.neighbours.clone();
+        if self.config.minimize_connections {
+            exclude.extend(self.previous_random.iter().cloned());
+        }
+        let mut random_peers = self.fetch_random_peers(self.config.num_random_nodes, &exclude).await?;
         if random_peers.is_empty() {
             info!(
                 target: LOG_TARGET,
@@ -482,18 +488,21 @@ impl DhtConnectivity {
             random_peers,
             difference
         );
-        self.random_pool.extend(random_peers.clone());
+        for peer in &random_peers {
+            self.insert_random_peer_ordered_by_distance(peer.clone());
+        }
         // Drop any connection handles that removed from the random pool
         difference.iter().for_each(|peer| {
             self.remove_connection_handle(peer);
         });
-        self.connectivity.request_many_dials(random_peers).await?;
+        self.dial_multiple_peers(&random_peers).await?;
 
         self.random_pool_last_refresh = Some(Instant::now());
         Ok(())
     }
 
     async fn handle_new_peer_connected(&mut self, conn: PeerConnection) -> Result<(), DhtConnectivityError> {
+        self.remove_allow_list_peers_from_pools().await?;
         if conn.peer_features().is_client() {
             debug!(
                 target: LOG_TARGET,
@@ -503,10 +512,19 @@ impl DhtConnectivity {
             return Ok(());
         }
 
+        if self.is_allow_list_peer(conn.peer_node_id()).await? {
+            debug!(
+                target: LOG_TARGET,
+                "Unmanaged peer '{}' connected",
+                conn.peer_node_id()
+            );
+            return Ok(());
+        }
+
         if self.is_pool_peer(conn.peer_node_id()) {
             debug!(
                 target: LOG_TARGET,
-                "Added peer {} to connection handles",
+                "Added pool peer '{}' to connection handles",
                 conn.peer_node_id()
             );
             self.insert_connection_handle(conn);
@@ -523,20 +541,56 @@ impl DhtConnectivity {
         );
 
         let peer_to_insert = conn.peer_node_id().clone();
-        self.insert_connection_handle(conn);
-        if let Some(node_id) = self.insert_neighbour(peer_to_insert.clone()) {
-            // If we kicked a neighbour out of our neighbour pool but the random pool is not full.
-            // Add the neighbour to the random pool, otherwise remove the handle from the connection pool
-            if self.random_pool.len() < self.config.num_random_nodes {
-                debug!(
-                    target: LOG_TARGET,
-                    "Moving peer '{}' from neighbouring pool to random pool", peer_to_insert
-                );
-                self.random_pool.push(node_id);
-            } else {
-                self.remove_connection_handle(&node_id)
-            }
+        if let Some(node_id) = self.insert_neighbour_ordered_by_distance(peer_to_insert.clone()) {
+            // If we kicked a neighbour out of our neighbour pool, add it to the random pool if
+            // it is not full or if it is closer than the furthest random peer.
+            debug!(
+                target: LOG_TARGET,
+                "Moving peer '{}' from neighbouring pool to random pool if not full or closer", peer_to_insert
+            );
+            self.insert_random_peer_ordered_by_distance(node_id)
             }
+            self.insert_connection_handle(conn);
+        }
+
+        Ok(())
+    }
+
+    async fn pool_peers_with_active_connections_by_distance(&self) -> Result<Vec<Peer>, DhtConnectivityError> {
+        let query = PeerQuery::new()
+            .select_where(|peer| {
+                self.connection_handles
+                    .iter()
+                    .any(|conn| conn.peer_node_id() == &peer.node_id)
+            })
+            .sort_by(PeerQuerySortBy::DistanceFrom(self.node_identity.node_id()));
+        let peers_by_distance = self.peer_manager.perform_query(query).await?;
+        debug!(
+            target: LOG_TARGET,
+            "minimize_connections: Filtered peers: {}, Handles: {}",
+            peers_by_distance.len(),
+            self.connection_handles.len(),
+        );
+        Ok(peers_by_distance)
+    }
+
+    async fn minimize_connections(&mut self) -> Result<(), DhtConnectivityError> {
+        // Retrieve all communication node peers with an active connection status
+        let mut peers_by_distance = self.pool_peers_with_active_connections_by_distance().await?;
+        let peer_allow_list = self.peer_allow_list().await?;
+        peers_by_distance.retain(|p| !peer_allow_list.contains(&p.node_id));
+
+        // Remove all above threshold connections
+        let threshold = self.config.num_neighbouring_nodes + self.config.num_random_nodes;
+        for peer in peers_by_distance.iter_mut().skip(threshold) {
+            debug!(
+                target: LOG_TARGET,
+                "minimize_connections: Disconnecting '{}' because the node is not among the {} closest peers",
+                peer.node_id,
+                threshold
+            );
+            self.replace_pool_peer(&peer.node_id).await?;
+            self.remove_connection_handle(&peer.node_id);
         }
 
         Ok(())
     }
@@ -562,7 +616,18 @@ impl DhtConnectivity {
         debug!(target: LOG_TARGET, "Connectivity event: {}", event);
         match event {
             PeerConnected(conn) => {
-                self.handle_new_peer_connected(*conn).await?;
+                self.handle_new_peer_connected(*conn.clone()).await?;
+                debug!(
+                    target: LOG_TARGET,
+                    "Peer: node_id '{}', allow_list '{}', connected '{}'",
+                    conn.peer_node_id(),
+                    self.is_allow_list_peer(conn.peer_node_id()).await?,
+                    conn.is_connected(),
+                );
+
+                if self.config.minimize_connections {
+                    self.minimize_connections().await?;
+                }
             },
             PeerConnectFailed(node_id) => {
                 self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
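To make the cull concrete: the threshold is the sum of the two pool sizes, and only active, non-allow-listed pool connections beyond it are closed, furthest first. With the defaults (8 neighbouring + 4 random) and 15 such connections, the 3 furthest are disconnected:

    // Sketch: how many connections minimize_connections() will close.
    let threshold = config.num_neighbouring_nodes + config.num_random_nodes; // 8 + 4 = 12
    let active = 15usize; // active, non-allow-listed pool connections, sorted by distance
    let to_close = active.saturating_sub(threshold);
    assert_eq!(to_close, 3); // the 3 furthest peers are disconnected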
@@ -572,6 +637,7 @@ impl DhtConnectivity {
                         "Failed to clear metrics for peer `{}`. Metric collector is shut down.", node_id
                     );
                 };
+                self.remove_allow_list_peers_from_pools().await?;
                 if !self.is_pool_peer(&node_id) {
                     debug!(target: LOG_TARGET, "{} is not managed by the DHT. Ignoring", node_id);
                     return Ok(());
@@ -579,7 +645,13 @@ impl DhtConnectivity {
                 self.replace_pool_peer(&node_id).await?;
                 self.log_status();
             },
-            PeerDisconnected(node_id) => {
+            PeerDisconnected(node_id, minimized) => {
+                debug!(
+                    target: LOG_TARGET,
+                    "Peer: node_id '{}', allow_list '{}', connected 'false'",
+                    node_id,
+                    self.is_allow_list_peer(&node_id).await?,
+                );
                 self.connection_handles.retain(|c| *c.peer_node_id() != node_id);
                 if self.metrics_collector.clear_metrics(node_id.clone()).await.is_err() {
                     debug!(
@@ -587,14 +659,27 @@ impl DhtConnectivity {
                         "Failed to clear metrics for peer `{}`. Metric collector is shut down.", node_id
                     );
                 };
+                self.remove_allow_list_peers_from_pools().await?;
                 if !self.is_pool_peer(&node_id) {
                     debug!(target: LOG_TARGET, "{} is not managed by the DHT. Ignoring", node_id);
                     return Ok(());
                 }
+                if minimized == Minimized::Yes || self.config.minimize_connections {
+                    debug!(
+                        target: LOG_TARGET,
+                        "Peer '{}' was disconnected because it was minimized, will not reconnect.",
+                        node_id
+                    );
+                    // Remove from managed pool if applicable
+                    self.replace_pool_peer(&node_id).await?;
+                    // In case the connection was not managed, remove the connection handle
+                    self.remove_connection_handle(&node_id);
+                    return Ok(());
+                }
                 debug!(target: LOG_TARGET, "Pool peer {} disconnected. Redialling...", node_id);
                 // Attempt to reestablish the lost connection to the pool peer. If reconnection fails,
                 // it is replaced with another peer (replace_pool_peer via PeerConnectFailed)
-                self.connectivity.request_many_dials([node_id]).await?;
+                self.dial_multiple_peers(&[node_id]).await?;
             },
             ConnectivityStateOnline(n) => {
                 self.refresh_peer_pools().await?;
@@ -621,15 +706,47 @@ impl DhtConnectivity {
         Ok(())
     }
 
+    async fn peer_allow_list(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
+        Ok(self.connectivity.get_allow_list().await?)
+    }
+
+    async fn all_connected_comms_nodes(&mut self) -> Result<Vec<NodeId>, DhtConnectivityError> {
+        let all_connections = self
+            .connectivity
+            .select_connections(ConnectivitySelection::closest_to(
+                self.node_identity.node_id().clone(),
+                usize::MAX,
+                vec![],
+            ))
+            .await?;
+        let comms_nodes = all_connections
+            .iter()
+            .filter(|p| p.peer_features().is_node())
+            .map(|p| p.peer_node_id().clone())
+            .collect();
+        Ok(comms_nodes)
+    }
+
     async fn replace_pool_peer(&mut self, current_peer: &NodeId) -> Result<(), DhtConnectivityError> {
+        self.remove_allow_list_peers_from_pools().await?;
+        if self.is_allow_list_peer(current_peer).await? {
+            debug!(
+                target: LOG_TARGET,
+                "Peer '{}' is on the allow list, ignoring replacement.",
+                current_peer
+            );
+            return Ok(());
+        }
+
         if self.random_pool.contains(current_peer) {
-            let exclude = self.get_pool_peers();
-            let pos = self
-                .random_pool
-                .iter()
-                .position(|n| n == current_peer)
-                .expect("unreachable panic");
-            self.random_pool.swap_remove(pos);
+            let mut exclude = self.get_pool_peers();
+            if self.config.minimize_connections {
+                exclude.extend(self.previous_random.iter().cloned());
+                self.previous_random.push(current_peer.clone());
+            }
+
+            self.random_pool.retain(|n| n != current_peer);
+            self.remove_connection_handle(current_peer);
 
             debug!(
                 target: LOG_TARGET,
@@ -637,12 +754,8 @@ impl DhtConnectivity {
             );
             match self.fetch_random_peers(1, &exclude).await?.pop() {
                 Some(new_peer) => {
-                    self.remove_connection_handle(current_peer);
-                    if let Some(pos) = self.random_pool.iter().position(|n| n == current_peer) {
-                        self.random_pool.swap_remove(pos);
-                    }
-                    self.random_pool.push(new_peer.clone());
-                    self.connectivity.request_many_dials([new_peer]).await?;
+                    self.insert_random_peer_ordered_by_distance(new_peer.clone());
+                    self.dial_multiple_peers(&[new_peer]).await?;
                 },
                 None => {
                     debug!(
@@ -658,25 +771,18 @@ impl DhtConnectivity {
 
         if self.neighbours.contains(current_peer) {
             let exclude = self.get_pool_peers();
-            let pos = self
-                .neighbours
-                .iter()
-                .position(|n| n == current_peer)
-                .expect("unreachable panic");
-            self.neighbours.remove(pos);
+
+            self.neighbours.retain(|n| n != current_peer);
+            self.remove_connection_handle(current_peer);
 
             debug!(
                 target: LOG_TARGET,
                 "Peer '{}' in neighbour pool is offline. Adding a new peer if possible", current_peer
             );
             match self.fetch_neighbouring_peers(1, &exclude).await?.pop() {
-                Some(node_id) => {
-                    self.remove_connection_handle(current_peer);
-                    if let Some(pos) = self.neighbours.iter().position(|n| n == current_peer) {
-                        self.neighbours.remove(pos);
-                    }
-                    self.insert_neighbour(node_id.clone());
-                    self.connectivity.request_many_dials([node_id]).await?;
+                Some(new_peer) => {
+                    self.insert_neighbour_ordered_by_distance(new_peer.clone());
+                    self.dial_multiple_peers(&[new_peer]).await?;
                 },
                 None => {
                     info!(
@@ -690,32 +796,68 @@ impl DhtConnectivity {
             }
         }
 
+        self.log_status();
+
         Ok(())
     }
 
-    fn insert_neighbour(&mut self, node_id: NodeId) -> Option<NodeId> {
+    fn insert_neighbour_ordered_by_distance(&mut self, node_id: NodeId) -> Option<NodeId> {
         let dist = node_id.distance(self.node_identity.node_id());
         let pos = self
             .neighbours
             .iter()
             .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist);
 
-        let removed_peer = if self.neighbours.len() + 1 > self.config.num_neighbouring_nodes {
+        match pos {
+            Some(idx) => {
+                self.neighbours.insert(idx, node_id);
+            },
+            None => {
+                self.neighbours.push(node_id);
+            },
+        }
+
+        if self.neighbours.len() > self.config.num_neighbouring_nodes {
             self.neighbours.pop()
         } else {
             None
-        };
+        }
+    }
+
+    fn insert_random_peer_ordered_by_distance(&mut self, node_id: NodeId) {
+        let dist = node_id.distance(self.node_identity.node_id());
+        let pos = self
+            .random_pool
+            .iter()
+            .position(|node_id| node_id.distance(self.node_identity.node_id()) > dist);
 
         match pos {
             Some(idx) => {
-                self.neighbours.insert(idx, node_id);
+                self.random_pool.insert(idx, node_id);
             },
             None => {
-                self.neighbours.push(node_id);
+                self.random_pool.push(node_id);
             },
         }
 
-        removed_peer
+        if self.random_pool.len() > self.config.num_random_nodes {
+            if let Some(removed_peer) = self.random_pool.pop() {
+                if self.config.minimize_connections {
+                    self.previous_random.push(removed_peer.clone());
+                }
+            }
+        }
+    }
+
+    async fn remove_allow_list_peers_from_pools(&mut self) -> Result<(), DhtConnectivityError> {
+        let allow_list = self.peer_allow_list().await?;
+        self.neighbours.retain(|n| !allow_list.contains(n));
+        self.random_pool.retain(|n| !allow_list.contains(n));
+        Ok(())
+    }
+
+    async fn is_allow_list_peer(&mut self, node_id: &NodeId) -> Result<bool, DhtConnectivityError> {
+        Ok(self.peer_allow_list().await?.contains(node_id))
     }
 
     fn is_pool_peer(&self, node_id: &NodeId) -> bool {
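Both insert helpers keep their vectors sorted, so the entry popped on overflow is always the furthest peer, and an evicted random-pool peer is additionally remembered in previous_random (when minimizing) so it is not immediately re-selected. A sketch of the eviction contract (dht and closest are hypothetical test fixtures; closest is assumed nearer to our node id than every current neighbour, and the pool is assumed full):

    // Sketch: inserting into a full neighbour pool evicts the furthest entry.
    let evicted = dht.insert_neighbour_ordered_by_distance(closest.clone());
    assert!(evicted.is_some()); // pool was full, so the furthest neighbour fell out
    assert_eq!(dht.neighbours[0], closest); // new peer sits at the near end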
@@ -742,18 +884,37 @@ impl DhtConnectivity {
             .expect("already checked")
     }
 
+    async fn max_neighbour_distance_all_connections(&mut self) -> Result<NodeDistance, DhtConnectivityError> {
+        let mut distance = self.get_neighbour_max_distance();
+        if self.config.minimize_connections {
+            let all_connected_comms_nodes = self.all_connected_comms_nodes().await?;
+            if let Some(node_id) = all_connected_comms_nodes.get(self.config.num_neighbouring_nodes - 1) {
+                let node_distance = self.node_identity.node_id().distance(node_id);
+                if node_distance < distance {
+                    distance = node_distance;
+                }
+            }
+        }
+        Ok(distance)
+    }
+
     async fn fetch_neighbouring_peers(
-        &self,
+        &mut self,
         n: usize,
         excluded: &[NodeId],
     ) -> Result<Vec<NodeId>, DhtConnectivityError> {
+        let peer_allow_list = self.peer_allow_list().await?;
+        let neighbour_distance = self.max_neighbour_distance_all_connections().await?;
         let peer_manager = &self.peer_manager;
-        let node_id = self.node_identity.node_id();
-        let connected = self.connected_peers_iter().collect::<Vec<_>>();
+        let self_node_id = self.node_identity.node_id();
+        let connected_pool_peers = self.connected_pool_peers_iter().collect::<Vec<_>>();
+
+        let mut excluded = excluded.to_vec();
+        excluded.extend(peer_allow_list);
 
         // Fetch to all n nearest neighbour Communication Nodes
         // which are eligible for connection.
-        // Currently that means:
+        // Currently, that means:
         // - The peer isn't banned,
         // - it has the required features
         // - it didn't recently fail to connect, and
@@ -769,7 +930,7 @@ impl DhtConnectivity {
                     return false;
                 }
 
-                if connected.contains(&&peer.node_id) {
+                if connected_pool_peers.contains(&&peer.node_id) {
                     return false;
                 }
 
@@ -781,7 +942,7 @@ impl DhtConnectivity {
                     return false;
                 }
                 // we have tried to connect to this peer, and we have never made a successful attempt at connection
-                if peer.last_connect_attempt().is_some() && peer.last_seen().is_none() {
+                if peer.all_addresses_failed() {
                     return false;
                 }
 
@@ -790,9 +951,16 @@ impl DhtConnectivity {
                     return false;
                 }
 
+                if self.config.minimize_connections {
+                    // If the peer is not closer, return false
+                    if self_node_id.distance(&peer.node_id) >= neighbour_distance {
+                        return false;
+                    }
+                }
+
                 true
             })
-            .sort_by(PeerQuerySortBy::DistanceFrom(node_id))
+            .sort_by(PeerQuerySortBy::DistanceFrom(self_node_id))
             .limit(n);
 
         let peers = peer_manager.perform_query(query).await?;
@@ -800,8 +968,10 @@ impl DhtConnectivity {
         Ok(peers.into_iter().map(|p| p.node_id).take(n).collect())
     }
 
-    async fn fetch_random_peers(&self, n: usize, excluded: &[NodeId]) -> Result<Vec<NodeId>, DhtConnectivityError> {
-        let peers = self.peer_manager.random_peers(n, excluded).await?;
+    async fn fetch_random_peers(&mut self, n: usize, excluded: &[NodeId]) -> Result<Vec<NodeId>, DhtConnectivityError> {
+        let mut excluded = excluded.to_vec();
+        excluded.extend(self.peer_allow_list().await?);
+        let peers = self.peer_manager.random_peers(n, &excluded).await?;
         Ok(peers.into_iter().map(|p| p.node_id).collect())
     }
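Condensing the candidate filter above: a peer is only dialled into the neighbour pool if it passes every gate, and minimize_connections adds a strict distance gate on top. Roughly (sketch; ban/feature checks abbreviated to the predicates used in the query):

    // Sketch: the neighbour-candidate predicate in one expression.
    let eligible = !peer.is_banned()
        && !excluded.contains(&peer.node_id)                // includes the allow list
        && !connected_pool_peers.contains(&&peer.node_id)   // not already held
        && !peer.all_addresses_failed()                     // at least one viable address
        && (!config.minimize_connections
            || self_node_id.distance(&peer.node_id) < neighbour_distance);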
diff --git a/comms/dht/src/connectivity/test.rs b/comms/dht/src/connectivity/test.rs
index 3120aa075b..376f05929f 100644
--- a/comms/dht/src/connectivity/test.rs
+++ b/comms/dht/src/connectivity/test.rs
@@ -31,6 +31,7 @@ use tari_comms::{
         mocks::{create_connectivity_mock, create_dummy_peer_connection, ConnectivityManagerMockState},
         node_identity::ordered_node_identities_by_distance,
     },
+    Minimized,
    NodeIdentity,
     PeerManager,
 };
@@ -192,6 +193,7 @@ async fn replace_peer_when_peer_goes_offline() {
 
     connectivity.publish_event(ConnectivityEvent::PeerDisconnected(
         node_identities[4].node_id().clone(),
+        Minimized::No,
     ));
 
     async_assert!(
@@ -242,12 +244,16 @@ async fn insert_neighbour() {
 
     // First 8 inserts should not remove a peer (because num_neighbouring_nodes == 8)
     for ni in shuffled.iter().take(8) {
-        assert!(dht_connectivity.insert_neighbour(ni.node_id().clone()).is_none());
+        assert!(dht_connectivity
+            .insert_neighbour_ordered_by_distance(ni.node_id().clone())
+            .is_none());
    }
 
     // Next 2 inserts will always remove a node id
     for ni in shuffled.iter().skip(8) {
-        assert!(dht_connectivity.insert_neighbour(ni.node_id().clone()).is_some())
+        assert!(dht_connectivity
+            .insert_neighbour_ordered_by_distance(ni.node_id().clone())
+            .is_some())
     }
 
     // Check the first 7 node ids match our neighbours, the last element depends on distance and ordering of inserts
diff --git a/comms/dht/src/network_discovery/state_machine.rs b/comms/dht/src/network_discovery/state_machine.rs
index 78281d56b9..700b62291b 100644
--- a/comms/dht/src/network_discovery/state_machine.rs
+++ b/comms/dht/src/network_discovery/state_machine.rs
@@ -308,7 +308,7 @@ impl Display for DiscoveryParams {
                 let _ = write!(peers, "{p}, ");
                 peers
             }),
-            self.num_peers_to_request
+            self.num_peers_to_request,
         )
     }
 }
diff --git a/comms/dht/src/store_forward/error.rs b/comms/dht/src/store_forward/error.rs
index d7f47ff225..3c57a236fa 100644
--- a/comms/dht/src/store_forward/error.rs
+++ b/comms/dht/src/store_forward/error.rs
@@ -24,6 +24,7 @@ use std::time::Duration;
 
 use prost::DecodeError;
 use tari_comms::{
+    connectivity::ConnectivityError,
     message::MessageError,
     peer_manager::{NodeId, PeerManagerError},
 };
@@ -89,4 +90,6 @@ pub enum StoreAndForwardError {
     StoredAtWasInFuture,
     #[error("Invariant error (POSSIBLE BUG): {0}")]
     InvariantError(String),
+    #[error("ConnectivityError: {0}")]
+    ConnectivityError(#[from] ConnectivityError),
 }
diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs
index e630d84c41..6afe6bcc62 100644
--- a/comms/dht/src/store_forward/saf_handler/task.rs
+++ b/comms/dht/src/store_forward/saf_handler/task.rs
@@ -632,7 +632,8 @@ where S: Service<DecryptedDhtMessage, Response = ()>
             Err(err @ StoreAndForwardError::RequesterChannelClosed) |
             Err(err @ StoreAndForwardError::DhtOutboundError(_)) |
             Err(err @ StoreAndForwardError::StorageError(_)) |
-            Err(err @ StoreAndForwardError::PeerManagerError(_)) => {
+            Err(err @ StoreAndForwardError::PeerManagerError(_)) |
+            Err(err @ StoreAndForwardError::ConnectivityError(_)) => {
                 error!(target: LOG_TARGET, "Internal error: {}", err);
                 None
             },
diff --git a/comms/dht/src/store_forward/service.rs b/comms/dht/src/store_forward/service.rs
index f3b09b3643..ae0f7abaf9 100644
--- a/comms/dht/src/store_forward/service.rs
+++ b/comms/dht/src/store_forward/service.rs
@@ -26,7 +26,7 @@ use chrono::{DateTime, NaiveDateTime, Utc};
 use log::*;
 use tari_comms::{
     connectivity::{ConnectivityEvent, ConnectivityEventRx, ConnectivityRequester},
-    peer_manager::{NodeId, PeerFeatures},
+    peer_manager::{NodeDistance, NodeId, PeerFeatures},
     types::CommsPublicKey,
     PeerManager,
 };
@@ -203,6 +203,7 @@ pub struct StoreAndForwardService {
     database: StoreAndForwardDatabase,
     peer_manager: Arc<PeerManager>,
     connection_events: ConnectivityEventRx,
+    connectivity: ConnectivityRequester,
     outbound_requester: OutboundMessageRequester,
     request_rx: mpsc::Receiver<StoreAndForwardRequest>,
     shutdown_signal: ShutdownSignal,
@@ -211,6 +212,8 @@ pub struct StoreAndForwardService {
     saf_response_signal_rx: mpsc::Receiver<()>,
     event_publisher: DhtEventSender,
     local_state: SafLocalState,
+    ignore_saf_threshold: Option<usize>,
+    node_id: NodeId,
 }
 
 impl StoreAndForwardService {
@@ -234,6 +237,7 @@ impl StoreAndForwardService {
             dht_requester,
             request_rx,
             connection_events: connectivity.get_event_subscription(),
+            connectivity: connectivity.clone(),
             outbound_requester,
             shutdown_signal,
             num_received_saf_responses: Some(0),
@@ -241,6 +245,8 @@ impl StoreAndForwardService {
             saf_response_signal_rx,
             event_publisher,
             local_state: Default::default(),
+            ignore_saf_threshold: None,
+            node_id: Default::default(),
         }
     }
 
@@ -250,6 +256,21 @@ impl StoreAndForwardService {
     }
 
     async fn run(mut self) {
+        self.ignore_saf_threshold = self
+            .connectivity
+            .get_minimize_connections_threshold()
+            .await
+            .unwrap_or_else(|err| {
+                warn!(target: LOG_TARGET, "Failed to get the minimize connections threshold: {:?}", err);
+                None
+            });
+        self.node_id = self.connectivity.get_node_identity().await.map_or_else(
+            |err| {
+                warn!(target: LOG_TARGET, "Failed to get the node identity: {:?}", err);
+                NodeId::default()
+            },
+            |node_identity| node_identity.node_id().clone(),
+        );
         let mut cleanup_ticker = time::interval(CLEANUP_INTERVAL);
         cleanup_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
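The final hunk gates SAF requests by distance: sort the active connections by XOR distance from our node id, take the distance of the threshold-th closest connection as a cut-off (falling back to the furthest connection, then to the maximum distance), and only request stored messages from peers at or inside that cut-off. The same logic as a standalone sketch:

    // Sketch: distance gate for SAF requests, mirroring the hunk below.
    fn should_request_saf(
        node_id: &NodeId,
        peer: &NodeId,
        active_peers: &[NodeId],
        threshold: usize,
    ) -> bool {
        let mut dists: Vec<NodeDistance> = active_peers.iter().map(|p| node_id.distance(p)).collect();
        dists.sort();
        let cutoff = dists
            .get(threshold.saturating_sub(1))
            .or_else(|| dists.last())
            .cloned()
            .unwrap_or_else(NodeDistance::max_distance);
        node_id.distance(peer) <= cutoff
    }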
@@ -370,9 +391,50 @@ impl StoreAndForwardService {
             return Ok(());
         }
 
-        // Whenever we connect to a peer, request SAF messages
-        let features = self.peer_manager.get_peer_features(conn.peer_node_id()).await?;
-        if features.contains(PeerFeatures::DHT_STORE_FORWARD) {
+        // Whenever we connect to a peer, request SAF messages based on the peer's features
+        // and the current connectivity state
+        let request_saf = {
+            let features = self.peer_manager.get_peer_features(conn.peer_node_id()).await?;
+            if !features.contains(PeerFeatures::DHT_STORE_FORWARD) {
+                false
+            } else if let Some(threshold) = self.ignore_saf_threshold {
+                let active_connections = self.connectivity.get_active_connections().await?;
+                let mut active_connections_with_distance = active_connections
+                    .into_iter()
+                    .map(|c| {
+                        let distance = self.node_id.distance(&c.peer_node_id().clone());
+                        (c, distance)
+                    })
+                    .collect::<Vec<_>>();
+                active_connections_with_distance.sort_by(|a, b| a.1.cmp(&b.1));
+                let saf_ignore_distance = active_connections_with_distance
+                    .get(threshold - 1)
+                    .map(|(_, distance)| distance)
+                    .cloned()
+                    .unwrap_or_else(|| {
+                        active_connections_with_distance
+                            .last()
+                            .map(|(_, distance)| distance)
+                            .cloned()
+                            .unwrap_or(NodeDistance::max_distance())
+                    });
+
+                let decision = self.node_id.distance(&conn.peer_node_id().clone()) <= saf_ignore_distance;
+                trace!(
+                    target: LOG_TARGET,
+                    "Ignore SAF decision for peer: '{}', this node id: '{}', is closer: {}, will request SAF: {}, closer peers threshold: {}",
+                    conn.peer_node_id().short_str(),
+                    self.node_id.short_str(),
+                    decision,
+                    decision,
+                    threshold
+                );
+                decision
+            } else {
+                true
+            }
+        };
+        if request_saf {
             info!(
                 target: LOG_TARGET,
                 "Connected peer '{}' is a SAF node. Requesting stored messages.",