Skip to content

Commit

Permalink
Backoff only when failed pod shows up
Browse files Browse the repository at this point in the history
  • Loading branch information
soltysh committed Mar 14, 2018
1 parent 0207a09 commit 1266252
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 15 deletions.
1 change: 1 addition & 0 deletions pkg/controller/job/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ go_test(
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

Expand Down
40 changes: 25 additions & 15 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
}

jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
AddFunc: func(obj interface{}) {
jm.enqueueController(obj, true)
},
UpdateFunc: jm.updateJob,
DeleteFunc: jm.enqueueController,
DeleteFunc: func(obj interface{}) {
jm.enqueueController(obj, true)
},
})
jm.jobLister = jobInformer.Lister()
jm.jobStoreSynced = jobInformer.Informer().HasSynced
Expand Down Expand Up @@ -209,7 +213,7 @@ func (jm *JobController) addPod(obj interface{}) {
return
}
jm.expectations.CreationObserved(jobKey)
jm.enqueueController(job)
jm.enqueueController(job, true)
return
}

Expand All @@ -218,7 +222,7 @@ func (jm *JobController) addPod(obj interface{}) {
// DO NOT observe creation because no controller should be waiting for an
// orphan.
for _, job := range jm.getPodJobs(pod) {
jm.enqueueController(job)
jm.enqueueController(job, true)
}
}

Expand All @@ -242,15 +246,16 @@ func (jm *JobController) updatePod(old, cur interface{}) {
return
}

labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
// the only time we want the backoff to kick-in, is when the pod failed
immediate := curPod.Status.Phase != v1.PodFailed

curControllerRef := metav1.GetControllerOf(curPod)
oldControllerRef := metav1.GetControllerOf(oldPod)
controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
if controllerRefChanged && oldControllerRef != nil {
// The ControllerRef was changed. Sync the old controller, if any.
if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
jm.enqueueController(job)
jm.enqueueController(job, immediate)
}
}

Expand All @@ -260,15 +265,16 @@ func (jm *JobController) updatePod(old, cur interface{}) {
if job == nil {
return
}
jm.enqueueController(job)
jm.enqueueController(job, immediate)
return
}

// Otherwise, it's an orphan. If anything changed, sync matching controllers
// to see if anyone wants to adopt it now.
labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
if labelChanged || controllerRefChanged {
for _, job := range jm.getPodJobs(curPod) {
jm.enqueueController(job)
jm.enqueueController(job, immediate)
}
}
}
Expand Down Expand Up @@ -309,7 +315,7 @@ func (jm *JobController) deletePod(obj interface{}) {
return
}
jm.expectations.DeletionObserved(jobKey)
jm.enqueueController(job)
jm.enqueueController(job, true)
}

func (jm *JobController) updateJob(old, cur interface{}) {
Expand All @@ -321,7 +327,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
if err != nil {
return
}
jm.enqueueController(curJob)
jm.enqueueController(curJob, true)
// check if need to add a new rsync for ActiveDeadlineSeconds
if curJob.Status.StartTime != nil {
curADS := curJob.Spec.ActiveDeadlineSeconds
Expand All @@ -341,15 +347,19 @@ func (jm *JobController) updateJob(old, cur interface{}) {
}
}

// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
func (jm *JobController) enqueueController(job interface{}) {
key, err := controller.KeyFunc(job)
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
// immediate tells the controller to update the status right away, and should
// happen ONLY when there was a successful pod run.
func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}

// Retrieves the backoff duration for this Job
if immediate {
jm.queue.Forget(key)
}
backoff := getBackoff(jm.queue, key)

// TODO: Handle overlapping controllers better. Either disallow them at admission time or
Expand Down
68 changes: 68 additions & 0 deletions pkg/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -1338,3 +1339,70 @@ func TestJobBackoffReset(t *testing.T) {
}
}
}

var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}

type fakeRateLimitingQueue struct {
workqueue.Interface
requeues int
item interface{}
duration time.Duration
}

func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
func (f *fakeRateLimitingQueue) Forget(item interface{}) {}
func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
return f.requeues
}
func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
f.item = item
f.duration = duration
}

func TestJobBackoff(t *testing.T) {
job := newJob(1, 1, 1)
oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
oldPod.Status.Phase = v1.PodRunning
oldPod.ResourceVersion = "1"
newPod := oldPod.DeepCopy()
newPod.ResourceVersion = "2"

testCases := map[string]struct {
// inputs
requeues int
phase v1.PodPhase

// expectation
backoff int
}{
"1st failure": {0, v1.PodFailed, 0},
"2nd failure": {1, v1.PodFailed, 1},
"3rd failure": {2, v1.PodFailed, 2},
"1st success": {0, v1.PodSucceeded, 0},
"2nd success": {1, v1.PodSucceeded, 0},
"1st running": {0, v1.PodSucceeded, 0},
"2nd running": {1, v1.PodSucceeded, 0},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
queue := &fakeRateLimitingQueue{}
manager.queue = queue
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)

queue.requeues = tc.requeues
newPod.Status.Phase = tc.phase
manager.updatePod(oldPod, newPod)

if queue.duration.Nanoseconds() != int64(tc.backoff)*DefaultJobBackOff.Nanoseconds() {
t.Errorf("unexpected backoff %v", queue.duration)
}
})
}
}

0 comments on commit 1266252

Please sign in to comment.