-
Notifications
You must be signed in to change notification settings - Fork 3
[Merged by Bors] - Add pod enrichment controller for injecting node addresses #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| = Pod Enricher | ||
|
|
||
| The Stackable Commons Operator automatically adds commonly used information to `Pod` objects, which | ||
| would otherwise have to be inferred by traversing the Kubernetes object graph. | ||
|
|
||
| == Usage | ||
|
|
||
| The pod enricher is only enabled for `Pod` objects that set the label `enrichment.stackable.tech/enabled` to `true`. | ||
|
|
||
| == Node Address | ||
|
|
||
| Annotation:: `enrichment.stackable.tech/node-address` | ||
|
|
||
| The hostname or IP address of the `Node` that the `Pod` is scheduled to run on. | ||
| Compared to `Pod.status.nodeIP`, this can also (but doesn't have to) be a hostname, and prefers | ||
| externally routable addresses. | ||
|
|
||
| This is intended to be used for components that need to register an accessible address (such as Kafka brokers, | ||
| or HDFS DataNodes). | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| mod pod_enrichment_controller; | ||
| mod restart_controller; | ||
|
|
||
| use futures::pin_mut; | ||
|
|
@@ -57,8 +58,17 @@ async fn main() -> anyhow::Result<()> { | |
|
|
||
| let sts_restart_controller = restart_controller::statefulset::start(&client); | ||
| let pod_restart_controller = restart_controller::pod::start(&client); | ||
| pin_mut!(sts_restart_controller, pod_restart_controller); | ||
| futures::future::select(sts_restart_controller, pod_restart_controller).await; | ||
| let pod_enrichment_controller = pod_enrichment_controller::start(&client); | ||
| pin_mut!( | ||
| sts_restart_controller, | ||
| pod_restart_controller, | ||
| pod_enrichment_controller | ||
| ); | ||
| futures::future::select( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would using select_all instead of chaining selects have the same effect here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but I'm not sure how big the impact of that is here specifically, but… |
||
| futures::future::select(sts_restart_controller, pod_restart_controller), | ||
| pod_enrichment_controller, | ||
| ) | ||
| .await; | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,144 @@ | ||||||
| use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration}; | ||||||
|
|
||||||
| use futures::StreamExt; | ||||||
| use snafu::{ResultExt, Snafu}; | ||||||
| use stackable_operator::{ | ||||||
| k8s_openapi::api::core::v1::{Node, Pod}, | ||||||
| kube::{ | ||||||
| api::ListParams, | ||||||
| core::ObjectMeta, | ||||||
| runtime::{ | ||||||
| controller::{self, Context}, | ||||||
| reflector::ObjectRef, | ||||||
| Controller, | ||||||
| }, | ||||||
| }, | ||||||
| logging::controller::{report_controller_reconciled, ReconcilerError}, | ||||||
| }; | ||||||
| use strum::{EnumDiscriminants, IntoStaticStr}; | ||||||
|
|
||||||
| const FIELD_MANAGER_SCOPE: &str = "enrichment.stackable.tech/pod"; | ||||||
| const ANNOTATION_NODE_ADDRESS: &str = "enrichment.stackable.tech/node-address"; | ||||||
|
|
||||||
| struct Ctx { | ||||||
| client: stackable_operator::client::Client, | ||||||
| } | ||||||
|
|
||||||
| #[derive(Snafu, Debug, EnumDiscriminants)] | ||||||
| #[strum_discriminants(derive(IntoStaticStr))] | ||||||
| pub enum Error { | ||||||
| #[snafu(display("failed to get {node} for Pod"))] | ||||||
| GetNode { | ||||||
|
nightkr marked this conversation as resolved.
|
||||||
| source: stackable_operator::error::Error, | ||||||
| node: ObjectRef<Node>, | ||||||
| }, | ||||||
| #[snafu(display("failed to update Pod"))] | ||||||
| UpdatePod { | ||||||
| source: stackable_operator::error::Error, | ||||||
| }, | ||||||
| } | ||||||
|
|
||||||
| impl ReconcilerError for Error { | ||||||
| fn category(&self) -> &'static str { | ||||||
| ErrorDiscriminants::from(self).into() | ||||||
| } | ||||||
|
|
||||||
| fn secondary_object(&self) -> Option<ObjectRef<stackable_operator::kube::core::DynamicObject>> { | ||||||
| match self { | ||||||
| Error::GetNode { node, .. } => Some(node.clone().erase()), | ||||||
| Error::UpdatePod { source: _ } => None, | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| pub async fn start(client: &stackable_operator::client::Client) { | ||||||
| let controller = Controller::new( | ||||||
| client.get_all_api::<Pod>(), | ||||||
| ListParams::default().labels("enrichment.stackable.tech/enabled=true"), | ||||||
| ); | ||||||
| let pods = controller.store(); | ||||||
| controller | ||||||
| .watches( | ||||||
| client.get_all_api::<Node>(), | ||||||
| ListParams::default(), | ||||||
| move |node| { | ||||||
| pods.state() | ||||||
| .into_iter() | ||||||
| .filter(move |pod| { | ||||||
| pod.spec.as_ref().and_then(|s| s.node_name.as_deref()) | ||||||
| == node.metadata.name.as_deref() | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
feel free to ignore
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Myeah, |
||||||
| }) | ||||||
| .map(|pod| ObjectRef::from_obj(&*pod)) | ||||||
| }, | ||||||
| ) | ||||||
| .run( | ||||||
| reconcile, | ||||||
| error_policy, | ||||||
| Context::new(Ctx { | ||||||
| client: client.clone(), | ||||||
| }), | ||||||
| ) | ||||||
| .for_each(|res| async move { | ||||||
| report_controller_reconciled(client, "pod.enrichment.commons.stackable.tech", &res) | ||||||
| }) | ||||||
| .await; | ||||||
| } | ||||||
|
|
||||||
| #[derive(Clone, Copy, PartialEq, PartialOrd, Eq, Ord, strum::EnumString)] | ||||||
| pub enum NodeAddressType { | ||||||
| ExternalIP, | ||||||
| InternalIP, | ||||||
| } | ||||||
|
|
||||||
| async fn reconcile(pod: Arc<Pod>, ctx: Context<Ctx>) -> Result<controller::Action, Error> { | ||||||
| let node_name = pod.spec.as_ref().and_then(|s| s.node_name.as_deref()); | ||||||
| let node = if let Some(node_name) = node_name { | ||||||
| ctx.get_ref() | ||||||
| .client | ||||||
| .get::<Node>(node_name, None) | ||||||
| .await | ||||||
| .with_context(|_| GetNodeSnafu { | ||||||
| node: ObjectRef::new(node_name), | ||||||
| })? | ||||||
| } else { | ||||||
| // this condition is normal enough during pod setup that we don't want to cause a bunch of | ||||||
| // error messages... | ||||||
| tracing::debug!("Pod has not yet been scheduled to a Node"); | ||||||
| return Ok(controller::Action::await_change()); | ||||||
| }; | ||||||
|
|
||||||
| let mut annotations = BTreeMap::new(); | ||||||
|
|
||||||
| let node_addr = node | ||||||
| .status | ||||||
| .iter() | ||||||
| .flat_map(|s| &s.addresses) | ||||||
| .flatten() | ||||||
| .filter_map(|addr| Some((NodeAddressType::from_str(&addr.type_).ok()?, &addr.address))) | ||||||
| .min_by_key(|(ty, _)| *ty) | ||||||
| .map(|(_, addr)| addr); | ||||||
|
nightkr marked this conversation as resolved.
|
||||||
| if let Some(node_addr) = node_addr { | ||||||
| annotations.insert(ANNOTATION_NODE_ADDRESS.to_string(), node_addr.clone()); | ||||||
| } | ||||||
|
|
||||||
| let patch = Pod { | ||||||
| metadata: ObjectMeta { | ||||||
| name: pod.metadata.name.clone(), | ||||||
| namespace: pod.metadata.namespace.clone(), | ||||||
| uid: pod.metadata.uid.clone(), | ||||||
| annotations: Some(annotations), | ||||||
| ..ObjectMeta::default() | ||||||
| }, | ||||||
| ..Pod::default() | ||||||
| }; | ||||||
| ctx.get_ref() | ||||||
| .client | ||||||
| .apply_patch(FIELD_MANAGER_SCOPE, &patch, &patch) | ||||||
| .await | ||||||
| .context(UpdatePodSnafu)?; | ||||||
| Ok(controller::Action::await_change()) | ||||||
| } | ||||||
|
|
||||||
| fn error_policy(_error: &Error, _ctx: Context<Ctx>) -> controller::Action { | ||||||
| controller::Action::requeue(Duration::from_secs(5)) | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about having to separate labels
enrichment.stackable.tech/node-ipandenrichment.stackable.tech/node-hostname? This way the product operator or even the product could decides what he prefers or needs.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might add a separate
node-ipat some point if something requires that specifically, but part of the point here is to abstract away the fact that not all clus†ers have meaningful (resolvable) hostnames for nodes.