diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index a280cb898d87..6e63f84b4783 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -18,6 +18,7 @@ package job
 
 import (
 	"fmt"
+	"math"
 	"reflect"
 	"sort"
 	"sync"
@@ -50,6 +51,13 @@ import (
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
+const (
+	// DefaultJobBackOff is the default backoff period, exported for the e2e test
+	DefaultJobBackOff = 10 * time.Second
+	// MaxJobBackOff is the max backoff period, exported for the e2e test
+	MaxJobBackOff = 360 * time.Second
+)
+
 type JobController struct {
 	kubeClient clientset.Interface
 	podControl controller.PodControlInterface
@@ -96,7 +104,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 			Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		},
 		expectations: controller.NewControllerExpectations(),
-		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
+		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
 		recorder:     eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 	}
 
@@ -118,6 +126,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 
 	jm.updateHandler = jm.updateJobStatus
 	jm.syncHandler = jm.syncJob
+
 	return jm
 }
 
@@ -312,7 +321,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 	if err != nil {
 		return
 	}
-	jm.queue.Add(key)
+	jm.enqueueController(curJob)
 	// check if need to add a new rsync for ActiveDeadlineSeconds
 	if curJob.Status.StartTime != nil {
 		curADS := curJob.Spec.ActiveDeadlineSeconds
@@ -333,20 +342,23 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 }
 
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobController) enqueueController(obj interface{}) {
-	key, err := controller.KeyFunc(obj)
+func (jm *JobController) enqueueController(job interface{}) {
+	key, err := controller.KeyFunc(job)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
 		return
 	}
 
+	// Retrieves the backoff duration for this Job
+	backoff := getBackoff(jm.queue, key)
+
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
 	// deterministically avoid syncing controllers that fight over pods. Currently, we only
 	// ensure that the same controller is synced for a given pod. When we periodically relist
 	// all controllers there will still be some replica instability. One way to handle this is
 	// by querying the store for all controllers that this rc overlaps, as well as all
 	// controllers that overlap this rc, and sorting them.
-	jm.queue.Add(key)
+	jm.queue.AddAfter(key, backoff)
 }
 
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
@@ -432,6 +444,15 @@ func (jm *JobController) syncJob(key string) error {
 	}
 	job := *sharedJob
 
+	// if job was finished previously, we don't want to redo the termination
+	if IsJobFinished(&job) {
+		jm.queue.Forget(key)
+		return nil
+	}
+
+	// retrieve the previous number of retries
+	previousRetry := jm.queue.NumRequeues(key)
+
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
 	// the store after we've checked the expectation, the job sync is just deferred till the next relist.
@@ -457,34 +478,28 @@ func (jm *JobController) syncJob(key string) error {
 			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
 		}
 	}
-	// if job was finished previously, we don't want to redo the termination
-	if IsJobFinished(&job) {
-		return nil
-	}
 
 	var manageJobErr error
-	if pastActiveDeadline(&job) {
-		// TODO: below code should be replaced with pod termination resulting in
-		// pod failures, rather than killing pods. Unfortunately none such solution
-		// exists ATM. There's an open discussion in the topic in
-		// https://github.com/kubernetes/kubernetes/issues/14602 which might give
-		// some sort of solution to above problem.
-		// kill remaining active pods
-		wait := sync.WaitGroup{}
-		errCh := make(chan error, int(active))
-		wait.Add(int(active))
-		for i := int32(0); i < active; i++ {
-			go func(ix int32) {
-				defer wait.Done()
-				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
-					defer utilruntime.HandleError(err)
-					glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
-					errCh <- err
-				}
-			}(i)
-		}
-		wait.Wait()
-
+	jobFailed := false
+	var failureReason string
+	var failureMessage string
+
+	jobHaveNewFailure := failed > job.Status.Failed
+
+	// check if the number of failed pods increased since the last syncJob
+	if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
+		jobFailed = true
+		failureReason = "BackoffLimitExceeded"
+		failureMessage = "Job has reached the specified backoff limit"
+	} else if pastActiveDeadline(&job) {
+		jobFailed = true
+		failureReason = "DeadlineExceeded"
+		failureMessage = "Job was active longer than specified deadline"
+	}
+
+	if jobFailed {
+		errCh := make(chan error, active)
+		jm.deleteJobPods(&job, activePods, errCh)
 		select {
 		case manageJobErr = <-errCh:
 			if manageJobErr != nil {
@@ -496,8 +511,8 @@ func (jm *JobController) syncJob(key string) error {
 		// update status values accordingly
 		failed += active
 		active = 0
-		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
-		jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
+		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
+		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
 	} else {
 		if jobNeedsSync && job.DeletionTimestamp == nil {
 			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
@@ -546,9 +561,41 @@ func (jm *JobController) syncJob(key string) error {
 			return err
 		}
 	}
+
+	if jobHaveNewFailure {
+		// re-enqueue Job after the backoff period
+		jm.queue.AddRateLimited(key)
+	} else {
+		// if there is no new failure, the job backoff period can be reset
+		jm.queue.Forget(key)
+	}
+
 	return manageJobErr
 }
 
+func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
+	// TODO: below code should be replaced with pod termination resulting in
+	// pod failures, rather than killing pods. Unfortunately no such solution
+	// exists at the moment. There's an open discussion on the topic in
+	// https://github.com/kubernetes/kubernetes/issues/14602 which might give
+	// some sort of solution to the above problem.
+	// kill remaining active pods
+	wait := sync.WaitGroup{}
+	nbPods := len(pods)
+	wait.Add(nbPods)
+	for i := int32(0); i < int32(nbPods); i++ {
+		go func(ix int32) {
+			defer wait.Done()
+			if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil {
+				defer utilruntime.HandleError(err)
+				glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name)
+				errCh <- err
+			}
+		}(i)
+	}
+	wait.Wait()
+}
+
 // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
 func pastActiveDeadline(job *batch.Job) bool {
 	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
@@ -726,6 +773,26 @@ func (jm *JobController) updateJobStatus(job *batch.Job) error {
 	return err
 }
 
+func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
+	exp := queue.NumRequeues(key)
+
+	if exp <= 0 {
+		return time.Duration(0)
+	}
+
+	// The backoff is capped such that 'calculated' value never overflows.
+	backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
+	if backoff > math.MaxInt64 {
+		return MaxJobBackOff
+	}
+
+	calculated := time.Duration(backoff)
+	if calculated > MaxJobBackOff {
+		return MaxJobBackOff
+	}
+	return calculated
+}
+
 // filterPods returns pods based on their phase.
 func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
 	result := 0
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index cb7da57e2cea..3b613426d034 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -43,7 +43,7 @@ import (
 
 var alwaysReady = func() bool { return true }
 
-func newJob(parallelism, completions int32) *batch.Job {
+func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
 	j := &batch.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "foobar",
@@ -80,6 +80,8 @@ func newJob(parallelism, completions int32) *batch.Job {
 	} else {
 		j.Spec.Parallelism = nil
 	}
+	j.Spec.BackoffLimit = &backoffLimit
+
 	return j
 }
 
@@ -119,12 +121,16 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
 }
 
 func TestControllerSyncJob(t *testing.T) {
+	jobConditionComplete := batch.JobComplete
+	jobConditionFailed := batch.JobFailed
+
 	testCases := map[string]struct {
 		// job setup
-		parallelism int32
-		completions int32
-		deleting    bool
-		podLimit    int
+		parallelism  int32
+		completions  int32
+		backoffLimit int32
+		deleting     bool
+		podLimit     int
 
 		// pod setup
 		podControllerError error
@@ -134,107 +140,113 @@ func TestControllerSyncJob(t *testing.T) {
 		failedPods         int32
 
 		// expectations
-		expectedCreations int32
-		expectedDeletions int32
-		expectedActive    int32
-		expectedSucceeded int32
-		expectedFailed    int32
-		expectedComplete  bool
+		expectedCreations       int32
+		expectedDeletions       int32
+		expectedActive          int32
+		expectedSucceeded       int32
+		expectedFailed          int32
+		expectedCondition       *batch.JobConditionType
+		expectedConditionReason string
 	}{
 		"job start": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 0, 0, 0,
-			2, 0, 2, 0, 0, false,
+			2, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job start": {
-			2, -1, false, 0,
+			2, -1, 6, false, 0,
 			nil, 0, 0, 0, 0,
-			2, 0, 2, 0, 0, false,
+			2, 0, 2, 0, 0, nil, "",
 		},
 		"pending pods": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 2, 0, 0, 0,
-			0, 0, 2, 0, 0, false,
+			0, 0, 2, 0, 0, nil, "",
 		},
 		"correct # of pods": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 2, 0, 0,
-			0, 0, 2, 0, 0, false,
+			0, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job: correct # of pods": {
-			2, -1, false, 0,
+			2, -1, 6, false, 0,
 			nil, 0, 2, 0, 0,
-			0, 0, 2, 0, 0, false,
+			0, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 1, 1, 0,
-			1, 0, 2, 1, 0, false,
+			1, 0, 2, 1, 0, nil, "",
 		},
 		"too few active pods with a dynamic job": {
-			2, -1, false, 0,
+			2, -1, 6, false, 0,
 			nil, 0, 1, 0, 0,
-			1, 0, 2, 0, 0, false,
+			1, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods, with controller error": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			fmt.Errorf("Fake error"), 0, 1, 1, 0,
-			1, 0, 1, 1, 0, false,
+			1, 0, 1, 1, 0, nil, "",
 		},
 		"too many active pods": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 3, 0, 0,
-			0, 1, 2, 0, 0, false,
+			0, 1, 2, 0, 0, nil, "",
 		},
 		"too many active pods, with controller error": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			fmt.Errorf("Fake error"), 0, 3, 0, 0,
-			0, 1, 3, 0, 0, false,
+			0, 1, 3, 0, 0, nil, "",
 		},
 		"failed pod": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 1, 1, 1,
-			1, 0, 2, 1, 1, false,
+			1, 0, 2, 1, 1, nil, "",
 		},
 		"job finish": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 0, 5, 0,
-			0, 0, 0, 5, 0, true,
+			0, 0, 0, 5, 0, nil, "",
 		},
 		"WQ job finishing": {
-			2, -1, false, 0,
+			2, -1, 6, false, 0,
 			nil, 0, 1, 1, 0,
-			0, 0, 1, 1, 0, false,
+			0, 0, 1, 1, 0, nil, "",
 		},
 		"WQ job all finished": {
-			2, -1, false, 0,
+			2, -1, 6, false, 0,
 			nil, 0, 0, 2, 0,
-			0, 0, 0, 2, 0, true,
+			0, 0, 0, 2, 0, &jobConditionComplete, "",
 		},
 		"WQ job all finished despite one failure": {
-			2, -1, false, 0,
+			2, -1, 6, false, 0,
 			nil, 0, 0, 1, 1,
-			0, 0, 0, 1, 1, true,
+			0, 0, 0, 1, 1, &jobConditionComplete, "",
 		},
 		"more active pods than completions": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 10, 0, 0,
-			0, 8, 2, 0, 0, false,
+			0, 8, 2, 0, 0, nil, "",
 		},
 		"status change": {
-			2, 5, false, 0,
+			2, 5, 6, false, 0,
 			nil, 0, 2, 2, 0,
-			0, 0, 2, 2, 0, false,
+			0, 0, 2, 2, 0, nil, "",
 		},
 		"deleting job": {
-			2, 5, true, 0,
+			2, 5, 6, true, 0,
 			nil, 1, 1, 1, 0,
-			0, 0, 2, 1, 0, false,
+			0, 0, 2, 1, 0, nil, "",
 		},
 		"limited pods": {
-			100, 200, false, 10,
+			100, 200, 6, false, 10,
 			nil, 0, 0, 0, 0,
-			10, 0, 10, 0, 0, false,
+			10, 0, 10, 0, 0, nil, "",
+		},
+		"too many job sync failures": {
+			2, 5, 0, true, 0,
+			nil, 0, 0, 0, 1,
+			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
 
@@ -253,7 +265,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 
 		// job & pods setup
-		job := newJob(tc.parallelism, tc.completions)
+		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
 		if tc.deleting {
 			now := metav1.Now()
 			job.DeletionTimestamp = &now
@@ -330,7 +342,7 @@ func TestControllerSyncJob(t *testing.T) {
 			t.Errorf("%s: .status.startTime was not set", name)
 		}
 		// validate conditions
-		if tc.expectedComplete && !getCondition(actual, batch.JobComplete) {
+		if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
 			t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
 		}
 		// validate slow start
@@ -351,6 +363,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		completions           int32
 		activeDeadlineSeconds int64
 		startTime             int64
+		backoffLimit          int32
 
 		// pod setup
 		activePods    int32
@@ -358,25 +371,31 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		failedPods    int32
 
 		// expectations
-		expectedDeletions int32
-		expectedActive    int32
-		expectedSucceeded int32
-		expectedFailed    int32
+		expectedDeletions       int32
+		expectedActive          int32
+		expectedSucceeded       int32
+		expectedFailed          int32
+		expectedConditionReason string
 	}{
 		"activeDeadlineSeconds less than single pod execution": {
-			1, 1, 10, 15,
+			1, 1, 10, 15, 6,
 			1, 0, 0,
-			1, 0, 0, 1,
+			1, 0, 0, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds bigger than single pod execution": {
-			1, 2, 10, 15,
+			1, 2, 10, 15, 6,
 			1, 1, 0,
-			1, 0, 1, 1,
+			1, 0, 1, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds times-out before any pod starts": {
-			1, 1, 10, 10,
+			1, 1, 10, 10, 6,
 			0, 0, 0,
-			0, 0, 0, 0,
+			0, 0, 0, 0, "DeadlineExceeded",
+		},
+		"activeDeadlineSeconds with backofflimit reached": {
+			1, 1, 1, 10, 0,
+			1, 0, 2,
+			1, 0, 0, 3, "BackoffLimitExceeded",
 		},
 	}
 
@@ -395,7 +414,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		}
 
 		// job & pods setup
-		job := newJob(tc.parallelism, tc.completions)
+		job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
 		job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
 		start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
 		job.Status.StartTime = &start
@@ -438,15 +457,15 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			t.Errorf("%s: .status.startTime was not set", name)
 		}
 		// validate conditions
-		if !getCondition(actual, batch.JobFailed) {
+		if !getCondition(actual, batch.JobFailed, tc.expectedConditionReason) {
 			t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
Got %#v", name, actual.Status.Conditions) } } } -func getCondition(job *batch.Job, condition batch.JobConditionType) bool { +func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool { for _, v := range job.Status.Conditions { - if v.Type == condition && v.Status == v1.ConditionTrue { + if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason { return true } } @@ -466,7 +485,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) { return nil } - job := newJob(1, 1) + job := newJob(1, 1, 6) activeDeadlineSeconds := int64(10) job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds start := metav1.Unix(metav1.Now().Time.Unix()-15, 0) @@ -496,7 +515,7 @@ func TestSyncJobComplete(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady - job := newJob(1, 1) + job := newJob(1, 1, 6) job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", "")) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) @@ -521,7 +540,7 @@ func TestSyncJobDeleted(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady manager.updateHandler = func(job *batch.Job) error { return nil } - job := newJob(2, 2) + job := newJob(2, 2, 6) err := manager.syncJob(getKey(job, t)) if err != nil { t.Errorf("Unexpected error when syncing jobs %v", err) @@ -546,7 +565,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { manager.queue.AddRateLimited(getKey(job, t)) return updateError } - job := newJob(2, 2) + job := newJob(2, 2, 6) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) err := manager.syncJob(getKey(job, t)) if err == nil || err != updateError { @@ -659,9 +678,9 @@ func TestGetPodsForJob(t *testing.T) { jm.podStoreSynced = alwaysReady jm.jobStoreSynced = alwaysReady - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6) job1.Name = "job1" - job2 := newJob(1, 1) + job2 := newJob(1, 1, 6) job2.Name = "job2" informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1) informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2) @@ -700,7 +719,7 @@ func TestGetPodsForJob(t *testing.T) { } func TestGetPodsForJobAdopt(t *testing.T) { - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6) job1.Name = "job1" clientset := fake.NewSimpleClientset(job1) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) @@ -726,7 +745,7 @@ func TestGetPodsForJobAdopt(t *testing.T) { } func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6) job1.Name = "job1" job1.DeletionTimestamp = &metav1.Time{} clientset := fake.NewSimpleClientset(job1) @@ -756,7 +775,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) { } func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) { - job1 := newJob(1, 1) + job1 := newJob(1, 1, 6) job1.Name = "job1" // The up-to-date object says it's being deleted. 
 	job1.DeletionTimestamp = &metav1.Time{}
@@ -795,7 +814,7 @@ func TestGetPodsForJobRelease(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 
@@ -824,9 +843,9 @@ func TestAddPod(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@@ -869,11 +888,11 @@ func TestAddPodOrphan(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
-	job3 := newJob(1, 1)
+	job3 := newJob(1, 1, 6)
 	job3.Name = "job3"
 	job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@@ -897,9 +916,9 @@ func TestUpdatePod(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@@ -946,9 +965,9 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@@ -973,9 +992,9 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@@ -999,9 +1018,9 @@ func TestUpdatePodRelease(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@@ -1025,9 +1044,9 @@ func TestDeletePod(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@@ -1070,11 +1089,11 @@ func TestDeletePodOrphan(t *testing.T) {
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
-	job1 := newJob(1, 1)
+	job1 := newJob(1, 1, 6)
 	job1.Name = "job1"
-	job2 := newJob(1, 1)
+	job2 := newJob(1, 1, 6)
 	job2.Name = "job2"
-	job3 := newJob(1, 1)
+	job3 := newJob(1, 1, 6)
 	job3.Name = "job3"
 	job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
 	informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@@ -1113,7 +1132,7 @@ func TestSyncJobExpectations(t *testing.T) {
 	manager.jobStoreSynced = alwaysReady
 	manager.updateHandler = func(job *batch.Job) error { return nil }
 
-	job := newJob(2, 2)
+	job := newJob(2, 2, 6)
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 	pods := newPodList(2, v1.PodPending, job)
 	podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
@@ -1181,7 +1200,7 @@ func TestWatchJobs(t *testing.T) {
 }
 
 func TestWatchPods(t *testing.T) {
-	testJob := newJob(2, 2)
+	testJob := newJob(2, 2, 6)
 	clientset := fake.NewSimpleClientset(testJob)
 	fakeWatch := watch.NewFake()
 	clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go
index 939c11ce7c66..608106d6db4b 100644
--- a/test/e2e/apps/job.go
+++ b/test/e2e/apps/job.go
@@ -35,11 +35,12 @@ var _ = SIGDescribe("Job", func() {
 	f := framework.NewDefaultFramework("job")
 	parallelism := int32(2)
 	completions := int32(4)
+	backoffLimit := int32(6) // default value
 
 	// Simplest case: all pods succeed promptly
 	It("should run a job to completion when tasks succeed", func() {
 		By("Creating a job")
-		job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil)
+		job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -58,7 +59,7 @@ var _ = SIGDescribe("Job", func() {
 		// up to 5 minutes between restarts, making test timeouts
 		// due to successive failures too likely with a reasonable
 		// test timeout.
-		job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil)
+		job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -76,7 +77,7 @@ var _ = SIGDescribe("Job", func() {
 		// Worst case analysis: 15 failures, each taking 1 minute to
 		// run due to some slowness, 1 in 2^15 chance of happening,
 		// causing test flake. Should be very rare.
-		job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil)
+		job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -88,7 +89,7 @@ var _ = SIGDescribe("Job", func() {
 	It("should exceed active deadline", func() {
 		By("Creating a job")
 		var activeDeadlineSeconds int64 = 1
-		job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds)
+		job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 		By("Ensuring job past active deadline")
@@ -98,7 +99,7 @@ var _ = SIGDescribe("Job", func() {
 
 	It("should delete a job", func() {
 		By("Creating a job")
-		job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil)
+		job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -121,7 +122,7 @@ var _ = SIGDescribe("Job", func() {
 
 	It("should adopt matching orphans and release non-matching pods", func() {
 		By("Creating a job")
-		job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil)
+		job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
 		// Replace job with the one returned from Create() so it has the UID.
 		// Save Kind since it won't be populated in the returned job.
 		kind := job.Kind
@@ -172,4 +173,22 @@ var _ = SIGDescribe("Job", func() {
 			},
 		)).To(Succeed(), "wait for pod %q to be released", pod.Name)
 	})
+
+	It("should exceed backoffLimit", func() {
+		By("Creating a job")
+		job := framework.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, 0)
+		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
+		Expect(err).NotTo(HaveOccurred())
+		By("Ensuring job exceeds backoffLimit")
+
+		err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(30)*time.Second, "BackoffLimitExceeded")
+		Expect(err).NotTo(HaveOccurred())
+
+		By("Checking that only one pod was created and its status is failed")
+		pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(pods.Items).To(HaveLen(1))
+		pod := pods.Items[0]
+		Expect(pod.Status.Phase).To(Equal(v1.PodFailed))
+	})
 })
diff --git a/test/e2e/framework/jobs_util.go b/test/e2e/framework/jobs_util.go
index c8d3bd026cc1..87b620965b1f 100644
--- a/test/e2e/framework/jobs_util.go
+++ b/test/e2e/framework/jobs_util.go
@@ -43,7 +43,7 @@ const (
 // first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart
 // policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the
 // Job's required number of completions.
-func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64) *batch.Job {
+func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64, backoffLimit int32) *batch.Job {
 	job := &batch.Job{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
@@ -55,6 +55,7 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl
 			ActiveDeadlineSeconds: activeDeadlineSeconds,
 			Parallelism:           &parallelism,
 			Completions:           &completions,
+			BackoffLimit:          &backoffLimit,
 			ManualSelector:        newBool(false),
 			Template: v1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
diff --git a/test/e2e/network_partition.go b/test/e2e/network_partition.go
index 978b3a23fa63..2b00b1498074 100644
--- a/test/e2e/network_partition.go
+++ b/test/e2e/network_partition.go
@@ -418,9 +418,10 @@ var _ = framework.KubeDescribe("[sig-apps] Network Partition [Disruptive] [Slow]
 	It("should create new pods when node is partitioned", func() {
 		parallelism := int32(2)
 		completions := int32(4)
+		backoffLimit := int32(6) // default value
 		job := framework.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
-			parallelism, completions, nil)
+			parallelism, completions, nil, backoffLimit)
 		job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
 		Expect(err).NotTo(HaveOccurred())
 		label := labels.SelectorFromSet(labels.Set(map[string]string{framework.JobSelectorKey: job.Name}))
diff --git a/test/e2e/upgrades/apps/job.go b/test/e2e/upgrades/apps/job.go
index da5793b9bb99..4da7a1e91000 100644
--- a/test/e2e/upgrades/apps/job.go
+++ b/test/e2e/upgrades/apps/job.go
@@ -39,7 +39,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
 	t.namespace = f.Namespace.Name
 
 	By("Creating a job")
-	t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil)
+	t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil, 6)
 	job, err := framework.CreateJob(f.ClientSet, t.namespace, t.job)
 	t.job = job
 	Expect(err).NotTo(HaveOccurred())
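Not part of the patch: a minimal standalone sketch of the requeue delay progression that the getBackoff helper above produces, assuming the DefaultJobBackOff (10s) and MaxJobBackOff (360s) constants introduced in job_controller.go. The constant and function names below are local to the sketch, not controller API.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Local copies of the controller's exported constants (see job_controller.go).
const (
	defaultJobBackOff = 10 * time.Second
	maxJobBackOff     = 360 * time.Second
)

// backoffForRequeues mirrors getBackoff's arithmetic: no delay before the first
// retry, then defaultJobBackOff * 2^(requeues-1), capped at maxJobBackOff.
func backoffForRequeues(requeues int) time.Duration {
	if requeues <= 0 {
		return 0
	}
	backoff := float64(defaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(requeues-1))
	if backoff > math.MaxInt64 {
		return maxJobBackOff
	}
	if calculated := time.Duration(backoff); calculated < maxJobBackOff {
		return calculated
	}
	return maxJobBackOff
}

func main() {
	// Delays double from 10s up to the cap: 0s, 10s, 20s, 40s, 1m20s, 2m40s, 5m20s, 6m0s, 6m0s.
	for requeues := 0; requeues <= 8; requeues++ {
		fmt.Printf("requeues=%d backoff=%v\n", requeues, backoffForRequeues(requeues))
	}
}
```

With these values, a Job whose pods keep failing is re-synced after 10s, 20s, 40s, and so on, up to the six-minute cap; a sync that sees no new failed pods calls Forget on the key, which resets the progression.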