Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update wl/job handling for the case when the job is finished #1383

Merged
merged 2 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 16 additions & 11 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

if wl != nil && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
// Finalize the job if it's finished
if _, finished := job.Finished(); finished {
if err := r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, r.removeFinalizer(ctx, wl)
}

Expand All @@ -258,17 +265,19 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

// 2. handle job is finished.
if condition, finished := job.Finished(); finished && wl != nil {
if condition, finished := job.Finished(); finished {
if wl != nil && !apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
}

// Execute job finalization logic
if err := r.finalizeJob(ctx, job); err != nil {
return ctrl.Result{}, err
}

err := workload.UpdateStatus(ctx, r.client, wl, condition.Type, condition.Status, condition.Reason, condition.Message, constants.JobControllerName)
if err != nil {
log.Error(err, "Updating workload status")
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -460,11 +469,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
w = toDelete[0]
}

if _, finished := job.Finished(); finished {
if err := r.finalizeJob(ctx, job); err != nil {
return nil, fmt.Errorf("finalizing job with no matching workload: %w", err)
}
} else {
if _, finished := job.Finished(); !finished {
if err := r.stopJob(ctx, job, w, StopReasonNoMatchingWorkload, "No matching Workload"); err != nil {
return nil, fmt.Errorf("stopping job with no matching workload: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,16 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"the workload shouldn't be recreated for the completed job": {
job: *baseJobWrapper.Clone().
Condition(batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}).
Obj(),
workloads: []kueue.Workload{},
wantJob: *baseJobWrapper.Clone().
Condition(batchv1.JobCondition{Type: batchv1.JobComplete, Status: corev1.ConditionTrue}).
Obj(),
wantWorkloads: []kueue.Workload{},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,18 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"workload shouldn't be recreated for the completed mx job": {
job: testingmxjob.MakeMXJob("mxjob", "ns").
Queue("foo").
StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: v1.ConditionTrue}).
Obj(),
workloads: []kueue.Workload{},
wantJob: testingmxjob.MakeMXJob("mxjob", "ns").
Queue("foo").
StatusConditions(kftraining.JobCondition{Type: kftraining.JobSucceeded, Status: v1.ConditionTrue}).
Obj(),
wantWorkloads: []kueue.Workload{},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
97 changes: 96 additions & 1 deletion pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,101 @@ func TestReconciler(t *testing.T) {
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"if pod group is finished and wl is deleted, new workload shouldn't be created": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{},
wantWorkloads: []kueue.Workload{},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"if pod in group is scheduling gated and wl is deleted, workload should be recreated": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("b990493b", 2).
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
Priority(0).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
}

for name, tc := range testCases {
Expand Down Expand Up @@ -1649,7 +1744,7 @@ func TestReconciler_ErrorFinalizingPod(t *testing.T) {
}

// Workload should be finished after the second reconcile
wantWl := *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
wantWl := *utiltesting.MakeWorkload("unit-test", "ns").
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/mxjob/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,9 @@ func (j *MXJobWrapper) Active(rType kftraining.ReplicaType, c int32) *MXJobWrapp
}
return j
}

// StatusConditions replaces the MXJob's status conditions with the supplied
// list and returns the same wrapper. Calling it with no arguments leaves the
// conditions nil.
func (j *MXJobWrapper) StatusConditions(conditions ...kftraining.JobCondition) *MXJobWrapper {
	status := &j.Status
	status.Conditions = conditions
	return j
}