Integration and e2e tests for Indexed job #99737

Merged · 2 commits · Mar 4, 2021
33 changes: 33 additions & 0 deletions test/e2e/apps/job.go
@@ -19,12 +19,14 @@ package apps
import (
"context"
"fmt"
"strconv"
"time"

batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
batchinternal "k8s.io/kubernetes/pkg/apis/batch"
@@ -67,6 +69,37 @@ var _ = SIGDescribe("Job", func() {
framework.ExpectEqual(successes, completions, "expected %d successful job pods, but got %d", completions, successes)
})

/*
Testcase: Ensure Pods of an Indexed Job get a unique index.
Description: Create an Indexed Job, wait for its completion, and verify that each succeeded Pod carries a unique completion index in its annotations.
*/
ginkgo.It("[Feature:IndexedJob] should create pods for an Indexed job with completion indexes", func() {
ginkgo.By("Creating Indexed job")
job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job.Spec.CompletionMode = batchv1.IndexedCompletion
job, err := e2ejob.CreateJob(f.ClientSet, f.Namespace.Name, job)
framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)

ginkgo.By("Ensuring job reaches completions")
err = e2ejob.WaitForJobComplete(f.ClientSet, f.Namespace.Name, job.Name, completions)
framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

ginkgo.By("Ensuring pods with index for job exist")
pods, err := e2ejob.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
succeededIndexes := sets.NewInt()
for _, pod := range pods.Items {
if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil {
ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotationAlpha])
framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name)
succeededIndexes.Insert(ix)
}
}
gotIndexes := succeededIndexes.List()
wantIndexes := []int{0, 1, 2, 3}
framework.ExpectEqual(gotIndexes, wantIndexes, "expected completed indexes %v, but got %v", wantIndexes, gotIndexes)
})

/*
Testcase: Ensure that the pods associated with the job are removed once the job is deleted
Description: Create a job and ensure the associated pod count is equal to parallelism count. Delete the
176 changes: 176 additions & 0 deletions test/integration/job/job_test.go
@@ -18,19 +18,28 @@ package job

import (
"context"
"errors"
"fmt"
"strconv"
"testing"
"time"

"github.com/google/go-cmp/cmp"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)
@@ -181,6 +190,85 @@ func TestParallelJobWithCompletions(t *testing.T) {
})
}

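// TestIndexedJob verifies that Pods of an Indexed Job receive unique
// completion indexes and that the job controller handles the IndexedJob
// feature gate being toggled while a Job is running.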
func TestIndexedJob(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()

closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
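// Defer cancellation through a closure: cancel is reassigned each time the
// controller restarts below, and the closure calls the latest value on exit.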
defer func() {
cancel()
}()

jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(3),
Completions: pointer.Int32Ptr(4),
CompletionMode: batchv1.IndexedCompletion,
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
})
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")

// One Pod succeeds.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatal("Failed trying to succeed pod with index 1")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Succeeded: 1,
})
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")

// Disable feature gate and restart controller.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
cancel()
ctx, cancel = startJobController(restConfig, clientSet)
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
if err != nil {
t.Fatal(err)
}
defer events.Stop()

// One Pod fails, but no recreations happen because feature is disabled.
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatal("Failed trying to succeed pod with index 2")
}
if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil {
t.Errorf("Waiting for an event for IndexedJobDisabled: %v", err)
}
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1")

// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
cancel()
ctx, cancel = startJobController(restConfig, clientSet)

validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Failed: 1,
Succeeded: 1,
})
validateJobPodsIndexes(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")

// Remaining Pods succeed.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatal("Failed trying to succeed remaining pods")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 0,
Failed: 1,
Succeeded: 4,
})
validateJobPodsIndexes(ctx, t, clientSet, jobObj, nil, "0-3")
validateJobSucceeded(ctx, t, clientSet, jobObj)
}

type podsByStatus struct {
Active int
Failed int
@@ -223,6 +311,62 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
}
}

// validateJobPodsIndexes validates the indexes of active and completed Pods
// of an Indexed Job. Call it after validateJobPodsStatus.
func validateJobPodsIndexes(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Int, wantCompleted string) {
t.Helper()
updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get updated Job: %v", err)
}
if updatedJob.Status.CompletedIndexes != wantCompleted {
t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, wantCompleted)
}
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
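// Gather the completion indexes of Pods owned by the Job that are still
// pending or running.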
gotActive := sets.NewInt()
for _, pod := range pods.Items {
if isPodOwnedByJob(&pod, jobObj) {
if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
if ix, err := getCompletionIndex(&pod); err != nil {
t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
} else {
gotActive.Insert(ix)
}
}
}
}
if wantActive == nil {
wantActive = sets.NewInt()
}
if diff := cmp.Diff(wantActive.List(), gotActive.List()); diff != "" {
t.Errorf("Unexpected active indexes (-want,+got):\n%s", diff)
}
}

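// waitForEvent polls until the job controller emits an event with the given
// reason for the object with the given UID.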
func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
for {
var ev watch.Event
select {
case ev = <-events.ResultChan():
default:
return false, nil
}
e, ok := ev.Object.(*eventsv1.Event)
if !ok {
continue
}
ctrl := "job-controller"
if (e.ReportingController == ctrl || e.DeprecatedSource.Component == ctrl) && e.Reason == reason && e.Regarding.UID == uid {
return true, nil
}
}
})
}

func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
t.Helper()
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
@@ -265,6 +409,38 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj
return nil
}

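// setJobPhaseForIndex sets the status phase of the active Pod that carries
// the given completion index.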
func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("listing Job Pods: %w", err)
}
for _, pod := range pods.Items {
if p := pod.Status.Phase; !isPodOwnedByJob(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
continue
}
if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
pod.Status.Phase = phase
_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
}
return nil
}
}
return errors.New("no pod matching index found")
}

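// getCompletionIndex parses the completion index from the Pod's completion
// index annotation.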
func getCompletionIndex(p *v1.Pod) (int, error) {
if p.Annotations == nil {
return 0, errors.New("no annotations found")
}
v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotationAlpha]
if !ok {
return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotationAlpha)
}
return strconv.Atoi(v)
}

func isPodOwnedByJob(p *v1.Pod, j *batchv1.Job) bool {
for _, owner := range p.ObjectMeta.OwnerReferences {
if owner.Kind == "Job" && owner.UID == j.UID {