diff --git a/client/network/src/service.rs b/client/network/src/service.rs index d42af16f1d22e..713357772d417 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -53,6 +53,7 @@ use parking_lot::Mutex; use prometheus_endpoint::{ register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, + SourcedCounter, MetricSource }; use sc_peerset::PeersetHandle; use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; @@ -240,12 +241,6 @@ impl NetworkWorker { local_peer_id_legacy ); - // Initialize the metrics. - let metrics = match &params.metrics_registry { - Some(registry) => Some(Metrics::register(&registry)?), - None => None - }; - let checker = params.on_demand.as_ref() .map(|od| od.checker().clone()) .unwrap_or_else(|| Arc::new(AlwaysBadChecker)); @@ -353,6 +348,17 @@ impl NetworkWorker { (builder.build(), bandwidth) }; + // Initialize the metrics. + let metrics = match &params.metrics_registry { + Some(registry) => { + // Sourced metrics. + BandwidthCounters::register(registry, bandwidth.clone())?; + // Other (i.e. new) metrics. + Some(Metrics::register(registry)?) + } + None => None + }; + // Listen on multiaddresses. for addr in &params.network_config.listen_addresses { if let Err(err) = Swarm::<B, H>::listen_on(&mut swarm, addr.clone()) { @@ -1152,9 +1158,6 @@ struct Metrics { kbuckets_num_nodes: GaugeVec<U64>, listeners_local_addresses: Gauge<U64>, listeners_errors_total: Counter<U64>, - // Note: `network_bytes_total` is a monotonic gauge obtained by - // sampling an existing counter. - network_bytes_total: GaugeVec<U64>, notifications_sizes: HistogramVec, notifications_streams_closed_total: CounterVec<U64>, notifications_streams_opened_total: CounterVec<U64>, @@ -1168,6 +1171,35 @@ struct Metrics { requests_out_started_total: CounterVec<U64>, } +/// The source for bandwidth metrics.
+#[derive(Clone)] +struct BandwidthCounters(Arc<BandwidthSinks>); + +impl BandwidthCounters { + fn register(registry: &Registry, sinks: Arc<BandwidthSinks>) + -> Result<(), PrometheusError> + { + register(SourcedCounter::new( + &Opts::new( + "sub_libp2p_network_bytes_total", + "Total bandwidth usage" + ).variable_label("direction"), + BandwidthCounters(sinks), + )?, registry)?; + + Ok(()) + } +} + +impl MetricSource for BandwidthCounters { + type N = u64; + + fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { + set(&[&"in"], self.0.total_inbound()); + set(&[&"out"], self.0.total_outbound()); + } +} + impl Metrics { fn register(registry: &Registry) -> Result<Self, PrometheusError> { Ok(Self { @@ -1271,13 +1303,6 @@ impl Metrics { "sub_libp2p_listeners_errors_total", "Total number of non-fatal errors reported by a listener" )?, registry)?, - network_bytes_total: register(GaugeVec::new( - Opts::new( - "sub_libp2p_network_bytes_total", - "Total bandwidth usage" - ), - &["direction"] - )?, registry)?, notifications_sizes: register(HistogramVec::new( HistogramOpts { common_opts: Opts::new( @@ -1725,8 +1750,6 @@ impl Future for NetworkWorker { this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { - metrics.network_bytes_total.with_label_values(&["in"]).set(this.service.bandwidth.total_inbound()); - metrics.network_bytes_total.with_label_values(&["out"]).set(this.service.bandwidth.total_outbound()); metrics.is_major_syncing.set(is_major_syncing as u64); for (proto, num_entries) in this.network_service.num_kbuckets_entries() { let proto = maybe_utf8_bytes_to_string(proto.as_bytes()); diff --git a/utils/prometheus/src/lib.rs b/utils/prometheus/src/lib.rs index 9030704cb746f..be7050a8a0736 100644 --- a/utils/prometheus/src/lib.rs +++ b/utils/prometheus/src/lib.rs @@ -31,6 +31,9 @@ use std::net::SocketAddr; #[cfg(not(target_os = "unknown"))] mod networking; +mod sourced; + +pub use sourced::{SourcedCounter, SourcedGauge, MetricSource}; #[cfg(target_os = "unknown")]
pub use unknown_os::init_prometheus; diff --git a/utils/prometheus/src/sourced.rs b/utils/prometheus/src/sourced.rs new file mode 100644 index 0000000000000..58f60e4969bb8 --- /dev/null +++ b/utils/prometheus/src/sourced.rs @@ -0,0 +1,143 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Metrics that are collected from existing sources. + +use prometheus::core::{Collector, Desc, Describer, Number, Opts}; +use prometheus::proto; +use std::{cmp::Ordering, marker::PhantomData}; + +/// A counter whose values are obtained from an existing source. +/// +/// > **Note**: The counter values provided by the source `S` +/// > must be monotonically increasing. Otherwise use a +/// > [`SourcedGauge`] instead. +pub type SourcedCounter<S> = SourcedMetric<Counter, S>; + +/// A gauge whose values are obtained from an existing source. +pub type SourcedGauge<S> = SourcedMetric<Gauge, S>; + +/// The type of a sourced counter. +#[derive(Copy, Clone)] +pub enum Counter {} + +/// The type of a sourced gauge. +#[derive(Copy, Clone)] +pub enum Gauge {} + +/// A metric whose values are obtained from an existing source, +/// instead of being independently recorded. +#[derive(Debug, Clone)] +pub struct SourcedMetric<T, S> { + source: S, + desc: Desc, + _type: PhantomData<T>, +} + +/// A source of values for a [`SourcedMetric`].
+pub trait MetricSource: Sync + Send + Clone { + /// The type of the collected values. + type N: Number; + /// Collects the current values of the metrics from the source. + fn collect(&self, set: impl FnMut(&[&str], Self::N)); +} + +impl<T: SourcedType, S: MetricSource> SourcedMetric<T, S> { + /// Creates a new metric that obtains its values from the given source. + pub fn new(opts: &Opts, source: S) -> prometheus::Result<Self> { + let desc = opts.describe()?; + Ok(Self { source, desc, _type: PhantomData }) + } +} + +impl<T: SourcedType, S: MetricSource> Collector for SourcedMetric<T, S> { + fn desc(&self) -> Vec<&Desc> { + vec![&self.desc] + } + + fn collect(&self) -> Vec<proto::MetricFamily> { + let mut counters = Vec::new(); + + self.source.collect(|label_values, value| { + let mut m = proto::Metric::default(); + + match T::proto() { + proto::MetricType::COUNTER => { + let mut c = proto::Counter::default(); + c.set_value(value.into_f64()); + m.set_counter(c); + } + proto::MetricType::GAUGE => { + let mut g = proto::Gauge::default(); + g.set_value(value.into_f64()); + m.set_gauge(g); + } + t => { + log::error!("Unsupported sourced metric type: {:?}", t); + } + } + + debug_assert_eq!(self.desc.variable_labels.len(), label_values.len()); + match self.desc.variable_labels.len().cmp(&label_values.len()) { + Ordering::Greater => + log::warn!("Missing label values for sourced metric {}", self.desc.fq_name), + Ordering::Less => + log::warn!("Too many label values for sourced metric {}", self.desc.fq_name), + Ordering::Equal => {} + } + + m.set_label(self.desc.variable_labels.iter().zip(label_values) + .map(|(l_name, l_value)| { + let mut l = proto::LabelPair::default(); + l.set_name(l_name.to_string()); + l.set_value(l_value.to_string()); + l + }) + .chain(self.desc.const_label_pairs.iter().cloned()) + .collect::<Vec<_>>()); + + counters.push(m); + }); + + let mut m = proto::MetricFamily::default(); + m.set_name(self.desc.fq_name.clone()); + m.set_help(self.desc.help.clone()); + m.set_field_type(T::proto()); + m.set_metric(counters); + + vec![m] + } +} + +/// Types of metrics
that can obtain their values from an existing source. +pub trait SourcedType: private::Sealed + Sync + Send { + #[doc(hidden)] + fn proto() -> proto::MetricType; +} + +impl SourcedType for Counter { + fn proto() -> proto::MetricType { proto::MetricType::COUNTER } +} + +impl SourcedType for Gauge { + fn proto() -> proto::MetricType { proto::MetricType::GAUGE } +} + +mod private { + pub trait Sealed {} + impl Sealed for super::Counter {} + impl Sealed for super::Gauge {} +}