Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test for change in syncOrphanPod for managedBy jobs #123723

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
152 changes: 134 additions & 18 deletions test/integration/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ import (
clientset "k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/features"
Expand All @@ -63,6 +65,12 @@ import (
const waitInterval = time.Second
const fastPodFailureBackoff = 100 * time.Millisecond

// Time duration used to account for controller latency in tests in which it is
// expected the Job controller does not make a change. In those cases we wait a
// little bit (more than the typical time for a couple of controller syncs) and
// verify there is no change.
const sleepDurationForControllerLatency = 100 * time.Millisecond

type metricLabelsWithValue struct {
Labels []string
Value int
Expand Down Expand Up @@ -1294,11 +1302,7 @@ func TestManagedBy(t *testing.T) {
} else {
validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)

// Await for a little bit to verify the reconciliation does not
// happen. We wait 100ms for the sync itself, because we already
// checked the metric is incremented so the sync would start
// immediately if it was queued.
time.Sleep(100 * time.Millisecond)
time.Sleep(sleepDurationForControllerLatency)
jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
Expand Down Expand Up @@ -1360,9 +1364,7 @@ func TestManagedBy_Reenabling(t *testing.T) {
Value: 1,
})

// Await for a little bit to verify the reconciliation does not happen.
// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
time.Sleep(time.Second + 100*time.Millisecond)
time.Sleep(sleepDurationForControllerLatency)
jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
Expand Down Expand Up @@ -1395,22 +1397,17 @@ func TestManagedBy_Reenabling(t *testing.T) {
resetMetrics()
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

// Marking the pod as finished, but
// Marking the pod as finished, but it does not result in updating of the Job status.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
}

// Await for a little bit to verify the reconciliation does not happen.
// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
time.Sleep(time.Second + 100*time.Millisecond)

validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
Labels: []string{customControllerName},
Value: 1,
})

// Verify the built-in controller does not reconcile the Job. It is up to
// the external controller to update the status.
time.Sleep(sleepDurationForControllerLatency)
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Expand Down Expand Up @@ -1491,9 +1488,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
Value: 1,
})

// Await for a little bit to verify the reconciliation does not happen.
// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
time.Sleep(time.Second + 100*time.Millisecond)
time.Sleep(sleepDurationForControllerLatency)
jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
Expand All @@ -1503,6 +1498,127 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
}
}

// TestManagedBy_UsingReservedJobFinalizers documents the behavior of the Job
// controller when there is a job with a custom value of the managedBy field, creating
// pods with the batch.kubernetes.io/job-tracking finalizer. The built-in controller
// should not remove the finalizer. Note that the use of the finalizer in jobs
// managed by external controllers is discouraged, but may potentially happen
// when one forks the controller and does not rename the finalizer.
func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
	customControllerName := "example.com/custom-job-controller"
	// Enable the JobManagedBy feature gate for the duration of this test.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobSpec := batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "batch/v1",
			Kind:       "Job",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "custom-job-test",
			Namespace: ns.Name,
		},
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](1),
			Parallelism: ptr.To[int32](1),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "main-container",
							Image: "foo",
						},
					},
				},
			},
			ManagedBy: ptr.To(customControllerName),
		},
	}
	// Create a job with a custom managedBy field, so the built-in controller
	// should skip reconciling it.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &jobSpec)
	if err != nil {
		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
	}

	podControl := controller.RealPodControl{
		KubeClient: clientSet,
		Recorder:   &record.FakeRecorder{},
	}

	// Create the pod manually simulating the behavior of the external controller
	// indicated by the managedBy field. We create the pod with the built-in
	// finalizer.
	podTemplate := jobObj.Spec.Template.DeepCopy()
	podTemplate.Finalizers = append(podTemplate.Finalizers, batchv1.JobTrackingFinalizer)
	err = podControl.CreatePodsWithGenerateName(ctx, jobObj.Namespace, podTemplate, jobObj, metav1.NewControllerRef(jobObj, batchv1.SchemeGroupVersion.WithKind("Job")), "pod1")
	if err != nil {
		t.Fatalf("Error %v when creating a pod for job %q", err, klog.KObj(jobObj))
	}

	// Getting the list of pods for the Jobs to obtain the reference to the created pod.
	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
	if err != nil {
		t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
	}
	if len(jobPods) != 1 {
		t.Fatalf("Unexpected number (%d) of pods for job: %v", len(jobPods), klog.KObj(jobObj))
	}

	// Marking the pod as finished (succeeded), before marking the parent job as complete.
	podObj := jobPods[0]
	podObj.Status.Phase = v1.PodSucceeded
	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
	}

	// Mark the job as finished so that the built-in controller receives the
	// UpdateJob event, in reaction to which it would remove the pod's finalizer,
	// if not for the custom managedBy field.
	jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
		Type:   batchv1.JobComplete,
		Status: v1.ConditionTrue,
	})
	jobObj.Status.StartTime = ptr.To(metav1.Now())
	jobObj.Status.CompletionTime = ptr.To(metav1.Now())

	if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
		t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
	}

	// Re-fetch the pod to get its latest resourceVersion before the next status update.
	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
	}

	// Update the pod so that the built-in controller receives the UpdatePod event,
	// in reaction to which it would remove the pod's finalizer, if not for the
	// custom value of the managedBy field on the job.
	podObj.Status.Conditions = append(podObj.Status.Conditions, v1.PodCondition{
		Type:   v1.PodConditionType("CustomCondition"),
		Status: v1.ConditionTrue,
	})
	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error %v when adding a condition to the pod status %v", err, klog.KObj(podObj))
	}

	// Wait longer than the typical time for a couple of controller syncs, then
	// verify the built-in controller did NOT strip the reserved finalizer.
	time.Sleep(sleepDurationForControllerLatency)
	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
	}

	// The batch.kubernetes.io/job-tracking finalizer must still be the pod's
	// only finalizer, since the job is managed by an external controller.
	if diff := cmp.Diff([]string{batchv1.JobTrackingFinalizer}, podObj.Finalizers); diff != "" {
		t.Fatalf("Unexpected change in the set of finalizers for pod %q, because the owner job %q has custom managedBy, diff=%s", klog.KObj(podObj), klog.KObj(jobObj), diff)
	}
}

func getIndexFailureCount(p *v1.Pod) (int, error) {
if p.Annotations == nil {
return 0, errors.New("no annotations found")
Expand Down