Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set default replication factor for CC sample topics #9471

Merged
merged 1 commit into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ public static CruiseControl fromCrd(
}
result.image = image;

result.updateConfiguration(ccSpec);
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet());
CruiseControlConfiguration cruiseControlConfiguration = new CruiseControlConfiguration(reconciliation, ccSpec.getConfig().entrySet());
result.updateConfigurationWithDefaults(cruiseControlConfiguration, kafkaConfiguration);

CruiseControlConfiguration ccConfiguration = result.configuration;
result.sslEnabled = ccConfiguration.isApiSslEnabled();
result.authEnabled = ccConfiguration.isApiAuthEnabled();
Expand Down Expand Up @@ -243,16 +246,17 @@ public static CruiseControl fromCrd(
}
}

private void updateConfiguration(CruiseControlSpec spec) {
CruiseControlConfiguration userConfiguration = new CruiseControlConfiguration(reconciliation, spec.getConfig().entrySet());
for (Map.Entry<String, String> defaultEntry : CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap().entrySet()) {
if (userConfiguration.getConfigOption(defaultEntry.getKey()) == null) {
userConfiguration.setConfigOption(defaultEntry.getKey(), defaultEntry.getValue());
private void updateConfigurationWithDefaults(CruiseControlConfiguration cruiseControlConfiguration, KafkaConfiguration kafkaConfiguration) {
Map<String, String> defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR));
for (Map.Entry<String, String> defaultEntry : defaultCruiseControlProperties.entrySet()) {
if (cruiseControlConfiguration.getConfigOption(defaultEntry.getKey()) == null) {
cruiseControlConfiguration.setConfigOption(defaultEntry.getKey(), defaultEntry.getValue());
}
}
// Ensure that the configured anomaly.detection.goals are a sub-set of the default goals
checkGoals(userConfiguration);
this.configuration = userConfiguration;
checkGoals(cruiseControlConfiguration);
this.configuration = cruiseControlConfiguration;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static io.strimzi.operator.cluster.model.CruiseControl.API_HEALTHCHECK_PATH;
Expand Down Expand Up @@ -116,16 +119,16 @@ public class CruiseControlTest {
private static final String IMAGE = "my-image:latest";
private static final int HEALTH_DELAY = 120;
private static final int HEALTH_TIMEOUT = 30;
private static final String REPLICATION_FACTOR = "3";
private static final String MIN_INSYNC_REPLICAS = "2";
private static final String BROKER_CAPACITY_CPU = "6.0";
private static final String BROKER_CAPACITY_OVERRIDE_CPU = "2.0";
private static final String RESOURCE_LIMIT_CPU = "3.0";
private static final String RESOURCE_REQUESTS_CPU = "4.0";

private final Map<String, Object> kafkaConfig = singletonMap(CruiseControl.MIN_INSYNC_REPLICAS, MIN_INSYNC_REPLICAS);
private final Map<String, Object> ccConfig = new HashMap<>() {{
putAll(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
put("num.partition.metrics.windows", "2");
private final Map<String, Object> kafkaConfig = new HashMap<>() {{
put(CruiseControl.MIN_INSYNC_REPLICAS, MIN_INSYNC_REPLICAS);
put(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR, REPLICATION_FACTOR);
}};

private final Storage kafkaStorage = new EphemeralStorage();
Expand All @@ -141,7 +144,6 @@ public class CruiseControlTest {

private final CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.withNewTemplate()
.withNewPod()
.withTmpDirSizeLimit("100Mi")
Expand Down Expand Up @@ -229,17 +231,6 @@ private static boolean isJBOD(Object diskCapacity) {
return diskCapacity instanceof JsonObject;
}

public Kafka kafkaSpec(CruiseControlSpec cruiseControlSpec, ResourceRequirements resourceRequirements) {
return new KafkaBuilder()
.withNewSpec()
.withNewKafka()
.withResources(resourceRequirements)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();
}

@ParallelTest
public void testBrokerCapacities() {
// Test user defined capacities
Expand Down Expand Up @@ -272,6 +263,7 @@ public void testBrokerCapacities() {
.editSpec()
.editKafka()
.withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION)
.withConfig(kafkaConfig)
.withStorage(jbodStorage)
.withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build())
.endKafka()
Expand Down Expand Up @@ -366,6 +358,7 @@ public void testBrokerCapacities() {
.withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION)
.withStorage(jbodStorage)
.withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build())
.withConfig(kafkaConfig)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
Expand All @@ -387,12 +380,6 @@ public void testBrokerCapacities() {

@ParallelTest
public void testBrokerCapacitiesWithPools() {
Kafka kafka = new KafkaBuilder(ResourceUtils.createKafka(NAMESPACE, CLUSTER, REPLICAS, IMAGE, HEALTH_DELAY, HEALTH_TIMEOUT))
.editSpec()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

Set<NodeRef> nodes = Set.of(
new NodeRef("foo-pool1-0", 0, "pool1", false, true),
new NodeRef("foo-pool1-1", 1, "pool1", false, true),
Expand Down Expand Up @@ -867,7 +854,17 @@ private void verifyBrokerCapacity(Set<NodeRef> nodes,
.withBrokerCapacity(brokerCapacity)
.build();

CruiseControl cc = CruiseControl.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaSpec(cruiseControlSpec, resourceRequirements), VERSIONS, nodes, storage, resources, SHARED_ENV_PROVIDER);
Kafka kafka = new KafkaBuilder()
.withNewSpec()
.withNewKafka()
.withResources(resourceRequirements)
.withConfig(kafkaConfig)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

CruiseControl cc = CruiseControl.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, VERSIONS, nodes, storage, resources, SHARED_ENV_PROVIDER);
ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
Expand All @@ -894,7 +891,7 @@ public void testApiSecurity(Boolean apiAuthEnabled, Boolean apiSslEnabled) {
String e2Value = apiSslEnabled.toString();
EnvVar e2 = new EnvVar(e2Key, e2Value, null);

Map<String, Object> config = ccConfig;
Map<String, Object> config = new HashMap<>();
config.put(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), apiAuthEnabled);
config.put(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), apiSslEnabled);

Expand Down Expand Up @@ -950,7 +947,6 @@ public void testProbeConfiguration() {
public void testSecurityContext() {
CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.withNewTemplate()
.withNewPod()
.withSecurityContext(new PodSecurityContextBuilder().withFsGroup(123L).withRunAsGroup(456L).withRunAsUser(789L).build())
Expand Down Expand Up @@ -1034,7 +1030,6 @@ public void testCruiseControlContainerSecurityContext() {

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.withNewTemplate()
.withNewCruiseControlContainer()
.withSecurityContext(securityContext)
Expand Down Expand Up @@ -1119,7 +1114,7 @@ public void testGoalsCheck() {
String customGoals = "com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal," +
"com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal";

Map<String, Object> customGoalConfig = ccConfig;
Map<String, Object> customGoalConfig = new HashMap<>();
customGoalConfig.put(DEFAULT_GOALS_CONFIG_KEY.getValue(), customGoals);

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
Expand Down Expand Up @@ -1176,12 +1171,9 @@ public void testDefaultTopicNames() {
topicConfigs.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME);
topicConfigs.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME);
topicConfigs.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME);
Map<String, Object> customConfig = ccConfig;
customConfig.putAll(topicConfigs);

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.build();

Kafka resource = createKafka(cruiseControlSpec);
Expand All @@ -1195,7 +1187,7 @@ public void testCustomTopicNames() {
topicConfigs.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), "partition-topic");
topicConfigs.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), "broker-topic");
topicConfigs.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), "metric-reporter-topic");
Map<String, Object> customConfig = ccConfig;
Map<String, Object> customConfig = new HashMap<>();
customConfig.putAll(topicConfigs);

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
Expand All @@ -1208,6 +1200,40 @@ public void testCustomTopicNames() {
topicConfigs.forEach((configParam, name) -> assertThat(cc.configuration.getConfiguration(), containsString(String.format("%s=%s", configParam, name))));
}

private Properties getCcProperties(Kafka resource) {
CruiseControl cc = createCruiseControl(resource);
ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
return parsePropertiesString(configMap.getData().get(CruiseControl.SERVER_CONFIG_FILENAME));
}

private static Properties parsePropertiesString(String kafkaPropertiesString) {
Properties properties = new Properties();
try (StringReader reader = new StringReader(kafkaPropertiesString)) {
properties.load(reader);
} catch (IOException e) {
e.printStackTrace();
}
return properties;
}

@ParallelTest
public void testSampleStoreTopicReplicationFactorConfig() {
// Test that the replication factor of Cruise Control's sample store topic is set to Kafka cluster's `default.replication.factor`
// when not explicitly set in Cruise Control config
Properties properties = getCcProperties(kafka);
assertThat(properties.getProperty(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue()), is(REPLICATION_FACTOR));

// Test that the replication factor of Cruise Control's sample store topic is set to value set in Cruise Control config
String replicationFactor = "1";
CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(Map.of(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), replicationFactor))
.build();

properties = getCcProperties(createKafka(cruiseControlSpec));
assertThat(properties.getProperty(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue()), is(replicationFactor));
}

@AfterAll
public static void cleanUp() {
ResourceUtils.cleanUpTemporaryTLSFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.PasswordGenerator;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
import io.strimzi.operator.common.operator.MockCertManager;
import io.strimzi.operator.common.operator.resource.ConfigMapOperator;
import io.strimzi.operator.common.operator.resource.DeploymentOperator;
Expand Down Expand Up @@ -67,7 +68,8 @@ public class CruiseControlReconcilerTest {
new NodeRef(NAME + "kafka-2", 2, "kafka", false, true));
private final CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withBrokerCapacity(new BrokerCapacityBuilder().withInboundNetwork("10000KB/s").withOutboundNetwork("10000KB/s").build())
.withConfig(Map.of("hard.goals", "com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal"))
.withConfig(Map.of("hard.goals", "com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal",
CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), "3"))
.build();

@Test
Expand Down Expand Up @@ -152,7 +154,7 @@ public void reconcileEnabledCruiseControl(VertxTestContext context) {
// Verify deployment
assertThat(deployCaptor.getAllValues().size(), is(1));
assertThat(deployCaptor.getValue(), is(notNullValue()));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH), is("f6dc41c7"));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH), is("fc0ad847"));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("1eb49220"));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_AUTH_HASH), is("27ada64b"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public enum CruiseControlConfigurationParameters {
*/
BROKER_METRIC_TOPIC_NAME("broker.metric.sample.store.topic"),

/**
* Replication factor of Kafka sample store topics
*/
SAMPLE_STORE_TOPIC_REPLICATION_FACTOR("sample.store.topic.replication.factor"),

/**
* Metrics reporter topic
*/
Expand Down