From d23dc38a29f729117530c5e76222ac59fb6f87b6 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Tue, 21 Feb 2023 09:26:03 +0100 Subject: [PATCH 1/9] Use fragments in DruidClusterSpec --- rust/crd/src/lib.rs | 118 ++++++++++++++++++++++++++++++--------- rust/crd/src/resource.rs | 104 ++++++++++++---------------------- rust/crd/src/storage.rs | 49 ++++++++-------- 3 files changed, 152 insertions(+), 119 deletions(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 0d7c26d6..38fb3cea 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -17,10 +17,11 @@ use stackable_operator::{ client::Client, commons::{ product_image_selection::ProductImage, - resources::{NoRuntimeLimits, ResourcesFragment}, + resources::{NoRuntimeLimits, Resources}, s3::{InlinedS3BucketSpec, S3BucketDef, S3ConnectionDef, S3ConnectionSpec}, tls::{CaCert, Tls, TlsServerVerification, TlsVerification}, }, + config::{fragment::Fragment, merge::Merge}, k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector, kube::{CustomResource, ResourceExt}, labels::ObjectLabels, @@ -152,15 +153,15 @@ pub struct DruidClusterSpec { /// The Druid image to use pub image: ProductImage, /// Configuration of the broker role - pub brokers: Role, + pub brokers: Role, /// Configuration of the coordinator role - pub coordinators: Role, + pub coordinators: Role, /// Configuration of the historical role - pub historicals: Role, + pub historicals: Role, /// Configuration of the middle managed role - pub middle_managers: Role, + pub middle_managers: Role, /// Configuration of the router role - pub routers: Role, + pub routers: Role, /// Common cluster wide configuration that can not differ or be overridden on a role or role group level pub cluster_config: DruidClusterConfig, } @@ -664,37 +665,102 @@ pub struct IngestionSpec { pub s3connection: Option, } -#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] pub struct BrokerConfig { - resources: Option>, + #[fragment_attrs(serde(default))] + resources: Resources, } -#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] pub struct CoordinatorConfig { - resources: Option>, + #[fragment_attrs(serde(default))] + resources: Resources, } -#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] pub struct MiddleManagerConfig { - resources: Option>, + #[fragment_attrs(serde(default))] + resources: Resources, } -#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] pub struct RouterConfig { - resources: Option>, + #[fragment_attrs(serde(default))] + resources: Resources, } -#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] +#[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] +#[fragment_attrs( + derive( + Clone, + Debug, + Default, + Deserialize, + Merge, + JsonSchema, + PartialEq, + Serialize + ), + serde(rename_all = "camelCase") +)] pub struct HistoricalConfig { - resources: Option>, + #[fragment_attrs(serde(default))] + resources: Resources, } -impl Configuration for BrokerConfig { +impl Configuration for BrokerConfigFragment { type Configurable = DruidCluster; fn compute_env( @@ -724,7 +790,7 @@ impl Configuration for BrokerConfig { } } -impl Configuration for HistoricalConfig { +impl Configuration for HistoricalConfigFragment { type Configurable = DruidCluster; fn compute_env( @@ -754,7 +820,7 @@ impl Configuration for HistoricalConfig { } } -impl Configuration for RouterConfig { +impl Configuration for RouterConfigFragment { type Configurable = DruidCluster; fn compute_env( @@ -784,7 +850,7 @@ impl Configuration for RouterConfig { } } -impl Configuration for MiddleManagerConfig { +impl Configuration for MiddleManagerConfigFragment { type Configurable = DruidCluster; fn compute_env( @@ -823,7 +889,7 @@ impl Configuration for MiddleManagerConfig { } } -impl Configuration for CoordinatorConfig { +impl Configuration for CoordinatorConfigFragment { type Configurable = DruidCluster; fn compute_env( diff --git a/rust/crd/src/resource.rs b/rust/crd/src/resource.rs index 1633ebbf..fef65379 100644 --- a/rust/crd/src/resource.rs +++ b/rust/crd/src/resource.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use crate::memory::{HistoricalDerivedSettings, RESERVED_OS_MEMORY}; -use crate::storage::{self, FreePercentageEmptyDirFragment}; +use crate::storage::{self, default_free_percentage_empty_dir_fragment}; use crate::{DruidCluster, DruidRole, PATH_SEGMENT_CACHE, PROP_SEGMENT_CACHE_LOCATIONS}; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt, Snafu}; @@ -208,7 +208,7 @@ lazy_static! { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, }; } @@ -224,48 +224,23 @@ fn default_resources(role: &DruidRole) -> Option { } } -fn role_resources(druid: &DruidCluster, role: &DruidRole) -> Option { +fn role_resources(druid: &DruidCluster, role: &DruidRole) -> RoleResourceFragment { match role { - DruidRole::Broker => druid - .spec - .brokers - .config - .config - .resources - .clone() - .map(RoleResourceFragment::DruidFragment), - DruidRole::Coordinator => druid - .spec - .coordinators - .config - .config - .resources - .clone() - .map(RoleResourceFragment::DruidFragment), - DruidRole::Historical => druid - .spec - .historicals - .config - .config - .resources - .clone() - .map(RoleResourceFragment::HistoricalFragment), - DruidRole::MiddleManager => druid - .spec - .middle_managers - .config - .config - .resources - .clone() - .map(RoleResourceFragment::DruidFragment), - DruidRole::Router => druid - .spec - .routers - .config - .config - .resources - .clone() - .map(RoleResourceFragment::DruidFragment), + DruidRole::Broker => { + RoleResourceFragment::DruidFragment(druid.spec.brokers.config.config.resources.clone()) + } + DruidRole::Coordinator => RoleResourceFragment::DruidFragment( + druid.spec.coordinators.config.config.resources.clone(), + ), + DruidRole::Historical => RoleResourceFragment::HistoricalFragment( + druid.spec.historicals.config.config.resources.clone(), + ), + DruidRole::MiddleManager => RoleResourceFragment::DruidFragment( + druid.spec.middle_managers.config.config.resources.clone(), + ), + DruidRole::Router => { + RoleResourceFragment::DruidFragment(druid.spec.routers.config.config.resources.clone()) + } } } @@ -280,40 +255,35 @@ fn rolegroup_resources( .brokers .role_groups .get(&rolegroup_ref.role_group) - .map(|rg| &rg.config.config) - .and_then(|rg| rg.resources.clone()) + .map(|rg| rg.config.config.resources.clone()) .map(RoleResourceFragment::DruidFragment), DruidRole::Coordinator => druid .spec .coordinators .role_groups .get(&rolegroup_ref.role_group) - .map(|rg| &rg.config.config) - .and_then(|rg| rg.resources.clone()) + .map(|rg| rg.config.config.resources.clone()) .map(RoleResourceFragment::DruidFragment), DruidRole::MiddleManager => druid .spec .middle_managers .role_groups .get(&rolegroup_ref.role_group) - .map(|rg| &rg.config.config) - .and_then(|rg| rg.resources.clone()) + .map(|rg| rg.config.config.resources.clone()) .map(RoleResourceFragment::DruidFragment), DruidRole::Historical => druid .spec .historicals .role_groups .get(&rolegroup_ref.role_group) - .map(|rg| &rg.config.config) - .and_then(|rg| rg.resources.clone()) + .map(|rg| rg.config.config.resources.clone()) .map(RoleResourceFragment::HistoricalFragment), DruidRole::Router => druid .spec .routers .role_groups .get(&rolegroup_ref.role_group) - .map(|rg| &rg.config.config) - .and_then(|rg| rg.resources.clone()) + .map(|rg| rg.config.config.resources.clone()) .map(RoleResourceFragment::DruidFragment), } } @@ -326,7 +296,7 @@ pub fn resources( ) -> Result { try_merge(&[ rolegroup_resources(druid, role, rolegroup_ref), - role_resources(druid, role), + Some(role_resources(druid, role)), default_resources(role), ]) .with_context(|_| ResourcesMergeSnafu { @@ -355,8 +325,6 @@ fn try_merge_private( match (ra, rb) { (RoleResourceFragment::DruidFragment(a), RoleResourceFragment::DruidFragment(b)) => { a.merge(b); - let _: Resources = - fragment::validate(a.clone()).context(ResourceValidationSnafu)?; Ok(RoleResourceFragment::DruidFragment(a.clone())) } ( @@ -364,8 +332,6 @@ fn try_merge_private( RoleResourceFragment::HistoricalFragment(b), ) => { a.merge(b); - let _: Resources = - fragment::validate(a.clone()).context(ResourceValidationSnafu)?; Ok(RoleResourceFragment::HistoricalFragment(a.clone())) } _ => Err(Error::IncompatibleStorageMerging), @@ -375,7 +341,7 @@ fn try_merge_private( #[cfg(test)] mod test { use super::*; - use crate::{storage::FreePercentageEmptyDir, tests::deserialize_yaml_file}; + use crate::{storage::default_free_percentage_empty_dir, tests::deserialize_yaml_file}; use rstest::*; use stackable_operator::{ @@ -399,7 +365,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment{}, }, storage: storage::HistoricalStorageFragment{ - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), None, @@ -414,7 +380,7 @@ mod test { runtime_limits: NoRuntimeLimits{}, }, storage: storage::HistoricalStorage{ - segment_cache: FreePercentageEmptyDir::default(), + segment_cache: default_free_percentage_empty_dir(), }, }), )] @@ -429,7 +395,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), Some(RoleResourceFragment::HistoricalFragment(ResourcesFragment { @@ -442,7 +408,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), None, @@ -456,7 +422,7 @@ mod test { runtime_limits: NoRuntimeLimits {}, }, storage: storage::HistoricalStorage { - segment_cache: FreePercentageEmptyDir::default(), + segment_cache: default_free_percentage_empty_dir(), }, }), )] @@ -471,7 +437,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), Some(RoleResourceFragment::HistoricalFragment (ResourcesFragment { @@ -484,7 +450,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), Some(RoleResourceFragment::HistoricalFragment (ResourcesFragment { @@ -497,7 +463,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), RoleResource::Historical(Resources { @@ -510,7 +476,7 @@ mod test { runtime_limits: NoRuntimeLimits {}, }, storage: storage::HistoricalStorage { - segment_cache: FreePercentageEmptyDir::default(), + segment_cache: default_free_percentage_empty_dir(), }, }), )] @@ -537,7 +503,7 @@ mod test { runtime_limits: NoRuntimeLimitsFragment {}, }, storage: storage::HistoricalStorageFragment { - segment_cache: FreePercentageEmptyDirFragment::default(), + segment_cache: default_free_percentage_empty_dir_fragment(), }, })), Some(RoleResourceFragment ::DruidFragment (ResourcesFragment { diff --git a/rust/crd/src/storage.rs b/rust/crd/src/storage.rs index 0fee9e55..25648b89 100644 --- a/rust/crd/src/storage.rs +++ b/rust/crd/src/storage.rs @@ -43,9 +43,18 @@ pub struct HistoricalStorage { pub segment_cache: FreePercentageEmptyDir, } -#[derive(Clone, Debug, Fragment, PartialEq, JsonSchema)] +#[derive(Clone, Debug, Default, Fragment, PartialEq, JsonSchema)] #[fragment_attrs( - derive(Merge, Serialize, Deserialize, JsonSchema, Debug, Clone, PartialEq), + derive( + Clone, + Debug, + Default, + Deserialize, + JsonSchema, + Merge, + PartialEq, + Serialize, + ), serde(rename_all = "camelCase") )] pub struct FreePercentageEmptyDir { @@ -55,31 +64,23 @@ pub struct FreePercentageEmptyDir { pub empty_dir: CapacityEmptyDir, } -/// Default values for the `segmentCache` property of the custom resource. -/// See also `Default` for `FreePercentageEmptyDirFragment` below. -impl Default for FreePercentageEmptyDir { - fn default() -> Self { - FreePercentageEmptyDir { - free_percentage: Some(5), - empty_dir: CapacityEmptyDir { - capacity: Quantity("1G".to_string()), - medium: Some("".to_string()), - }, - } +pub fn default_free_percentage_empty_dir() -> FreePercentageEmptyDir { + FreePercentageEmptyDir { + free_percentage: Some(5), + empty_dir: CapacityEmptyDir { + capacity: Quantity("1G".to_string()), + medium: Some("".to_string()), + }, } } -/// Default values for the `segmentCache` property of the custom resource. -/// See also `Default` for `FreePercentageEmptyDir` above. -impl Default for FreePercentageEmptyDirFragment { - fn default() -> Self { - FreePercentageEmptyDirFragment { - free_percentage: Some(5), - empty_dir: CapacityEmptyDirFragment { - capacity: Some(Quantity("1G".to_string())), - medium: Some("".to_string()), - }, - } +pub fn default_free_percentage_empty_dir_fragment() -> FreePercentageEmptyDirFragment { + FreePercentageEmptyDirFragment { + free_percentage: Some(5), + empty_dir: CapacityEmptyDirFragment { + capacity: Some(Quantity("1G".to_string())), + medium: Some("".to_string()), + }, } } From df661b5bcfebaf5bf3aa692ea4dbf405eef21584 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 11:30:34 +0100 Subject: [PATCH 2/9] Implement configuration merge --- rust/crd/src/lib.rs | 196 ++++++++++- rust/crd/src/resource.rs | 321 +++---------------- rust/operator-binary/src/druid_controller.rs | 16 +- 3 files changed, 258 insertions(+), 275 deletions(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 38fb3cea..450b0cea 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -11,6 +11,7 @@ use crate::authentication::DruidAuthentication; use crate::tls::DruidTls; use authorization::DruidAuthorization; +use resource::RoleResource; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ @@ -21,13 +22,16 @@ use stackable_operator::{ s3::{InlinedS3BucketSpec, S3BucketDef, S3ConnectionDef, S3ConnectionSpec}, tls::{CaCert, Tls, TlsServerVerification, TlsVerification}, }, - config::{fragment::Fragment, merge::Merge}, + config::{ + fragment::{self, Fragment, FromFragment, ValidationError}, + merge::Merge, + }, k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector, kube::{CustomResource, ResourceExt}, labels::ObjectLabels, product_config::types::PropertyNameKind, product_config_utils::{ConfigError, Configuration}, - role_utils::{Role, RoleGroupRef}, + role_utils::{CommonConfiguration, Role, RoleGroup, RoleGroupRef}, schemars::{self, JsonSchema}, }; use std::{ @@ -128,6 +132,8 @@ pub enum Error { UnknownDruidRole { role: String, roles: Vec }, #[snafu(display("missing namespace for resource {name}"))] MissingNamespace { name: String }, + #[snafu(display("fragment validation failure"))] + FragmentValidationFailure { source: ValidationError }, } #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] @@ -191,6 +197,76 @@ pub struct DruidClusterConfig { pub zookeeper_config_map_name: String, } +pub struct MergedConfig { + pub brokers: HashMap, + pub coordinators: HashMap, + pub historicals: HashMap, + pub middle_managers: HashMap, + pub routers: HashMap, +} + +impl MergedConfig { + pub fn resources(&self, role: DruidRole, role_group: &str) -> RoleResource { + self.common_config(role, role_group).resources + } + + pub fn common_config(&self, role: DruidRole, role_group: &str) -> CommonConfig { + match role { + DruidRole::Broker => { + let config = self + .brokers + .get(role_group) + .cloned() + // TODO default? + .unwrap_or_default(); + CommonConfig { + resources: RoleResource::Druid(config.resources), + } + } + DruidRole::Coordinator => { + let config = self + .coordinators + .get(role_group) + .cloned() + .unwrap_or_default(); + CommonConfig { + resources: RoleResource::Druid(config.resources), + } + } + DruidRole::Historical => { + let config = self + .historicals + .get(role_group) + .cloned() + .unwrap_or_default(); + CommonConfig { + resources: RoleResource::Historical(config.resources), + } + } + DruidRole::MiddleManager => { + let config = self + .middle_managers + .get(role_group) + .cloned() + .unwrap_or_default(); + CommonConfig { + resources: RoleResource::Druid(config.resources), + } + } + DruidRole::Router => { + let config = self.routers.get(role_group).cloned().unwrap_or_default(); + CommonConfig { + resources: RoleResource::Druid(config.resources), + } + } + } + } +} + +pub struct CommonConfig { + pub resources: RoleResource, +} + #[derive( Clone, Debug, @@ -592,6 +668,82 @@ impl DruidCluster { let s3_storage = self.spec.cluster_config.deep_storage.is_s3(); s3_ingestion || s3_storage } + + pub fn merged_config(&self) -> Result { + Ok(MergedConfig { + brokers: DruidCluster::merged_role_config( + &self.spec.brokers, + &BrokerConfig::default_config(), + )?, + coordinators: DruidCluster::merged_role_config( + &self.spec.coordinators, + &CoordinatorConfig::default_config(), + )?, + historicals: DruidCluster::merged_role_config( + &self.spec.historicals, + &HistoricalConfig::default_config(), + )?, + middle_managers: DruidCluster::merged_role_config( + &self.spec.middle_managers, + &MiddleManagerConfig::default_config(), + )?, + routers: DruidCluster::merged_role_config( + &self.spec.routers, + &RouterConfig::default_config(), + )?, + }) + } + + fn merged_role_config( + role: &Role, + default_config: &T::Fragment, + ) -> Result, Error> + where + T: FromFragment, + T::Fragment: Clone + Merge, + { + let mut merged_role_config = HashMap::new(); + + for ( + rolegroup_name, + RoleGroup { + config: + CommonConfiguration { + config: rolegroup_config, + .. + }, + .. + }, + ) in &role.role_groups + { + let merged_rolegroup_config = DruidCluster::merged_rolegroup_config( + rolegroup_config, + &role.config.config, + default_config, + )?; + merged_role_config.insert(rolegroup_name.to_owned(), merged_rolegroup_config); + } + + Ok(merged_role_config) + } + + pub fn merged_rolegroup_config( + rolegroup_config: &T::Fragment, + role_config: &T::Fragment, + default_config: &T::Fragment, + ) -> Result + where + T: FromFragment, + T::Fragment: Clone + Merge, + { + let mut role_config = role_config.to_owned(); + let mut rolegroup_config = rolegroup_config.to_owned(); + + role_config.merge(default_config); + rolegroup_config.merge(&role_config); + + fragment::validate(rolegroup_config).context(FragmentValidationFailureSnafu) + } } #[derive(Clone, Debug, Default, Deserialize, JsonSchema, Serialize)] @@ -684,6 +836,14 @@ pub struct BrokerConfig { resources: Resources, } +impl BrokerConfig { + fn default_config() -> BrokerConfigFragment { + BrokerConfigFragment { + resources: resource::DEFAULT_RESOURCES.to_owned(), + } + } +} + #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] #[fragment_attrs( derive( @@ -703,6 +863,14 @@ pub struct CoordinatorConfig { resources: Resources, } +impl CoordinatorConfig { + fn default_config() -> CoordinatorConfigFragment { + CoordinatorConfigFragment { + resources: resource::DEFAULT_RESOURCES.to_owned(), + } + } +} + #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] #[fragment_attrs( derive( @@ -722,6 +890,14 @@ pub struct MiddleManagerConfig { resources: Resources, } +impl MiddleManagerConfig { + fn default_config() -> MiddleManagerConfigFragment { + MiddleManagerConfigFragment { + resources: resource::DEFAULT_RESOURCES.to_owned(), + } + } +} + #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] #[fragment_attrs( derive( @@ -741,6 +917,14 @@ pub struct RouterConfig { resources: Resources, } +impl RouterConfig { + fn default_config() -> RouterConfigFragment { + RouterConfigFragment { + resources: resource::DEFAULT_RESOURCES.to_owned(), + } + } +} + #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)] #[fragment_attrs( derive( @@ -760,6 +944,14 @@ pub struct HistoricalConfig { resources: Resources, } +impl HistoricalConfig { + fn default_config() -> HistoricalConfigFragment { + HistoricalConfigFragment { + resources: resource::HISTORICAL_RESOURCES.to_owned(), + } + } +} + impl Configuration for BrokerConfigFragment { type Configurable = DruidCluster; diff --git a/rust/crd/src/resource.rs b/rust/crd/src/resource.rs index fef65379..7dbc1d42 100644 --- a/rust/crd/src/resource.rs +++ b/rust/crd/src/resource.rs @@ -2,19 +2,16 @@ use std::collections::BTreeMap; use crate::memory::{HistoricalDerivedSettings, RESERVED_OS_MEMORY}; use crate::storage::{self, default_free_percentage_empty_dir_fragment}; -use crate::{DruidCluster, DruidRole, PATH_SEGMENT_CACHE, PROP_SEGMENT_CACHE_LOCATIONS}; +use crate::{DruidRole, PATH_SEGMENT_CACHE, PROP_SEGMENT_CACHE_LOCATIONS}; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt, Snafu}; -use stackable_operator::config::fragment; use stackable_operator::memory::MemoryQuantity; -use stackable_operator::role_utils::RoleGroupRef; use stackable_operator::{ builder::{ContainerBuilder, PodBuilder, VolumeBuilder}, commons::resources::{ CpuLimitsFragment, MemoryLimits, MemoryLimitsFragment, NoRuntimeLimits, NoRuntimeLimitsFragment, Resources, ResourcesFragment, }, - config::merge::Merge, k8s_openapi::{ api::core::v1::{EmptyDirVolumeSource, ResourceRequirements}, apimachinery::pkg::api::resource::Quantity, @@ -27,18 +24,6 @@ use strum::{EnumDiscriminants, IntoStaticStr}; #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - #[snafu(display("no resources available for merging"))] - NoResourcesToMerge, - #[snafu(display("cannot merge storage types of different roles"))] - IncompatibleStorageMerging, - #[snafu(display("failed to validate resources"))] - ResourceValidation { source: fragment::ValidationError }, - #[snafu(display("failed to merge resources for {rolegroup_ref}"))] - ResourcesMerge { - #[snafu(source(from(Error, Box::new)))] - source: Box, - rolegroup_ref: RoleGroupRef, - }, #[snafu(display("failed to derive Druid settings from resources"))] DeriveMemorySettings { source: crate::memory::Error }, #[snafu(display("failed to get memory limits"))] @@ -51,34 +36,12 @@ pub enum Error { InconsistentConfiguration, } -/// The sole purpose of this enum is to handle merging. It's needed because currently -/// the operator-rs 0.26.1 doesn't handle fragment enum merging. -#[derive(Debug, Clone, PartialEq)] -enum RoleResourceFragment { - DruidFragment(ResourcesFragment), - HistoricalFragment(ResourcesFragment), -} - #[derive(Debug, Clone, PartialEq)] pub enum RoleResource { Druid(Resources), Historical(Resources), } -impl TryFrom for RoleResource { - type Error = Error; - fn try_from(rrf: RoleResourceFragment) -> Result { - match rrf { - RoleResourceFragment::DruidFragment(fragment) => Ok(RoleResource::Druid( - fragment::validate(fragment).with_context(|_| ResourceValidationSnafu)?, - )), - RoleResourceFragment::HistoricalFragment(fragment) => Ok(RoleResource::Historical( - fragment::validate(fragment).with_context(|_| ResourceValidationSnafu)?, - )), - } - } -} - impl RoleResource { pub fn as_resource_requirements(&self) -> ResourceRequirements { match self { @@ -213,135 +176,14 @@ lazy_static! { }; } -fn default_resources(role: &DruidRole) -> Option { - match role { - DruidRole::Historical => Some(RoleResourceFragment::HistoricalFragment( - HISTORICAL_RESOURCES.clone(), - )), - _ => Some(RoleResourceFragment::DruidFragment( - DEFAULT_RESOURCES.clone(), - )), - } -} - -fn role_resources(druid: &DruidCluster, role: &DruidRole) -> RoleResourceFragment { - match role { - DruidRole::Broker => { - RoleResourceFragment::DruidFragment(druid.spec.brokers.config.config.resources.clone()) - } - DruidRole::Coordinator => RoleResourceFragment::DruidFragment( - druid.spec.coordinators.config.config.resources.clone(), - ), - DruidRole::Historical => RoleResourceFragment::HistoricalFragment( - druid.spec.historicals.config.config.resources.clone(), - ), - DruidRole::MiddleManager => RoleResourceFragment::DruidFragment( - druid.spec.middle_managers.config.config.resources.clone(), - ), - DruidRole::Router => { - RoleResourceFragment::DruidFragment(druid.spec.routers.config.config.resources.clone()) - } - } -} - -fn rolegroup_resources( - druid: &DruidCluster, - role: &DruidRole, - rolegroup_ref: &RoleGroupRef, -) -> Option { - match role { - DruidRole::Broker => druid - .spec - .brokers - .role_groups - .get(&rolegroup_ref.role_group) - .map(|rg| rg.config.config.resources.clone()) - .map(RoleResourceFragment::DruidFragment), - DruidRole::Coordinator => druid - .spec - .coordinators - .role_groups - .get(&rolegroup_ref.role_group) - .map(|rg| rg.config.config.resources.clone()) - .map(RoleResourceFragment::DruidFragment), - DruidRole::MiddleManager => druid - .spec - .middle_managers - .role_groups - .get(&rolegroup_ref.role_group) - .map(|rg| rg.config.config.resources.clone()) - .map(RoleResourceFragment::DruidFragment), - DruidRole::Historical => druid - .spec - .historicals - .role_groups - .get(&rolegroup_ref.role_group) - .map(|rg| rg.config.config.resources.clone()) - .map(RoleResourceFragment::HistoricalFragment), - DruidRole::Router => druid - .spec - .routers - .role_groups - .get(&rolegroup_ref.role_group) - .map(|rg| rg.config.config.resources.clone()) - .map(RoleResourceFragment::DruidFragment), - } -} - -/// Retrieve and merge resource configs for role and role groups -pub fn resources( - druid: &DruidCluster, - role: &DruidRole, - rolegroup_ref: &RoleGroupRef, -) -> Result { - try_merge(&[ - rolegroup_resources(druid, role, rolegroup_ref), - Some(role_resources(druid, role)), - default_resources(role), - ]) - .with_context(|_| ResourcesMergeSnafu { - rolegroup_ref: rolegroup_ref.clone(), - }) -} - -/// Merge resources from beginning to end of the array: element 0 > element 1 > element 2. -/// Return a copy of the merged struct. -fn try_merge(resources: &[Option]) -> Result { - let mut resources = resources.iter().flatten(); - let mut result = resources.next().ok_or(Error::NoResourcesToMerge)?.clone(); - - for resource in resources { - try_merge_private(&mut result, resource)?; - } - - RoleResource::try_from(result) -} - -/// Merges `rb` into `ra`, i.e. `ra` has precedence over `rb`. -fn try_merge_private( - ra: &mut RoleResourceFragment, - rb: &RoleResourceFragment, -) -> Result { - match (ra, rb) { - (RoleResourceFragment::DruidFragment(a), RoleResourceFragment::DruidFragment(b)) => { - a.merge(b); - Ok(RoleResourceFragment::DruidFragment(a.clone())) - } - ( - RoleResourceFragment::HistoricalFragment(a), - RoleResourceFragment::HistoricalFragment(b), - ) => { - a.merge(b); - Ok(RoleResourceFragment::HistoricalFragment(a.clone())) - } - _ => Err(Error::IncompatibleStorageMerging), - } -} - #[cfg(test)] mod test { use super::*; - use crate::{storage::default_free_percentage_empty_dir, tests::deserialize_yaml_file}; + use crate::{ + storage::{default_free_percentage_empty_dir, HistoricalStorage}, + tests::deserialize_yaml_file, + DruidCluster, MiddleManagerConfig, + }; use rstest::*; use stackable_operator::{ @@ -350,12 +192,11 @@ mod test { NoRuntimeLimitsFragment, }, k8s_openapi::apimachinery::pkg::api::resource::Quantity, - kube::runtime::reflector::ObjectRef, }; #[rstest] #[case( - Some(RoleResourceFragment::HistoricalFragment(ResourcesFragment{ + Some(ResourcesFragment{ cpu: CpuLimitsFragment{ min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -367,10 +208,10 @@ mod test { storage: storage::HistoricalStorageFragment{ segment_cache: default_free_percentage_empty_dir_fragment(), }, - })), + }), None, None, - RoleResource::Historical(Resources{ + Resources{ cpu: CpuLimits{ min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -382,10 +223,10 @@ mod test { storage: storage::HistoricalStorage{ segment_cache: default_free_percentage_empty_dir(), }, - }), + }, )] #[case( - Some(RoleResourceFragment::HistoricalFragment(ResourcesFragment { + Some(ResourcesFragment { cpu: CpuLimitsFragment { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -397,8 +238,8 @@ mod test { storage: storage::HistoricalStorageFragment { segment_cache: default_free_percentage_empty_dir_fragment(), }, - })), - Some(RoleResourceFragment::HistoricalFragment(ResourcesFragment { + }), + Some(ResourcesFragment { cpu: CpuLimitsFragment { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -410,9 +251,9 @@ mod test { storage: storage::HistoricalStorageFragment { segment_cache: default_free_percentage_empty_dir_fragment(), }, - })), + }), None, - RoleResource::Historical(Resources { + Resources { cpu: CpuLimits { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -424,10 +265,10 @@ mod test { storage: storage::HistoricalStorage { segment_cache: default_free_percentage_empty_dir(), }, - }), + }, )] #[case( - Some(RoleResourceFragment::HistoricalFragment(ResourcesFragment { + Some(ResourcesFragment { cpu: CpuLimitsFragment { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -439,8 +280,8 @@ mod test { storage: storage::HistoricalStorageFragment { segment_cache: default_free_percentage_empty_dir_fragment(), }, - })), - Some(RoleResourceFragment::HistoricalFragment (ResourcesFragment { + }), + Some(ResourcesFragment { cpu: CpuLimitsFragment { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -452,8 +293,8 @@ mod test { storage: storage::HistoricalStorageFragment { segment_cache: default_free_percentage_empty_dir_fragment(), }, - })), - Some(RoleResourceFragment::HistoricalFragment (ResourcesFragment { + }), + Some(ResourcesFragment { cpu: CpuLimitsFragment { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -465,8 +306,8 @@ mod test { storage: storage::HistoricalStorageFragment { segment_cache: default_free_percentage_empty_dir_fragment(), }, - })), - RoleResource::Historical(Resources { + }), + Resources { cpu: CpuLimits { min: Some(Quantity("200m".to_owned())), max: Some(Quantity("4".to_owned())), @@ -478,79 +319,34 @@ mod test { storage: storage::HistoricalStorage { segment_cache: default_free_percentage_empty_dir(), }, - }), + }, )] fn test_try_merge_ok( - #[case] first: Option, - #[case] second: Option, - #[case] third: Option, - #[case] expected: RoleResource, + #[case] first: Option>, + #[case] second: Option>, + #[case] third: Option>, + #[case] expected: Resources, ) { - let got = try_merge(&[first, second, third]); + let got = DruidCluster::merged_rolegroup_config( + &first.unwrap_or_default(), + &second.unwrap_or_default(), + &third.unwrap_or_default(), + ); assert_eq!(expected, got.unwrap()); } - #[rstest] - #[case( - Some(RoleResourceFragment::HistoricalFragment(ResourcesFragment { - cpu: CpuLimitsFragment { - min: Some(Quantity("200m".to_owned())), - max: Some(Quantity("4".to_owned())), - }, - memory: MemoryLimitsFragment { - limit: Some(Quantity("2Gi".to_owned())), - runtime_limits: NoRuntimeLimitsFragment {}, - }, - storage: storage::HistoricalStorageFragment { - segment_cache: default_free_percentage_empty_dir_fragment(), - }, - })), - Some(RoleResourceFragment ::DruidFragment (ResourcesFragment { - cpu: CpuLimitsFragment { - min: Some(Quantity("200m".to_owned())), - max: Some(Quantity("4".to_owned())), - }, - memory: MemoryLimitsFragment { - limit: Some(Quantity("2Gi".to_owned())), - runtime_limits: NoRuntimeLimitsFragment {}, - }, - storage: storage::DruidStorageFragment { }, - })), - None, - Error::IncompatibleStorageMerging, - )] - #[case(None, None, None, Error::NoResourcesToMerge)] - fn test_try_merge_err( - #[case] first: Option, - #[case] second: Option, - #[case] third: Option, - #[case] expected: Error, - ) { - let got = try_merge(&[first, second, third]); - - // Poor man's assert_eq since Error cannot derive PartialEq - match (expected, got.err().unwrap()) { - (Error::IncompatibleStorageMerging, Error::IncompatibleStorageMerging) => (), - (Error::NoResourcesToMerge, Error::NoResourcesToMerge) => (), - _ => panic!("something went wrong here"), - } - } - #[test] fn test_resources() -> Result<(), Error> { - let cluster = deserialize_yaml_file("test/resources/resource_merge/druid_cluster.yaml"); - - let resources_from_role_group = RoleGroupRef { - cluster: ObjectRef::from_obj(&cluster), - role: "middle_managers".into(), - role_group: "resources-from-role-group".into(), - }; - if let RoleResource::Druid(middlemanager_resources_from_rg) = resources( - &cluster, - &DruidRole::MiddleManager, - &resources_from_role_group, - )? { + let cluster = deserialize_yaml_file::( + "test/resources/resource_merge/druid_cluster.yaml", + ); + + let config = cluster.merged_config().unwrap(); + if let Some(MiddleManagerConfig { + resources: middlemanager_resources_from_rg, + }) = config.middle_managers.get("resources-from-role-group") + { let expected = Resources { cpu: CpuLimits { min: Some(Quantity("300m".to_owned())), @@ -564,20 +360,16 @@ mod test { }; assert_eq!( - middlemanager_resources_from_rg, expected, + middlemanager_resources_from_rg, &expected, "middlemanager resources from role group" ); } else { panic!("No role group named [resources-from-role-group] found"); } - let resources_from_role = RoleGroupRef { - cluster: ObjectRef::from_obj(&cluster), - role: "middle_managers".into(), - role_group: "resources-from-role".into(), - }; - if let RoleResource::Druid(middlemanager_resources_from_rg) = - resources(&cluster, &DruidRole::MiddleManager, &resources_from_role)? + if let Some(MiddleManagerConfig { + resources: middlemanager_resources_from_rg, + }) = config.middle_managers.get("resources-from-role") { let expected = Resources { cpu: CpuLimits { @@ -592,7 +384,7 @@ mod test { }; assert_eq!( - middlemanager_resources_from_rg, expected, + middlemanager_resources_from_rg, &expected, "resources from role" ); } else { @@ -604,15 +396,13 @@ mod test { #[test] fn test_segment_cache() -> Result<(), Error> { - let cluster = deserialize_yaml_file("test/resources/resource_merge/segment_cache.yaml"); + let cluster = deserialize_yaml_file::( + "test/resources/resource_merge/segment_cache.yaml", + ); // ---------- default role group - let rolegroup_ref = RoleGroupRef { - cluster: ObjectRef::from_obj(&cluster), - role: DruidRole::Historical.to_string(), - role_group: "default".into(), - }; - let res = resources(&cluster, &DruidRole::Historical, &rolegroup_ref)?; + let config = cluster.merged_config().unwrap(); + let res = config.resources(DruidRole::Historical, "default"); let mut got = BTreeMap::new(); assert!(res.update_druid_config_file(&mut got).is_ok()); @@ -623,12 +413,7 @@ mod test { assert_eq!(value, &expected, "primary"); // ---------- secondary role group - let rolegroup_ref = RoleGroupRef { - cluster: ObjectRef::from_obj(&cluster), - role: DruidRole::Historical.to_string(), - role_group: "secondary".into(), - }; - let res = resources(&cluster, &DruidRole::Historical, &rolegroup_ref)?; + let res = config.resources(DruidRole::Historical, "secondary"); let mut got = BTreeMap::new(); assert!(res.update_druid_config_file(&mut got).is_ok()); diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs index 4cf533ad..0022cb40 100644 --- a/rust/operator-binary/src/druid_controller.rs +++ b/rust/operator-binary/src/druid_controller.rs @@ -165,6 +165,8 @@ pub enum Error { FailedToResolveResourceConfig { source: stackable_druid_crd::resource::Error, }, + #[snafu(display("failed to resolve and merge config for role and role group"))] + FailedToResolveConfig { source: stackable_druid_crd::Error }, #[snafu(display("invalid java heap config - missing default or value in crd?"))] InvalidJavaHeapConfig, #[snafu(display("failed to convert java heap config to unit [{unit}]"))] @@ -311,6 +313,8 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< ) .context(CreateClusterResourcesSnafu)?; + let config = druid.merged_config().context(FailedToResolveConfigSnafu)?; + for (role_name, role_config) in validated_role_config.iter() { let druid_role = DruidRole::from_str(role_name).context(UnidentifiedDruidRoleSnafu { role: role_name.to_string(), @@ -338,8 +342,7 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< role_group: rolegroup_name.into(), }; - let resources = resource::resources(&druid, &druid_role, &rolegroup) - .context(FailedToResolveResourceConfigSnafu)?; + let resources = config.resources(druid_role.clone(), rolegroup_name); let rg_service = build_rolegroup_services( &druid, @@ -927,6 +930,8 @@ mod test { Resource { source: stackable_druid_crd::resource::Error, }, + #[snafu(display("failed to resolve and merge config for role and role group"))] + FailedToResolveConfig { source: stackable_druid_crd::Error }, } #[rstest] @@ -976,6 +981,8 @@ mod test { let mut druid_segment_cache_property = "invalid".to_string(); + let config = druid.merged_config().context(FailedToResolveConfigSnafu)?; + for (role_name, role_config) in validated_role_config.iter() { for (rolegroup_name, rolegroup_config) in role_config.iter() { if rolegroup_name == tested_rolegroup_name @@ -987,9 +994,8 @@ mod test { role_group: rolegroup_name.clone(), }; - let resources = - resource::resources(&druid, &DruidRole::Historical, &rolegroup_ref) - .context(ResourceSnafu)?; + let resources = config.resources(DruidRole::Historical, rolegroup_name); + let ldap_settings: Option = None; let rg_configmap = build_rolegroup_config_map( From 22ad147d730db79a7998b05fc5346a5924f60a84 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 12:56:15 +0100 Subject: [PATCH 3/9] Add replicas and selector to CommonConfig --- rust/crd/src/lib.rs | 246 +++++++------------ rust/crd/src/resource.rs | 35 ++- rust/operator-binary/src/druid_controller.rs | 49 ++-- 3 files changed, 152 insertions(+), 178 deletions(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 450b0cea..e6f946cb 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -31,13 +31,10 @@ use stackable_operator::{ labels::ObjectLabels, product_config::types::PropertyNameKind, product_config_utils::{ConfigError, Configuration}, - role_utils::{CommonConfiguration, Role, RoleGroup, RoleGroupRef}, + role_utils::{CommonConfiguration, Role, RoleGroup}, schemars::{self, JsonSchema}, }; -use std::{ - collections::{BTreeMap, HashMap}, - str::FromStr, -}; +use std::collections::{BTreeMap, HashMap}; use strum::{Display, EnumDiscriminants, EnumIter, EnumString, IntoStaticStr}; use tls::default_druid_tls; @@ -130,6 +127,8 @@ pub enum Error { IncompatibleS3Connections, #[snafu(display("Unknown Druid role found {role}. Should be one of {roles:?}"))] UnknownDruidRole { role: String, roles: Vec }, + #[snafu(display("the role group {rolegroup_name} is not defined"))] + CannotRetrieveRoleGroup { rolegroup_name: String }, #[snafu(display("missing namespace for resource {name}"))] MissingNamespace { name: String }, #[snafu(display("fragment validation failure"))] @@ -198,66 +197,76 @@ pub struct DruidClusterConfig { } pub struct MergedConfig { - pub brokers: HashMap, - pub coordinators: HashMap, - pub historicals: HashMap, - pub middle_managers: HashMap, - pub routers: HashMap, + pub brokers: HashMap>, + pub coordinators: HashMap>, + pub historicals: HashMap>, + pub middle_managers: HashMap>, + pub routers: HashMap>, } impl MergedConfig { - pub fn resources(&self, role: DruidRole, role_group: &str) -> RoleResource { - self.common_config(role, role_group).resources - } - - pub fn common_config(&self, role: DruidRole, role_group: &str) -> CommonConfig { + pub fn common_config( + &self, + role: DruidRole, + rolegroup_name: &str, + ) -> Result { match role { DruidRole::Broker => { - let config = self + let rolegroup = self .brokers - .get(role_group) - .cloned() - // TODO default? - .unwrap_or_default(); - CommonConfig { - resources: RoleResource::Druid(config.resources), - } + .get(rolegroup_name) + .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; + Ok(CommonConfig { + resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), + replicas: rolegroup.replicas, + selector: rolegroup.selector.to_owned(), + }) } DruidRole::Coordinator => { - let config = self + let rolegroup = self .coordinators - .get(role_group) - .cloned() - .unwrap_or_default(); - CommonConfig { - resources: RoleResource::Druid(config.resources), - } + .get(rolegroup_name) + .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; + Ok(CommonConfig { + resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), + replicas: rolegroup.replicas, + selector: rolegroup.selector.to_owned(), + }) } DruidRole::Historical => { - let config = self + let rolegroup = self .historicals - .get(role_group) - .cloned() - .unwrap_or_default(); - CommonConfig { - resources: RoleResource::Historical(config.resources), - } + .get(rolegroup_name) + .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; + Ok(CommonConfig { + resources: RoleResource::Historical( + rolegroup.config.config.resources.to_owned(), + ), + replicas: rolegroup.replicas, + selector: rolegroup.selector.to_owned(), + }) } DruidRole::MiddleManager => { - let config = self + let rolegroup = self .middle_managers - .get(role_group) - .cloned() - .unwrap_or_default(); - CommonConfig { - resources: RoleResource::Druid(config.resources), - } + .get(rolegroup_name) + .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; + Ok(CommonConfig { + resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), + replicas: rolegroup.replicas, + selector: rolegroup.selector.to_owned(), + }) } DruidRole::Router => { - let config = self.routers.get(role_group).cloned().unwrap_or_default(); - CommonConfig { - resources: RoleResource::Druid(config.resources), - } + let rolegroup = self + .routers + .get(rolegroup_name) + .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; + Ok(CommonConfig { + resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), + replicas: rolegroup.replicas, + selector: rolegroup.selector.to_owned(), + }) } } } @@ -265,6 +274,8 @@ impl MergedConfig { pub struct CommonConfig { pub resources: RoleResource, + pub replicas: Option, + pub selector: Option, } #[derive( @@ -448,86 +459,6 @@ impl DruidCluster { Ok(result) } - /// Takes a rolegoup_ref (with role and role group name) and returns the selector defined for - /// that role group. - pub fn node_selector( - &self, - rolegroup_ref: &RoleGroupRef, - ) -> Option { - match DruidRole::from_str(rolegroup_ref.role.as_str()).unwrap() { - DruidRole::Broker => self - .spec - .brokers - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.selector.clone()), - DruidRole::MiddleManager => self - .spec - .middle_managers - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.selector.clone()), - DruidRole::Coordinator => self - .spec - .coordinators - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.selector.clone()), - DruidRole::Historical => self - .spec - .historicals - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.selector.clone()), - DruidRole::Router => self - .spec - .routers - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.selector.clone()), - } - } - - pub fn replicas(&self, rolegroup_ref: &RoleGroupRef) -> Option { - match DruidRole::from_str(rolegroup_ref.role.as_str()).unwrap() { - DruidRole::Broker => self - .spec - .brokers - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.replicas) - .map(i32::from), - DruidRole::MiddleManager => self - .spec - .middle_managers - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.replicas) - .map(i32::from), - DruidRole::Coordinator => self - .spec - .coordinators - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.replicas) - .map(i32::from), - DruidRole::Historical => self - .spec - .historicals - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.replicas) - .map(i32::from), - DruidRole::Router => self - .spec - .routers - .role_groups - .get(&rolegroup_ref.role_group) - .and_then(|rg| rg.replicas) - .map(i32::from), - } - } - pub fn build_role_properties( &self, ) -> HashMap< @@ -671,62 +602,73 @@ impl DruidCluster { pub fn merged_config(&self) -> Result { Ok(MergedConfig { - brokers: DruidCluster::merged_role_config( + brokers: DruidCluster::merged_role( &self.spec.brokers, &BrokerConfig::default_config(), )?, - coordinators: DruidCluster::merged_role_config( + coordinators: DruidCluster::merged_role( &self.spec.coordinators, &CoordinatorConfig::default_config(), )?, - historicals: DruidCluster::merged_role_config( + historicals: DruidCluster::merged_role( &self.spec.historicals, &HistoricalConfig::default_config(), )?, - middle_managers: DruidCluster::merged_role_config( + middle_managers: DruidCluster::merged_role( &self.spec.middle_managers, &MiddleManagerConfig::default_config(), )?, - routers: DruidCluster::merged_role_config( + routers: DruidCluster::merged_role( &self.spec.routers, &RouterConfig::default_config(), )?, }) } - fn merged_role_config( + fn merged_role( role: &Role, default_config: &T::Fragment, - ) -> Result, Error> + ) -> Result>, Error> where T: FromFragment, T::Fragment: Clone + Merge, { let mut merged_role_config = HashMap::new(); - for ( - rolegroup_name, - RoleGroup { - config: - CommonConfiguration { - config: rolegroup_config, - .. - }, - .. - }, - ) in &role.role_groups - { - let merged_rolegroup_config = DruidCluster::merged_rolegroup_config( - rolegroup_config, - &role.config.config, - default_config, - )?; + for (rolegroup_name, rolegroup) in &role.role_groups { + let merged_rolegroup_config = + DruidCluster::merged_rolegroup(rolegroup, &role.config.config, default_config)?; merged_role_config.insert(rolegroup_name.to_owned(), merged_rolegroup_config); } Ok(merged_role_config) } + fn merged_rolegroup( + rolegroup: &RoleGroup, + role_config: &T::Fragment, + default_config: &T::Fragment, + ) -> Result, Error> + where + T: FromFragment, + T::Fragment: Clone + Merge, + { + Ok(RoleGroup { + config: CommonConfiguration { + config: DruidCluster::merged_rolegroup_config( + &rolegroup.config.config, + role_config, + default_config, + )?, + config_overrides: rolegroup.config.config_overrides.to_owned(), + env_overrides: rolegroup.config.env_overrides.to_owned(), + cli_overrides: rolegroup.config.cli_overrides.to_owned(), + }, + replicas: rolegroup.replicas, + selector: rolegroup.selector.to_owned(), + }) + } + pub fn merged_rolegroup_config( rolegroup_config: &T::Fragment, role_config: &T::Fragment, diff --git a/rust/crd/src/resource.rs b/rust/crd/src/resource.rs index 7dbc1d42..232fa82e 100644 --- a/rust/crd/src/resource.rs +++ b/rust/crd/src/resource.rs @@ -192,6 +192,7 @@ mod test { NoRuntimeLimitsFragment, }, k8s_openapi::apimachinery::pkg::api::resource::Quantity, + role_utils::{CommonConfiguration, RoleGroup}, }; #[rstest] @@ -343,8 +344,16 @@ mod test { ); let config = cluster.merged_config().unwrap(); - if let Some(MiddleManagerConfig { - resources: middlemanager_resources_from_rg, + if let Some(RoleGroup { + config: + CommonConfiguration { + config: + MiddleManagerConfig { + resources: middlemanager_resources_from_rg, + }, + .. + }, + .. }) = config.middle_managers.get("resources-from-role-group") { let expected = Resources { @@ -367,8 +376,16 @@ mod test { panic!("No role group named [resources-from-role-group] found"); } - if let Some(MiddleManagerConfig { - resources: middlemanager_resources_from_rg, + if let Some(RoleGroup { + config: + CommonConfiguration { + config: + MiddleManagerConfig { + resources: middlemanager_resources_from_rg, + }, + .. + }, + .. }) = config.middle_managers.get("resources-from-role") { let expected = Resources { @@ -402,7 +419,10 @@ mod test { // ---------- default role group let config = cluster.merged_config().unwrap(); - let res = config.resources(DruidRole::Historical, "default"); + let res = config + .common_config(DruidRole::Historical, "default") + .unwrap() + .resources; let mut got = BTreeMap::new(); assert!(res.update_druid_config_file(&mut got).is_ok()); @@ -413,7 +433,10 @@ mod test { assert_eq!(value, &expected, "primary"); // ---------- secondary role group - let res = config.resources(DruidRole::Historical, "secondary"); + let res = config + .common_config(DruidRole::Historical, "secondary") + .unwrap() + .resources; let mut got = BTreeMap::new(); assert!(res.update_druid_config_file(&mut got).is_ok()); diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs index 0022cb40..41958344 100644 --- a/rust/operator-binary/src/druid_controller.rs +++ b/rust/operator-binary/src/druid_controller.rs @@ -11,6 +11,7 @@ use crate::{ }; use snafu::{OptionExt, ResultExt, Snafu}; +use stackable_druid_crd::build_recommended_labels; use stackable_druid_crd::{ authorization::DruidAuthorization, build_string_list, @@ -19,15 +20,11 @@ use stackable_druid_crd::{ PLACEHOLDER_LDAP_BIND_USER, }, security::{resolve_authentication_classes, DruidTlsSecurity}, - DeepStorageSpec, DruidCluster, DruidRole, APP_NAME, AUTH_AUTHORIZER_OPA_URI, CERTS_DIR, - CREDENTIALS_SECRET_PROPERTY, DRUID_CONFIG_DIRECTORY, DS_BUCKET, EXTENSIONS_LOADLIST, + CommonConfig, DeepStorageSpec, DruidCluster, DruidRole, APP_NAME, AUTH_AUTHORIZER_OPA_URI, + CERTS_DIR, CREDENTIALS_SECRET_PROPERTY, DRUID_CONFIG_DIRECTORY, DS_BUCKET, EXTENSIONS_LOADLIST, HDFS_CONFIG_DIRECTORY, JVM_CONFIG, LOG4J2_CONFIG, RUNTIME_PROPS, RW_CONFIG_DIRECTORY, S3_ENDPOINT_URL, S3_PATH_STYLE_ACCESS, S3_SECRET_DIR_NAME, ZOOKEEPER_CONNECTION_STRING, }; -use stackable_druid_crd::{ - build_recommended_labels, - resource::{self, RoleResource}, -}; use stackable_operator::{ builder::{ ConfigMapBuilder, ContainerBuilder, ObjectMetaBuilder, PodBuilder, @@ -167,6 +164,8 @@ pub enum Error { }, #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: stackable_druid_crd::Error }, + #[snafu(display("invalid configuration"))] + InvalidConfiguration { source: stackable_druid_crd::Error }, #[snafu(display("invalid java heap config - missing default or value in crd?"))] InvalidJavaHeapConfig, #[snafu(display("failed to convert java heap config to unit [{unit}]"))] @@ -313,7 +312,7 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< ) .context(CreateClusterResourcesSnafu)?; - let config = druid.merged_config().context(FailedToResolveConfigSnafu)?; + let merged_config = druid.merged_config().context(FailedToResolveConfigSnafu)?; for (role_name, role_config) in validated_role_config.iter() { let druid_role = DruidRole::from_str(role_name).context(UnidentifiedDruidRoleSnafu { @@ -342,7 +341,9 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< role_group: rolegroup_name.into(), }; - let resources = config.resources(druid_role.clone(), rolegroup_name); + let merged_rolegroup_config = merged_config + .common_config(druid_role.clone(), rolegroup_name) + .context(FailedToResolveConfigSnafu)?; let rg_service = build_rolegroup_services( &druid, @@ -355,11 +356,11 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< &resolved_product_image, &rolegroup, rolegroup_config, + &merged_rolegroup_config, &zk_connstr, opa_connstr.as_deref(), s3_conn.as_ref(), deep_storage_bucket_name.as_deref(), - &resources, &druid_tls_security, &druid_ldap_settings, )?; @@ -368,8 +369,8 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< &resolved_product_image, &rolegroup, rolegroup_config, + &merged_rolegroup_config, s3_conn.as_ref(), - &resources, &druid_tls_security, &druid_ldap_settings, )?; @@ -463,11 +464,11 @@ fn build_rolegroup_config_map( resolved_product_image: &ResolvedProductImage, rolegroup: &RoleGroupRef, rolegroup_config: &HashMap>, + merged_rolegroup_config: &CommonConfig, zk_connstr: &str, opa_connstr: Option<&str>, s3_conn: Option<&S3ConnectionSpec>, deep_storage_bucket_name: Option<&str>, - resources: &RoleResource, druid_tls_security: &DruidTlsSecurity, druid_ldap_settings: &Option, ) -> Result { @@ -483,7 +484,8 @@ fn build_rolegroup_config_map( // This has to be done here since there is no other suitable place for it. // Previously such properties were added in the compute_files() function, // but that code path is now incompatible with the design of fragment merging. - resources + merged_rolegroup_config + .resources .update_druid_config_file(&mut conf) .context(UpdateDruidConfigFromResourcesSnafu)?; // NOTE: druid.host can be set manually - if it isn't, the canonical host name of @@ -551,7 +553,8 @@ fn build_rolegroup_config_map( cm_conf_data.insert(RUNTIME_PROPS.to_string(), runtime_properties); } PropertyNameKind::File(file_name) if file_name == JVM_CONFIG => { - let (heap, direct) = resources + let (heap, direct) = merged_rolegroup_config + .resources .get_memory_sizes(&role) .context(DeriveMemorySettingsSnafu)?; let jvm_config = get_jvm_config(&role, heap, direct).context(GetJvmConfigSnafu)?; @@ -646,8 +649,8 @@ fn build_rolegroup_statefulset( resolved_product_image: &ResolvedProductImage, rolegroup_ref: &RoleGroupRef, rolegroup_config: &HashMap>, + merged_rolegroup_config: &CommonConfig, s3_conn: Option<&S3ConnectionSpec>, - resources: &RoleResource, druid_tls_security: &DruidTlsSecurity, ldap_settings: &Option, ) -> Result { @@ -663,7 +666,7 @@ fn build_rolegroup_statefulset( .context(FailedContainerBuilderCreationSnafu { name: APP_NAME })?; // init pod builder let mut pb = PodBuilder::new(); - pb.node_selector_opt(druid.node_selector(rolegroup_ref)); + pb.node_selector_opt(merged_rolegroup_config.selector.to_owned()); if let Some(ldap_settings) = ldap_settings { // TODO: Connecting to an LDAP server without bind credentials does not seem to be configurable in Druid at the moment @@ -690,7 +693,9 @@ fn build_rolegroup_statefulset( &mut cb_druid, &mut pb, ); - resources.update_volumes_and_volume_mounts(&mut cb_druid, &mut pb); + merged_rolegroup_config + .resources + .update_volumes_and_volume_mounts(&mut cb_druid, &mut pb); let prepare_container_command = druid_tls_security.build_tls_key_stores_cmd(); @@ -727,7 +732,7 @@ fn build_rolegroup_statefulset( .readiness_probe(druid_tls_security.get_tcp_socket_probe(10, 10, 1, 3)) // 10s * 3 = 30s to be restarted .liveness_probe(druid_tls_security.get_tcp_socket_probe(10, 10, 3, 3)) - .resources(resources.as_resource_requirements()); + .resources(merged_rolegroup_config.resources.as_resource_requirements()); pb.image_pull_secrets_from_product_image(resolved_product_image) .add_init_container(cb_prepare.build()) @@ -768,7 +773,7 @@ fn build_rolegroup_statefulset( replicas: if druid.spec.stopped.unwrap_or(false) { Some(0) } else { - druid.replicas(rolegroup_ref) + merged_rolegroup_config.replicas.map(i32::from) }, selector: LabelSelector { match_labels: Some(role_group_selector_labels( @@ -932,6 +937,8 @@ mod test { }, #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: stackable_druid_crd::Error }, + #[snafu(display("invalid configuration"))] + InvalidConfiguration { source: stackable_druid_crd::Error }, } #[rstest] @@ -994,7 +1001,9 @@ mod test { role_group: rolegroup_name.clone(), }; - let resources = config.resources(DruidRole::Historical, rolegroup_name); + let merged_rolegroup_config = config + .common_config(DruidRole::Historical, rolegroup_name) + .context(InvalidConfigurationSnafu)?; let ldap_settings: Option = None; @@ -1003,11 +1012,11 @@ mod test { &resolved_product_image, &rolegroup_ref, rolegroup_config, + &merged_rolegroup_config, "zookeeper-connection-string", None, None, None, - &resources, &druid_tls_security, &ldap_settings, ) From b244a22b3ca03eb70cec39bb8c3c1ab9806a598f Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 12:56:54 +0100 Subject: [PATCH 4/9] Regenerate charts --- deploy/helm/druid-operator/crds/crds.yaml | 104 ++++++++++++++++++---- 1 file changed, 88 insertions(+), 16 deletions(-) diff --git a/deploy/helm/druid-operator/crds/crds.yaml b/deploy/helm/druid-operator/crds/crds.yaml index 7ee8df84..dcbf8dfc 100644 --- a/deploy/helm/druid-operator/crds/crds.yaml +++ b/deploy/helm/druid-operator/crds/crds.yaml @@ -36,7 +36,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -89,7 +96,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -523,7 +537,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -576,7 +597,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -670,7 +698,17 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: + segmentCache: + emptyDir: + capacity: null properties: cpu: default: @@ -699,10 +737,8 @@ spec: properties: segmentCache: default: - freePercentage: 5 emptyDir: - capacity: 1G - medium: '' + capacity: null properties: emptyDir: default: @@ -749,7 +785,17 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: + segmentCache: + emptyDir: + capacity: null properties: cpu: default: @@ -778,10 +824,8 @@ spec: properties: segmentCache: default: - freePercentage: 5 emptyDir: - capacity: 1G - medium: '' + capacity: null properties: emptyDir: default: @@ -912,7 +956,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -965,7 +1016,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -1059,7 +1117,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: @@ -1112,7 +1177,14 @@ spec: default: {} properties: resources: - nullable: true + default: + memory: + limit: null + runtimeLimits: {} + cpu: + min: null + max: null + storage: {} properties: cpu: default: From e7518fc2dc647816e026807a5585201d60fcd9c3 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 13:06:24 +0100 Subject: [PATCH 5/9] Remove unused error variants --- rust/crd/src/lib.rs | 2 -- rust/operator-binary/src/druid_controller.rs | 15 --------------- 2 files changed, 17 deletions(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index e6f946cb..fe656ed8 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -125,8 +125,6 @@ pub enum Error { }, #[snafu(display("2 differing s3 connections were given, this is unsupported by Druid"))] IncompatibleS3Connections, - #[snafu(display("Unknown Druid role found {role}. Should be one of {roles:?}"))] - UnknownDruidRole { role: String, roles: Vec }, #[snafu(display("the role group {rolegroup_name} is not defined"))] CannotRetrieveRoleGroup { rolegroup_name: String }, #[snafu(display("missing namespace for resource {name}"))] diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs index 41958344..ab4b8f0f 100644 --- a/rust/operator-binary/src/druid_controller.rs +++ b/rust/operator-binary/src/druid_controller.rs @@ -158,21 +158,10 @@ pub enum Error { source: strum::ParseError, role: String, }, - #[snafu(display("failed to resolve and merge resource config for role and role group"))] - FailedToResolveResourceConfig { - source: stackable_druid_crd::resource::Error, - }, #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: stackable_druid_crd::Error }, #[snafu(display("invalid configuration"))] InvalidConfiguration { source: stackable_druid_crd::Error }, - #[snafu(display("invalid java heap config - missing default or value in crd?"))] - InvalidJavaHeapConfig, - #[snafu(display("failed to convert java heap config to unit [{unit}]"))] - FailedToConvertJavaHeap { - source: stackable_operator::error::Error, - unit: String, - }, #[snafu(display("failed to create cluster resources"))] CreateClusterResources { source: stackable_operator::error::Error, @@ -931,10 +920,6 @@ mod test { OperatorFramework { source: stackable_operator::error::Error, }, - #[snafu(display("resource error"))] - Resource { - source: stackable_druid_crd::resource::Error, - }, #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: stackable_druid_crd::Error }, #[snafu(display("invalid configuration"))] From 267b31a73724bc4c96d4e4739abacd0ed468e4e5 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 13:52:42 +0100 Subject: [PATCH 6/9] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e57b5baa..8d8c71a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. ### Changed - Upgrade to `operator-rs` `0.31.0` ([#374]) +- Merging and validation of the configuration refactored ([#404]). ### Fixed @@ -20,6 +21,7 @@ All notable changes to this project will be documented in this file. [#374]: https://github.com/stackabletech/druid-operator/pull/374 [#380]: https://github.com/stackabletech/druid-operator/pull/380 [#387]: https://github.com/stackabletech/druid-operator/pull/387 +[#404]: https://github.com/stackabletech/druid-operator/pull/404 ## [23.1.0] - 2023-01-23 From 7ae150aa47c61db8d0559f2f38ea5e30b429bb3a Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 14:12:44 +0100 Subject: [PATCH 7/9] Upgrade operator-rs to version 0.35.0 --- CHANGELOG.md | 2 +- Cargo.lock | 60 ++++++++++++++++----------------- rust/crd/Cargo.toml | 2 +- rust/operator-binary/Cargo.toml | 6 ++-- 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d8c71a5..36946e54 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ All notable changes to this project will be documented in this file. ### Changed -- Upgrade to `operator-rs` `0.31.0` ([#374]) +- Upgrade to `operator-rs` `0.35.0` ([#374], [#380], [#404]) - Merging and validation of the configuration refactored ([#404]). ### Fixed diff --git a/Cargo.lock b/Cargo.lock index 0c7fc4e5..08eb23ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -398,9 +398,9 @@ checksum = "c9b0705efd4599c15a38151f4721f7bc388306f61084d3bfd50bd07fbca5cb60" [[package]] name = "either" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" [[package]] name = "encoding" @@ -535,9 +535,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" dependencies = [ "futures-channel", "futures-core", @@ -550,9 +550,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" dependencies = [ "futures-core", "futures-sink", @@ -560,15 +560,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" [[package]] name = "futures-executor" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" dependencies = [ "futures-core", "futures-task", @@ -577,15 +577,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" +checksum = "bfb8371b6fb2aeb2d280374607aeabfc99d95c72edfe51692e42d3d7f0d08531" [[package]] name = "futures-macro" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +checksum = "95a73af87da33b5acf53acfebdc339fe592ecf5357ac7c0a7734ab9d8c876a70" dependencies = [ "proc-macro2", "quote", @@ -594,15 +594,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" +checksum = "f310820bb3e8cfd46c80db4d7fb8353e15dfff853a127158425f31e0be6c8364" [[package]] name = "futures-task" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" +checksum = "dcf79a1bf610b10f42aea489289c5a2c478a786509693b80cd39c44ccd936366" [[package]] name = "futures-timer" @@ -612,9 +612,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" dependencies = [ "futures 0.1.31", "futures-channel", @@ -957,7 +957,7 @@ dependencies = [ "chrono", "dirs-next", "either", - "futures 0.3.25", + "futures 0.3.26", "http", "http-body", "hyper", @@ -1021,7 +1021,7 @@ dependencies = [ "ahash", "backoff", "derivative", - "futures 0.3.25", + "futures 0.3.26", "json-patch", "k8s-openapi", "kube-client", @@ -1249,7 +1249,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e785d273968748578931e4dc3b4f5ec86b26e09d9e0d66b55adda7fce742f7a" dependencies = [ "async-trait", - "futures 0.3.25", + "futures 0.3.26", "futures-executor", "once_cell", "opentelemetry", @@ -1558,7 +1558,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07f2d176c472198ec1e6551dc7da28f1c089652f66a7b722676c2238ebc0edf" dependencies = [ - "futures 0.3.25", + "futures 0.3.26", "futures-timer", "rstest_macros", "rustc_version", @@ -1836,7 +1836,7 @@ dependencies = [ "built", "clap", "fnv", - "futures 0.3.25", + "futures 0.3.26", "indoc", "lazy_static", "openssl", @@ -1856,15 +1856,15 @@ dependencies = [ [[package]] name = "stackable-operator" -version = "0.33.0" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.33.0#600bc948f2763d070a0f3c354a2b66434cf9f953" +version = "0.35.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.35.0#707b043fceef300416cec2172be18a3e7a45a0e2" dependencies = [ "chrono", "clap", "const_format", "derivative", "either", - "futures 0.3.25", + "futures 0.3.26", "json-patch", "k8s-openapi", "kube", @@ -1890,8 +1890,8 @@ dependencies = [ [[package]] name = "stackable-operator-derive" -version = "0.33.0" -source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.33.0#600bc948f2763d070a0f3c354a2b66434cf9f953" +version = "0.35.0" +source = "git+https://github.com/stackabletech/operator-rs.git?tag=0.35.0#707b043fceef300416cec2172be18a3e7a45a0e2" dependencies = [ "darling", "proc-macro2", diff --git a/rust/crd/Cargo.toml b/rust/crd/Cargo.toml index 01783877..6301fe7d 100644 --- a/rust/crd/Cargo.toml +++ b/rust/crd/Cargo.toml @@ -9,7 +9,7 @@ version = "0.0.0-dev" publish = false [dependencies] -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.33.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.35.0" } semver = "1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 4bff8d4e..13bb2931 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -9,10 +9,10 @@ version = "0.0.0-dev" publish = false [dependencies] -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.33.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.35.0" } stackable-druid-crd = { path = "../crd" } anyhow = "1.0" -clap = "4.0" +clap = "4.1" fnv = "1.0" futures = { version = "0.3", features = ["compat"] } indoc = "2.0.0" @@ -30,7 +30,7 @@ lazy_static = "1.4" [build-dependencies] built = { version = "0.5", features = ["chrono", "git2"] } -stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.33.0" } +stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "0.35.0" } stackable-druid-crd = { path = "../crd" } [dev-dependencies] From f768f766b53068b5942a714f581d77240112d45f Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Wed, 22 Feb 2023 15:02:33 +0100 Subject: [PATCH 8/9] Add code comments --- rust/crd/src/lib.rs | 51 +++++++++++++------- rust/operator-binary/src/druid_controller.rs | 13 ++--- 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index fe656ed8..946d4ecc 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -194,27 +194,45 @@ pub struct DruidClusterConfig { pub zookeeper_config_map_name: String, } +/// Common configuration for all role groups +pub struct CommonRoleGroupConfig { + pub resources: RoleResource, + pub replicas: Option, + pub selector: Option, +} + +/// Container for the merged and validated role group configurations +/// +/// This structure contains for every role a map from the role group names to their configurations. +/// The role group configurations are merged with the role and default configurations. The product +/// configuration is not applied. pub struct MergedConfig { + /// Merged configuration of the broker role pub brokers: HashMap>, + /// Merged configuration of the coordinator role pub coordinators: HashMap>, + /// Merged configuration of the historical role pub historicals: HashMap>, + /// Merged configuration of the middle manager role pub middle_managers: HashMap>, + /// Merged configuration of the router role pub routers: HashMap>, } impl MergedConfig { + /// Returns the common configuration for the given role and rolegroup name pub fn common_config( &self, role: DruidRole, rolegroup_name: &str, - ) -> Result { + ) -> Result { match role { DruidRole::Broker => { let rolegroup = self .brokers .get(rolegroup_name) .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; - Ok(CommonConfig { + Ok(CommonRoleGroupConfig { resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), replicas: rolegroup.replicas, selector: rolegroup.selector.to_owned(), @@ -225,7 +243,7 @@ impl MergedConfig { .coordinators .get(rolegroup_name) .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; - Ok(CommonConfig { + Ok(CommonRoleGroupConfig { resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), replicas: rolegroup.replicas, selector: rolegroup.selector.to_owned(), @@ -236,7 +254,7 @@ impl MergedConfig { .historicals .get(rolegroup_name) .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; - Ok(CommonConfig { + Ok(CommonRoleGroupConfig { resources: RoleResource::Historical( rolegroup.config.config.resources.to_owned(), ), @@ -249,7 +267,7 @@ impl MergedConfig { .middle_managers .get(rolegroup_name) .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; - Ok(CommonConfig { + Ok(CommonRoleGroupConfig { resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), replicas: rolegroup.replicas, selector: rolegroup.selector.to_owned(), @@ -260,7 +278,7 @@ impl MergedConfig { .routers .get(rolegroup_name) .context(CannotRetrieveRoleGroupSnafu { rolegroup_name })?; - Ok(CommonConfig { + Ok(CommonRoleGroupConfig { resources: RoleResource::Druid(rolegroup.config.config.resources.to_owned()), replicas: rolegroup.replicas, selector: rolegroup.selector.to_owned(), @@ -270,12 +288,6 @@ impl MergedConfig { } } -pub struct CommonConfig { - pub resources: RoleResource, - pub replicas: Option, - pub selector: Option, -} - #[derive( Clone, Debug, @@ -598,6 +610,7 @@ impl DruidCluster { s3_ingestion || s3_storage } + /// Returns the merged and validated configuration for all roles pub fn merged_config(&self) -> Result { Ok(MergedConfig { brokers: DruidCluster::merged_role( @@ -623,6 +636,7 @@ impl DruidCluster { }) } + /// Merges and validates the role groups of the given role with the given default configuration fn merged_role( role: &Role, default_config: &T::Fragment, @@ -642,6 +656,7 @@ impl DruidCluster { Ok(merged_role_config) } + /// Merges and validates the given role group with the given role and default configurations fn merged_rolegroup( rolegroup: &RoleGroup, role_config: &T::Fragment, @@ -651,13 +666,14 @@ impl DruidCluster { T: FromFragment, T::Fragment: Clone + Merge, { + let merged_config = DruidCluster::merged_rolegroup_config( + &rolegroup.config.config, + role_config, + default_config, + )?; Ok(RoleGroup { config: CommonConfiguration { - config: DruidCluster::merged_rolegroup_config( - &rolegroup.config.config, - role_config, - default_config, - )?, + config: merged_config, config_overrides: rolegroup.config.config_overrides.to_owned(), env_overrides: rolegroup.config.env_overrides.to_owned(), cli_overrides: rolegroup.config.cli_overrides.to_owned(), @@ -667,6 +683,7 @@ impl DruidCluster { }) } + /// Merges and validates the given role group, role, and default configurations pub fn merged_rolegroup_config( rolegroup_config: &T::Fragment, role_config: &T::Fragment, diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs index ab4b8f0f..af3fbe34 100644 --- a/rust/operator-binary/src/druid_controller.rs +++ b/rust/operator-binary/src/druid_controller.rs @@ -20,10 +20,11 @@ use stackable_druid_crd::{ PLACEHOLDER_LDAP_BIND_USER, }, security::{resolve_authentication_classes, DruidTlsSecurity}, - CommonConfig, DeepStorageSpec, DruidCluster, DruidRole, APP_NAME, AUTH_AUTHORIZER_OPA_URI, - CERTS_DIR, CREDENTIALS_SECRET_PROPERTY, DRUID_CONFIG_DIRECTORY, DS_BUCKET, EXTENSIONS_LOADLIST, - HDFS_CONFIG_DIRECTORY, JVM_CONFIG, LOG4J2_CONFIG, RUNTIME_PROPS, RW_CONFIG_DIRECTORY, - S3_ENDPOINT_URL, S3_PATH_STYLE_ACCESS, S3_SECRET_DIR_NAME, ZOOKEEPER_CONNECTION_STRING, + CommonRoleGroupConfig, DeepStorageSpec, DruidCluster, DruidRole, APP_NAME, + AUTH_AUTHORIZER_OPA_URI, CERTS_DIR, CREDENTIALS_SECRET_PROPERTY, DRUID_CONFIG_DIRECTORY, + DS_BUCKET, EXTENSIONS_LOADLIST, HDFS_CONFIG_DIRECTORY, JVM_CONFIG, LOG4J2_CONFIG, + RUNTIME_PROPS, RW_CONFIG_DIRECTORY, S3_ENDPOINT_URL, S3_PATH_STYLE_ACCESS, S3_SECRET_DIR_NAME, + ZOOKEEPER_CONNECTION_STRING, }; use stackable_operator::{ builder::{ @@ -453,7 +454,7 @@ fn build_rolegroup_config_map( resolved_product_image: &ResolvedProductImage, rolegroup: &RoleGroupRef, rolegroup_config: &HashMap>, - merged_rolegroup_config: &CommonConfig, + merged_rolegroup_config: &CommonRoleGroupConfig, zk_connstr: &str, opa_connstr: Option<&str>, s3_conn: Option<&S3ConnectionSpec>, @@ -638,7 +639,7 @@ fn build_rolegroup_statefulset( resolved_product_image: &ResolvedProductImage, rolegroup_ref: &RoleGroupRef, rolegroup_config: &HashMap>, - merged_rolegroup_config: &CommonConfig, + merged_rolegroup_config: &CommonRoleGroupConfig, s3_conn: Option<&S3ConnectionSpec>, druid_tls_security: &DruidTlsSecurity, ldap_settings: &Option, From 90bb2e6543a3dfe7172f2155352f935f9d4a2a30 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Mon, 27 Feb 2023 08:55:15 +0100 Subject: [PATCH 9/9] Optimize code --- rust/crd/src/resource.rs | 4 ++-- rust/operator-binary/src/druid_controller.rs | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/rust/crd/src/resource.rs b/rust/crd/src/resource.rs index 232fa82e..d41f3231 100644 --- a/rust/crd/src/resource.rs +++ b/rust/crd/src/resource.rs @@ -52,8 +52,8 @@ impl RoleResource { pub fn as_memory_limits(&self) -> MemoryLimits { match self { - Self::Druid(r) => r.clone().memory, - Self::Historical(r) => r.clone().memory, + Self::Druid(r) => r.memory.clone(), + Self::Historical(r) => r.memory.clone(), } } diff --git a/rust/operator-binary/src/druid_controller.rs b/rust/operator-binary/src/druid_controller.rs index af3fbe34..6f9f886f 100644 --- a/rust/operator-binary/src/druid_controller.rs +++ b/rust/operator-binary/src/druid_controller.rs @@ -282,9 +282,7 @@ pub async fn reconcile_druid(druid: Arc, ctx: Arc) -> Result< let druid_tls_security = DruidTlsSecurity::new_from_druid_cluster(&druid, resolved_authentication_classes); - // False positive, auto-deref breaks type inference - #[allow(clippy::explicit_auto_deref)] - let role_config = transform_all_roles_to_config(&*druid, druid.build_role_properties()); + let role_config = transform_all_roles_to_config(druid.as_ref(), druid.build_role_properties()); let validated_role_config = validate_all_roles_and_groups_config( &resolved_product_image.product_version, &role_config.context(ProductConfigTransformSnafu)?,