diff --git a/pkg/controller/constants/constants.go b/pkg/controller/constants/constants.go index 5249fbf0b4..15805e1cfb 100644 --- a/pkg/controller/constants/constants.go +++ b/pkg/controller/constants/constants.go @@ -25,6 +25,9 @@ const ( // DEPRECATED: Use QueueLabel as a label key. QueueAnnotation = QueueLabel + // PrebuiltWorkloadLabel is the label key of the job holding the name of the pre-built workload to use. + PrebuiltWorkloadLabel = "kueue.x-k8s.io/prebuilt-workload-name" + // ParentWorkloadAnnotation is the annotation used to mark a kubernetes Job // as a child of a Workload. The value is the name of the workload, // in the same namespace. It is used when the parent workload corresponds to diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index 93874b6203..3785777f5f 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -129,3 +129,8 @@ func workloadPriorityClassName(job GenericJob) string { } return "" } + +func prebuiltWorkload(job GenericJob) (string, bool) { + name, found := job.Object().GetLabels()[constants.PrebuiltWorkloadLabel] + return name, found +} diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index b992a00c85..7ef0fb19df 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/kueue/pkg/podset" "sigs.k8s.io/kueue/pkg/util/equality" "sigs.k8s.io/kueue/pkg/util/kubeversion" + "sigs.k8s.io/kueue/pkg/util/maps" utilpriority "sigs.k8s.io/kueue/pkg/util/priority" utilslices "sigs.k8s.io/kueue/pkg/util/slices" "sigs.k8s.io/kueue/pkg/workload" @@ -439,6 +440,23 @@ func (r *JobReconciler) getParentWorkload(ctx context.Context, job GenericJob, o func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) { log := ctrl.LoggerFrom(ctx) + if prebuiltWorkloadName, usePrebuiltWorkload := prebuiltWorkload(job); usePrebuiltWorkload { + wl := &kueue.Workload{} + err := r.client.Get(ctx, types.NamespacedName{Name: prebuiltWorkloadName, Namespace: object.GetNamespace()}, wl) + if err != nil { + return nil, client.IgnoreNotFound(err) + } + + if owns, err := r.ensurePrebuiltWorkloadOwnership(ctx, wl, object); !owns || err != nil { + return nil, err + } + + if inSync, err := r.ensurePrebuiltWorkloadInSync(ctx, wl, job); !inSync || err != nil { + return nil, err + } + return wl, nil + } + // Find a matching workload first if there is one. var toDelete []*kueue.Workload var match *kueue.Workload @@ -537,6 +555,41 @@ func FindMatchingWorkloads(ctx context.Context, c client.Client, job GenericJob) return match, toDelete, nil } +func (r *JobReconciler) ensurePrebuiltWorkloadOwnership(ctx context.Context, wl *kueue.Workload, object client.Object) (bool, error) { + if !metav1.IsControlledBy(wl, object) { + if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil { + // don't return an error here, since a retry cannot give a different result, + // log the error. 
+ log := ctrl.LoggerFrom(ctx) + log.Error(err, "Cannot take ownership of the workload") + return false, nil + } + + if errs := validation.IsValidLabelValue(string(object.GetUID())); len(errs) == 0 { + wl.Labels = maps.MergeKeepFirst(map[string]string{controllerconsts.JobUIDLabel: string(object.GetUID())}, wl.Labels) + } + + if err := r.client.Update(ctx, wl); err != nil { + return false, err + } + } + return true, nil +} + +func (r *JobReconciler) ensurePrebuiltWorkloadInSync(ctx context.Context, wl *kueue.Workload, job GenericJob) (bool, error) { + if !equivalentToWorkload(job, wl) { + // mark the workload as finished + err := workload.UpdateStatus(ctx, r.client, wl, + kueue.WorkloadFinished, + metav1.ConditionTrue, + "OutOfSync", + "The prebuilt workload is out of sync with its user job", + constants.JobControllerName) + return false, err + } + return true, nil +} + // equivalentToWorkload checks if the job corresponds to the workload func equivalentToWorkload(job GenericJob, wl *kueue.Workload) bool { owner := metav1.GetControllerOf(wl) @@ -780,12 +833,25 @@ func (r *JobReconciler) getPodSetsInfoFromStatus(ctx context.Context, w *kueue.W func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job GenericJob, object client.Object) error { log := ctrl.LoggerFrom(ctx) + _, usePrebuiltWorkload := prebuiltWorkload(job) + if usePrebuiltWorkload { + // Stop the job if not already suspended + if stopErr := r.stopJob(ctx, job, nil, StopReasonNoMatchingWorkload, "missing workload"); stopErr != nil { + return stopErr + } + } + // Wait until there are no active pods. if job.IsActive() { log.V(2).Info("Job is suspended but still has active pods, waiting") return nil } + if usePrebuiltWorkload { + log.V(2).Info("Skip workload creation for job with prebuilt workload") + return nil + } + // Create the corresponding workload. wl, err := r.constructWorkload(ctx, job, object) if err != nil { diff --git a/pkg/controller/jobframework/validation.go b/pkg/controller/jobframework/validation.go index c6becc3867..b855774ff6 100644 --- a/pkg/controller/jobframework/validation.go +++ b/pkg/controller/jobframework/validation.go @@ -14,9 +14,11 @@ limitations under the License. package jobframework import ( + "fmt" "strings" apivalidation "k8s.io/apimachinery/pkg/api/validation" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" @@ -29,12 +31,22 @@ var ( parentWorkloadKeyPath = annotationsPath.Key(constants.ParentWorkloadAnnotation) queueNameLabelPath = labelsPath.Key(constants.QueueLabel) workloadPriorityClassNamePath = labelsPath.Key(constants.WorkloadPriorityClassLabel) + supportedPrebuiltWlJobGVKs = sets.New("batch/v1, Kind=Job") ) func ValidateCreateForQueueName(job GenericJob) field.ErrorList { var allErrs field.ErrorList allErrs = append(allErrs, ValidateLabelAsCRDName(job, constants.QueueLabel)...) + allErrs = append(allErrs, ValidateLabelAsCRDName(job, constants.PrebuiltWorkloadLabel)...) allErrs = append(allErrs, ValidateAnnotationAsCRDName(job, constants.QueueAnnotation)...) 
+ + // this rule should be relaxed when it's confirmed that running with a prebuilt wl is fully supported by each integration + if _, hasPrebuilt := job.Object().GetLabels()[constants.PrebuiltWorkloadLabel]; hasPrebuilt { + gvk := job.GVK().String() + if !supportedPrebuiltWlJobGVKs.Has(gvk) { + allErrs = append(allErrs, field.Forbidden(labelsPath.Key(constants.PrebuiltWorkloadLabel), fmt.Sprintf("Is not supported for %q", gvk))) + } + } return allErrs } @@ -73,6 +85,10 @@ func ValidateUpdateForQueueName(oldJob, newJob GenericJob) field.ErrorList { if !newJob.IsSuspended() { allErrs = append(allErrs, apivalidation.ValidateImmutableField(QueueName(oldJob), QueueName(newJob), queueNameLabelPath)...) } + + oldWlName, _ := prebuiltWorkload(oldJob) + newWlName, _ := prebuiltWorkload(newJob) + allErrs = append(allErrs, apivalidation.ValidateImmutableField(oldWlName, newWlName, labelsPath.Key(constants.PrebuiltWorkloadLabel))...) return allErrs } diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index fd335b8c12..76888e72b5 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -205,11 +205,27 @@ func (j *Job) ReclaimablePods() ([]kueue.ReclaimablePod, error) { }}, nil } +// The following labels are managed internally by the batch/job controller; we should not +// propagate them to the workload. +var ( + // the legacy names are no longer defined in the api, only in k/k/pkg/apis/batch + legacyJobNameLabel = "job-name" + legacyControllerUidLabel = "controller-uid" + managedLabels = []string{legacyJobNameLabel, legacyControllerUidLabel, batchv1.JobNameLabel, batchv1.ControllerUidLabel} +) + +func cleanManagedLabels(pt *corev1.PodTemplateSpec) *corev1.PodTemplateSpec { + for _, managedLabel := range managedLabels { + delete(pt.Labels, managedLabel) + } + return pt +} + func (j *Job) PodSets() []kueue.PodSet { return []kueue.PodSet{ { Name: kueue.DefaultPodSetName, - Template: *j.Spec.Template.DeepCopy(), + Template: *cleanManagedLabels(j.Spec.Template.DeepCopy()), Count: j.podsCount(), MinCount: j.minPodsCount(), }, @@ -247,7 +263,13 @@ func (j *Job) RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool { j.Spec.Completions = j.Spec.Parallelism } } - changed = podset.RestorePodSpec(&j.Spec.Template.ObjectMeta, &j.Spec.Template.Spec, podSetsInfo[0]) || changed + info := podSetsInfo[0] + for _, managedLabel := range managedLabels { + if v, found := j.Spec.Template.Labels[managedLabel]; found { + info.AddOrUpdateLabel(managedLabel, v) + } + } + changed = podset.RestorePodSpec(&j.Spec.Template.ObjectMeta, &j.Spec.Template.Spec, info) || changed return changed } diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index 49cf6afcfe..da1a3b03bd 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -378,6 +378,20 @@ var ( cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"), } + workloadCmpOptsWithOwner = []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b kueue.Workload) bool { + return a.Name < b.Name + }), + cmpopts.SortSlices(func(a, b metav1.Condition) bool { + return a.Type < b.Type + }), + cmpopts.IgnoreFields( + kueue.Workload{}, "TypeMeta", "ObjectMeta.Name", "ObjectMeta.ResourceVersion", + ), + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), +
cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"), + } ) func TestReconciler(t *testing.T) { @@ -1668,6 +1682,128 @@ func TestReconciler(t *testing.T) { Obj(), wantWorkloads: []kueue.Workload{}, }, + "when the prebuilt workload is missing, no new one is created and the job is suspended": { + job: *baseJobWrapper. + Clone(). + Suspend(false). + Label(controllerconsts.PrebuiltWorkloadLabel, "missing-workload"). + UID("test-uid"). + Obj(), + wantJob: *baseJobWrapper. + Clone(). + Label(controllerconsts.PrebuiltWorkloadLabel, "missing-workload"). + UID("test-uid"). + Obj(), + }, + "when the prebuilt workload exists its owner info is updated": { + job: *baseJobWrapper. + Clone(). + Suspend(false). + Label(controllerconsts.PrebuiltWorkloadLabel, "prebuilt-workload"). + UID("test-uid"). + Obj(), + wantJob: *baseJobWrapper. + Clone(). + Label(controllerconsts.PrebuiltWorkloadLabel, "prebuilt-workload"). + UID("test-uid"). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("prebuilt-workload", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").PriorityClass("test-pc").Obj()). + Queue("test-queue"). + PriorityClass("test-wpc"). + Priority(100). + PriorityClassSource(constants.WorkloadPriorityClassSource). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("prebuilt-workload", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").PriorityClass("test-pc").Obj()). + Queue("test-queue"). + PriorityClass("test-wpc"). + Priority(100). + PriorityClassSource(constants.WorkloadPriorityClassSource). + Labels(map[string]string{ + controllerconsts.JobUIDLabel: "test-uid", + }). + OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true). + Obj(), + }, + }, + "when the prebuilt workload is owned by another object": { + job: *baseJobWrapper. + Clone(). + Suspend(false). + Label(controllerconsts.PrebuiltWorkloadLabel, "prebuilt-workload"). + UID("test-uid"). + Obj(), + wantJob: *baseJobWrapper. + Clone(). + Label(controllerconsts.PrebuiltWorkloadLabel, "prebuilt-workload"). + UID("test-uid"). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("prebuilt-workload", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").PriorityClass("test-pc").Obj()). + Queue("test-queue"). + PriorityClass("test-wpc"). + Priority(100). + PriorityClassSource(constants.WorkloadPriorityClassSource). + OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "other-job", "other-uid", true, true). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("prebuilt-workload", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").PriorityClass("test-pc").Obj()). + Queue("test-queue"). + PriorityClass("test-wpc"). + Priority(100). + PriorityClassSource(constants.WorkloadPriorityClassSource). + OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "other-job", "other-uid", true, true). + Obj(), + }, + }, + "when the prebuilt workload is not equivalent to the job": { + job: *baseJobWrapper. + Clone(). + Suspend(false). + Label(controllerconsts.PrebuiltWorkloadLabel, "prebuilt-workload"). + UID("test-uid"). 
+ Obj(), + wantJob: *baseJobWrapper. + Clone(). + Label(controllerconsts.PrebuiltWorkloadLabel, "prebuilt-workload"). + UID("test-uid"). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("prebuilt-workload", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").PriorityClass("test-pc").Obj()). + Queue("test-queue"). + PriorityClass("test-wpc"). + Priority(100). + PriorityClassSource(constants.WorkloadPriorityClassSource). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("prebuilt-workload", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").PriorityClass("test-pc").Obj()). + Queue("test-queue"). + PriorityClass("test-wpc"). + Priority(100). + PriorityClassSource(constants.WorkloadPriorityClassSource). + Labels(map[string]string{ + controllerconsts.JobUIDLabel: "test-uid", + }). + OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true). + Condition(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: "OutOfSync", + Message: "The prebuilt workload is out of sync with its user job", + }). + Obj(), + }, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -1684,10 +1820,16 @@ func TestReconciler(t *testing.T) { kcBuilder = kcBuilder.WithStatusSubresource(&tc.workloads[i]) } + // For prebuilt workloads we are skipping the ownership setup in the test body and + // expect the reconciler to do it. + _, useesPrebuiltWorkload := tc.job.Labels[controllerconsts.PrebuiltWorkloadLabel] + kClient := kcBuilder.Build() for i := range tc.workloads { - if err := ctrl.SetControllerReference(&tc.job, &tc.workloads[i], kClient.Scheme()); err != nil { - t.Fatalf("Could not setup owner reference in Workloads: %v", err) + if !useesPrebuiltWorkload { + if err := ctrl.SetControllerReference(&tc.job, &tc.workloads[i], kClient.Scheme()); err != nil { + t.Fatalf("Could not setup owner reference in Workloads: %v", err) + } } if err := kClient.Create(ctx, &tc.workloads[i]); err != nil { t.Fatalf("Could not create workload: %v", err) @@ -1715,7 +1857,13 @@ func TestReconciler(t *testing.T) { if err := kClient.List(ctx, &gotWorkloads); err != nil { t.Fatalf("Could not get Workloads after reconcile: %v", err) } - if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, workloadCmpOpts...); diff != "" { + + wlCheckOpts := workloadCmpOpts + if useesPrebuiltWorkload { + wlCheckOpts = workloadCmpOptsWithOwner + } + + if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, wlCheckOpts...); diff != "" { t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff) } }) diff --git a/pkg/controller/jobs/job/job_webhook_test.go b/pkg/controller/jobs/job/job_webhook_test.go index 86953d500c..36b596b0db 100644 --- a/pkg/controller/jobs/job/job_webhook_test.go +++ b/pkg/controller/jobs/job/job_webhook_test.go @@ -50,6 +50,7 @@ var ( labelsPath = field.NewPath("metadata", "labels") parentWorkloadKeyPath = annotationsPath.Key(constants.ParentWorkloadAnnotation) queueNameLabelPath = labelsPath.Key(constants.QueueLabel) + prebuiltWlNameLabelPath = labelsPath.Key(constants.PrebuiltWorkloadLabel) queueNameAnnotationsPath = annotationsPath.Key(constants.QueueAnnotation) workloadPriorityClassNamePath = labelsPath.Key(constants.WorkloadPriorityClassLabel) ) @@ -233,6 +234,30 @@ func TestValidateCreate(t *testing.T) { 
wantErr: nil, serverVersion: "1.27.0", }, + { + name: "invalid prebuilt workload", + job: testingutil.MakeJob("job", "default"). + Parallelism(4). + Completions(4). + Label(constants.PrebuiltWorkloadLabel, "workload name"). + Indexed(true). + Obj(), + wantErr: field.ErrorList{ + field.Invalid(prebuiltWlNameLabelPath, "workload name", invalidRFC1123Message), + }, + serverVersion: "1.27.0", + }, + { + name: "valid prebuilt workload", + job: testingutil.MakeJob("job", "default"). + Parallelism(4). + Completions(4). + Label(constants.PrebuiltWorkloadLabel, "workload-name"). + Indexed(true). + Obj(), + wantErr: nil, + serverVersion: "1.27.0", + }, } for _, tc := range testcases { @@ -407,6 +432,18 @@ func TestValidateUpdate(t *testing.T) { field.Invalid(workloadPriorityClassNamePath, "test-1", apivalidation.FieldImmutableErrorMsg), }, }, + { + name: "immutable prebuilt workload", + oldJob: testingutil.MakeJob("job", "default"). + Suspend(true). + Label(constants.PrebuiltWorkloadLabel, "old-workload"). + Obj(), + newJob: testingutil.MakeJob("job", "default"). + Suspend(false). + Label(constants.PrebuiltWorkloadLabel, "new-workload"). + Obj(), + wantErr: apivalidation.ValidateImmutableField("old-workload", "new-workload", prebuiltWlNameLabelPath), + }, } for _, tc := range testcases { diff --git a/pkg/podset/podset.go b/pkg/podset/podset.go index 89b1ba3f48..c969c8968a 100644 --- a/pkg/podset/podset.go +++ b/pkg/podset/podset.go @@ -112,6 +112,16 @@ func (podSetInfo *PodSetInfo) Merge(o PodSetInfo) error { return nil } +// AddOrUpdateLabel adds or updates the label identified by k with value v, +// allocating a new Labels map if nil +func (podSetInfo *PodSetInfo) AddOrUpdateLabel(k, v string) { + if podSetInfo.Labels == nil { + podSetInfo.Labels = map[string]string{k: v} + } else { + podSetInfo.Labels[k] = v + } +} + // Merge updates or appends the replica metadata & spec fields based on PodSetInfo. // It returns error if there is a conflict. 
func Merge(meta *metav1.ObjectMeta, spec *corev1.PodSpec, info PodSetInfo) error { diff --git a/pkg/podset/podset_test.go b/pkg/podset/podset_test.go index 9d666d3709..2db090fd0c 100644 --- a/pkg/podset/podset_test.go +++ b/pkg/podset/podset_test.go @@ -306,3 +306,48 @@ func TestMergeRestore(t *testing.T) { }) } } + +func TestAddOrUpdateLabel(t *testing.T) { + cases := map[string]struct { + info PodSetInfo + k, v string + wantInfo PodSetInfo + }{ + "add to nil labels": { + info: PodSetInfo{}, + k: "key", + v: "value", + wantInfo: PodSetInfo{ + Labels: map[string]string{"key": "value"}, + }, + }, + "add": { + info: PodSetInfo{ + Labels: map[string]string{"other-key": "other-value"}, + }, + k: "key", + v: "value", + wantInfo: PodSetInfo{ + Labels: map[string]string{"other-key": "other-value", "key": "value"}, + }, + }, + "update": { + info: PodSetInfo{ + Labels: map[string]string{"key": "value"}, + }, + k: "key", + v: "updated-value", + wantInfo: PodSetInfo{ + Labels: map[string]string{"key": "updated-value"}, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + tc.info.AddOrUpdateLabel(tc.k, tc.v) + if diff := cmp.Diff(tc.wantInfo, tc.info, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("Unexpected info (-want/+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index c8148a26c7..2f5b1236dc 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -26,6 +26,7 @@ import ( apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -211,6 +212,20 @@ func (w *WorkloadWrapper) AdmissionChecks(checks ...kueue.AdmissionCheckState) * return w } +func (w *WorkloadWrapper) OwnerReference(apiVersion, kind, name, uid string, controller, blockDeletion bool) *WorkloadWrapper { + w.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: apiVersion, + Kind: kind, + Name: name, + UID: types.UID(uid), + Controller: &controller, + BlockOwnerDeletion: &blockDeletion, + }, + } + return w +} + type PodSetWrapper struct{ kueue.PodSet } func MakePodSet(name string, count int) *PodSetWrapper { diff --git a/pkg/util/testingjobs/job/wrappers.go b/pkg/util/testingjobs/job/wrappers.go index af442ccab2..ab0104a584 100644 --- a/pkg/util/testingjobs/job/wrappers.go +++ b/pkg/util/testingjobs/job/wrappers.go @@ -206,6 +206,11 @@ func (j *JobWrapper) OwnerReference(ownerName string, ownerGVK schema.GroupVersi return j } +func (j *JobWrapper) Containers(containers ...corev1.Container) *JobWrapper { + j.Spec.Template.Spec.Containers = containers + return j +} + // UID updates the uid of the job. 
func (j *JobWrapper) UID(uid string) *JobWrapper { j.ObjectMeta.UID = types.UID(uid) @@ -229,3 +234,17 @@ func (j *JobWrapper) Condition(c batchv1.JobCondition) *JobWrapper { j.Status.Conditions = append(j.Status.Conditions, c) return j } + +func SetContainerDefaults(c *corev1.Container) { + if c.TerminationMessagePath == "" { + c.TerminationMessagePath = "/dev/termination-log" + } + + if c.TerminationMessagePolicy == "" { + c.TerminationMessagePolicy = corev1.TerminationMessageReadFile + } + + if c.ImagePullPolicy == "" { + c.ImagePullPolicy = corev1.PullIfNotPresent + } +} diff --git a/test/e2e/singlecluster/e2e_test.go b/test/e2e/singlecluster/e2e_test.go index 1e628f7552..9e0e722479 100644 --- a/test/e2e/singlecluster/e2e_test.go +++ b/test/e2e/singlecluster/e2e_test.go @@ -34,6 +34,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/controller/constants" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" "sigs.k8s.io/kueue/pkg/util/slices" "sigs.k8s.io/kueue/pkg/util/testing" @@ -686,6 +687,70 @@ var _ = ginkgo.Describe("Kueue", func() { }, util.LongTimeout, util.Interval).Should(gomega.BeTrue()) }) + ginkgo.It("Should run with prebuilt workload", func() { + var wl *kueue.Workload + ginkgo.By("Create the prebuilt workload and the job adopting it", func() { + sampleJob = (&testingjob.JobWrapper{Job: *sampleJob}). + Label(constants.PrebuiltWorkloadLabel, "prebuilt-wl"). + Image("gcr.io/k8s-staging-perf-tests/sleep:v0.0.3", []string{"5s", "-termination-grace-period", "0s"}). + BackoffLimit(0). + TerminationGracePeriod(1). + Obj() + testingjob.SetContainerDefaults(&sampleJob.Spec.Template.Spec.Containers[0]) + + wl = testing.MakeWorkload("prebuilt-wl", ns.Name). + Finalizers(kueue.ResourceInUseFinalizerName). + Queue(localQueue.Name). + PodSets( + *testing.MakePodSet("main", 1).Containers(sampleJob.Spec.Template.Spec.Containers[0]).Obj(), + ). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed()) + }) + + createdWorkload := &kueue.Workload{} + wlLookupKey := client.ObjectKeyFromObject(wl) + createdJob := &batchv1.Job{} + jobLookupKey := client.ObjectKeyFromObject(sampleJob) + + ginkgo.By("Verify the prebuilt workload is adopted by the job", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).To(gomega.Succeed()) + g.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(wl.Spec.PodSets[0].Template.Spec.Containers).To(gomega.BeComparableTo(createdJob.Spec.Template.Spec.Containers), "Check the way the job and workload are created") + g.Expect(createdWorkload.OwnerReferences).To(gomega.ContainElement( + gomega.BeComparableTo(metav1.OwnerReference{ + Name: sampleJob.Name, + UID: sampleJob.UID, + }, cmpopts.IgnoreFields(metav1.OwnerReference{}, "APIVersion", "Kind", "Controller", "BlockOwnerDeletion")))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Verify the job is running", func() { + expectJobUnsuspendedWithNodeSelectors(jobKey, map[string]string{ + "instance-type": "on-demand", + }) + }) + + ginkgo.By("Delete all pods", func() { + gomega.Expect(util.DeleteAllPodsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed()) + }) + + ginkgo.By("Wait for the job to complete", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Finalizers).NotTo(gomega.ContainElement(kueue.ResourceInUseFinalizerName)) + g.Expect(createdWorkload.Status.Conditions).To(gomega.ContainElement( + gomega.BeComparableTo(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: "JobFinished", + }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Message")))) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + }) + ginkgo.It("Should readmit preempted job with priorityClass into a separate flavor", func() { gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed()) diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index 7f3fa109a9..e824f84b96 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -382,6 +382,125 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu }) }) + ginkgo.When("A prebuilt workload is used", func() { + ginkgo.It("Should get suspended if the workload is not found", func() { + job := testingjob.MakeJob("job", ns.Name). + Queue("main"). + Label(constants.PrebuiltWorkloadLabel, "missing-workload"). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, job)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeTrue()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.It("Should take the ownership of the workload and continue the usual execution", func() { + container := corev1.Container{ + Name: "c", + Image: "pause", + } + testingjob.SetContainerDefaults(&container) + wl := testing.MakeWorkload("wl", ns.Name). + PodSets(*testing.MakePodSet("main", 1). + Containers(*container.DeepCopy()). + Obj()). + Obj() + gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + createdWl := kueue.Workload{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed()) + g.Expect(createdWl.OwnerReferences).To(gomega.BeEmpty()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + job := testingjob.MakeJob("job", ns.Name). + Queue("main"). + Label(constants.PrebuiltWorkloadLabel, "wl"). + Containers(*container.DeepCopy()). + Obj() + gomega.Expect(k8sClient.Create(ctx, job)).To(gomega.Succeed()) + ginkgo.By("Checking the job gets suspended", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeTrue()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Check the job gets the ownership of the workload", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdWl := kueue.Workload{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed()) + + g.Expect(createdWl.OwnerReferences).To(gomega.ContainElement( + gomega.BeComparableTo(metav1.OwnerReference{ + Name: job.Name, + UID: job.UID, + }, cmpopts.IgnoreFields(metav1.OwnerReference{}, "APIVersion", "Kind", "Controller", "BlockOwnerDeletion")))) + + // The workload is not marked as finished. 
+ g.Expect(apimeta.IsStatusConditionTrue(createdWl.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeFalse()) + + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Admitting the workload, the job should unsuspend", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdWl := kueue.Workload{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed()) + + admission := testing.MakeAdmission("cq", container.Name).Obj() + g.Expect(util.SetQuotaReservation(ctx, k8sClient, wl, admission)).To(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, wl) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("Checking the job gets unsuspended", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + g.Expect(ptr.Deref(createdJob.Spec.Suspend, true)).To(gomega.BeFalse()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) + + ginkgo.By("Finishing the job, the workload should be finished", func() { + createdJob := batchv1.Job{} + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + createdJob.Status.Succeeded = 1 + createdJob.Status.Conditions = []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: "ByTest", + Message: "by test", + }, + } + g.Expect(k8sClient.Status().Update(ctx, &createdJob)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("Checking the workload is finished", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdWl := kueue.Workload{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &createdWl)).To(gomega.Succeed()) + + g.Expect(createdWl.Status.Conditions).To(gomega.ContainElement( + gomega.BeComparableTo(metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: "JobFinished", + Message: "Job finished successfully", + }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) + }) + + }) + ginkgo.It("Should finish the preemption when the job becomes inactive", func() { job := testingjob.MakeJob(jobName, ns.Name).Queue("q").Obj() wl := &kueue.Workload{}
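Usage sketch (illustrative, not part of the patch): with the new kueue.x-k8s.io/prebuilt-workload-name label, a user first creates the Workload and then submits a suspended batch/v1 Job that points at it, so the reconciler adopts the existing Workload instead of constructing a new one; per the validation above, only batch/v1 Jobs currently accept the label. The object names below ("team-a", "prebuilt-wl", "sample-job") are hypothetical.

package main

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// prebuiltJob sketches a Job that references an already created Workload
// through the prebuilt-workload-name label; all names are hypothetical.
func prebuiltJob() *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "sample-job",
			Namespace: "team-a",
			Labels: map[string]string{
				// Must name an existing Workload in the same namespace; the value is
				// validated as a CRD name and is immutable on update.
				"kueue.x-k8s.io/prebuilt-workload-name": "prebuilt-wl",
			},
		},
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](1),
			Suspend:     ptr.To(true), // Kueue unsuspends the job once the workload is admitted
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "main", Image: "registry.k8s.io/pause:3.9"},
					},
				},
			},
		},
	}
}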