From 4942ccd9a1c33451d5290d707b6ef555ce8c86a7 Mon Sep 17 00:00:00 2001 From: jichuan <13161840126@163.com> Date: Mon, 5 Jul 2021 13:34:40 +0800 Subject: [PATCH] feat: add elastic quota controller typo: fix controller name and comment test: add elastic quota controller integration test feat: use anonymous import to add scheme --- cmd/controller/app/import_known_versions.go | 21 + cmd/controller/app/server.go | 37 +- pkg/apis/scheduling/scheme/scheme.go | 33 ++ pkg/apis/scheduling/v1alpha1/types.go | 12 +- pkg/capacityscheduling/capacity_scheduling.go | 2 +- pkg/controller/elasticquota.go | 343 +++++++++++++++++ pkg/controller/elasticquota_test.go | 221 +++++++++++ pkg/controller/podgroup.go | 76 ++-- test/integration/base.go | 107 ++++++ .../elasticquota_controller_test.go | 362 ++++++++++++++++++ .../loadVariationRiskBalancing_test.go | 16 + test/integration/targetloadpacking_test.go | 16 + 12 files changed, 1185 insertions(+), 61 deletions(-) create mode 100644 cmd/controller/app/import_known_versions.go create mode 100644 pkg/apis/scheduling/scheme/scheme.go create mode 100644 pkg/controller/elasticquota.go create mode 100644 pkg/controller/elasticquota_test.go create mode 100644 test/integration/elasticquota_controller_test.go diff --git a/cmd/controller/app/import_known_versions.go b/cmd/controller/app/import_known_versions.go new file mode 100644 index 000000000..aa283294a --- /dev/null +++ b/cmd/controller/app/import_known_versions.go @@ -0,0 +1,21 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package app + +import ( + _ "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/scheme" +) diff --git a/cmd/controller/app/server.go b/cmd/controller/app/server.go index d2b37983f..bfe44ce75 100644 --- a/cmd/controller/app/server.go +++ b/cmd/controller/app/server.go @@ -18,7 +18,8 @@ package app import ( "context" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "os" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apiserver/pkg/server" "k8s.io/client-go/informers" @@ -29,12 +30,10 @@ import ( "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/klog/v2" - "os" "sigs.k8s.io/scheduler-plugins/pkg/controller" - pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" - pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" - "sigs.k8s.io/scheduler-plugins/pkg/util" + schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" + schedformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" ) func newConfig(kubeconfig, master string, inCluster bool) (*restclient.Config, error) { @@ -62,24 +61,25 @@ func Run(s *ServerRunOptions) error { config.QPS = float32(s.ApiServerQPS) config.Burst = s.ApiServerBurst stopCh := server.SetupSignalHandler() - - pgClient := pgclientset.NewForConfigOrDie(config) + schedClient := schedclientset.NewForConfigOrDie(config) kubeClient := kubernetes.NewForConfigOrDie(config) - pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0) - pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() + schedInformerFactory := schedformers.NewSharedInformerFactory(schedClient, 0) + pgInformer := schedInformerFactory.Scheduling().V1alpha1().PodGroups() + eqInformer := schedInformerFactory.Scheduling().V1alpha1().ElasticQuotas() + + coreInformerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + podInformer := coreInformerFactory.Core().V1().Pods() + pgCtrl := controller.NewPodGroupController(kubeClient, pgInformer, podInformer, schedClient) + eqCtrl := controller.NewElasticQuotaController(kubeClient, eqInformer, podInformer, schedClient) - informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTweakListOptions(func(opt *metav1.ListOptions) { - opt.LabelSelector = util.PodGroupLabel - })) - podInformer := informerFactory.Core().V1().Pods() - ctrl := controller.NewPodGroupController(kubeClient, pgInformer, podInformer, pgClient) - pgInformerFactory.Start(stopCh) - informerFactory.Start(stopCh) run := func(ctx context.Context) { - ctrl.Run(s.Workers, ctx.Done()) + go pgCtrl.Run(s.Workers, ctx.Done()) + go eqCtrl.Run(s.Workers, ctx.Done()) + select {} } - + schedInformerFactory.Start(stopCh) + coreInformerFactory.Start(stopCh) if !s.EnableLeaderElection { run(ctx) } else { @@ -87,7 +87,6 @@ func Run(s *ServerRunOptions) error { if err != nil { return err } - // add a uniquifier so that two processes on the same host don't accidentally both become active id = id + "_" + string(uuid.NewUUID()) diff --git a/pkg/apis/scheduling/scheme/scheme.go b/pkg/apis/scheduling/scheme/scheme.go new file mode 100644 index 000000000..5407de4c7 --- /dev/null +++ b/pkg/apis/scheduling/scheme/scheme.go @@ -0,0 +1,33 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scheme
+
+import (
+	"k8s.io/apimachinery/pkg/runtime"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/kubernetes/scheme"
+	schedv1alpha1 "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
+)
+
+func init() {
+	AddToScheme(scheme.Scheme)
+}
+
+// AddToScheme registers all known versions of the scheduler-plugins scheduling API with the given scheme.
+func AddToScheme(scheme *runtime.Scheme) {
+	utilruntime.Must(schedv1alpha1.AddToScheme(scheme))
+}
diff --git a/pkg/apis/scheduling/v1alpha1/types.go b/pkg/apis/scheduling/v1alpha1/types.go
index 4a0c77ca4..728e126ae 100644
--- a/pkg/apis/scheduling/v1alpha1/types.go
+++ b/pkg/apis/scheduling/v1alpha1/types.go
@@ -80,21 +80,21 @@ type PodGroupPhase string
 
 // These are the valid phase of podGroups.
 const (
-	// PodPending means the pod group has been accepted by the system, but scheduler can not allocate
+	// PodGroupPending means the pod group has been accepted by the system, but the scheduler cannot allocate
 	// enough resources to it.
 	PodGroupPending PodGroupPhase = "Pending"
 
-	// PodRunning means `spec.minMember` pods of PodGroups has been in running phase.
+	// PodGroupRunning means the `spec.minMember` pods of the PodGroup are in the running phase.
 	PodGroupRunning PodGroupPhase = "Running"
 
-	// PreScheduling means all of pods has been are waiting to be scheduled, enqueue waitingPod
+	// PodGroupPreScheduling means all pods are waiting to be scheduled and are enqueued as waitingPods
 	PodGroupPreScheduling PodGroupPhase = "PreScheduling"
 
-	// PodRunning means some of pods has been scheduling in running phase but have not reach the `spec.
+	// PodGroupScheduling means some pods are already running but their number has not yet reached the `spec.
 	// minMember` pods of PodGroups.
 	PodGroupScheduling PodGroupPhase = "Scheduling"
 
-	// PodScheduled means `spec.minMember` pods of PodGroups have been scheduled finished and pods have been in running
+	// PodGroupScheduled means `spec.minMember` pods of the PodGroup have been scheduled and are in the running
 	// phase.
 	PodGroupScheduled PodGroupPhase = "Scheduled"
 
@@ -102,7 +102,7 @@ const (
 	// be scheduled, e.g. not enough resource; scheduler will wait for related controller to recover it.
 	PodGroupUnknown PodGroupPhase = "Unknown"
 
-	// PodGroupFinish means all of `spec.minMember` pods are successfully.
+	// PodGroupFinished means all of `spec.minMember` pods have completed successfully.
 	PodGroupFinished PodGroupPhase = "Finished"
 
 	// PodGroupFailed means at least one of `spec.minMember` pods is failed.
diff --git a/pkg/capacityscheduling/capacity_scheduling.go b/pkg/capacityscheduling/capacity_scheduling.go
index ab9dfea1c..ebc81ce7d 100644
--- a/pkg/capacityscheduling/capacity_scheduling.go
+++ b/pkg/capacityscheduling/capacity_scheduling.go
@@ -72,7 +72,7 @@ func (s *PreFilterState) Clone() framework.StateData {
 	return s
 }
 
-// ElasticQuotaSnapshot stores the snapshot of elasticQuotas.
+// ElasticQuotaSnapshotState stores the snapshot of elasticQuotas.
type ElasticQuotaSnapshotState struct { elasticQuotaInfos ElasticQuotaInfos } diff --git a/pkg/controller/elasticquota.go b/pkg/controller/elasticquota.go new file mode 100644 index 000000000..f6e232e86 --- /dev/null +++ b/pkg/controller/elasticquota.go @@ -0,0 +1,343 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "time" + + v1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + quota "k8s.io/apiserver/pkg/quota/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + coreinformer "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + corelister "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + kubefeatures "k8s.io/kubernetes/pkg/features" + "sigs.k8s.io/scheduler-plugins/pkg/util" + + schedv1alpha1 "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1" + schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" + schedinformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions/scheduling/v1alpha1" + schedlister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1" +) + +// ElasticQuotaController is a controller that process elastic quota using provided Handler interface +type ElasticQuotaController struct { + // schedClient is a clientSet for SchedulingV1alpha1 API group + schedClient schedclientset.Interface + + eqLister schedlister.ElasticQuotaLister + // podLister is lister for pod event and uses to compute namespaced resource used + podLister corelister.PodLister + eqListerSynced cache.InformerSynced + podListerSynced cache.InformerSynced + eqQueue workqueue.RateLimitingInterface + recorder record.EventRecorder +} + +// NewElasticQuotaController returns a new *ElasticQuotaController +func NewElasticQuotaController( + kubeClient kubernetes.Interface, + eqInformer schedinformer.ElasticQuotaInformer, + podInformer coreinformer.PodInformer, + schedClient schedclientset.Interface, + newOpt ...func(ctrl *ElasticQuotaController), +) *ElasticQuotaController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events(v1.NamespaceAll)}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ElasticQuotaController"}) + // set up elastic quota ctrl + ctrl := &ElasticQuotaController{ + schedClient: schedClient, + eqLister: eqInformer.Lister(), + podLister: podInformer.Lister(), + eqListerSynced: 
eqInformer.Informer().HasSynced,
+		podListerSynced: podInformer.Informer().HasSynced,
+		eqQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ElasticQuota"),
+		recorder:        recorder,
+	}
+	for _, f := range newOpt {
+		f(ctrl)
+	}
+	klog.V(5).Info("Setting up elastic quota event handlers")
+	eqInformer.Informer().AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    ctrl.eqAdded,
+			UpdateFunc: ctrl.eqUpdated,
+			DeleteFunc: ctrl.eqDeleted,
+		},
+	)
+	klog.V(5).Info("Setting up pod event handlers")
+	podInformer.Informer().AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    ctrl.podAdded,
+			UpdateFunc: ctrl.podUpdated,
+			DeleteFunc: ctrl.podDeleted,
+		},
+	)
+	return ctrl
+}
+
+func (ctrl *ElasticQuotaController) Run(workers int, stopCh <-chan struct{}) {
+	defer runtime.HandleCrash()
+	defer ctrl.eqQueue.ShutDown()
+	// Start the informer factories to begin populating the informer caches
+	klog.Info("Starting Elastic Quota control loop")
+	// Wait for the caches to be synced before starting workers
+	klog.Info("Waiting for informer caches to sync")
+	if ok := cache.WaitForCacheSync(stopCh, ctrl.eqListerSynced, ctrl.podListerSynced); !ok {
+		klog.Fatalf("Cannot sync caches")
+	}
+	klog.Info("Elastic Quota sync finished")
+	klog.V(5).Infof("Starting %d workers to process elastic quota", workers)
+	// Launch workers to process elastic quota resources
+	for i := 0; i < workers; i++ {
+		go wait.Until(ctrl.worker, time.Second, stopCh)
+	}
+	<-stopCh
+	klog.V(2).Info("Shutting down elastic quota workers")
+}
+
+// WithFakeRecorder sets a fake recorder. It is usually used for unit testing.
+func WithFakeRecorder(bufferSize int) func(ctrl *ElasticQuotaController) {
+	return func(ctrl *ElasticQuotaController) {
+		ctrl.recorder = record.NewFakeRecorder(bufferSize)
+	}
+}
+
+func (ctrl *ElasticQuotaController) worker() {
+	for ctrl.processNextWorkItem() {
+	}
+}
+
+// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
+func (ctrl *ElasticQuotaController) processNextWorkItem() bool { + keyObj, quit := ctrl.eqQueue.Get() + if quit { + return false + } + defer ctrl.eqQueue.Done(keyObj) + key, ok := keyObj.(string) + if !ok { + ctrl.eqQueue.Forget(keyObj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", keyObj)) + return true + } + if err := ctrl.syncHandler(key); err != nil { + runtime.HandleError(err) + klog.Errorf("error syncing elastic quota %q: %s", key, err.Error()) + return true + } + ctrl.eqQueue.Forget(keyObj) + klog.V(5).Infof("Successfully synced %s", key) + return true +} + +// syncHandler syncs elastic quota and convert status.used +func (ctrl *ElasticQuotaController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + // Get the elastic quota resource with this namespace/name + eq, err := ctrl.eqLister.ElasticQuotas(namespace).Get(name) + if apierrs.IsNotFound(err) { + klog.V(5).Infof("Elastic quota %q has been deleted ", key) + return nil + } + if err != nil { + klog.V(3).Infof("Unable to retrieve elastic quota %q from store: %v", key, err) + return err + } + + klog.V(5).Infof("Try to process elastic quota: %q", key) + used, err := ctrl.computeElasticQuotaUsed(namespace, eq) + if err != nil { + return err + } + // create a usage object that is based on the elastic quota version that will handle updates + // by default, we set used to the current status + newEQ := eq.DeepCopy() + newEQ.Status.Used = used + // Ignore this loop if the usage value has not changed + if apiequality.Semantic.DeepEqual(newEQ.Status, eq.Status) { + return nil + } + patch, err := util.CreateMergePatch(eq, newEQ) + if err != nil { + return err + } + if _, err = ctrl.schedClient.SchedulingV1alpha1().ElasticQuotas(namespace). 
+ Patch(context.TODO(), eq.Name, types.MergePatchType, + patch, metav1.PatchOptions{}); err != nil { + return err + } + ctrl.recorder.Event(eq, v1.EventTypeNormal, "Synced", fmt.Sprintf("Elastic Quota %s synced successfully", key)) + return nil +} + +func (ctrl *ElasticQuotaController) computeElasticQuotaUsed(namespace string, eq *schedv1alpha1.ElasticQuota) (v1.ResourceList, error) { + used := newZeroUsed(eq) + pods, err := ctrl.podLister.Pods(namespace).List(labels.NewSelector()) + if err != nil { + return nil, err + } + for _, p := range pods { + if p.Status.Phase == v1.PodRunning { + used = quota.Add(used, computePodResourceRequest(p)) + } + } + return used, nil +} + +// eqAdded reacts to a ElasticQuota creation +func (ctrl *ElasticQuotaController) eqAdded(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + klog.V(5).Infof("Enqueue new elastic quota key %q", key) + ctrl.eqQueue.AddRateLimited(key) +} + +// eqUpdated reacts to a ElasticQuota update +func (ctrl *ElasticQuotaController) eqUpdated(oldObj, newObj interface{}) { + ctrl.eqAdded(newObj) +} + +// eqDeleted reacts to a ElasticQuota deletion +func (ctrl *ElasticQuotaController) eqDeleted(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + klog.V(5).Infof("Enqueue deleted elastic quota key %q", key) + ctrl.eqQueue.AddRateLimited(key) +} + +// podAdded reacts to a pod creation +func (ctrl *ElasticQuotaController) podAdded(obj interface{}) { + pod := obj.(*v1.Pod) + list, err := ctrl.eqLister.ElasticQuotas(pod.Namespace).List(labels.Everything()) + if err != nil { + runtime.HandleError(err) + return + } + if len(list) == 0 { + return + } + // todo(yuchen-sun) When elastic quota supports multiple instances in a namespace, modify this + ctrl.eqAdded(list[0]) +} + +// podUpdated reacts to a pod update +func (ctrl *ElasticQuotaController) podUpdated(oldObj, newObj interface{}) { + oldPod := oldObj.(*v1.Pod) + newPod := newObj.(*v1.Pod) + if newPod.ResourceVersion == oldPod.ResourceVersion { + return + } + ctrl.podAdded(newObj) +} + +// podDeleted reacts to a pod delete +func (ctrl *ElasticQuotaController) podDeleted(obj interface{}) { + _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + ctrl.podAdded(obj) +} + +// computePodResourceRequest returns a v1.ResourceList that covers the largest +// width in each resource dimension. Because init-containers run sequentially, we collect +// the max in each dimension iteratively. In contrast, we sum the resource vectors for +// regular containers since they run simultaneously. 
+//
+// If Pod Overhead is specified and the feature gate is set, the resources defined for Overhead
+// are added to the calculated Resource request sum
+//
+// Example:
+//
+// Pod:
+//   InitContainers
+//     IC1:
+//       CPU: 2
+//       Memory: 1G
+//     IC2:
+//       CPU: 2
+//       Memory: 3G
+//   Containers
+//     C1:
+//       CPU: 2
+//       Memory: 1G
+//     C2:
+//       CPU: 1
+//       Memory: 1G
+//
+// Result: CPU: 3, Memory: 3G
+func computePodResourceRequest(pod *v1.Pod) v1.ResourceList {
+	result := v1.ResourceList{}
+	for _, container := range pod.Spec.Containers {
+		result = quota.Add(result, container.Resources.Requests)
+	}
+	initRes := v1.ResourceList{}
+	// take max_resource for init_containers
+	for _, container := range pod.Spec.InitContainers {
+		initRes = quota.Max(initRes, container.Resources.Requests)
+	}
+	// If Overhead is being utilized, add to the total requests for the pod
+	if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodOverhead) {
+		result = quota.Add(result, pod.Spec.Overhead)
+	}
+	// take the max of the summed container requests and the max init container request
+	return quota.Max(result, initRes)
+}
+
+// newZeroUsed will return the zero value of the union of min and max
+func newZeroUsed(eq *schedv1alpha1.ElasticQuota) v1.ResourceList {
+	minResources := quota.ResourceNames(eq.Spec.Min)
+	maxResources := quota.ResourceNames(eq.Spec.Max)
+	res := v1.ResourceList{}
+	for _, v := range minResources {
+		res[v] = *resource.NewQuantity(0, resource.DecimalSI)
+	}
+	for _, v := range maxResources {
+		res[v] = *resource.NewQuantity(0, resource.DecimalSI)
+	}
+	return res
+}
diff --git a/pkg/controller/elasticquota_test.go b/pkg/controller/elasticquota_test.go
new file mode 100644
index 000000000..7f5b63563
--- /dev/null
+++ b/pkg/controller/elasticquota_test.go
@@ -0,0 +1,221 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	quota "k8s.io/apiserver/pkg/quota/v1"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/controller"
+
+	"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
+	schedfake "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake"
+	schedinformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
+	testutil "sigs.k8s.io/scheduler-plugins/test/integration"
+)
+
+func TestElasticQuotaController_Run(t *testing.T) {
+	ctx := context.TODO()
+	cases := []struct {
+		name          string
+		elasticQuotas []*v1alpha1.ElasticQuota
+		pods          []*v1.Pod
+		want          []*v1alpha1.ElasticQuota
+	}{
+		{
+			name: "no init Containers pod",
+			elasticQuotas: []*v1alpha1.ElasticQuota{
+				testutil.MakeEQ("t1-ns1", "t1-eq1").
+					Min(testutil.MakeResourceList().CPU(3).Mem(5).Obj()).
+ Max(testutil.MakeResourceList().CPU(5).Mem(15).GPU(1).Obj()).Obj(), + }, + pods: []*v1.Pod{ + testutil.MakePod("t1-ns1", "pod1").Phase(v1.PodRunning).Container( + testutil.MakeResourceList().CPU(1).Mem(2).GPU(1).Obj()).Obj(), + testutil.MakePod("t1-ns1", "pod2").Phase(v1.PodPending).Container( + testutil.MakeResourceList().CPU(1).Mem(2).GPU(0).Obj()).Obj(), + }, + want: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t1-ns1", "t1-eq1"). + Used(testutil.MakeResourceList().CPU(1).Mem(2).GPU(1).Obj()).Obj(), + }, + }, + { + name: "have init Containers pod", + elasticQuotas: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t2-ns1", "t2-eq1"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).Obj()). + Max(testutil.MakeResourceList().CPU(5).Mem(15).Obj()).Obj(), + }, + + pods: []*v1.Pod{ + // CPU: 2, Mem: 4 + testutil.MakePod("t2-ns1", "pod1").Phase(v1.PodRunning). + Container( + testutil.MakeResourceList().CPU(1).Mem(2).Obj()). + Container( + testutil.MakeResourceList().CPU(1).Mem(2).Obj()).Obj(), + // CPU: 3, Mem: 3 + testutil.MakePod("t2-ns1", "pod2").Phase(v1.PodRunning). + InitContainerRequest( + testutil.MakeResourceList().CPU(2).Mem(1).Obj()). + InitContainerRequest( + testutil.MakeResourceList().CPU(2).Mem(3).Obj()). + Container( + testutil.MakeResourceList().CPU(2).Mem(1).Obj()). + Container( + testutil.MakeResourceList().CPU(1).Mem(1).Obj()).Obj(), + }, + want: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t2-ns1", "t2-eq1"). + Used(testutil.MakeResourceList().CPU(5).Mem(7).Obj()).Obj(), + }, + }, + { + name: "update pods", + elasticQuotas: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t3-ns1", "t3-eq1"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).Obj()). + Max(testutil.MakeResourceList().CPU(5).Mem(15).Obj()).Obj(), + testutil.MakeEQ("t3-ns2", "t3-eq2"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).Obj()). + Max(testutil.MakeResourceList().CPU(5).Mem(15).Obj()).Obj(), + }, + pods: []*v1.Pod{ + // CPU: 2, Mem: 4 + testutil.MakePod("t3-ns1", "pod1").Phase(v1.PodRunning). + Container(testutil.MakeResourceList().CPU(1).Mem(2).GPU(1).Obj()). + Container(testutil.MakeResourceList().CPU(1).Mem(2).Obj()).Obj(), + // CPU: 3, Mem: 3 + testutil.MakePod("t3-ns1", "pod1").Phase(v1.PodPending). + InitContainerRequest(testutil.MakeResourceList().CPU(2).Mem(1).Obj()). + InitContainerRequest(testutil.MakeResourceList().CPU(2).Mem(3).Obj()). + Container(testutil.MakeResourceList().CPU(2).Mem(1).Obj()). + Container(testutil.MakeResourceList().CPU(1).Mem(1).Obj()).Obj(), + // CPU: 4, Mem: 3 + testutil.MakePod("t3-ns2", "pod2").Phase(v1.PodRunning). + InitContainerRequest(testutil.MakeResourceList().CPU(2).Mem(1).Obj()). + InitContainerRequest(testutil.MakeResourceList().CPU(2).Mem(3).Obj()). + Container(testutil.MakeResourceList().CPU(3).Mem(1).Obj()). + Container(testutil.MakeResourceList().CPU(1).Mem(1).Obj()).Obj(), + }, + want: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t3-ns1", "t3-eq1"). + Used(testutil.MakeResourceList().CPU(0).Mem(0).Obj()).Obj(), + testutil.MakeEQ("t3-ns2", "t3-eq2"). + Used(testutil.MakeResourceList().CPU(4).Mem(3).Obj()).Obj(), + }, + }, + { + name: "min and max have the same fields", + elasticQuotas: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t4-ns1", "t4-eq1"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).Obj()). + Max(testutil.MakeResourceList().CPU(5).Mem(15).Obj()).Obj(), + }, + pods: []*v1.Pod{}, + want: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t4-ns1", "t4-eq1"). 
+ Used(testutil.MakeResourceList().CPU(0).Mem(0).Obj()).Obj(), + }, + }, + { + name: "min and max have the different fields", + elasticQuotas: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t5-ns1", "t5-eq1"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).GPU(2).Obj()). + Max(testutil.MakeResourceList().CPU(5).Mem(15).Obj()).Obj(), + }, + pods: []*v1.Pod{}, + want: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t5-ns1", "t5-eq1"). + Used(testutil.MakeResourceList().CPU(0).Mem(0).GPU(0).Obj()).Obj(), + }, + }, + { + name: "pod and eq in the different namespaces", + elasticQuotas: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t6-ns1", "t6-eq1"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).GPU(2).Obj()). + Max(testutil.MakeResourceList().CPU(50).Mem(15).Obj()).Obj(), + testutil.MakeEQ("t6-ns2", "t6-eq2"). + Min(testutil.MakeResourceList().CPU(3).Mem(5).GPU(2).Obj()). + Max(testutil.MakeResourceList().CPU(50).Mem(15).Obj()).Obj(), + }, + pods: []*v1.Pod{ + testutil.MakePod("t6-ns3", "pod1").Phase(v1.PodRunning). + Container(testutil.MakeResourceList().CPU(1).Mem(2).GPU(1).Obj()). + Container(testutil.MakeResourceList().CPU(1).Mem(2).Obj()).Obj(), + }, + want: []*v1alpha1.ElasticQuota{ + testutil.MakeEQ("t6-ns1", "t6-eq1"). + Used(testutil.MakeResourceList().CPU(0).Mem(0).GPU(0).Obj()).Obj(), + testutil.MakeEQ("t6-ns2", "t6-eq2"). + Used(testutil.MakeResourceList().CPU(0).Mem(0).GPU(0).Obj()).Obj(), + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + schedClient := schedfake.NewSimpleClientset() + for _, v := range c.elasticQuotas { + schedClient.Tracker().Add(v) + } + informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) + pgInformerFactory := schedinformer.NewSharedInformerFactory(schedClient, controller.NoResyncPeriodFunc()) + podInformer := informerFactory.Core().V1().Pods() + eqInformer := pgInformerFactory.Scheduling().V1alpha1().ElasticQuotas() + ctrl := NewElasticQuotaController(kubeClient, eqInformer, podInformer, schedClient, WithFakeRecorder(3)) + + pgInformerFactory.Start(ctx.Done()) + informerFactory.Start(ctx.Done()) + // 0 means not set + for _, p := range c.pods { + kubeClient.Tracker().Add(p) + kubeClient.CoreV1().Pods(p.Namespace).UpdateStatus(ctx, p, metav1.UpdateOptions{}) + } + + go ctrl.Run(1, ctx.Done()) + err := wait.Poll(200*time.Millisecond, 1*time.Second, func() (done bool, err error) { + for _, v := range c.want { + get, err := schedClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if !quota.Equals(get.Status.Used, v.Status.Used) { + return false, fmt.Errorf("want %v, got %v", v.Status.Used, get.Status.Used) + } + } + return true, nil + }) + if err != nil { + klog.Fatal(err) + } + }) + } +} diff --git a/pkg/controller/podgroup.go b/pkg/controller/podgroup.go index 265dd6043..53580dc2e 100644 --- a/pkg/controller/podgroup.go +++ b/pkg/controller/podgroup.go @@ -68,8 +68,8 @@ func NewPodGroupController(client kubernetes.Interface, broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: client.CoreV1().Events(v1.NamespaceAll)}) ctrl := &PodGroupController{ - eventRecorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "Coscheduling"}), - pgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Coscheduling-queue"), + eventRecorder: broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: 
"PodGroupController"}), + pgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "PodGroup"), } pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -93,16 +93,16 @@ func NewPodGroupController(client kubernetes.Interface, func (ctrl *PodGroupController) Run(workers int, stopCh <-chan struct{}) { defer ctrl.pgQueue.ShutDown() - klog.Info("Starting coscheduling") - defer klog.Info("Shutting coscheduling") + klog.Info("Starting Pod Group controller") + defer klog.Info("Shutting Pod Group controller") if !cache.WaitForCacheSync(stopCh, ctrl.pgListerSynced, ctrl.podListerSynced) { klog.Error("Cannot sync caches") return } - klog.Info("Coscheduling sync finished") + klog.Info("Pod Group sync finished") for i := 0; i < workers; i++ { - go wait.Until(ctrl.sync, 0, stopCh) + go wait.Until(ctrl.worker, time.Second, stopCh) } <-stopCh @@ -154,58 +154,63 @@ func (ctrl *PodGroupController) podUpdated(old, new interface{}) { ctrl.podAdded(new) } -// syncPG deals with one key off the queue. It returns false when it's time to quit. -func (ctrl *PodGroupController) sync() { +func (ctrl *PodGroupController) worker() { + for ctrl.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (ctrl *PodGroupController) processNextWorkItem() bool { keyObj, quit := ctrl.pgQueue.Get() if quit { - return + return false } defer ctrl.pgQueue.Done(keyObj) - key := keyObj.(string) - namespace, pgName, err := cache.SplitMetaNamespaceKey(key) - klog.V(4).Infof("Started PG processing %q", pgName) - - // get PG to process - pg, err := ctrl.pgLister.PodGroups(namespace).Get(pgName) - ctx := context.TODO() - if err != nil { - if apierrs.IsNotFound(err) { - pg, err = ctrl.pgClient.SchedulingV1alpha1().PodGroups(namespace).Get(ctx, pgName, metav1.GetOptions{}) - if err != nil && apierrs.IsNotFound(err) { - // PG was deleted in the meantime, ignore. 
- klog.V(3).Infof("PG %q deleted", pgName) - return - } - } - klog.Errorf("Error getting PodGroup %q: %v", pgName, err) - ctrl.pgQueue.AddRateLimited(keyObj) - return + key, ok := keyObj.(string) + if !ok { + ctrl.pgQueue.Forget(keyObj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", keyObj)) + return true } - ctrl.syncHandler(ctx, pg) + if err := ctrl.syncHandler(key); err != nil { + runtime.HandleError(err) + klog.Errorf("error syncing pod group %q: %s", key, err.Error()) + return true + } + return true } // syncHandle syncs pod group and convert status -func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *schedv1alpha1.PodGroup) { - key, err := cache.MetaNamespaceKeyFunc(pg) +func (ctrl *PodGroupController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(err) - return + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil } - defer func() { if err != nil { ctrl.pgQueue.AddRateLimited(key) return } }() + pg, err := ctrl.pgLister.PodGroups(namespace).Get(name) + if apierrs.IsNotFound(err) { + klog.V(5).Infof("Pod group %q has been deleted ", key) + return nil + } + if err != nil { + klog.V(3).Infof("Unable to retrieve pod group %q from store: %v", key, err) + return err + } pgCopy := pg.DeepCopy() selector := labels.Set(map[string]string{util.PodGroupLabel: pgCopy.Name}).AsSelector() pods, err := ctrl.podLister.List(selector) if err != nil { klog.Errorf("List pods for group %v failed: %v", pgCopy.Name, err) - return + return err } switch pgCopy.Status.Phase { @@ -259,6 +264,7 @@ func (ctrl *PodGroupController) syncHandler(ctx context.Context, pg *schedv1alph if err == nil { ctrl.pgQueue.Forget(pg) } + return err } func (ctrl *PodGroupController) patchPodGroup(old, new *schedv1alpha1.PodGroup) error { diff --git a/test/integration/base.go b/test/integration/base.go index d9698f113..0a545cad3 100644 --- a/test/integration/base.go +++ b/test/integration/base.go @@ -18,6 +18,12 @@ package integration import ( "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + st "k8s.io/kubernetes/pkg/scheduler/testing" + "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" @@ -39,3 +45,104 @@ func podScheduled(c clientset.Interface, podNamespace, podName string) bool { } return true } + +type resourceWrapper struct{ v1.ResourceList } + +func MakeResourceList() *resourceWrapper { + return &resourceWrapper{v1.ResourceList{}} +} + +func (r *resourceWrapper) CPU(val int64) *resourceWrapper { + r.ResourceList[v1.ResourceCPU] = *resource.NewQuantity(val, resource.DecimalSI) + return r +} + +func (r *resourceWrapper) Mem(val int64) *resourceWrapper { + r.ResourceList[v1.ResourceMemory] = *resource.NewQuantity(val, resource.DecimalSI) + return r +} + +func (r *resourceWrapper) GPU(val int64) *resourceWrapper { + r.ResourceList["nvidia.com/gpu"] = *resource.NewQuantity(val, resource.DecimalSI) + return r +} + +func (r *resourceWrapper) Obj() v1.ResourceList { + return r.ResourceList +} + +type podWrapper struct{ *v1.Pod } + +func MakePod(namespace, name string) *podWrapper { + pod := st.MakePod().Namespace(namespace).Name(name).Obj() + + return &podWrapper{pod} +} + +func (p *podWrapper) Phase(phase v1.PodPhase) *podWrapper { + p.Pod.Status.Phase = phase + return p 
+}
+
+func (p *podWrapper) Container(request v1.ResourceList) *podWrapper {
+	p.Pod.Spec.Containers = append(p.Pod.Spec.Containers, v1.Container{
+		Name:  fmt.Sprintf("con%d", len(p.Pod.Spec.Containers)),
+		Image: "image",
+		Resources: v1.ResourceRequirements{
+			Requests: request,
+		},
+	})
+	return p
+}
+
+func (p *podWrapper) InitContainerRequest(request v1.ResourceList) *podWrapper {
+	p.Pod.Spec.InitContainers = append(p.Pod.Spec.InitContainers, v1.Container{
+		Name:  fmt.Sprintf("init-con%d", len(p.Pod.Spec.InitContainers)),
+		Image: "image",
+		Resources: v1.ResourceRequirements{
+			Requests: request,
+		},
+	})
+	return p
+}
+
+func (p *podWrapper) Node(name string) *podWrapper {
+	p.Pod.Spec.NodeName = name
+	return p
+}
+
+func (p *podWrapper) Obj() *v1.Pod {
+	return p.Pod
+}
+
+type eqWrapper struct{ *v1alpha1.ElasticQuota }
+
+func MakeEQ(namespace, name string) *eqWrapper {
+	eq := &v1alpha1.ElasticQuota{
+		TypeMeta: metav1.TypeMeta{Kind: "ElasticQuota", APIVersion: "scheduling.sigs.k8s.io/v1alpha1"},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+		},
+	}
+	return &eqWrapper{eq}
+}
+
+func (e *eqWrapper) Min(min v1.ResourceList) *eqWrapper {
+	e.ElasticQuota.Spec.Min = min
+	return e
+}
+
+func (e *eqWrapper) Max(max v1.ResourceList) *eqWrapper {
+	e.ElasticQuota.Spec.Max = max
+	return e
+}
+
+func (e *eqWrapper) Used(used v1.ResourceList) *eqWrapper {
+	e.ElasticQuota.Status.Used = used
+	return e
+}
+
+func (e *eqWrapper) Obj() *v1alpha1.ElasticQuota {
+	return e.ElasticQuota
+}
diff --git a/test/integration/elasticquota_controller_test.go b/test/integration/elasticquota_controller_test.go
new file mode 100644
index 000000000..d1ef00920
--- /dev/null
+++ b/test/integration/elasticquota_controller_test.go
@@ -0,0 +1,362 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package integration + +import ( + "context" + "os" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + quota "k8s.io/apiserver/pkg/quota/v1" + "k8s.io/apiserver/pkg/server" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler" + fwkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + st "k8s.io/kubernetes/pkg/scheduler/testing" + testutil "k8s.io/kubernetes/test/integration/util" + + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling" + "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1" + schedv1alpha1 "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1" + "sigs.k8s.io/scheduler-plugins/pkg/controller" + "sigs.k8s.io/scheduler-plugins/pkg/coscheduling" + schedclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" + schedformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions" + "sigs.k8s.io/scheduler-plugins/test/util" +) + +func TestElasticController(t *testing.T) { + todo := context.TODO() + ctx, cancelFunc := context.WithCancel(todo) + testCtx := &testutil.TestContext{ + Ctx: ctx, + CancelFn: cancelFunc, + CloseFn: func() {}, + } + registry := fwkruntime.Registry{coscheduling.Name: coscheduling.New} + t.Log("create apiserver") + _, config := util.StartApi(t, todo.Done()) + + config.ContentType = "application/json" + + apiExtensionClient, err := apiextensionsclient.NewForConfig(config) + if err != nil { + t.Fatal(err) + } + + kubeConfigPath := util.BuildKubeConfigFile(config) + if len(kubeConfigPath) == 0 { + t.Fatal("Build KubeConfigFile failed") + } + defer os.RemoveAll(kubeConfigPath) + + t.Log("create crd") + if _, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, makeElasticQuotaCRD(), metav1.CreateOptions{}); err != nil { + t.Fatal(err) + } + + cs := kubernetes.NewForConfigOrDie(config) + extClient := schedclientset.NewForConfigOrDie(config) + schedInformerFactory := schedformers.NewSharedInformerFactory(extClient, 0) + eqInformer := schedInformerFactory.Scheduling().V1alpha1().ElasticQuotas() + + coreInformerFactory := informers.NewSharedInformerFactory(cs, 0) + podInformer := coreInformerFactory.Core().V1().Pods() + eqCtrl := controller.NewElasticQuotaController(cs, eqInformer, podInformer, extClient) + runtime.Must(schedv1alpha1.AddToScheme(scheme.Scheme)) + if err = wait.Poll(100*time.Millisecond, 3*time.Second, func() (done bool, err error) { + groupList, _, err := cs.ServerGroupsAndResources() + if err != nil { + return false, nil + } + for _, group := range groupList { + if group.Name == scheduling.GroupName { + return true, nil + } + } + t.Log("waiting for crd api ready") + return false, nil + }); err != nil { + t.Fatalf("Waiting for crd read time out: %v", err) + } + + // start controller + stopCh := server.SetupSignalHandler() + go eqCtrl.Run(1, ctx.Done()) + schedInformerFactory.Start(stopCh) + coreInformerFactory.Start(stopCh) + + testCtx.ClientSet = cs + testCtx = util.InitTestSchedulerWithOptions( + t, + testCtx, + true, + scheduler.WithFrameworkOutOfTreeRegistry(registry), + ) + t.Log("init scheduler success") + defer testutil.CleanupTest(t, testCtx) + + // Create a 
Node. + nodeName := "fake-node" + node := st.MakeNode().Name(nodeName).Label("node", nodeName).Obj() + node.Status.Allocatable = v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(300, resource.DecimalSI), + v1.ResourceCPU: *resource.NewQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI), + } + node.Status.Capacity = v1.ResourceList{ + v1.ResourcePods: *resource.NewQuantity(300, resource.DecimalSI), + v1.ResourceCPU: *resource.NewQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI), + } + node, err = cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Node %q: %v", nodeName, err) + } + + for _, ns := range []string{"ns1", "ns2"} { + _, err := cs.CoreV1().Namespaces().Create(ctx, &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: ns}}, metav1.CreateOptions{}) + if err != nil && !errors.IsAlreadyExists(err) { + t.Fatalf("Failed to integration test ns: %v", err) + } + autoCreate := false + t.Logf("namespaces %+v", ns) + _, err = cs.CoreV1().ServiceAccounts(ns).Create(ctx, &v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{Name: "default", Namespace: ns}, AutomountServiceAccountToken: &autoCreate}, metav1.CreateOptions{}) + if err != nil && !errors.IsAlreadyExists(err) { + t.Fatalf("Failed to create ns default: %v", err) + } + } + + for _, tt := range []struct { + name string + elasticQuotas []*v1alpha1.ElasticQuota + existingPods []*v1.Pod + used []*v1alpha1.ElasticQuota + incomingPods []*v1.Pod + want []*v1alpha1.ElasticQuota + }{ + { + name: "The status of the pod changes from pending to running", + elasticQuotas: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t1-eq1"). + Min(MakeResourceList().CPU(100).Mem(1000).Obj()). + Max(MakeResourceList().CPU(100).Mem(1000).Obj()).Obj(), + MakeEQ("ns2", "t1-eq2"). + Min(MakeResourceList().CPU(100).Mem(1000).Obj()). + Max(MakeResourceList().CPU(100).Mem(1000).Obj()).Obj(), + }, + existingPods: []*v1.Pod{ + MakePod("ns1", "t1-p1"). + Container(MakeResourceList().CPU(10).Mem(20).Obj()).Obj(), + MakePod("ns1", "t1-p2"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakePod("ns1", "t1-p3"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakePod("ns2", "t1-p4"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + }, + used: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t1-eq1"). + Used(MakeResourceList().CPU(0).Mem(0).Obj()).Obj(), + MakeEQ("ns2", "t1-eq2"). + Used(MakeResourceList().CPU(0).Mem(0).Obj()).Obj(), + }, + incomingPods: []*v1.Pod{ + MakePod("ns1", "t1-p1").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(20).Obj()).Obj(), + MakePod("ns1", "t1-p2").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakePod("ns1", "t1-p3").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakePod("ns2", "t1-p4").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + }, + + want: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t1-eq1"). + Used(MakeResourceList().CPU(30).Mem(40).Obj()).Obj(), + MakeEQ("ns2", "t1-eq2"). + Used(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + }, + }, + { + name: "The status of the pod changes from running to others", + elasticQuotas: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t2-eq1"). + Min(MakeResourceList().CPU(100).Mem(1000).Obj()). 
+ Max(MakeResourceList().CPU(100).Mem(1000).Obj()).Obj(), + MakeEQ("ns2", "t2-eq2"). + Min(MakeResourceList().CPU(100).Mem(1000).Obj()). + Max(MakeResourceList().CPU(100).Mem(1000).Obj()).Obj(), + }, + existingPods: []*v1.Pod{ + MakePod("ns1", "t2-p1").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(20).Obj()).Obj(), + MakePod("ns1", "t2-p2").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakePod("ns1", "t2-p3").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakePod("ns2", "t2-p4").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + }, + used: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t2-eq1"). + Used(MakeResourceList().CPU(30).Mem(40).Obj()).Obj(), + MakeEQ("ns2", "t2-eq2"). + Used(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + }, + incomingPods: []*v1.Pod{ + MakePod("ns1", "t2-p1").Phase(v1.PodSucceeded).Obj(), + MakePod("ns1", "t2-p3").Phase(v1.PodFailed).Obj(), + }, + want: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t2-eq1"). + Used(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + MakeEQ("ns2", "t2-eq2"). + Used(MakeResourceList().CPU(10).Mem(10).Obj()).Obj(), + }, + }, + { + name: "Different resource between max and min", + elasticQuotas: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t3-eq1"). + Min(MakeResourceList().Mem(1000).Obj()). + Max(MakeResourceList().CPU(100).Obj()).Obj(), + }, + existingPods: []*v1.Pod{ + MakePod("ns1", "t3-p1"). + Container(MakeResourceList().CPU(10).Mem(20).Obj()).Obj(), + }, + used: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t3-eq1"). + Used(MakeResourceList().CPU(0).Mem(0).Obj()).Obj(), + }, + incomingPods: []*v1.Pod{ + MakePod("ns1", "t3-p1").Phase(v1.PodRunning).Node("fake-node"). + Container(MakeResourceList().CPU(10).Mem(20).Obj()).Obj(), + }, + want: []*v1alpha1.ElasticQuota{ + MakeEQ("ns1", "t3-eq1"). + Used(MakeResourceList().CPU(10).Mem(20).Obj()).Obj(), + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + defer cleanupElasticQuotas(ctx, extClient, tt.elasticQuotas) + defer testutil.CleanupPods(cs, t, tt.existingPods) + defer testutil.CleanupPods(cs, t, tt.incomingPods) + // create elastic quota + if err := createElasticQuotas(ctx, extClient, tt.elasticQuotas); err != nil { + t.Fatal(err) + } + + // create now pod and update status + for _, pod := range tt.existingPods { + _, err := cs.CoreV1().Pods(pod.Namespace).Create(testCtx.Ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) + } + if pod.Status.Phase == v1.PodRunning { + _, err = cs.CoreV1().Pods(pod.Namespace).UpdateStatus(testCtx.Ctx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update Pod status %q: %v", pod.Name, err) + } + } + } + err = wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) { + for _, pod := range tt.incomingPods { + if !podScheduled(cs, pod.Namespace, pod.Name) { + return false, nil + } + } + return true, nil + }) + if err != nil { + t.Fatalf("%v Waiting existPods created error: %v", tt.name, err.Error()) + } + + err = wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) { + for _, v := range tt.used { + eq, err := extClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(context.TODO(), v.Name, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. 
+ klog.Errorf("klog error %v", err) + return false, err + } + if !quota.Equals(eq.Status.Used, v.Status.Used) { + return false, nil + } + } + return true, nil + }) + if err != nil { + t.Fatalf("%v Waiting nowEQUsed error: %v", tt.name, err.Error()) + } + + // update Pods status to check if EQ.used has changed as expected + for _, pod := range tt.incomingPods { + _, err = cs.CoreV1().Pods(pod.Namespace).UpdateStatus(testCtx.Ctx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update Pod status %q: %v", pod.Name, err) + } + } + err = wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) { + for _, pod := range tt.incomingPods { + if !podScheduled(cs, pod.Namespace, pod.Name) { + return false, nil + } + } + return true, nil + }) + if err != nil { + t.Fatalf("%v Waiting nextPods update status error: %v", tt.name, err.Error()) + } + + err = wait.Poll(time.Millisecond*200, 10*time.Second, func() (bool, error) { + for _, v := range tt.want { + eq, err := extClient.SchedulingV1alpha1().ElasticQuotas(v.Namespace).Get(context.TODO(), v.Name, metav1.GetOptions{}) + if err != nil { + // This could be a connection error so we want to retry. + klog.Errorf("klog error %v", err) + return false, err + } + if !quota.Equals(eq.Status.Used, v.Status.Used) { + return false, nil + } + } + return true, nil + }) + if err != nil { + t.Fatalf("%v Waiting nextEQUsed error: %v", tt.name, err.Error()) + } + t.Logf("case %v finished", tt.name) + }) + } +} diff --git a/test/integration/loadVariationRiskBalancing_test.go b/test/integration/loadVariationRiskBalancing_test.go index da8ef583e..fc445ea2c 100644 --- a/test/integration/loadVariationRiskBalancing_test.go +++ b/test/integration/loadVariationRiskBalancing_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package integration import ( diff --git a/test/integration/targetloadpacking_test.go b/test/integration/targetloadpacking_test.go index 49042ffdc..120516c1f 100644 --- a/test/integration/targetloadpacking_test.go +++ b/test/integration/targetloadpacking_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package integration import (