/
job.go
144 lines (125 loc) · 4.65 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package common
import (
"encoding/json"
"fmt"
"strings"
semver "github.com/Masterminds/semver/v3"
"github.com/google/shlex"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/yaml"
)
// getRuntimeEnvJson converts the RayJob's Spec.RuntimeEnvYAML into a JSON string.
//
// When RuntimeEnvYAML is empty it returns an empty string and a nil error.
// Any error reported by the YAML-to-JSON conversion is returned unchanged.
func getRuntimeEnvJson(rayJobInstance *rayv1.RayJob) (string, error) {
	rawYAML := rayJobInstance.Spec.RuntimeEnvYAML

	// Nothing configured: there is no runtime environment to pass along.
	if len(rawYAML) == 0 {
		return "", nil
	}

	converted, err := yaml.YAMLToJSON([]byte(rawYAML))
	if err != nil {
		return "", err
	}

	// Callers consume the JSON as a string, not as bytes.
	return string(converted), nil
}
// GetBaseRayJobCommand returns the first part of the Ray Job command up to and
// including the address, e.g. "ray job submit --address http://...".
//
// If address does not already start with "http://" or "https://", the
// "http://" scheme is prepended. An address that already carries either scheme
// is passed through unchanged, so an https address is no longer turned into
// the malformed "http://https://...".
func GetBaseRayJobCommand(address string) []string {
	hasScheme := strings.HasPrefix(address, "http://") || strings.HasPrefix(address, "https://")
	if !hasScheme {
		// Bare host:port values default to plain HTTP.
		address = "http://" + address
	}
	return []string{"ray", "job", "submit", "--address", address}
}
// GetMetadataJson returns the JSON string of the metadata for the Ray job.
//
// The metadata is only accepted when rayVersion satisfies ">= 2.6.0"; that
// string is passed later as the value of the --metadata-json flag.
//
// An error is returned when rayVersion cannot be parsed as a semantic version,
// when it does not satisfy the constraint, or when the metadata cannot be
// marshalled. Underlying errors are wrapped so callers can inspect them with
// errors.Is / errors.As.
func GetMetadataJson(metadata map[string]string, rayVersion string) (string, error) {
	// The constraint is a constant, but handle the error explicitly rather than
	// risk calling Check on a nil constraint.
	constraint, err := semver.NewConstraint(">= 2.6.0")
	if err != nil {
		return "", fmt.Errorf("failed to build Ray version constraint: %w", err)
	}

	v, err := semver.NewVersion(rayVersion)
	if err != nil {
		return "", fmt.Errorf("failed to parse Ray version: %v: %w", rayVersion, err)
	}
	if !constraint.Check(v) {
		return "", fmt.Errorf("the Ray version must be at least 2.6.0 to use the metadata field")
	}

	// Convert the metadata map to a JSON string.
	metadataBytes, err := json.Marshal(metadata)
	if err != nil {
		return "", fmt.Errorf("failed to marshal metadata: %v: %w", metadata, err)
	}
	return string(metadataBytes), nil
}
// GetK8sJobCommand assembles the full "ray job submit" argument list for the
// given RayJob.
//
// The command starts with the base command for Status.DashboardURL, followed by
// whichever optional flags are set on the RayJob, in this order:
// --runtime-env-json, --metadata-json, --submission-id, --entrypoint-num-cpus,
// --entrypoint-num-gpus, --entrypoint-resources. A literal "--" then separates
// the CLI flags from the shell-split Spec.Entrypoint.
//
// It returns an error if the runtime environment cannot be converted, the
// metadata cannot be encoded, or the entrypoint cannot be split.
func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
	cmd := GetBaseRayJobCommand(rayJobInstance.Status.DashboardURL)

	runtimeEnv, err := getRuntimeEnvJson(rayJobInstance)
	if err != nil {
		return nil, err
	}
	if runtimeEnv != "" {
		cmd = append(cmd, "--runtime-env-json", runtimeEnv)
	}

	if meta := rayJobInstance.Spec.Metadata; len(meta) > 0 {
		var encoded string
		encoded, err = GetMetadataJson(meta, rayJobInstance.Spec.RayClusterSpec.RayVersion)
		if err != nil {
			return nil, err
		}
		cmd = append(cmd, "--metadata-json", encoded)
	}

	if submissionID := rayJobInstance.Status.JobId; submissionID != "" {
		cmd = append(cmd, "--submission-id", submissionID)
	}

	if cpus := rayJobInstance.Spec.EntrypointNumCpus; cpus > 0 {
		cmd = append(cmd, "--entrypoint-num-cpus", fmt.Sprintf("%f", cpus))
	}

	if gpus := rayJobInstance.Spec.EntrypointNumGpus; gpus > 0 {
		cmd = append(cmd, "--entrypoint-num-gpus", fmt.Sprintf("%f", gpus))
	}

	if res := rayJobInstance.Spec.EntrypointResources; res != "" {
		cmd = append(cmd, "--entrypoint-resources", res)
	}

	// Everything after "--" belongs to the user's entrypoint, not to the Ray CLI.
	cmd = append(cmd, "--")

	entrypointArgs, err := shlex.Split(rayJobInstance.Spec.Entrypoint)
	if err != nil {
		return nil, err
	}

	return append(cmd, entrypointArgs...), nil
}
// GetDefaultSubmitterTemplate builds the default pod template for the Ray job
// submitter.
//
// The template holds a single container named "ray-job-submitter" that uses the
// image of the head group's container at utils.RayContainerIndex. It requests
// 500m CPU / 200Mi memory, is limited to 1 CPU / 1Gi memory, and the pod's
// restart policy is Never.
func GetDefaultSubmitterTemplate(rayClusterInstance *rayv1.RayCluster) corev1.PodTemplateSpec {
	headContainers := rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers

	submitter := corev1.Container{
		Name: "ray-job-submitter",
		// Reuse the Ray head image to be defensive against version mismatch issues.
		Image: headContainers[utils.RayContainerIndex].Image,
		Resources: corev1.ResourceRequirements{
			Limits: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("1"),
				corev1.ResourceMemory: resource.MustParse("1Gi"),
			},
			Requests: corev1.ResourceList{
				corev1.ResourceCPU:    resource.MustParse("500m"),
				corev1.ResourceMemory: resource.MustParse("200Mi"),
			},
		},
	}

	var template corev1.PodTemplateSpec
	template.Spec.Containers = []corev1.Container{submitter}
	template.Spec.RestartPolicy = corev1.RestartPolicyNever
	return template
}