diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go
index 352da6907b11..b95ae739ede2 100644
--- a/test/e2e/apps/job.go
+++ b/test/e2e/apps/job.go
@@ -19,12 +19,14 @@ package apps
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"time"
 
 	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
@@ -67,6 +69,37 @@ var _ = SIGDescribe("Job", func() {
 		framework.ExpectEqual(successes, completions, "epected %d successful job pods, but got %d", completions, successes)
 	})
 
+	/*
+		Testcase: Ensure Pods of an Indexed Job get a unique index.
+		Description: Create an Indexed Job, wait for completion, capture the output of the pods and verify that they contain the completion index.
+	*/
+	ginkgo.It("[Feature:IndexedJob] should create pods for an Indexed job with completion indexes", func() {
+		ginkgo.By("Creating Indexed job")
+		job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
+		job.Spec.CompletionMode = batchv1.IndexedCompletion
+		job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
+		framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)
+
+		ginkgo.By("Ensuring job reaches completions")
+		err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
+		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
+
+		ginkgo.By("Ensuring pods with index for job exist")
+		pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
+		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
+		succeededIndexes := sets.NewInt()
+		for _, pod := range pods.Items {
+			if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil {
+				ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotationAlpha])
+				framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name)
+				succeededIndexes.Insert(ix)
+			}
+		}
+		gotIndexes := succeededIndexes.List()
+		wantIndexes := []int{0, 1, 2, 3}
+		framework.ExpectEqual(gotIndexes, wantIndexes, "expected completed indexes %v, but got %v", wantIndexes, gotIndexes)
+	})
+
 	/*
 		Testcase: Ensure that the pods associated with the job are removed once the job is deleted
 		Description: Create a job and ensure the associated pod count is equal to paralellism count. Delete the
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 68b796dd2173..fbc243e57b98 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -18,19 +18,28 @@ package job
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
 	batchv1 "k8s.io/api/batch/v1"
 	"k8s.io/api/core/v1"
+	eventsv1 "k8s.io/api/events/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	restclient "k8s.io/client-go/rest"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
 	"k8s.io/kubernetes/pkg/controller/job"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/utils/pointer"
 )
@@ -181,6 +190,85 @@ func TestParallelJobWithCompletions(t *testing.T) {
 	})
 }
 
+func TestIndexedJob(t *testing.T) {
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
+
+	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
+	defer closeFn()
+	ctx, cancel := startJobController(restConfig, clientSet)
+	defer func() {
+		cancel()
+	}()
+
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+		Spec: batchv1.JobSpec{
+			Parallelism:    pointer.Int32Ptr(3),
+			Completions:    pointer.Int32Ptr(4),
+			CompletionMode: batchv1.IndexedCompletion,
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to create Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 3,
+	})
+	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
+
+	// One Pod succeeds.
+	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+		t.Fatal("Failed trying to succeed pod with index 1")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:    3,
+		Succeeded: 1,
+	})
+	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
+
+	// Disable feature gate and restart controller.
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
+	cancel()
+	ctx, cancel = startJobController(restConfig, clientSet)
+	events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer events.Stop()
+
+	// One Pod fails, but no recreations happen because feature is disabled.
+	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+		t.Fatal("Failed trying to fail pod with index 2")
+	}
+	if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil {
+		t.Errorf("Waiting for an event for IndexedJobDisabled: %v", err)
+	}
+	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1")
+
+	// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
+	cancel()
+	ctx, cancel = startJobController(restConfig, clientSet)
+
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:    3,
+		Failed:    1,
+		Succeeded: 1,
+	})
+	validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
+
+	// Remaining Pods succeed.
+	if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+		t.Fatal("Failed trying to succeed remaining pods")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:    0,
+		Failed:    1,
+		Succeeded: 4,
+	})
+	validateJobPodsIndexes(ctx, t, clientSet, jobObj, nil, "0-3")
+	validateJobSucceeded(ctx, t, clientSet, jobObj)
+}
+
 type podsByStatus struct {
 	Active    int
 	Failed    int
@@ -223,6 +311,62 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
 	}
 }
 
+// validateJobPodsIndexes validates indexes of active and completed Pods of an
+// Indexed Job. Call after validateJobPodsStatus
+func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Int, gotCompleted string) {
+	t.Helper()
+	updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get updated Job: %v", err)
+	}
+	if updatedJob.Status.CompletedIndexes != gotCompleted {
+		t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, gotCompleted)
+	}
+	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		t.Fatalf("Failed to list Job Pods: %v", err)
+	}
+	gotActive := sets.NewInt()
+	for _, pod := range pods.Items {
+		if isPodOwnedByJob(&pod, jobObj) {
+			if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
+				if ix, err := getCompletionIndex(&pod); err != nil {
+					t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
+				} else {
+					gotActive.Insert(ix)
+				}
+			}
+		}
+	}
+	if wantActive == nil {
+		wantActive = sets.NewInt()
+	}
+	if diff := cmp.Diff(wantActive.List(), gotActive.List()); diff != "" {
+		t.Errorf("Unexpected active indexes (-want,+got):\n%s", diff)
+	}
+}
+
+func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
+	return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
+		for {
+			var ev watch.Event
+			select {
+			case ev = <-events.ResultChan():
+			default:
+				return false, nil
+			}
+			e, ok := ev.Object.(*eventsv1.Event)
+			if !ok {
+				continue
+			}
+			ctrl := "job-controller"
+			if (e.ReportingController == ctrl || e.DeprecatedSource.Component == ctrl) && e.Reason == reason && e.Regarding.UID == uid {
+				return true, nil
+			}
+		}
+	})
+}
+
 func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
 	t.Helper()
 	if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
@@ -265,6 +409,38 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
 	return nil
 }
 
+func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
+	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("listing Job Pods: %w", err)
+	}
+	for _, pod := range pods.Items {
+		if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
+			continue
+		}
+		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
+			pod.Status.Phase = phase
+			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
+			if err != nil {
+				return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
+			}
+			return nil
+		}
+	}
+	return errors.New("no pod matching index found")
+}
+
+func getCompletionIndex(p *v1.Pod) (int, error) {
+	if p.Annotations == nil {
+		return 0, errors.New("no annotations found")
+	}
+	v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotationAlpha]
+	if !ok {
+		return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotationAlpha)
+	}
+	return strconv.Atoi(v)
+}
+
 func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
 	for _, owner := range p.ObjectMeta.OwnerReferences {
 		if owner.Kind == "Job" && owner.UID == j.UID {
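Both tests revolve around the same two pieces of the Indexed Job alpha API: a Job spec with CompletionMode set to batchv1.IndexedCompletion, and the per-Pod completion index surfaced through the batchv1.JobCompletionIndexAnnotationAlpha annotation. The following standalone sketch is not part of the patch; it only illustrates those two pieces under the field types used above. The package name, container image, command, and helper names are assumptions for illustration.

// Package indexedjobsketch is a hypothetical illustration of the objects the
// tests above exercise; it is not part of the patch.
package indexedjobsketch

import (
	"fmt"
	"strconv"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/utils/pointer"
)

// newIndexedJob returns a Job shaped like the ones the tests create:
// Indexed completion mode, parallelism 3, completions 4. The container
// image and command are placeholders.
func newIndexedJob(name string) *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: batchv1.JobSpec{
			Parallelism:    pointer.Int32Ptr(3),
			Completions:    pointer.Int32Ptr(4),
			CompletionMode: batchv1.IndexedCompletion,
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					RestartPolicy: v1.RestartPolicyNever,
					Containers: []v1.Container{{
						Name:    "worker",
						Image:   "busybox",
						Command: []string{"true"},
					}},
				},
			},
		},
	}
}

// succeededIndexes mirrors what the e2e test asserts: it collects the
// completion index annotation from every succeeded Pod in the list.
func succeededIndexes(pods []v1.Pod) (sets.Int, error) {
	indexes := sets.NewInt()
	for _, pod := range pods {
		if pod.Status.Phase != v1.PodSucceeded || pod.Annotations == nil {
			continue
		}
		ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotationAlpha])
		if err != nil {
			return nil, fmt.Errorf("pod %s: parsing completion index: %w", pod.Name, err)
		}
		indexes.Insert(ix)
	}
	return indexes, nil
}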