Skip to content

Commit

Permalink
feat: improve node and wallet connection times (#6284)
Browse files Browse the repository at this point in the history
Description
---
- Improved peer db stats so they are updated only if the specific public
address was successfully connected to. This alleviates issues where the node
tries to connect to a peer's public address that has a quality score > 0
when it should be 0.
- Updated stats with ping-pong messages.
- Added a peer sync delay config option to let a preferred connection
get a head start in trying to connect.
- Discarded all peer identity claims without valid signatures
immediately when they are received.
- Reduced the minimum number of sync peers a wallet wants to connect to
from 50 to 8.

Motivation and Context
---
Peer db connection stats were not reflected correctly.
Wallet connection/re-connection times were slow.

How Has This Been Tested?
---
System-level testing

**Wallet re-connection times** with many pre-connected peers.


![image](https://github.com/tari-project/tari/assets/39146854/50effe9b-f554-4e48-8b84-72ea4152dd3b)


What process can a PR reviewer use to test or verify this change?
---
Code walk-through

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
hansieodendaal committed Apr 19, 2024
1 parent b5bda7c commit fc55bf9
Show file tree
Hide file tree
Showing 40 changed files with 382 additions and 163 deletions.
6 changes: 5 additions & 1 deletion applications/minotari_app_grpc/proto/network.proto
Expand Up @@ -70,7 +70,11 @@ message Address{
bytes address =1;
string last_seen = 2;
uint32 connection_attempts = 3;
uint64 avg_latency = 5;
AverageLatency avg_latency = 5;
}

// Moving-average connection latency for an address.
message AverageLatency {
// Latency value; serialized from a Duration via as_secs(), so the unit is seconds.
uint64 latency = 1;
}

message ListConnectedPeersResponse {
Expand Down
5 changes: 4 additions & 1 deletion applications/minotari_app_grpc/src/conversions/peer.rs
Expand Up @@ -72,7 +72,10 @@ impl From<MultiaddrWithStats> for grpc::Address {
None => String::new(),
};
let connection_attempts = address_with_stats.connection_attempts();
let avg_latency = address_with_stats.avg_latency().as_secs();
let avg_latency = address_with_stats
.avg_latency()
.map(|val| grpc::AverageLatency { latency: val.as_secs() });

Self {
address,
last_seen,
Expand Down
2 changes: 0 additions & 2 deletions applications/minotari_node/src/bootstrap.rs
Expand Up @@ -24,7 +24,6 @@ use std::{
cmp,
str::FromStr,
sync::{Arc, RwLock},
time::Duration,
};

use log::*;
Expand Down Expand Up @@ -122,7 +121,6 @@ where B: BlockchainBackend + 'static
let tor_identity = load_from_json(&base_node_config.tor_identity_file)
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;
p2p_config.transport.tor.identity = tor_identity;
p2p_config.listener_liveness_check_interval = Some(Duration::from_secs(15));

let mut handles = StackBuilder::new(self.interrupt_signal)
.add_initializer(P2pInitializer::new(
Expand Down
Expand Up @@ -88,7 +88,7 @@ impl CommandContext {
println!("Addresses:");
peer.addresses.addresses().iter().for_each(|a| {
println!(
"- {} Score: {} - Source: {} Latency: {:?} - Last Seen: {} - Last Failure:{}",
"- {} Score: {:?} - Source: {} Latency: {:?} - Last Seen: {} - Last Failure:{}",
a.address(),
a.quality_score(),
a.source(),
Expand Down
10 changes: 5 additions & 5 deletions applications/minotari_node/src/commands/command/status.rs
Expand Up @@ -27,7 +27,7 @@ use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use clap::Parser;
use minotari_app_utilities::consts;
use tari_comms::connection_manager::LivenessStatus;
use tari_comms::connection_manager::SelfLivenessStatus;
use tokio::time;

use super::{CommandContext, HandleCommand};
Expand Down Expand Up @@ -127,14 +127,14 @@ impl CommandContext {
);

match self.comms.liveness_status() {
LivenessStatus::Disabled => {},
LivenessStatus::Checking => {
SelfLivenessStatus::Disabled => {},
SelfLivenessStatus::Checking => {
status_line.add("⏳️️");
},
LivenessStatus::Unreachable => {
SelfLivenessStatus::Unreachable => {
status_line.add("️🔌");
},
LivenessStatus::Live(latency) => {
SelfLivenessStatus::Live(latency) => {
status_line.add(format!("⚡️ {:.2?}", latency));
},
}
Expand Down
1 change: 1 addition & 0 deletions base_layer/contacts/src/chat_client/src/config.rs
Expand Up @@ -168,6 +168,7 @@ impl ChatClientConfig {
database_url: DbConnectionUrl::file("data/chat_client/dht.sqlite"),
network_discovery: NetworkDiscoveryConfig {
enabled: true,
initial_peer_sync_delay: None,
..NetworkDiscoveryConfig::default()
},
saf: SafConfig {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/contacts/tests/contacts_service.rs
Expand Up @@ -96,7 +96,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
user_agent: "tari/test-contacts-service".to_string(),
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
listener_self_liveness_check_interval: None,
};
let peer_message_subscription_factory = Arc::new(subscription_factory);
let shutdown = Shutdown::new();
Expand Down
1 change: 1 addition & 0 deletions base_layer/core/tests/helpers/nodes.rs
Expand Up @@ -357,6 +357,7 @@ async fn setup_base_node_services(
let handles = StackBuilder::new(shutdown.to_signal())
.add_initializer(RegisterHandle::new(dht))
.add_initializer(RegisterHandle::new(comms.connectivity()))
.add_initializer(RegisterHandle::new(comms.peer_manager()))
.add_initializer(LivenessInitializer::new(
liveness_service_config,
Arc::clone(&subscription_factory),
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/config.rs
Expand Up @@ -112,7 +112,7 @@ pub struct P2pConfig {
pub listener_liveness_max_sessions: usize,
/// If Some, enables periodic socket-level liveness checks
#[serde(with = "serializers::optional_seconds")]
pub listener_liveness_check_interval: Option<Duration>,
pub listener_self_liveness_check_interval: Option<Duration>,
/// CIDR for addresses allowed to enter into liveness check mode on the listener.
pub listener_liveness_allowlist_cidrs: StringList,
/// User agent string for this node
Expand Down Expand Up @@ -146,7 +146,7 @@ impl Default for P2pConfig {
},
allow_test_addresses: false,
listener_liveness_max_sessions: 0,
listener_liveness_check_interval: None,
listener_self_liveness_check_interval: None,
listener_liveness_allowlist_cidrs: StringList::default(),
user_agent: String::new(),
auxiliary_tcp_listener_address: None,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/initialization.rs
Expand Up @@ -559,7 +559,7 @@ impl ServiceInitializer for P2pInitializer {
network_byte: self.network.as_byte(),
user_agent: config.user_agent.clone(),
})
.set_liveness_check(config.listener_liveness_check_interval);
.set_self_liveness_check(config.listener_self_liveness_check_interval);

if config.allow_test_addresses || config.dht.peer_validator_config.allow_test_addresses {
// The default is false, so ensure that both settings are true in this case
Expand Down
9 changes: 8 additions & 1 deletion base_layer/p2p/src/services/liveness/error.rs
Expand Up @@ -20,7 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_comms::{connectivity::ConnectivityError, message::MessageError, PeerConnectionError};
use tari_comms::{
connectivity::ConnectivityError,
message::MessageError,
peer_manager::PeerManagerError,
PeerConnectionError,
};
use tari_comms_dht::{outbound::DhtOutboundError, DhtActorError};
use tari_service_framework::reply_channel::TransportChannelError;
use thiserror::Error;
Expand Down Expand Up @@ -53,4 +58,6 @@ pub enum LivenessError {
NodeIdDoesNotExist,
#[error("PingPongDecodeError: {0}")]
PingPongDecodeError(#[from] prost::DecodeError),
#[error("Peer not found: `{0}`")]
PeerNotFoundError(#[from] PeerManagerError),
}
4 changes: 3 additions & 1 deletion base_layer/p2p/src/services/liveness/mod.rs
Expand Up @@ -63,7 +63,7 @@ use std::sync::Arc;

use futures::{Stream, StreamExt};
use log::*;
use tari_comms::connectivity::ConnectivityRequester;
use tari_comms::{connectivity::ConnectivityRequester, PeerManager};
use tari_comms_dht::Dht;
use tari_service_framework::{
async_trait,
Expand Down Expand Up @@ -136,6 +136,7 @@ impl ServiceInitializer for LivenessInitializer {
let dht = handles.expect_handle::<Dht>();
let connectivity = handles.expect_handle::<ConnectivityRequester>();
let outbound_messages = dht.outbound_requester();
let peer_manager = handles.expect_handle::<Arc<PeerManager>>();

let service = LivenessService::new(
config,
Expand All @@ -146,6 +147,7 @@ impl ServiceInitializer for LivenessInitializer {
outbound_messages,
publisher,
handles.get_shutdown_signal(),
peer_manager,
);
service.run().await;
debug!(target: LOG_TARGET, "Liveness service has shut down");
Expand Down
42 changes: 39 additions & 3 deletions base_layer/p2p/src/services/liveness/service.rs
Expand Up @@ -28,6 +28,7 @@ use tari_comms::{
connectivity::{ConnectivityRequester, ConnectivitySelection},
peer_manager::NodeId,
types::CommsPublicKey,
PeerManager,
};
use tari_comms_dht::{
domain_message::OutboundDomainMessage,
Expand Down Expand Up @@ -64,6 +65,7 @@ pub struct LivenessService<THandleStream, TPingStream> {
event_publisher: LivenessEventSender,
shutdown_signal: ShutdownSignal,
monitored_peers: Arc<RwLock<Vec<NodeId>>>,
peer_manager: Arc<PeerManager>,
}

impl<TRequestStream, TPingStream> LivenessService<TRequestStream, TPingStream>
Expand All @@ -80,6 +82,7 @@ where
outbound_messaging: OutboundMessageRequester,
event_publisher: LivenessEventSender,
shutdown_signal: ShutdownSignal,
peer_manager: Arc<PeerManager>,
) -> Self {
Self {
request_rx: Some(request_rx),
Expand All @@ -91,6 +94,7 @@ where
shutdown_signal,
config: config.clone(),
monitored_peers: Arc::new(RwLock::new(config.monitored_peers)),
peer_manager,
}
}

Expand Down Expand Up @@ -157,8 +161,8 @@ where
inner: ping_pong_msg,
..
} = msg;
let node_id = source_peer.node_id;
let public_key = source_peer.public_key;
let node_id = source_peer.node_id.clone();
let public_key = source_peer.public_key.clone();
let message_tag = dht_header.message_tag;
let ping_pong_msg = match ping_pong_msg {
Ok(p) => p,
Expand Down Expand Up @@ -214,8 +218,14 @@ where
message_tag,
);

let pong_event = PingPongEvent::new(node_id, maybe_latency, ping_pong_msg.metadata.into());
let pong_event = PingPongEvent::new(node_id.clone(), maybe_latency, ping_pong_msg.metadata.into());
self.publish_event(LivenessEvent::ReceivedPong(Box::new(pong_event)));

if let Some(address) = source_peer.last_address_used() {
self.peer_manager
.update_peer_address_latency_and_last_seen(&public_key, &address, maybe_latency)
.await?;
}
},
}
Ok(())
Expand Down Expand Up @@ -386,6 +396,7 @@ mod test {
net_address::MultiaddressesWithStats,
peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags},
test_utils::mocks::create_connectivity_mock,
types::CommsDatabase,
};
use tari_comms_dht::{
envelope::{DhtMessageHeader, DhtMessageType},
Expand All @@ -395,6 +406,8 @@ mod test {
use tari_crypto::keys::PublicKey;
use tari_service_framework::reply_channel;
use tari_shutdown::Shutdown;
use tari_storage::lmdb_store::{LMDBBuilder, LMDBConfig};
use tari_test_utils::{paths::create_temporary_data_path, random};
use tokio::{
sync::{broadcast, mpsc, oneshot},
task,
Expand All @@ -406,6 +419,24 @@ mod test {
services::liveness::{handle::LivenessHandle, state::Metadata},
};

/// Constructs a throwaway LMDB-backed `PeerManager` for use in tests.
///
/// A uniquely named database is created under a temporary data path so that
/// concurrent tests do not collide. Panics (via `unwrap`) on any setup
/// failure, which is acceptable in test code.
pub fn build_peer_manager() -> Arc<PeerManager> {
    let db_name = random::string(8);
    let tmp_path = create_temporary_data_path();

    // Single-database LMDB environment rooted at the temporary path.
    let store = LMDBBuilder::new()
        .set_path(tmp_path.to_str().unwrap())
        .set_env_config(LMDBConfig::default())
        .set_max_number_of_databases(1)
        .add_database(&db_name, lmdb_zero::db::CREATE)
        .build()
        .unwrap();

    let handle = store.get_handle(&db_name).unwrap();

    // No migrations/seed data (second argument is None).
    Arc::new(PeerManager::new(CommsDatabase::new(Arc::new(handle)), None).unwrap())
}

#[tokio::test]
async fn get_ping_pong_count() {
let mut state = LivenessState::new();
Expand Down Expand Up @@ -436,6 +467,7 @@ mod test {
outbound_messaging,
publisher,
shutdown.to_signal(),
build_peer_manager(),
);

// Run the service
Expand Down Expand Up @@ -471,6 +503,7 @@ mod test {
outbound_messaging,
publisher,
shutdown.to_signal(),
build_peer_manager(),
);

// Run the LivenessService
Expand Down Expand Up @@ -544,6 +577,7 @@ mod test {
let (publisher, _) = broadcast::channel(200);

let shutdown = Shutdown::new();

let service = LivenessService::new(
Default::default(),
stream::empty(),
Expand All @@ -553,6 +587,7 @@ mod test {
outbound_messaging,
publisher,
shutdown.to_signal(),
build_peer_manager(),
);

task::spawn(service.run());
Expand Down Expand Up @@ -595,6 +630,7 @@ mod test {
outbound_messaging,
publisher.clone(),
shutdown.to_signal(),
build_peer_manager(),
);

task::spawn(service.run());
Expand Down
1 change: 1 addition & 0 deletions base_layer/p2p/tests/services/liveness.rs
Expand Up @@ -54,6 +54,7 @@ pub async fn setup_liveness_service(
let handles = StackBuilder::new(comms.shutdown_signal())
.add_initializer(RegisterHandle::new(dht.clone()))
.add_initializer(RegisterHandle::new(comms.connectivity()))
.add_initializer(RegisterHandle::new(comms.peer_manager()))
.add_initializer(LivenessInitializer::new(
Default::default(),
Arc::clone(&subscription_factory),
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet/src/config.rs
Expand Up @@ -131,7 +131,7 @@ impl Default for WalletConfig {
fn default() -> Self {
let p2p = P2pConfig {
datastore_path: PathBuf::from("peer_db/wallet"),
listener_liveness_check_interval: None,
listener_self_liveness_check_interval: None,
..Default::default()
};
Self {
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/tests/other/mod.rs
Expand Up @@ -154,7 +154,7 @@ async fn create_wallet(
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
listener_self_liveness_check_interval: None,
};

let sql_database_path = comms_config
Expand Down Expand Up @@ -692,7 +692,7 @@ async fn test_import_utxo() {
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
listener_self_liveness_check_interval: None,
};
let config = WalletConfig {
p2p: comms_config,
Expand Down
8 changes: 6 additions & 2 deletions base_layer/wallet_ffi/src/lib.rs
Expand Up @@ -131,7 +131,7 @@ use tari_comms::{
transports::MemoryTransport,
types::CommsPublicKey,
};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig, NetworkDiscoveryConfig};
use tari_contacts::contacts_service::{handle::ContactsServiceHandle, types::Contact};
use tari_core::{
borsh::FromBytes,
Expand Down Expand Up @@ -4860,6 +4860,10 @@ pub unsafe extern "C" fn comms_config_create(
auto_request: true,
..Default::default()
},
network_discovery: NetworkDiscoveryConfig {
initial_peer_sync_delay: Some(Duration::from_secs(25)),
..Default::default()
},
..Default::default()
},
allow_test_addresses: true,
Expand All @@ -4868,7 +4872,7 @@ pub unsafe extern "C" fn comms_config_create(
user_agent: format!("tari/mobile_wallet/{}", env!("CARGO_PKG_VERSION")),
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_liveness_check_interval: None,
listener_self_liveness_check_interval: None,
};

Box::into_raw(Box::new(config))
Expand Down

0 comments on commit fc55bf9

Please sign in to comment.