Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move cruise control server, capacity and server logging metrics config to ConfigMap #8977

Merged
merged 9 commits into from
Jan 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ public static String secretName(String clusterName) {
}

/**
* Returns the name of the Cruise Control log {@code ConfigMap} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @return The name of the corresponding Cruise Control log {@code ConfigMap}.
* Returns the name of the Cruise Control {@code ConfigMap} for a {@code Kafka} cluster of the given name.
* @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
* @return The name of the corresponding Cruise Control {@code ConfigMap}.
*/
public static String logAndMetricsConfigMapName(String clusterName) {
public static String configMapName(String clusterName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update the javadoc.

/**
 * Returns the name of the Cruise Control {@code ConfigMap} for a {@code Kafka} cluster of the given name.
 * @param clusterName The {@code metadata.name} of the {@code Kafka} resource.
 * @return The name of the corresponding Cruise Control {@code ConfigMap}.
 */

return clusterName + "-cruise-control-config";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.model.Labels;
Expand Down Expand Up @@ -91,15 +92,34 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup
protected static final String TLS_CC_CERTS_VOLUME_MOUNT = "/etc/cruise-control/cc-certs/";
protected static final String TLS_CA_CERTS_VOLUME_NAME = "cluster-ca-certs";
protected static final String TLS_CA_CERTS_VOLUME_MOUNT = "/etc/cruise-control/cluster-ca-certs/";
protected static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "cruise-control-metrics-and-logging";
protected static final String LOG_AND_METRICS_CONFIG_VOLUME_MOUNT = "/opt/cruise-control/custom-config/";
protected static final String CONFIG_VOLUME_NAME = "config";
/**
* Server config file name
*/
public static final String SERVER_CONFIG_FILENAME = "cruisecontrol.properties";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we using the SERVER_ naming here? The same applies to the annotation below. Isn't this constant and the annotation related to Cruise Control only, so that something like CRUISE_CONTROL_ is clearer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to draw the distinction between the configuration of the Cruise Control server application and the Cruise Control broker capacity, since both are Cruise Control configurations. I figured since this fields and annotations are in the Cruise Control class and Cruise Control pod respectively, it would be clear they were apart of Cruise Control!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that explanation make sense, does it sound reasonable to you?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, makes sense. Thanks!

/**
* Capacity config file name
*/
public static final String CAPACITY_CONFIG_FILENAME = "capacity.json";
protected static final String CONFIG_VOLUME_MOUNT = "/opt/cruise-control/custom-config/";
protected static final String API_AUTH_CONFIG_VOLUME_NAME = "api-auth-config";
protected static final String API_AUTH_CONFIG_VOLUME_MOUNT = "/opt/cruise-control/api-auth-config/";

protected static final String API_AUTH_CREDENTIALS_FILE = API_AUTH_CONFIG_VOLUME_MOUNT + API_AUTH_FILE_KEY;

protected static final String ENV_VAR_CRUISE_CONTROL_METRICS_ENABLED = "CRUISE_CONTROL_METRICS_ENABLED";

/**
* Annotation for rolling a cluster whenever the server configuration has changed.
* When the configuration hash annotation change is detected, we force a pod restart.
*/
public static final String ANNO_STRIMZI_SERVER_CONFIGURATION_HASH = Annotations.STRIMZI_DOMAIN + "server-configuration-hash";

/**
* Annotation for rolling a cluster whenever the capacity configuration has changed.
* When the configuration hash annotation change is detected, we force a pod restart.
*/
public static final String ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH = Annotations.STRIMZI_DOMAIN + "capacity-configuration-hash";

// Configuration defaults
protected static final boolean DEFAULT_CRUISE_CONTROL_METRICS_ENABLED = false;

Expand All @@ -120,16 +140,9 @@ public class CruiseControl extends AbstractModel implements SupportsMetrics, Sup

/* test */ static final String MIN_INSYNC_REPLICAS = "min.insync.replicas";

/* test */ Capacity getCapacity() {
return capacity;
}

// Cruise Control configuration keys (EnvVariables)
protected static final String ENV_VAR_CRUISE_CONTROL_CONFIGURATION = "CRUISE_CONTROL_CONFIGURATION";
protected static final String ENV_VAR_STRIMZI_KAFKA_BOOTSTRAP_SERVERS = "STRIMZI_KAFKA_BOOTSTRAP_SERVERS";

protected static final String ENV_VAR_CRUISE_CONTROL_CAPACITY_CONFIGURATION = "CRUISE_CONTROL_CAPACITY_CONFIGURATION";

protected static final String ENV_VAR_API_SSL_ENABLED = "STRIMZI_CC_API_SSL_ENABLED";
protected static final String ENV_VAR_API_AUTH_ENABLED = "STRIMZI_CC_API_AUTH_ENABLED";
protected static final String ENV_VAR_API_USER = "API_USER";
Expand Down Expand Up @@ -311,27 +324,28 @@ protected List<Volume> getVolumes(boolean isOpenShift) {
createSecretVolume(TLS_CC_CERTS_VOLUME_NAME, CruiseControlResources.secretName(cluster), isOpenShift),
createSecretVolume(TLS_CA_CERTS_VOLUME_NAME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift),
createSecretVolume(API_AUTH_CONFIG_VOLUME_NAME, CruiseControlResources.apiSecretName(cluster), isOpenShift),
createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, CruiseControlResources.logAndMetricsConfigMapName(cluster)));
createConfigMapVolume(CONFIG_VOLUME_NAME, CruiseControlResources.configMapName(cluster)));
}

protected List<VolumeMount> getVolumeMounts() {
return List.of(VolumeUtils.createTempDirVolumeMount(),
createVolumeMount(CruiseControl.TLS_CC_CERTS_VOLUME_NAME, CruiseControl.TLS_CC_CERTS_VOLUME_MOUNT),
createVolumeMount(CruiseControl.TLS_CA_CERTS_VOLUME_NAME, CruiseControl.TLS_CA_CERTS_VOLUME_MOUNT),
createVolumeMount(CruiseControl.API_AUTH_CONFIG_VOLUME_NAME, CruiseControl.API_AUTH_CONFIG_VOLUME_MOUNT),
createVolumeMount(LOG_AND_METRICS_CONFIG_VOLUME_NAME, LOG_AND_METRICS_CONFIG_VOLUME_MOUNT));
createVolumeMount(CONFIG_VOLUME_NAME, CONFIG_VOLUME_MOUNT));
}

/**
* Generates Kubernetes Deployment for Cruise Control
*
* @param annotations Map with annotations
* @param isOpenShift Flag indicating if we are on OpenShift or not
* @param imagePullPolicy Image pull policy
* @param imagePullSecrets Image pull secrets
*
* @return Cruise Control Kubernetes Deployment
*/
public Deployment generateDeployment(boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets) {
public Deployment generateDeployment(Map<String, String> annotations, boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets) {
return WorkloadUtils.createDeployment(
componentName,
namespace,
Expand All @@ -346,7 +360,7 @@ public Deployment generateDeployment(boolean isOpenShift, ImagePullPolicy imageP
labels,
templatePod,
DEFAULT_POD_LABELS,
Map.of(),
annotations,
templatePod != null ? templatePod.getAffinity() : null,
null,
List.of(createContainer(imagePullPolicy)),
Expand Down Expand Up @@ -380,8 +394,6 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_BOOTSTRAP_SERVERS, KafkaResources.bootstrapServiceName(cluster) + ":" + KafkaCluster.REPLICATION_PORT));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_KAFKA_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary empty line.

varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_CAPACITY_CONFIGURATION, capacity.toString()));

varList.add(ContainerUtils.createEnvVar(ENV_VAR_API_SSL_ENABLED, String.valueOf(this.sslEnabled)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_API_AUTH_ENABLED, String.valueOf(this.authEnabled)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_API_USER, API_USER_NAME));
Expand All @@ -392,10 +404,6 @@ protected List<EnvVar> getEnvVars() {
JvmOptionUtils.jvmPerformanceOptions(varList, jvmOptions);
JvmOptionUtils.jvmSystemProperties(varList, jvmOptions);

if (configuration != null && !configuration.getConfiguration().isEmpty()) {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_CONFIGURATION, configuration.getConfiguration()));
}

// Add shared environment variables used for all containers
varList.addAll(sharedEnvironmentProvider.variables());

Expand Down Expand Up @@ -498,25 +506,6 @@ public NetworkPolicy generateNetworkPolicy(String operatorNamespace, Labels oper
);
}

/**
* Generates a metrics and logging ConfigMap according to the configuration. If this operand doesn't support logging
* or metrics, they will nto be set.
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
*
* @return The generated ConfigMap
*/
public ConfigMap generateMetricsAndLogConfigMap(MetricsAndLogging metricsAndLogging) {
return ConfigMapUtils
.createConfigMap(
CruiseControlResources.logAndMetricsConfigMapName(cluster),
namespace,
labels,
ownerReference,
ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging)
);
}

/**
* @return Metrics Model instance for configuring Prometheus metrics
*/
Expand All @@ -530,4 +519,31 @@ public MetricsModel metrics() {
public LoggingModel logging() {
return logging;
}
}

/**
* Generates a ConfigMap with the following:
*
* (1) Cruise Control server configuration
* (2) Cruise Control broker capacity configuration
* (3) Cruise Control server metrics and logging configuration
*
* @param metricsAndLogging The logging and metrics configuration
*
* @return The generated data
*/
public ConfigMap generateConfigMap(MetricsAndLogging metricsAndLogging) {
Map<String, String> configMapData = new HashMap<>();
configMapData.put(SERVER_CONFIG_FILENAME, configuration.asOrderedProperties().asPairs());
configMapData.put(CAPACITY_CONFIG_FILENAME, capacity.toString());
configMapData.putAll(ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging));

return ConfigMapUtils
.createConfigMap(
CruiseControlResources.configMapName(cluster),
namespace,
labels,
ownerReference,
configMapData
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non necessary blank line.

/**
* Class for handling Cruise Control configuration passed by the user
*/
Expand Down Expand Up @@ -90,7 +91,8 @@ public class CruiseControlConfiguration extends AbstractConfiguration {
Map.entry(CruiseControlConfigurationParameters.WEBSERVER_SSL_ENABLE.getValue(), Boolean.toString(CruiseControlConfigurationParameters.DEFAULT_WEBSERVER_SSL_ENABLED)),
Map.entry(CruiseControlConfigurationParameters.PARTITION_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_PARTITION_METRIC_TOPIC_NAME),
Map.entry(CruiseControlConfigurationParameters.BROKER_METRIC_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_BROKER_METRIC_TOPIC_NAME),
Map.entry(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME)
Map.entry(CruiseControlConfigurationParameters.METRIC_REPORTER_TOPIC_NAME.getValue(), CruiseControlConfigurationParameters.DEFAULT_METRIC_REPORTER_TOPIC_NAME),
Map.entry(CruiseControlConfigurationParameters.CAPACITY_CONFIG_FILE.getValue(), CruiseControl.CONFIG_VOLUME_MOUNT + CruiseControl.CAPACITY_CONFIG_FILENAME)
)));

private static final List<String> FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesToList(CruiseControlSpec.FORBIDDEN_PREFIXES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ public class Capacity {
*/
public static final String CPU_KEY = "CPU";

/**
* Inbound network key
*/
public static final String INBOUND_NETWORK_KEY = "NW_IN";

/**
* Outbound network key
*/
public static final String OUTBOUND_NETWORK_KEY = "NW_OUT";
Comment on lines +150 to +158
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably does not have to be fixed in this PR right now. But I think the fact that you have to make this public because of tests of a different class instead of just marking it as /* test */ shows that there are some design issues here. Either these fields should be defined in a different class or the tests are badly structured or the packages are done wrong here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either these fields should be defined in a different class or the tests are badly structured or the packages are done wrong here.

This. The class/package structure for CC files needs to be refactored, will do it in a follow up PR


/**
* Resource type
*/
Expand All @@ -155,8 +165,6 @@ public class Capacity {
private static final String KAFKA_MOUNT_PATH = "/var/lib/kafka";
private static final String KAFKA_LOG_DIR = "kafka-log";
private static final String BROKER_ID_KEY = "brokerId";
private static final String INBOUND_NETWORK_KEY = "NW_IN";
private static final String OUTBOUND_NETWORK_KEY = "NW_OUT";
private static final String DOC_KEY = "doc";

private enum ResourceRequirementType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,17 @@ protected static String milliCpuToCpu(int milliCPU) {
return String.valueOf(milliCPU / 1000.0);
}

protected JsonObject getJson() {
/**
* Returns CpuCapacity object as a JsonObject
*
* @return The CpuCapacity object as a JsonObject
*/
public JsonObject getJson() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this, it looks like getCores() is not used anymore, so we can get rid of it.

return new JsonObject().put(CORES_KEY, this.cores);
}

@Override
public String toString() {
return this.getJson().toString();
}

/**
* Retrieves the value of the 'cores' property.
*
* @return The value of the 'cores' property.
*/
public String getCores() {
return cores;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.strimzi.operator.cluster.model.KafkaVersion;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.Util;
Expand All @@ -36,10 +35,12 @@
import io.vertx.core.Future;

import java.time.Clock;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
* Class used for reconciliation of Cruise Control. This class contains both the steps of the Cruise Control
* reconciliation pipeline and is also used to store the state between them.
Expand All @@ -65,6 +66,9 @@ public class CruiseControlReconciler {

private boolean existingCertsChanged = false;

private String serverConfigurationHash = "";
private String capacityConfigurationHash = "";

/**
* Constructs the Cruise Control reconciler
*
Expand Down Expand Up @@ -122,7 +126,7 @@ public CruiseControlReconciler(
public Future<Void> reconcile(boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets, Clock clock) {
return networkPolicy()
.compose(i -> serviceAccount())
.compose(i -> metricsAndLoggingConfigMap())
.compose(i -> configMap())
.compose(i -> certificatesSecret(clock))
.compose(i -> apiSecret())
.compose(i -> service())
Expand Down Expand Up @@ -165,26 +169,32 @@ protected Future<Void> serviceAccount() {
}

/**
* Manages the Cruise Control Config Map with logging and metrics configuration.
* Manages the Cruise Control ConfigMap which contains the following:
* (1) Cruise Control server configuration
* (2) Cruise Control broker capacity configuration
* (3) Cruise Control server logging and metrics configuration
*
* @return Future which completes when the reconciliation is done
* @return Future which completes when the reconciliation is done
*/
protected Future<Void> metricsAndLoggingConfigMap() {
if (cruiseControl != null) {
protected Future<Void> configMap() {
if (cruiseControl != null) {
return MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperator, cruiseControl.logging(), cruiseControl.metrics())
.compose(metricsAndLogging -> {
ConfigMap logAndMetricsConfigMap = cruiseControl.generateMetricsAndLogConfigMap(metricsAndLogging);
ConfigMap configMap = cruiseControl.generateConfigMap(metricsAndLogging);

this.serverConfigurationHash = Util.hashStub(configMap.getData().get(CruiseControl.SERVER_CONFIG_FILENAME));
this.capacityConfigurationHash = Util.hashStub(configMap.getData().get(CruiseControl.CAPACITY_CONFIG_FILENAME));

return configMapOperator
.reconcile(
reconciliation,
reconciliation.namespace(),
CruiseControlResources.logAndMetricsConfigMapName(reconciliation.name()),
logAndMetricsConfigMap
CruiseControlResources.configMapName(reconciliation.name()),
configMap
).map((Void) null);
});
} else {
return configMapOperator.reconcile(reconciliation, reconciliation.namespace(), CruiseControlResources.logAndMetricsConfigMapName(reconciliation.name()), null)
return configMapOperator.reconcile(reconciliation, reconciliation.namespace(), CruiseControlResources.configMapName(reconciliation.name()), null)
.map((Void) null);
}
}
Expand Down Expand Up @@ -270,14 +280,13 @@ protected Future<Void> service() {
*/
protected Future<Void> deployment(boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets) {
if (cruiseControl != null) {
Deployment deployment = cruiseControl.generateDeployment(isOpenShift, imagePullPolicy, imagePullSecrets);
Map<String, String> podAnnotations = new LinkedHashMap<>();
podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(clusterCa.caCertGeneration()));
podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()));
podAnnotations.put(CruiseControl.ANNO_STRIMZI_SERVER_CONFIGURATION_HASH, serverConfigurationHash);
podAnnotations.put(CruiseControl.ANNO_STRIMZI_CAPACITY_CONFIGURATION_HASH, capacityConfigurationHash);

int caCertGeneration = clusterCa.caCertGeneration();
Annotations.annotations(deployment.getSpec().getTemplate()).put(
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(caCertGeneration));
int caKeyGeneration = clusterCa.caKeyGeneration();
Annotations.annotations(deployment.getSpec().getTemplate()).put(
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(caKeyGeneration));
Deployment deployment = cruiseControl.generateDeployment(podAnnotations, isOpenShift, imagePullPolicy, imagePullSecrets);

return deploymentOperator
.reconcile(reconciliation, reconciliation.namespace(), CruiseControlResources.componentName(reconciliation.name()), deployment)
Expand Down