Merge pull request kubernetes#118759 from mimowo/dont-apibackoff-on-pod-failures

Do not bump API requests backoff in the Job controller due to pod failures
k8s-ci-robot committed Jun 20, 2023
2 parents 2d60430 + 784a309 commit 2651e70
Showing 2 changed files with 44 additions and 123 deletions.
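
The fix rests on the standard controller worker pattern: when the sync handler returns an error, processNextWorkItem requeues the key with rate-limited backoff, and only a nil return calls Forget and resets it. Before this commit, syncJob returned an error whenever a Job had newly failed pods, so ordinary pod failures kept inflating the backoff meant for API errors. Below is a minimal sketch of that pattern using client-go's workqueue; the function and handler names are illustrative, not the Job controller's exact code.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processNextWorkItem mirrors the common controller loop: an error from the
// sync handler bumps the key's retry count and requeues it with backoff; a
// nil return forgets the key, resetting its rate limiter.
func processNextWorkItem(queue workqueue.RateLimitingInterface, syncHandler func(key string) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item)

	key := item.(string)
	if err := syncHandler(key); err != nil {
		// This is the path the removed "failed pod(s) detected" error took:
		// every new pod failure bumped the API backoff for the Job's key.
		queue.AddRateLimited(key)
		fmt.Printf("requeued %q, retries so far: %d\n", key, queue.NumRequeues(key))
		return true
	}
	queue.Forget(key)
	return true
}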
6 changes: 0 additions & 6 deletions pkg/controller/job/job_controller.go
@@ -905,12 +905,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
return fmt.Errorf("tracking status: %w", err)
}

jobFinished := IsJobFinished(&job)
if jobHasNewFailure && !jobFinished {
// returning an error will re-enqueue Job after the backoff period
return fmt.Errorf("failed pod(s) detected for job key %q", key)
}

return manageJobErr
}

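As a further illustration of why spurious sync errors are costly, here is how client-go's exponential per-item rate limiter (the kind backing AddRateLimited) grows the delay; the base and maximum delays are made-up example values, not the Job controller's configuration.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Hypothetical delays for illustration only.
	rl := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 5*time.Minute)
	key := "default/my-job"
	for i := 0; i < 4; i++ {
		// Each call to When counts as one more failure, doubling the
		// delay: 1s, 2s, 4s, 8s.
		fmt.Printf("failure %d -> wait %v\n", rl.NumRequeues(key), rl.When(key))
	}
	rl.Forget(key) // a clean sync resets the key back to the base delay
	fmt.Printf("after Forget -> wait %v\n", rl.When(key))
}
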
161 changes: 44 additions & 117 deletions pkg/controller/job/job_controller_test.go
@@ -854,10 +854,6 @@ func TestControllerSyncJob(t *testing.T) {
if err == nil {
t.Error("Syncing jobs expected to return error on podControl exception")
}
} else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
if err == nil {
t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
}
} else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit {
if err == nil {
t.Error("Syncing jobs expected to return error when reached the podControl limit")
@@ -1704,7 +1700,6 @@ func TestSyncJobPastDeadline(t *testing.T) {
failedPods int

// expectations
expectedForGetKey bool
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
@@ -1719,7 +1714,6 @@
startTime: 15,
backoffLimit: 6,
activePods: 1,
expectedForGetKey: false,
expectedDeletions: 1,
expectedFailed: 1,
expectedCondition: batch.JobFailed,
@@ -1733,7 +1727,6 @@
backoffLimit: 6,
activePods: 1,
succeededPods: 1,
expectedForGetKey: true,
expectedDeletions: 1,
expectedSucceeded: 1,
expectedFailed: 1,
@@ -1746,7 +1739,6 @@
activeDeadlineSeconds: 10,
startTime: 10,
backoffLimit: 6,
expectedForGetKey: false,
expectedCondition: batch.JobFailed,
expectedConditionReason: "DeadlineExceeded",
},
@@ -1756,7 +1748,6 @@
activeDeadlineSeconds: 1,
startTime: 10,
failedPods: 1,
expectedForGetKey: false,
expectedFailed: 1,
expectedCondition: batch.JobFailed,
expectedConditionReason: "BackoffLimitExceeded",
@@ -1768,7 +1759,6 @@
activeDeadlineSeconds: 10,
startTime: 15,
backoffLimit: 6,
expectedForGetKey: true,
expectedCondition: batch.JobSuspended,
expectedConditionReason: "JobSuspended",
},
@@ -3898,80 +3888,38 @@ func bumpResourceVersion(obj metav1.Object) {
obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
}

type pods struct {
pending int
active int
succeed int
failed int
}

func TestJobBackoffReset(t *testing.T) {
func TestJobApiBackoffReset(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
testCases := map[string]struct {
// job setup
parallelism int32
completions int32
backoffLimit int32

// pod setup - each row is additive!
pods []pods
}{
"parallelism=1": {
1, 2, 1,
[]pods{
{0, 1, 0, 1},
{0, 0, 1, 0},
},
},
"parallelism=2 (just failure)": {
2, 2, 1,
[]pods{
{0, 2, 0, 1},
{0, 0, 1, 0},
},
},
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
return job, nil
}

for name, tc := range testCases {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
defer func() { DefaultJobApiBackOff = 1 * time.Second }()
DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job
return job, nil
}
job := newJob(1, 1, 2, batch.NonIndexedCompletion)
key := testutil.GetKey(job, t)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

// job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
key := testutil.GetKey(job, t)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()

setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0)
manager.queue.Add(key)
manager.processNextWorkItem(context.TODO())
retries := manager.queue.NumRequeues(key)
if retries != 1 {
t.Errorf("%s: expected exactly 1 retry, got %d", name, retries)
}
// error returned makes the key requeued
fakePodControl.Err = errors.New("Controller error")
manager.queue.Add(key)
manager.processNextWorkItem(context.TODO())
retries := manager.queue.NumRequeues(key)
if retries != 1 {
t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
}

job = actual
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion)
setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0)
manager.processNextWorkItem(context.TODO())
retries = manager.queue.NumRequeues(key)
if retries != 0 {
t.Errorf("%s: expected exactly 0 retries, got %d", name, retries)
}
if getCondition(actual, batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded") {
t.Errorf("%s: unexpected job failure", name)
}
// the queue is emptied on success
fakePodControl.Err = nil
manager.processNextWorkItem(context.TODO())
retries = manager.queue.NumRequeues(key)
if retries != 0 {
t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries)
}
}

@@ -4066,7 +4014,6 @@ func TestJobBackoffForOnFailure(t *testing.T) {
suspend bool

// pod setup
jobKeyForget bool
restartCounts []int32
podPhase v1.PodPhase

@@ -4078,57 +4025,57 @@
expectedConditionReason string
}{
"backoffLimit 0 should have 1 pod active": {
1, 1, 0, false,
1, 1, 0,
false, []int32{0}, v1.PodRunning,
1, 0, 0, nil, "",
},
"backoffLimit 1 with restartCount 0 should have 1 pod active": {
1, 1, 1, false,
1, 1, 1,
false, []int32{0}, v1.PodRunning,
1, 0, 0, nil, "",
},
"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
1, 1, 1, false,
1, 1, 1,
false, []int32{1}, v1.PodRunning,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
1, 1, 1, false,
1, 1, 1,
false, []int32{1}, v1.PodPending,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podRunning - single pod": {
1, 5, 2, false,
1, 5, 2,
false, []int32{2}, v1.PodRunning,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podPending - single pod": {
1, 5, 2, false,
1, 5, 2,
false, []int32{2}, v1.PodPending,
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podRunning - multiple pods": {
2, 5, 2, false,
2, 5, 2,
false, []int32{1, 1}, v1.PodRunning,
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures with podPending - multiple pods": {
2, 5, 2, false,
2, 5, 2,
false, []int32{1, 1}, v1.PodPending,
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"not enough failures": {
2, 5, 3, false,
2, 5, 3,
false, []int32{1, 1}, v1.PodRunning,
2, 0, 0, nil, "",
},
"suspending a job": {
2, 4, 6, true,
2, 4, 6,
true, []int32{1, 1}, v1.PodRunning,
0, 0, 0, &jobConditionSuspended, "JobSuspended",
},
"finshed job": {
2, 4, 6, true,
2, 4, 6,
true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
0, 4, 0, &jobConditionComplete, "",
},
@@ -4200,8 +4147,6 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
failedPods int

// expectations
isExpectingAnError bool
jobKeyForget bool
expectedActive int32
expectedSucceeded int32
expectedFailed int32
@@ -4211,27 +4156,27 @@
"not enough failures with backoffLimit 0 - single pod": {
1, 1, 0,
v1.PodRunning, 1, 0,
false, false, 1, 0, 0, nil, "",
1, 0, 0, nil, "",
},
"not enough failures with backoffLimit 1 - single pod": {
1, 1, 1,
"", 0, 1,
true, false, 1, 0, 1, nil, "",
1, 0, 1, nil, "",
},
"too many failures with backoffLimit 1 - single pod": {
1, 1, 1,
"", 0, 2,
false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"not enough failures with backoffLimit 6 - multiple pods": {
2, 2, 6,
v1.PodRunning, 1, 6,
true, false, 2, 0, 6, nil, "",
2, 0, 6, nil, "",
},
"too many failures with backoffLimit 6 - multiple pods": {
2, 2, 6,
"", 0, 7,
false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
},
}

@@ -4267,9 +4212,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {

// run
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))

if (err != nil) != tc.isExpectingAnError {
t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError)
if err != nil {
t.Fatalf("unexpected error syncing job: %#v\n", err)
}
// validate status
if actual.Status.Active != tc.expectedActive {
@@ -4490,23 +4434,6 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
}
}

// hasValidFailingPods checks if there exists failed pods with valid index.
func hasValidFailingPods(status []indexPhase, completions int) bool {
for _, s := range status {
ix, err := strconv.Atoi(s.Index)
if err != nil {
continue
}
if ix < 0 || ix >= completions {
continue
}
if s.Phase == v1.PodFailed {
return true
}
}
return false
}

type podBuilder struct {
*v1.Pod
}
