diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go index c7b2df2efe..077b1a1fd4 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller.go +++ b/pkg/controller/jobs/rayjob/rayjob_controller.go @@ -33,12 +33,12 @@ import ( ) var ( - gvk = rayjobapi.GroupVersion.WithKind("RayJob") - FrameworkName = "ray.io/rayjob" + gvk = rayjobapi.GroupVersion.WithKind("RayJob") ) const ( headGroupPodSetName = "head" + FrameworkName = "ray.io/rayjob" ) func init() { @@ -163,7 +163,8 @@ func (j *RayJob) Finished() (metav1.Condition, bool) { Reason: string(j.Status.JobStatus), Message: j.Status.Message, } - return condition, rayjobapi.IsJobTerminal(j.Status.JobStatus) + + return condition, j.Status.JobStatus == rayjobapi.JobStatusFailed || j.Status.JobStatus == rayjobapi.JobStatusSucceeded } func (j *RayJob) PodsReady() bool { diff --git a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go index 41b8325463..e6177df94e 100644 --- a/test/integration/controller/jobs/rayjob/rayjob_controller_test.go +++ b/test/integration/controller/jobs/rayjob/rayjob_controller_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/controller/constants" @@ -600,3 +601,141 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde }) }) + +var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + DepCRDPaths: []string{rayCrdPath}, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerAndSchedulerSetup()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + var ( + ns *corev1.Namespace + onDemandFlavor *kueue.ResourceFlavor + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "core-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + onDemandFlavor = testing.MakeResourceFlavor("on-demand").Label(instanceKey, "on-demand").Obj() + gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed()) + + clusterQueue = testing.MakeClusterQueue("clusterqueue"). + ResourceGroup( + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "4").Obj(), + ). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + }). + Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) + + ginkgo.By("creating localQueue") + localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + + ginkgo.By("creating priority") + priorityClass := testing.MakePriorityClass(priorityClassName). + PriorityValue(priorityValue).Obj() + gomega.Expect(k8sClient.Create(ctx, priorityClass)).Should(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true) + }) + + ginkgo.It("Should preempt lower priority rayJobs when resource insufficient", func() { + ginkgo.By("Create a low priority rayJob") + lowPriorityJob := testingrayjob.MakeJob("rayjob-with-low-priority", ns.Name).Queue(localQueue.Name). + RequestHead(corev1.ResourceCPU, "1"). + RequestWorkerGroup(corev1.ResourceCPU, "2"). + Obj() + gomega.Expect(k8sClient.Create(ctx, lowPriorityJob)).Should(gomega.Succeed()) + setInitStatus(lowPriorityJob.Name, lowPriorityJob.Namespace) + + ginkgo.By("Await for the low priority workload to be admitted") + createdJob := &rayjobapi.RayJob{} + gomega.Eventually(func() bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lowPriorityJob.Name, Namespace: lowPriorityJob.Namespace}, createdJob)). + Should(gomega.Succeed()) + return createdJob.Spec.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeFalse()) + + ginkgo.By("Create a high priority rayJob which will preempt the lower one") + highPriorityJob := testingrayjob.MakeJob("rayjob-with-high-priority", ns.Name).Queue(localQueue.Name). + RequestHead(corev1.ResourceCPU, "2"). + WithPriorityClassName(priorityClassName). + RequestWorkerGroup(corev1.ResourceCPU, "2"). + Obj() + gomega.Expect(k8sClient.Create(ctx, highPriorityJob)).Should(gomega.Succeed()) + setInitStatus(highPriorityJob.Name, highPriorityJob.Namespace) + + ginkgo.By("High priority workload should be admitted") + highPriorityWL := &kueue.Workload{} + highPriorityLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(highPriorityJob.Name), Namespace: ns.Name} + + gomega.Eventually(func() error { + return k8sClient.Get(ctx, highPriorityLookupKey, highPriorityWL) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + apimeta.IsStatusConditionTrue(highPriorityWL.Status.Conditions, kueue.WorkloadAdmitted) + + ginkgo.By("Low priority workload should not be admitted") + createdWorkload := &kueue.Workload{} + lowPriorityLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(lowPriorityJob.Name), Namespace: ns.Name} + + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lowPriorityLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + apimeta.IsStatusConditionFalse(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted) + + ginkgo.By("Low priority rayJob should be suspended") + createdJob = &rayjobapi.RayJob{} + gomega.Eventually(func() bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lowPriorityJob.Name, Namespace: lowPriorityJob.Namespace}, createdJob)). + Should(gomega.Succeed()) + return createdJob.Spec.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("Delete high priority rayjob") + gomega.Expect(k8sClient.Delete(ctx, highPriorityJob)).To(gomega.Succeed()) + gomega.EventuallyWithOffset(1, func() error { + rayjob := &rayjobapi.RayJob{} + return k8sClient.Get(ctx, client.ObjectKeyFromObject(highPriorityJob), rayjob) + }, util.Timeout, util.Interval).Should(testing.BeNotFoundError()) + // Manually delete workload because no garbage collection controller. + gomega.Expect(k8sClient.Delete(ctx, highPriorityWL)).To(gomega.Succeed()) + gomega.EventuallyWithOffset(1, func() error { + wl := &kueue.Workload{} + return k8sClient.Get(ctx, highPriorityLookupKey, wl) + }, util.Timeout, util.Interval).Should(testing.BeNotFoundError()) + + ginkgo.By("Low priority workload should be admitted again") + createdWorkload = &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lowPriorityLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted) + + ginkgo.By("Low priority rayJob should be unsuspended") + createdJob = &rayjobapi.RayJob{} + gomega.Eventually(func() bool { + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lowPriorityJob.Name, Namespace: lowPriorityJob.Namespace}, createdJob)). + Should(gomega.Succeed()) + return createdJob.Spec.Suspend + }, util.Timeout, util.Interval).Should(gomega.BeFalse()) + }) +})