From 97151c8d0acea5786d31fce8becf8746e7db075e Mon Sep 17 00:00:00 2001 From: Diva M Date: Tue, 16 Nov 2021 12:57:21 -0500 Subject: [PATCH 01/11] add performance metrics to gossipsub --- protocols/gossipsub/src/behaviour.rs | 17 +++++++++---- protocols/gossipsub/src/metrics.rs | 36 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9daddc5d7af..866bba9fe43 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1174,8 +1174,8 @@ where debug!("Handling IHAVE for peer: {:?}", peer_id); - // use a hashset to avoid duplicates efficiently - let mut iwant_ids = HashSet::new(); + // use a hashmap to avoid duplicates efficiently + let mut iwant_ids = HashMap::new(); for (topic, ids) in ihave_msgs { // only process the message if we are subscribed @@ -1190,7 +1190,7 @@ where for id in ids { if !self.duplicate_cache.contains(&id) { // have not seen this message, request it - iwant_ids.insert(id); + iwant_ids.insert(id, topic.clone()); } } } @@ -1210,7 +1210,7 @@ where peer_id ); - //ask in random order + // Ask in random order let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect(); let mut rng = thread_rng(); iwant_ids_vec.partial_shuffle(&mut rng, iask as usize); @@ -1218,7 +1218,14 @@ where iwant_ids_vec.truncate(iask as usize); *iasked += iask; - let message_ids = iwant_ids_vec.into_iter().cloned().collect::>(); + let mut message_ids = Vec::with_capacity(iwant_ids_vec.len()); + for (id, topic) in iwant_ids_vec { + message_ids.push(id.clone()); + if let Some(m) = self.metrics.as_mut() { + m.iwant(topic) + } + } + if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( *peer_id, diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 8e4e16f0ed8..415d1f73c69 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -138,6 +138,15 @@ pub struct Metrics { topic_msg_sent_counts: Family, /// Bytes from gossip messages sent to each topic . topic_msg_sent_bytes: Family, + + /* Performance metrics */ + /// When the user validates a message, it tries to re propagate it to its mesh peers. If the + /// message expires from the memcache before it can be validated, we count this a cache miss + /// and it is an indicator that the memcache size should be increased. + memcache_misses: Counter, + /// The number of times we have decided that an IWANT control message is required for this + /// topic. A very high metric might indicate an underperforming network. + topic_iwant_msgs: Family, } impl Metrics { @@ -186,6 +195,19 @@ impl Metrics { "topic_msg_sent_bytes", "Bytes from gossip messages sent to each topic." ); + let topic_iwant_msgs = register_family!( + "topic_iwant_msgs", + "Number of times we have decided an IWANT is required for this topic." + ); + let memcache_misses = { + let metric = Counter::default(); + registry.register( + "memcache_misses", + "Number of times a message is not found in the duplicate cache when validating.", + Box::new(metric.clone()), + ); + metric + }; Self { max_topics, @@ -198,6 +220,8 @@ impl Metrics { mesh_peer_churn_events, topic_msg_sent_counts, topic_msg_sent_bytes, + memcache_misses, + topic_iwant_msgs, } } @@ -301,4 +325,16 @@ impl Metrics { .inc_by(bytes as u64); } } + + /// Register a memcache miss. + pub fn memcache_miss(&mut self) { + self.memcache_misses.inc(); + } + + /// Register sending an IWANT msg for this topic. + pub fn iwant(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.topic_iwant_msgs.get_or_create(topic).inc(); + } + } } From 393a6ec0b04fbec89fbebb1801b67957540d660c Mon Sep 17 00:00:00 2001 From: Diva M Date: Tue, 16 Nov 2021 13:02:10 -0500 Subject: [PATCH 02/11] add changelog entry --- protocols/gossipsub/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index d999bca901e..3d1b2e8c8f4 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.34.1 [unreleased] + +- Add metrics for network and configuration performance analysis (see [PR 2346]). + +[PR 2346]: https://github.com/libp2p/rust-libp2p/pull/2346 + # 0.34.0 [2021-11-16] - Add topic and mesh metrics (see [PR 2316]). From 474ceb148f6a56e5d8a4d5c14d5f48040cfbd538 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 26 Nov 2021 16:27:33 +1100 Subject: [PATCH 03/11] Re-export open-client-metrics crate --- protocols/gossipsub/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index a05c81806ee..c401018c26e 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -122,6 +122,10 @@ //! println!("Listening on {:?}", addr); //! ``` +// Re-export open_metrics_client so external applications can inject the correct Registry when +// using metrics +pub use open_metrics_client; + pub mod error; pub mod protocol; From 6284b9e7d591708270d5c093ade8701ebe69863c Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 29 Nov 2021 18:29:42 +1100 Subject: [PATCH 04/11] Added some extra metrics --- protocols/gossipsub/src/behaviour.rs | 21 ++++++++++- protocols/gossipsub/src/metrics.rs | 55 +++++++++++++++++++++++++++- protocols/gossipsub/src/types.rs | 2 +- 3 files changed, 75 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 866bba9fe43..1db150bbd79 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1222,7 +1222,7 @@ where for (id, topic) in iwant_ids_vec { message_ids.push(id.clone()); if let Some(m) = self.metrics.as_mut() { - m.iwant(topic) + m.register_iwant(topic) } } @@ -2023,6 +2023,9 @@ where if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_broken_promise(); + } } } } @@ -3054,6 +3057,17 @@ where // NOTE: It is possible the peer has already been removed from all mappings if it does not // support the protocol. self.peer_topics.remove(peer_id); + + // If metrics are enabled, register the disconnection of a peer based on its protocol. + if let Some(metrics) = self.metrics.as_mut() { + let peer_kind = &self + .connected_peers + .get(peer_id) + .expect("Connected peer must be registered") + .kind; + metrics.peer_protocol_disconnected(peer_kind); + } + self.connected_peers.remove(peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { @@ -3201,6 +3215,11 @@ where match handler_event { HandlerEvent::PeerKind(kind) => { // We have identified the protocol this peer is using + + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_protocol_connected(&kind); + } + if let PeerKind::NotSupported = kind { debug!( "Peer does not support gossipsub protocols. {}", diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 415d1f73c69..5f184b1676c 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -30,6 +30,7 @@ use open_metrics_client::metrics::gauge::Gauge; use open_metrics_client::registry::Registry; use crate::topic::TopicHash; +use crate::types::PeerKind; // Default value that limits for how many topics do we store metrics. const DEFAULT_MAX_TOPICS: usize = 300; @@ -139,11 +140,21 @@ pub struct Metrics { /// Bytes from gossip messages sent to each topic . topic_msg_sent_bytes: Family, + /* General Metrics */ + /// Gossipsub supports floodsub, gossipsub v1.0 and gossipsub v1.1. Peers are classified based + /// on which protocol they support. This metric keeps track of the number of peers that are + /// connected of each type. + peers_per_protocol: Family<&'static str, Gauge>, + /* Performance metrics */ /// When the user validates a message, it tries to re propagate it to its mesh peers. If the /// message expires from the memcache before it can be validated, we count this a cache miss /// and it is an indicator that the memcache size should be increased. memcache_misses: Counter, + /// If we request a message via an IWANT and the peer does not respond in time, this counter is + /// increased. This measures unresponsive peers or peers that have no set the correct + /// message-id function. + broken_promises: Counter, /// The number of times we have decided that an IWANT control message is required for this /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, @@ -195,6 +206,12 @@ impl Metrics { "topic_msg_sent_bytes", "Bytes from gossip messages sent to each topic." ); + + let peers_per_protocol = register_family!( + "peers_per_protocol", + "Number of connected peers by protocol type." + ); + let topic_iwant_msgs = register_family!( "topic_iwant_msgs", "Number of times we have decided an IWANT is required for this topic." @@ -208,6 +225,15 @@ impl Metrics { ); metric }; + let broken_promises = { + let metric = Counter::default(); + registry.register( + "broken_promises", + "Number of broken IWANT promises. i.e the number of times peers failed to respond to message requests on time.", + Box::new(metric.clone()), + ); + metric + }; Self { max_topics, @@ -220,7 +246,9 @@ impl Metrics { mesh_peer_churn_events, topic_msg_sent_counts, topic_msg_sent_bytes, + peers_per_protocol, memcache_misses, + broken_promises, topic_iwant_msgs, } } @@ -332,9 +360,34 @@ impl Metrics { } /// Register sending an IWANT msg for this topic. - pub fn iwant(&mut self, topic: &TopicHash) { + pub fn register_iwant(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { self.topic_iwant_msgs.get_or_create(topic).inc(); } } + + /// Register a broken promise. A peer can have many broken promises, but we only register one + /// broken promise per peer per heartbeat. This way the number isn't skewed too heavily if a + /// single peer becomes unresponsive. + pub fn register_broken_promise(&mut self) { + self.broken_promises.inc(); + } + + /// Register a new peers connection based on its protocol. + pub fn peer_protocol_connected(&mut self, kind: &PeerKind) { + self.peers_per_protocol + .get_or_create(&kind.as_static_ref()) + .inc(); + } + + /// Removes a peer from the counter based on its protocol when it disconnects. + pub fn peer_protocol_disconnected(&mut self, kind: &PeerKind) { + let metric = self.peers_per_protocol.get_or_create(&kind.as_static_ref()); + if metric.get() == 0 { + return; + } else { + // decrement the counter + metric.set(metric.get() - 1); + } + } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 3f4cc9c6c97..1c3bb1efd88 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -89,7 +89,7 @@ pub struct PeerConnections { } /// Describes the types of peers that can exist in the gossipsub context. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Hash, Eq)] pub enum PeerKind { /// A gossipsub 1.1 peer. Gossipsubv1_1, From 6af6fffa158d8dc22826756c1e54d3a8ca2160dd Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 30 Nov 2021 18:46:32 +1100 Subject: [PATCH 05/11] More metrics --- protocols/gossipsub/src/behaviour.rs | 75 ++++++++++------ protocols/gossipsub/src/metrics.rs | 125 ++++++++++++++++++++++++--- protocols/gossipsub/src/types.rs | 16 ++++ 3 files changed, 178 insertions(+), 38 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 1db150bbd79..b175ad6c23e 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -629,10 +629,9 @@ where return Err(PublishError::Duplicate); } - debug!("Publishing message: {:?}", msg_id); + trace!("Publishing message: {:?}", msg_id); let topic_hash = raw_message.topic.clone(); - let msg_bytes = raw_message.data.len(); // If we are not flood publishing forward the message to mesh peers. let mesh_peers_sent = @@ -726,8 +725,9 @@ where } // Send to peers we know are subscribed to the topic. + let msg_bytes = event.encoded_len(); for peer_id in recipient_peers.iter() { - debug!("Sending message to peer: {:?}", peer_id); + trace!("Sending message to peer: {:?}", peer_id); self.send_message(*peer_id, event.clone())?; if let Some(m) = self.metrics.as_mut() { @@ -736,6 +736,11 @@ where } debug!("Published message: {:?}", &msg_id); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_published_message(&topic_hash); + } + Ok(msg_id) } @@ -776,6 +781,11 @@ where return Ok(false); } }; + + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_msg_validation(&raw_message.topic, &acceptance); + } + self.forward_msg(msg_id, raw_message, Some(propagation_source))?; return Ok(true); } @@ -784,6 +794,10 @@ where }; if let Some(raw_message) = self.mcache.remove(msg_id) { + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_msg_validation(&raw_message.topic, &acceptance); + } + // Tell peer_score about reject if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.reject_message( @@ -1284,35 +1298,26 @@ where // Send the messages to the peer let message_list: Vec<_> = cached_messages.into_iter().map(|entry| entry.1).collect(); - let mut topic_msgs = HashMap::>::default(); - if self.metrics.is_some() { - for msg in message_list.iter() { - topic_msgs - .entry(msg.topic.clone()) - .or_default() - .push(msg.data.len()); - } + let topics = message_list + .iter() + .map(|message| message.topic.clone()) + .collect::>(); + + let message = GossipsubRpc { + subscriptions: Vec::new(), + messages: message_list, + control_msgs: Vec::new(), } + .into_protobuf(); - if self - .send_message( - *peer_id, - GossipsubRpc { - subscriptions: Vec::new(), - messages: message_list, - control_msgs: Vec::new(), - } - .into_protobuf(), - ) - .is_err() - { + let msg_bytes = message.encoded_len(); + + if self.send_message(*peer_id, message).is_err() { error!("Failed to send cached messages. Messages too large"); } else if let Some(m) = self.metrics.as_mut() { // Sending of messages succeeded, register them on the internal metrics. - for (topic, msg_bytes_vec) in topic_msgs.into_iter() { - for msg_bytes in msg_bytes_vec { - m.msg_sent(&topic, msg_bytes); - } + for topic in topics.iter() { + m.msg_sent(&topic, msg_bytes); } } } @@ -1701,6 +1706,11 @@ where mut raw_message: RawGossipsubMessage, propagation_source: &PeerId, ) { + // Record the received metric + if let Some(metrics) = self.metrics.as_mut() { + metrics.msg_recvd_unfiltered(&raw_message.topic, raw_message.raw_protobuf_len()); + } + let fast_message_id = self.config.fast_message_id(&raw_message); if let Some(fast_message_id) = fast_message_id.as_ref() { if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) { @@ -1735,6 +1745,9 @@ where // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_invalid_message(&raw_message.topic); + } return; } @@ -1757,6 +1770,11 @@ where msg_id ); + // Record the received message with the metrics + if let Some(metrics) = self.metrics.as_mut() { + metrics.msg_recvd(&message.topic); + } + // Tells score that message arrived (but is maybe not fully validated yet). // Consider the message as delivered for gossip promises. if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { @@ -2645,11 +2663,12 @@ where } .into_protobuf(); + let msg_bytes = event.encoded_len(); for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); self.send_message(*peer, event.clone())?; if let Some(m) = self.metrics.as_mut() { - m.msg_sent(&message.topic, message.data.len()); + m.msg_sent(&message.topic, msg_bytes); } } debug!("Completed forwarding message"); diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 5f184b1676c..cef9bbdbd2d 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -30,7 +30,7 @@ use open_metrics_client::metrics::gauge::Gauge; use open_metrics_client::registry::Registry; use crate::topic::TopicHash; -use crate::types::PeerKind; +use crate::types::{MessageAcceptance, PeerKind}; // Default value that limits for how many topics do we store metrics. const DEFAULT_MAX_TOPICS: usize = 300; @@ -124,6 +124,14 @@ pub struct Metrics { /// Number of peers subscribed to each topic. This allows us to analyze a topic's behaviour /// regardless of our subscription status. topic_peers_count: Family, + /// The number of invalid messages received for a given topic. + invalid_messages: Family, + /// The number of messages accepted by the application (validation result). + accepted_messages: Family, + /// The number of messages ignored by the application (validation result). + ignored_messages: Family, + /// The number of messages rejected by the application (validation result). + rejected_messages: Family, /* Metrics regarding mesh state */ /// Number of peers in our mesh. This metric should be updated with the count of peers for a @@ -134,11 +142,20 @@ pub struct Metrics { /// Number of times we remove peers in a topic mesh for different reasons. mesh_peer_churn_events: Family, - /* Metrics regarding messages sent */ + /* Metrics regarding messages sent/received */ /// Number of gossip messages sent to each topic. topic_msg_sent_counts: Family, - /// Bytes from gossip messages sent to each topic . + /// Bytes from gossip messages sent to each topic. topic_msg_sent_bytes: Family, + /// Number of gossipsub messages published to each topic. + topic_msg_published: Family, + + /// Number of gossipsub messages received on each topic (without filtering duplicates). + topic_msg_recv_counts_unfiltered: Family, + /// Number of gossipsub messages received on each topic (after filtering duplicates). + topic_msg_recv_counts: Family, + /// Bytes received from gossip messages for each topic. + topic_msg_recv_bytes: Family, /* General Metrics */ /// Gossipsub supports floodsub, gossipsub v1.0 and gossipsub v1.1. Peers are classified based @@ -185,6 +202,26 @@ impl Metrics { "Number of peers subscribed to each topic" ); + let invalid_messages = register_family!( + "invalid_messages_per_topic", + "Number of invalid messages received for each topic" + ); + + let accepted_messages = register_family!( + "accepted_messages_per_topic", + "Number of accepted messages received for each topic" + ); + + let ignored_messages = register_family!( + "ignored_messages_per_topic", + "Number of ignored messages received for each topic" + ); + + let rejected_messages = register_family!( + "accepted_messages_per_topic", + "Number of rejected messages received for each topic" + ); + let mesh_peer_counts = register_family!( "mesh_peer_counts", "Number of peers in each topic in our mesh" @@ -197,30 +234,47 @@ impl Metrics { "mesh_peer_churn_events", "Number of times a peer gets removed from our mesh for different reasons" ); - let topic_msg_sent_counts = register_family!( "topic_msg_sent_counts", - "Number of gossip messages sent to each topic." + "Number of gossip messages sent to each topic" + ); + let topic_msg_published = register_family!( + "topic_msg_published", + "Number of gossip messages published to each topic" ); let topic_msg_sent_bytes = register_family!( "topic_msg_sent_bytes", - "Bytes from gossip messages sent to each topic." + "Bytes from gossip messages sent to each topic" + ); + + let topic_msg_recv_counts_unfiltered = register_family!( + "topic_msg_recv_counts_unfiltered", + "Number of gossip messages received on each topic (without duplicates being filtered)" + ); + + let topic_msg_recv_counts = register_family!( + "topic_msg_recv_counts", + "Number of gossip messages received on each topic (after duplicates have been filtered)" + ); + let topic_msg_recv_bytes = register_family!( + "topic_msg_recv_bytes", + "Bytes received from gossip messages for each topic" ); let peers_per_protocol = register_family!( "peers_per_protocol", - "Number of connected peers by protocol type." + "Number of connected peers by protocol type" ); let topic_iwant_msgs = register_family!( "topic_iwant_msgs", - "Number of times we have decided an IWANT is required for this topic." + "Number of times we have decided an IWANT is required for this topic" ); let memcache_misses = { let metric = Counter::default(); registry.register( "memcache_misses", - "Number of times a message is not found in the duplicate cache when validating.", + "Number of times a message is not found in the duplicate cache when validating", Box::new(metric.clone()), ); metric @@ -229,7 +283,7 @@ impl Metrics { let metric = Counter::default(); registry.register( "broken_promises", - "Number of broken IWANT promises. i.e the number of times peers failed to respond to message requests on time.", + "Number of broken IWANT promises. i.e the number of times peers failed to respond to message requests on time", Box::new(metric.clone()), ); metric @@ -241,11 +295,19 @@ impl Metrics { topic_info: HashMap::default(), topic_subscription_status, topic_peers_count, + invalid_messages, + accepted_messages, + ignored_messages, + rejected_messages, mesh_peer_counts, mesh_peer_inclusion_events, mesh_peer_churn_events, topic_msg_sent_counts, topic_msg_sent_bytes, + topic_msg_published, + topic_msg_recv_counts_unfiltered, + topic_msg_recv_counts, + topic_msg_recv_bytes, peers_per_protocol, memcache_misses, broken_promises, @@ -344,6 +406,20 @@ impl Metrics { } } + /// Register that an invalid message was received on a specific topic. + pub fn register_invalid_message(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.invalid_messages.get_or_create(topic).inc(); + } + } + + /// Registers that a message was published on a specific topic. + pub fn register_published_message(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.topic_msg_published.get_or_create(topic).inc(); + } + } + /// Register sending a message over a topic. pub fn msg_sent(&mut self, topic: &TopicHash, bytes: usize) { if self.register_topic(topic).is_ok() { @@ -354,6 +430,35 @@ impl Metrics { } } + /// Register that a message was received (and was not a duplicate). + pub fn msg_recvd(&mut self, topic: &TopicHash) { + if self.register_topic(topic).is_ok() { + self.topic_msg_recv_counts.get_or_create(topic).inc(); + } + } + + /// Register that a message was received (could have been a duplicate). + pub fn msg_recvd_unfiltered(&mut self, topic: &TopicHash, bytes: usize) { + if self.register_topic(topic).is_ok() { + self.topic_msg_recv_counts_unfiltered + .get_or_create(topic) + .inc(); + self.topic_msg_recv_bytes + .get_or_create(topic) + .inc_by(bytes as u64); + } + } + + pub fn register_msg_validation(&mut self, topic: &TopicHash, validation: &MessageAcceptance) { + if self.register_topic(topic).is_ok() { + match validation { + MessageAcceptance::Accept => self.accepted_messages.get_or_create(topic).inc(), + MessageAcceptance::Ignore => self.ignored_messages.get_or_create(topic).inc(), + MessageAcceptance::Reject => self.rejected_messages.get_or_create(topic).inc(), + }; + } + } + /// Register a memcache miss. pub fn memcache_miss(&mut self) { self.memcache_misses.inc(); diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 1c3bb1efd88..f7ce44ab6df 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -22,6 +22,7 @@ use crate::rpc_proto; use crate::TopicHash; use libp2p_core::{connection::ConnectionId, PeerId}; +use prost::Message; use std::fmt; use std::fmt::Debug; @@ -126,6 +127,21 @@ pub struct RawGossipsubMessage { pub validated: bool, } +impl RawGossipsubMessage { + /// Calculates the encoded length of this message (used for calculating metrics). + pub fn raw_protobuf_len(&self) -> usize { + let message = rpc_proto::Message { + from: self.source.map(|m| m.to_bytes()), + data: Some(self.data.clone()), + seqno: self.sequence_number.map(|s| s.to_be_bytes().to_vec()), + topic: TopicHash::into_string(self.topic.clone()), + signature: self.signature.clone(), + key: self.key.clone(), + }; + message.encoded_len() + } +} + /// The message sent to the user after a [`RawGossipsubMessage`] has been transformed by a /// [`crate::DataTransform`]. #[derive(Clone, PartialEq, Eq, Hash)] From b9ec14d87909f6f204a17b1fc9ce1d1b5bd0e61c Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 3 Dec 2021 12:10:50 +1100 Subject: [PATCH 06/11] Additional metrics --- protocols/gossipsub/Cargo.toml | 2 +- protocols/gossipsub/src/behaviour.rs | 196 +++++++++++---------- protocols/gossipsub/src/gossip_promises.rs | 2 +- protocols/gossipsub/src/metrics.rs | 96 +++++++--- protocols/gossipsub/src/peer_score.rs | 24 ++- 5 files changed, 200 insertions(+), 120 deletions(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 270cc6474f3..aa760af9d5c 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -31,7 +31,7 @@ futures-timer = "3.0.2" pin-project = "1.0.8" instant = "0.1.11" # Metrics dependencies -open-metrics-client = "0.13" +open-metrics-client = "0.13.0" [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index b175ad6c23e..30f1ed6c93a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -1372,11 +1372,14 @@ where { if backoff_time > now { warn!( - "GRAFT: peer attempted graft within backoff time, penalizing {}", + "[Penalty] Peer attempted graft within backoff time, penalizing {}", peer_id ); // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_score_penalty("graft-backoff"); + } peer_score.add_penalty(peer_id, 1); // check the flood cutoff @@ -1649,15 +1652,11 @@ where "Rejecting message from peer {} because of blacklisted source: {}", propagation_source, source ); - if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - peer_score.reject_message( - propagation_source, - msg_id, - &raw_message.topic, - RejectReason::BlackListedSource, - ); - gossip_promises.reject_message(msg_id, &RejectReason::BlackListedSource); - } + self.handle_invalid_message( + propagation_source, + raw_message, + RejectReason::BlackListedSource, + ); return false; } } @@ -1683,15 +1682,7 @@ where "Dropping message {} claiming to be from self but forwarded from {}", msg_id, propagation_source ); - if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score { - peer_score.reject_message( - propagation_source, - msg_id, - &raw_message.topic, - RejectReason::SelfOrigin, - ); - gossip_promises.reject_message(msg_id, &RejectReason::SelfOrigin); - } + self.handle_invalid_message(propagation_source, raw_message, RejectReason::SelfOrigin); return false; } @@ -1731,8 +1722,8 @@ where // Reject the message and return self.handle_invalid_message( propagation_source, - raw_message, - ValidationError::TransformFailed, + &raw_message, + RejectReason::ValidationError(ValidationError::TransformFailed), ); return; } @@ -1745,9 +1736,6 @@ where // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. if !self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { - if let Some(metrics) = self.metrics.as_mut() { - metrics.register_invalid_message(&raw_message.topic); - } return; } @@ -1819,19 +1807,27 @@ where fn handle_invalid_message( &mut self, propagation_source: &PeerId, - raw_message: RawGossipsubMessage, - validation_error: ValidationError, + raw_message: &RawGossipsubMessage, + reject_reason: RejectReason, ) { if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { - let reason = RejectReason::ValidationError(validation_error); + if let Some(metrics) = self.metrics.as_mut() { + metrics.register_invalid_message(&raw_message.topic); + } + let fast_message_id_cache = &self.fast_messsage_id_cache; if let Some(msg_id) = self .config - .fast_message_id(&raw_message) + .fast_message_id(raw_message) .and_then(|id| fast_message_id_cache.get(&id)) { - peer_score.reject_message(propagation_source, msg_id, &raw_message.topic, reason); - gossip_promises.reject_message(msg_id, &reason); + peer_score.reject_message( + propagation_source, + msg_id, + &raw_message.topic, + reject_reason, + ); + gossip_promises.reject_message(msg_id, &reject_reason); } else { // The message is invalid, we reject it ignoring any gossip promises. If a peer is // advertising this message via an IHAVE and it's invalid it will be double @@ -2042,7 +2038,7 @@ where for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); if let Some(metrics) = self.metrics.as_mut() { - metrics.register_broken_promise(); + metrics.register_score_penalty("broken_promise"); } } } @@ -2051,6 +2047,7 @@ where /// Heartbeat function which shifts the memcache and updates the mesh. fn heartbeat(&mut self) { debug!("Starting heartbeat"); + let start = Instant::now(); self.heartbeat_ticks += 1; @@ -2075,13 +2072,15 @@ where } } - // cache scores throughout the heartbeat - let mut scores = HashMap::new(); - let peer_score = &self.peer_score; - let mut score = |p: &PeerId| match peer_score { - Some((peer_score, ..)) => *scores.entry(*p).or_insert_with(|| peer_score.score(p)), - _ => 0.0, - }; + // Cache the scores of all connected peers, and record metrics for current penalties. + let mut scores = HashMap::with_capacity(self.connected_peers.len()); + if let Some((peer_score, ..)) = &self.peer_score { + for peer_id in self.connected_peers.keys() { + scores + .entry(peer_id) + .or_insert_with(|| peer_score.metric_score(&peer_id, self.metrics.as_mut())); + } + } // maintain the mesh for each topic for (topic_hash, peers) in self.mesh.iter_mut() { @@ -2093,35 +2092,35 @@ where // drop all peers with negative score, without PX // if there is at some point a stable retain method for BTreeSet the following can be // written more efficiently with retain. - let to_remove: Vec<_> = peers - .iter() - .filter(|&p| { - if score(p) < 0.0 { - debug!( - "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ + let mut to_remove_peers = Vec::new(); + for peer_id in peers.iter() { + let peer_score = *scores.get(peer_id).unwrap_or(&0.0); + + // Record the score per mesh + if let Some(metrics) = self.metrics.as_mut() { + metrics.observe_mesh_peers_score(topic_hash, peer_score); + } + + if peer_score < 0.0 { + debug!( + "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ {}]", - p, - score(p), - topic_hash - ); + peer_id, peer_score, topic_hash + ); - let current_topic = to_prune.entry(*p).or_insert_with(Vec::new); - current_topic.push(topic_hash.clone()); - no_px.insert(*p); - true - } else { - false - } - }) - .cloned() - .collect(); + let current_topic = to_prune.entry(*peer_id).or_insert_with(Vec::new); + current_topic.push(topic_hash.clone()); + no_px.insert(*peer_id); + to_remove_peers.push(*peer_id); + } + } if let Some(m) = self.metrics.as_mut() { - m.peers_removed(topic_hash, Churn::BadScore, to_remove.len()) + m.peers_removed(topic_hash, Churn::BadScore, to_remove_peers.len()) } - for peer in to_remove { - peers.remove(&peer); + for peer_id in to_remove_peers { + peers.remove(&peer_id); } // too little peers - add some @@ -2143,7 +2142,7 @@ where !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) - && score(peer) >= 0.0 + && *scores.get(peer).unwrap_or(&0.0) >= 0.0 }, ); for peer in &peer_list { @@ -2172,8 +2171,12 @@ where let mut rng = thread_rng(); let mut shuffled = peers.iter().cloned().collect::>(); shuffled.shuffle(&mut rng); - shuffled - .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal)); + shuffled.sort_by(|p1, p2| { + let score_p1 = *scores.get(p1).unwrap_or(&0.0); + let score_p2 = *scores.get(p2).unwrap_or(&0.0); + + score_p1.partial_cmp(&score_p2).unwrap_or(Ordering::Equal) + }); // shuffle everything except the last retain_scores many peers (the best ones) shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); @@ -2232,7 +2235,7 @@ where !peers.contains(peer) && !explicit_peers.contains(peer) && !backoffs.is_backoff_with_slack(topic_hash, peer) - && score(peer) >= 0.0 + && *scores.get(peer).unwrap_or(&0.0) >= 0.0 && outbound_peers.contains(peer) }, ); @@ -2265,19 +2268,27 @@ where // now compute the median peer score in the mesh let mut peers_by_score: Vec<_> = peers.iter().collect(); - peers_by_score - .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Equal)); + peers_by_score.sort_by(|p1, p2| { + let p1_score = *scores.get(p1).unwrap_or(&0.0); + let p2_score = *scores.get(p2).unwrap_or(&0.0); + p1_score.partial_cmp(&p2_score).unwrap_or(Equal) + }); let middle = peers_by_score.len() / 2; let median = if peers_by_score.len() % 2 == 0 { - (score( - *peers_by_score.get(middle - 1).expect( - "middle < vector length and middle > 0 since peers.len() > 0", - ), - ) + score(*peers_by_score.get(middle).expect("middle < vector length"))) - * 0.5 + let sub_middle_peer = *peers_by_score + .get(middle - 1) + .expect("middle < vector length and middle > 0 since peers.len() > 0"); + let sub_middle_score = *scores.get(sub_middle_peer).unwrap_or(&0.0); + let middle_peer = + *peers_by_score.get(middle).expect("middle < vector length"); + let middle_score = *scores.get(middle_peer).unwrap_or(&0.0); + + (sub_middle_score + middle_score) * 0.5 } else { - score(*peers_by_score.get(middle).expect("middle < vector length")) + *scores + .get(*peers_by_score.get(middle).expect("middle < vector length")) + .unwrap_or(&0.0) }; // if the median score is below the threshold, select a better peer (if any) and @@ -2288,11 +2299,11 @@ where &self.connected_peers, topic_hash, self.config.opportunistic_graft_peers(), - |peer| { - !peers.contains(peer) - && !explicit_peers.contains(peer) - && !backoffs.is_backoff_with_slack(topic_hash, peer) - && score(peer) > median + |peer_id| { + !peers.contains(peer_id) + && !explicit_peers.contains(peer_id) + && !backoffs.is_backoff_with_slack(topic_hash, peer_id) + && *scores.get(peer_id).unwrap_or(&0.0) > median }, ); for peer in &peer_list { @@ -2344,9 +2355,10 @@ where }; for peer in peers.iter() { // is the peer still subscribed to the topic? + let peer_score = *scores.get(peer).unwrap_or(&0.0); match self.peer_topics.get(peer) { Some(topics) => { - if !topics.contains(topic_hash) || score(peer) < publish_threshold { + if !topics.contains(topic_hash) || peer_score < publish_threshold { debug!( "HEARTBEAT: Peer removed from fanout for topic: {:?}", topic_hash @@ -2378,10 +2390,10 @@ where &self.connected_peers, topic_hash, needed_peers, - |peer| { - !peers.contains(peer) - && !explicit_peers.contains(peer) - && score(peer) < publish_threshold + |peer_id| { + !peers.contains(peer_id) + && !explicit_peers.contains(peer_id) + && *scores.get(peer_id).unwrap_or(&0.0) < publish_threshold }, ); peers.extend(new_peers); @@ -2389,12 +2401,6 @@ where } if self.peer_score.is_some() { - trace!("Peer_scores: {:?}", { - for peer in self.peer_topics.keys() { - score(peer); - } - scores - }); trace!("Mesh message deliveries: {:?}", { self.mesh .iter() @@ -2406,7 +2412,7 @@ where .map(|p| { ( *p, - peer_score + self.peer_score .as_ref() .expect("peer_score.is_some()") .0 @@ -2435,6 +2441,10 @@ where self.mcache.shift(); debug!("Completed Heartbeat"); + if let Some(metrics) = self.metrics.as_mut() { + let duration = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX); + metrics.observe_heartbeat_duration(duration); + } } /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh @@ -3286,8 +3296,8 @@ where for (raw_message, validation_error) in invalid_messages { self.handle_invalid_message( &propagation_source, - raw_message, - validation_error, + &raw_message, + RejectReason::ValidationError(validation_error), ) } } else { diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index 2904b152ee7..2f05f4ede9e 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -86,7 +86,7 @@ impl GossipPromises { let count = result.entry(*peer_id).or_insert(0); *count += 1; debug!( - "The peer {} broke the promise to deliver message {} in time!", + "[Penalty] The peer {} broke the promise to deliver message {} in time!", peer_id, msg ); false diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index cef9bbdbd2d..d95a8289061 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -25,8 +25,9 @@ use std::collections::HashMap; use open_metrics_client::encoding::text::Encode; use open_metrics_client::metrics::counter::Counter; -use open_metrics_client::metrics::family::Family; +use open_metrics_client::metrics::family::{Family, MetricConstructor}; use open_metrics_client::metrics::gauge::Gauge; +use open_metrics_client::metrics::histogram::{linear_buckets, Histogram}; use open_metrics_client::registry::Registry; use crate::topic::TopicHash; @@ -157,21 +158,25 @@ pub struct Metrics { /// Bytes received from gossip messages for each topic. topic_msg_recv_bytes: Family, + /* Metrics related to scoring */ + /// Histogram of the scores for each mesh topic. + score_per_mesh: Family, + /// A counter of the kind of penalties being applied to peers. + scoring_penalties: Family<&'static str, Counter>, + /* General Metrics */ /// Gossipsub supports floodsub, gossipsub v1.0 and gossipsub v1.1. Peers are classified based /// on which protocol they support. This metric keeps track of the number of peers that are /// connected of each type. peers_per_protocol: Family<&'static str, Gauge>, + /// The time it takes to complete one iteration of the heartbeat. + heartbeat_duration: Histogram, /* Performance metrics */ /// When the user validates a message, it tries to re propagate it to its mesh peers. If the /// message expires from the memcache before it can be validated, we count this a cache miss /// and it is an indicator that the memcache size should be increased. memcache_misses: Counter, - /// If we request a message via an IWANT and the peer does not respond in time, this counter is - /// increased. This measures unresponsive peers or peers that have no set the correct - /// message-id function. - broken_promises: Counter, /// The number of times we have decided that an IWANT control message is required for this /// topic. A very high metric might indicate an underperforming network. topic_iwant_msgs: Family, @@ -260,12 +265,49 @@ impl Metrics { "topic_msg_recv_bytes", "Bytes received from gossip messages for each topic" ); + // TODO: Update default variables once a builder pattern is used. + let gossip_threshold = -4000.0; + let publish_threshold = -8000.0; + let greylist_threshold = -16000.0; + let histogram_buckets: Vec = vec![ + greylist_threshold, + publish_threshold, + gossip_threshold, + gossip_threshold / 2.0, + gossip_threshold / 4.0, + 0.0, + 1.0, + 10.0, + 100.0, + ]; + + let hist_builder = HistBuilder { + buckets: histogram_buckets, + }; + + let score_per_mesh: Family<_, _, HistBuilder> = Family::new_with_constructor(hist_builder); + registry.register( + "score_per_mesh", + "Histogram of scores per mesh topic", + Box::new(score_per_mesh.clone()), + ); + let scoring_penalties = register_family!( + "scoring_penalties", + "Counter of types of scoring penalties given to peers" + ); let peers_per_protocol = register_family!( "peers_per_protocol", "Number of connected peers by protocol type" ); + let heartbeat_duration = Histogram::new(linear_buckets(0.0, 100.0, 10)); + registry.register( + "heartbeat_duration", + "Histogram of observed heartbeat durations", + Box::new(heartbeat_duration.clone()), + ); + let topic_iwant_msgs = register_family!( "topic_iwant_msgs", "Number of times we have decided an IWANT is required for this topic" @@ -279,15 +321,6 @@ impl Metrics { ); metric }; - let broken_promises = { - let metric = Counter::default(); - registry.register( - "broken_promises", - "Number of broken IWANT promises. i.e the number of times peers failed to respond to message requests on time", - Box::new(metric.clone()), - ); - metric - }; Self { max_topics, @@ -308,9 +341,11 @@ impl Metrics { topic_msg_recv_counts_unfiltered, topic_msg_recv_counts, topic_msg_recv_bytes, + score_per_mesh, + scoring_penalties, peers_per_protocol, + heartbeat_duration, memcache_misses, - broken_promises, topic_iwant_msgs, } } @@ -413,6 +448,11 @@ impl Metrics { } } + /// Register a score penalty. + pub fn register_score_penalty(&mut self, kind: &'static str) { + self.scoring_penalties.get_or_create(&kind).inc(); + } + /// Registers that a message was published on a specific topic. pub fn register_published_message(&mut self, topic: &TopicHash) { if self.register_topic(topic).is_ok() { @@ -471,11 +511,16 @@ impl Metrics { } } - /// Register a broken promise. A peer can have many broken promises, but we only register one - /// broken promise per peer per heartbeat. This way the number isn't skewed too heavily if a - /// single peer becomes unresponsive. - pub fn register_broken_promise(&mut self) { - self.broken_promises.inc(); + /// Observes a heartbeat duration. + pub fn observe_heartbeat_duration(&mut self, millis: u64) { + self.heartbeat_duration.observe(millis as f64); + } + + /// Observe a score of a mesh peer. + pub fn observe_mesh_peers_score(&mut self, topic: &TopicHash, score: f64) { + if self.register_topic(topic).is_ok() { + self.score_per_mesh.get_or_create(topic).observe(score); + } } /// Register a new peers connection based on its protocol. @@ -496,3 +541,14 @@ impl Metrics { } } } + +#[derive(Clone)] +struct HistBuilder { + buckets: Vec, +} + +impl MetricConstructor for HistBuilder { + fn new_metric(&self) -> Histogram { + Histogram::new(self.buckets.clone().into_iter()) + } +} diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 09ba48baf62..44ff469f453 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -21,6 +21,7 @@ //! //! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour. +use crate::metrics::Metrics; use crate::time_cache::TimeCache; use crate::{MessageId, TopicHash}; use instant::Instant; @@ -212,8 +213,14 @@ impl PeerScore { } } - /// Returns the score for a peer. + /// Returns the score for a peer pub fn score(&self, peer_id: &PeerId) -> f64 { + self.metric_score(peer_id, None) + } + + /// Returns the score for a peer, logging metrics. This is called from the heartbeat and + /// increments the metric counts for penalties. + pub fn metric_score(&self, peer_id: &PeerId, mut metrics: Option<&mut Metrics>) -> f64 { let peer_stats = match self.peer_stats.get(peer_id) { Some(v) => v, None => return 0.0, @@ -264,8 +271,11 @@ impl PeerScore { - topic_stats.mesh_message_deliveries; let p3 = deficit * deficit; topic_score += p3 * topic_params.mesh_message_deliveries_weight; + if let Some(metrics) = metrics.as_mut() { + metrics.register_score_penalty("message_deficit"); + } debug!( - "The peer {} has a mesh message deliveries deficit of {} in topic\ + "[Penalty] The peer {} has a mesh message deliveries deficit of {} in topic\ {} and will get penalized by {}", peer_id, deficit, @@ -313,8 +323,11 @@ impl PeerScore { if (peers_in_ip as f64) > self.params.ip_colocation_factor_threshold { let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; let p6 = surplus * surplus; + if let Some(metrics) = metrics.as_mut() { + metrics.register_score_penalty("ip-colocation"); + } debug!( - "The peer {} gets penalized because of too many peers with the ip {}. \ + "[Penalty] The peer {} gets penalized because of too many peers with the ip {}. \ The surplus is {}. ", peer_id, ip, surplus ); @@ -335,7 +348,7 @@ impl PeerScore { pub fn add_penalty(&mut self, peer_id: &PeerId, count: usize) { if let Some(peer_stats) = self.peer_stats.get_mut(peer_id) { debug!( - "Behavioral penalty for peer {}, count = {}.", + "[Penalty] Behavioral penalty for peer {}, count = {}.", peer_id, count ); peer_stats.behaviour_penalty += count as f64; @@ -600,6 +613,7 @@ impl PeerScore { "Message from {} rejected because of ValidationError or SelfOrigin", from ); + self.mark_invalid_message_delivery(from, topic_hash); } @@ -764,7 +778,7 @@ impl PeerScore { peer_stats.stats_or_default_mut(topic_hash.clone(), &self.params) { debug!( - "Peer {} delivered an invalid message in topic {} and gets penalized \ + "[Penalty] Peer {} delivered an invalid message in topic {} and gets penalized \ for it", peer_id, topic_hash ); From 858b9e34894ca4d65a6375c386e258a155bec7bb Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 3 Dec 2021 17:12:14 +1100 Subject: [PATCH 07/11] Correct topic encoding --- protocols/gossipsub/src/metrics.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index d95a8289061..ffb496a1a6a 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -91,12 +91,23 @@ pub enum Churn { } /// Label for the mesh inclusion event metrics. -#[derive(PartialEq, Eq, Hash, Encode, Clone)] +#[derive(PartialEq, Eq, Hash, Clone)] struct InclusionLabel { topic: TopicHash, reason: Inclusion, } +// Custom implementation is necessary because TopicHash encodes as "hash=", when using the derive +// gives topic="hash="..."" +impl Encode for InclusionLabel { + fn encode(&self, writer: &mut dyn std::io::Write) -> Result<(), std::io::Error> { + self.topic.encode(writer)?; + writer.write_all(b"reason=\"")?; + self.reason.encode(writer)?; + Ok(()) + } +} + /// Label for the mesh churn event metrics. #[derive(PartialEq, Eq, Hash, Encode, Clone)] struct ChurnLabel { From a8b67a34b0a0b5b11e4bbcd6bce807ca257000ee Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 3 Dec 2021 17:27:01 +1100 Subject: [PATCH 08/11] More manual encoding of metric labels --- protocols/gossipsub/src/metrics.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index ffb496a1a6a..258b05dc0b0 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -102,19 +102,32 @@ struct InclusionLabel { impl Encode for InclusionLabel { fn encode(&self, writer: &mut dyn std::io::Write) -> Result<(), std::io::Error> { self.topic.encode(writer)?; + writer.write_all(b",")?; writer.write_all(b"reason=\"")?; self.reason.encode(writer)?; + writer.write_all(b"\"")?; Ok(()) } } /// Label for the mesh churn event metrics. -#[derive(PartialEq, Eq, Hash, Encode, Clone)] +#[derive(PartialEq, Eq, Hash, Clone)] struct ChurnLabel { topic: TopicHash, reason: Churn, } +impl Encode for ChurnLabel { + fn encode(&self, writer: &mut dyn std::io::Write) -> Result<(), std::io::Error> { + self.topic.encode(writer)?; + writer.write_all(b",")?; + writer.write_all(b"reason=\"")?; + self.reason.encode(writer)?; + writer.write_all(b"\"")?; + Ok(()) + } +} + /// A collection of metrics used throughout the Gossipsub behaviour. pub struct Metrics { /* Configuration parameters */ From 5572fd2f6f8b7c6414444c194927570236a33049 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 3 Dec 2021 18:34:02 +1100 Subject: [PATCH 09/11] Clean up metric labelling --- protocols/gossipsub/src/behaviour.rs | 12 +- protocols/gossipsub/src/metrics.rs | 157 +++++++++++++------------- protocols/gossipsub/src/peer_score.rs | 6 +- protocols/gossipsub/src/types.rs | 3 +- 4 files changed, 92 insertions(+), 86 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 30f1ed6c93a..f68ffb091e1 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -51,7 +51,7 @@ use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; -use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics}; +use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; @@ -957,7 +957,7 @@ where let fanaout_added = added_peers.len(); if let Some(m) = self.metrics.as_mut() { - m.peers_included(topic_hash, Inclusion::Fanaout, fanaout_added) + m.peers_included(topic_hash, Inclusion::Fanout, fanaout_added) } // check if we need to get more peers, which we randomly select @@ -1378,7 +1378,7 @@ where // add behavioural penalty if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(metrics) = self.metrics.as_mut() { - metrics.register_score_penalty("graft-backoff"); + metrics.register_score_penalty(Penalty::GraftBackoff); } peer_score.add_penalty(peer_id, 1); @@ -2038,7 +2038,7 @@ where for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); if let Some(metrics) = self.metrics.as_mut() { - metrics.register_score_penalty("broken_promise"); + metrics.register_score_penalty(Penalty::BrokenPromise); } } } @@ -3094,7 +3094,7 @@ where .get(peer_id) .expect("Connected peer must be registered") .kind; - metrics.peer_protocol_disconnected(peer_kind); + metrics.peer_protocol_disconnected(peer_kind.clone()); } self.connected_peers.remove(peer_id); @@ -3246,7 +3246,7 @@ where // We have identified the protocol this peer is using if let Some(metrics) = self.metrics.as_mut() { - metrics.peer_protocol_connected(&kind); + metrics.peer_protocol_connected(kind.clone()); } if let PeerKind::NotSupported = kind { diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 258b05dc0b0..2690a7f59e5 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -62,72 +62,6 @@ impl Default for Config { /// Whether we have ever been subscribed to this topic. type EverSubscribed = bool; -/// Reasons why a peer was included in the mesh. -#[derive(PartialEq, Eq, Hash, Encode, Clone)] -pub enum Inclusion { - /// Peer was a fanaout peer. - Fanaout, - /// Included from random selection. - Random, - /// Peer subscribed. - Subscribed, - /// Peer was included to fill the outbound quota. - Outbound, -} - -/// Reasons why a peer was removed from the mesh. -#[derive(PartialEq, Eq, Hash, Encode, Clone)] -pub enum Churn { - /// Peer disconnected. - Dc, - /// Peer had a bad score. - BadScore, - /// Peer sent a PRUNE. - Prune, - /// Peer unsubscribed. - Unsub, - /// Too many peers. - Excess, -} - -/// Label for the mesh inclusion event metrics. -#[derive(PartialEq, Eq, Hash, Clone)] -struct InclusionLabel { - topic: TopicHash, - reason: Inclusion, -} - -// Custom implementation is necessary because TopicHash encodes as "hash=", when using the derive -// gives topic="hash="..."" -impl Encode for InclusionLabel { - fn encode(&self, writer: &mut dyn std::io::Write) -> Result<(), std::io::Error> { - self.topic.encode(writer)?; - writer.write_all(b",")?; - writer.write_all(b"reason=\"")?; - self.reason.encode(writer)?; - writer.write_all(b"\"")?; - Ok(()) - } -} - -/// Label for the mesh churn event metrics. -#[derive(PartialEq, Eq, Hash, Clone)] -struct ChurnLabel { - topic: TopicHash, - reason: Churn, -} - -impl Encode for ChurnLabel { - fn encode(&self, writer: &mut dyn std::io::Write) -> Result<(), std::io::Error> { - self.topic.encode(writer)?; - writer.write_all(b",")?; - writer.write_all(b"reason=\"")?; - self.reason.encode(writer)?; - writer.write_all(b"\"")?; - Ok(()) - } -} - /// A collection of metrics used throughout the Gossipsub behaviour. pub struct Metrics { /* Configuration parameters */ @@ -186,13 +120,13 @@ pub struct Metrics { /// Histogram of the scores for each mesh topic. score_per_mesh: Family, /// A counter of the kind of penalties being applied to peers. - scoring_penalties: Family<&'static str, Counter>, + scoring_penalties: Family, /* General Metrics */ /// Gossipsub supports floodsub, gossipsub v1.0 and gossipsub v1.1. Peers are classified based /// on which protocol they support. This metric keeps track of the number of peers that are /// connected of each type. - peers_per_protocol: Family<&'static str, Gauge>, + peers_per_protocol: Family, /// The time it takes to complete one iteration of the heartbeat. heartbeat_duration: Histogram, @@ -438,7 +372,7 @@ impl Metrics { if self.register_topic(topic).is_ok() { self.mesh_peer_inclusion_events .get_or_create(&InclusionLabel { - topic: topic.clone(), + hash: topic.to_string(), reason, }) .inc_by(count as u64); @@ -450,7 +384,7 @@ impl Metrics { if self.register_topic(topic).is_ok() { self.mesh_peer_churn_events .get_or_create(&ChurnLabel { - topic: topic.clone(), + hash: topic.to_string(), reason, }) .inc_by(count as u64); @@ -473,8 +407,10 @@ impl Metrics { } /// Register a score penalty. - pub fn register_score_penalty(&mut self, kind: &'static str) { - self.scoring_penalties.get_or_create(&kind).inc(); + pub fn register_score_penalty(&mut self, penalty: Penalty) { + self.scoring_penalties + .get_or_create(&PenaltyLabel { penalty }) + .inc(); } /// Registers that a message was published on a specific topic. @@ -548,15 +484,17 @@ impl Metrics { } /// Register a new peers connection based on its protocol. - pub fn peer_protocol_connected(&mut self, kind: &PeerKind) { + pub fn peer_protocol_connected(&mut self, kind: PeerKind) { self.peers_per_protocol - .get_or_create(&kind.as_static_ref()) + .get_or_create(&ProtocolLabel { protocol: kind }) .inc(); } /// Removes a peer from the counter based on its protocol when it disconnects. - pub fn peer_protocol_disconnected(&mut self, kind: &PeerKind) { - let metric = self.peers_per_protocol.get_or_create(&kind.as_static_ref()); + pub fn peer_protocol_disconnected(&mut self, kind: PeerKind) { + let metric = self + .peers_per_protocol + .get_or_create(&ProtocolLabel { protocol: kind }); if metric.get() == 0 { return; } else { @@ -566,6 +504,73 @@ impl Metrics { } } +/// Reasons why a peer was included in the mesh. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +pub enum Inclusion { + /// Peer was a fanaout peer. + Fanout, + /// Included from random selection. + Random, + /// Peer subscribed. + Subscribed, + /// Peer was included to fill the outbound quota. + Outbound, +} + +/// Reasons why a peer was removed from the mesh. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +pub enum Churn { + /// Peer disconnected. + Dc, + /// Peer had a bad score. + BadScore, + /// Peer sent a PRUNE. + Prune, + /// Peer unsubscribed. + Unsub, + /// Too many peers. + Excess, +} + +/// Kinds of reasons a peer's score has been penalized +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +pub enum Penalty { + /// A peer grafted before waiting the back-off time. + GraftBackoff, + /// A Peer did not respond to an IWANT request in time. + BrokenPromise, + /// A Peer did not send enough messages as expected. + MessageDeficit, + /// Too many peers under one IP address. + IPColocation, +} + +/// Label for the mesh inclusion event metrics. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +struct InclusionLabel { + hash: String, + reason: Inclusion, +} + +/// Label for the mesh churn event metrics. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +struct ChurnLabel { + hash: String, + reason: Churn, +} + +/// Label for the kinds of protocols peers can connect as. +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +struct ProtocolLabel { + protocol: PeerKind, +} + +/// Label for the kinds of scoring penalties that can occur +#[derive(PartialEq, Eq, Hash, Encode, Clone)] +struct PenaltyLabel { + penalty: Penalty, +} + #[derive(Clone)] struct HistBuilder { buckets: Vec, diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 44ff469f453..ba14985405c 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -21,7 +21,7 @@ //! //! Manages and stores the Scoring logic of a particular peer on the gossipsub behaviour. -use crate::metrics::Metrics; +use crate::metrics::{Metrics, Penalty}; use crate::time_cache::TimeCache; use crate::{MessageId, TopicHash}; use instant::Instant; @@ -272,7 +272,7 @@ impl PeerScore { let p3 = deficit * deficit; topic_score += p3 * topic_params.mesh_message_deliveries_weight; if let Some(metrics) = metrics.as_mut() { - metrics.register_score_penalty("message_deficit"); + metrics.register_score_penalty(Penalty::MessageDeficit); } debug!( "[Penalty] The peer {} has a mesh message deliveries deficit of {} in topic\ @@ -324,7 +324,7 @@ impl PeerScore { let surplus = (peers_in_ip as f64) - self.params.ip_colocation_factor_threshold; let p6 = surplus * surplus; if let Some(metrics) = metrics.as_mut() { - metrics.register_score_penalty("ip-colocation"); + metrics.register_score_penalty(Penalty::IPColocation); } debug!( "[Penalty] The peer {} gets penalized because of too many peers with the ip {}. \ diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index f7ce44ab6df..f9c2ddaeb57 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -22,6 +22,7 @@ use crate::rpc_proto; use crate::TopicHash; use libp2p_core::{connection::ConnectionId, PeerId}; +use open_metrics_client::encoding::text::Encode; use prost::Message; use std::fmt; use std::fmt::Debug; @@ -90,7 +91,7 @@ pub struct PeerConnections { } /// Describes the types of peers that can exist in the gossipsub context. -#[derive(Debug, Clone, PartialEq, Hash, Eq)] +#[derive(Debug, Clone, PartialEq, Hash, Encode, Eq)] pub enum PeerKind { /// A gossipsub 1.1 peer. Gossipsubv1_1, From d3165fcf90240967732fe740398da70642c1d1c9 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 13 Dec 2021 18:50:28 +1100 Subject: [PATCH 10/11] Improve heartbeat duration buckets --- protocols/gossipsub/src/metrics.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 2690a7f59e5..f211edc9f99 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -259,7 +259,7 @@ impl Metrics { "Number of connected peers by protocol type" ); - let heartbeat_duration = Histogram::new(linear_buckets(0.0, 100.0, 10)); + let heartbeat_duration = Histogram::new(linear_buckets(0.0, 50.0, 10)); registry.register( "heartbeat_duration", "Histogram of observed heartbeat durations", From eb787c0ed25f5dad3162ce7b3735676d230d919a Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 21 Dec 2021 11:34:56 +1100 Subject: [PATCH 11/11] Remove re-export --- protocols/gossipsub/src/lib.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index c401018c26e..a05c81806ee 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -122,10 +122,6 @@ //! println!("Listening on {:?}", addr); //! ``` -// Re-export open_metrics_client so external applications can inject the correct Registry when -// using metrics -pub use open_metrics_client; - pub mod error; pub mod protocol;