Skip to content

Commit

Permalink
Set default replication factor for CC sample topics
Browse files Browse the repository at this point in the history
Signed-off-by: Kyle Liberti <kliberti@redhat.com>
  • Loading branch information
kyguy committed Feb 9, 2024
1 parent a4f8e21 commit f6effce
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ public static CruiseControl fromCrd(
}
result.image = image;

result.updateConfiguration(ccSpec);
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet());
CruiseControlConfiguration cruiseControlConfiguration = new CruiseControlConfiguration(reconciliation, ccSpec.getConfig().entrySet());
result.updateConfigurationWithDefaults(cruiseControlConfiguration, kafkaConfiguration);

CruiseControlConfiguration ccConfiguration = result.configuration;
result.sslEnabled = ccConfiguration.isApiSslEnabled();
result.authEnabled = ccConfiguration.isApiAuthEnabled();
Expand Down Expand Up @@ -243,16 +246,17 @@ public static CruiseControl fromCrd(
}
}

private void updateConfiguration(CruiseControlSpec spec) {
CruiseControlConfiguration userConfiguration = new CruiseControlConfiguration(reconciliation, spec.getConfig().entrySet());
for (Map.Entry<String, String> defaultEntry : CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap().entrySet()) {
if (userConfiguration.getConfigOption(defaultEntry.getKey()) == null) {
userConfiguration.setConfigOption(defaultEntry.getKey(), defaultEntry.getValue());
private void updateConfigurationWithDefaults(CruiseControlConfiguration cruiseControlConfiguration, KafkaConfiguration kafkaConfiguration) {
Map<String, String> defaultCruiseControlProperties = new HashMap<>(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
defaultCruiseControlProperties.put(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), kafkaConfiguration.getConfigOption(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR));
for (Map.Entry<String, String> defaultEntry : defaultCruiseControlProperties.entrySet()) {
if (cruiseControlConfiguration.getConfigOption(defaultEntry.getKey()) == null) {
cruiseControlConfiguration.setConfigOption(defaultEntry.getKey(), defaultEntry.getValue());
}
}
// Ensure that the configured anomaly.detection.goals are a sub-set of the default goals
checkGoals(userConfiguration);
this.configuration = userConfiguration;
checkGoals(cruiseControlConfiguration);
this.configuration = cruiseControlConfiguration;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static io.strimzi.operator.cluster.model.CruiseControl.API_HEALTHCHECK_PATH;
Expand Down Expand Up @@ -116,16 +119,16 @@ public class CruiseControlTest {
private static final String IMAGE = "my-image:latest";
private static final int HEALTH_DELAY = 120;
private static final int HEALTH_TIMEOUT = 30;
private static final String REPLICATION_FACTOR = "3";
private static final String MIN_INSYNC_REPLICAS = "2";
private static final String BROKER_CAPACITY_CPU = "6.0";
private static final String BROKER_CAPACITY_OVERRIDE_CPU = "2.0";
private static final String RESOURCE_LIMIT_CPU = "3.0";
private static final String RESOURCE_REQUESTS_CPU = "4.0";

private final Map<String, Object> kafkaConfig = singletonMap(CruiseControl.MIN_INSYNC_REPLICAS, MIN_INSYNC_REPLICAS);
private final Map<String, Object> ccConfig = new HashMap<>() {{
putAll(CruiseControlConfiguration.getCruiseControlDefaultPropertiesMap());
put("num.partition.metrics.windows", "2");
private final Map<String, Object> kafkaConfig = new HashMap<>() {{
put(CruiseControl.MIN_INSYNC_REPLICAS, MIN_INSYNC_REPLICAS);
put(KafkaConfiguration.DEFAULT_REPLICATION_FACTOR, REPLICATION_FACTOR);
}};

private final Storage kafkaStorage = new EphemeralStorage();
Expand All @@ -141,7 +144,6 @@ public class CruiseControlTest {

private final CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.withNewTemplate()
.withNewPod()
.withTmpDirSizeLimit("100Mi")
Expand Down Expand Up @@ -229,17 +231,6 @@ private static boolean isJBOD(Object diskCapacity) {
return diskCapacity instanceof JsonObject;
}

public Kafka kafkaSpec(CruiseControlSpec cruiseControlSpec, ResourceRequirements resourceRequirements) {
return new KafkaBuilder()
.withNewSpec()
.withNewKafka()
.withResources(resourceRequirements)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();
}

@ParallelTest
public void testBrokerCapacities() {
// Test user defined capacities
Expand Down Expand Up @@ -272,6 +263,7 @@ public void testBrokerCapacities() {
.editSpec()
.editKafka()
.withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION)
.withConfig(kafkaConfig)
.withStorage(jbodStorage)
.withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build())
.endKafka()
Expand Down Expand Up @@ -366,6 +358,7 @@ public void testBrokerCapacities() {
.withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION)
.withStorage(jbodStorage)
.withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build())
.withConfig(kafkaConfig)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
Expand All @@ -387,12 +380,6 @@ public void testBrokerCapacities() {

@ParallelTest
public void testBrokerCapacitiesWithPools() {
Kafka kafka = new KafkaBuilder(ResourceUtils.createKafka(NAMESPACE, CLUSTER, REPLICAS, IMAGE, HEALTH_DELAY, HEALTH_TIMEOUT))
.editSpec()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

Set<NodeRef> nodes = Set.of(
new NodeRef("foo-pool1-0", 0, "pool1", false, true),
new NodeRef("foo-pool1-1", 1, "pool1", false, true),
Expand Down Expand Up @@ -867,7 +854,17 @@ private void verifyBrokerCapacity(Set<NodeRef> nodes,
.withBrokerCapacity(brokerCapacity)
.build();

CruiseControl cc = CruiseControl.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaSpec(cruiseControlSpec, resourceRequirements), VERSIONS, nodes, storage, resources, SHARED_ENV_PROVIDER);
Kafka kafka = new KafkaBuilder()
.withNewSpec()
.withNewKafka()
.withResources(resourceRequirements)
.withConfig(kafkaConfig)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

CruiseControl cc = CruiseControl.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, VERSIONS, nodes, storage, resources, SHARED_ENV_PROVIDER);
ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
JsonObject capacity = new JsonObject(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));
JsonArray brokerEntries = capacity.getJsonArray(Capacity.CAPACITIES_KEY);
Expand All @@ -894,7 +891,7 @@ public void testApiSecurity(Boolean apiAuthEnabled, Boolean apiSslEnabled) {
String e2Value = apiSslEnabled.toString();
EnvVar e2 = new EnvVar(e2Key, e2Value, null);

Map<String, Object> config = ccConfig;
Map<String, Object> config = new HashMap<>();
config.put(CruiseControlConfigurationParameters.WEBSERVER_SECURITY_ENABLE.getValue(), apiAuthEnabled);
config.put(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), apiSslEnabled);

Expand Down Expand Up @@ -950,7 +947,6 @@ public void testProbeConfiguration() {
public void testSecurityContext() {
CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.withNewTemplate()
.withNewPod()
.withSecurityContext(new PodSecurityContextBuilder().withFsGroup(123L).withRunAsGroup(456L).withRunAsUser(789L).build())
Expand Down Expand Up @@ -1034,7 +1030,6 @@ public void testCruiseControlContainerSecurityContext() {

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.withNewTemplate()
.withNewCruiseControlContainer()
.withSecurityContext(securityContext)
Expand Down Expand Up @@ -1119,7 +1114,7 @@ public void testGoalsCheck() {
String customGoals = "com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal," +
"com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal";

Map<String, Object> customGoalConfig = ccConfig;
Map<String, Object> customGoalConfig = new HashMap<>();
customGoalConfig.put(DEFAULT_GOALS_CONFIG_KEY.getValue(), customGoals);

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
Expand Down Expand Up @@ -1176,12 +1171,9 @@ public void testDefaultTopicNames() {
topicConfigs.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME);
topicConfigs.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME);
topicConfigs.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME);
Map<String, Object> customConfig = ccConfig;
customConfig.putAll(topicConfigs);

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(ccConfig)
.build();

Kafka resource = createKafka(cruiseControlSpec);
Expand All @@ -1195,7 +1187,7 @@ public void testCustomTopicNames() {
topicConfigs.put(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), "partition-topic");
topicConfigs.put(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), "broker-topic");
topicConfigs.put(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), "metric-reporter-topic");
Map<String, Object> customConfig = ccConfig;
Map<String, Object> customConfig = new HashMap<>();
customConfig.putAll(topicConfigs);

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
Expand All @@ -1208,6 +1200,40 @@ public void testCustomTopicNames() {
topicConfigs.forEach((configParam, name) -> assertThat(cc.configuration.getConfiguration(), containsString(String.format("%s=%s", configParam, name))));
}

private Properties getCcProperties(Kafka resource) {
CruiseControl cc = createCruiseControl(resource);
ConfigMap configMap = cc.generateConfigMap(new MetricsAndLogging(null, null));
return parsePropertiesString(configMap.getData().get(CruiseControl.SERVER_CONFIG_FILENAME));
}

private static Properties parsePropertiesString(String kafkaPropertiesString) {
Properties properties = new Properties();
try (StringReader reader = new StringReader(kafkaPropertiesString)) {
properties.load(reader);
} catch (IOException e) {
e.printStackTrace();
}
return properties;
}

@ParallelTest
public void testSampleStoreTopicReplicationFactorConfig() {
// Test that the replication factor of Cruise Control's sample store topic is set to Kafka cluster's `default.replication.factor`
// when not explicitly set in Cruise Control config
Properties properties = getCcProperties(kafka);
assertThat(properties.getProperty(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue()), is(REPLICATION_FACTOR));

// Test that the replication factor of Cruise Control's sample store topic is set to value set in Cruise Control config
String replicationFactor = "1";
CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig(Map.of(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), replicationFactor))
.build();

properties = getCcProperties(createKafka(cruiseControlSpec));
assertThat(properties.getProperty(CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue()), is(replicationFactor));
}

@AfterAll
public static void cleanUp() {
ResourceUtils.cleanUpTemporaryTLSFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.PasswordGenerator;
import io.strimzi.operator.common.model.cruisecontrol.CruiseControlConfigurationParameters;
import io.strimzi.operator.common.operator.MockCertManager;
import io.strimzi.operator.common.operator.resource.ConfigMapOperator;
import io.strimzi.operator.common.operator.resource.DeploymentOperator;
Expand Down Expand Up @@ -67,7 +68,8 @@ public class CruiseControlReconcilerTest {
new NodeRef(NAME + "kafka-2", 2, "kafka", false, true));
private final CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withBrokerCapacity(new BrokerCapacityBuilder().withInboundNetwork("10000KB/s").withOutboundNetwork("10000KB/s").build())
.withConfig(Map.of("hard.goals", "com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal"))
.withConfig(Map.of("hard.goals", "com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal",
CruiseControlConfigurationParameters.SAMPLE_STORE_TOPIC_REPLICATION_FACTOR.getValue(), "3"))
.build();

@Test
Expand Down Expand Up @@ -152,7 +154,7 @@ public void reconcileEnabledCruiseControl(VertxTestContext context) {
// Verify deployment
assertThat(deployCaptor.getAllValues().size(), is(1));
assertThat(deployCaptor.getValue(), is(notNullValue()));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH), is("f6dc41c7"));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH), is("fc0ad847"));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH), is("1eb49220"));
assertThat(deployCaptor.getAllValues().get(0).getSpec().getTemplate().getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_AUTH_HASH), is("27ada64b"));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public enum CruiseControlConfigurationParameters {
*/
BROKER_METRIC_TOPIC_NAME("broker.metric.sample.store.topic"),

/**
* Replication factor of Kafka sample store topics
*/
SAMPLE_STORE_TOPIC_REPLICATION_FACTOR("sample.store.topic.replication.factor"),

/**
* Metrics reporter topic
*/
Expand Down

0 comments on commit f6effce

Please sign in to comment.