From 22ee063ff4b4010fb5abc4eacf7f0ef00d87780b Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Fri, 12 Apr 2024 23:57:50 +0000 Subject: [PATCH 1/2] Add outbound index metrics to the policy controller Signed-off-by: Alex Leong --- Cargo.lock | 1 + policy-controller/k8s/index/Cargo.toml | 1 + .../k8s/index/src/outbound/index.rs | 75 +++++++++++++++++-- policy-controller/src/main.rs | 43 ++++++----- 4 files changed, 95 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e040143edcc99..db29ad706bdb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1267,6 +1267,7 @@ dependencies = [ "linkerd-policy-controller-k8s-api", "maplit", "parking_lot", + "prometheus-client", "thiserror", "tokio", "tokio-stream", diff --git a/policy-controller/k8s/index/Cargo.toml b/policy-controller/k8s/index/Cargo.toml index 9c1526e8dc413..768eadd3ab203 100644 --- a/policy-controller/k8s/index/Cargo.toml +++ b/policy-controller/k8s/index/Cargo.toml @@ -20,6 +20,7 @@ kubert = { version = "0.21.1", default-features = false, features = ["index"] } linkerd-policy-controller-core = { path = "../../core" } linkerd-policy-controller-k8s-api = { path = "../api" } parking_lot = "0.12" +prometheus-client = { version = "0.22.0", default-features = false } thiserror = "1" tokio = { version = "1", features = ["macros", "rt", "sync"] } tracing = "0.1" diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index f488bbfcb8c47..1673699d25b88 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -15,7 +15,12 @@ use linkerd_policy_controller_core::{ }; use linkerd_policy_controller_k8s_api::{policy as api, ResourceExt, Service, Time}; use parking_lot::RwLock; -use std::{hash::Hash, net::IpAddr, num::NonZeroU16, sync::Arc, time}; +use prometheus_client::{ + collector::Collector, + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::{gauge::ConstGauge, MetricType}, 
+}; +use std::{hash::Hash, net::IpAddr, num::NonZeroU16, ops::Deref, sync::Arc, time}; use tokio::sync::watch; #[derive(Debug)] @@ -24,8 +29,8 @@ pub struct Index { services_by_ip: HashMap, service_info: HashMap, } - -pub type SharedIndex = Arc>; +#[derive(Clone, Debug)] +pub struct SharedIndex(pub Arc>); #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ServiceRef { @@ -177,14 +182,14 @@ impl kubert::index::IndexNamespacedResource for Index { impl Index { pub fn shared(cluster_info: Arc) -> SharedIndex { - Arc::new(RwLock::new(Self { + SharedIndex(Arc::new(RwLock::new(Self { namespaces: NamespaceIndex { by_ns: HashMap::default(), cluster_info, }, services_by_ip: HashMap::default(), service_info: HashMap::default(), - })) + }))) } pub fn outbound_policy_rx( @@ -547,6 +552,66 @@ impl Namespace { } } +impl Deref for SharedIndex { + type Target = RwLock; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Collector for SharedIndex { + fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> Result<(), std::fmt::Error> { + let this = self.0.read(); + + let service_encoder = encoder.encode_descriptor( + "service_index_size", + "The number of entires in service index", + None, + MetricType::Gauge, + )?; + let services = ConstGauge::new(this.services_by_ip.len() as u32); + services.encode(service_encoder)?; + + let service_info_encoder = encoder.encode_descriptor( + "service_info_index_size", + "The number of entires in the service info index", + None, + MetricType::Gauge, + )?; + let service_infos = ConstGauge::new(this.service_info.len() as u32); + service_infos.encode(service_info_encoder)?; + + let mut service_route_encoder = encoder.encode_descriptor( + "service_route_index_size", + "The number of entires in the service route index", + None, + MetricType::Gauge, + )?; + for (ns, index) in &this.namespaces.by_ns { + let labels = vec![("namespace", ns.as_str())]; + let service_routes = ConstGauge::new(index.service_routes.len() as u32); + let 
service_route_encoder = service_route_encoder.encode_family(&labels)?; + service_routes.encode(service_route_encoder)?; + } + + let mut service_port_route_encoder = encoder.encode_descriptor( + "service_port_route_index_size", + "The number of entires in the service port route index", + None, + MetricType::Gauge, + )?; + for (ns, index) in &this.namespaces.by_ns { + let labels = vec![("namespace", ns.as_str())]; + let service_port_routes = ConstGauge::new(index.service_port_routes.len() as u32); + let service_port_route_encoder = service_port_route_encoder.encode_family(&labels)?; + service_port_routes.encode(service_port_route_encoder)?; + } + + Ok(()) + } +} + fn convert_backend( ns: &str, backend: HttpBackendRef, diff --git a/policy-controller/src/main.rs b/policy-controller/src/main.rs index 20d22bfa0bce1..5a26d32617687 100644 --- a/policy-controller/src/main.rs +++ b/policy-controller/src/main.rs @@ -125,19 +125,6 @@ async fn main() -> Result<()> { Some(server) }; - let mut prom = ::default(); - let resource_status = prom.sub_registry_with_prefix("resource_status"); - let status_metrics = status::ControllerMetrics::register(resource_status); - let status_index_metrcs = status::IndexMetrics::register(resource_status); - - let mut runtime = kubert::Runtime::builder() - .with_log(log_level, log_format) - .with_admin(admin.into_builder().with_prometheus(prom)) - .with_client(client) - .with_optional_server(server) - .build() - .await?; - let probe_networks = probe_networks.map(|IpNets(nets)| nets).unwrap_or_default(); let default_opaque_ports = parse_portset(&default_opaque_ports)?; @@ -152,6 +139,27 @@ async fn main() -> Result<()> { probe_networks, }); + // Build the API index data structures which will maintain information + // necessary for serving the inbound policy and outbound policy gRPC APIs. 
+ let inbound_index = inbound::Index::shared(cluster_info.clone()); + let outbound_index = outbound::Index::shared(cluster_info); + + let mut prom = ::default(); + let resource_status = prom.sub_registry_with_prefix("resource_status"); + let status_metrics = status::ControllerMetrics::register(resource_status); + let status_index_metrcs = status::IndexMetrics::register(resource_status); + + prom.sub_registry_with_prefix("outbound_index") + .register_collector(Box::new(outbound_index.clone())); + + let mut runtime = kubert::Runtime::builder() + .with_log(log_level, log_format) + .with_admin(admin.into_builder().with_prometheus(prom)) + .with_client(client) + .with_optional_server(server) + .build() + .await?; + let hostname = std::env::var("HOSTNAME").expect("Failed to fetch `HOSTNAME` environment variable"); let params = kubert::lease::ClaimParams { @@ -167,11 +175,6 @@ async fn main() -> Result<()> { .await?; let (claims, _task) = lease.spawn(hostname.clone(), params).await?; - // Build the API index data structures which will maintain information - // necessary for serving the inbound policy and outbound policy gRPC APIs. - let inbound_index = inbound::Index::shared(cluster_info.clone()); - let outbound_index = outbound::Index::shared(cluster_info); - // Build the status index which will maintain information necessary for // updating the status field of policy resources. 
let (updates_tx, updates_rx) = mpsc::channel(STATUS_UPDATE_QUEUE_SIZE); @@ -235,7 +238,7 @@ async fn main() -> Result<()> { let http_routes = runtime.watch_all::(watcher::Config::default()); let http_routes_indexes = IndexList::new(inbound_index.clone()) - .push(outbound_index.clone()) + .push(outbound_index.0.clone()) .push(status_index.clone()) .shared(); tokio::spawn( @@ -251,7 +254,7 @@ async fn main() -> Result<()> { ); let services = runtime.watch_all::(watcher::Config::default()); - let services_indexes = IndexList::new(outbound_index.clone()) + let services_indexes = IndexList::new(outbound_index.0.clone()) .push(status_index.clone()) .shared(); tokio::spawn( From 33a1138bc2254ad68ccf47d03b628f9513a57a5c Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Wed, 8 May 2024 00:26:00 +0000 Subject: [PATCH 2/2] Put metrics in module Signed-off-by: Alex Leong --- policy-controller/k8s/index/src/outbound.rs | 2 +- .../k8s/index/src/outbound/index.rs | 77 ++----------------- .../k8s/index/src/outbound/index/metrics.rs | 67 ++++++++++++++++ policy-controller/src/main.rs | 10 ++- 4 files changed, 81 insertions(+), 75 deletions(-) create mode 100644 policy-controller/k8s/index/src/outbound/index/metrics.rs diff --git a/policy-controller/k8s/index/src/outbound.rs b/policy-controller/k8s/index/src/outbound.rs index f86debc32bc7f..1568b11300f72 100644 --- a/policy-controller/k8s/index/src/outbound.rs +++ b/policy-controller/k8s/index/src/outbound.rs @@ -1,3 +1,3 @@ pub mod index; -pub use index::{Index, ServiceRef, SharedIndex}; +pub use index::{metrics, Index, ServiceRef, SharedIndex}; diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index 1673699d25b88..be615c03e4536 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -15,12 +15,7 @@ use linkerd_policy_controller_core::{ }; use linkerd_policy_controller_k8s_api::{policy as api, 
ResourceExt, Service, Time}; use parking_lot::RwLock; -use prometheus_client::{ - collector::Collector, - encoding::{DescriptorEncoder, EncodeMetric}, - metrics::{gauge::ConstGauge, MetricType}, -}; -use std::{hash::Hash, net::IpAddr, num::NonZeroU16, ops::Deref, sync::Arc, time}; +use std::{hash::Hash, net::IpAddr, num::NonZeroU16, sync::Arc, time}; use tokio::sync::watch; #[derive(Debug)] @@ -29,8 +24,10 @@ pub struct Index { services_by_ip: HashMap, service_info: HashMap, } -#[derive(Clone, Debug)] -pub struct SharedIndex(pub Arc>); + +pub mod metrics; + +pub type SharedIndex = Arc>; #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct ServiceRef { @@ -182,14 +179,14 @@ impl kubert::index::IndexNamespacedResource for Index { impl Index { pub fn shared(cluster_info: Arc) -> SharedIndex { - SharedIndex(Arc::new(RwLock::new(Self { + Arc::new(RwLock::new(Self { namespaces: NamespaceIndex { by_ns: HashMap::default(), cluster_info, }, services_by_ip: HashMap::default(), service_info: HashMap::default(), - }))) + })) } pub fn outbound_policy_rx( @@ -552,66 +549,6 @@ impl Namespace { } } -impl Deref for SharedIndex { - type Target = RwLock; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Collector for SharedIndex { - fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> Result<(), std::fmt::Error> { - let this = self.0.read(); - - let service_encoder = encoder.encode_descriptor( - "service_index_size", - "The number of entires in service index", - None, - MetricType::Gauge, - )?; - let services = ConstGauge::new(this.services_by_ip.len() as u32); - services.encode(service_encoder)?; - - let service_info_encoder = encoder.encode_descriptor( - "service_info_index_size", - "The number of entires in the service info index", - None, - MetricType::Gauge, - )?; - let service_infos = ConstGauge::new(this.service_info.len() as u32); - service_infos.encode(service_info_encoder)?; - - let mut service_route_encoder = encoder.encode_descriptor( - 
"service_route_index_size", - "The number of entires in the service route index", - None, - MetricType::Gauge, - )?; - for (ns, index) in &this.namespaces.by_ns { - let labels = vec![("namespace", ns.as_str())]; - let service_routes = ConstGauge::new(index.service_routes.len() as u32); - let service_route_encoder = service_route_encoder.encode_family(&labels)?; - service_routes.encode(service_route_encoder)?; - } - - let mut service_port_route_encoder = encoder.encode_descriptor( - "service_port_route_index_size", - "The number of entires in the service port route index", - None, - MetricType::Gauge, - )?; - for (ns, index) in &this.namespaces.by_ns { - let labels = vec![("namespace", ns.as_str())]; - let service_port_routes = ConstGauge::new(index.service_port_routes.len() as u32); - let service_port_route_encoder = service_port_route_encoder.encode_family(&labels)?; - service_port_routes.encode(service_port_route_encoder)?; - } - - Ok(()) - } -} - fn convert_backend( ns: &str, backend: HttpBackendRef, diff --git a/policy-controller/k8s/index/src/outbound/index/metrics.rs b/policy-controller/k8s/index/src/outbound/index/metrics.rs new file mode 100644 index 0000000000000..bbde96ee72ed9 --- /dev/null +++ b/policy-controller/k8s/index/src/outbound/index/metrics.rs @@ -0,0 +1,67 @@ +use prometheus_client::{ + collector::Collector, + encoding::{DescriptorEncoder, EncodeMetric}, + metrics::{gauge::ConstGauge, MetricType}, + registry::Registry, +}; + +use super::SharedIndex; + +#[derive(Debug)] +struct Instrumented(SharedIndex); + +pub fn register(reg: &mut Registry, index: SharedIndex) { + reg.register_collector(Box::new(Instrumented(index))); +} + +impl Collector for Instrumented { + fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> Result<(), std::fmt::Error> { + let this = self.0.read(); + + let service_encoder = encoder.encode_descriptor( + "service_index_size", + "The number of entries in the service index", + None, + MetricType::Gauge, + )?; + let services = 
ConstGauge::new(this.services_by_ip.len() as u32); + services.encode(service_encoder)?; + + let service_info_encoder = encoder.encode_descriptor( + "service_info_index_size", + "The number of entries in the service info index", + None, + MetricType::Gauge, + )?; + let service_infos = ConstGauge::new(this.service_info.len() as u32); + service_infos.encode(service_info_encoder)?; + + let mut service_route_encoder = encoder.encode_descriptor( + "service_route_index_size", + "The number of entries in the service route index", + None, + MetricType::Gauge, + )?; + for (ns, index) in &this.namespaces.by_ns { + let labels = vec![("namespace", ns.as_str())]; + let service_routes = ConstGauge::new(index.service_routes.len() as u32); + let service_route_encoder = service_route_encoder.encode_family(&labels)?; + service_routes.encode(service_route_encoder)?; + } + + let mut service_port_route_encoder = encoder.encode_descriptor( + "service_port_route_index_size", + "The number of entries in the service port route index", + None, + MetricType::Gauge, + )?; + for (ns, index) in &this.namespaces.by_ns { + let labels = vec![("namespace", ns.as_str())]; + let service_port_routes = ConstGauge::new(index.service_port_routes.len() as u32); + let service_port_route_encoder = service_port_route_encoder.encode_family(&labels)?; + service_port_routes.encode(service_port_route_encoder)?; + } + + Ok(()) + } +} diff --git a/policy-controller/src/main.rs b/policy-controller/src/main.rs index 5a26d32617687..cbf4a9b3f2bd4 100644 --- a/policy-controller/src/main.rs +++ b/policy-controller/src/main.rs @@ -149,8 +149,10 @@ async fn main() -> Result<()> { let status_metrics = status::ControllerMetrics::register(resource_status); let status_index_metrcs = status::IndexMetrics::register(resource_status); - prom.sub_registry_with_prefix("outbound_index") - .register_collector(Box::new(outbound_index.clone())); + outbound::metrics::register( + prom.sub_registry_with_prefix("outbound_index"), + 
outbound_index.clone(), + ); let mut runtime = kubert::Runtime::builder() .with_log(log_level, log_format) @@ -238,7 +240,7 @@ async fn main() -> Result<()> { let http_routes = runtime.watch_all::(watcher::Config::default()); let http_routes_indexes = IndexList::new(inbound_index.clone()) - .push(outbound_index.0.clone()) + .push(outbound_index.clone()) .push(status_index.clone()) .shared(); tokio::spawn( @@ -254,7 +256,7 @@ async fn main() -> Result<()> { ); let services = runtime.watch_all::(watcher::Config::default()); - let services_indexes = IndexList::new(outbound_index.0.clone()) + let services_indexes = IndexList::new(outbound_index.clone()) .push(status_index.clone()) .shared(); tokio::spawn(