Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graduate JobTrackingWithFinalizers to stable #113510

Merged
merged 1 commit into from Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/openapi-spec/v3/apis__batch__v1_openapi.json
Expand Up @@ -454,7 +454,7 @@
"$ref": "#/components/schemas/io.k8s.api.batch.v1.UncountedTerminatedPods"
}
],
"description": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nThis field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null."
"description": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nOld jobs might not be tracked using this field, in which case the field remains null."
}
},
"type": "object"
Expand Down
14 changes: 7 additions & 7 deletions pkg/apis/batch/types.go
Expand Up @@ -24,10 +24,13 @@ import (

// JobTrackingFinalizer is a finalizer for Job's pods. It prevents them from
// being deleted before being accounted in the Job status.
// The apiserver and job controller use this string as a Job annotation, to
// mark Jobs that are being tracked using pod finalizers. Two releases after
// the JobTrackingWithFinalizers graduates to GA, JobTrackingFinalizer will
// no longer be used as a Job annotation.
//
// Additionally, the apiserver and job controller use this string as a Job
// annotation, to mark Jobs that are being tracked using pod finalizers.
// However, this behavior is deprecated in Kubernetes 1.26. This means that, in
// 1.27+, one release after JobTrackingWithFinalizers graduates to GA, the
// apiserver and job controller will ignore this annotation and they will
// always track jobs using finalizers.
const JobTrackingFinalizer = "batch.kubernetes.io/job-tracking"

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -405,9 +408,6 @@ type JobStatus struct {
// (3) Remove the pod UID from the array while increasing the corresponding
// counter.
//
// This field is beta-level. The job controller only makes use of this field
// when the feature gate JobTrackingWithFinalizers is enabled (enabled
// by default).
// Old jobs might not be tracked using this field, in which case the field
// remains null.
// +optional
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/job/indexed_job_utils.go
Expand Up @@ -53,7 +53,7 @@ type orderedIntervals []interval
// the indexes that succeeded since the last sync.
func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
var prevIntervals orderedIntervals
withFinalizers := trackingUncountedPods(job)
withFinalizers := hasJobTrackingAnnotation(job)
if withFinalizers {
prevIntervals = succeededIndexesFromJob(job)
}
Expand Down
15 changes: 5 additions & 10 deletions pkg/controller/job/indexed_job_utils_test.go
Expand Up @@ -22,10 +22,6 @@ import (
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -219,20 +215,19 @@ func TestCalculateSucceededIndexes(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizers)()
job := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
batch.JobTrackingFinalizer: "",
},
},
Status: batch.JobStatus{
CompletedIndexes: tc.prevSucceeded,
},
Spec: batch.JobSpec{
Completions: pointer.Int32Ptr(tc.completions),
},
}
if tc.trackingWithFinalizers {
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
}
pods := hollowPodsWithIndexPhase(tc.pods)
for _, p := range pods {
p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
Expand Down
27 changes: 2 additions & 25 deletions pkg/controller/job/job_controller.go
Expand Up @@ -714,17 +714,13 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr

var expectedRmFinalizers sets.String
var uncounted *uncountedTerminatedPods
if trackingUncountedPods(&job) {
if hasJobTrackingAnnotation(&job) {
klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
if job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
} else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
if err := jm.patchJobHandler(ctx, &job, patch); err != nil {
return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
}
}

// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
Expand Down Expand Up @@ -1476,7 +1472,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
if isIndexedJob(job) {
addCompletionIndexEnvVariables(podTemplate)
}
if trackingUncountedPods(job) {
if hasJobTrackingAnnotation(job) {
podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
}

Expand Down Expand Up @@ -1635,10 +1631,6 @@ func getCompletionMode(job *batch.Job) string {
return string(batch.NonIndexedCompletion)
}

// trackingUncountedPods reports whether the given job is tracked using pod
// finalizers. That is the case only when the JobTrackingWithFinalizers feature
// gate is enabled and the job carries the tracking annotation.
func trackingUncountedPods(job *batch.Job) bool {
	// With the gate off the annotation is irrelevant, so it is not inspected.
	if !feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) {
		return false
	}
	return hasJobTrackingAnnotation(job)
}

func hasJobTrackingAnnotation(job *batch.Job) bool {
if job.Annotations == nil {
return false
Expand Down Expand Up @@ -1669,21 +1661,6 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
return patchBytes
}

// removeTrackingAnnotationPatch builds the JSON body of a patch that sets the
// job-tracking annotation to null under metadata.annotations.
//
// It returns nil when the job does not carry the tracking annotation, which
// callers can use as the signal that no patch needs to be sent.
// NOTE(review): a null value presumably deletes the key when the body is
// applied as a merge patch — confirm against the patch type used by the caller.
func removeTrackingAnnotationPatch(job *batch.Job) []byte {
	if hasJobTrackingAnnotation(job) {
		annotations := map[string]interface{}{
			batch.JobTrackingFinalizer: nil,
		}
		metadata := map[string]interface{}{
			"annotations": annotations,
		}
		// The Marshal error is intentionally discarded: the document is built
		// only from nested maps with string keys and a nil leaf value.
		body, _ := json.Marshal(map[string]interface{}{
			"metadata": metadata,
		})
		return body
	}
	return nil
}

type uncountedTerminatedPods struct {
succeeded sets.String
failed sets.String
Expand Down
64 changes: 11 additions & 53 deletions pkg/controller/job/job_controller_test.go
Expand Up @@ -135,7 +135,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
for i := 0; i < count; i++ {
newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
newPod.Status = v1.PodStatus{Phase: status}
if trackingUncountedPods(job) {
if hasJobTrackingAnnotation(job) {
newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
}
pods = append(pods, newPod)
Expand Down Expand Up @@ -178,7 +178,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
}
p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index)
}
if trackingUncountedPods(job) {
if hasJobTrackingAnnotation(job) {
p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
}
podIndexer.Add(p)
Expand Down Expand Up @@ -343,15 +343,15 @@ func TestControllerSyncJob(t *testing.T) {
parallelism: 2,
completions: 5,
backoffLimit: 6,
podControllerError: fmt.Errorf("fake error"),
jobKeyForget: true,
activePods: 1,
succeededPods: 1,
failedPods: 1,
expectedCreations: 1,
expectedActive: 1,
expectedActive: 2,
expectedSucceeded: 1,
expectedFailed: 1,
expectedPodPatches: 2,
},
"new failed pod": {
parallelism: 2,
Expand Down Expand Up @@ -728,7 +728,6 @@ func TestControllerSyncJob(t *testing.T) {
t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
}
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()

// job manager setup
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
Expand Down Expand Up @@ -918,13 +917,12 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI
}

// TestSyncJobLegacyTracking makes sure that a Job is only tracked with
// finalizers only when the feature is enabled and the job has the finalizer.
// finalizers when the job has the annotation.
func TestSyncJobLegacyTracking(t *testing.T) {
cases := map[string]struct {
job batch.Job
trackingWithFinalizersEnabled bool
wantUncounted bool
wantPatches int
job batch.Job
wantUncounted bool
wantPatches int
}{
"no annotation": {
job: batch.Job{
Expand All @@ -937,19 +935,7 @@ func TestSyncJobLegacyTracking(t *testing.T) {
},
},
},
"no annotation, feature enabled": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "ns",
},
Spec: batch.JobSpec{
Parallelism: pointer.Int32Ptr(1),
},
},
trackingWithFinalizersEnabled: true,
},
"tracking annotation, feature disabled": {
"tracking annotation": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Expand All @@ -962,26 +948,9 @@ func TestSyncJobLegacyTracking(t *testing.T) {
Parallelism: pointer.Int32Ptr(1),
},
},
// Finalizer removed.
wantPatches: 1,
wantUncounted: true,
},
"tracking annotation, feature enabled": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "ns",
Annotations: map[string]string{
batch.JobTrackingFinalizer: "",
},
},
Spec: batch.JobSpec{
Parallelism: pointer.Int32Ptr(1),
},
},
trackingWithFinalizersEnabled: true,
wantUncounted: true,
},
"different annotation, feature enabled": {
"different annotation": {
job: batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Expand All @@ -994,13 +963,10 @@ func TestSyncJobLegacyTracking(t *testing.T) {
Parallelism: pointer.Int32Ptr(1),
},
},
trackingWithFinalizersEnabled: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizersEnabled)()

// Job manager setup.
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
Expand Down Expand Up @@ -2909,7 +2875,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
for _, wFinalizers := range []bool{false, true} {
for name, tc := range testCases {
t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
Expand Down Expand Up @@ -2992,15 +2957,9 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil),
wantRequeue: true,
},
"conflict error, with finalizers": {
withFinalizers: true,
updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil),
wantRequeue: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.withFinalizers)()
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
Expand Down Expand Up @@ -4204,7 +4163,6 @@ func TestEnsureJobConditions(t *testing.T) {
}

func TestFinalizersRemovedExpectations(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
clientset := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
Expand Down
3 changes: 2 additions & 1 deletion pkg/features/kube_features.go
Expand Up @@ -448,6 +448,7 @@ const (
// owner: @alculquicondor
// alpha: v1.22
// beta: v1.23
// stable: v1.26
//
// Track Job completion without relying on Pod remaining in the cluster
// indefinitely. Pod finalizers, in addition to a field in the Job status
Expand Down Expand Up @@ -949,7 +950,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

JobReadyPods: {Default: true, PreRelease: featuregate.Beta},

JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.Beta},
JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28

KubeletCredentialProviders: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28

Expand Down
2 changes: 1 addition & 1 deletion pkg/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 12 additions & 16 deletions pkg/registry/batch/job/strategy.go
Expand Up @@ -93,13 +93,10 @@ func (jobStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {

job.Generation = 1

if utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) {
// Until this feature graduates to GA and soaks in clusters, we use an
// annotation to mark whether jobs are tracked with it.
addJobTrackingAnnotation(job)
} else {
dropJobTrackingAnnotation(job)
}
// While legacy tracking is supported, we use an annotation to mark whether
// jobs are tracked with finalizers.
addJobTrackingAnnotation(job)

if !utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
job.Spec.PodFailurePolicy = nil
}
Expand All @@ -122,20 +119,12 @@ func hasJobTrackingAnnotation(job *batch.Job) bool {
return ok
}

// dropJobTrackingAnnotation removes the job-tracking annotation from the job
// in place. It is a no-op when the job has no annotations or the key is
// absent, because deleting from a nil map or a missing key does nothing.
func dropJobTrackingAnnotation(job *batch.Job) {
	annotations := job.Annotations
	delete(annotations, batchv1.JobTrackingFinalizer)
}

// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
func (jobStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
newJob := obj.(*batch.Job)
oldJob := old.(*batch.Job)
newJob.Status = oldJob.Status

if !utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && !hasJobTrackingAnnotation(oldJob) {
dropJobTrackingAnnotation(newJob)
liggitt marked this conversation as resolved.
Show resolved Hide resolved
}
alculquicondor marked this conversation as resolved.
Show resolved Hide resolved

if !utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && oldJob.Spec.PodFailurePolicy == nil {
newJob.Spec.PodFailurePolicy = nil
}
Expand All @@ -147,6 +136,13 @@ func (jobStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object
if !apiequality.Semantic.DeepEqual(newJob.Spec, oldJob.Spec) {
newJob.Generation = oldJob.Generation + 1
}

// While legacy tracking is supported, we use an annotation to mark whether
liggitt marked this conversation as resolved.
Show resolved Hide resolved
// jobs are tracked with finalizers. This annotation cannot be removed by
// users.
if hasJobTrackingAnnotation(oldJob) {
addJobTrackingAnnotation(newJob)
}
}

// Validate validates a new job.
Expand All @@ -170,7 +166,7 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) validation.JobValidation
}
opts := validation.JobValidationOptions{
PodValidationOptions: pod.GetValidationOptionsFromPodTemplate(newPodTemplate, oldPodTemplate),
AllowTrackingAnnotation: utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers),
AllowTrackingAnnotation: true,
}
if oldJob != nil {
// Because we don't support the tracking with finalizers for already
Expand Down