Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add performance metrics to gossipsub #2346

Merged
merged 14 commits into from
Dec 21, 2021
6 changes: 6 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.34.1 [unreleased]

- Add metrics for network and configuration performance analysis (see [PR 2346]).

[PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346

# 0.34.0 [2021-11-16]

- Add topic and mesh metrics (see [PR 2316]).
Expand Down
17 changes: 12 additions & 5 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1174,8 +1174,8 @@ where

debug!("Handling IHAVE for peer: {:?}", peer_id);

// use a hashset to avoid duplicates efficiently
let mut iwant_ids = HashSet::new();
// use a hashmap to avoid duplicates efficiently
let mut iwant_ids = HashMap::new();

for (topic, ids) in ihave_msgs {
// only process the message if we are subscribed
Expand All @@ -1190,7 +1190,7 @@ where
for id in ids {
if !self.duplicate_cache.contains(&id) {
// have not seen this message, request it
iwant_ids.insert(id);
iwant_ids.insert(id, topic.clone());
}
}
}
Expand All @@ -1210,15 +1210,22 @@ where
peer_id
);

//ask in random order
// Ask in random order
let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect();
let mut rng = thread_rng();
iwant_ids_vec.partial_shuffle(&mut rng, iask as usize);

iwant_ids_vec.truncate(iask as usize);
*iasked += iask;

let message_ids = iwant_ids_vec.into_iter().cloned().collect::<Vec<_>>();
let mut message_ids = Vec::with_capacity(iwant_ids_vec.len());
for (id, topic) in iwant_ids_vec {
message_ids.push(id.clone());
if let Some(m) = self.metrics.as_mut() {
m.iwant(topic)
}
}

if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
*peer_id,
Expand Down
4 changes: 4 additions & 0 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@
//! println!("Listening on {:?}", addr);
//! ```

// Re-export open_metrics_client so external applications can inject the correct Registry when
// using metrics
pub use open_metrics_client;
AgeManning marked this conversation as resolved.
Show resolved Hide resolved

pub mod error;
pub mod protocol;

Expand Down
36 changes: 36 additions & 0 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ pub struct Metrics {
topic_msg_sent_counts: Family<TopicHash, Counter>,
/// Bytes from gossip messages sent to each topic .
topic_msg_sent_bytes: Family<TopicHash, Counter>,

/* Performance metrics */
/// When the user validates a message, it tries to re propagate it to its mesh peers. If the
/// message expires from the memcache before it can be validated, we count this a cache miss
/// and it is an indicator that the memcache size should be increased.
memcache_misses: Counter,
/// The number of times we have decided that an IWANT control message is required for this
/// topic. A very high metric might indicate an underperforming network.
topic_iwant_msgs: Family<TopicHash, Counter>,
}

impl Metrics {
Expand Down Expand Up @@ -186,6 +195,19 @@ impl Metrics {
"topic_msg_sent_bytes",
"Bytes from gossip messages sent to each topic."
);
let topic_iwant_msgs = register_family!(
"topic_iwant_msgs",
"Number of times we have decided an IWANT is required for this topic."
);
let memcache_misses = {
let metric = Counter::default();
registry.register(
"memcache_misses",
"Number of times a message is not found in the duplicate cache when validating.",
Box::new(metric.clone()),
);
metric
};

Self {
max_topics,
Expand All @@ -198,6 +220,8 @@ impl Metrics {
mesh_peer_churn_events,
topic_msg_sent_counts,
topic_msg_sent_bytes,
memcache_misses,
topic_iwant_msgs,
}
}

Expand Down Expand Up @@ -301,4 +325,16 @@ impl Metrics {
.inc_by(bytes as u64);
}
}

/// Register a memcache miss.
pub fn memcache_miss(&mut self) {
self.memcache_misses.inc();
}

/// Register sending an IWANT msg for this topic.
pub fn iwant(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_iwant_msgs.get_or_create(topic).inc();
}
}
}