/
job.go
100 lines (80 loc) · 2.52 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package k8s
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sapiv1 "github.com/lyft/clutch/backend/api/k8s/v1"
)
func (s *svc) DescribeJob(ctx context.Context, clientset, cluster, namespace, name string) (*k8sapiv1.Job, error) {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return nil, err
}
jobs, err := cs.BatchV1().Jobs(cs.Namespace()).List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + name,
})
if err != nil {
return nil, err
}
if len(jobs.Items) == 1 {
return protoForJob(cs.Cluster(), &jobs.Items[0]), nil
} else if len(jobs.Items) > 1 {
return nil, status.Error(codes.FailedPrecondition, "located multiple jobs")
}
return nil, status.Error(codes.NotFound, "unable to locate specified job")
}
func (s *svc) DeleteJob(ctx context.Context, clientset, cluster, namespace, name string) error {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return err
}
opts := metav1.DeleteOptions{}
return cs.BatchV1().Jobs(cs.Namespace()).Delete(ctx, name, opts)
}
func (s *svc) CreateJob(ctx context.Context, clientset, cluster, namespace string, job *v1.Job) (*k8sapiv1.Job, error) {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return nil, err
}
opts := metav1.CreateOptions{}
resultJob, err := cs.BatchV1().Jobs(cs.Namespace()).Create(ctx, job, opts)
if err != nil {
return nil, err
}
return protoForJob(cs.Cluster(), resultJob), nil
}
func (s *svc) ListJobs(ctx context.Context, clientset, cluster, namespace string, listOptions *k8sapiv1.ListOptions) ([]*k8sapiv1.Job, error) {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return nil, err
}
opts, err := ApplyListOptions(listOptions)
if err != nil {
return nil, err
}
jobList, err := cs.BatchV1().Jobs(cs.Namespace()).List(ctx, opts)
if err != nil {
return nil, err
}
var jobs []*k8sapiv1.Job
for _, j := range jobList.Items {
job := j
jobs = append(jobs, protoForJob(cs.Cluster(), &job))
}
return jobs, nil
}
func protoForJob(cluster string, k8sJob *v1.Job) *k8sapiv1.Job {
clusterName := GetKubeClusterName(k8sJob)
if clusterName == "" {
clusterName = cluster
}
return &k8sapiv1.Job{
Cluster: clusterName,
Namespace: k8sJob.Namespace,
Name: k8sJob.Name,
Labels: k8sJob.Labels,
Annotations: k8sJob.Annotations,
}
}