diff --git a/CHANGELOG.md b/CHANGELOG.md index 0fd40fed37a..01fa79793bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * The `UseStrimziPodSets` feature gate moves to beta stage. By default, StrimziPodSets are used instead of StatefulSets. If needed, `UseStrimziPodSets` can be disabled in the feature gates configuration in the Cluster Operator. +* Add CPU capacity overrides for Cruise Control capacity config ## 0.29.0 diff --git a/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacity.java b/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacity.java index 2380b5590bc..72a47b85325 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacity.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacity.java @@ -30,7 +30,7 @@ builderPackage = Constants.FABRIC8_KUBERNETES_API ) @JsonInclude(JsonInclude.Include.NON_NULL) -@JsonPropertyOrder({"disk", "cpuUtilization", "inboundNetwork", "outboundNetwork", "overrides"}) +@JsonPropertyOrder({"disk", "cpuUtilization", "cpu", "inboundNetwork", "outboundNetwork", "overrides"}) @EqualsAndHashCode public class BrokerCapacity implements UnknownPropertyPreserving, Serializable { @@ -38,6 +38,7 @@ public class BrokerCapacity implements UnknownPropertyPreserving, Serializable { private String disk; private Integer cpuUtilization; + private String cpu; private String inboundNetwork; private String outboundNetwork; private List overrides; @@ -72,6 +73,19 @@ public void setCpuUtilization(Integer cpuUtilization) { this.cpuUtilization = cpuUtilization; } + @JsonInclude(JsonInclude.Include.NON_NULL) + @Pattern("^[0-9]+([.][0-9]{0,3}|[m]?)$") + @Description("Broker capacity for CPU resource in cores or millicores. " + + "For example, 1, 1.500, 1500m. " + + "For more information on valid CPU resource units see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu") + public String getCpu() { + return cpu; + } + + public void setCpu(String cpu) { + this.cpu = cpu; + } + @JsonInclude(JsonInclude.Include.NON_NULL) @Pattern("^[0-9]+([KMG]i?)?B/s$") @Description("Broker capacity for inbound network throughput in bytes per second. " + diff --git a/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacityOverride.java b/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacityOverride.java index fd0f0bc2ff9..4a72b3cd565 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacityOverride.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/balancing/BrokerCapacityOverride.java @@ -28,12 +28,13 @@ builderPackage = Constants.FABRIC8_KUBERNETES_API ) @JsonInclude(JsonInclude.Include.NON_NULL) -@JsonPropertyOrder({"brokers", "inboundNetwork", "outboundNetwork"}) +@JsonPropertyOrder({"brokers", "cpu", "inboundNetwork", "outboundNetwork"}) @EqualsAndHashCode public class BrokerCapacityOverride implements UnknownPropertyPreserving, Serializable { private static final long serialVersionUID = 1L; private List brokers; + private String cpu; private String inboundNetwork; private String outboundNetwork; private Map additionalProperties = new HashMap<>(0); @@ -50,6 +51,19 @@ public void setBrokers(List brokers) { this.brokers = brokers; } + @JsonInclude(JsonInclude.Include.NON_NULL) + @Pattern("^[0-9]+([.][0-9]{0,3}|[m]?)$") + @Description("Broker capacity for CPU resource in cores or millicores. " + + "For example, 1, 1.500, 1500m. " + + "For more information on valid CPU resource units see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu") + public String getCpu() { + return cpu; + } + + public void setCpu(String cpu) { + this.cpu = cpu; + } + @JsonInclude(JsonInclude.Include.NON_NULL) @Pattern("^[0-9]+([KMG]i?)?B/s$") @Description("Broker capacity for inbound network throughput in bytes per second. " + diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java index 4f8740ba8c8..b08e4e418bf 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/BrokerCapacity.java @@ -8,21 +8,20 @@ public class BrokerCapacity { // CC allows specifying a generic "default" broker entry in the capacity configuration to apply to all brokers without a specific broker entry. // CC designates the id of this default broker entry as "-1". public static final int DEFAULT_BROKER_ID = -1; - public static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in percentage, network throughput is in KiB."; - - public static final String DEFAULT_CPU_UTILIZATION_CAPACITY = "100"; // as a percentage (0-100) + public static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in number of cores, network throughput is in KiB."; + public static final String DEFAULT_CPU_CORE_CAPACITY = "1"; public static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000"; public static final String DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; public static final String DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000"; private int id; - private String cpu; + private CpuCapacity cpu; private DiskCapacity disk; private String inboundNetwork; private String outboundNetwork; private final String doc; - public BrokerCapacity(int brokerId, String cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) { + public BrokerCapacity(int brokerId, CpuCapacity cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) { this.id = brokerId; this.cpu = cpu; this.disk = disk; @@ -35,7 +34,7 @@ public Integer getId() { return id; } - public String getCpu() { + public CpuCapacity getCpu() { return cpu; } @@ -59,7 +58,7 @@ public void setId(Integer id) { this.id = id; } - public void setCpu(String cpu) { + public void setCpu(CpuCapacity cpu) { this.cpu = cpu; } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java index 0e5bdb6d26f..cf41e4794b9 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/Capacity.java @@ -4,6 +4,8 @@ */ package io.strimzi.operator.cluster.model.cruisecontrol; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.strimzi.api.kafka.model.KafkaSpec; import io.strimzi.api.kafka.model.balancing.BrokerCapacityOverride; import io.strimzi.api.kafka.model.storage.EphemeralStorage; @@ -14,12 +16,14 @@ import io.strimzi.operator.cluster.model.AbstractModel; import io.strimzi.operator.cluster.model.StorageUtils; import io.strimzi.operator.cluster.model.VolumeUtils; +import io.strimzi.operator.cluster.operator.resource.Quantities; import io.strimzi.operator.common.ReconciliationLogger; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -46,12 +50,15 @@ * deleteClaim: false * cruiseControl: * brokerCapacity: + * cpu: "1" * inboundNetwork: 10000KB/s * outboundNetwork: 10000KB/s * overrides: * - brokers: [0] + * cpu: "2.345" * outboundNetwork: 40000KB/s * - brokers: [1, 2] + * cpu: 4000m * inboundNetwork: 60000KB/s * outboundNetwork: 20000KB/s * @@ -66,7 +73,7 @@ * "/var/lib/kafka0/kafka-log-1": "100000", * "/var/lib/kafka1/kafka-log-1": "200000" * }, - * "CPU": "100", + * "CPU": {"num.cores": "1"}, * "NW_IN": "10000", * "NW_OUT": "10000" * }, @@ -79,7 +86,7 @@ * "/var/lib/kafka0/kafka-log0": "100000", * "/var/lib/kafka1/kafka-log0": "200000" * }, - * "CPU": "100", + * "CPU": {"num.cores": "2.345"}, * "NW_IN": "10000", * "NW_OUT": "40000" * }, @@ -92,7 +99,7 @@ * "/var/lib/kafka0/kafka-log1": "100000", * "/var/lib/kafka1/kafka-log1": "200000" * }, - * "CPU": "100", + * "CPU": {"num.cores": "4"}, * "NW_IN": "60000", * "NW_OUT": "20000" * }, @@ -104,7 +111,7 @@ * "/var/lib/kafka0/kafka-log2": "100000", * "/var/lib/kafka1/kafka-log2": "200000" * }, - * "CPU": "100", + * "CPU": {"num.cores": "4"}, * "NW_IN": "60000", * "NW_OUT": "20000" * }, @@ -122,26 +129,80 @@ public class Capacity { private static final String BROKER_ID_KEY = "brokerId"; public static final String CAPACITY_KEY = "capacity"; public static final String DISK_KEY = "DISK"; - private static final String CPU_KEY = "CPU"; + public static final String CPU_KEY = "CPU"; private static final String INBOUND_NETWORK_KEY = "NW_IN"; private static final String OUTBOUND_NETWORK_KEY = "NW_OUT"; private static final String DOC_KEY = "doc"; - private final int replicas; + public static final String RESOURCE_TYPE = "cpu"; + private final Storage storage; - public Capacity(KafkaSpec spec, Storage storage) { - io.strimzi.api.kafka.model.balancing.BrokerCapacity bc = spec.getCruiseControl().getBrokerCapacity(); + private enum ResourceRequirementType { + REQUEST, + LIMIT; + private Quantity getQuantity(ResourceRequirements resources) { + Map resourceRequirement; + switch (this) { + case REQUEST: + resourceRequirement = resources.getRequests(); + break; + case LIMIT: + resourceRequirement = resources.getLimits(); + break; + default: + resourceRequirement = null; + } + if (resourceRequirement != null) { + return resourceRequirement.get(RESOURCE_TYPE); + } + return null; + } + } + + public Capacity(KafkaSpec spec, Storage storage) { this.capacityEntries = new TreeMap<>(); - this.replicas = spec.getKafka().getReplicas(); this.storage = storage; - processCapacityEntries(bc); + processCapacityEntries(spec); } - public static String processCpu() { - return BrokerCapacity.DEFAULT_CPU_UTILIZATION_CAPACITY; + private static Integer getResourceRequirement(ResourceRequirements resources, ResourceRequirementType requirementType) { + if (resources != null) { + Quantity quantity = requirementType.getQuantity(resources); + if (quantity != null) { + return Quantities.parseCpuAsMilliCpus(quantity.toString()); + } + } + return null; + } + + private static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) { + Integer request = getResourceRequirement(resourceRequirements, ResourceRequirementType.REQUEST); + Integer limit = getResourceRequirement(resourceRequirements, ResourceRequirementType.LIMIT); + + if (limit != null) { + if (request == null || limit.intValue() == request.intValue()) { + return CpuCapacity.milliCpuToCpu(limit); + } + } + return null; + } + + public static CpuCapacity processCpu(io.strimzi.api.kafka.model.balancing.BrokerCapacity bc, BrokerCapacityOverride override, String cpuBasedOnRequirements) { + if (cpuBasedOnRequirements != null) { + if ((override != null && override.getCpu() != null) || (bc != null && bc.getCpu() != null)) { + LOGGER.warnOp("Ignoring CPU capacity override settings since they are automatically set to resource limits"); + } + return new CpuCapacity(cpuBasedOnRequirements); + } else if (override != null && override.getCpu() != null) { + return new CpuCapacity(override.getCpu()); + } else if (bc != null && bc.getCpu() != null) { + return new CpuCapacity(bc.getCpu()); + } else { + return new CpuCapacity(BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY); + } } public static DiskCapacity processDisk(Storage storage, int brokerId) { @@ -243,8 +304,12 @@ public static String getThroughputInKiB(String throughput) { return String.valueOf(StorageUtils.convertTo(size, "Ki")); } - private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerCapacity brokerCapacity) { - String cpu = processCpu(); + private void processCapacityEntries(KafkaSpec spec) { + io.strimzi.api.kafka.model.balancing.BrokerCapacity brokerCapacity = spec.getCruiseControl().getBrokerCapacity(); + String cpuBasedOnRequirements = getCpuBasedOnRequirements(spec.getKafka().getResources()); + int replicas = spec.getKafka().getReplicas(); + + CpuCapacity cpu = processCpu(brokerCapacity, null, cpuBasedOnRequirements); DiskCapacity disk = processDisk(storage, BrokerCapacity.DEFAULT_BROKER_ID); String inboundNetwork = processInboundNetwork(brokerCapacity, null); String outboundNetwork = processOutboundNetwork(brokerCapacity, null); @@ -276,6 +341,7 @@ private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerC } else { for (BrokerCapacityOverride override : overrides) { List ids = override.getBrokers(); + cpu = processCpu(brokerCapacity, override, cpuBasedOnRequirements); inboundNetwork = processInboundNetwork(brokerCapacity, override); outboundNetwork = processOutboundNetwork(brokerCapacity, override); for (int id : ids) { @@ -285,6 +351,7 @@ private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerC if (capacityEntries.containsKey(id)) { if (overrideIds.add(id)) { BrokerCapacity brokerCapacityEntry = capacityEntries.get(id); + brokerCapacityEntry.setCpu(cpu); brokerCapacityEntry.setInboundNetwork(inboundNetwork); brokerCapacityEntry.setOutboundNetwork(outboundNetwork); } else { @@ -314,7 +381,7 @@ private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) { .put(BROKER_ID_KEY, brokerCapacity.getId()) .put(CAPACITY_KEY, new JsonObject() .put(DISK_KEY, brokerCapacity.getDisk().getJson()) - .put(CPU_KEY, brokerCapacity.getCpu()) + .put(CPU_KEY, brokerCapacity.getCpu().getJson()) .put(INBOUND_NETWORK_KEY, brokerCapacity.getInboundNetwork()) .put(OUTBOUND_NETWORK_KEY, brokerCapacity.getOutboundNetwork()) ) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java new file mode 100644 index 00000000000..f97b9f8609a --- /dev/null +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/cruisecontrol/CpuCapacity.java @@ -0,0 +1,30 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.model.cruisecontrol; + +import io.strimzi.operator.cluster.operator.resource.Quantities; +import io.vertx.core.json.JsonObject; + +public class CpuCapacity { + private static final String CORES_KEY = "num.cores"; + + private String cores; + + public CpuCapacity(String cores) { + this.cores = milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores)); + } + + public static String milliCpuToCpu(int milliCPU) { + return String.valueOf(milliCPU / 1000.0); + } + + public JsonObject getJson() { + return new JsonObject().put(CORES_KEY, this.cores); + } + + public String toString() { + return this.getJson().toString(); + } +} diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/Quantities.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/Quantities.java index 6dc02c7bd54..262d69fcca9 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/Quantities.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/Quantities.java @@ -4,7 +4,7 @@ */ package io.strimzi.operator.cluster.operator.resource; -class Quantities { +public class Quantities { private Quantities() { } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java index 6636e4d91b4..9c9bcc4889f 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/CruiseControlTest.java @@ -45,8 +45,8 @@ import io.strimzi.api.kafka.model.SystemPropertyBuilder; import io.strimzi.api.kafka.model.storage.EphemeralStorage; import io.strimzi.api.kafka.model.storage.JbodStorage; -import io.strimzi.api.kafka.model.storage.PersistentClaimStorage; -import io.strimzi.api.kafka.model.storage.SingleVolumeStorage; +import io.strimzi.api.kafka.model.storage.JbodStorageBuilder; +import io.strimzi.api.kafka.model.storage.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.storage.Storage; import io.strimzi.api.kafka.model.template.IpFamily; import io.strimzi.api.kafka.model.template.IpFamilyPolicy; @@ -54,6 +54,7 @@ import io.strimzi.operator.cluster.ResourceUtils; import io.strimzi.operator.cluster.model.cruisecontrol.BrokerCapacity; import io.strimzi.operator.cluster.model.cruisecontrol.Capacity; +import io.strimzi.operator.cluster.model.cruisecontrol.CpuCapacity; import io.strimzi.operator.cluster.operator.resource.cruisecontrol.CruiseControlConfigurationParameters; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.Labels; @@ -210,7 +211,10 @@ private static boolean isJBOD(Map brokerCapacity) { @ParallelTest public void testBrokerCapacities() throws JsonProcessingException { // Test user defined capacities + String userDefinedCpuCapacity = "2575m"; + io.strimzi.api.kafka.model.balancing.BrokerCapacity userDefinedBrokerCapacity = new io.strimzi.api.kafka.model.balancing.BrokerCapacity(); + userDefinedBrokerCapacity.setCpu(userDefinedCpuCapacity); userDefinedBrokerCapacity.setInboundNetwork("50000KB/s"); userDefinedBrokerCapacity.setOutboundNetwork("50000KB/s"); @@ -226,41 +230,40 @@ public void testBrokerCapacities() throws JsonProcessingException { assertThat(getCapacityConfigurationFromEnvVar(resource, ENV_VAR_CRUISE_CONTROL_CAPACITY_CONFIGURATION), is(capacity.toString())); // Test generated disk capacity - JbodStorage jbodStorage = new JbodStorage(); - List volumes = new ArrayList<>(); - - PersistentClaimStorage p1 = new PersistentClaimStorage(); - p1.setId(0); - p1.setSize("50Gi"); - volumes.add(p1); - - PersistentClaimStorage p2 = new PersistentClaimStorage(); - p2.setId(1); - volumes.add(p2); + JbodStorage jbodStorage = new JbodStorageBuilder() + .withVolumes( + new PersistentClaimStorageBuilder().withDeleteClaim(true).withId(0).withSize("50Gi").build(), + new PersistentClaimStorageBuilder().withDeleteClaim(true).withId(1).build() + ).build(); - jbodStorage.setVolumes(volumes); + Map requests = Map.of(Capacity.RESOURCE_TYPE, new Quantity("400m")); + Map limits = Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5")); resource = new KafkaBuilder(ResourceUtils.createKafka(namespace, cluster, replicas, image, healthDelay, healthTimeout)) .editSpec() .editKafka() .withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION) .withStorage(jbodStorage) + .withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build()) .endKafka() .withCruiseControl(cruiseControlSpec) .endSpec() .build(); capacity = new Capacity(resource.getSpec(), jbodStorage); + String cpuCapacity = new CpuCapacity(userDefinedCpuCapacity).toString(); JsonArray brokerEntries = capacity.generateCapacityConfig().getJsonArray(Capacity.CAPACITIES_KEY); for (Object brokerEntry : brokerEntries) { Map brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY).getMap(); assertThat(isJBOD(brokerCapacity), is(true)); + assertThat(brokerCapacity.get(Capacity.CPU_KEY).toString(), is(cpuCapacity)); } assertThat(getCapacityConfigurationFromEnvVar(resource, ENV_VAR_CRUISE_CONTROL_CAPACITY_CONFIGURATION), is(capacity.toString())); // Test capacity overrides + String userDefinedCpuCapacityOverride0 = "1.222"; String inboundNetwork = "50000KB/s"; String inboundNetworkOverride0 = "25000KB/s"; String inboundNetworkOverride1 = "10000KiB/s"; @@ -276,9 +279,11 @@ public void testBrokerCapacities() throws JsonProcessingException { cruiseControlSpec = new CruiseControlSpecBuilder() .withImage(ccImage) .withNewBrokerCapacity() + .withCpu(userDefinedCpuCapacity) .withInboundNetwork(inboundNetwork) .addNewOverride() .withBrokers(overrideList0) + .withCpu(userDefinedCpuCapacityOverride0) .withInboundNetwork(inboundNetworkOverride0) .endOverride() .addNewOverride() @@ -299,7 +304,7 @@ public void testBrokerCapacities() throws JsonProcessingException { } TreeMap capacityEntries = capacity.getCapacityEntries(); - + assertThat(capacityEntries.get(BrokerCapacity.DEFAULT_BROKER_ID).getCpu().toString(), is(new CpuCapacity(userDefinedCpuCapacity).toString())); assertThat(capacityEntries.get(BrokerCapacity.DEFAULT_BROKER_ID).getInboundNetwork(), is(Capacity.getThroughputInKiB(inboundNetwork))); assertThat(capacityEntries.get(BrokerCapacity.DEFAULT_BROKER_ID).getOutboundNetwork(), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); @@ -307,12 +312,40 @@ public void testBrokerCapacities() throws JsonProcessingException { assertThat(capacityEntries.get(broker0).getOutboundNetwork(), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); // When the same broker id is specified in brokers list of multiple overrides, use the value specified in the first override. + assertThat(capacityEntries.get(broker1).getCpu().toString(), is(new CpuCapacity(userDefinedCpuCapacityOverride0).toString())); assertThat(capacityEntries.get(broker1).getInboundNetwork(), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); assertThat(capacityEntries.get(broker1).getOutboundNetwork(), is(BrokerCapacity.DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND)); + assertThat(capacityEntries.get(broker2).getCpu().toString(), is(new CpuCapacity(userDefinedCpuCapacityOverride0).toString())); assertThat(capacityEntries.get(broker2).getInboundNetwork(), is(Capacity.getThroughputInKiB(inboundNetworkOverride0))); assertThat(getCapacityConfigurationFromEnvVar(resource, ENV_VAR_CRUISE_CONTROL_CAPACITY_CONFIGURATION), is(capacity.toString())); + + // Test generated CPU capacity + userDefinedCpuCapacity = "500m"; + + requests = Map.of(Capacity.RESOURCE_TYPE, new Quantity(userDefinedCpuCapacity)); + limits = Map.of(Capacity.RESOURCE_TYPE, new Quantity("0.5")); + + resource = new KafkaBuilder(ResourceUtils.createKafka(namespace, cluster, replicas, image, healthDelay, healthTimeout)) + .editSpec() + .editKafka() + .withVersion(KafkaVersionTestUtils.DEFAULT_KAFKA_VERSION) + .withStorage(jbodStorage) + .withResources(new ResourceRequirementsBuilder().withRequests(requests).withLimits(limits).build()) + .endKafka() + .withCruiseControl(cruiseControlSpec) + .endSpec() + .build(); + + capacity = new Capacity(resource.getSpec(), kafkaStorage); + cpuCapacity = new CpuCapacity(userDefinedCpuCapacity).toString(); + + brokerEntries = capacity.generateCapacityConfig().getJsonArray(Capacity.CAPACITIES_KEY); + for (Object brokerEntry : brokerEntries) { + Map brokerCapacity = ((JsonObject) brokerEntry).getJsonObject(Capacity.CAPACITY_KEY).getMap(); + assertThat(brokerCapacity.get(Capacity.CPU_KEY).toString(), is(cpuCapacity)); + } } @ParallelTest diff --git a/docker-images/artifacts/kafka-thirdparty-libs/3.1.x/pom.xml b/docker-images/artifacts/kafka-thirdparty-libs/3.1.x/pom.xml index 9ef530495ae..cb93582c34b 100644 --- a/docker-images/artifacts/kafka-thirdparty-libs/3.1.x/pom.xml +++ b/docker-images/artifacts/kafka-thirdparty-libs/3.1.x/pom.xml @@ -17,7 +17,7 @@ 0.10.0 - 2.5.89 + 2.5.94 1.4.0 0.2.0 1.2.0 diff --git a/docker-images/artifacts/kafka-thirdparty-libs/3.2.x/pom.xml b/docker-images/artifacts/kafka-thirdparty-libs/3.2.x/pom.xml index eb127cbe238..dd206272b03 100644 --- a/docker-images/artifacts/kafka-thirdparty-libs/3.2.x/pom.xml +++ b/docker-images/artifacts/kafka-thirdparty-libs/3.2.x/pom.xml @@ -17,7 +17,7 @@ 0.10.0 - 2.5.91 + 2.5.94 1.4.0 0.2.0 1.2.0 diff --git a/docker-images/artifacts/kafka-thirdparty-libs/cc/pom.xml b/docker-images/artifacts/kafka-thirdparty-libs/cc/pom.xml index 960a4128451..a0473e4ba46 100644 --- a/docker-images/artifacts/kafka-thirdparty-libs/cc/pom.xml +++ b/docker-images/artifacts/kafka-thirdparty-libs/cc/pom.xml @@ -16,7 +16,7 @@ - 2.5.89 + 2.5.94 diff --git a/documentation/api/io.strimzi.api.kafka.model.CruiseControlSpec.adoc b/documentation/api/io.strimzi.api.kafka.model.CruiseControlSpec.adoc index 3c536df9995..f878e6e2066 100644 --- a/documentation/api/io.strimzi.api.kafka.model.CruiseControlSpec.adoc +++ b/documentation/api/io.strimzi.api.kafka.model.CruiseControlSpec.adoc @@ -140,19 +140,17 @@ You specify capacity limits for Kafka broker resources in the `brokerCapacity` p They are enabled by default and you can change their default values. Capacity limits can be set for the following broker resources: +* `cpu` - CPU resource in millicores or CPU cores (Default: 1) * `inboundNetwork` - Inbound network throughput in byte units per second (Default: 10000KiB/s) * `outboundNetwork` - Outbound network throughput in byte units per second (Default: 10000KiB/s) For network throughput, use an integer value with standard Kubernetes byte units (K, M, G) or their bibyte (power of two) equivalents (Ki, Mi, Gi) per second. NOTE: Disk and CPU capacity limits are automatically generated by Strimzi, so you do not need to set them. - -[NOTE] -==== -In order to guarantee accurate rebalance proposal when using CPU goals, you can set CPU requests equal to CPU limits in `Kafka.spec.kafka.resources`. +In order to guarantee accurate rebalance proposals when using CPU goals, you can set CPU requests equal to CPU limits in `Kafka.spec.kafka.resources`. That way, all CPU resources are reserved upfront and are always available. This configuration allows Cruise Control to properly evaluate the CPU utilization when preparing the rebalance proposals based on CPU goals. -==== +In cases where you cannot set CPU requests equal to CPU limits in `Kafka.spec.kafka.resources`, you can set the CPU capacity manually for the same accuracy. .Example Cruise Control brokerCapacity configuration using bibyte units [source,yaml,subs="attributes+"] @@ -166,6 +164,7 @@ spec: cruiseControl: # ... brokerCapacity: + cpu: "2" inboundNetwork: 10000KiB/s outboundNetwork: 10000KiB/s # ... @@ -174,11 +173,12 @@ spec: [id='property-cruise-control-capacity-overrides-{context}'] === Capacity overrides -Brokers might be running on nodes with heterogeneous network resources. -If that's the case, specify `overrides` that set the network capacity limits for each broker. +Brokers might be running on nodes with heterogeneous network or CPU resources. +If that's the case, specify `overrides` that set the network capacity and CPU limits for each broker. The overrides ensure an accurate rebalance between the brokers. Override capacity limits can be set for the following broker resources: +* `cpu` - CPU resource in millicores or CPU cores (Default: 1) * `inboundNetwork` - Inbound network throughput in byte units per second (Default: 10000KiB/s) * `outboundNetwork` - Outbound network throughput in byte units per second (Default: 10000KiB/s) @@ -194,13 +194,16 @@ spec: cruiseControl: # ... brokerCapacity: + cpu: "1" inboundNetwork: 10000KiB/s outboundNetwork: 10000KiB/s overrides: - brokers: [0] + cpu: "2.755" inboundNetwork: 20000KiB/s outboundNetwork: 20000KiB/s - brokers: [1, 2] + cpu: 3000m inboundNetwork: 30000KiB/s outboundNetwork: 30000KiB/s ---- diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc index 682e75f1a83..238a5be3b34 100644 --- a/documentation/modules/appendix_crds.adoc +++ b/documentation/modules/appendix_crds.adoc @@ -1379,6 +1379,8 @@ Used in: xref:type-CruiseControlSpec-{context}[`CruiseControlSpec`] |string |cpuUtilization 1.2+<.