From 2b8b01c8998487409016dc0449a321c5635db2b1 Mon Sep 17 00:00:00 2001 From: Patryk Bundyra Date: Thu, 21 Mar 2024 09:30:47 +0000 Subject: [PATCH] Passing parameters to ProvisioningRequest from annotations --- .../provisioning/controller.go | 27 +++++--- .../provisioning/controller_test.go | 43 +++++++++++++ pkg/controller/constants/constants.go | 3 + pkg/controller/jobframework/reconciler.go | 10 +-- .../jobs/job/job_controller_test.go | 32 ++++++++++ .../jobs/jobset/jobset_controller_test.go | 23 +++++-- .../jobs/mxjob/mxjob_controller_test.go | 31 +++++++++- pkg/controller/jobs/pod/pod_controller.go | 8 ++- .../jobs/pod/pod_controller_test.go | 61 ++++++++++++++++++- pkg/util/admissioncheck/admissioncheck.go | 13 ++++ pkg/util/testing/wrappers.go | 9 ++- pkg/util/testingjobs/jobset/wrappers.go | 6 ++ pkg/util/testingjobs/mxjob/wrappers.go | 6 ++ .../provisioning/provisioning_test.go | 23 ++++--- .../jobs/job/job_controller_test.go | 9 ++- .../jobs/pod/pod_controller_test.go | 9 ++- 16 files changed, 277 insertions(+), 36 deletions(-) diff --git a/pkg/controller/admissionchecks/provisioning/controller.go b/pkg/controller/admissionchecks/provisioning/controller.go index 01307d93ea..2208646193 100644 --- a/pkg/controller/admissionchecks/provisioning/controller.go +++ b/pkg/controller/admissionchecks/provisioning/controller.go @@ -25,6 +25,7 @@ import ( "maps" "regexp" "strconv" + "strings" "time" corev1 "k8s.io/api/core/v1" @@ -45,6 +46,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/podset" "sigs.k8s.io/kueue/pkg/util/admissioncheck" "sigs.k8s.io/kueue/pkg/util/api" @@ -173,7 +175,7 @@ func (c *Controller) activeOrLastPRForChecks(ctx context.Context, wl *kueue.Work // PRs relevant for the admission check if matches(req, wl.Name, checkName) { prc, err := c.helper.ConfigForAdmissionCheck(ctx, checkName) - if err == nil && c.reqIsNeeded(ctx, wl, prc) && requestHasParameters(req, prc) { + if err == nil && c.reqIsNeeded(ctx, wl, prc) && provReqSyncedWithConfig(req, prc) { if currPr, exists := activeOrLastPRForChecks[checkName]; !exists || getAttempt(ctx, currPr, wl.Name, checkName) < getAttempt(ctx, req, wl.Name, checkName) { activeOrLastPRForChecks[checkName] = req } @@ -267,6 +269,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo Parameters: parametersKueueToProvisioning(prc.Spec.Parameters), }, } + passProvReqParams(wl, req) expectedPodSets := requiredPodSets(wl.Spec.PodSets, prc.Spec.ManagedResources) psaMap := slices.ToRefMap(wl.Status.Admission.PodSetAssignments, func(p *kueue.PodSetAssignment) string { return p.Name }) @@ -434,21 +437,31 @@ func parametersKueueToProvisioning(in map[string]kueue.Parameter) map[string]aut return out } -func requestHasParameters(req *autoscaling.ProvisioningRequest, prc *kueue.ProvisioningRequestConfig) bool { +// provReqSyncedWithConfig checks if the provisioning request has the same provisioningClassName as the provisioning request config +// and contains all the parameters from the config +func provReqSyncedWithConfig(req *autoscaling.ProvisioningRequest, prc *kueue.ProvisioningRequestConfig) bool { if req.Spec.ProvisioningClassName != prc.Spec.ProvisioningClassName { return false } - if len(req.Spec.Parameters) != len(prc.Spec.Parameters) { - return false - } - for k, vReq := range req.Spec.Parameters { - if vCfg, found := prc.Spec.Parameters[k]; !found || vReq != autoscaling.Parameter(vCfg) { + for k, vCfg := range prc.Spec.Parameters { + if vReq, found := req.Spec.Parameters[k]; !found || string(vReq) != string(vCfg) { return false } } return true } +// passProvReqParams extracts from Workload's annotations ones that should be passed to ProvisioningRequest +func passProvReqParams(wl *kueue.Workload, req *autoscaling.ProvisioningRequest) { + if req.Spec.Parameters == nil { + req.Spec.Parameters = make(map[string]autoscaling.Parameter, 0) + } + for annotation, val := range admissioncheck.FilterProvReqAnnotations(wl.Annotations) { + paramName := strings.TrimPrefix(annotation, constants.ProvReqAnnotationPrefix) + req.Spec.Parameters[paramName] = autoscaling.Parameter(val) + } +} + func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, checks []string, activeOrLastPRForChecks map[string]*autoscaling.ProvisioningRequest) error { log := ctrl.LoggerFrom(ctx) checksMap := slices.ToRefMap(wl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name }) diff --git a/pkg/controller/admissionchecks/provisioning/controller_test.go b/pkg/controller/admissionchecks/provisioning/controller_test.go index 13e2898dbf..180e94669c 100644 --- a/pkg/controller/admissionchecks/provisioning/controller_test.go +++ b/pkg/controller/admissionchecks/provisioning/controller_test.go @@ -115,6 +115,8 @@ func TestReconcile(t *testing.T) { }). Obj() + basePodSet := []autoscaling.PodSet{{PodTemplateRef: autoscaling.Reference{Name: "ppt-wl-check1-1-main"}, Count: 1}} + baseWorkloadWithCheck1Ready := baseWorkload.DeepCopy() workload.SetAdmissionCheckState(&baseWorkloadWithCheck1Ready.Status.AdmissionChecks, kueue.AdmissionCheckState{ Name: "check1", @@ -311,6 +313,47 @@ func TestReconcile(t *testing.T) { }, }, }, + "workload with provreq annotation": { + workload: utiltesting.MakeWorkload("wl", TestNamespace). + Annotations(map[string]string{ + "provreq.kueue.x-k8s.io/ValidUntilSeconds": "0", + "invalid-provreq-prefix/Foo1": "Bar1", + "another-invalid-provreq-prefix/Foo2": "Bar2"}). + AdmissionChecks(kueue.AdmissionCheckState{ + Name: "check1", + State: kueue.CheckStatePending}). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, + configs: []kueue.ProvisioningRequestConfig{{ObjectMeta: metav1.ObjectMeta{Name: "config1"}}}, + wantRequests: map[string]*autoscaling.ProvisioningRequest{ + GetProvisioningRequestName("wl", baseCheck.Name, 1): { + ObjectMeta: metav1.ObjectMeta{ + Namespace: TestNamespace, + Name: GetProvisioningRequestName("wl", baseCheck.Name, 1), + OwnerReferences: []metav1.OwnerReference{ + { + Name: "wl", + }, + }, + }, + Spec: autoscaling.ProvisioningRequestSpec{ + Parameters: map[string]autoscaling.Parameter{ + "ValidUntilSeconds": "0", + }, + PodSets: basePodSet, + }, + }, + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: client.ObjectKeyFromObject(baseWorkload), + EventType: corev1.EventTypeNormal, + Reason: "ProvisioningRequestCreated", + Message: `Created ProvisioningRequest: "wl-check1-1"`, + }, + }, + }, "remove unnecessary requests": { workload: baseWorkload.DeepCopy(), checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()}, diff --git a/pkg/controller/constants/constants.go b/pkg/controller/constants/constants.go index e455b4fa5e..0287cc4154 100644 --- a/pkg/controller/constants/constants.go +++ b/pkg/controller/constants/constants.go @@ -36,4 +36,7 @@ const ( // workloadPriorityClass name. // This label is always mutable because it might be useful for the preemption. WorkloadPriorityClassLabel = "kueue.x-k8s.io/priority-class" + + // ProvReqAnnotationPrefix is the prefix for annotations that should be pass to ProvisioningRequest as Parameters. + ProvReqAnnotationPrefix = "provreq.kueue.x-k8s.io/" ) diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 865bc0c446..9f1d44b41c 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/podset" + "sigs.k8s.io/kueue/pkg/util/admissioncheck" "sigs.k8s.io/kueue/pkg/util/equality" "sigs.k8s.io/kueue/pkg/util/kubeversion" "sigs.k8s.io/kueue/pkg/util/maps" @@ -803,10 +804,11 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o wl := &kueue.Workload{ ObjectMeta: metav1.ObjectMeta{ - Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()), - Namespace: object.GetNamespace(), - Labels: map[string]string{}, - Finalizers: []string{kueue.ResourceInUseFinalizerName}, + Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()), + Namespace: object.GetNamespace(), + Labels: map[string]string{}, + Finalizers: []string{kueue.ResourceInUseFinalizerName}, + Annotations: admissioncheck.FilterProvReqAnnotations(job.Object().GetAnnotations()), }, Spec: kueue.WorkloadSpec{ PodSets: podSets, diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index 15d70dc182..ed2d635754 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -425,6 +425,38 @@ func TestReconciler(t *testing.T) { wantEvents []utiltesting.EventRecord wantErr error }{ + "when workload is created, it has its owner ProvReq annotations": { + job: *baseJobWrapper.Clone(). + SetAnnotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + SetAnnotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). + UID("test-uid"). + Obj(), + wantJob: *baseJobWrapper.Clone(). + SetAnnotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + SetAnnotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). + UID("test-uid"). + Suspend(true). + Obj(), + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns"). + Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}). + Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + Queue("foo"). + Priority(0). + Labels(map[string]string{controllerconsts.JobUIDLabel: "test-uid"}). + Obj(), + }, + + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Name: "job", Namespace: "ns"}, + EventType: "Normal", + Reason: "CreatedWorkload", + Message: "Created Workload: ns/" + GetWorkloadNameForJob(baseJobWrapper.Name, types.UID("test-uid")), + }, + }, + }, "when workload is admitted the PodSetUpdates are propagated to job": { job: *baseJobWrapper.Clone(). Obj(), diff --git a/pkg/controller/jobs/jobset/jobset_controller_test.go b/pkg/controller/jobs/jobset/jobset_controller_test.go index ae71fc5148..e5245900a1 100644 --- a/pkg/controller/jobs/jobset/jobset_controller_test.go +++ b/pkg/controller/jobs/jobset/jobset_controller_test.go @@ -30,6 +30,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/constants" + controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" @@ -202,7 +203,8 @@ var ( } workloadCmpOpts = []cmp.Option{ cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"), + cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"), cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"), cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), cmpopts.IgnoreFields(kueue.PodSet{}, "Template"), @@ -223,7 +225,7 @@ func TestReconciler(t *testing.T) { wantWorkloads []kueue.Workload wantErr error }{ - "workload is created with podsets": { + "workload is created with podsets and a ProvReq annotation": { reconcilerOptions: []jobframework.Option{ jobframework.WithManageJobsWithoutQueueName(true), }, @@ -239,8 +241,12 @@ func TestReconciler(t *testing.T) { Replicas: 2, Completions: 2, Parallelism: 2, - }, - ).Obj(), + }). + Annotations(map[string]string{ + controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val", + "invalid-provreq-prefix/test-annotation-2": "test-val-2", + }). + Obj(), wantJob: testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs( testingjobset.ReplicatedJobRequirements{ Name: "replicated-job-1", @@ -253,10 +259,15 @@ func TestReconciler(t *testing.T) { Replicas: 2, Completions: 2, Parallelism: 2, - }, - ).Obj(), + }). + Annotations(map[string]string{ + controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val", + "invalid-provreq-prefix/test-annotation-2": "test-val-2", + }). + Obj(), wantWorkloads: []kueue.Workload{ *utiltesting.MakeWorkload("jobset", "ns"). + Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}). PodSets( *utiltesting.MakePodSet("replicated-job-1", 1).Obj(), *utiltesting.MakePodSet("replicated-job-2", 4).Obj(), diff --git a/pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go b/pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go index ba8d2a56cb..013f366228 100644 --- a/pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go +++ b/pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go @@ -31,6 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingmxjob "sigs.k8s.io/kueue/pkg/util/testingjobs/mxjob" @@ -301,8 +302,8 @@ var ( } workloadCmpOpts = cmp.Options{ cmpopts.EquateEmpty(), - cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"), - cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"), + cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"), cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"), cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), cmpopts.IgnoreFields(kueue.PodSet{}, "Template"), } @@ -334,6 +335,32 @@ func TestReconciler(t *testing.T) { Obj(), }, }, + "workload is created with a ProvReq annotation": { + reconcilerOptions: []jobframework.Option{ + jobframework.WithManageJobsWithoutQueueName(true), + }, + job: testingmxjob.MakeMXJob("mxjob", "ns"). + Annotations(map[string]string{ + controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val", + "invalid-provreq-prefix/test-annotation-2": "test-val-2", + }). + Obj(), + wantJob: testingmxjob.MakeMXJob("mxjob", "ns"). + Annotations(map[string]string{ + controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val", + "invalid-provreq-prefix/test-annotation-2": "test-val-2", + }).Obj(), + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("mxjob", "ns"). + Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}). + PodSets( + *utiltesting.MakePodSet("scheduler", 1).Obj(), + *utiltesting.MakePodSet("server", 1).Obj(), + *utiltesting.MakePodSet("worker", 1).Obj(), + ). + Obj(), + }, + }, "workload isn't created due to manageJobsWithoutQueueName=false": { reconcilerOptions: []jobframework.Option{ jobframework.WithManageJobsWithoutQueueName(false), diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index b6161ecaeb..753695897a 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -51,6 +51,7 @@ import ( controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/podset" + "sigs.k8s.io/kueue/pkg/util/admissioncheck" "sigs.k8s.io/kueue/pkg/util/kubeversion" "sigs.k8s.io/kueue/pkg/util/parallelize" utilslices "sigs.k8s.io/kueue/pkg/util/slices" @@ -897,9 +898,10 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client, wl := &kueue.Workload{ ObjectMeta: metav1.ObjectMeta{ - Namespace: p.pod.GetNamespace(), - Labels: map[string]string{}, - Finalizers: []string{kueue.ResourceInUseFinalizerName}, + Namespace: p.pod.GetNamespace(), + Labels: map[string]string{}, + Finalizers: []string{kueue.ResourceInUseFinalizerName}, + Annotations: admissioncheck.FilterProvReqAnnotations(p.pod.GetAnnotations()), }, Spec: kueue.WorkloadSpec{ QueueName: jobframework.QueueName(p), diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index 640d7e273c..0f9191c429 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -491,6 +491,55 @@ func TestReconciler(t *testing.T) { }, }, }, + "when a workload is created for the pod it has its ProvReq annotations copied": { + pods: []corev1.Pod{ + *basePodWrapper. + Clone(). + Name("pod"). + KueueFinalizer(). + KueueSchedulingGate(). + Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). + Label("kueue.x-k8s.io/managed", "true"). + Obj(), + }, + wantPods: []corev1.Pod{ + *basePodWrapper. + Clone(). + Name("pod"). + KueueFinalizer(). + KueueSchedulingGate(). + Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). + Label("kueue.x-k8s.io/managed", "true"). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("wl", "ns"). + Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}). + Obj(), + }, + workloadCmpOpts: []cmp.Option{ + cmpopts.IgnoreFields(kueue.Workload{}, + "TypeMeta", + "ObjectMeta.Name", + "ObjectMeta.Finalizers", + "ObjectMeta.ResourceVersion", + "ObjectMeta.OwnerReferences", + "ObjectMeta.Labels", + "Spec", + ), + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), + }, + wantEvents: []utiltesting.EventRecord{ + { + Key: types.NamespacedName{Name: "pod", Namespace: "ns"}, + EventType: "Normal", + Reason: "CreatedWorkload", + Message: "Created Workload: ns/" + GetWorkloadNameForPod("pod", "test-uid"), + }, + }, + }, "workload is composed and created for the pod group": { pods: []corev1.Pod{ *basePodWrapper. @@ -498,6 +547,8 @@ func TestReconciler(t *testing.T) { Label("kueue.x-k8s.io/managed", "true"). KueueFinalizer(). KueueSchedulingGate(). + Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). Group("test-group"). GroupTotalCount("2"). Obj(), @@ -507,6 +558,8 @@ func TestReconciler(t *testing.T) { Label("kueue.x-k8s.io/managed", "true"). KueueFinalizer(). KueueSchedulingGate(). + Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). Group("test-group"). GroupTotalCount("2"). Obj(), @@ -517,6 +570,8 @@ func TestReconciler(t *testing.T) { Label("kueue.x-k8s.io/managed", "true"). KueueFinalizer(). KueueSchedulingGate(). + Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). Group("test-group"). GroupTotalCount("2"). Obj(), @@ -526,6 +581,8 @@ func TestReconciler(t *testing.T) { Label("kueue.x-k8s.io/managed", "true"). KueueFinalizer(). KueueSchedulingGate(). + Annotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val"). + Annotation("invalid-provreq-prefix/test-annotation-2", "test-val-2"). Group("test-group"). GroupTotalCount("2"). Obj(), @@ -542,7 +599,9 @@ func TestReconciler(t *testing.T) { Priority(0). OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid"). OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid"). - Annotations(map[string]string{"kueue.x-k8s.io/is-group-workload": "true"}). + Annotations(map[string]string{ + "kueue.x-k8s.io/is-group-workload": "true", + controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}). Obj(), }, workloadCmpOpts: defaultWorkloadCmpOpts, diff --git a/pkg/util/admissioncheck/admissioncheck.go b/pkg/util/admissioncheck/admissioncheck.go index acc9bd550e..66eff4da30 100644 --- a/pkg/util/admissioncheck/admissioncheck.go +++ b/pkg/util/admissioncheck/admissioncheck.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strings" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -27,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/apiutil" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" ) var ( @@ -141,3 +143,14 @@ func FilterForController(ctx context.Context, c client.Client, states []kueue.Ad } return retActive, nil } + +// FilterProvReqAnnotations returns annotations containing the Provisioning Request annotation prefix. +func FilterProvReqAnnotations(annotations map[string]string) map[string]string { + res := make(map[string]string) + for k, v := range annotations { + if strings.HasPrefix(k, controllerconsts.ProvReqAnnotationPrefix) { + res[k] = v + } + } + return res +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index e74e18622e..84e672f198 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -87,6 +87,11 @@ func (w *WorkloadWrapper) UID(uid types.UID) *WorkloadWrapper { return w } +func (w *WorkloadWrapper) Name(name string) *WorkloadWrapper { + w.Workload.Name = name + return w +} + func (w *WorkloadWrapper) Finalizers(fin ...string) *WorkloadWrapper { w.ObjectMeta.Finalizers = fin return w @@ -277,8 +282,8 @@ func (w *WorkloadWrapper) appendOwnerReference(gvk schema.GroupVersionKind, name return w } -func (w *WorkloadWrapper) Annotations(kv map[string]string) *WorkloadWrapper { - w.ObjectMeta.Annotations = kv +func (w *WorkloadWrapper) Annotations(annotations map[string]string) *WorkloadWrapper { + w.ObjectMeta.Annotations = annotations return w } diff --git a/pkg/util/testingjobs/jobset/wrappers.go b/pkg/util/testingjobs/jobset/wrappers.go index 714d15971d..7d0ba623b9 100644 --- a/pkg/util/testingjobs/jobset/wrappers.go +++ b/pkg/util/testingjobs/jobset/wrappers.go @@ -109,6 +109,12 @@ func (j *JobSetWrapper) Label(k, v string) *JobSetWrapper { return j } +// Annotation sets annotations to the JobSet. +func (j *JobSetWrapper) Annotations(annotations map[string]string) *JobSetWrapper { + j.ObjectMeta.Annotations = annotations + return j +} + // Queue updates the queue name of the JobSet. func (j *JobSetWrapper) Queue(queue string) *JobSetWrapper { return j.Label(constants.QueueLabel, queue) diff --git a/pkg/util/testingjobs/mxjob/wrappers.go b/pkg/util/testingjobs/mxjob/wrappers.go index 820ccfd272..5e06fe95bc 100644 --- a/pkg/util/testingjobs/mxjob/wrappers.go +++ b/pkg/util/testingjobs/mxjob/wrappers.go @@ -132,6 +132,12 @@ func (j *MXJobWrapper) Queue(queue string) *MXJobWrapper { return j } +// Annotations updates annotations of the job. +func (j *MXJobWrapper) Annotations(annotations map[string]string) *MXJobWrapper { + j.ObjectMeta.Annotations = annotations + return j +} + // Request adds a resource request to the default container. func (j *MXJobWrapper) Request(replicaType kftraining.ReplicaType, r corev1.ResourceName, v string) *MXJobWrapper { j.Spec.MXReplicaSpecs[replicaType].Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) diff --git a/test/integration/controller/admissionchecks/provisioning/provisioning_test.go b/test/integration/controller/admissionchecks/provisioning/provisioning_test.go index 2d15677b5c..f9f245c564 100644 --- a/test/integration/controller/admissionchecks/provisioning/provisioning_test.go +++ b/test/integration/controller/admissionchecks/provisioning/provisioning_test.go @@ -119,6 +119,9 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure Image("image"). Obj(), ). + Annotations(map[string]string{ + "provreq.kueue.x-k8s.io/ValidUntilSeconds": "0", + "invalid-provreq-prefix/Foo": "Bar"}). Obj() gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed()) wlKey = client.ObjectKeyFromObject(wl) @@ -225,8 +228,9 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure ginkgo.By("Checking that the provision requests content", func() { gomega.Expect(createdRequest.Spec.ProvisioningClassName).To(gomega.Equal("provisioning-class")) gomega.Expect(createdRequest.Spec.Parameters).To(gomega.BeComparableTo(map[string]autoscaling.Parameter{ - "p1": "v1", - "p2": "v2", + "p1": "v1", + "p2": "v2", + "ValidUntilSeconds": "0", })) gomega.Expect(createdRequest.Spec.PodSets).To(gomega.HaveLen(2)) @@ -449,8 +453,9 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure ginkgo.By("Checking that the provision requests content", func() { gomega.Expect(createdRequest.Spec.ProvisioningClassName).To(gomega.Equal("provisioning-class")) gomega.Expect(createdRequest.Spec.Parameters).To(gomega.BeComparableTo(map[string]autoscaling.Parameter{ - "p1": "v1", - "p2": "v2", + "p1": "v1", + "p2": "v2", + "ValidUntilSeconds": "0", })) }) @@ -477,8 +482,9 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure g.Expect(err).To(gomega.Succeed()) g.Expect(createdRequest.Spec.ProvisioningClassName).To(gomega.Equal("provisioning-class-updated")) g.Expect(createdRequest.Spec.Parameters).To(gomega.BeComparableTo(map[string]autoscaling.Parameter{ - "p1": "v1updated", - "p3": "v3", + "p1": "v1updated", + "p3": "v3", + "ValidUntilSeconds": "0", })) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -503,8 +509,9 @@ var _ = ginkgo.Describe("Provisioning", ginkgo.Ordered, ginkgo.ContinueOnFailure g.Expect(err).To(gomega.Succeed()) g.Expect(createdRequest.Spec.ProvisioningClassName).To(gomega.Equal("provisioning-class2")) g.Expect(createdRequest.Spec.Parameters).To(gomega.BeComparableTo(map[string]autoscaling.Parameter{ - "p1": "v1.2", - "p2": "v2.2", + "p1": "v1.2", + "p2": "v2.2", + "ValidUntilSeconds": "0", })) }, util.Timeout, util.Interval).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index 1b8982e3c1..585ce59f6d 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -104,7 +104,11 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu ginkgo.DeferCleanup(func() { gomega.Expect(k8sClient.Delete(ctx, priorityClass)).To(gomega.Succeed()) }) - job := testingjob.MakeJob(jobName, ns.Name).PriorityClass(priorityClassName).Obj() + job := testingjob.MakeJob(jobName, ns.Name). + PriorityClass(priorityClassName). + SetAnnotation("provreq.kueue.x-k8s.io/ValidUntilSeconds", "0"). + SetAnnotation("invalid-provreq-prefix/Foo", "Bar"). + Obj() gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) lookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name} createdJob := &batchv1.Job{} @@ -126,9 +130,10 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu createdTime := createdWorkload.CreationTimestamp - ginkgo.By("checking the workload is created with priority and priorityName") + ginkgo.By("checking the workload is created with priority, priorityName, and ProvisioningRequest annotations") gomega.Expect(createdWorkload.Spec.PriorityClassName).Should(gomega.Equal(priorityClassName)) gomega.Expect(*createdWorkload.Spec.Priority).Should(gomega.Equal(int32(priorityValue))) + gomega.Expect(createdWorkload.Annotations).Should(gomega.Equal(map[string]string{"provreq.kueue.x-k8s.io/ValidUntilSeconds": "0"})) ginkgo.By("checking the workload is updated with queue name when the job does") jobQueueName := "test-queue" diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go index e9918e799d..f2d62d5803 100644 --- a/test/integration/controller/jobs/pod/pod_controller_test.go +++ b/test/integration/controller/jobs/pod/pod_controller_test.go @@ -115,7 +115,11 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu ginkgo.When("Using single pod", func() { ginkgo.It("Should reconcile the single pod with the queue name", func() { - pod := testingpod.MakePod(podName, ns.Name).Queue("test-queue").Obj() + pod := testingpod.MakePod(podName, ns.Name). + Queue("test-queue"). + Annotation("provreq.kueue.x-k8s.io/ValidUntilSeconds", "0"). + Annotation("invalid-provreq-prefix/Foo", "Bar"). + Obj() gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) createdPod := &corev1.Pod{} @@ -148,6 +152,9 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), "The Workload should have .spec.queueName set") + ginkgo.By("checking that workload has ProvisioningRequest annotations") + gomega.Expect(createdWorkload.Annotations).Should(gomega.Equal(map[string]string{"provreq.kueue.x-k8s.io/ValidUntilSeconds": "0"})) + ginkgo.By("checking the pod is unsuspended when workload is assigned") clusterQueue := testing.MakeClusterQueue("cluster-queue").