Skip to content

Commit

Permalink
Add CPU overrides for CC capacity config (#6892)
Browse files Browse the repository at this point in the history
* Add CPU overrides for CC capacity config

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Addressing comments - JS, PP

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Generate heml charts

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Fix CRD property validation + doc example

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Refactoring + change property cpuCores -> cpu

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Update ST

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Fixing examples

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Addressing comments - PM

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Generate helm charts

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Addressing feedback - TC

Signed-off-by: Kyle Liberti <kliberti@redhat.com>

* Update default CC config doc

Signed-off-by: Kyle Liberti <kliberti@redhat.com>
  • Loading branch information
kyguy committed Jun 14, 2022
1 parent 71da982 commit 2627f85
Show file tree
Hide file tree
Showing 16 changed files with 244 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@
* The `UseStrimziPodSets` feature gate moves to beta stage.
By default, StrimziPodSets are used instead of StatefulSets.
If needed, `UseStrimziPodSets` can be disabled in the feature gates configuration in the Cluster Operator.
* Add CPU capacity overrides for Cruise Control capacity config

## 0.29.0

Expand Down
Expand Up @@ -30,14 +30,15 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"disk", "cpuUtilization", "inboundNetwork", "outboundNetwork", "overrides"})
@JsonPropertyOrder({"disk", "cpuUtilization", "cpu", "inboundNetwork", "outboundNetwork", "overrides"})
@EqualsAndHashCode
public class BrokerCapacity implements UnknownPropertyPreserving, Serializable {

private static final long serialVersionUID = 1L;

private String disk;
private Integer cpuUtilization;
private String cpu;
private String inboundNetwork;
private String outboundNetwork;
private List<BrokerCapacityOverride> overrides;
Expand Down Expand Up @@ -72,6 +73,19 @@ public void setCpuUtilization(Integer cpuUtilization) {
this.cpuUtilization = cpuUtilization;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([.][0-9]{0,3}|[m]?)$")
@Description("Broker capacity for CPU resource in cores or millicores. " +
"For example, 1, 1.500, 1500m. " +
"For more information on valid CPU resource units see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu")
public String getCpu() {
return cpu;
}

public void setCpu(String cpu) {
this.cpu = cpu;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([KMG]i?)?B/s$")
@Description("Broker capacity for inbound network throughput in bytes per second. " +
Expand Down
Expand Up @@ -28,12 +28,13 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"brokers", "inboundNetwork", "outboundNetwork"})
@JsonPropertyOrder({"brokers", "cpu", "inboundNetwork", "outboundNetwork"})
@EqualsAndHashCode
public class BrokerCapacityOverride implements UnknownPropertyPreserving, Serializable {
private static final long serialVersionUID = 1L;

private List<Integer> brokers;
private String cpu;
private String inboundNetwork;
private String outboundNetwork;
private Map<String, Object> additionalProperties = new HashMap<>(0);
Expand All @@ -50,6 +51,19 @@ public void setBrokers(List<Integer> brokers) {
this.brokers = brokers;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([.][0-9]{0,3}|[m]?)$")
@Description("Broker capacity for CPU resource in cores or millicores. " +
"For example, 1, 1.500, 1500m. " +
"For more information on valid CPU resource units see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu")
public String getCpu() {
return cpu;
}

public void setCpu(String cpu) {
this.cpu = cpu;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([KMG]i?)?B/s$")
@Description("Broker capacity for inbound network throughput in bytes per second. " +
Expand Down
Expand Up @@ -8,21 +8,20 @@ public class BrokerCapacity {
// CC allows specifying a generic "default" broker entry in the capacity configuration to apply to all brokers without a specific broker entry.
// CC designates the id of this default broker entry as "-1".
public static final int DEFAULT_BROKER_ID = -1;
public static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in percentage, network throughput is in KiB.";

public static final String DEFAULT_CPU_UTILIZATION_CAPACITY = "100"; // as a percentage (0-100)
public static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in number of cores, network throughput is in KiB.";
public static final String DEFAULT_CPU_CORE_CAPACITY = "1";
public static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000";
public static final String DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000";
public static final String DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000";

private int id;
private String cpu;
private CpuCapacity cpu;
private DiskCapacity disk;
private String inboundNetwork;
private String outboundNetwork;
private final String doc;

public BrokerCapacity(int brokerId, String cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) {
public BrokerCapacity(int brokerId, CpuCapacity cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) {
this.id = brokerId;
this.cpu = cpu;
this.disk = disk;
Expand All @@ -35,7 +34,7 @@ public Integer getId() {
return id;
}

public String getCpu() {
public CpuCapacity getCpu() {
return cpu;
}

Expand All @@ -59,7 +58,7 @@ public void setId(Integer id) {
this.id = id;
}

public void setCpu(String cpu) {
public void setCpu(CpuCapacity cpu) {
this.cpu = cpu;
}

Expand Down
Expand Up @@ -4,6 +4,8 @@
*/
package io.strimzi.operator.cluster.model.cruisecontrol;

import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.strimzi.api.kafka.model.KafkaSpec;
import io.strimzi.api.kafka.model.balancing.BrokerCapacityOverride;
import io.strimzi.api.kafka.model.storage.EphemeralStorage;
Expand All @@ -14,12 +16,14 @@
import io.strimzi.operator.cluster.model.AbstractModel;
import io.strimzi.operator.cluster.model.StorageUtils;
import io.strimzi.operator.cluster.model.VolumeUtils;
import io.strimzi.operator.cluster.operator.resource.Quantities;
import io.strimzi.operator.common.ReconciliationLogger;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

Expand All @@ -46,12 +50,15 @@
* deleteClaim: false
* cruiseControl:
* brokerCapacity:
* cpu: "1"
* inboundNetwork: 10000KB/s
* outboundNetwork: 10000KB/s
* overrides:
* - brokers: [0]
* cpu: "2.345"
* outboundNetwork: 40000KB/s
* - brokers: [1, 2]
* cpu: 4000m
* inboundNetwork: 60000KB/s
* outboundNetwork: 20000KB/s
*
Expand All @@ -66,7 +73,7 @@
* "/var/lib/kafka0/kafka-log-1": "100000",
* "/var/lib/kafka1/kafka-log-1": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "1"},
* "NW_IN": "10000",
* "NW_OUT": "10000"
* },
Expand All @@ -79,7 +86,7 @@
* "/var/lib/kafka0/kafka-log0": "100000",
* "/var/lib/kafka1/kafka-log0": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "2.345"},
* "NW_IN": "10000",
* "NW_OUT": "40000"
* },
Expand All @@ -92,7 +99,7 @@
* "/var/lib/kafka0/kafka-log1": "100000",
* "/var/lib/kafka1/kafka-log1": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "4"},
* "NW_IN": "60000",
* "NW_OUT": "20000"
* },
Expand All @@ -104,7 +111,7 @@
* "/var/lib/kafka0/kafka-log2": "100000",
* "/var/lib/kafka1/kafka-log2": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "4"},
* "NW_IN": "60000",
* "NW_OUT": "20000"
* },
Expand All @@ -122,26 +129,80 @@ public class Capacity {
private static final String BROKER_ID_KEY = "brokerId";
public static final String CAPACITY_KEY = "capacity";
public static final String DISK_KEY = "DISK";
private static final String CPU_KEY = "CPU";
public static final String CPU_KEY = "CPU";
private static final String INBOUND_NETWORK_KEY = "NW_IN";
private static final String OUTBOUND_NETWORK_KEY = "NW_OUT";
private static final String DOC_KEY = "doc";

private final int replicas;
public static final String RESOURCE_TYPE = "cpu";

private final Storage storage;

public Capacity(KafkaSpec spec, Storage storage) {
io.strimzi.api.kafka.model.balancing.BrokerCapacity bc = spec.getCruiseControl().getBrokerCapacity();
private enum ResourceRequirementType {
REQUEST,
LIMIT;

private Quantity getQuantity(ResourceRequirements resources) {
Map<String, Quantity> resourceRequirement;
switch (this) {
case REQUEST:
resourceRequirement = resources.getRequests();
break;
case LIMIT:
resourceRequirement = resources.getLimits();
break;
default:
resourceRequirement = null;
}
if (resourceRequirement != null) {
return resourceRequirement.get(RESOURCE_TYPE);
}
return null;
}
}

public Capacity(KafkaSpec spec, Storage storage) {
this.capacityEntries = new TreeMap<>();
this.replicas = spec.getKafka().getReplicas();
this.storage = storage;

processCapacityEntries(bc);
processCapacityEntries(spec);
}

public static String processCpu() {
return BrokerCapacity.DEFAULT_CPU_UTILIZATION_CAPACITY;
private static Integer getResourceRequirement(ResourceRequirements resources, ResourceRequirementType requirementType) {
if (resources != null) {
Quantity quantity = requirementType.getQuantity(resources);
if (quantity != null) {
return Quantities.parseCpuAsMilliCpus(quantity.toString());
}
}
return null;
}

private static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) {
Integer request = getResourceRequirement(resourceRequirements, ResourceRequirementType.REQUEST);
Integer limit = getResourceRequirement(resourceRequirements, ResourceRequirementType.LIMIT);

if (limit != null) {
if (request == null || limit.intValue() == request.intValue()) {
return CpuCapacity.milliCpuToCpu(limit);
}
}
return null;
}

public static CpuCapacity processCpu(io.strimzi.api.kafka.model.balancing.BrokerCapacity bc, BrokerCapacityOverride override, String cpuBasedOnRequirements) {
if (cpuBasedOnRequirements != null) {
if ((override != null && override.getCpu() != null) || (bc != null && bc.getCpu() != null)) {
LOGGER.warnOp("Ignoring CPU capacity override settings since they are automatically set to resource limits");
}
return new CpuCapacity(cpuBasedOnRequirements);
} else if (override != null && override.getCpu() != null) {
return new CpuCapacity(override.getCpu());
} else if (bc != null && bc.getCpu() != null) {
return new CpuCapacity(bc.getCpu());
} else {
return new CpuCapacity(BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY);
}
}

public static DiskCapacity processDisk(Storage storage, int brokerId) {
Expand Down Expand Up @@ -243,8 +304,12 @@ public static String getThroughputInKiB(String throughput) {
return String.valueOf(StorageUtils.convertTo(size, "Ki"));
}

private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerCapacity brokerCapacity) {
String cpu = processCpu();
private void processCapacityEntries(KafkaSpec spec) {
io.strimzi.api.kafka.model.balancing.BrokerCapacity brokerCapacity = spec.getCruiseControl().getBrokerCapacity();
String cpuBasedOnRequirements = getCpuBasedOnRequirements(spec.getKafka().getResources());
int replicas = spec.getKafka().getReplicas();

CpuCapacity cpu = processCpu(brokerCapacity, null, cpuBasedOnRequirements);
DiskCapacity disk = processDisk(storage, BrokerCapacity.DEFAULT_BROKER_ID);
String inboundNetwork = processInboundNetwork(brokerCapacity, null);
String outboundNetwork = processOutboundNetwork(brokerCapacity, null);
Expand Down Expand Up @@ -276,6 +341,7 @@ private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerC
} else {
for (BrokerCapacityOverride override : overrides) {
List<Integer> ids = override.getBrokers();
cpu = processCpu(brokerCapacity, override, cpuBasedOnRequirements);
inboundNetwork = processInboundNetwork(brokerCapacity, override);
outboundNetwork = processOutboundNetwork(brokerCapacity, override);
for (int id : ids) {
Expand All @@ -285,6 +351,7 @@ private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerC
if (capacityEntries.containsKey(id)) {
if (overrideIds.add(id)) {
BrokerCapacity brokerCapacityEntry = capacityEntries.get(id);
brokerCapacityEntry.setCpu(cpu);
brokerCapacityEntry.setInboundNetwork(inboundNetwork);
brokerCapacityEntry.setOutboundNetwork(outboundNetwork);
} else {
Expand Down Expand Up @@ -314,7 +381,7 @@ private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) {
.put(BROKER_ID_KEY, brokerCapacity.getId())
.put(CAPACITY_KEY, new JsonObject()
.put(DISK_KEY, brokerCapacity.getDisk().getJson())
.put(CPU_KEY, brokerCapacity.getCpu())
.put(CPU_KEY, brokerCapacity.getCpu().getJson())
.put(INBOUND_NETWORK_KEY, brokerCapacity.getInboundNetwork())
.put(OUTBOUND_NETWORK_KEY, brokerCapacity.getOutboundNetwork())
)
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.model.cruisecontrol;

import io.strimzi.operator.cluster.operator.resource.Quantities;
import io.vertx.core.json.JsonObject;

public class CpuCapacity {
private static final String CORES_KEY = "num.cores";

private String cores;

public CpuCapacity(String cores) {
this.cores = milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores));
}

public static String milliCpuToCpu(int milliCPU) {
return String.valueOf(milliCPU / 1000.0);
}

public JsonObject getJson() {
return new JsonObject().put(CORES_KEY, this.cores);
}

public String toString() {
return this.getJson().toString();
}
}
Expand Up @@ -4,7 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.resource;

class Quantities {
public class Quantities {
private Quantities() {

}
Expand Down

0 comments on commit 2627f85

Please sign in to comment.