Skip to content

Commit

Permalink
Allow updating workload for suspended job
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Oct 30, 2023
1 parent 0602e14 commit 9df2318
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 13 deletions.
33 changes: 29 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
}
}

var toUpdate *kueue.Workload
if match == nil && len(toDelete) > 0 && job.IsSuspended() && !workload.HasQuotaReservation(toDelete[0]) {
toUpdate = toDelete[0]
toDelete = toDelete[1:]
}

// If there is no matching workload and the job is running, suspend it.
if match == nil && !job.IsSuspended() {
log.V(2).Info("job with no matching workload, suspending")
Expand All @@ -437,14 +443,14 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o

// Delete duplicate workload instances.
existedWls := 0
for i := range toDelete {
wlKey := workload.Key(toDelete[i])
err := r.removeFinalizer(ctx, toDelete[i])
for _, wl := range toDelete {
wlKey := workload.Key(wl)
err := r.removeFinalizer(ctx, wl)
if err != nil && !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("failed to remove workload finalizer for: %w ", err)
}

err = r.client.Delete(ctx, toDelete[i])
err = r.client.Delete(ctx, wl)
if err != nil && !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("deleting not matching workload: %w", err)
}
Expand All @@ -462,6 +468,10 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
return nil, fmt.Errorf("%w: deleted %d workloads", ErrExtraWorkloads, len(toDelete))
}

if toUpdate != nil {
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate)
}

return match, nil
}

Expand Down Expand Up @@ -505,6 +515,21 @@ func (r *JobReconciler) equivalentToWorkload(job GenericJob, object client.Objec
return true
}

func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) {
newWl, err := r.constructWorkload(ctx, job, object)
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
}
wl.Spec = newWl.Spec
if err = r.client.Update(ctx, wl); err != nil {
return nil, fmt.Errorf("updating existed workload: %w", err)
}

r.record.Eventf(object, corev1.EventTypeNormal, "UpdatedWorkload",
"Updated not matching Workload for suspended job: %v", wl)
return newWl, nil
}

// startJob will unsuspend the job, and also inject the node affinity.
func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) error {
info, err := r.getPodSetsInfoFromStatus(ctx, wl)
Expand Down
88 changes: 87 additions & 1 deletion pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestPodSets(t *testing.T) {
var (
jobCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(batchv1.Job{}, "TypeMeta", "ObjectMeta"),
cmpopts.IgnoreFields(batchv1.Job{}, "TypeMeta", "ObjectMeta.OwnerReferences", "ObjectMeta.ResourceVersion", "ObjectMeta.Annotations"),
}
workloadCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
Expand Down Expand Up @@ -874,6 +874,26 @@ func TestReconciler(t *testing.T) {
},
wantErr: jobframework.ErrNoMatchingWorkloads,
},
"non-matching non-admitted workload is updated": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
},
job: *baseJobWrapper.DeepCopy(),
wantJob: *baseJobWrapper.DeepCopy(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 5).Request(corev1.ResourceCPU, "1").Obj()).
Priority(0).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("foo").
Priority(0).
Obj(),
},
},
"suspended job with partial admission and admitted workload is unsuspended": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
Expand Down Expand Up @@ -955,6 +975,72 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"the workload is updated when queue name has changed for suspended job": {
job: *baseJobWrapper.
Clone().
Suspend(true).
Queue("test-queue-new").
UID("test-uid").
Obj(),
wantJob: *baseJobWrapper.
Clone().
Queue("test-queue-new").
UID("test-uid").
Obj(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("test-queue").
Priority(0).
Labels(map[string]string{
controllerconsts.JobUIDLabel: "test-uid",
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("test-queue-new").
Priority(0).
Labels(map[string]string{
controllerconsts.JobUIDLabel: "test-uid",
}).
Obj(),
},
},
"the workload is updated when priority class has changed for suspended job": {
job: *baseJobWrapper.
Clone().
Suspend(true).
UID("test-uid").
Obj(),
wantJob: *baseJobWrapper.
Clone().
UID("test-uid").
Obj(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("foo").
Priority(0).
PriorityClass("new-priority-class").
Labels(map[string]string{
controllerconsts.JobUIDLabel: "test-uid",
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("foo").
Priority(0).
PriorityClass("new-priority-class").
Labels(map[string]string{
controllerconsts.JobUIDLabel: "test-uid",
}).
Obj(),
},
},
"the workload without uid label is created when job's uid is longer than 63 characters": {
job: *baseJobWrapper.
Clone().
Expand Down
9 changes: 6 additions & 3 deletions pkg/webhooks/workload_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,12 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
specPath := field.NewPath("spec")
statusPath := field.NewPath("status")
allErrs = append(allErrs, ValidateWorkload(newObj)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassSource, oldObj.Spec.PriorityClassSource, specPath.Child("priorityClassSource"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassName, oldObj.Spec.PriorityClassName, specPath.Child("priorityClassName"))...)

if workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassSource, oldObj.Spec.PriorityClassSource, specPath.Child("priorityClassSource"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassName, oldObj.Spec.PriorityClassName, specPath.Child("priorityClassName"))...)
}
if workload.HasQuotaReservation(newObj) && workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...)
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
Expand Down
50 changes: 45 additions & 5 deletions pkg/webhooks/workload_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ func TestValidateWorkloadUpdate(t *testing.T) {
before, after *kueue.Workload
wantErr field.ErrorList
}{
"podSets should not be updated: count": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
"podSets should not be updated when has quota reservation: count": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).ReserveQuota(testingutil.MakeAdmission("cq").Obj()).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).PodSets(
*testingutil.MakePodSet("main", 2).Obj(),
).Obj(),
Expand All @@ -406,7 +406,7 @@ func TestValidateWorkloadUpdate(t *testing.T) {
},
},
"podSets should not be updated: podSpec": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).ReserveQuota(testingutil.MakeAdmission("cq").Obj()).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).PodSets(
kueue.PodSet{
Name: "main",
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestValidateWorkloadUpdate(t *testing.T) {
"priorityClassSource should not be updated": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class").PriorityClassSource(constants.PodPriorityClassSource).
Priority(10).Obj(),
Priority(10).ReserveQuota(testingutil.MakeAdmission("cq").Obj()).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class").PriorityClassSource(constants.WorkloadPriorityClassSource).
Priority(10).Obj(),
Expand All @@ -560,7 +560,7 @@ func TestValidateWorkloadUpdate(t *testing.T) {
"priorityClassName should not be updated": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class-1").PriorityClassSource(constants.PodPriorityClassSource).
Priority(10).Obj(),
Priority(10).ReserveQuota(testingutil.MakeAdmission("cq").Obj()).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class-2").PriorityClassSource(constants.PodPriorityClassSource).
Priority(10).Obj(),
Expand Down Expand Up @@ -608,6 +608,46 @@ func TestValidateWorkloadUpdate(t *testing.T) {
State: kueue.CheckStateReady,
}).Obj(),
},
"updating priorityClassName before setting reserve quota for workload": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class-1").PriorityClassSource(constants.PodPriorityClassSource).
Priority(10).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class-2").PriorityClassSource(constants.PodPriorityClassSource).
Priority(10).Obj(),
wantErr: nil,
},
"updating priorityClassSource before setting reserve quota for workload": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class").PriorityClassSource(constants.PodPriorityClassSource).
Priority(10).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Queue("q").
PriorityClass("test-class").PriorityClassSource(constants.WorkloadPriorityClassSource).
Priority(10).Obj(),
wantErr: nil,
},
"updating podSets before setting reserve quota for workload": {
before: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).Obj(),
after: testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).PodSets(
kueue.PodSet{
Name: "main",
Count: 1,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "c-after",
Resources: corev1.ResourceRequirements{
Requests: make(corev1.ResourceList),
},
},
},
},
},
},
).Obj(),
wantErr: nil,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand Down
50 changes: 50 additions & 0 deletions test/integration/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal(""), "The Workload shouldn't have .spec.queueName set")
gomega.Expect(metav1.IsControlledBy(createdWorkload, job)).To(gomega.BeTrue(), "The Workload should be owned by the Job")

createdTime := createdWorkload.CreationTimestamp

ginkgo.By("checking the workload is created with priority and priorityName")
gomega.Expect(createdWorkload.Spec.PriorityClassName).Should(gomega.Equal(priorityClassName))
gomega.Expect(*createdWorkload.Spec.Priority).Should(gomega.Equal(int32(priorityValue)))
Expand All @@ -142,6 +144,10 @@ var _ = ginkgo.Describe("Job controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
return createdWorkload.Spec.QueueName == jobQueueName
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("updated workload should have the same created timestamp", func() {
gomega.Expect(createdWorkload.CreationTimestamp).Should(gomega.Equal(createdTime))
})

ginkgo.By("checking a second non-matching workload is deleted")
secondWl := &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -1543,6 +1549,50 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
})
})
})
ginkgo.It("Should schedule updated job and update the workload", func() {
localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
ginkgo.By("create a localQueue", func() {
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})

job := testingjob.MakeJob(jobName, ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "3").Parallelism(2).Suspend(false).Obj()
lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &batchv1.Job{}

ginkgo.By("creating the job that doesn't fit", func() {
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
})

ginkgo.By("job should be suspend", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(true)))
})

wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobName), Namespace: ns.Name}
createdWorkload := util.AwaitAndVerifyCreatedWorkload(ctx, k8sClient, wlLookupKey, createdJob)
createdTime := createdWorkload.CreationTimestamp

createdJob.Spec.Parallelism = ptr.To[int32](1)

ginkgo.By("updating the job", func() {
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
})

createdWorkload = util.AwaitAndVerifyCreatedWorkload(ctx, k8sClient, wlLookupKey, createdJob)

ginkgo.By("updated job should be unsuspended", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false)))
})

ginkgo.By("updated workload should have the same created timestamp", func() {
gomega.Expect(createdWorkload.CreationTimestamp).Should(gomega.Equal(createdTime))
})
})
})

func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
Expand Down
1 change: 1 addition & 0 deletions test/integration/webhook/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ var _ = ginkgo.Describe("Workload validating webhook", func() {
ginkgo.By("Creating a new Workload")
workload := testing.MakeWorkload(workloadName, ns.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, workload)).Should(gomega.Succeed())
gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, workload, testing.MakeAdmission("cq").Obj())).Should(gomega.Succeed())

ginkgo.By("Updating podSet")
gomega.Eventually(func() error {
Expand Down

0 comments on commit 9df2318

Please sign in to comment.