Skip to content

Commit

Permalink
Ensure all group members own the workload
Browse files Browse the repository at this point in the history
# Conflicts:
#	pkg/controller/jobframework/interface.go
  • Loading branch information
mimowo committed Feb 6, 2024
1 parent 892d85c commit 312157f
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 59 deletions.
13 changes: 7 additions & 6 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestWlReconcileJobset(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -84,7 +85,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -111,7 +112,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -172,7 +173,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestWlReconcileJobset(t *testing.T) {
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", true, true).
OwnerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -228,7 +229,7 @@ func TestUpdateConfig(t *testing.T) {

func TestRemoteClientGC(t *testing.T) {
baseJobBuilder := testingjob.MakeJob("job1", TestNamespace)
baseWlBuilder := utiltesting.MakeWorkload("wl1", TestNamespace).OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "test-uuid", true, true)
baseWlBuilder := utiltesting.MakeWorkload("wl1", TestNamespace).OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "test-uuid", ptr.To(true), ptr.To(true))

cases := map[string]struct {
managersWorkloads []kueue.Workload
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestRemoteClientGC(t *testing.T) {
"missing worker workloads are deleted (no job adapter)": {
workersWorkloads: []kueue.Workload{
*baseWlBuilder.Clone().
OwnerReference(batchv1.SchemeGroupVersion.WithKind("NptAJob"), "job1", "test-uuid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("NptAJob"), "job1", "test-uuid", ptr.To(true), ptr.To(true)).
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
},
Expand Down
21 changes: 11 additions & 10 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand Down Expand Up @@ -91,7 +92,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
Obj(),
},
worker1Workloads: []kueue.Workload{
Expand All @@ -102,7 +103,7 @@ func TestWlReconcile(t *testing.T) {
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
Obj(),
},
},
Expand All @@ -111,14 +112,14 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -133,7 +134,7 @@ func TestWlReconcile(t *testing.T) {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand All @@ -155,7 +156,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -184,7 +185,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
Expand Down Expand Up @@ -214,7 +215,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -248,7 +249,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestWlReconcile(t *testing.T) {
State: kueue.CheckStatePending,
Message: `The workload got reservation on "worker1"`,
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1", ptr.To(true), ptr.To(true)).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Condition(metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, Reason: "ByTest", Message: `From remote "worker1": by test`}).
Obj(),
Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,28 @@ func TestReconcile(t *testing.T) {
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", ptr.To(true), ptr.To(true)).
DeletionTimestamp(testStartTime).
Obj(),
wantWorkload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
Type: "Finished",
Status: "True",
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", ptr.To(true), ptr.To(true)).
DeletionTimestamp(testStartTime).
Obj(),
},
"unadmitted workload with rejected checks": {
workload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", ptr.To(true), ptr.To(true)).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", ptr.To(true), ptr.To(true)).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -427,7 +427,7 @@ func TestReconcile(t *testing.T) {
workload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", ptr.To(true), ptr.To(true)).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand All @@ -436,7 +436,7 @@ func TestReconcile(t *testing.T) {
wantWorkload: utiltesting.MakeWorkload("wl", "ns").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Admitted(true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "ownername", "owneruid", ptr.To(true), ptr.To(true)).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateRejected,
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type ComposableJob interface {
FindMatchingWorkloads(ctx context.Context, c client.Client) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
// Stop implements the custom stop procedure for ComposableJob
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error)
// Ensure all members of the ComposableJob are owning the workload
EnsureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r record.EventRecorder, workload *kueue.Workload) error
}

func ParentWorkloadName(job GenericJob) string {
Expand Down
9 changes: 9 additions & 0 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

// Ensure all members of the composable job own the workload
if wl != nil {
if cj, implements := job.(ComposableJob); implements {
if err := cj.EnsureWorkloadOwnedByAllMembers(ctx, r.client, r.record, wl); err != nil {
return ctrl.Result{}, err
}
}
}

if wl != nil && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
// Finalize the job if it's finished
if _, finished := job.Finished(); finished {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/jobframework/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
Expand Down Expand Up @@ -121,10 +122,10 @@ func TestSetupIndexes(t *testing.T) {
"proper indexes are set": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("alpha-wl", testNamespace).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "alpha", "job", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "alpha", "job", ptr.To(true), ptr.To(true)).
Obj(),
*utiltesting.MakeWorkload("beta-wl", testNamespace).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "beta", "job", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "beta", "job", ptr.To(true), ptr.To(true)).
Obj(),
},
opts: []Option{
Expand All @@ -138,10 +139,10 @@ func TestSetupIndexes(t *testing.T) {
"kubeflow.org/mpijob is disabled in the configAPI": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("alpha-wl", testNamespace).
OwnerReference(kubeflow.SchemeGroupVersionKind, "alpha", "mpijob", true, true).
OwnerReference(kubeflow.SchemeGroupVersionKind, "alpha", "mpijob", ptr.To(true), ptr.To(true)).
Obj(),
*utiltesting.MakeWorkload("beta-wl", testNamespace).
OwnerReference(kubeflow.SchemeGroupVersionKind, "beta", "mpijob", true, true).
OwnerReference(kubeflow.SchemeGroupVersionKind, "beta", "mpijob", ptr.To(true), ptr.To(true)).
Obj(),
},
opts: []Option{
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1922,7 +1922,7 @@ func TestReconciler(t *testing.T) {
Labels(map[string]string{
controllerconsts.JobUIDLabel: "test-uid",
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", ptr.To(true), ptr.To(true)).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
Expand Down Expand Up @@ -1953,7 +1953,7 @@ func TestReconciler(t *testing.T) {
PriorityClass("test-wpc").
Priority(100).
PriorityClassSource(constants.WorkloadPriorityClassSource).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "other-job", "other-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "other-job", "other-uid", ptr.To(true), ptr.To(true)).
Obj(),
},
wantWorkloads: []kueue.Workload{
Expand All @@ -1963,7 +1963,7 @@ func TestReconciler(t *testing.T) {
PriorityClass("test-wpc").
Priority(100).
PriorityClassSource(constants.WorkloadPriorityClassSource).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "other-job", "other-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "other-job", "other-uid", ptr.To(true), ptr.To(true)).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
Expand Down Expand Up @@ -2006,7 +2006,7 @@ func TestReconciler(t *testing.T) {
Labels(map[string]string{
controllerconsts.JobUIDLabel: "test-uid",
}).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", true, true).
OwnerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job", "test-uid", ptr.To(true), ptr.To(true)).
Condition(metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,29 @@ func (p *Pod) cleanupExcessPods(ctx context.Context, c client.Client, totalCount
return nil
}

func (p *Pod) EnsureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r record.EventRecorder, workload *kueue.Workload) error {
if !p.isGroup {
return nil
}
oldOwnersCnt := len(workload.GetOwnerReferences())
for _, pod := range p.list.Items {
if err := controllerutil.SetOwnerReference(&pod, workload, c.Scheme()); err != nil {
return err
}
}
newOwnersCnt := len(workload.GetOwnerReferences())
if addedOwnersCnt := newOwnersCnt - oldOwnersCnt; addedOwnersCnt > 0 {
log := ctrl.LoggerFrom(ctx).WithValues("workload", klog.KObj(workload))
log.V(4).Info("Adding owner references for workload", "count", addedOwnersCnt)
err := c.Update(ctx, workload)
if err == nil {
r.Eventf(workload, corev1.EventTypeNormal, "OwnerReferencesAdded", fmt.Sprintf("Added %d owner reference(s)", addedOwnersCnt))
}
return err
}
return nil
}

func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client, r record.EventRecorder) (*kueue.Workload, error) {
object := p.Object()
log := ctrl.LoggerFrom(ctx)
Expand Down
Loading

0 comments on commit 312157f

Please sign in to comment.