From 18cd6bc93c804df45579072786ddc1cd17332159 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 13 Sep 2021 08:43:41 +1000 Subject: [PATCH 01/10] Add granular metrics to gossipsub (#146) * Added message count stuff * Message count maps use MeshIndex instead of PeerId * Added validated message count * Count rejected messages too.. * simplified adding new counts significantly * some cleanup and stop resetting counts * Added more metrics and cleanup * added peer slot to logging on score * added more metrics for prune cases * added churn_topic * added debug logging for churn counts * immutable borrow * fixed log * improved logging even more.. * ensure message counts structure exists * Forgot to add prune case * cleaned up debugging with macro * fixed unsubscribed condition * renamed some things for clarity * Optimized Performance with Vec Instead of BTreeMap * Updated comment for clarity * removed redundant category and optimized * fixed small edge case * Unified slot-related HashMaps into 1 Structure * Convert Fields to Array for Passing to Functions * Added Method for Iterating over Metrics * small fix * ran cargo fmt * ran clippy * One last refactor using enums. Looking great now. * Fixed bug with assert and improved error message * fixed issue with release builds * Make minimal changes to master * Added tracking of assigning slot * added slot_metrics_topics() & extra info to logs * Made slot_metrics_topics() smarter * First draft * Update metric design * Add new metric and cleanup * Cleanup code & Remove Redundant map Lookups (#147) * Simplify & Remove Redundant Map Lookups * Cleanup code/Remove Redundant lookups in heartbeat * Commented and renamed things for clarity * More renaming / reorganizing for clarity * Fixed clippy's unnecessary clone warning * one last renaming.. Co-authored-by: Mark Mackey Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com> --- Cargo.toml | 1 + protocols/gossipsub/Cargo.toml | 7 + protocols/gossipsub/src/behaviour.rs | 336 +++++++++++-- protocols/gossipsub/src/behaviour/tests.rs | 73 ++- protocols/gossipsub/src/lib.rs | 4 + protocols/gossipsub/src/mcache.rs | 2 +- protocols/gossipsub/src/metrics.rs | 117 +++++ .../gossipsub/src/metrics/topic_metrics.rs | 445 ++++++++++++++++++ 8 files changed, 941 insertions(+), 44 deletions(-) create mode 100644 protocols/gossipsub/src/metrics.rs create mode 100644 protocols/gossipsub/src/metrics/topic_metrics.rs diff --git a/Cargo.toml b/Cargo.toml index f53f0283846..d05b308ee3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ floodsub = ["libp2p-floodsub"] identify = ["libp2p-identify", "libp2p-metrics/identify"] kad = ["libp2p-kad", "libp2p-metrics/kad"] gossipsub = ["libp2p-gossipsub"] +gossipsub-metrics = ["libp2p-gossipsub/metrics"] metrics = ["libp2p-metrics"] mdns = ["libp2p-mdns"] mplex = ["libp2p-mplex"] diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 115051a5ef4..42efb4dad9e 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -9,6 +9,10 @@ repository = "https://github.com/libp2p/rust-libp2p" keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] + +[features] +metrics = ["strum", "strum_macros"] + [dependencies] libp2p-swarm = { version = "0.31.0", path = "../../swarm" } libp2p-core = { version = "0.30.0", path = "../../core", default-features = false } @@ -27,6 +31,9 @@ smallvec = "1.6.1" prost = "0.8" hex_fmt = "0.3.0" regex = "1.4.0" +# Metrics dependencies +strum = { version = "0.21", optional = true } +strum_macros = { version = "0.21", optional = true} [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 803f79c924b..0a2216f6238 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use std::{ - cmp::{max, Ordering}, + cmp::max, collections::HashSet, collections::VecDeque, collections::{BTreeSet, HashMap}, @@ -68,6 +68,12 @@ use std::{cmp::Ordering::Equal, fmt::Debug}; #[cfg(test)] mod tests; +#[cfg(feature = "metrics")] +use crate::metrics::{ + topic_metrics::{SlotChurnMetric, SlotMessageMetric, TopicMetrics}, + InternalMetrics, +}; + /// Determines if published messages should be signed or not. /// /// Without signing, a number of privacy preserving modes can be selected. @@ -210,6 +216,10 @@ pub struct Gossipsub< D: DataTransform = IdentityTransform, F: TopicSubscriptionFilter = AllowAllSubscriptionFilter, > { + /// If metrics are enabled, keep track of a set of internal metrics relating to gossipsub. + #[cfg(feature = "metrics")] + metrics: InternalMetrics, + /// Configuration providing gossipsub performance parameters. config: GossipsubConfig, @@ -386,6 +396,8 @@ where // Set up message publishing parameters. Ok(Gossipsub { + #[cfg(feature = "metrics")] + metrics: InternalMetrics::default(), events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), @@ -470,6 +482,12 @@ where .map(|(score, ..)| score.score(peer_id)) } + // If metrics are enabled, obtain a shared reference to them. + #[cfg(feature = "metrics")] + pub fn metrics(&self) -> &InternalMetrics { + &self.metrics + } + /// Subscribe to a topic. /// /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already @@ -738,14 +756,55 @@ where "Message not in cache. Ignoring forwarding. Message Id: {}", msg_id ); + + #[cfg(feature = "metrics")] + { + self.metrics.memcache_misses += 1; + } + return Ok(false); } }; + #[cfg(feature = "metrics")] + let topic = raw_message.topic.clone(); + self.forward_msg(msg_id, raw_message, Some(propagation_source))?; + + // Metrics: Report validation result + #[cfg(feature = "metrics")] + self.metrics.increment_message_metric( + &topic, + propagation_source, + SlotMessageMetric::MessagesValidated, + ); return Ok(true); } - MessageAcceptance::Reject => RejectReason::ValidationFailed, - MessageAcceptance::Ignore => RejectReason::ValidationIgnored, + MessageAcceptance::Reject => { + // Metrics: Report validation result + #[cfg(feature = "metrics")] + if let Some(raw_message) = self.mcache.get(msg_id) { + // Increment metrics + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesRejected, + ); + } + RejectReason::ValidationFailed + } + MessageAcceptance::Ignore => { + // Metrics: Report validation result + #[cfg(feature = "metrics")] + if let Some(raw_message) = self.mcache.get(msg_id) { + // Increment metrics + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesIgnored, + ); + } + RejectReason::ValidationIgnored + } }; if let Some(raw_message) = self.mcache.remove(msg_id) { @@ -761,6 +820,12 @@ where Ok(true) } else { warn!("Rejected message not in cache. Message Id: {}", msg_id); + + #[cfg(feature = "metrics")] + { + self.metrics.memcache_misses += 1; + } + Ok(false) } } @@ -854,6 +919,22 @@ where } } + /// This is just a utility function to verify everything is in order between the + /// mesh and the topic_metrics. It's useful for debugging. + #[cfg(all(feature = "metrics", debug_assertions))] + fn validate_mesh_slots_for_topic(&self, topic: &TopicHash) -> Result<(), String> { + match self.metrics.topic_metrics.get(topic) { + Some(topic_metrics) => match self.mesh.get(topic) { + Some(mesh_peers) => topic_metrics.validate_mesh_slots(mesh_peers), + None => Err(format!("metrics_event[{}] no mesh_peers for topic", topic)), + }, + None => Err(format!( + "metrics_event[{}]: no topic_metrics for topic", + topic + )), + } + } + /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages. fn join(&mut self, topic_hash: &TopicHash) { debug!("Running JOIN for topic: {:?}", topic_hash); @@ -929,6 +1010,10 @@ where mesh_peers.extend(new_peers); } + #[cfg(feature = "metrics")] + self.metrics + .assign_slots_to_peers(topic_hash, added_peers.iter().cloned()); + for peer_id in added_peers { // Send a GRAFT control message debug!("JOIN: Sending Graft message to peer: {:?}", peer_id); @@ -953,7 +1038,19 @@ where &self.connected_peers, ); } - debug!("Completed JOIN for topic: {:?}", topic_hash); + + #[cfg(all(feature = "metrics", debug_assertions))] + { + let validation_result = self.validate_mesh_slots_for_topic(topic_hash); + debug_assert!( + validation_result.is_ok(), + "metrics_event: validate_mesh_slots_for_topic({}) failed! Err({})", + topic_hash, + validation_result.err().unwrap() + ); + } + + trace!("Completed JOIN for topic: {:?}", topic_hash); } /// Creates a PRUNE gossipsub action. @@ -1034,6 +1131,11 @@ where &self.connected_peers, ); } + + #[cfg(feature = "metrics")] + if let Some(topic_metrics) = self.metrics.topic_metrics.get_mut(topic_hash) { + topic_metrics.churn_all_slots(SlotChurnMetric::ChurnLeave); + } } debug!("Completed LEAVE for topic: {:?}", topic_hash); } @@ -1113,7 +1215,7 @@ where debug!("Handling IHAVE for peer: {:?}", peer_id); // use a hashset to avoid duplicates efficiently - let mut iwant_ids = HashSet::new(); + let mut iwant_ids = HashMap::new(); for (topic, ids) in ihave_msgs { // only process the message if we are subscribed @@ -1128,7 +1230,7 @@ where for id in ids { if !self.duplicate_cache.contains(&id) { // have not seen this message, request it - iwant_ids.insert(id); + iwant_ids.insert(id, topic.clone()); } } } @@ -1149,7 +1251,7 @@ where ); //ask in random order - let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect(); + let mut iwant_ids_vec: Vec<_> = iwant_ids.keys().collect(); let mut rng = thread_rng(); iwant_ids_vec.partial_shuffle(&mut rng, iask as usize); @@ -1169,6 +1271,20 @@ where peer_id, message_ids ); + // Metrics: Add IWANT requests + #[cfg(feature = "metrics")] + { + for id in &message_ids { + if let Some(topic) = iwant_ids.get(id) { + self.metrics + .topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .iwant_requests += 1; + } + } + } + Self::control_pool_add( &mut self.control_pool, *peer_id, @@ -1330,6 +1446,10 @@ where peer_id, &topic_hash ); peers.insert(*peer_id); + + #[cfg(feature = "metrics")] + self.metrics.assign_slot_if_unassigned(&topic_hash, peer_id); + // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( *peer_id, @@ -1392,9 +1512,10 @@ where topic_hash: &TopicHash, backoff: Option, always_update_backoff: bool, + #[cfg(feature = "metrics")] churn_reason: SlotChurnMetric, ) { let mut update_backoff = always_update_backoff; - if let Some(peers) = self.mesh.get_mut(&topic_hash) { + if let Some(peers) = self.mesh.get_mut(topic_hash) { // remove the peer if it exists in the mesh if peers.remove(peer_id) { debug!( @@ -1418,6 +1539,8 @@ where &mut self.events, &self.connected_peers, ); + #[cfg(feature = "metrics")] + self.metrics.churn_slot(topic_hash, peer_id, churn_reason); } } if update_backoff { @@ -1441,10 +1564,17 @@ where let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { - self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); + self.remove_peer_from_mesh( + peer_id, + &topic_hash, + backoff, + true, + #[cfg(feature = "metrics")] + SlotChurnMetric::ChurnPrune, + ); if self.mesh.contains_key(&topic_hash) { - //connect to px peers + // connect to px peers if !px.is_empty() { // we ignore PX from peers with insufficient score if below_threshold { @@ -1597,13 +1727,38 @@ where mut raw_message: RawGossipsubMessage, propagation_source: &PeerId, ) { + // Report received message to metrics if we are subscribed to the topic, otherwise + // ignore it. + #[cfg(feature = "metrics")] + if self.mesh.contains_key(&raw_message.topic) { + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesAll, + ); + } + let fast_message_id = self.config.fast_message_id(&raw_message); if let Some(fast_message_id) = fast_message_id.as_ref() { if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) { let msg_id = msg_id.clone(); - self.message_is_valid(&msg_id, &mut raw_message, propagation_source); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic); + if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.duplicated_message( + propagation_source, + &msg_id, + &raw_message.topic, + ); + } + // Metrics: Report the duplicate message, for mesh topics + #[cfg(feature = "metrics")] + if self.mesh.contains_key(&raw_message.topic) { + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesDuplicates, + ); + } } return; } @@ -1646,13 +1801,34 @@ where if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg_id, &message.topic); } + + // Metrics: Report the duplicate message, for mesh topics + #[cfg(feature = "metrics")] + if self.mesh.contains_key(&raw_message.topic) { + // NOTE: Allow overflowing of a usize here + self.metrics.increment_message_metric( + &message.topic, + propagation_source, + SlotMessageMetric::MessagesDuplicates, + ); + } return; } - debug!( + trace!( "Put message {:?} in duplicate_cache and resolve promises", msg_id ); + // Increment the first message topic, if its in our mesh. + #[cfg(feature = "metrics")] + if self.mesh.contains_key(&raw_message.topic) { + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesFirst, + ); + } + // Tells score that message arrived (but is maybe not fully validated yet). // Consider the message as delivered for gossip promises. if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { @@ -1678,6 +1854,11 @@ where "Received message on a topic we are not subscribed to: {:?}", message.topic ); + + #[cfg(feature = "metrics")] + { + self.metrics.messages_received_on_invalid_topic += 1; + } return; } @@ -1858,7 +2039,14 @@ where // remove unsubscribed peers from the mesh if it exists for (peer_id, topic_hash) in unsubscribed_peers { - self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); + self.remove_peer_from_mesh( + &peer_id, + &topic_hash, + None, + false, + #[cfg(feature = "metrics")] + SlotChurnMetric::ChurnUnsubscribed, + ); } // Potentially inform the handler if we have added this peer to a mesh for the first time. @@ -1874,6 +2062,12 @@ where ); } + #[cfg(feature = "metrics")] + for topic in &topics_to_graft { + self.metrics + .assign_slot_if_unassigned(topic, propagation_source); + } + // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. if !topics_to_graft.is_empty() @@ -1911,13 +2105,19 @@ where if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); + + // Metrics: Increment broken promises + #[cfg(feature = "metrics")] + { + self.metrics.broken_promises += 1; + } } } } /// Heartbeat function which shifts the memcache and updates the mesh. fn heartbeat(&mut self) { - debug!("Starting heartbeat"); + trace!("Starting heartbeat"); self.heartbeat_ticks += 1; @@ -1950,12 +2150,20 @@ where _ => 0.0, }; + #[cfg(all(debug_assertions, feature = "metrics"))] + let mut modified_topics = HashSet::new(); // maintain the mesh for each topic for (topic_hash, peers) in self.mesh.iter_mut() { let explicit_peers = &self.explicit_peers; let backoffs = &self.backoffs; let topic_peers = &self.topic_peers; let outbound_peers = &self.outbound_peers; + #[cfg(feature = "metrics")] + let topic_metrics = self + .metrics + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())); // drop all peers with negative score, without PX // if there is at some point a stable retain method for BTreeSet the following can be @@ -1964,7 +2172,7 @@ where .iter() .filter(|&p| { if score(p) < 0.0 { - debug!( + trace!( "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ {}]", p, @@ -1984,6 +2192,12 @@ where .collect(); for peer in to_remove { peers.remove(&peer); + + // Increment ChurnScore and remove peer from slot + #[cfg(feature = "metrics")] + topic_metrics.churn_slot(&peer, SlotChurnMetric::ChurnScore); + #[cfg(all(debug_assertions, feature = "metrics"))] + modified_topics.insert(topic_hash.clone()); } // too little peers - add some @@ -2013,8 +2227,18 @@ where current_topic.push(topic_hash.clone()); } // update the mesh - debug!("Updating mesh, new mesh: {:?}", peer_list); - peers.extend(peer_list); + if !peer_list.is_empty() { + trace!("Updating mesh, adding to mesh: {:?}", peer_list); + + // Metrics: Update mesh peers + #[cfg(feature = "metrics")] + topic_metrics.assign_slots_to_peers(peer_list.iter().cloned()); + #[cfg(all(debug_assertions, feature = "metrics"))] + modified_topics.insert(topic_hash.clone()); + + // add the peers + peers.extend(peer_list); + } } // too many peers - remove some @@ -2031,8 +2255,11 @@ where let mut rng = thread_rng(); let mut shuffled = peers.iter().cloned().collect::>(); shuffled.shuffle(&mut rng); - shuffled - .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal)); + shuffled.sort_by(|p1, p2| { + score(p1) + .partial_cmp(&score(p2)) + .unwrap_or(std::cmp::Ordering::Equal) + }); // shuffle everything except the last retain_scores many peers (the best ones) shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); @@ -2061,6 +2288,11 @@ where outbound -= 1; } } + // Metrics: increment ChurnExcess and vacate slot + #[cfg(feature = "metrics")] + topic_metrics.churn_slot(&peer, SlotChurnMetric::ChurnExcess); + #[cfg(all(debug_assertions, feature = "metrics"))] + modified_topics.insert(topic_hash.clone()); // remove the peer peers.remove(&peer); @@ -2095,9 +2327,18 @@ where let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } - // update the mesh - debug!("Updating mesh, new mesh: {:?}", peer_list); - peers.extend(peer_list); + if !peer_list.is_empty() { + // update the mesh + trace!("Updating mesh, adding to mesh: {:?}", peer_list); + + #[cfg(feature = "metrics")] + topic_metrics.assign_slots_to_peers(peer_list.iter().cloned()); + #[cfg(all(debug_assertions, feature = "metrics"))] + modified_topics.insert(topic_hash.clone()); + + // add the peers + peers.extend(peer_list); + } } } @@ -2151,12 +2392,23 @@ where let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } - // update the mesh - debug!( - "Opportunistically graft in topic {} with peers {:?}", - topic_hash, peer_list - ); - peers.extend(peer_list); + + if !peer_list.is_empty() { + // update the mesh + debug!( + "Opportunistically graft in topic {} with peers {:?}", + topic_hash, peer_list + ); + + // Metrics: Update mesh peers + #[cfg(feature = "metrics")] + topic_metrics.assign_slots_to_peers(peer_list.iter().cloned()); + #[cfg(all(debug_assertions, feature = "metrics"))] + modified_topics.insert(topic_hash.clone()); + + // add the peers + peers.extend(peer_list); + } } } } @@ -2192,7 +2444,7 @@ where match self.peer_topics.get(peer) { Some(topics) => { if !topics.contains(&topic_hash) || score(peer) < publish_threshold { - debug!( + trace!( "HEARTBEAT: Peer removed from fanout for topic: {:?}", topic_hash ); @@ -2279,7 +2531,18 @@ where // shift the memcache self.mcache.shift(); - debug!("Completed Heartbeat"); + #[cfg(all(feature = "metrics", debug_assertions))] + for topic in modified_topics { + let validation_result = self.validate_mesh_slots_for_topic(&topic); + debug_assert!( + validation_result.is_ok(), + "metrics_event: validate_mesh_slots_for_topic({}) failed! Err({})", + topic, + validation_result.err().unwrap() + ); + } + + trace!("Completed Heartbeat"); } /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh @@ -2875,7 +3138,14 @@ where // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(&topic) { // check if the peer is in the mesh and remove it - mesh_peers.remove(peer_id); + if mesh_peers.contains(peer_id) { + mesh_peers.remove(peer_id); + + // increment churn_disconnected and vacate slot + #[cfg(feature = "metrics")] + self.metrics + .churn_slot(topic, peer_id, SlotChurnMetric::ChurnDisconnected); + } } // remove from topic_peers diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index ead3e6d89a9..3767c37b59d 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -38,9 +38,49 @@ mod tests { use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; + /* For debug purposes + use env_logger::{Builder, Env}; + // Add this line to relevant tests. + Builder::from_env(Env::default().default_filter_or("info")).init(); + */ + use libp2p_swarm::AddressRecord; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; + struct FakePollParams { + peer_id: PeerId, + } + + impl FakePollParams { + pub fn new() -> Self { + FakePollParams { + peer_id: PeerId::random(), + } + } + } + + impl PollParameters for FakePollParams { + type SupportedProtocolsIter = std::vec::IntoIter>; + type ListenedAddressesIter = std::vec::IntoIter; + type ExternalAddressesIter = std::vec::IntoIter; + + fn supported_protocols(&self) -> Self::SupportedProtocolsIter { + Vec::new().into_iter() + } + + fn listened_addresses(&self) -> Self::ListenedAddressesIter { + Vec::new().into_iter() + } + + fn external_addresses(&self) -> Self::ExternalAddressesIter { + Vec::new().into_iter() + } + + fn local_peer_id(&self) -> &PeerId { + &self.peer_id + } + } + #[derive(Default, Builder, Debug)] #[builder(default)] struct InjectNodes @@ -1321,6 +1361,20 @@ mod tests { gs.events.clear(); } + // Process current events + #[allow(dead_code)] + fn process_events(gs: &mut Gossipsub) + where + D: Send + 'static + DataTransform, + F: Send + 'static + TopicSubscriptionFilter, + { + let waker = futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + let mut poll_params = FakePollParams::new(); + + while !gs.poll(&mut cx, &mut poll_params).is_pending() {} + } + #[test] // tests that a peer added as explicit peer gets connected to fn test_explicit_peer_gets_connected() { @@ -1934,7 +1988,7 @@ mod tests { .heartbeat_interval(Duration::from_millis(100)) .build() .unwrap(); - //only one peer => mesh too small and will try to regraft as early as possible + // only one peer => mesh too small and will try to regraft as early as possible let (mut gs, peers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) @@ -1942,22 +1996,21 @@ mod tests { .gs_config(config) .create_network(); - //handle prune from peer with backoff of one second + // handle prune from peer with backoff of one second gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); - //forget all events until now + // forget all events until now flush_events(&mut gs); - - //call heartbeat + // call heartbeat gs.heartbeat(); - //Sleep for one second and apply 10 regular heartbeats (interval = 100ms). + // Sleep for one second and apply 10 regular heartbeats (interval = 100ms). for _ in 0..10 { sleep(Duration::from_millis(100)); gs.heartbeat(); } - //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat + // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( count_control_msgs(&gs, |_, m| match m { @@ -1984,14 +2037,14 @@ mod tests { #[test] fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without_backoff() { - //set default backoff period to 1 second + // set default backoff period to 1 second let config = GossipsubConfigBuilder::default() .prune_backoff(Duration::from_millis(90)) .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) .build() .unwrap(); - //only one peer => mesh too small and will try to regraft as early as possible + // only one peer => mesh too small and will try to regraft as early as possible let (mut gs, peers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) @@ -1999,7 +2052,7 @@ mod tests { .gs_config(config) .create_network(); - //handle prune from peer without a specified backoff + // handle prune from peer without a specified backoff gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index ddba0f69a1e..cfce6e121f4 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -125,6 +125,9 @@ pub mod error; pub mod protocol; +#[cfg(feature = "metrics")] +pub mod metrics; + mod backoff; mod behaviour; mod config; @@ -157,5 +160,6 @@ pub use self::types::{ FastMessageId, GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId, RawGossipsubMessage, }; + pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index d9b903acf45..66da2100176 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -89,7 +89,7 @@ impl MessageCache { } /// Get a message with `message_id` - #[cfg(test)] + #[cfg(any(test, feature = "metrics"))] pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { self.msgs.get(message_id) } diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs new file mode 100644 index 00000000000..33895394323 --- /dev/null +++ b/protocols/gossipsub/src/metrics.rs @@ -0,0 +1,117 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A set of metrics used to help track and diagnose the network behaviour of the gossipsub +//! protocol. + +pub mod topic_metrics; + +use crate::topic::TopicHash; +use libp2p_core::PeerId; +use log::warn; +use std::collections::HashMap; + +use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; + +/// A collection of metrics used throughout the gossipsub behaviour. +pub struct InternalMetrics { + /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. + pub topic_metrics: HashMap, + /// The number of broken promises (this metric is indicative of nodes with invalid message-ids) + pub broken_promises: usize, + /// When the user validates a message, it tries to re propagate it to its mesh peers. If the + /// message expires from the memcache before it can be validated, we count this a cache miss + /// and it is an indicator that the memcache size should be increased. + pub memcache_misses: usize, + /// Keeps track of the number of messages we have received on topics we are not subscribed + /// to. + pub messages_received_on_invalid_topic: usize, +} + +impl Default for InternalMetrics { + fn default() -> Self { + InternalMetrics { + topic_metrics: HashMap::new(), + broken_promises: 0, + memcache_misses: 0, + messages_received_on_invalid_topic: 0, + } + } +} + +impl InternalMetrics { + /// Returns the slot metrics for a given topic + pub fn slot_metrics_for_topic( + &self, + topic: &TopicHash, + ) -> Option> { + Some(self.topic_metrics.get(topic)?.slot_metrics_iter()) + } + + /// Churns a slot in the topic_metrics. This assumes the peer is in the mesh. + pub fn churn_slot( + &mut self, + topic: &TopicHash, + peer_id: &PeerId, + churn_reason: SlotChurnMetric, + ) { + match self.topic_metrics.get_mut(topic) { + Some(slot_data) => slot_data.churn_slot(peer_id, churn_reason), + None => { + warn!( + "metrics_event[{}]: [slot --] increment {} peer {} FAILURE [retrieving slot_data]", + topic, >::into(churn_reason), peer_id, + ) + } + } + } + + /// Increment a MessageMetric in the topic_metrics for peer in topic. + pub fn increment_message_metric( + &mut self, + topic: &TopicHash, + peer: &PeerId, + message_metric: SlotMessageMetric, + ) { + self.topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .increment_message_metric(peer, message_metric); + } + + /// Assign slots in topic to peers. + pub fn assign_slots_to_peers(&mut self, topic: &TopicHash, peer_list: U) + where + U: Iterator, + { + self.topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .assign_slots_to_peers(peer_list); + } + + /// Assigns a slot in topic to the peer if the peer doesn't already have one. + pub fn assign_slot_if_unassigned(&mut self, topic: &TopicHash, peer: &PeerId) { + self.topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .assign_slot_if_unassigned(*peer); + } +} diff --git a/protocols/gossipsub/src/metrics/topic_metrics.rs b/protocols/gossipsub/src/metrics/topic_metrics.rs new file mode 100644 index 00000000000..6922a7900d9 --- /dev/null +++ b/protocols/gossipsub/src/metrics/topic_metrics.rs @@ -0,0 +1,445 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::TopicHash; +use log::{debug, error, warn}; +use std::collections::{BTreeSet, HashMap}; +use std::ops::AddAssign; + +use libp2p_core::PeerId; +use strum::IntoEnumIterator; +use strum_macros::{EnumIter, IntoStaticStr}; + +#[derive(Default, Clone)] +/// This struct stores all the metrics for a given mesh slot. +/// NOTE: all the `message_*` counters refer to messages received from peers assigned to +/// this mesh slot on the topic this slot is associated with. See [`TopicMetrics`] for more +/// information. +pub struct SlotMetricCounts { + /// The number of times this slot has been assigned to a peer + assign_sum: u32, + /// The total number of messages received + messages_all: u32, + /// The number of duplicate (already seen) messages + /// A large number across peers on this topic could indicate a large amplification on + /// the topic. Lowering the gossip_D parameter could help minimize duplicates. + messages_duplicates: u32, + /// The number of never before seen messages + messages_first: u32, + /// The number of messages that returned [`MessageAcceptance::Ignore`] from validation + messages_ignored: u32, + /// The number of messages that returned [`MessageAcceptance::Reject`] from validation + messages_rejected: u32, + /// The number of messages that returned [`MessageAcceptance::Accept`] from validation + messages_validated: u32, + /// The number of times this slot was churned due to us disconnecting from the peer + churn_disconnected: u32, + /// The number of times this slot was churned due to us removing excess mesh peers + churn_excess: u32, + /// The number of times this slot was churned due to us leaving the topic + churn_leave: u32, + /// The number of times this slot was churned because the peer sent us a PRUNE message + churn_prune: u32, + /// The number of times this slot was churned because the peer score was too low + /// A large number could indicate the network is being attacked or the peer scoring + /// parameters are too restrictive and need to be adjusted. + churn_score: u32, + /// The total number of times this slot was churned for any reason + churn_sum: u32, + /// The number of times this slot was churned due to the peer unsubscribing from the topic + churn_unsubscribed: u32, + /// The peer currently assigned to this slot (None if slot is vacant) + current_peer: Option>, +} + +#[derive(IntoStaticStr, EnumIter, Clone, Copy)] +/// This enumerates all the message metric counters for a slot +pub enum SlotMessageMetric { + /// Total messages received + MessagesAll, + /// Number of duplicate (already seen) messages + MessagesDuplicates, + /// Never before seen messages + MessagesFirst, + /// Messages that returned [`MessageAcceptance::Ignore`] from validation + MessagesIgnored, + /// Messages that returned [`MessageAcceptance::Reject`] from validation + MessagesRejected, + /// Messages that returned [`MessageAcceptance::Accept`] from validation + MessagesValidated, +} + +#[derive(IntoStaticStr, EnumIter, Clone, Copy)] +/// This enumerates all the churn metric counters for a slot +pub enum SlotChurnMetric { + /// Slot churned due to us disconnecting from the peer + ChurnDisconnected, + /// Slot churned due to us removing excess mesh peers + ChurnExcess, + /// Slot churned due to us leaving the topic + ChurnLeave, + /// Slot churned because the peer sent us a PRUNE message + ChurnPrune, + /// Slot churned because the peer score was too low + ChurnScore, + /// Slot churned due to the peer unsubscribing from the topic + ChurnUnsubscribed, +} + +/// This enumerates ALL metric counters for a slot +pub enum SlotMetric { + MessageMetric(SlotMessageMetric), + ChurnMetric(SlotChurnMetric), + /// Total times the slot was churned for any reason + ChurnSum, + /// Total times the slot was assigned + AssignSum, +} + +impl SlotMetric { + /// Make SlotMetric iterable over every metric type + pub fn iter() -> impl Iterator { + SlotMessageMetric::iter() + .map(|message_metric| SlotMetric::MessageMetric(message_metric)) + .chain( + SlotChurnMetric::iter().map(|churn_metric| SlotMetric::ChurnMetric(churn_metric)), + ) + .chain(std::iter::once(SlotMetric::ChurnSum)) + .chain(std::iter::once(SlotMetric::AssignSum)) + } +} + +impl SlotMetricCounts { + pub fn new() -> Self { + SlotMetricCounts::default() + } + + /// returns a message metric count + fn get_message_metric(&self, message_metric: SlotMessageMetric) -> u32 { + match message_metric { + SlotMessageMetric::MessagesAll => self.messages_all, + SlotMessageMetric::MessagesDuplicates => self.messages_duplicates, + SlotMessageMetric::MessagesFirst => self.messages_first, + SlotMessageMetric::MessagesIgnored => self.messages_ignored, + SlotMessageMetric::MessagesRejected => self.messages_rejected, + SlotMessageMetric::MessagesValidated => self.messages_validated, + } + } + + /// increments a message metric count + fn increment_message_metric(&mut self, message_metric: SlotMessageMetric) { + match message_metric { + SlotMessageMetric::MessagesAll => self.messages_all.add_assign(1), + SlotMessageMetric::MessagesDuplicates => self.messages_duplicates.add_assign(1), + SlotMessageMetric::MessagesFirst => self.messages_first.add_assign(1), + SlotMessageMetric::MessagesIgnored => self.messages_ignored.add_assign(1), + SlotMessageMetric::MessagesRejected => self.messages_rejected.add_assign(1), + SlotMessageMetric::MessagesValidated => self.messages_validated.add_assign(1), + }; + } + + /// returns a churn metric count + fn get_churn_metric(&self, churn_reason: SlotChurnMetric) -> u32 { + match churn_reason { + SlotChurnMetric::ChurnDisconnected => self.churn_disconnected, + SlotChurnMetric::ChurnExcess => self.churn_excess, + SlotChurnMetric::ChurnLeave => self.churn_leave, + SlotChurnMetric::ChurnPrune => self.churn_prune, + SlotChurnMetric::ChurnScore => self.churn_score, + SlotChurnMetric::ChurnUnsubscribed => self.churn_unsubscribed, + } + } + + /// churns a slot, incrementing the proper churn metric and returning ChurnSum + pub fn churn_slot(&mut self, churn_reason: SlotChurnMetric) -> u32 { + self.current_peer = None; + self.churn_sum.add_assign(1); + match churn_reason { + SlotChurnMetric::ChurnDisconnected => self.churn_disconnected.add_assign(1), + SlotChurnMetric::ChurnExcess => self.churn_excess.add_assign(1), + SlotChurnMetric::ChurnLeave => self.churn_leave.add_assign(1), + SlotChurnMetric::ChurnPrune => self.churn_prune.add_assign(1), + SlotChurnMetric::ChurnScore => self.churn_score.add_assign(1), + SlotChurnMetric::ChurnUnsubscribed => self.churn_unsubscribed.add_assign(1), + }; + self.churn_sum + } + + /// returns the current peer associated with this slot + pub fn current_peer(&self) -> &Option> { + &self.current_peer + } + + /// assigns a peer to this slot, incrementing and returning AssignSum + pub fn assign_slot(&mut self, peer: PeerId) -> u32 { + self.current_peer = Some(Box::new(peer)); + self.assign_sum.add_assign(1); + self.assign_sum + } + + /// returns the slot metric count corresponding to slot_metric + pub fn get_slot_metric(&self, slot_metric: SlotMetric) -> u32 { + match slot_metric { + SlotMetric::MessageMetric(message_metric) => self.get_message_metric(message_metric), + SlotMetric::ChurnMetric(churn_reason) => self.get_churn_metric(churn_reason), + SlotMetric::ChurnSum => self.churn_sum, + SlotMetric::AssignSum => self.assign_sum, + } + } + + /// returns a vector of pairs of all slot metric names and their corresponding counts + pub fn with_names(&self) -> Vec<(&'static str, u32)> { + SlotMetric::iter() + .map(|t| match t { + SlotMetric::MessageMetric(message_metric) => ( + >::into(message_metric), + self.get_message_metric(message_metric), + ), + SlotMetric::ChurnMetric(churn_reason) => ( + >::into(churn_reason), + self.get_churn_metric(churn_reason), + ), + SlotMetric::ChurnSum => ("ChurnSum", self.churn_sum), + SlotMetric::AssignSum => ("AssignSum", self.assign_sum), + }) + .collect() + } +} + +pub type MeshSlot = usize; +/// This structure stores all metrics associated with a single topic. +/// This introduces the concept of a mesh slot. When a peer is added to the mesh for +/// this topic, it is assigned to a mesh slot. All the metrics relating to messages +/// received from that peer on this topic are then associated to that slot. See the +/// [`SlotMetricCounts`] struct for more information. When a peer exits the mesh, +/// the slot it occupies is 'churned' and becomes vacant. Vacant slots are later +/// re-assigned when a new peer enters the mesh for this topic. +pub struct TopicMetrics { + /// The topic this is associated with (useful for debugging) + topic: TopicHash, + /// Vector of SlotMetricCounts (indexed by MeshSlot) + slot_metrics: Vec, + /// Map of PeerId to MeshSlot + peer_slots: HashMap, + /// Set of Vacant MeshSlots (due to peers leaving the mesh) + vacant_slots: BTreeSet, + /// The number of messages requested via IWANT + /// A large value indicates the mesh isn't performing as optimally as we would + /// like and we have had to request extra messages via gossip + pub iwant_requests: u32, +} + +impl TopicMetrics { + pub fn new(topic: TopicHash) -> Self { + TopicMetrics { + topic, + // the first element in the vector is for peers that aren't in the mesh + slot_metrics: vec![SlotMetricCounts::new()], + peer_slots: HashMap::new(), + vacant_slots: BTreeSet::new(), + iwant_requests: 0, + } + } + + /// Increments the message metric for the specified peer + pub fn increment_message_metric(&mut self, peer: &PeerId, message_metric: SlotMessageMetric) { + let slot = self + .peer_slots + .get(peer) + .map(|s| *s) + // peers that aren't in the mesh get slot 0 + .unwrap_or(0); + match self + .slot_metrics + .get_mut(slot) + { + Some(metric_counts) => metric_counts.increment_message_metric(message_metric), + None => error!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} FAILURE [peer_slots contains peer with slot not existing in slot_metrics!]", + self.topic, + slot, + >::into(message_metric), + peer, + ), + }; + } + + /// Assigns a slot to the peer if the peer doesn't already have one. Note that the + /// lowest vacant slots are assigned first. If all slots are occupied, a new slot + /// will be allocated. + pub fn assign_slot_if_unassigned(&mut self, peer: PeerId) { + if let std::collections::hash_map::Entry::Vacant(entry) = self.peer_slots.entry(peer) { + match self.vacant_slots.iter().next() { + Some(slot_ref) => match self.slot_metrics.get_mut(*slot_ref) { + // vacant slot available, assign new peer to this slot + Some(metric_counts) => { + let slot = *slot_ref; + let assign_sum = metric_counts.assign_slot(peer); + self.vacant_slots.remove(&slot); + entry.insert(slot); + debug!( + "metrics_event[{}]: [slot {:02}] assigning vacant slot to peer {} SUCCESS AssignSum[{}]", + self.topic, slot, peer, assign_sum, + ); + }, + None => error!( + "metrics_event[{}]: [slot {:02}] assigning vacant slot to peer {} FAILURE [SlotMetricCounts doesn't exist in slot_metrics vector!]", + self.topic, slot_ref, peer + ), + }, + None => { + // No vacant slots available, allocate a new slot + let slot = self.slot_metrics.len(); + let mut metric_counts = SlotMetricCounts::new(); + let assign_sum = metric_counts.assign_slot(peer); + self.slot_metrics.push(metric_counts); + entry.insert(slot); + debug!( + "metrics_event[{}]: [slot {:02}] assigning new slot to peer {} SUCCESS AssignSum[{}]", + self.topic, slot, peer, assign_sum, + ); + } + }; + } + } + + /// Ensures all peers returned by the peer_iter have a slot assigned + pub fn assign_slots_to_peers(&mut self, peer_iter: U) + where + U: Iterator, + { + for peer in peer_iter { + self.assign_slot_if_unassigned(peer); + } + } + + /// Churns the slot occupied by peer. + pub fn churn_slot(&mut self, peer: &PeerId, churn_reason: SlotChurnMetric) { + match self.peer_slots.get(peer).cloned() { + Some(slot) => match self.slot_metrics.get_mut(slot) { + Some(metric_counts) => { + debug_assert!(!self.vacant_slots.contains(&slot), + "metrics_event[{}] [slot {:02}] increment {} peer {} FAILURE [vacant slots already contains this slot!]", + self.topic, slot, >::into(churn_reason), peer + ); + let churn_sum = metric_counts.churn_slot(churn_reason); + self.vacant_slots.insert(slot); + self.peer_slots.remove(peer); + debug!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} SUCCESS ChurnSum[{}]", + self.topic, slot, >::into(churn_reason), peer, churn_sum, + ); + }, + None => warn!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} FAILURE [retrieving metric_counts]", + self.topic, slot, >::into(churn_reason), peer + ), + }, + None => warn!( + "metrics_event[{}]: [slot --] increment {} peer {} FAILURE [retrieving slot]", + self.topic, >::into(churn_reason), peer + ), + }; + } + + /// Churns all slots in this topic that aren't already vacant (while incrementing + /// churn_reason). Also clears the peer_slots. This loop is faster than calling + /// churn_slot() for each peer in the topic because it minimizes redundant lookups + /// and only traverses a vector. + pub fn churn_all_slots(&mut self, churn_reason: SlotChurnMetric) { + for slot in (1..self.slot_metrics.len()) + .filter(|s| !self.vacant_slots.contains(s)) + .collect::>() + { + if let Some(metric_counts) = self.slot_metrics.get_mut(slot) { + let previous = metric_counts.current_peer().as_ref().map(|p| **p); + let churn_sum = metric_counts.churn_slot(churn_reason); + self.vacant_slots.insert(slot); + match previous { + Some(peer) => debug!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} SUCCESS ChurnSum[{}]", + self.topic, slot, >::into(churn_reason), peer, churn_sum, + ), + None => warn!( + "metrics_event[{}]: [slot {:02}] increment {} WARNING [current_peer not assigned with non-vacant slot!] ChurnSum[{}]", + self.topic, slot, >::into(churn_reason), churn_sum, + ), + }; + } + } + self.peer_slots.clear(); + } + + /// This function verifies that the TopicMetrics is synchronized perfectly with the mesh. + /// It's useful for debugging. + #[cfg(debug_assertions)] + pub fn validate_mesh_slots(&self, mesh: &BTreeSet) -> Result<(), String> { + let mut result = true; + let mut errors = String::new(); + // No peers are in the peer_slots that aren't in the mesh + for (peer, slot) in self + .peer_slots + .iter() + .filter(|(peer, ..)| !mesh.contains(peer)) + { + result = false; + let message = format!( + "metrics_event[{}]: [slot {:02}] peer {} exists in peer_slots but not in the mesh!\n", + self.topic, slot, peer + ); + errors.push_str(message.as_str()); + error!("{}", message); + } + // No peers are in the mesh that aren't in the peer_slots + for peer in mesh + .iter() + .filter(|peer| !self.peer_slots.contains_key(peer)) + { + result = false; + let message = format!( + "metrics_event[{}]: [slot --] peer {} exists in mesh but not in the peer_slots!\n", + self.topic, peer + ); + errors.push_str(message.as_str()); + error!("{}", message); + } + + // vacant_slots.len() + peer_slots.len() == slot_metrics.len() + 1 + if self.vacant_slots.len() + self.peer_slots.len() + 1 != self.slot_metrics.len() { + result = false; + let message = format!( + "metrics_event[{}] vacant_slots.len()[{}] + peer_slots.len()[{}] + 1 != slot_metrics.len()[{}]", + self.topic, self.vacant_slots.len(), self.peer_slots.len(), self.slot_metrics.len(), + ); + errors.push_str(message.as_str()); + error!("{}", message); + } + + if result { + Ok(()) + } else { + Err(errors) + } + } + + pub fn slot_metrics_iter(&self) -> impl Iterator { + self.slot_metrics.iter() + } +} From e5df08a031958655896bfd658195d3a7d8de4991 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 21 Sep 2021 16:02:14 +1000 Subject: [PATCH 02/10] Add to changelog --- protocols/gossipsub/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index da528c79b42..faafd152256 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,11 @@ # 0.33.0 [unreleased] +- Adds optional metrics for tracking mesh and peer behaviour. + [PR 2235](https://github.com/libp2p/rust-libp2p/pull/2235) + +- Make default features of `libp2p-core` optional. + [PR 2181](https://github.com/libp2p/rust-libp2p/pull/2181) + - Make default features of `libp2p-core` optional. [PR 2181](https://github.com/libp2p/rust-libp2p/pull/2181) From 6df8300654aa01f2f479d04ec5784dd79c5814e6 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 14 Oct 2021 18:03:35 +1100 Subject: [PATCH 03/10] Initial commit towards open metrics --- Cargo.toml | 1 - protocols/gossipsub/Cargo.toml | 10 +- protocols/gossipsub/src/behaviour.rs | 170 +++---- protocols/gossipsub/src/config.rs | 2 +- protocols/gossipsub/src/lib.rs | 1 - protocols/gossipsub/src/mcache.rs | 1 - protocols/gossipsub/src/metrics.rs | 417 ++++++++++++++++-- .../gossipsub/src/metrics/topic_metrics.rs | 88 ++-- protocols/gossipsub/src/topic.rs | 3 +- 9 files changed, 510 insertions(+), 183 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index db51f10e9fe..05b5ccc8860 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,6 @@ floodsub = ["libp2p-floodsub"] identify = ["libp2p-identify", "libp2p-metrics/identify"] kad = ["libp2p-kad", "libp2p-metrics/kad"] gossipsub = ["libp2p-gossipsub"] -gossipsub-metrics = ["libp2p-gossipsub/metrics"] metrics = ["libp2p-metrics"] mdns = ["libp2p-mdns"] mplex = ["libp2p-mplex"] diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 42efb4dad9e..c1b2fff10b0 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -9,13 +9,10 @@ repository = "https://github.com/libp2p/rust-libp2p" keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] - -[features] -metrics = ["strum", "strum_macros"] - [dependencies] libp2p-swarm = { version = "0.31.0", path = "../../swarm" } libp2p-core = { version = "0.30.0", path = "../../core", default-features = false } +libp2p-metrics = { path = "../../misc/metrics"} bytes = "1.0" byteorder = "1.3.4" fnv = "1.0.7" @@ -32,8 +29,9 @@ prost = "0.8" hex_fmt = "0.3.0" regex = "1.4.0" # Metrics dependencies -strum = { version = "0.21", optional = true } -strum_macros = { version = "0.21", optional = true} +open-metrics-client = "0.12.0" +strum = "0.21" +strum_macros = "0.21" [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 32eac0795b3..e57f5ee65db 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -68,9 +68,8 @@ use std::{cmp::Ordering::Equal, fmt::Debug}; #[cfg(test)] mod tests; -#[cfg(feature = "metrics")] use crate::metrics::{ - topic_metrics::{SlotChurnMetric, SlotMessageMetric, TopicMetrics}, + topic_metrics::{SlotChurnMetric, SlotMessageMetric}, InternalMetrics, }; @@ -216,8 +215,7 @@ pub struct Gossipsub< D: DataTransform = IdentityTransform, F: TopicSubscriptionFilter = AllowAllSubscriptionFilter, > { - /// If metrics are enabled, keep track of a set of internal metrics relating to gossipsub. - #[cfg(feature = "metrics")] + /// Keep track of a set of internal metrics relating to gossipsub. metrics: InternalMetrics, /// Configuration providing gossipsub performance parameters. @@ -396,7 +394,6 @@ where // Set up message publishing parameters. Ok(Gossipsub { - #[cfg(feature = "metrics")] metrics: InternalMetrics::default(), events: VecDeque::new(), control_pool: HashMap::new(), @@ -482,12 +479,6 @@ where .map(|(score, ..)| score.score(peer_id)) } - // If metrics are enabled, obtain a shared reference to them. - #[cfg(feature = "metrics")] - pub fn metrics(&self) -> &InternalMetrics { - &self.metrics - } - /// Subscribe to a topic. /// /// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already @@ -756,22 +747,16 @@ where "Message not in cache. Ignoring forwarding. Message Id: {}", msg_id ); - - #[cfg(feature = "metrics")] - { - self.metrics.memcache_misses += 1; - } + self.metrics.memcache_miss(); return Ok(false); } }; - #[cfg(feature = "metrics")] let topic = raw_message.topic.clone(); self.forward_msg(msg_id, raw_message, Some(propagation_source))?; // Metrics: Report validation result - #[cfg(feature = "metrics")] self.metrics.increment_message_metric( &topic, propagation_source, @@ -781,27 +766,29 @@ where } MessageAcceptance::Reject => { // Metrics: Report validation result - #[cfg(feature = "metrics")] - if let Some(raw_message) = self.mcache.get(msg_id) { - // Increment metrics - self.metrics.increment_message_metric( - &raw_message.topic, - propagation_source, - SlotMessageMetric::MessagesRejected, - ); + if self.metrics.enabled() { + if let Some(raw_message) = self.mcache.get(msg_id) { + // Increment metrics + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesRejected, + ); + } } RejectReason::ValidationFailed } MessageAcceptance::Ignore => { // Metrics: Report validation result - #[cfg(feature = "metrics")] - if let Some(raw_message) = self.mcache.get(msg_id) { - // Increment metrics - self.metrics.increment_message_metric( - &raw_message.topic, - propagation_source, - SlotMessageMetric::MessagesIgnored, - ); + if self.metrics.enabled() { + if let Some(raw_message) = self.mcache.get(msg_id) { + // Increment metrics + self.metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesIgnored, + ); + } } RejectReason::ValidationIgnored } @@ -820,11 +807,7 @@ where Ok(true) } else { warn!("Rejected message not in cache. Message Id: {}", msg_id); - - #[cfg(feature = "metrics")] - { - self.metrics.memcache_misses += 1; - } + self.metrics.memcache_miss(); Ok(false) } @@ -921,9 +904,9 @@ where /// This is just a utility function to verify everything is in order between the /// mesh and the topic_metrics. It's useful for debugging. - #[cfg(all(feature = "metrics", debug_assertions))] + #[cfg(debug_assertions)] fn validate_mesh_slots_for_topic(&self, topic: &TopicHash) -> Result<(), String> { - match self.metrics.topic_metrics.get(topic) { + match self.metrics.topic_metrics().get(topic) { Some(topic_metrics) => match self.mesh.get(topic) { Some(mesh_peers) => topic_metrics.validate_mesh_slots(mesh_peers), None => Err(format!("metrics_event[{}] no mesh_peers for topic", topic)), @@ -1010,7 +993,6 @@ where mesh_peers.extend(new_peers); } - #[cfg(feature = "metrics")] self.metrics .assign_slots_to_peers(topic_hash, added_peers.iter().cloned()); @@ -1039,7 +1021,7 @@ where ); } - #[cfg(all(feature = "metrics", debug_assertions))] + #[cfg(debug_assertions)] { let validation_result = self.validate_mesh_slots_for_topic(topic_hash); debug_assert!( @@ -1132,10 +1114,7 @@ where ); } - #[cfg(feature = "metrics")] - if let Some(topic_metrics) = self.metrics.topic_metrics.get_mut(topic_hash) { - topic_metrics.churn_all_slots(SlotChurnMetric::ChurnLeave); - } + self.metrics.leave_topic(topic_hash); } debug!("Completed LEAVE for topic: {:?}", topic_hash); } @@ -1274,15 +1253,10 @@ where ); // Metrics: Add IWANT requests - #[cfg(feature = "metrics")] - { + if self.metrics.enabled() { for id in &message_ids { if let Some(topic) = iwant_ids.get(id) { - self.metrics - .topic_metrics - .entry(topic.clone()) - .or_insert_with(|| TopicMetrics::new(topic.clone())) - .iwant_requests += 1; + self.metrics.iwant_request(topic); } } } @@ -1449,7 +1423,6 @@ where ); peers.insert(*peer_id); - #[cfg(feature = "metrics")] self.metrics.assign_slot_if_unassigned(&topic_hash, peer_id); // If the peer did not previously exist in any mesh, inform the handler @@ -1514,7 +1487,7 @@ where topic_hash: &TopicHash, backoff: Option, always_update_backoff: bool, - #[cfg(feature = "metrics")] churn_reason: SlotChurnMetric, + churn_reason: SlotChurnMetric, ) { let mut update_backoff = always_update_backoff; if let Some(peers) = self.mesh.get_mut(topic_hash) { @@ -1541,7 +1514,6 @@ where &mut self.events, &self.connected_peers, ); - #[cfg(feature = "metrics")] self.metrics.churn_slot(topic_hash, peer_id, churn_reason); } } @@ -1571,7 +1543,6 @@ where &topic_hash, backoff, true, - #[cfg(feature = "metrics")] SlotChurnMetric::ChurnPrune, ); @@ -1733,7 +1704,6 @@ where ) { // Report received message to metrics if we are subscribed to the topic, otherwise // ignore it. - #[cfg(feature = "metrics")] if self.mesh.contains_key(&raw_message.topic) { self.metrics.increment_message_metric( &raw_message.topic, @@ -1755,7 +1725,6 @@ where ); } // Metrics: Report the duplicate message, for mesh topics - #[cfg(feature = "metrics")] if self.mesh.contains_key(&raw_message.topic) { self.metrics.increment_message_metric( &raw_message.topic, @@ -1807,7 +1776,6 @@ where } // Metrics: Report the duplicate message, for mesh topics - #[cfg(feature = "metrics")] if self.mesh.contains_key(&raw_message.topic) { // NOTE: Allow overflowing of a usize here self.metrics.increment_message_metric( @@ -1824,7 +1792,6 @@ where ); // Increment the first message topic, if its in our mesh. - #[cfg(feature = "metrics")] if self.mesh.contains_key(&raw_message.topic) { self.metrics.increment_message_metric( &raw_message.topic, @@ -1859,10 +1826,7 @@ where message.topic ); - #[cfg(feature = "metrics")] - { - self.metrics.messages_received_on_invalid_topic += 1; - } + self.metrics.message_invalid_topic(); return; } @@ -1968,7 +1932,15 @@ where } // add to the peer_topics mapping - subscribed_topics.insert(subscription.topic_hash.clone()); + if subscribed_topics.insert(subscription.topic_hash.clone()) { + if self.metrics.enabled() { + self.metrics.peer_joined_topic(&subscription.topic_hash); + if self.mesh.contains_key(&subscription.topic_hash) { + self.metrics + .peer_joined_subscribed_topic(&subscription.topic_hash); + } + } + } // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) @@ -2026,6 +1998,13 @@ where propagation_source.to_string(), subscription.topic_hash ); + if self.metrics.enabled() { + self.metrics.peer_left_topic(&subscription.topic_hash); + if self.mesh.contains_key(&subscription.topic_hash) { + self.metrics + .peer_left_subscribed_topic(&subscription.topic_hash); + } + } } // remove topic from the peer_topics mapping subscribed_topics.remove(&subscription.topic_hash); @@ -2048,7 +2027,6 @@ where &topic_hash, None, false, - #[cfg(feature = "metrics")] SlotChurnMetric::ChurnUnsubscribed, ); } @@ -2066,7 +2044,6 @@ where ); } - #[cfg(feature = "metrics")] for topic in &topics_to_graft { self.metrics .assign_slot_if_unassigned(topic, propagation_source); @@ -2111,10 +2088,7 @@ where peer_score.add_penalty(&peer, count); // Metrics: Increment broken promises - #[cfg(feature = "metrics")] - { - self.metrics.broken_promises += 1; - } + self.metrics.broken_promise(); } } } @@ -2154,7 +2128,7 @@ where _ => 0.0, }; - #[cfg(all(debug_assertions, feature = "metrics"))] + #[cfg(debug_assertions)] let mut modified_topics = HashSet::new(); // maintain the mesh for each topic for (topic_hash, peers) in self.mesh.iter_mut() { @@ -2162,12 +2136,6 @@ where let backoffs = &self.backoffs; let topic_peers = &self.topic_peers; let outbound_peers = &self.outbound_peers; - #[cfg(feature = "metrics")] - let topic_metrics = self - .metrics - .topic_metrics - .entry(topic_hash.clone()) - .or_insert_with(|| TopicMetrics::new(topic_hash.clone())); // drop all peers with negative score, without PX // if there is at some point a stable retain method for BTreeSet the following can be @@ -2197,10 +2165,10 @@ where for peer in to_remove { peers.remove(&peer); - // Increment ChurnScore and remove peer from slot - #[cfg(feature = "metrics")] - topic_metrics.churn_slot(&peer, SlotChurnMetric::ChurnScore); - #[cfg(all(debug_assertions, feature = "metrics"))] + // Increment ChurnScore and remove peer from the slot + self.metrics + .churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnScore); + #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); } @@ -2235,9 +2203,9 @@ where trace!("Updating mesh, adding to mesh: {:?}", peer_list); // Metrics: Update mesh peers - #[cfg(feature = "metrics")] - topic_metrics.assign_slots_to_peers(peer_list.iter().cloned()); - #[cfg(all(debug_assertions, feature = "metrics"))] + self.metrics + .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); // add the peers @@ -2293,9 +2261,9 @@ where } } // Metrics: increment ChurnExcess and vacate slot - #[cfg(feature = "metrics")] - topic_metrics.churn_slot(&peer, SlotChurnMetric::ChurnExcess); - #[cfg(all(debug_assertions, feature = "metrics"))] + self.metrics + .churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnExcess); + #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); // remove the peer @@ -2335,9 +2303,9 @@ where // update the mesh trace!("Updating mesh, adding to mesh: {:?}", peer_list); - #[cfg(feature = "metrics")] - topic_metrics.assign_slots_to_peers(peer_list.iter().cloned()); - #[cfg(all(debug_assertions, feature = "metrics"))] + self.metrics + .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); // add the peers @@ -2405,9 +2373,9 @@ where ); // Metrics: Update mesh peers - #[cfg(feature = "metrics")] - topic_metrics.assign_slots_to_peers(peer_list.iter().cloned()); - #[cfg(all(debug_assertions, feature = "metrics"))] + self.metrics + .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); // add the peers @@ -2535,7 +2503,7 @@ where // shift the memcache self.mcache.shift(); - #[cfg(all(feature = "metrics", debug_assertions))] + #[cfg(debug_assertions)] for topic in modified_topics { let validation_result = self.validate_mesh_slots_for_topic(&topic); debug_assert!( @@ -3146,12 +3114,18 @@ where mesh_peers.remove(peer_id); // increment churn_disconnected and vacate slot - #[cfg(feature = "metrics")] self.metrics .churn_slot(topic, peer_id, SlotChurnMetric::ChurnDisconnected); } } + if self.metrics.enabled() { + self.metrics.peer_left_topic(&topic); + if self.mesh.contains_key(&topic) { + self.metrics.peer_left_subscribed_topic(&topic); + } + } + // remove from topic_peers if let Some(peer_list) = self.topic_peers.get_mut(topic) { if !peer_list.remove(peer_id) { diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index c5e90bf0c3a..1cc5c0e52d9 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -664,7 +664,7 @@ impl GossipsubConfigBuilder { self } - /// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + /// Number of heartbeat ticks that specify the interval in which opportunistic grafting is /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a /// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index cfce6e121f4..7b0ca503fb0 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -125,7 +125,6 @@ pub mod error; pub mod protocol; -#[cfg(feature = "metrics")] pub mod metrics; mod backoff; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index 66da2100176..1e0a7d56a59 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -89,7 +89,6 @@ impl MessageCache { } /// Get a message with `message_id` - #[cfg(any(test, feature = "metrics"))] pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { self.msgs.get(message_id) } diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 33895394323..116396e4fb6 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -25,93 +25,426 @@ pub mod topic_metrics; use crate::topic::TopicHash; use libp2p_core::PeerId; -use log::warn; use std::collections::HashMap; +use topic_metrics::Slot; + +// use open_metrics_client::encoding::text::Encode; +use open_metrics_client::metrics::counter::Counter; +use open_metrics_client::metrics::family::Family; +use open_metrics_client::metrics::gauge::Gauge; +use open_metrics_client::metrics::histogram::{exponential_buckets, Histogram}; +use open_metrics_client::registry::Registry; use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; /// A collection of metrics used throughout the gossipsub behaviour. pub struct InternalMetrics { - /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. - pub topic_metrics: HashMap, + /// If a registry is not initially passed to gossipsub, all metric calculations are + /// not enabled. + enabled: bool, + /// The current peers in each mesh. + mesh_peers: Family, + /// The scores for each peer in each mesh. + mesh_score: Family, + /// The average peer score for each mesh. + mesh_avg_score: Family, + /// The total number of messages received (after duplicate filter). + mesh_message_rx_total: Family, + /// The total number of messages sent. + mesh_message_tx_total: Family, + /// The number of messages received from non-mesh peers (after duplicate filter). + mesh_messages_from_non_mesh_peers: Family, + /// The total number of duplicate messages filtered per mesh. + mesh_duplicates_filtered: Family, + /// The total number of messages validated per mesh. + mesh_messages_validated: Family, + /// The total number of messages rejected per mesh. + mesh_messages_rejected: Family, + /// The total number of messages ignored per mesh. + mesh_messages_ignored: Family, + /// The number of first message delivers per slot per mesh. + mesh_first_message_deliveries_per_slot: Family<(TopicHash, Slot), Gauge>, + /// The number of IWANT requests being sent per mesh topic. + mesh_iwant_requests: Family, + /// Number of peers subscribed to each known topic. + topic_peers: Family, + /// Number of peers subscribed to each subscribed topic. + subscribed_topic_peers: Family, /// The number of broken promises (this metric is indicative of nodes with invalid message-ids) - pub broken_promises: usize, + broken_promises: Counter, + /// Keeps track of the number of messages we have received on topics we are not subscribed + /// to. + invalid_topic_messages: Counter, /// When the user validates a message, it tries to re propagate it to its mesh peers. If the /// message expires from the memcache before it can be validated, we count this a cache miss /// and it is an indicator that the memcache size should be increased. - pub memcache_misses: usize, - /// Keeps track of the number of messages we have received on topics we are not subscribed - /// to. - pub messages_received_on_invalid_topic: usize, + memcache_misses: Counter, + /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. + topic_metrics: HashMap, } impl Default for InternalMetrics { fn default() -> Self { InternalMetrics { - topic_metrics: HashMap::new(), - broken_promises: 0, - memcache_misses: 0, - messages_received_on_invalid_topic: 0, + enabled: false, + mesh_peers: Family::default(), + mesh_score: Family::new_with_constructor(|| { + Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) + }), + mesh_avg_score: Family::default(), + mesh_message_rx_total: Family::default(), + mesh_message_tx_total: Family::default(), + mesh_messages_from_non_mesh_peers: Family::default(), + mesh_duplicates_filtered: Family::default(), + mesh_messages_validated: Family::default(), + mesh_messages_rejected: Family::default(), + mesh_messages_ignored: Family::default(), + mesh_first_message_deliveries_per_slot: Family::default(), + mesh_iwant_requests: Family::default(), + broken_promises: Counter::default(), + memcache_misses: Counter::default(), + topic_peers: Family::default(), + subscribed_topic_peers: Family::default(), + invalid_topic_messages: Counter::default(), + topic_metrics: HashMap::default(), } } } impl InternalMetrics { - /// Returns the slot metrics for a given topic - pub fn slot_metrics_for_topic( - &self, - topic: &TopicHash, - ) -> Option> { - Some(self.topic_metrics.get(topic)?.slot_metrics_iter()) + /// Constructs and builds the internal metrics given a registry. + pub fn new(registry: Option<&mut Registry>) -> Self { + if let Some(registry) = registry { + let sub_registry = registry.sub_registry_with_prefix("gossipsub"); + + /* Mesh Metrics */ + + let mesh_peers = Family::default(); + sub_registry.register( + "mesh_peer_count", + "Number of peers in each mesh", + Box::new(mesh_peers.clone()), + ); + + let mesh_score = Family::new_with_constructor(|| { + Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) + }); + sub_registry.register( + "mesh_score", + "Score of all peers in each mesh", + Box::new(mesh_score.clone()), + ); + + let mesh_avg_score = Family::default(); + sub_registry.register( + "mesh_avg_score", + "Average score of all peers in each mesh", + Box::new(mesh_avg_score.clone()), + ); + + let mesh_message_rx_total = Family::default(); + sub_registry.register( + "mesh_message_rx_total", + "Total number of messages received from each mesh", + Box::new(mesh_message_rx_total.clone()), + ); + + let mesh_message_tx_total = Family::default(); + sub_registry.register( + "mesh_message_tx_total", + "Total number of messages sent in each mesh", + Box::new(mesh_message_tx_total.clone()), + ); + + let mesh_messages_from_non_mesh_peers = Family::default(); + sub_registry.register( + "messages_from_non_mesh_peers", + "Number of messages received from peers not in the mesh, for each mesh", + Box::new(mesh_messages_from_non_mesh_peers.clone()), + ); + + let mesh_duplicates_filtered = Family::default(); + sub_registry.register( + "mesh_duplicates_filtered", + "Total number of duplicate messages filtered in each mesh", + Box::new(mesh_duplicates_filtered.clone()), + ); + + let mesh_messages_validated = Family::default(); + sub_registry.register( + "mesh_messages_validated", + "Total number of messages that have been validated in each mesh", + Box::new(mesh_messages_validated.clone()), + ); + + let mesh_messages_rejected = Family::default(); + sub_registry.register( + "mesh_messages_rejected", + "Total number of messages rejected in each mesh", + Box::new(mesh_messages_rejected.clone()), + ); + + let mesh_messages_ignored = Family::default(); + sub_registry.register( + "mesh_messages_ignored", + "Total number of messages ignored in each mesh", + Box::new(mesh_messages_ignored.clone()), + ); + + let mesh_first_message_deliveries_per_slot = Family::default(); + sub_registry.register( + "mesh_first_message_deliveries_per_slot", + "The number of first message deliveries per mesh slot", + Box::new(mesh_first_message_deliveries_per_slot.clone()), + ); + + let mesh_iwant_requests = Family::default(); + sub_registry.register( + "mesh_iwant_requests", + "The number of IWANT requests per mesh", + Box::new(mesh_first_message_deliveries_per_slot.clone()), + ); + + let broken_promises = Counter::default(); + sub_registry.register( + "broken_promises", + "Total number of broken promises per mesh", + Box::new(broken_promises.clone()), + ); + + /* Peer Metrics */ + + let topic_peers = Family::default(); + sub_registry.register( + "topic_peer_count", + "Number of peers subscribed to each known topic", + Box::new(topic_peers.clone()), + ); + + let subscribed_topic_peers = Family::default(); + sub_registry.register( + "subscribed_topic_peer_count", + "Number of peers subscribed to each subscribed topic", + Box::new(subscribed_topic_peers.clone()), + ); + + /* Router Metrics */ + + // Invalid Topic Messages + let invalid_topic_messages = Counter::default(); + sub_registry.register( + "invalid_topic_messages", + "Number of times a message has been received on a non-subscribed topic", + Box::new(invalid_topic_messages.clone()), + ); + + let memcache_misses = Counter::default(); + sub_registry.register( + "memcache_misses", + "Number of times a message has attempted to be forwarded but has already been removed from the memcache", + Box::new(memcache_misses.clone()), + ); + + InternalMetrics { + enabled: true, + mesh_peers, + mesh_score, + mesh_avg_score, + mesh_message_rx_total, + mesh_message_tx_total, + mesh_messages_from_non_mesh_peers, + mesh_duplicates_filtered, + mesh_messages_validated, + mesh_messages_rejected, + mesh_messages_ignored, + mesh_first_message_deliveries_per_slot, + mesh_iwant_requests, + broken_promises, + memcache_misses, + topic_peers, + subscribed_topic_peers, + invalid_topic_messages, + topic_metrics: HashMap::new(), + } + } else { + // Metrics are not enabled + InternalMetrics::default() + } + } + + /// Returns whether metrics are enabled or not. + pub fn enabled(&self) -> bool { + self.enabled + } + + // Increase the memcache misses + pub fn memcache_miss(&mut self) { + if self.enabled { + self.memcache_misses.inc(); + } + } + + pub fn broken_promise(&mut self) { + if self.enabled { + self.broken_promises.inc(); + } } /// Churns a slot in the topic_metrics. This assumes the peer is in the mesh. pub fn churn_slot( &mut self, - topic: &TopicHash, - peer_id: &PeerId, - churn_reason: SlotChurnMetric, + topic_hash: &TopicHash, + peer: &PeerId, + slot_churn: SlotChurnMetric, ) { - match self.topic_metrics.get_mut(topic) { - Some(slot_data) => slot_data.churn_slot(peer_id, churn_reason), - None => { - warn!( - "metrics_event[{}]: [slot --] increment {} peer {} FAILURE [retrieving slot_data]", - topic, >::into(churn_reason), peer_id, - ) + if self.enabled { + if let Ok(slot) = self + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .churn_slot(peer, slot_churn) + { + self.reset_slot(topic_hash, slot); + } + } + } + + pub fn iwant_request(&mut self, topic_hash: &TopicHash) { + self.mesh_iwant_requests.get_or_create(topic_hash).inc(); + } + + pub fn message_invalid_topic(&mut self) { + if self.enabled { + self.invalid_topic_messages.inc(); + } + } + + pub fn peer_joined_topic(&mut self, topic_hash: &TopicHash) { + if self.enabled { + self.topic_peers.get_or_create(topic_hash).inc(); + } + } + + pub fn peer_joined_subscribed_topic(&mut self, topic_hash: &TopicHash) { + if self.enabled { + self.subscribed_topic_peers.get_or_create(topic_hash).inc(); + } + } + + pub fn peer_left_topic(&mut self, topic_hash: &TopicHash) { + if self.enabled { + let v = self.topic_peers.get_or_create(topic_hash).get(); + self.topic_peers + .get_or_create(topic_hash) + .set(v.saturating_sub(1)); + } + } + + pub fn peer_left_subscribed_topic(&mut self, topic_hash: &TopicHash) { + if self.enabled { + let v = self.subscribed_topic_peers.get_or_create(topic_hash).get(); + self.subscribed_topic_peers + .get_or_create(topic_hash) + .set(v.saturating_sub(1)); + } + } + + pub fn leave_topic(&mut self, topic_hash: &TopicHash) { + if self.enabled() { + // Remove all the peers from all slots + if let Some(metrics) = self.topic_metrics.get_mut(topic_hash) { + let total_slots = metrics.churn_all_slots(SlotChurnMetric::ChurnLeave); + // Remove the slot metrics + for slot in 1..total_slots { + self.reset_slot(topic_hash, Slot { slot }); + } } + + self.mesh_peers.get_or_create(topic_hash).set(0); + self.mesh_avg_score.get_or_create(topic_hash).set(0); + self.subscribed_topic_peers.get_or_create(topic_hash).set(0); } } + fn reset_slot(&mut self, topic_hash: &TopicHash, slot: Slot) { + self.mesh_first_message_deliveries_per_slot + .get_or_create(&(topic_hash.clone(), slot)) + .set(0); + } + + /// Helpful for testing and validation + #[cfg(debug_assertions)] + pub fn topic_metrics(&self) -> &HashMap { + &self.topic_metrics + } + /// Increment a MessageMetric in the topic_metrics for peer in topic. pub fn increment_message_metric( &mut self, - topic: &TopicHash, + topic_hash: &TopicHash, peer: &PeerId, message_metric: SlotMessageMetric, ) { - self.topic_metrics - .entry(topic.clone()) - .or_insert_with(|| TopicMetrics::new(topic.clone())) - .increment_message_metric(peer, message_metric); + if self.enabled { + if let Ok(slot) = self + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .increment_message_metric(peer, &message_metric) + { + match message_metric { + SlotMessageMetric::MessagesAll => {} + SlotMessageMetric::MessagesDuplicates => { + self.mesh_duplicates_filtered + .get_or_create(topic_hash) + .inc(); + } + SlotMessageMetric::MessagesFirst => { + self.mesh_message_rx_total.get_or_create(topic_hash).inc(); + if slot.slot == 0 { + self.mesh_messages_from_non_mesh_peers + .get_or_create(topic_hash) + .inc(); + } else { + self.mesh_first_message_deliveries_per_slot + .get_or_create(&(topic_hash.clone(), slot)) + .inc(); + } + } + SlotMessageMetric::MessagesIgnored => { + self.mesh_messages_ignored.get_or_create(topic_hash).inc(); + } + SlotMessageMetric::MessagesRejected => { + self.mesh_messages_rejected.get_or_create(topic_hash).inc(); + } + SlotMessageMetric::MessagesValidated => { + self.mesh_messages_validated.get_or_create(topic_hash).inc(); + } + } + } + } } /// Assign slots in topic to peers. - pub fn assign_slots_to_peers(&mut self, topic: &TopicHash, peer_list: U) + pub fn assign_slots_to_peers(&mut self, topic_hash: &TopicHash, peer_list: U) where U: Iterator, { - self.topic_metrics - .entry(topic.clone()) - .or_insert_with(|| TopicMetrics::new(topic.clone())) - .assign_slots_to_peers(peer_list); + if self.enabled { + self.topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .assign_slots_to_peers(peer_list); + } } /// Assigns a slot in topic to the peer if the peer doesn't already have one. pub fn assign_slot_if_unassigned(&mut self, topic: &TopicHash, peer: &PeerId) { - self.topic_metrics - .entry(topic.clone()) - .or_insert_with(|| TopicMetrics::new(topic.clone())) - .assign_slot_if_unassigned(*peer); + if self.enabled { + self.topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .assign_slot_if_unassigned(*peer); + } } } diff --git a/protocols/gossipsub/src/metrics/topic_metrics.rs b/protocols/gossipsub/src/metrics/topic_metrics.rs index 6922a7900d9..67c5f9656df 100644 --- a/protocols/gossipsub/src/metrics/topic_metrics.rs +++ b/protocols/gossipsub/src/metrics/topic_metrics.rs @@ -19,19 +19,24 @@ // DEALINGS IN THE SOFTWARE. use crate::TopicHash; +use libp2p_core::PeerId; use log::{debug, error, warn}; +use open_metrics_client::encoding::text::Encode; use std::collections::{BTreeSet, HashMap}; use std::ops::AddAssign; - -use libp2p_core::PeerId; use strum::IntoEnumIterator; use strum_macros::{EnumIter, IntoStaticStr}; -#[derive(Default, Clone)] +#[derive(Copy, Clone, Hash, Eq, PartialEq, Encode)] +pub struct Slot { + pub slot: u64, +} + /// This struct stores all the metrics for a given mesh slot. /// NOTE: all the `message_*` counters refer to messages received from peers assigned to /// this mesh slot on the topic this slot is associated with. See [`TopicMetrics`] for more /// information. +#[derive(Default, Clone)] pub struct SlotMetricCounts { /// The number of times this slot has been assigned to a peer assign_sum: u32, @@ -144,7 +149,7 @@ impl SlotMetricCounts { } /// increments a message metric count - fn increment_message_metric(&mut self, message_metric: SlotMessageMetric) { + fn increment_message_metric(&mut self, message_metric: &SlotMessageMetric) { match message_metric { SlotMessageMetric::MessagesAll => self.messages_all.add_assign(1), SlotMessageMetric::MessagesDuplicates => self.messages_duplicates.add_assign(1), @@ -259,26 +264,33 @@ impl TopicMetrics { } /// Increments the message metric for the specified peer - pub fn increment_message_metric(&mut self, peer: &PeerId, message_metric: SlotMessageMetric) { + pub fn increment_message_metric( + &mut self, + peer: &PeerId, + message_metric: &SlotMessageMetric, + ) -> Result { let slot = self .peer_slots .get(peer) .map(|s| *s) // peers that aren't in the mesh get slot 0 .unwrap_or(0); - match self - .slot_metrics - .get_mut(slot) - { - Some(metric_counts) => metric_counts.increment_message_metric(message_metric), - None => error!( + match self.slot_metrics.get_mut(slot) { + Some(metric_counts) => { + metric_counts.increment_message_metric(message_metric); + Ok(Slot { slot: slot as u64 }) + } + None => { + error!( "metrics_event[{}]: [slot {:02}] increment {} peer {} FAILURE [peer_slots contains peer with slot not existing in slot_metrics!]", self.topic, slot, - >::into(message_metric), + >::into(*message_metric), peer, - ), - }; + ); + Err(()) + } + } } /// Assigns a slot to the peer if the peer doesn't already have one. Note that the @@ -331,39 +343,50 @@ impl TopicMetrics { } /// Churns the slot occupied by peer. - pub fn churn_slot(&mut self, peer: &PeerId, churn_reason: SlotChurnMetric) { + pub fn churn_slot(&mut self, peer: &PeerId, churn_reason: SlotChurnMetric) -> Result { match self.peer_slots.get(peer).cloned() { - Some(slot) => match self.slot_metrics.get_mut(slot) { - Some(metric_counts) => { - debug_assert!(!self.vacant_slots.contains(&slot), + Some(slot) => { + match self.slot_metrics.get_mut(slot) { + Some(metric_counts) => { + debug_assert!(!self.vacant_slots.contains(&slot), "metrics_event[{}] [slot {:02}] increment {} peer {} FAILURE [vacant slots already contains this slot!]", self.topic, slot, >::into(churn_reason), peer ); - let churn_sum = metric_counts.churn_slot(churn_reason); - self.vacant_slots.insert(slot); - self.peer_slots.remove(peer); - debug!( + let churn_sum = metric_counts.churn_slot(churn_reason); + self.vacant_slots.insert(slot); + self.peer_slots.remove(peer); + debug!( "metrics_event[{}]: [slot {:02}] increment {} peer {} SUCCESS ChurnSum[{}]", self.topic, slot, >::into(churn_reason), peer, churn_sum, ); - }, - None => warn!( + Ok(Slot { slot: slot as u64 }) + } + None => { + warn!( "metrics_event[{}]: [slot {:02}] increment {} peer {} FAILURE [retrieving metric_counts]", self.topic, slot, >::into(churn_reason), peer - ), - }, - None => warn!( - "metrics_event[{}]: [slot --] increment {} peer {} FAILURE [retrieving slot]", - self.topic, >::into(churn_reason), peer - ), - }; + ); + Err(()) + } + } + } + None => { + warn!( + "metrics_event[{}]: [slot --] increment {} peer {} FAILURE [retrieving slot]", + self.topic, + >::into(churn_reason), + peer + ); + Err(()) + } + } } /// Churns all slots in this topic that aren't already vacant (while incrementing /// churn_reason). Also clears the peer_slots. This loop is faster than calling /// churn_slot() for each peer in the topic because it minimizes redundant lookups /// and only traverses a vector. - pub fn churn_all_slots(&mut self, churn_reason: SlotChurnMetric) { + pub fn churn_all_slots(&mut self, churn_reason: SlotChurnMetric) -> u64 { for slot in (1..self.slot_metrics.len()) .filter(|s| !self.vacant_slots.contains(s)) .collect::>() @@ -385,6 +408,7 @@ impl TopicMetrics { } } self.peer_slots.clear(); + self.slot_metrics.len() as u64 } /// This function verifies that the TopicMetrics is synchronized perfectly with the mesh. diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 7e8afca2d9e..f737056f696 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -20,6 +20,7 @@ use crate::rpc_proto; use base64::encode; +use open_metrics_client::encoding::text::Encode; use prost::Message; use sha2::{Digest, Sha256}; use std::fmt; @@ -60,7 +61,7 @@ impl Hasher for Sha256Hash { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Encode)] pub struct TopicHash { /// The topic hash. Stored as a string to align with the protobuf API. hash: String, From d4daca7b338774e854f4a35c3430fc8f019db437 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 15 Oct 2021 17:39:35 +1100 Subject: [PATCH 04/10] Temp commit --- examples/gossipsub-chat.rs | 6 +- protocols/gossipsub/src/behaviour.rs | 219 +++++---- protocols/gossipsub/src/behaviour/tests.rs | 4 +- protocols/gossipsub/src/lib.rs | 6 +- protocols/gossipsub/src/metrics.rs | 496 +++++++++++---------- protocols/gossipsub/src/protocol.rs | 1 + protocols/gossipsub/tests/smoke.rs | 3 +- 7 files changed, 406 insertions(+), 329 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index bbf1190f8c3..845f3d229d6 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -94,9 +94,11 @@ async fn main() -> Result<(), Box> { // same content will be propagated. .build() .expect("Valid config"); - // build a gossipsub network behaviour + // Build a gossipsub network behaviour. + // NOTE: The last parameter specifies an `open_metrics_client` `Registry` which optionally + // enables metrics for the gossipsub behaviour. let mut gossipsub: gossipsub::Gossipsub = - gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config) + gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config, None) .expect("Correct configuration"); // subscribes to our topic diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index e57f5ee65db..8bfd8f598e0 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -32,6 +32,7 @@ use std::{ use futures::StreamExt; use log::{debug, error, trace, warn}; +use open_metrics_client::registry::Registry; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use wasm_timer::{Instant, Interval}; @@ -216,7 +217,7 @@ pub struct Gossipsub< F: TopicSubscriptionFilter = AllowAllSubscriptionFilter, > { /// Keep track of a set of internal metrics relating to gossipsub. - metrics: InternalMetrics, + metrics: Option, /// Configuration providing gossipsub performance parameters. config: GossipsubConfig, @@ -317,13 +318,16 @@ where { /// Creates a [`Gossipsub`] struct given a set of parameters specified via a /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. + /// Metrics can be evaluated by passing a reference to a [`Registry`]. pub fn new( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<&mut Registry>, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, F::default(), D::default(), ) @@ -340,11 +344,13 @@ where pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<&mut Registry>, subscription_filter: F, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, subscription_filter, D::default(), ) @@ -361,11 +367,13 @@ where pub fn new_with_transform( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<&mut Registry>, data_transform: D, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, F::default(), data_transform, ) @@ -382,6 +390,7 @@ where pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<&mut Registry>, subscription_filter: F, data_transform: D, ) -> Result { @@ -391,10 +400,8 @@ where // were received locally. validate_config(&privacy, config.validation_mode())?; - // Set up message publishing parameters. - Ok(Gossipsub { - metrics: InternalMetrics::default(), + metrics: metrics.map(|registry| InternalMetrics::new(registry)), events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), @@ -747,7 +754,9 @@ where "Message not in cache. Ignoring forwarding. Message Id: {}", msg_id ); - self.metrics.memcache_miss(); + if let Some(metrics) = self.metrics.as_mut() { + metrics.memcache_miss(); + } return Ok(false); } @@ -757,19 +766,21 @@ where self.forward_msg(msg_id, raw_message, Some(propagation_source))?; // Metrics: Report validation result - self.metrics.increment_message_metric( - &topic, - propagation_source, - SlotMessageMetric::MessagesValidated, - ); + if let Some(metrics) = self.metrics.as_mut() { + metrics.increment_message_metric( + &topic, + propagation_source, + SlotMessageMetric::MessagesValidated, + ); + } return Ok(true); } MessageAcceptance::Reject => { // Metrics: Report validation result - if self.metrics.enabled() { + if let Some(metrics) = self.metrics.as_mut() { if let Some(raw_message) = self.mcache.get(msg_id) { // Increment metrics - self.metrics.increment_message_metric( + metrics.increment_message_metric( &raw_message.topic, propagation_source, SlotMessageMetric::MessagesRejected, @@ -780,10 +791,10 @@ where } MessageAcceptance::Ignore => { // Metrics: Report validation result - if self.metrics.enabled() { + if let Some(metrics) = self.metrics.as_mut() { if let Some(raw_message) = self.mcache.get(msg_id) { // Increment metrics - self.metrics.increment_message_metric( + metrics.increment_message_metric( &raw_message.topic, propagation_source, SlotMessageMetric::MessagesIgnored, @@ -807,7 +818,9 @@ where Ok(true) } else { warn!("Rejected message not in cache. Message Id: {}", msg_id); - self.metrics.memcache_miss(); + if let Some(metrics) = self.metrics.as_mut() { + metrics.memcache_miss(); + } Ok(false) } @@ -906,15 +919,19 @@ where /// mesh and the topic_metrics. It's useful for debugging. #[cfg(debug_assertions)] fn validate_mesh_slots_for_topic(&self, topic: &TopicHash) -> Result<(), String> { - match self.metrics.topic_metrics().get(topic) { - Some(topic_metrics) => match self.mesh.get(topic) { - Some(mesh_peers) => topic_metrics.validate_mesh_slots(mesh_peers), - None => Err(format!("metrics_event[{}] no mesh_peers for topic", topic)), - }, - None => Err(format!( - "metrics_event[{}]: no topic_metrics for topic", - topic - )), + if let Some(metrics) = self.metrics.as_ref() { + match metrics.topic_metrics().get(topic) { + Some(topic_metrics) => match self.mesh.get(topic) { + Some(mesh_peers) => topic_metrics.validate_mesh_slots(mesh_peers), + None => Err(format!("metrics_event[{}] no mesh_peers for topic", topic)), + }, + None => Err(format!( + "metrics_event[{}]: no topic_metrics for topic", + topic + )), + } + } else { + Ok(()) } } @@ -993,8 +1010,9 @@ where mesh_peers.extend(new_peers); } - self.metrics - .assign_slots_to_peers(topic_hash, added_peers.iter().cloned()); + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slots_to_peers(topic_hash, added_peers.iter().cloned()); + } for peer_id in added_peers { // Send a GRAFT control message @@ -1114,7 +1132,9 @@ where ); } - self.metrics.leave_topic(topic_hash); + if let Some(metrics) = self.metrics.as_mut() { + metrics.leave_topic(topic_hash); + } } debug!("Completed LEAVE for topic: {:?}", topic_hash); } @@ -1253,10 +1273,10 @@ where ); // Metrics: Add IWANT requests - if self.metrics.enabled() { + if let Some(metrics) = self.metrics.as_mut() { for id in &message_ids { if let Some(topic) = iwant_ids.get(id) { - self.metrics.iwant_request(topic); + metrics.iwant_request(topic); } } } @@ -1423,7 +1443,9 @@ where ); peers.insert(*peer_id); - self.metrics.assign_slot_if_unassigned(&topic_hash, peer_id); + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slot_if_unassigned(&topic_hash, peer_id); + } // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( @@ -1514,7 +1536,9 @@ where &mut self.events, &self.connected_peers, ); - self.metrics.churn_slot(topic_hash, peer_id, churn_reason); + if let Some(metrics) = self.metrics.as_mut() { + metrics.churn_slot(topic_hash, peer_id, churn_reason); + } } } if update_backoff { @@ -1704,12 +1728,14 @@ where ) { // Report received message to metrics if we are subscribed to the topic, otherwise // ignore it. - if self.mesh.contains_key(&raw_message.topic) { - self.metrics.increment_message_metric( - &raw_message.topic, - propagation_source, - SlotMessageMetric::MessagesAll, - ); + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesAll, + ); + } } let fast_message_id = self.config.fast_message_id(&raw_message); @@ -1725,12 +1751,14 @@ where ); } // Metrics: Report the duplicate message, for mesh topics - if self.mesh.contains_key(&raw_message.topic) { - self.metrics.increment_message_metric( - &raw_message.topic, - propagation_source, - SlotMessageMetric::MessagesDuplicates, - ); + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesDuplicates, + ); + } } } return; @@ -1776,13 +1804,15 @@ where } // Metrics: Report the duplicate message, for mesh topics - if self.mesh.contains_key(&raw_message.topic) { - // NOTE: Allow overflowing of a usize here - self.metrics.increment_message_metric( - &message.topic, - propagation_source, - SlotMessageMetric::MessagesDuplicates, - ); + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + // NOTE: Allow overflowing of a usize here + metrics.increment_message_metric( + &message.topic, + propagation_source, + SlotMessageMetric::MessagesDuplicates, + ); + } } return; } @@ -1792,12 +1822,14 @@ where ); // Increment the first message topic, if its in our mesh. - if self.mesh.contains_key(&raw_message.topic) { - self.metrics.increment_message_metric( - &raw_message.topic, - propagation_source, - SlotMessageMetric::MessagesFirst, - ); + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesFirst, + ); + } } // Tells score that message arrived (but is maybe not fully validated yet). @@ -1826,7 +1858,9 @@ where message.topic ); - self.metrics.message_invalid_topic(); + if let Some(metrics) = self.metrics.as_mut() { + metrics.message_invalid_topic(); + } return; } @@ -1933,11 +1967,10 @@ where // add to the peer_topics mapping if subscribed_topics.insert(subscription.topic_hash.clone()) { - if self.metrics.enabled() { - self.metrics.peer_joined_topic(&subscription.topic_hash); + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_joined_topic(&subscription.topic_hash); if self.mesh.contains_key(&subscription.topic_hash) { - self.metrics - .peer_joined_subscribed_topic(&subscription.topic_hash); + metrics.peer_joined_subscribed_topic(&subscription.topic_hash); } } } @@ -1998,11 +2031,11 @@ where propagation_source.to_string(), subscription.topic_hash ); - if self.metrics.enabled() { - self.metrics.peer_left_topic(&subscription.topic_hash); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_left_topic(&subscription.topic_hash); if self.mesh.contains_key(&subscription.topic_hash) { - self.metrics - .peer_left_subscribed_topic(&subscription.topic_hash); + metrics.peer_left_subscribed_topic(&subscription.topic_hash); } } } @@ -2044,9 +2077,10 @@ where ); } - for topic in &topics_to_graft { - self.metrics - .assign_slot_if_unassigned(topic, propagation_source); + if let Some(metrics) = self.metrics.as_mut() { + for topic in &topics_to_graft { + metrics.assign_slot_if_unassigned(topic, propagation_source); + } } // If we need to send grafts to peer, do so immediately, rather than waiting for the @@ -2088,7 +2122,9 @@ where peer_score.add_penalty(&peer, count); // Metrics: Increment broken promises - self.metrics.broken_promise(); + if let Some(metrics) = self.metrics.as_mut() { + metrics.broken_promise(); + } } } } @@ -2166,8 +2202,9 @@ where peers.remove(&peer); // Increment ChurnScore and remove peer from the slot - self.metrics - .churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnScore); + if let Some(metrics) = self.metrics.as_mut() { + metrics.churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnScore); + } #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); } @@ -2203,8 +2240,9 @@ where trace!("Updating mesh, adding to mesh: {:?}", peer_list); // Metrics: Update mesh peers - self.metrics - .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + } #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); @@ -2260,9 +2298,10 @@ where outbound -= 1; } } - // Metrics: increment ChurnExcess and vacate slot - self.metrics - .churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnExcess); + if let Some(metrics) = self.metrics.as_mut() { + // Metrics: increment ChurnExcess and vacate slot + metrics.churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnExcess); + } #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); @@ -2303,8 +2342,9 @@ where // update the mesh trace!("Updating mesh, adding to mesh: {:?}", peer_list); - self.metrics - .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + } #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); @@ -2373,8 +2413,10 @@ where ); // Metrics: Update mesh peers - self.metrics - .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + if let Some(metrics) = self.metrics.as_mut() { + metrics + .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + } #[cfg(debug_assertions)] modified_topics.insert(topic_hash.clone()); @@ -3114,15 +3156,16 @@ where mesh_peers.remove(peer_id); // increment churn_disconnected and vacate slot - self.metrics - .churn_slot(topic, peer_id, SlotChurnMetric::ChurnDisconnected); + if let Some(metrics) = self.metrics.as_mut() { + metrics.churn_slot(topic, peer_id, SlotChurnMetric::ChurnDisconnected); + } } } - if self.metrics.enabled() { - self.metrics.peer_left_topic(&topic); + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_left_topic(&topic); if self.mesh.contains_key(&topic) { - self.metrics.peer_left_subscribed_topic(&topic); + metrics.peer_left_subscribed_topic(&topic); } } @@ -3707,7 +3750,8 @@ mod local_test { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); + let gs: Gossipsub = + Gossipsub::new(MessageAuthenticity::RandomAuthor, config, None).unwrap(); // Message under the limit should be fine. let mut rpc = empty_rpc(); @@ -3755,7 +3799,8 @@ mod local_test { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); + let gs: Gossipsub = + Gossipsub::new(MessageAuthenticity::RandomAuthor, config, None).unwrap(); let mut length_codec = unsigned_varint::codec::UviBytes::default(); length_codec.set_max_len(max_transmit_size); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 1c71705c506..fbdf2c96e48 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -112,6 +112,7 @@ mod tests { let mut gs: Gossipsub = Gossipsub::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, + None, self.subscription_filter, self.data_transform, ) @@ -932,7 +933,8 @@ mod tests { .build() .unwrap(); // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config).unwrap(); + let mut gs: Gossipsub = + Gossipsub::new(MessageAuthenticity::Anonymous, gs_config, None).unwrap(); // create a topic and fill it with some peers let topic_hash = Topic::new("Test").hash().clone(); diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 7b0ca503fb0..3079edcb694 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -103,9 +103,11 @@ //! let mut swarm = { //! // set default parameters for gossipsub //! let gossipsub_config = libp2p_gossipsub::GossipsubConfig::default(); -//! // build a gossipsub network behaviour +//! // Build a gossipsub network behaviour +//! // The last parameter specifies an open_metrics_client `Registry` which enables metrics for +//! // the gossipsub behaviour. //! let mut gossipsub: libp2p_gossipsub::Gossipsub = -//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config).unwrap(); +//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config, None).unwrap(); //! // subscribe to the topic //! gossipsub.subscribe(&topic); //! // create the swarm diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 116396e4fb6..9080359d895 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -20,12 +20,18 @@ //! A set of metrics used to help track and diagnose the network behaviour of the gossipsub //! protocol. +//! +//! Note that if metrics are enabled, we store a lot of detail for each metric. Specifically, each metric is stored +//! per "slot" of each mesh. This means each metric is counted for each peer whilst they are in the +//! mesh. The exposed open metric values typically aggregate these into a per +//! mesh metric. Users are able to fine-grain their access to the more detailed metrics via the +//! [`slot_metrics_for_topic`] function. pub mod topic_metrics; use crate::topic::TopicHash; use libp2p_core::PeerId; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use topic_metrics::Slot; // use open_metrics_client::encoding::text::Encode; @@ -37,11 +43,16 @@ use open_metrics_client::registry::Registry; use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; -/// A collection of metrics used throughout the gossipsub behaviour. +/// This provides an upper bound to the number of mesh topics we create metrics for. It prevents +/// unbounded labels being created in the metrics. +const MESH_TOPIC_LIMIT: usize = 300; +/// A separate limit is used to keep track of non-mesh topics. Mesh topics are controlled by the +/// user via subscriptions whereas non-mesh topics are determined by users on the network. +/// This limit permits a fixed amount of topics to allow, in-addition to the mesh topics. +const NON_MESH_TOPIC_LIMIT: usize = 50; + +/// A collection of metrics used throughout the Gossipsub behaviour. pub struct InternalMetrics { - /// If a registry is not initially passed to gossipsub, all metric calculations are - /// not enabled. - enabled: bool, /// The current peers in each mesh. mesh_peers: Family, /// The scores for each peer in each mesh. @@ -81,212 +92,192 @@ pub struct InternalMetrics { memcache_misses: Counter, /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. topic_metrics: HashMap, -} - -impl Default for InternalMetrics { - fn default() -> Self { - InternalMetrics { - enabled: false, - mesh_peers: Family::default(), - mesh_score: Family::new_with_constructor(|| { - Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) - }), - mesh_avg_score: Family::default(), - mesh_message_rx_total: Family::default(), - mesh_message_tx_total: Family::default(), - mesh_messages_from_non_mesh_peers: Family::default(), - mesh_duplicates_filtered: Family::default(), - mesh_messages_validated: Family::default(), - mesh_messages_rejected: Family::default(), - mesh_messages_ignored: Family::default(), - mesh_first_message_deliveries_per_slot: Family::default(), - mesh_iwant_requests: Family::default(), - broken_promises: Counter::default(), - memcache_misses: Counter::default(), - topic_peers: Family::default(), - subscribed_topic_peers: Family::default(), - invalid_topic_messages: Counter::default(), - topic_metrics: HashMap::default(), - } - } + /// Keeps track of which mesh topics have been added to metrics or not. + added_mesh_topics: HashSet, + /// Keeps track of which non mesh topics have been added to metrics or not. + added_non_mesh_topics: HashSet, } impl InternalMetrics { /// Constructs and builds the internal metrics given a registry. - pub fn new(registry: Option<&mut Registry>) -> Self { - if let Some(registry) = registry { - let sub_registry = registry.sub_registry_with_prefix("gossipsub"); - - /* Mesh Metrics */ - - let mesh_peers = Family::default(); - sub_registry.register( - "mesh_peer_count", - "Number of peers in each mesh", - Box::new(mesh_peers.clone()), - ); - - let mesh_score = Family::new_with_constructor(|| { - Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) - }); - sub_registry.register( - "mesh_score", - "Score of all peers in each mesh", - Box::new(mesh_score.clone()), - ); - - let mesh_avg_score = Family::default(); - sub_registry.register( - "mesh_avg_score", - "Average score of all peers in each mesh", - Box::new(mesh_avg_score.clone()), - ); - - let mesh_message_rx_total = Family::default(); - sub_registry.register( - "mesh_message_rx_total", - "Total number of messages received from each mesh", - Box::new(mesh_message_rx_total.clone()), - ); - - let mesh_message_tx_total = Family::default(); - sub_registry.register( - "mesh_message_tx_total", - "Total number of messages sent in each mesh", - Box::new(mesh_message_tx_total.clone()), - ); - - let mesh_messages_from_non_mesh_peers = Family::default(); - sub_registry.register( - "messages_from_non_mesh_peers", - "Number of messages received from peers not in the mesh, for each mesh", - Box::new(mesh_messages_from_non_mesh_peers.clone()), - ); - - let mesh_duplicates_filtered = Family::default(); - sub_registry.register( - "mesh_duplicates_filtered", - "Total number of duplicate messages filtered in each mesh", - Box::new(mesh_duplicates_filtered.clone()), - ); - - let mesh_messages_validated = Family::default(); - sub_registry.register( - "mesh_messages_validated", - "Total number of messages that have been validated in each mesh", - Box::new(mesh_messages_validated.clone()), - ); - - let mesh_messages_rejected = Family::default(); - sub_registry.register( - "mesh_messages_rejected", - "Total number of messages rejected in each mesh", - Box::new(mesh_messages_rejected.clone()), - ); - - let mesh_messages_ignored = Family::default(); - sub_registry.register( - "mesh_messages_ignored", - "Total number of messages ignored in each mesh", - Box::new(mesh_messages_ignored.clone()), - ); - - let mesh_first_message_deliveries_per_slot = Family::default(); - sub_registry.register( - "mesh_first_message_deliveries_per_slot", - "The number of first message deliveries per mesh slot", - Box::new(mesh_first_message_deliveries_per_slot.clone()), - ); - - let mesh_iwant_requests = Family::default(); - sub_registry.register( - "mesh_iwant_requests", - "The number of IWANT requests per mesh", - Box::new(mesh_first_message_deliveries_per_slot.clone()), - ); - - let broken_promises = Counter::default(); - sub_registry.register( - "broken_promises", - "Total number of broken promises per mesh", - Box::new(broken_promises.clone()), - ); - - /* Peer Metrics */ - - let topic_peers = Family::default(); - sub_registry.register( - "topic_peer_count", - "Number of peers subscribed to each known topic", - Box::new(topic_peers.clone()), - ); - - let subscribed_topic_peers = Family::default(); - sub_registry.register( - "subscribed_topic_peer_count", - "Number of peers subscribed to each subscribed topic", - Box::new(subscribed_topic_peers.clone()), - ); - - /* Router Metrics */ - - // Invalid Topic Messages - let invalid_topic_messages = Counter::default(); - sub_registry.register( - "invalid_topic_messages", - "Number of times a message has been received on a non-subscribed topic", - Box::new(invalid_topic_messages.clone()), - ); - - let memcache_misses = Counter::default(); - sub_registry.register( + pub fn new(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("gossipsub"); + + /* Mesh Metrics */ + + let mesh_peers = Family::default(); + sub_registry.register( + "mesh_peer_count", + "Number of peers in each mesh", + Box::new(mesh_peers.clone()), + ); + + let mesh_score = Family::new_with_constructor(|| { + Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) + }); + sub_registry.register( + "mesh_score", + "Score of all peers in each mesh", + Box::new(mesh_score.clone()), + ); + + let mesh_avg_score = Family::default(); + sub_registry.register( + "mesh_avg_score", + "Average score of all peers in each mesh", + Box::new(mesh_avg_score.clone()), + ); + + let mesh_message_rx_total = Family::default(); + sub_registry.register( + "mesh_message_rx_total", + "Total number of messages received from each mesh", + Box::new(mesh_message_rx_total.clone()), + ); + + let mesh_message_tx_total = Family::default(); + sub_registry.register( + "mesh_message_tx_total", + "Total number of messages sent in each mesh", + Box::new(mesh_message_tx_total.clone()), + ); + + let mesh_messages_from_non_mesh_peers = Family::default(); + sub_registry.register( + "messages_from_non_mesh_peers", + "Number of messages received from peers not in the mesh, for each mesh", + Box::new(mesh_messages_from_non_mesh_peers.clone()), + ); + + let mesh_duplicates_filtered = Family::default(); + sub_registry.register( + "mesh_duplicates_filtered", + "Total number of duplicate messages filtered in each mesh", + Box::new(mesh_duplicates_filtered.clone()), + ); + + let mesh_messages_validated = Family::default(); + sub_registry.register( + "mesh_messages_validated", + "Total number of messages that have been validated in each mesh", + Box::new(mesh_messages_validated.clone()), + ); + + let mesh_messages_rejected = Family::default(); + sub_registry.register( + "mesh_messages_rejected", + "Total number of messages rejected in each mesh", + Box::new(mesh_messages_rejected.clone()), + ); + + let mesh_messages_ignored = Family::default(); + sub_registry.register( + "mesh_messages_ignored", + "Total number of messages ignored in each mesh", + Box::new(mesh_messages_ignored.clone()), + ); + + let mesh_first_message_deliveries_per_slot = Family::default(); + sub_registry.register( + "mesh_first_message_deliveries_per_slot", + "The number of first message deliveries per mesh slot", + Box::new(mesh_first_message_deliveries_per_slot.clone()), + ); + + let mesh_iwant_requests = Family::default(); + sub_registry.register( + "mesh_iwant_requests", + "The number of IWANT requests per mesh", + Box::new(mesh_first_message_deliveries_per_slot.clone()), + ); + + let broken_promises = Counter::default(); + sub_registry.register( + "broken_promises", + "Total number of broken promises per mesh", + Box::new(broken_promises.clone()), + ); + + /* Peer Metrics */ + + let topic_peers = Family::default(); + sub_registry.register( + "topic_peer_count", + "Number of peers subscribed to each known topic", + Box::new(topic_peers.clone()), + ); + + let subscribed_topic_peers = Family::default(); + sub_registry.register( + "subscribed_topic_peer_count", + "Number of peers subscribed to each subscribed topic", + Box::new(subscribed_topic_peers.clone()), + ); + + /* Router Metrics */ + + // Invalid Topic Messages + let invalid_topic_messages = Counter::default(); + sub_registry.register( + "invalid_topic_messages", + "Number of times a message has been received on a non-subscribed topic", + Box::new(invalid_topic_messages.clone()), + ); + + let memcache_misses = Counter::default(); + sub_registry.register( "memcache_misses", "Number of times a message has attempted to be forwarded but has already been removed from the memcache", Box::new(memcache_misses.clone()), ); - InternalMetrics { - enabled: true, - mesh_peers, - mesh_score, - mesh_avg_score, - mesh_message_rx_total, - mesh_message_tx_total, - mesh_messages_from_non_mesh_peers, - mesh_duplicates_filtered, - mesh_messages_validated, - mesh_messages_rejected, - mesh_messages_ignored, - mesh_first_message_deliveries_per_slot, - mesh_iwant_requests, - broken_promises, - memcache_misses, - topic_peers, - subscribed_topic_peers, - invalid_topic_messages, - topic_metrics: HashMap::new(), - } - } else { - // Metrics are not enabled - InternalMetrics::default() + InternalMetrics { + mesh_peers, + mesh_score, + mesh_avg_score, + mesh_message_rx_total, + mesh_message_tx_total, + mesh_messages_from_non_mesh_peers, + mesh_duplicates_filtered, + mesh_messages_validated, + mesh_messages_rejected, + mesh_messages_ignored, + mesh_first_message_deliveries_per_slot, + mesh_iwant_requests, + broken_promises, + memcache_misses, + topic_peers, + subscribed_topic_peers, + invalid_topic_messages, + topic_metrics: HashMap::new(), + added_mesh_topics: HashSet::new(), + added_non_mesh_topics: HashSet::new(), } } - /// Returns whether metrics are enabled or not. - pub fn enabled(&self) -> bool { - self.enabled + /// Access to the fine-grained metrics which provide information per-peer (slot) of the + /// specified mesh topic. + pub fn slot_metrics_for_topic( + &self, + topic: &TopicHash, + ) -> Option> { + Some(self.topic_metrics.get(topic)?.slot_metrics_iter()) } - // Increase the memcache misses + /// Reports that an attempted message to forward was no longer in the memcache. pub fn memcache_miss(&mut self) { - if self.enabled { - self.memcache_misses.inc(); - } + self.memcache_misses.inc(); } + /// Reports a broken promise. pub fn broken_promise(&mut self) { - if self.enabled { - self.broken_promises.inc(); + self.broken_promises.inc(); + } + + /// Reports that a message was published on the specified topic. + pub fn message_published(&mut self, topic_hash: &TopicHash) { + if self.allowed_mesh_topic(topic_hash) { + self.mesh_message_tx_total.get_or_create(topic_hash).inc(); } } @@ -297,42 +288,40 @@ impl InternalMetrics { peer: &PeerId, slot_churn: SlotChurnMetric, ) { - if self.enabled { - if let Ok(slot) = self - .topic_metrics - .entry(topic_hash.clone()) - .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) - .churn_slot(peer, slot_churn) - { - self.reset_slot(topic_hash, slot); - } + if let Ok(slot) = self + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .churn_slot(peer, slot_churn) + { + self.reset_slot(topic_hash, slot); } } pub fn iwant_request(&mut self, topic_hash: &TopicHash) { - self.mesh_iwant_requests.get_or_create(topic_hash).inc(); + if self.allowed_mesh_topic(topic_hash) { + self.mesh_iwant_requests.get_or_create(topic_hash).inc(); + } } pub fn message_invalid_topic(&mut self) { - if self.enabled { - self.invalid_topic_messages.inc(); - } + self.invalid_topic_messages.inc(); } pub fn peer_joined_topic(&mut self, topic_hash: &TopicHash) { - if self.enabled { + if self.allowed_non_mesh_topic(topic_hash) { self.topic_peers.get_or_create(topic_hash).inc(); } } pub fn peer_joined_subscribed_topic(&mut self, topic_hash: &TopicHash) { - if self.enabled { + if self.allowed_mesh_topic(topic_hash) { self.subscribed_topic_peers.get_or_create(topic_hash).inc(); } } pub fn peer_left_topic(&mut self, topic_hash: &TopicHash) { - if self.enabled { + if self.allowed_non_mesh_topic(topic_hash) { let v = self.topic_peers.get_or_create(topic_hash).get(); self.topic_peers .get_or_create(topic_hash) @@ -341,7 +330,9 @@ impl InternalMetrics { } pub fn peer_left_subscribed_topic(&mut self, topic_hash: &TopicHash) { - if self.enabled { + // We use the non_mesh version here, because if we are subscribed this topic should exist + // in the added_mesh_topics mappings + if self.allowed_non_mesh_topic(topic_hash) { let v = self.subscribed_topic_peers.get_or_create(topic_hash).get(); self.subscribed_topic_peers .get_or_create(topic_hash) @@ -350,16 +341,18 @@ impl InternalMetrics { } pub fn leave_topic(&mut self, topic_hash: &TopicHash) { - if self.enabled() { - // Remove all the peers from all slots - if let Some(metrics) = self.topic_metrics.get_mut(topic_hash) { - let total_slots = metrics.churn_all_slots(SlotChurnMetric::ChurnLeave); - // Remove the slot metrics + // Remove all the peers from all slots + if let Some(metrics) = self.topic_metrics.get_mut(topic_hash) { + let total_slots = metrics.churn_all_slots(SlotChurnMetric::ChurnLeave); + // Remove the slot metrics + if self.allowed_mesh_topic(topic_hash) { for slot in 1..total_slots { self.reset_slot(topic_hash, Slot { slot }); } } + } + if self.allowed_non_mesh_topic(topic_hash) { self.mesh_peers.get_or_create(topic_hash).set(0); self.mesh_avg_score.get_or_create(topic_hash).set(0); self.subscribed_topic_peers.get_or_create(topic_hash).set(0); @@ -367,9 +360,11 @@ impl InternalMetrics { } fn reset_slot(&mut self, topic_hash: &TopicHash, slot: Slot) { - self.mesh_first_message_deliveries_per_slot - .get_or_create(&(topic_hash.clone(), slot)) - .set(0); + if self.allowed_mesh_topic(topic_hash) { + self.mesh_first_message_deliveries_per_slot + .get_or_create(&(topic_hash.clone(), slot)) + .set(0); + } } /// Helpful for testing and validation @@ -385,13 +380,13 @@ impl InternalMetrics { peer: &PeerId, message_metric: SlotMessageMetric, ) { - if self.enabled { - if let Ok(slot) = self - .topic_metrics - .entry(topic_hash.clone()) - .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) - .increment_message_metric(peer, &message_metric) - { + if let Ok(slot) = self + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .increment_message_metric(peer, &message_metric) + { + if self.allowed_mesh_topic(topic_hash) { match message_metric { SlotMessageMetric::MessagesAll => {} SlotMessageMetric::MessagesDuplicates => { @@ -430,21 +425,50 @@ impl InternalMetrics { where U: Iterator, { - if self.enabled { - self.topic_metrics - .entry(topic_hash.clone()) - .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) - .assign_slots_to_peers(peer_list); - } + self.topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .assign_slots_to_peers(peer_list); } /// Assigns a slot in topic to the peer if the peer doesn't already have one. pub fn assign_slot_if_unassigned(&mut self, topic: &TopicHash, peer: &PeerId) { - if self.enabled { - self.topic_metrics - .entry(topic.clone()) - .or_insert_with(|| TopicMetrics::new(topic.clone())) - .assign_slot_if_unassigned(*peer); + self.topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .assign_slot_if_unassigned(*peer); + } + + /// Limits the number of topics that can be created in each metric. If the topic hasn't already + /// been added to a metric and we have added less than TOPIC_LIMIT topics, this will return + /// true. + fn allowed_mesh_topic(&mut self, topic_hash: &TopicHash) -> bool { + // If we haven't reached the limit, record the topic. + if self.added_mesh_topics.len() < MESH_TOPIC_LIMIT { + self.added_mesh_topics.insert(topic_hash.clone()); + true + } else { + // We've reached the topic limit, the topic is allowed if we have seen it before, + // otherwise we reject it. + self.added_mesh_topics.contains(topic_hash) + | self.added_non_mesh_topics.contains(topic_hash) + } + } + + /// Limits the number of topics that can be created in each metric. If the topic hasn't already + /// been added to a metric and we have added less than TOPIC_LIMIT topics, this will return + /// true. + fn allowed_non_mesh_topic(&mut self, topic_hash: &TopicHash) -> bool { + // If we haven't reached the limit, record the topic. + if self.added_non_mesh_topics.len() < NON_MESH_TOPIC_LIMIT + && !self.added_mesh_topics.contains(topic_hash) + { + self.added_non_mesh_topics.insert(topic_hash.clone()); + true + } else { + // We've reached the topic limit, the topic is allowed if we have seen it before, + // otherwise we reject it. + self.added_non_mesh_topics.contains(topic_hash) } } } diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index c0a3ec6a9bd..486c2a4f74e 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -570,6 +570,7 @@ mod tests { let gs: Gossipsub = Gossipsub::new( crate::MessageAuthenticity::Signed(keypair.0.clone()), config, + None, ) .unwrap(); let data = (0..g.gen_range(10, 10024)) diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 3cf3f882427..6512f31f511 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -170,7 +170,8 @@ fn build_node() -> (Multiaddr, Swarm) { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config).unwrap(); + let behaviour = + Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config, None).unwrap(); let mut swarm = Swarm::new(transport, behaviour, peer_id); let port = 1 + random::(); From 0a3b256e17683cfb9567ea6a516d365be65f0491 Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 22 Oct 2021 10:19:31 -0500 Subject: [PATCH 05/10] allow creating gossipsub without a metrics param --- examples/gossipsub-chat.rs | 2 +- protocols/gossipsub/src/behaviour.rs | 33 +++++++++++----- protocols/gossipsub/src/behaviour/tests.rs | 3 +- protocols/gossipsub/src/metrics.rs | 44 +++++++++++----------- protocols/gossipsub/src/protocol.rs | 1 - protocols/gossipsub/tests/smoke.rs | 3 +- 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 845f3d229d6..b738541c72f 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -98,7 +98,7 @@ async fn main() -> Result<(), Box> { // NOTE: The last parameter specifies an `open_metrics_client` `Registry` which optionally // enables metrics for the gossipsub behaviour. let mut gossipsub: gossipsub::Gossipsub = - gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config, None) + gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config) .expect("Correct configuration"); // subscribes to our topic diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 3fe7226d6bc..a27b376a517 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -218,9 +218,6 @@ pub struct Gossipsub< D: DataTransform = IdentityTransform, F: TopicSubscriptionFilter = AllowAllSubscriptionFilter, > { - /// Keep track of a set of internal metrics relating to gossipsub. - metrics: Option, - /// Configuration providing gossipsub performance parameters. config: GossipsubConfig, @@ -311,6 +308,9 @@ pub struct Gossipsub< /// calculating the message-id and sending to the application. This is designed to allow the /// user to implement arbitrary topic-based compression algorithms. data_transform: D, + + /// Keep track of a set of internal metrics relating to gossipsub. + metrics: Option, } impl Gossipsub @@ -320,16 +320,31 @@ where { /// Creates a [`Gossipsub`] struct given a set of parameters specified via a /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. - /// Metrics can be evaluated by passing a reference to a [`Registry`]. pub fn new( privacy: MessageAuthenticity, config: GossipsubConfig, - metrics: Option<&mut Registry>, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, - metrics, + None, + F::default(), + D::default(), + ) + } + + /// Creates a [`Gossipsub`] struct given a set of parameters specified via a + /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. + /// Metrics can be evaluated by passing a reference to a [`Registry`]. + pub fn new_with_metrics( + privacy: MessageAuthenticity, + config: GossipsubConfig, + metrics: &mut Registry, + ) -> Result { + Self::new_with_subscription_filter_and_transform( + privacy, + config, + Some(metrics), F::default(), D::default(), ) @@ -3754,8 +3769,7 @@ mod local_test { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let gs: Gossipsub = - Gossipsub::new(MessageAuthenticity::RandomAuthor, config, None).unwrap(); + let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); // Message under the limit should be fine. let mut rpc = empty_rpc(); @@ -3803,8 +3817,7 @@ mod local_test { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let gs: Gossipsub = - Gossipsub::new(MessageAuthenticity::RandomAuthor, config, None).unwrap(); + let gs: Gossipsub = Gossipsub::new(MessageAuthenticity::RandomAuthor, config).unwrap(); let mut length_codec = unsigned_varint::codec::UviBytes::default(); length_codec.set_max_len(max_transmit_size); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index c0fbc1d9136..6839b876a2f 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -935,8 +935,7 @@ mod tests { .build() .unwrap(); // create a gossipsub struct - let mut gs: Gossipsub = - Gossipsub::new(MessageAuthenticity::Anonymous, gs_config, None).unwrap(); + let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config).unwrap(); // create a topic and fill it with some peers let topic_hash = Topic::new("Test").hash().clone(); diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 9080359d895..bf5da700cd6 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -53,6 +53,10 @@ const NON_MESH_TOPIC_LIMIT: usize = 50; /// A collection of metrics used throughout the Gossipsub behaviour. pub struct InternalMetrics { + /// Keeps track of which mesh topics have been added to metrics or not. + added_mesh_topics: HashSet, + /// Keeps track of which non mesh topics have been added to metrics or not. + added_non_mesh_topics: HashSet, /// The current peers in each mesh. mesh_peers: Family, /// The scores for each peer in each mesh. @@ -92,21 +96,15 @@ pub struct InternalMetrics { memcache_misses: Counter, /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. topic_metrics: HashMap, - /// Keeps track of which mesh topics have been added to metrics or not. - added_mesh_topics: HashSet, - /// Keeps track of which non mesh topics have been added to metrics or not. - added_non_mesh_topics: HashSet, } impl InternalMetrics { /// Constructs and builds the internal metrics given a registry. pub fn new(registry: &mut Registry) -> Self { - let sub_registry = registry.sub_registry_with_prefix("gossipsub"); - /* Mesh Metrics */ let mesh_peers = Family::default(); - sub_registry.register( + registry.register( "mesh_peer_count", "Number of peers in each mesh", Box::new(mesh_peers.clone()), @@ -115,84 +113,84 @@ impl InternalMetrics { let mesh_score = Family::new_with_constructor(|| { Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) }); - sub_registry.register( + registry.register( "mesh_score", "Score of all peers in each mesh", Box::new(mesh_score.clone()), ); let mesh_avg_score = Family::default(); - sub_registry.register( + registry.register( "mesh_avg_score", "Average score of all peers in each mesh", Box::new(mesh_avg_score.clone()), ); let mesh_message_rx_total = Family::default(); - sub_registry.register( + registry.register( "mesh_message_rx_total", "Total number of messages received from each mesh", Box::new(mesh_message_rx_total.clone()), ); let mesh_message_tx_total = Family::default(); - sub_registry.register( + registry.register( "mesh_message_tx_total", "Total number of messages sent in each mesh", Box::new(mesh_message_tx_total.clone()), ); let mesh_messages_from_non_mesh_peers = Family::default(); - sub_registry.register( + registry.register( "messages_from_non_mesh_peers", "Number of messages received from peers not in the mesh, for each mesh", Box::new(mesh_messages_from_non_mesh_peers.clone()), ); let mesh_duplicates_filtered = Family::default(); - sub_registry.register( + registry.register( "mesh_duplicates_filtered", "Total number of duplicate messages filtered in each mesh", Box::new(mesh_duplicates_filtered.clone()), ); let mesh_messages_validated = Family::default(); - sub_registry.register( + registry.register( "mesh_messages_validated", "Total number of messages that have been validated in each mesh", Box::new(mesh_messages_validated.clone()), ); let mesh_messages_rejected = Family::default(); - sub_registry.register( + registry.register( "mesh_messages_rejected", "Total number of messages rejected in each mesh", Box::new(mesh_messages_rejected.clone()), ); let mesh_messages_ignored = Family::default(); - sub_registry.register( + registry.register( "mesh_messages_ignored", "Total number of messages ignored in each mesh", Box::new(mesh_messages_ignored.clone()), ); let mesh_first_message_deliveries_per_slot = Family::default(); - sub_registry.register( + registry.register( "mesh_first_message_deliveries_per_slot", "The number of first message deliveries per mesh slot", Box::new(mesh_first_message_deliveries_per_slot.clone()), ); let mesh_iwant_requests = Family::default(); - sub_registry.register( + registry.register( "mesh_iwant_requests", "The number of IWANT requests per mesh", Box::new(mesh_first_message_deliveries_per_slot.clone()), ); let broken_promises = Counter::default(); - sub_registry.register( + registry.register( "broken_promises", "Total number of broken promises per mesh", Box::new(broken_promises.clone()), @@ -201,14 +199,14 @@ impl InternalMetrics { /* Peer Metrics */ let topic_peers = Family::default(); - sub_registry.register( + registry.register( "topic_peer_count", "Number of peers subscribed to each known topic", Box::new(topic_peers.clone()), ); let subscribed_topic_peers = Family::default(); - sub_registry.register( + registry.register( "subscribed_topic_peer_count", "Number of peers subscribed to each subscribed topic", Box::new(subscribed_topic_peers.clone()), @@ -218,14 +216,14 @@ impl InternalMetrics { // Invalid Topic Messages let invalid_topic_messages = Counter::default(); - sub_registry.register( + registry.register( "invalid_topic_messages", "Number of times a message has been received on a non-subscribed topic", Box::new(invalid_topic_messages.clone()), ); let memcache_misses = Counter::default(); - sub_registry.register( + registry.register( "memcache_misses", "Number of times a message has attempted to be forwarded but has already been removed from the memcache", Box::new(memcache_misses.clone()), diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 486c2a4f74e..c0a3ec6a9bd 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -570,7 +570,6 @@ mod tests { let gs: Gossipsub = Gossipsub::new( crate::MessageAuthenticity::Signed(keypair.0.clone()), config, - None, ) .unwrap(); let data = (0..g.gen_range(10, 10024)) diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 6512f31f511..3cf3f882427 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -170,8 +170,7 @@ fn build_node() -> (Multiaddr, Swarm) { .validation_mode(ValidationMode::Permissive) .build() .unwrap(); - let behaviour = - Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config, None).unwrap(); + let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config).unwrap(); let mut swarm = Swarm::new(transport, behaviour, peer_id); let port = 1 + random::(); From 20add10056dc5bc4921d4b82a4ea611ecc158133 Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 22 Oct 2021 10:33:54 -0500 Subject: [PATCH 06/10] add metrics config --- protocols/gossipsub/src/behaviour.rs | 22 +++++++++++----------- protocols/gossipsub/src/metrics.rs | 3 ++- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index a27b376a517..d57c98e97aa 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -52,6 +52,10 @@ use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; +use crate::metrics::{ + topic_metrics::{SlotChurnMetric, SlotMessageMetric}, + Config as MetricsConfig, InternalMetrics, +}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; @@ -69,11 +73,6 @@ use std::{cmp::Ordering::Equal, fmt::Debug}; #[cfg(test)] mod tests; -use crate::metrics::{ - topic_metrics::{SlotChurnMetric, SlotMessageMetric}, - InternalMetrics, -}; - /// Determines if published messages should be signed or not. /// /// Without signing, a number of privacy preserving modes can be selected. @@ -339,12 +338,13 @@ where pub fn new_with_metrics( privacy: MessageAuthenticity, config: GossipsubConfig, - metrics: &mut Registry, + metrics_registry: &mut Registry, + metrics_config: MetricsConfig, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, - Some(metrics), + Some((metrics_registry, metrics_config)), F::default(), D::default(), ) @@ -361,7 +361,7 @@ where pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: GossipsubConfig, - metrics: Option<&mut Registry>, + metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, ) -> Result { Self::new_with_subscription_filter_and_transform( @@ -384,7 +384,7 @@ where pub fn new_with_transform( privacy: MessageAuthenticity, config: GossipsubConfig, - metrics: Option<&mut Registry>, + metrics: Option<(&mut Registry, MetricsConfig)>, data_transform: D, ) -> Result { Self::new_with_subscription_filter_and_transform( @@ -407,7 +407,7 @@ where pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: GossipsubConfig, - metrics: Option<&mut Registry>, + metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, data_transform: D, ) -> Result { @@ -418,7 +418,7 @@ where validate_config(&privacy, config.validation_mode())?; Ok(Gossipsub { - metrics: metrics.map(|registry| InternalMetrics::new(registry)), + metrics: metrics.map(|(registry, cfg)| InternalMetrics::new(registry, cfg)), events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index bf5da700cd6..51b2cd3c13e 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -97,10 +97,11 @@ pub struct InternalMetrics { /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. topic_metrics: HashMap, } +pub struct Config {} impl InternalMetrics { /// Constructs and builds the internal metrics given a registry. - pub fn new(registry: &mut Registry) -> Self { + pub fn new(registry: &mut Registry, config: Config) -> Self { /* Mesh Metrics */ let mesh_peers = Family::default(); From b0b1083489f1df71874f2464b21a95c5f5dc6df5 Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 22 Oct 2021 15:15:44 -0500 Subject: [PATCH 07/10] add metrics config --- protocols/gossipsub/src/metrics.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 51b2cd3c13e..221ef245258 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -38,7 +38,7 @@ use topic_metrics::Slot; use open_metrics_client::metrics::counter::Counter; use open_metrics_client::metrics::family::Family; use open_metrics_client::metrics::gauge::Gauge; -use open_metrics_client::metrics::histogram::{exponential_buckets, Histogram}; +use open_metrics_client::metrics::histogram::{linear_buckets, Histogram}; use open_metrics_client::registry::Registry; use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; @@ -97,11 +97,20 @@ pub struct InternalMetrics { /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. topic_metrics: HashMap, } -pub struct Config {} + +pub struct Config { + pub score_histogram_buckets: Vec, +} + +impl Config { + pub fn histogram(&self) -> Histogram { + Histogram::new(self.score_histogram_buckets.clone().into_iter()) + } +} impl InternalMetrics { /// Constructs and builds the internal metrics given a registry. - pub fn new(registry: &mut Registry, config: Config) -> Self { + pub fn new(registry: &mut Registry, _config: Config) -> Self { /* Mesh Metrics */ let mesh_peers = Family::default(); @@ -111,9 +120,11 @@ impl InternalMetrics { Box::new(mesh_peers.clone()), ); - let mesh_score = Family::new_with_constructor(|| { - Histogram::new(exponential_buckets(-1000.0, 10.0, 100)) - }); + // TODO: change after https://github.com/mxinden/rust-open-metrics-client/pull/21 and use + // for now the range -10K to 10K with 100 long intervals as reasonable default. + // let mesh_score = Family::new_with_constructor(config); + let mesh_score = + Family::new_with_constructor(|| Histogram::new(linear_buckets(-10_000.0, 100.0, 201))); registry.register( "mesh_score", "Score of all peers in each mesh", From 3242012c8ffd12f549a36908d98a7a36ece62f07 Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 22 Oct 2021 17:25:12 -0500 Subject: [PATCH 08/10] tmp use local open-metrics-client --- protocols/gossipsub/Cargo.toml | 2 +- protocols/gossipsub/src/metrics.rs | 27 +++++++++++++++------------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 5de59b2abff..cdde9dac262 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -29,7 +29,7 @@ prost = "0.9" hex_fmt = "0.3.0" regex = "1.4.0" # Metrics dependencies -open-metrics-client = "0.12.0" +open-metrics-client = {path = "../../../rust-open-metrics-client"} strum = "0.21" strum_macros = "0.21" diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index 221ef245258..f2eef920c12 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -36,9 +36,9 @@ use topic_metrics::Slot; // use open_metrics_client::encoding::text::Encode; use open_metrics_client::metrics::counter::Counter; -use open_metrics_client::metrics::family::Family; +use open_metrics_client::metrics::family::{Family, MetricConstructor}; use open_metrics_client::metrics::gauge::Gauge; -use open_metrics_client::metrics::histogram::{linear_buckets, Histogram}; +use open_metrics_client::metrics::histogram::Histogram; use open_metrics_client::registry::Registry; use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; @@ -60,7 +60,7 @@ pub struct InternalMetrics { /// The current peers in each mesh. mesh_peers: Family, /// The scores for each peer in each mesh. - mesh_score: Family, + mesh_score: Family, /// The average peer score for each mesh. mesh_avg_score: Family, /// The total number of messages received (after duplicate filter). @@ -102,15 +102,19 @@ pub struct Config { pub score_histogram_buckets: Vec, } -impl Config { - pub fn histogram(&self) -> Histogram { - Histogram::new(self.score_histogram_buckets.clone().into_iter()) +#[derive(Clone)] +struct ScoreHistogramBuilder { + buckets: Vec, +} +impl MetricConstructor for ScoreHistogramBuilder { + fn new(&self) -> Histogram { + Histogram::new(self.buckets.clone().into_iter()) } } impl InternalMetrics { /// Constructs and builds the internal metrics given a registry. - pub fn new(registry: &mut Registry, _config: Config) -> Self { + pub fn new(registry: &mut Registry, config: Config) -> Self { /* Mesh Metrics */ let mesh_peers = Family::default(); @@ -120,11 +124,10 @@ impl InternalMetrics { Box::new(mesh_peers.clone()), ); - // TODO: change after https://github.com/mxinden/rust-open-metrics-client/pull/21 and use - // for now the range -10K to 10K with 100 long intervals as reasonable default. - // let mesh_score = Family::new_with_constructor(config); - let mesh_score = - Family::new_with_constructor(|| Histogram::new(linear_buckets(-10_000.0, 100.0, 201))); + let score_histogram_builder = ScoreHistogramBuilder { + buckets: config.score_histogram_buckets.clone(), + }; + let mesh_score = Family::new_with_constructor(score_histogram_builder); registry.register( "mesh_score", "Score of all peers in each mesh", From cfd74f5f8a93b07c87e20deba369e710c438bcaa Mon Sep 17 00:00:00 2001 From: Diva M Date: Fri, 22 Oct 2021 17:37:48 -0500 Subject: [PATCH 09/10] fix example --- examples/gossipsub-chat.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index b738541c72f..bbf1190f8c3 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -94,9 +94,7 @@ async fn main() -> Result<(), Box> { // same content will be propagated. .build() .expect("Valid config"); - // Build a gossipsub network behaviour. - // NOTE: The last parameter specifies an `open_metrics_client` `Registry` which optionally - // enables metrics for the gossipsub behaviour. + // build a gossipsub network behaviour let mut gossipsub: gossipsub::Gossipsub = gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config) .expect("Correct configuration"); From 0926879f68d20a4049064070d8f937737fc038bb Mon Sep 17 00:00:00 2001 From: Diva M Date: Mon, 25 Oct 2021 16:54:32 -0500 Subject: [PATCH 10/10] add notes and expand config --- protocols/gossipsub/src/behaviour.rs | 1 + protocols/gossipsub/src/metrics.rs | 83 +++++++++++++++++++++------- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index d57c98e97aa..001aa5d98e4 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -961,6 +961,7 @@ where debug!("JOIN: The topic is already in the mesh, ignoring JOIN"); return; } + // TODO: here. No need to store the added peers twice. let mut added_peers = HashSet::new(); diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs index f2eef920c12..f4706a981ea 100644 --- a/protocols/gossipsub/src/metrics.rs +++ b/protocols/gossipsub/src/metrics.rs @@ -38,57 +38,65 @@ use topic_metrics::Slot; use open_metrics_client::metrics::counter::Counter; use open_metrics_client::metrics::family::{Family, MetricConstructor}; use open_metrics_client::metrics::gauge::Gauge; -use open_metrics_client::metrics::histogram::Histogram; +use open_metrics_client::metrics::histogram::{linear_buckets, Histogram}; use open_metrics_client::registry::Registry; use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; -/// This provides an upper bound to the number of mesh topics we create metrics for. It prevents -/// unbounded labels being created in the metrics. -const MESH_TOPIC_LIMIT: usize = 300; -/// A separate limit is used to keep track of non-mesh topics. Mesh topics are controlled by the -/// user via subscriptions whereas non-mesh topics are determined by users on the network. -/// This limit permits a fixed amount of topics to allow, in-addition to the mesh topics. -const NON_MESH_TOPIC_LIMIT: usize = 50; - /// A collection of metrics used throughout the Gossipsub behaviour. pub struct InternalMetrics { + /* Auxiliary, defensive values */ /// Keeps track of which mesh topics have been added to metrics or not. added_mesh_topics: HashSet, + /// Maximum number of mesh topics instrumented. + max_measured_mesh_topics: usize, + /// Maximum number of non-mesh topics instrumented. + max_measured_non_mesh_topics: usize, /// Keeps track of which non mesh topics have been added to metrics or not. added_non_mesh_topics: HashSet, + + /* Mesh metrics */ /// The current peers in each mesh. mesh_peers: Family, /// The scores for each peer in each mesh. mesh_score: Family, /// The average peer score for each mesh. + // TODO: we need the median not the avg. mesh_avg_score: Family, - /// The total number of messages received (after duplicate filter). - mesh_message_rx_total: Family, /// The total number of messages sent. mesh_message_tx_total: Family, - /// The number of messages received from non-mesh peers (after duplicate filter). - mesh_messages_from_non_mesh_peers: Family, + + /// The total number of messages received (after duplicate filter). + mesh_message_rx_total: Family, /// The total number of duplicate messages filtered per mesh. + // TODO: is this messages received? mesh_duplicates_filtered: Family, + /// The number of messages received from non-mesh peers (after duplicate filter). + mesh_messages_from_non_mesh_peers: Family, + + // TODO: how do these add up wrt to the total? /// The total number of messages validated per mesh. mesh_messages_validated: Family, /// The total number of messages rejected per mesh. mesh_messages_rejected: Family, /// The total number of messages ignored per mesh. mesh_messages_ignored: Family, - /// The number of first message delivers per slot per mesh. + + /// The number of first message deliveries per slot per mesh. mesh_first_message_deliveries_per_slot: Family<(TopicHash, Slot), Gauge>, /// The number of IWANT requests being sent per mesh topic. + // TODO: do these counters not get reset? mesh_iwant_requests: Family, /// Number of peers subscribed to each known topic. topic_peers: Family, /// Number of peers subscribed to each subscribed topic. subscribed_topic_peers: Family, /// The number of broken promises (this metric is indicative of nodes with invalid message-ids) + // TODO: why is this not per topic? because who cares? broken_promises: Counter, /// Keeps track of the number of messages we have received on topics we are not subscribed /// to. + // TODO: why is this invalid? invalid_topic_messages: Counter, /// When the user validates a message, it tries to re propagate it to its mesh peers. If the /// message expires from the memcache before it can be validated, we count this a cache miss @@ -98,8 +106,33 @@ pub struct InternalMetrics { topic_metrics: HashMap, } +/// This provides an upper bound to the number of mesh topics we create metrics for. It prevents +/// unbounded labels being created in the metrics. +const DEFAULT_MESH_TOPIC_LIMIT: usize = 300; +/// A separate limit is used to keep track of non-mesh topics. Mesh topics are controlled by the +/// user via subscriptions whereas non-mesh topics are determined by users on the network. +/// This limit permits a fixed amount of topics to allow, in-addition to the mesh topics. +const DEFAULT_NON_MESH_TOPIC_LIMIT: usize = 50; + pub struct Config { pub score_histogram_buckets: Vec, + /// This provides an upper bound to the number of mesh topics we create metrics for. It + /// prevents unbounded labels being created in the metrics. + pub max_measured_mesh_topics: usize, + /// Mesh topics are controlled by the user via subscriptions whereas non-mesh topics are + /// determined by users on the network. This limit permits a fixed amount of topics to allow, + /// in-addition to the mesh topics. + pub max_measured_non_mesh_topics: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + score_histogram_buckets: linear_buckets(-10_000.0, 100.0, 201).collect(), + max_measured_mesh_topics: DEFAULT_MESH_TOPIC_LIMIT, + max_measured_non_mesh_topics: DEFAULT_NON_MESH_TOPIC_LIMIT, + } + } } #[derive(Clone)] @@ -115,6 +148,12 @@ impl MetricConstructor for ScoreHistogramBuilder { impl InternalMetrics { /// Constructs and builds the internal metrics given a registry. pub fn new(registry: &mut Registry, config: Config) -> Self { + let Config { + score_histogram_buckets, + max_measured_mesh_topics, + max_measured_non_mesh_topics, + } = config; + /* Mesh Metrics */ let mesh_peers = Family::default(); @@ -125,7 +164,7 @@ impl InternalMetrics { ); let score_histogram_builder = ScoreHistogramBuilder { - buckets: config.score_histogram_buckets.clone(), + buckets: score_histogram_buckets, }; let mesh_score = Family::new_with_constructor(score_histogram_builder); registry.register( @@ -245,6 +284,10 @@ impl InternalMetrics { ); InternalMetrics { + added_mesh_topics: HashSet::new(), + added_non_mesh_topics: HashSet::new(), + max_measured_mesh_topics, + max_measured_non_mesh_topics, mesh_peers, mesh_score, mesh_avg_score, @@ -257,14 +300,12 @@ impl InternalMetrics { mesh_messages_ignored, mesh_first_message_deliveries_per_slot, mesh_iwant_requests, - broken_promises, - memcache_misses, topic_peers, subscribed_topic_peers, + broken_promises, invalid_topic_messages, + memcache_misses, topic_metrics: HashMap::new(), - added_mesh_topics: HashSet::new(), - added_non_mesh_topics: HashSet::new(), } } @@ -457,7 +498,7 @@ impl InternalMetrics { /// true. fn allowed_mesh_topic(&mut self, topic_hash: &TopicHash) -> bool { // If we haven't reached the limit, record the topic. - if self.added_mesh_topics.len() < MESH_TOPIC_LIMIT { + if self.added_mesh_topics.len() < self.max_measured_mesh_topics { self.added_mesh_topics.insert(topic_hash.clone()); true } else { @@ -473,7 +514,7 @@ impl InternalMetrics { /// true. fn allowed_non_mesh_topic(&mut self, topic_hash: &TopicHash) -> bool { // If we haven't reached the limit, record the topic. - if self.added_non_mesh_topics.len() < NON_MESH_TOPIC_LIMIT + if self.added_non_mesh_topics.len() < self.max_measured_non_mesh_topics && !self.added_mesh_topics.contains(topic_hash) { self.added_non_mesh_topics.insert(topic_hash.clone());