From 7e79227e5ea4f71cca7a0ee70a6ac0714c09141c Mon Sep 17 00:00:00 2001 From: Divma <26765164+divagant-martian@users.noreply.github.com> Date: Mon, 2 Oct 2023 16:55:35 -0500 Subject: [PATCH] feat(*): log me (#1561) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description Second take at #1544, since a closed PR that has been force-pushed can't be reopened. This adds a `me` field (the short base32 form of the node's public key, see `PublicKey::fmt_short`) to the tracing spans of the magicsock, sync, gossip, downloader and node tasks, so that every log line identifies the node that emitted it. Example output: #### Magicsock ``` 2023-10-02T18:46:34.455601Z DEBUG magicsock{me=2luekswh7o3a5tz4}:derp.actor:recv_detail:client-connect{key=2luekswh7o3a5tz4enymovsoksgnpb2qpmxlvifp6ywwjnacihya}:connect_0: rustls::client::tls13: TLS1.3 encrypted extensions: [ServerNameAck] ``` #### Sync ```log 2023-09-28T20:51:18.908126Z DEBUG sync{me=oywqb57ovmdry243}: iroh::sync_engine::live: sync[dial]: start namespace=NamespaceId(q47zmyim5r6isz22…) peer=PublicKey(2jnygwapdm26wwa2) reason=DirectJoin last_state=None ``` #### Gossip ```logs 2023-09-28T20:58:42.854730Z DEBUG gossip{me=zooj7iazcl7rsoal}: iroh_gossip::net: handle out_event EmitEvent(TopicId(wno5nwxtkhhtnqsm…), Received(GossipEvent { content: <33b>, delivered_from: PublicKey(7a7kkzndvbt6eulu), scope: Neighbors })) ``` #### Downloader ``` 2023-09-28T21:00:11.107411Z DEBUG downloader{me=4vabufwku3wselbl}: iroh::downloader: download completed peer=2jnygwapdm26wwa26tvcprm3m5vqajmhwz7lx5xizwx637ad5sea kind=Blob { hash: Hash(224746ea89d286220e0770f89cda2ab138143b00e384dd795d5a13b77b094822) } ``` ## Notes & open questions We will probably add this in other places or change the logging level later, but it would be good to get this in so that we at least have something to iterate on. ## Change checklist - [x] Self-review. - [ ] Documentation updates if relevant. - [ ] Tests if relevant. 
--- iroh-gossip/src/net.rs | 53 ++++++++++++++++-------------- iroh-gossip/src/proto/hyparview.rs | 4 +-- iroh-net/src/key.rs | 8 +++++ iroh-net/src/magicsock.rs | 48 +++++++++++++-------------- iroh/src/downloader.rs | 5 +-- iroh/src/node.rs | 38 +++++++++++---------- iroh/src/sync_engine/live.rs | 29 +++++----------- 7 files changed, 94 insertions(+), 91 deletions(-) diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index d77a9c2349..72e493d68e 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -12,7 +12,7 @@ use tokio::{ sync::{broadcast, mpsc, oneshot, watch}, task::JoinHandle, }; -use tracing::{debug, trace, warn}; +use tracing::{debug, error_span, trace, warn, Instrument}; use self::util::{read_message, write_message, Dialer, Timers}; use crate::proto::{self, PeerData, Scope, TopicId}; @@ -85,6 +85,8 @@ impl Gossip { let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP); let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP); let (on_endpoints_tx, on_endpoints_rx) = watch::channel(Default::default()); + + let me = endpoint.peer_id().fmt_short(); let actor = Actor { endpoint, state, @@ -100,14 +102,18 @@ impl Gossip { subscribers_all: None, subscribers_topic: Default::default(), }; - let actor_handle = tokio::spawn(async move { - if let Err(err) = actor.run().await { - warn!("gossip actor closed with error: {err:?}"); - Err(err) - } else { - Ok(()) + + let actor_handle = tokio::spawn( + async move { + if let Err(err) = actor.run().await { + warn!("gossip actor closed with error: {err:?}"); + Err(err) + } else { + Ok(()) + } } - }); + .instrument(error_span!("gossip", %me)), + ); Self { to_actor_tx, on_endpoints_tx: Arc::new(on_endpoints_tx), @@ -333,7 +339,6 @@ struct Actor { impl Actor { pub async fn run(mut self) -> anyhow::Result<()> { - let me = *self.state.me(); loop { tokio::select! 
{ biased; @@ -341,7 +346,7 @@ impl Actor { match msg { Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?, None => { - debug!(?me, "all gossip handles dropped, stop gossip actor"); + debug!("all gossip handles dropped, stop gossip actor"); break; } } @@ -354,11 +359,11 @@ impl Actor { (peer_id, res) = self.dialer.next_conn() => { match res { Ok(conn) => { - debug!(?me, peer = ?peer_id, "dial successfull"); + debug!(peer = ?peer_id, "dial successfull"); self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?; } Err(err) => { - warn!(?me, peer = ?peer_id, "dial failed: {err}"); + warn!(peer = ?peer_id, "dial failed: {err}"); } } } @@ -383,8 +388,7 @@ impl Actor { } async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> { - let me = *self.state.me(); - trace!(?me, "handle to_actor {msg:?}"); + trace!("handle to_actor {msg:?}"); match msg { ToActor::ConnIncoming(peer_id, origin, conn) => { self.conns.insert(peer_id, conn.clone()); @@ -395,13 +399,13 @@ impl Actor { // Spawn a task for this connection let in_event_tx = self.in_event_tx.clone(); tokio::spawn(async move { - debug!(?me, peer = ?peer_id, "connection established"); + debug!(peer = ?peer_id, "connection established"); match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await { Ok(()) => { - debug!(?me, peer = ?peer_id, "connection closed without error") + debug!(peer = ?peer_id, "connection closed without error") } Err(err) => { - debug!(?me, peer = ?peer_id, "connection closed with error {err:?}") + debug!(peer = ?peer_id, "connection closed with error {err:?}") } } in_event_tx @@ -458,11 +462,10 @@ impl Actor { } async fn handle_in_event(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> { - let me = *self.state.me(); if matches!(event, InEvent::TimerExpired(_)) { - trace!(?me, "handle in_event {event:?}"); + trace!("handle 
in_event {event:?}"); } else { - debug!(?me, "handle in_event {event:?}"); + debug!("handle in_event {event:?}"); }; if let InEvent::PeerDisconnected(peer) = &event { self.conn_send_tx.remove(peer); @@ -470,9 +473,9 @@ impl Actor { let out = self.state.handle(event, now); for event in out { if matches!(event, OutEvent::ScheduleTimer(_, _)) { - trace!(?me, "handle out_event {event:?}"); + trace!("handle out_event {event:?}"); } else { - debug!(?me, "handle out_event {event:?}"); + debug!("handle out_event {event:?}"); }; match event { OutEvent::SendMessage(peer_id, message) => { @@ -482,7 +485,7 @@ impl Actor { self.conn_send_tx.remove(&peer_id); } } else { - debug!(?me, peer = ?peer_id, "dial"); + debug!(peer = ?peer_id, "dial"); self.dialer.queue_dial(peer_id, GOSSIP_ALPN); // TODO: Enforce max length self.pending_sends.entry(peer_id).or_default().push(message); @@ -516,13 +519,13 @@ impl Actor { OutEvent::PeerData(peer, data) => match decode_peer_data(&data) { Err(err) => warn!("Failed to decode {data:?} from {peer}: {err}"), Ok(info) => { - debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}"); + debug!(peer = ?peer, "add known addrs: {info:?}"); let peer_addr = PeerAddr { peer_id: peer, info, }; if let Err(err) = self.endpoint.add_peer_addr(peer_addr).await { - debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}"); + debug!(peer = ?peer, "add known failed: {err:?}"); } } }, diff --git a/iroh-gossip/src/proto/hyparview.rs b/iroh-gossip/src/proto/hyparview.rs index 89b871145d..f6e74e8e1f 100644 --- a/iroh-gossip/src/proto/hyparview.rs +++ b/iroh-gossip/src/proto/hyparview.rs @@ -651,7 +651,7 @@ where io.push(OutEvent::EmitEvent(Event::NeighborDown(peer))); let data = self.peer_data.remove(&peer); self.add_passive(peer, data, io); - debug!(peer = ?self.me, other = ?peer, "removed from active view, reason: {reason:?}"); + debug!(other = ?peer, "removed from active view, reason: {reason:?}"); Some(peer) } else { 
None @@ -701,7 +701,7 @@ where fn add_active_unchecked(&mut self, peer: PI, priority: Priority, io: &mut impl IO) { self.passive_view.remove(&peer); self.active_view.insert(peer); - debug!(peer = ?self.me, other = ?peer, "add to active view"); + debug!(other = ?peer, "add to active view"); let message = Message::Neighbor(Neighbor { priority, diff --git a/iroh-net/src/key.rs b/iroh-net/src/key.rs index a2dbe158ee..e0917b0f40 100644 --- a/iroh-net/src/key.rs +++ b/iroh-net/src/key.rs @@ -82,6 +82,14 @@ impl PublicKey { pub fn verify(&self, message: &[u8], signature: &Signature) -> Result<(), SignatureError> { self.public.verify_strict(message, signature) } + + /// Convert to a base32 string limited to the first 10 bytes for a friendly string + /// representation of the key. + pub fn fmt_short(&self) -> String { + let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]); + text.make_ascii_lowercase(); + text + } } impl TryFrom<&[u8]> for PublicKey { diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 80b6280713..9df8e80a97 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -42,7 +42,7 @@ use tokio::{ sync::{self, mpsc, Mutex}, time, }; -use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument}; +use tracing::{debug, error, error_span, info, info_span, instrument, trace, warn, Instrument}; use crate::{ config::{self, DERP_MAGIC_IP}, @@ -204,7 +204,8 @@ struct Inner { actor_sender: mpsc::Sender, /// Sends network messages. network_sender: mpsc::Sender>, - name: String, + /// String representation of the peer_id of this node. 
+ me: String, #[allow(clippy::type_complexity)] #[debug("on_endpoints: Option>")] on_endpoints: Option>, @@ -309,16 +310,13 @@ impl MagicSock { /// /// [`Callbacks::on_endpoint`]: crate::magicsock::conn::Callbacks::on_endpoints pub async fn new(opts: Options) -> Result { - let name = format!( - "magic-{}", - hex::encode(&opts.secret_key.public().as_bytes()[..8]) - ); + let me = opts.secret_key.public().fmt_short(); if crate::util::derp_only_mode() { warn!("creating a MagicSock that will only send packets over a DERP relay connection."); } - Self::with_name(name.clone(), opts) - .instrument(info_span!("magicsock", %name)) + Self::with_name(me.clone(), opts) + .instrument(error_span!("magicsock", %me)) .await } @@ -327,7 +325,7 @@ impl MagicSock { self.inner.has_derp_region(region).await } - async fn with_name(name: String, opts: Options) -> Result { + async fn with_name(me: String, opts: Options) -> Result { let port_mapper = portmapper::Client::default().await; let Options { @@ -375,7 +373,7 @@ impl MagicSock { let (network_sender, network_receiver) = mpsc::channel(128); let inner = Arc::new(Inner { - name, + me, on_endpoints, on_derp_active, on_net_info, @@ -523,7 +521,7 @@ impl MagicSock { } /// Triggers an address discovery. The provided why string is for debug logging only. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn re_stun(&self, why: &'static str) { self.inner .actor_sender @@ -552,7 +550,7 @@ impl MagicSock { // TODO // /// Handles a "ping" CLI query. - // #[instrument(skip_all, fields(self.name = %self.name))] + // #[instrument(skip_all, fields(me = %self.inner.me))] // pub async fn ping(&self, peer: config::Node, mut res: config::PingResult, cb: F) // where // F: Fn(config::PingResult) -> BoxFuture<'static, ()> + Send + Sync + 'static, @@ -586,7 +584,7 @@ impl MagicSock { // } /// Sets the connection's preferred local port. 
- #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn set_preferred_port(&self, port: u16) { let (s, r) = sync::oneshot::channel(); self.inner @@ -609,7 +607,7 @@ impl MagicSock { } } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] /// Add addresses for a node to the magic socket's addresbook. pub async fn add_peer_addr(&self, addr: PeerAddr) -> Result<()> { let (s, r) = sync::oneshot::channel(); @@ -624,7 +622,7 @@ impl MagicSock { /// Closes the connection. /// /// Only the first close does anything. Any later closes return nil. - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn close(&self) -> Result<()> { if self.inner.is_closed() { return Ok(()); @@ -647,7 +645,7 @@ impl MagicSock { /// Closes and re-binds the UDP sockets and resets the DERP connection. /// It should be followed by a call to ReSTUN. 
- #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn rebind_all(&self) { let (s, r) = sync::oneshot::channel(); self.inner @@ -708,7 +706,7 @@ fn endpoint_sets_equal(xs: &[config::Endpoint], ys: &[config::Endpoint]) -> bool } impl AsyncUdpSocket for MagicSock { - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] fn poll_send( &self, _udp_state: &quinn_udp::UdpState, @@ -763,7 +761,7 @@ impl AsyncUdpSocket for MagicSock { Poll::Pending } - #[instrument(skip_all, fields(name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] fn poll_recv( &self, cx: &mut Context, @@ -828,7 +826,7 @@ impl AsyncUdpSocket for MagicSock { "[QUINN] <- {} ({}b) ({}) ({:?}, {:?})", meta_out.addr, meta_out.len, - self.inner.name, + self.inner.me, meta_out.dst_ip, source ); @@ -1839,7 +1837,7 @@ impl Actor { } /// Records the new endpoints, reporting whether they're changed. 
- #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn set_endpoints(&mut self, endpoints: &[config::Endpoint]) -> bool { self.last_endpoints_time = Some(Instant::now()); for (_de, f) in self.on_endpoint_refreshed.drain() { @@ -1857,7 +1855,7 @@ impl Actor { true } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn enqueue_call_me_maybe(&mut self, derp_region: u16, endpoint_id: usize) { let endpoint = self.peer_map.by_id(&endpoint_id); if endpoint.is_none() { @@ -1921,7 +1919,7 @@ impl Actor { } } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn rebind_all(&mut self) { inc!(MagicsockMetrics, rebind_calls); if let Err(err) = self.rebind(CurrentPortFate::Keep).await { @@ -1936,7 +1934,7 @@ impl Actor { /// Resets the preferred address for all peers. /// This is called when connectivity changes enough that we no longer trust the old routes. - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] fn reset_endpoint_states(&mut self) { for (_, ep) in self.peer_map.endpoints_mut() { ep.note_connectivity_change(); @@ -1945,7 +1943,7 @@ impl Actor { /// Closes and re-binds the UDP sockets. /// We consider it successful if we manage to bind the IPv4 socket. 
- #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] async fn rebind(&mut self, cur_port_fate: CurrentPortFate) -> Result<()> { let mut ipv6_addr = None; @@ -1985,7 +1983,7 @@ impl Actor { Ok(()) } - #[instrument(skip_all, fields(self.name = %self.inner.name))] + #[instrument(skip_all, fields(me = %self.inner.me))] pub async fn set_preferred_port(&mut self, port: u16) { let existing_port = self.inner.port.swap(port, Ordering::Relaxed); if existing_port == port { diff --git a/iroh/src/downloader.rs b/iroh/src/downloader.rs index 14927265e5..aaa679c1fc 100644 --- a/iroh/src/downloader.rs +++ b/iroh/src/downloader.rs @@ -43,7 +43,7 @@ use iroh_bytes::{ use iroh_net::{key::PublicKey, MagicEndpoint}; use tokio::sync::{mpsc, oneshot}; use tokio_util::{sync::CancellationToken, time::delay_queue}; -use tracing::{debug, trace}; +use tracing::{debug, error_span, trace, Instrument}; mod get; mod invariants; @@ -225,6 +225,7 @@ impl Downloader { S: Store, C: CollectionParser, { + let me = endpoint.peer_id().fmt_short(); let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY); let dialer = iroh_gossip::net::util::Dialer::new(endpoint); @@ -237,7 +238,7 @@ impl Downloader { let service = Service::new(getter, dialer, concurrency_limits, msg_rx); - service.run() + service.run().instrument(error_span!("downloader", %me)) }; rt.local_pool().spawn_pinned(create_future); Self { next_id: 0, msg_tx } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 1514aed324..3af560e6a5 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -53,7 +53,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::task::JoinError; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::dial::Ticket; use crate::downloader::Downloader; @@ -424,22 +424,26 @@ where inner: 
inner.clone(), collection_parser: self.collection_parser.clone(), }; - rt2.main().spawn(async move { - Self::run( - endpoint, - callbacks, - cb_receiver, - handler, - self.rpc_endpoint, - internal_rpc, - self.custom_get_handler, - self.auth_handler, - self.collection_parser, - rt3, - gossip, - ) - .await - }) + let me = endpoint.peer_id().fmt_short(); + rt2.main().spawn( + async move { + Self::run( + endpoint, + callbacks, + cb_receiver, + handler, + self.rpc_endpoint, + internal_rpc, + self.custom_get_handler, + self.auth_handler, + self.collection_parser, + rt3, + gossip, + ) + .await + } + .instrument(error_span!("node", %me)), + ) }; let node = Node { inner, diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index c40e06d919..2563566d9f 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -35,7 +35,7 @@ use tokio::{ task::JoinError, }; use tokio_util::sync::CancellationToken; -use tracing::{debug, debug_span, error, warn, Instrument}; +use tracing::{debug, error, error_span, warn, Instrument}; pub use iroh_sync::ContentStatus; @@ -183,7 +183,7 @@ impl LiveSync { downloader: Downloader, ) -> Self { let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP); - let me = base32::fmt_short(endpoint.peer_id()); + let me = endpoint.peer_id().fmt_short(); let mut actor = Actor::new( endpoint, gossip, @@ -193,12 +193,14 @@ impl LiveSync { to_actor_rx, to_actor_tx.clone(), ); - let span = debug_span!("sync", %me); - let task = rt.main().spawn(async move { - if let Err(err) = actor.run().instrument(span).await { - error!("live sync failed: {err:?}"); + let task = rt.main().spawn( + async move { + if let Err(err) = actor.run().await { + error!("live sync failed: {err:?}"); + } } - }); + .instrument(error_span!("sync", %me)), + ); let handle = LiveSync { to_actor_tx, task: task.map_err(Arc::new).boxed().shared(), @@ -1016,16 +1018,3 @@ async fn notify_all(subs: &mut HashMap, event: LiveEve } } } - -/// Utilities for working 
with byte array identifiers -// TODO: copy-pasted from iroh-gossip/src/proto/util.rs -// Unify into iroh-common crate or similar -pub(super) mod base32 { - /// Convert to a base32 string limited to the first 10 bytes - pub fn fmt_short(bytes: impl AsRef<[u8]>) -> String { - let len = bytes.as_ref().len().min(10); - let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]); - text.make_ascii_lowercase(); - text - } -}