Skip to content

Commit

Permalink
misc/metrics: Track # connected nodes supporting specific protocol (#…
Browse files Browse the repository at this point in the history
…2734)

* misc/metrics: Explicitly delegate event recording to each recorder

This allows delegating a single event to multiple `Recorder`s. That enables e.g. the
`identify::Metrics` `Recorder` to act both on `IdentifyEvent` and `SwarmEvent`. The latter enables
it to garbage collect per peer data on disconnects.

* protocols/dcutr: Expose PROTOCOL_NAME

* protocols/identify: Expose PROTOCOL_NAME and PUSH_PROTOCOL_NAME

* protocols/ping: Expose PROTOCOL_NAME

* protocols/relay: Expose HOP_PROTOCOL_NAME and STOP_PROTOCOL_NAME

* misc/metrics: Track # connected nodes supporting specific protocol

An example metric exposed with this patch:

```
libp2p_identify_protocols{protocol="/ipfs/ping/1.0.0"} 10
```

This implies that 10 of the currently connected nodes support the ping protocol.
  • Loading branch information
mxinden committed Jul 15, 2022
1 parent 7c8a977 commit d4f8ec2
Show file tree
Hide file tree
Showing 21 changed files with 259 additions and 78 deletions.
4 changes: 4 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

- Update to `libp2p-kad` `v0.39.0`.

- Track number of connected nodes supporting a specific protocol via the identify protocol. See [PR 2734].

[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/

# 0.7.0

- Update to `libp2p-core` `v0.34.0`.
Expand Down
5 changes: 2 additions & 3 deletions misc/metrics/src/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,9 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType {
}
}

impl super::Recorder<libp2p_dcutr::behaviour::Event> for super::Metrics {
impl super::Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
self.dcutr
.events
self.events
.get_or_create(&EventLabels {
event: event.into(),
})
Expand Down
4 changes: 2 additions & 2 deletions misc/metrics/src/gossipsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for super::Metrics {
impl super::Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event {
self.gossipsub.messages.inc();
self.messages.inc();
}
}
}
145 changes: 135 additions & 10 deletions misc/metrics/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use libp2p_core::PeerId;
use prometheus_client::encoding::text::{EncodeMetric, Encoder};
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::MetricType;
use prometheus_client::registry::Registry;
use std::collections::HashMap;
use std::iter;
use std::sync::{Arc, Mutex};

pub struct Metrics {
protocols: Protocols,
error: Counter,
pushed: Counter,
received: Counter,
Expand All @@ -36,6 +42,15 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("identify");

let protocols = Protocols::default();
sub_registry.register(
"protocols",
"Number of connected nodes supporting a specific protocol, with \
\"unrecognized\" for each peer supporting one or more unrecognized \
protocols",
Box::new(protocols.clone()),
);

let error = Counter::default();
sub_registry.register(
"errors",
Expand Down Expand Up @@ -86,6 +101,7 @@ impl Metrics {
);

Self {
protocols,
error,
pushed,
received,
Expand All @@ -96,27 +112,136 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_identify::IdentifyEvent> for super::Metrics {
impl super::Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
match event {
libp2p_identify::IdentifyEvent::Error { .. } => {
self.identify.error.inc();
self.error.inc();
}
libp2p_identify::IdentifyEvent::Pushed { .. } => {
self.identify.pushed.inc();
self.pushed.inc();
}
libp2p_identify::IdentifyEvent::Received { info, .. } => {
self.identify.received.inc();
self.identify
.received_info_protocols
libp2p_identify::IdentifyEvent::Received { peer_id, info, .. } => {
{
let mut protocols: Vec<String> = info
.protocols
.iter()
.filter(|p| {
let allowed_protocols: &[&[u8]] = &[
#[cfg(feature = "dcutr")]
libp2p_dcutr::PROTOCOL_NAME,
// #[cfg(feature = "gossipsub")]
// #[cfg(not(target_os = "unknown"))]
// TODO: Add Gossipsub protocol name
libp2p_identify::PROTOCOL_NAME,
libp2p_identify::PUSH_PROTOCOL_NAME,
#[cfg(feature = "kad")]
libp2p_kad::protocol::DEFAULT_PROTO_NAME,
#[cfg(feature = "ping")]
libp2p_ping::PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::STOP_PROTOCOL_NAME,
#[cfg(feature = "relay")]
libp2p_relay::v2::HOP_PROTOCOL_NAME,
];

allowed_protocols.contains(&p.as_bytes())
})
.cloned()
.collect();

// Signal via an additional label value that one or more
// protocols of the remote peer have not been recognized.
if protocols.len() < info.protocols.len() {
protocols.push("unrecognized".to_string());
}

protocols.sort_unstable();
protocols.dedup();

self.protocols.add(*peer_id, protocols);
}

self.received.inc();
self.received_info_protocols
.observe(info.protocols.len() as f64);
self.identify
.received_info_listen_addrs
self.received_info_listen_addrs
.observe(info.listen_addrs.len() as f64);
}
libp2p_identify::IdentifyEvent::Sent { .. } => {
self.identify.sent.inc();
self.sent.inc();
}
}
}
}

impl<TBvEv, THandleErr> super::Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
if let libp2p_swarm::SwarmEvent::ConnectionClosed {
peer_id,
num_established,
..
} = event
{
if *num_established == 0 {
self.protocols.remove(*peer_id)
}
}
}
}

#[derive(Default, Clone)]
struct Protocols {
peers: Arc<Mutex<HashMap<PeerId, Vec<String>>>>,
}

impl Protocols {
fn add(&self, peer: PeerId, protocols: Vec<String>) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.insert(peer, protocols);
}

fn remove(&self, peer: PeerId) {
self.peers
.lock()
.expect("Lock not to be poisoned")
.remove(&peer);
}
}

impl EncodeMetric for Protocols {
fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> {
let count_by_protocol = self
.peers
.lock()
.expect("Lock not to be poisoned")
.iter()
.fold(
HashMap::<String, u64>::default(),
|mut acc, (_, protocols)| {
for protocol in protocols {
let count = acc.entry(protocol.to_string()).or_default();
*count = *count + 1;
}
acc
},
);

for (protocol, count) in count_by_protocol {
encoder
.with_label_set(&("protocol", protocol))
.no_suffix()?
.no_bucket()?
.encode_value(count)?
.no_exemplar()?;
}

Ok(())
}

fn metric_type(&self) -> MetricType {
MetricType::Gauge
}
}
40 changes: 12 additions & 28 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,62 +159,52 @@ impl Metrics {
}
}

impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
self.kad
.query_result_num_requests
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
self.kad
.query_result_num_success
self.query_result_num_success
.get_or_create(&result.into())
.observe(stats.num_successes().into());
self.kad
.query_result_num_failure
self.query_result_num_failure
.get_or_create(&result.into())
.observe(stats.num_failures().into());
if let Some(duration) = stats.duration() {
self.kad
.query_result_duration
self.query_result_duration
.get_or_create(&result.into())
.observe(duration.as_secs_f64());
}

match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.kad
.query_result_get_record_ok
.observe(ok.records.len() as f64),
Err(error) => {
self.kad
.query_result_get_record_error
self.query_result_get_record_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetClosestPeers(result) => match result {
Ok(ok) => self
.kad
.query_result_get_closest_peers_ok
.observe(ok.peers.len() as f64),
Err(error) => {
self.kad
.query_result_get_closest_peers_error
self.query_result_get_closest_peers_error
.get_or_create(&error.into())
.inc();
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.kad
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Err(error) => {
self.kad
.query_result_get_providers_error
self.query_result_get_providers_error
.get_or_create(&error.into())
.inc();
}
Expand All @@ -230,16 +220,14 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
} => {
let bucket = low.ilog2().unwrap_or(0);
if *is_new_peer {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Added,
bucket,
})
.inc();
} else {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Updated,
bucket,
Expand All @@ -248,8 +236,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}

if old_peer.is_some() {
self.kad
.routing_updated
self.routing_updated
.get_or_create(&RoutingUpdated {
action: RoutingAction::Evicted,
bucket,
Expand All @@ -259,10 +246,7 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for super::Metrics {
}

libp2p_kad::KademliaEvent::InboundRequest { request } => {
self.kad
.inbound_requests
.get_or_create(&request.into())
.inc();
self.inbound_requests.get_or_create(&request.into()).inc();
}
_ => {}
}
Expand Down
52 changes: 52 additions & 0 deletions misc/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,55 @@ pub trait Recorder<Event> {
/// Record the given event.
fn record(&self, event: &Event);
}

#[cfg(feature = "dcutr")]
impl Recorder<libp2p_dcutr::behaviour::Event> for Metrics {
fn record(&self, event: &libp2p_dcutr::behaviour::Event) {
self.dcutr.record(event)
}
}

#[cfg(feature = "gossipsub")]
#[cfg(not(target_os = "unknown"))]
impl Recorder<libp2p_gossipsub::GossipsubEvent> for Metrics {
fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) {
self.gossipsub.record(event)
}
}

#[cfg(feature = "identify")]
impl Recorder<libp2p_identify::IdentifyEvent> for Metrics {
fn record(&self, event: &libp2p_identify::IdentifyEvent) {
self.identify.record(event)
}
}

#[cfg(feature = "kad")]
impl Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
self.kad.record(event)
}
}

#[cfg(feature = "ping")]
impl Recorder<libp2p_ping::PingEvent> for Metrics {
fn record(&self, event: &libp2p_ping::PingEvent) {
self.ping.record(event)
}
}

#[cfg(feature = "relay")]
impl Recorder<libp2p_relay::v2::relay::Event> for Metrics {
fn record(&self, event: &libp2p_relay::v2::relay::Event) {
self.relay.record(event)
}
}

impl<TBvEv, THandleErr> Recorder<libp2p_swarm::SwarmEvent<TBvEv, THandleErr>> for Metrics {
fn record(&self, event: &libp2p_swarm::SwarmEvent<TBvEv, THandleErr>) {
self.swarm.record(event);

#[cfg(feature = "identify")]
self.identify.record(event)
}
}

0 comments on commit d4f8ec2

Please sign in to comment.