Fix: RayJob will not resume after being preempted
Signed-off-by: kerthcet <kerthcet@gmail.com>
kerthcet committed Oct 8, 2023
1 parent 542d315 commit 7623656
Showing 2 changed files with 143 additions and 3 deletions.
7 changes: 4 additions & 3 deletions pkg/controller/jobs/rayjob/rayjob_controller.go
@@ -33,12 +33,12 @@ import (
)

var (
-	gvk           = rayjobapi.GroupVersion.WithKind("RayJob")
-	FrameworkName = "ray.io/rayjob"
+	gvk = rayjobapi.GroupVersion.WithKind("RayJob")
)

const (
	headGroupPodSetName = "head"
+	FrameworkName       = "ray.io/rayjob"
)

func init() {
@@ -163,7 +163,8 @@ func (j *RayJob) Finished() (metav1.Condition, bool) {
Reason: string(j.Status.JobStatus),
Message: j.Status.Message,
}
-	return condition, rayjobapi.IsJobTerminal(j.Status.JobStatus)
+
+	return condition, j.Status.JobStatus == rayjobapi.JobStatusFailed || j.Status.JobStatus == rayjobapi.JobStatusSucceeded
}

func (j *RayJob) PodsReady() bool {
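For context on the controller change above, here is a minimal standalone sketch (not part of this commit) of why the old Finished() check blocked resumption. The assumptions, based on the KubeRay API this file imports, are that a suspended/preempted RayJob reports JobStatusStopped and that rayjobapi.IsJobTerminal treats Stopped as terminal, so Kueue marked the workload Finished and never re-admitted it; the import path below is likewise an assumption, not taken from this diff.

// Hypothetical sketch, not part of the commit; the import path and the
// "Stopped counts as terminal" behavior are assumptions about the KubeRay
// v1alpha1 API that Kueue uses here.
package main

import (
	"fmt"

	rayjobapi "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

func main() {
	// A preempted (suspended) RayJob is assumed to report JobStatusStopped.
	status := rayjobapi.JobStatusStopped

	// Old check: IsJobTerminal also counts Stopped as terminal, so the
	// workload was marked Finished and the job was never resumed.
	fmt.Println("old Finished():", rayjobapi.IsJobTerminal(status))

	// New check: only Failed or Succeeded count as finished, so a stopped
	// job stays eligible for re-admission once quota frees up again.
	fmt.Println("new Finished():", status == rayjobapi.JobStatusFailed || status == rayjobapi.JobStatusSucceeded)
}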
139 changes: 139 additions & 0 deletions test/integration/controller/jobs/rayjob/rayjob_controller_test.go
@@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
@@ -600,3 +601,141 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Ordered

})
})

var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
ginkgo.BeforeAll(func() {
fwk = &framework.Framework{
CRDPath: crdPath,
DepCRDPaths: []string{rayCrdPath},
}
cfg = fwk.Init()
ctx, k8sClient = fwk.RunManager(cfg, managerAndSchedulerSetup())
})
ginkgo.AfterAll(func() {
fwk.Teardown()
})

var (
ns *corev1.Namespace
onDemandFlavor *kueue.ResourceFlavor
clusterQueue *kueue.ClusterQueue
localQueue *kueue.LocalQueue
)

ginkgo.BeforeEach(func() {
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "core-",
},
}
gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())

onDemandFlavor = testing.MakeResourceFlavor("on-demand").Label(instanceKey, "on-demand").Obj()
gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed())

clusterQueue = testing.MakeClusterQueue("clusterqueue").
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "4").Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())

ginkgo.By("creating localQueue")
localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())

ginkgo.By("creating priority")
priorityClass := testing.MakePriorityClass(priorityClassName).
PriorityValue(priorityValue).Obj()
gomega.Expect(k8sClient.Create(ctx, priorityClass)).Should(gomega.Succeed())
})
ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
})

ginkgo.It("Should preempt lower priority rayJobs when resource insufficient", func() {
ginkgo.By("Create a low priority rayJob")
lowPriorityJob := testingrayjob.MakeJob("rayjob-with-low-priority", ns.Name).Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "1").
RequestWorkerGroup(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, lowPriorityJob)).Should(gomega.Succeed())
setInitStatus(lowPriorityJob.Name, lowPriorityJob.Namespace)

ginkgo.By("Await for the low priority workload to be admitted")
createdJob := &rayjobapi.RayJob{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lowPriorityJob.Name, Namespace: lowPriorityJob.Namespace}, createdJob)).
Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())

ginkgo.By("Create a high priority rayJob which will preempt the lower one")
highPriorityJob := testingrayjob.MakeJob("rayjob-with-high-priority", ns.Name).Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "2").
WithPriorityClassName(priorityClassName).
RequestWorkerGroup(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, highPriorityJob)).Should(gomega.Succeed())
setInitStatus(highPriorityJob.Name, highPriorityJob.Namespace)

ginkgo.By("High priority workload should be admitted")
highPriorityWL := &kueue.Workload{}
highPriorityLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(highPriorityJob.Name), Namespace: ns.Name}

gomega.Eventually(func() bool {
	if err := k8sClient.Get(ctx, highPriorityLookupKey, highPriorityWL); err != nil {
		return false
	}
	return apimeta.IsStatusConditionTrue(highPriorityWL.Status.Conditions, kueue.WorkloadAdmitted)
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("Low priority workload should not be admitted")
createdWorkload := &kueue.Workload{}
lowPriorityLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(lowPriorityJob.Name), Namespace: ns.Name}

gomega.Eventually(func() bool {
	if err := k8sClient.Get(ctx, lowPriorityLookupKey, createdWorkload); err != nil {
		return false
	}
	return apimeta.IsStatusConditionFalse(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("Low priority rayJob should be suspended")
createdJob = &rayjobapi.RayJob{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lowPriorityJob.Name, Namespace: lowPriorityJob.Namespace}, createdJob)).
Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("Delete high priority rayjob")
gomega.Expect(k8sClient.Delete(ctx, highPriorityJob)).To(gomega.Succeed())
gomega.EventuallyWithOffset(1, func() error {
rayjob := &rayjobapi.RayJob{}
return k8sClient.Get(ctx, client.ObjectKeyFromObject(highPriorityJob), rayjob)
}, util.Timeout, util.Interval).Should(testing.BeNotFoundError())
// Manually delete the workload because the test environment has no garbage collection controller.
gomega.Expect(k8sClient.Delete(ctx, highPriorityWL)).To(gomega.Succeed())
gomega.EventuallyWithOffset(1, func() error {
wl := &kueue.Workload{}
return k8sClient.Get(ctx, highPriorityLookupKey, wl)
}, util.Timeout, util.Interval).Should(testing.BeNotFoundError())

ginkgo.By("Low priority workload should be admitted again")
createdWorkload = &kueue.Workload{}
gomega.Eventually(func() bool {
	if err := k8sClient.Get(ctx, lowPriorityLookupKey, createdWorkload); err != nil {
		return false
	}
	return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("Low priority rayJob should be unsuspended")
createdJob = &rayjobapi.RayJob{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lowPriorityJob.Name, Namespace: lowPriorityJob.Namespace}, createdJob)).
Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())
})
})
