diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go
index ef0eac5225..d39a745039 100644
--- a/pkg/controller/jobframework/reconciler.go
+++ b/pkg/controller/jobframework/reconciler.go
@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"

+	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
@@ -133,20 +134,16 @@ func NewReconciler(
 	}
 }

-func (r *JobReconciler) ReconcileGenericJobWrapper(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
+func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (result ctrl.Result, err error) {
+	object := job.Object()
 	log := ctrl.LoggerFrom(ctx).WithValues("job", req.String(), "gvk", job.GVK())
 	ctx = ctrl.LoggerInto(ctx, log)

-	result, err := r.ReconcileGenericJob(ctx, req, job)
+	defer func() {
+		err = r.ignoreUnretryableError(log, err)
+	}()

-	return result, r.ignoreUnretryableError(ctx, err)
-}
-
-func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
-	object := job.Object()
-	log := ctrl.LoggerFrom(ctx)
-
-	err := r.client.Get(ctx, req.NamespacedName, object)
+	err = r.client.Get(ctx, req.NamespacedName, object)

 	if jws, implements := job.(JobWithSkip); implements {
 		if jws.Skip() {
@@ -514,7 +511,7 @@ func equivalentToWorkload(job GenericJob, wl *kueue.Workload) bool {
 		return false
 	}

-	jobPodSets := ResetMinCounts(job.PodSets())
+	jobPodSets := ClearMinCountsIfFeatureDisabled(job.PodSets())

 	if !workload.CanBePartiallyAdmitted(wl) || !workload.HasQuotaReservation(wl) {
 		// the two sets should fully match.
@@ -644,16 +641,7 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
 			return nil, err
 		}

-		wl.Spec.PodSets = ResetMinCounts(wl.Spec.PodSets)
-
-		priorityClassName, source, p, err := r.extractPriority(ctx, wl.Spec.PodSets, job)
-		if err != nil {
-			return nil, err
-		}
-
-		wl.Spec.PriorityClassName = priorityClassName
-		wl.Spec.Priority = &p
-		wl.Spec.PriorityClassSource = source
+		wl.Spec.PodSets = ClearMinCountsIfFeatureDisabled(wl.Spec.PodSets)

 		return wl, nil
 	}
@@ -668,7 +656,7 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
 			Finalizers: []string{kueue.ResourceInUseFinalizerName},
 		},
 		Spec: kueue.WorkloadSpec{
-			PodSets:   ResetMinCounts(podSets),
+			PodSets:   ClearMinCountsIfFeatureDisabled(podSets),
 			QueueName: QueueName(job),
 		},
 	}
@@ -684,19 +672,24 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
 		)
 	}

-	priorityClassName, source, p, err := r.extractPriority(ctx, podSets, job)
-	if err != nil {
+	if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil {
 		return nil, err
 	}
+	return wl, nil
+}
+
+// prepareWorkload adds the priority information for the constructed workload
+func (r *JobReconciler) prepareWorkload(ctx context.Context, job GenericJob, wl *kueue.Workload) error {
+	priorityClassName, source, p, err := r.extractPriority(ctx, wl.Spec.PodSets, job)
+	if err != nil {
+		return err
+	}

 	wl.Spec.PriorityClassName = priorityClassName
 	wl.Spec.Priority = &p
 	wl.Spec.PriorityClassSource = source

-	if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil {
-		return nil, err
-	}
-	return wl, nil
+	return nil
 }

 func (r *JobReconciler) extractPriority(ctx context.Context, podSets []kueue.PodSet, job GenericJob) (string, string, int32, error) {
@@ -764,6 +757,10 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
 	if err != nil {
 		return err
 	}
+	err = r.prepareWorkload(ctx, job, wl)
+	if err != nil {
+		return err
+	}
 	if err = r.client.Create(ctx, wl); err != nil {
 		return err
 	}
@@ -772,9 +769,8 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
 	return nil
 }

-func (r *JobReconciler) ignoreUnretryableError(ctx context.Context, err error) error {
+func (r *JobReconciler) ignoreUnretryableError(log logr.Logger, err error) error {
 	if IsUnretryableError(err) {
-		log := ctrl.LoggerFrom(ctx)
 		log.V(2).Info("Received an unretryable error", "error", err)
 		return nil
 	}
@@ -832,7 +828,7 @@ type genericReconciler struct {
 }

 func (r *genericReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	return r.jr.ReconcileGenericJobWrapper(ctx, req, r.newJob())
+	return r.jr.ReconcileGenericJob(ctx, req, r.newJob())
 }

 func (r *genericReconciler) SetupWithManager(mgr ctrl.Manager) error {
@@ -845,8 +841,8 @@ func (r *genericReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return b.Complete(r)
 }

-// ResetMinCounts resets the minCount for all podSets if the PartialAdmission feature is not enabled
-func ResetMinCounts(in []kueue.PodSet) []kueue.PodSet {
+// ClearMinCountsIfFeatureDisabled sets the minCount for all podSets to nil if the PartialAdmission feature is not enabled
+func ClearMinCountsIfFeatureDisabled(in []kueue.PodSet) []kueue.PodSet {
 	if features.Enabled(features.PartialAdmission) || len(in) == 0 {
 		return in
 	}
diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go
index 7a83635c43..fa09ad6bf5 100644
--- a/pkg/controller/jobs/pod/pod_controller.go
+++ b/pkg/controller/jobs/pod/pod_controller.go
@@ -296,6 +296,11 @@ func (p *Pod) constructGroupPodSets(podsInGroup corev1.PodList) ([]kueue.PodSet,
 	var resultPodSets []kueue.PodSet

 	for _, podInGroup := range podsInGroup.Items {
+		// Skip failed pods
+		if podInGroup.Status.Phase == corev1.PodFailed {
+			continue
+		}
+
 		tc, err := strconv.Atoi(podInGroup.GetAnnotations()[GroupTotalCountAnnotation])
 		if err != nil {
 			return nil, fmt.Errorf("failed to extract '%s' annotation from the pod '%s': %w",
@@ -353,7 +358,7 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
 	} else {
 		if err := c.List(ctx, &podsInGroup, client.MatchingLabels{
 			GroupNameLabel: p.groupName(),
-		}); err != nil {
+		}, client.InNamespace(p.Namespace)); err != nil {
 			return nil, err
 		}

@@ -439,7 +444,7 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client) (*kueu
 	var podsInGroup corev1.PodList
 	if err := c.List(ctx, &podsInGroup, client.MatchingLabels{
 		GroupNameLabel: p.groupName(),
-	}); err != nil {
+	}, client.InNamespace(p.Namespace)); err != nil {
 		return nil, nil, err
 	}

@@ -456,21 +461,24 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client) (*kueu
 }

 func (p *Pod) equivalentToWorkload(wl *kueue.Workload, jobPodSets []kueue.PodSet) bool {
-	workloadFinished := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished) != nil
+	workloadFinished := apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished)

 	if wl.GetName() != p.groupName() {
 		return false
 	}

-	if !workloadFinished && len(jobPodSets) != len(wl.Spec.PodSets) {
+	if !workloadFinished && len(wl.Spec.PodSets) < len(jobPodSets) {
 		return false
 	}

 	for i := range wl.Spec.PodSets {
-		if !workloadFinished && wl.Spec.PodSets[i].Count != jobPodSets[i].Count {
+		if i >= len(jobPodSets) {
+			return true
+		}
+		if !workloadFinished && wl.Spec.PodSets[i].Count < jobPodSets[i].Count {
 			return false
 		}
-		if i < len(jobPodSets) && wl.Spec.PodSets[i].Name != jobPodSets[i].Name {
+		if wl.Spec.PodSets[i].Name != jobPodSets[i].Name {
 			return false
 		}
 	}
@@ -516,13 +524,13 @@ func (p *Pod) IsComposableJobFinished(ctx context.Context, c client.Client) (met
 		Status: metav1.ConditionTrue,
 		Reason: "JobFinished",
 		Message: fmt.Sprintf(
-			"Pod group has finished. Pods succeeded: %d/%d. Pods failed: %d/%d",
-			succeededPodCount, groupTotalCount, groupTotalCount-succeededPodCount, groupTotalCount,
+			"Pods succeeded: %d/%d.",
+			succeededPodCount, groupTotalCount,
 		),
 	}

 	if succeededPodCount < groupTotalCount {
-		condition.Status = metav1.ConditionFalse
+		return metav1.Condition{}, false
 	}

 	return condition, true
diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go
index 1cacb32bd9..3c8a49abe5 100644
--- a/pkg/controller/jobs/pod/pod_controller_test.go
+++ b/pkg/controller/jobs/pod/pod_controller_test.go
@@ -701,7 +701,7 @@ func TestReconciler(t *testing.T) {
 					KueueFinalizer().
 					Group("test-group").
 					GroupTotalCount("2").
-					StatusPhase(corev1.PodFailed).
+					StatusPhase(corev1.PodSucceeded).
 					Obj(),
 			},
 			wantPods: []corev1.Pod{
@@ -718,7 +718,7 @@ func TestReconciler(t *testing.T) {
 					Label("kueue.x-k8s.io/managed", "true").
 					Group("test-group").
 					GroupTotalCount("2").
-					StatusPhase(corev1.PodFailed).
+					StatusPhase(corev1.PodSucceeded).
 					Obj(),
 			},
 			workloads: []kueue.Workload{
@@ -734,7 +734,7 @@ func TestReconciler(t *testing.T) {
 					Obj(),
 			},
 			wantWorkloads: []kueue.Workload{
-				*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
+				*utiltesting.MakeWorkload("test-group", "ns").
 					PodSets(
 						*utiltesting.MakePodSet("b990493b", 2).
 							Request(corev1.ResourceCPU, "1").
@@ -745,15 +745,15 @@ func TestReconciler(t *testing.T) {
 					Admitted(true).
 					Condition(metav1.Condition{
 						Type:    "Finished",
-						Status:  "False",
+						Status:  "True",
 						Reason:  "JobFinished",
-						Message: "Pod group has finished. Pods succeeded: 1/2. Pods failed: 1/2",
+						Message: "Pods succeeded: 2/2.",
 					}).
 					Obj(),
 			},
 			workloadCmpOpts: defaultWorkloadCmpOpts,
 		},
-		"workload is deleted if the pod in group has been deleted after admission": {
+		"workload is not deleted if the pod in group has been deleted after admission": {
 			pods: []corev1.Pod{*basePodWrapper.
 				Clone().
 				Label("kueue.x-k8s.io/managed", "true").
@@ -767,12 +767,6 @@ func TestReconciler(t *testing.T) {
 				KueueFinalizer().
 				Group("test-group").
 				GroupTotalCount("2").
-				StatusConditions(corev1.PodCondition{
-					Type:    "TerminationTarget",
-					Status:  corev1.ConditionTrue,
-					Reason:  "StoppedByKueue",
-					Message: "No matching Workload",
-				}).
 				Obj()},
 			workloads: []kueue.Workload{
 				*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
@@ -786,7 +780,18 @@ func TestReconciler(t *testing.T) {
 					Admitted(true).
 					Obj(),
 			},
-			wantErr: jobframework.ErrNoMatchingWorkloads,
+			wantWorkloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
+					PodSets(
+						*utiltesting.MakePodSet("b990493b", 2).
+							Request(corev1.ResourceCPU, "1").
+							Obj(),
+					).
+					Queue("user-queue").
+					ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
+					Admitted(true).
+					Obj(),
+			},
 			workloadCmpOpts: defaultWorkloadCmpOpts,
 		},
 		"pod group is stopped when workload is evicted": {
@@ -887,6 +892,7 @@ func TestReconciler(t *testing.T) {
 				*basePodWrapper.
 					Clone().
 					Label("kueue.x-k8s.io/managed", "true").
+					KueueFinalizer().
 					Group("test-group").
 					GroupTotalCount("2").
 					StatusPhase(corev1.PodSucceeded).
diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go
index 3f04d82255..0fd0bd1546 100644
--- a/test/integration/controller/jobs/pod/pod_controller_test.go
+++ b/test/integration/controller/jobs/pod/pod_controller_test.go
@@ -307,7 +307,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
 			gomega.Expect(createdWorkload.Spec.PodSets[0].Name).To(gomega.Equal("120fa2c0"))
 			gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), "The Workload should have .spec.queueName set")

-			ginkgo.By("checking that pod is unsuspended when workload is admitted")
+			ginkgo.By("checking that all pods in group are unsuspended when workload is admitted")
 			clusterQueue := testing.MakeClusterQueue("cluster-queue").
 				ResourceGroup(
 					*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(),
@@ -356,7 +356,7 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
 				wlConditionCmpOpts...,
 			))

-			ginkgo.By("checking that pod is stopped when workload is evicted")
+			ginkgo.By("checking that pod group is stopped when workload is evicted")
 			gomega.Expect(
 				workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue,
@@ -397,6 +397,80 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
 			))
 		})

+		ginkgo.It("Should keep the existing workload for pod replacement", func() {
+			ginkgo.By("Creating a single pod with queue and group names")
+
+			pod := testingpod.MakePod("test-pod", ns.Name).
+				Group("test-group").
+				GroupTotalCount("1").
+				Queue("test-queue").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed())
+
+			ginkgo.By("checking that workload is created for the pod group with the queue name")
+			wlLookupKey := types.NamespacedName{
+				Namespace: pod.Namespace,
+				Name:      "test-group",
+			}
+			createdWorkload := &kueue.Workload{}
+			gomega.Eventually(func() error {
+				return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1))
+			gomega.Expect(createdWorkload.Spec.PodSets[0].Count).To(gomega.Equal(int32(1)))
+			gomega.Expect(createdWorkload.Spec.PodSets[0].Name).To(gomega.Equal("120fa2c0"))
+			gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), "The Workload should have .spec.queueName set")
+
+			ginkgo.By("checking that pod is unsuspended when workload is admitted")
+			clusterQueue := testing.MakeClusterQueue("cluster-queue").
+				ResourceGroup(
+					*testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(),
+				).Obj()
+			admission := testing.MakeAdmission(clusterQueue.Name, "120fa2c0").
+				Assignment(corev1.ResourceCPU, "default", "1").
+				AssignmentPodCount(createdWorkload.Spec.PodSets[0].Count).
+				Obj()
+			gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
+			util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
+
+			createdPod := &corev1.Pod{}
+			podLookupKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
+			gomega.Eventually(func(g gomega.Gomega) []corev1.PodSchedulingGate {
+				g.Expect(k8sClient.Get(ctx, podLookupKey, createdPod)).To(gomega.Succeed())
+				return createdPod.Spec.SchedulingGates
+			}, util.Timeout, util.Interval).ShouldNot(
+				gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}),
+			)
+			gomega.Expect(len(createdPod.Spec.NodeSelector)).Should(gomega.Equal(1))
+			gomega.Expect(createdPod.Spec.NodeSelector["kubernetes.io/arch"]).Should(gomega.Equal("arm64"))
+
+			ginkgo.By("Failing the running pod")
+			createdPod.Status.Phase = corev1.PodFailed
+			gomega.Expect(k8sClient.Status().Update(ctx, createdPod)).Should(gomega.Succeed())
+			gomega.Consistently(func(g gomega.Gomega) []string {
+				g.Expect(k8sClient.Get(ctx, podLookupKey, createdPod)).To(gomega.Succeed())
+				return createdPod.Finalizers
+			}, util.Timeout, util.Interval).Should(gomega.ContainElement("kueue.x-k8s.io/managed"), "Pod should have finalizer")
+			gomega.Expect(createdPod.Status.Phase).To(gomega.Equal(corev1.PodFailed))
+
+			gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())
+			gomega.Expect(createdWorkload.DeletionTimestamp.IsZero()).Should(gomega.BeTrue())
+
+			ginkgo.By("Creating a replacement pod in the group")
+			replacementPod := testingpod.MakePod("replacement-test-pod", ns.Name).
+				Group("test-group").
+				GroupTotalCount("1").
+				Queue("test-queue").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, replacementPod)).Should(gomega.Succeed())
+
+			// Workload shouldn't be deleted after replacement pod has been created
+			gomega.Consistently(func() error {
+				return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
 		ginkgo.When("Pod owner is managed by Kueue", func() {
 			var pod *corev1.Pod
 			ginkgo.BeforeEach(func() {