UPSTREAM: <carry>: add support for cpu limits into management workloads
Added support for workload partitioning to use a container's CPU limit. To allow the runtime to make better decisions around workload CPU quotas, we now pass the CPU limit down as the cpulimit value in the annotation. CRI-O will take that information and calculate the quota per node. This should support situations where workloads have different CPU period overrides assigned.

Updated the kubelet for static pods and the admission webhook for regular pods to support CPU limits.

Updated unit tests to reflect the changes.

Signed-off-by: ehila <ehila@redhat.com>
eggfoobar committed Apr 2, 2024
1 parent a0beecc commit 2a12f9a
Showing 4 changed files with 196 additions and 43 deletions.
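
As a rough illustration of what the commit message describes (the runtime deriving a CFS quota from the annotated cpulimit and the container's CPU period), the arithmetic is millicores times period divided by 1000. The snippet below is a minimal sketch in plain Go, not CRI-O's actual code; computeQuota is a hypothetical helper name used only for this example.

package main

import "fmt"

// computeQuota sketches how a runtime could turn the annotated cpulimit
// (in millicores) into a CFS quota for a given period (in microseconds).
// Hypothetical helper for illustration; not CRI-O's implementation.
func computeQuota(cpuLimitMilli, periodMicros int64) int64 {
	return cpuLimitMilli * periodMicros / 1000
}

func main() {
	// 500m limit with the default 100ms period -> 50ms of CPU time per period.
	fmt.Println(computeQuota(500, 100000)) // 50000
	// The same limit under a 10ms period override -> 5ms per period.
	fmt.Println(computeQuota(500, 10000)) // 5000
}
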
@@ -71,6 +71,14 @@ func Register(plugins *admission.Plugins) {
})
}

type resourceAnnotation struct {
// CPUShares contains resource annotation value cpushares key
CPUShares uint64 `json:"cpushares,omitempty"`
// CPULimit contains the cpu limit in millicores to be used by the container runtime to calculate
// quota
CPULimit int64 `json:"cpulimit,omitempty"`
}

// managementCPUsOverride presents admission plugin that should replace pod container CPU requests with a new management resource.
// It applies to all pods that:
// 1. are in an allowed namespace
@@ -252,13 +260,6 @@ func (a *managementCPUsOverride) Admit(ctx context.Context, attr admission.Attri
return nil
}

// we should skip mutation of the pod that has container with both CPU limit and request because once we will remove
// the request, the defaulter will set the request back with the CPU limit value
if podHasBothCPULimitAndRequest(allContainers) {
pod.Annotations[workloadAdmissionWarning] = "skip pod CPUs requests modifications because pod container has both CPU limit and request"
return nil
}

// before we update the pod available under admission attributes, we need to verify that deletion of the CPU request
// will not change the pod QoS class, otherwise skip pod mutation
// 1. Copy the pod
@@ -360,6 +361,14 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
continue
}

resourceAnno := resourceAnnotation{}

if c.Resources.Limits != nil {
if value, ok := c.Resources.Limits[coreapi.ResourceCPU]; ok {
resourceAnno.CPULimit = value.MilliValue()
}
}

if c.Resources.Requests != nil {
if _, ok := c.Resources.Requests[coreapi.ResourceCPU]; !ok {
continue
@@ -368,9 +377,20 @@
cpuRequest := c.Resources.Requests[coreapi.ResourceCPU]
cpuRequestInMilli := cpuRequest.MilliValue()

cpuShares := cm.MilliCPUToShares(cpuRequestInMilli)
podAnnotations[cpusharesAnnotationKey] = fmt.Sprintf(`{"%s": %d}`, containerResourcesAnnotationValueKeyCPUShares, cpuShares)
// Casting to uint64, Linux build returns uint64, noop Darwin build returns int64
resourceAnno.CPUShares = uint64(cm.MilliCPUToShares(cpuRequestInMilli))

// This should not error but if something does go wrong we default to string creation of just CPU Shares
// and add a warning annotation
resourceAnnoString, err := json.Marshal(resourceAnno)
if err != nil {
podAnnotations[workloadAdmissionWarning] = fmt.Sprintf("failed to marshal cpu resources, using fallback: err: %s", err.Error())
podAnnotations[cpusharesAnnotationKey] = fmt.Sprintf(`{"%s": %d}`, containerResourcesAnnotationValueKeyCPUShares, resourceAnno.CPUShares)
} else {
podAnnotations[cpusharesAnnotationKey] = string(resourceAnnoString)
}
delete(c.Resources.Requests, coreapi.ResourceCPU)
delete(c.Resources.Limits, coreapi.ResourceCPU)

if c.Resources.Limits == nil {
c.Resources.Limits = coreapi.ResourceList{}
@@ -569,17 +589,20 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, err.Error()))
}

workloadResourceAnnotations := map[string]map[string]int{}
workloadResourceAnnotations := resourceAnnotation{}
hasWorkloadAnnotation := false
for k, v := range pod.Annotations {
if !strings.HasPrefix(k, containerResourcesAnnotationPrefix) {
continue
}
hasWorkloadAnnotation = true

resourceAnnotationValue := map[string]int{}
if err := json.Unmarshal([]byte(v), &resourceAnnotationValue); err != nil {
// Custom decoder to print invalid fields for resources
decoder := json.NewDecoder(strings.NewReader(v))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&workloadResourceAnnotations); err != nil {
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, err.Error()))
}
workloadResourceAnnotations[k] = resourceAnnotationValue
}

containersWorkloadResources := map[string]*coreapi.Container{}
@@ -596,9 +619,9 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At
}
}

// the pod does not have workload annotation
if len(workloadType) == 0 {
if len(workloadResourceAnnotations) > 0 {
switch {
case len(workloadType) == 0: // the pod does not have workload annotation
if hasWorkloadAnnotation {
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, "the pod without workload annotation can not have resource annotation"))
}

@@ -609,21 +632,8 @@ func (a *managementCPUsOverride) Validate(ctx context.Context, attr admission.At

allErrs = append(allErrs, field.Invalid(field.NewPath("spec.containers.resources.requests"), c.Resources.Requests, fmt.Sprintf("the pod without workload annotations can not have containers with workload resources %q", resourceName)))
}
} else {
if !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType) { // pod has workload annotation, but the pod does not have workload annotation
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, fmt.Sprintf("the pod can not have workload annotation, when the namespace %q does not allow it", ns.Name)))
}

for _, v := range workloadResourceAnnotations {
if len(v) > 1 {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata.annotations"), pod.Annotations, "the pod resource annotation value can not have more than one key"))
}

// the pod should not have any resource annotations with the value that includes keys different from cpushares
if _, ok := v[containerResourcesAnnotationValueKeyCPUShares]; len(v) == 1 && !ok {
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata.annotations"), pod.Annotations, "the pod resource annotation value should have only cpushares key"))
}
}
case !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType): // pod has workload annotation, but the namespace does not allow specified workload
allErrs = append(allErrs, getPodInvalidWorkloadAnnotationError(pod.Annotations, fmt.Sprintf("the namespace %q does not allow the workload type %s", ns.Name, workloadType)))
}

if len(allErrs) == 0 {
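
To make the new annotation shape concrete: with the resourceAnnotation struct above, a container with a 250m CPU request (256 shares) and a 500m CPU limit would serialize roughly as follows. This is a standalone sketch that re-declares the struct with the same JSON tags; it is not the plugin's own code.

package main

import (
	"encoding/json"
	"fmt"
)

// Re-declared here with the same JSON tags as the resourceAnnotation struct
// added in this change, so the example is self-contained.
type resourceAnnotation struct {
	CPUShares uint64 `json:"cpushares,omitempty"`
	CPULimit  int64  `json:"cpulimit,omitempty"`
}

func main() {
	// 250m request -> 256 CPU shares; 500m limit passed through as millicores.
	value, err := json.Marshal(resourceAnnotation{CPUShares: 256, CPULimit: 500})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value)) // {"cpushares":256,"cpulimit":500}
}
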
@@ -192,8 +192,8 @@ func TestAdmit(t *testing.T) {
expectedCpuRequest: resource.Quantity{},
namespace: testManagedNamespace(),
expectedAnnotations: map[string]string{
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"): fmt.Sprintf(`{"%s": 256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s": 256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"): fmt.Sprintf(`{"%s":256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s":256}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
},
nodes: []*corev1.Node{testNodeWithManagementResource()},
@@ -236,12 +236,14 @@ func TestAdmit(t *testing.T) {
infra: testClusterSNOInfra(),
},
{
name: "should ignore pod when one of pod containers have both CPU limit and request",
name: "should not ignore pod when one of pod containers have both CPU limit and request",
pod: testManagedPod("500m", "250m", "500Mi", ""),
expectedCpuRequest: resource.MustParse("250m"),
expectedCpuRequest: resource.Quantity{},
namespace: testManagedNamespace(),
expectedAnnotations: map[string]string{
workloadAdmissionWarning: fmt.Sprintf("skip pod CPUs requests modifications because pod container has both CPU limit and request"),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "test"): fmt.Sprintf(`{"%s":256,"cpulimit":500}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, "initTest"): fmt.Sprintf(`{"%s":256,"cpulimit":500}`, containerResourcesAnnotationValueKeyCPUShares),
fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
},
nodes: []*corev1.Node{testNodeWithManagementResource()},
infra: testClusterSNOInfra(),
@@ -258,12 +260,12 @@ func TestAdmit(t *testing.T) {
infra: testClusterSNOInfra(),
},
{
name: "should not mutate the pod when at least one node does not have management resources",
name: "should not mutate the pod when cpu partitioning is not set to AllNodes",
pod: testManagedPod("500m", "250m", "500Mi", "250Mi"),
expectedCpuRequest: resource.MustParse("250m"),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNode()},
infra: testClusterSNOInfra(),
infra: testClusterInfraWithoutWorkloadPartitioning(),
},
{
name: "should return admission error when the cluster does not have any nodes",
@@ -426,7 +428,7 @@ func TestValidate(t *testing.T) {
),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
expectedError: fmt.Errorf("he pod resource annotation value should have only cpushares key"),
expectedError: fmt.Errorf("json: unknown field \"cpuset\""),
},
{
name: "should return invalid error when the pod does not have workload annotation, but has resource annotation",
Expand Down Expand Up @@ -466,7 +468,7 @@ func TestValidate(t *testing.T) {
),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
expectedError: fmt.Errorf("the pod can not have workload annotation, when the namespace %q does not allow it", "managed-namespace"),
expectedError: fmt.Errorf("the namespace %q does not allow the workload type %s", "managed-namespace", "non-existent"),
},
{
name: "should not return any errors when the pod has workload annotation, but the pod namespace has no annotations",
@@ -699,9 +701,8 @@ func testClusterSNOInfra() *configv1.Infrastructure {
}
}

func testClusterInfraWithoutTopologyFields() *configv1.Infrastructure {
func testClusterInfraWithoutWorkloadPartitioning() *configv1.Infrastructure {
infra := testClusterSNOInfra()
infra.Status.ControlPlaneTopology = ""
infra.Status.InfrastructureTopology = ""
infra.Status.CPUPartitioning = configv1.CPUPartitioningNone
return infra
}
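
The updated expected error above, json: unknown field "cpuset", comes straight from encoding/json once DisallowUnknownFields is set on the decoder, as in the new Validate code. Below is a minimal standalone reproduction; the cpuset key is just an example of an unknown field, and the struct is a local copy of the annotation's two known keys.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Same field set as the resourceAnnotation struct used by the Validate path.
type resourceAnnotation struct {
	CPUShares uint64 `json:"cpushares,omitempty"`
	CPULimit  int64  `json:"cpulimit,omitempty"`
}

func main() {
	// An annotation value carrying a key the struct does not declare.
	value := `{"cpushares":256,"cpuset":"0-1"}`

	decoder := json.NewDecoder(strings.NewReader(value))
	decoder.DisallowUnknownFields()

	var anno resourceAnnotation
	if err := decoder.Decode(&anno); err != nil {
		fmt.Println(err) // json: unknown field "cpuset"
	}
}
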
6 changes: 6 additions & 0 deletions pkg/kubelet/managed/managed.go
@@ -36,6 +36,7 @@ var (

type WorkloadContainerAnnotation struct {
CpuShares uint64 `json:"cpushares"`
CpuLimit int64 `json:"cpulimit,omitempty"`
}

func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation {
@@ -115,6 +116,10 @@ func updateContainers(workloadName string, pod *v1.Pod) error {
cpuRequestInMilli := cpuRequest.MilliValue()

containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli))
if value, ok := container.Resources.Limits[v1.ResourceCPU]; ok {
containerAnnotation.CpuLimit = value.MilliValue()
}

jsonAnnotation, _ := containerAnnotation.Serialize()
containerNameKey := fmt.Sprintf(ContainerAnnotationFormat, container.Name)

@@ -125,6 +130,7 @@ func updateContainers(workloadName string, pod *v1.Pod) error {
container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest

delete(container.Resources.Requests, v1.ResourceCPU)
delete(container.Resources.Limits, v1.ResourceCPU)
return nil
}
for idx := range pod.Spec.Containers {
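
For reference when reading the expected annotations in the tests below, cpushares is derived from the CPU request with the usual 1024-shares-per-CPU conversion (integer division), so 100m becomes 102 and a full CPU becomes 1024. The following is a small standalone sketch of that arithmetic; milliCPUToShares here is a local stand-in, not the kubelet helper itself.

package main

import "fmt"

// milliCPUToShares mirrors the 1024-shares-per-CPU arithmetic used for the
// cpushares annotation value; local stand-in for illustration only.
func milliCPUToShares(milliCPU int64) uint64 {
	const (
		sharesPerCPU  = 1024
		milliCPUToCPU = 1000
		minShares     = 2
	)
	shares := milliCPU * sharesPerCPU / milliCPUToCPU
	if shares < minShares {
		return minShares
	}
	return uint64(shares)
}

func main() {
	fmt.Println(milliCPUToShares(100))  // 102  -> {"cpushares":102,...}
	fmt.Println(milliCPUToShares(250))  // 256
	fmt.Println(milliCPUToShares(1000)) // 1024 -> {"cpushares":1024,...}
}
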
136 changes: 136 additions & 0 deletions pkg/kubelet/managed/managed_test.go
@@ -111,6 +111,74 @@ func TestStaticPodManaged(t *testing.T) {
"resources.workload.openshift.io/c_3": `{"cpushares":1024}`,
},
},
{
pod: &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
UID: "12345",
Namespace: "mynamespace",
Annotations: map[string]string{
"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "c1",
Image: "test/nginx",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("200m"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
},
},
{
Name: "c2",
Image: "test/image",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
},
},
{
Name: "c_3",
Image: "test/image",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
},
},
},
SecurityContext: &v1.PodSecurityContext{},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
expectedAnnotations: map[string]string{
"target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`,
"resources.workload.openshift.io/c1": `{"cpushares":102,"cpulimit":200}`,
"resources.workload.openshift.io/c2": `{"cpushares":1024}`,
"resources.workload.openshift.io/c_3": `{"cpushares":1024,"cpulimit":1000}`,
},
},
}

for _, tc := range testCases {
@@ -247,6 +315,74 @@ func TestStaticPodThrottle(t *testing.T) {
"resources.workload.openshift.io/c_3": `{"cpushares":1024}`,
},
},
{
pod: &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "",
},
ObjectMeta: metav1.ObjectMeta{
Name: "test",
UID: "12345",
Namespace: "mynamespace",
Annotations: map[string]string{
"target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`,
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "c1",
Image: "test/image",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("200m"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("200m"),
},
},
},
{
Name: "c2",
Image: "test/image",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
Limits: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("2"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("200m"),
},
},
},
{
Name: "c_3",
Image: "test/image",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"),
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"),
},
},
},
},
SecurityContext: &v1.PodSecurityContext{},
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
expectedAnnotations: map[string]string{
"target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`,
"resources.workload.openshift.io/c1": `{"cpushares":102,"cpulimit":200}`,
"resources.workload.openshift.io/c2": `{"cpushares":1024,"cpulimit":2000}`,
"resources.workload.openshift.io/c_3": `{"cpushares":1024}`,
},
},
}

for _, tc := range testCases {