From 91fd3cb2a2093b54660f0cb88d43a2523851fcd0 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 14 Nov 2025 13:39:36 +0100 Subject: [PATCH 01/25] feat!: Add support for mutating webhooks --- Cargo.lock | 2 + Cargo.toml | 5 +- crates/stackable-webhook/Cargo.toml | 2 + crates/stackable-webhook/src/lib.rs | 200 +++++------ crates/stackable-webhook/src/maintainer.rs | 274 --------------- crates/stackable-webhook/src/options.rs | 149 -------- .../src/servers/conversion.rs | 322 ------------------ .../src/servers/conversion_webhook.rs | 185 ++++++++++ crates/stackable-webhook/src/servers/mod.rs | 42 ++- .../src/servers/mutating_webhook.rs | 152 +++++++++ crates/stackable-webhook/src/tls/mod.rs | 16 +- 11 files changed, 487 insertions(+), 862 deletions(-) delete mode 100644 crates/stackable-webhook/src/maintainer.rs delete mode 100644 crates/stackable-webhook/src/options.rs delete mode 100644 crates/stackable-webhook/src/servers/conversion.rs create mode 100644 crates/stackable-webhook/src/servers/conversion_webhook.rs create mode 100644 crates/stackable-webhook/src/servers/mutating_webhook.rs diff --git a/Cargo.lock b/Cargo.lock index 9a4939bd4..0d324aab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2996,6 +2996,7 @@ name = "stackable-webhook" version = "0.7.1" dependencies = [ "arc-swap", + "async-trait", "axum", "clap", "futures-util", @@ -3006,6 +3007,7 @@ dependencies = [ "opentelemetry", "opentelemetry-semantic-conventions", "rand 0.9.2", + "serde", "serde_json", "snafu 0.8.9", "stackable-certs", diff --git a/Cargo.toml b/Cargo.toml index 1f191b8db..3ac42776f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ repository = "https://github.com/stackabletech/operator-rs" [workspace.dependencies] product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.8.0" } -arc-swap = "1.7" +arc-swap = "1.7.0" +async-trait = "0.1.89" axum = { version = "0.8.1", features = ["http2"] } chrono = { version = "0.4.38", default-features = false } clap = { version = "4.5.17", features = ["derive", "cargo", "env"] } @@ -38,7 +39,7 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = ["schem # We use rustls instead of openssl for easier portability, e.g. so that we can build stackablectl without the need to vendor (build from source) openssl # We use ring instead of aws-lc-rs, as this currently fails to build in "make run-dev" # We pin the kube version, as we use a patch for 2.0.1 below -kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "rustls-tls", "ring"] } +kube = { version = "=2.0.1", default-features = false, features = ["client", "jsonpatch", "runtime", "derive", "admission", "rustls-tls", "ring"] } opentelemetry = "0.31.0" opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } opentelemetry-appender-tracing = "0.31.0" diff --git a/crates/stackable-webhook/Cargo.toml b/crates/stackable-webhook/Cargo.toml index 3dc40e036..0341ecb95 100644 --- a/crates/stackable-webhook/Cargo.toml +++ b/crates/stackable-webhook/Cargo.toml @@ -12,6 +12,7 @@ stackable-shared = { path = "../stackable-shared" } stackable-telemetry = { path = "../stackable-telemetry" } arc-swap.workspace = true +async-trait.workspace = true axum.workspace = true futures-util.workspace = true hyper-util.workspace = true @@ -21,6 +22,7 @@ kube.workspace = true opentelemetry.workspace = true opentelemetry-semantic-conventions.workspace = true rand.workspace = true +serde.workspace = true serde_json.workspace = true snafu.workspace = true tokio-rustls.workspace = true diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index ee33ab542..76ffbd53a 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -1,64 +1,25 @@ -//! Utility types and functions to easily create ready-to-use webhook servers -//! which can handle different tasks, for example CRD conversions. All webhook -//! servers use HTTPS by default. This library is fully compatible with the -//! [`tracing`] crate and emits debug level tracing data. -//! -//! Most users will only use the top-level exported generic [`WebhookServer`] -//! which enables complete control over the [Router] which handles registering -//! routes and their handler functions. -//! -//! ``` -//! use stackable_webhook::{WebhookServer, WebhookOptions}; -//! use axum::Router; -//! -//! # async fn test() { -//! let router = Router::new(); -//! let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default()) -//! .await -//! .expect("failed to create WebhookServer"); -//! # } -//! ``` -//! -//! For some usages, complete end-to-end [`WebhookServer`] implementations -//! exist. One such implementation is the [`ConversionWebhookServer`][1]. -//! -//! This library additionally also exposes lower-level structs and functions to -//! enable complete control over these details if needed. -//! -//! [1]: crate::servers::ConversionWebhookServer use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use ::x509_cert::Certificate; use axum::{Router, routing::get}; -use futures_util::{FutureExt as _, pin_mut, select}; +use futures_util::{FutureExt as _, TryFutureExt, select}; +use k8s_openapi::ByteString; +use servers::{WebhookServerImplementation, WebhookServerImplementationError}; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; use tokio::{ signal::unix::{SignalKind, signal}, sync::mpsc, + try_join, }; use tower::ServiceBuilder; +use x509_cert::der::{EncodePem, pem::LineEnding}; -// Selected re-exports -pub use crate::options::WebhookOptions; use crate::tls::TlsServer; -pub mod maintainer; -pub mod options; pub mod servers; pub mod tls; -/// A generic webhook handler receiving a request and sending back a response. -/// -/// This trait is not intended to be implemented by external crates and this -/// library provides various ready-to-use implementations for it. One such an -/// implementation is part of the [`ConversionWebhookServer`][1]. -/// -/// [1]: crate::servers::ConversionWebhookServer -pub trait WebhookHandler { - fn call(self, req: Req) -> Res; -} - /// A result type alias with the [`WebhookError`] type as the default error type. pub type Result = std::result::Result; @@ -69,24 +30,38 @@ pub enum WebhookError { #[snafu(display("failed to run TLS server"))] RunTlsServer { source: tls::TlsServerError }, + + #[snafu(display("failed to update certificate"))] + UpdateCertificate { + source: WebhookServerImplementationError, + }, + + #[snafu(display("failed to encode CA certificate as PEM format"))] + EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, } -/// A ready-to-use webhook server. -/// -/// This server abstracts away lower-level details like TLS termination -/// and other various configurations, validations or middlewares. The routes -/// and their handlers are completely customizable by bringing your own -/// Axum [`Router`]. -/// -/// For complete end-to-end implementations, see [`ConversionWebhookServer`][1]. -/// -/// [1]: crate::servers::ConversionWebhookServer pub struct WebhookServer { + options: WebhookOptions, + webhooks: Vec>, tls_server: TlsServer, + cert_rx: mpsc::Receiver, +} + +#[derive(Clone, Debug)] +pub struct WebhookOptions { + /// The default HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] + /// binds to. + pub socket_addr: SocketAddr, + + /// The namespace the operator/webhook is running in. + pub operator_namespace: String, + + /// The name of the Kubernetes service which points to the operator/webhook. + pub operator_service_name: String, } impl WebhookServer { - /// The default HTTPS port `8443` + /// The default HTTPS port pub const DEFAULT_HTTPS_PORT: u16 = 8443; /// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to, /// which represents binding on all network addresses. @@ -99,52 +74,10 @@ impl WebhookServer { pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); - /// Creates a new ready-to-use webhook server. - /// - /// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles - /// routing based on the provided Axum `router`. Most of the time it is sufficient to use - /// [`WebhookOptions::default()`]. See the documentation for [`WebhookOptions`] for more details - /// on the default values. - /// - /// To start the server, use the [`WebhookServer::run()`] function. This will - /// run the server using the Tokio runtime until it is terminated. - /// - /// ### Basic Example - /// - /// ``` - /// use stackable_webhook::{WebhookServer, WebhookOptions}; - /// use axum::Router; - /// - /// # async fn test() { - /// let router = Router::new(); - /// let (server, cert_rx) = WebhookServer::new(router, WebhookOptions::default()) - /// .await - /// .expect("failed to create WebhookServer"); - /// # } - /// ``` - /// - /// ### Example with Custom Options - /// - /// ``` - /// use stackable_webhook::{WebhookServer, WebhookOptions}; - /// use axum::Router; - /// - /// # async fn test() { - /// let options = WebhookOptions::builder() - /// .bind_address([127, 0, 0, 1], 8080) - /// .add_subject_alterative_dns_name("my-san-entry") - /// .build(); - /// - /// let router = Router::new(); - /// let (server, cert_rx) = WebhookServer::new(router, options) - /// .await - /// .expect("failed to create WebhookServer"); - /// # } - /// ``` pub async fn new( - router: Router, options: WebhookOptions, - ) -> Result<(Self, mpsc::Receiver)> { + webhooks: Vec>, + ) -> Result { tracing::trace!("create new webhook server"); // TODO (@Techassi): Make opt-in configurable from the outside @@ -161,17 +94,26 @@ impl WebhookServer { // Create the root router and merge the provided router into it. tracing::debug!("create core router and merge provided router"); - let router = router + let mut router = Router::new() .layer(service_builder) // The health route is below the AxumTraceLayer so as not to be instrumented .route("/health", get(|| async { "ok" })); + for webhook in webhooks.iter() { + router = webhook.register_routes(router); + } + tracing::debug!("create TLS server"); - let (tls_server, cert_rx) = TlsServer::new(router, options) + let (tls_server, cert_rx) = TlsServer::new(router, options.clone()) .await .context(CreateTlsServerSnafu)?; - Ok((Self { tls_server }, cert_rx)) + Ok(Self { + options, + webhooks, + tls_server, + cert_rx, + }) } /// Runs the Webhook server and sets up signal handlers for shutting down. @@ -200,19 +142,59 @@ impl WebhookServer { }; // select requires Future + Unpin - pin_mut!(future_server); - pin_mut!(future_signal); - - futures_util::future::select(future_server, future_signal).await; + tokio::pin!(future_server); + tokio::pin!(future_signal); + + tokio::select! { + res = &mut future_server => { + // If the server future errors, propagate the error + res?; + } + _ = &mut future_signal => { + tracing::info!("shutdown signal received, stopping server"); + } + } Ok(()) } - /// Runs the webhook server by creating a TCP listener and binding it to - /// the specified socket address. async fn run_server(self) -> Result<()> { tracing::debug!("run webhook server"); - self.tls_server.run().await.context(RunTlsServerSnafu) + let Self { + options, + mut webhooks, + tls_server, + mut cert_rx, + // initial_reconcile_tx, + } = self; + let tls_server = tls_server + .run() + .map_err(|err| WebhookError::RunTlsServer { source: err }); + + let cert_update_loop = async { + loop { + while let Some(cert) = cert_rx.recv().await { + // The caBundle needs to be provided as a base64-encoded PEM envelope. + let ca_bundle = cert + .to_pem(LineEnding::LF) + .context(EncodeCertificateAuthorityAsPemSnafu)?; + let ca_bundle = ByteString(ca_bundle.as_bytes().to_vec()); + + for webhook in webhooks.iter_mut() { + webhook + .handle_certificate_rotation(&cert, &ca_bundle, &options) + .await + .context(UpdateCertificateSnafu)?; + } + } + } + + // We need to hint the return type to the compiler + #[allow(unreachable_code)] + Ok(()) + }; + + try_join!(cert_update_loop, tls_server).map(|_| ()) } } diff --git a/crates/stackable-webhook/src/maintainer.rs b/crates/stackable-webhook/src/maintainer.rs deleted file mode 100644 index b94da27aa..000000000 --- a/crates/stackable-webhook/src/maintainer.rs +++ /dev/null @@ -1,274 +0,0 @@ -use k8s_openapi::{ - ByteString, - apiextensions_apiserver::pkg::apis::apiextensions::v1::{ - CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, - WebhookConversion, - }, -}; -use kube::{ - Api, Client, ResourceExt, - api::{Patch, PatchParams}, -}; -use snafu::{ResultExt, Snafu, ensure}; -use tokio::sync::{mpsc, oneshot}; -use x509_cert::{ - Certificate, - der::{EncodePem, pem::LineEnding}, -}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("failed to encode CA certificate as PEM format"))] - EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, - - #[snafu(display("failed to send initial CRD reconcile heartbeat"))] - SendInitialReconcileHeartbeat, - - #[snafu(display("failed to patch CRD {crd_name:?}"))] - PatchCrd { - source: kube::Error, - crd_name: String, - }, -} - -/// Maintains various custom resource definitions. -/// -/// When running this, the following operations are done: -/// -/// - Apply the CRDs when starting up -/// - Reconcile the CRDs when the conversion webhook certificate is rotated -pub struct CustomResourceDefinitionMaintainer<'a> { - client: Client, - certificate_rx: mpsc::Receiver, - - definitions: Vec, - options: CustomResourceDefinitionMaintainerOptions<'a>, - - initial_reconcile_tx: oneshot::Sender<()>, -} - -impl<'a> CustomResourceDefinitionMaintainer<'a> { - /// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more - /// custom resource definitions. - /// - /// ## Parameters - /// - /// This function expects four parameters: - /// - /// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches - /// the CRDs when the TLS certificate is rotated. - /// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The - /// certificate data sent through the channel is used to set the caBundle in the conversion - /// section of the CRD. - /// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained - /// by this maintainer. If the iterator is empty, the maintainer returns early without doing - /// any work. As such, a polling mechanism which waits for all futures should be used to - /// prevent premature termination of the operator. - /// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various - /// parts of the maintainer. In the future, this will be converted to a builder, to enable a - /// cleaner API interface. - /// - /// ## Return Values - /// - /// This function returns a 2-tuple (pair) of values: - /// - /// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer. - /// See [`CustomResourceDefinitionMaintainer::run`] for more details. - /// - The [`oneshot::Receiver`] which will be used to send out a message once the initial - /// CRD reconciliation ran. This signal can be used to trigger the deployment of custom - /// resources defined by the maintained CRDs. - /// - /// ## Example - /// - /// ```no_run - /// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion}; - /// # use tokio::sync::mpsc::channel; - /// # use x509_cert::Certificate; - /// # use kube::Client; - /// use stackable_webhook::maintainer::{ - /// CustomResourceDefinitionMaintainerOptions, - /// CustomResourceDefinitionMaintainer, - /// }; - /// - /// # #[tokio::main] - /// # async fn main() { - /// # let (certificate_tx, certificate_rx) = channel(1); - /// let options = CustomResourceDefinitionMaintainerOptions { - /// operator_service_name: "my-service-name", - /// operator_namespace: "my-namespace", - /// field_manager: "my-field-manager", - /// webhook_https_port: 8443, - /// disabled: true, - /// }; - /// - /// let client = Client::try_default().await.unwrap(); - /// - /// let definitions = vec![ - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(), - /// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(), - /// ]; - /// - /// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( - /// client, - /// certificate_rx, - /// definitions, - /// options, - /// ); - /// # } - /// ``` - pub fn new( - client: Client, - certificate_rx: mpsc::Receiver, - definitions: impl IntoIterator, - options: CustomResourceDefinitionMaintainerOptions<'a>, - ) -> (Self, oneshot::Receiver<()>) { - let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); - - let maintainer = Self { - definitions: definitions.into_iter().collect(), - initial_reconcile_tx, - certificate_rx, - options, - client, - }; - - (maintainer, initial_reconcile_rx) - } - - /// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously. - /// - /// This needs to be polled in parallel with other parts of an operator, like controllers or - /// webhook servers. If it is disabled, the returned future immediately resolves to - /// [`std::task::Poll::Ready`] and thus doesn't consume any resources. - pub async fn run(mut self) -> Result<(), Error> { - let CustomResourceDefinitionMaintainerOptions { - operator_service_name, - operator_namespace, - webhook_https_port, - field_manager, - disabled, - } = self.options; - - // If the maintainer is disabled or there are no custom resource definitions, immediately - // return without doing any work. - if disabled || self.definitions.is_empty() { - return Ok(()); - } - - // This channel can only be used exactly once. The sender's send method consumes self, and - // as such, the sender is wrapped in an Option to be able to call take to consume the inner - // value. - let mut initial_reconcile_tx = Some(self.initial_reconcile_tx); - - // This get's polled by the async runtime on a regular basis (or when woken up). Once we - // receive a message containing the newly generated TLS certificate for the conversion - // webhook, we need to update the caBundle in the CRD. - while let Some(certificate) = self.certificate_rx.recv().await { - tracing::info!( - k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::>(), - "reconciling custom resource definitions" - ); - - // The caBundle needs to be provided as a base64-encoded PEM envelope. - let ca_bundle = certificate - .to_pem(LineEnding::LF) - .context(EncodeCertificateAuthorityAsPemSnafu)?; - - let crd_api: Api = Api::all(self.client.clone()); - - for crd in self.definitions.iter_mut() { - let crd_kind = &crd.spec.names.kind; - let crd_name = crd.name_any(); - - tracing::debug!( - k8s.crd.kind = crd_kind, - k8s.crd.name = crd_name, - "reconciling custom resource definition" - ); - - crd.spec.conversion = Some(CustomResourceConversion { - strategy: "Webhook".to_owned(), - webhook: Some(WebhookConversion { - // conversionReviewVersions indicates what ConversionReview versions are - // supported by the webhook. The first version in the list understood by the - // API server is sent to the webhook. The webhook must respond with a - // ConversionReview object in the same version it received. We only support - // the stable v1 ConversionReview to keep the implementation as simple as - // possible. - conversion_review_versions: vec!["v1".to_owned()], - client_config: Some(WebhookClientConfig { - service: Some(ServiceReference { - name: operator_service_name.to_owned(), - namespace: operator_namespace.to_owned(), - path: Some(format!("/convert/{crd_name}")), - port: Some(webhook_https_port.into()), - }), - // Here, ByteString takes care of encoding the provided content as - // base64. - ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())), - url: None, - }), - }), - }); - - // Deploy the updated CRDs using a server-side apply. - let patch = Patch::Apply(&crd); - - // We force apply here, because we want to become the sole manager of the CRD. This - // avoids any conflicts from previous deployments via helm or stackablectl which are - // reported with the following error message: - // - // Apply failed with 2 conflicts: conflicts with "stackablectl" using apiextensions.k8s.io/v1: - // - .spec.versions - // - .spec.conversion.strategy: Conflict - // - // The official Kubernetes documentation provides three options on how to solve - // these conflicts. Option 1 is used, which is described as follows: - // - // Overwrite value, become sole manager: If overwriting the value was intentional - // (or if the applier is an automated process like a controller) the applier should - // set the force query parameter to true [...], and make the request again. This - // forces the operation to succeed, changes the value of the field, and removes the - // field from all other managers' entries in managedFields. - // - // See https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts - let patch_params = PatchParams::apply(field_manager).force(); - - crd_api - .patch(&crd_name, &patch_params, &patch) - .await - .with_context(|_| PatchCrdSnafu { crd_name })?; - } - - // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out - // via the oneshot channel. - if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() { - ensure!( - initial_reconcile_tx.send(()).is_ok(), - SendInitialReconcileHeartbeatSnafu - ); - } - } - - Ok(()) - } -} - -// TODO (@Techassi): Make this a builder instead -/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`]. -pub struct CustomResourceDefinitionMaintainerOptions<'a> { - /// The service name used by the operator/conversion webhook. - pub operator_service_name: &'a str, - - /// The namespace the operator/conversion webhook runs in. - pub operator_namespace: &'a str, - - /// The field manager used when maintaining the CRDs. - pub field_manager: &'a str, - - /// The HTTPS port the conversion webhook listens on. - pub webhook_https_port: u16, - - /// Indicates if the maintainer should be disabled. - pub disabled: bool, -} diff --git a/crates/stackable-webhook/src/options.rs b/crates/stackable-webhook/src/options.rs deleted file mode 100644 index b7eeaffea..000000000 --- a/crates/stackable-webhook/src/options.rs +++ /dev/null @@ -1,149 +0,0 @@ -//! Contains available options to configure the [WebhookServer]. - -use std::{ - net::{IpAddr, SocketAddr}, - path::PathBuf, -}; - -use stackable_certs::PrivateKeyType; - -use crate::WebhookServer; - -/// Specifies available webhook server options. -/// -/// The [`Default`] implementation for this struct contains the following values: -/// -/// - The socket binds to 127.0.0.1 on port 8443 (HTTPS) -/// - An empty list of SANs is provided to the certificate the TLS server uses. -/// -/// ### Example with Custom HTTPS IP Address and Port -/// -/// ``` -/// use stackable_webhook::WebhookOptions; -/// -/// // Set IP address and port at the same time -/// let options = WebhookOptions::builder() -/// .bind_address([0, 0, 0, 0], 12345) -/// .build(); -/// -/// // Set IP address only -/// let options = WebhookOptions::builder() -/// .bind_ip([0, 0, 0, 0]) -/// .build(); -/// -/// // Set port only -/// let options = WebhookOptions::builder() -/// .bind_port(12345) -/// .build(); -/// ``` -#[derive(Debug)] -pub struct WebhookOptions { - /// The default HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] - /// binds to. - pub socket_addr: SocketAddr, - - /// The subject alterative DNS names that should be added to the certificates generated for this - /// webhook. - pub subject_alterative_dns_names: Vec, -} - -impl Default for WebhookOptions { - fn default() -> Self { - Self::builder().build() - } -} - -impl WebhookOptions { - /// Returns the default [`WebhookOptionsBuilder`] which allows to selectively - /// customize the options. See the documentation for [`WebhookOptions`] for more - /// information on available functions. - pub fn builder() -> WebhookOptionsBuilder { - WebhookOptionsBuilder::default() - } -} - -/// The [`WebhookOptionsBuilder`] which allows to selectively customize the webhook -/// server [`WebhookOptions`]. -/// -/// Usually, this struct is not constructed manually, but instead by calling -/// [`WebhookOptions::builder()`] or [`WebhookOptionsBuilder::default()`]. -#[derive(Debug, Default)] -pub struct WebhookOptionsBuilder { - socket_addr: Option, - subject_alterative_dns_names: Vec, -} - -impl WebhookOptionsBuilder { - /// Sets the socket address the webhook server uses to bind for HTTPS. - pub fn bind_address(mut self, bind_ip: impl Into, bind_port: u16) -> Self { - self.socket_addr = Some(SocketAddr::new(bind_ip.into(), bind_port)); - self - } - - /// Sets the IP address of the socket address the webhook server uses to - /// bind for HTTPS. - pub fn bind_ip(mut self, bind_ip: impl Into) -> Self { - let addr = self - .socket_addr - .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); - addr.set_ip(bind_ip.into()); - self - } - - /// Sets the port of the socket address the webhook server uses to bind - /// for HTTPS. - pub fn bind_port(mut self, bind_port: u16) -> Self { - let addr = self - .socket_addr - .get_or_insert(WebhookServer::DEFAULT_SOCKET_ADDRESS); - addr.set_port(bind_port); - self - } - - /// Sets the subject alterative DNS names that should be added to the certificates generated for - /// this webhook. - pub fn subject_alterative_dns_names( - mut self, - subject_alterative_dns_name: Vec, - ) -> Self { - self.subject_alterative_dns_names = subject_alterative_dns_name; - self - } - - /// Adds the subject alterative DNS name to the list of names. - pub fn add_subject_alterative_dns_name( - mut self, - subject_alterative_dns_name: impl Into, - ) -> Self { - self.subject_alterative_dns_names - .push(subject_alterative_dns_name.into()); - self - } - - /// Builds the final [`WebhookOptions`] by using default values for any not - /// explicitly set option. - pub fn build(self) -> WebhookOptions { - WebhookOptions { - socket_addr: self - .socket_addr - .unwrap_or(WebhookServer::DEFAULT_SOCKET_ADDRESS), - subject_alterative_dns_names: self.subject_alterative_dns_names, - } - } -} - -#[derive(Debug)] -pub enum TlsOption { - AutoGenerate, - Mount { - private_key_type: PrivateKeyType, - private_key_path: PathBuf, - certificate_path: PathBuf, - }, -} - -impl Default for TlsOption { - fn default() -> Self { - Self::AutoGenerate - } -} diff --git a/crates/stackable-webhook/src/servers/conversion.rs b/crates/stackable-webhook/src/servers/conversion.rs deleted file mode 100644 index f15b4b0a3..000000000 --- a/crates/stackable-webhook/src/servers/conversion.rs +++ /dev/null @@ -1,322 +0,0 @@ -use std::{fmt::Debug, net::SocketAddr}; - -use axum::{Json, Router, routing::post}; -use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; -// Re-export this type because users of the conversion webhook server require -// this type to write the handler function. Instead of importing this type from -// kube directly, consumers can use this type instead. This also eliminates -// keeping the kube dependency version in sync between here and the operator. -pub use kube::core::conversion::ConversionReview; -use kube::{Client, ResourceExt}; -use snafu::{ResultExt, Snafu}; -use tokio::sync::{mpsc, oneshot}; -use tracing::instrument; -use x509_cert::Certificate; - -use crate::{ - WebhookError, WebhookHandler, WebhookServer, - maintainer::{CustomResourceDefinitionMaintainer, CustomResourceDefinitionMaintainerOptions}, - options::WebhookOptions, -}; - -#[derive(Debug, Snafu)] -pub enum ConversionWebhookError { - #[snafu(display("failed to create webhook server"))] - CreateWebhookServer { source: WebhookError }, - - #[snafu(display("failed to run webhook server"))] - RunWebhookServer { source: WebhookError }, - - #[snafu(display("failed to receive certificate from channel"))] - ReceiveCertificateFromChannel, - - #[snafu(display("failed to convert CA certificate into PEM format"))] - ConvertCaToPem { source: x509_cert::der::Error }, - - #[snafu(display("failed to reconcile CRDs"))] - ReconcileCrds { - #[snafu(source(from(ConversionWebhookError, Box::new)))] - source: Box, - }, - - #[snafu(display("failed to update CRD {crd_name:?}"))] - UpdateCrd { - source: kube::Error, - crd_name: String, - }, -} - -impl WebhookHandler for F -where - F: FnOnce(ConversionReview) -> ConversionReview, -{ - fn call(self, req: ConversionReview) -> ConversionReview { - self(req) - } -} - -// TODO: Add a builder, maybe with `bon`. -#[derive(Debug)] -pub struct ConversionWebhookOptions<'a> { - /// The bind address to bind the HTTPS server to. - pub socket_addr: SocketAddr, - - /// The namespace the operator/webhook is running in. - pub namespace: &'a str, - - /// The name of the Kubernetes service which points to the operator/webhook. - pub service_name: &'a str, -} - -/// A ready-to-use CRD conversion webhook server. -/// -/// See [`ConversionWebhookServer::new()`] for usage examples. -pub struct ConversionWebhookServer(WebhookServer); - -impl ConversionWebhookServer { - /// The default socket address the conversion webhook server binds to, see - /// [`WebhookServer::DEFAULT_SOCKET_ADDRESS`]. - pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = WebhookServer::DEFAULT_SOCKET_ADDRESS; - - /// Creates and returns a new [`ConversionWebhookServer`], which expects POST requests being - /// made to the `/convert/{CRD_NAME}` endpoint. - /// - /// The TLS certificate is automatically generated and rotated. - /// - /// ## Parameters - /// - /// This function expects the following parameters: - /// - /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`] - /// to a handler function. In most cases, the generated `CustomResource::try_merge` function - /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview` - /// signature. - /// - `options`: Provides [`ConversionWebhookOptions`] to customize various parts of the - /// webhook server, eg. the socket address used to listen on. - /// - /// ## Return Values - /// - /// This function returns a [`Result`] which contains a 2-tuple (pair) of values for the [`Ok`] - /// variant: - /// - /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See - /// [`ConversionWebhookServer::run`] for more details. - /// - The [`mpsc::Receiver`] which will be used to send out messages containing the newly - /// generated TLS certificate. This channel is used by the CRD maintainer to trigger a - /// reconcile of the CRDs it maintains. - /// - /// ## Example - /// - /// ```no_run - /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider}; - /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions}; - /// use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion}; - /// - /// # #[tokio::main] - /// # async fn main() { - /// # CryptoProvider::install_default(default_provider()).unwrap(); - /// let crds_and_handlers = vec![ - /// ( - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) - /// .expect("the S3Connection CRD must be merged"), - /// S3Connection::try_convert, - /// ) - /// ]; - /// - /// let options = ConversionWebhookOptions { - /// socket_addr: ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS, - /// namespace: "stackable-operators", - /// service_name: "product-operator", - /// }; - /// - /// let (conversion_webhook_server, _certificate_rx) = - /// ConversionWebhookServer::new(crds_and_handlers, options) - /// .await - /// .unwrap(); - /// - /// conversion_webhook_server.run().await.unwrap(); - /// # } - /// ``` - #[instrument(name = "create_conversion_webhook_server", skip(crds_and_handlers))] - pub async fn new( - crds_and_handlers: impl IntoIterator, - options: ConversionWebhookOptions<'_>, - ) -> Result<(Self, mpsc::Receiver), ConversionWebhookError> - where - H: WebhookHandler + Clone + Send + Sync + 'static, - { - tracing::debug!("create new conversion webhook server"); - - let mut router = Router::new(); - - for (crd, handler) in crds_and_handlers { - let crd_name = crd.name_any(); - let handler_fn = |Json(review): Json| async { - let review = handler.call(review); - Json(review) - }; - - // TODO (@Techassi): Make this part of the trait mentioned above - let route = format!("/convert/{crd_name}"); - router = router.route(&route, post(handler_fn)); - } - - let ConversionWebhookOptions { - socket_addr, - namespace: operator_namespace, - service_name: operator_service_name, - } = &options; - - // This is how Kubernetes calls us, so it decides about the naming. - // AFAIK we can not influence this, so this is the only SAN entry needed. - // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) - // HTTP clients can use the FQDN of the service for testing or user use-cases. - let subject_alterative_dns_name = - format!("{operator_service_name}.{operator_namespace}.svc",); - - let webhook_options = WebhookOptions { - subject_alterative_dns_names: vec![subject_alterative_dns_name], - socket_addr: *socket_addr, - }; - - let (server, certificate_rx) = WebhookServer::new(router, webhook_options) - .await - .context(CreateWebhookServerSnafu)?; - - Ok((Self(server), certificate_rx)) - } - - /// Creates and returns a tuple consisting of a [`ConversionWebhookServer`], a [`CustomResourceDefinitionMaintainer`], - /// and a [`oneshot::Receiver`]. - /// - /// ## Parameters - /// - /// - `crds_and_handlers`: An iterator over a 2-tuple (pair) mapping a [`CustomResourceDefinition`] - /// to a handler function. In most cases, the generated `CustomResource::try_merge` function - /// should be used. It provides the expected `fn(ConversionReview) -> ConversionReview` - /// signature. - /// - `operator_service_name`: The name of the Kubernetes service name which points to the - /// operator/conversion webhook. This is used to construct the service reference in the CRD - /// `spec.conversion` field. - /// - `operator_namespace`: The namespace the operator runs in. This is used to construct the - /// service reference in the CRD `spec.conversion` field. - /// - `disable_maintainer`: A boolean value to indicate if the [`CustomResourceDefinitionMaintainer`] - /// should be disabled. - /// - `client`: A [`kube::Client`] used to maintain the custom resource definitions. - /// - /// See the referenced items for more details on usage. - /// - /// ## Return Values - /// - /// - The [`ConversionWebhookServer`] itself. This is used to run the server. See - /// [`ConversionWebhookServer::run`] for more details. - /// - The [`CustomResourceDefinitionMaintainer`] which is used to run the maintainer. See - /// [`CustomResourceDefinitionMaintainer::run`] for more details. - /// - A [`oneshot::Receiver`] which is triggered after the initial reconciliation of the CRDs - /// succeeded. This signal can be used to deploy any custom resources defined by these CRDs. - /// - /// ## Example - /// - /// ```no_run - /// # use futures_util::TryFutureExt; - /// # use tokio_rustls::rustls::crypto::{CryptoProvider, ring::default_provider}; - /// use stackable_webhook::servers::{ConversionWebhookServer, ConversionWebhookOptions}; - /// use stackable_operator::{kube::Client, crd::s3::{S3Connection, S3ConnectionVersion}}; - /// - /// # #[tokio::main] - /// # async fn main() { - /// # CryptoProvider::install_default(default_provider()).unwrap(); - /// let client = Client::try_default().await.unwrap(); - /// - /// let crds_and_handlers = vec![ - /// ( - /// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1) - /// .expect("the S3Connection CRD must be merged"), - /// S3Connection::try_convert, - /// ) - /// ]; - /// - /// let (conversion_webhook_server, crd_maintainer, _initial_reconcile_rx) = - /// ConversionWebhookServer::with_maintainer( - /// crds_and_handlers, - /// "my-operator", - /// "my-namespace", - /// "my-field-manager", - /// false, - /// client, - /// ) - /// .await - /// .unwrap(); - /// - /// let conversion_webhook_server = conversion_webhook_server - /// .run() - /// .map_err(|err| err.to_string()); - /// - /// let crd_maintainer = crd_maintainer - /// .run() - /// .map_err(|err| err.to_string()); - /// - /// // Run both the conversion webhook server and crd_maintainer concurrently, eg. with - /// // futures::try_join!. - /// futures_util::try_join!(conversion_webhook_server, crd_maintainer).unwrap(); - /// # } - /// ``` - pub async fn with_maintainer<'a, H>( - // TODO (@Techassi): Use a trait type here which can be used to build all part of the - // conversion webhook server and a CRD maintainer. - crds_and_handlers: impl IntoIterator + Clone, - operator_service_name: &'a str, - operator_namespace: &'a str, - field_manager: &'a str, - disable_maintainer: bool, - client: Client, - ) -> Result< - ( - Self, - CustomResourceDefinitionMaintainer<'a>, - oneshot::Receiver<()>, - ), - ConversionWebhookError, - > - where - H: WebhookHandler + Clone + Send + Sync + 'static, - { - let socket_addr = ConversionWebhookServer::DEFAULT_SOCKET_ADDRESS; - - // TODO (@Techassi): These should be moved into a builder - let webhook_options = ConversionWebhookOptions { - service_name: operator_service_name, - namespace: operator_namespace, - socket_addr, - }; - - let (conversion_webhook_server, certificate_rx) = - Self::new(crds_and_handlers.clone(), webhook_options).await?; - - let definitions = crds_and_handlers.into_iter().map(|(crd, _)| crd); - - // TODO (@Techassi): These should be moved into a builder - let maintainer_options = CustomResourceDefinitionMaintainerOptions { - webhook_https_port: socket_addr.port(), - disabled: disable_maintainer, - operator_service_name, - operator_namespace, - field_manager, - }; - - let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new( - client, - certificate_rx, - definitions, - maintainer_options, - ); - - Ok((conversion_webhook_server, maintainer, initial_reconcile_rx)) - } - - /// Runs the [`ConversionWebhookServer`] asynchronously. - pub async fn run(self) -> Result<(), ConversionWebhookError> { - tracing::info!("run conversion webhook server"); - self.0.run().await.context(RunWebhookServerSnafu) - } -} diff --git a/crates/stackable-webhook/src/servers/conversion_webhook.rs b/crates/stackable-webhook/src/servers/conversion_webhook.rs new file mode 100644 index 000000000..f269eed05 --- /dev/null +++ b/crates/stackable-webhook/src/servers/conversion_webhook.rs @@ -0,0 +1,185 @@ +use std::fmt::Debug; + +use async_trait::async_trait; +use axum::{Json, Router, routing::post}; +use k8s_openapi::{ + ByteString, + apiextensions_apiserver::pkg::apis::apiextensions::v1::{ + CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig, + WebhookConversion, + }, +}; +// Re-export this type because users of the conversion webhook server require +// this type to write the handler function. Instead of importing this type from +// kube directly, consumers can use this type instead. This also eliminates +// keeping the kube dependency version in sync between here and the operator. +pub use kube::core::conversion::ConversionReview; +use kube::{ + Api, Client, ResourceExt, + api::{Patch, PatchParams}, +}; +use snafu::{ResultExt, Snafu, ensure}; +use tokio::sync::oneshot; +use x509_cert::Certificate; + +use super::{WebhookServerImplementation, WebhookServerImplementationError}; +use crate::WebhookOptions; + +#[derive(Debug, Snafu)] +pub enum ConversionWebhookError { + #[snafu(display("failed to send initial CRD reconcile heartbeat"))] + SendInitialReconcileHeartbeat, + + #[snafu(display("failed to patch CRD {crd_name:?}"))] + PatchCrd { + source: kube::Error, + crd_name: String, + }, +} + +pub struct ConversionWebhookServer { + crds_and_handlers: Vec<(CustomResourceDefinition, H)>, + disable_crd_maintenance: bool, + client: Client, + /// The field manager used when maintaining the CRDs. + field_manager: String, + // This channel can only be used exactly once. The sender's send method consumes self, and + // as such, the sender is wrapped in an Option to be able to call take to consume the inner + // value. + initial_reconcile_tx: Option>, +} + +impl ConversionWebhookServer { + pub fn new( + crds_and_handlers: impl IntoIterator, + disable_crd_maintenance: bool, + client: Client, + field_manager: String, + ) -> (Self, oneshot::Receiver<()>) { + let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel(); + + ( + Self { + crds_and_handlers: crds_and_handlers.into_iter().collect(), + disable_crd_maintenance, + client, + field_manager, + initial_reconcile_tx: Some(initial_reconcile_tx), + }, + initial_reconcile_rx, + ) + } +} + +#[async_trait] +impl WebhookServerImplementation for ConversionWebhookServer +where + H: FnOnce(ConversionReview) -> ConversionReview + Clone + Send + Sync + 'static, +{ + fn register_routes(&self, mut router: Router) -> Router { + for (crd, handler) in self.crds_and_handlers.clone() { + let crd_name = crd.name_any(); + let handler_fn = |Json(review): Json| async { + let review = handler(review); + Json(review) + }; + + let route = format!("/convert/{crd_name}"); + router = router.route(&route, post(handler_fn)); + } + + router + } + + async fn handle_certificate_rotation( + &mut self, + _new_certificate: &Certificate, + new_ca_bundle: &ByteString, + options: &WebhookOptions, + ) -> Result<(), WebhookServerImplementationError> { + if self.disable_crd_maintenance { + return Ok(()); + } + + tracing::info!( + k8s.crd.names = ?self.crds_and_handlers.iter().map(|(crd, _)| crd.name_any()).collect::>(), + "reconciling custom resource definitions" + ); + + let crd_api: Api = Api::all(self.client.clone()); + + for mut crd in self.crds_and_handlers.iter().map(|(crd, _)| crd).cloned() { + let crd_kind = &crd.spec.names.kind; + let crd_name = crd.name_any(); + + tracing::debug!( + k8s.crd.kind = crd_kind, + k8s.crd.name = crd_name, + "reconciling custom resource definition" + ); + + crd.spec.conversion = Some(CustomResourceConversion { + strategy: "Webhook".to_owned(), + webhook: Some(WebhookConversion { + // conversionReviewVersions indicates what ConversionReview versions are + // supported by the webhook. The first version in the list understood by the + // API server is sent to the webhook. The webhook must respond with a + // ConversionReview object in the same version it received. We only support + // the stable v1 ConversionReview to keep the implementation as simple as + // possible. + conversion_review_versions: vec!["v1".to_owned()], + client_config: Some(WebhookClientConfig { + service: Some(ServiceReference { + name: options.operator_service_name.to_owned(), + namespace: options.operator_namespace.to_owned(), + path: Some(format!("/convert/{crd_name}")), + port: Some(options.socket_addr.port().into()), + }), + // Here, ByteString takes care of encoding the provided content as base64. + ca_bundle: Some(new_ca_bundle.to_owned()), + url: None, + }), + }), + }); + + // Deploy the updated CRDs using a server-side apply. + let patch = Patch::Apply(&crd); + + // We force apply here, because we want to become the sole manager of the CRD. This + // avoids any conflicts from previous deployments via helm or stackablectl which are + // reported with the following error message: + // + // Apply failed with 2 conflicts: conflicts with "stackablectl" using apiextensions.k8s.io/v1: + // - .spec.versions + // - .spec.conversion.strategy: Conflict + // + // The official Kubernetes documentation provides three options on how to solve + // these conflicts. Option 1 is used, which is described as follows: + // + // Overwrite value, become sole manager: If overwriting the value was intentional + // (or if the applier is an automated process like a controller) the applier should + // set the force query parameter to true [...], and make the request again. This + // forces the operation to succeed, changes the value of the field, and removes the + // field from all other managers' entries in managedFields. + // + // See https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts + let patch_params = PatchParams::apply(&self.field_manager).force(); + + crd_api + .patch(&crd_name, &patch_params, &patch) + .await + .with_context(|_| PatchCrdSnafu { crd_name })?; + } + + // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out + // via the oneshot channel. + if let Some(initial_reconcile_tx) = self.initial_reconcile_tx.take() { + ensure!( + initial_reconcile_tx.send(()).is_ok(), + SendInitialReconcileHeartbeatSnafu + ); + } + + Ok(()) + } +} diff --git a/crates/stackable-webhook/src/servers/mod.rs b/crates/stackable-webhook/src/servers/mod.rs index 6fbadc12d..146503691 100644 --- a/crates/stackable-webhook/src/servers/mod.rs +++ b/crates/stackable-webhook/src/servers/mod.rs @@ -1,5 +1,39 @@ -//! Contains high-level ready-to-use webhook server implementations for specific -//! purposes. -mod conversion; +use async_trait::async_trait; +use axum::Router; +pub use conversion_webhook::{ConversionReview, ConversionWebhookError, ConversionWebhookServer}; +use k8s_openapi::ByteString; +pub use mutating_webhook::{MutatingWebhookError, MutatingWebhookServer}; +use snafu::Snafu; +use x509_cert::Certificate; -pub use conversion::{ConversionWebhookError, ConversionWebhookOptions, ConversionWebhookServer}; +use crate::WebhookOptions; + +mod conversion_webhook; +mod mutating_webhook; + +#[derive(Snafu, Debug)] +pub enum WebhookServerImplementationError { + #[snafu(display("conversion webhook error"), context(false))] + ConversionWebhookError { + source: conversion_webhook::ConversionWebhookError, + }, + + #[snafu(display("mutating webhook error"), context(false))] + MutatingWebhookError { + source: mutating_webhook::MutatingWebhookError, + }, +} + +// We still need to use the async-trait crate, as Rust 1.91.1 does not support dynamic dispatch +// in combination with async functions. +#[async_trait] +pub trait WebhookServerImplementation { + fn register_routes(&self, router: Router) -> Router; + + async fn handle_certificate_rotation( + &mut self, + new_certificate: &Certificate, + new_ca_bundle: &ByteString, + options: &WebhookOptions, + ) -> Result<(), WebhookServerImplementationError>; +} diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs new file mode 100644 index 000000000..dd80ce58e --- /dev/null +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -0,0 +1,152 @@ +use std::{fmt::Debug, marker::PhantomData}; + +use async_trait::async_trait; +use axum::{Json, Router, routing::post}; +use k8s_openapi::{ + ByteString, + api::admissionregistration::v1::{ + MutatingWebhookConfiguration, ServiceReference, WebhookClientConfig, + }, +}; +use kube::{ + Api, Client, Resource, ResourceExt, + api::{Patch, PatchParams}, + core::admission::{AdmissionRequest, AdmissionResponse, AdmissionReview}, +}; +use serde::{Serialize, de::DeserializeOwned}; +use snafu::{ResultExt, Snafu}; +use x509_cert::Certificate; + +use super::{WebhookServerImplementation, WebhookServerImplementationError}; +use crate::WebhookOptions; + +#[derive(Debug, Snafu)] +pub enum MutatingWebhookError { + #[snafu(display("failed to patch MutatingWebhookConfiguration {vwc_name:?}"))] + PatchMutatingWebhookConfiguration { + source: kube::Error, + vwc_name: String, + }, +} + +/// As the webhook is typed with the Resource type `R`, it can only handle a single resource +/// mutation. Use multiple [`MutatingWebhookServer`] if you need to mutate multiple resource kinds. +pub struct MutatingWebhookServer { + mutating_webhook_configuration: MutatingWebhookConfiguration, + handler: H, + resource: PhantomData, + + disable_mutating_webhook_configuration_maintenance: bool, + client: Client, + + /// The field manager used when maintaining the CRDs. + field_manager: String, +} + +impl MutatingWebhookServer { + pub fn new( + mutating_webhook_configuration: MutatingWebhookConfiguration, + handler: H, + disable_mutating_webhook_configuration_maintenance: bool, + client: Client, + field_manager: String, + ) -> Self { + Self { + mutating_webhook_configuration, + handler, + resource: PhantomData, + disable_mutating_webhook_configuration_maintenance, + client, + field_manager, + } + } + + fn http_path(&self) -> String { + format!("/mutate/{}", self.mutating_webhook_configuration.name_any()) + } +} + +#[async_trait] +impl WebhookServerImplementation for MutatingWebhookServer +where + H: FnOnce(AdmissionRequest) -> AdmissionResponse + Clone + Send + Sync + 'static, + R: Resource + Send + Sync + DeserializeOwned + Serialize + 'static, +{ + fn register_routes(&self, router: Router) -> Router { + let handler = self.handler.clone(); + let handler_fn = |Json(review): Json>| async { + let request: AdmissionRequest = match review.try_into() { + Ok(request) => request, + Err(err) => { + return Json( + AdmissionResponse::invalid(format!("failed to convert to request: {err}")) + .into_review(), + ); + } + }; + + let response = handler(request); + let review = response.into_review(); + Json(review) + }; + + let route = self.http_path(); + router.route(&route, post(handler_fn)) + } + + async fn handle_certificate_rotation( + &mut self, + _new_certificate: &Certificate, + new_ca_bundle: &ByteString, + options: &WebhookOptions, + ) -> Result<(), WebhookServerImplementationError> { + if self.disable_mutating_webhook_configuration_maintenance { + return Ok(()); + } + + let mut mutating_webhook_configuration = self.mutating_webhook_configuration.clone(); + let vwc_name = mutating_webhook_configuration.name_any(); + tracing::info!( + k8s.MutatingWebhookConfiguration.name = vwc_name, + "reconciling mutating webhook configurations" + ); + + for webhook in mutating_webhook_configuration.webhooks.iter_mut().flatten() { + // TODO: Think is this is a bit excessive + // assert!( + // webhook.failure_policy.is_some(), + // "Users of the mutating webhook need to make an explicit choice on the failure policy" + // ); + assert_eq!( + webhook.admission_review_versions, + vec!["v1"], + "We decide how we de-serialize the JSON and with that what AdmissionReview version we support (currently only v1)" + ); + + // We know how we can be called (and with what certificate), so we can always set that + webhook.client_config = WebhookClientConfig { + service: Some(ServiceReference { + name: options.operator_service_name.to_owned(), + namespace: options.operator_namespace.to_owned(), + path: Some(self.http_path()), + port: Some(options.socket_addr.port().into()), + }), + // Here, ByteString takes care of encoding the provided content as base64. + ca_bundle: Some(new_ca_bundle.to_owned()), + url: None, + }; + } + + let vwc_api: Api = Api::all(self.client.clone()); + // Other than with the CRDs we don't need to force-apply the MutatingWebhookConfiguration + let patch = Patch::Apply(&mutating_webhook_configuration); + let patch_params = PatchParams::apply(&self.field_manager); + + vwc_api + .patch(&vwc_name, &patch_params, &patch) + .await + .with_context(|_| PatchMutatingWebhookConfigurationSnafu { vwc_name })?; + + Ok(()) + } +} diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index cfd7c2e86..99d2e7e29 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -31,7 +31,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use x509_cert::Certificate; use crate::{ - options::WebhookOptions, + WebhookOptions, tls::cert_resolver::{CertificateResolver, CertificateResolverError}, }; @@ -92,9 +92,17 @@ impl TlsServer { let WebhookOptions { socket_addr, - subject_alterative_dns_names, + operator_namespace, + operator_service_name, } = options; + // This is how Kubernetes calls us, so it decides about the naming. + // AFAIK we can not influence this, so this is the only SAN entry needed. + // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) + // HTTP clients can use the FQDN of the service for testing or user use-cases. + let subject_alterative_dns_names = + vec![format!("{operator_service_name}.{operator_namespace}.svc")]; + let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, certificate_tx) .await .context(CreateCertificateResolverSnafu)?; @@ -120,6 +128,10 @@ impl TlsServer { Ok((tls_server, certificate_rx)) } + pub fn socket_addr(&self) -> &SocketAddr { + &self.socket_addr + } + /// Runs the TLS server by listening for incoming TCP connections on the /// bound socket address. It only accepts TLS connections. Internally each /// TLS stream get handled by a Hyper service, which in turn is an Axum From 886efccb8e3d2a8af602c8bb09171e96c6367145 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Mon, 17 Nov 2025 16:07:19 +0100 Subject: [PATCH 02/25] Add state to the mutating webhook --- .../src/servers/mutating_webhook.rs | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index dd80ce58e..43dbfe5bb 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, marker::PhantomData}; +use std::{fmt::Debug, marker::PhantomData, sync::Arc}; use async_trait::async_trait; use axum::{Json, Router, routing::post}; @@ -31,9 +31,10 @@ pub enum MutatingWebhookError { /// As the webhook is typed with the Resource type `R`, it can only handle a single resource /// mutation. Use multiple [`MutatingWebhookServer`] if you need to mutate multiple resource kinds. -pub struct MutatingWebhookServer { +pub struct MutatingWebhookServer { mutating_webhook_configuration: MutatingWebhookConfiguration, handler: H, + handler_state: Arc, resource: PhantomData, disable_mutating_webhook_configuration_maintenance: bool, @@ -43,10 +44,11 @@ pub struct MutatingWebhookServer { field_manager: String, } -impl MutatingWebhookServer { +impl MutatingWebhookServer { pub fn new( mutating_webhook_configuration: MutatingWebhookConfiguration, handler: H, + handler_state: Arc, disable_mutating_webhook_configuration_maintenance: bool, client: Client, field_manager: String, @@ -54,6 +56,7 @@ impl MutatingWebhookServer { Self { mutating_webhook_configuration, handler, + handler_state, resource: PhantomData, disable_mutating_webhook_configuration_maintenance, client, @@ -67,14 +70,17 @@ impl MutatingWebhookServer { } #[async_trait] -impl WebhookServerImplementation for MutatingWebhookServer +impl WebhookServerImplementation for MutatingWebhookServer where - H: FnOnce(AdmissionRequest) -> AdmissionResponse + Clone + Send + Sync + 'static, + H: Fn(Arc, AdmissionRequest) -> Fut + Clone + Send + Sync + 'static, + Fut: Future + Send + 'static, R: Resource + Send + Sync + DeserializeOwned + Serialize + 'static, + S: Send + Sync + 'static, { fn register_routes(&self, router: Router) -> Router { + let handler_state = self.handler_state.clone(); let handler = self.handler.clone(); - let handler_fn = |Json(review): Json>| async { + let handler_fn = |Json(review): Json>| async move { let request: AdmissionRequest = match review.try_into() { Ok(request) => request, Err(err) => { @@ -85,7 +91,7 @@ where } }; - let response = handler(request); + let response = handler(handler_state, request).await; let review = response.into_review(); Json(review) }; From f48447c3dc69848b568bb4c16d0a5274704c95fd Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 18 Nov 2025 14:34:10 +0100 Subject: [PATCH 03/25] Improve docs --- .../src/servers/mutating_webhook.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index 43dbfe5bb..cec3fe46b 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -45,6 +45,8 @@ pub struct MutatingWebhookServer { } impl MutatingWebhookServer { + /// All webhooks need to set the admissionReviewVersions to `["v1"]`, as this mutating webhook + /// only supports that version! A failure to do so will result in a panic. pub fn new( mutating_webhook_configuration: MutatingWebhookConfiguration, handler: H, @@ -53,6 +55,14 @@ impl MutatingWebhookServer { client: Client, field_manager: String, ) -> Self { + for webhook in mutating_webhook_configuration.webhooks.iter().flatten() { + assert_eq!( + webhook.admission_review_versions, + vec!["v1"], + "We decide how we de-serialize the JSON and with that what AdmissionReview version we support (currently only v1)" + ); + } + Self { mutating_webhook_configuration, handler, @@ -118,17 +128,6 @@ where ); for webhook in mutating_webhook_configuration.webhooks.iter_mut().flatten() { - // TODO: Think is this is a bit excessive - // assert!( - // webhook.failure_policy.is_some(), - // "Users of the mutating webhook need to make an explicit choice on the failure policy" - // ); - assert_eq!( - webhook.admission_review_versions, - vec!["v1"], - "We decide how we de-serialize the JSON and with that what AdmissionReview version we support (currently only v1)" - ); - // We know how we can be called (and with what certificate), so we can always set that webhook.client_config = WebhookClientConfig { service: Some(ServiceReference { From caa81fccfd4ff9842b448b963d6b9beef2bd24f4 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 18 Nov 2025 15:11:20 +0100 Subject: [PATCH 04/25] Add some docs --- .../src/servers/mutating_webhook.rs | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index cec3fe46b..64bb850cb 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -29,8 +29,76 @@ pub enum MutatingWebhookError { }, } +/// Mutating webhook, which let's you intercept object creations/modification and modify the object +/// on the fly. +/// /// As the webhook is typed with the Resource type `R`, it can only handle a single resource /// mutation. Use multiple [`MutatingWebhookServer`] if you need to mutate multiple resource kinds. +/// +/// ### Example usage +/// +/// This is only some high-level basic usage! +/// +/// For concrete usage please have a look at the restart controller mutating webhook in +/// commons-operator. +/// +/// ``` +/// use std::sync::Arc; +/// +/// use k8s_openapi::api::admissionregistration::v1::MutatingWebhook; +/// use k8s_openapi::api::admissionregistration::v1::MutatingWebhookConfiguration; +/// use k8s_openapi::api::apps::v1::StatefulSet; +/// +/// use stackable_operator::builder::meta::ObjectMetaBuilder; +/// use stackable_operator::kube::Client; +/// use stackable_operator::kube::core::admission::{AdmissionRequest, AdmissionResponse}; +/// use stackable_operator::kvp::Label; +/// use stackable_webhook::WebhookServer; +/// use stackable_webhook::servers::MutatingWebhookServer; +/// +/// # async fn docs() { +/// // The Kubernetes client +/// let client = Client::try_default().await.unwrap(); +/// // The context of the controller, e.g. contains a Kubernetes client +/// let ctx = Arc::new(()); +/// // Read in from user input, e.g. CLI arguments +/// let disable_restarter_mutating_webhook = false; +/// +/// let mutating_webhook = Box::new(MutatingWebhookServer::new( +/// get_mutating_webhook_configuration(), +/// my_handler, +/// ctx, +/// disable_restarter_mutating_webhook, +/// client, +/// "my-field-manager".to_owned(), +/// )); +/// +/// let webhook_options = todo!(); +/// let webhook_server = WebhookServer::new(webhook_options, vec![mutating_webhook]).await.unwrap(); +/// webhook_server.run().await.unwrap(); +/// # } +/// +/// fn get_mutating_webhook_configuration() -> MutatingWebhookConfiguration { +/// let webhook_name = "pod-labeler.stackable.tech"; +/// +/// MutatingWebhookConfiguration { +/// webhooks: Some(vec![MutatingWebhook { +/// // This is checked by the stackable_webhook code +/// admission_review_versions: vec!["v1".to_owned()], +/// ..Default::default() +/// }]), +/// ..Default::default() +/// } +/// } +/// +/// // Basic no-op implementation +/// pub async fn my_handler( +/// ctx: Arc<()>, +/// request: AdmissionRequest, +/// ) -> AdmissionResponse { +/// AdmissionResponse::from(&request) +/// } +/// ``` pub struct MutatingWebhookServer { mutating_webhook_configuration: MutatingWebhookConfiguration, handler: H, From 587f296acc5c626eef08f9aa93fc573f0c7be862 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 19 Nov 2025 13:54:47 +0100 Subject: [PATCH 05/25] refactor: Rename traits and structs --- crates/stackable-webhook/src/lib.rs | 22 ++++++------ .../src/servers/conversion_webhook.rs | 14 ++++---- crates/stackable-webhook/src/servers/mod.rs | 14 ++++---- .../src/servers/mutating_webhook.rs | 34 +++++++++---------- crates/stackable-webhook/src/tls/mod.rs | 6 ++-- 5 files changed, 44 insertions(+), 46 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 76ffbd53a..14cd3ee0c 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -4,7 +4,7 @@ use ::x509_cert::Certificate; use axum::{Router, routing::get}; use futures_util::{FutureExt as _, TryFutureExt, select}; use k8s_openapi::ByteString; -use servers::{WebhookServerImplementation, WebhookServerImplementationError}; +use servers::{Webhook, WebhookError}; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; use tokio::{ @@ -21,10 +21,10 @@ pub mod servers; pub mod tls; /// A result type alias with the [`WebhookError`] type as the default error type. -pub type Result = std::result::Result; +pub type Result = std::result::Result; #[derive(Debug, Snafu)] -pub enum WebhookError { +pub enum WebhookServerError { #[snafu(display("failed to create TLS server"))] CreateTlsServer { source: tls::TlsServerError }, @@ -32,23 +32,21 @@ pub enum WebhookError { RunTlsServer { source: tls::TlsServerError }, #[snafu(display("failed to update certificate"))] - UpdateCertificate { - source: WebhookServerImplementationError, - }, + UpdateCertificate { source: WebhookError }, #[snafu(display("failed to encode CA certificate as PEM format"))] EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, } pub struct WebhookServer { - options: WebhookOptions, - webhooks: Vec>, + options: WebhookServerOptions, + webhooks: Vec>, tls_server: TlsServer, cert_rx: mpsc::Receiver, } #[derive(Clone, Debug)] -pub struct WebhookOptions { +pub struct WebhookServerOptions { /// The default HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] /// binds to. pub socket_addr: SocketAddr, @@ -75,8 +73,8 @@ impl WebhookServer { SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); pub async fn new( - options: WebhookOptions, - webhooks: Vec>, + options: WebhookServerOptions, + webhooks: Vec>, ) -> Result { tracing::trace!("create new webhook server"); @@ -170,7 +168,7 @@ impl WebhookServer { } = self; let tls_server = tls_server .run() - .map_err(|err| WebhookError::RunTlsServer { source: err }); + .map_err(|err| WebhookServerError::RunTlsServer { source: err }); let cert_update_loop = async { loop { diff --git a/crates/stackable-webhook/src/servers/conversion_webhook.rs b/crates/stackable-webhook/src/servers/conversion_webhook.rs index f269eed05..7680df9e5 100644 --- a/crates/stackable-webhook/src/servers/conversion_webhook.rs +++ b/crates/stackable-webhook/src/servers/conversion_webhook.rs @@ -22,8 +22,8 @@ use snafu::{ResultExt, Snafu, ensure}; use tokio::sync::oneshot; use x509_cert::Certificate; -use super::{WebhookServerImplementation, WebhookServerImplementationError}; -use crate::WebhookOptions; +use super::{Webhook, WebhookError}; +use crate::WebhookServerOptions; #[derive(Debug, Snafu)] pub enum ConversionWebhookError { @@ -37,7 +37,7 @@ pub enum ConversionWebhookError { }, } -pub struct ConversionWebhookServer { +pub struct ConversionWebhook { crds_and_handlers: Vec<(CustomResourceDefinition, H)>, disable_crd_maintenance: bool, client: Client, @@ -49,7 +49,7 @@ pub struct ConversionWebhookServer { initial_reconcile_tx: Option>, } -impl ConversionWebhookServer { +impl ConversionWebhook { pub fn new( crds_and_handlers: impl IntoIterator, disable_crd_maintenance: bool, @@ -72,7 +72,7 @@ impl ConversionWebhookServer { } #[async_trait] -impl WebhookServerImplementation for ConversionWebhookServer +impl Webhook for ConversionWebhook where H: FnOnce(ConversionReview) -> ConversionReview + Clone + Send + Sync + 'static, { @@ -95,8 +95,8 @@ where &mut self, _new_certificate: &Certificate, new_ca_bundle: &ByteString, - options: &WebhookOptions, - ) -> Result<(), WebhookServerImplementationError> { + options: &WebhookServerOptions, + ) -> Result<(), WebhookError> { if self.disable_crd_maintenance { return Ok(()); } diff --git a/crates/stackable-webhook/src/servers/mod.rs b/crates/stackable-webhook/src/servers/mod.rs index 146503691..228d10afc 100644 --- a/crates/stackable-webhook/src/servers/mod.rs +++ b/crates/stackable-webhook/src/servers/mod.rs @@ -1,18 +1,18 @@ use async_trait::async_trait; use axum::Router; -pub use conversion_webhook::{ConversionReview, ConversionWebhookError, ConversionWebhookServer}; +pub use conversion_webhook::{ConversionReview, ConversionWebhook, ConversionWebhookError}; use k8s_openapi::ByteString; -pub use mutating_webhook::{MutatingWebhookError, MutatingWebhookServer}; +pub use mutating_webhook::{MutatingWebhook, MutatingWebhookError}; use snafu::Snafu; use x509_cert::Certificate; -use crate::WebhookOptions; +use crate::WebhookServerOptions; mod conversion_webhook; mod mutating_webhook; #[derive(Snafu, Debug)] -pub enum WebhookServerImplementationError { +pub enum WebhookError { #[snafu(display("conversion webhook error"), context(false))] ConversionWebhookError { source: conversion_webhook::ConversionWebhookError, @@ -27,13 +27,13 @@ pub enum WebhookServerImplementationError { // We still need to use the async-trait crate, as Rust 1.91.1 does not support dynamic dispatch // in combination with async functions. #[async_trait] -pub trait WebhookServerImplementation { +pub trait Webhook { fn register_routes(&self, router: Router) -> Router; async fn handle_certificate_rotation( &mut self, new_certificate: &Certificate, new_ca_bundle: &ByteString, - options: &WebhookOptions, - ) -> Result<(), WebhookServerImplementationError>; + options: &WebhookServerOptions, + ) -> Result<(), WebhookError>; } diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index 64bb850cb..0987621c9 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -17,15 +17,15 @@ use serde::{Serialize, de::DeserializeOwned}; use snafu::{ResultExt, Snafu}; use x509_cert::Certificate; -use super::{WebhookServerImplementation, WebhookServerImplementationError}; -use crate::WebhookOptions; +use super::{Webhook, WebhookError}; +use crate::WebhookServerOptions; #[derive(Debug, Snafu)] pub enum MutatingWebhookError { - #[snafu(display("failed to patch MutatingWebhookConfiguration {vwc_name:?}"))] + #[snafu(display("failed to patch MutatingWebhookConfiguration {mwc_name:?}"))] PatchMutatingWebhookConfiguration { source: kube::Error, - vwc_name: String, + mwc_name: String, }, } @@ -99,11 +99,11 @@ pub enum MutatingWebhookError { /// AdmissionResponse::from(&request) /// } /// ``` -pub struct MutatingWebhookServer { +pub struct MutatingWebhook { mutating_webhook_configuration: MutatingWebhookConfiguration, handler: H, handler_state: Arc, - resource: PhantomData, + _resource: PhantomData, disable_mutating_webhook_configuration_maintenance: bool, client: Client, @@ -112,7 +112,7 @@ pub struct MutatingWebhookServer { field_manager: String, } -impl MutatingWebhookServer { +impl MutatingWebhook { /// All webhooks need to set the admissionReviewVersions to `["v1"]`, as this mutating webhook /// only supports that version! A failure to do so will result in a panic. pub fn new( @@ -135,7 +135,7 @@ impl MutatingWebhookServer { mutating_webhook_configuration, handler, handler_state, - resource: PhantomData, + _resource: PhantomData, disable_mutating_webhook_configuration_maintenance, client, field_manager, @@ -148,7 +148,7 @@ impl MutatingWebhookServer { } #[async_trait] -impl WebhookServerImplementation for MutatingWebhookServer +impl Webhook for MutatingWebhook where H: Fn(Arc, AdmissionRequest) -> Fut + Clone + Send + Sync + 'static, Fut: Future + Send + 'static, @@ -182,16 +182,16 @@ where &mut self, _new_certificate: &Certificate, new_ca_bundle: &ByteString, - options: &WebhookOptions, - ) -> Result<(), WebhookServerImplementationError> { + options: &WebhookServerOptions, + ) -> Result<(), WebhookError> { if self.disable_mutating_webhook_configuration_maintenance { return Ok(()); } let mut mutating_webhook_configuration = self.mutating_webhook_configuration.clone(); - let vwc_name = mutating_webhook_configuration.name_any(); + let mwc_name = mutating_webhook_configuration.name_any(); tracing::info!( - k8s.MutatingWebhookConfiguration.name = vwc_name, + k8s.mutatingwebhookconfiguration.name = mwc_name, "reconciling mutating webhook configurations" ); @@ -210,15 +210,15 @@ where }; } - let vwc_api: Api = Api::all(self.client.clone()); + let mwc_api: Api = Api::all(self.client.clone()); // Other than with the CRDs we don't need to force-apply the MutatingWebhookConfiguration let patch = Patch::Apply(&mutating_webhook_configuration); let patch_params = PatchParams::apply(&self.field_manager); - vwc_api - .patch(&vwc_name, &patch_params, &patch) + mwc_api + .patch(&mwc_name, &patch_params, &patch) .await - .with_context(|_| PatchMutatingWebhookConfigurationSnafu { vwc_name })?; + .with_context(|_| PatchMutatingWebhookConfigurationSnafu { mwc_name })?; Ok(()) } diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 99d2e7e29..981b2e96e 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -31,7 +31,7 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use x509_cert::Certificate; use crate::{ - WebhookOptions, + WebhookServerOptions, tls::cert_resolver::{CertificateResolver, CertificateResolverError}, }; @@ -86,11 +86,11 @@ impl TlsServer { #[instrument(name = "create_tls_server", skip(router))] pub async fn new( router: Router, - options: WebhookOptions, + options: WebhookServerOptions, ) -> Result<(Self, mpsc::Receiver)> { let (certificate_tx, certificate_rx) = mpsc::channel(1); - let WebhookOptions { + let WebhookServerOptions { socket_addr, operator_namespace, operator_service_name, From 8b518a0774940990ebfa284536c4d41a789e6250 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 19 Nov 2025 16:19:57 +0100 Subject: [PATCH 06/25] Add some docs --- crates/stackable-webhook/src/lib.rs | 6 ++++++ crates/stackable-webhook/src/servers/mod.rs | 9 +++++++++ crates/stackable-webhook/src/servers/mutating_webhook.rs | 2 +- 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 14cd3ee0c..55065a565 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -72,6 +72,12 @@ impl WebhookServer { pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); + /// Creates a new webhook server with the given config and list of webhooks. + /// + /// Currently the webhooks [`ConversionWebhook`](servers::ConversionWebhook) and + /// [`MutatingWebhook`](servers::MutatingWebhook) are implemented. + /// + /// Please read their documentation for details. pub async fn new( options: WebhookServerOptions, webhooks: Vec>, diff --git a/crates/stackable-webhook/src/servers/mod.rs b/crates/stackable-webhook/src/servers/mod.rs index 228d10afc..c354b831f 100644 --- a/crates/stackable-webhook/src/servers/mod.rs +++ b/crates/stackable-webhook/src/servers/mod.rs @@ -24,12 +24,21 @@ pub enum WebhookError { }, } +/// A webhook (such as a conversion or mutating webhook) needs to implement this trait. +// // We still need to use the async-trait crate, as Rust 1.91.1 does not support dynamic dispatch // in combination with async functions. #[async_trait] pub trait Webhook { + /// The webhook can add arbitrary routes to the passed [`Router`] and needs to return the + /// resulting [`Router`]. fn register_routes(&self, router: Router) -> Router; + /// The HTTPS server periodically rotates it's certificate. + /// + /// Typically, some caller of the webhook (e.g. Kubernetes) needs to know the certificate to be + /// able to establish the TLS connection. + /// Webhooks are informed about new certificates by this function and can react accordingly. async fn handle_certificate_rotation( &mut self, new_certificate: &Certificate, diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index 0987621c9..aabd1601e 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -33,7 +33,7 @@ pub enum MutatingWebhookError { /// on the fly. /// /// As the webhook is typed with the Resource type `R`, it can only handle a single resource -/// mutation. Use multiple [`MutatingWebhookServer`] if you need to mutate multiple resource kinds. +/// mutation. Use multiple [`MutatingWebhook`] if you need to mutate multiple resource kinds. /// /// ### Example usage /// From ef0d1662c3095dc74be73255f38e0d5070c3ce61 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 19 Nov 2025 16:28:08 +0100 Subject: [PATCH 07/25] More docs --- crates/stackable-webhook/src/lib.rs | 22 +++++++++++++++++++ .../src/servers/mutating_webhook.rs | 7 +++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 55065a565..6e329acb1 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -38,6 +38,28 @@ pub enum WebhookServerError { EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, } +/// An HTTPS server that serves a bunch of webhooks. +/// +/// It handles TLS certificate rotation. +/// +/// ### Example usage +/// +/// ``` +/// use stackable_webhook::WebhookServer; +/// use stackable_webhook::WebhookServerOptions; +/// use stackable_webhook::servers::Webhook; +/// +/// # async fn docs() { +/// let mut webhooks: Vec> = vec![]; +/// +/// let webhook_options = WebhookServerOptions { +/// socket_addr: WebhookServer::DEFAULT_SOCKET_ADDRESS, +/// operator_namespace: "my-namespace".to_owned(), +/// operator_service_name: "my-operator".to_owned(), +/// }; +/// let webhook_server = WebhookServer::new(webhook_options, webhooks).await.unwrap(); +/// # } +/// ``` pub struct WebhookServer { options: WebhookServerOptions, webhooks: Vec>, diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index aabd1601e..ff536cb77 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -45,7 +45,6 @@ pub enum MutatingWebhookError { /// ``` /// use std::sync::Arc; /// -/// use k8s_openapi::api::admissionregistration::v1::MutatingWebhook; /// use k8s_openapi::api::admissionregistration::v1::MutatingWebhookConfiguration; /// use k8s_openapi::api::apps::v1::StatefulSet; /// @@ -54,7 +53,7 @@ pub enum MutatingWebhookError { /// use stackable_operator::kube::core::admission::{AdmissionRequest, AdmissionResponse}; /// use stackable_operator::kvp::Label; /// use stackable_webhook::WebhookServer; -/// use stackable_webhook::servers::MutatingWebhookServer; +/// use stackable_webhook::servers::MutatingWebhook; /// /// # async fn docs() { /// // The Kubernetes client @@ -64,7 +63,7 @@ pub enum MutatingWebhookError { /// // Read in from user input, e.g. CLI arguments /// let disable_restarter_mutating_webhook = false; /// -/// let mutating_webhook = Box::new(MutatingWebhookServer::new( +/// let mutating_webhook = Box::new(MutatingWebhook::new( /// get_mutating_webhook_configuration(), /// my_handler, /// ctx, @@ -82,7 +81,7 @@ pub enum MutatingWebhookError { /// let webhook_name = "pod-labeler.stackable.tech"; /// /// MutatingWebhookConfiguration { -/// webhooks: Some(vec![MutatingWebhook { +/// webhooks: Some(vec![k8s_openapi::api::admissionregistration::v1::MutatingWebhook { /// // This is checked by the stackable_webhook code /// admission_review_versions: vec!["v1".to_owned()], /// ..Default::default() From 16045e482c5200806becd30064a0e05085067b9a Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 09:08:43 +0100 Subject: [PATCH 08/25] Apply suggestions from code review Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- crates/stackable-webhook/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 6e329acb1..027ee9493 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -38,9 +38,9 @@ pub enum WebhookServerError { EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error }, } -/// An HTTPS server that serves a bunch of webhooks. +/// An HTTPS server that serves one or more webhooks. /// -/// It handles TLS certificate rotation. +/// It also handles TLS certificate rotation. /// /// ### Example usage /// From 25c1e8004aab0e18cdefa0fedf30290f17cc6b48 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 09:30:45 +0100 Subject: [PATCH 09/25] Update crates/stackable-webhook/src/lib.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- crates/stackable-webhook/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 027ee9493..48cf5107f 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -69,7 +69,7 @@ pub struct WebhookServer { #[derive(Clone, Debug)] pub struct WebhookServerOptions { - /// The default HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] + /// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] /// binds to. pub socket_addr: SocketAddr, From 516f94df8c465a8c27e65fd7fb4912d6e0fe432f Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 09:31:07 +0100 Subject: [PATCH 10/25] formatiing --- crates/stackable-webhook/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 48cf5107f..13b641c30 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -69,8 +69,7 @@ pub struct WebhookServer { #[derive(Clone, Debug)] pub struct WebhookServerOptions { - /// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] - /// binds to. + /// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] binds to. pub socket_addr: SocketAddr, /// The namespace the operator/webhook is running in. From ade4a1b0e0761e6c69aa750687bf46ada27ddc6e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 11:07:11 +0100 Subject: [PATCH 11/25] Rename operator to webhook --- crates/stackable-webhook/src/lib.rs | 12 ++++++------ .../src/servers/conversion_webhook.rs | 4 ++-- .../src/servers/mutating_webhook.rs | 4 ++-- crates/stackable-webhook/src/tls/mod.rs | 6 +++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 13b641c30..7e1c8512b 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -54,8 +54,8 @@ pub enum WebhookServerError { /// /// let webhook_options = WebhookServerOptions { /// socket_addr: WebhookServer::DEFAULT_SOCKET_ADDRESS, -/// operator_namespace: "my-namespace".to_owned(), -/// operator_service_name: "my-operator".to_owned(), +/// webhook_namespace: "my-namespace".to_owned(), +/// webhook_service_name: "my-operator".to_owned(), /// }; /// let webhook_server = WebhookServer::new(webhook_options, webhooks).await.unwrap(); /// # } @@ -72,11 +72,11 @@ pub struct WebhookServerOptions { /// The HTTPS socket address the [`TcpListener`][tokio::net::TcpListener] binds to. pub socket_addr: SocketAddr, - /// The namespace the operator/webhook is running in. - pub operator_namespace: String, + /// The namespace the webhook is running in. + pub webhook_namespace: String, - /// The name of the Kubernetes service which points to the operator/webhook. - pub operator_service_name: String, + /// The name of the Kubernetes service which points to the webhook. + pub webhook_service_name: String, } impl WebhookServer { diff --git a/crates/stackable-webhook/src/servers/conversion_webhook.rs b/crates/stackable-webhook/src/servers/conversion_webhook.rs index 7680df9e5..8368b0398 100644 --- a/crates/stackable-webhook/src/servers/conversion_webhook.rs +++ b/crates/stackable-webhook/src/servers/conversion_webhook.rs @@ -130,8 +130,8 @@ where conversion_review_versions: vec!["v1".to_owned()], client_config: Some(WebhookClientConfig { service: Some(ServiceReference { - name: options.operator_service_name.to_owned(), - namespace: options.operator_namespace.to_owned(), + name: options.webhook_service_name.to_owned(), + namespace: options.webhook_namespace.to_owned(), path: Some(format!("/convert/{crd_name}")), port: Some(options.socket_addr.port().into()), }), diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index ff536cb77..d0f7a84c2 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -198,8 +198,8 @@ where // We know how we can be called (and with what certificate), so we can always set that webhook.client_config = WebhookClientConfig { service: Some(ServiceReference { - name: options.operator_service_name.to_owned(), - namespace: options.operator_namespace.to_owned(), + name: options.webhook_service_name.to_owned(), + namespace: options.webhook_namespace.to_owned(), path: Some(self.http_path()), port: Some(options.socket_addr.port().into()), }), diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 981b2e96e..943a72a4e 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -92,8 +92,8 @@ impl TlsServer { let WebhookServerOptions { socket_addr, - operator_namespace, - operator_service_name, + webhook_namespace, + webhook_service_name, } = options; // This is how Kubernetes calls us, so it decides about the naming. @@ -101,7 +101,7 @@ impl TlsServer { // TODO (@Techassi): The cluster domain should be included here, so that (non Kubernetes) // HTTP clients can use the FQDN of the service for testing or user use-cases. let subject_alterative_dns_names = - vec![format!("{operator_service_name}.{operator_namespace}.svc")]; + vec![format!("{webhook_service_name}.{webhook_namespace}.svc")]; let cert_resolver = CertificateResolver::new(subject_alterative_dns_names, certificate_tx) .await From 71203107d9002834d97faaf1a7f2645cac14b043 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 11:53:43 +0100 Subject: [PATCH 12/25] Tkae the WebhookServerOptions by reference --- crates/stackable-webhook/src/lib.rs | 2 +- crates/stackable-webhook/src/tls/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 7e1c8512b..976036882 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -129,7 +129,7 @@ impl WebhookServer { } tracing::debug!("create TLS server"); - let (tls_server, cert_rx) = TlsServer::new(router, options.clone()) + let (tls_server, cert_rx) = TlsServer::new(router, &options) .await .context(CreateTlsServerSnafu)?; diff --git a/crates/stackable-webhook/src/tls/mod.rs b/crates/stackable-webhook/src/tls/mod.rs index 943a72a4e..fa493159d 100644 --- a/crates/stackable-webhook/src/tls/mod.rs +++ b/crates/stackable-webhook/src/tls/mod.rs @@ -86,7 +86,7 @@ impl TlsServer { #[instrument(name = "create_tls_server", skip(router))] pub async fn new( router: Router, - options: WebhookServerOptions, + options: &WebhookServerOptions, ) -> Result<(Self, mpsc::Receiver)> { let (certificate_tx, certificate_rx) = mpsc::channel(1); @@ -121,7 +121,7 @@ impl TlsServer { let tls_server = Self { config, cert_resolver, - socket_addr, + socket_addr: *socket_addr, router, }; From a070857c5ceff8a7231f9d94718fb3febef5cb87 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 11:55:13 +0100 Subject: [PATCH 13/25] Remove leftover code --- crates/stackable-webhook/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 976036882..661a2993d 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -191,7 +191,6 @@ impl WebhookServer { mut webhooks, tls_server, mut cert_rx, - // initial_reconcile_tx, } = self; let tls_server = tls_server .run() From 85b73d3a38e9774a0a4cb5d34aaa18a33e048abf Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 11:56:09 +0100 Subject: [PATCH 14/25] Update crates/stackable-webhook/src/servers/mutating_webhook.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- crates/stackable-webhook/src/servers/mutating_webhook.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index d0f7a84c2..cb100574e 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -142,7 +142,8 @@ impl MutatingWebhook { } fn http_path(&self) -> String { - format!("/mutate/{}", self.mutating_webhook_configuration.name_any()) + let mutating_webhook_configuration_name = self.mutating_webhook_configuration.name_any(); + format!("/mutate/{mutating_webhook_configuration_name}") } } From 6fc8a979629f18de6531cefb37a1b1b2823523a4 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 12:22:33 +0100 Subject: [PATCH 15/25] Move tracing layer to after routes --- crates/stackable-webhook/src/lib.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 661a2993d..04a1cfff4 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -114,20 +114,23 @@ impl WebhookServer { // by the Axum project. // // See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware - // TODO (@NickLarsenNZ): rename this server_builder and keep it specific to tracing, since it's placement in the chain is important - let service_builder = ServiceBuilder::new().layer(trace_layer); + // TODO (@NickLarsenNZ): keep this server_builder specific to tracing, since it's placement in the chain is important + let trace_service_builder = ServiceBuilder::new().layer(trace_layer); // Create the root router and merge the provided router into it. tracing::debug!("create core router and merge provided router"); - let mut router = Router::new() - .layer(service_builder) - // The health route is below the AxumTraceLayer so as not to be instrumented - .route("/health", get(|| async { "ok" })); - - for webhook in webhooks.iter() { + let mut router = Router::new(); + for webhook in &webhooks { router = webhook.register_routes(router); } + let router = router + // Enrich spans for routes added above. + // Routes defined below it will not be instrumented to reduce noise. + .layer(trace_service_builder) + // The health route is below the AxumTraceLayer so as not to be instrumented + .route("/health", get(|| async { "ok" })); + tracing::debug!("create TLS server"); let (tls_server, cert_rx) = TlsServer::new(router, &options) .await From 2721d72a84def1f9639b7031d89f7d863b932f67 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 12:41:10 +0100 Subject: [PATCH 16/25] Improve tracing --- .../src/servers/conversion_webhook.rs | 150 ++++++++++-------- .../src/servers/mutating_webhook.rs | 2 + 2 files changed, 85 insertions(+), 67 deletions(-) diff --git a/crates/stackable-webhook/src/servers/conversion_webhook.rs b/crates/stackable-webhook/src/servers/conversion_webhook.rs index 8368b0398..7712bb859 100644 --- a/crates/stackable-webhook/src/servers/conversion_webhook.rs +++ b/crates/stackable-webhook/src/servers/conversion_webhook.rs @@ -20,6 +20,7 @@ use kube::{ }; use snafu::{ResultExt, Snafu, ensure}; use tokio::sync::oneshot; +use tracing::instrument; use x509_cert::Certificate; use super::{Webhook, WebhookError}; @@ -69,6 +70,84 @@ impl ConversionWebhook { initial_reconcile_rx, ) } + + #[instrument( + skip(self, crd, crd_api), + fields( + name = crd.name_any(), + kind = &crd.spec.names.kind + ) + )] + async fn reconcile_crd( + &self, + mut crd: CustomResourceDefinition, + crd_api: &Api, + new_ca_bundle: &ByteString, + options: &WebhookServerOptions, + ) -> Result<(), WebhookError> { + let crd_kind = &crd.spec.names.kind; + let crd_name = crd.name_any(); + + tracing::info!( + k8s.crd.kind = crd_kind, + k8s.crd.name = crd_name, + "reconciling custom resource definition" + ); + + crd.spec.conversion = Some(CustomResourceConversion { + strategy: "Webhook".to_owned(), + webhook: Some(WebhookConversion { + // conversionReviewVersions indicates what ConversionReview versions are + // supported by the webhook. The first version in the list understood by the + // API server is sent to the webhook. The webhook must respond with a + // ConversionReview object in the same version it received. We only support + // the stable v1 ConversionReview to keep the implementation as simple as + // possible. + conversion_review_versions: vec!["v1".to_owned()], + client_config: Some(WebhookClientConfig { + service: Some(ServiceReference { + name: options.webhook_service_name.to_owned(), + namespace: options.webhook_namespace.to_owned(), + path: Some(format!("/convert/{crd_name}")), + port: Some(options.socket_addr.port().into()), + }), + // Here, ByteString takes care of encoding the provided content as base64. + ca_bundle: Some(new_ca_bundle.to_owned()), + url: None, + }), + }), + }); + + // Deploy the updated CRDs using a server-side apply. + let patch = Patch::Apply(&crd); + + // We force apply here, because we want to become the sole manager of the CRD. This + // avoids any conflicts from previous deployments via helm or stackablectl which are + // reported with the following error message: + // + // Apply failed with 2 conflicts: conflicts with "stackablectl" using apiextensions.k8s.io/v1: + // - .spec.versions + // - .spec.conversion.strategy: Conflict + // + // The official Kubernetes documentation provides three options on how to solve + // these conflicts. Option 1 is used, which is described as follows: + // + // Overwrite value, become sole manager: If overwriting the value was intentional + // (or if the applier is an automated process like a controller) the applier should + // set the force query parameter to true [...], and make the request again. This + // forces the operation to succeed, changes the value of the field, and removes the + // field from all other managers' entries in managedFields. + // + // See https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts + let patch_params = PatchParams::apply(&self.field_manager).force(); + + crd_api + .patch(&crd_name, &patch_params, &patch) + .await + .with_context(|_| PatchCrdSnafu { crd_name })?; + + Ok(()) + } } #[async_trait] @@ -91,6 +170,7 @@ where router } + #[instrument(skip(self))] async fn handle_certificate_rotation( &mut self, _new_certificate: &Certificate, @@ -101,74 +181,10 @@ where return Ok(()); } - tracing::info!( - k8s.crd.names = ?self.crds_and_handlers.iter().map(|(crd, _)| crd.name_any()).collect::>(), - "reconciling custom resource definitions" - ); - let crd_api: Api = Api::all(self.client.clone()); - - for mut crd in self.crds_and_handlers.iter().map(|(crd, _)| crd).cloned() { - let crd_kind = &crd.spec.names.kind; - let crd_name = crd.name_any(); - - tracing::debug!( - k8s.crd.kind = crd_kind, - k8s.crd.name = crd_name, - "reconciling custom resource definition" - ); - - crd.spec.conversion = Some(CustomResourceConversion { - strategy: "Webhook".to_owned(), - webhook: Some(WebhookConversion { - // conversionReviewVersions indicates what ConversionReview versions are - // supported by the webhook. The first version in the list understood by the - // API server is sent to the webhook. The webhook must respond with a - // ConversionReview object in the same version it received. We only support - // the stable v1 ConversionReview to keep the implementation as simple as - // possible. - conversion_review_versions: vec!["v1".to_owned()], - client_config: Some(WebhookClientConfig { - service: Some(ServiceReference { - name: options.webhook_service_name.to_owned(), - namespace: options.webhook_namespace.to_owned(), - path: Some(format!("/convert/{crd_name}")), - port: Some(options.socket_addr.port().into()), - }), - // Here, ByteString takes care of encoding the provided content as base64. - ca_bundle: Some(new_ca_bundle.to_owned()), - url: None, - }), - }), - }); - - // Deploy the updated CRDs using a server-side apply. - let patch = Patch::Apply(&crd); - - // We force apply here, because we want to become the sole manager of the CRD. This - // avoids any conflicts from previous deployments via helm or stackablectl which are - // reported with the following error message: - // - // Apply failed with 2 conflicts: conflicts with "stackablectl" using apiextensions.k8s.io/v1: - // - .spec.versions - // - .spec.conversion.strategy: Conflict - // - // The official Kubernetes documentation provides three options on how to solve - // these conflicts. Option 1 is used, which is described as follows: - // - // Overwrite value, become sole manager: If overwriting the value was intentional - // (or if the applier is an automated process like a controller) the applier should - // set the force query parameter to true [...], and make the request again. This - // forces the operation to succeed, changes the value of the field, and removes the - // field from all other managers' entries in managedFields. - // - // See https://kubernetes.io/docs/reference/using-api/server-side-apply/#conflicts - let patch_params = PatchParams::apply(&self.field_manager).force(); - - crd_api - .patch(&crd_name, &patch_params, &patch) - .await - .with_context(|_| PatchCrdSnafu { crd_name })?; + for (crd, _) in &self.crds_and_handlers { + self.reconcile_crd(crd.clone(), &crd_api, new_ca_bundle, options) + .await?; } // After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/servers/mutating_webhook.rs index cb100574e..22821b07a 100644 --- a/crates/stackable-webhook/src/servers/mutating_webhook.rs +++ b/crates/stackable-webhook/src/servers/mutating_webhook.rs @@ -15,6 +15,7 @@ use kube::{ }; use serde::{Serialize, de::DeserializeOwned}; use snafu::{ResultExt, Snafu}; +use tracing::instrument; use x509_cert::Certificate; use super::{Webhook, WebhookError}; @@ -178,6 +179,7 @@ where router.route(&route, post(handler_fn)) } + #[instrument(skip(self))] async fn handle_certificate_rotation( &mut self, _new_certificate: &Certificate, From 3ff8b2a1536bda0baa0b6f1605c7a2cec973599f Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 12:43:24 +0100 Subject: [PATCH 17/25] Rename servers mdodule to webhooks --- crates/stackable-webhook/src/lib.rs | 4 ++-- .../src/{servers => webhooks}/conversion_webhook.rs | 0 crates/stackable-webhook/src/{servers => webhooks}/mod.rs | 0 .../src/{servers => webhooks}/mutating_webhook.rs | 0 4 files changed, 2 insertions(+), 2 deletions(-) rename crates/stackable-webhook/src/{servers => webhooks}/conversion_webhook.rs (100%) rename crates/stackable-webhook/src/{servers => webhooks}/mod.rs (100%) rename crates/stackable-webhook/src/{servers => webhooks}/mutating_webhook.rs (100%) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 04a1cfff4..316d88fff 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -4,7 +4,6 @@ use ::x509_cert::Certificate; use axum::{Router, routing::get}; use futures_util::{FutureExt as _, TryFutureExt, select}; use k8s_openapi::ByteString; -use servers::{Webhook, WebhookError}; use snafu::{ResultExt, Snafu}; use stackable_telemetry::AxumTraceLayer; use tokio::{ @@ -13,12 +12,13 @@ use tokio::{ try_join, }; use tower::ServiceBuilder; +use webhooks::{Webhook, WebhookError}; use x509_cert::der::{EncodePem, pem::LineEnding}; use crate::tls::TlsServer; -pub mod servers; pub mod tls; +pub mod webhooks; /// A result type alias with the [`WebhookError`] type as the default error type. pub type Result = std::result::Result; diff --git a/crates/stackable-webhook/src/servers/conversion_webhook.rs b/crates/stackable-webhook/src/webhooks/conversion_webhook.rs similarity index 100% rename from crates/stackable-webhook/src/servers/conversion_webhook.rs rename to crates/stackable-webhook/src/webhooks/conversion_webhook.rs diff --git a/crates/stackable-webhook/src/servers/mod.rs b/crates/stackable-webhook/src/webhooks/mod.rs similarity index 100% rename from crates/stackable-webhook/src/servers/mod.rs rename to crates/stackable-webhook/src/webhooks/mod.rs diff --git a/crates/stackable-webhook/src/servers/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs similarity index 100% rename from crates/stackable-webhook/src/servers/mutating_webhook.rs rename to crates/stackable-webhook/src/webhooks/mutating_webhook.rs From 92f14b30fd89fb5972e3b5b2020ca8d8cd28f669 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 12:47:17 +0100 Subject: [PATCH 18/25] Add some docs --- crates/stackable-webhook/src/webhooks/mutating_webhook.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs index 22821b07a..b9ad850b7 100644 --- a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs @@ -115,6 +115,10 @@ pub struct MutatingWebhook { impl MutatingWebhook { /// All webhooks need to set the admissionReviewVersions to `["v1"]`, as this mutating webhook /// only supports that version! A failure to do so will result in a panic. + /// + /// Your [`MutatingWebhookConfiguration`] can contain 0..n webhooks, but it is recommended to + /// only have a single entry in there, as the clientConfig of all entries will be set to the + /// same service, port and HTTP path. pub fn new( mutating_webhook_configuration: MutatingWebhookConfiguration, handler: H, From 6ca52f265b8c0837e9d2d2c05e89cb6658b50e9f Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 12:53:02 +0100 Subject: [PATCH 19/25] Add some docs --- crates/stackable-webhook/src/lib.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 316d88fff..e01374ac7 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -1,3 +1,16 @@ +//! Utility types and functions to easily create ready-to-use webhook servers which can handle +//! different tasks. All webhook servers use HTTPS by default. +//! +//! Currently the following webhooks are supported: +//! +//! * CRD conversion webhooks: [`ConversionWebhook`](`webhooks::ConversionWebhook`) +//! * Mutating webhooks: [`MutatingWebhook`](`webhooks::MutatingWebhook`) +//! * In the future validating webhooks wil be added +//! +//! This library is fully compatible with the [`tracing`] crate and emits debug level tracing data. +//! +//! For usage please look at the [`WebhookServer`] docs as well as the specific [`Webhook`] you are +//! using. use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use ::x509_cert::Certificate; @@ -93,10 +106,7 @@ impl WebhookServer { pub const DEFAULT_SOCKET_ADDRESS: SocketAddr = SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT); - /// Creates a new webhook server with the given config and list of webhooks. - /// - /// Currently the webhooks [`ConversionWebhook`](servers::ConversionWebhook) and - /// [`MutatingWebhook`](servers::MutatingWebhook) are implemented. + /// Creates a new webhook server with the given config and list of [`Webhook`]s. /// /// Please read their documentation for details. pub async fn new( From 2974ba0fe15d928ebc46e26fd4a73d1e67340518 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 13:00:09 +0100 Subject: [PATCH 20/25] changelog --- crates/stackable-webhook/CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index a86b71342..dcfd3a74a 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added + +- Add support for mutating webhooks ([#1119]). + +### Changed + +- BREAKING: Refactor the entire [`WebhookServer`] mechanism, so multiple webhooks can run in parallel. + Put individual webhooks (currently `ConversionWebhook` and `MutatingWebhook`) behind the `Webhook` trait ([#1119]). + +[#1119]: https://github.com/stackabletech/operator-rs/pull/1119 + ## [0.7.1] - 2025-10-31 ### Fixed From af9d4464379a9e968bd488b3aa92b6b81e83c4ac Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 13:00:35 +0100 Subject: [PATCH 21/25] changelog --- crates/stackable-webhook/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/CHANGELOG.md b/crates/stackable-webhook/CHANGELOG.md index dcfd3a74a..a71ba1a48 100644 --- a/crates/stackable-webhook/CHANGELOG.md +++ b/crates/stackable-webhook/CHANGELOG.md @@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file. ### Changed -- BREAKING: Refactor the entire [`WebhookServer`] mechanism, so multiple webhooks can run in parallel. +- BREAKING: Refactor the entire `WebhookServer` mechanism, so multiple webhooks can run in parallel. Put individual webhooks (currently `ConversionWebhook` and `MutatingWebhook`) behind the `Webhook` trait ([#1119]). [#1119]: https://github.com/stackabletech/operator-rs/pull/1119 From da6b1dc259600b68395d10804699ec8278c7539e Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 13:30:36 +0100 Subject: [PATCH 22/25] Update crates/stackable-webhook/src/lib.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- crates/stackable-webhook/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index e01374ac7..71f6ed348 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -124,7 +124,6 @@ impl WebhookServer { // by the Axum project. // // See https://docs.rs/axum/latest/axum/middleware/index.html#applying-multiple-middleware - // TODO (@NickLarsenNZ): keep this server_builder specific to tracing, since it's placement in the chain is important let trace_service_builder = ServiceBuilder::new().layer(trace_layer); // Create the root router and merge the provided router into it. From 00629de2a3cf3c602f42220510ec705718d5582d Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 13:39:23 +0100 Subject: [PATCH 23/25] Update crates/stackable-webhook/src/lib.rs Co-authored-by: Nick <10092581+NickLarsenNZ@users.noreply.github.com> --- crates/stackable-webhook/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 71f6ed348..e664edd91 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -3,8 +3,8 @@ //! //! Currently the following webhooks are supported: //! -//! * CRD conversion webhooks: [`ConversionWebhook`](`webhooks::ConversionWebhook`) -//! * Mutating webhooks: [`MutatingWebhook`](`webhooks::MutatingWebhook`) +//! * [webhooks::ConversionWebhook] +//! * [webhooks::MutatingWebhook] //! * In the future validating webhooks wil be added //! //! This library is fully compatible with the [`tracing`] crate and emits debug level tracing data. From 9c200677e5f01801dee977070e7a5529d2f071e8 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 13:50:55 +0100 Subject: [PATCH 24/25] fix doc tests --- crates/stackable-webhook/src/lib.rs | 2 +- crates/stackable-webhook/src/webhooks/mutating_webhook.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index e664edd91..30685e50d 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -60,7 +60,7 @@ pub enum WebhookServerError { /// ``` /// use stackable_webhook::WebhookServer; /// use stackable_webhook::WebhookServerOptions; -/// use stackable_webhook::servers::Webhook; +/// use stackable_webhook::webhooks::Webhook; /// /// # async fn docs() { /// let mut webhooks: Vec> = vec![]; diff --git a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs index b9ad850b7..82a883ff1 100644 --- a/crates/stackable-webhook/src/webhooks/mutating_webhook.rs +++ b/crates/stackable-webhook/src/webhooks/mutating_webhook.rs @@ -54,7 +54,7 @@ pub enum MutatingWebhookError { /// use stackable_operator::kube::core::admission::{AdmissionRequest, AdmissionResponse}; /// use stackable_operator::kvp::Label; /// use stackable_webhook::WebhookServer; -/// use stackable_webhook::servers::MutatingWebhook; +/// use stackable_webhook::webhooks::MutatingWebhook; /// /// # async fn docs() { /// // The Kubernetes client From d79e432644a359d01435d59eebd81f1b0db6ba60 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Thu, 20 Nov 2025 15:31:55 +0100 Subject: [PATCH 25/25] Improve shutdown message --- crates/stackable-webhook/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stackable-webhook/src/lib.rs b/crates/stackable-webhook/src/lib.rs index 30685e50d..7e0ab998f 100644 --- a/crates/stackable-webhook/src/lib.rs +++ b/crates/stackable-webhook/src/lib.rs @@ -188,7 +188,7 @@ impl WebhookServer { res?; } _ = &mut future_signal => { - tracing::info!("shutdown signal received, stopping server"); + tracing::info!("shutdown signal received, stopping webhook server"); } }