Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for elastic Indexed Jobs #115236

Merged
merged 5 commits into from Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 33 additions & 1 deletion pkg/apis/batch/validation/validation.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/robfig/cron/v3"

v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unversionedvalidation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -375,7 +376,7 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList {
func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...)
allErrs = append(allErrs, validateCompletions(spec, oldSpec, fldPath.Child("completions"), opts)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...)
allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...)
Expand Down Expand Up @@ -565,10 +566,41 @@ func ValidateJobTemplateSpec(spec *batch.JobTemplateSpec, fldPath *field.Path, o
return allErrs
}

// validateCompletions validates an update of spec.completions. fldPath must
// point at the completions field itself (e.g. spec.completions).
//
// Completions is immutable unless opts.AllowElasticIndexedJobs is set AND the
// updated Job uses the Indexed completion mode. For such Jobs, completions may
// change only when the updated value equals the updated spec.parallelism.
//
// It returns the list of validation errors, which is empty when the update is
// acceptable.
func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList {
	// Completions stays immutable when elastic Indexed Jobs are not allowed
	// (feature disabled) and, regardless of the feature, for non-indexed Jobs.
	isIndexedJob := spec.CompletionMode != nil && *spec.CompletionMode == batch.IndexedCompletion
	if !opts.AllowElasticIndexedJobs || !isIndexedJob {
		return apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath)
	}

	var allErrs field.ErrorList
	// Nothing to check when completions did not change.
	if apiequality.Semantic.DeepEqual(spec.Completions, oldSpec.Completions) {
		return allErrs
	}
	// Indexed Jobs cannot set completions to nil. The nil check
	// is already performed in ValidateJobSpec, no need to add another error.
	if spec.Completions == nil {
		return allErrs
	}

	// Completions changed: it must match the updated parallelism. A nil
	// parallelism can never match, so report it as invalid instead of
	// dereferencing a nil pointer.
	if spec.Parallelism == nil || *spec.Completions != *spec.Parallelism {
		allErrs = append(allErrs, field.Invalid(fldPath, spec.Completions, fmt.Sprintf("can only be modified in tandem with %s", fldPath.Root().Child("parallelism").String())))
	}
	return allErrs
}

// JobValidationOptions groups the switches that callers pass to the Job
// validation functions in this package (for example ValidateJobSpecUpdate).
// Each boolean relaxes one specific rule; the zero value keeps every rule
// at its strictest.
type JobValidationOptions struct {
// Options forwarded to the pod template validation performed as part of
// ValidateJobSpec.
apivalidation.PodValidationOptions
// Allow Job to have the annotation batch.kubernetes.io/job-tracking
AllowTrackingAnnotation bool
// Allow mutable node affinity, selector and tolerations of the template
AllowMutableSchedulingDirectives bool
// Allow elastic indexed jobs: when set, validateCompletions lets an Indexed
// Job change spec.completions as long as the updated value equals the
// updated spec.parallelism. When unset, spec.completions is immutable.
AllowElasticIndexedJobs bool
}
140 changes: 139 additions & 1 deletion pkg/apis/batch/validation/validation_test.go
Expand Up @@ -919,7 +919,23 @@ func TestValidateJobUpdate(t *testing.T) {
job.Spec.ManualSelector = pointer.BoolPtr(true)
},
},
"immutable completion": {
danielvegamyhre marked this conversation as resolved.
Show resolved Hide resolved
"immutable completions for non-indexed jobs": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also have a test case for immutable indexed job when AllowElasticIndexedJobs=false

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Completions = pointer.Int32Ptr(1)
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.completions",
},
},
"immutable completions for indexed job when AllowElasticIndexedJobs is false": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Expand Down Expand Up @@ -1283,6 +1299,128 @@ func TestValidateJobUpdate(t *testing.T) {
AllowMutableSchedulingDirectives: true,
},
},
"update completions and parallelism to same value is valid": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect to see a feature gate loop in this code since you are using that feature gate in validation.go.

Ideally you would want to turn the FeatureGate for your code and then run these tests and also run it with the feature gate off.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably mean a loop like we used to have for job tracking. We don't want that here, as the behavior changes depending on the value of the feature gate.

Here, we should have the value of the feature gate as part of the test case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I didn't see anything about disabling or enabling the feature gate in these test cases so I was wondering if it was assuming default behavior. I guess the gate is true but we should maybe test that the gate is false in case of rollback?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it's true because it will be beta. We need to have coverage for when the feature is disabled.

old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Completions: pointer.Int32Ptr(1),
Parallelism: pointer.Int32Ptr(1),
CompletionMode: completionModePtr(batch.IndexedCompletion),
},
},
update: func(job *batch.Job) {
job.Spec.Completions = pointer.Int32Ptr(2)
job.Spec.Parallelism = pointer.Int32Ptr(2)
},
opts: JobValidationOptions{
AllowElasticIndexedJobs: true,
},
},
"previous parallelism != previous completions, new parallelism == new completions": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Completions: pointer.Int32Ptr(1),
Parallelism: pointer.Int32Ptr(2),
CompletionMode: completionModePtr(batch.IndexedCompletion),
},
},
update: func(job *batch.Job) {
job.Spec.Completions = pointer.Int32Ptr(3)
job.Spec.Parallelism = pointer.Int32Ptr(3)
},
opts: JobValidationOptions{
AllowElasticIndexedJobs: true,
},
},
"indexed job updating completions and parallelism to different values is invalid": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Completions: pointer.Int32Ptr(1),
Parallelism: pointer.Int32Ptr(1),
CompletionMode: completionModePtr(batch.IndexedCompletion),
},
},
update: func(job *batch.Job) {
job.Spec.Completions = pointer.Int32Ptr(2)
job.Spec.Parallelism = pointer.Int32Ptr(3)
},
opts: JobValidationOptions{
AllowElasticIndexedJobs: true,
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.completions",
},
},
"indexed job with completions set updated to nil does not panic": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Completions: pointer.Int32Ptr(1),
Parallelism: pointer.Int32Ptr(1),
CompletionMode: completionModePtr(batch.IndexedCompletion),
},
},
update: func(job *batch.Job) {
job.Spec.Completions = nil
job.Spec.Parallelism = pointer.Int32Ptr(3)
},
opts: JobValidationOptions{
AllowElasticIndexedJobs: true,
},
err: &field.Error{
Type: field.ErrorTypeRequired,
Field: "spec.completions",
},
},
"indexed job with completions unchanged, parallelism reduced to less than completions": {
danielvegamyhre marked this conversation as resolved.
Show resolved Hide resolved
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Completions: pointer.Int32Ptr(2),
Parallelism: pointer.Int32Ptr(2),
CompletionMode: completionModePtr(batch.IndexedCompletion),
},
},
update: func(job *batch.Job) {
job.Spec.Completions = pointer.Int32Ptr(2)
job.Spec.Parallelism = pointer.Int32Ptr(1)
},
opts: JobValidationOptions{
AllowElasticIndexedJobs: true,
},
},
"indexed job with completions unchanged, parallelism increased higher than completions": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Completions: pointer.Int32Ptr(2),
Parallelism: pointer.Int32Ptr(2),
CompletionMode: completionModePtr(batch.IndexedCompletion),
},
},
update: func(job *batch.Job) {
job.Spec.Completions = pointer.Int32Ptr(2)
job.Spec.Parallelism = pointer.Int32Ptr(3)
},
opts: JobValidationOptions{
AllowElasticIndexedJobs: true,
},
},
}
ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")
for k, tc := range cases {
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/job/job_controller_test.go
Expand Up @@ -237,8 +237,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedConditionStatus v1.ConditionStatus
expectedConditionReason string
expectedCreatedIndexes sets.Int

expectedPodPatches int
expectedPodPatches int

// features
jobReadyPodsEnabled bool
Expand Down
11 changes: 11 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -685,6 +685,15 @@ const (
// certificate as expiration approaches.
RotateKubeletServerCertificate featuregate.Feature = "RotateKubeletServerCertificate"

// owner: @danielvegamyhre
// kep: https://kep.k8s.io/3715
// beta: v1.27
//
// Allows mutating spec.completions for Indexed Jobs when done in tandem with
// spec.parallelism. Specifically, spec.completions is mutable iff the updated
// spec.completions is equal to the updated spec.parallelism.
ElasticIndexedJob featuregate.Feature = "ElasticIndexedJob"

// owner: @saschagrunert
// kep: https://kep.k8s.io/2413
// alpha: v1.22
Expand Down Expand Up @@ -1023,6 +1032,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

RotateKubeletServerCertificate: {Default: true, PreRelease: featuregate.Beta},

ElasticIndexedJob: {Default: true, PreRelease: featuregate.Beta},

SeccompDefault: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.29

ServiceIPStaticSubrange: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28
Expand Down
3 changes: 2 additions & 1 deletion pkg/registry/batch/job/strategy.go
Expand Up @@ -184,9 +184,10 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) batchvalidation.JobValid
notStarted := oldJob.Status.StartTime == nil
opts.AllowMutableSchedulingDirectives = utilfeature.DefaultFeatureGate.Enabled(features.JobMutableNodeSchedulingDirectives) &&
suspended && notStarted

}

// Elastic indexed jobs (mutable completions iff updated parallelism == updated completions)
opts.AllowElasticIndexedJobs = utilfeature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move this up into the batchvalidation.JobValidationOptions{ declaration, to keep all the things that don't depend on oldJob together

return opts
}

Expand Down