Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid duplicate Failed conditions in job status #110292

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 16 additions & 12 deletions pkg/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"time"

batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -1205,7 +1205,7 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
if isIndexedJob(job) {
completionMode = string(*job.Spec.CompletionMode)
}
job.Status.Conditions = append(job.Status.Conditions, *finishedCond)
job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message)
if finishedCond.Type == batch.JobComplete {
if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
Expand Down Expand Up @@ -1652,17 +1652,12 @@ func errorFromChannel(errCh <-chan error) error {
// update the status condition to false. The function returns a bool to let the
// caller know if the list was changed (either appended or updated).
func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditionType, status v1.ConditionStatus, reason, message string) ([]batch.JobCondition, bool) {
for i := range list {
if list[i].Type == cType {
if list[i].Status != status || list[i].Reason != reason || list[i].Message != message {
list[i].Status = status
list[i].LastTransitionTime = metav1.Now()
list[i].Reason = reason
list[i].Message = message
return list, true
}
return list, false
if condition := findConditionByType(list, cType); condition != nil {
if condition.Status != status || condition.Reason != reason || condition.Message != message {
*condition = *newCondition(cType, status, reason, message)
return list, true
}
return list, false
}
// A condition with that type doesn't exist in the list.
if status != v1.ConditionFalse {
Expand All @@ -1671,6 +1666,15 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
return list, false
}

func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {
for i := range list {
if list[i].Type == cType {
return &list[i]
}
}
return nil
}

func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
completionMode := completionModeStr(job)
diff := job.Status.Succeeded - oldCounters.Succeeded
Expand Down
63 changes: 56 additions & 7 deletions pkg/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -1631,7 +1632,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr)
}
if diff := cmp.Diff(tc.wantStatusUpdates, statusUpdates); diff != "" {
if diff := cmp.Diff(tc.wantStatusUpdates, statusUpdates, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
t.Errorf("Unexpected status updates (-want,+got):\n%s", diff)
}
rmFinalizers := len(fakePodControl.Patches)
Expand Down Expand Up @@ -1864,6 +1865,49 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
}
}

func TestSingleJobFailedCondition(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
var actual *batch.Job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
actual = job
return job, nil
}

job := newJob(1, 1, 6, batch.NonIndexedCompletion)
activeDeadlineSeconds := int64(10)
job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
job.Status.StartTime = &start
job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline"))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
forget, err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if !forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
}
if len(fakePodControl.DeletePodName) != 0 {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
}
if actual == nil {
t.Error("Expected job modification\n")
}
failedConditions := getConditionsByType(actual.Status.Conditions, batch.JobFailed)
if len(failedConditions) != 1 {
t.Error("Unexpected number of failed conditions\n")
}
if failedConditions[0].Status != v1.ConditionTrue {
t.Errorf("Unexpected status for the failed condition. Expected: %v, saw %v\n", v1.ConditionTrue, failedConditions[0].Status)
}

}

func TestSyncJobComplete(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
Expand Down Expand Up @@ -3140,12 +3184,7 @@ func TestEnsureJobConditions(t *testing.T) {
if len(gotList) != len(tc.expectList) {
t.Errorf("got a list of length %d, want %d", len(gotList), len(tc.expectList))
}
for i := range gotList {
// Make timestamps the same before comparing the two lists.
gotList[i].LastProbeTime = tc.expectList[i].LastProbeTime
gotList[i].LastTransitionTime = tc.expectList[i].LastTransitionTime
}
if diff := cmp.Diff(tc.expectList, gotList); diff != "" {
if diff := cmp.Diff(tc.expectList, gotList, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
t.Errorf("Unexpected JobCondition list: (-want,+got):\n%s", diff)
}
})
Expand Down Expand Up @@ -3304,6 +3343,16 @@ func buildPod() podBuilder {
}}
}

func getConditionsByType(list []batch.JobCondition, cType batch.JobConditionType) []*batch.JobCondition {
var result []*batch.JobCondition
for i := range list {
if list[i].Type == cType {
result = append(result, &list[i])
}
}
return result
}

func (pb podBuilder) name(n string) podBuilder {
pb.Name = n
return pb
Expand Down