Skip to content

Commit

Permalink
Merge pull request kubernetes#123723 from mimowo/job-managed-by-impl-…
Browse files Browse the repository at this point in the history
…test

Integration test for change in syncOrphanPod for managedBy jobs
  • Loading branch information
k8s-ci-robot committed Mar 8, 2024
2 parents 74b2f4d + 194009f commit 5639f8f
Showing 1 changed file with 134 additions and 18 deletions.
152 changes: 134 additions & 18 deletions test/integration/job/job_test.go
Expand Up @@ -45,13 +45,15 @@ import (
clientset "k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/features"
Expand All @@ -63,6 +65,12 @@ import (
const waitInterval = time.Second
const fastPodFailureBackoff = 100 * time.Millisecond

// Time duration used to account for controller latency in tests in which it is
// expected the Job controller does not make a change. In that cases we wait a
// little bit (more than the typical time for a couple of controller syncs) and
// verify there is no change.
const sleepDurationForControllerLatency = 100 * time.Millisecond

type metricLabelsWithValue struct {
Labels []string
Value int
Expand Down Expand Up @@ -1662,11 +1670,7 @@ func TestManagedBy(t *testing.T) {
} else {
validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)

// Await for a little bit to verify the reconciliation does not
// happen. We wait 100ms for the sync itself, because we already
// checked the metric is incremented so the sync would start
// immediately if it was queued.
time.Sleep(100 * time.Millisecond)
time.Sleep(sleepDurationForControllerLatency)
jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
Expand Down Expand Up @@ -1728,9 +1732,7 @@ func TestManagedBy_Reenabling(t *testing.T) {
Value: 1,
})

// Await for a little bit to verify the reconciliation does not happen.
// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
time.Sleep(time.Second + 100*time.Millisecond)
time.Sleep(sleepDurationForControllerLatency)
jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
Expand Down Expand Up @@ -1763,22 +1765,17 @@ func TestManagedBy_Reenabling(t *testing.T) {
resetMetrics()
ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

// Marking the pod as finished, but
// Marking the pod as finished, but it does not result in updating of the Job status.
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
}

// Await for a little bit to verify the reconciliation does not happen.
// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
time.Sleep(time.Second + 100*time.Millisecond)

validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
Labels: []string{customControllerName},
Value: 1,
})

// Verify the built-in controller does not reconcile the Job. It is up to
// the external controller to update the status.
time.Sleep(sleepDurationForControllerLatency)
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Ready: ptr.To[int32](0),
Expand Down Expand Up @@ -1859,9 +1856,7 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
Value: 1,
})

// Await for a little bit to verify the reconciliation does not happen.
// We wait 1s to account for queued sync delay plus 100ms for the sync itself.
time.Sleep(time.Second + 100*time.Millisecond)
time.Sleep(sleepDurationForControllerLatency)
jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
Expand All @@ -1871,6 +1866,127 @@ func TestManagedBy_RecreatedJob(t *testing.T) {
}
}

// TestManagedBy_UsingReservedJobFinalizers documents the behavior of the Job
// controller when there is a job with custom value of the managedBy field, creating
// pods with the batch.kubernetes.io/job-tracking finalizer. The built-in controller
// should not remove the finalizer. Note that, the use of the finalizer in jobs
// managed by external controllers is discouraged, but may potentially happen
// when one forks the controller and does not rename the finalizer.
func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
customControllerName := "example.com/custom-job-controller"
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()

closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer cancel()
resetMetrics()

jobSpec := batchv1.Job{
TypeMeta: metav1.TypeMeta{
APIVersion: "batch/v1",
Kind: "Job",
},
ObjectMeta: metav1.ObjectMeta{
Name: "custom-job-test",
Namespace: ns.Name,
},
Spec: batchv1.JobSpec{
Completions: ptr.To[int32](1),
Parallelism: ptr.To[int32](1),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "main-container",
Image: "foo",
},
},
},
},
ManagedBy: ptr.To(customControllerName),
},
}
// Create a job with custom managedBy
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &jobSpec)
if err != nil {
t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
}

podControl := controller.RealPodControl{
KubeClient: clientSet,
Recorder: &record.FakeRecorder{},
}

// Create the pod manually simulating the behavior of the external controller
// indicated by the managedBy field. We create the pod with the built-in
// finalizer.
podTemplate := jobObj.Spec.Template.DeepCopy()
podTemplate.Finalizers = append(podTemplate.Finalizers, batchv1.JobTrackingFinalizer)
err = podControl.CreatePodsWithGenerateName(ctx, jobObj.Namespace, podTemplate, jobObj, metav1.NewControllerRef(jobObj, batchv1.SchemeGroupVersion.WithKind("Job")), "pod1")
if err != nil {
t.Fatalf("Error %v when creating a pod for job %q", err, klog.KObj(jobObj))
}

// Getting the list of pods for the Jobs to obtain the reference to the created pod.
jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
if err != nil {
t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
}
if len(jobPods) != 1 {
t.Fatalf("Unexpected number (%d) of pods for job: %v", len(jobPods), klog.KObj(jobObj))
}

// Marking the pod as finished (succeeded), before marking the parent job as complete.
podObj := jobPods[0]
podObj.Status.Phase = v1.PodSucceeded
podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
}

// Mark the job as finished so that the built-in controller receives the
// UpdateJob event in reaction to each it would remove the pod's finalizer,
// if not for the custom managedBy field.
jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
Type: batchv1.JobComplete,
Status: v1.ConditionTrue,
})
jobObj.Status.StartTime = ptr.To(metav1.Now())
jobObj.Status.CompletionTime = ptr.To(metav1.Now())

if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
}

podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
}

// Update the pod so that the built-in controller receives the UpdatePod event
// in reaction to each it would remove the pod's finalizer, if not for the
// custom value of the managedBy field on the job.
podObj.Status.Conditions = append(podObj.Status.Conditions, v1.PodCondition{
Type: v1.PodConditionType("CustomCondition"),
Status: v1.ConditionTrue,
})
podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error %v when adding a condition to the pod status %v", err, klog.KObj(podObj))
}

time.Sleep(sleepDurationForControllerLatency)
podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
}

if diff := cmp.Diff([]string{batchv1.JobTrackingFinalizer}, podObj.Finalizers); diff != "" {
t.Fatalf("Unexpected change in the set of finalizers for pod %q, because the owner job %q has custom managedBy, diff=%s", klog.KObj(podObj), klog.KObj(jobObj), diff)
}
}

func getIndexFailureCount(p *v1.Pod) (int, error) {
if p.Annotations == nil {
return 0, errors.New("no annotations found")
Expand Down

0 comments on commit 5639f8f

Please sign in to comment.