Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/runtime/framework/plugins/jobset/jobset.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,15 @@ func (j *JobSet) checkRuntimePatchesImmutability(ctx context.Context, oldObj, ne

jobSet := &jobsetv1alpha2.JobSet{}
changed := !equality.Semantic.DeepEqual(oldObj.Spec.RuntimePatches, newObj.Spec.RuntimePatches)
suspended := ptr.Equal(newObj.Spec.Suspend, ptr.To(true))
// Allow modifying RuntimePatches if the TrainJob is suspended either before
// or after the update; reject only when it is unsuspended both before and
// after. This lets external controllers (e.g. Kueue) update RuntimePatches
// and toggle spec.suspend in a single API request.
oldSuspended := oldObj.Spec.Suspend != nil && *oldObj.Spec.Suspend
newSuspended := newObj.Spec.Suspend != nil && *newObj.Spec.Suspend
if changed {
if !suspended {
allErrs = append(allErrs, field.Forbidden(runtimePatchesPath, "RuntimePatches can only be modified when the TrainJob is suspended"))
if !oldSuspended && !newSuspended {
allErrs = append(allErrs, field.Forbidden(runtimePatchesPath, "RuntimePatches can only be modified when the TrainJob is suspended before or after the update"))
} else if err := j.client.Get(ctx, client.ObjectKeyFromObject(newObj), jobSet); client.IgnoreNotFound(err) != nil {
allErrs = append(allErrs, field.InternalError(runtimePatchesPath, err))
} else {
Expand Down
257 changes: 256 additions & 1 deletion pkg/runtime/framework/plugins/jobset/jobset_test.go
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add one more scenario where old is not suspended, but new is suspended, and the JobSet is still active (ReplicatedJobsStatus: []jobsetv1alpha2.ReplicatedJobStatus{ {Name: constants.Node, Active: 2}})?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in a3a2271c as "forbid atomic update: suspending trainJob with runtimePatches change is rejected when jobSet has an active replicatedJob". The fixture uses oldSuspend=false, newSuspend=true, and a JobSet with ReplicatedJobsStatus: [{Name: constants.Node, Active: 2}] — exactly the scenario you suggested. It verifies that the relaxed suspend predicate alone cannot let a runtimePatches change slip past the active-ReplicatedJob check.

Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ func TestValidate(t *testing.T) {
}).
Obj(),
wantError: field.ErrorList{
field.Forbidden(runtimePatchesPath, "RuntimePatches can only be modified when the TrainJob is suspended"),
field.Forbidden(runtimePatchesPath, "RuntimePatches can only be modified when the TrainJob is suspended before or after the update"),
},
},
"allow changes to runtimePatches when trainJob is suspended and jobSet does not exist": {
Expand Down Expand Up @@ -1121,6 +1121,166 @@ func TestValidate(t *testing.T) {
clientErr: apierrors.NewNotFound(jobsetv1alpha2.Resource("jobset"), ""),
wantError: nil,
},
"allow atomic update: modify runtimePatches and unsuspend trainJob in a single request": {
info: &runtime.Info{
TemplateSpec: runtime.TemplateSpec{
ObjApply: &jobsetv1alpha2ac.JobSetSpecApplyConfiguration{
ReplicatedJobs: []jobsetv1alpha2ac.ReplicatedJobApplyConfiguration{
{
Name: ptr.To(constants.Node),
Template: &batchv1ac.JobTemplateSpecApplyConfiguration{
Spec: &batchv1ac.JobSpecApplyConfiguration{
Template: &corev1ac.PodTemplateSpecApplyConfiguration{
Spec: &corev1ac.PodSpecApplyConfiguration{
Containers: []corev1ac.ContainerApplyConfiguration{
{
Name: ptr.To(constants.Node),
},
},
},
},
},
},
},
},
},
},
},
oldObj: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
Suspend(true).
RuntimePatches([]trainer.RuntimePatch{
{
Manager: "test.io/manager",
TrainingRuntimeSpec: &trainer.TrainingRuntimeSpecPatch{
Template: &trainer.JobSetTemplatePatch{
Spec: &trainer.JobSetSpecPatch{
ReplicatedJobs: []trainer.ReplicatedJobPatch{{
Name: constants.Node,
Template: &trainer.JobTemplatePatch{
Spec: &trainer.JobSpecPatch{
Template: &trainer.PodTemplatePatch{
Spec: &trainer.PodSpecPatch{
ServiceAccountName: ptr.To("service-account"),
},
},
},
},
}},
},
},
},
},
}).
Obj(),
newObj: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
Suspend(false).
RuntimePatches([]trainer.RuntimePatch{
{
Manager: "test.io/manager",
TrainingRuntimeSpec: &trainer.TrainingRuntimeSpecPatch{
Template: &trainer.JobSetTemplatePatch{
Spec: &trainer.JobSetSpecPatch{
ReplicatedJobs: []trainer.ReplicatedJobPatch{{
Name: constants.Node,
Template: &trainer.JobTemplatePatch{
Spec: &trainer.JobSpecPatch{
Template: &trainer.PodTemplatePatch{
Spec: &trainer.PodSpecPatch{
NodeSelector: map[string]string{"injected": "by-kueue"},
},
},
},
},
}},
},
},
},
},
}).
Obj(),
clientErr: apierrors.NewNotFound(jobsetv1alpha2.Resource("jobset"), ""),
wantError: nil,
},
"allow atomic update: modify runtimePatches and suspend trainJob in a single request": {
info: &runtime.Info{
TemplateSpec: runtime.TemplateSpec{
ObjApply: &jobsetv1alpha2ac.JobSetSpecApplyConfiguration{
ReplicatedJobs: []jobsetv1alpha2ac.ReplicatedJobApplyConfiguration{
{
Name: ptr.To(constants.Node),
Template: &batchv1ac.JobTemplateSpecApplyConfiguration{
Spec: &batchv1ac.JobSpecApplyConfiguration{
Template: &corev1ac.PodTemplateSpecApplyConfiguration{
Spec: &corev1ac.PodSpecApplyConfiguration{
Containers: []corev1ac.ContainerApplyConfiguration{
{
Name: ptr.To(constants.Node),
},
},
},
},
},
},
},
},
},
},
},
oldObj: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
Suspend(false).
RuntimePatches([]trainer.RuntimePatch{
{
Manager: "test.io/manager",
TrainingRuntimeSpec: &trainer.TrainingRuntimeSpecPatch{
Template: &trainer.JobSetTemplatePatch{
Spec: &trainer.JobSetSpecPatch{
ReplicatedJobs: []trainer.ReplicatedJobPatch{{
Name: constants.Node,
Template: &trainer.JobTemplatePatch{
Spec: &trainer.JobSpecPatch{
Template: &trainer.PodTemplatePatch{
Spec: &trainer.PodSpecPatch{
ServiceAccountName: ptr.To("service-account"),
},
},
},
},
}},
},
},
},
},
}).
Obj(),
newObj: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
Suspend(true).
RuntimePatches([]trainer.RuntimePatch{
{
Manager: "test.io/manager",
TrainingRuntimeSpec: &trainer.TrainingRuntimeSpecPatch{
Template: &trainer.JobSetTemplatePatch{
Spec: &trainer.JobSetSpecPatch{
ReplicatedJobs: []trainer.ReplicatedJobPatch{{
Name: constants.Node,
Template: &trainer.JobTemplatePatch{
Spec: &trainer.JobSpecPatch{
Template: &trainer.PodTemplatePatch{
Spec: &trainer.PodSpecPatch{
ServiceAccountName: ptr.To("service-account-updated"),
},
},
},
},
}},
},
},
},
},
}).
Obj(),
clientErr: apierrors.NewNotFound(jobsetv1alpha2.Resource("jobset"), ""),
wantError: nil,
},
"allow changes to runtimePatches when trainJob is suspended and jobSet exists but is inactive": {
info: &runtime.Info{
TemplateSpec: runtime.TemplateSpec{
Expand Down Expand Up @@ -1349,6 +1509,101 @@ func TestValidate(t *testing.T) {
field.Forbidden(runtimePatchesPath, "RuntimePatches cannot be modified when the JobSet's ReplicatedJob node is still active"),
},
},
"forbid atomic update: suspending trainJob with runtimePatches change is rejected when jobSet has an active replicatedJob": {
info: &runtime.Info{
TemplateSpec: runtime.TemplateSpec{
ObjApply: &jobsetv1alpha2ac.JobSetSpecApplyConfiguration{
ReplicatedJobs: []jobsetv1alpha2ac.ReplicatedJobApplyConfiguration{
{
Name: ptr.To(constants.Node),
Template: &batchv1ac.JobTemplateSpecApplyConfiguration{
Spec: &batchv1ac.JobSpecApplyConfiguration{
Template: &corev1ac.PodTemplateSpecApplyConfiguration{
Spec: &corev1ac.PodSpecApplyConfiguration{
Containers: []corev1ac.ContainerApplyConfiguration{
{
Name: ptr.To(constants.Node),
},
},
},
},
},
},
},
},
},
},
},
oldObj: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
Suspend(false).
RuntimePatches([]trainer.RuntimePatch{
{
Manager: "test.io/manager",
TrainingRuntimeSpec: &trainer.TrainingRuntimeSpecPatch{
Template: &trainer.JobSetTemplatePatch{
Spec: &trainer.JobSetSpecPatch{
ReplicatedJobs: []trainer.ReplicatedJobPatch{{
Name: constants.Node,
Template: &trainer.JobTemplatePatch{
Spec: &trainer.JobSpecPatch{
Template: &trainer.PodTemplatePatch{
Spec: &trainer.PodSpecPatch{
ServiceAccountName: ptr.To("service-account"),
},
},
},
},
}},
},
},
},
},
}).
Obj(),
newObj: utiltesting.MakeTrainJobWrapper(metav1.NamespaceDefault, "test").
Suspend(true).
RuntimePatches([]trainer.RuntimePatch{
{
Manager: "test.io/manager",
TrainingRuntimeSpec: &trainer.TrainingRuntimeSpecPatch{
Template: &trainer.JobSetTemplatePatch{
Spec: &trainer.JobSetSpecPatch{
ReplicatedJobs: []trainer.ReplicatedJobPatch{{
Name: constants.Node,
Template: &trainer.JobTemplatePatch{
Spec: &trainer.JobSpecPatch{
Template: &trainer.PodTemplatePatch{
Spec: &trainer.PodSpecPatch{
ServiceAccountName: ptr.To("service-account-updated"),
},
},
},
},
}},
},
},
},
},
}).
Obj(),
jobSet: &jobsetv1alpha2.JobSet{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: metav1.NamespaceDefault,
},
Status: jobsetv1alpha2.JobSetStatus{
ReplicatedJobsStatus: []jobsetv1alpha2.ReplicatedJobStatus{
{
Name: constants.Node,
Active: 2,
},
},
},
},
wantError: field.ErrorList{
field.Forbidden(runtimePatchesPath, "RuntimePatches cannot be modified when the JobSet's ReplicatedJob node is still active"),
},
},
"forbid changes to runtimePatches when trainJob is suspended but has multiple active replicatedJobs": {
info: &runtime.Info{
TemplateSpec: runtime.TemplateSpec{
Expand Down
Loading