Added Job submission support to the API server #1639

Merged 20 commits on Nov 21, 2023
211 changes: 211 additions & 0 deletions apiserver/JobSubmission.md
# Job Submission using API server

Ray provides very convenient and powerful [Job Submission APIs](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/rest.html). The issue is that they require access to the cluster's dashboard Ingress, which currently does not have any security implementation. Alternatively, you can use the Ray cluster head service for this, but then your Ray job management code has to run in the same Kubernetes cluster as Ray. The job submission implementation in the API server leverages the already exposed URL of the API server to locate the cluster and uses its head service to implement job management functionality. Because you can access an API server running on a remote Kubernetes cluster, you can use these APIs to manage remote Ray clusters without exposing them via Ingress.
## Using Job Submission APIs

Note that the job submission APIs only work when the API server is running inside a Kubernetes cluster; the Local Development option of the API server will not work.

The first step is to deploy KubeRay operator and API server.

### Deploy KubeRay operator and API server

Refer to the [readme](README.md) for setting up the KubeRay operator and the API server.

```shell
make docker-image cluster load-image deploy
```

Once they are set up, you first need to create a Ray cluster using the following commands:

### Deploy Ray cluster

Execute the following commands:

```shell
curl -X POST 'localhost:31888/apis/v1/namespaces/default/compute_templates' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "default-template",
    "namespace": "default",
    "cpu": 2,
    "memory": 4
  }'
curl -X POST 'localhost:31888/apis/v1/namespaces/default/clusters' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "test-cluster",
    "namespace": "default",
    "user": "boris",
    "clusterSpec": {
      "headGroupSpec": {
        "computeTemplate": "default-template",
        "image": "rayproject/ray:2.7.0-py310",
        "serviceType": "NodePort",
        "rayStartParams": {
          "dashboard-host": "0.0.0.0",
          "metrics-export-port": "8080"
        },
        "volumes": [
          {
            "name": "code-sample",
            "mountPath": "/home/ray/samples",
            "volumeType": "CONFIGMAP",
            "source": "ray-job-code-sample",
            "items": {"sample_code.py": "sample_code.py"}
          }
        ]
      },
      "workerGroupSpec": [
        {
          "groupName": "small-wg",
          "computeTemplate": "default-template",
          "image": "rayproject/ray:2.7.0-py310",
          "replicas": 1,
          "minReplicas": 0,
          "maxReplicas": 5,
          "rayStartParams": {
            "node-ip-address": "$MY_POD_IP"
          },
          "volumes": [
            {
              "name": "code-sample",
              "mountPath": "/home/ray/samples",
              "volumeType": "CONFIGMAP",
              "source": "ray-job-code-sample",
              "items": {"sample_code.py": "sample_code.py"}
            }
          ]
        }
      ]
    }
  }'
```

Note that this cluster mounts a volume from a config map. This config map should be created prior to cluster creation using this [yaml](/test/job/code.yaml).

### Submit Ray Job

Once the cluster is up and running, you can submit a job to the cluster using the following command:

```shell
curl -X POST 'localhost:31888/apis/v1/namespaces/default/jobsubmissions/test-cluster' \
  --header 'Content-Type: application/json' \
  --data '{
    "entrypoint": "python /home/ray/samples/sample_code.py",
    "runtimeEnv": "pip:\n - requests==2.26.0\n - pendulum==2.1.2\nenv_vars:\n counter_name: test_counter\n",
    "numCpus": ".5"
  }'
```

This should return the following:

```json
{
"submissionId":"raysubmit_KWZLwme56esG3Wcr"
}
```

Note that the `submissionId` value you get will be different.
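The `runtimeEnv` field in the request above is itself a YAML document, passed as an escaped string inside the JSON body. A small sketch of assembling that body in Python (the values mirror the curl example; this is plain `json` usage, not a dedicated client API):

```python
import json

# runtimeEnv is a YAML document embedded as a string; building it from
# explicit lines avoids hand-escaping "\n" inside the JSON payload.
runtime_env_lines = [
    "pip:",
    " - requests==2.26.0",
    " - pendulum==2.1.2",
    "env_vars:",
    " counter_name: test_counter",
]

payload = {
    "entrypoint": "python /home/ray/samples/sample_code.py",
    "runtimeEnv": "\n".join(runtime_env_lines) + "\n",
    "numCpus": ".5",
}

# The resulting JSON is equivalent to the --data argument of the curl call.
print(json.dumps(payload, indent=2))
```

The same payload can then be sent with any HTTP client of your choice.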

### Get job details

Once the job is submitted, the following command can be used to get the job's details (note that the submission ID returned during job creation should be used here):

```shell
curl -X GET 'localhost:31888/apis/v1/namespaces/default/jobsubmissions/test-cluster/raysubmit_KWZLwme56esG3Wcr' \
--header 'Content-Type: application/json'
```

This should return JSON similar to the one below:

```json
{
"entrypoint":"python /home/ray/samples/sample_code.py",
"jobId":"02000000",
"submissionId":"raysubmit_KWZLwme56esG3Wcr",
"status":"SUCCEEDED",
"message":"Job finished successfully.",
"startTime":"1699442662879",
"endTime":"1699442682405",
"runtimeEnv":{
"env_vars":"map[counter_name:test_counter]",
"pip":"[requests==2.26.0 pendulum==2.1.2]"
}
}
```
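The `startTime` and `endTime` fields in the response are Unix epoch timestamps in milliseconds, returned as strings. A quick sketch of turning them into something readable (values taken from the sample response above):

```python
from datetime import datetime, timezone

# Sample values from the job details response; real values will differ.
details = {"startTime": "1699442662879", "endTime": "1699442682405"}

# Divide by 1000: the API reports milliseconds, datetime expects seconds.
start = datetime.fromtimestamp(int(details["startTime"]) / 1000, tz=timezone.utc)
end = datetime.fromtimestamp(int(details["endTime"]) / 1000, tz=timezone.utc)

duration_s = (end - start).total_seconds()
print(f"started {start.isoformat()}, ran for {duration_s:.1f}s")
```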

### Get Job log

You can also get the job execution log using the following command (note that the submission ID returned during job creation should be used here):

```shell
curl -X GET 'localhost:31888/apis/v1/namespaces/default/jobsubmissions/test-cluster/log/raysubmit_KWZLwme56esG3Wcr' \
--header 'Content-Type: application/json'
```

This will return the execution log, which will look something like the following:

```text
2023-11-08 03:24:31,904\tINFO worker.py:1329 -- Using address 10.244.2.2:6379 set in the environment variable RAY_ADDRESS
2023-11-08 03:24:31,905\tINFO worker.py:1458 -- Connecting to existing Ray cluster at address: 10.244.2.2:6379...
2023-11-08 03:24:31,921\tINFO worker.py:1633 -- Connected to Ray cluster. View the dashboard at \u001b[1m\u001b[32m10.244.2.2:8265 \u001b[39m\u001b[22m
test_counter got 1
test_counter got 2
test_counter got 3
test_counter got 4
test_counter got 5
```

Note that this command always returns the execution log from the beginning up to the current moment; streaming is not supported.

### List jobs

You can also list all the jobs (in any state) in the Ray cluster using the following command:

```shell
curl -X GET 'localhost:31888/apis/v1/namespaces/default/jobsubmissions/test-cluster' \
--header 'Content-Type: application/json'
```

This should return the list of submissions, which looks as follows:

```json
{
"submissions":[
{
"entrypoint":"python /home/ray/samples/sample_code.py",
"jobId":"02000000",
"submissionId":"raysubmit_KWZLwme56esG3Wcr",
"status":"SUCCEEDED",
"message":"Job finished successfully.",
"startTime":"1699442662879",
"endTime":"1699442682405",
"runtimeEnv":{
"env_vars":"map[counter_name:test_counter]",
"pip":"[requests==2.26.0 pendulum==2.1.2]"
}
}
]
}
```
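There is no blocking "wait for completion" endpoint, so a client typically polls the job details until the status is terminal. A minimal polling sketch; `fetch_status` stands in for the GET details call shown earlier and is an assumption, not part of the API server:

```python
import time

# Terminal Ray job states; a job in any of these will not change further.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}

def wait_for_job(fetch_status, poll_interval_s=2.0, timeout_s=600.0):
    """Poll fetch_status() until it returns a terminal Ray job state."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_interval_s)
    raise TimeoutError("job did not reach a terminal state in time")

# Example with a stubbed status sequence instead of real HTTP calls:
statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda: next(statuses), poll_interval_s=0.0))  # SUCCEEDED
```

In real use, `fetch_status` would issue the GET request against the job details endpoint and return the `status` field of the response.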

### Stop Job

Execution of the job can be stopped using the following command (note that the submission ID returned during job creation should be used here):

```shell
curl -X POST 'localhost:31888/apis/v1/namespaces/default/jobsubmissions/test-cluster/raysubmit_KWZLwme56esG3Wcr' \
--header 'Content-Type: application/json'
```

### Delete Job

Finally, you can delete the job using the following command (note that the submission ID returned during job creation should be used here):

```shell
curl -X DELETE 'localhost:31888/apis/v1/namespaces/default/jobsubmissions/test-cluster/raysubmit_KWZLwme56esG3Wcr' \
--header 'Content-Type: application/json'
```

You can validate job deletion by looking at the Ray dashboard (jobs pane) and ensuring that it was removed.
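All of the job submission endpoints used in this document share a common URL shape. A small helper for constructing them (the host and port are the NodePort values from the examples and will differ per deployment; the helper name is illustrative):

```python
from typing import Optional

# API server base URL from the examples in this document (NodePort 31888).
BASE = "http://localhost:31888/apis/v1"

def job_submissions_url(namespace: str, cluster: str,
                        submission_id: Optional[str] = None) -> str:
    """Build a job submission endpoint URL for the API server."""
    url = f"{BASE}/namespaces/{namespace}/jobsubmissions/{cluster}"
    return f"{url}/{submission_id}" if submission_id else url

# POST here submits a job; GET lists all submissions.
print(job_submissions_url("default", "test-cluster"))
# GET here fetches details; POST stops; DELETE deletes the submission.
print(job_submissions_url("default", "test-cluster", "raysubmit_KWZLwme56esG3Wcr"))
```

The log endpoint is the one exception: it inserts a `log` path segment before the submission ID.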
16 changes: 12 additions & 4 deletions apiserver/cmd/main.go
@@ -77,14 +77,21 @@ func startRpcServer(resourceManager *manager.ResourceManager) {
klog.Fatalf("Failed to start GPRC server: %v", err)
}

clusterServer := server.NewClusterServer(resourceManager, &server.ClusterServerOptions{CollectMetrics: *collectMetricsFlag})
templateServer := server.NewComputeTemplateServer(resourceManager, &server.ComputeTemplateServerOptions{CollectMetrics: *collectMetricsFlag})
jobServer := server.NewRayJobServer(resourceManager, &server.JobServerOptions{CollectMetrics: *collectMetricsFlag})
jobSubmissionServer := server.NewRayJobSubmissionServiceServer(clusterServer, &server.RayJobSubmissionServiceServerOptions{CollectMetrics: *collectMetricsFlag})
serveServer := server.NewRayServiceServer(resourceManager, &server.ServiceServerOptions{CollectMetrics: *collectMetricsFlag})

s := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(grpc_prometheus.UnaryServerInterceptor, interceptor.ApiServerInterceptor)),
grpc.MaxRecvMsgSize(math.MaxInt32))
api.RegisterClusterServiceServer(s, server.NewClusterServer(resourceManager, &server.ClusterServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterComputeTemplateServiceServer(s, server.NewComputeTemplateServer(resourceManager, &server.ComputeTemplateServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterRayJobServiceServer(s, server.NewRayJobServer(resourceManager, &server.JobServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterRayServeServiceServer(s, server.NewRayServiceServer(resourceManager, &server.ServiceServerOptions{CollectMetrics: *collectMetricsFlag}))
api.RegisterClusterServiceServer(s, clusterServer)
api.RegisterComputeTemplateServiceServer(s, templateServer)
api.RegisterRayJobServiceServer(s, jobServer)
api.RegisterRayJobSubmissionServiceServer(s, jobSubmissionServer)
api.RegisterRayServeServiceServer(s, serveServer)

// Register reflection service on gRPC server.
reflection.Register(s)
@@ -125,6 +132,7 @@ func startHttpProxy() {
registerHttpHandlerFromEndpoint(api.RegisterComputeTemplateServiceHandlerFromEndpoint, "ComputeTemplateService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(api.RegisterRayJobServiceHandlerFromEndpoint, "JobService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(api.RegisterRayServeServiceHandlerFromEndpoint, "ServeService", ctx, runtimeMux)
registerHttpHandlerFromEndpoint(api.RegisterRayJobSubmissionServiceHandlerFromEndpoint, "RayJobSubmissionService", ctx, runtimeMux)

// Create a top level mux to include both Http gRPC servers and other endpoints like metrics
topMux := http.NewServeMux()
11 changes: 9 additions & 2 deletions apiserver/go.mod
@@ -22,9 +22,14 @@ require (
require (
github.com/dustinkirkland/golang-petname v0.0.0-20230626224747-e794b9370d49
github.com/elazarl/go-bindata-assetfs v1.0.1
github.com/go-logr/logr v1.2.3
github.com/go-logr/zerologr v1.2.3
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.6.0
github.com/rs/zerolog v1.31.0
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af
sigs.k8s.io/yaml v1.3.0
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af
)

@@ -35,7 +40,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-openapi/errors v0.19.6 // indirect
github.com/go-openapi/strfmt v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
@@ -48,13 +53,16 @@
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.28.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.mongodb.org/mongo-driver v1.5.1 // indirect
golang.org/x/net v0.17.0 // indirect
@@ -71,7 +79,6 @@
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace (