Skip to content

Commit

Permalink
feat(*): log me (#1561)
Browse files Browse the repository at this point in the history
## Description

second take at #1544 since forced pushed closed prs can't be reopened

#### Magicsock
```
2023-10-02T18:46:34.455601Z DEBUG magicsock{me=2luekswh7o3a5tz4}:derp.actor:recv_detail:client-connect{key=2luekswh7o3a5tz4enymovsoksgnpb2qpmxlvifp6ywwjnacihya}:connect_0: rustls::client::tls13: TLS1.3 encrypted extensions: [ServerNameAck]
```
#### Sync
```log
2023-09-28T20:51:18.908126Z DEBUG sync{me=oywqb57ovmdry243}: iroh::sync_engine::live: sync[dial]: start namespace=NamespaceId(q47zmyim5r6isz22…) peer=PublicKey(2jnygwapdm26wwa2) reason=DirectJoin last_state=None
```
#### Gossip
```logs
2023-09-28T20:58:42.854730Z DEBUG gossip{me=zooj7iazcl7rsoal}: iroh_gossip::net: handle out_event EmitEvent(TopicId(wno5nwxtkhhtnqsm…), Received(GossipEvent { content: <33b>, delivered_from: PublicKey(7a7kkzndvbt6eulu), scope: Neighbors }))
```

#### Downloader
```
2023-09-28T21:00:11.107411Z DEBUG downloader{me=4vabufwku3wselbl}: iroh::downloader: download completed peer=2jnygwapdm26wwa26tvcprm3m5vqajmhwz7lx5xizwx637ad5sea kind=Blob { hash: Hash(224746ea89d286220e0770f89cda2ab138143b00e384dd795d5a13b77b094822) }
```

## Notes & open questions

probably we will add this in other places or change the logging level
but would be good to get this in to at least have something over which
we can iterate

## Change checklist

- [x] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
divagant-martian committed Oct 2, 2023
1 parent cf9abc0 commit 7e79227
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 91 deletions.
53 changes: 28 additions & 25 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::{broadcast, mpsc, oneshot, watch},
task::JoinHandle,
};
use tracing::{debug, trace, warn};
use tracing::{debug, error_span, trace, warn, Instrument};

use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, PeerData, Scope, TopicId};
Expand Down Expand Up @@ -85,6 +85,8 @@ impl Gossip {
let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP);
let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
let (on_endpoints_tx, on_endpoints_rx) = watch::channel(Default::default());

let me = endpoint.peer_id().fmt_short();
let actor = Actor {
endpoint,
state,
Expand All @@ -100,14 +102,18 @@ impl Gossip {
subscribers_all: None,
subscribers_topic: Default::default(),
};
let actor_handle = tokio::spawn(async move {
if let Err(err) = actor.run().await {
warn!("gossip actor closed with error: {err:?}");
Err(err)
} else {
Ok(())

let actor_handle = tokio::spawn(
async move {
if let Err(err) = actor.run().await {
warn!("gossip actor closed with error: {err:?}");
Err(err)
} else {
Ok(())
}
}
});
.instrument(error_span!("gossip", %me)),
);
Self {
to_actor_tx,
on_endpoints_tx: Arc::new(on_endpoints_tx),
Expand Down Expand Up @@ -333,15 +339,14 @@ struct Actor {

impl Actor {
pub async fn run(mut self) -> anyhow::Result<()> {
let me = *self.state.me();
loop {
tokio::select! {
biased;
msg = self.to_actor_rx.recv() => {
match msg {
Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?,
None => {
debug!(?me, "all gossip handles dropped, stop gossip actor");
debug!("all gossip handles dropped, stop gossip actor");
break;
}
}
Expand All @@ -354,11 +359,11 @@ impl Actor {
(peer_id, res) = self.dialer.next_conn() => {
match res {
Ok(conn) => {
debug!(?me, peer = ?peer_id, "dial successfull");
debug!(peer = ?peer_id, "dial successfull");
self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?;
}
Err(err) => {
warn!(?me, peer = ?peer_id, "dial failed: {err}");
warn!(peer = ?peer_id, "dial failed: {err}");
}
}
}
Expand All @@ -383,8 +388,7 @@ impl Actor {
}

async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
let me = *self.state.me();
trace!(?me, "handle to_actor {msg:?}");
trace!("handle to_actor {msg:?}");
match msg {
ToActor::ConnIncoming(peer_id, origin, conn) => {
self.conns.insert(peer_id, conn.clone());
Expand All @@ -395,13 +399,13 @@ impl Actor {
// Spawn a task for this connection
let in_event_tx = self.in_event_tx.clone();
tokio::spawn(async move {
debug!(?me, peer = ?peer_id, "connection established");
debug!(peer = ?peer_id, "connection established");
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
Ok(()) => {
debug!(?me, peer = ?peer_id, "connection closed without error")
debug!(peer = ?peer_id, "connection closed without error")
}
Err(err) => {
debug!(?me, peer = ?peer_id, "connection closed with error {err:?}")
debug!(peer = ?peer_id, "connection closed with error {err:?}")
}
}
in_event_tx
Expand Down Expand Up @@ -458,21 +462,20 @@ impl Actor {
}

async fn handle_in_event(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> {
let me = *self.state.me();
if matches!(event, InEvent::TimerExpired(_)) {
trace!(?me, "handle in_event {event:?}");
trace!("handle in_event {event:?}");
} else {
debug!(?me, "handle in_event {event:?}");
debug!("handle in_event {event:?}");
};
if let InEvent::PeerDisconnected(peer) = &event {
self.conn_send_tx.remove(peer);
}
let out = self.state.handle(event, now);
for event in out {
if matches!(event, OutEvent::ScheduleTimer(_, _)) {
trace!(?me, "handle out_event {event:?}");
trace!("handle out_event {event:?}");
} else {
debug!(?me, "handle out_event {event:?}");
debug!("handle out_event {event:?}");
};
match event {
OutEvent::SendMessage(peer_id, message) => {
Expand All @@ -482,7 +485,7 @@ impl Actor {
self.conn_send_tx.remove(&peer_id);
}
} else {
debug!(?me, peer = ?peer_id, "dial");
debug!(peer = ?peer_id, "dial");
self.dialer.queue_dial(peer_id, GOSSIP_ALPN);
// TODO: Enforce max length
self.pending_sends.entry(peer_id).or_default().push(message);
Expand Down Expand Up @@ -516,13 +519,13 @@ impl Actor {
OutEvent::PeerData(peer, data) => match decode_peer_data(&data) {
Err(err) => warn!("Failed to decode {data:?} from {peer}: {err}"),
Ok(info) => {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}");
debug!(peer = ?peer, "add known addrs: {info:?}");
let peer_addr = PeerAddr {
peer_id: peer,
info,
};
if let Err(err) = self.endpoint.add_peer_addr(peer_addr).await {
debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}");
debug!(peer = ?peer, "add known failed: {err:?}");
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions iroh-gossip/src/proto/hyparview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ where
io.push(OutEvent::EmitEvent(Event::NeighborDown(peer)));
let data = self.peer_data.remove(&peer);
self.add_passive(peer, data, io);
debug!(peer = ?self.me, other = ?peer, "removed from active view, reason: {reason:?}");
debug!(other = ?peer, "removed from active view, reason: {reason:?}");
Some(peer)
} else {
None
Expand Down Expand Up @@ -701,7 +701,7 @@ where
fn add_active_unchecked(&mut self, peer: PI, priority: Priority, io: &mut impl IO<PI>) {
self.passive_view.remove(&peer);
self.active_view.insert(peer);
debug!(peer = ?self.me, other = ?peer, "add to active view");
debug!(other = ?peer, "add to active view");

let message = Message::Neighbor(Neighbor {
priority,
Expand Down
8 changes: 8 additions & 0 deletions iroh-net/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ impl PublicKey {
pub fn verify(&self, message: &[u8], signature: &Signature) -> Result<(), SignatureError> {
self.public.verify_strict(message, signature)
}

/// Convert to a base32 string limited to the first 10 bytes for a friendly string
/// representation of the key.
pub fn fmt_short(&self) -> String {
let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]);
text.make_ascii_lowercase();
text
}
}

impl TryFrom<&[u8]> for PublicKey {
Expand Down
48 changes: 23 additions & 25 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tokio::{
sync::{self, mpsc, Mutex},
time,
};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
use tracing::{debug, error, error_span, info, info_span, instrument, trace, warn, Instrument};

use crate::{
config::{self, DERP_MAGIC_IP},
Expand Down Expand Up @@ -204,7 +204,8 @@ struct Inner {
actor_sender: mpsc::Sender<ActorMessage>,
/// Sends network messages.
network_sender: mpsc::Sender<Vec<quinn_udp::Transmit>>,
name: String,
/// String representation of the peer_id of this node.
me: String,
#[allow(clippy::type_complexity)]
#[debug("on_endpoints: Option<Box<..>>")]
on_endpoints: Option<Box<dyn Fn(&[config::Endpoint]) + Send + Sync + 'static>>,
Expand Down Expand Up @@ -309,16 +310,13 @@ impl MagicSock {
///
/// [`Callbacks::on_endpoint`]: crate::magicsock::conn::Callbacks::on_endpoints
pub async fn new(opts: Options) -> Result<Self> {
let name = format!(
"magic-{}",
hex::encode(&opts.secret_key.public().as_bytes()[..8])
);
let me = opts.secret_key.public().fmt_short();
if crate::util::derp_only_mode() {
warn!("creating a MagicSock that will only send packets over a DERP relay connection.");
}

Self::with_name(name.clone(), opts)
.instrument(info_span!("magicsock", %name))
Self::with_name(me.clone(), opts)
.instrument(error_span!("magicsock", %me))
.await
}

Expand All @@ -327,7 +325,7 @@ impl MagicSock {
self.inner.has_derp_region(region).await
}

async fn with_name(name: String, opts: Options) -> Result<Self> {
async fn with_name(me: String, opts: Options) -> Result<Self> {
let port_mapper = portmapper::Client::default().await;

let Options {
Expand Down Expand Up @@ -375,7 +373,7 @@ impl MagicSock {
let (network_sender, network_receiver) = mpsc::channel(128);

let inner = Arc::new(Inner {
name,
me,
on_endpoints,
on_derp_active,
on_net_info,
Expand Down Expand Up @@ -523,7 +521,7 @@ impl MagicSock {
}

/// Triggers an address discovery. The provided why string is for debug logging only.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn re_stun(&self, why: &'static str) {
self.inner
.actor_sender
Expand Down Expand Up @@ -552,7 +550,7 @@ impl MagicSock {

// TODO
// /// Handles a "ping" CLI query.
// #[instrument(skip_all, fields(self.name = %self.name))]
// #[instrument(skip_all, fields(me = %self.inner.me))]
// pub async fn ping<F>(&self, peer: config::Node, mut res: config::PingResult, cb: F)
// where
// F: Fn(config::PingResult) -> BoxFuture<'static, ()> + Send + Sync + 'static,
Expand Down Expand Up @@ -586,7 +584,7 @@ impl MagicSock {
// }

/// Sets the connection's preferred local port.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn set_preferred_port(&self, port: u16) {
let (s, r) = sync::oneshot::channel();
self.inner
Expand All @@ -609,7 +607,7 @@ impl MagicSock {
}
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
/// Add addresses for a node to the magic socket's addresbook.
pub async fn add_peer_addr(&self, addr: PeerAddr) -> Result<()> {
let (s, r) = sync::oneshot::channel();
Expand All @@ -624,7 +622,7 @@ impl MagicSock {
/// Closes the connection.
///
/// Only the first close does anything. Any later closes return nil.
#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn close(&self) -> Result<()> {
if self.inner.is_closed() {
return Ok(());
Expand All @@ -647,7 +645,7 @@ impl MagicSock {

/// Closes and re-binds the UDP sockets and resets the DERP connection.
/// It should be followed by a call to ReSTUN.
#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn rebind_all(&self) {
let (s, r) = sync::oneshot::channel();
self.inner
Expand Down Expand Up @@ -708,7 +706,7 @@ fn endpoint_sets_equal(xs: &[config::Endpoint], ys: &[config::Endpoint]) -> bool
}

impl AsyncUdpSocket for MagicSock {
#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
fn poll_send(
&self,
_udp_state: &quinn_udp::UdpState,
Expand Down Expand Up @@ -763,7 +761,7 @@ impl AsyncUdpSocket for MagicSock {
Poll::Pending
}

#[instrument(skip_all, fields(name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
fn poll_recv(
&self,
cx: &mut Context,
Expand Down Expand Up @@ -828,7 +826,7 @@ impl AsyncUdpSocket for MagicSock {
"[QUINN] <- {} ({}b) ({}) ({:?}, {:?})",
meta_out.addr,
meta_out.len,
self.inner.name,
self.inner.me,
meta_out.dst_ip,
source
);
Expand Down Expand Up @@ -1839,7 +1837,7 @@ impl Actor {
}

/// Records the new endpoints, reporting whether they're changed.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn set_endpoints(&mut self, endpoints: &[config::Endpoint]) -> bool {
self.last_endpoints_time = Some(Instant::now());
for (_de, f) in self.on_endpoint_refreshed.drain() {
Expand All @@ -1857,7 +1855,7 @@ impl Actor {
true
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn enqueue_call_me_maybe(&mut self, derp_region: u16, endpoint_id: usize) {
let endpoint = self.peer_map.by_id(&endpoint_id);
if endpoint.is_none() {
Expand Down Expand Up @@ -1921,7 +1919,7 @@ impl Actor {
}
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn rebind_all(&mut self) {
inc!(MagicsockMetrics, rebind_calls);
if let Err(err) = self.rebind(CurrentPortFate::Keep).await {
Expand All @@ -1936,7 +1934,7 @@ impl Actor {

/// Resets the preferred address for all peers.
/// This is called when connectivity changes enough that we no longer trust the old routes.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
fn reset_endpoint_states(&mut self) {
for (_, ep) in self.peer_map.endpoints_mut() {
ep.note_connectivity_change();
Expand All @@ -1945,7 +1943,7 @@ impl Actor {

/// Closes and re-binds the UDP sockets.
/// We consider it successful if we manage to bind the IPv4 socket.
#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
async fn rebind(&mut self, cur_port_fate: CurrentPortFate) -> Result<()> {
let mut ipv6_addr = None;

Expand Down Expand Up @@ -1985,7 +1983,7 @@ impl Actor {
Ok(())
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn set_preferred_port(&mut self, port: u16) {
let existing_port = self.inner.port.swap(port, Ordering::Relaxed);
if existing_port == port {
Expand Down

0 comments on commit 7e79227

Please sign in to comment.