feat(iroh, iroh-gossip): introduce log-self feature #1544

Closed
wants to merge 7 commits into from
1 change: 1 addition & 0 deletions iroh-gossip/Cargo.toml
@@ -46,6 +46,7 @@ url = "2.4.0"
[features]
default = ["net"]
net = ["futures", "iroh-net", "quinn", "tokio", "tokio-util"]
+log-self = []

[[example]]
name = "chat"
41 changes: 23 additions & 18 deletions iroh-gossip/src/net.rs
@@ -13,6 +13,8 @@ use tokio::{
task::JoinHandle,
};
use tracing::{debug, trace, warn};
+#[cfg(feature = "log-self")]
+use tracing::{trace_span, Instrument};

use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, PeerData, Scope, TopicId};
@@ -85,6 +87,9 @@ impl Gossip {
let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP);
let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
let (on_endpoints_tx, on_endpoints_rx) = watch::channel(Default::default());

+#[cfg(feature = "log-self")]
+let me = endpoint.peer_id().fmt_short();
let actor = Actor {
endpoint,
state,
@@ -101,7 +106,10 @@ impl Gossip {
subscribers_topic: Default::default(),
};
let actor_handle = tokio::spawn(async move {
-if let Err(err) = actor.run().await {
+let fut = actor.run();
+#[cfg(feature = "log-self")]
+let fut = fut.instrument(trace_span!("gossip", %me));
Contributor

Since this is a spawned future, I'd strongly recommend using an info_span, and not making it depend on the feature. As a general rule, every task should have its own info_span attached.

The span should also cover the bit of error handling in the lines below that is still inside this spawned task.

Contributor Author

Can you explain why info_span? Looking through the code, we still have trace_span once in baomap and debug_span 16 times. Frando went with trace in his own branch for his needs, and it seems to work well. What's the difference?

Contributor

My general guideline is to use info_span for spawned tasks, motivation:

  • If something is spawned it usually is a "top level task" of some sort (with a serious pinch of salt).
  • In this case you want to see most log messages with that span attached
  • and if you set the level to info, debug or trace you will see that attached span by default
  • (bonus: this matches the "info is for operators" guidance we just discussed)

This does not mean we already do this everywhere consistently. I've done a few PRs in the past making this more consistent but it certainly isn't complete.

Now, I understand your reluctance to have this logging always on, and that having it at debug or trace makes it even easier to hide. But I really do think this is more useful at info.

Contributor Author
@divagant-martian, Oct 2, 2023

OK, just to be clear about my question: I don't understand what this changes or what its effect is.

Is the effect to add the fields only at info and above? If so, I don't think this will help in debugging tests. If the effect is to limit it to info and below, then it's probably better.

But then it's confusing that trace still gives me debug logs with peer_ids.

TL;DR: I'm confused about what the levels do here.

Contributor

See also https://discord.com/channels/949724860232392765/950683937661935667/1156506832567808060 where @Frando also asked me this question and I (hopefully!) gave a similar answer.
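
One way to read the exchange above: as far as I understand `tracing`, a span carries its own level and is filtered like an event, so a span that is filtered out simply does not attach its fields to the events inside it. The following is a toy, std-only model of that rule and not the `tracing` API; `Level`, `Span`, and `render` are hypothetical names invented for this illustration.

```rust
/// Verbosity levels, ordered from least to most verbose.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Level {
    Error,
    Warn,
    Info,
    Debug,
    Trace,
}

/// A span in this toy model: its own level plus a single `me` field.
struct Span {
    level: Level,
    me: &'static str,
}

/// Hypothetical helper: render one event as a max-level filter would.
/// Returns `None` when the event itself is filtered out. The span's
/// field is attached only if the span's own level passes the filter.
fn render(max: Level, span: &Span, event_level: Level, msg: &str) -> Option<String> {
    if event_level > max {
        return None;
    }
    if span.level <= max {
        Some(format!("{event_level:?} gossip{{me={}}}: {msg}", span.me))
    } else {
        Some(format!("{event_level:?}: {msg}"))
    }
}

fn main() {
    let trace_span = Span { level: Level::Trace, me: "abc" };
    let info_span = Span { level: Level::Info, me: "abc" };
    for max in [Level::Info, Level::Debug, Level::Trace] {
        println!("filter = {max:?}");
        println!("  trace_span: {:?}", render(max, &trace_span, Level::Debug, "dial"));
        println!("  info_span:  {:?}", render(max, &info_span, Level::Debug, "dial"));
    }
}
```

In this model, a filter set to debug drops `me` from debug events inside the trace-level span, while the info-level span keeps `me` attached at info, debug, and trace. That would explain why running at trace still shows debug logs with peer ids.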

+if let Err(err) = fut.await {
warn!("gossip actor closed with error: {err:?}");
Err(err)
} else {
@@ -333,15 +341,14 @@ struct Actor {

impl Actor {
pub async fn run(mut self) -> anyhow::Result<()> {
-let me = *self.state.me();
loop {
tokio::select! {
biased;
msg = self.to_actor_rx.recv() => {
match msg {
Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?,
None => {
-debug!(?me, "all gossip handles dropped, stop gossip actor");
+debug!("all gossip handles dropped, stop gossip actor");
break;
}
}
@@ -354,11 +361,11 @@ impl Actor {
(peer_id, res) = self.dialer.next_conn() => {
match res {
Ok(conn) => {
-debug!(?me, peer = ?peer_id, "dial successfull");
+debug!(peer = ?peer_id, "dial successfull");
self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?;
}
Err(err) => {
-warn!(?me, peer = ?peer_id, "dial failed: {err}");
+warn!(peer = ?peer_id, "dial failed: {err}");
}
}
}
@@ -383,8 +390,7 @@ impl Actor {
}

async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
-let me = *self.state.me();
-trace!(?me, "handle to_actor {msg:?}");
+trace!("handle to_actor {msg:?}");
match msg {
ToActor::ConnIncoming(peer_id, origin, conn) => {
self.conns.insert(peer_id, conn.clone());
@@ -395,13 +401,13 @@ impl Actor {
// Spawn a task for this connection
let in_event_tx = self.in_event_tx.clone();
tokio::spawn(async move {
-debug!(?me, peer = ?peer_id, "connection established");
+debug!(peer = ?peer_id, "connection established");
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
Ok(()) => {
-debug!(?me, peer = ?peer_id, "connection closed without error")
+debug!(peer = ?peer_id, "connection closed without error")
}
Err(err) => {
-debug!(?me, peer = ?peer_id, "connection closed with error {err:?}")
+debug!(peer = ?peer_id, "connection closed with error {err:?}")
}
}
in_event_tx
@@ -458,21 +464,20 @@ impl Actor {
}

async fn handle_in_event(&mut self, event: InEvent, now: Instant) -> anyhow::Result<()> {
-let me = *self.state.me();
if matches!(event, InEvent::TimerExpired(_)) {
-trace!(?me, "handle in_event {event:?}");
+trace!("handle in_event {event:?}");
} else {
-debug!(?me, "handle in_event {event:?}");
+debug!("handle in_event {event:?}");
};
if let InEvent::PeerDisconnected(peer) = &event {
self.conn_send_tx.remove(peer);
}
let out = self.state.handle(event, now);
for event in out {
if matches!(event, OutEvent::ScheduleTimer(_, _)) {
-trace!(?me, "handle out_event {event:?}");
+trace!("handle out_event {event:?}");
} else {
-debug!(?me, "handle out_event {event:?}");
+debug!("handle out_event {event:?}");
};
match event {
OutEvent::SendMessage(peer_id, message) => {
@@ -482,7 +487,7 @@ impl Actor {
self.conn_send_tx.remove(&peer_id);
}
} else {
-debug!(?me, peer = ?peer_id, "dial");
+debug!(peer = ?peer_id, "dial");
self.dialer.queue_dial(peer_id, GOSSIP_ALPN);
// TODO: Enforce max length
self.pending_sends.entry(peer_id).or_default().push(message);
@@ -516,13 +521,13 @@ impl Actor {
OutEvent::PeerData(peer, data) => match decode_peer_data(&data) {
Err(err) => warn!("Failed to decode {data:?} from {peer}: {err}"),
Ok(info) => {
-debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known addrs: {info:?}");
+debug!(peer = ?peer, "add known addrs: {info:?}");
let peer_addr = PeerAddr {
peer_id: peer,
info,
};
if let Err(err) = self.endpoint.add_peer_addr(peer_addr).await {
-debug!(me = ?self.endpoint.peer_id(), peer = ?peer, "add known failed: {err:?}");
+debug!(peer = ?peer, "add known failed: {err:?}");
}
}
},
4 changes: 2 additions & 2 deletions iroh-gossip/src/proto/hyparview.rs
@@ -651,7 +651,7 @@ where
io.push(OutEvent::EmitEvent(Event::NeighborDown(peer)));
let data = self.peer_data.remove(&peer);
self.add_passive(peer, data, io);
-debug!(peer = ?self.me, other = ?peer, "removed from active view, reason: {reason:?}");
+debug!(other = ?peer, "removed from active view, reason: {reason:?}");
Some(peer)
} else {
None
@@ -701,7 +701,7 @@ where
fn add_active_unchecked(&mut self, peer: PI, priority: Priority, io: &mut impl IO<PI>) {
self.passive_view.remove(&peer);
self.active_view.insert(peer);
-debug!(peer = ?self.me, other = ?peer, "add to active view");
+debug!(other = ?peer, "add to active view");

let message = Message::Neighbor(Neighbor {
priority,
8 changes: 8 additions & 0 deletions iroh-net/src/key.rs
@@ -82,6 +82,14 @@ impl PublicKey {
pub fn verify(&self, message: &[u8], signature: &Signature) -> Result<(), SignatureError> {
self.public.verify_strict(message, signature)
}

+/// Convert to a base32 string limited to the first 10 bytes for a friendly string
+/// representation of the key.
+pub fn fmt_short(&self) -> String {
+let mut text = data_encoding::BASE32_NOPAD.encode(&self.as_bytes()[..10]);
+text.make_ascii_lowercase();
+text
+}
}

impl TryFrom<&[u8]> for PublicKey {
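
For reference, a std-only sketch of what `fmt_short` computes, assuming `data_encoding::BASE32_NOPAD` uses the RFC 4648 alphabet without padding. The hand-rolled encoder `base32_lower_nopad` and the free function `fmt_short` over a plain byte array are hypothetical stand-ins; the PR itself calls `data_encoding` and lowercases the result afterwards.

```rust
/// Lowercase RFC 4648 base32 alphabet (assumed to match BASE32_NOPAD
/// output after `make_ascii_lowercase`).
const ALPHABET: &[u8; 32] = b"abcdefghijklmnopqrstuvwxyz234567";

/// Hypothetical stand-in for `BASE32_NOPAD.encode` plus lowercasing.
fn base32_lower_nopad(bytes: &[u8]) -> String {
    let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
    let mut buffer: u32 = 0; // bit accumulator
    let mut bits: u32 = 0; // number of valid bits currently in `buffer`
    for &byte in bytes {
        buffer = (buffer << 8) | u32::from(byte);
        bits += 8;
        // Emit one character for every complete group of 5 bits.
        while bits >= 5 {
            bits -= 5;
            out.push(ALPHABET[((buffer >> bits) & 0x1f) as usize] as char);
        }
        // Keep only the leftover bits so the accumulator cannot overflow.
        buffer &= (1 << bits) - 1;
    }
    // Left-align any trailing bits into a final character (no '=' padding).
    if bits > 0 {
        out.push(ALPHABET[((buffer << (5 - bits)) & 0x1f) as usize] as char);
    }
    out
}

/// Short, log-friendly form of a 32-byte key: base32 of its first 10 bytes.
fn fmt_short(key: &[u8; 32]) -> String {
    base32_lower_nopad(&key[..10])
}

fn main() {
    let mut key = [0u8; 32];
    key[..10].copy_from_slice(b"foobafooba");
    println!("{}", fmt_short(&key));
}
```

Ten bytes are 80 bits, which is exactly 16 base32 characters, so the short form never needs padding.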
3 changes: 2 additions & 1 deletion iroh/Cargo.toml
@@ -69,14 +69,15 @@ colored = { version = "2.0.4", optional = true }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"], optional = true }

[features]
-default = ["cli", "metrics"]
+default = ["cli", "metrics", "log-self"]
cli = ["clap", "config", "console", "dirs-next", "indicatif", "multibase", "quic-rpc/quinn-transport", "tempfile", "tokio/rt-multi-thread", "tracing-subscriber", "flat-db", "mem-db", "iroh-collection", "shell-words", "shellexpand", "rustyline", "colored", "toml", "human-time", "comfy-table"]
metrics = ["iroh-metrics"]
mem-db = []
flat-db = []
iroh-collection = []
test = []
example-sync = ["cli"]
+log-self = ["iroh-gossip/log-self"]

[dev-dependencies]
anyhow = { version = "1", features = ["backtrace"] }
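
The `log-self` feature works through conditional compilation: a statement marked `#[cfg(feature = "log-self")]` exists only when the feature is enabled, which is how the PR shadows `fut` with an instrumented version. A minimal std-only sketch of that shadowing pattern, where `task_label` is a hypothetical stand-in for the instrumented future:

```rust
/// Hypothetical stand-in for a task's log prefix. With the feature on,
/// the node's short id is appended, loosely mimicking
/// `fut.instrument(trace_span!("gossip", %me))`.
fn task_label(me: &str) -> String {
    let label = String::from("gossip");
    // Compiled only when the `log-self` feature is enabled; otherwise this
    // statement does not exist and `label` keeps its first binding.
    #[cfg(feature = "log-self")]
    let label = format!("{label}{{me={me}}}");
    // Without the feature `me` would be unused; silence that warning.
    #[cfg(not(feature = "log-self"))]
    let _ = me;
    label
}

fn main() {
    println!("{}", task_label("abc"));
    println!("log-self enabled: {}", cfg!(feature = "log-self"));
}
```

Because `log-self` is added to the `default` features of `iroh` and forwarded to `iroh-gossip/log-self`, a default build of `iroh` would take the instrumented branch in both crates.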
10 changes: 10 additions & 0 deletions iroh/src/downloader.rs
@@ -44,6 +44,8 @@ use iroh_net::{key::PublicKey, MagicEndpoint};
use tokio::sync::{mpsc, oneshot};
use tokio_util::{sync::CancellationToken, time::delay_queue};
use tracing::{debug, trace};
+#[cfg(feature = "log-self")]
+use tracing::{trace_span, Instrument};

mod get;
mod invariants;
@@ -225,6 +227,8 @@ impl Downloader {
S: Store,
C: CollectionParser,
{
+#[cfg(feature = "log-self")]
+let me = endpoint.peer_id().fmt_short();
let (msg_tx, msg_rx) = mpsc::channel(SERVICE_CHANNEL_CAPACITY);
let dialer = iroh_gossip::net::util::Dialer::new(endpoint);

@@ -237,6 +241,12 @@ impl Downloader {

let service = Service::new(getter, dialer, concurrency_limits, msg_rx);

+#[cfg(feature = "log-self")]
+{
+service.run().instrument(trace_span!("downloader", %me))
Contributor

Same here: it should always attach the span, and it should be an info_span since it runs in a task. Again, try to wrap the entire spawned future and not just the service.run() call.

+}
+
+#[cfg(not(feature = "log-self"))]
service.run()
};
rt.local_pool().spawn_pinned(create_future);
27 changes: 10 additions & 17 deletions iroh/src/sync_engine/live.rs
@@ -35,7 +35,9 @@ use tokio::{
task::JoinError,
};
use tokio_util::sync::CancellationToken;
-use tracing::{debug, debug_span, error, warn, Instrument};
+use tracing::{debug, error, warn};
+#[cfg(feature = "log-self")]
+use tracing::{trace_span, Instrument};

pub use iroh_sync::ContentStatus;

@@ -183,7 +185,8 @@ impl<S: store::Store> LiveSync<S> {
downloader: Downloader,
) -> Self {
let (to_actor_tx, to_actor_rx) = mpsc::channel(CHANNEL_CAP);
-let me = base32::fmt_short(endpoint.peer_id());
+#[cfg(feature = "log-self")]
+let me = endpoint.peer_id().fmt_short();
let mut actor = Actor::new(
endpoint,
gossip,
@@ -193,9 +196,12 @@ impl<S: store::Store> LiveSync<S> {
to_actor_rx,
to_actor_tx.clone(),
);
-let span = debug_span!("sync", %me);

let task = rt.main().spawn(async move {
-if let Err(err) = actor.run().instrument(span).await {
+let fut = actor.run();
+#[cfg(feature = "log-self")]
+let fut = fut.instrument(trace_span!("sync", %me));
Contributor

Same note: use info_span and cover the entire future that's spawned into the task.

+if let Err(err) = fut.await {
error!("live sync failed: {err:?}");
}
});
@@ -1016,16 +1022,3 @@ async fn notify_all(subs: &mut HashMap<u64, OnLiveEventCallback>, event: LiveEve
}
}
}

-/// Utilities for working with byte array identifiers
-// TODO: copy-pasted from iroh-gossip/src/proto/util.rs
-// Unify into iroh-common crate or similar
-pub(super) mod base32 {
-/// Convert to a base32 string limited to the first 10 bytes
-pub fn fmt_short(bytes: impl AsRef<[u8]>) -> String {
-let len = bytes.as_ref().len().min(10);
-let mut text = data_encoding::BASE32_NOPAD.encode(&bytes.as_ref()[..len]);
-text.make_ascii_lowercase();
-text
-}
-}