Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

if err := validateRayJobSpec(rayJobInstance); err != nil {
r.Log.Error(err, "The RayJob spec is invalid")
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

// Please do NOT modify `originalRayJobInstance` in the following code.
originalRayJobInstance := rayJobInstance.DeepCopy()

Expand Down Expand Up @@ -536,13 +541,6 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
if err := r.Get(ctx, rayClusterNamespacedName, rayClusterInstance); err != nil {
if errors.IsNotFound(err) {
r.Log.Info("RayCluster not found", "RayJob", rayJobInstance.Name, "RayCluster", rayClusterNamespacedName)
// TODO: If both ClusterSelector and RayClusterSpec are not set, we should avoid retrieving a RayCluster instance.
// Consider moving this logic to a more appropriate location.
if len(rayJobInstance.Spec.ClusterSelector) == 0 && rayJobInstance.Spec.RayClusterSpec == nil {
err := fmt.Errorf("one of ClusterSelector or RayClusterSpec must be set, but both are undefined, err: %v", err)
return nil, err
}

if len(rayJobInstance.Spec.ClusterSelector) != 0 {
err := fmt.Errorf("we have choosed the cluster selector mode, failed to find the cluster named %v, err: %v", rayClusterInstanceName, err)
return nil, err
Expand Down Expand Up @@ -615,3 +613,13 @@ func (r *RayJobReconciler) updateStatusToSuspendingIfNeeded(ctx context.Context,
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusSuspending
return true
}

func validateRayJobSpec(rayJob *rayv1.RayJob) error {
if rayJob.Spec.Suspend && !rayJob.Spec.ShutdownAfterJobFinishes {
return fmt.Errorf("a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended")
}
if rayJob.Spec.RayClusterSpec == nil && len(rayJob.Spec.ClusterSelector) == 0 {
return fmt.Errorf("one of RayClusterSpec or ClusterSelector must be set")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ var _ = Context("Inside the default namespace", func() {
Namespace: "default",
},
Spec: rayv1.RayJobSpec{
Suspend: true,
Entrypoint: "sleep 999",
ShutdownAfterJobFinishes: true,
Suspend: true,
Entrypoint: "sleep 999",
RayClusterSpec: &rayv1.RayClusterSpec{
RayVersion: "2.9.0",
HeadGroupSpec: rayv1.HeadGroupSpec{
Expand Down
22 changes: 22 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,3 +323,25 @@ func TestUpdateRayJobStatus(t *testing.T) {
})
}
}

func TestValidateRayJobSpec(t *testing.T) {
err := validateRayJobSpec(&rayv1.RayJob{})
assert.Error(t, err, "The RayJob is invalid because both `RayClusterSpec` and `ClusterSelector` are empty")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: false,
},
})
assert.Error(t, err, "The RayJob is invalid because a RayJob with shutdownAfterJobFinishes set to false is not allowed to be suspended.")

err = validateRayJobSpec(&rayv1.RayJob{
Spec: rayv1.RayJobSpec{
Suspend: true,
ShutdownAfterJobFinishes: true,
RayClusterSpec: &rayv1.RayClusterSpec{},
},
})
assert.NoError(t, err, "The RayJob is valid.")
}