Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
The `.clusterConfig.metadataStorageDatabase` has subfields according to the supported db types: `postgresql`, `mysql` and `derby`.
- BREAKING: The `.clusterConfig.metadataStorageDatabase` field has been renamed to `.clusterConfig.metadataDatabase` for consistency ([#814]).
- Document Helm deployed RBAC permissions and remove unnecessary permissions ([#810]).
- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler ([#824]).
- test: Bump vector-aggregator to 0.55.0, replace /graphql call with gRPC calls ([#826]).

### Deleted
Expand All @@ -26,6 +27,7 @@ All notable changes to this project will be documented in this file.
[#813]: https://github.com/stackabletech/druid-operator/pull/813
[#814]: https://github.com/stackabletech/druid-operator/pull/814
[#818]: https://github.com/stackabletech/druid-operator/pull/818
[#824]: https://github.com/stackabletech/druid-operator/pull/824
[#826]: https://github.com/stackabletech/druid-operator/pull/826

## [26.3.0] - 2026-03-16
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use product_config::{
types::PropertyNameKind,
writer::{PropertiesWriterError, to_java_properties_string},
};
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use stackable_operator::{
builder::{
self,
Expand All @@ -26,11 +26,7 @@ use stackable_operator::{
},
cli::OperatorEnvironmentOptions,
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::{
opa::OpaApiVersion,
product_image_selection::{self, ResolvedProductImage},
rbac::build_rbac_resources,
},
commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources},
constants::RESTART_CONTROLLER_ENABLED_LABEL,
crd::s3,
database_connections::drivers::jdbc::JdbcDatabaseConnection as _,
Expand All @@ -49,7 +45,6 @@ use stackable_operator::{
},
kvp::{KeyValuePairError, LabelError, LabelValueError, Labels},
logging::controller::ReconcilerError,
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
product_logging::{
self,
framework::LoggingError,
Expand Down Expand Up @@ -77,7 +72,6 @@ use crate::{
LOG_CONFIG_DIRECTORY, MAX_DRUID_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME,
OPERATOR_NAME, RUNTIME_PROPS, RW_CONFIG_DIRECTORY, S3_ACCESS_KEY, S3_ENDPOINT_URL,
S3_PATH_STYLE_ACCESS, S3_SECRET_KEY, STACKABLE_LOG_DIR, ZOOKEEPER_CONNECTION_STRING,
authentication::AuthenticationClassesResolved, authorization::DruidAuthorization,
build_recommended_labels, build_string_list, security::DruidTlsSecurity, v1alpha1,
},
discovery::{self, build_discovery_configmaps},
Expand All @@ -92,10 +86,13 @@ use crate::{
service::{build_rolegroup_headless_service, build_rolegroup_metrics_service},
};

mod dereference;
mod validate;

pub const DRUID_CONTROLLER_NAME: &str = "druidcluster";
pub const FULL_CONTROLLER_NAME: &str = concatcp!(DRUID_CONTROLLER_NAME, '.', OPERATOR_NAME);

const CONTAINER_IMAGE_BASE_NAME: &str = "druid";
pub(super) const CONTAINER_IMAGE_BASE_NAME: &str = "druid";

// volume names
const DRUID_CONFIG_VOLUME_NAME: &str = "config";
Expand Down Expand Up @@ -139,63 +136,19 @@ pub enum Error {
rolegroup: RoleGroupRef<v1alpha1::DruidCluster>,
},

#[snafu(display("invalid product configuration"))]
InvalidProductConfig {
source: stackable_operator::product_config_utils::Error,
},

#[snafu(display("invalid authentication configuration"))]
InvalidDruidAuthenticationConfig {
source: crate::authentication::Error,
},

#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},

#[snafu(display(
"failed to get ZooKeeper discovery config map for cluster: {}",
cm_name
))]
GetZookeeperConnStringConfigMap {
source: stackable_operator::client::Error,
cm_name: String,
},

#[snafu(display(
"failed to get OPA discovery config map and/or connection string for cluster: {}",
cm_name
))]
GetOpaConnString {
source: stackable_operator::commons::opa::Error,
cm_name: String,
},

#[snafu(display("failed to get valid S3 connection"))]
GetS3Connection { source: crate::crd::Error },
#[snafu(display("failed to dereference cluster objects"))]
Dereference { source: dereference::Error },

#[snafu(display("failed to configure S3 connection"))]
ConfigureS3 {
source: stackable_operator::crd::s3::v1alpha1::ConnectionError,
},

#[snafu(display("failed to get deep storage bucket"))]
GetDeepStorageBucket {
source: stackable_operator::crd::s3::v1alpha1::BucketError,
},

#[snafu(display(
"failed to get ZooKeeper connection string from config map {}",
cm_name
))]
MissingZookeeperConnString { cm_name: String },

#[snafu(display("failed to transform configs"))]
ProductConfigTransform {
source: stackable_operator::product_config_utils::Error,
},

#[snafu(display("failed to format runtime properties"))]
PropertiesWriteError { source: PropertiesWriterError },

Expand Down Expand Up @@ -245,17 +198,9 @@ pub enum Error {
name: String,
},

#[snafu(display("object defines no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to initialize security context"))]
FailedToInitializeSecurityContext { source: crate::crd::security::Error },

#[snafu(display("failed to retrieve AuthenticationClass"))]
AuthenticationClassRetrieval {
source: crate::crd::authentication::Error,
},

#[snafu(display("failed to get JVM config"))]
GetJvmConfig { source: crate::config::jvm::Error },

Expand Down Expand Up @@ -363,10 +308,8 @@ pub enum Error {
#[snafu(display("failed to configure service"))]
ServiceConfiguration { source: crate::service::Error },

#[snafu(display("failed to resolve product image"))]
ResolveProductImage {
source: product_image_selection::Error,
},
#[snafu(display("failed to validate cluster"))]
ValidateCluster { source: validate::Error },

#[snafu(display("invalid metadata database connection"))]
InvalidMetadataDatabaseConnection {
Expand Down Expand Up @@ -394,89 +337,18 @@ pub async fn reconcile_druid(
.context(InvalidDruidClusterSnafu)?;

let client = &ctx.client;
let namespace = &druid
.metadata
.namespace
.clone()
.with_context(|| ObjectHasNoNamespaceSnafu {})?;
let resolved_product_image = druid
.spec
.image
.resolve(
CONTAINER_IMAGE_BASE_NAME,
&ctx.operator_environment.image_repository,
crate::built_info::PKG_VERSION,
)
.context(ResolveProductImageSnafu)?;

let zk_confmap = druid.spec.cluster_config.zookeeper_config_map_name.clone();
let zk_connstr = client
.get::<ConfigMap>(&zk_confmap, namespace)
let dereferenced_objects = dereference::dereference(client, druid)
.await
.context(GetZookeeperConnStringConfigMapSnafu {
cm_name: zk_confmap.clone(),
})?
.data
.and_then(|mut data| data.remove("ZOOKEEPER"))
.context(MissingZookeeperConnStringSnafu {
cm_name: zk_confmap.clone(),
})?;
.context(DereferenceSnafu)?;

// Assemble the OPA connection string from the discovery and the given path, if a spec is given.
let opa_connstr = if let Some(DruidAuthorization { opa: opa_config }) =
&druid.spec.cluster_config.authorization
{
Some(
opa_config
.full_document_url_from_config_map(client, druid, Some("allow"), &OpaApiVersion::V1)
.await
.context(GetOpaConnStringSnafu {
cm_name: opa_config.config_map_name.clone(),
})?,
)
} else {
None
};

// Get the s3 connection if one is defined
let s3_conn = druid
.get_s3_connection(client)
.await
.context(GetS3ConnectionSnafu)?;

let deep_storage_bucket_name = match &druid.spec.cluster_config.deep_storage {
DeepStorageSpec::S3(s3_spec) => Some(
s3_spec
.bucket
.clone()
.resolve(client, namespace)
.await
.context(GetDeepStorageBucketSnafu)?
.bucket_name,
),
_ => None,
};

let resolved_auth_classes =
AuthenticationClassesResolved::from(&druid.spec.cluster_config, client)
.await
.context(AuthenticationClassRetrievalSnafu)?;

let druid_tls_security =
DruidTlsSecurity::new_from_druid_cluster(druid, &resolved_auth_classes);

let druid_auth_config = DruidAuthenticationConfig::try_from(resolved_auth_classes)
.context(InvalidDruidAuthenticationConfigSnafu)?;

let role_config = transform_all_roles_to_config(druid, &druid.build_role_properties());
let validated_role_config = validate_all_roles_and_groups_config(
&resolved_product_image.product_version,
&role_config.context(ProductConfigTransformSnafu)?,
let validated = validate::validate(
druid,
&dereferenced_objects,
&ctx.operator_environment,
&ctx.product_config,
false,
false,
)
.context(InvalidProductConfigSnafu)?;
.context(ValidateClusterSnafu)?;

let mut cluster_resources = ClusterResources::new(
APP_NAME,
Expand Down Expand Up @@ -510,7 +382,7 @@ pub async fn reconcile_druid(

let mut ss_cond_builder = StatefulSetConditionBuilder::default();

for (role_name, role_config) in validated_role_config.iter() {
for (role_name, role_config) in validated.validated_role_config.iter() {
let druid_role = DruidRole::from_str(role_name).context(UnidentifiedDruidRoleSnafu {
role: role_name.to_string(),
})?;
Expand All @@ -533,7 +405,7 @@ pub async fn reconcile_druid(
let role_group_service_recommended_labels = build_recommended_labels(
druid,
DRUID_CONTROLLER_NAME,
&resolved_product_image.app_version_label_value,
&validated.resolved_product_image.app_version_label_value,
&rolegroup.role,
&rolegroup.role_group,
);
Expand All @@ -548,7 +420,7 @@ pub async fn reconcile_druid(

let rg_headless_service = build_rolegroup_headless_service(
druid,
&druid_tls_security,
&validated.druid_tls_security,
&druid_role,
&rolegroup,
role_group_service_recommended_labels.clone(),
Expand All @@ -565,27 +437,27 @@ pub async fn reconcile_druid(

let rg_configmap = build_rolegroup_config_map(
druid,
&resolved_product_image,
&validated.resolved_product_image,
&rolegroup,
rolegroup_config,
&merged_rolegroup_config,
&zk_connstr,
opa_connstr.as_deref(),
s3_conn.as_ref(),
deep_storage_bucket_name.as_deref(),
&druid_tls_security,
&druid_auth_config,
&validated.zookeeper_connection_string,
validated.opa_connection_string.as_deref(),
validated.s3_connection.as_ref(),
validated.deep_storage_bucket_name.as_deref(),
&validated.druid_tls_security,
&validated.druid_auth_config,
)?;
let rg_statefulset = build_rolegroup_statefulset(
druid,
&resolved_product_image,
&validated.resolved_product_image,
&druid_role,
&rolegroup,
rolegroup_config,
&merged_rolegroup_config,
s3_conn.as_ref(),
&druid_tls_security,
&druid_auth_config,
validated.s3_connection.as_ref(),
&validated.druid_tls_security,
&validated.druid_auth_config,
&rbac_sa,
)?;

Expand Down Expand Up @@ -628,14 +500,14 @@ pub async fn reconcile_druid(
build_recommended_labels(
druid,
DRUID_CONTROLLER_NAME,
&resolved_product_image.app_version_label_value,
&validated.resolved_product_image.app_version_label_value,
role_name,
"none",
),
listener_class.to_string(),
listener_group_name,
&druid_role,
&druid_tls_security,
&validated.druid_tls_security,
)
.context(ListenerConfigurationSnafu)?;

Expand All @@ -649,8 +521,8 @@ pub async fn reconcile_druid(
for discovery_cm in build_discovery_configmaps(
druid,
druid,
&resolved_product_image,
&druid_tls_security,
&validated.resolved_product_image,
&validated.druid_tls_security,
listener,
)
.await
Expand Down Expand Up @@ -1387,9 +1259,12 @@ pub fn error_policy(
mod test {
use product_config::{ProductConfigManager, writer};
use rstest::*;
use stackable_operator::product_config_utils::{
transform_all_roles_to_config, validate_all_roles_and_groups_config,
};

use super::*;
use crate::crd::PROP_SEGMENT_CACHE_LOCATIONS;
use crate::crd::{PROP_SEGMENT_CACHE_LOCATIONS, authentication::AuthenticationClassesResolved};

#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
Expand Down
Loading
Loading