diff --git a/deploy/crd/kafkacluster.crd.yaml b/deploy/crd/kafkacluster.crd.yaml index bb41594f..5e709021 100644 --- a/deploy/crd/kafkacluster.crd.yaml +++ b/deploy/crd/kafkacluster.crd.yaml @@ -275,6 +275,45 @@ spec: required: - roleGroups type: object + config: + default: + tls: + secretClass: tls + internalTls: + secretClass: tls + properties: + clientAuthentication: + description: "Only affects client connections. This setting controls: - If clients need to authenticate themselves against the server via TLS - Which ca.crt to use when validating the provided client certs. Defaults to `None`." + nullable: true + properties: + authenticationClass: + type: string + required: + - authenticationClass + type: object + internalTls: + default: + secretClass: tls + description: "Only affects internal communication. Uses mutual verification between Kafka nodes. This setting controls: - Which cert the servers should use to authenticate themselves against other servers - Which ca.crt to use when validating the other server." + nullable: true + properties: + secretClass: + type: string + required: + - secretClass + type: object + tls: + default: + secretClass: tls + description: "Only affects client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client. Defaults to `TlsSecretClass` { secret_class: \"tls\".to_string() }." + nullable: true + properties: + secretClass: + type: string + required: + - secretClass + type: object + type: object log4j: nullable: true type: string diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index b1a307fb..e9e2858c 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -277,6 +277,45 @@ spec: required: - roleGroups type: object + config: + default: + tls: + secretClass: tls + internalTls: + secretClass: tls + properties: + clientAuthentication: + description: "Only affects client connections. This setting controls: - If clients need to authenticate themselves against the server via TLS - Which ca.crt to use when validating the provided client certs. Defaults to `None`." + nullable: true + properties: + authenticationClass: + type: string + required: + - authenticationClass + type: object + internalTls: + default: + secretClass: tls + description: "Only affects internal communication. Uses mutual verification between Kafka nodes. This setting controls: - Which cert the servers should use to authenticate themselves against other servers - Which ca.crt to use when validating the other server." + nullable: true + properties: + secretClass: + type: string + required: + - secretClass + type: object + tls: + default: + secretClass: tls + description: "Only affects client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client. Defaults to `TlsSecretClass` { secret_class: \"tls\".to_string() }." + nullable: true + properties: + secretClass: + type: string + required: + - secretClass + type: object + type: object log4j: nullable: true type: string diff --git a/deploy/helm/kafka-operator/templates/roles.yaml b/deploy/helm/kafka-operator/templates/roles.yaml index 65cbdba6..3a7205b7 100644 --- a/deploy/helm/kafka-operator/templates/roles.yaml +++ b/deploy/helm/kafka-operator/templates/roles.yaml @@ -89,3 +89,11 @@ rules: - {{ include "operator.name" . 
}}clusters/status verbs: - patch + - apiGroups: + - authentication.stackable.tech + resources: + - authenticationclasses + verbs: + - get + - list + - watch diff --git a/deploy/manifests/crds.yaml b/deploy/manifests/crds.yaml index 31defb16..19abd657 100644 --- a/deploy/manifests/crds.yaml +++ b/deploy/manifests/crds.yaml @@ -278,6 +278,45 @@ spec: required: - roleGroups type: object + config: + default: + tls: + secretClass: tls + internalTls: + secretClass: tls + properties: + clientAuthentication: + description: "Only affects client connections. This setting controls: - If clients need to authenticate themselves against the server via TLS - Which ca.crt to use when validating the provided client certs. Defaults to `None`." + nullable: true + properties: + authenticationClass: + type: string + required: + - authenticationClass + type: object + internalTls: + default: + secretClass: tls + description: "Only affects internal communication. Uses mutual verification between Kafka nodes. This setting controls: - Which cert the servers should use to authenticate themselves against other servers - Which ca.crt to use when validating the other server." + nullable: true + properties: + secretClass: + type: string + required: + - secretClass + type: object + tls: + default: + secretClass: tls + description: "Only affects client connections. This setting controls: - If TLS encryption is used at all - Which cert the servers should use to authenticate themselves against the client. Defaults to `TlsSecretClass` { secret_class: \"tls\".to_string() }." + nullable: true + properties: + secretClass: + type: string + required: + - secretClass + type: object + type: object log4j: nullable: true type: string diff --git a/deploy/manifests/roles.yaml b/deploy/manifests/roles.yaml index 77c8d30a..fa65d3f5 100644 --- a/deploy/manifests/roles.yaml +++ b/deploy/manifests/roles.yaml @@ -89,3 +89,11 @@ rules: - kafkaclusters/status verbs: - patch + - apiGroups: + - authentication.stackable.tech + resources: + - authenticationclasses + verbs: + - get + - list + - watch diff --git a/docs/modules/ROOT/pages/usage.adoc b/docs/modules/ROOT/pages/usage.adoc index 2c1f12f3..0fbd9911 100644 --- a/docs/modules/ROOT/pages/usage.adoc +++ b/docs/modules/ROOT/pages/usage.adoc @@ -1,76 +1,81 @@ = Usage -After installation, the CRD for this operator must be created: +If you are not installing the operator using Helm, the CRD for this operator must be created after installation: kubectl apply -f /etc/stackable/kafka-operator/crd/kafkacluster.crd.yaml To create an Apache Kafka (v3.2.0) cluster named `simple-kafka` assuming that you already have a Zookeeper cluster named `simple-zk`: - cat <<EOF | kubectl apply -f - - apiVersion: kafka.stackable.tech/v1alpha1 - kind: KafkaCluster - metadata: - name: simple-kafka - spec: - version: 3.2.0-stackable0.1.0 - zookeeperConfigMapName: simple-kafka-znode - brokers: - roleGroups: - default: - replicas: 3 - EOF +[source,yaml] +---- +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka +spec: + version: 3.2.0-stackable0.1.0 + zookeeperConfigMapName: simple-kafka-znode + config: + tls: + secretClass: tls # <1> + internalTls: + secretClass: kafka-internal-tls # <2> + brokers: + roleGroups: + default: + replicas: 3 +---- +<1> The `tls.secretClass` refers to the client-to-server encryption. Defaults to the `tls` `SecretClass`. It can be deactivated by setting `config.tls` to `null`. +<2> The `internalTls.secretClass` refers to the broker-to-broker internal encryption. If not set explicitly, it defaults to `tls`. It can be disabled by setting `config.internalTls` to `null`. 
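+ +TLS can also be switched off entirely (not recommended for production). As a minimal sketch, mirroring the `tls: null` cases in the operator's unit tests, null out both settings so that the plaintext ports are used instead: + +[source,yaml] +---- +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka +spec: + version: 3.2.0-stackable0.1.0 + zookeeperConfigMapName: simple-kafka-znode + config: + tls: null + internalTls: null + brokers: + roleGroups: + default: + replicas: 3 +---- 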
+ +The `tls` `SecretClass` is provided by the xref:secret-operator::index.adoc[Secret Operator] and looks like this: + +[source,yaml] +---- +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-ca + namespace: default + autoGenerate: true +---- + +You can create your own `SecretClass` resources and reference them in `tls.secretClass` or `internalTls.secretClass` to use different certificates. + +== Authentication + +Internal broker-to-broker communication is authenticated via TLS. To enforce TLS authentication for client-to-server communication as well, you can reference an `AuthenticationClass`, a resource provided by the xref:commons-operator::index.adoc[Commons Operator], in the cluster definition. + +[source,yaml] +---- +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: kafka-client-tls # <2> +spec: + provider: + tls: + clientCertSecretClass: kafka-client-auth-secret # <3> +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-client-auth-secret # <4> +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-kafka-client-ca + namespace: default + autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka +spec: + version: 3.2.0-stackable0.1.0 + zookeeperConfigMapName: simple-kafka-znode + config: + tls: + secretClass: tls + clientAuthentication: + authenticationClass: kafka-client-tls # <1> + brokers: + roleGroups: + default: + replicas: 3 +---- +<1> The `config.clientAuthentication.authenticationClass` can be set to enforce TLS client authentication. This is optional. +<2> The `AuthenticationClass` referenced from the `KafkaCluster`; it in turn references a `SecretClass` that provides the certificates. +<3> The reference to the `SecretClass` providing the client certificates. +<4> The `SecretClass` referenced by the `AuthenticationClass`; it provides the certificates. + +A matching client invocation is sketched below. + == Configuration & Environment Overrides The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role). 
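+ +To connect against the authentication example above, a client needs the certificate material from the referenced `SecretClass`. The following kcat invocation is only a sketch: the `/stackable/tls` mount path and the broker address are placeholder assumptions, while the `-X` options mirror the ones the operator itself passes to kcat for its readiness probe: + +[source,bash] +---- +# List cluster metadata via the TLS-authenticated listener on the secure client port (9093). +# /stackable/tls and the broker address are placeholders for your client environment. +kcat -b simple-kafka:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls/tls.key -X ssl.certificate.location=/stackable/tls/tls.crt -X ssl.ca.location=/stackable/tls/ca.crt -L +---- 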
diff --git a/examples/tls/simple-kafka-cluster-tls.yaml b/examples/tls/simple-kafka-cluster-tls.yaml new file mode 100644 index 00000000..f68cc074 --- /dev/null +++ b/examples/tls/simple-kafka-cluster-tls.yaml @@ -0,0 +1,77 @@ +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: simple-zk +spec: + version: 3.8.0-stackable0.7.1 + servers: + roleGroups: + default: + selector: + matchLabels: + kubernetes.io/os: linux + replicas: 3 + config: {} +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: simple-kafka-znode +spec: + clusterRef: + name: simple-zk +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-internal-tls +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-kafka-internal-tls-ca + namespace: default + autoGenerate: true +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: kafka-client-auth-tls +spec: + provider: + tls: + clientCertSecretClass: kafka-client-auth-secret +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: kafka-client-auth-secret +spec: + backend: + autoTls: + ca: + secret: + name: secret-provisioner-tls-kafka-client-ca + namespace: default + autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka +spec: + version: 3.2.0-stackable0.1.0 + zookeeperConfigMapName: simple-kafka-znode + config: + tls: + secretClass: tls + clientAuthentication: + authenticationClass: kafka-client-auth-tls + internalTls: + secretClass: kafka-internal-tls + brokers: + roleGroups: + default: + replicas: 3 diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index f3fcdfb7..38b95e35 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -1,6 +1,7 @@ +pub mod listener; + use serde::{Deserialize, Serialize}; use snafu::{OptionExt, Snafu}; -use stackable_operator::error::OperatorResult; use stackable_operator::memory::to_java_heap; use stackable_operator::{ commons::{ @@ -8,6 +9,7 @@ use stackable_operator::{ resources::{CpuLimits, MemoryLimits, NoRuntimeLimits, PvcConfig, Resources}, }, config::merge::Merge, + error::OperatorResult, k8s_openapi::{ api::core::v1::{PersistentVolumeClaim, ResourceRequirements}, apimachinery::pkg::api::resource::Quantity, @@ -21,13 +23,67 @@ use std::collections::BTreeMap; use strum::{Display, EnumIter, EnumString}; pub const APP_NAME: &str = "kafka"; -pub const APP_PORT: u16 = 9092; +// ports +pub const CLIENT_PORT_NAME: &str = "http"; +pub const CLIENT_PORT: u16 = 9092; +pub const SECURE_CLIENT_PORT_NAME: &str = "https"; +pub const SECURE_CLIENT_PORT: u16 = 9093; +pub const INTERNAL_PORT: u16 = 19092; +pub const SECURE_INTERNAL_PORT: u16 = 19093; +pub const METRICS_PORT_NAME: &str = "metrics"; pub const METRICS_PORT: u16 = 9606; - +// config files pub const SERVER_PROPERTIES_FILE: &str = "server.properties"; - +// env vars pub const KAFKA_HEAP_OPTS: &str = "KAFKA_HEAP_OPTS"; +// server_properties pub const LOG_DIRS_VOLUME_NAME: &str = "log-dirs"; +// - TLS global +pub const TLS_DEFAULT_SECRET_CLASS: &str = "tls"; +pub const SSL_KEYSTORE_LOCATION: &str = "ssl.keystore.location"; +pub const SSL_KEYSTORE_PASSWORD: &str = "ssl.keystore.password"; +pub const SSL_KEYSTORE_TYPE: &str = "ssl.keystore.type"; +pub const SSL_TRUSTSTORE_LOCATION: &str = "ssl.truststore.location"; +pub const SSL_TRUSTSTORE_PASSWORD: &str = "ssl.truststore.password"; +pub const SSL_TRUSTSTORE_TYPE: 
&str = "ssl.truststore.type"; +pub const SSL_STORE_PASSWORD: &str = "changeit"; +// - TLS client +pub const CLIENT_SSL_KEYSTORE_LOCATION: &str = "listener.name.client.ssl.keystore.location"; +pub const CLIENT_SSL_KEYSTORE_PASSWORD: &str = "listener.name.client.ssl.keystore.password"; +pub const CLIENT_SSL_KEYSTORE_TYPE: &str = "listener.name.client.ssl.keystore.type"; +pub const CLIENT_SSL_TRUSTSTORE_LOCATION: &str = "listener.name.client.ssl.truststore.location"; +pub const CLIENT_SSL_TRUSTSTORE_PASSWORD: &str = "listener.name.client.ssl.truststore.password"; +pub const CLIENT_SSL_TRUSTSTORE_TYPE: &str = "listener.name.client.ssl.truststore.type"; +// - TLS client authentication +pub const CLIENT_AUTH_SSL_KEYSTORE_LOCATION: &str = + "listener.name.client_auth.ssl.keystore.location"; +pub const CLIENT_AUTH_SSL_KEYSTORE_PASSWORD: &str = + "listener.name.client_auth.ssl.keystore.password"; +pub const CLIENT_AUTH_SSL_KEYSTORE_TYPE: &str = "listener.name.client_auth.ssl.keystore.type"; +pub const CLIENT_AUTH_SSL_TRUSTSTORE_LOCATION: &str = + "listener.name.client_auth.ssl.truststore.location"; +pub const CLIENT_AUTH_SSL_TRUSTSTORE_PASSWORD: &str = + "listener.name.client_auth.ssl.truststore.password"; +pub const CLIENT_AUTH_SSL_TRUSTSTORE_TYPE: &str = "listener.name.client_auth.ssl.truststore.type"; +pub const CLIENT_AUTH_SSL_CLIENT_AUTH: &str = "listener.name.client_auth.ssl.client.auth"; +// - TLS internal +pub const SECURITY_INTER_BROKER_PROTOCOL: &str = "security.inter.broker.protocol"; +pub const INTER_BROKER_LISTENER_NAME: &str = "inter.broker.listener.name"; +pub const INTER_SSL_KEYSTORE_LOCATION: &str = "listener.name.internal.ssl.keystore.location"; +pub const INTER_SSL_KEYSTORE_PASSWORD: &str = "listener.name.internal.ssl.keystore.password"; +pub const INTER_SSL_KEYSTORE_TYPE: &str = "listener.name.internal.ssl.keystore.type"; +pub const INTER_SSL_TRUSTSTORE_LOCATION: &str = "listener.name.internal.ssl.truststore.location"; +pub const INTER_SSL_TRUSTSTORE_PASSWORD: &str = "listener.name.internal.ssl.truststore.password"; +pub const INTER_SSL_TRUSTSTORE_TYPE: &str = "listener.name.internal.ssl.truststore.type"; +pub const INTER_SSL_CLIENT_AUTH: &str = "listener.name.internal.ssl.client.auth"; +// directories +pub const STACKABLE_TMP_DIR: &str = "/stackable/tmp"; +pub const STACKABLE_DATA_DIR: &str = "/stackable/data"; +pub const STACKABLE_CONFIG_DIR: &str = "/stackable/config"; +pub const STACKABLE_TLS_CLIENT_DIR: &str = "/stackable/tls_client"; +pub const STACKABLE_TLS_CLIENT_AUTH_DIR: &str = "/stackable/tls_client_auth"; +pub const STACKABLE_TLS_INTERNAL_DIR: &str = "/stackable/tls_internal"; +pub const SYSTEM_TRUST_STORE_DIR: &str = "/etc/pki/java/cacerts"; const JVM_HEAP_FACTOR: f32 = 0.8; @@ -62,9 +118,68 @@ pub struct KafkaClusterSpec { pub zookeeper_config_map_name: String, pub opa: Option, pub log4j: Option, + #[serde(default)] + pub config: GlobalKafkaConfig, pub stopped: Option, } +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GlobalKafkaConfig { + /// Only affects client connections. This setting controls: + /// - If TLS encryption is used at all + /// - Which cert the servers should use to authenticate themselves against the client + /// Defaults to `TlsSecretClass` { secret_class: "tls".to_string() }. + #[serde( + default = "tls_secret_class_default", + skip_serializing_if = "Option::is_none" + )] + pub tls: Option, + /// Only affects client connections. 
This setting controls: + /// - If clients need to authenticate themselves against the server via TLS + /// - Which ca.crt to use when validating the provided client certs. + /// Defaults to `None`. + #[serde(skip_serializing_if = "Option::is_none")] + pub client_authentication: Option<ClientAuthenticationClass>, + /// Only affects internal communication. Uses mutual verification between Kafka nodes. + /// This setting controls: + /// - Which cert the servers should use to authenticate themselves against other servers + /// - Which ca.crt to use when validating the other server. + #[serde( + default = "tls_secret_class_default", + skip_serializing_if = "Option::is_none" + )] + pub internal_tls: Option<TlsSecretClass>, +} + +impl Default for GlobalKafkaConfig { + fn default() -> Self { + GlobalKafkaConfig { + tls: tls_secret_class_default(), + client_authentication: None, + internal_tls: tls_secret_class_default(), + } + } +} + +#[derive(Clone, Default, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClientAuthenticationClass { + pub authentication_class: String, +} + +#[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct TlsSecretClass { + pub secret_class: String, +} + +fn tls_secret_class_default() -> Option<TlsSecretClass> { + Some(TlsSecretClass { + secret_class: TLS_DEFAULT_SECRET_CLASS.to_string(), + }) +} + impl KafkaCluster { /// The name of the role-level load-balanced Kubernetes `Service` pub fn broker_role_service_name(&self) -> Option<String> { @@ -183,7 +298,7 @@ impl KafkaCluster { .transpose() } - /// Returns the provided docker image e.g. 2.8.1-stackable0.1.0 + /// Returns the provided docker image e.g. 2.8.1-stackable0.1.0. pub fn image_version(&self) -> Result<&str, Error> { self.spec .version @@ -191,7 +306,7 @@ .context(ObjectHasNoVersionSnafu) } - /// Returns our semver representation for product config e.g. 2.8.1 + /// Returns our semver representation for product config e.g. 2.8.1. pub fn product_version(&self) -> Result<&str, Error> { let image_version = self.image_version()?; image_version @@ -201,6 +316,47 @@ image_version: image_version.to_string(), }) } + + /// Returns the secret class for client connection encryption. Defaults to `tls`. + pub fn client_tls_secret_class(&self) -> Option<&TlsSecretClass> { + let spec: &KafkaClusterSpec = &self.spec; + spec.config.tls.as_ref() + } + + /// Returns the authentication class used for client authentication. + pub fn client_authentication_class(&self) -> Option<&str> { + let spec: &KafkaClusterSpec = &self.spec; + spec.config + .client_authentication + .as_ref() + .map(|tls| tls.authentication_class.as_ref()) + } + + /// Returns the secret class for internal server encryption. + pub fn internal_tls_secret_class(&self) -> Option<&TlsSecretClass> { + let spec: &KafkaClusterSpec = &self.spec; + spec.config.internal_tls.as_ref() + } + + /// Returns the client port based on the security (tls) settings. + pub fn client_port(&self) -> u16 { + if self.client_tls_secret_class().is_some() || self.client_authentication_class().is_some() + { + SECURE_CLIENT_PORT + } else { + CLIENT_PORT + } + } + + /// Returns the client port name based on the security (tls) settings. 
+ pub fn client_port_name(&self) -> &str { + if self.client_tls_secret_class().is_some() || self.client_authentication_class().is_some() + { + SECURE_CLIENT_PORT_NAME + } else { + CLIENT_PORT_NAME + } + } } /// Reference to a single `Pod` that is a component of a [`KafkaCluster`] @@ -280,17 +436,273 @@ impl Configuration for KafkaConfig { ) -> Result<BTreeMap<String, Option<String>>, ConfigError> { let mut config = BTreeMap::new(); - if resource.spec.opa.is_some() && file == SERVER_PROPERTIES_FILE { - config.insert( - "authorizer.class.name".to_string(), - Some("org.openpolicyagent.kafka.OpaAuthorizer".to_string()), - ); + if file == SERVER_PROPERTIES_FILE { + // OPA + if resource.spec.opa.is_some() { + config.insert( + "authorizer.class.name".to_string(), + Some("org.openpolicyagent.kafka.OpaAuthorizer".to_string()), + ); + config.insert( + "opa.authorizer.metrics.enabled".to_string(), + Some("true".to_string()), + ); + } + + // We set either client TLS with authentication or client TLS without authentication. + // If authentication is explicitly required, we do not want any other CAs to + // be trusted. + if resource.client_authentication_class().is_some() { + config.insert( + CLIENT_AUTH_SSL_KEYSTORE_LOCATION.to_string(), + Some(format!("{}/keystore.p12", STACKABLE_TLS_CLIENT_AUTH_DIR)), + ); + config.insert( + CLIENT_AUTH_SSL_KEYSTORE_PASSWORD.to_string(), + Some(SSL_STORE_PASSWORD.to_string()), + ); + config.insert( + CLIENT_AUTH_SSL_KEYSTORE_TYPE.to_string(), + Some("PKCS12".to_string()), + ); + config.insert( + CLIENT_AUTH_SSL_TRUSTSTORE_LOCATION.to_string(), + Some(format!("{}/truststore.p12", STACKABLE_TLS_CLIENT_AUTH_DIR)), + ); + config.insert( + CLIENT_AUTH_SSL_TRUSTSTORE_PASSWORD.to_string(), + Some(SSL_STORE_PASSWORD.to_string()), + ); + config.insert( + CLIENT_AUTH_SSL_TRUSTSTORE_TYPE.to_string(), + Some("PKCS12".to_string()), + ); + // client auth required + config.insert( + CLIENT_AUTH_SSL_CLIENT_AUTH.to_string(), + Some("required".to_string()), + ); + } else if resource.client_tls_secret_class().is_some() { + config.insert( + CLIENT_SSL_KEYSTORE_LOCATION.to_string(), + Some(format!("{}/keystore.p12", STACKABLE_TLS_CLIENT_DIR)), + ); + config.insert( + CLIENT_SSL_KEYSTORE_PASSWORD.to_string(), + Some(SSL_STORE_PASSWORD.to_string()), + ); + config.insert( + CLIENT_SSL_KEYSTORE_TYPE.to_string(), + Some("PKCS12".to_string()), + ); + config.insert( + CLIENT_SSL_TRUSTSTORE_LOCATION.to_string(), + Some(format!("{}/truststore.p12", STACKABLE_TLS_CLIENT_DIR)), + ); + config.insert( + CLIENT_SSL_TRUSTSTORE_PASSWORD.to_string(), + Some(SSL_STORE_PASSWORD.to_string()), + ); + config.insert( + CLIENT_SSL_TRUSTSTORE_TYPE.to_string(), + Some("PKCS12".to_string()), + ); + } + + // Internal TLS + if resource.internal_tls_secret_class().is_some() { + config.insert( + INTER_SSL_KEYSTORE_LOCATION.to_string(), + Some(format!("{}/keystore.p12", STACKABLE_TLS_INTERNAL_DIR)), + ); + config.insert( + INTER_SSL_KEYSTORE_PASSWORD.to_string(), + Some(SSL_STORE_PASSWORD.to_string()), + ); + config.insert( + INTER_SSL_KEYSTORE_TYPE.to_string(), + Some("PKCS12".to_string()), + ); + config.insert( + INTER_SSL_TRUSTSTORE_LOCATION.to_string(), + Some(format!("{}/truststore.p12", STACKABLE_TLS_INTERNAL_DIR)), + ); + config.insert( + INTER_SSL_TRUSTSTORE_PASSWORD.to_string(), + Some(SSL_STORE_PASSWORD.to_string()), + ); + config.insert( + INTER_SSL_TRUSTSTORE_TYPE.to_string(), + Some("PKCS12".to_string()), + ); + config.insert( + INTER_SSL_CLIENT_AUTH.to_string(), + Some("required".to_string()), + ); + } + + // common config.insert( 
- "opa.authorizer.metrics.enabled".to_string(), - Some("true".to_string()), + INTER_BROKER_LISTENER_NAME.to_string(), + Some(listener::KafkaListenerName::Internal.to_string()), ); } Ok(config) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_tls() { + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + zookeeperConfigMapName: xyz + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!( + kafka.client_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS.to_string() + ); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS.to_string() + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + zookeeperConfigMapName: xyz + config: + tls: + secretClass: simple-kafka-client-tls + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!( + kafka.client_tls_secret_class().unwrap().secret_class, + "simple-kafka-client-tls".to_string() + ); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + zookeeperConfigMapName: xyz + config: + tls: null + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!(kafka.client_tls_secret_class(), None); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS.to_string() + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + zookeeperConfigMapName: xyz + config: + internalTls: + secretClass: simple-kafka-internal-tls + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!( + kafka.client_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS.to_string() + ); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + "simple-kafka-internal-tls" + ); + } + + #[test] + fn test_internal_tls() { + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + zookeeperConfigMapName: xyz + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS.to_string() + ); + assert_eq!( + kafka.client_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + zookeeperConfigMapName: xyz + config: + internalTls: + secretClass: simple-kafka-internal-tls + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + "simple-kafka-internal-tls".to_string() + ); + assert_eq!( + kafka.client_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + spec: + version: abc + 
zookeeperConfigMapName: xyz + config: + tls: + secretClass: simple-kafka-client-tls + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + assert_eq!( + kafka.internal_tls_secret_class().unwrap().secret_class, + TLS_DEFAULT_SECRET_CLASS.to_string() + ); + assert_eq!( + kafka.client_tls_secret_class().unwrap().secret_class, + "simple-kafka-client-tls" + ); + } +} diff --git a/rust/crd/src/listener.rs b/rust/crd/src/listener.rs new file mode 100644 index 00000000..abb00fd8 --- /dev/null +++ b/rust/crd/src/listener.rs @@ -0,0 +1,366 @@ +use crate::{ + KafkaCluster, CLIENT_PORT, CLIENT_PORT_NAME, INTERNAL_PORT, SECURE_CLIENT_PORT, + SECURE_CLIENT_PORT_NAME, SECURE_INTERNAL_PORT, STACKABLE_TMP_DIR, +}; +use snafu::{OptionExt, Snafu}; +use stackable_operator::kube::ResourceExt; +use std::collections::BTreeMap; +use std::fmt::{Display, Formatter}; +use strum::{EnumDiscriminants, EnumString}; + +const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0"; +const LISTENER_NODE_ADDRESS: &str = "$NODE"; + +#[derive(Snafu, Debug, EnumDiscriminants)] +pub enum KafkaListenerError { + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, +} + +#[derive(strum::Display, Debug, EnumString)] +pub enum KafkaListenerProtocol { + /// Unencrypted and unauthenticated connections + #[strum(serialize = "PLAINTEXT")] + Plaintext, + /// TLS-encrypted and server-authenticated connections + #[strum(serialize = "SSL")] + Ssl, +} + +#[derive(strum::Display, Debug, EnumString, Ord, Eq, PartialEq, PartialOrd)] +pub enum KafkaListenerName { + #[strum(serialize = "CLIENT")] + Client, + #[strum(serialize = "CLIENT_AUTH")] + ClientAuth, + #[strum(serialize = "INTERNAL")] + Internal, +} + +#[derive(Debug)] +pub struct KafkaListenerConfig { + listeners: Vec<KafkaListener>, + advertised_listeners: Vec<KafkaListener>, + listener_security_protocol_map: BTreeMap<KafkaListenerName, KafkaListenerProtocol>, +} + +impl KafkaListenerConfig { + /// Returns the `listeners` for the Kafka `server.properties` config. + pub fn listeners(&self) -> String { + self.listeners + .iter() + .map(|listener| listener.to_string()) + .collect::<Vec<String>>() + .join(",") + } + + /// Returns the `advertised.listeners` for the Kafka `server.properties` config. + /// May contain ENV variables and therefore should be used as a CLI argument + /// like --override \"advertised.listeners=xxx\". + pub fn advertised_listeners(&self) -> String { + self.advertised_listeners + .iter() + .map(|listener| listener.to_string()) + .collect::<Vec<String>>() + .join(",") + } + + /// Returns the `listener.security.protocol.map` for the Kafka `server.properties` config. 
+ pub fn listener_security_protocol_map(&self) -> String { + self.listener_security_protocol_map + .iter() + .map(|(name, protocol)| format!("{name}:{protocol}")) + .collect::<Vec<String>>() + .join(",") + } +} + +#[derive(Debug)] +struct KafkaListener { + name: KafkaListenerName, + host: String, + port: String, +} + +impl Display for KafkaListener { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}://{}:{}", self.name, self.host, self.port) + } +} + +pub fn get_kafka_listener_config( + kafka: &KafkaCluster, + object_name: &str, +) -> Result<KafkaListenerConfig, KafkaListenerError> { + let pod_fqdn = pod_fqdn(kafka, object_name)?; + let mut listeners = vec![]; + let mut advertised_listeners = vec![]; + let mut listener_security_protocol_map = BTreeMap::new(); + + if kafka.client_authentication_class().is_some() { + // 1) If client authentication is required, we expose only the CLIENT_AUTH connection with SSL + listeners.push(KafkaListener { + name: KafkaListenerName::ClientAuth, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: SECURE_CLIENT_PORT.to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::ClientAuth, + host: LISTENER_NODE_ADDRESS.to_string(), + port: node_port_cmd(STACKABLE_TMP_DIR, SECURE_CLIENT_PORT_NAME), + }); + listener_security_protocol_map + .insert(KafkaListenerName::ClientAuth, KafkaListenerProtocol::Ssl); + } else if kafka.client_tls_secret_class().is_some() { + // 2) If client authentication is not required but TLS is, we expose CLIENT with SSL + listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: SECURE_CLIENT_PORT.to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: LISTENER_NODE_ADDRESS.to_string(), + port: node_port_cmd(STACKABLE_TMP_DIR, SECURE_CLIENT_PORT_NAME), + }); + listener_security_protocol_map + .insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl); + } else { + // 3) If neither client authentication nor TLS is required, we expose CLIENT with PLAINTEXT + listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: CLIENT_PORT.to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Client, + host: LISTENER_NODE_ADDRESS.to_string(), + port: node_port_cmd(STACKABLE_TMP_DIR, CLIENT_PORT_NAME), + }); + listener_security_protocol_map + .insert(KafkaListenerName::Client, KafkaListenerProtocol::Plaintext); + } + + if kafka.internal_tls_secret_class().is_some() { + // 4) If internal TLS is required, we expose INTERNAL with SSL + listeners.push(KafkaListener { + name: KafkaListenerName::Internal, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: SECURE_INTERNAL_PORT.to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Internal, + host: pod_fqdn, + port: SECURE_INTERNAL_PORT.to_string(), + }); + listener_security_protocol_map + .insert(KafkaListenerName::Internal, KafkaListenerProtocol::Ssl); + } else { + // 5) If no internal TLS is required, we expose INTERNAL with PLAINTEXT + listeners.push(KafkaListener { + name: KafkaListenerName::Internal, + host: LISTENER_LOCAL_ADDRESS.to_string(), + port: INTERNAL_PORT.to_string(), + }); + advertised_listeners.push(KafkaListener { + name: KafkaListenerName::Internal, + host: pod_fqdn, + port: INTERNAL_PORT.to_string(), + }); + listener_security_protocol_map.insert( + KafkaListenerName::Internal, + KafkaListenerProtocol::Plaintext, + ); + } + + Ok(KafkaListenerConfig { + listeners, + 
advertised_listeners, + listener_security_protocol_map, + }) +} + +fn node_port_cmd(directory: &str, port_name: &str) -> String { + format!("$(cat {directory}/{port_name}_nodeport)") +} + +fn pod_fqdn(kafka: &KafkaCluster, object_name: &str) -> Result<String, KafkaListenerError> { + Ok(format!( + "$POD_NAME.{}.{}.svc.cluster.local", + object_name, + kafka.namespace().context(ObjectHasNoNamespaceSnafu)? + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_kafka_listeners_config() { + let object_name = "simple-kafka-broker-default"; + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + namespace: default + spec: + version: abc + zookeeperConfigMapName: xyz + config: + tls: + secretClass: tls + clientAuthentication: + authenticationClass: kafka-client-tls + internalTls: + secretClass: internalTls + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + let config = get_kafka_listener_config(&kafka, object_name).unwrap(); + + assert_eq!( + config.listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::ClientAuth, + host = LISTENER_LOCAL_ADDRESS, + port = SECURE_CLIENT_PORT, + internal_name = KafkaListenerName::Internal, + internal_host = LISTENER_LOCAL_ADDRESS, + internal_port = SECURE_INTERNAL_PORT, + ) + ); + + assert_eq!( + config.advertised_listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::ClientAuth, + host = LISTENER_NODE_ADDRESS, + port = node_port_cmd(STACKABLE_TMP_DIR, SECURE_CLIENT_PORT_NAME), + internal_name = KafkaListenerName::Internal, + internal_host = pod_fqdn(&kafka, object_name).unwrap(), + internal_port = SECURE_INTERNAL_PORT, + ) + ); + + assert_eq!( + config.listener_security_protocol_map(), + format!( + "{name}:{protocol},{internal_name}:{internal_protocol}", + name = KafkaListenerName::ClientAuth, + protocol = KafkaListenerProtocol::Ssl, + internal_name = KafkaListenerName::Internal, + internal_protocol = KafkaListenerProtocol::Ssl + ) + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + namespace: default + spec: + version: abc + zookeeperConfigMapName: xyz + config: + tls: + secretClass: tls + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + let config = get_kafka_listener_config(&kafka, object_name).unwrap(); + + assert_eq!( + config.listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::Client, + host = LISTENER_LOCAL_ADDRESS, + port = SECURE_CLIENT_PORT, + internal_name = KafkaListenerName::Internal, + internal_host = LISTENER_LOCAL_ADDRESS, + internal_port = SECURE_INTERNAL_PORT, + ) + ); + + assert_eq!( + config.advertised_listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::Client, + host = LISTENER_NODE_ADDRESS, + port = node_port_cmd(STACKABLE_TMP_DIR, SECURE_CLIENT_PORT_NAME), + internal_name = KafkaListenerName::Internal, + internal_host = pod_fqdn(&kafka, object_name).unwrap(), + internal_port = SECURE_INTERNAL_PORT, + ) + ); + + assert_eq!( + config.listener_security_protocol_map(), + format!( + "{name}:{protocol},{internal_name}:{internal_protocol}", + name = KafkaListenerName::Client, + protocol = KafkaListenerProtocol::Ssl, + internal_name = 
KafkaListenerName::Internal, + internal_protocol = KafkaListenerProtocol::Ssl + ) + ); + + let input = r#" + apiVersion: kafka.stackable.tech/v1alpha1 + kind: KafkaCluster + metadata: + name: simple-kafka + namespace: default + spec: + version: abc + zookeeperConfigMapName: xyz + config: + tls: null + internalTls: null + "#; + let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input"); + let config = get_kafka_listener_config(&kafka, object_name).unwrap(); + + assert_eq!( + config.listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::Client, + host = LISTENER_LOCAL_ADDRESS, + port = CLIENT_PORT, + internal_name = KafkaListenerName::Internal, + internal_host = LISTENER_LOCAL_ADDRESS, + internal_port = INTERNAL_PORT, + ) + ); + + assert_eq!( + config.advertised_listeners(), + format!( + "{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}", + name = KafkaListenerName::Client, + host = LISTENER_NODE_ADDRESS, + port = node_port_cmd(STACKABLE_TMP_DIR, CLIENT_PORT_NAME), + internal_name = KafkaListenerName::Internal, + internal_host = pod_fqdn(&kafka, object_name).unwrap(), + internal_port = INTERNAL_PORT, + ) + ); + + assert_eq!( + config.listener_security_protocol_map(), + format!( + "{name}:{protocol},{internal_name}:{internal_protocol}", + name = KafkaListenerName::Client, + protocol = KafkaListenerProtocol::Plaintext, + internal_name = KafkaListenerName::Internal, + internal_protocol = KafkaListenerProtocol::Plaintext + ) + ); + } +} diff --git a/rust/operator/src/command.rs b/rust/operator/src/command.rs new file mode 100644 index 00000000..2cf8b322 --- /dev/null +++ b/rust/operator/src/command.rs @@ -0,0 +1,113 @@ +use stackable_kafka_crd::{ + KafkaCluster, CLIENT_PORT, SECURE_CLIENT_PORT, SSL_STORE_PASSWORD, STACKABLE_DATA_DIR, + STACKABLE_TLS_CLIENT_AUTH_DIR, STACKABLE_TLS_CLIENT_DIR, STACKABLE_TLS_INTERNAL_DIR, + STACKABLE_TMP_DIR, SYSTEM_TRUST_STORE_DIR, +}; + +pub fn prepare_container_cmd_args(kafka: &KafkaCluster) -> String { + let mut args = vec![]; + + if kafka.client_authentication_class().is_some() { + args.extend(create_key_and_trust_store( + STACKABLE_TLS_CLIENT_AUTH_DIR, + "stackable-tls-client-auth-ca-cert", + )); + args.extend(chown_and_chmod(STACKABLE_TLS_CLIENT_AUTH_DIR)); + } else if kafka.client_tls_secret_class().is_some() { + // Copy system truststore to stackable truststore + args.push(format!("keytool -importkeystore -srckeystore {SYSTEM_TRUST_STORE_DIR} -srcstoretype jks -srcstorepass {SSL_STORE_PASSWORD} -destkeystore {STACKABLE_TLS_CLIENT_DIR}/truststore.p12 -deststoretype pkcs12 -deststorepass {SSL_STORE_PASSWORD} -noprompt")); + args.extend(create_key_and_trust_store( + STACKABLE_TLS_CLIENT_DIR, + "stackable-tls-client-ca-cert", + )); + args.extend(chown_and_chmod(STACKABLE_TLS_CLIENT_DIR)); + } + + if kafka.internal_tls_secret_class().is_some() { + args.extend(create_key_and_trust_store( + STACKABLE_TLS_INTERNAL_DIR, + "stackable-tls-internal-ca-cert", + )); + args.extend(chown_and_chmod(STACKABLE_TLS_INTERNAL_DIR)); + } + + args.extend(chown_and_chmod(STACKABLE_DATA_DIR)); + args.extend(chown_and_chmod(STACKABLE_TMP_DIR)); + + args.join(" && ") +} + +pub fn get_svc_container_cmd_args(kafka: &KafkaCluster) -> String { + get_node_port(STACKABLE_TMP_DIR, kafka.client_port_name()) +} + +pub fn kcat_container_cmd_args(kafka: &KafkaCluster) -> Vec<String> { + let mut args = vec!["kcat".to_string()]; + + if kafka.client_authentication_class().is_some() { 
+ args.push("-b".to_string()); + args.push(format!("localhost:{}", SECURE_CLIENT_PORT)); + args.extend(kcat_client_auth_ssl(STACKABLE_TLS_CLIENT_AUTH_DIR)); + } else if kafka.client_tls_secret_class().is_some() { + args.push("-b".to_string()); + args.push(format!("localhost:{}", SECURE_CLIENT_PORT)); + args.extend(kcat_client_ssl(STACKABLE_TLS_CLIENT_DIR)); + } else { + args.push("-b".to_string()); + args.push(format!("localhost:{}", CLIENT_PORT)); + } + + args.push("-L".to_string()); + args +} + +fn kcat_client_auth_ssl(cert_directory: &str) -> Vec { + vec![ + "-X".to_string(), + "security.protocol=SSL".to_string(), + "-X".to_string(), + format!("ssl.key.location={cert_directory}/tls.key"), + "-X".to_string(), + format!("ssl.certificate.location={cert_directory}/tls.crt"), + "-X".to_string(), + format!("ssl.ca.location={cert_directory}/ca.crt"), + ] +} + +fn kcat_client_ssl(cert_directory: &str) -> Vec { + vec![ + "-X".to_string(), + "security.protocol=SSL".to_string(), + "-X".to_string(), + format!("ssl.ca.location={cert_directory}/ca.crt"), + ] +} + +/// Generates the shell script to create key and truststores from the certificates provided +/// by the secret operator. +fn create_key_and_trust_store(directory: &str, alias_name: &str) -> Vec { + vec![ + format!("echo [{dir}] Creating truststore", dir = directory), + format!("keytool -importcert -file {dir}/ca.crt -keystore {dir}/truststore.p12 -storetype pkcs12 -noprompt -alias {alias} -storepass {password}", + dir = directory, alias = alias_name, password = SSL_STORE_PASSWORD), + format!("echo [{dir}] Creating certificate chain", dir = directory), + format!("cat {dir}/ca.crt {dir}/tls.crt > {dir}/chain.crt", dir = directory), + format!("echo [{dir}] Creating keystore", dir = directory), + format!("openssl pkcs12 -export -in {dir}/chain.crt -inkey {dir}/tls.key -out {dir}/keystore.p12 --passout pass:{password}", + dir = directory, password = SSL_STORE_PASSWORD), + ] +} + +/// Generates a shell script to chown and chmod the provided directory. +fn chown_and_chmod(directory: &str) -> Vec { + vec![ + format!("echo chown and chmod {dir}", dir = directory), + format!("chown -R stackable:stackable {dir}", dir = directory), + format!("chmod -R a=,u=rwX {dir}", dir = directory), + ] +} + +/// Extract the nodeport from the nodeport service +fn get_node_port(directory: &str, port_name: &str) -> String { + format!("kubectl get service \"$POD_NAME\" -o jsonpath='{{.spec.ports[?(@.name==\"{name}\")].nodePort}}' | tee {dir}/{name}_nodeport", dir = directory, name = port_name) +} diff --git a/rust/operator/src/discovery.rs b/rust/operator/src/discovery.rs index 467fcbbc..a1734dce 100644 --- a/rust/operator/src/discovery.rs +++ b/rust/operator/src/discovery.rs @@ -46,13 +46,14 @@ pub async fn build_discovery_configmaps( svc: &Service, ) -> Result, Error> { let name = owner.name(); + let port_name = kafka.client_port_name(); Ok(vec![ - build_discovery_configmap(&name, owner, kafka, service_hosts(svc, "kafka")?)?, + build_discovery_configmap(&name, owner, kafka, service_hosts(svc, port_name)?)?, build_discovery_configmap( &format!("{}-nodeport", name), owner, kafka, - nodeport_hosts(client, svc, "kafka").await?, + nodeport_hosts(client, svc, port_name).await?, )?, ]) } diff --git a/rust/operator/src/kafka_controller.rs b/rust/operator/src/kafka_controller.rs index d41b7207..0369f5e1 100644 --- a/rust/operator/src/kafka_controller.rs +++ b/rust/operator/src/kafka_controller.rs @@ -1,34 +1,41 @@ //! 
Ensures that `Pod`s are configured and running for each [`KafkaCluster`] -use std::{ - borrow::Cow, - collections::{BTreeMap, HashMap}, - sync::Arc, - time::Duration, -}; - use snafu::{OptionExt, ResultExt, Snafu}; use stackable_kafka_crd::{ - KafkaCluster, KafkaRole, APP_NAME, APP_PORT, KAFKA_HEAP_OPTS, LOG_DIRS_VOLUME_NAME, - METRICS_PORT, SERVER_PROPERTIES_FILE, + listener::get_kafka_listener_config, KafkaCluster, KafkaRole, TlsSecretClass, APP_NAME, + CLIENT_PORT, CLIENT_PORT_NAME, KAFKA_HEAP_OPTS, LOG_DIRS_VOLUME_NAME, METRICS_PORT, + METRICS_PORT_NAME, SECURE_CLIENT_PORT, SECURE_CLIENT_PORT_NAME, SERVER_PROPERTIES_FILE, + STACKABLE_CONFIG_DIR, STACKABLE_DATA_DIR, STACKABLE_TLS_CLIENT_AUTH_DIR, + STACKABLE_TLS_CLIENT_DIR, STACKABLE_TLS_INTERNAL_DIR, STACKABLE_TMP_DIR, + TLS_DEFAULT_SECRET_CLASS, }; use stackable_operator::{ - builder::{ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder}, - commons::opa::OpaApiVersion, + builder::{ + ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, + SecretOperatorVolumeSourceBuilder, SecurityContextBuilder, VolumeBuilder, + }, + commons::{ + authentication::{AuthenticationClass, AuthenticationClassProvider}, + opa::OpaApiVersion, + tls::TlsAuthenticationProvider, + }, k8s_openapi::{ api::{ apps::v1::{StatefulSet, StatefulSetSpec}, core::v1::{ - ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource, EmptyDirVolumeSource, - EnvVar, EnvVarSource, ExecAction, ObjectFieldSelector, PodSpec, Probe, - SecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec, Volume, + ConfigMap, ConfigMapKeySelector, ConfigMapVolumeSource, ContainerPort, + EmptyDirVolumeSource, EnvVar, EnvVarSource, ExecAction, ObjectFieldSelector, + PodSpec, Probe, Service, ServiceAccount, ServicePort, ServiceSpec, Volume, }, rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject}, }, apimachinery::pkg::apis::meta::v1::LabelSelector, Resource, }, - kube::runtime::{controller::Action, reflector::ObjectRef}, + kube::{ + api::DynamicObject, + runtime::{controller::Action, reflector::ObjectRef}, + }, labels::{role_group_selector_labels, role_selector_labels}, logging::controller::ReconcilerError, product_config::{ @@ -37,9 +44,17 @@ use stackable_operator::{ product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config}, role_utils::RoleGroupRef, }; +use std::{ + borrow::Cow, + collections::{BTreeMap, HashMap}, + sync::Arc, + time::Duration, +}; use strum::{EnumDiscriminants, IntoStaticStr}; +use crate::command::{get_svc_container_cmd_args, kcat_container_cmd_args}; use crate::{ + command, discovery::{self, build_discovery_configmaps}, pod_svc_controller, utils::{self, ObjectRefExt}, @@ -60,6 +75,8 @@ pub struct Ctx { pub enum Error { #[snafu(display("object has no name"))] ObjectHasNoName, + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, #[snafu(display("object defines no version"))] ObjectHasNoVersion, #[snafu(display("object defines no broker role"))] @@ -139,6 +156,25 @@ pub enum Error { }, #[snafu(display("failed to parse Kafka version/image"))] KafkaVersionParseFailure { source: stackable_kafka_crd::Error }, + #[snafu(display("failed to retrieve {}", authentication_class))] + AuthenticationClassRetrieval { + source: stackable_operator::error::Error, + authentication_class: ObjectRef<AuthenticationClass>, + }, + #[snafu(display( + "failed to use authentication mechanism {} - supported methods: {:?}", + method, + supported + ))] + AuthenticationMethodNotSupported { + authentication_class: ObjectRef<AuthenticationClass>, + supported: Vec<String>, + 
method: String, + }, + #[snafu(display("invalid kafka listeners"))] + InvalidKafkaListeners { + source: stackable_kafka_crd::listener::KafkaListenerError, + }, } type Result<T, E = Error> = std::result::Result<T, E>; @@ -146,6 +182,43 @@ impl ReconcilerError for Error { fn category(&self) -> &'static str { ErrorDiscriminants::from(self).into() } + + fn secondary_object(&self) -> Option<ObjectRef<DynamicObject>> { + match self { + Error::ObjectHasNoName => None, + Error::ObjectHasNoNamespace => None, + Error::ObjectHasNoVersion => None, + Error::NoBrokerRole => None, + Error::GlobalServiceNameNotFound => None, + Error::ApplyRoleService { .. } => None, + Error::ApplyRoleServiceAccount { .. } => None, + Error::ApplyRoleRoleBinding { .. } => None, + Error::ApplyRoleGroupService { .. } => None, + Error::BuildRoleGroupConfig { .. } => None, + Error::ApplyRoleGroupConfig { .. } => None, + Error::ApplyRoleGroupStatefulSet { .. } => None, + Error::GenerateProductConfig { .. } => None, + Error::InvalidProductConfig { .. } => None, + Error::SerializeZooCfg { .. } => None, + Error::ObjectMissingMetadataForOwnerRef { .. } => None, + Error::BuildDiscoveryConfig { .. } => None, + Error::ApplyDiscoveryConfig { .. } => None, + Error::RoleGroupNotFound { .. } => None, + Error::InvalidServiceAccount { .. } => None, + Error::InvalidOpaConfig { .. } => None, + Error::InvalidJavaHeapConfig { .. } => None, + Error::KafkaVersionParseFailure { .. } => None, + Error::AuthenticationClassRetrieval { + authentication_class, + .. + } => Some(authentication_class.clone().erase()), + Error::AuthenticationMethodNotSupported { + authentication_class, + .. + } => Some(authentication_class.clone().erase()), + Error::InvalidKafkaListeners { .. } => None, + } + } } pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result<Action> { @@ -181,6 +254,39 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result< .map(Cow::Borrowed) .unwrap_or_default(); + // TODO: switch to AuthenticationClass::resolve if operator-rs is updated + let client_authentication_class = if let Some(auth_class) = kafka.client_authentication_class() + { + Some( + client + .get::<AuthenticationClass>(auth_class, None) // AuthenticationClass has ClusterScope + .await + .context(AuthenticationClassRetrievalSnafu { + authentication_class: ObjectRef::<AuthenticationClass>::new(auth_class), + })?, + ) + } else { + None + }; + + // Assemble the OPA connection string from the discovery and the given path if provided + // Will be passed as a --override parameter on the CLI in the stateful set + let opa_connect = if let Some(opa_spec) = &kafka.spec.opa { + Some( + opa_spec + .full_document_url_from_config_map( + client, + &*kafka, + Some("allow"), + OpaApiVersion::V1, + ) + .await + .context(InvalidOpaConfigSnafu)?, + ) + } else { + None + }; + let broker_role_service = build_broker_role_service(&kafka)?; let (broker_role_serviceaccount, broker_role_rolebinding) = build_broker_role_serviceaccount(&kafka, &ctx.controller_config)?; @@ -210,24 +316,6 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result< .await .context(ApplyRoleRoleBindingSnafu)?; - // Assemble the OPA connection string from the discovery and the given path if provided - // Will be passed as --override parameter in the cli in the state ful set - let opa_connect = if let Some(opa_spec) = &kafka.spec.opa { - Some( - opa_spec - .full_document_url_from_config_map( - client, - &*kafka, - Some("allow"), - OpaApiVersion::V1, - ) - .await - .context(InvalidOpaConfigSnafu)?, - ) - } else { - None - }; - for (rolegroup_name, rolegroup_config) in role_broker_config.iter() { let rolegroup 
= kafka.broker_rolegroup_ref(rolegroup_name); @@ -239,6 +327,7 @@ pub async fn reconcile_kafka(kafka: Arc<KafkaCluster>, ctx: Arc<Ctx>) -> Result< rolegroup_config, &broker_role_serviceaccount_ref, opa_connect.as_deref(), + client_authentication_class.as_ref(), )?; client .apply_patch(FIELD_MANAGER_SCOPE, &rg_service, &rg_service) @@ -297,12 +386,7 @@ pub fn build_broker_role_service(kafka: &KafkaCluster) -> Result<Service> { ) .build(), spec: Some(ServiceSpec { - ports: Some(vec![ServicePort { - name: Some("kafka".to_string()), - port: APP_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }]), + ports: Some(service_ports(kafka)), selector: Some(role_selector_labels(kafka, APP_NAME, &role_name)), type_: Some("NodePort".to_string()), ..ServiceSpec::default() @@ -443,20 +527,7 @@ fn build_broker_rolegroup_service( .build(), spec: Some(ServiceSpec { cluster_ip: Some("None".to_string()), - ports: Some(vec![ - ServicePort { - name: Some("kafka".to_string()), - port: APP_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }, - ServicePort { - name: Some("metrics".to_string()), - port: METRICS_PORT.into(), - protocol: Some("TCP".to_string()), - ..ServicePort::default() - }, - ]), + ports: Some(service_ports(kafka)), selector: Some(role_group_selector_labels( kafka, APP_NAME, @@ -479,7 +550,13 @@ fn build_broker_rolegroup_statefulset( broker_config: &HashMap<PropertyNameKind, BTreeMap<String, String>>, serviceaccount: &ObjectRef<ServiceAccount>, opa_connect_string: Option<&str>, + client_authentication_class: Option<&AuthenticationClass>, ) -> Result<StatefulSet> { + let mut cb_kafka = ContainerBuilder::new(APP_NAME); + let mut cb_prepare = ContainerBuilder::new("prepare"); + let mut cb_kcat_prober = ContainerBuilder::new("kcat-prober"); + let mut pod_builder = PodBuilder::new(); + let role = kafka.spec.brokers.as_ref().context(NoBrokerRoleSnafu)?; let rolegroup = role .role_groups .get(&rolegroup_ref.role_group) @@ -492,6 +569,57 @@ .context(KafkaVersionParseFailureSnafu)?; let image = format!("docker.stackable.tech/stackable/kafka:{}", image_version); + let get_svc_args = get_svc_container_cmd_args(kafka); + + // add client authentication volumes if required + if let Some(auth_class) = client_authentication_class { + match &auth_class.spec.provider { + AuthenticationClassProvider::Tls(TlsAuthenticationProvider { + client_cert_secret_class: Some(secret_class), + }) => { + cb_prepare.add_volume_mount( + "client-tls-authentication-certificate", + STACKABLE_TLS_CLIENT_AUTH_DIR, + ); + cb_kafka.add_volume_mount( + "client-tls-authentication-certificate", + STACKABLE_TLS_CLIENT_AUTH_DIR, + ); + cb_kcat_prober.add_volume_mount( + "client-tls-authentication-certificate", + STACKABLE_TLS_CLIENT_AUTH_DIR, + ); + pod_builder.add_volume(create_tls_volume( + "client-tls-authentication-certificate", + Some(&TlsSecretClass { + secret_class: secret_class.clone(), + }), + )); + } + _ => { + return Err(Error::AuthenticationMethodNotSupported { + authentication_class: ObjectRef::from_obj(auth_class), + supported: vec!["tls".to_string()], + method: auth_class.spec.provider.to_string(), + }) + } + } + } else if let Some(tls) = kafka.client_tls_secret_class() { + cb_prepare.add_volume_mount("client-tls-certificate", STACKABLE_TLS_CLIENT_DIR); + cb_kafka.add_volume_mount("client-tls-certificate", STACKABLE_TLS_CLIENT_DIR); + cb_kcat_prober.add_volume_mount("client-tls-certificate", STACKABLE_TLS_CLIENT_DIR); + pod_builder.add_volume(create_tls_volume("client-tls-certificate", Some(tls))); + } + + if let Some(tls_internal) = 
kafka.internal_tls_secret_class() { + cb_prepare.add_volume_mount("internal-tls-certificate", STACKABLE_TLS_INTERNAL_DIR); + cb_kafka.add_volume_mount("internal-tls-certificate", STACKABLE_TLS_INTERNAL_DIR); + pod_builder.add_volume(create_tls_volume( + "internal-tls-certificate", + Some(tls_internal), + )); + } + let container_get_svc = ContainerBuilder::new("get-svc") .image("docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0") .command(vec!["bash".to_string()]) @@ -499,11 +627,7 @@ fn build_broker_rolegroup_statefulset( "-euo".to_string(), "pipefail".to_string(), "-c".to_string(), - [ - "kubectl get service \"$POD_NAME\" -o jsonpath='{.spec.ports[0].nodePort}'", - "tee /stackable/tmp/nodeport", - ] - .join(" | "), + get_svc_args, ]) .add_env_vars(vec![EnvVar { name: "POD_NAME".to_string(), @@ -516,35 +640,21 @@ fn build_broker_rolegroup_statefulset( }), ..EnvVar::default() }]) - .add_volume_mount("tmp", "/stackable/tmp") + .add_volume_mount("tmp", STACKABLE_TMP_DIR) .build(); - // For most storage classes the mounts will belong to the root user and not be writeable to - // other users. - // Since kafka runs as the user stackable inside of the container the data directory needs to be - // chowned to that user for it to be able to store data there. - let mut container_chown = ContainerBuilder::new("chown-data") - .image(&image) + cb_prepare + .image("docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0") .command(vec![ "/bin/bash".to_string(), "-euo".to_string(), "pipefail".to_string(), "-c".to_string(), ]) - .args(vec![[ - "echo chowning data directory", - "chown -R stackable:stackable /stackable/data", - "echo chmodding data directory", - "chmod -R a=,u=rwX /stackable/data", - ] - .join(" && ")]) - .add_volume_mount(LOG_DIRS_VOLUME_NAME, "/stackable/data") - .build(); - - container_chown - .security_context - .get_or_insert_with(SecurityContext::default) - .run_as_user = Some(0); + .args(vec![command::prepare_container_cmd_args(kafka)]) + .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR) + .add_volume_mount("tmp", STACKABLE_TMP_DIR) + .security_context(SecurityContextBuilder::run_as_root()); let (pvc, resources) = kafka.resources(rolegroup_ref); @@ -594,6 +704,17 @@ fn build_broker_rolegroup_statefulset( }), ..EnvVar::default() }); + env.push(EnvVar { + name: "POD_NAME".to_string(), + value_from: Some(EnvVarSource { + field_ref: Some(ObjectFieldSelector { + api_version: Some("v1".to_string()), + field_path: "metadata.name".to_string(), + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }); // add env var for log4j if set if kafka.spec.log4j.is_some() { @@ -608,13 +729,23 @@ fn build_broker_rolegroup_statefulset( let jvm_args = format!("-javaagent:/stackable/jmx/jmx_prometheus_javaagent-0.16.1.jar={}:/stackable/jmx/broker.yaml", METRICS_PORT); let zookeeper_override = "--override \"zookeeper.connect=$ZOOKEEPER\""; - let advertised_listeners_override = - "--override \"advertised.listeners=PLAINTEXT://$NODE:$(cat /stackable/tmp/nodeport)\""; + + let kafka_listeners = get_kafka_listener_config(kafka, &rolegroup_ref.object_name()) + .context(InvalidKafkaListenersSnafu)?; + let listeners_override = format!("--override \"listeners={}\"", kafka_listeners.listeners()); + let advertised_listeners_override = format!( + "--override \"advertised.listeners={}\"", + kafka_listeners.advertised_listeners() + ); + let listener_security_protocol_map_override = format!( + "--override \"listener.security.protocol.map={}\"", + 
+
+    let kafka_listeners = get_kafka_listener_config(kafka, &rolegroup_ref.object_name())
+        .context(InvalidKafkaListenersSnafu)?;
+    let listeners_override = format!("--override \"listeners={}\"", kafka_listeners.listeners());
+    let advertised_listeners_override = format!(
+        "--override \"advertised.listeners={}\"",
+        kafka_listeners.advertised_listeners()
+    );
+    let listener_security_protocol_map_override = format!(
+        "--override \"listener.security.protocol.map={}\"",
+        kafka_listeners.listener_security_protocol_map()
+    );
 
     let opa_url_override = opa_connect_string.map_or("".to_string(), |opa| {
         format!("--override \"opa.authorizer.url={}\"", opa)
     });
 
-    let container_kafka = ContainerBuilder::new("kafka")
+    cb_kafka
         .image(image)
         .args(vec![
             "sh".to_string(),
@@ -623,24 +754,24 @@ fn build_broker_rolegroup_statefulset(
                 "bin/kafka-server-start.sh",
                 &format!("/stackable/config/{}", SERVER_PROPERTIES_FILE),
                 zookeeper_override,
-                advertised_listeners_override,
+                &listeners_override,
+                &advertised_listeners_override,
+                &listener_security_protocol_map_override,
                 &opa_url_override,
             ]
             .join(" "),
         ])
         .add_env_vars(env)
         .add_env_var("EXTRA_ARGS", jvm_args)
-        .add_container_port("kafka", APP_PORT.into())
-        .add_container_port("metrics", METRICS_PORT.into())
-        .add_volume_mount(LOG_DIRS_VOLUME_NAME, "/stackable/data")
-        .add_volume_mount("config", "/stackable/config")
-        .add_volume_mount("tmp", "/stackable/tmp")
-        .resources(resources)
-        .build();
+        .add_container_ports(container_ports(kafka))
+        .add_volume_mount(LOG_DIRS_VOLUME_NAME, STACKABLE_DATA_DIR)
+        .add_volume_mount("config", STACKABLE_CONFIG_DIR)
+        .add_volume_mount("tmp", STACKABLE_TMP_DIR)
+        .resources(resources);
 
     // Use kcat sidecar for probing container status rather than the official Kafka tools, since they incur a lot of
     // unacceptable perf overhead
-    let mut container_kcat_prober = ContainerBuilder::new("kcat-prober")
+    let mut container_kcat_prober = cb_kcat_prober
         .image("edenhill/kcat:1.7.0")
         .command(vec!["sh".to_string()])
         // Only allow the global load balancing service to send traffic to pods that are members of the quorum
@@ -648,20 +779,15 @@ fn build_broker_rolegroup_statefulset(
         .readiness_probe(Probe {
             exec: Some(ExecAction {
                 // If the broker is able to get its fellow cluster members then it has at least completed basic registration at some point
-                command: Some(vec![
-                    "kcat".to_string(),
-                    "-b".to_string(),
-                    format!("localhost:{}", APP_PORT),
-                    "-L".to_string(),
-                ]),
+                command: Some(kcat_container_cmd_args(kafka)),
             }),
-            timeout_seconds: Some(3),
-            period_seconds: Some(1),
+            timeout_seconds: Some(5),
+            period_seconds: Some(2),
             ..Probe::default()
         })
         .build();
     container_kcat_prober.stdin = Some(true);
 
-    let mut pod_template = PodBuilder::new()
+    let mut pod_template = pod_builder
         .metadata_builder(|m| {
             m.with_recommended_labels(
                 kafka,
@@ -672,9 +798,9 @@ fn build_broker_rolegroup_statefulset(
             )
             .with_label(pod_svc_controller::LABEL_ENABLE, "true")
         })
-        .add_init_container(container_chown)
+        .add_init_container(cb_prepare.build())
         .add_init_container(container_get_svc)
-        .add_container(container_kafka)
+        .add_container(cb_kafka.build())
         .add_container(container_kcat_prober)
         .add_volume(Volume {
             name: "config".to_string(),
@@ -741,3 +867,74 @@
 pub fn error_policy(_error: &Error, _ctx: Arc<Ctx>) -> Action {
     Action::requeue(Duration::from_secs(5))
 }
+
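`kcat_container_cmd_args` replaces the hard-coded probe command removed above. Since the broker may now listen only on the secure port, the probe presumably has to pass the same librdkafka `-X` options that the upgrade tests further down hand to kcat. A hedged sketch (the real helper takes `&KafkaCluster`; the port numbers and certificate paths are assumptions):

[source,rust]
----
/// Hypothetical sketch of command::kcat_container_cmd_args().
fn kcat_container_cmd_args(tls: bool, client_auth: bool) -> Vec<String> {
    let mut args = vec!["kcat".to_string(), "-b".to_string()];
    if tls || client_auth {
        args.push("localhost:9093".to_string()); // assumed SECURE_CLIENT_PORT
        args.push("-X".to_string());
        args.push("security.protocol=SSL".to_string());
        args.push("-X".to_string());
        args.push("ssl.ca.location=/stackable/tls_client/ca.crt".to_string()); // assumed mount dir
        if client_auth {
            // mutual TLS additionally needs the client key pair
            args.push("-X".to_string());
            args.push("ssl.key.location=/stackable/tls_client_auth/tls.key".to_string());
            args.push("-X".to_string());
            args.push("ssl.certificate.location=/stackable/tls_client_auth/tls.crt".to_string());
        }
    } else {
        args.push("localhost:9092".to_string()); // assumed CLIENT_PORT
    }
    args.push("-L".to_string()); // list metadata, as the old inline probe did
    args
}
----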
+/// We only expose the metrics port and either the secure (TLS) or the plaintext client port.
+fn service_ports(kafka: &KafkaCluster) -> Vec<ServicePort> {
+    let mut ports = vec![ServicePort {
+        name: Some(METRICS_PORT_NAME.to_string()),
+        port: METRICS_PORT.into(),
+        protocol: Some("TCP".to_string()),
+        ..ServicePort::default()
+    }];
+
+    if kafka.client_tls_secret_class().is_some() || kafka.client_authentication_class().is_some() {
+        ports.push(ServicePort {
+            name: Some(SECURE_CLIENT_PORT_NAME.to_string()),
+            port: SECURE_CLIENT_PORT.into(),
+            protocol: Some("TCP".to_string()),
+            ..ServicePort::default()
+        });
+    } else {
+        ports.push(ServicePort {
+            name: Some(CLIENT_PORT_NAME.to_string()),
+            port: CLIENT_PORT.into(),
+            protocol: Some("TCP".to_string()),
+            ..ServicePort::default()
+        });
+    }
+
+    ports
+}
+
+/// We only expose the metrics port and either the secure (TLS) or the plaintext client port.
+fn container_ports(kafka: &KafkaCluster) -> Vec<ContainerPort> {
+    let mut ports = vec![ContainerPort {
+        name: Some(METRICS_PORT_NAME.to_string()),
+        container_port: METRICS_PORT.into(),
+        protocol: Some("TCP".to_string()),
+        ..ContainerPort::default()
+    }];
+
+    if kafka.client_tls_secret_class().is_some() || kafka.client_authentication_class().is_some() {
+        ports.push(ContainerPort {
+            name: Some(SECURE_CLIENT_PORT_NAME.to_string()),
+            container_port: SECURE_CLIENT_PORT.into(),
+            protocol: Some("TCP".to_string()),
+            ..ContainerPort::default()
+        });
+    } else {
+        ports.push(ContainerPort {
+            name: Some(CLIENT_PORT_NAME.to_string()),
+            container_port: CLIENT_PORT.into(),
+            protocol: Some("TCP".to_string()),
+            ..ContainerPort::default()
+        });
+    }
+
+    ports
+}
+
+fn create_tls_volume(volume_name: &str, tls_secret_class: Option<&TlsSecretClass>) -> Volume {
+    let secret_class_name = tls_secret_class
+        .map(|t| t.secret_class.as_ref())
+        .unwrap_or(TLS_DEFAULT_SECRET_CLASS);
+
+    VolumeBuilder::new(volume_name)
+        .ephemeral(
+            SecretOperatorVolumeSourceBuilder::new(secret_class_name)
+                .with_pod_scope()
+                .with_node_scope()
+                .build(),
+        )
+        .build()
+}
diff --git a/rust/operator/src/lib.rs b/rust/operator/src/lib.rs
index 98660b2f..6f3d36d6 100644
--- a/rust/operator/src/lib.rs
+++ b/rust/operator/src/lib.rs
@@ -1,3 +1,4 @@
+mod command;
 mod discovery;
 mod kafka_controller;
 mod pod_svc_controller;
diff --git a/rust/operator/src/pod_svc_controller.rs b/rust/operator/src/pod_svc_controller.rs
index 8431405b..a4d00ccc 100644
--- a/rust/operator/src/pod_svc_controller.rs
+++ b/rust/operator/src/pod_svc_controller.rs
@@ -1,8 +1,8 @@
 use snafu::{OptionExt, ResultExt, Snafu};
-use stackable_kafka_crd::APP_PORT;
+use stackable_kafka_crd::APP_NAME;
 use stackable_operator::{
     k8s_openapi::{
-        api::core::v1::{Pod, Service, ServicePort, ServiceSpec},
+        api::core::v1::{Container, Pod, Service, ServicePort, ServiceSpec},
         apimachinery::pkg::apis::meta::v1::OwnerReference,
     },
     kube::{core::ObjectMeta, runtime::controller::Action},
@@ -44,6 +44,27 @@ impl ReconcilerError for Error {
 pub async fn reconcile_pod(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action> {
     tracing::info!("Starting reconcile");
     let name = pod.metadata.name.clone().context(ObjectHasNoNameSnafu)?;
+    let mut ports: Vec<ServicePort> = vec![];
+
+    if let Some(spec) = &pod.spec {
+        for container in &spec
+            .containers
+            .iter()
+            .filter(|container| container.name == APP_NAME)
+            .collect::<Vec<&Container>>()
+        {
+            if let Some(container_ports) = &container.ports {
+                for port in container_ports {
+                    ports.push(ServicePort {
+                        name: port.name.clone(),
+                        port: port.container_port,
+                        ..ServicePort::default()
+                    });
+                }
+            }
+        }
+    }
+
     let svc = Service {
         metadata: ObjectMeta {
             namespace: pod.metadata.namespace.clone(),
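The `prepare` init container introduced earlier delegates its script to `command::prepare_container_cmd_args`, and the `command` module registered in lib.rs above is not itself shown in this diff. A sketch of what that helper plausibly assembles: the chown/chmod steps carried over from the removed `chown-data` container, plus keystore creation when TLS is active (the directory names, store format, and password here are assumptions):

[source,rust]
----
/// Hypothetical sketch of command::prepare_container_cmd_args(); the real
/// helper takes &KafkaCluster and is not part of this diff.
fn prepare_container_cmd_args(client_tls_enabled: bool) -> String {
    let mut commands = vec![
        // carried over from the removed chown-data init container
        "echo chowning data directory".to_string(),
        "chown -R stackable:stackable /stackable/data".to_string(),
        "echo chmodding data directory".to_string(),
        "chmod -R a=,u=rwX /stackable/data".to_string(),
    ];

    if client_tls_enabled {
        // turn the PEM material mounted by the secret-operator into a
        // PKCS#12 store that the broker and the test scripts can consume
        commands.push(
            "openssl pkcs12 -export \
                 -in /stackable/tls_client/tls.crt \
                 -inkey /stackable/tls_client/tls.key \
                 -out /stackable/tls_client/keystore.p12 \
                 -passout pass:changeit"
                .to_string(),
        );
    }

    commands.join(" && ")
}
----

Running this in the `tools` image with `SecurityContextBuilder::run_as_root()` preserves the old behaviour: the chown still happens as root before the broker container starts as the `stackable` user.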
@@ -60,11 +81,7 @@ pub async fn reconcile_pod(pod: Arc<Pod>, ctx: Arc<Ctx>) -> Result<Action> {
         spec: Some(ServiceSpec {
             type_: Some("NodePort".to_string()),
             external_traffic_policy: Some("Local".to_string()),
-            ports: Some(vec![ServicePort {
-                name: Some("kafka".to_string()),
-                port: APP_PORT.into(),
-                ..ServicePort::default()
-            }]),
+            ports: Some(ports),
             selector: Some([(LABEL_STS_POD_NAME.to_string(), name)].into()),
             publish_not_ready_addresses: Some(true),
             ..ServiceSpec::default()
diff --git a/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2 b/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2
index bd107566..9e6ef15e 100644
--- a/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/smoke/01-install-kafka.yaml.j2
@@ -12,6 +12,13 @@ metadata:
 spec:
   version: {{ test_scenario['values']['kafka'] }}
   zookeeperConfigMapName: kafka-zk
+  config:
+{% if test_scenario['values']['use-client-tls'] == 'true' %}
+    tls:
+      secretClass: tls
+{% else %}
+    tls: null
+{% endif %}
   brokers:
     config:
       resources:
diff --git a/tests/templates/kuttl/tls/10-assert.yaml b/tests/templates/kuttl/tls/10-assert.yaml
new file mode 100644
index 00000000..a1583b3e
--- /dev/null
+++ b/tests/templates/kuttl/tls/10-assert.yaml
@@ -0,0 +1,14 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+metadata:
+  name: install-test-zk
+timeout: 600
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: test-zk-server-default
+status:
+  readyReplicas: 1
+  replicas: 1
diff --git a/tests/templates/kuttl/tls/10-install-zookeeper.yaml.j2 b/tests/templates/kuttl/tls/10-install-zookeeper.yaml.j2
new file mode 100644
index 00000000..72acb99a
--- /dev/null
+++ b/tests/templates/kuttl/tls/10-install-zookeeper.yaml.j2
@@ -0,0 +1,13 @@
+---
+apiVersion: zookeeper.stackable.tech/v1alpha1
+kind: ZookeeperCluster
+metadata:
+  name: test-zk
+spec:
+  version: {{ test_scenario['values']['zookeeper-latest'] }}
+  config:
+    tls: null
+  servers:
+    roleGroups:
+      default:
+        replicas: 1
diff --git a/tests/templates/kuttl/tls/20-assert.yaml b/tests/templates/kuttl/tls/20-assert.yaml
new file mode 100644
index 00000000..821a471f
--- /dev/null
+++ b/tests/templates/kuttl/tls/20-assert.yaml
@@ -0,0 +1,14 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+metadata:
+  name: install-test-kafka
+timeout: 600
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+  name: test-kafka-broker-default
+status:
+  readyReplicas: 3
+  replicas: 3
diff --git a/tests/templates/kuttl/tls/20-install-kafka.yaml.j2 b/tests/templates/kuttl/tls/20-install-kafka.yaml.j2
new file mode 100644
index 00000000..6952f3f7
--- /dev/null
+++ b/tests/templates/kuttl/tls/20-install-kafka.yaml.j2
@@ -0,0 +1,78 @@
+---
+apiVersion: zookeeper.stackable.tech/v1alpha1
+kind: ZookeeperZnode
+metadata:
+  name: test-kafka-znode
+spec:
+  clusterRef:
+    name: test-zk
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+---
+apiVersion: authentication.stackable.tech/v1alpha1
+kind: AuthenticationClass
+metadata:
+  name: test-kafka-client-auth-tls
+spec:
+  provider:
+    tls:
+      clientCertSecretClass: test-kafka-client-auth-tls
+---
+apiVersion: secrets.stackable.tech/v1alpha1
+kind: SecretClass
+metadata:
+  name: test-kafka-client-auth-tls
+spec:
+  backend:
+    autoTls:
+      ca:
+        secret:
+          name: secret-provisioner-tls-kafka-client-auth-ca
+          namespace: default
+        autoGenerate: true
+{% endif %}
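The `AuthenticationClass` wired up in this test step is what `reconcile_kafka` resolves before building the StatefulSet, and it is the reason this PR grants `get`/`list`/`watch` on `authenticationclasses`. A hedged sketch of that lookup, assuming the cluster-scoped CRD type re-exported by stackable-operator (the exact import path varies between operator-rs versions, and error handling is elided):

[source,rust]
----
use stackable_operator::{
    commons::authentication::AuthenticationClass,
    kube::{Api, Client},
};

/// Sketch: fetch the cluster-scoped AuthenticationClass named in
/// spec.config.clientAuthentication.authenticationClass.
async fn resolve_authentication_class(
    client: Client,
    name: &str,
) -> Result<AuthenticationClass, stackable_operator::kube::Error> {
    Api::<AuthenticationClass>::all(client).get(name).await
}
----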
+{% if test_scenario['values']['use-internal-tls'] == 'true' %}
+---
+apiVersion: secrets.stackable.tech/v1alpha1
+kind: SecretClass
+metadata:
+  name: test-kafka-internal-tls
+spec:
+  backend:
+    autoTls:
+      ca:
+        secret:
+          name: secret-provisioner-test-kafka-internal-tls-ca
+          namespace: default
+        autoGenerate: true
+{% endif %}
+---
+apiVersion: kafka.stackable.tech/v1alpha1
+kind: KafkaCluster
+metadata:
+  name: test-kafka
+spec:
+  version: {{ test_scenario['values']['kafka'] }}
+  zookeeperConfigMapName: test-kafka-znode
+  config:
+{% if test_scenario['values']['use-client-tls'] == 'true' %}
+    tls:
+      secretClass: tls
+{% else %}
+    tls: null
+{% endif %}
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+    clientAuthentication:
+      authenticationClass: test-kafka-client-auth-tls
+{% else %}
+    clientAuthentication: null
+{% endif %}
+{% if test_scenario['values']['use-internal-tls'] == 'true' %}
+    internalTls:
+      secretClass: test-kafka-internal-tls
+{% else %}
+    internalTls: null
+{% endif %}
+  brokers:
+    roleGroups:
+      default:
+        replicas: 3
diff --git a/tests/templates/kuttl/tls/30-assert.yaml.j2 b/tests/templates/kuttl/tls/30-assert.yaml.j2
new file mode 100644
index 00000000..2a1cf896
--- /dev/null
+++ b/tests/templates/kuttl/tls/30-assert.yaml.j2
@@ -0,0 +1,11 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+metadata:
+  name: test-tls
+commands:
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+  - script: kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -- /tmp/test_client_auth_tls.sh $NAMESPACE
+{% elif test_scenario['values']['use-client-tls'] == 'true' %}
+  - script: kubectl exec -n $NAMESPACE test-kafka-broker-default-0 -- /tmp/test_client_tls.sh $NAMESPACE
+{% endif %}
diff --git a/tests/templates/kuttl/tls/30-prepare-test-kafka.yaml.j2 b/tests/templates/kuttl/tls/30-prepare-test-kafka.yaml.j2
new file mode 100644
index 00000000..27ac66ab
--- /dev/null
+++ b/tests/templates/kuttl/tls/30-prepare-test-kafka.yaml.j2
@@ -0,0 +1,11 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+  - script: kubectl cp -n $NAMESPACE ./test_client_auth_tls.sh test-kafka-broker-default-0:/tmp
+  - script: kubectl cp -n $NAMESPACE ./wrong_keystore.p12 test-kafka-broker-default-0:/tmp
+  - script: kubectl cp -n $NAMESPACE ./wrong_truststore.p12 test-kafka-broker-default-0:/tmp
+{% elif test_scenario['values']['use-client-tls'] == 'true' %}
+  - script: kubectl cp -n $NAMESPACE ./test_client_tls.sh test-kafka-broker-default-0:/tmp
+{% endif %}
diff --git a/tests/templates/kuttl/tls/test_client_auth_tls.sh b/tests/templates/kuttl/tls/test_client_auth_tls.sh
new file mode 100755
index 00000000..eece2a5e
--- /dev/null
+++ b/tests/templates/kuttl/tls/test_client_auth_tls.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+# Usage: test_client_auth_tls.sh namespace
+
+NAMESPACE=$1
+
+# to be safe
+unset TOPIC
+unset BAD_TOPIC
+
+SERVER="test-kafka-broker-default-0.test-kafka-broker-default.${NAMESPACE}.svc.cluster.local:9093"
+
+echo "Start client auth TLS testing..."
+############################################################################
+# Test the secured connection
+############################################################################
+# create random topics
+TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20)
+BAD_TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20)
+
+echo $'security.protocol=SSL\nssl.keystore.location=/stackable/tls_client_auth/keystore.p12\nssl.keystore.password=changeit\nssl.truststore.location=/stackable/tls_client_auth/truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
+
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config
+then
+  echo "[SUCCESS] Secure client topic created!"
+else
+  echo "[ERROR] Secure client topic creation failed!"
+  exit 1
+fi
+
+if /stackable/kafka/bin/kafka-topics.sh --list --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config | grep "$TOPIC"
+then
+  echo "[SUCCESS] Secure client topic read!"
+else
+  echo "[ERROR] Secure client topic read failed!"
+  exit 1
+fi
+
+############################################################################
+# Test the connection without certificates
+############################################################################
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server "$SERVER" &> /dev/null
+then
+  echo "[ERROR] Secure client topic created without certificates!"
+  exit 1
+else
+  echo "[SUCCESS] Secure client topic creation failed without certificates!"
+fi
+
+############################################################################
+# Test the connection with bad host name
+############################################################################
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /tmp/client.config &> /dev/null
+then
+  echo "[ERROR] Secure client topic created with bad host name!"
+  exit 1
+else
+  echo "[SUCCESS] Secure client topic creation failed with bad host name!"
+fi
+
+############################################################################
+# Test the connection with bad certificate
+############################################################################
+echo $'security.protocol=SSL\nssl.keystore.location=/tmp/wrong_keystore.p12\nssl.keystore.password=changeit\nssl.truststore.location=/tmp/wrong_truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config &> /dev/null
+then
+  echo "[ERROR] Secure client topic created with wrong certificate!"
+  exit 1
+else
+  echo "[SUCCESS] Secure client topic creation failed with wrong certificate!"
+fi
+
+echo "All client auth TLS tests successful!"
+exit 0
diff --git a/tests/templates/kuttl/tls/test_client_tls.sh b/tests/templates/kuttl/tls/test_client_tls.sh
new file mode 100755
index 00000000..cc675b0d
--- /dev/null
+++ b/tests/templates/kuttl/tls/test_client_tls.sh
@@ -0,0 +1,62 @@
+#!/usr/bin/env bash
+# Usage: test_client_tls.sh namespace
+
+NAMESPACE=$1
+
+# to be safe
+unset TOPIC
+unset BAD_TOPIC
+
+SERVER="test-kafka-broker-default-0.test-kafka-broker-default.${NAMESPACE}.svc.cluster.local:9093"
+
+echo "Start client TLS testing..."
+############################################################################
+# Test the secured connection
+############################################################################
+# create random topics
+TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20)
+BAD_TOPIC=$(tr -dc A-Za-z0-9 </dev/urandom | head -c 20)
+
+echo $'security.protocol=SSL\nssl.truststore.location=/stackable/tls_client/truststore.p12\nssl.truststore.password=changeit' > /tmp/client.config
+
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config
+then
+  echo "[SUCCESS] Secure client topic created!"
+else
+  echo "[ERROR] Secure client topic creation failed!"
+  exit 1
+fi
+
+if /stackable/kafka/bin/kafka-topics.sh --list --topic "$TOPIC" --bootstrap-server "$SERVER" --command-config /tmp/client.config | grep "$TOPIC"
+then
+  echo "[SUCCESS] Secure client topic read!"
+else
+  echo "[ERROR] Secure client topic read failed!"
+  exit 1
+fi
+
+############################################################################
+# Test the connection without certificates
+############################################################################
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server "$SERVER" &> /dev/null
+then
+  echo "[ERROR] Secure client topic created without certificates!"
+  exit 1
+else
+  echo "[SUCCESS] Secure client topic creation failed without certificates!"
+fi
+
+############################################################################
+# Test the connection with bad host name
+############################################################################
+if /stackable/kafka/bin/kafka-topics.sh --create --topic "$BAD_TOPIC" --bootstrap-server localhost:9093 --command-config /tmp/client.config &> /dev/null
+then
+  echo "[ERROR] Secure client topic created with bad host name!"
+  exit 1
+else
+  echo "[SUCCESS] Secure client topic creation failed with bad host name!"
+fi
+
+echo "All client TLS tests successful!"
+exit 0
diff --git a/tests/templates/kuttl/tls/wrong_keystore.p12 b/tests/templates/kuttl/tls/wrong_keystore.p12
new file mode 100644
index 00000000..e5dc3a42
Binary files /dev/null and b/tests/templates/kuttl/tls/wrong_keystore.p12 differ
diff --git a/tests/templates/kuttl/tls/wrong_truststore.p12 b/tests/templates/kuttl/tls/wrong_truststore.p12
new file mode 100644
index 00000000..0eca7262
Binary files /dev/null and b/tests/templates/kuttl/tls/wrong_truststore.p12 differ
diff --git a/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2 b/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2
index 8ce54655..393de25d 100644
--- a/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2
+++ b/tests/templates/kuttl/upgrade/01-install-kafka.yaml.j2
@@ -2,6 +2,30 @@
 apiVersion: kuttl.dev/v1beta1
 kind: TestStep
 timeout: 300
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+---
+apiVersion: authentication.stackable.tech/v1alpha1
+kind: AuthenticationClass
+metadata:
+  name: test-kafka-client-auth-tls
+spec:
+  provider:
+    tls:
+      clientCertSecretClass: test-kafka-client-auth-tls
+---
+apiVersion: secrets.stackable.tech/v1alpha1
+kind: SecretClass
+metadata:
+  name: test-kafka-client-auth-tls
+spec:
+  backend:
+    autoTls:
+      ca:
+        secret:
+          name: secret-provisioner-tls-kafka-client-auth-ca
+          namespace: default
+        autoGenerate: true
+{% endif %}
 ---
 apiVersion: kafka.stackable.tech/v1alpha1
 kind: KafkaCluster
@@ -10,6 +34,17 @@ metadata:
 spec:
   version: {{ test_scenario['values']['upgrade_old'] }}
   zookeeperConfigMapName: kafka-zk
+  config:
+{% if test_scenario['values']['use-client-tls'] == 'true' %}
+    tls:
+      secretClass: tls
+{% else %}
+    tls: null
+{% endif %}
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+    clientAuthentication:
+      authenticationClass: test-kafka-client-auth-tls
+{% endif %}
   brokers:
     roleGroups:
       default:
diff --git a/tests/templates/kuttl/upgrade/02-write-data.yaml b/tests/templates/kuttl/upgrade/02-write-data.yaml
deleted file mode 100644
index 15bf418c..00000000
--- a/tests/templates/kuttl/upgrade/02-write-data.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
----
-apiVersion: kuttl.dev/v1beta1
-kind: TestStep
-timeout: 300
----
-apiVersion: batch/v1
-kind: Job
-metadata:
-  name: write-data
-spec:
-  template:
-    spec:
-      containers:
-        - name: write-data
-          image: edenhill/kcat:1.7.1
-          command: [sh, -euo, pipefail, -c]
-          args:
-            - |
-              echo "message written before upgrade" > message
-              kcat -b $KAFKA -t upgrade-test-data -P message
-          env:
-            - name: KAFKA
-              valueFrom:
-                configMapKeyRef:
-                  name: simple-kafka
-                  key: KAFKA
-      restartPolicy: Never
diff --git a/tests/templates/kuttl/upgrade/02-write-data.yaml.j2 b/tests/templates/kuttl/upgrade/02-write-data.yaml.j2
new file mode 100644
index 00000000..8d642452
--- /dev/null
+++ b/tests/templates/kuttl/upgrade/02-write-data.yaml.j2
@@ -0,0 +1,58 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+timeout: 300
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: write-data
+spec:
+  template:
+    spec:
+      containers:
+        - name: write-data
+          image: edenhill/kcat:1.7.1
+          command: [sh, -euo, pipefail, -c]
+          args:
+            - |
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+              export SSL_OPTIONS="-X security.protocol=SSL -X ssl.key.location=/stackable/tls_client/tls.key -X ssl.certificate.location=/stackable/tls_client/tls.crt -X ssl.ca.location=/stackable/tls_client/ca.crt"
+{% elif test_scenario['values']['use-client-tls'] == 'true' %}
+              export SSL_OPTIONS="-X security.protocol=SSL -X ssl.ca.location=/stackable/tls_client/ca.crt"
+{% else %}
+              export SSL_OPTIONS=""
+{% endif %}
+              echo "message written before upgrade" > message
+              kcat -b $KAFKA $SSL_OPTIONS -t upgrade-test-data -P message
+          env:
+            - name: KAFKA
+              valueFrom:
+                configMapKeyRef:
+                  name: simple-kafka
+                  key: KAFKA
+          volumeMounts:
+            - mountPath: /stackable/tls_client
+              name: tls
+      volumes:
+        - ephemeral:
+            volumeClaimTemplate:
+              metadata:
+                annotations:
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+                  secrets.stackable.tech/class: test-kafka-client-auth-tls
+{% else %}
+                  secrets.stackable.tech/class: tls
+{% endif %}
+                  secrets.stackable.tech/scope: pod,node
+                creationTimestamp: null
+              spec:
+                accessModes:
+                  - ReadWriteOnce
+                resources:
+                  requests:
+                    storage: "1"
+                storageClassName: secrets.stackable.tech
+                volumeMode: Filesystem
+          name: tls
+      restartPolicy: Never
diff --git a/tests/templates/kuttl/upgrade/03-upgrade-kafka.yaml.j2 b/tests/templates/kuttl/upgrade/03-upgrade-kafka.yaml.j2
index 37fb742e..8e81a2fd 100644
--- a/tests/templates/kuttl/upgrade/03-upgrade-kafka.yaml.j2
+++ b/tests/templates/kuttl/upgrade/03-upgrade-kafka.yaml.j2
@@ -2,15 +2,8 @@
 apiVersion: kuttl.dev/v1beta1
 kind: TestStep
 timeout: 300
----
-apiVersion: kafka.stackable.tech/v1alpha1
-kind: KafkaCluster
-metadata:
-  name: simple-kafka
-spec:
-  version: {{ test_scenario['values']['upgrade_new'] }}
-  zookeeperConfigMapName: kafka-zk
-  brokers:
-    roleGroups:
-      default:
-        replicas: 1
+commands:
+  - script: >-
+      kubectl --namespace $NAMESPACE
+      patch kafkaclusters.kafka.stackable.tech simple-kafka
+      --type=merge --patch '{ "spec": { "version": "{{ test_scenario['values']['upgrade_new'] }}" }}'
diff --git a/tests/templates/kuttl/upgrade/04-read-data.yaml b/tests/templates/kuttl/upgrade/04-read-data.yaml
deleted file mode 100644
index 599d83c6..00000000
--- a/tests/templates/kuttl/upgrade/04-read-data.yaml
+++ /dev/null
@@ -1,35 +0,0 @@
----
-apiVersion: kuttl.dev/v1beta1
-kind: TestStep
-timeout: 300
----
-apiVersion: batch/v1
-kind: Job
-metadata:
-  name: read-data
-spec:
-  template:
-    spec:
-      containers:
-        - name: read-data
-          image: edenhill/kcat:1.7.1
-          command: [sh, -euo, pipefail, -c]
-          args:
-            - |
-              echo "message written after upgrade" > message
-              kcat -b $KAFKA -t upgrade-test-data -P message
-
-              echo "message written before upgrade" > expected-messages
-              echo >> expected-messages
-              cat message >> expected-messages
-              echo >> expected-messages
-              kcat -b $KAFKA -t upgrade-test-data -C -e > read-messages
-              diff read-messages expected-messages
-              cmp read-messages expected-messages
-          env:
-            - name: KAFKA
-              valueFrom:
-                configMapKeyRef:
-                  name: simple-kafka
-                  key: KAFKA
-      restartPolicy: Never
diff --git a/tests/templates/kuttl/upgrade/04-read-data.yaml.j2 b/tests/templates/kuttl/upgrade/04-read-data.yaml.j2
new file mode 100644
index 00000000..c3b60550
--- /dev/null
+++ b/tests/templates/kuttl/upgrade/04-read-data.yaml.j2
@@ -0,0 +1,66 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+timeout: 300
+---
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: read-data
+spec:
+  template:
+    spec:
+      containers:
+        - name: read-data
+          image: edenhill/kcat:1.7.1
+          command: [sh, -euo, pipefail, -c]
+          args:
+            - |
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+              export SSL_OPTIONS="-X security.protocol=SSL -X ssl.key.location=/stackable/tls_client/tls.key -X ssl.certificate.location=/stackable/tls_client/tls.crt -X ssl.ca.location=/stackable/tls_client/ca.crt"
+{% elif test_scenario['values']['use-client-tls'] == 'true' %}
+              export SSL_OPTIONS="-X security.protocol=SSL -X ssl.ca.location=/stackable/tls_client/ca.crt"
+{% else %}
+              export SSL_OPTIONS=""
+{% endif %}
+              echo "message written after upgrade" > message
+              kcat -b $KAFKA $SSL_OPTIONS -t upgrade-test-data -P message
+
+              echo "message written before upgrade" > expected-messages
+              echo >> expected-messages
+              cat message >> expected-messages
+              echo >> expected-messages
+              kcat -b $KAFKA $SSL_OPTIONS -t upgrade-test-data -C -e > read-messages
+              diff read-messages expected-messages
+              cmp read-messages expected-messages
+          env:
+            - name: KAFKA
+              valueFrom:
+                configMapKeyRef:
+                  name: simple-kafka
+                  key: KAFKA
+          volumeMounts:
+            - mountPath: /stackable/tls_client
+              name: tls
+      volumes:
+        - ephemeral:
+            volumeClaimTemplate:
+              metadata:
+                annotations:
+{% if test_scenario['values']['use-client-auth-tls'] == 'true' %}
+                  secrets.stackable.tech/class: test-kafka-client-auth-tls
+{% else %}
+                  secrets.stackable.tech/class: tls
+{% endif %}
+                  secrets.stackable.tech/scope: pod,node
+                creationTimestamp: null
+              spec:
+                accessModes:
+                  - ReadWriteOnce
+                resources:
+                  requests:
+                    storage: "1"
+                storageClassName: secrets.stackable.tech
+                volumeMode: Filesystem
+          name: tls
+      restartPolicy: Never
diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml
index 79e91463..8c9af1b1 100644
--- a/tests/test-definition.yaml
+++ b/tests/test-definition.yaml
@@ -11,6 +11,9 @@ dimensions:
       - 3.6.3-stackable0.7.1
       - 3.7.0-stackable0.7.1
      - 3.8.0-stackable0.7.1
+  - name: zookeeper-latest
+    values:
+      - 3.8.0-stackable0.7.1
   - name: upgrade_old
     values:
       - 2.7.1-stackable0
@@ -19,13 +22,35 @@
   - name: upgrade_new
     values:
       - 3.2.0-stackable0.1.0
+  - name: use-client-tls
+    values:
+      - "true"
+      - "false"
+  - name: use-client-auth-tls
+    values:
+      - "true"
+      - "false"
+  - name: use-internal-tls
+    values:
+      - "true"
+      - "false"
 tests:
   - name: smoke
     dimensions:
       - kafka
       - zookeeper
+      - use-client-tls
   - name: upgrade
     dimensions:
       - zookeeper
       - upgrade_new
       - upgrade_old
+      - use-client-tls
+      - use-client-auth-tls
+  - name: tls
+    dimensions:
+      - kafka
+      - zookeeper-latest
+      - use-client-tls
+      - use-client-auth-tls
+      - use-internal-tls
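The three new boolean dimensions multiply every TLS combination through the kuttl suites. The cheapest regression guard for the port-selection half of that matrix would be a unit test next to `service_ports`; a sketch, assuming the `KafkaCluster` CRD deserializes from a plain manifest via `serde_yaml` (the manifest below is illustrative):

[source,rust]
----
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn tls_enabled_clusters_expose_the_secure_client_port() {
        // Hypothetical test fixture; field names follow the CRD schema
        // added in this PR.
        let kafka: KafkaCluster = serde_yaml::from_str(
            r#"
            apiVersion: kafka.stackable.tech/v1alpha1
            kind: KafkaCluster
            metadata:
              name: simple-kafka
            spec:
              version: 3.2.0-stackable0.1.0
              zookeeperConfigMapName: simple-kafka-zk
              config:
                tls:
                  secretClass: tls
              brokers:
                roleGroups:
                  default:
                    replicas: 1
            "#,
        )
        .expect("illustrative manifest should deserialize");

        let names: Vec<String> = service_ports(&kafka)
            .into_iter()
            .filter_map(|port| port.name)
            .collect();

        // secure port replaces the plaintext port; metrics is always present
        assert!(names.contains(&SECURE_CLIENT_PORT_NAME.to_string()));
        assert!(!names.contains(&CLIENT_PORT_NAME.to_string()));
        assert!(names.contains(&METRICS_PORT_NAME.to_string()));
    }
}
----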