Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add CPU overrides for CC capacity config #6892

Merged
merged 11 commits into from Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@
* The `UseStrimziPodSets` feature gate moves to beta stage.
By default, StrimziPodSets are used instead of StatefulSets.
If needed, `UseStrimziPodSets` can be disabled in the feature gates configuration in the Cluster Operator.
* Add CPU capacity overrides for Cruise Control capacity config

## 0.29.0

Expand Down
Expand Up @@ -30,14 +30,15 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"disk", "cpuUtilization", "inboundNetwork", "outboundNetwork", "overrides"})
@JsonPropertyOrder({"disk", "cpuUtilization", "cpu", "inboundNetwork", "outboundNetwork", "overrides"})
@EqualsAndHashCode
public class BrokerCapacity implements UnknownPropertyPreserving, Serializable {

private static final long serialVersionUID = 1L;

private String disk;
private Integer cpuUtilization;
private String cpu;
private String inboundNetwork;
private String outboundNetwork;
private List<BrokerCapacityOverride> overrides;
Expand Down Expand Up @@ -72,6 +73,19 @@ public void setCpuUtilization(Integer cpuUtilization) {
this.cpuUtilization = cpuUtilization;
}

/**
 * Returns the configured CPU capacity of the broker, or {@code null} when unset.
 *
 * <p>Serialized only when non-null. The CRD pattern accepts whole cores with up
 * to three decimal places (e.g. {@code 1}, {@code 1.500}) or millicores
 * (e.g. {@code 1500m}).</p>
 *
 * @return CPU capacity as a Kubernetes-style CPU quantity string
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([.][0-9]{0,3}|[m]?)$")
@Description("Broker capacity for CPU resource in cores or millicores. " +
"For example, 1, 1.500, 1500m. " +
"For more information on valid CPU resource units see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu")
public String getCpu() {
return cpu;
}

/**
 * Sets the CPU capacity of the broker.
 *
 * @param cpu CPU quantity in cores or millicores, e.g. "1", "1.500" or "1500m"
 */
public void setCpu(String cpu) {
this.cpu = cpu;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([KMG]i?)?B/s$")
@Description("Broker capacity for inbound network throughput in bytes per second. " +
Expand Down
Expand Up @@ -28,12 +28,13 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"brokers", "inboundNetwork", "outboundNetwork"})
@JsonPropertyOrder({"brokers", "cpu", "inboundNetwork", "outboundNetwork"})
@EqualsAndHashCode
public class BrokerCapacityOverride implements UnknownPropertyPreserving, Serializable {
private static final long serialVersionUID = 1L;

private List<Integer> brokers;
private String cpu;
private String inboundNetwork;
private String outboundNetwork;
private Map<String, Object> additionalProperties = new HashMap<>(0);
Expand All @@ -50,6 +51,19 @@ public void setBrokers(List<Integer> brokers) {
this.brokers = brokers;
}

/**
 * Returns the per-broker CPU capacity override, or {@code null} when unset.
 *
 * <p>Serialized only when non-null. Applies to the brokers listed in
 * {@code brokers} and takes precedence over the cluster-wide default
 * CPU capacity. The CRD pattern accepts whole cores with up to three
 * decimal places (e.g. {@code 1}, {@code 1.500}) or millicores
 * (e.g. {@code 1500m}).</p>
 *
 * @return CPU capacity as a Kubernetes-style CPU quantity string
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([.][0-9]{0,3}|[m]?)$")
@Description("Broker capacity for CPU resource in cores or millicores. " +
"For example, 1, 1.500, 1500m. " +
"For more information on valid CPU resource units see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu")
public String getCpu() {
return cpu;
}

/**
 * Sets the per-broker CPU capacity override.
 *
 * @param cpu CPU quantity in cores or millicores, e.g. "1", "1.500" or "1500m"
 */
public void setCpu(String cpu) {
this.cpu = cpu;
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Pattern("^[0-9]+([KMG]i?)?B/s$")
@Description("Broker capacity for inbound network throughput in bytes per second. " +
Expand Down
Expand Up @@ -8,21 +8,20 @@ public class BrokerCapacity {
// CC allows specifying a generic "default" broker entry in the capacity configuration to apply to all brokers without a specific broker entry.
// CC designates the id of this default broker entry as "-1".
public static final int DEFAULT_BROKER_ID = -1;
public static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in percentage, network throughput is in KiB.";

public static final String DEFAULT_CPU_UTILIZATION_CAPACITY = "100"; // as a percentage (0-100)
public static final String DEFAULT_BROKER_DOC = "This is the default capacity. Capacity unit used for disk is in MiB, cpu is in number of cores, network throughput is in KiB.";
public static final String DEFAULT_CPU_CORE_CAPACITY = "1";
public static final String DEFAULT_DISK_CAPACITY_IN_MIB = "100000";
public static final String DEFAULT_INBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000";
public static final String DEFAULT_OUTBOUND_NETWORK_CAPACITY_IN_KIB_PER_SECOND = "10000";

private int id;
private String cpu;
private CpuCapacity cpu;
private DiskCapacity disk;
private String inboundNetwork;
private String outboundNetwork;
private final String doc;

public BrokerCapacity(int brokerId, String cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) {
public BrokerCapacity(int brokerId, CpuCapacity cpu, DiskCapacity disk, String inboundNetwork, String outboundNetwork) {
this.id = brokerId;
this.cpu = cpu;
this.disk = disk;
Expand All @@ -35,7 +34,7 @@ public Integer getId() {
return id;
}

public String getCpu() {
public CpuCapacity getCpu() {
return cpu;
}

Expand All @@ -59,7 +58,7 @@ public void setId(Integer id) {
this.id = id;
}

public void setCpu(String cpu) {
public void setCpu(CpuCapacity cpu) {
this.cpu = cpu;
}

Expand Down
Expand Up @@ -4,6 +4,8 @@
*/
package io.strimzi.operator.cluster.model.cruisecontrol;

import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.strimzi.api.kafka.model.KafkaSpec;
import io.strimzi.api.kafka.model.balancing.BrokerCapacityOverride;
import io.strimzi.api.kafka.model.storage.EphemeralStorage;
Expand All @@ -14,12 +16,14 @@
import io.strimzi.operator.cluster.model.AbstractModel;
import io.strimzi.operator.cluster.model.StorageUtils;
import io.strimzi.operator.cluster.model.VolumeUtils;
import io.strimzi.operator.cluster.operator.resource.Quantities;
import io.strimzi.operator.common.ReconciliationLogger;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

Expand All @@ -46,12 +50,15 @@
* deleteClaim: false
* cruiseControl:
* brokerCapacity:
* cpu: "1"
* inboundNetwork: 10000KB/s
* outboundNetwork: 10000KB/s
* overrides:
* - brokers: [0]
* cpu: "2.345"
* outboundNetwork: 40000KB/s
* - brokers: [1, 2]
* cpu: 4000m
* inboundNetwork: 60000KB/s
* outboundNetwork: 20000KB/s
*
Expand All @@ -66,7 +73,7 @@
* "/var/lib/kafka0/kafka-log-1": "100000",
* "/var/lib/kafka1/kafka-log-1": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "1"},
* "NW_IN": "10000",
* "NW_OUT": "10000"
* },
Expand All @@ -79,7 +86,7 @@
* "/var/lib/kafka0/kafka-log0": "100000",
* "/var/lib/kafka1/kafka-log0": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "2.345"},
* "NW_IN": "10000",
* "NW_OUT": "40000"
* },
Expand All @@ -92,7 +99,7 @@
* "/var/lib/kafka0/kafka-log1": "100000",
* "/var/lib/kafka1/kafka-log1": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "4"},
* "NW_IN": "60000",
* "NW_OUT": "20000"
* },
Expand All @@ -104,7 +111,7 @@
* "/var/lib/kafka0/kafka-log2": "100000",
* "/var/lib/kafka1/kafka-log2": "200000"
* },
* "CPU": "100",
* "CPU": {"num.cores": "4"},
* "NW_IN": "60000",
* "NW_OUT": "20000"
* },
Expand All @@ -122,26 +129,80 @@ public class Capacity {
private static final String BROKER_ID_KEY = "brokerId";
public static final String CAPACITY_KEY = "capacity";
public static final String DISK_KEY = "DISK";
private static final String CPU_KEY = "CPU";
public static final String CPU_KEY = "CPU";
private static final String INBOUND_NETWORK_KEY = "NW_IN";
private static final String OUTBOUND_NETWORK_KEY = "NW_OUT";
private static final String DOC_KEY = "doc";

private final int replicas;
public static final String RESOURCE_TYPE = "cpu";

private final Storage storage;

public Capacity(KafkaSpec spec, Storage storage) {
io.strimzi.api.kafka.model.balancing.BrokerCapacity bc = spec.getCruiseControl().getBrokerCapacity();
// Distinguishes the two kinds of Kubernetes resource requirements a container
// can declare (requests and limits) and knows how to extract the CPU quantity
// for each from a ResourceRequirements object.
private enum ResourceRequirementType {
    REQUEST,
    LIMIT;

    /**
     * Looks up the CPU quantity of this requirement type.
     *
     * @param resources Kubernetes resource requirements to read from
     * @return the CPU quantity, or null when the requests/limits map is
     *         absent or contains no "cpu" entry
     */
    private Quantity getQuantity(ResourceRequirements resources) {
        Map<String, Quantity> requirements = this == REQUEST
                ? resources.getRequests()
                : resources.getLimits();
        return requirements == null ? null : requirements.get(RESOURCE_TYPE);
    }
}

/**
 * Builds the Cruise Control capacity configuration entries for all brokers.
 *
 * @param spec    Kafka custom resource spec; source of the replica count, the
 *                Kafka container resource requirements and the optional
 *                Cruise Control brokerCapacity configuration
 * @param storage storage configuration used to derive per-broker disk capacity
 */
public Capacity(KafkaSpec spec, Storage storage) {
    this.capacityEntries = new TreeMap<>();
    this.replicas = spec.getKafka().getReplicas();
    this.storage = storage;

    // Single entry point that populates capacityEntries from the spec
    // (defaults first, then per-broker overrides).
    processCapacityEntries(spec);
}

public static String processCpu() {
return BrokerCapacity.DEFAULT_CPU_UTILIZATION_CAPACITY;
/**
 * Resolves the CPU resource requirement of the given type (request or limit)
 * in millicores.
 *
 * @param resources       container resource requirements; may be null
 * @param requirementType which requirement (REQUEST or LIMIT) to read
 * @return CPU in millicores, or null when the requirement is not configured
 */
private static Integer getResourceRequirement(ResourceRequirements resources, ResourceRequirementType requirementType) {
    if (resources == null) {
        return null;
    }
    Quantity quantity = requirementType.getQuantity(resources);
    if (quantity == null) {
        return null;
    }
    return Quantities.parseCpuAsMilliCpus(quantity.toString());
}

/**
 * Derives the broker CPU capacity from the Kafka container resource
 * requirements.
 *
 * <p>The CPU limit is used only when it is the single authoritative value:
 * either no request is set, or the request equals the limit. When the
 * request and limit differ, or no limit is set, null is returned and the
 * capacity falls back to the brokerCapacity configuration.</p>
 *
 * @param resourceRequirements Kafka container resource requirements; may be null
 * @return CPU capacity in cores as a decimal string, or null
 */
private static String getCpuBasedOnRequirements(ResourceRequirements resourceRequirements) {
    Integer request = getResourceRequirement(resourceRequirements, ResourceRequirementType.REQUEST);
    Integer limit = getResourceRequirement(resourceRequirements, ResourceRequirementType.LIMIT);

    if (limit == null) {
        return null;
    }
    if (request != null && request.intValue() != limit.intValue()) {
        return null;
    }
    return CpuCapacity.milliCpuToCpu(limit.intValue());
}

/**
 * Determines the CPU capacity for a broker entry, in order of precedence:
 * the Kafka container resource limit (when authoritative), the per-broker
 * override, the cluster-wide brokerCapacity default, and finally the
 * built-in default.
 *
 * @param bc                     cluster-wide brokerCapacity configuration; may be null
 * @param override               per-broker capacity override; may be null
 * @param cpuBasedOnRequirements CPU derived from container resource limits; may be null
 * @return the CPU capacity to use for this broker entry
 */
public static CpuCapacity processCpu(io.strimzi.api.kafka.model.balancing.BrokerCapacity bc, BrokerCapacityOverride override, String cpuBasedOnRequirements) {
    boolean overrideHasCpu = override != null && override.getCpu() != null;
    boolean defaultHasCpu = bc != null && bc.getCpu() != null;

    if (cpuBasedOnRequirements != null) {
        // Resource limits win; tell the user their explicit settings are ignored.
        if (overrideHasCpu || defaultHasCpu) {
            LOGGER.warnOp("Ignoring CPU capacity override settings since they are automatically set to resource limits");
        }
        return new CpuCapacity(cpuBasedOnRequirements);
    }
    if (overrideHasCpu) {
        return new CpuCapacity(override.getCpu());
    }
    if (defaultHasCpu) {
        return new CpuCapacity(bc.getCpu());
    }
    return new CpuCapacity(BrokerCapacity.DEFAULT_CPU_CORE_CAPACITY);
}

public static DiskCapacity processDisk(Storage storage, int brokerId) {
Expand Down Expand Up @@ -243,8 +304,12 @@ public static String getThroughputInKiB(String throughput) {
return String.valueOf(StorageUtils.convertTo(size, "Ki"));
}

private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerCapacity brokerCapacity) {
String cpu = processCpu();
private void processCapacityEntries(KafkaSpec spec) {
io.strimzi.api.kafka.model.balancing.BrokerCapacity brokerCapacity = spec.getCruiseControl().getBrokerCapacity();
String cpuBasedOnRequirements = getCpuBasedOnRequirements(spec.getKafka().getResources());
int replicas = spec.getKafka().getReplicas();

CpuCapacity cpu = processCpu(brokerCapacity, null, cpuBasedOnRequirements);
DiskCapacity disk = processDisk(storage, BrokerCapacity.DEFAULT_BROKER_ID);
String inboundNetwork = processInboundNetwork(brokerCapacity, null);
String outboundNetwork = processOutboundNetwork(brokerCapacity, null);
Expand Down Expand Up @@ -276,6 +341,7 @@ private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerC
} else {
for (BrokerCapacityOverride override : overrides) {
List<Integer> ids = override.getBrokers();
cpu = processCpu(brokerCapacity, override, cpuBasedOnRequirements);
inboundNetwork = processInboundNetwork(brokerCapacity, override);
outboundNetwork = processOutboundNetwork(brokerCapacity, override);
for (int id : ids) {
Expand All @@ -285,6 +351,7 @@ private void processCapacityEntries(io.strimzi.api.kafka.model.balancing.BrokerC
if (capacityEntries.containsKey(id)) {
if (overrideIds.add(id)) {
BrokerCapacity brokerCapacityEntry = capacityEntries.get(id);
brokerCapacityEntry.setCpu(cpu);
brokerCapacityEntry.setInboundNetwork(inboundNetwork);
brokerCapacityEntry.setOutboundNetwork(outboundNetwork);
} else {
Expand Down Expand Up @@ -314,7 +381,7 @@ private JsonObject generateBrokerCapacity(BrokerCapacity brokerCapacity) {
.put(BROKER_ID_KEY, brokerCapacity.getId())
.put(CAPACITY_KEY, new JsonObject()
.put(DISK_KEY, brokerCapacity.getDisk().getJson())
.put(CPU_KEY, brokerCapacity.getCpu())
.put(CPU_KEY, brokerCapacity.getCpu().getJson())
.put(INBOUND_NETWORK_KEY, brokerCapacity.getInboundNetwork())
.put(OUTBOUND_NETWORK_KEY, brokerCapacity.getOutboundNetwork())
)
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.model.cruisecontrol;

import io.strimzi.operator.cluster.operator.resource.Quantities;
import io.vertx.core.json.JsonObject;

/**
 * Cruise Control capacity entry for CPU, expressed as a number of cores.
 *
 * <p>The input accepts Kubernetes-style CPU quantities (e.g. "1", "1.500",
 * "1500m"). The value is normalised through millicores to a decimal number
 * of cores, so "1500m" and "1.5" both render as "1.5". In the capacity
 * config JSON it appears as {@code {"num.cores": "1.5"}}.</p>
 */
public class CpuCapacity {
    private static final String CORES_KEY = "num.cores";

    // Normalised number of cores as a decimal string, e.g. "1.5".
    private final String cores;

    /**
     * @param cores CPU quantity in cores or millicores, e.g. "1", "1.500" or "1500m"
     */
    public CpuCapacity(String cores) {
        this.cores = milliCpuToCpu(Quantities.parseCpuAsMilliCpus(cores));
    }

    /**
     * Converts a millicore value to a decimal number of cores,
     * e.g. 1500 becomes "1.5".
     *
     * @param milliCPU CPU in millicores
     * @return number of cores as a decimal string
     */
    public static String milliCpuToCpu(int milliCPU) {
        return String.valueOf(milliCPU / 1000.0);
    }

    /**
     * @return the JSON representation used in the Cruise Control capacity
     *         config, e.g. {@code {"num.cores": "1.5"}}
     */
    public JsonObject getJson() {
        return new JsonObject().put(CORES_KEY, this.cores);
    }

    @Override
    public String toString() {
        return this.getJson().toString();
    }
}
Expand Up @@ -4,7 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.resource;

class Quantities {
public class Quantities {
private Quantities() {

}
Expand Down