diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 979617bdb29..1b3053eaae7 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -12,6 +12,10 @@ - Update to `libp2p-kad` `v0.39.0`. +- Track number of connected nodes supporting a specific protocol via the identify protocol. See [PR 2734]. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.7.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index 27dcbc08dc8..b90e784f9b7 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -77,10 +77,9 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_dcutr::behaviour::Event) { - self.dcutr - .events + self.events .get_or_create(&EventLabels { event: event.into(), }) diff --git a/misc/metrics/src/gossipsub.rs b/misc/metrics/src/gossipsub.rs index 0bb6af5f452..a82c1a72a24 100644 --- a/misc/metrics/src/gossipsub.rs +++ b/misc/metrics/src/gossipsub.rs @@ -40,10 +40,10 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { if let libp2p_gossipsub::GossipsubEvent::Message { .. } = event { - self.gossipsub.messages.inc(); + self.messages.inc(); } } } diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index 7431eda5d25..b3ae5a75910 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -18,12 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use libp2p_core::PeerId; +use prometheus_client::encoding::text::{EncodeMetric, Encoder}; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::histogram::{exponential_buckets, Histogram}; +use prometheus_client::metrics::MetricType; use prometheus_client::registry::Registry; +use std::collections::HashMap; use std::iter; +use std::sync::{Arc, Mutex}; pub struct Metrics { + protocols: Protocols, error: Counter, pushed: Counter, received: Counter, @@ -36,6 +42,15 @@ impl Metrics { pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("identify"); + let protocols = Protocols::default(); + sub_registry.register( + "protocols", + "Number of connected nodes supporting a specific protocol, with \ + \"unrecognized\" for each peer supporting one or more unrecognized \ + protocols", + Box::new(protocols.clone()), + ); + let error = Counter::default(); sub_registry.register( "errors", @@ -86,6 +101,7 @@ impl Metrics { ); Self { + protocols, error, pushed, received, @@ -96,27 +112,136 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_identify::IdentifyEvent) { match event { libp2p_identify::IdentifyEvent::Error { .. } => { - self.identify.error.inc(); + self.error.inc(); } libp2p_identify::IdentifyEvent::Pushed { .. } => { - self.identify.pushed.inc(); + self.pushed.inc(); } - libp2p_identify::IdentifyEvent::Received { info, .. } => { - self.identify.received.inc(); - self.identify - .received_info_protocols + libp2p_identify::IdentifyEvent::Received { peer_id, info, .. } => { + { + let mut protocols: Vec = info + .protocols + .iter() + .filter(|p| { + let allowed_protocols: &[&[u8]] = &[ + #[cfg(feature = "dcutr")] + libp2p_dcutr::PROTOCOL_NAME, + // #[cfg(feature = "gossipsub")] + // #[cfg(not(target_os = "unknown"))] + // TODO: Add Gossipsub protocol name + libp2p_identify::PROTOCOL_NAME, + libp2p_identify::PUSH_PROTOCOL_NAME, + #[cfg(feature = "kad")] + libp2p_kad::protocol::DEFAULT_PROTO_NAME, + #[cfg(feature = "ping")] + libp2p_ping::PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::v2::STOP_PROTOCOL_NAME, + #[cfg(feature = "relay")] + libp2p_relay::v2::HOP_PROTOCOL_NAME, + ]; + + allowed_protocols.contains(&p.as_bytes()) + }) + .cloned() + .collect(); + + // Signal via an additional label value that one or more + // protocols of the remote peer have not been recognized. + if protocols.len() < info.protocols.len() { + protocols.push("unrecognized".to_string()); + } + + protocols.sort_unstable(); + protocols.dedup(); + + self.protocols.add(*peer_id, protocols); + } + + self.received.inc(); + self.received_info_protocols .observe(info.protocols.len() as f64); - self.identify - .received_info_listen_addrs + self.received_info_listen_addrs .observe(info.listen_addrs.len() as f64); } libp2p_identify::IdentifyEvent::Sent { .. } => { - self.identify.sent.inc(); + self.sent.inc(); } } } } + +impl super::Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { + if let libp2p_swarm::SwarmEvent::ConnectionClosed { + peer_id, + num_established, + .. + } = event + { + if *num_established == 0 { + self.protocols.remove(*peer_id) + } + } + } +} + +#[derive(Default, Clone)] +struct Protocols { + peers: Arc>>>, +} + +impl Protocols { + fn add(&self, peer: PeerId, protocols: Vec) { + self.peers + .lock() + .expect("Lock not to be poisoned") + .insert(peer, protocols); + } + + fn remove(&self, peer: PeerId) { + self.peers + .lock() + .expect("Lock not to be poisoned") + .remove(&peer); + } +} + +impl EncodeMetric for Protocols { + fn encode(&self, mut encoder: Encoder) -> Result<(), std::io::Error> { + let count_by_protocol = self + .peers + .lock() + .expect("Lock not to be poisoned") + .iter() + .fold( + HashMap::::default(), + |mut acc, (_, protocols)| { + for protocol in protocols { + let count = acc.entry(protocol.to_string()).or_default(); + *count = *count + 1; + } + acc + }, + ); + + for (protocol, count) in count_by_protocol { + encoder + .with_label_set(&("protocol", protocol)) + .no_suffix()? + .no_bucket()? + .encode_value(count)? + .no_exemplar()?; + } + + Ok(()) + } + + fn metric_type(&self) -> MetricType { + MetricType::Gauge + } +} diff --git a/misc/metrics/src/kad.rs b/misc/metrics/src/kad.rs index 8ab71befe91..5e5a1056060 100644 --- a/misc/metrics/src/kad.rs +++ b/misc/metrics/src/kad.rs @@ -159,25 +159,21 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_kad::KademliaEvent) { match event { libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => { - self.kad - .query_result_num_requests + self.query_result_num_requests .get_or_create(&result.into()) .observe(stats.num_requests().into()); - self.kad - .query_result_num_success + self.query_result_num_success .get_or_create(&result.into()) .observe(stats.num_successes().into()); - self.kad - .query_result_num_failure + self.query_result_num_failure .get_or_create(&result.into()) .observe(stats.num_failures().into()); if let Some(duration) = stats.duration() { - self.kad - .query_result_duration + self.query_result_duration .get_or_create(&result.into()) .observe(duration.as_secs_f64()); } @@ -185,36 +181,30 @@ impl super::Recorder for super::Metrics { match result { libp2p_kad::QueryResult::GetRecord(result) => match result { Ok(ok) => self - .kad .query_result_get_record_ok .observe(ok.records.len() as f64), Err(error) => { - self.kad - .query_result_get_record_error + self.query_result_get_record_error .get_or_create(&error.into()) .inc(); } }, libp2p_kad::QueryResult::GetClosestPeers(result) => match result { Ok(ok) => self - .kad .query_result_get_closest_peers_ok .observe(ok.peers.len() as f64), Err(error) => { - self.kad - .query_result_get_closest_peers_error + self.query_result_get_closest_peers_error .get_or_create(&error.into()) .inc(); } }, libp2p_kad::QueryResult::GetProviders(result) => match result { Ok(ok) => self - .kad .query_result_get_providers_ok .observe(ok.providers.len() as f64), Err(error) => { - self.kad - .query_result_get_providers_error + self.query_result_get_providers_error .get_or_create(&error.into()) .inc(); } @@ -230,16 +220,14 @@ impl super::Recorder for super::Metrics { } => { let bucket = low.ilog2().unwrap_or(0); if *is_new_peer { - self.kad - .routing_updated + self.routing_updated .get_or_create(&RoutingUpdated { action: RoutingAction::Added, bucket, }) .inc(); } else { - self.kad - .routing_updated + self.routing_updated .get_or_create(&RoutingUpdated { action: RoutingAction::Updated, bucket, @@ -248,8 +236,7 @@ impl super::Recorder for super::Metrics { } if old_peer.is_some() { - self.kad - .routing_updated + self.routing_updated .get_or_create(&RoutingUpdated { action: RoutingAction::Evicted, bucket, @@ -259,10 +246,7 @@ impl super::Recorder for super::Metrics { } libp2p_kad::KademliaEvent::InboundRequest { request } => { - self.kad - .inbound_requests - .get_or_create(&request.into()) - .inc(); + self.inbound_requests.get_or_create(&request.into()).inc(); } _ => {} } diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 634d13590df..d9fa3c40ffe 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -95,3 +95,55 @@ pub trait Recorder { /// Record the given event. fn record(&self, event: &Event); } + +#[cfg(feature = "dcutr")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_dcutr::behaviour::Event) { + self.dcutr.record(event) + } +} + +#[cfg(feature = "gossipsub")] +#[cfg(not(target_os = "unknown"))] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_gossipsub::GossipsubEvent) { + self.gossipsub.record(event) + } +} + +#[cfg(feature = "identify")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_identify::IdentifyEvent) { + self.identify.record(event) + } +} + +#[cfg(feature = "kad")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_kad::KademliaEvent) { + self.kad.record(event) + } +} + +#[cfg(feature = "ping")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_ping::PingEvent) { + self.ping.record(event) + } +} + +#[cfg(feature = "relay")] +impl Recorder for Metrics { + fn record(&self, event: &libp2p_relay::v2::relay::Event) { + self.relay.record(event) + } +} + +impl Recorder> for Metrics { + fn record(&self, event: &libp2p_swarm::SwarmEvent) { + self.swarm.record(event); + + #[cfg(feature = "identify")] + self.identify.record(event) + } +} diff --git a/misc/metrics/src/ping.rs b/misc/metrics/src/ping.rs index 76d50b54d17..b7c3ef60f9b 100644 --- a/misc/metrics/src/ping.rs +++ b/misc/metrics/src/ping.rs @@ -92,17 +92,17 @@ impl Metrics { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_ping::PingEvent) { match &event.result { Ok(libp2p_ping::PingSuccess::Pong) => { - self.ping.pong_received.inc(); + self.pong_received.inc(); } Ok(libp2p_ping::PingSuccess::Ping { rtt }) => { - self.ping.rtt.observe(rtt.as_secs_f64()); + self.rtt.observe(rtt.as_secs_f64()); } Err(failure) => { - self.ping.failure.get_or_create(&failure.into()).inc(); + self.failure.get_or_create(&failure.into()).inc(); } } } diff --git a/misc/metrics/src/relay.rs b/misc/metrics/src/relay.rs index 479dcaab724..9267a975b08 100644 --- a/misc/metrics/src/relay.rs +++ b/misc/metrics/src/relay.rs @@ -102,10 +102,9 @@ impl From<&libp2p_relay::v2::relay::Event> for EventType { } } -impl super::Recorder for super::Metrics { +impl super::Recorder for Metrics { fn record(&self, event: &libp2p_relay::v2::relay::Event) { - self.relay - .events + self.events .get_or_create(&EventLabels { event: event.into(), }) diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index d0fb0c664f2..e9c5a0493ce 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -138,34 +138,29 @@ impl Metrics { } } -impl super::Recorder> - for super::Metrics -{ +impl super::Recorder> for Metrics { fn record(&self, event: &libp2p_swarm::SwarmEvent) { match event { libp2p_swarm::SwarmEvent::Behaviour(_) => {} libp2p_swarm::SwarmEvent::ConnectionEstablished { endpoint, .. } => { - self.swarm - .connections_established + self.connections_established .get_or_create(&ConnectionEstablishedLabels { role: endpoint.into(), }) .inc(); } libp2p_swarm::SwarmEvent::ConnectionClosed { endpoint, .. } => { - self.swarm - .connections_closed + self.connections_closed .get_or_create(&ConnectionClosedLabels { role: endpoint.into(), }) .inc(); } libp2p_swarm::SwarmEvent::IncomingConnection { .. } => { - self.swarm.connections_incoming.inc(); + self.connections_incoming.inc(); } libp2p_swarm::SwarmEvent::IncomingConnectionError { error, .. } => { - self.swarm - .connections_incoming_error + self.connections_incoming_error .get_or_create(&IncomingConnectionErrorLabels { error: error.into(), }) @@ -178,8 +173,7 @@ impl super::Recorder super::Recorder { - self.swarm.connected_to_banned_peer.inc(); + self.connected_to_banned_peer.inc(); } libp2p_swarm::SwarmEvent::NewListenAddr { .. } => { - self.swarm.new_listen_addr.inc(); + self.new_listen_addr.inc(); } libp2p_swarm::SwarmEvent::ExpiredListenAddr { .. } => { - self.swarm.expired_listen_addr.inc(); + self.expired_listen_addr.inc(); } libp2p_swarm::SwarmEvent::ListenerClosed { .. } => { - self.swarm.listener_closed.inc(); + self.listener_closed.inc(); } libp2p_swarm::SwarmEvent::ListenerError { .. } => { - self.swarm.listener_error.inc(); + self.listener_error.inc(); } libp2p_swarm::SwarmEvent::Dialing(_) => { - self.swarm.dial_attempt.inc(); + self.dial_attempt.inc(); } } } diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 7d144a20772..0416de5e9cb 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -2,6 +2,10 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `PROTOCOL_NAME`. See [PR 2734]. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.4.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 20ca846d99b..c55f22427f8 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -27,6 +27,7 @@ mod protocol; pub use protocol::{ inbound::UpgradeError as InboundUpgradeError, outbound::UpgradeError as OutboundUpgradeError, + PROTOCOL_NAME, }; mod message_proto { diff --git a/protocols/dcutr/src/protocol.rs b/protocols/dcutr/src/protocol.rs index d2b8b39a6d0..67f9af69f70 100644 --- a/protocols/dcutr/src/protocol.rs +++ b/protocols/dcutr/src/protocol.rs @@ -21,6 +21,6 @@ pub mod inbound; pub mod outbound; -const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; +pub const PROTOCOL_NAME: &[u8; 13] = b"/libp2p/dcutr"; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 60c77cd032a..499af6e3a13 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -2,6 +2,10 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `PROTOCOL_NAME` and `PUSH_PROTOCOL_NAME`. See [PR 2734]. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/identify/src/lib.rs b/protocols/identify/src/lib.rs index f5de8f7a6ac..17925fb6eed 100644 --- a/protocols/identify/src/lib.rs +++ b/protocols/identify/src/lib.rs @@ -45,7 +45,7 @@ //! [`IdentifyInfo`]: self::IdentifyInfo pub use self::identify::{Identify, IdentifyConfig, IdentifyEvent}; -pub use self::protocol::{IdentifyInfo, UpgradeError}; +pub use self::protocol::{IdentifyInfo, UpgradeError, PROTOCOL_NAME, PUSH_PROTOCOL_NAME}; mod handler; mod identify; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 735fbcb342b..163ac0aa396 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -34,6 +34,10 @@ use void::Void; const MAX_MESSAGE_SIZE_BYTES: usize = 4096; +pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0"; + +pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; + /// Substream upgrade protocol for `/ipfs/id/1.0.0`. #[derive(Debug, Clone)] pub struct IdentifyProtocol; @@ -104,7 +108,7 @@ impl UpgradeInfo for IdentifyProtocol { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/ipfs/id/1.0.0") + iter::once(PROTOCOL_NAME) } } @@ -136,7 +140,7 @@ impl UpgradeInfo for IdentifyPushProtocol { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/ipfs/id/push/1.0.0") + iter::once(PUSH_PROTOCOL_NAME) } } diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index a31b17d02f5..af9bb0a9690 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -2,6 +2,10 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `PROTOCOL_NAME`. See [PR 2734]. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 81133b86d74..2a01025ee6d 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -57,8 +57,8 @@ use std::{ note = "Use re-exports that omit `Ping` prefix, i.e. `libp2p::ping::Config` etc" )] pub use self::{ - Config as PingConfig, Event as PingEvent, Failure as PingFailure, Result as PingResult, - Success as PingSuccess, + protocol::PROTOCOL_NAME, Config as PingConfig, Event as PingEvent, Failure as PingFailure, + Result as PingResult, Success as PingSuccess, }; #[deprecated(since = "0.30.0", note = "Use libp2p::ping::Behaviour instead.")] pub use Behaviour as Ping; diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 499c5ad4a0f..659040e2d7f 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -26,6 +26,8 @@ use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; use void::Void; +pub const PROTOCOL_NAME: &[u8; 16] = b"/ipfs/ping/1.0.0"; + /// The `Ping` protocol upgrade. /// /// The ping protocol sends 32 bytes of random data in configurable @@ -55,7 +57,7 @@ impl UpgradeInfo for Ping { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/ipfs/ping/1.0.0") + iter::once(PROTOCOL_NAME) } } diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index cd615778196..f03817080ea 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,6 +2,10 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Expose `HOP_PROTOCOL_NAME` and `STOP_PROTOCOL_NAME`. See [PR 2734]. + +[PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ + # 0.10.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/relay/src/v2.rs b/protocols/relay/src/v2.rs index 7219ab3d69c..c610c1a3b2c 100644 --- a/protocols/relay/src/v2.rs +++ b/protocols/relay/src/v2.rs @@ -34,7 +34,8 @@ pub use protocol::{ inbound_hop::FatalUpgradeError as InboundHopFatalUpgradeError, inbound_stop::FatalUpgradeError as InboundStopFatalUpgradeError, outbound_hop::FatalUpgradeError as OutboundHopFatalUpgradeError, - outbound_stop::FatalUpgradeError as OutboundStopFatalUpgradeError, + outbound_stop::FatalUpgradeError as OutboundStopFatalUpgradeError, HOP_PROTOCOL_NAME, + STOP_PROTOCOL_NAME, }; /// The ID of an outgoing / incoming, relay / destination request. diff --git a/protocols/relay/src/v2/protocol.rs b/protocols/relay/src/v2/protocol.rs index ab2dc487b6f..27f69994957 100644 --- a/protocols/relay/src/v2/protocol.rs +++ b/protocols/relay/src/v2/protocol.rs @@ -26,8 +26,8 @@ pub mod inbound_stop; pub mod outbound_hop; pub mod outbound_stop; -const HOP_PROTOCOL_NAME: &[u8; 31] = b"/libp2p/circuit/relay/0.2.0/hop"; -const STOP_PROTOCOL_NAME: &[u8; 32] = b"/libp2p/circuit/relay/0.2.0/stop"; +pub const HOP_PROTOCOL_NAME: &[u8; 31] = b"/libp2p/circuit/relay/0.2.0/hop"; +pub const STOP_PROTOCOL_NAME: &[u8; 32] = b"/libp2p/circuit/relay/0.2.0/stop"; const MAX_MESSAGE_SIZE: usize = 4096;