Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Missing CRD defaults for `status.conditions` field ([#588]).
- Support Kafka 3.4.0 ([#591]).
- Add support for resource quotas ([#595])
- Support podOverrides ([#602])

### Changed

Expand All @@ -26,6 +27,7 @@ All notable changes to this project will be documented in this file.
[#591]: https://github.com/stackabletech/kafka-operator/pull/591
[#595]: https://github.com/stackabletech/kafka-operator/pull/595
[#599]: https://github.com/stackabletech/kafka-operator/pull/599
[#602]: https://github.com/stackabletech/kafka-operator/pull/602

## [23.4.0] - 2023-04-17

Expand Down
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/crd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ tracing = "0.1.37"

[dev-dependencies]
serde_yaml = "0.9"
rstest = "0.17"
20 changes: 14 additions & 6 deletions rust/crd/src/affinity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn get_affinity(cluster_name: &str, role: &KafkaRole) -> StackableAffinityFr
mod tests {
use super::*;

use rstest::rstest;
use std::collections::BTreeMap;

use crate::KafkaCluster;
Expand All @@ -37,8 +38,9 @@ mod tests {
},
};

#[test]
fn test_affinity_defaults() {
#[rstest]
#[case(KafkaRole::Broker)]
fn test_affinity_defaults(#[case] role: KafkaRole) {
let input = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
Expand All @@ -55,8 +57,11 @@ mod tests {
default:
replicas: 1
"#;

let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let merged_config = kafka.merged_config(&KafkaRole::Broker, "default").unwrap();
let merged_config = kafka
.merged_config(&role, &role.rolegroup_ref(&kafka, "default"))
.unwrap();

assert_eq!(
merged_config.affinity,
Expand Down Expand Up @@ -95,8 +100,9 @@ mod tests {
);
}

#[test]
fn test_affinity_legacy_node_selector() {
#[rstest]
#[case(KafkaRole::Broker)]
fn test_affinity_legacy_node_selector(#[case] role: KafkaRole) {
let input = r#"
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
Expand All @@ -123,7 +129,9 @@ mod tests {
- antarctica-west1
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let merged_config = kafka.merged_config(&KafkaRole::Broker, "default").unwrap();
let merged_config = kafka
.merged_config(&role, &role.rolegroup_ref(&kafka, "default"))
.unwrap();

assert_eq!(
merged_config.affinity,
Expand Down
102 changes: 77 additions & 25 deletions rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use stackable_operator::{
schemars::{self, JsonSchema},
status::condition::{ClusterCondition, HasStatusCondition},
};
use std::collections::BTreeMap;
use strum::{Display, EnumIter, EnumString};
use std::{collections::BTreeMap, str::FromStr};
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};

pub const DOCKER_IMAGE_BASE_NAME: &str = "kafka";
pub const APP_NAME: &str = "kafka";
Expand All @@ -59,15 +59,32 @@ pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config";
pub enum Error {
#[snafu(display("object has no namespace associated"))]
NoNamespace,

#[snafu(display("failed to validate config of rolegroup {rolegroup}"))]
RoleGroupValidation {
rolegroup: RoleGroupRef<KafkaCluster>,
source: ValidationError,
},

#[snafu(display("the Kafka role [{role}] is missing from spec"))]
MissingKafkaRole { role: String },

#[snafu(display("the role {role} is not defined"))]
CannotRetrieveKafkaRole { role: String },

#[snafu(display("the Kafka node role group [{role_group}] is missing from spec"))]
MissingKafkaRoleGroup { role_group: String },

#[snafu(display("the role group {role_group} is not defined"))]
CannotRetrieveKafkaRoleGroup { role_group: String },

#[snafu(display("unknown role {role}. Should be one of {roles:?}"))]
UnknownKafkaRole {
source: strum::ParseError,
role: String,
roles: Vec<String>,
},

#[snafu(display("fragment validation failure"))]
FragmentValidationFailure { source: ValidationError },
}
Expand Down Expand Up @@ -138,17 +155,31 @@ impl KafkaCluster {
}
}

pub fn role(&self, role_variant: &KafkaRole) -> Result<&Role<KafkaConfigFragment>, Error> {
match role_variant {
KafkaRole::Broker => self.spec.brokers.as_ref(),
}
.with_context(|| CannotRetrieveKafkaRoleSnafu {
role: role_variant.to_string(),
})
}

pub fn rolegroup(
&self,
rolegroup_ref: &RoleGroupRef<KafkaCluster>,
) -> Option<(&Role<KafkaConfigFragment>, &RoleGroup<KafkaConfigFragment>)> {
match rolegroup_ref.role.parse().ok()? {
KafkaRole::Broker => {
let role = &self.spec.brokers.as_ref()?;
let rg = role.role_groups.get(&rolegroup_ref.role_group)?;
Some((role, rg))
}
}
) -> Result<&RoleGroup<KafkaConfigFragment>, Error> {
let role_variant =
KafkaRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownKafkaRoleSnafu {
role: rolegroup_ref.role.to_owned(),
roles: KafkaRole::roles(),
})?;

let role = self.role(&role_variant)?;
role.role_groups
.get(&rolegroup_ref.role_group)
.with_context(|| CannotRetrieveKafkaRoleGroupSnafu {
role_group: rolegroup_ref.role_group.to_owned(),
})
}

/// List all pods expected to form the cluster
Expand Down Expand Up @@ -177,33 +208,31 @@ impl KafkaCluster {
}

/// Retrieve and merge resource configs for role and role groups
pub fn merged_config(&self, role: &KafkaRole, role_group: &str) -> Result<KafkaConfig, Error> {
pub fn merged_config(
&self,
role: &KafkaRole,
rolegroup_ref: &RoleGroupRef<Self>,
) -> Result<KafkaConfig, Error> {
// Initialize the result with all default values as baseline
let conf_defaults = KafkaConfig::default_config(&self.name_any(), role);

let role = self.spec.brokers.as_ref().context(MissingKafkaRoleSnafu {
role: role.to_string(),
})?;

// Retrieve role resource config
let role = self.role(role)?;
let mut conf_role = role.config.config.to_owned();

// Retrieve rolegroup specific resource config
let mut conf_rolegroup = role
.role_groups
.get(role_group)
.map(|rg| rg.config.config.clone())
.unwrap_or_default();
let role_group = self.rolegroup(rolegroup_ref)?;
let mut conf_role_group = role_group.config.config.to_owned();

if let Some(RoleGroup {
selector: Some(selector),
..
}) = role.role_groups.get(role_group)
}) = role.role_groups.get(&rolegroup_ref.role_group)
{
// Migrate old `selector` attribute, see ADR 26 affinities.
// TODO Can be removed after support for the old `selector` field is dropped.
#[allow(deprecated)]
conf_rolegroup.affinity.add_legacy_selector(selector);
conf_role_group.affinity.add_legacy_selector(selector);
}

// Merge more specific configs into default config
Expand All @@ -212,10 +241,10 @@ impl KafkaCluster {
// 2. Role
// 3. Default
conf_role.merge(&conf_defaults);
conf_rolegroup.merge(&conf_role);
conf_role_group.merge(&conf_role);

tracing::debug!("Merged config: {:?}", conf_rolegroup);
fragment::validate(conf_rolegroup).context(FragmentValidationFailureSnafu)
tracing::debug!("Merged config: {:?}", conf_role_group);
fragment::validate(conf_role_group).context(FragmentValidationFailureSnafu)
}
}

Expand Down Expand Up @@ -256,6 +285,29 @@ pub enum KafkaRole {
Broker,
}

impl KafkaRole {
/// Metadata about a rolegroup
pub fn rolegroup_ref(
&self,
kafka: &KafkaCluster,
group_name: impl Into<String>,
) -> RoleGroupRef<KafkaCluster> {
RoleGroupRef {
cluster: ObjectRef::from_obj(kafka),
role: self.to_string(),
role_group: group_name.into(),
}
}

pub fn roles() -> Vec<String> {
let mut roles = vec![];
for role in Self::iter() {
roles.push(role.to_string())
}
roles
}
}

#[derive(Clone, Debug, Default, PartialEq, Fragment, JsonSchema)]
#[fragment_attrs(
derive(
Expand Down
Loading