Skip to content

Commit

Permalink
UPSTREAM: <carry>: openshift-kube-apiserver: add kube-apiserver patches
Browse files Browse the repository at this point in the history
pod .spec.nodeName should not override project node selector in
podNodeEnvironment admission plugin

UPSTREAM: <carry>: Update management webhook pod admission logic

Updating the logic for pod admission to allow a pod with workload partitioning annotations to be created in a namespace that has no workload allow annotations.

The pod will be stripped of its workload annotations and treated as if it were a normal pod; a warning annotation will be placed on the pod to note the behavior.

Signed-off-by: ehila <ehila@redhat.com>

UPSTREAM: <carry>: add new admission for handling shared cpus

Adding a new mutation plugin that handles the following:

1. In case of `workload.openshift.io/enable-shared-cpus` request, it
   adds an annotation to hint runtime about the request. runtime
   is not aware of extended resources, hence we need the annotation.
2. It validates the pod's QoS class and returns an error if it's not a
   guaranteed QoS class
3. It validates that no more than a single resource is being requested.
4. It validates that the pod is deployed in a namespace that has the
   mixedcpus workloads allowed annotation.

For more information see - openshift/enhancements#1396

Signed-off-by: Talor Itzhak <titzhak@redhat.com>

UPSTREAM: <carry>: Add context to ObjectValidator
TODO: add router validation logic to implement ctx add in ObjectValidator

Co-authored-by: Swarup Ghosh <swghosh@redhat.com>
Signed-off-by: Swarup Ghosh <swghosh@redhat.com>
  • Loading branch information
2 people authored and dinhxuanvu committed Apr 15, 2024
1 parent 7642069 commit 7014eed
Show file tree
Hide file tree
Showing 28 changed files with 640 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/mixedcpus"

"github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy"
imagepolicyapiv1 "github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy/apis/imagepolicy/v1"
Expand Down Expand Up @@ -32,6 +33,7 @@ func RegisterOpenshiftKubeAdmissionPlugins(plugins *admission.Plugins) {
ingressadmission.Register(plugins)
managementcpusoverride.Register(plugins)
managednode.Register(plugins)
mixedcpus.Register(plugins)
projectnodeenv.Register(plugins)
quotaclusterresourceoverride.Register(plugins)
quotaclusterresourcequota.Register(plugins)
Expand Down Expand Up @@ -74,6 +76,7 @@ var (
hostassignment.PluginName, // "route.openshift.io/RouteHostAssignment"
csiinlinevolumesecurity.PluginName, // "storage.openshift.io/CSIInlineVolumeSecurity"
managednode.PluginName, // "autoscaling.openshift.io/ManagedNode"
mixedcpus.PluginName, // "autoscaling.openshift.io/MixedCPUs"
}

// openshiftAdmissionPluginsForKubeAfterResourceQuota are the plugins to add after ResourceQuota plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ func (a *managementCPUsOverride) Admit(ctx context.Context, attr admission.Attri
return err
}

if _, found := ns.Annotations[namespaceAllowedAnnotation]; !found && len(workloadType) > 0 {
pod.Annotations[workloadAdmissionWarning] = fmt.Sprintf(
"skipping pod CPUs requests modifications because the %s namespace is not annotated with %s to allow workload partitioning",
ns.GetName(), namespaceAllowedAnnotation)
return nil
}

if !doesNamespaceAllowWorkloadType(ns.Annotations, workloadType) {
return admission.NewForbidden(attr, fmt.Errorf("%s the pod namespace %q does not allow the workload type %s", PluginName, ns.Name, workloadType))
}
Expand Down Expand Up @@ -378,7 +385,7 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
}
}

func isGuaranteed(containers []coreapi.Container) bool {
func IsGuaranteed(containers []coreapi.Container) bool {
for _, c := range containers {
// only memory and CPU resources are relevant to decide pod QoS class
for _, r := range []coreapi.ResourceName{coreapi.ResourceMemory, coreapi.ResourceCPU} {
Expand Down Expand Up @@ -425,7 +432,7 @@ func isBestEffort(containers []coreapi.Container) bool {
}

func getPodQoSClass(containers []coreapi.Container) coreapi.PodQOSClass {
if isGuaranteed(containers) {
if IsGuaranteed(containers) {
return coreapi.PodQOSGuaranteed
}

Expand All @@ -449,10 +456,13 @@ func podHasBothCPULimitAndRequest(containers []coreapi.Container) bool {
return false
}

// doesNamespaceAllowWorkloadType will return false when a workload type does not match any present ones.
func doesNamespaceAllowWorkloadType(annotations map[string]string, workloadType string) bool {
v, found := annotations[namespaceAllowedAnnotation]
// When a namespace contains no annotation for workloads we infer that to mean all workload types are allowed.
// The mutation hook will strip all workload annotation from pods that contain them in that circumstance.
if !found {
return false
return true
}

for _, t := range strings.Split(v, ",") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ func TestAdmit(t *testing.T) {
}{
{
name: "should return admission error when the pod namespace does not allow the workload type",
pod: testManagedPod("500m", "250m", "500Mi", "250Mi"),
pod: testManagedPodWithWorkloadAnnotation("500m", "250m", "500Mi", "250Mi", "non-existent"),
expectedCpuRequest: resource.MustParse("250m"),
namespace: testNamespace(),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
infra: testClusterSNOInfra(),
expectedError: fmt.Errorf("the pod namespace %q does not allow the workload type management", "namespace"),
expectedError: fmt.Errorf("the pod namespace %q does not allow the workload type non-existent", "managed-namespace"),
},
{
name: "should ignore pods that do not have managed annotation",
Expand Down Expand Up @@ -167,6 +167,25 @@ func TestAdmit(t *testing.T) {
expectedError: fmt.Errorf(`failed to get workload annotation effect: the workload annotation value map["test":"test"] does not have "effect" key`),
infra: testClusterSNOInfra(),
},
{
name: "should return admission warning when the pod has workload annotation but the namespace does not",
pod: testManagedPodWithAnnotations(
"500m",
"250m",
"500Mi",
"250Mi",
map[string]string{
fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement): `{"test": "test"}`,
},
),
expectedCpuRequest: resource.MustParse("250m"),
expectedAnnotations: map[string]string{
workloadAdmissionWarning: "skipping pod CPUs requests modifications because the namespace namespace is not annotated with workload.openshift.io/allowed to allow workload partitioning",
},
namespace: testNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
infra: testClusterSNOInfra(),
},
{
name: "should delete CPU requests and update workload CPU annotations for the burstable pod with managed annotation",
pod: testManagedPod("", "250m", "500Mi", "250Mi"),
Expand Down Expand Up @@ -437,16 +456,28 @@ func TestValidate(t *testing.T) {
expectedError: fmt.Errorf("the pod without workload annotations can not have containers with workload resources %q", "management.workload.openshift.io/cores"),
},
{
name: "should return invalid error when the pod has workload annotation, but the pod namespace does not have allowed annotation",
pod: testManagedPod(
name: "should return invalid error when the pod has workload annotation, but the pod namespace does not have allowed workload type",
pod: testManagedPodWithWorkloadAnnotation(
"500m",
"250m",
"500Mi",
"250Mi",
"non-existent",
),
namespace: testNamespace(),
namespace: testManagedNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
expectedError: fmt.Errorf("the pod can not have workload annotation, when the namespace %q does not allow it", "namespace"),
expectedError: fmt.Errorf("the pod can not have workload annotation, when the namespace %q does not allow it", "managed-namespace"),
},
{
name: "should not return any errors when the pod has workload annotation, but the pod namespace has no annotations",
pod: testManagedPod(
"500m",
"250m",
"500Mi",
"250Mi",
),
namespace: testNamespace(),
nodes: []*corev1.Node{testNodeWithManagementResource()},
},
{
name: "should not return any errors when the pod and namespace valid",
Expand Down Expand Up @@ -532,19 +563,12 @@ func testManagedStaticPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest strin
}

func testManagedPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest string) *kapi.Pod {
pod := testPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest)

pod.Annotations = map[string]string{}
for _, c := range pod.Spec.InitContainers {
cpusetAnnotation := fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, c.Name)
pod.Annotations[cpusetAnnotation] = `{"cpuset": "0-1"}`
}
for _, c := range pod.Spec.Containers {
cpusetAnnotation := fmt.Sprintf("%s%s", containerResourcesAnnotationPrefix, c.Name)
pod.Annotations[cpusetAnnotation] = `{"cpuset": "0-1"}`
}
return testManagedPodWithWorkloadAnnotation(cpuLimit, cpuRequest, memoryLimit, memoryRequest, workloadTypeManagement)
}

managementWorkloadAnnotation := fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadTypeManagement)
func testManagedPodWithWorkloadAnnotation(cpuLimit, cpuRequest, memoryLimit, memoryRequest string, workloadType string) *kapi.Pod {
pod := testPod(cpuLimit, cpuRequest, memoryLimit, memoryRequest)
managementWorkloadAnnotation := fmt.Sprintf("%s%s", podWorkloadTargetAnnotationPrefix, workloadType)
pod.Annotations = map[string]string{
managementWorkloadAnnotation: fmt.Sprintf(`{"%s":"%s"}`, podWorkloadAnnotationEffect, workloadEffectPreferredDuringScheduling),
}
Expand Down
152 changes: 152 additions & 0 deletions openshift-kube-apiserver/admission/autoscaling/mixedcpus/admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package mixedcpus

import (
"context"
"fmt"
"io"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/managementcpusoverride"
coreapi "k8s.io/kubernetes/pkg/apis/core"
)

const (
	// PluginName is the name under which this admission plugin is registered.
	PluginName = "autoscaling.openshift.io/MixedCPUs"
	// annotationEnable is the value written into the runtime annotation to
	// signal that shared CPUs were requested for a container.
	annotationEnable = "enable"
	// containerResourceRequestName is the name of the resource that should be specified under the container's request in the pod spec
	containerResourceRequestName = "workload.openshift.io/enable-shared-cpus"
	// runtimeAnnotationPrefix is the prefix for the annotation that is expected by the runtime
	runtimeAnnotationPrefix = "cpu-shared.crio.io"
	// namespaceAllowedAnnotation contains the namespace allowed annotation key
	namespaceAllowedAnnotation = "workload.mixedcpus.openshift.io/allowed"
)

// Compile-time assertions that mixedCPUsMutation implements the admission
// initializer and mutation interfaces it depends on.
var _ = initializer.WantsExternalKubeClientSet(&mixedCPUsMutation{})
var _ = initializer.WantsExternalKubeInformerFactory(&mixedCPUsMutation{})
var _ = admission.MutationInterface(&mixedCPUsMutation{})

// mixedCPUsMutation is a mutating admission plugin that annotates pods
// requesting shared CPUs so the runtime can act on the request.
type mixedCPUsMutation struct {
	*admission.Handler
	// client is used as a fallback for namespace lookups when the
	// informer cache has not caught up yet.
	client kubernetes.Interface
	// podLister/podListerSynced are set but not read in this file;
	// presumably kept for symmetry with other plugins — TODO confirm.
	podLister       corev1listers.PodLister
	podListerSynced func() bool
	// nsLister serves namespace lookups from the shared informer cache.
	nsLister       corev1listers.NamespaceLister
	nsListerSynced func() bool
}

// Register adds the MixedCPUs mutating admission plugin to the given registry.
// The plugin only intercepts object creation.
func Register(plugins *admission.Plugins) {
	plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) {
		plugin := &mixedCPUsMutation{
			Handler: admission.NewHandler(admission.Create),
		}
		return plugin, nil
	})
}

// SetExternalKubeClientSet implements the WantsExternalKubeClientSet interface.
// The client is injected by the admission initializer and used as a fallback
// when the namespace informer cache is stale.
func (s *mixedCPUsMutation) SetExternalKubeClientSet(client kubernetes.Interface) {
	s.client = client
}

// SetExternalKubeInformerFactory implements the WantsExternalKubeInformerFactory
// interface. It wires pod and namespace listers (plus their HasSynced functions)
// from the shared informer factory.
func (s *mixedCPUsMutation) SetExternalKubeInformerFactory(kubeInformers informers.SharedInformerFactory) {
	s.podLister = kubeInformers.Core().V1().Pods().Lister()
	s.podListerSynced = kubeInformers.Core().V1().Pods().Informer().HasSynced
	s.nsLister = kubeInformers.Core().V1().Namespaces().Lister()
	s.nsListerSynced = kubeInformers.Core().V1().Namespaces().Informer().HasSynced
}

// ValidateInitialization verifies that all dependencies injected by the
// admission initializers are in place before the plugin is used.
func (s *mixedCPUsMutation) ValidateInitialization() error {
	switch {
	case s.client == nil:
		return fmt.Errorf("%s plugin needs a kubernetes client", PluginName)
	case s.podLister == nil:
		return fmt.Errorf("%s did not get a pod lister", PluginName)
	case s.podListerSynced == nil:
		return fmt.Errorf("%s plugin needs a pod lister synced", PluginName)
	case s.nsLister == nil:
		return fmt.Errorf("%s did not get a namespace lister", PluginName)
	case s.nsListerSynced == nil:
		return fmt.Errorf("%s plugin needs a namespace lister synced", PluginName)
	}
	return nil
}

// Admit mutates pod creations that request shared CPUs through the
// workload.openshift.io/enable-shared-cpus extended resource. For every
// container carrying that request it verifies that:
//  1. the pod's namespace has the workload.mixedcpus.openshift.io/allowed annotation,
//  2. the pod is of Guaranteed QoS class,
//  3. at most a single unit of the resource is requested,
//
// and then adds a cpu-shared.crio.io/<container-name> annotation so the
// runtime can honor the request (the runtime is not aware of extended
// resources, hence the annotation).
func (s *mixedCPUsMutation) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
	// Only act on the main "pods" resource; ignore subresources (status, exec, ...).
	if attr.GetResource().GroupResource() != coreapi.Resource("pods") || attr.GetSubresource() != "" {
		return nil
	}

	pod, ok := attr.GetObject().(*coreapi.Pod)
	if !ok {
		// Fix: format arguments were previously swapped (object printed where
		// the plugin name belongs).
		return admission.NewForbidden(attr, fmt.Errorf("%s unexpected object: %#v", PluginName, attr.GetObject()))
	}

	// The namespace is the same for every container, so resolve it lazily
	// and at most once instead of on each requesting container.
	var ns *v1.Namespace
	for i := 0; i < len(pod.Spec.Containers); i++ {
		cnt := &pod.Spec.Containers[i]
		requested, v := isContainerRequestForSharedCPUs(cnt)
		if !requested {
			continue
		}
		if ns == nil {
			var err error
			ns, err = s.getPodNs(ctx, pod.Namespace)
			if err != nil {
				return fmt.Errorf("%s %w", PluginName, err)
			}
		}
		if _, found := ns.Annotations[namespaceAllowedAnnotation]; !found {
			return admission.NewForbidden(attr, fmt.Errorf("%s pod %s namespace %s is not allowed for %s resource request", PluginName, pod.Name, pod.Namespace, containerResourceRequestName))
		}
		if !managementcpusoverride.IsGuaranteed(pod.Spec.Containers) {
			return admission.NewForbidden(attr, fmt.Errorf("%s %s/%s requests for %q resource but pod is not Guaranteed QoS class", PluginName, pod.Name, cnt.Name, containerResourceRequestName))
		}
		if v.Value() > 1 {
			// Fix: "forbiden" typo in the user-facing error message.
			return admission.NewForbidden(attr, fmt.Errorf("%s %s/%s more than a single %q resource is forbidden, please set the request to 1 or remove it", PluginName, pod.Name, cnt.Name, containerResourceRequestName))
		}
		addRuntimeAnnotation(pod, cnt.Name)
	}
	return nil
}

// getPodNs returns the namespace object named nsName, preferring the informer
// cache and falling back to a direct API-server read when the cache has not
// observed the namespace yet.
func (s *mixedCPUsMutation) getPodNs(ctx context.Context, nsName string) (*v1.Namespace, error) {
	cached, listerErr := s.nsLister.Get(nsName)
	if listerErr == nil {
		return cached, nil
	}
	if !errors.IsNotFound(listerErr) {
		return nil, fmt.Errorf("%s failed to retrieve namespace %q from lister; %w", PluginName, nsName, listerErr)
	}
	// cache didn't update fast enough
	fresh, apiErr := s.client.CoreV1().Namespaces().Get(ctx, nsName, metav1.GetOptions{})
	if apiErr != nil {
		return nil, fmt.Errorf("%s failed to retrieve namespace %q from api server; %w", PluginName, nsName, apiErr)
	}
	return fresh, nil
}

// isContainerRequestForSharedCPUs reports whether the container requests the
// shared-CPUs extended resource, returning the requested quantity when present
// and the zero Quantity otherwise.
func isContainerRequestForSharedCPUs(container *coreapi.Container) (bool, resource.Quantity) {
	quantity, ok := container.Resources.Requests[containerResourceRequestName]
	if !ok {
		return false, resource.Quantity{}
	}
	return true, quantity
}

// addRuntimeAnnotation marks the named container on the pod with the runtime
// annotation (cpu-shared.crio.io/<container>=enable), creating the annotation
// map if the pod has none.
func addRuntimeAnnotation(pod *coreapi.Pod, cntName string) {
	annotations := pod.Annotations
	if annotations == nil {
		annotations = make(map[string]string)
		pod.Annotations = annotations
	}
	annotations[getRuntimeAnnotationName(cntName)] = annotationEnable
}

func getRuntimeAnnotationName(cntName string) string {
return fmt.Sprintf("%s/%s", runtimeAnnotationPrefix, cntName)
}

0 comments on commit 7014eed

Please sign in to comment.