Skip to content

Commit

Permalink
UPSTREAM: <carry>: add new admission for handling shared cpus
Browse files Browse the repository at this point in the history
Adding a new mutation plugin that handles the following:

1. In case of `workload.openshift.io/enable-shared-cpus` request, it
   adds an annotation to hint runtime about the request. runtime
   is not aware of extended resources, hence we need the annotation.
2. It validates the pod's QoS class and returns an error if it is not the
   Guaranteed QoS class.
3. It validates that no more than a single resource is being requested.
4. It validates that the pod is deployed in a namespace that carries the
   annotation allowing mixed-CPUs workloads.

For more information see - openshift/enhancements#1396

Signed-off-by: Talor Itzhak <titzhak@redhat.com>
  • Loading branch information
Tal-or authored and soltysh committed Dec 8, 2023
1 parent e6a7d8a commit 370afd4
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 2 deletions.
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/plugin/resourcequota"
mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/mixedcpus"

"github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy"
imagepolicyapiv1 "github.com/openshift/apiserver-library-go/pkg/admission/imagepolicy/apis/imagepolicy/v1"
Expand Down Expand Up @@ -32,6 +33,7 @@ func RegisterOpenshiftKubeAdmissionPlugins(plugins *admission.Plugins) {
ingressadmission.Register(plugins)
managementcpusoverride.Register(plugins)
managednode.Register(plugins)
mixedcpus.Register(plugins)
projectnodeenv.Register(plugins)
quotaclusterresourceoverride.Register(plugins)
quotaclusterresourcequota.Register(plugins)
Expand Down Expand Up @@ -74,6 +76,7 @@ var (
hostassignment.PluginName, // "route.openshift.io/RouteHostAssignment"
csiinlinevolumesecurity.PluginName, // "storage.openshift.io/CSIInlineVolumeSecurity"
managednode.PluginName, // "autoscaling.openshift.io/ManagedNode"
mixedcpus.PluginName, // "autoscaling.openshift.io/MixedCPUs"
}

// openshiftAdmissionPluginsForKubeAfterResourceQuota are the plugins to add after ResourceQuota plugin
Expand Down
Expand Up @@ -378,7 +378,7 @@ func updateContainersResources(containers []coreapi.Container, podAnnotations ma
}
}

func isGuaranteed(containers []coreapi.Container) bool {
func IsGuaranteed(containers []coreapi.Container) bool {
for _, c := range containers {
// only memory and CPU resources are relevant to decide pod QoS class
for _, r := range []coreapi.ResourceName{coreapi.ResourceMemory, coreapi.ResourceCPU} {
Expand Down Expand Up @@ -425,7 +425,7 @@ func isBestEffort(containers []coreapi.Container) bool {
}

func getPodQoSClass(containers []coreapi.Container) coreapi.PodQOSClass {
if isGuaranteed(containers) {
if IsGuaranteed(containers) {
return coreapi.PodQOSGuaranteed
}

Expand Down
152 changes: 152 additions & 0 deletions openshift-kube-apiserver/admission/autoscaling/mixedcpus/admission.go
@@ -0,0 +1,152 @@
package mixedcpus

import (
"context"
"fmt"
"io"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/kubernetes/openshift-kube-apiserver/admission/autoscaling/managementcpusoverride"
coreapi "k8s.io/kubernetes/pkg/apis/core"
)

const (
// PluginName is the name under which this admission plugin is registered.
PluginName = "autoscaling.openshift.io/MixedCPUs"
// annotationEnable is the value stored in the runtime annotation that is added for a container requesting shared CPUs
annotationEnable = "enable"
// containerResourceRequestName is the name of the resource that should be specified under the container's request in the pod spec
containerResourceRequestName = "workload.openshift.io/enable-shared-cpus"
// runtimeAnnotationPrefix is the prefix for the annotation that is expected by the runtime
runtimeAnnotationPrefix = "cpu-shared.crio.io"
// namespaceAllowedAnnotation contains the namespace allowed annotation key
namespaceAllowedAnnotation = "workload.mixedcpus.openshift.io/allowed"
)

// Compile-time checks that *mixedCPUsMutation satisfies every interface the
// admission framework expects from this plugin.
var (
	_ initializer.WantsExternalKubeClientSet       = (*mixedCPUsMutation)(nil)
	_ initializer.WantsExternalKubeInformerFactory = (*mixedCPUsMutation)(nil)
	_ admission.MutationInterface                  = (*mixedCPUsMutation)(nil)
)

// mixedCPUsMutation is the admission plugin registered under PluginName.
// Its Admit method validates pods whose containers request the
// containerResourceRequestName resource and adds a runtime annotation for each
// such container.
type mixedCPUsMutation struct {
// Handler is the embedded base admission handler; Register creates it for the Create operation.
*admission.Handler
// client is set by SetExternalKubeClientSet and is used by getPodNs to read a
// namespace from the API server when the namespace lister does not have it.
client kubernetes.Interface
// podLister and podListerSynced are set by SetExternalKubeInformerFactory and
// are checked for presence by ValidateInitialization.
podLister corev1listers.PodLister
podListerSynced func() bool
// nsLister is set by SetExternalKubeInformerFactory and is the first source
// getPodNs consults when looking up a pod's namespace.
nsLister corev1listers.NamespaceLister
// nsListerSynced is the HasSynced function of the namespace informer.
nsListerSynced func() bool
}

// Register adds the mixed-CPUs admission plugin to plugins under PluginName.
// The plugin it builds handles Create operations only; the configuration
// reader handed to the factory is not used.
func Register(plugins *admission.Plugins) {
	newPlugin := func(_ io.Reader) (admission.Interface, error) {
		plugin := &mixedCPUsMutation{
			Handler: admission.NewHandler(admission.Create),
		}
		return plugin, nil
	}
	plugins.Register(PluginName, newPlugin)
}

// SetExternalKubeClientSet implements the WantsExternalKubeClientSet interface.
// It stores the client on the plugin; getPodNs uses it to fetch a namespace
// from the API server when the namespace lister reports it as not found.
func (s *mixedCPUsMutation) SetExternalKubeClientSet(client kubernetes.Interface) {
s.client = client
}

// SetExternalKubeInformerFactory implements the WantsExternalKubeInformerFactory
// interface. It takes the pod and namespace listers, together with the
// HasSynced functions of their informers, from the shared informer factory.
func (s *mixedCPUsMutation) SetExternalKubeInformerFactory(kubeInformers informers.SharedInformerFactory) {
	core := kubeInformers.Core().V1()

	podInformer := core.Pods()
	s.podLister = podInformer.Lister()
	s.podListerSynced = podInformer.Informer().HasSynced

	nsInformer := core.Namespaces()
	s.nsLister = nsInformer.Lister()
	s.nsListerSynced = nsInformer.Informer().HasSynced
}

// ValidateInitialization reports the first dependency that has not been set
// on the plugin: the kubernetes client, the pod lister and its synced
// function, then the namespace lister and its synced function. It returns nil
// when all of them are present.
func (s *mixedCPUsMutation) ValidateInitialization() error {
	switch {
	case s.client == nil:
		return fmt.Errorf("%s plugin needs a kubernetes client", PluginName)
	case s.podLister == nil:
		return fmt.Errorf("%s did not get a pod lister", PluginName)
	case s.podListerSynced == nil:
		return fmt.Errorf("%s plugin needs a pod lister synced", PluginName)
	case s.nsLister == nil:
		return fmt.Errorf("%s did not get a namespace lister", PluginName)
	case s.nsListerSynced == nil:
		return fmt.Errorf("%s plugin needs a namespace lister synced", PluginName)
	}
	return nil
}

// Admit validates and mutates pods whose containers request the
// containerResourceRequestName resource.
//
// Requests that are not for the pods resource, or that target a subresource,
// are admitted untouched. For a pod with at least one container requesting the
// resource, Admit returns a Forbidden error when:
//   - the pod's namespace does not carry the namespaceAllowedAnnotation
//     annotation,
//   - the pod's containers are not all Guaranteed according to
//     managementcpusoverride.IsGuaranteed, or
//   - a container requests more than one unit of the resource.
//
// A failure to retrieve the namespace is returned as a plain wrapped error.
// For every requesting container that passes validation, a runtime annotation
// keyed by the container name is added to the pod.
func (s *mixedCPUsMutation) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
	if attr.GetResource().GroupResource() != coreapi.Resource("pods") || attr.GetSubresource() != "" {
		return nil
	}

	pod, ok := attr.GetObject().(*coreapi.Pod)
	if !ok {
		// The plugin name fills %s and the offending object fills %#v.
		return admission.NewForbidden(attr, fmt.Errorf("%s unexpected object: %#v", PluginName, attr.GetObject()))
	}

	// The namespace and QoS checks depend only on the pod, so they run once,
	// lazily, on the first container that actually requests shared CPUs. Pods
	// that never request the resource therefore trigger no namespace lookup.
	podValidated := false
	for i := range pod.Spec.Containers {
		cnt := &pod.Spec.Containers[i]
		requested, v := isContainerRequestForSharedCPUs(cnt)
		if !requested {
			continue
		}
		if !podValidated {
			ns, err := s.getPodNs(ctx, pod.Namespace)
			if err != nil {
				return fmt.Errorf("%s %w", PluginName, err)
			}
			// Only the presence of the annotation matters; its value is ignored.
			if _, found := ns.Annotations[namespaceAllowedAnnotation]; !found {
				return admission.NewForbidden(attr, fmt.Errorf("%s pod %s namespace %s is not allowed for %s resource request", PluginName, pod.Name, pod.Namespace, containerResourceRequestName))
			}
			if !managementcpusoverride.IsGuaranteed(pod.Spec.Containers) {
				return admission.NewForbidden(attr, fmt.Errorf("%s %s/%s requests for %q resource but pod is not Guaranteed QoS class", PluginName, pod.Name, cnt.Name, containerResourceRequestName))
			}
			podValidated = true
		}
		if v.Value() > 1 {
			return admission.NewForbidden(attr, fmt.Errorf("%s %s/%s more than a single %q resource is forbiden, please set the request to 1 or remove it", PluginName, pod.Name, cnt.Name, containerResourceRequestName))
		}
		addRuntimeAnnotation(pod, cnt.Name)
	}
	return nil
}

// getPodNs returns the namespace object named nsName. It asks the namespace
// lister first; only when the lister reports the namespace as not found does
// it query the API server through the client. Any other lister error, or a
// failure of the API server call, is returned wrapped.
func (s *mixedCPUsMutation) getPodNs(ctx context.Context, nsName string) (*v1.Namespace, error) {
	cached, listerErr := s.nsLister.Get(nsName)
	if listerErr == nil {
		return cached, nil
	}
	if !errors.IsNotFound(listerErr) {
		return nil, fmt.Errorf("%s failed to retrieve namespace %q from lister; %w", PluginName, nsName, listerErr)
	}

	// The lister may lag behind the API server, so ask the server directly.
	live, getErr := s.client.CoreV1().Namespaces().Get(ctx, nsName, metav1.GetOptions{})
	if getErr != nil {
		return nil, fmt.Errorf("%s failed to retrieve namespace %q from api server; %w", PluginName, nsName, getErr)
	}
	return live, nil
}

// isContainerRequestForSharedCPUs reports whether the container's resource
// requests contain the containerResourceRequestName resource. When they do,
// the requested quantity is returned alongside true; otherwise the result is
// false and a zero resource.Quantity.
func isContainerRequestForSharedCPUs(container *coreapi.Container) (bool, resource.Quantity) {
	// A direct lookup replaces scanning every entry; a missing key (or a nil
	// map) yields the zero Quantity, matching the not-found result.
	quan, found := container.Resources.Requests[containerResourceRequestName]
	return found, quan
}

// addRuntimeAnnotation sets the runtime annotation for the container named
// cntName on the pod, using annotationEnable as the value. The pod's
// annotation map is created when it does not exist yet.
func addRuntimeAnnotation(pod *coreapi.Pod, cntName string) {
	key := getRuntimeAnnotationName(cntName)
	if pod.Annotations != nil {
		pod.Annotations[key] = annotationEnable
		return
	}
	pod.Annotations = map[string]string{key: annotationEnable}
}

func getRuntimeAnnotationName(cntName string) string {
return fmt.Sprintf("%s/%s", runtimeAnnotationPrefix, cntName)
}

0 comments on commit 370afd4

Please sign in to comment.