Automated cherry pick of #58972: Fix job's backoff limit for restart policy OnFailure #63650: Never clean backoff in job controller #64813

Merged
61 changes: 53 additions & 8 deletions pkg/controller/job/job_controller.go
@@ -48,6 +48,8 @@ import (
"github.com/golang/glog"
)

const statusUpdateRetries = 3

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

@@ -357,10 +359,10 @@ func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
return
}

-if immediate {
-jm.queue.Forget(key)
+backoff := time.Duration(0)
+if !immediate {
+backoff = getBackoff(jm.queue, key)
}
-backoff := getBackoff(jm.queue, key)
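Aside, not part of the diff: with the Forget call gone, a key keeps its requeue history even when it is enqueued immediately, and getBackoff (defined elsewhere in this file, not visible in this diff) turns that history into a delay. A rough, hypothetical sketch of that kind of capped exponential backoff, with made-up values standing in for constants along the lines of DefaultJobBackOff/MaxJobBackOff:

package main

import (
	"fmt"
	"time"
)

// sketchBackoff doubles the delay per requeue and caps it; the real getBackoff
// works from queue.NumRequeues(key) in a similar spirit (illustration only).
func sketchBackoff(requeues int, base, max time.Duration) time.Duration {
	if requeues <= 0 {
		return 0
	}
	d := base * time.Duration(1<<uint(requeues-1)) // base, 2*base, 4*base, ...
	if d > max {
		return max
	}
	return d
}

func main() {
	for i := 0; i <= 7; i++ {
		fmt.Println(i, sketchBackoff(i, 10*time.Second, 6*time.Minute)) // hypothetical 10s base, 6m cap
	}
}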

// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
@@ -496,12 +498,18 @@ func (jm *JobController) syncJob(key string) (bool, error) {
var failureMessage string

jobHaveNewFailure := failed > job.Status.Failed
-
-// check if the number of failed jobs increased since the last syncJob
-if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
+// new failures happen when status does not reflect the failures and active
+// is different than parallelism, otherwise the previous controller loop
+// failed updating status so even if we pick up failure it is not a new one
+exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
+(int32(previousRetry)+1 > *job.Spec.BackoffLimit)
+
+if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
+// check if the number of pod restart exceeds backoff (for restart OnFailure only)
+// OR if the number of failed jobs increased since the last syncJob
jobFailed = true
failureReason = "BackoffLimitExceeded"
-failureMessage = "Job has reach the specified backoff limit"
+failureMessage = "Job has reached the specified backoff limit"
} else if pastActiveDeadline(&job) {
jobFailed = true
failureReason = "DeadlineExceeded"
@@ -615,6 +623,30 @@ func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh cha
wait.Wait()
}

// pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
// this method applies only to pods with restartPolicy == OnFailure
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
return false
}
result := int32(0)
for i := range pods {
po := pods[i]
if po.Status.Phase != v1.PodRunning {
continue
}
for j := range po.Status.InitContainerStatuses {
stat := po.Status.InitContainerStatuses[j]
result += stat.RestartCount
}
for j := range po.Status.ContainerStatuses {
stat := po.Status.ContainerStatuses[j]
result += stat.RestartCount
}
}
return result >= *job.Spec.BackoffLimit
}
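Aside, not part of the diff: only pods in the Running phase contribute to this sum, so restarts recorded on pods that have already moved to another phase are ignored here (those pods are covered by the failed-pod accounting in syncJob instead). A small self-contained illustration using the core/v1 types — the helper name and values below are made up for the example:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// sumRunningRestarts mirrors the counting in pastBackoffLimitOnFailure:
// init-container and container restarts of running pods are added up.
func sumRunningRestarts(pods []*v1.Pod) int32 {
	var total int32
	for _, po := range pods {
		if po.Status.Phase != v1.PodRunning {
			continue // e.g. a pod already marked Failed is skipped
		}
		for _, s := range po.Status.InitContainerStatuses {
			total += s.RestartCount
		}
		for _, s := range po.Status.ContainerStatuses {
			total += s.RestartCount
		}
	}
	return total
}

func main() {
	running := &v1.Pod{Status: v1.PodStatus{
		Phase:             v1.PodRunning,
		ContainerStatuses: []v1.ContainerStatus{{RestartCount: 2}},
	}}
	failed := &v1.Pod{Status: v1.PodStatus{
		Phase:             v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{{RestartCount: 5}},
	}}
	// Only the running pod's restarts count: prints 2.
	fmt.Println(sumRunningRestarts([]*v1.Pod{running, failed}))
}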

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline(job *batch.Job) bool {
if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
@@ -788,7 +820,20 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
}

func (jm *JobController) updateJobStatus(job *batch.Job) error {
-_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(job)
+jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
+var err error
+for i := 0; i <= statusUpdateRetries; i = i + 1 {
+var newJob *batch.Job
+newJob, err = jobClient.Get(job.Name, metav1.GetOptions{})
+if err != nil {
+break
+}
+newJob.Status = job.Status
+if _, err = jobClient.UpdateStatus(newJob); err == nil {
+break
+}
+}
+
return err
}
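Aside, not part of the diff: the new loop re-fetches the Job before each UpdateStatus so a stale resourceVersion does not keep the write from ever landing. A rough analogue using client-go's conflict-retry helper is sketched below, assuming the pre-context client-go signatures used elsewhere in this diff; note that unlike the loop above, RetryOnConflict retries only on resource-version conflicts, not on every UpdateStatus error.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
	"k8s.io/client-go/util/retry"
)

// updateJobStatusWithRetry (hypothetical name) refetches the job and retries the
// status write whenever the apiserver reports a conflict.
func updateJobStatusWithRetry(jobs clientbatchv1.JobInterface, job *batchv1.Job) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		newJob, err := jobs.Get(job.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		newJob.Status = job.Status
		_, err = jobs.UpdateStatus(newJob)
		return err
	})
}

func main() {
	job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
	client := fake.NewSimpleClientset(job)
	job.Status.Failed = 1
	fmt.Println(updateJobStatusWithRetry(client.BatchV1().Jobs("default"), job))
}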

95 changes: 92 additions & 3 deletions pkg/controller/job/job_controller_test.go
@@ -269,7 +269,7 @@ func TestControllerSyncJob(t *testing.T) {
nil, true, 0, 0, 0, 0,
10, 0, 10, 0, 0, nil, "",
},
"to many job sync failure": {
"too many job failures": {
2, 5, 0, true, 0,
nil, true, 0, 0, 0, 1,
0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
@@ -412,8 +412,8 @@ func TestSyncJobPastDeadline(t *testing.T) {
},
"activeDeadlineSeconds with backofflimit reach": {
1, 1, 1, 10, 0,
-1, 0, 2,
-true, 1, 0, 0, 3, "BackoffLimitExceeded",
+0, 0, 1,
+true, 0, 0, 0, 1, "BackoffLimitExceeded",
},
}

@@ -1406,3 +1406,92 @@ func TestJobBackoff(t *testing.T) {
})
}
}

func TestJobBackoffForOnFailure(t *testing.T) {
jobConditionFailed := batch.JobFailed

testCases := map[string]struct {
// job setup
parallelism int32
completions int32
backoffLimit int32

// pod setup
jobKeyForget bool
restartCounts []int32

// expectations
expectedActive int32
expectedSucceeded int32
expectedFailed int32
expectedCondition *batch.JobConditionType
expectedConditionReason string
}{
"too many job failures - single pod": {
1, 5, 2,
true, []int32{2},
0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
"too many job failures - multiple pods": {
2, 5, 2,
true, []int32{1, 1},
0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
},
"not enough failures": {
2, 5, 3,
true, []int32{1, 1},
2, 0, 0, nil, "",
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
// job manager setup
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job
manager.updateHandler = func(job *batch.Job) error {
actual = job
return nil
}

// job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
for i, pod := range newPodList(int32(len(tc.restartCounts)), v1.PodRunning, job) {
pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
podIndexer.Add(&pod)
}

// run
forget, err := manager.syncJob(getKey(job, t))

if err != nil {
t.Errorf("unexpected error syncing job. Got %#v", err)
}
if forget != tc.jobKeyForget {
t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
}
// validate status
if actual.Status.Active != tc.expectedActive {
t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
}
if actual.Status.Succeeded != tc.expectedSucceeded {
t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
}
if actual.Status.Failed != tc.expectedFailed {
t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
}
// validate conditions
if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
}
})
}
}
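Aside, not part of the diff: the getCondition helper used above lives elsewhere in this test file and is not shown here. Under the assumption that it simply scans job.Status.Conditions for a true condition with the expected reason, an equivalent check would look roughly like this:

package main

import (
	"fmt"

	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
)

// hasCondition is a guess at what the test's getCondition helper checks: a
// condition of the given type that is true and carries the expected reason.
func hasCondition(job *batch.Job, condType batch.JobConditionType, reason string) bool {
	for _, c := range job.Status.Conditions {
		if c.Type == condType && c.Status == v1.ConditionTrue && c.Reason == reason {
			return true
		}
	}
	return false
}

func main() {
	job := &batch.Job{Status: batch.JobStatus{Conditions: []batch.JobCondition{{
		Type:   batch.JobFailed,
		Status: v1.ConditionTrue,
		Reason: "BackoffLimitExceeded",
	}}}}
	fmt.Println(hasCondition(job, batch.JobFailed, "BackoffLimitExceeded")) // true
}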
18 changes: 13 additions & 5 deletions test/e2e/apps/job.go
@@ -179,19 +179,27 @@ var _ = SIGDescribe("Job", func() {

It("should exceed backoffLimit", func() {
By("Creating a job")
-job := framework.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, 0)
+backoff := 1
+job := framework.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job exceed backofflimit")

err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, framework.JobTimeout, "BackoffLimitExceeded")
Expect(err).NotTo(HaveOccurred())

By("Checking that only one pod created and status is failed")
By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
pod := pods.Items[0]
Expect(pod.Status.Phase).To(Equal(v1.PodFailed))
// Expect(pods.Items).To(HaveLen(backoff + 1))
// due to NumRequeus not being stable enough, especially with failed status
// updates we need to allow more than backoff+1
// TODO revert this back to above when https://github.com/kubernetes/kubernetes/issues/64787 gets fixed
if len(pods.Items) < backoff+1 {
framework.Failf("Not enough pod created expected at least %d, got %#v", backoff+1, pods.Items)
}
for _, pod := range pods.Items {
Expect(pod.Status.Phase).To(Equal(v1.PodFailed))
}
})
})