From 746b8c40ee1a97be2711be827e9104a55b9c282b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 13 May 2022 15:47:03 +0200 Subject: [PATCH 1/4] Add pod enrichment controller for injecting node addresses Fixes #35 --- docs/modules/ROOT/pages/pod-enrichment.adoc | 15 ++ docs/modules/ROOT/pages/usage.adoc | 5 +- rust/operator-binary/src/main.rs | 14 +- .../src/pod_enrichment_controller.rs | 139 ++++++++++++++++++ 4 files changed, 170 insertions(+), 3 deletions(-) create mode 100644 docs/modules/ROOT/pages/pod-enrichment.adoc create mode 100644 rust/operator-binary/src/pod_enrichment_controller.rs diff --git a/docs/modules/ROOT/pages/pod-enrichment.adoc b/docs/modules/ROOT/pages/pod-enrichment.adoc new file mode 100644 index 0000000..4a86563 --- /dev/null +++ b/docs/modules/ROOT/pages/pod-enrichment.adoc @@ -0,0 +1,15 @@ += Pod Enricher + +The Stackable Commons Operator automatically adds commonly used information to `Pod` objects, which +would otherwise have to be inferred by traversing the Kubernetes object graph. + +== Node Address + +Annotation:: `enrichment.stackable.tech/node-address` + +The hostname or IP address of the `Node` that the `Pod` is scheduled to run on. +Compared to `Pod.status.nodeIP`, this can also (but doesn't have to) be a hostname, and prefers +externally routable addresses. + +This is intended to be used for components that need to register an accessible address (such as Kafka brokers, +or HDFS DataNodes). diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc index 3e6f482..01ff043 100644 --- a/docs/modules/ROOT/pages/usage.adoc +++ b/docs/modules/ROOT/pages/usage.adoc @@ -15,7 +15,10 @@ Multiple operators use this CRD as a way to express the authentication of the pr |Section of a CRD describing how to connect to a TLS enabled system |xref:restarter.adoc[] -|An operator that watches `Pod` objects and their controllers and restarts them when required +|A controller that watches `Pod` objects and their controllers and restarts them when required + +|xref:pod-enrichment.adoc[] +|A controller that adds commonly used information to `Pod` objects |=== The following diagram describes the relationship between the CRDs diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 098acc4..9009490 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -1,3 +1,4 @@ +mod pod_enrichment_controller; mod restart_controller; use futures::pin_mut; @@ -57,8 +58,17 @@ async fn main() -> anyhow::Result<()> { let sts_restart_controller = restart_controller::statefulset::start(&client); let pod_restart_controller = restart_controller::pod::start(&client); - pin_mut!(sts_restart_controller, pod_restart_controller); - futures::future::select(sts_restart_controller, pod_restart_controller).await; + let pod_enrichment_controller = pod_enrichment_controller::start(&client); + pin_mut!( + sts_restart_controller, + pod_restart_controller, + pod_enrichment_controller + ); + futures::future::select( + futures::future::select(sts_restart_controller, pod_restart_controller), + pod_enrichment_controller, + ) + .await; } } diff --git a/rust/operator-binary/src/pod_enrichment_controller.rs b/rust/operator-binary/src/pod_enrichment_controller.rs new file mode 100644 index 0000000..148349f --- /dev/null +++ b/rust/operator-binary/src/pod_enrichment_controller.rs @@ -0,0 +1,139 @@ +use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration}; + +use futures::StreamExt; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + k8s_openapi::api::core::v1::{Node, Pod}, + kube::{ + api::ListParams, + core::ObjectMeta, + runtime::{ + controller::{self, Context}, + reflector::ObjectRef, + Controller, + }, + }, + logging::controller::{report_controller_reconciled, ReconcilerError}, +}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +const FIELD_MANAGER_SCOPE: &str = "enrichment.stackable.tech/pod"; +const ANNOTATION_NODE_ADDRESS: &str = "enrichment.stackable.tech/node-address"; + +struct Ctx { + client: stackable_operator::client::Client, +} + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +pub enum Error { + GetNode { + source: stackable_operator::error::Error, + node: ObjectRef, + }, + UpdatePod { + source: stackable_operator::error::Error, + }, +} + +impl ReconcilerError for Error { + fn category(&self) -> &'static str { + ErrorDiscriminants::from(self).into() + } + + fn secondary_object(&self) -> Option> { + match self { + Error::GetNode { node, .. } => Some(node.clone().erase()), + Error::UpdatePod { source: _ } => None, + } + } +} + +pub async fn start(client: &stackable_operator::client::Client) { + let controller = Controller::new(client.get_all_api::(), ListParams::default()); + let pods = controller.store(); + controller + .watches( + client.get_all_api::(), + ListParams::default(), + move |node| { + pods.state() + .into_iter() + .filter(move |pod| { + pod.spec.as_ref().and_then(|s| s.node_name.as_deref()) + == node.metadata.name.as_deref() + }) + .map(|pod| ObjectRef::from_obj(&*pod)) + }, + ) + .run( + reconcile, + error_policy, + Context::new(Ctx { + client: client.clone(), + }), + ) + .for_each(|res| async move { + report_controller_reconciled(client, "pod.enrichment.commons.stackable.tech", &res) + }) + .await; +} + +#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord, strum::EnumString)] +pub enum NodeAddressType { + ExternalIP, + InternalIP, +} + +async fn reconcile(pod: Arc, ctx: Context) -> Result { + let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref()); + let node = if let Some(node_name) = node_name { + ctx.get_ref() + .client + .get::(node_name, None) + .await + .with_context(|_| GetNodeSnafu { + node: ObjectRef::new(node_name), + })? + } else { + // this condition is normal enough during pod setup that we don't want to cause a bunch of + // error messages... + tracing::debug!("Pod has not yet been scheduled to a Node"); + return Ok(controller::Action::await_change()); + }; + + let mut annotations = BTreeMap::new(); + + let node_addr = node + .status + .iter() + .flat_map(|s| &s.addresses) + .flatten() + .filter_map(|addr| Some((NodeAddressType::from_str(&addr.type_).ok()?, &addr.address))) + .min_by_key(|(ty, _)| *ty) + .map(|(_, addr)| addr); + if let Some(node_addr) = node_addr { + annotations.insert(ANNOTATION_NODE_ADDRESS.to_string(), node_addr.clone()); + } + + let patch = Pod { + metadata: ObjectMeta { + name: pod.metadata.name.clone(), + namespace: pod.metadata.namespace.clone(), + uid: pod.metadata.uid.clone(), + annotations: Some(annotations), + ..ObjectMeta::default() + }, + ..Pod::default() + }; + ctx.get_ref() + .client + .apply_patch(FIELD_MANAGER_SCOPE, &patch, &patch) + .await + .context(UpdatePodSnafu)?; + Ok(controller::Action::await_change()) +} + +fn error_policy(_error: &Error, _ctx: Context) -> controller::Action { + controller::Action::requeue(Duration::from_secs(5)) +} From f5f18f0271eeba0a96fe1466a6b7502e39c04ae1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 13 May 2022 15:49:19 +0200 Subject: [PATCH 2/4] Changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93591aa..7b3ee70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Pods are now annotated with their associated node's primary address ([#36]). + +[#36]: https://github.com/stackabletech/commons-operator/pull/36 + ## [0.1.0] - 2022-05-04 ### Changed From a575c8e4a7763b126906e34a23ad15a378d9f855 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 19 May 2022 12:27:58 +0200 Subject: [PATCH 3/4] Error messages --- rust/operator-binary/src/pod_enrichment_controller.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/operator-binary/src/pod_enrichment_controller.rs b/rust/operator-binary/src/pod_enrichment_controller.rs index 148349f..b95278c 100644 --- a/rust/operator-binary/src/pod_enrichment_controller.rs +++ b/rust/operator-binary/src/pod_enrichment_controller.rs @@ -27,10 +27,12 @@ struct Ctx { #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] pub enum Error { + #[snafu(display("failed to get {node} for Pod"))] GetNode { source: stackable_operator::error::Error, node: ObjectRef, }, + #[snafu(display("failed to update Pod"))] UpdatePod { source: stackable_operator::error::Error, }, From 37759ab91e94ea1dbb23ddac4fa102170d6eb0f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Thu, 19 May 2022 12:47:53 +0200 Subject: [PATCH 4/4] Only enrich labeled pods --- docs/modules/ROOT/pages/pod-enrichment.adoc | 4 ++++ rust/operator-binary/src/pod_enrichment_controller.rs | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/modules/ROOT/pages/pod-enrichment.adoc b/docs/modules/ROOT/pages/pod-enrichment.adoc index 4a86563..07853a3 100644 --- a/docs/modules/ROOT/pages/pod-enrichment.adoc +++ b/docs/modules/ROOT/pages/pod-enrichment.adoc @@ -3,6 +3,10 @@ The Stackable Commons Operator automatically adds commonly used information to `Pod` objects, which would otherwise have to be inferred by traversing the Kubernetes object graph. +== Usage + +The pod enricher is only enabled for `Pod` objects that set the label `enrichment.stackable.tech/enabled` to `true`. + == Node Address Annotation:: `enrichment.stackable.tech/node-address` diff --git a/rust/operator-binary/src/pod_enrichment_controller.rs b/rust/operator-binary/src/pod_enrichment_controller.rs index b95278c..31c0c53 100644 --- a/rust/operator-binary/src/pod_enrichment_controller.rs +++ b/rust/operator-binary/src/pod_enrichment_controller.rs @@ -52,7 +52,10 @@ impl ReconcilerError for Error { } pub async fn start(client: &stackable_operator::client::Client) { - let controller = Controller::new(client.get_all_api::(), ListParams::default()); + let controller = Controller::new( + client.get_all_api::(), + ListParams::default().labels("enrichment.stackable.tech/enabled=true"), + ); let pods = controller.store(); controller .watches(