Skip to content

Commit

Permalink
fix(comms): spawn liveness check after address is final (#4919)
Browse files Browse the repository at this point in the history
Description
---
- Spawns the liveness check after the public address has been finalised

Motivation and Context
---
Fixes #4915 and fixes an edge case where the old address is used for liveness checks if the user deletes their Tor identity before starting the node.

How Has This Been Tested?
---
Manually
  • Loading branch information
sdbondi committed Nov 15, 2022
1 parent ea810f4 commit f558a11
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 50 deletions.
2 changes: 1 addition & 1 deletion applications/tari_base_node/src/commands/command/status.rs
Expand Up @@ -131,7 +131,7 @@ impl CommandContext {
),
);

match self.comms.listening_info().liveness_status() {
match self.comms.liveness_status() {
LivenessStatus::Disabled => {},
LivenessStatus::Checking => {
status_line.add("⏳️️");
Expand Down
30 changes: 27 additions & 3 deletions comms/core/src/builder/comms_node.rs
Expand Up @@ -26,7 +26,7 @@ use log::*;
use tari_shutdown::ShutdownSignal;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{broadcast, mpsc},
sync::{broadcast, mpsc, watch},
};

use super::{CommsBuilderError, CommsShutdown};
Expand All @@ -37,6 +37,8 @@ use crate::{
ConnectionManagerRequest,
ConnectionManagerRequester,
ListenerInfo,
LivenessCheck,
LivenessStatus,
},
connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester},
multiaddr::Multiaddr,
Expand Down Expand Up @@ -132,6 +134,7 @@ impl UnspawnedCommsNode {
}

/// Spawn a new node using the specified [Transport](crate::transports::Transport).
#[allow(clippy::too_many_lines)]
pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
where
TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -187,8 +190,8 @@ impl UnspawnedCommsNode {
let noise_config = NoiseConfig::new(node_identity.clone());

let mut connection_manager = ConnectionManager::new(
connection_manager_config,
transport,
connection_manager_config.clone(),
transport.clone(),
noise_config,
dial_backoff,
connection_manager_request_rx,
Expand Down Expand Up @@ -235,13 +238,27 @@ impl UnspawnedCommsNode {
node_identity.public_address()
);

// Spawn liveness check now that we have the final address
let liveness_watch = connection_manager_config
.liveness_self_check_interval
.map(|interval| {
LivenessCheck::spawn(
transport,
node_identity.public_address(),
interval,
shutdown_signal.clone(),
)
})
.unwrap_or_else(|| watch::channel(LivenessStatus::Disabled).1);

Ok(CommsNode {
shutdown_signal,
connection_manager_requester,
connectivity_requester,
listening_info,
node_identity,
peer_manager,
liveness_watch,
hidden_service,
complete_signals: ext_context.drain_complete_signals(),
})
Expand Down Expand Up @@ -286,6 +303,8 @@ pub struct CommsNode {
peer_manager: Arc<PeerManager>,
/// The bind addresses of the listener(s)
listening_info: ListenerInfo,
/// Current liveness status
liveness_watch: watch::Receiver<LivenessStatus>,
/// `Some` if the comms node is configured to run via a hidden service, otherwise `None`
hidden_service: Option<tor::HiddenService>,
/// The 'reciprocal' shutdown signals for each comms service
Expand Down Expand Up @@ -328,6 +347,11 @@ impl CommsNode {
&self.listening_info
}

/// Returns the current liveness status
///
/// Copies out the value currently held by the `liveness_watch` receiver. This is a plain
/// synchronous read; it does not wait for the status to change.
pub fn liveness_status(&self) -> LivenessStatus {
// `borrow()` gives a reference to the latest value in the watch channel; dereference to copy it out
*self.liveness_watch.borrow()
}

/// Returns the Tor hidden service for this node, or `None` if the node is not configured to run via a hidden service
pub fn hidden_service(&self) -> Option<&tor::HiddenService> {
self.hidden_service.as_ref()
Expand Down
32 changes: 6 additions & 26 deletions comms/core/src/connection_manager/listener.rs
Expand Up @@ -36,7 +36,7 @@ use log::*;
use tari_shutdown::{oneshot_trigger, oneshot_trigger::OneshotTrigger, ShutdownSignal};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
sync::{mpsc, watch},
sync::mpsc,
time,
};
use tokio_stream::StreamExt;
Expand All @@ -53,7 +53,7 @@ use super::{
use crate::{
bounded_executor::BoundedExecutor,
connection_manager::{
liveness::{LivenessCheck, LivenessSession, LivenessStatus},
liveness::LivenessSession,
metrics,
wire_mode::{WireMode, LIVENESS_WIRE_MODE},
},
Expand Down Expand Up @@ -83,7 +83,7 @@ pub struct PeerListener<TTransport> {
node_identity: Arc<NodeIdentity>,
our_supported_protocols: Vec<ProtocolId>,
liveness_session_count: Arc<AtomicUsize>,
on_listening: OneshotTrigger<Result<(Multiaddr, watch::Receiver<LivenessStatus>), ConnectionManagerError>>,
on_listening: OneshotTrigger<Result<Multiaddr, ConnectionManagerError>>,
}

impl<TTransport> PeerListener<TTransport>
Expand Down Expand Up @@ -121,10 +121,7 @@ where
/// in binding the listener socket
// This returns an impl Future and is not async because we want to exclude &self from the future so that it has a
// 'static lifetime as well as to flatten the oneshot result for ergonomics
pub fn on_listening(
&self,
) -> impl Future<Output = Result<(Multiaddr, watch::Receiver<LivenessStatus>), ConnectionManagerError>> + 'static
{
pub fn on_listening(&self) -> impl Future<Output = Result<Multiaddr, ConnectionManagerError>> + 'static {
let signal = self.on_listening.to_signal();
signal.map(|r| r.ok_or(ConnectionManagerError::ListenerOneshotCancelled)?)
}
Expand All @@ -135,7 +132,7 @@ where
self
}

pub async fn listen(self) -> Result<(Multiaddr, watch::Receiver<LivenessStatus>), ConnectionManagerError> {
pub async fn listen(self) -> Result<Multiaddr, ConnectionManagerError> {
let on_listening = self.on_listening();
runtime::current().spawn(self.run());
on_listening.await
Expand All @@ -148,9 +145,7 @@ where
Ok((mut inbound, address)) => {
info!(target: LOG_TARGET, "Listening for peer connections on '{}'", address);

let liveness_watch = self.spawn_liveness_check();

self.on_listening.broadcast(Ok((address, liveness_watch)));
self.on_listening.broadcast(Ok(address));

loop {
tokio::select! {
Expand Down Expand Up @@ -234,21 +229,6 @@ where
});
}

/// Starts the liveness self-check if one is configured and returns a receiver for its status.
///
/// When `config.liveness_self_check_interval` is `Some`, a `LivenessCheck` is spawned with a clone of
/// the transport, this node's public address, the configured interval and a clone of the shutdown
/// signal, and whatever `LivenessCheck::spawn` returns is passed back. When it is `None`, nothing is
/// spawned and the returned receiver holds `LivenessStatus::Disabled`.
fn spawn_liveness_check(&self) -> watch::Receiver<LivenessStatus> {
match self.config.liveness_self_check_interval {
Some(interval) => LivenessCheck::spawn(
self.transport.clone(),
// The public address is read at the moment this function is called
self.node_identity.public_address(),
interval,
self.shutdown_signal.clone(),
),
None => {
// The sender half is not bound and is dropped right away, so the receiver's value stays `Disabled`
let (_, rx) = watch::channel(LivenessStatus::Disabled);
rx
},
}
}

async fn spawn_listen_task(&self, mut socket: TTransport::Output, peer_addr: Multiaddr) {
let node_identity = self.node_identity.clone();
let peer_manager = self.peer_manager.clone();
Expand Down
21 changes: 4 additions & 17 deletions comms/core/src/connection_manager/manager.rs
Expand Up @@ -28,7 +28,7 @@ use tari_shutdown::{Shutdown, ShutdownSignal};
use time::Duration;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{broadcast, mpsc, oneshot, watch},
sync::{broadcast, mpsc, oneshot},
task,
time,
};
Expand All @@ -43,7 +43,7 @@ use super::{
};
use crate::{
backoff::Backoff,
connection_manager::{liveness::LivenessStatus, metrics, ConnectionDirection, ConnectionId},
connection_manager::{metrics, ConnectionDirection, ConnectionId},
multiplexing::Substream,
noise::NoiseConfig,
peer_manager::{NodeId, NodeIdentity, PeerManagerError},
Expand Down Expand Up @@ -149,7 +149,6 @@ impl Default for ConnectionManagerConfig {
pub struct ListenerInfo {
bind_address: Multiaddr,
aux_bind_address: Option<Multiaddr>,
liveness_watch: watch::Receiver<LivenessStatus>,
}

impl ListenerInfo {
Expand All @@ -163,17 +162,6 @@ impl ListenerInfo {
pub fn auxiliary_bind_address(&self) -> Option<&Multiaddr> {
self.aux_bind_address.as_ref()
}

/// Returns the current liveness status
///
/// Copies out the value currently held by the `liveness_watch` receiver without waiting for a change.
pub fn liveness_status(&self) -> LivenessStatus {
*self.liveness_watch.borrow()
}

/// Waits for liveness status to change from the last time the value was checked.
///
/// Returns `Some` with the status held by the watch channel once a change has been observed, or
/// `None` if `changed()` returns an error (per the tokio documentation, this happens when the sender
/// half of the channel has been dropped).
pub async fn liveness_status_changed(&mut self) -> Option<LivenessStatus> {
// `.ok()?` turns an error from `changed()` into an early `None` return
self.liveness_watch.changed().await.ok()?;
Some(*self.liveness_watch.borrow())
}
}

/// The actor responsible for connection management.
Expand Down Expand Up @@ -346,17 +334,16 @@ where
listener.set_supported_protocols(self.protocols.get_supported_protocols());

let mut listener_info = match listener.listen().await {
Ok((bind_address, liveness_watch)) => ListenerInfo {
Ok(bind_address) => ListenerInfo {
bind_address,
aux_bind_address: None,
liveness_watch,
},
Err(err) => return Err(err),
};

if let Some(mut listener) = self.aux_listener.take() {
listener.set_supported_protocols(self.protocols.get_supported_protocols());
let (addr, _) = listener.listen().await?;
let addr = listener.listen().await?;
debug!(target: LOG_TARGET, "Aux TCP listener bound to address {}", addr);
listener_info.aux_bind_address = Some(addr);
}
Expand Down
1 change: 1 addition & 0 deletions comms/core/src/connection_manager/mod.rs
Expand Up @@ -52,6 +52,7 @@ mod peer_connection;
pub use peer_connection::{ConnectionId, NegotiatedSubstream, PeerConnection, PeerConnectionRequest};

mod liveness;
pub(crate) use liveness::LivenessCheck;
pub use liveness::LivenessStatus;

mod wire_mode;
Expand Down
6 changes: 3 additions & 3 deletions comms/core/src/connection_manager/tests/listener_dialer.rs
Expand Up @@ -66,7 +66,7 @@ async fn listen() -> Result<(), Box<dyn Error>> {
shutdown.to_signal(),
);

let (mut bind_addr, _) = listener.listen().await?;
let mut bind_addr = listener.listen().await?;

unpack_enum!(Protocol::Memory(port) = bind_addr.pop().unwrap());
assert!(port > 0);
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn smoke() {
listener.set_supported_protocols(supported_protocols.clone());

// Get the listening address of the peer
let (address, _) = listener.listen().await.unwrap();
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
Expand Down Expand Up @@ -207,7 +207,7 @@ async fn banned() {
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let (address, _) = listener.listen().await.unwrap();
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
// The listener has banned the dialer peer
Expand Down

0 comments on commit f558a11

Please sign in to comment.