Roll-forward: Beta requirements for JobTrackingWithFinalizers #105197

Merged · 5 commits · Oct 5, 2021
Changes from all commits
7 changes: 7 additions & 0 deletions pkg/controller/job/indexed_job_utils.go
@@ -338,3 +338,10 @@ func (bci byCompletionIndex) Swap(i, j int) {
func (bci byCompletionIndex) Len() int {
return len(bci)
}

func completionModeStr(job *batch.Job) string {
if job.Spec.CompletionMode != nil {
return string(*job.Spec.CompletionMode)
}
return string(batch.NonIndexedCompletion)
}
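
Note: completionModeStr guards against a nil Spec.CompletionMode and falls back to NonIndexed. The tests below use it to build the completion_mode label when checking the job_pods_finished_total counter. A minimal sketch of that kind of call site; recordFinishedPods, newSucceeded and newFailed are illustrative names, not part of this diff:

// recordFinishedPods is an illustrative helper that mirrors what
// TestTrackJobStatusAndRemoveFinalizers asserts: the job_pods_finished_total
// counter grows by the number of newly succeeded and newly failed pods,
// labeled by the Job's completion mode and the result.
func recordFinishedPods(job *batch.Job, newSucceeded, newFailed int) {
	completionMode := completionModeStr(job)
	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(newSucceeded))
	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(newFailed))
}
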
184 changes: 127 additions & 57 deletions pkg/controller/job/job_controller.go

Large diffs are not rendered by default.
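
The bulk of the roll-forward lives in this file. Although the diff is not rendered, the updated tests pin down the visible signature changes: getStatus and trackJobStatusAndRemoveFinalizers now receive the set of pod UIDs whose finalizer removal is already in flight (expectedRmFinalizers), and deletePod takes an extra boolean. A rough sketch of the counting rule the TestGetStatus cases imply, assuming the test file's existing imports (v1, batch, sets); this is an illustration, not the controller's literal code:

// hasTrackingFinalizer reports whether the pod still carries the Job tracking
// finalizer (illustrative helper).
func hasTrackingFinalizer(p *v1.Pod) bool {
	for _, f := range p.Finalizers {
		if f == batch.JobTrackingFinalizer {
			return true
		}
	}
	return false
}

// countFinished reconstructs the rule suggested by the "with expected removed
// finalizers" case: pods without the finalizer, pods already listed in
// status.uncountedTerminatedPods, and pods whose finalizer removal is expected
// are assumed to be reflected in the status counters already; only pods that
// still carry the finalizer and are not tracked yet are counted fresh. Deleted
// pods and other edge cases are handled separately in the real controller.
func countFinished(base int32, uncounted, expectedRmFinalizers sets.String, pods []*v1.Pod, phase v1.PodPhase) int32 {
	count := base + int32(uncounted.Len())
	for _, p := range pods {
		uid := string(p.UID)
		if !hasTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid) {
			continue
		}
		if p.Status.Phase == phase {
			count++
		}
	}
	return count
}
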

250 changes: 213 additions & 37 deletions pkg/controller/job/job_controller_test.go
@@ -47,8 +47,10 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
metricstestutil "k8s.io/component-base/metrics/testutil"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
@@ -114,6 +116,7 @@ func newPod(name string, job *batch.Job) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
Labels: job.Spec.Selector.MatchLabels,
Namespace: job.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
@@ -1001,10 +1004,11 @@ func TestSyncJobLegacyTracking(t *testing.T) {

func TestGetStatus(t *testing.T) {
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
wantSucceeded int32
wantFailed int32
job batch.Job
pods []*v1.Pod
expectedRmFinalizers sets.String
wantSucceeded int32
wantFailed int32
}{
"without finalizers": {
job: batch.Job{
@@ -1066,6 +1070,30 @@ func TestGetStatus(t *testing.T) {
wantSucceeded: 4,
wantFailed: 4,
},
"with expected removed finalizers": {
job: batch.Job{
Status: batch.JobStatus{
Succeeded: 2,
Failed: 2,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a"},
Failed: []types.UID{"d"},
},
},
},
expectedRmFinalizers: sets.NewString("b", "f"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("d").phase(v1.PodFailed).Pod,
buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod,
},
wantSucceeded: 4,
wantFailed: 5,
},
"deleted pods": {
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod,
@@ -1102,7 +1130,7 @@ func TestGetStatus(t *testing.T) {
if tc.job.Status.UncountedTerminatedPods != nil {
uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
}
succeeded, failed := getStatus(&tc.job, tc.pods, uncounted)
succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
if succeeded != tc.wantSucceeded {
t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded)
}
@@ -1119,15 +1147,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
indexedCompletion := batch.IndexedCompletion
mockErr := errors.New("mock error")
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
finishedCond *batch.JobCondition
needsFlush bool
statusUpdateErr error
podControlErr error
wantErr error
wantRmFinalizers int
wantStatusUpdates []batch.JobStatus
job batch.Job
pods []*v1.Pod
finishedCond *batch.JobCondition
expectedRmFinalizers sets.String
needsFlush bool
statusUpdateErr error
podControlErr error
wantErr error
wantRmFinalizers int
wantStatusUpdates []batch.JobStatus
}{
"no updates": {},
"new active": {
@@ -1209,6 +1238,45 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
},
"expecting removed finalizers": {
job: batch.Job{
Status: batch.JobStatus{
Succeeded: 2,
Failed: 3,
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "g"},
Failed: []types.UID{"b", "h"},
},
},
},
expectedRmFinalizers: sets.NewString("c", "d", "g", "h"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod,
},
wantRmFinalizers: 4,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "e"},
Failed: []types.UID{"b", "f"},
},
Succeeded: 3,
Failed: 4,
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 5,
Failed: 6,
},
},
},
"succeeding job": {
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@@ -1462,7 +1530,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
return pods
}(),
wantRmFinalizers: 501,
wantRmFinalizers: 499,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
@@ -1479,17 +1547,11 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"499"},
Failed: []types.UID{"b"},
Failed: []types.UID{"b"},
},
Succeeded: 499,
Failed: 1,
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 500,
Failed: 2,
},
},
},
"too many indexed finished": {
@@ -1506,18 +1568,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
}
return pods
}(),
wantRmFinalizers: 501,
wantRmFinalizers: 500,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
CompletedIndexes: "0-499",
Succeeded: 500,
},
{
CompletedIndexes: "0-500",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 501,
},
},
},
}
@@ -1526,19 +1583,20 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
metrics.JobPodsFinished.Reset()
manager.podControl = &fakePodControl
var statusUpdates []batch.JobStatus
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
return job, tc.statusUpdateErr
}

if tc.job.Status.UncountedTerminatedPods == nil {
tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
job := tc.job.DeepCopy()
if job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
succeededIndexes := succeededIndexesFromJob(&tc.job)
err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
succeededIndexes := succeededIndexesFromJob(job)
err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush)
if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr)
}
@@ -1549,6 +1607,25 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
if rmFinalizers != tc.wantRmFinalizers {
t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers)
}
if tc.wantErr == nil {
completionMode := completionModeStr(job)
v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded))
if err != nil {
t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err)
}
newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded
if float64(newSucceeded) != v {
t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded)
}
v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed))
if err != nil {
t.Fatalf("Obtaining failed job_pods_finished_total: %v", err)
}
newFailed := job.Status.Failed - tc.job.Status.Failed
if float64(newFailed) != v {
t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed)
}
}
})
}
}
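
The assertions above read the new job_pods_finished_total counter from the controller's metrics package through metricstestutil.GetCounterMetricValue, keyed by completion mode and result (metrics.Succeeded / metrics.Failed). A plausible shape for that counter, assuming component-base metrics and a job_controller subsystem; the exact definition in pkg/controller/job/metrics may differ:

package metrics

import (
	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// Possible values for the "result" label, as used by the tests.
const (
	Succeeded = "succeeded"
	Failed    = "failed"
)

// JobPodsFinished counts finished pods fully tracked by the Job controller,
// labeled by the Job's completion mode and the pod's result.
var JobPodsFinished = metrics.NewCounterVec(
	&metrics.CounterOpts{
		Subsystem: "job_controller",
		Name:      "job_pods_finished_total",
		Help:      "The number of finished Pods that are fully tracked",
	},
	[]string{"completion_mode", "result"},
)

func init() {
	legacyregistry.MustRegister(JobPodsFinished)
}
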
@@ -2274,7 +2351,7 @@ func TestDeletePod(t *testing.T) {
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)

jm.deletePod(pod1)
jm.deletePod(pod1, true)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@@ -2287,7 +2364,7 @@ func TestDeletePod(t *testing.T) {
t.Errorf("queue.Get() = %v, want %v", got, want)
}

jm.deletePod(pod2)
jm.deletePod(pod2, true)
if got, want := jm.queue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@@ -2322,7 +2399,7 @@ func TestDeletePodOrphan(t *testing.T) {
pod1.OwnerReferences = nil
informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)

jm.deletePod(pod1)
jm.deletePod(pod1, true)
if got, want := jm.queue.Len(), 0; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
@@ -2966,6 +3043,105 @@ func TestEnsureJobConditions(t *testing.T) {
}
}

func TestFinalizersRemovedExpectations(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
clientset := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")}
manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
return job, nil
}

job := newJob(2, 2, 6, batch.NonIndexedCompletion)
job.Annotations = map[string]string{
batch.JobTrackingFinalizer: "",
}
sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
podInformer := sharedInformers.Core().V1().Pods().Informer()
podIndexer := podInformer.GetIndexer()
uids := sets.NewString()
for i := range pods {
clientset.Tracker().Add(&pods[i])
podIndexer.Add(&pods[i])
uids.Insert(string(pods[i].UID))
}
jobKey := testutil.GetKey(job, t)

manager.syncJob(jobKey)
gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey)
if len(gotExpectedUIDs) != 0 {
t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List())
}

// Remove failures and re-sync.
manager.podControl.(*controller.FakePodControl).Err = nil
manager.syncJob(jobKey)
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff)
}

stopCh := make(chan struct{})
defer close(stopCh)
go sharedInformers.Core().V1().Pods().Informer().Run(stopCh)
cache.WaitForCacheSync(stopCh, podInformer.HasSynced)

// Make sure the first syncJob sets the expectations, even after the caches synced.
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff)
}

// Change pods in different ways.

podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"}

update := pods[0].DeepCopy()
update.Finalizers = nil
update.ResourceVersion = "1"
err := clientset.Tracker().Update(podsResource, update, update.Namespace)
if err != nil {
t.Errorf("Removing finalizer: %v", err)
}

update = pods[1].DeepCopy()
update.Finalizers = nil
update.DeletionTimestamp = &metav1.Time{Time: time.Now()}
update.ResourceVersion = "1"
err = clientset.Tracker().Update(podsResource, update, update.Namespace)
if err != nil {
t.Errorf("Removing finalizer and setting deletion timestamp: %v", err)
}

// Preserve the finalizer.
update = pods[2].DeepCopy()
update.DeletionTimestamp = &metav1.Time{Time: time.Now()}
update.ResourceVersion = "1"
err = clientset.Tracker().Update(podsResource, update, update.Namespace)
if err != nil {
t.Errorf("Setting deletion timestamp: %v", err)
}

err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name)
if err != nil {
t.Errorf("Deleting pod that had finalizer: %v", err)
}

uids = sets.NewString(string(pods[2].UID))
var diff string
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
diff = cmp.Diff(uids, gotExpectedUIDs)
return diff == "", nil
}); err != nil {
t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff)
}
}
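
TestFinalizersRemovedExpectations drives the new finalizerExpectations store: a successful sync records the UIDs of pods whose tracking finalizer the controller asked the API server to remove, and the pod update/delete handlers (note the extra boolean now passed to deletePod) clear those expectations as the removals are observed, leaving only the pod whose finalizer was preserved (pods[2]). A minimal sketch of such a UID-tracking store; only getExpectedUIDs is a name taken from the test, the rest is illustrative (assumes imports "sync" and "k8s.io/apimachinery/pkg/util/sets"):

// uidTrackingStore is an illustrative, simplified stand-in for the
// expectations store that manager.finalizerExpectations points to; the real
// implementation may differ.
type uidTrackingStore struct {
	sync.Mutex
	// job key -> UIDs of pods with an in-flight finalizer removal.
	expectedUIDs map[string]sets.String
}

func newUIDTrackingStore() *uidTrackingStore {
	return &uidTrackingStore{expectedUIDs: make(map[string]sets.String)}
}

// expectFinalizersRemoved records pods whose finalizer removal succeeded on
// the API server but has not yet been observed through the informer.
func (u *uidTrackingStore) expectFinalizersRemoved(jobKey string, podUIDs []string) {
	u.Lock()
	defer u.Unlock()
	uids, ok := u.expectedUIDs[jobKey]
	if !ok {
		uids = sets.NewString()
		u.expectedUIDs[jobKey] = uids
	}
	uids.Insert(podUIDs...)
}

// finalizerRemovalObserved clears a single expectation, e.g. from the pod
// update or delete handlers once the finalizer is gone from the cache.
func (u *uidTrackingStore) finalizerRemovalObserved(jobKey, podUID string) {
	u.Lock()
	defer u.Unlock()
	if uids := u.expectedUIDs[jobKey]; uids != nil {
		uids.Delete(podUID)
	}
}

// getExpectedUIDs returns a copy of the pending UIDs for a job, matching the
// accessor the test calls.
func (u *uidTrackingStore) getExpectedUIDs(jobKey string) sets.String {
	u.Lock()
	defer u.Unlock()
	uids, ok := u.expectedUIDs[jobKey]
	if !ok {
		return nil
	}
	return sets.NewString(uids.List()...)
}
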

func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
t.Helper()
want := []v1.EnvVar{