/
cronjob.go
101 lines (88 loc) · 2.92 KB
/
cronjob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package k8s
import (
"context"
"strings"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sapiv1 "github.com/lyft/clutch/backend/api/k8s/v1"
)
func (s *svc) DescribeCronJob(ctx context.Context, clientset, cluster, namespace, name string) (*k8sapiv1.CronJob, error) {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return nil, err
}
cronJobs, err := cs.BatchV1().CronJobs(cs.Namespace()).List(ctx, metav1.ListOptions{
FieldSelector: "metadata.name=" + name,
})
if err != nil {
return nil, err
}
if len(cronJobs.Items) == 1 {
return ProtoForCronJob(cs.Cluster(), &cronJobs.Items[0]), nil
}
if len(cronJobs.Items) > 1 {
return nil, status.Error(codes.FailedPrecondition, "located multiple cron jobs")
}
return nil, status.Error(codes.NotFound, "unable to locate specified cron job")
}
func (s *svc) ListCronJobs(ctx context.Context, clientset, cluster, namespace string, listOptions *k8sapiv1.ListOptions) ([]*k8sapiv1.CronJob, error) {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return nil, err
}
opts, err := ApplyListOptions(listOptions)
if err != nil {
return nil, err
}
cronJobList, err := cs.BatchV1().CronJobs(cs.Namespace()).List(ctx, opts)
if err != nil {
return nil, err
}
var cronJobs []*k8sapiv1.CronJob
for _, d := range cronJobList.Items {
cronJob := d
cronJobs = append(cronJobs, ProtoForCronJob(cs.Cluster(), &cronJob))
}
return cronJobs, nil
}
func (s *svc) DeleteCronJob(ctx context.Context, clientset, cluster, namespace, name string) error {
cs, err := s.manager.GetK8sClientset(ctx, clientset, cluster, namespace)
if err != nil {
return err
}
opts := metav1.DeleteOptions{}
return cs.BatchV1().CronJobs(cs.Namespace()).Delete(ctx, name, opts)
}
func ProtoForCronJob(cluster string, k8scronJob *v1.CronJob) *k8sapiv1.CronJob {
clusterName := GetKubeClusterName(k8scronJob)
if clusterName == "" {
clusterName = cluster
}
// Required fields
ret := &k8sapiv1.CronJob{
Cluster: clusterName,
Namespace: k8scronJob.Namespace,
Name: k8scronJob.Name,
Schedule: k8scronJob.Spec.Schedule,
Labels: k8scronJob.Labels,
Annotations: k8scronJob.Annotations,
}
// Update optional fields
if k8scronJob.Spec.Suspend != nil {
ret.Suspend = *k8scronJob.Spec.Suspend
}
if k8scronJob.Spec.ConcurrencyPolicy != "" {
ret.ConcurrencyPolicy = k8sapiv1.CronJob_ConcurrencyPolicy(
k8sapiv1.CronJob_ConcurrencyPolicy_value[strings.ToUpper(string(k8scronJob.Spec.ConcurrencyPolicy))])
}
if k8scronJob.Status.Active != nil {
ret.NumActiveJobs = int32(len(k8scronJob.Status.Active))
}
if k8scronJob.Spec.StartingDeadlineSeconds != nil {
ret.StartingDeadlineSeconds = &wrappers.Int64Value{Value: *k8scronJob.Spec.StartingDeadlineSeconds}
}
return ret
}