diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index a755c375f01..ca1613cee67 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -3,6 +3,12 @@ - Add an event to register peers that do not support the gossipsub protocol [PR 2241](https://github.com/libp2p/rust-libp2p/pull/2241) +- Adds optional metrics for tracking mesh and peer behaviour. + [PR 2235](https://github.com/libp2p/rust-libp2p/pull/2235) + +- Make default features of `libp2p-core` optional. + [PR 2181](https://github.com/libp2p/rust-libp2p/pull/2181) + - Make default features of `libp2p-core` optional. [PR 2181](https://github.com/libp2p/rust-libp2p/pull/2181) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 825c174b9f3..cdde9dac262 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] libp2p-swarm = { version = "0.31.0-rc.1", path = "../../swarm" } libp2p-core = { version = "0.30.0-rc.1", path = "../../core", default-features = false } +libp2p-metrics = { path = "../../misc/metrics"} bytes = "1.0" byteorder = "1.3.4" fnv = "1.0.7" @@ -27,6 +28,10 @@ smallvec = "1.6.1" prost = "0.9" hex_fmt = "0.3.0" regex = "1.4.0" +# Metrics dependencies +open-metrics-client = {path = "../../../rust-open-metrics-client"} +strum = "0.21" +strum_macros = "0.21" [dev-dependencies] async-std = "1.6.3" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 9c2feb77765..001aa5d98e4 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use std::{ - cmp::{max, Ordering}, + cmp::max, collections::HashSet, collections::VecDeque, collections::{BTreeSet, HashMap}, @@ -32,6 +32,7 @@ use std::{ use futures::StreamExt; use log::{debug, error, trace, warn}; +use open_metrics_client::registry::Registry; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; use wasm_timer::{Instant, Interval}; @@ -51,6 +52,10 @@ use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; +use crate::metrics::{ + topic_metrics::{SlotChurnMetric, SlotMessageMetric}, + Config as MetricsConfig, InternalMetrics, +}; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter}; @@ -302,6 +307,9 @@ pub struct Gossipsub< /// calculating the message-id and sending to the application. This is designed to allow the /// user to implement arbitrary topic-based compression algorithms. data_transform: D, + + /// Keep track of a set of internal metrics relating to gossipsub. + metrics: Option, } impl Gossipsub @@ -318,6 +326,25 @@ where Self::new_with_subscription_filter_and_transform( privacy, config, + None, + F::default(), + D::default(), + ) + } + + /// Creates a [`Gossipsub`] struct given a set of parameters specified via a + /// [`GossipsubConfig`]. This has no subscription filter and uses no compression. + /// Metrics can be evaluated by passing a reference to a [`Registry`]. + pub fn new_with_metrics( + privacy: MessageAuthenticity, + config: GossipsubConfig, + metrics_registry: &mut Registry, + metrics_config: MetricsConfig, + ) -> Result { + Self::new_with_subscription_filter_and_transform( + privacy, + config, + Some((metrics_registry, metrics_config)), F::default(), D::default(), ) @@ -334,11 +361,13 @@ where pub fn new_with_subscription_filter( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, subscription_filter, D::default(), ) @@ -355,11 +384,13 @@ where pub fn new_with_transform( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<(&mut Registry, MetricsConfig)>, data_transform: D, ) -> Result { Self::new_with_subscription_filter_and_transform( privacy, config, + metrics, F::default(), data_transform, ) @@ -376,6 +407,7 @@ where pub fn new_with_subscription_filter_and_transform( privacy: MessageAuthenticity, config: GossipsubConfig, + metrics: Option<(&mut Registry, MetricsConfig)>, subscription_filter: F, data_transform: D, ) -> Result { @@ -385,9 +417,8 @@ where // were received locally. validate_config(&privacy, config.validation_mode())?; - // Set up message publishing parameters. - Ok(Gossipsub { + metrics: metrics.map(|(registry, cfg)| InternalMetrics::new(registry, cfg)), events: VecDeque::new(), control_pool: HashMap::new(), publish_config: privacy.into(), @@ -740,14 +771,55 @@ where "Message not in cache. Ignoring forwarding. Message Id: {}", msg_id ); + if let Some(metrics) = self.metrics.as_mut() { + metrics.memcache_miss(); + } + return Ok(false); } }; + let topic = raw_message.topic.clone(); + self.forward_msg(msg_id, raw_message, Some(propagation_source))?; + + // Metrics: Report validation result + if let Some(metrics) = self.metrics.as_mut() { + metrics.increment_message_metric( + &topic, + propagation_source, + SlotMessageMetric::MessagesValidated, + ); + } return Ok(true); } - MessageAcceptance::Reject => RejectReason::ValidationFailed, - MessageAcceptance::Ignore => RejectReason::ValidationIgnored, + MessageAcceptance::Reject => { + // Metrics: Report validation result + if let Some(metrics) = self.metrics.as_mut() { + if let Some(raw_message) = self.mcache.get(msg_id) { + // Increment metrics + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesRejected, + ); + } + } + RejectReason::ValidationFailed + } + MessageAcceptance::Ignore => { + // Metrics: Report validation result + if let Some(metrics) = self.metrics.as_mut() { + if let Some(raw_message) = self.mcache.get(msg_id) { + // Increment metrics + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesIgnored, + ); + } + } + RejectReason::ValidationIgnored + } }; if let Some(raw_message) = self.mcache.remove(msg_id) { @@ -763,6 +835,10 @@ where Ok(true) } else { warn!("Rejected message not in cache. Message Id: {}", msg_id); + if let Some(metrics) = self.metrics.as_mut() { + metrics.memcache_miss(); + } + Ok(false) } } @@ -856,6 +932,26 @@ where } } + /// This is just a utility function to verify everything is in order between the + /// mesh and the topic_metrics. It's useful for debugging. + #[cfg(debug_assertions)] + fn validate_mesh_slots_for_topic(&self, topic: &TopicHash) -> Result<(), String> { + if let Some(metrics) = self.metrics.as_ref() { + match metrics.topic_metrics().get(topic) { + Some(topic_metrics) => match self.mesh.get(topic) { + Some(mesh_peers) => topic_metrics.validate_mesh_slots(mesh_peers), + None => Err(format!("metrics_event[{}] no mesh_peers for topic", topic)), + }, + None => Err(format!( + "metrics_event[{}]: no topic_metrics for topic", + topic + )), + } + } else { + Ok(()) + } + } + /// Gossipsub JOIN(topic) - adds topic peers to mesh and sends them GRAFT messages. fn join(&mut self, topic_hash: &TopicHash) { debug!("Running JOIN for topic: {:?}", topic_hash); @@ -865,6 +961,7 @@ where debug!("JOIN: The topic is already in the mesh, ignoring JOIN"); return; } + // TODO: here. No need to store the added peers twice. let mut added_peers = HashSet::new(); @@ -931,6 +1028,10 @@ where mesh_peers.extend(new_peers); } + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slots_to_peers(topic_hash, added_peers.iter().cloned()); + } + for peer_id in added_peers { // Send a GRAFT control message debug!("JOIN: Sending Graft message to peer: {:?}", peer_id); @@ -955,7 +1056,19 @@ where &self.connected_peers, ); } - debug!("Completed JOIN for topic: {:?}", topic_hash); + + #[cfg(debug_assertions)] + { + let validation_result = self.validate_mesh_slots_for_topic(topic_hash); + debug_assert!( + validation_result.is_ok(), + "metrics_event: validate_mesh_slots_for_topic({}) failed! Err({})", + topic_hash, + validation_result.err().unwrap() + ); + } + + trace!("Completed JOIN for topic: {:?}", topic_hash); } /// Creates a PRUNE gossipsub action. @@ -1036,6 +1149,10 @@ where &self.connected_peers, ); } + + if let Some(metrics) = self.metrics.as_mut() { + metrics.leave_topic(topic_hash); + } } debug!("Completed LEAVE for topic: {:?}", topic_hash); } @@ -1117,7 +1234,7 @@ where debug!("Handling IHAVE for peer: {:?}", peer_id); // use a hashset to avoid duplicates efficiently - let mut iwant_ids = HashSet::new(); + let mut iwant_ids = HashMap::new(); for (topic, ids) in ihave_msgs { // only process the message if we are subscribed @@ -1132,7 +1249,7 @@ where for id in ids { if !self.duplicate_cache.contains(&id) { // have not seen this message, request it - iwant_ids.insert(id); + iwant_ids.insert(id, topic.clone()); } } } @@ -1153,7 +1270,7 @@ where ); //ask in random order - let mut iwant_ids_vec: Vec<_> = iwant_ids.iter().collect(); + let mut iwant_ids_vec: Vec<_> = iwant_ids.keys().collect(); let mut rng = thread_rng(); iwant_ids_vec.partial_shuffle(&mut rng, iask as usize); @@ -1173,6 +1290,15 @@ where peer_id, message_ids ); + // Metrics: Add IWANT requests + if let Some(metrics) = self.metrics.as_mut() { + for id in &message_ids { + if let Some(topic) = iwant_ids.get(id) { + metrics.iwant_request(topic); + } + } + } + Self::control_pool_add( &mut self.control_pool, *peer_id, @@ -1334,6 +1460,11 @@ where peer_id, &topic_hash ); peers.insert(*peer_id); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slot_if_unassigned(&topic_hash, peer_id); + } + // If the peer did not previously exist in any mesh, inform the handler peer_added_to_mesh( *peer_id, @@ -1396,6 +1527,7 @@ where topic_hash: &TopicHash, backoff: Option, always_update_backoff: bool, + churn_reason: SlotChurnMetric, ) { let mut update_backoff = always_update_backoff; if let Some(peers) = self.mesh.get_mut(topic_hash) { @@ -1422,6 +1554,9 @@ where &mut self.events, &self.connected_peers, ); + if let Some(metrics) = self.metrics.as_mut() { + metrics.churn_slot(topic_hash, peer_id, churn_reason); + } } } if update_backoff { @@ -1445,10 +1580,16 @@ where let (below_threshold, score) = self.score_below_threshold(peer_id, |pst| pst.accept_px_threshold); for (topic_hash, px, backoff) in prune_data { - self.remove_peer_from_mesh(peer_id, &topic_hash, backoff, true); + self.remove_peer_from_mesh( + peer_id, + &topic_hash, + backoff, + true, + SlotChurnMetric::ChurnPrune, + ); if self.mesh.contains_key(&topic_hash) { - //connect to px peers + // connect to px peers if !px.is_empty() { // we ignore PX from peers with insufficient score if below_threshold { @@ -1603,13 +1744,40 @@ where mut raw_message: RawGossipsubMessage, propagation_source: &PeerId, ) { + // Report received message to metrics if we are subscribed to the topic, otherwise + // ignore it. + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesAll, + ); + } + } + let fast_message_id = self.config.fast_message_id(&raw_message); if let Some(fast_message_id) = fast_message_id.as_ref() { if let Some(msg_id) = self.fast_messsage_id_cache.get(fast_message_id) { let msg_id = msg_id.clone(); - self.message_is_valid(&msg_id, &mut raw_message, propagation_source); - if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.duplicated_message(propagation_source, &msg_id, &raw_message.topic); + if self.message_is_valid(&msg_id, &mut raw_message, propagation_source) { + if let Some((peer_score, ..)) = &mut self.peer_score { + peer_score.duplicated_message( + propagation_source, + &msg_id, + &raw_message.topic, + ); + } + // Metrics: Report the duplicate message, for mesh topics + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesDuplicates, + ); + } + } } return; } @@ -1652,13 +1820,36 @@ where if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg_id, &message.topic); } + + // Metrics: Report the duplicate message, for mesh topics + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + // NOTE: Allow overflowing of a usize here + metrics.increment_message_metric( + &message.topic, + propagation_source, + SlotMessageMetric::MessagesDuplicates, + ); + } + } return; } - debug!( + trace!( "Put message {:?} in duplicate_cache and resolve promises", msg_id ); + // Increment the first message topic, if its in our mesh. + if let Some(metrics) = self.metrics.as_mut() { + if self.mesh.contains_key(&raw_message.topic) { + metrics.increment_message_metric( + &raw_message.topic, + propagation_source, + SlotMessageMetric::MessagesFirst, + ); + } + } + // Tells score that message arrived (but is maybe not fully validated yet). // Consider the message as delivered for gossip promises. if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { @@ -1684,6 +1875,10 @@ where "Received message on a topic we are not subscribed to: {:?}", message.topic ); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.message_invalid_topic(); + } return; } @@ -1789,7 +1984,14 @@ where } // add to the peer_topics mapping - subscribed_topics.insert(subscription.topic_hash.clone()); + if subscribed_topics.insert(subscription.topic_hash.clone()) { + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_joined_topic(&subscription.topic_hash); + if self.mesh.contains_key(&subscription.topic_hash) { + metrics.peer_joined_subscribed_topic(&subscription.topic_hash); + } + } + } // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) @@ -1847,6 +2049,13 @@ where propagation_source.to_string(), subscription.topic_hash ); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_left_topic(&subscription.topic_hash); + if self.mesh.contains_key(&subscription.topic_hash) { + metrics.peer_left_subscribed_topic(&subscription.topic_hash); + } + } } // remove topic from the peer_topics mapping subscribed_topics.remove(&subscription.topic_hash); @@ -1864,7 +2073,13 @@ where // remove unsubscribed peers from the mesh if it exists for (peer_id, topic_hash) in unsubscribed_peers { - self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); + self.remove_peer_from_mesh( + &peer_id, + &topic_hash, + None, + false, + SlotChurnMetric::ChurnUnsubscribed, + ); } // Potentially inform the handler if we have added this peer to a mesh for the first time. @@ -1880,6 +2095,12 @@ where ); } + if let Some(metrics) = self.metrics.as_mut() { + for topic in &topics_to_graft { + metrics.assign_slot_if_unassigned(topic, propagation_source); + } + } + // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. if !topics_to_graft.is_empty() @@ -1917,13 +2138,18 @@ where if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score { for (peer, count) in gossip_promises.get_broken_promises() { peer_score.add_penalty(&peer, count); + + // Metrics: Increment broken promises + if let Some(metrics) = self.metrics.as_mut() { + metrics.broken_promise(); + } } } } /// Heartbeat function which shifts the memcache and updates the mesh. fn heartbeat(&mut self) { - debug!("Starting heartbeat"); + trace!("Starting heartbeat"); self.heartbeat_ticks += 1; @@ -1956,6 +2182,8 @@ where _ => 0.0, }; + #[cfg(debug_assertions)] + let mut modified_topics = HashSet::new(); // maintain the mesh for each topic for (topic_hash, peers) in self.mesh.iter_mut() { let explicit_peers = &self.explicit_peers; @@ -1970,7 +2198,7 @@ where .iter() .filter(|&p| { if score(p) < 0.0 { - debug!( + trace!( "HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \ {}]", p, @@ -1990,6 +2218,13 @@ where .collect(); for peer in to_remove { peers.remove(&peer); + + // Increment ChurnScore and remove peer from the slot + if let Some(metrics) = self.metrics.as_mut() { + metrics.churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnScore); + } + #[cfg(debug_assertions)] + modified_topics.insert(topic_hash.clone()); } // too little peers - add some @@ -2019,8 +2254,19 @@ where current_topic.push(topic_hash.clone()); } // update the mesh - debug!("Updating mesh, new mesh: {:?}", peer_list); - peers.extend(peer_list); + if !peer_list.is_empty() { + trace!("Updating mesh, adding to mesh: {:?}", peer_list); + + // Metrics: Update mesh peers + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + } + #[cfg(debug_assertions)] + modified_topics.insert(topic_hash.clone()); + + // add the peers + peers.extend(peer_list); + } } // too many peers - remove some @@ -2037,8 +2283,11 @@ where let mut rng = thread_rng(); let mut shuffled = peers.iter().cloned().collect::>(); shuffled.shuffle(&mut rng); - shuffled - .sort_by(|p1, p2| score(p1).partial_cmp(&score(p2)).unwrap_or(Ordering::Equal)); + shuffled.sort_by(|p1, p2| { + score(p1) + .partial_cmp(&score(p2)) + .unwrap_or(std::cmp::Ordering::Equal) + }); // shuffle everything except the last retain_scores many peers (the best ones) shuffled[..peers.len() - self.config.retain_scores()].shuffle(&mut rng); @@ -2067,6 +2316,12 @@ where outbound -= 1; } } + if let Some(metrics) = self.metrics.as_mut() { + // Metrics: increment ChurnExcess and vacate slot + metrics.churn_slot(topic_hash, &peer, SlotChurnMetric::ChurnExcess); + } + #[cfg(debug_assertions)] + modified_topics.insert(topic_hash.clone()); // remove the peer peers.remove(&peer); @@ -2101,9 +2356,19 @@ where let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } - // update the mesh - debug!("Updating mesh, new mesh: {:?}", peer_list); - peers.extend(peer_list); + if !peer_list.is_empty() { + // update the mesh + trace!("Updating mesh, adding to mesh: {:?}", peer_list); + + if let Some(metrics) = self.metrics.as_mut() { + metrics.assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + } + #[cfg(debug_assertions)] + modified_topics.insert(topic_hash.clone()); + + // add the peers + peers.extend(peer_list); + } } } @@ -2157,12 +2422,25 @@ where let current_topic = to_graft.entry(*peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } - // update the mesh - debug!( - "Opportunistically graft in topic {} with peers {:?}", - topic_hash, peer_list - ); - peers.extend(peer_list); + + if !peer_list.is_empty() { + // update the mesh + debug!( + "Opportunistically graft in topic {} with peers {:?}", + topic_hash, peer_list + ); + + // Metrics: Update mesh peers + if let Some(metrics) = self.metrics.as_mut() { + metrics + .assign_slots_to_peers(topic_hash, peer_list.iter().cloned()); + } + #[cfg(debug_assertions)] + modified_topics.insert(topic_hash.clone()); + + // add the peers + peers.extend(peer_list); + } } } } @@ -2198,7 +2476,7 @@ where match self.peer_topics.get(peer) { Some(topics) => { if !topics.contains(topic_hash) || score(peer) < publish_threshold { - debug!( + trace!( "HEARTBEAT: Peer removed from fanout for topic: {:?}", topic_hash ); @@ -2285,7 +2563,18 @@ where // shift the memcache self.mcache.shift(); - debug!("Completed Heartbeat"); + #[cfg(debug_assertions)] + for topic in modified_topics { + let validation_result = self.validate_mesh_slots_for_topic(&topic); + debug_assert!( + validation_result.is_ok(), + "metrics_event: validate_mesh_slots_for_topic({}) failed! Err({})", + topic, + validation_result.err().unwrap() + ); + } + + trace!("Completed Heartbeat"); } /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh @@ -2881,7 +3170,21 @@ where // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(topic) { // check if the peer is in the mesh and remove it - mesh_peers.remove(peer_id); + if mesh_peers.contains(peer_id) { + mesh_peers.remove(peer_id); + + // increment churn_disconnected and vacate slot + if let Some(metrics) = self.metrics.as_mut() { + metrics.churn_slot(topic, peer_id, SlotChurnMetric::ChurnDisconnected); + } + } + } + + if let Some(metrics) = self.metrics.as_mut() { + metrics.peer_left_topic(&topic); + if self.mesh.contains_key(&topic) { + metrics.peer_left_subscribed_topic(&topic); + } } // remove from topic_peers diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index d22ce8a3e6d..6839b876a2f 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -38,9 +38,49 @@ mod tests { use crate::subscription_filter::WhitelistSubscriptionFilter; use crate::transform::{DataTransform, IdentityTransform}; use crate::types::FastMessageId; + /* For debug purposes + use env_logger::{Builder, Env}; + // Add this line to relevant tests. + Builder::from_env(Env::default().default_filter_or("info")).init(); + */ + use libp2p_swarm::AddressRecord; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; + struct FakePollParams { + peer_id: PeerId, + } + + impl FakePollParams { + pub fn new() -> Self { + FakePollParams { + peer_id: PeerId::random(), + } + } + } + + impl PollParameters for FakePollParams { + type SupportedProtocolsIter = std::vec::IntoIter>; + type ListenedAddressesIter = std::vec::IntoIter; + type ExternalAddressesIter = std::vec::IntoIter; + + fn supported_protocols(&self) -> Self::SupportedProtocolsIter { + Vec::new().into_iter() + } + + fn listened_addresses(&self) -> Self::ListenedAddressesIter { + Vec::new().into_iter() + } + + fn external_addresses(&self) -> Self::ExternalAddressesIter { + Vec::new().into_iter() + } + + fn local_peer_id(&self) -> &PeerId { + &self.peer_id + } + } + #[derive(Default, Builder, Debug)] #[builder(default)] struct InjectNodes @@ -72,6 +112,7 @@ mod tests { let mut gs: Gossipsub = Gossipsub::new_with_subscription_filter_and_transform( MessageAuthenticity::Signed(keypair), self.gs_config, + None, self.subscription_filter, self.data_transform, ) @@ -1323,6 +1364,20 @@ mod tests { gs.events.clear(); } + // Process current events + #[allow(dead_code)] + fn process_events(gs: &mut Gossipsub) + where + D: Send + 'static + DataTransform, + F: Send + 'static + TopicSubscriptionFilter, + { + let waker = futures::task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + let mut poll_params = FakePollParams::new(); + + while !gs.poll(&mut cx, &mut poll_params).is_pending() {} + } + #[test] // tests that a peer added as explicit peer gets connected to fn test_explicit_peer_gets_connected() { @@ -1940,7 +1995,7 @@ mod tests { .heartbeat_interval(Duration::from_millis(100)) .build() .unwrap(); - //only one peer => mesh too small and will try to regraft as early as possible + // only one peer => mesh too small and will try to regraft as early as possible let (mut gs, peers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) @@ -1948,22 +2003,21 @@ mod tests { .gs_config(config) .create_network(); - //handle prune from peer with backoff of one second + // handle prune from peer with backoff of one second gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), Some(1))]); - //forget all events until now + // forget all events until now flush_events(&mut gs); - - //call heartbeat + // call heartbeat gs.heartbeat(); - //Sleep for one second and apply 10 regular heartbeats (interval = 100ms). + // Sleep for one second and apply 10 regular heartbeats (interval = 100ms). for _ in 0..10 { sleep(Duration::from_millis(100)); gs.heartbeat(); } - //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat + // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( count_control_msgs(&gs, |_, m| match m { @@ -1990,14 +2044,14 @@ mod tests { #[test] fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without_backoff() { - //set default backoff period to 1 second + // set default backoff period to 1 second let config = GossipsubConfigBuilder::default() .prune_backoff(Duration::from_millis(90)) .backoff_slack(1) .heartbeat_interval(Duration::from_millis(100)) .build() .unwrap(); - //only one peer => mesh too small and will try to regraft as early as possible + // only one peer => mesh too small and will try to regraft as early as possible let (mut gs, peers, topics) = inject_nodes1() .peer_no(1) .topics(vec!["test".into()]) @@ -2005,7 +2059,7 @@ mod tests { .gs_config(config) .create_network(); - //handle prune from peer without a specified backoff + // handle prune from peer without a specified backoff gs.handle_prune(&peers[0], vec![(topics[0].clone(), Vec::new(), None)]); //forget all events until now diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index c5e90bf0c3a..1cc5c0e52d9 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -664,7 +664,7 @@ impl GossipsubConfigBuilder { self } - /// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + /// Number of heartbeat ticks that specify the interval in which opportunistic grafting is /// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh /// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a /// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index ddba0f69a1e..3079edcb694 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -103,9 +103,11 @@ //! let mut swarm = { //! // set default parameters for gossipsub //! let gossipsub_config = libp2p_gossipsub::GossipsubConfig::default(); -//! // build a gossipsub network behaviour +//! // Build a gossipsub network behaviour +//! // The last parameter specifies an open_metrics_client `Registry` which enables metrics for +//! // the gossipsub behaviour. //! let mut gossipsub: libp2p_gossipsub::Gossipsub = -//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config).unwrap(); +//! libp2p_gossipsub::Gossipsub::new(message_authenticity, gossipsub_config, None).unwrap(); //! // subscribe to the topic //! gossipsub.subscribe(&topic); //! // create the swarm @@ -125,6 +127,8 @@ pub mod error; pub mod protocol; +pub mod metrics; + mod backoff; mod behaviour; mod config; @@ -157,5 +161,6 @@ pub use self::types::{ FastMessageId, GossipsubMessage, GossipsubRpc, MessageAcceptance, MessageId, RawGossipsubMessage, }; + pub type IdentTopic = Topic; pub type Sha256Topic = Topic; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index d9b903acf45..1e0a7d56a59 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -89,7 +89,6 @@ impl MessageCache { } /// Get a message with `message_id` - #[cfg(test)] pub fn get(&self, message_id: &MessageId) -> Option<&RawGossipsubMessage> { self.msgs.get(message_id) } diff --git a/protocols/gossipsub/src/metrics.rs b/protocols/gossipsub/src/metrics.rs new file mode 100644 index 00000000000..f4706a981ea --- /dev/null +++ b/protocols/gossipsub/src/metrics.rs @@ -0,0 +1,528 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A set of metrics used to help track and diagnose the network behaviour of the gossipsub +//! protocol. +//! +//! Note that if metrics are enabled, we store a lot of detail for each metric. Specifically, each metric is stored +//! per "slot" of each mesh. This means each metric is counted for each peer whilst they are in the +//! mesh. The exposed open metric values typically aggregate these into a per +//! mesh metric. Users are able to fine-grain their access to the more detailed metrics via the +//! [`slot_metrics_for_topic`] function. + +pub mod topic_metrics; + +use crate::topic::TopicHash; +use libp2p_core::PeerId; +use std::collections::{HashMap, HashSet}; +use topic_metrics::Slot; + +// use open_metrics_client::encoding::text::Encode; +use open_metrics_client::metrics::counter::Counter; +use open_metrics_client::metrics::family::{Family, MetricConstructor}; +use open_metrics_client::metrics::gauge::Gauge; +use open_metrics_client::metrics::histogram::{linear_buckets, Histogram}; +use open_metrics_client::registry::Registry; + +use self::topic_metrics::{SlotChurnMetric, SlotMessageMetric, SlotMetricCounts, TopicMetrics}; + +/// A collection of metrics used throughout the Gossipsub behaviour. +pub struct InternalMetrics { + /* Auxiliary, defensive values */ + /// Keeps track of which mesh topics have been added to metrics or not. + added_mesh_topics: HashSet, + /// Maximum number of mesh topics instrumented. + max_measured_mesh_topics: usize, + /// Maximum number of non-mesh topics instrumented. + max_measured_non_mesh_topics: usize, + /// Keeps track of which non mesh topics have been added to metrics or not. + added_non_mesh_topics: HashSet, + + /* Mesh metrics */ + /// The current peers in each mesh. + mesh_peers: Family, + /// The scores for each peer in each mesh. + mesh_score: Family, + /// The average peer score for each mesh. + // TODO: we need the median not the avg. + mesh_avg_score: Family, + /// The total number of messages sent. + mesh_message_tx_total: Family, + + /// The total number of messages received (after duplicate filter). + mesh_message_rx_total: Family, + /// The total number of duplicate messages filtered per mesh. + // TODO: is this messages received? + mesh_duplicates_filtered: Family, + /// The number of messages received from non-mesh peers (after duplicate filter). + mesh_messages_from_non_mesh_peers: Family, + + // TODO: how do these add up wrt to the total? + /// The total number of messages validated per mesh. + mesh_messages_validated: Family, + /// The total number of messages rejected per mesh. + mesh_messages_rejected: Family, + /// The total number of messages ignored per mesh. + mesh_messages_ignored: Family, + + /// The number of first message deliveries per slot per mesh. + mesh_first_message_deliveries_per_slot: Family<(TopicHash, Slot), Gauge>, + /// The number of IWANT requests being sent per mesh topic. + // TODO: do these counters not get reset? + mesh_iwant_requests: Family, + /// Number of peers subscribed to each known topic. + topic_peers: Family, + /// Number of peers subscribed to each subscribed topic. + subscribed_topic_peers: Family, + /// The number of broken promises (this metric is indicative of nodes with invalid message-ids) + // TODO: why is this not per topic? because who cares? + broken_promises: Counter, + /// Keeps track of the number of messages we have received on topics we are not subscribed + /// to. + // TODO: why is this invalid? + invalid_topic_messages: Counter, + /// When the user validates a message, it tries to re propagate it to its mesh peers. If the + /// message expires from the memcache before it can be validated, we count this a cache miss + /// and it is an indicator that the memcache size should be increased. + memcache_misses: Counter, + /// Current metrics for all known mesh data. See [`TopicMetrics`] for further information. + topic_metrics: HashMap, +} + +/// This provides an upper bound to the number of mesh topics we create metrics for. It prevents +/// unbounded labels being created in the metrics. +const DEFAULT_MESH_TOPIC_LIMIT: usize = 300; +/// A separate limit is used to keep track of non-mesh topics. Mesh topics are controlled by the +/// user via subscriptions whereas non-mesh topics are determined by users on the network. +/// This limit permits a fixed amount of topics to allow, in-addition to the mesh topics. +const DEFAULT_NON_MESH_TOPIC_LIMIT: usize = 50; + +pub struct Config { + pub score_histogram_buckets: Vec, + /// This provides an upper bound to the number of mesh topics we create metrics for. It + /// prevents unbounded labels being created in the metrics. + pub max_measured_mesh_topics: usize, + /// Mesh topics are controlled by the user via subscriptions whereas non-mesh topics are + /// determined by users on the network. This limit permits a fixed amount of topics to allow, + /// in-addition to the mesh topics. + pub max_measured_non_mesh_topics: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + score_histogram_buckets: linear_buckets(-10_000.0, 100.0, 201).collect(), + max_measured_mesh_topics: DEFAULT_MESH_TOPIC_LIMIT, + max_measured_non_mesh_topics: DEFAULT_NON_MESH_TOPIC_LIMIT, + } + } +} + +#[derive(Clone)] +struct ScoreHistogramBuilder { + buckets: Vec, +} +impl MetricConstructor for ScoreHistogramBuilder { + fn new(&self) -> Histogram { + Histogram::new(self.buckets.clone().into_iter()) + } +} + +impl InternalMetrics { + /// Constructs and builds the internal metrics given a registry. + pub fn new(registry: &mut Registry, config: Config) -> Self { + let Config { + score_histogram_buckets, + max_measured_mesh_topics, + max_measured_non_mesh_topics, + } = config; + + /* Mesh Metrics */ + + let mesh_peers = Family::default(); + registry.register( + "mesh_peer_count", + "Number of peers in each mesh", + Box::new(mesh_peers.clone()), + ); + + let score_histogram_builder = ScoreHistogramBuilder { + buckets: score_histogram_buckets, + }; + let mesh_score = Family::new_with_constructor(score_histogram_builder); + registry.register( + "mesh_score", + "Score of all peers in each mesh", + Box::new(mesh_score.clone()), + ); + + let mesh_avg_score = Family::default(); + registry.register( + "mesh_avg_score", + "Average score of all peers in each mesh", + Box::new(mesh_avg_score.clone()), + ); + + let mesh_message_rx_total = Family::default(); + registry.register( + "mesh_message_rx_total", + "Total number of messages received from each mesh", + Box::new(mesh_message_rx_total.clone()), + ); + + let mesh_message_tx_total = Family::default(); + registry.register( + "mesh_message_tx_total", + "Total number of messages sent in each mesh", + Box::new(mesh_message_tx_total.clone()), + ); + + let mesh_messages_from_non_mesh_peers = Family::default(); + registry.register( + "messages_from_non_mesh_peers", + "Number of messages received from peers not in the mesh, for each mesh", + Box::new(mesh_messages_from_non_mesh_peers.clone()), + ); + + let mesh_duplicates_filtered = Family::default(); + registry.register( + "mesh_duplicates_filtered", + "Total number of duplicate messages filtered in each mesh", + Box::new(mesh_duplicates_filtered.clone()), + ); + + let mesh_messages_validated = Family::default(); + registry.register( + "mesh_messages_validated", + "Total number of messages that have been validated in each mesh", + Box::new(mesh_messages_validated.clone()), + ); + + let mesh_messages_rejected = Family::default(); + registry.register( + "mesh_messages_rejected", + "Total number of messages rejected in each mesh", + Box::new(mesh_messages_rejected.clone()), + ); + + let mesh_messages_ignored = Family::default(); + registry.register( + "mesh_messages_ignored", + "Total number of messages ignored in each mesh", + Box::new(mesh_messages_ignored.clone()), + ); + + let mesh_first_message_deliveries_per_slot = Family::default(); + registry.register( + "mesh_first_message_deliveries_per_slot", + "The number of first message deliveries per mesh slot", + Box::new(mesh_first_message_deliveries_per_slot.clone()), + ); + + let mesh_iwant_requests = Family::default(); + registry.register( + "mesh_iwant_requests", + "The number of IWANT requests per mesh", + Box::new(mesh_first_message_deliveries_per_slot.clone()), + ); + + let broken_promises = Counter::default(); + registry.register( + "broken_promises", + "Total number of broken promises per mesh", + Box::new(broken_promises.clone()), + ); + + /* Peer Metrics */ + + let topic_peers = Family::default(); + registry.register( + "topic_peer_count", + "Number of peers subscribed to each known topic", + Box::new(topic_peers.clone()), + ); + + let subscribed_topic_peers = Family::default(); + registry.register( + "subscribed_topic_peer_count", + "Number of peers subscribed to each subscribed topic", + Box::new(subscribed_topic_peers.clone()), + ); + + /* Router Metrics */ + + // Invalid Topic Messages + let invalid_topic_messages = Counter::default(); + registry.register( + "invalid_topic_messages", + "Number of times a message has been received on a non-subscribed topic", + Box::new(invalid_topic_messages.clone()), + ); + + let memcache_misses = Counter::default(); + registry.register( + "memcache_misses", + "Number of times a message has attempted to be forwarded but has already been removed from the memcache", + Box::new(memcache_misses.clone()), + ); + + InternalMetrics { + added_mesh_topics: HashSet::new(), + added_non_mesh_topics: HashSet::new(), + max_measured_mesh_topics, + max_measured_non_mesh_topics, + mesh_peers, + mesh_score, + mesh_avg_score, + mesh_message_rx_total, + mesh_message_tx_total, + mesh_messages_from_non_mesh_peers, + mesh_duplicates_filtered, + mesh_messages_validated, + mesh_messages_rejected, + mesh_messages_ignored, + mesh_first_message_deliveries_per_slot, + mesh_iwant_requests, + topic_peers, + subscribed_topic_peers, + broken_promises, + invalid_topic_messages, + memcache_misses, + topic_metrics: HashMap::new(), + } + } + + /// Access to the fine-grained metrics which provide information per-peer (slot) of the + /// specified mesh topic. + pub fn slot_metrics_for_topic( + &self, + topic: &TopicHash, + ) -> Option> { + Some(self.topic_metrics.get(topic)?.slot_metrics_iter()) + } + + /// Reports that an attempted message to forward was no longer in the memcache. + pub fn memcache_miss(&mut self) { + self.memcache_misses.inc(); + } + + /// Reports a broken promise. + pub fn broken_promise(&mut self) { + self.broken_promises.inc(); + } + + /// Reports that a message was published on the specified topic. + pub fn message_published(&mut self, topic_hash: &TopicHash) { + if self.allowed_mesh_topic(topic_hash) { + self.mesh_message_tx_total.get_or_create(topic_hash).inc(); + } + } + + /// Churns a slot in the topic_metrics. This assumes the peer is in the mesh. + pub fn churn_slot( + &mut self, + topic_hash: &TopicHash, + peer: &PeerId, + slot_churn: SlotChurnMetric, + ) { + if let Ok(slot) = self + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .churn_slot(peer, slot_churn) + { + self.reset_slot(topic_hash, slot); + } + } + + pub fn iwant_request(&mut self, topic_hash: &TopicHash) { + if self.allowed_mesh_topic(topic_hash) { + self.mesh_iwant_requests.get_or_create(topic_hash).inc(); + } + } + + pub fn message_invalid_topic(&mut self) { + self.invalid_topic_messages.inc(); + } + + pub fn peer_joined_topic(&mut self, topic_hash: &TopicHash) { + if self.allowed_non_mesh_topic(topic_hash) { + self.topic_peers.get_or_create(topic_hash).inc(); + } + } + + pub fn peer_joined_subscribed_topic(&mut self, topic_hash: &TopicHash) { + if self.allowed_mesh_topic(topic_hash) { + self.subscribed_topic_peers.get_or_create(topic_hash).inc(); + } + } + + pub fn peer_left_topic(&mut self, topic_hash: &TopicHash) { + if self.allowed_non_mesh_topic(topic_hash) { + let v = self.topic_peers.get_or_create(topic_hash).get(); + self.topic_peers + .get_or_create(topic_hash) + .set(v.saturating_sub(1)); + } + } + + pub fn peer_left_subscribed_topic(&mut self, topic_hash: &TopicHash) { + // We use the non_mesh version here, because if we are subscribed this topic should exist + // in the added_mesh_topics mappings + if self.allowed_non_mesh_topic(topic_hash) { + let v = self.subscribed_topic_peers.get_or_create(topic_hash).get(); + self.subscribed_topic_peers + .get_or_create(topic_hash) + .set(v.saturating_sub(1)); + } + } + + pub fn leave_topic(&mut self, topic_hash: &TopicHash) { + // Remove all the peers from all slots + if let Some(metrics) = self.topic_metrics.get_mut(topic_hash) { + let total_slots = metrics.churn_all_slots(SlotChurnMetric::ChurnLeave); + // Remove the slot metrics + if self.allowed_mesh_topic(topic_hash) { + for slot in 1..total_slots { + self.reset_slot(topic_hash, Slot { slot }); + } + } + } + + if self.allowed_non_mesh_topic(topic_hash) { + self.mesh_peers.get_or_create(topic_hash).set(0); + self.mesh_avg_score.get_or_create(topic_hash).set(0); + self.subscribed_topic_peers.get_or_create(topic_hash).set(0); + } + } + + fn reset_slot(&mut self, topic_hash: &TopicHash, slot: Slot) { + if self.allowed_mesh_topic(topic_hash) { + self.mesh_first_message_deliveries_per_slot + .get_or_create(&(topic_hash.clone(), slot)) + .set(0); + } + } + + /// Helpful for testing and validation + #[cfg(debug_assertions)] + pub fn topic_metrics(&self) -> &HashMap { + &self.topic_metrics + } + + /// Increment a MessageMetric in the topic_metrics for peer in topic. + pub fn increment_message_metric( + &mut self, + topic_hash: &TopicHash, + peer: &PeerId, + message_metric: SlotMessageMetric, + ) { + if let Ok(slot) = self + .topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .increment_message_metric(peer, &message_metric) + { + if self.allowed_mesh_topic(topic_hash) { + match message_metric { + SlotMessageMetric::MessagesAll => {} + SlotMessageMetric::MessagesDuplicates => { + self.mesh_duplicates_filtered + .get_or_create(topic_hash) + .inc(); + } + SlotMessageMetric::MessagesFirst => { + self.mesh_message_rx_total.get_or_create(topic_hash).inc(); + if slot.slot == 0 { + self.mesh_messages_from_non_mesh_peers + .get_or_create(topic_hash) + .inc(); + } else { + self.mesh_first_message_deliveries_per_slot + .get_or_create(&(topic_hash.clone(), slot)) + .inc(); + } + } + SlotMessageMetric::MessagesIgnored => { + self.mesh_messages_ignored.get_or_create(topic_hash).inc(); + } + SlotMessageMetric::MessagesRejected => { + self.mesh_messages_rejected.get_or_create(topic_hash).inc(); + } + SlotMessageMetric::MessagesValidated => { + self.mesh_messages_validated.get_or_create(topic_hash).inc(); + } + } + } + } + } + + /// Assign slots in topic to peers. + pub fn assign_slots_to_peers(&mut self, topic_hash: &TopicHash, peer_list: U) + where + U: Iterator, + { + self.topic_metrics + .entry(topic_hash.clone()) + .or_insert_with(|| TopicMetrics::new(topic_hash.clone())) + .assign_slots_to_peers(peer_list); + } + + /// Assigns a slot in topic to the peer if the peer doesn't already have one. + pub fn assign_slot_if_unassigned(&mut self, topic: &TopicHash, peer: &PeerId) { + self.topic_metrics + .entry(topic.clone()) + .or_insert_with(|| TopicMetrics::new(topic.clone())) + .assign_slot_if_unassigned(*peer); + } + + /// Limits the number of topics that can be created in each metric. If the topic hasn't already + /// been added to a metric and we have added less than TOPIC_LIMIT topics, this will return + /// true. + fn allowed_mesh_topic(&mut self, topic_hash: &TopicHash) -> bool { + // If we haven't reached the limit, record the topic. + if self.added_mesh_topics.len() < self.max_measured_mesh_topics { + self.added_mesh_topics.insert(topic_hash.clone()); + true + } else { + // We've reached the topic limit, the topic is allowed if we have seen it before, + // otherwise we reject it. + self.added_mesh_topics.contains(topic_hash) + | self.added_non_mesh_topics.contains(topic_hash) + } + } + + /// Limits the number of topics that can be created in each metric. If the topic hasn't already + /// been added to a metric and we have added less than TOPIC_LIMIT topics, this will return + /// true. + fn allowed_non_mesh_topic(&mut self, topic_hash: &TopicHash) -> bool { + // If we haven't reached the limit, record the topic. + if self.added_non_mesh_topics.len() < self.max_measured_non_mesh_topics + && !self.added_mesh_topics.contains(topic_hash) + { + self.added_non_mesh_topics.insert(topic_hash.clone()); + true + } else { + // We've reached the topic limit, the topic is allowed if we have seen it before, + // otherwise we reject it. + self.added_non_mesh_topics.contains(topic_hash) + } + } +} diff --git a/protocols/gossipsub/src/metrics/topic_metrics.rs b/protocols/gossipsub/src/metrics/topic_metrics.rs new file mode 100644 index 00000000000..67c5f9656df --- /dev/null +++ b/protocols/gossipsub/src/metrics/topic_metrics.rs @@ -0,0 +1,469 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::TopicHash; +use libp2p_core::PeerId; +use log::{debug, error, warn}; +use open_metrics_client::encoding::text::Encode; +use std::collections::{BTreeSet, HashMap}; +use std::ops::AddAssign; +use strum::IntoEnumIterator; +use strum_macros::{EnumIter, IntoStaticStr}; + +#[derive(Copy, Clone, Hash, Eq, PartialEq, Encode)] +pub struct Slot { + pub slot: u64, +} + +/// This struct stores all the metrics for a given mesh slot. +/// NOTE: all the `message_*` counters refer to messages received from peers assigned to +/// this mesh slot on the topic this slot is associated with. See [`TopicMetrics`] for more +/// information. +#[derive(Default, Clone)] +pub struct SlotMetricCounts { + /// The number of times this slot has been assigned to a peer + assign_sum: u32, + /// The total number of messages received + messages_all: u32, + /// The number of duplicate (already seen) messages + /// A large number across peers on this topic could indicate a large amplification on + /// the topic. Lowering the gossip_D parameter could help minimize duplicates. + messages_duplicates: u32, + /// The number of never before seen messages + messages_first: u32, + /// The number of messages that returned [`MessageAcceptance::Ignore`] from validation + messages_ignored: u32, + /// The number of messages that returned [`MessageAcceptance::Reject`] from validation + messages_rejected: u32, + /// The number of messages that returned [`MessageAcceptance::Accept`] from validation + messages_validated: u32, + /// The number of times this slot was churned due to us disconnecting from the peer + churn_disconnected: u32, + /// The number of times this slot was churned due to us removing excess mesh peers + churn_excess: u32, + /// The number of times this slot was churned due to us leaving the topic + churn_leave: u32, + /// The number of times this slot was churned because the peer sent us a PRUNE message + churn_prune: u32, + /// The number of times this slot was churned because the peer score was too low + /// A large number could indicate the network is being attacked or the peer scoring + /// parameters are too restrictive and need to be adjusted. + churn_score: u32, + /// The total number of times this slot was churned for any reason + churn_sum: u32, + /// The number of times this slot was churned due to the peer unsubscribing from the topic + churn_unsubscribed: u32, + /// The peer currently assigned to this slot (None if slot is vacant) + current_peer: Option>, +} + +#[derive(IntoStaticStr, EnumIter, Clone, Copy)] +/// This enumerates all the message metric counters for a slot +pub enum SlotMessageMetric { + /// Total messages received + MessagesAll, + /// Number of duplicate (already seen) messages + MessagesDuplicates, + /// Never before seen messages + MessagesFirst, + /// Messages that returned [`MessageAcceptance::Ignore`] from validation + MessagesIgnored, + /// Messages that returned [`MessageAcceptance::Reject`] from validation + MessagesRejected, + /// Messages that returned [`MessageAcceptance::Accept`] from validation + MessagesValidated, +} + +#[derive(IntoStaticStr, EnumIter, Clone, Copy)] +/// This enumerates all the churn metric counters for a slot +pub enum SlotChurnMetric { + /// Slot churned due to us disconnecting from the peer + ChurnDisconnected, + /// Slot churned due to us removing excess mesh peers + ChurnExcess, + /// Slot churned due to us leaving the topic + ChurnLeave, + /// Slot churned because the peer sent us a PRUNE message + ChurnPrune, + /// Slot churned because the peer score was too low + ChurnScore, + /// Slot churned due to the peer unsubscribing from the topic + ChurnUnsubscribed, +} + +/// This enumerates ALL metric counters for a slot +pub enum SlotMetric { + MessageMetric(SlotMessageMetric), + ChurnMetric(SlotChurnMetric), + /// Total times the slot was churned for any reason + ChurnSum, + /// Total times the slot was assigned + AssignSum, +} + +impl SlotMetric { + /// Make SlotMetric iterable over every metric type + pub fn iter() -> impl Iterator { + SlotMessageMetric::iter() + .map(|message_metric| SlotMetric::MessageMetric(message_metric)) + .chain( + SlotChurnMetric::iter().map(|churn_metric| SlotMetric::ChurnMetric(churn_metric)), + ) + .chain(std::iter::once(SlotMetric::ChurnSum)) + .chain(std::iter::once(SlotMetric::AssignSum)) + } +} + +impl SlotMetricCounts { + pub fn new() -> Self { + SlotMetricCounts::default() + } + + /// returns a message metric count + fn get_message_metric(&self, message_metric: SlotMessageMetric) -> u32 { + match message_metric { + SlotMessageMetric::MessagesAll => self.messages_all, + SlotMessageMetric::MessagesDuplicates => self.messages_duplicates, + SlotMessageMetric::MessagesFirst => self.messages_first, + SlotMessageMetric::MessagesIgnored => self.messages_ignored, + SlotMessageMetric::MessagesRejected => self.messages_rejected, + SlotMessageMetric::MessagesValidated => self.messages_validated, + } + } + + /// increments a message metric count + fn increment_message_metric(&mut self, message_metric: &SlotMessageMetric) { + match message_metric { + SlotMessageMetric::MessagesAll => self.messages_all.add_assign(1), + SlotMessageMetric::MessagesDuplicates => self.messages_duplicates.add_assign(1), + SlotMessageMetric::MessagesFirst => self.messages_first.add_assign(1), + SlotMessageMetric::MessagesIgnored => self.messages_ignored.add_assign(1), + SlotMessageMetric::MessagesRejected => self.messages_rejected.add_assign(1), + SlotMessageMetric::MessagesValidated => self.messages_validated.add_assign(1), + }; + } + + /// returns a churn metric count + fn get_churn_metric(&self, churn_reason: SlotChurnMetric) -> u32 { + match churn_reason { + SlotChurnMetric::ChurnDisconnected => self.churn_disconnected, + SlotChurnMetric::ChurnExcess => self.churn_excess, + SlotChurnMetric::ChurnLeave => self.churn_leave, + SlotChurnMetric::ChurnPrune => self.churn_prune, + SlotChurnMetric::ChurnScore => self.churn_score, + SlotChurnMetric::ChurnUnsubscribed => self.churn_unsubscribed, + } + } + + /// churns a slot, incrementing the proper churn metric and returning ChurnSum + pub fn churn_slot(&mut self, churn_reason: SlotChurnMetric) -> u32 { + self.current_peer = None; + self.churn_sum.add_assign(1); + match churn_reason { + SlotChurnMetric::ChurnDisconnected => self.churn_disconnected.add_assign(1), + SlotChurnMetric::ChurnExcess => self.churn_excess.add_assign(1), + SlotChurnMetric::ChurnLeave => self.churn_leave.add_assign(1), + SlotChurnMetric::ChurnPrune => self.churn_prune.add_assign(1), + SlotChurnMetric::ChurnScore => self.churn_score.add_assign(1), + SlotChurnMetric::ChurnUnsubscribed => self.churn_unsubscribed.add_assign(1), + }; + self.churn_sum + } + + /// returns the current peer associated with this slot + pub fn current_peer(&self) -> &Option> { + &self.current_peer + } + + /// assigns a peer to this slot, incrementing and returning AssignSum + pub fn assign_slot(&mut self, peer: PeerId) -> u32 { + self.current_peer = Some(Box::new(peer)); + self.assign_sum.add_assign(1); + self.assign_sum + } + + /// returns the slot metric count corresponding to slot_metric + pub fn get_slot_metric(&self, slot_metric: SlotMetric) -> u32 { + match slot_metric { + SlotMetric::MessageMetric(message_metric) => self.get_message_metric(message_metric), + SlotMetric::ChurnMetric(churn_reason) => self.get_churn_metric(churn_reason), + SlotMetric::ChurnSum => self.churn_sum, + SlotMetric::AssignSum => self.assign_sum, + } + } + + /// returns a vector of pairs of all slot metric names and their corresponding counts + pub fn with_names(&self) -> Vec<(&'static str, u32)> { + SlotMetric::iter() + .map(|t| match t { + SlotMetric::MessageMetric(message_metric) => ( + >::into(message_metric), + self.get_message_metric(message_metric), + ), + SlotMetric::ChurnMetric(churn_reason) => ( + >::into(churn_reason), + self.get_churn_metric(churn_reason), + ), + SlotMetric::ChurnSum => ("ChurnSum", self.churn_sum), + SlotMetric::AssignSum => ("AssignSum", self.assign_sum), + }) + .collect() + } +} + +pub type MeshSlot = usize; +/// This structure stores all metrics associated with a single topic. +/// This introduces the concept of a mesh slot. When a peer is added to the mesh for +/// this topic, it is assigned to a mesh slot. All the metrics relating to messages +/// received from that peer on this topic are then associated to that slot. See the +/// [`SlotMetricCounts`] struct for more information. When a peer exits the mesh, +/// the slot it occupies is 'churned' and becomes vacant. Vacant slots are later +/// re-assigned when a new peer enters the mesh for this topic. +pub struct TopicMetrics { + /// The topic this is associated with (useful for debugging) + topic: TopicHash, + /// Vector of SlotMetricCounts (indexed by MeshSlot) + slot_metrics: Vec, + /// Map of PeerId to MeshSlot + peer_slots: HashMap, + /// Set of Vacant MeshSlots (due to peers leaving the mesh) + vacant_slots: BTreeSet, + /// The number of messages requested via IWANT + /// A large value indicates the mesh isn't performing as optimally as we would + /// like and we have had to request extra messages via gossip + pub iwant_requests: u32, +} + +impl TopicMetrics { + pub fn new(topic: TopicHash) -> Self { + TopicMetrics { + topic, + // the first element in the vector is for peers that aren't in the mesh + slot_metrics: vec![SlotMetricCounts::new()], + peer_slots: HashMap::new(), + vacant_slots: BTreeSet::new(), + iwant_requests: 0, + } + } + + /// Increments the message metric for the specified peer + pub fn increment_message_metric( + &mut self, + peer: &PeerId, + message_metric: &SlotMessageMetric, + ) -> Result { + let slot = self + .peer_slots + .get(peer) + .map(|s| *s) + // peers that aren't in the mesh get slot 0 + .unwrap_or(0); + match self.slot_metrics.get_mut(slot) { + Some(metric_counts) => { + metric_counts.increment_message_metric(message_metric); + Ok(Slot { slot: slot as u64 }) + } + None => { + error!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} FAILURE [peer_slots contains peer with slot not existing in slot_metrics!]", + self.topic, + slot, + >::into(*message_metric), + peer, + ); + Err(()) + } + } + } + + /// Assigns a slot to the peer if the peer doesn't already have one. Note that the + /// lowest vacant slots are assigned first. If all slots are occupied, a new slot + /// will be allocated. + pub fn assign_slot_if_unassigned(&mut self, peer: PeerId) { + if let std::collections::hash_map::Entry::Vacant(entry) = self.peer_slots.entry(peer) { + match self.vacant_slots.iter().next() { + Some(slot_ref) => match self.slot_metrics.get_mut(*slot_ref) { + // vacant slot available, assign new peer to this slot + Some(metric_counts) => { + let slot = *slot_ref; + let assign_sum = metric_counts.assign_slot(peer); + self.vacant_slots.remove(&slot); + entry.insert(slot); + debug!( + "metrics_event[{}]: [slot {:02}] assigning vacant slot to peer {} SUCCESS AssignSum[{}]", + self.topic, slot, peer, assign_sum, + ); + }, + None => error!( + "metrics_event[{}]: [slot {:02}] assigning vacant slot to peer {} FAILURE [SlotMetricCounts doesn't exist in slot_metrics vector!]", + self.topic, slot_ref, peer + ), + }, + None => { + // No vacant slots available, allocate a new slot + let slot = self.slot_metrics.len(); + let mut metric_counts = SlotMetricCounts::new(); + let assign_sum = metric_counts.assign_slot(peer); + self.slot_metrics.push(metric_counts); + entry.insert(slot); + debug!( + "metrics_event[{}]: [slot {:02}] assigning new slot to peer {} SUCCESS AssignSum[{}]", + self.topic, slot, peer, assign_sum, + ); + } + }; + } + } + + /// Ensures all peers returned by the peer_iter have a slot assigned + pub fn assign_slots_to_peers(&mut self, peer_iter: U) + where + U: Iterator, + { + for peer in peer_iter { + self.assign_slot_if_unassigned(peer); + } + } + + /// Churns the slot occupied by peer. + pub fn churn_slot(&mut self, peer: &PeerId, churn_reason: SlotChurnMetric) -> Result { + match self.peer_slots.get(peer).cloned() { + Some(slot) => { + match self.slot_metrics.get_mut(slot) { + Some(metric_counts) => { + debug_assert!(!self.vacant_slots.contains(&slot), + "metrics_event[{}] [slot {:02}] increment {} peer {} FAILURE [vacant slots already contains this slot!]", + self.topic, slot, >::into(churn_reason), peer + ); + let churn_sum = metric_counts.churn_slot(churn_reason); + self.vacant_slots.insert(slot); + self.peer_slots.remove(peer); + debug!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} SUCCESS ChurnSum[{}]", + self.topic, slot, >::into(churn_reason), peer, churn_sum, + ); + Ok(Slot { slot: slot as u64 }) + } + None => { + warn!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} FAILURE [retrieving metric_counts]", + self.topic, slot, >::into(churn_reason), peer + ); + Err(()) + } + } + } + None => { + warn!( + "metrics_event[{}]: [slot --] increment {} peer {} FAILURE [retrieving slot]", + self.topic, + >::into(churn_reason), + peer + ); + Err(()) + } + } + } + + /// Churns all slots in this topic that aren't already vacant (while incrementing + /// churn_reason). Also clears the peer_slots. This loop is faster than calling + /// churn_slot() for each peer in the topic because it minimizes redundant lookups + /// and only traverses a vector. + pub fn churn_all_slots(&mut self, churn_reason: SlotChurnMetric) -> u64 { + for slot in (1..self.slot_metrics.len()) + .filter(|s| !self.vacant_slots.contains(s)) + .collect::>() + { + if let Some(metric_counts) = self.slot_metrics.get_mut(slot) { + let previous = metric_counts.current_peer().as_ref().map(|p| **p); + let churn_sum = metric_counts.churn_slot(churn_reason); + self.vacant_slots.insert(slot); + match previous { + Some(peer) => debug!( + "metrics_event[{}]: [slot {:02}] increment {} peer {} SUCCESS ChurnSum[{}]", + self.topic, slot, >::into(churn_reason), peer, churn_sum, + ), + None => warn!( + "metrics_event[{}]: [slot {:02}] increment {} WARNING [current_peer not assigned with non-vacant slot!] ChurnSum[{}]", + self.topic, slot, >::into(churn_reason), churn_sum, + ), + }; + } + } + self.peer_slots.clear(); + self.slot_metrics.len() as u64 + } + + /// This function verifies that the TopicMetrics is synchronized perfectly with the mesh. + /// It's useful for debugging. + #[cfg(debug_assertions)] + pub fn validate_mesh_slots(&self, mesh: &BTreeSet) -> Result<(), String> { + let mut result = true; + let mut errors = String::new(); + // No peers are in the peer_slots that aren't in the mesh + for (peer, slot) in self + .peer_slots + .iter() + .filter(|(peer, ..)| !mesh.contains(peer)) + { + result = false; + let message = format!( + "metrics_event[{}]: [slot {:02}] peer {} exists in peer_slots but not in the mesh!\n", + self.topic, slot, peer + ); + errors.push_str(message.as_str()); + error!("{}", message); + } + // No peers are in the mesh that aren't in the peer_slots + for peer in mesh + .iter() + .filter(|peer| !self.peer_slots.contains_key(peer)) + { + result = false; + let message = format!( + "metrics_event[{}]: [slot --] peer {} exists in mesh but not in the peer_slots!\n", + self.topic, peer + ); + errors.push_str(message.as_str()); + error!("{}", message); + } + + // vacant_slots.len() + peer_slots.len() == slot_metrics.len() + 1 + if self.vacant_slots.len() + self.peer_slots.len() + 1 != self.slot_metrics.len() { + result = false; + let message = format!( + "metrics_event[{}] vacant_slots.len()[{}] + peer_slots.len()[{}] + 1 != slot_metrics.len()[{}]", + self.topic, self.vacant_slots.len(), self.peer_slots.len(), self.slot_metrics.len(), + ); + errors.push_str(message.as_str()); + error!("{}", message); + } + + if result { + Ok(()) + } else { + Err(errors) + } + } + + pub fn slot_metrics_iter(&self) -> impl Iterator { + self.slot_metrics.iter() + } +} diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 7e8afca2d9e..f737056f696 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -20,6 +20,7 @@ use crate::rpc_proto; use base64::encode; +use open_metrics_client::encoding::text::Encode; use prost::Message; use sha2::{Digest, Sha256}; use std::fmt; @@ -60,7 +61,7 @@ impl Hasher for Sha256Hash { } } -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Encode)] pub struct TopicHash { /// The topic hash. Stored as a string to align with the protobuf API. hash: String,