Skip to content

Commit

Permalink
Rate limit only when an actual error happens, not on update conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
soltysh committed Jun 5, 2018
1 parent de62201 commit d80ed53
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
23 changes: 21 additions & 2 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ import (
"github.com/golang/glog"
)

const statusUpdateRetries = 3

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")

Expand Down Expand Up @@ -495,7 +497,11 @@ func (jm *JobController) syncJob(key string) (bool, error) {
var failureMessage string

jobHaveNewFailure := failed > job.Status.Failed
exceedsBackoffLimit := jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit)
// new failures happen when status does not reflect the failures and active
// is different than parallelism, otherwise the previous controller loop
// failed updating status so even if we pick up failure it is not a new one
exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&

This comment has been minimized.

Copy link
@Guoozz

Guoozz Jun 28, 2019

why add active != *job.Spec.Parallelism?

(int32(previousRetry)+1 > *job.Spec.BackoffLimit)

if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
// check if the number of pod restart exceeds backoff (for restart OnFailure only)
Expand Down Expand Up @@ -813,7 +819,20 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
}

func (jm *JobController) updateJobStatus(job *batch.Job) error {
_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).UpdateStatus(job)
jobClient := jm.kubeClient.BatchV1().Jobs(job.Namespace)
var err error
for i := 0; i <= statusUpdateRetries; i = i + 1 {
var newJob *batch.Job
newJob, err = jobClient.Get(job.Name, metav1.GetOptions{})
if err != nil {
break
}
newJob.Status = job.Status
if _, err = jobClient.UpdateStatus(newJob); err == nil {
break
}
}

return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ func TestSyncJobPastDeadline(t *testing.T) {
},
"activeDeadlineSeconds with backofflimit reach": {
1, 1, 1, 10, 0,
1, 0, 2,
true, 1, 0, 0, 3, "BackoffLimitExceeded",
0, 0, 1,
true, 0, 0, 0, 1, "BackoffLimitExceeded",
},
}

Expand Down
8 changes: 7 additions & 1 deletion test/e2e/apps/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ var _ = SIGDescribe("Job", func() {
By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(backoff + 1))
// Expect(pods.Items).To(HaveLen(backoff + 1))
// due to NumRequeus not being stable enough, especially with failed status
// updates we need to allow more than backoff+1
// TODO revert this back to above when https://github.com/kubernetes/kubernetes/issues/64787 gets fixed
if len(pods.Items) < backoff+1 {
framework.Failf("Not enough pod created expected at least %d, got %#v", backoff+1, pods.Items)
}
for _, pod := range pods.Items {
Expect(pod.Status.Phase).To(Equal(v1.PodFailed))
}
Expand Down

0 comments on commit d80ed53

Please sign in to comment.