
[Feature] [RayJobs] Use finalizers to implement stopping a job upon cluster deletion #735

Merged: 4 commits merged into ray-project:master on Nov 24, 2022

Conversation

kevin85421 (Member) commented Nov 17, 2022

Why are these changes needed?

See #629 for more context. The behavior of this PR is almost the same as #629. The only difference is that this PR guarantees that the operator will try to stop the job at least once. In #629, if the RayJob is deleted while the operator is down, the operator will never try to stop the job.
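
The mechanism is a Kubernetes finalizer on the RayJob object. Below is a minimal sketch of the flow, assuming a standard controller-runtime reconciler; the function names, the stopJob stand-in, and the import paths are illustrative rather than the exact code in this diff.

package controllers

import (
    "context"

    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
)

const rayJobFinalizer = "ray.io/rayjob-finalizer"

// RayJobReconciler is trimmed down to what this sketch needs.
type RayJobReconciler struct {
    client.Client
}

// stopJob stands in for the controller's call to the Ray dashboard's job stop
// endpoint; it only needs to act when the job is still PENDING or RUNNING.
func (r *RayJobReconciler) stopJob(ctx context.Context, rayJob *rayv1alpha1.RayJob) error {
    // ... query the job status and ask the Ray Jobs API to stop it ...
    return nil
}

// handleFinalizer sketches the behavior added by this PR:
//   - while the RayJob is alive, make sure the finalizer is present so a later
//     deletion cannot be missed, even across operator restarts;
//   - when the RayJob is being deleted, attempt to stop the job once, then
//     remove the finalizer regardless of whether the stop attempt succeeded.
func (r *RayJobReconciler) handleFinalizer(ctx context.Context, rayJob *rayv1alpha1.RayJob) error {
    if rayJob.DeletionTimestamp.IsZero() {
        if !controllerutil.ContainsFinalizer(rayJob, rayJobFinalizer) {
            controllerutil.AddFinalizer(rayJob, rayJobFinalizer)
            return r.Update(ctx, rayJob)
        }
        return nil
    }
    if controllerutil.ContainsFinalizer(rayJob, rayJobFinalizer) {
        // At-least-once semantics: the error is deliberately swallowed so that
        // a broken dashboard can never block deletion of the RayJob object.
        _ = r.stopJob(ctx, rayJob)
        controllerutil.RemoveFinalizer(rayJob, rayJobFinalizer)
        return r.Update(ctx, rayJob)
    }
    return nil
}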

Related issue number

Closes #676
#629

Checks

We should add tests for RayJob when the integration tests are stable enough. Use #664 to track the progress.

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

Manual tests

  • Step1: Install the KubeRay operator (a condensed command sequence for these manual-test steps is sketched after Step7)

  • Step2: Install a RayCluster

    YAML
    # The resource requests and limits in this config are too small for production!
    # For examples with more realistic resource configuration, see
    # ray-cluster.complete.large.yaml and
    # ray-cluster.autoscaler.large.yaml.
    apiVersion: ray.io/v1alpha1
    kind: RayCluster
    metadata:
      labels:
        controller-tools.k8s.io: "1.0"
        # A unique identifier for the head node and workers of this cluster.
      name: raycluster-complete
    spec:
      rayVersion: '2.0.0'
      ######################headGroupSpec#################################
      # head group template and specs, (perhaps 'group' is not needed in the name)
      headGroupSpec:
        # Kubernetes Service Type, valid values are 'ClusterIP', 'NodePort' and 'LoadBalancer'
        serviceType: ClusterIP
        # for the head group, replicas should always be 1.
        # headGroupSpec.replicas is deprecated in KubeRay >= 0.3.0.
        replicas: 1
        # the following params are used to complete the ray start command: ray start --head --block --dashboard-host='0.0.0.0' ...
        rayStartParams:
          dashboard-host: '0.0.0.0'
          block: 'true'
        #pod template
        template:
          metadata:
            labels:
              # custom labels. NOTE: do not define custom labels start with `raycluster.`, they may be used in controller.
              # Refer to https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
              rayCluster: raycluster-complete # will be injected if missing
              groupName: headgroup # will be injected if missing
            # annotations for pod
            annotations:
              key: value
          spec:
            containers:
            - name: ray-head
              image: rayproject/ray:2.0.0
              ports:
              - containerPort: 6379
                name: gcs
              - containerPort: 8265
                name: dashboard
              - containerPort: 10001
                name: client
              lifecycle:
                preStop:
                  exec:
                    command: ["/bin/sh","-c","ray stop"]
              volumeMounts:
                - mountPath: /tmp/ray
                  name: ray-logs
                - mountPath: /home/ray/samples
                  name: code-sample
              resources:
                limits:
                  cpu: "1"
                  memory: "2G"
                requests:
                  cpu: "500m"
                  memory: "1G"
            volumes:
              - name: ray-logs
                emptyDir: {}
                # You set volumes at the Pod level, then mount them into containers inside that Pod
              - name: code-sample
                configMap:
                  # Provide the name of the ConfigMap you want to mount.
                  name: ray-job-code-sample
                  # An array of keys from the ConfigMap to create as files
                  items:
                    - key: sample_code.py
                      path: sample_code.py
      workerGroupSpecs:
      # the pods in this group are typed as workers
      - replicas: 1
        minReplicas: 1
        maxReplicas: 10
        # logical group name; here it is called large-group; it can also be a functional name
        groupName: large-group
        # if worker pods need to be added, we can simply increment the replicas
        # if worker pods need to be removed, we decrement the replicas, and populate the podsToDelete list
        # the operator will remove pods from the list until the number of replicas is satisfied
        # when a pod is confirmed to be deleted, its name will be removed from the list below
        #scaleStrategy:
        #  workersToDelete:
        #  - raycluster-complete-worker-large-group-bdtwh
        #  - raycluster-complete-worker-large-group-hv457
        #  - raycluster-complete-worker-large-group-k8tj7 
        # the following params are used to complete the ray start: ray start --block
        rayStartParams:
          block: 'true'
        #pod template
        template:
          metadata:
            labels:
              rayCluster: raycluster-complete # will be injected if missing
              groupName: large-group # will be injected if missing
          spec:
            containers:
            - name: machine-learning # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
              image: rayproject/ray:2.0.0
              # environment variables to set in the container. Optional.
              # Refer to https://kubernetes.io/docs/tasks/inject-data-application/define-environment-variable-container/
              lifecycle:
                preStop:
                  exec:
                    command: ["/bin/sh","-c","ray stop"]
              # use volumeMounts. Optional.
              # Refer to https://kubernetes.io/docs/concepts/storage/volumes/
              volumeMounts:
                - mountPath: /tmp/ray
                  name: ray-logs
              resources:
                limits:
                  cpu: "1"
                  memory: "512Mi"
                requests:
                  cpu: "500m"
                  memory: "256Mi"
            initContainers:
            # the env var $RAY_IP is set by the operator if missing, with the value of the head service name
            - name: init-myservice
              image: busybox:1.28
              # Change the cluster postfix if you don't have a default setting
              command: ['sh', '-c', "until nslookup $RAY_IP.$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).svc.cluster.local; do echo waiting for myservice; sleep 2; done"]
            # use volumes
            # Refer to https://kubernetes.io/docs/concepts/storage/volumes/
            volumes:
              - name: ray-logs
                emptyDir: {}
    ######################ConfigMap#################################
    ---
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: ray-job-code-sample
    data:
      sample_code.py: |
        import ray
        import time
    
        @ray.remote
        class MyActor:
            def __init__(self):
                pass
    
            def func(self, v):
                return v
    
        seconds = 60
        actor = MyActor.remote()
    
        while seconds > 0:
            print(ray.get(actor.func.remote(seconds)))
            time.sleep(1)
            seconds -= 1
  • Step3: Submit a RayJob with clusterSelector

    YAML
    apiVersion: ray.io/v1alpha1
    kind: RayJob
    metadata:
      name: rayjob-sample-2
    spec:
      entrypoint: python /home/ray/samples/sample_code.py
      clusterSelector:
        ray.io/cluster: "raycluster-complete"
  • Step4: Use the Ray Dashboard or a curl command to check the job status.

    • Example: curl localhost:8265/api/jobs/rayjob-sample-2-j2hnn (RESTful API doc)
  • Step5: Use kubectl describe rayjobs.ray.io rayjob-sample-2 to check the finalizer "ray.io/rayjob-finalizer".

    Example
    Name:         rayjob-sample-2
    Namespace:    default
    Labels:       <none>
    Annotations:  <none>
    API Version:  ray.io/v1alpha1
    Kind:         RayJob
    Metadata:
      Creation Timestamp:  2022-11-17T22:55:10Z
      Finalizers:
        ray.io/rayjob-finalizer
  • Step6: Delete the RayJob with kubectl delete rayjobs.ray.io rayjob-sample-2. The finalizer from Step5 will be removed by the operator, so the RayJob can be deleted successfully.

  • Step7: Check the operator's log

    2022-11-22T19:13:53.929Z        INFO    controllers.RayJob      Add a finalizer {"finalizer": "ray.io/rayjob-finalizer"}
    .
    .
    .
    2022-11-22T19:18:54.568Z        INFO    controllers.RayJob      reconciling RayJob      {"NamespacedName": "default/rayjob-sample-2"}
    2022-11-22T19:18:54.568Z        INFO    controllers.RayJob      RayJob is being deleted {"DeletionTimestamp": "2022-11-22 19:18:54 +0000 UTC"}
    2022-11-22T19:18:54.568Z        INFO    controllers.RayJob      Stop a ray job  {"rayJob": "rayjob-sample-2-cn9vg"}
    2022-11-22T19:18:54.576Z        INFO    controllers.RayJob      Remove the finalizer no matter StopJob() succeeds or not.       {"finalizer": "ray.io/rayjob-finalizer"}
    
    
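For convenience, the manual test above can be driven with a command sequence along the following lines. The Helm install commands and the manifest file names (raycluster-complete.yaml, rayjob-sample-2.yaml) are assumptions about the local setup, not part of this PR; adjust them to your environment.

  # Step1: install the KubeRay operator (chart name and version depend on your setup)
  helm repo add kuberay https://ray-project.github.io/kuberay-helm/
  helm repo update
  helm install kuberay-operator kuberay/kuberay-operator

  # Step2: create the RayCluster and the ray-job-code-sample ConfigMap from the manifest above
  kubectl apply -f raycluster-complete.yaml

  # Step3: submit the RayJob that points at the existing cluster via clusterSelector
  kubectl apply -f rayjob-sample-2.yaml

  # Step5: confirm that the operator added the finalizer
  kubectl get rayjobs.ray.io rayjob-sample-2 -o jsonpath='{.metadata.finalizers}'

  # Step6: delete the RayJob; the operator stops the job and then removes the finalizer
  kubectl delete rayjobs.ray.io rayjob-sample-2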

Others (Feel free to ignore this part)

  • Stop job with curl: curl -X POST -H 'Content-Type: application/json' localhost:8265/api/jobs/02000000/stop
  • If we stop a job that is already in the STOPPED state, {"stopped": false} will be returned.
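
For comparison, stopping a job that is still PENDING or RUNNING should return the complementary response; the exact output depends on the Ray version, so treat this as illustrative:

  $ curl -X POST -H 'Content-Type: application/json' localhost:8265/api/jobs/02000000/stop
  {"stopped": true}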

kevin85421 marked this pull request as ready for review November 17, 2022 23:28
kevin85421 changed the title [WIP][Feature] [RayJobs] Use finalizers to implement stopping a job upon cluster deletion [Feature] [RayJobs] Use finalizers to implement stopping a job upon cluster deletion Nov 17, 2022
kevin85421 (Member Author)

@Basasuya Do you mind helping me review this PR? Thank you!

return err
}
// StopJob only returns an error when JobStatus is not in terminated states (STOPPED / SUCCEEDED / FAILED)
if (jobInfo.JobStatus == rayv1alpha1.JobStatusPending) || (jobInfo.JobStatus == rayv1alpha1.JobStatusRunning) {
Collaborator

Since we might change the states over time, I would suggest

  • Defining the terminated states in a constant somewhere.
  • Have this condition check if we're not terminated, using that constant.

Member Author

Good idea! I will update it tomorrow.

Member Author

Updated. I decided not to replace isJobSucceedOrFailed and isJobPendingOrRunning with IsJobTerminal in this PR because the current RayJob controller does not handle jobs in the STOPPED state explicitly. I will open an issue to track the progress.

// isJobSucceedOrFailed indicates whether the job has reached a terminal SUCCEEDED or FAILED status.
func isJobSucceedOrFailed(status rayv1alpha1.JobStatus) bool {
    return status == rayv1alpha1.JobStatusSucceeded || status == rayv1alpha1.JobStatusFailed
}

// isJobPendingOrRunning indicates whether the job is still pending or running.
func isJobPendingOrRunning(status rayv1alpha1.JobStatus) bool {
    return status == rayv1alpha1.JobStatusPending || status == rayv1alpha1.JobStatusRunning
}
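
For reference, the IsJobTerminal helper suggested above could look roughly like the sketch below. It is intentionally not part of this PR, and the exact constant and status names are assumptions.

var terminalJobStatuses = []rayv1alpha1.JobStatus{
    rayv1alpha1.JobStatusStopped,
    rayv1alpha1.JobStatusSucceeded,
    rayv1alpha1.JobStatusFailed,
}

// isJobTerminal reports whether the job has reached any terminal status; keep
// terminalJobStatuses as the single source of truth if Ray adds new states.
func isJobTerminal(status rayv1alpha1.JobStatus) bool {
    for _, terminal := range terminalJobStatuses {
        if status == terminal {
            return true
        }
    }
    return false
}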

DmitriGekhtman (Collaborator)

nits

Also, cc @architkulkarni.

Jeffwan (Collaborator) commented Nov 22, 2022

Overall looks good to me. /cc @Basasuya please take a look.

Jeffwan (Collaborator) commented Nov 22, 2022

One tricky problem: if stopping the job fails but the finalizer has already been removed, at-least-once semantics won't guarantee that the job is actually stopped. In that case we need to rely on Ray Core's exactly-once semantics.

DmitriGekhtman (Collaborator) commented Nov 22, 2022

One tricky problem: if stopping the job fails but the finalizer has already been removed, at-least-once semantics won't guarantee that the job is actually stopped.

@kevin85421 and I talked about this. The idea is to be initially defensive, to avoid triggering a situation where there's an issue stopping the job and the user has to remove the finalizer manually.

The "at least one attempt" semantics in this PR are better than the current behavior, which doesn't handle missed delete events.

DmitriGekhtman (Collaborator)

Once we're more confident in this, we can have it guarantee stopped jobs by retrying until the job is successfully stopped and only deleting the finalizer on success.
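
That stricter variant could look roughly like the sketch below (illustrative only, not part of this PR). It reuses the rayJobFinalizer constant and stopJob helper from the sketch in the description, and assumes ctrl is sigs.k8s.io/controller-runtime and time is imported.

// finalizeStrictly only releases the finalizer once the stop call has
// succeeded; otherwise it keeps the finalizer and requeues, so deletion of the
// RayJob object implies the underlying Ray job really was stopped.
func (r *RayJobReconciler) finalizeStrictly(ctx context.Context, rayJob *rayv1alpha1.RayJob) (ctrl.Result, error) {
    if !controllerutil.ContainsFinalizer(rayJob, rayJobFinalizer) {
        return ctrl.Result{}, nil
    }
    if err := r.stopJob(ctx, rayJob); err != nil {
        // Keep the finalizer; the RayJob stays in Terminating until the stop succeeds.
        return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
    }
    controllerutil.RemoveFinalizer(rayJob, rayJobFinalizer)
    return ctrl.Result{}, r.Update(ctx, rayJob)
}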

scarlet25151 (Collaborator) commented Nov 23, 2022

Hi, I think for RayService (#647) @sihanwang41 also requested a similar mechanism, and we may be able to reuse some code from here. Would it be possible to put these two finalizer features into the 0.4.0 release?

DmitriGekhtman added this to the v0.4.0 release milestone Nov 23, 2022
Basasuya (Contributor)

@kevin85421 @Jeffwan it looks good to me 👍

kevin85421 (Member Author)

This PR is approved by 3 reviewers. Merge it.

kevin85421 merged commit 89f5fba into ray-project:master Nov 24, 2022
lowang-bh pushed a commit to lowang-bh/kuberay that referenced this pull request Sep 24, 2023
…luster deletion (ray-project#735)
