Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add lots of networking metrics for Prometheus (#5126)
Browse files Browse the repository at this point in the history
* Add some metrics

* Address concerns
  • Loading branch information
gavofyork committed Mar 5, 2020
1 parent adacbd1 commit e6cc799
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 23 deletions.
10 changes: 10 additions & 0 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub enum BehaviourOut<B: BlockT> {
BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Started a random Kademlia discovery query.
RandomKademliaStarted,
Event(Event),
}

Expand Down Expand Up @@ -96,6 +98,11 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
self.discovery.add_known_address(peer_id, addr)
}

/// Returns the number of nodes that are in the Kademlia k-buckets.
///
/// This is a thin wrapper: the value comes straight from the `discovery` field's own
/// `num_kbuckets_entries` method.
pub fn num_kbuckets_entries(&mut self) -> usize {
self.discovery.num_kbuckets_entries()
}

/// Borrows `self` and returns a struct giving access to the information about a node.
///
/// Returns `None` if we don't know anything about this node. Always returns `Some` for nodes
Expand Down Expand Up @@ -216,6 +223,9 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
DiscoveryOut::ValuePutFailed(key) => {
self.events.push(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key))));
}
DiscoveryOut::RandomKademliaStarted => {
self.events.push(BehaviourOut::RandomKademliaStarted);
}
}
}
}
Expand Down
20 changes: 18 additions & 2 deletions client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ impl DiscoveryBehaviour {
pub fn put_value(&mut self, key: record::Key, value: Vec<u8>) {
    // Bundle the key/value pair into a Kademlia record first, then submit it to the DHT
    // using `Quorum::All`.
    let record = Record::new(key, value);
    self.kademlia.put_record(record, Quorum::All);
}

/// Returns the number of nodes that are in the Kademlia k-buckets.
///
/// The figure is computed on every call by calling `count()` on what
/// `kbuckets_entries()` returns; nothing is cached.
pub fn num_kbuckets_entries(&mut self) -> usize {
self.kademlia.kbuckets_entries().count()
}
}

/// Event generated by the `DiscoveryBehaviour`.
Expand Down Expand Up @@ -203,6 +208,9 @@ pub enum DiscoveryOut {

/// Inserting a value into the DHT failed.
ValuePutFailed(record::Key),

/// Started a random Kademlia query.
RandomKademliaStarted,
}

impl NetworkBehaviour for DiscoveryBehaviour {
Expand Down Expand Up @@ -330,25 +338,33 @@ impl NetworkBehaviour for DiscoveryBehaviour {

// Poll the stream that fires when we need to start a random Kademlia query.
while let Poll::Ready(_) = self.next_kad_random_query.poll_unpin(cx) {
if self.num_connections < self.discovery_only_if_under_num {
let actually_started = if self.num_connections < self.discovery_only_if_under_num {
let random_peer_id = PeerId::random();
debug!(target: "sub-libp2p", "Libp2p <= Starting random Kademlia request for \
{:?}", random_peer_id);

self.kademlia.get_closest_peers(random_peer_id);
true

} else {
debug!(
target: "sub-libp2p",
"Kademlia paused due to high number of connections ({})",
self.num_connections
);
}
false
};

// Schedule the next random query with exponentially increasing delay,
// capped at 60 seconds.
self.next_kad_random_query = Delay::new(self.duration_to_next_kad);
self.duration_to_next_kad = cmp::min(self.duration_to_next_kad * 2,
Duration::from_secs(60));

if actually_started {
let ev = DiscoveryOut::RandomKademliaStarted;
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
}

// Poll Kademlia.
Expand Down
10 changes: 10 additions & 0 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,16 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.behaviour.is_open(peer_id)
}

/// Returns the list of all the peers that the peerset currently requests us to be connected to.
///
/// The peers are yielded lazily as borrowed `PeerId`s; the iterator is obtained from
/// `self.behaviour.requested_peers()` without any further processing.
pub fn requested_peers(&self) -> impl Iterator<Item = &PeerId> {
self.behaviour.requested_peers()
}

/// Returns the number of discovered nodes that we keep in memory.
///
/// Thin wrapper around `self.behaviour.num_discovered_peers()`.
pub fn num_discovered_peers(&self) -> usize {
self.behaviour.num_discovered_peers()
}

/// Disconnects the given peer if we are connected to it.
pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
self.behaviour.disconnect_peer(peer_id)
Expand Down
24 changes: 24 additions & 0 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ impl PeerState {
PeerState::Incoming { .. } => false,
}
}

/// Tells whether the PSM has asked for a connection to this node.
///
/// The match deliberately lists every variant (no wildcard arm) so that adding a new
/// `PeerState` forces this classification to be revisited.
fn is_requested(&self) -> bool {
    match self {
        // States in which the node counts as requested.
        PeerState::PendingRequest { .. }
        | PeerState::Requested
        | PeerState::DisabledPendingEnable { .. }
        | PeerState::Enabled { .. } => true,

        // States in which it does not.
        PeerState::Poisoned
        | PeerState::Banned { .. }
        | PeerState::Disabled { .. }
        | PeerState::Incoming { .. } => false,
    }
}
}

/// State of an "incoming" message sent to the peer set manager.
Expand Down Expand Up @@ -277,6 +291,11 @@ impl GenericProto {
self.notif_protocols.push((protocol_name.into(), engine_id, handshake_msg.into()));
}

/// Returns the number of discovered nodes that we keep in memory.
///
/// The value is whatever `self.peerset.num_discovered_peers()` reports.
pub fn num_discovered_peers(&self) -> usize {
self.peerset.num_discovered_peers()
}

/// Returns the list of all the peers we have an open channel to.
pub fn open_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
self.peers.iter().filter(|(_, state)| state.is_open()).map(|(id, _)| id)
Expand Down Expand Up @@ -360,6 +379,11 @@ impl GenericProto {
}
}

/// Iterates over the identities of every peer whose state reports `is_requested()`,
/// i.e. the peers that the peerset currently requests us to be connected to.
pub fn requested_peers<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
    self.peers.iter().filter_map(|(peer_id, peer_state)| {
        if peer_state.is_requested() {
            Some(peer_id)
        } else {
            None
        }
    })
}

/// Returns true if we try to open protocols with the given peer.
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
match self.peers.get(peer_id) {
Expand Down
172 changes: 152 additions & 20 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
//! which is then processed by [`NetworkWorker::poll`].

use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path};
use std::{borrow::Cow, collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path, str};
use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}};
use std::pin::Pin;
use std::task::Poll;
Expand All @@ -39,7 +39,7 @@ use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent};
use parking_lot::Mutex;
use sc_peerset::PeersetHandle;
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId};
use prometheus_endpoint::{Registry, Gauge, U64, register, PrometheusError};
use prometheus_endpoint::{Registry, Counter, CounterVec, Gauge, GaugeVec, Opts, U64, register, PrometheusError};

use crate::{behaviour::{Behaviour, BehaviourOut}, config::{parse_str_addr, parse_addr}};
use crate::{transport, config::NonReservedPeerMode, ReputationChange};
Expand Down Expand Up @@ -734,25 +734,108 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// Senders for events that happen on the network.
event_streams: Vec<mpsc::UnboundedSender<Event>>,
/// Prometheus network metrics.
metrics: Option<Metrics>
metrics: Option<Metrics>,
}

/// Prometheus metrics exposed by the network worker.
///
/// All fields are created and registered together by `Metrics::register`.
struct Metrics {
// This list is ordered alphabetically
/// Number of libp2p connections: incremented on `SwarmEvent::Connected`, decremented on
/// `SwarmEvent::Disconnected`.
connections: Gauge<U64>,
/// Blocks submitted to the import queue. Note that the worker increments this once per
/// `BehaviourOut::BlockImport` event, whatever the number of blocks the event carries.
import_queue_blocks_submitted: Counter<U64>,
/// Finality proofs submitted to the import queue (one per `FinalityProofImport` event).
import_queue_finality_proofs_submitted: Counter<U64>,
/// Justifications submitted to the import queue (one per `JustificationImport` event).
import_queue_justifications_submitted: Counter<U64>,
/// Whether the node is performing a major sync; stored as the boolean cast to `u64`.
is_major_syncing: Gauge<U64>,
/// Number of nodes in the Kademlia k-buckets.
kbuckets_num_nodes: Gauge<U64>,
/// Average bandwidth usage per second, labelled by `direction` (`"in"` / `"out"`).
network_per_sec_bytes: GaugeVec<U64>,
/// Notification counter labelled by `direction` and `protocol`: `"in"` is bumped for each
/// message of a `NotificationsReceived` event, `"out"` for each `WriteNotification` request.
notifications_total: CounterVec<U64>,
/// Number of internal active channels that broadcast network events.
num_event_stream_channels: Gauge<U64>,
/// Number of open notification substreams, labelled by `protocol`.
opened_notification_streams: GaugeVec<U64>,
/// Number of network gossip peers.
peers_count: Gauge<U64>,
/// Number of nodes stored in the peerset manager.
peerset_num_discovered: Gauge<U64>,
/// Number of nodes that the peerset manager wants us to be connected to.
peerset_num_requested: Gauge<U64>,
/// Number of random Kademlia queries started.
// NOTE(review): "kademalia" is a misspelling of "kademlia"; renaming the field also requires
// updating `Metrics::register` and the use site in the worker's poll loop.
random_kademalia_queries_total: Counter<U64>,
}

impl Metrics {
/// Creates every network metric and registers it with the given Prometheus `registry`.
///
/// Returns the first `PrometheusError` raised while building or registering a metric; in
/// that case no `Metrics` value is produced.
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
    Ok(Self {
        // This list is ordered alphabetically
        connections: register(Gauge::new(
            "sub_libp2p_connections", "Number of libp2p connections"
        )?, registry)?,
        import_queue_blocks_submitted: register(Counter::new(
            "import_queue_blocks_submitted",
            "Number of blocks submitted to the import queue.",
        )?, registry)?,
        import_queue_finality_proofs_submitted: register(Counter::new(
            "import_queue_finality_proofs_submitted",
            "Number of finality proofs submitted to the import queue.",
        )?, registry)?,
        import_queue_justifications_submitted: register(Counter::new(
            "import_queue_justifications_submitted",
            "Number of justifications submitted to the import queue.",
        )?, registry)?,
        is_major_syncing: register(Gauge::new(
            "sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.",
        )?, registry)?,
        kbuckets_num_nodes: register(Gauge::new(
            "sub_libp2p_kbuckets_num_nodes", "Number of nodes in the Kademlia k-buckets"
        )?, registry)?,
        network_per_sec_bytes: register(GaugeVec::new(
            Opts::new(
                "sub_libp2p_network_per_sec_bytes",
                "Average bandwidth usage per second"
            ),
            &["direction"]
        )?, registry)?,
        // Counts both directions: the "direction" label is either "in" or "out".
        notifications_total: register(CounterVec::new(
            Opts::new(
                "sub_libp2p_notifications_total",
                "Number of notifications sent to and received from all nodes"
            ),
            &["direction", "protocol"]
        )?, registry)?,
        num_event_stream_channels: register(Gauge::new(
            "sub_libp2p_num_event_stream_channels",
            "Number of internal active channels that broadcast network events",
        )?, registry)?,
        opened_notification_streams: register(GaugeVec::new(
            Opts::new(
                "sub_libp2p_opened_notification_streams",
                "Number of open notification substreams"
            ),
            &["protocol"]
        )?, registry)?,
        peers_count: register(Gauge::new(
            "sub_libp2p_peers_count", "Number of network gossip peers",
        )?, registry)?,
        peerset_num_discovered: register(Gauge::new(
            "sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager",
        )?, registry)?,
        peerset_num_requested: register(Gauge::new(
            "sub_libp2p_peerset_num_requested",
            "Number of nodes that the peerset manager wants us to be connected to",
        )?, registry)?,
        // NOTE(review): "kademalia" is a misspelling of "kademlia". The exported name is kept
        // as-is here because renaming it changes what scrapers and dashboards see.
        random_kademalia_queries_total: register(Counter::new(
            "sub_libp2p_random_kademalia_queries_total", "Number of random Kademlia queries started",
        )?, registry)?,
    })
}

/// Reflects a network `Event` in the notification metrics.
///
/// * A stream opening or closing moves `opened_notification_streams` up or down for the
///   protocol concerned.
/// * Each received message adds one to `notifications_total` under the `"in"` direction.
/// * Any other event leaves the metrics untouched.
fn update_with_network_event(&self, event: &Event) {
    match event {
        Event::NotificationStreamOpened { engine_id, .. } => {
            let protocol = engine_id_to_string(engine_id);
            self.opened_notification_streams.with_label_values(&[&protocol]).inc();
        },
        Event::NotificationStreamClosed { engine_id, .. } => {
            let protocol = engine_id_to_string(engine_id);
            self.opened_notification_streams.with_label_values(&[&protocol]).dec();
        },
        Event::NotificationsReceived { messages, .. } => {
            // One increment per message, keyed by the protocol each message arrived on.
            for (engine_id, _) in messages {
                let protocol = engine_id_to_string(engine_id);
                self.notifications_total.with_label_values(&["in", &protocol]).inc();
            }
        },
        _ => {}
    }
}
}

impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
Expand Down Expand Up @@ -800,10 +883,15 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::WriteNotification { message, engine_id, target } =>
this.network_service.user_protocol_mut().write_notification(target, engine_id, message),
ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.notifications_total.with_label_values(&["out", &engine_id_to_string(&engine_id)]).inc();
}
this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
},
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
let events = this.network_service.user_protocol_mut().register_notifications_protocol(engine_id, protocol_name);
let events = this.network_service.user_protocol_mut()
.register_notifications_protocol(engine_id, protocol_name);
for event in events {
this.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok());
}
Expand All @@ -821,18 +909,47 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {

match poll_value {
Poll::Pending => break,
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) =>
this.import_queue.import_blocks(origin, blocks),
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) =>
this.import_queue.import_justification(origin, hash, nb, justification),
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) =>
this.import_queue.import_finality_proof(origin, hash, nb, proof),
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) =>
this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok()),
Poll::Ready(SwarmEvent::Connected(peer_id)) =>
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id),
Poll::Ready(SwarmEvent::Disconnected(peer_id)) =>
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id),
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.import_queue_blocks_submitted.inc();
}
this.import_queue.import_blocks(origin, blocks);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.import_queue_justifications_submitted.inc();
}
this.import_queue.import_justification(origin, hash, nb, justification);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.import_queue_finality_proofs_submitted.inc();
}
this.import_queue.import_finality_proof(origin, hash, nb, proof);
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.random_kademalia_queries_total.inc();
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
if let Some(metrics) = this.metrics.as_ref() {
metrics.update_with_network_event(&ev);
}
},
Poll::Ready(SwarmEvent::Connected(peer_id)) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
if let Some(metrics) = this.metrics.as_ref() {
metrics.connections.inc();
}
},
Poll::Ready(SwarmEvent::Disconnected(peer_id)) => {
trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?})", peer_id);
if let Some(metrics) = this.metrics.as_ref() {
metrics.connections.dec();
}
},
Poll::Ready(SwarmEvent::NewListenAddr(addr)) =>
trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr),
Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) =>
Expand Down Expand Up @@ -861,8 +978,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);

if let Some(metrics) = this.metrics.as_ref() {
metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec());
metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
metrics.is_major_syncing.set(is_major_syncing as u64);
metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64);
metrics.num_event_stream_channels.set(this.event_streams.len() as u64);
metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
}

Poll::Pending
Expand All @@ -872,6 +995,15 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
// Explicit, empty `Unpin` implementation: `NetworkWorker<B, H>` is declared `Unpin` for
// every `B` and `H` that satisfy the bounds below.
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
}

/// Produces a printable label for a `ConsensusEngineId`.
///
/// When the identifier's bytes are valid UTF-8 they are borrowed as-is, with no allocation.
/// Otherwise the `Debug` rendering of the identifier is returned as an owned string.
fn engine_id_to_string(id: &ConsensusEngineId) -> Cow<str> {
    match std::str::from_utf8(&id[..]) {
        Ok(text) => Cow::Borrowed(text),
        Err(_) => Cow::Owned(format!("{:?}", id)),
    }
}

/// The libp2p swarm, customized for our needs.
type Swarm<B, H> = libp2p::swarm::Swarm<Behaviour<B, H>>;

Expand Down

0 comments on commit e6cc799

Please sign in to comment.