Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow updating scheduling directives of suspended jobs that never started #105479

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 33 additions & 4 deletions pkg/apis/batch/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error
}

// ValidateJobUpdate validates an update to a Job and returns an ErrorList with any errors.
func ValidateJobUpdate(job, oldJob *batch.Job, opts apivalidation.PodValidationOptions) field.ErrorList {
func ValidateJobUpdate(job, oldJob *batch.Job, opts JobValidationOptions) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobSpecUpdate(job.Spec, oldJob.Spec, field.NewPath("spec"), opts)...)
return allErrs
Expand All @@ -228,16 +228,43 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList {
}

// ValidateJobSpecUpdate validates an update to a JobSpec and returns an ErrorList with any errors.
func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts apivalidation.PodValidationOptions) field.ErrorList {
func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts)...)
allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Template, oldSpec.Template, fldPath.Child("template"))...)
allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...)
return allErrs
}

func validatePodTemplateUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
template := &spec.Template
oldTemplate := &oldSpec.Template
if opts.AllowMutableSchedulingDirectives {
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
oldTemplate = oldSpec.Template.DeepCopy() // +k8s:verify-mutation:reason=clone
switch {
case template.Spec.Affinity == nil && oldTemplate.Spec.Affinity != nil:
// allow the Affinity field to be cleared if the old template had no affinity directives other than NodeAffinity
oldTemplate.Spec.Affinity.NodeAffinity = nil // +k8s:verify-mutation:reason=clone
if (*oldTemplate.Spec.Affinity) == (api.Affinity{}) {
oldTemplate.Spec.Affinity = nil // +k8s:verify-mutation:reason=clone
}
case template.Spec.Affinity != nil && oldTemplate.Spec.Affinity == nil:
// allow the NodeAffinity field to skip immutability checking
oldTemplate.Spec.Affinity = &api.Affinity{NodeAffinity: template.Spec.Affinity.NodeAffinity} // +k8s:verify-mutation:reason=clone
case template.Spec.Affinity != nil && oldTemplate.Spec.Affinity != nil:
// allow the NodeAffinity field to skip immutability checking
oldTemplate.Spec.Affinity.NodeAffinity = template.Spec.Affinity.NodeAffinity // +k8s:verify-mutation:reason=clone
}
oldTemplate.Spec.NodeSelector = template.Spec.NodeSelector // +k8s:verify-mutation:reason=clone
oldTemplate.Spec.Tolerations = template.Spec.Tolerations // +k8s:verify-mutation:reason=clone
}
allErrs = append(allErrs, apivalidation.ValidateImmutableField(template, oldTemplate, fldPath.Child("template"))...)
return allErrs
}

// ValidateJobStatusUpdate validates an update to a JobStatus and returns an ErrorList with any errors.
func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList {
allErrs := field.ErrorList{}
Expand Down Expand Up @@ -346,4 +373,6 @@ type JobValidationOptions struct {
apivalidation.PodValidationOptions
// Allow Job to have the annotation batch.kubernetes.io/job-tracking
AllowTrackingAnnotation bool
// Allow mutable node affinity, selector and tolerations of the template
AllowMutableSchedulingDirectives bool
}
194 changes: 190 additions & 4 deletions pkg/apis/batch/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,57 @@ func TestValidateJob(t *testing.T) {
func TestValidateJobUpdate(t *testing.T) {
validGeneratedSelector := getValidGeneratedSelector()
validPodTemplateSpecForGenerated := getValidPodTemplateSpecForGenerated(validGeneratedSelector)
validNodeAffinity := &api.Affinity{
NodeAffinity: &api.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &api.NodeSelector{
NodeSelectorTerms: []api.NodeSelectorTerm{
{
MatchExpressions: []api.NodeSelectorRequirement{
{
Key: "foo",
Operator: api.NodeSelectorOpIn,
Values: []string{"bar", "value2"},
},
},
},
},
},
},
}
validPodTemplateWithAffinity := getValidPodTemplateSpecForGenerated(validGeneratedSelector)
validPodTemplateWithAffinity.Spec.Affinity = &api.Affinity{
NodeAffinity: &api.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &api.NodeSelector{
NodeSelectorTerms: []api.NodeSelectorTerm{
{
MatchExpressions: []api.NodeSelectorRequirement{
{
Key: "foo",
Operator: api.NodeSelectorOpIn,
Values: []string{"bar", "value"},
},
},
},
},
},
},
}
// This is to test immutability of the selector, both the new and old
// selector should match the labels in the template, which is immutable
// on its own; therfore, the only way to test selector immutability is
// when the new selector is changed but still matches the existing labels.
newSelector := getValidGeneratedSelector()
newSelector.MatchLabels["foo"] = "bar"
validTolerations := []api.Toleration{{
Key: "foo",
Operator: api.TolerationOpEqual,
Value: "bar",
Effect: api.TaintEffectPreferNoSchedule,
}}
cases := map[string]struct {
old batch.Job
update func(*batch.Job)
opts JobValidationOptions
err *field.Error
}{
"mutable fields": {
Expand Down Expand Up @@ -416,11 +464,11 @@ func TestValidateJobUpdate(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
Template: getValidPodTemplateSpecForGenerated(newSelector),
},
},
update: func(job *batch.Job) {
job.Spec.Selector.MatchLabels["foo"] = "bar"
job.Spec.Selector = newSelector
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Expand All @@ -436,7 +484,7 @@ func TestValidateJobUpdate(t *testing.T) {
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Containers = nil
job.Spec.Template.Spec.DNSPolicy = api.DNSClusterFirstWithHostNet
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Expand All @@ -461,14 +509,152 @@ func TestValidateJobUpdate(t *testing.T) {
Field: "spec.completionMode",
},
},
"immutable node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = validNodeAffinity
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.template",
},
},
"add node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = validNodeAffinity
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"update node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateWithAffinity,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = validNodeAffinity
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"remove node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateWithAffinity,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity.NodeAffinity = nil
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"remove affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateWithAffinity,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = nil
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"immutable tolerations": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Tolerations = validTolerations
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.template",
},
},
"mutable tolerations": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Tolerations = validTolerations
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"immutable node selector": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.template",
},
},
"mutable node selector": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
}
ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")
for k, tc := range cases {
t.Run(k, func(t *testing.T) {
tc.old.ResourceVersion = "1"
update := tc.old.DeepCopy()
tc.update(update)
errs := ValidateJobUpdate(&tc.old, update, corevalidation.PodValidationOptions{})
errs := ValidateJobUpdate(update, &tc.old, tc.opts)
var wantErrs field.ErrorList
if tc.err != nil {
wantErrs = append(wantErrs, tc.err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,14 @@ const (
// See https://groups.google.com/g/kubernetes-sig-architecture/c/Nxsc7pfe5rw/m/vF2djJh0BAAJ
// for details about the removal of this feature gate.
CPUManagerPolicyBetaOptions featuregate.Feature = "CPUManagerPolicyBetaOptions"

// owner: @ahg
// beta: v1.23
//
// Allow updating node scheduling directives in the pod template of jobs. Specifically,
// node affinity, selector and tolerations. This is allowed only for suspended jobs
// that have never been unsuspended before.
JobMutableNodeSchedulingDirectives featuregate.Feature = "JobMutableNodeSchedulingDirectives"
)

func init() {
Expand Down Expand Up @@ -871,6 +879,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.Beta},
CPUManagerPolicyAlphaOptions: {Default: false, PreRelease: featuregate.Alpha},
CPUManagerPolicyBetaOptions: {Default: true, PreRelease: featuregate.Beta},
JobMutableNodeSchedulingDirectives: {Default: true, PreRelease: featuregate.Beta},

// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
Expand Down
11 changes: 10 additions & 1 deletion pkg/registry/batch/job/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,16 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) validation.JobValidation
// existing jobs, we allow the annotation only if the Job already had it,
// regardless of the feature gate.
opts.AllowTrackingAnnotation = hasJobTrackingAnnotation(oldJob)

// Updating node affinity, node selector and tolerations is allowed
// only for suspended jobs that never started before.
suspended := oldJob.Spec.Suspend != nil && *oldJob.Spec.Suspend
notStarted := oldJob.Status.StartTime == nil
ahg-g marked this conversation as resolved.
Show resolved Hide resolved
opts.AllowMutableSchedulingDirectives = utilfeature.DefaultFeatureGate.Enabled(features.JobMutableNodeSchedulingDirectives) &&
suspended && notStarted

}

return opts
}

Expand Down Expand Up @@ -271,7 +280,7 @@ func (jobStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object)

opts := validationOptionsForJob(job, oldJob)
validationErrorList := validation.ValidateJob(job, opts)
updateErrorList := validation.ValidateJobUpdate(job, oldJob, opts.PodValidationOptions)
updateErrorList := validation.ValidateJobUpdate(job, oldJob, opts)
return append(validationErrorList, updateErrorList...)
}

Expand Down