Skip to content

Commit

Permalink
Update workload/job handling for the case when the job is finished
Browse files Browse the repository at this point in the history
* The workload is no longer recreated when the job is
  finished.

* If the related workload is not found, the job is stopped
  only if it is not finished.
  • Loading branch information
achernevskii committed Dec 1, 2023
1 parent 0385c10 commit 9683b08
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 11 deletions.
21 changes: 10 additions & 11 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,20 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

// 2. handle job is finished.
if condition, finished := job.Finished(); finished && wl != nil {
if condition, finished := job.Finished(); finished {
if wl != nil && !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "Updating workload status")
return ctrl.Result{}, err
}
}

// Execute job finalization logic
if err := r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
}

err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil {
log.Error(err, "Updating workload status")
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -460,11 +463,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
w = toDelete[0]
}

if _, finished := job.Finished(); finished {
if err := r.finalizeJob(ctx, job); err != nil {
return nil, fmt.Errorf("finalizing job with no matching workload: %w", err)
}
} else {
if _, finished := job.Finished(); !finished {
if err := r.stopJob(ctx, job, w, StopReasonNoMatchingWorkload, "No matching Workload"); err != nil {
return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,16 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"the workload shouldn't be recreated for the completed job": {
job: *baseJobWrapper.Clone().
Condition(batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}).
Obj(),
workloads: []kueue.Workload{},
wantJob: *baseJobWrapper.Clone().
Condition(batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}).
Obj(),
wantWorkloads: []kueue.Workload{},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,18 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"workload shouldn't be recreated for the completed mx job": {
job: testingmxjob.MakeMXJob("mxjob", "ns").
Queue("foo").
StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: v1.ConditionTrue}).
Obj(),
workloads: []kueue.Workload{},
wantJob: testingmxjob.MakeMXJob("mxjob", "ns").
Queue("foo").
StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: v1.ConditionTrue}).
Obj(),
wantWorkloads: []kueue.Workload{},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
95 changes: 95 additions & 0 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,101 @@ func TestReconciler(t *testing.T) {
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"if pod group is finished and wl is deleted, new workload shouldn't be created": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{},
wantWorkloads: []kueue.Workload{},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"if pod in group is scheduling gated and wl is deleted, workload should be recreated": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("b990493b", 2).
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
Priority(0).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
}

for name, tc := range testCases {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/mxjob/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,9 @@ func (j *MXJobWrapper) Active(rType kftraining.ReplicaType, c int32) *MXJobWrapp
}
return j
}

// StatusConditions replaces the status conditions of the wrapped MXJob with
// the supplied ones and returns the same wrapper so that calls can be chained.
// Calling it without arguments stores a nil condition list.
func (j *MXJobWrapper) StatusConditions(conditions ...kftraining.JobCondition) *MXJobWrapper {
	status := &j.Status
	status.Conditions = conditions
	return j
}

0 comments on commit 9683b08

Please sign in to comment.