Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #45213: Syncing jobs should return an error when podController raises an exception #45664

Merged
merged 1 commit into from
May 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 35 additions & 5 deletions pkg/controller/job/jobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ func (jm *JobController) syncJob(key string) error {
if IsJobFinished(&job) {
return nil
}

var manageJobErr error
if pastActiveDeadline(&job) {
// TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution
Expand All @@ -437,24 +439,36 @@ func (jm *JobController) syncJob(key string) error {
// some sort of solution to above problem.
// kill remaining active pods
wait := sync.WaitGroup{}
errCh := make(chan error, int(active))
wait.Add(int(active))
for i := int32(0); i < active; i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
defer utilruntime.HandleError(err)
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
errCh <- err
}
}(i)
}
wait.Wait()

select {
case manageJobErr = <-errCh:
if manageJobErr != nil {
break
}
default:
}

// update status values accordingly
failed += active
active = 0
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
} else {
if jobNeedsSync && job.DeletionTimestamp == nil {
active = jm.manageJob(activePods, succeeded, &job)
active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
}
completions := succeeded
complete := false
Expand Down Expand Up @@ -500,7 +514,7 @@ func (jm *JobController) syncJob(key string) error {
return err
}
}
return nil
return manageJobErr
}

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
Expand Down Expand Up @@ -536,18 +550,20 @@ func getStatus(pods []*v1.Pod) (succeeded, failed int32) {
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) int32 {
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
var activeLock sync.Mutex
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
return 0
return 0, nil
}

var errCh chan error
if active > parallelism {
diff := active - parallelism
errCh = make(chan error, diff)
jm.expectations.ExpectDeletions(jobKey, int(diff))
glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
// Sort the pods in the order such that not-ready < ready, unscheduled
Expand All @@ -564,10 +580,12 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of deletes because the informer won't observe this deletion
glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
jm.expectations.DeletionObserved(jobKey)
activeLock.Lock()
active++
activeLock.Unlock()
errCh <- err
}
}(i)
}
Expand Down Expand Up @@ -598,6 +616,7 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
diff = 0
}
jm.expectations.ExpectCreations(jobKey, int(diff))
errCh = make(chan error, diff)
glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

active += diff
Expand All @@ -609,17 +628,28 @@ func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *b
if err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, newControllerRef(job)); err != nil {
defer utilruntime.HandleError(err)
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
jm.expectations.CreationObserved(jobKey)
activeLock.Lock()
active--
activeLock.Unlock()
errCh <- err
}
}()
}
wait.Wait()
}

return active
select {
case err := <-errCh:
// All errors have already been reported; we only need to inform the controller that there was an error so that it retries this job on the next sync.
if err != nil {
return active, err
}
default:
}

return active, nil
}

func (jm *JobController) updateJobStatus(job *batch.Job) error {
Expand Down
12 changes: 10 additions & 2 deletions pkg/controller/job/jobcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,16 @@ func TestControllerSyncJob(t *testing.T) {

// run
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)

// We need to requeue the syncJob task if the podController returned an error.
if tc.podControllerError != nil {
if err == nil {
t.Errorf("%s: Syncing jobs would return error when podController exception", name)
}
} else {
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
}
}

// validate created/deleted pods
Expand Down