Fix JobTrackingWithFinalizers when a pod succeeds after the job fails
Change-Id: I3be351fb3b53216948a37b1d58224f8fbbf22b47
alculquicondor committed Aug 2, 2022
1 parent 0d46dc1 commit ca8cebe
Showing 3 changed files with 106 additions and 25 deletions.
pkg/controller/job/job_controller.go (1 addition, 1 deletion)
@@ -1023,7 +1023,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
if podFinished || podTerminating || job.DeletionTimestamp != nil {
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
}
-if pod.Status.Phase == v1.PodSucceeded {
+if pod.Status.Phase == v1.PodSucceeded && !uncounted.failed.Has(string(pod.UID)) {
if isIndexed {
// The completion index is enough to avoid recounting succeeded pods.
// No need to track UIDs.
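For context on the hunk above: without the new !uncounted.failed.Has(string(pod.UID)) guard, a pod that the controller already recorded under status.uncountedTerminatedPods.failed and that later reports phase Succeeded would be counted a second time, as both failed and succeeded. A minimal standalone sketch of that accounting (not code from this commit; UIDSet is a simplified stand-in for the controller's uncounted sets):

package main

import "fmt"

// UIDSet is a hypothetical stand-in for the uncounted pod-UID sets kept in
// the Job's status.uncountedTerminatedPods field.
type UIDSet map[string]struct{}

func (s UIDSet) Has(uid string) bool { _, ok := s[uid]; return ok }

func main() {
	// pod-b was recorded as failed in an earlier sync.
	failed := UIDSet{"pod-b": {}}
	succeeded := UIDSet{}

	phase := "Succeeded" // the kubelet later reports pod-b as succeeded

	// Pre-fix logic: every succeeded pod is added to the succeeded set.
	if phase == "Succeeded" {
		succeeded["pod-b"] = struct{}{}
	}
	fmt.Println("double counted:", failed.Has("pod-b") && succeeded.Has("pod-b")) // true

	// Post-fix logic: pods already tracked as failed are skipped.
	delete(succeeded, "pod-b")
	if phase == "Succeeded" && !failed.Has("pod-b") {
		succeeded["pod-b"] = struct{}{}
	}
	fmt.Println("double counted:", failed.Has("pod-b") && succeeded.Has("pod-b")) // false
}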
pkg/controller/job/job_controller_test.go (26 additions, 0 deletions)
@@ -1613,6 +1613,32 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
},
"pod flips from failed to succeeded": {
job: batch.Job{
Spec: batch.JobSpec{
Completions: pointer.Int32(2),
Parallelism: pointer.Int32(2),
},
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a", "b"},
},
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod,
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
},
finishedCond: failedCond,
wantRmFinalizers: 2,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Failed: 2,
Conditions: []batch.JobCondition{*failedCond},
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
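The cases map above feeds Go's standard table-driven pattern (for name, tc := range cases with one t.Run per entry; the runner body is truncated in this view). For readers unfamiliar with it, a minimal self-contained skeleton, using a hypothetical sum function rather than anything from this repo:

package example

import "testing"

func sum(a, b int) int { return a + b }

func TestSum(t *testing.T) {
	cases := map[string]struct {
		a, b, want int
	}{
		"zeros":     {a: 0, b: 0, want: 0},
		"positives": {a: 2, b: 3, want: 5},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			if got := sum(tc.a, tc.b); got != tc.want {
				t.Errorf("sum(%d, %d) = %d, want %d", tc.a, tc.b, got, tc.want)
			}
		})
	}
}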
test/integration/job/job_test.go (79 additions, 24 deletions)
@@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -90,7 +91,7 @@ func TestNonParallelJob(t *testing.T) {
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)

// Failed Pod is replaced.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -104,7 +105,7 @@
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)

// No more Pods are created after the Pod succeeds.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -160,7 +161,7 @@ func TestParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)

// Tracks ready pods, if enabled.
-if err := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
+if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
t.Fatalf("Failed Marking Pods as ready: %v", err)
}
if tc.enableReadyPods {
@@ -169,7 +170,7 @@
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)

// Failed Pods are replaced.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
want = podsByStatus{
@@ -181,7 +182,7 @@
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Once one Pod succeeds, no more Pods are created, even if some fail.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
want = podsByStatus{
@@ -193,7 +194,7 @@
want.Ready = pointer.Int32(0)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
want = podsByStatus{
@@ -206,7 +207,7 @@
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// No more Pods are created after remaining Pods succeed.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -270,7 +271,7 @@ func TestParallelJobParallelism(t *testing.T) {
}, wFinalizers)

// Succeed Job
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -332,7 +333,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)

// Tracks ready pods, if enabled.
-if err := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
+if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
t.Fatalf("Failed Marking Pods as ready: %v", err)
}
if tc.enableReadyPods {
@@ -341,7 +342,7 @@
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)

// Failed Pods are replaced.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
want = podsByStatus{
@@ -353,7 +354,7 @@
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// Pods are created until the number of succeeded Pods equals completions.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
want = podsByStatus{
@@ -366,7 +367,7 @@
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
// No more Pods are created after the Job completes.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
@@ -439,7 +440,7 @@ func TestIndexedJob(t *testing.T) {
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")

// Remaining Pods succeed.
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatal("Failed trying to succeed remaining pods")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -491,7 +492,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
cancel()

// Fail a pod while Job controller is stopped.
-if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
+if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}

@@ -518,7 +519,7 @@
cancel()

// Succeed a pod while Job controller is stopped.
-if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}

@@ -607,7 +608,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {

// Fail a pod ASAP.
err = wait.PollImmediate(time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
-if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
return false, nil
}
return true, nil
@@ -621,6 +622,57 @@
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
}
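Both this test and TestJobFailedWithInterrupts below lean on wait.PollImmediate, which runs the condition once right away and then at the given interval until it returns true, returns an error, or the timeout expires; (false, nil) means "retry". A stdlib-only sketch of those semantics (pollImmediate here is a simplified, hypothetical stand-in for k8s.io/apimachinery/pkg/util/wait.PollImmediate):

package main

import (
	"fmt"
	"time"
)

// pollImmediate runs cond immediately, then every interval, until it
// succeeds, errors, or the timeout expires.
func pollImmediate(interval, timeout time.Duration, cond func() (bool, error)) error {
	deadline := time.Now().Add(timeout)
	for {
		done, err := cond()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %v", timeout)
		}
		time.Sleep(interval)
	}
}

func main() {
	attempts := 0
	err := pollImmediate(time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		return attempts >= 3, nil // succeed on the third attempt
	})
	fmt.Println(attempts, err) // 3 <nil>
}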

// TestJobFailedWithInterrupts tests that a job where one pod fails and the rest
// succeed is marked as Failed, even if the controller restarts in the middle.
func TestJobFailedWithInterrupts(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()

closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(10),
Parallelism: pointer.Int32(10),
BackoffLimit: pointer.Int32(0),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
NodeName: "foo", // Scheduled pods are not deleted immediately.
},
},
},
})
if err != nil {
t.Fatalf("Could not create job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 10,
Ready: pointer.Int32(0),
}, true)
t.Log("Finishing pods")
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Could not fail a pod: %v", err)
}
remaining := 9
if err := wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
remaining -= succ
t.Logf("Transient failure succeeding pods: %v", err)
return false, nil
}
return true, nil
}); err != nil {
t.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
}
t.Log("Recreating job controller")
cancel()
ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
}
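The polling loop above has to tolerate partial progress: each attempt may succeed only some of the remaining pods, so the test shrinks remaining by the helper's new success count before retrying. A standalone sketch of that bookkeeping (flakyUpdate is hypothetical; the error-first (error, int) return order mirrors the helpers in this diff):

package main

import (
	"errors"
	"fmt"
)

// flakyUpdate pretends to update n objects but only manages two per call,
// returning a transient error while work is left over.
func flakyUpdate(n int) (error, int) {
	if n > 2 {
		return errors.New("transient failure"), 2
	}
	return nil, n
}

func main() {
	remaining := 9
	for remaining > 0 {
		err, done := flakyUpdate(remaining)
		remaining -= done // keep whatever partial progress was made
		if err != nil {
			fmt.Printf("transient failure, %d remaining: %v\n", remaining, err)
			continue
		}
	}
	fmt.Println("all updates applied")
}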

func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
t.Helper()
orphanPods := 0
@@ -1041,15 +1093,15 @@ func validateJobCondition(ctx context.Context, t *testing.T, clientSet clientset
}
}

-func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) error {
+func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
op := func(p *v1.Pod) bool {
p.Status.Phase = phase
return true
}
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
}

-func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) error {
+func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
op := func(p *v1.Pod) bool {
if podutil.IsPodReady(p) {
return false
@@ -1063,10 +1115,10 @@
return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
}

-func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) error {
+func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
-return fmt.Errorf("listing Job Pods: %w", err)
+return fmt.Errorf("listing Job Pods: %w", err), 0
}
updates := make([]v1.Pod, 0, cnt)
for _, pod := range pods.Items {
@@ -1081,22 +1133,25 @@ func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, job
}
}
if len(updates) != cnt {
-return fmt.Errorf("couldn't set phase on %d Job Pods", cnt)
+return fmt.Errorf("couldn't set phase on %d Job Pods", cnt), 0
}
return updatePodStatuses(ctx, clientSet, updates)
}

-func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error {
+func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (error, int) {
wg := sync.WaitGroup{}
wg.Add(len(updates))
errCh := make(chan error, len(updates))
var updated int32

for _, pod := range updates {
pod := pod
go func() {
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
if err != nil {
errCh <- err
} else {
atomic.AddInt32(&updated, 1)
}
wg.Done()
}()
@@ -1105,10 +1160,10 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat

select {
case err := <-errCh:
-return fmt.Errorf("updating Pod status: %w", err)
+return fmt.Errorf("updating Pod status: %w", err), int(updated)
default:
}
-return nil
+return nil, int(updated)
}
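updatePodStatuses now tallies successful status updates with sync/atomic, so callers learn how many pods were modified even when one update fails. A minimal standalone sketch of the same fan-out-and-count pattern (the inputs are hypothetical; the buffered error channel keeps goroutines from blocking, as in the diff):

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	items := []int{1, 2, 3, 4, 5}
	var wg sync.WaitGroup
	errCh := make(chan error, len(items)) // buffered so no goroutine blocks on send
	var updated int32

	for _, it := range items {
		it := it // capture the loop variable, as the diff does with pod := pod
		wg.Add(1)
		go func() {
			defer wg.Done()
			if it == 3 { // simulate one failed update
				errCh <- errors.New("update failed")
				return
			}
			atomic.AddInt32(&updated, 1)
		}()
	}
	wg.Wait()

	select {
	case err := <-errCh:
		fmt.Printf("first error: %v (updates that succeeded: %d)\n", err, updated)
	default:
		fmt.Printf("all %d updates succeeded\n", updated)
	}
}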

func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
