Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: remove workload finalizer if workload is finished #1523

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

if len(wl.ObjectMeta.OwnerReferences) == 0 && !wl.DeletionTimestamp.IsZero() {
return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, &wl)
}

if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}
Expand Down
83 changes: 79 additions & 4 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
testingclock "k8s.io/utils/clock/testing"
Expand Down Expand Up @@ -298,11 +300,23 @@ func TestSyncCheckStates(t *testing.T) {
}
}

var (
workloadCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(
kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion",
),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
}
)

func TestReconcile(t *testing.T) {
testStartTime := time.Now()
cases := map[string]struct {
workload *kueue.Workload
wantError error
wantEvents []utiltesting.EventRecord
workload *kueue.Workload
wantWorkload *kueue.Workload
wantError error
wantEvents []utiltesting.EventRecord
}{
"admit": {
workload: utiltesting.MakeWorkload("wl", "ns").
Expand All @@ -312,6 +326,19 @@ func TestReconcile(t *testing.T) {
State: kueue.CheckStateReady,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Condition(metav1.Condition{
Type: "Admitted",
Status: "True",
Reason: "Admitted",
Message: "The workload is admitted",
}).
Obj(),
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "ns", Name: "wl"},
Expand All @@ -320,7 +347,7 @@ func TestReconcile(t *testing.T) {
},
},
},
"already admited": {
"already admitted": {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
Expand All @@ -329,6 +356,42 @@ func TestReconcile(t *testing.T) {
State: kueue.CheckStateReady,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
}).
Obj(),
},
"remove finalizer for finished workload": {
workload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: nil,
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a case for the workload with ownerReference?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"don't remove finalizer for owned finished workload": {
workload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.String(), "Job", "job", "test-uid", true, true).
DeletionTimestamp(testStartTime).
Obj(),
},
}
for name, tc := range cases {
Expand All @@ -351,6 +414,18 @@ func TestReconcile(t *testing.T) {
t.Errorf("unexpected reconcile error (-want/+got):\n%s", diff)
}

gotWorkload := &kueue.Workload{}
if err := cl.Get(ctx, client.ObjectKeyFromObject(tc.workload), gotWorkload); err != nil {
if tc.wantWorkload != nil && !errors.IsNotFound(err) {
t.Fatalf("Could not get Workloads after reconcile: %v", err)
}
gotWorkload = nil
}

if diff := cmp.Diff(tc.wantWorkload, gotWorkload, workloadCmpOpts...); diff != "" {
t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff)
}

if diff := cmp.Diff(tc.wantEvents, recorder.RecordedEvents, cmpopts.IgnoreFields(utiltesting.EventRecord{}, "Message")); diff != "" {
t.Errorf("unexpected events (-want/+got):\n%s", diff)
}
Expand Down
16 changes: 4 additions & 12 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand Down Expand Up @@ -176,7 +175,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}
}
for i := range workloads.Items {
err := r.removeFinalizer(ctx, &workloads.Items[i])
err := workload.RemoveFinalizer(ctx, r.client, &workloads.Items[i])
if client.IgnoreNotFound(err) != nil {
log.Error(err, "Removing finalizer")
return ctrl.Result{}, err
Expand Down Expand Up @@ -256,7 +255,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}
}

return ctrl.Result{}, r.removeFinalizer(ctx, wl)
return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, wl)
}

// 1.1 If the workload is pending deletion, suspend the job if needed
Expand All @@ -269,7 +268,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

if err == nil && wl != nil {
err = r.removeFinalizer(ctx, wl)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this? how will this workload be finalized?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

err = workload.RemoveFinalizer(ctx, r.client, wl)
}
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -517,7 +516,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
existedWls := 0
for _, wl := range toDelete {
wlKey := workload.Key(wl)
err := r.removeFinalizer(ctx, wl)
err := workload.RemoveFinalizer(ctx, r.client, wl)
if err != nil && !apierrors.IsNotFound(err) {
return nil, fmt.Errorf("failed to remove workload finalizer for: %w ", err)
}
Expand Down Expand Up @@ -731,13 +730,6 @@ func (r *JobReconciler) finalizeJob(ctx context.Context, job GenericJob) error {
return nil
}

func (r *JobReconciler) removeFinalizer(ctx context.Context, wl *kueue.Workload) error {
if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) {
return r.client.Update(ctx, wl)
}
return nil
}

// constructWorkload will derive a workload from the corresponding job.
func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) {
log := ctrl.LoggerFrom(ctx)
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,8 +990,7 @@ func TestReconciler(t *testing.T) {
Admitted(true).
Obj(),
},
wantWorkloads: []kueue.Workload{},
workloadCmpOpts: defaultWorkloadCmpOpts,
workloadCmpOpts: append(defaultWorkloadCmpOpts, cmpopts.IgnoreFields(kueue.Workload{}, "ObjectMeta.DeletionTimestamp")),
deleteWorkloads: true,
},
"replacement pod should be started for pod group of size 1": {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,12 @@ func (w *WorkloadWrapper) Annotations(kv map[string]string) *WorkloadWrapper {
return w
}

// DeletionTimestamp sets a deletion timestamp for the workload.
func (w *WorkloadWrapper) DeletionTimestamp(t time.Time) *WorkloadWrapper {
w.Workload.DeletionTimestamp = ptr.To(metav1.NewTime(t).Rfc3339Copy())
return w
}

type PodSetWrapper struct{ kueue.PodSet }

func MakePodSet(name string, count int) *PodSetWrapper {
Expand Down
8 changes: 8 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
Expand Down Expand Up @@ -434,3 +435,10 @@ func ReclaimablePodsAreEqual(a, b []kueue.ReclaimablePod) bool {
func IsAdmitted(w *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted)
}

func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) error {
if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) {
return c.Update(ctx, wl)
}
return nil
}