diff --git a/CHANGELOG.md b/CHANGELOG.md
index d432f57f..5896c0a7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
 ### Fixed
 
 - BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory ([#632]).
+- Failing to parse one `DruidCluster`/`AuthenticationClass` should no longer cause the whole operator to stop functioning ([#638]).
 
 ### Removed
 
@@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file.
 [#621]: https://github.com/stackabletech/druid-operator/pull/621
 [#631]: https://github.com/stackabletech/druid-operator/pull/631
 [#632]: https://github.com/stackabletech/druid-operator/pull/632
+[#638]: https://github.com/stackabletech/druid-operator/pull/638
 
 ## [24.7.0] - 2024-07-24
 
diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs
index 3b0cb936..1ea87d51 100644
--- a/rust/operator-binary/src/druid_controller.rs
+++ b/rust/operator-binary/src/druid_controller.rs
@@ -1,7 +1,6 @@
 //! Ensures that `Pod`s are configured and running for each [`DruidCluster`]
 use std::{
     collections::{BTreeMap, HashMap},
-    ops::Deref,
     str::FromStr,
     sync::Arc,
 };
@@ -49,6 +48,7 @@ use stackable_operator::{
         DeepMerge,
     },
     kube::{
+        core::{error_boundary, DeserializeGuard},
         runtime::{controller::Action, reflector::ObjectRef},
         Resource,
     },
@@ -358,6 +358,11 @@ pub enum Error {
     AddVolumeMount {
         source: builder::pod::container::Error,
     },
+
+    #[snafu(display("DruidCluster object is invalid"))]
+    InvalidDruidCluster {
+        source: error_boundary::InvalidObject,
+    },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -368,8 +373,17 @@ impl ReconcilerError for Error {
     }
 }
 
-pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<Action> {
+pub async fn reconcile_druid(
+    druid: Arc<DeserializeGuard<DruidCluster>>,
+    ctx: Arc<Ctx>,
+) -> Result<Action> {
     tracing::info!("Starting reconcile");
+
+    let druid = druid
+        .0
+        .as_ref()
+        .map_err(error_boundary::InvalidObject::clone)
+        .context(InvalidDruidClusterSnafu)?;
+
     let client = &ctx.client;
     let namespace = &druid
         .metadata
@@ -394,7 +408,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
             cm_name: zk_confmap.clone(),
         })?;
 
-    let vector_aggregator_address = resolve_vector_aggregator_address(&druid, client)
+    let vector_aggregator_address = resolve_vector_aggregator_address(druid, client)
         .await
         .context(ResolveVectorAggregatorAddressSnafu)?;
 
@@ -404,12 +418,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
     {
         Some(
             opa_config
-                .full_document_url_from_config_map(
-                    client,
-                    druid.deref(),
-                    Some("allow"),
-                    OpaApiVersion::V1,
-                )
+                .full_document_url_from_config_map(client, druid, Some("allow"), OpaApiVersion::V1)
                 .await
                 .context(GetOpaConnStringSnafu {
                     cm_name: opa_config.config_map_name.clone(),
@@ -444,12 +453,12 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
         .context(AuthenticationClassRetrievalSnafu)?;
 
     let druid_tls_security =
-        DruidTlsSecurity::new_from_druid_cluster(&druid, &resolved_auth_classes);
+        DruidTlsSecurity::new_from_druid_cluster(druid, &resolved_auth_classes);
 
     let druid_auth_config = DruidAuthenticationConfig::try_from(resolved_auth_classes)
         .context(InvalidDruidAuthenticationConfigSnafu)?;
 
-    let role_config = transform_all_roles_to_config(druid.as_ref(), druid.build_role_properties());
+    let role_config = transform_all_roles_to_config(druid, druid.build_role_properties());
     let validated_role_config = validate_all_roles_and_groups_config(
         &resolved_product_image.product_version,
         &role_config.context(ProductConfigTransformSnafu)?,
@@ -471,7 +480,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
     let merged_config = druid.merged_config().context(FailedToResolveConfigSnafu)?;
 
     let (rbac_sa, rbac_rolebinding) = build_rbac_resources(
-        druid.as_ref(),
+        druid,
         APP_NAME,
         cluster_resources
             .get_required_labels()
@@ -495,7 +504,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
         })?;
 
     let role_service = build_role_service(
-        &druid,
+        druid,
         &resolved_product_image,
         &druid_role,
         &druid_tls_security,
@@ -505,13 +514,13 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
         .await
         .context(ApplyRoleServiceSnafu)?;
 
-    create_shared_internal_secret(&druid, client, DRUID_CONTROLLER_NAME)
+    create_shared_internal_secret(druid, client, DRUID_CONTROLLER_NAME)
         .await
         .context(FailedInternalSecretCreationSnafu)?;
 
     for (rolegroup_name, rolegroup_config) in role_config.iter() {
         let rolegroup = RoleGroupRef {
-            cluster: ObjectRef::from_obj(&*druid),
+            cluster: ObjectRef::from_obj(druid),
             role: role_name.into(),
             role_group: rolegroup_name.into(),
         };
@@ -521,13 +530,13 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
             .context(FailedToResolveConfigSnafu)?;
 
         let rg_service = build_rolegroup_services(
-            &druid,
+            druid,
             &resolved_product_image,
             &rolegroup,
             &druid_tls_security,
         )?;
         let rg_configmap = build_rolegroup_config_map(
-            &druid,
+            druid,
             &resolved_product_image,
             &rolegroup,
             rolegroup_config,
@@ -541,7 +550,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
             &druid_auth_config,
         )?;
         let rg_statefulset = build_rolegroup_statefulset(
-            &druid,
+            druid,
             &resolved_product_image,
             &druid_role,
             &rolegroup,
@@ -577,7 +586,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
 
         add_pdbs(
             &role_config.pod_disruption_budget,
-            &druid,
+            druid,
             &druid_role,
             client,
             &mut cluster_resources,
@@ -587,14 +596,10 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
     }
 
     // discovery
-    for discovery_cm in build_discovery_configmaps(
-        &druid,
-        &*druid,
-        &resolved_product_image,
-        &druid_tls_security,
-    )
-    .await
-    .context(BuildDiscoveryConfigSnafu)?
+    for discovery_cm in
+        build_discovery_configmaps(druid, druid, &resolved_product_image, &druid_tls_security)
+            .await
+            .context(BuildDiscoveryConfigSnafu)?
     {
         cluster_resources
             .add(client, discovery_cm)
@@ -606,10 +611,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
         ClusterOperationsConditionBuilder::new(&druid.spec.cluster_operation);
 
     let status = DruidClusterStatus {
-        conditions: compute_conditions(
-            druid.as_ref(),
-            &[&ss_cond_builder, &cluster_operation_cond_builder],
-        ),
+        conditions: compute_conditions(druid, &[&ss_cond_builder, &cluster_operation_cond_builder]),
     };
 
     cluster_resources
@@ -617,7 +619,7 @@ pub async fn reconcile_druid(druid: Arc<DruidCluster>, ctx: Arc<Ctx>) -> Result<
         .await
         .context(DeleteOrphanedResourcesSnafu)?;
     client
-        .apply_patch_status(OPERATOR_NAME, &*druid, &status)
+        .apply_patch_status(OPERATOR_NAME, druid, &status)
         .await
         .context(ApplyStatusSnafu)?;
 
@@ -1297,8 +1299,15 @@ fn add_log_volume_and_volume_mounts(
     Ok(())
 }
 
-pub fn error_policy(_obj: Arc<DruidCluster>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
-    Action::requeue(*Duration::from_secs(5))
+pub fn error_policy(
+    _obj: Arc<DeserializeGuard<DruidCluster>>,
+    error: &Error,
+    _ctx: Arc<Ctx>,
+) -> Action {
+    match error {
+        Error::InvalidDruidCluster { .. } => Action::await_change(),
+        _ => Action::requeue(*Duration::from_secs(5)),
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs
index 91f1d91f..ea369bac 100644
--- a/rust/operator-binary/src/main.rs
+++ b/rust/operator-binary/src/main.rs
@@ -21,6 +21,7 @@ use stackable_operator::{
         apps::v1::StatefulSet,
         core::v1::{ConfigMap, Service},
     },
+    kube::core::DeserializeGuard,
     kube::runtime::{watcher, Controller},
     logging::controller::report_controller_reconciled,
 };
@@ -67,7 +68,7 @@ async fn main() -> anyhow::Result<()> {
         stackable_operator::client::create_client(Some(OPERATOR_NAME.to_string())).await?;
 
     Controller::new(
-        watch_namespace.get_api::<DruidCluster>(&client),
+        watch_namespace.get_api::<DeserializeGuard<DruidCluster>>(&client),
         watcher::Config::default(),
     )
     .owns(