Skip to content

Commit

Permalink
peer_manager: make some variables log fields
Browse files Browse the repository at this point in the history
This starts using structured logging a bit, causing log messages to
be mostly static strings, and the actual error messages, specific
endpoint addresses etc. to be log fields.

This makes it easier to grep for specific log messages, as well as
filter things down for a specific peer.

Code-wise, this is mostly a matter of moving from format-strings
interpolating various variables to actually putting these in front of
the message.

Frequently, we can also avoid writing it out explicitly altogether:

In `connect_peer` we already have `endpoint`, so we can ask it for
`proto()` and `address()` and record both in a span. They will then be
present in all child log messages, since the span is part of the span
hierarchy that our log formatters print.
  • Loading branch information
flokli authored and LeeSmet committed Jul 8, 2024
1 parent 0e353cb commit c2ede66
Showing 1 changed file with 58 additions and 54 deletions.
112 changes: 58 additions & 54 deletions mycelium/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::net::TcpStream;
use tokio::net::{TcpListener, UdpSocket};
use tokio::task::AbortHandle;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};

/// Magic bytes to identify a multicast UDP packet used in link local peer discovery.
const MYCELIUM_MULTICAST_DISCOVERY_MAGIC: &[u8; 8] = b"mycelium";
Expand Down Expand Up @@ -383,19 +383,19 @@ where
// We successfully connected, reset the connection_attempts counter to 0
pi.connection_attempts = 0;
} else {
// Only log an error on the first connection failure, to avoid spamming the logs
// Only log with error level on the first connection failure, to avoid spamming the logs
if pi.connection_attempts == 0 {
error!("Couldn't connect to {endpoint}, turn on debug logging for more details");
error!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Couldn't connect to endpoint, turn on debug logging for more details");
} else {
debug!("Couldn't connect to {endpoint}, attempt {}", pi.connection_attempts + 1)
debug!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), attempt=%pi.connection_attempts+1, "Couldn't connect to endpoint")
}

// Connection failed, add a failed attempt and forget about the peer if
// needed.
pi.connection_attempts += 1;
if pi.pt == PeerType::LinkLocalDiscovery
&& pi.connection_attempts >= MAX_FAILED_LOCAL_PEER_CONNECTION_ATTEMPTS {
info!("Forgetting about locally discovered peer {endpoint} after failing to connect to it");
info!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Forgetting about locally discovered peer after failing to connect to it");
peers.remove(&endpoint);
}
}
Expand All @@ -408,9 +408,9 @@ where
// check if there is an entry for the peer in the router's peer list
for (endpoint, pi) in self.peers.lock().unwrap().iter_mut() {
if !pi.connecting && !pi.pr.alive() {
debug!("Found dead peer {endpoint}");
debug!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Found dead peer");
if pi.pt == PeerType::Inbound {
debug!("Refusing to reconnect to inbound peer");
debug!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Refusing to reconnect to inbound peer");
continue
}
// Mark that we are connecting to the peer.
Expand All @@ -425,12 +425,13 @@ where
}

/// Create a new connection to a remote peer
#[instrument(skip_all, fields(endpoint.proto=%endpoint.proto(), endpoint.address=%endpoint.address()))]
async fn connect_peer(
self: Arc<Self>,
endpoint: Endpoint,
ct: ConnectionTraffic,
) -> (Endpoint, Option<Peer>) {
debug!("Connecting to {endpoint}");
debug!("Connecting");
match endpoint.proto() {
Protocol::Tcp | Protocol::Tls => self.connect_tcp_peer(endpoint, ct).await,
Protocol::Quic => self.connect_quic_peer(endpoint, ct).await,
Expand All @@ -443,12 +444,14 @@ where
ct: ConnectionTraffic,
) -> (Endpoint, Option<Peer>) {
match (endpoint.proto(), &self.private_network_config) {
(Protocol::Tcp, Some(_)) => warn!("Attempting to connect to {} over Tcp while a private network is configured, connection will be upgraded to Tls", endpoint.address()),
(Protocol::Tcp, Some(_)) => {
warn!("Attempting to connect over Tcp while a private network is configured, connection will be upgraded to Tls")
}
(Protocol::Tls, None) => {
warn!("Attempting to connect to {} over Tls while a private network is not enabled, refusing to connect. Use \"Tcp\" instead", endpoint.address());
warn!("Attempting to connect over Tls while a private network is not enabled, refusing to connect. Use \"Tcp\" instead");
return (endpoint, None);
},
_ => {},
}
_ => {}
}

#[cfg(feature = "private-network")]
Expand Down Expand Up @@ -481,10 +484,10 @@ where
.await
{
Ok(peer_stream) => {
debug!("Opened connection to {endpoint}");
debug!("Opened connection");
// Make sure Nagle's algorithm is disabled as it can cause latency spikes.
if let Err(e) = peer_stream.set_nodelay(true) {
debug!("Couldn't disable Nagle's algorithm on stream {e}");
debug!(err=%e, "Couldn't disable Nagle's algorithm on stream");
return (endpoint, None);
}

Expand All @@ -504,14 +507,14 @@ where
let ssl = match Ssl::new(connector.context()) {
Ok(ssl) => ssl,
Err(e) => {
debug!("Failed to create SSL object from acceptor after connecting to remote {endpoint}: {e}");
debug!(err=%e, "Failed to create SSL object from acceptor after connecting to remote");
return (endpoint, None);
}
};
let mut ssl_stream = match tokio_openssl::SslStream::new(ssl, peer_stream) {
Ok(ssl_stream) => ssl_stream,
Err(e) => {
debug!("Failed to create TLS stream from tcp connection to {endpoint}: {e}");
debug!(err=%e, "Failed to create TLS stream from tcp connection to endpoint");
return (endpoint, None);
}
};
Expand All @@ -520,10 +523,10 @@ where
let pinned_stream = Pin::new(&mut ssl_stream);
if let Err(e) = pinned_stream.connect().await {
// Error here is likely a misconfigured server.
debug!("Could not initiate TLS stream to {endpoint} {e}");
debug!(err=%e, "Could not initiate TLS stream");
return (endpoint, None);
}
debug!("Completed TLS handshake to {endpoint}");
debug!("Completed TLS handshake");

Peer::new(
router_data_tx,
Expand Down Expand Up @@ -557,17 +560,17 @@ where

match res {
Ok(new_peer) => {
info!("Connected to new peer {}", endpoint);
info!("Connected to new peer");
(endpoint, Some(new_peer))
}
Err(e) => {
debug!("Failed to spawn peer {endpoint}: {e}");
debug!(err=%e, "Failed to spawn peer");
(endpoint, None)
}
}
}
Err(e) => {
debug!("Couldn't connect to {endpoint}: {e}");
debug!(err=%e, "Couldn't connect");
(endpoint, None)
}
}
Expand All @@ -594,7 +597,7 @@ where
) {
Ok(qcc) => qcc,
Err(err) => {
debug!("Failed to build quic client config: {err}");
debug!(%err, "Failed to build quic client config");
return (endpoint, None);
}
};
Expand Down Expand Up @@ -636,27 +639,27 @@ where
};
match res {
Ok(new_peer) => {
info!("Connected to new peer {}", endpoint);
info!("Connected to new peer");
(endpoint, Some(new_peer))
}
Err(e) => {
debug!("Failed to spawn peer {endpoint}: {e}");
debug!(err=%e, "Failed to spawn peer");
(endpoint, None)
}
}
}
Err(e) => {
debug!("Couldn't open bidirectional quic stream to {endpoint}: {e}");
debug!(err=%e, "Couldn't open bidirectional quic stream");
(endpoint, None)
}
},
Err(e) => {
debug!("Couldn't complete quic connection to {endpoint}: {e}");
debug!(err=%e, "Couldn't complete quic connection");
(endpoint, None)
}
},
Err(e) => {
debug!("Couldn't initiate connection to {endpoint}: {e}");
debug!(err=%e, "Couldn't initiate connection");
(endpoint, None)
}
}
Expand Down Expand Up @@ -713,14 +716,14 @@ where
let ssl = match Ssl::new(acceptor.context()) {
Ok(ssl) => ssl,
Err(e) => {
error!("Failed to create SSL object from acceptor after {remote} connected: {e}");
error!(%remote, err=%e, "Failed to create SSL object from acceptor after remote connected");
continue;
}
};
let mut ssl_stream = match tokio_openssl::SslStream::new(ssl, stream) {
Ok(ssl_stream) => ssl_stream,
Err(e) => {
error!("Failed to create TLS stream from tcp connection from {remote}: {e}");
error!(%remote, err=%e, "Failed to create TLS stream from tcp connection");
continue;
}
};
Expand All @@ -730,10 +733,10 @@ where
if let Err(e) = pinned_stream.accept().await {
// An error at this point generally means the handshake failed,
// client error.
debug!("Could not accept TLS stream from {remote} {e}");
debug!(%remote, err=%e, "Could not accept TLS stream");
continue;
}
debug!("Accepted TLS handshake from {remote}");
debug!(%remote, "Accepted TLS handshake");

Peer::new(
router_data_tx.clone(),
Expand Down Expand Up @@ -767,11 +770,11 @@ where
let new_peer = match new_peer {
Ok(peer) => peer,
Err(e) => {
error!("Failed to spawn peer: {e}");
error!(err=%e, "Failed to spawn peer");
continue;
}
};
info!("Accepted new inbound peer {}", remote);
info!("Accepted new inbound peer");
self.add_peer(
Endpoint::new(
if self.private_network_config.is_some() {
Expand All @@ -787,12 +790,12 @@ where
);
}
Err(e) => {
error!("Error accepting connection: {}", e);
error!(err=%e, "Error accepting connection");
}
}
},
Err(e) => {
error!("Error starting listener: {}", e);
error!(err=%e, "Error starting listener");
}
}
}
Expand Down Expand Up @@ -834,15 +837,15 @@ where
let con = match con.into_future().await {
Ok(con) => con,
Err(e) => {
debug!("Failed to accept quic connection: {e}");
debug!(err=%e, "Failed to accept quic connection");
return;
}
};

let quic_peer = match con.accept_bi().await {
Ok((tx, rx)) => Quic::new(tx, rx, con.remote_address()),
Err(e) => {
debug!("Failed to accept bidirectional quic stream: {e}");
debug!(err=%e, "Failed to accept bidirectional quic stream");
return;
}
};
Expand All @@ -859,11 +862,11 @@ where
) {
Ok(peer) => peer,
Err(e) => {
error!("Failed to spawn peer: {e}");
error!(err=%e, "Failed to spawn peer");
return;
}
};
info!("Accepted new inbound quic peer {}", con.remote_address());
info!(remote=%con.remote_address(), "Accepted new inbound quic peer");
self.add_peer(
Endpoint::new(Protocol::Quic, con.remote_address()),
PeerType::Inbound,
Expand All @@ -882,6 +885,7 @@ where
}

/// Add a new peer identifier we discovered.
#[instrument(skip_all,fields(peer.endpoint=%endpoint))]
fn add_peer(
&self,
endpoint: Endpoint,
Expand All @@ -899,7 +903,7 @@ where
if known_endpoint.address().ip() == endpoint.address().ip()
&& known_endpoint.proto() == endpoint.proto()
{
trace!("Refusing to add link local discovered address {endpoint} as there already is a reverse connection {known_endpoint}");
trace!(peer.known_endpoint=%known_endpoint, "Refusing to add link local discovered address as there already is a reverse connection");
return;
}
}
Expand All @@ -922,7 +926,7 @@ where
if let Some(p) = peer {
self.router.lock().unwrap().add_peer_interface(p);
}
info!("Added new peer {endpoint}");
info!("Added new peer");
} else if discovery_type == PeerType::Inbound {
// We got an inbound peer with a duplicate entry. This is possible if the sending port
// is the same as the previous one, which generally happens with our Quic setup. In
Expand Down Expand Up @@ -954,9 +958,9 @@ where
router.handle_dead_peer(old_peer);
}
}
info!("Replaced existing inbound peer {endpoint}");
info!("Replaced existing inbound peer");
} else {
debug!("Ignoring request to add {endpoint} as it already exists");
debug!("Ignoring request to add as it already exists");
}
self.metrics.peer_manager_known_peers(peers.len());
}
Expand All @@ -978,15 +982,15 @@ where
Ok(sock) => sock,
Err(e) => {
// We won't participate in link local discovery
error!("Failed to bind multicast discovery socket: {e}");
error!(err=%e, "Failed to bind multicast discovery socket");
warn!("Link local peer discovery disabled");
return;
}
};

info!(
"Bound multicast discovery interface to {}",
sock.local_addr().expect("can look up our own address")
bind_address=%sock.local_addr().expect("can look up our own address"),
"Bound multicast discovery interface",
);

// Keep track of which interfaces we are already a part of.
Expand All @@ -1011,14 +1015,14 @@ where
Err(e) if e.kind() == tokio::io::ErrorKind::AddrInUse => {
// This could happen if the multicast listener is already bound but we
// somehow forgot about it.
debug!("Multicast group on interface {new_iface} already in use, consider it to be joined");
debug!(%new_iface, "Multicast group on interface already in use, consider it to be joined");
joined_interfaces.insert(new_iface);
}
Err(e) => {
warn!("Failed to join multicast group on interface {new_iface}: {e}");
warn!(%new_iface, err=%e, "Failed to join multicast group on interface");
}
Ok(()) => {
debug!("Joined multicast group on interface {new_iface}");
debug!(%new_iface, "Joined multicast group on interface");
joined_interfaces.insert(new_iface);
}
}
Expand Down Expand Up @@ -1052,28 +1056,28 @@ where
tokio::select! {
_ = send_timer.tick() => {
if let Err(e) = join_new_interfaces(&mut joined_interfaces) {
error!("Issue while joining new IPv6 multicast interfaces: {e}");
error!(err=%e, "Issue while joining new IPv6 multicast interfaces");
};
for iface in &joined_interfaces {
let dst = SocketAddrV6::new(multicast_destination, peer_discovery_port, 0, *iface);
debug!("Sending multicast discovery beacon to {dst}");
debug!(%dst, %iface, "Sending multicast discovery beacon");
if let Err(e) = sock.send_to(
&beacon,
dst,
)
.await {
error!("Could not send multicast discovery beacon ({e}) on interface {iface}");
error!(iface=%iface, err=%e,"Could not send multicast discovery beacon on interface");
}
}
},
recv_res = sock.recv_from(&mut buf) => {
match recv_res {
Err(e) => {
warn!("Failed to receive multicast message {e}");
warn!(err=%e, "Failed to receive multicast message");
continue;
}
Ok((n, remote)) => {
trace!("Received {n} bytes from {remote}");
trace!(%remote, bytes_received=%n, "Received bytes from remote");
self.handle_discovery_packet(&buf[..n], remote);
}
}
Expand Down

0 comments on commit c2ede66

Please sign in to comment.