diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93591aa..7b3ee70 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
 
 ## [Unreleased]
 
+### Added
+
+- Pods are now annotated with their associated node's primary address ([#36]).
+
+[#36]: https://github.com/stackabletech/commons-operator/pull/36
+
 ## [0.1.0] - 2022-05-04
 
 ### Changed
diff --git a/docs/modules/ROOT/pages/pod-enrichment.adoc b/docs/modules/ROOT/pages/pod-enrichment.adoc
new file mode 100644
index 0000000..07853a3
--- /dev/null
+++ b/docs/modules/ROOT/pages/pod-enrichment.adoc
@@ -0,0 +1,19 @@
+= Pod Enricher
+
+The Stackable Commons Operator automatically adds commonly used information to `Pod` objects, which
+would otherwise have to be inferred by traversing the Kubernetes object graph.
+
+== Usage
+
+The pod enricher is only enabled for `Pod` objects that set the label `enrichment.stackable.tech/enabled` to `true`.
+
+== Node Address
+
+Annotation:: `enrichment.stackable.tech/node-address`
+
+The hostname or IP address of the `Node` that the `Pod` is scheduled to run on.
+Compared to `Pod.status.hostIP`, this can also (but doesn't have to) be a hostname, and prefers
+externally routable addresses.
+
+This is intended to be used for components that need to register an accessible address (such as Kafka brokers,
+or HDFS DataNodes).
diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc
index 3e6f482..01ff043 100644
--- a/docs/modules/ROOT/pages/usage.adoc
+++ b/docs/modules/ROOT/pages/usage.adoc
@@ -15,7 +15,10 @@ Multiple operators use this CRD as a way to express the authentication of the pr
 |Section of a CRD describing how to connect to a TLS enabled system
 
 |xref:restarter.adoc[]
-|An operator that watches `Pod` objects and their controllers and restarts them when required
+|A controller that watches `Pod` objects and their controllers and restarts them when required
+
+|xref:pod-enrichment.adoc[]
+|A controller that adds commonly used information to `Pod` objects
 |===
 
 The following diagram describes the relationship between the CRDs
diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs
index 098acc4..9009490 100644
--- a/rust/operator-binary/src/main.rs
+++ b/rust/operator-binary/src/main.rs
@@ -1,3 +1,4 @@
+mod pod_enrichment_controller;
 mod restart_controller;
 
 use futures::pin_mut;
@@ -57,8 +58,17 @@ async fn main() -> anyhow::Result<()> {
 
             let sts_restart_controller = restart_controller::statefulset::start(&client);
             let pod_restart_controller = restart_controller::pod::start(&client);
-            pin_mut!(sts_restart_controller, pod_restart_controller);
-            futures::future::select(sts_restart_controller, pod_restart_controller).await;
+            let pod_enrichment_controller = pod_enrichment_controller::start(&client);
+            pin_mut!(
+                sts_restart_controller,
+                pod_restart_controller,
+                pod_enrichment_controller
+            );
+            futures::future::select(
+                futures::future::select(sts_restart_controller, pod_restart_controller),
+                pod_enrichment_controller,
+            )
+            .await;
         }
     }
 
diff --git a/rust/operator-binary/src/pod_enrichment_controller.rs b/rust/operator-binary/src/pod_enrichment_controller.rs
new file mode 100644
index 0000000..31c0c53
--- /dev/null
+++ b/rust/operator-binary/src/pod_enrichment_controller.rs
@@ -0,0 +1,144 @@
+use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
+
+use futures::StreamExt;
+use snafu::{ResultExt, Snafu};
+use stackable_operator::{
+    k8s_openapi::api::core::v1::{Node, Pod},
+    kube::{
+        api::ListParams,
+        core::{DynamicObject, ObjectMeta},
+        runtime::{
+            controller::{self, Context},
+            reflector::ObjectRef,
+            Controller,
+        },
+    },
+    logging::controller::{report_controller_reconciled, ReconcilerError},
+};
+use strum::{EnumDiscriminants, IntoStaticStr};
+
+const FIELD_MANAGER_SCOPE: &str = "enrichment.stackable.tech/pod";
+const ANNOTATION_NODE_ADDRESS: &str = "enrichment.stackable.tech/node-address";
+
+struct Ctx {
+    client: stackable_operator::client::Client,
+}
+
+#[derive(Snafu, Debug, EnumDiscriminants)]
+#[strum_discriminants(derive(IntoStaticStr))]
+pub enum Error {
+    #[snafu(display("failed to get {node} for Pod"))]
+    GetNode {
+        source: stackable_operator::error::Error,
+        node: ObjectRef<Node>,
+    },
+    #[snafu(display("failed to update Pod"))]
+    UpdatePod {
+        source: stackable_operator::error::Error,
+    },
+}
+
+impl ReconcilerError for Error {
+    fn category(&self) -> &'static str {
+        ErrorDiscriminants::from(self).into()
+    }
+
+    fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> {
+        match self {
+            Error::GetNode { node, .. } => Some(node.clone().erase()),
+            Error::UpdatePod { source: _ } => None,
+        }
+    }
+}
+
+pub async fn start(client: &stackable_operator::client::Client) {
+    let controller = Controller::new(
+        client.get_all_api::<Pod>(),
+        ListParams::default().labels("enrichment.stackable.tech/enabled=true"),
+    );
+    let pods = controller.store();
+    controller
+        .watches(
+            client.get_all_api::<Node>(),
+            ListParams::default(),
+            move |node| {
+                pods.state()
+                    .into_iter()
+                    .filter(move |pod| {
+                        pod.spec.as_ref().and_then(|s| s.node_name.as_deref())
+                            == node.metadata.name.as_deref()
+                    })
+                    .map(|pod| ObjectRef::from_obj(&*pod))
+            },
+        )
+        .run(
+            reconcile,
+            error_policy,
+            Context::new(Ctx {
+                client: client.clone(),
+            }),
+        )
+        .for_each(|res| async move {
+            report_controller_reconciled(client, "pod.enrichment.commons.stackable.tech", &res)
+        })
+        .await;
+}
+
+#[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord, strum::EnumString)]
+pub enum NodeAddressType {
+    ExternalIP,
+    InternalIP,
+}
+
+async fn reconcile(pod: Arc<Pod>, ctx: Context<Ctx>) -> Result<controller::Action, Error> {
+    let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref());
+    let node = if let Some(node_name) = node_name {
+        ctx.get_ref()
+            .client
+            .get::<Node>(node_name, None)
+            .await
+            .with_context(|_| GetNodeSnafu {
+                node: ObjectRef::new(node_name),
+            })?
+    } else {
+        // this condition is normal enough during pod setup that we don't want to cause a bunch of
+        // error messages...
+        tracing::debug!("Pod has not yet been scheduled to a Node");
+        return Ok(controller::Action::await_change());
+    };
+
+    let mut annotations = BTreeMap::new();
+
+    let node_addr = node
+        .status
+        .iter()
+        .flat_map(|s| &s.addresses)
+        .flatten()
+        .filter_map(|addr| Some((NodeAddressType::from_str(&addr.type_).ok()?, &addr.address)))
+        .min_by_key(|(ty, _)| *ty)
+        .map(|(_, addr)| addr);
+    if let Some(node_addr) = node_addr {
+        annotations.insert(ANNOTATION_NODE_ADDRESS.to_string(), node_addr.clone());
+    }
+
+    let patch = Pod {
+        metadata: ObjectMeta {
+            name: pod.metadata.name.clone(),
+            namespace: pod.metadata.namespace.clone(),
+            uid: pod.metadata.uid.clone(),
+            annotations: Some(annotations),
+            ..ObjectMeta::default()
+        },
+        ..Pod::default()
+    };
+    ctx.get_ref()
+        .client
+        .apply_patch(FIELD_MANAGER_SCOPE, &patch, &patch)
+        .await
+        .context(UpdatePodSnafu)?;
+    Ok(controller::Action::await_change())
+}
+
+fn error_policy(_error: &Error, _ctx: Context<Ctx>) -> controller::Action {
+    controller::Action::requeue(Duration::from_secs(5))
+}