Do not bump API requests backoff in the Job controller due to pod failures #118759

Merged
6 changes: 0 additions & 6 deletions pkg/controller/job/job_controller.go
@@ -905,12 +905,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
return fmt.Errorf("tracking status: %w", err)
}

jobFinished := IsJobFinished(&job)
if jobHasNewFailure && !jobFinished {
// returning an error will re-enqueue Job after the backoff period
return fmt.Errorf("failed pod(s) detected for job key %q", key)
}

return manageJobErr
}

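For context, the block removed above was the only place where syncJob returned an error solely because new pod failures were observed. In the client-go worker pattern the Job controller follows, an error from the sync handler re-enqueues the key with AddRateLimited, which bumps the per-key backoff; that is the behavior this PR drops for plain pod failures. The snippet below is a minimal, self-contained sketch of that pattern, not the controller's actual worker code; the job key and the syncJob stub are illustrative.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// syncJob stands in for the controller's sync handler; the real controller
// reconciles the Job identified by key.
func syncJob(key string) error {
	// Before this change, observing new pod failures made the handler return
	// an error here even when the sync itself had succeeded.
	return nil
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("default/my-job") // illustrative job key

	// One iteration of the usual worker loop.
	item, shutdown := queue.Get()
	if shutdown {
		return
	}
	key := item.(string)

	if err := syncJob(key); err != nil {
		// Any returned error re-enqueues the key with rate limiting, growing
		// the per-key backoff applied to subsequent sync attempts.
		queue.AddRateLimited(key)
	} else {
		// A clean sync resets the backoff accumulated for this key.
		queue.Forget(key)
	}
	queue.Done(item)

	fmt.Println("requeues:", queue.NumRequeues(key)) // 0 after a clean sync
}
```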
161 changes: 44 additions & 117 deletions pkg/controller/job/job_controller_test.go
@@ -854,10 +854,6 @@ func TestControllerSyncJob(t *testing.T) {
if err == nil {
t.Error("Syncing jobs expected to return error on podControl exception")
}
} else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
if err == nil {
t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
}
} else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit {
if err == nil {
t.Error("Syncing jobs expected to return error when reached the podControl limit")
@@ -1704,7 +1700,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
failedPods int

// expectations
expectedForGetKey bool
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
@@ -1719,7 +1714,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
startTime: 15,
backoffLimit: 6,
activePods: 1,
expectedForGetKey: false,
expectedDeletions: 1,
expectedFailed: 1,
expectedCondition: batch.JobFailed,
@@ -1733,7 +1727,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
backoffLimit: 6,
activePods: 1,
succeededPods: 1,
expectedForGetKey: true,
expectedDeletions: 1,
expectedSucceeded: 1,
expectedFailed: 1,
@@ -1746,7 +1739,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
activeDeadlineSeconds: 10,
startTime: 10,
backoffLimit: 6,
expectedForGetKey: false,
expectedCondition: batch.JobFailed,
expectedConditionReason: "DeadlineExceeded",
},
@@ -1756,7 +1748,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
activeDeadlineSeconds: 1,
startTime: 10,
failedPods: 1,
expectedForGetKey: false,
expectedFailed: 1,
expectedCondition: batch.JobFailed,
expectedConditionReason: "BackoffLimitExceeded",
@@ -1768,7 +1759,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
activeDeadlineSeconds: 10,
startTime: 15,
backoffLimit: 6,
expectedForGetKey: true,
expectedCondition: batch.JobSuspended,
expectedConditionReason: "JobSuspended",
},
@@ -3898,80 +3888,38 @@ func bumpResourceVersion(obj metav1.Object) {
obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}

type pods struct {
pending int
active int
succeed int
failed int
}

func TestJobBackoffReset(t *testing.T) {
Member

Can you summarize why this test is changing so drastically?

Contributor Author

This test checks that the rate-limiter backoff in the syncJob queue is reset after a successful execution. For that reason it used to add an item with AddRateLimited, driven by the error returned on pod failures, and then checked that the queue was emptied after a pod succeeded (essentially that Forget is called).

However, pod failures no longer enqueue the key in the rate limiter. The queue is still emptied after a successful syncJob, so it seems worth preserving the check that the rate limiter gets emptied (Forget getting called).

Additionally, the test used to run two variants (parallelism=1 and parallelism=2); I don't think that distinction matters for the current scenario.

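To make the reply above concrete: AddRateLimited increments the per-key retry count reported by NumRequeues, while Forget resets it, which is what the reworked test asserts. Below is a minimal, standalone sketch of just that workqueue behavior; the key is made up and this is not code from the PR.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	key := "default/example-job" // illustrative job key

	// A failed sync re-enqueues the key with backoff; the retry counter grows.
	q.AddRateLimited(key)
	fmt.Println(q.NumRequeues(key)) // 1

	// A successful sync calls Forget, which resets the backoff for the key.
	q.Forget(key)
	fmt.Println(q.NumRequeues(key)) // 0
}
```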
func TestJobApiBackoffReset(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testCases := map[string]struct {
// job setup
parallelism int32
completions int32
backoffLimit int32

// pod setup - each row is additive!
pods []pods
}{
"parallelism=1": {
1, 2, 1,
[]pods{
{0, 1, 0, 1},
{0, 0, 1, 0},
},
},
"parallelism=2 (just failure)": {
2, 2, 1,
[]pods{
{0, 2, 0, 1},
{0, 0, 1, 0},
},
},
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return job, nil
}

for name, tc := range testCases {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
defer func() { DefaultJobApiBackOff = 1 * time.Second }()
DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job
return job, nil
}
job := newJob(1, 1, 2, batch.NonIndexedCompletion)
key := testutil.GetKey(job, t)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

// job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
key := testutil.GetKey(job, t)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()

setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0)
manager.queue.Add(key)
manager.processNextWorkItem(context.TODO())
retries := manager.queue.NumRequeues(key)
if retries != 1 {
t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
}
// error returned make the key requeued
fakePodControl.Err = errors.New("Controller error")
manager.queue.Add(key)
manager.processNextWorkItem(context.TODO())
retries := manager.queue.NumRequeues(key)
if retries != 1 {
t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
}

job = actual
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0)
manager.processNextWorkItem(context.TODO())
retries = manager.queue.NumRequeues(key)
if retries != 0 {
t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
}
if getCondition(actual, batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded") {
t.Errorf("%s: unexpected job failure", name)
}
// the queue is emptied on success
fakePodControl.Err = nil
Member

should this be a different test case instead?

Contributor Author

I don't think so; this is part of the scenario. First an element is put into the queue (due to an error), and here we empty the queue through a success (so Forget is called).

manager.processNextWorkItem(context.TODO())
retries = manager.queue.NumRequeues(key)
if retries != 0 {
t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries)
}
}

@@ -4066,7 +4014,6 @@ func TestJobBackoffForOnFailure(t *testing.T) {
suspend bool

// pod setup
jobKeyForget bool
restartCounts []int32
podPhase v1.PodPhase

@@ -4078,57 +4025,57 @@ func TestJobBackoffForOnFailure(t *testing.T) {
expectedConditionReason string
}{
"backoffLimit 0 should have 1 pod active": {
1, 1, 0, false,
1, 1, 0,
false, []int32{0}, v1.PodRunning,
1, 0, 0, nil, "",
},
"backoffLimit 1 with restartCount 0 should have 1 pod active": {
1, 1, 1, false,
1, 1, 1,
false, []int32{0}, v1.PodRunning,
1, 0, 0, nil, "",
},
"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
1, 1, 1, false,
1, 1, 1,
false, []int32{1}, v1.PodRunning,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
1, 1, 1, false,
1, 1, 1,
false, []int32{1}, v1.PodPending,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podRunning - single pod": {
1, 5, 2, false,
1, 5, 2,
false, []int32{2}, v1.PodRunning,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podPending - single pod": {
1, 5, 2, false,
1, 5, 2,
false, []int32{2}, v1.PodPending,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podRunning - multiple pods": {
2, 5, 2, false,
2, 5, 2,
false, []int32{1, 1}, v1.PodRunning,
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podPending - multiple pods": {
2, 5, 2, false,
2, 5, 2,
false, []int32{1, 1}, v1.PodPending,
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"not enough failures": {
2, 5, 3, false,
2, 5, 3,
false, []int32{1, 1}, v1.PodRunning,
2, 0, 0, nil, "",
},
"suspending a job": {
2, 4, 6, true,
2, 4, 6,
true, []int32{1, 1}, v1.PodRunning,
0, 0, 0, &jobConditionSuspended, "JobSuspended",
},
"finshed job": {
2, 4, 6, true,
2, 4, 6,
true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
0, 4, 0, &jobConditionComplete, "",
},
@@ -4200,8 +4147,6 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
failedPods int

// expectations
isExpectingAnError bool
jobKeyForget bool
expectedActive int32
expectedSucceeded int32
expectedFailed int32
@@ -4211,27 +4156,27 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
"not enough failures with backoffLimit 0 - single pod": {
1, 1, 0,
v1.PodRunning, 1, 0,
false, false, 1, 0, 0, nil, "",
1, 0, 0, nil, "",
},
"not enough failures with backoffLimit 1 - single pod": {
1, 1, 1,
"", 0, 1,
true, false, 1, 0, 1, nil, "",
1, 0, 1, nil, "",
},
"too many failures with backoffLimit 1 - single pod": {
1, 1, 1,
"", 0, 2,
false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"not enough failures with backoffLimit 6 - multiple pods": {
2, 2, 6,
v1.PodRunning, 1, 6,
true, false, 2, 0, 6, nil, "",
2, 0, 6, nil, "",
},
"too many failures with backoffLimit 6 - multiple pods": {
2, 2, 6,
"", 0, 7,
false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
},
}

@@ -4267,9 +4212,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {

// run
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))

if (err != nil) != tc.isExpectingAnError {
t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError)
if err != nil {
t.Fatalf("unexpected error syncing job: %#v\n", err)
}
// validate status
if actual.Status.Active != tc.expectedActive {
@@ -4490,23 +4434,6 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
}
}

// hasValidFailingPods checks if there exists failed pods with valid index.
func hasValidFailingPods(status []indexPhase, completions int) bool {
for _, s := range status {
ix, err := strconv.Atoi(s.Index)
if err != nil {
continue
}
if ix < 0 || ix >= completions {
continue
}
if s.Phase == v1.PodFailed {
return true
}
}
return false
}

type podBuilder struct {
*v1.Pod
}