poller.go
package server

import (
	"context"
	"os"
	"strconv"
	"time"

	"go.uber.org/zap"
	v1 "k8s.io/api/core/v1"
	kube_err "k8s.io/apimachinery/pkg/api/errors"
	kube_watch "k8s.io/apimachinery/pkg/watch"

	"github.com/pachyderm/pachyderm/v2/src/internal/backoff"
	"github.com/pachyderm/pachyderm/v2/src/internal/errors"
	"github.com/pachyderm/pachyderm/v2/src/internal/log"
	"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
	"github.com/pachyderm/pachyderm/v2/src/internal/ppsdb"
	"github.com/pachyderm/pachyderm/v2/src/internal/watch"
	"github.com/pachyderm/pachyderm/v2/src/pfs"
	"github.com/pachyderm/pachyderm/v2/src/pps"
)
const pollBackoffTime = 2 * time.Second

// startPipelinePoller starts a new goroutine running pollPipelines
func (m *ppsMaster) startPipelinePoller() {
	m.pollPipelinesMu.Lock()
	defer m.pollPipelinesMu.Unlock()
	m.pollCancel = startMonitorThread(pctx.Child(m.masterCtx, "pollPipelines"), m.pollPipelines)
}

func (m *ppsMaster) cancelPipelinePoller() {
	m.pollPipelinesMu.Lock()
	defer m.pollPipelinesMu.Unlock()
	if m.pollCancel != nil {
		m.pollCancel()
		m.pollCancel = nil
	}
}

// startPipelinePodsPoller starts a new goroutine running pollPipelinePods
func (m *ppsMaster) startPipelinePodsPoller() {
	m.pollPipelinesMu.Lock()
	defer m.pollPipelinesMu.Unlock()
	m.pollPodsCancel = startMonitorThread(pctx.Child(m.masterCtx, "pollPipelinePods"), m.pollPipelinePods)
}

func (m *ppsMaster) cancelPipelinePodsPoller() {
	m.pollPipelinesMu.Lock()
	defer m.pollPipelinesMu.Unlock()
	if m.pollPodsCancel != nil {
		m.pollPodsCancel()
		m.pollPodsCancel = nil
	}
}
// startPipelineWatcher starts a new goroutine running watchPipelines
func (m *ppsMaster) startPipelineWatcher() {
	m.pollPipelinesMu.Lock()
	defer m.pollPipelinesMu.Unlock()
	m.watchCancel = startMonitorThread(pctx.Child(m.masterCtx, "watchPipelines"), m.watchPipelines)
}

func (m *ppsMaster) cancelPipelineWatcher() {
	m.pollPipelinesMu.Lock()
	defer m.pollPipelinesMu.Unlock()
	if m.watchCancel != nil {
		m.watchCancel()
		m.watchCancel = nil
	}
}
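
// A minimal sketch of how the start/cancel pairs above are meant to be used
// (hypothetical caller; the surrounding setup code is not part of this file).
// Each start function stores the cancel func returned by startMonitorThread
// under pollPipelinesMu, so that starting and cancelling can race safely and
// cancelling an already-cancelled poller is a no-op:
//
//	m.startPipelinePoller()
//	m.startPipelinePodsPoller()
//	m.startPipelineWatcher()
//	defer func() {
//		m.cancelPipelineWatcher()
//		m.cancelPipelinePodsPoller()
//		m.cancelPipelinePoller()
//	}()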
//////////////////////////////////////////////////////////////////////////////
//                         PollPipelines Definition                         //
// - As in monitor.go, functions below should not call functions above, to  //
//   avoid reentrancy deadlock.                                             //
//////////////////////////////////////////////////////////////////////////////
// pollPipelines generates regular updateEv and deleteEv events for each
// pipeline and sends them to ppsMaster.Run(). By scanning the database and k8s
// regularly and generating events for them, it prevents pipelines from getting
// orphaned.
func (m *ppsMaster) pollPipelines(ctx context.Context) {
	dbPipelines := map[pipelineKey]bool{}
	if err := backoff.RetryUntilCancel(ctx, backoff.MustLoop(func() error {
		if len(dbPipelines) == 0 {
			// 1. Get the current set of pipeline RCs, as a base set for stale RCs.
			//
			// Pipelines are created in the database before their RC is created in
			// k8s, so to garbage-collect stale RCs, we have to go the other way and
			// query k8s first (if we were to query the database first, and
			// CreatePipeline(foo) were to run between querying the database and
			// querying k8s, then we might delete the RC for the brand-new pipeline
			// 'foo'). Even if we do delete a live pipeline's RC, it'll be fixed in
			// the next cycle.
			rcs, err := m.kd.ListReplicationControllers(ctx)
			if err != nil {
				// No sensible error recovery here (e.g., if we can't reach k8s).
				// We'll keep going, and just won't delete any RCs this round.
				log.Info(ctx, "error polling pipeline RCs", zap.Error(err))
			}
			// 2. Replenish 'dbPipelines' with the set of pipelines currently in
			// the database; it determines both which RCs (from above) are stale
			// and which pipelines need to be bumped. Note that there may be zero
			// pipelines in the database, in which case dbPipelines stays empty.
			if err := m.sd.ListPipelineInfo(ctx,
				func(ptr *pps.PipelineInfo) error {
					dbPipelines[toKey(ptr.Pipeline)] = true
					return nil
				}); err != nil {
				// ListPipelineInfo results (dbPipelines) are used by all remaining
				// steps, so if that didn't work, start over and try again.
				dbPipelines = map[pipelineKey]bool{}
				return errors.Wrap(err, "error polling pipelines")
			}
			// 3. Generate a delete event for orphaned RCs.
			if rcs != nil {
				for _, rc := range rcs.Items {
					projectName := rc.Labels[pipelineProjectLabel]
					pipelineName, ok := rc.Labels[pipelineNameLabel]
					if !ok {
						return errors.New("'pipelineName' label missing from rc " + rc.Name)
					}
					pipeline := newPipeline(projectName, pipelineName)
					if !dbPipelines[toKey(pipeline)] {
						log.Debug(ctx, "generating pipelineEvent for orphaned RC", zap.Stringer("pipeline", pipeline))
						m.eventCh <- &pipelineEvent{pipeline: pipeline}
					}
				}
			}
			// 4. Retry if there are no pipelines to read/write.
			if len(dbPipelines) == 0 {
				return backoff.ErrContinue
			}
		}
		// Generate one event for a pKey (to trigger that pipeline's controller)
		// and remove the pKey from dbPipelines. Always choose the
		// lexicographically smallest pKey so that pipelines are traversed in the
		// same order each cycle and the period between polls is stable across
		// all pipelines.
		var pKey pipelineKey
		for p := range dbPipelines {
			if pKey == "" || p < pKey {
				pKey = p
			}
		}
		// always remove 'pKey', to advance the loop
		delete(dbPipelines, pKey)
		// generate a pipeline event for 'pKey'
		log.Debug(ctx, "polling pipeline", zap.String("pipeline", string(pKey)))
		pipeline, err := fromKey(pKey)
		if err != nil {
			return errors.Wrapf(err, "invalid pipeline key %q in dbPipelines", pKey)
		}
		select {
		case m.eventCh <- &pipelineEvent{pipeline: pipeline}:
		case <-ctx.Done():
		}
		// 5. Move to the next pipeline (after a 2s sleep).
		return nil
	}), backoff.NewConstantBackOff(pollBackoffTime),
		backoff.NotifyContinue("pollPipelines"),
	); err != nil && ctx.Err() == nil {
		log.Error(ctx, "pollPipelines is exiting prematurely, which should not happen; restarting container", zap.Error(err))
		os.Exit(10)
	}
}
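
// The retry scaffolding above is shared by all three loops in this file. As a
// rough sketch (the 'body' stand-in is hypothetical, and the semantics below
// are inferred from usage in this file, not a spec of the internal backoff
// package):
//
//	backoff.RetryUntilCancel(ctx, backoff.MustLoop(body),
//		backoff.NewConstantBackOff(pollBackoffTime),
//		backoff.NotifyContinue("name"))
//
// body is re-run after each backoff interval: MustLoop appears to keep the
// loop going even when body returns nil, backoff.ErrContinue asks for another
// iteration, and NotifyContinue logs errors between retries rather than
// aborting. RetryUntilCancel is only expected to return once ctx is
// cancelled, which is why a non-nil error with a live ctx triggers the
// os.Exit guard above.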
// pollPipelinePods creates a kubernetes watch and, for each event:
//  1. Checks whether the event concerns a Pod
//  2. Checks whether the Pod belongs to a pipeline (the pipelineName
//     annotation is set)
//  3. Checks whether the Pod is failing
//
// If all three conditions are met, then the pipeline (in 'pipelineName') is
// set to CRASHING.
func (m *ppsMaster) pollPipelinePods(ctx context.Context) {
	if err := backoff.RetryUntilCancel(ctx, backoff.MustLoop(func() error {
		watch, cancel, err := m.kd.WatchPipelinePods(ctx)
		if err != nil {
			return errors.Wrap(err, "failed to watch kubernetes pods")
		}
		defer cancel()
	WatchLoop:
		for {
			select {
			case <-ctx.Done():
				return nil
			case event, ok := <-watch:
				if !ok {
					log.Debug(ctx, "kubernetes pod watch unexpectedly ended; restarting watch")
					return backoff.ErrContinue
				}
				// if we get an error, we restart the watch
				if event.Type == kube_watch.Error {
					kerr := kube_err.FromObject(event.Object)
					log.Debug(ctx, "kubernetes pod watch unexpectedly ended with error; restarting watch", zap.Error(kerr))
					return errors.Wrap(kerr, "error while watching kubernetes pods")
				}
				pod, ok := event.Object.(*v1.Pod)
				if !ok {
					continue // irrelevant event
				}
				if pod.Status.Phase == v1.PodFailed {
					// This is one of those log messages that is going to be
					// unnecessarily alarming; there are lots of processes in
					// place to restart the pod. But it's good to have a
					// record.
					log.Info(ctx, "worker pod failed", zap.String("podName", pod.Name), zap.Any("podStatus", pod.Status))
				}
				crashPipeline := func(reason string) error {
					projectName := pod.ObjectMeta.Annotations[pipelineProjectAnnotation]
					pipelineName := pod.ObjectMeta.Annotations[pipelineNameAnnotation]
					pipelineVersion, versionErr := strconv.Atoi(pod.ObjectMeta.Annotations["pipelineVersion"])
					if versionErr != nil {
						return errors.Wrap(versionErr, "couldn't find pipeline rc version")
					}
					pipeline := &pps.Pipeline{
						Project: &pfs.Project{Name: projectName},
						Name:    pipelineName,
					}
					var pipelineInfo *pps.PipelineInfo
					if pipelineInfo, err = m.sd.GetPipelineInfo(ctx, pipeline, pipelineVersion); err != nil {
						return errors.EnsureStack(err)
					}
					return m.setPipelineCrashing(ctx, pipelineInfo.SpecCommit, reason)
				}
				for _, status := range pod.Status.ContainerStatuses {
					if status.State.Waiting != nil && failures[status.State.Waiting.Reason] {
						if err := crashPipeline(status.State.Waiting.Message); err != nil {
							return errors.Wrap(err, "error moving pipeline to CRASHING")
						}
						continue WatchLoop
					}
				}
				for _, condition := range pod.Status.Conditions {
					if condition.Type == v1.PodScheduled &&
						condition.Status != v1.ConditionTrue && failures[condition.Reason] {
						if err := crashPipeline(condition.Message); err != nil {
							return errors.Wrap(err, "error moving pipeline to CRASHING")
						}
						continue WatchLoop
					}
				}
			}
		}
	}), backoff.NewInfiniteBackOff(), backoff.NotifyContinue("pollPipelinePods"),
	); err != nil && ctx.Err() == nil {
		log.Error(ctx, "pollPipelinePods is exiting prematurely, which should not happen; restarting container", zap.Error(err))
		os.Exit(11)
	}
}
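
// For reference, a sketch of the worker pod metadata the crash path above
// reads. The values are illustrative only; the exact project/name keys are
// the pipelineProjectAnnotation and pipelineNameAnnotation constants defined
// elsewhere in this package (only "pipelineVersion" appears literally above):
//
//	metadata:
//	  annotations:
//	    <pipelineProjectAnnotation>: myproject
//	    <pipelineNameAnnotation>: edges
//	    pipelineVersion: "3"   # parsed with strconv.Atoi above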
// watchPipelines watches the 'pipelines' collection in the database and sends
// writeEv and deleteEv events to the PPS master when it sees them.
//
// watchPipelines is unlike the other poll and monitor goroutines in that it
// sees the result of other poll/monitor goroutines' writes. For example, when
// pollPipelinePods (above) observes that a pipeline is crashing and updates
// its state in the database, the flow for starting monitorPipelineCrashing is:
//
//	k8s watch ─> pollPipelinePods ╭───> watchPipelines ╭──> m.run()
//	                    │         │           │        │       │
//	                    ↓         │           ↓        │       ↓
//	                db write──────╯       m.eventCh────╯   m.step()
//
// Most of the other poll/monitor goroutines actually go through watchPipelines
// (by writing to the database, which is then observed by the watch below).
func (m *ppsMaster) watchPipelines(ctx context.Context) {
	if err := backoff.RetryUntilCancel(ctx, backoff.MustLoop(func() error {
		// TODO(msteffen) request only keys, since pipeline_controller.go reads
		// fresh values for each event anyway
		watcher, close, err := m.sd.Watch(ctx)
		if err != nil {
			return errors.Wrap(err, "error creating watch")
		}
		defer close()
		for event := range watcher {
			if event.Err != nil {
				return errors.Wrap(event.Err, "event err")
			}
			projectName, pipelineName, _, err := ppsdb.ParsePipelineKey(string(event.Key))
			if err != nil {
				return errors.Wrap(err, "bad watch event key")
			}
			switch event.Type {
			case watch.EventPut, watch.EventDelete:
				e := &pipelineEvent{
					pipeline:  newPipeline(projectName, pipelineName),
					timestamp: time.Unix(event.Rev, 0),
				}
				select {
				case m.eventCh <- e:
				case <-m.masterCtx.Done():
					return errors.Wrap(m.masterCtx.Err(), "pipeline event arrived while master is restarting")
				}
			case watch.EventError:
				log.Error(ctx, "watchPipelines received an errored event from the pipelines watcher", zap.Error(event.Err))
			}
		}
		return nil // reset until ctx is cancelled (RetryUntilCancel)
	}), &backoff.ZeroBackOff{}, backoff.NotifyContinue("watchPipelines"),
	); err != nil && ctx.Err() == nil {
		log.Error(ctx, "watchPipelines is exiting prematurely, which should not happen; restarting container", zap.Error(err))
		os.Exit(11)
	}
}
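
// Putting the three loops together: everything in this file funnels into
// m.eventCh. A rough sketch of the consumer side, hypothetical only — m.run()
// and m.step() live elsewhere in this package, and this is merely the shape
// implied by the diagram above:
//
//	for {
//		select {
//		case ev := <-m.eventCh:
//			m.step(ev) // drive the controller for ev.pipeline
//		case <-m.masterCtx.Done():
//			return
//		}
//	}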