/
daskjob.go
77 lines (65 loc) · 2.12 KB
/
daskjob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package managers
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
operationv1 "github.com/polyaxon/mloperator/api/v1"
"github.com/polyaxon/mloperator/controllers/daskapi"
"github.com/polyaxon/mloperator/controllers/kinds"
)
// generateHeadGroupSpec generates a new ReplicaSpec
func generateClusterSpec(worker operationv1.DaskReplicaSpec, scheduler operationv1.DaskReplicaSpec, service corev1.ServiceSpec, labels map[string]string, annotations map[string]string) daskapi.DaskCluster {
l := make(map[string]string)
for k, v := range labels {
l[k] = v
}
a := make(map[string]string)
for k, v := range annotations {
a[k] = v
}
return daskapi.DaskCluster{
Spec: daskapi.DaskClusterSpec{
Worker: daskapi.WorkerSpec{
Replicas: worker.Replicas,
Spec: worker.Template.Spec,
},
Scheduler: daskapi.SchedulerSpec{
Spec: scheduler.Template.Spec,
Service: service,
},
},
}
}
// GenerateDaskJob returns a DaskJob
func GenerateDaskJob(
name string,
namespace string,
labels map[string]string,
annotations map[string]string,
termination operationv1.TerminationSpec,
spec operationv1.DaskJobSpec,
) (*unstructured.Unstructured, error) {
cluster := generateClusterSpec(spec.ReplicaSpecs[operationv1.DaskReplicaTypeWorker], spec.ReplicaSpecs[operationv1.DaskReplicaTypeScheduler], spec.Service, labels, annotations)
jobSpec := &daskapi.DaskJobSpec{
Job: daskapi.JobSpec{
Spec: spec.ReplicaSpecs[operationv1.DaskReplicaTypeJob].Template.Spec,
},
Cluster: cluster,
}
job := &unstructured.Unstructured{}
job.SetAPIVersion(kinds.DaskAPIVersion)
job.SetKind(kinds.DaskJobKind)
job.SetLabels(labels)
job.SetAnnotations(annotations)
job.SetName(name)
job.SetNamespace(namespace)
jobManifest, err := runtime.DefaultUnstructuredConverter.ToUnstructured(jobSpec)
if err != nil {
return nil, fmt.Errorf("Convert daskjob to unstructured error: %v", err)
}
if err := unstructured.SetNestedField(job.Object, jobManifest, "spec"); err != nil {
return nil, fmt.Errorf("Set .spec.hosts error: %v", err)
}
return job, nil
}