Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RayJob][Status][11/n] Refactor the suspend operation #1782

Merged
merged 1 commit into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type JobStatus string

// https://docs.ray.io/en/latest/cluster/running-applications/job-submission/jobs-package-ref.html#jobstatus
const (
JobStatusNew JobStatus = ""
JobStatusPending JobStatus = "PENDING"
JobStatusRunning JobStatus = "RUNNING"
JobStatusStopped JobStatus = "STOPPED"
Expand Down
20 changes: 13 additions & 7 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// TODO (kevin85421): Don't return here; we still need to move some of the code below into this block.
case rayv1.JobDeploymentStatusSuspended:
if !rayJobInstance.Spec.Suspend {
r.Log.Info("The status is 'Suspended', but the suspend flag is false. Transition the status to 'New'.")
if err = r.updateState(ctx, rayJobInstance, nil, rayv1.JobStatusNew, rayv1.JobDeploymentStatusNew); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}
// TODO (kevin85421): We may not need to requeue the RayJob if it has already been suspended.
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
case rayv1.JobDeploymentStatusComplete:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
r.Log.Info("JobDeploymentStatusComplete", "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
Expand All @@ -164,7 +174,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
} else {
if err = r.releaseComputeResources(ctx, rayJobInstance); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

}
}
}
Expand All @@ -178,10 +188,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}
// If there is no cluster instance and no error, suspend the job deployment
if rayClusterInstance == nil {
// Already suspended?
if rayJobInstance.Status.JobDeploymentStatus == rayv1.JobDeploymentStatusSuspended {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
err = r.updateState(ctx, rayJobInstance, nil, rayJobInstance.Status.JobStatus, rayv1.JobDeploymentStatusSuspended)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
Expand Down Expand Up @@ -254,7 +260,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

if err = r.releaseComputeResources(ctx, rayJobInstance); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// Since the RayCluster instance is gone, remove its status also
// on RayJob resource
Expand Down Expand Up @@ -469,7 +475,7 @@ func (r *RayJobReconciler) initRayJobStatusIfNeed(ctx context.Context, rayJob *r
}

if rayJob.Status.JobStatus == "" {
rayJob.Status.JobStatus = rayv1.JobStatusPending
rayJob.Status.JobStatus = rayv1.JobStatusNew
}

return r.updateState(ctx, rayJob, nil, rayJob.Status.JobStatus, rayv1.JobDeploymentStatusInitializing)
Expand Down
79 changes: 9 additions & 70 deletions ray-operator/controllers/ray/rayjob_controller_suspended_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,41 +57,13 @@ var _ = Context("Inside the default namespace", func() {
RayClusterSpec: &rayv1.RayClusterSpec{
RayVersion: "2.8.0",
HeadGroupSpec: rayv1.HeadGroupSpec{
ServiceType: corev1.ServiceTypeClusterIP,
RayStartParams: map[string]string{
"port": "6379",
"object-store-memory": "100000000",
"dashboard-host": "0.0.0.0",
"num-cpus": "1",
"node-ip-address": "127.0.0.1",
"block": "true",
"dashboard-agent-listen-port": "52365",
},
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
},
Annotations: map[string]string{
"key": "value",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-head",
Image: "rayproject/ray:2.2.0",
Env: []corev1.EnvVar{
{
Name: "MY_POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
},
Image: "rayproject/ray:2.8.0",
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
Expand Down Expand Up @@ -127,50 +99,17 @@ var _ = Context("Inside the default namespace", func() {
},
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
{
Replicas: pointer.Int32(3),
MinReplicas: pointer.Int32(0),
MaxReplicas: pointer.Int32(10000),
GroupName: "small-group",
RayStartParams: map[string]string{
"port": "6379",
"num-cpus": "1",
"dashboard-agent-listen-port": "52365",
},
Replicas: pointer.Int32(3),
MinReplicas: pointer.Int32(0),
MaxReplicas: pointer.Int32(10000),
GroupName: "small-group",
RayStartParams: map[string]string{},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "small-group",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "ray-worker",
Image: "rayproject/ray:2.2.0",
Command: []string{"echo"},
Args: []string{"Hello Ray"},
Env: []corev1.EnvVar{
{
Name: "MY_POD_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
},
},
},
},
Ports: []corev1.ContainerPort{
{
Name: "client",
ContainerPort: 80,
},
{
Name: "dashboard-agent",
ContainerPort: 52365,
},
},
Name: "ray-worker",
Image: "rayproject/ray:2.8.0",
},
},
},
Expand Down
8 changes: 8 additions & 0 deletions ray-operator/test/e2e/rayjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ env_vars:
// For Kubernetes Jobs, the default deletion behavior is "orphanDependents," which means the Pods will not be
// cascadingly deleted with the Kubernetes Job by default.

test.T().Logf("Update `suspend` to true. However, since the RayJob is completed, the status should not be updated to `Suspended`.")
rayJob.Spec.Suspend = true
// TODO (kevin85421): We may need to retry `Update` if 409 conflict makes the test flaky.
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Update(test.Ctx(), rayJob, metav1.UpdateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.Consistently(RayJob(test, rayJob.Namespace, rayJob.Name)).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))

// Delete the RayJob
err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
Expand Down
Loading