-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
149 lines (127 loc) · 3.39 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package scheduler
import (
"encoding/json"
"errors"
"fmt"
"html/template"
"io/ioutil"
"time"
log "github.com/sirupsen/logrus"
"github.com/crunchydata/postgres-operator/apiserver"
"github.com/crunchydata/postgres-operator/kubeapi"
cv2 "gopkg.in/robfig/cron.v2"
"k8s.io/client-go/kubernetes"
)
var PolicyJobTemplate *template.Template
func Init() error {
buf, err := ioutil.ReadFile("/pgo-config/pgo.sqlrunner-template.json")
if err != nil {
return err
}
PolicyJobTemplate = template.Must(template.New("policy").Parse(string(buf)))
return nil
}
func New(label, namespace string, client *kubernetes.Clientset) *Scheduler {
apiserver.ConnectToKube()
restClient = apiserver.RESTClient
kubeClient = client
cronClient := cv2.New()
var p phony
cronClient.AddJob("* * * * *", p)
return &Scheduler{
namespace: namespace,
label: label,
CronClient: cronClient,
entries: make(map[string]cv2.EntryID),
}
}
func (s *Scheduler) AddSchedules() error {
configs, _ := kubeapi.ListConfigMap(kubeClient, s.label, s.namespace)
for _, config := range configs.Items {
if _, ok := s.entries[string(config.Name)]; ok {
continue
}
contextErr := log.WithFields(log.Fields{
"configMap": config.Name,
})
if len(config.Data) != 1 {
contextErr.WithFields(log.Fields{
"error": errors.New("Schedule configmaps should contain only one schedule"),
}).Error("Failed reading configMap")
}
var schedule ScheduleTemplate
for _, data := range config.Data {
if err := json.Unmarshal([]byte(data), &schedule); err != nil {
contextErr.WithFields(log.Fields{
"error": err,
}).Error("Failed unmarshaling configMap")
continue
}
}
if err := validate(schedule); err != nil {
contextErr.WithFields(log.Fields{
"error": err,
}).Error("Failed to validate schedule")
continue
}
id, err := s.schedule(schedule)
if err != nil {
contextErr.WithFields(log.Fields{
"error": err,
}).Error("Failed to schedule configMap")
continue
}
log.WithFields(log.Fields{
"configMap": string(config.Name),
"type": schedule.Type,
"schedule": schedule.Schedule,
"namespace": schedule.Namespace,
"deployment": schedule.Deployment,
"label": schedule.Label,
"container": schedule.Container,
}).Info("Added new schedule")
s.entries[string(config.Name)] = id
}
return nil
}
func (s *Scheduler) DeleteSchedules() error {
configs, _ := kubeapi.ListConfigMap(kubeClient, s.label, s.namespace)
for name := range s.entries {
found := false
for _, config := range configs.Items {
if name == string(config.Name) {
found = true
}
}
if !found {
log.WithFields(log.Fields{
"scheduleName": name,
}).Info("Removed schedule")
s.CronClient.Remove(s.entries[name])
delete(s.entries, name)
}
}
return nil
}
func (s *Scheduler) schedule(st ScheduleTemplate) (cv2.EntryID, error) {
var job cv2.Job
switch st.Type {
case "pgbackrest":
job = st.NewBackRestSchedule()
case "pgbasebackup":
job = st.NewBaseBackupSchedule()
case "policy":
job = st.NewPolicySchedule()
default:
var id cv2.EntryID
return id, fmt.Errorf("schedule type not implemented yet")
}
return s.CronClient.AddJob(st.Schedule, job)
}
type phony string
func (p phony) Run() {
// This is a phony job that register with the cron service
// that does nothing to prevent a bug that runs newly scheduled
// jobs multiple times.
_ = time.Now()
}