/
rayjob.go
148 lines (133 loc) · 4.49 KB
/
rayjob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package managers
import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
operationv1 "github.com/polyaxon/mloperator/api/v1"
"github.com/polyaxon/mloperator/controllers/kinds"
"github.com/polyaxon/mloperator/controllers/rayapi"
"github.com/polyaxon/mloperator/controllers/utils"
)
// GetRayStartParams returns the Ray start parameters to use for a replica.
//
// When rayStartParams holds at least one entry it is returned as-is (the same
// map, not a copy). Otherwise, for a nil or empty input, a new empty non-nil
// map is returned so callers always get a map they can write to.
func GetRayStartParams(rayStartParams map[string]string) map[string]string {
	// len of a nil map is 0, so no separate nil check is needed.
	if len(rayStartParams) > 0 {
		return rayStartParams
	}
	return make(map[string]string)
}
// generateHeadGroupSpec builds the rayapi.HeadGroupSpec for the head replica.
//
// Labels are copied into a fresh map with the "app.kubernetes.io/name" key
// left out, and annotations are copied as-is. The head service and the pod
// template are both named name + "-svc" and share the same copied label and
// annotation maps. The pod spec is taken from replicaSpec.Template.Spec.
func generateHeadGroupSpec(replicaSpec operationv1.RayReplicaSpec, name string, labels map[string]string, annotations map[string]string) rayapi.HeadGroupSpec {
	headLabels := make(map[string]string)
	for key, value := range labels {
		if key == "app.kubernetes.io/name" {
			continue
		}
		headLabels[key] = value
	}

	headAnnotations := make(map[string]string)
	for key, value := range annotations {
		headAnnotations[key] = value
	}

	// One metadata value is reused for the service and the pod template; the
	// struct is copied on assignment but the underlying maps stay shared.
	meta := metav1.ObjectMeta{
		Name:        name + "-svc",
		Labels:      headLabels,
		Annotations: headAnnotations,
	}

	var head rayapi.HeadGroupSpec
	head.RayStartParams = GetRayStartParams(replicaSpec.RayStartParams)
	head.HeadService = &corev1.Service{ObjectMeta: meta}
	head.Template = corev1.PodTemplateSpec{
		ObjectMeta: meta,
		Spec:       replicaSpec.Template.Spec,
	}
	return head
}
// generateWorkerGroupSpec builds the rayapi.WorkerGroupSpec for one worker
// replica spec.
//
// Labels are copied into a fresh map with the "app.kubernetes.io/name" key
// left out, and annotations are copied as-is; both are attached to the pod
// template, whose spec is taken from replicaSpec.Template.Spec. The group is
// named replicaSpec.GroupName when that is set, otherwise "worker-<idx>".
// Replica counts are passed through utils.GetNumReplicas.
func generateWorkerGroupSpec(replicaSpec operationv1.RayReplicaSpec, labels map[string]string, annotations map[string]string, idx int) rayapi.WorkerGroupSpec {
	workerLabels := make(map[string]string)
	for key, value := range labels {
		if key == "app.kubernetes.io/name" {
			continue
		}
		workerLabels[key] = value
	}

	workerAnnotations := make(map[string]string)
	for key, value := range annotations {
		workerAnnotations[key] = value
	}

	// Prefer the explicit group name; fall back to one derived from idx.
	groupName := replicaSpec.GroupName
	if groupName == "" {
		groupName = fmt.Sprintf("worker-%d", idx)
	}

	var group rayapi.WorkerGroupSpec
	group.GroupName = groupName
	group.Replicas = utils.GetNumReplicas(replicaSpec.Replicas)
	group.MinReplicas = utils.GetNumReplicas(replicaSpec.MinReplicas)
	group.MaxReplicas = utils.GetNumReplicas(replicaSpec.MaxReplicas)
	group.RayStartParams = GetRayStartParams(replicaSpec.RayStartParams)
	group.Template = corev1.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{Labels: workerLabels, Annotations: workerAnnotations},
		Spec:       replicaSpec.Template.Spec,
	}
	return group
}
// GenerateRayJob builds the unstructured RayJob object for the given spec.
//
// The head group is generated from spec.Head and one worker group is generated
// per entry of spec.Workers (the worker list stays nil when there are none).
// name is used as the object name, as the Ray job id, and as the Ray cluster
// name recorded in the status. labels and annotations are set on the object
// and are also forwarded to the head and worker group generators.
//
// It returns an error when the job spec or the job status cannot be converted
// to unstructured form, or when either cannot be set on the object.
func GenerateRayJob(
	name string,
	namespace string,
	labels map[string]string,
	annotations map[string]string,
	termination operationv1.TerminationSpec,
	spec operationv1.RayJobSpec,
) (*unstructured.Unstructured, error) {
	head := generateHeadGroupSpec(spec.Head, name, labels, annotations)

	// Leave workers nil when no worker groups are requested.
	var workers []rayapi.WorkerGroupSpec
	if len(spec.Workers) > 0 {
		workers = make([]rayapi.WorkerGroupSpec, len(spec.Workers))
		for i, w := range spec.Workers {
			workers[i] = generateWorkerGroupSpec(w, labels, annotations, i)
		}
	}

	cluster := &rayapi.RayClusterSpec{
		RayVersion:       spec.RayVersion,
		HeadGroupSpec:    head,
		WorkerGroupSpecs: workers,
	}

	// TODO: Replace shutdownAfterJobFinishes with termination.ActiveDeadlineSeconds
	jobSpec := &rayapi.RayJobSpec{
		Entrypoint:               spec.Entrypoint,
		Metadata:                 spec.Metadata,
		RuntimeEnv:               spec.RuntimeEnv,
		JobId:                    name,
		ShutdownAfterJobFinishes: true,
		TTLSecondsAfterFinished:  utils.GetTTL(termination.TTLSecondsAfterFinished),
		RayClusterSpec:           cluster,
	}

	jobStatus := &rayapi.RayJobStatus{
		JobId:          name,
		RayClusterName: name,
	}

	// Check each conversion on its own: previously the error from the spec
	// conversion was overwritten by the status conversion before being read.
	jobManifest, err := runtime.DefaultUnstructuredConverter.ToUnstructured(jobSpec)
	if err != nil {
		return nil, fmt.Errorf("Convert rayjob to unstructured error: %w", err)
	}
	jobStatusManifest, err := runtime.DefaultUnstructuredConverter.ToUnstructured(jobStatus)
	if err != nil {
		return nil, fmt.Errorf("Convert rayjob to unstructured error: %w", err)
	}

	job := &unstructured.Unstructured{}
	job.SetAPIVersion(kinds.RayAPIVersion)
	job.SetKind(kinds.RayJobKind)
	job.SetLabels(labels)
	job.SetAnnotations(annotations)
	job.SetName(name)
	job.SetNamespace(namespace)

	if err := unstructured.SetNestedField(job.Object, jobManifest, "spec"); err != nil {
		return nil, fmt.Errorf("Set .spec error: %w", err)
	}
	if err := unstructured.SetNestedField(job.Object, jobStatusManifest, "status"); err != nil {
		return nil, fmt.Errorf("Set status error: %w", err)
	}

	return job, nil
}