From c2ede66b171683c869e38f937f61f0c07b639368 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 5 Jul 2024 18:02:11 +0300 Subject: [PATCH] peer_manager: make some variables log fields This starts using structured logging a bit, causing log messages to be mostly static strings, and the actual error messages, specific endpoint addresses etc. to be log fields. This makes it easier to grep for specific log messages, as well as filter things down for a specific peer. Code-wise, this is mostly a matter of moving from format-strings interpolating various variables to actually putting these in front of the message. Frequently, we can also avoid writing it out explicitly altogether: In `connect_peer` we already have `endpoint`, so we can ask it for `proto()` and `address()`, and just record it in a span, and it'll be present in all child log messages - as it's somewhere part of the span hierarchy and printed by our log formatters. --- mycelium/src/peer_manager.rs | 112 ++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 858b20b..8349075 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -28,7 +28,7 @@ use tokio::net::TcpStream; use tokio::net::{TcpListener, UdpSocket}; use tokio::task::AbortHandle; use tokio::time::MissedTickBehavior; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; /// Magic bytes to identify a multicast UDP packet used in link local peer discovery. 
const MYCELIUM_MULTICAST_DISCOVERY_MAGIC: &[u8; 8] = b"mycelium"; @@ -383,11 +383,11 @@ where // We successfully connected, reset the connection_attempts counter to 0 pi.connection_attempts = 0; } else { - // Only log an error on the first connection failure, to avoid spamming the logs + // Only log with error level on the first connection failure, to avoid spamming the logs if pi.connection_attempts == 0 { - error!("Couldn't connect to {endpoint}, turn on debug logging for more details"); + error!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Couldn't connect to endpoint, turn on debug logging for more details"); } else { - debug!("Couldn't connect to {endpoint}, attempt {}", pi.connection_attempts + 1) + debug!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), attempt=%pi.connection_attempts+1, "Couldn't connect to endpoint") } // Connection failed, add a failed attempt and forget about the peer if @@ -395,7 +395,7 @@ where pi.connection_attempts += 1; if pi.pt == PeerType::LinkLocalDiscovery && pi.connection_attempts >= MAX_FAILED_LOCAL_PEER_CONNECTION_ATTEMPTS { - info!("Forgetting about locally discovered peer {endpoint} after failing to connect to it"); + info!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Forgetting about locally discovered peer after failing to connect to it"); peers.remove(&endpoint); } } @@ -408,9 +408,9 @@ where // check if there is an entry for the peer in the router's peer list for (endpoint, pi) in self.peers.lock().unwrap().iter_mut() { if !pi.connecting && !pi.pr.alive() { - debug!("Found dead peer {endpoint}"); + debug!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Found dead peer"); if pi.pt == PeerType::Inbound { - debug!("Refusing to reconnect to inbound peer"); + debug!(endpoint.address=%endpoint.address(), endpoint.proto=%endpoint.proto(), "Refusing to reconnect to inbound peer"); continue } // Mark that we are connecting to 
the peer. @@ -425,12 +425,13 @@ where } /// Create a new connection to a remote peer + #[instrument(skip_all, fields(endpoint.proto=%endpoint.proto(), endpoint.address=%endpoint.address()))] async fn connect_peer( self: Arc, endpoint: Endpoint, ct: ConnectionTraffic, ) -> (Endpoint, Option) { - debug!("Connecting to {endpoint}"); + debug!("Connecting"); match endpoint.proto() { Protocol::Tcp | Protocol::Tls => self.connect_tcp_peer(endpoint, ct).await, Protocol::Quic => self.connect_quic_peer(endpoint, ct).await, @@ -443,12 +444,14 @@ where ct: ConnectionTraffic, ) -> (Endpoint, Option) { match (endpoint.proto(), &self.private_network_config) { - (Protocol::Tcp, Some(_)) => warn!("Attempting to connect to {} over Tcp while a private network is configured, connection will be upgraded to Tls", endpoint.address()), + (Protocol::Tcp, Some(_)) => { + warn!("Attempting to connect over Tcp while a private network is configured, connection will be upgraded to Tls") + } (Protocol::Tls, None) => { - warn!("Attempting to connect to {} over Tls while a private network is not enabled, refusing to connect. Use \"Tcp\" instead", endpoint.address()); + warn!("Attempting to connect over Tls while a private network is not enabled, refusing to connect. Use \"Tcp\" instead"); return (endpoint, None); - }, - _ => {}, + } + _ => {} } #[cfg(feature = "private-network")] @@ -481,10 +484,10 @@ where .await { Ok(peer_stream) => { - debug!("Opened connection to {endpoint}"); + debug!("Opened connection"); // Make sure Nagle's algorithm is disabled as it can cause latency spikes. 
if let Err(e) = peer_stream.set_nodelay(true) { - debug!("Couldn't disable Nagle's algorithm on stream {e}"); + debug!(err=%e, "Couldn't disable Nagle's algorithm on stream"); return (endpoint, None); } @@ -504,14 +507,14 @@ where let ssl = match Ssl::new(connector.context()) { Ok(ssl) => ssl, Err(e) => { - debug!("Failed to create SSL object from acceptor after connecting to remote {endpoint}: {e}"); + debug!(err=%e, "Failed to create SSL object from acceptor after connecting to remote"); return (endpoint, None); } }; let mut ssl_stream = match tokio_openssl::SslStream::new(ssl, peer_stream) { Ok(ssl_stream) => ssl_stream, Err(e) => { - debug!("Failed to create TLS stream from tcp connection to {endpoint}: {e}"); + debug!(err=%e, "Failed to create TLS stream from tcp connection to endpoint"); return (endpoint, None); } }; @@ -520,10 +523,10 @@ where let pinned_stream = Pin::new(&mut ssl_stream); if let Err(e) = pinned_stream.connect().await { // Error here is likely a misconfigured server. 
- debug!("Could not initiate TLS stream to {endpoint} {e}"); + debug!(err=%e, "Could not initiate TLS stream"); return (endpoint, None); } - debug!("Completed TLS handshake to {endpoint}"); + debug!("Completed TLS handshake"); Peer::new( router_data_tx, @@ -557,17 +560,17 @@ where match res { Ok(new_peer) => { - info!("Connected to new peer {}", endpoint); + info!("Connected to new peer"); (endpoint, Some(new_peer)) } Err(e) => { - debug!("Failed to spawn peer {endpoint}: {e}"); + debug!(err=%e, "Failed to spawn peer"); (endpoint, None) } } } Err(e) => { - debug!("Couldn't connect to {endpoint}: {e}"); + debug!(err=%e, "Couldn't connect"); (endpoint, None) } } @@ -594,7 +597,7 @@ where ) { Ok(qcc) => qcc, Err(err) => { - debug!("Failed to build quic client config: {err}"); + debug!(%err, "Failed to build quic client config"); return (endpoint, None); } }; @@ -636,27 +639,27 @@ where }; match res { Ok(new_peer) => { - info!("Connected to new peer {}", endpoint); + info!("Connected to new peer"); (endpoint, Some(new_peer)) } Err(e) => { - debug!("Failed to spawn peer {endpoint}: {e}"); + debug!(err=%e, "Failed to spawn peer"); (endpoint, None) } } } Err(e) => { - debug!("Couldn't open bidirectional quic stream to {endpoint}: {e}"); + debug!(err=%e, "Couldn't open bidirectional quic stream"); (endpoint, None) } }, Err(e) => { - debug!("Couldn't complete quic connection to {endpoint}: {e}"); + debug!(err=%e, "Couldn't complete quic connection"); (endpoint, None) } }, Err(e) => { - debug!("Couldn't initiate connection to {endpoint}: {e}"); + debug!(err=%e, "Couldn't initiate connection"); (endpoint, None) } } @@ -713,14 +716,14 @@ where let ssl = match Ssl::new(acceptor.context()) { Ok(ssl) => ssl, Err(e) => { - error!("Failed to create SSL object from acceptor after {remote} connected: {e}"); + error!(%remote, err=%e, "Failed to create SSL object from acceptor after remote connected"); continue; } }; let mut ssl_stream = match tokio_openssl::SslStream::new(ssl, stream) 
{ Ok(ssl_stream) => ssl_stream, Err(e) => { - error!("Failed to create TLS stream from tcp connection from {remote}: {e}"); + error!(%remote, err=%e, "Failed to create TLS stream from tcp connection"); continue; } }; @@ -730,10 +733,10 @@ where if let Err(e) = pinned_stream.accept().await { // An error at this point generally means the handshake failed, // client error. - debug!("Could not accept TLS stream from {remote} {e}"); + debug!(%remote, err=%e, "Could not accept TLS stream"); continue; } - debug!("Accepted TLS handshake from {remote}"); + debug!(%remote, "Accepted TLS handshake"); Peer::new( router_data_tx.clone(), @@ -767,11 +770,11 @@ where let new_peer = match new_peer { Ok(peer) => peer, Err(e) => { - error!("Failed to spawn peer: {e}"); + error!(err=%e, "Failed to spawn peer"); continue; } }; - info!("Accepted new inbound peer {}", remote); + info!("Accepted new inbound peer"); self.add_peer( Endpoint::new( if self.private_network_config.is_some() { @@ -787,12 +790,12 @@ where ); } Err(e) => { - error!("Error accepting connection: {}", e); + error!(err=%e, "Error accepting connection"); } } }, Err(e) => { - error!("Error starting listener: {}", e); + error!(err=%e, "Error starting listener"); } } } @@ -834,7 +837,7 @@ where let con = match con.into_future().await { Ok(con) => con, Err(e) => { - debug!("Failed to accept quic connection: {e}"); + debug!(err=%e, "Failed to accept quic connection"); return; } }; @@ -842,7 +845,7 @@ where let quic_peer = match con.accept_bi().await { Ok((tx, rx)) => Quic::new(tx, rx, con.remote_address()), Err(e) => { - debug!("Failed to accept bidirectional quic stream: {e}"); + debug!(err=%e, "Failed to accept bidirectional quic stream"); return; } }; @@ -859,11 +862,11 @@ where ) { Ok(peer) => peer, Err(e) => { - error!("Failed to spawn peer: {e}"); + error!(err=%e, "Failed to spawn peer"); return; } }; - info!("Accepted new inbound quic peer {}", con.remote_address()); + info!(remote=%con.remote_address(), "Accepted 
new inbound quic peer"); self.add_peer( Endpoint::new(Protocol::Quic, con.remote_address()), PeerType::Inbound, @@ -882,6 +885,7 @@ where } /// Add a new peer identifier we discovered. + #[instrument(skip_all,fields(peer.endpoint=%endpoint))] fn add_peer( &self, endpoint: Endpoint, @@ -899,7 +903,7 @@ where if known_endpoint.address().ip() == endpoint.address().ip() && known_endpoint.proto() == endpoint.proto() { - trace!("Refusing to add link local discovered address {endpoint} as there already is a reverse connection {known_endpoint}"); + trace!(peer.known_endpoint=%known_endpoint, "Refusing to add link local discovered address as there already is a reverse connection"); return; } } @@ -922,7 +926,7 @@ where if let Some(p) = peer { self.router.lock().unwrap().add_peer_interface(p); } - info!("Added new peer {endpoint}"); + info!("Added new peer"); } else if discovery_type == PeerType::Inbound { // We got an inbound peer with a duplicate entry. This is possible if the sending port // is the same as the previous one, which generally happens with our Quic setup. 
In @@ -954,9 +958,9 @@ where router.handle_dead_peer(old_peer); } } - info!("Replaced existing inbound peer {endpoint}"); + info!("Replaced existing inbound peer"); } else { - debug!("Ignoring request to add {endpoint} as it already exists"); + debug!("Ignoring request to add as it already exists"); } self.metrics.peer_manager_known_peers(peers.len()); } @@ -978,15 +982,15 @@ where Ok(sock) => sock, Err(e) => { // We won't participate in link local discovery - error!("Failed to bind multicast discovery socket: {e}"); + error!(err=%e, "Failed to bind multicast discovery socket"); warn!("Link local peer discovery disabled"); return; } }; info!( - "Bound multicast discovery interface to {}", - sock.local_addr().expect("can look up our own address") + bind_address=%sock.local_addr().expect("can look up our own address"), + "Bound multicast discovery interface", ); // Keep track of which interfaces we are already a part of. @@ -1011,14 +1015,14 @@ where Err(e) if e.kind() == tokio::io::ErrorKind::AddrInUse => { // This could happen if the multicast listener is already bound but we // somehow forgot about it. - debug!("Multicast group on interface {new_iface} already in use, consider it to be joined"); + debug!(%new_iface, "Multicast group on interface already in use, consider it to be joined"); joined_interfaces.insert(new_iface); } Err(e) => { - warn!("Failed to join multicast group on interface {new_iface}: {e}"); + warn!(%new_iface, err=%e, "Failed to join multicast group on interface"); } Ok(()) => { - debug!("Joined multicast group on interface {new_iface}"); + debug!(%new_iface, "Joined multicast group on interface"); joined_interfaces.insert(new_iface); } } @@ -1052,28 +1056,28 @@ where tokio::select! 
{ _ = send_timer.tick() => { if let Err(e) = join_new_interfaces(&mut joined_interfaces) { - error!("Issue while joining new IPv6 multicast interfaces: {e}"); + error!(err=%e, "Issue while joining new IPv6 multicast interfaces"); }; for iface in &joined_interfaces { let dst = SocketAddrV6::new(multicast_destination, peer_discovery_port, 0, *iface); - debug!("Sending multicast discovery beacon to {dst}"); + debug!(%dst, %iface, "Sending multicast discovery beacon"); if let Err(e) = sock.send_to( &beacon, dst, ) .await { - error!("Could not send multicast discovery beacon ({e}) on interface {iface}"); + error!(iface=%iface, err=%e,"Could not send multicast discovery beacon on interface"); } } }, recv_res = sock.recv_from(&mut buf) => { match recv_res { Err(e) => { - warn!("Failed to receive multicast message {e}"); + warn!(err=%e, "Failed to receive multicast message"); continue; } Ok((n, remote)) => { - trace!("Received {n} bytes from {remote}"); + trace!(%remote, bytes_received=%n, "Received bytes from remote"); self.handle_discovery_packet(&buf[..n], remote); } }