-
Notifications
You must be signed in to change notification settings - Fork 568
/
monitor.go
394 lines (376 loc) · 14.3 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
package server
import (
"bytes"
"context"
"fmt"
"os"
"path"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pachyderm/pachyderm/v2/src/internal/backoff"
"github.com/pachyderm/pachyderm/v2/src/internal/client"
"github.com/pachyderm/pachyderm/v2/src/internal/cronutil"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/ppsutil"
"github.com/pachyderm/pachyderm/v2/src/internal/tracing"
"github.com/pachyderm/pachyderm/v2/src/internal/tracing/extended"
"github.com/pachyderm/pachyderm/v2/src/pfs"
"github.com/pachyderm/pachyderm/v2/src/pps"
workerserver "github.com/pachyderm/pachyderm/v2/src/server/worker/server"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// startMonitor starts a new goroutine running monitorPipeline for
// 'pipelineInfo.Pipeline'.
//
// Every running pipeline with standby == true or a cron input has a
// corresponding goroutine running monitorPipeline() that puts the pipeline in
// and out of standby in response to new output commits appearing in that
// pipeline's output repo.
//
// The returned function cancels the monitor goroutine and blocks until it
// exits.
func (pc *pipelineController) startMonitor(ctx context.Context, pipelineInfo *pps.PipelineInfo) func() {
	// Give the monitor its own named child context tagged with the pipeline.
	monitorCtx := pctx.Child(ctx,
		fmt.Sprintf("monitorPipeline(%s)", pipelineInfo.Pipeline),
		pctx.WithFields(zap.Stringer("pipeline", pipelineInfo.Pipeline)))
	return startMonitorThread(monitorCtx, func(ctx context.Context) {
		pc.monitorPipeline(ctx, pipelineInfo)
	})
}
// startCrashingMonitor starts a new goroutine running monitorCrashingPipeline
// for 'pipelineInfo.Pipeline'
//
// Every crashing pipeline has a corresponding goro running
// monitorCrashingPipeline that checks to see if the issues have resolved
// themselves and moves the pipeline out of crashing if they have.
//
// The returned function cancels the crashing monitor and blocks until it
// exits.
func (pc *pipelineController) startCrashingMonitor(ctx context.Context, pipelineInfo *pps.PipelineInfo) func() {
	// Give the monitor its own named child context tagged with the pipeline.
	monitorCtx := pctx.Child(ctx,
		fmt.Sprintf("monitorCrashingPipeline(%s)", pipelineInfo.Pipeline),
		pctx.WithFields(zap.Stringer("pipeline", pipelineInfo.Pipeline)))
	return startMonitorThread(monitorCtx, func(ctx context.Context) {
		pc.monitorCrashingPipeline(ctx, pipelineInfo)
	})
}
// startMonitorThread is a helper used by startMonitor, startCrashingMonitor,
// and startPipelinePoller (in poller.go). It doesn't manipulate any of
// APIServer's fields, just wraps the passed function in a goroutine, and
// returns a cancel() fn to cancel it and block until it returns.
func startMonitorThread(ctx context.Context, f func(ctx context.Context)) func() {
	ctx, cancel := pctx.WithCancel(ctx)
	done := make(chan struct{})
	go func() {
		defer close(done) // signal exit even if f panics on the way out
		f(ctx)
	}()
	return func() {
		cancel()
		// Wait at most a minute for the goroutine to notice the cancellation.
		timeout := time.NewTimer(time.Minute)
		defer timeout.Stop()
		select {
		case <-done:
		case <-timeout.C:
			// restart pod rather than permanently locking up the PPS master (which
			// would break the PPS API)
			log.Error(ctx, "monitorThread blocked for over a minute after cancellation; exiting")
			panic("blocked for over a minute after cancellation; restarting container")
		}
	}
}
// monitorPipeline is the body of the goroutine started by startMonitor. It
// launches a cron-commit goroutine for each cron input and, when autoscaling
// is enabled, watches the pipeline's output repo and transitions the pipeline
// between STANDBY and RUNNING as commits appear and drain. It returns when
// ctx is cancelled (or, inside the state machine, when the pipeline is
// paused).
func (pc *pipelineController) monitorPipeline(ctx context.Context, pipelineInfo *pps.PipelineInfo) {
	pipelineName := pipelineInfo.Pipeline.Name
	log.Debug(ctx, "monitoring pipeline")
	var eg errgroup.Group
	// One goroutine per cron input, each committing tick files on schedule
	// (retried forever until ctx is cancelled).
	pps.VisitInput(pipelineInfo.Details.Input, func(in *pps.Input) error { //nolint:errcheck
		if in.Cron != nil {
			eg.Go(func() error {
				cctx := pctx.Child(ctx, "makeCronCommits")
				return backoff.RetryNotify(func() error {
					return makeCronCommits(cctx, pc.env, in)
				}, backoff.NewInfiniteBackOff(),
					backoff.NotifyCtx(cctx, "cron for "+in.Cron.Name))
			})
		}
		return nil
	})
	if pipelineInfo.Details.Autoscaling {
		// Capacity 1 gives us a bit of buffer so we don't needlessly go into
		// standby when SubscribeCommit takes too long to return.
		ciChan := make(chan *pfs.CommitInfo, 1)
		// Goroutine 1: subscribe to READY commits in the output repo and
		// forward them to ciChan; closes ciChan when it exits.
		eg.Go(func() error {
			defer close(ciChan)
			return backoff.RetryUntilCancel(ctx, func() error {
				pachClient := pc.env.GetPachClient(ctx)
				return pachClient.SubscribeCommit(client.NewRepo(pipelineInfo.Pipeline.Project.GetName(), pipelineName), "", "", pfs.CommitState_READY, func(ci *pfs.CommitInfo) error {
					select {
					case ciChan <- ci:
					case <-ctx.Done():
					}
					return nil
				})
			}, backoff.NewInfiniteBackOff(),
				backoff.NotifyCtx(ctx, "SubscribeCommit for "+pipelineInfo.Pipeline.String()))
		})
		// Goroutine 2: the standby state machine, driven by ciChan.
		eg.Go(func() error {
			return backoff.RetryNotify(func() error {
				// Shadow ctx: each tracing span below yields its own context,
				// while oldCtx keeps the original for starting new spans.
				var (
					oldCtx    = ctx
					childSpan opentracing.Span
					ctx       context.Context
				)
				defer func() {
					// childSpan is overwritten so wrap in a lambda for late binding
					tracing.FinishAnySpan(childSpan)
				}()
				// start span to capture & contextualize etcd state transition
				childSpan, ctx = extended.AddSpanToAnyPipelineTrace(oldCtx,
					pc.env.EtcdClient, pipelineInfo.Pipeline,
					"/pps.Master/MonitorPipeline/Begin")
				// Initial transition: RUNNING/CRASHING -> STANDBY (the
				// pipeline idles until a commit arrives).
				if err := pc.psDriver.TransitionState(ctx,
					pipelineInfo.SpecCommit,
					[]pps.PipelineState{
						pps.PipelineState_PIPELINE_RUNNING,
						pps.PipelineState_PIPELINE_CRASHING,
					}, pps.PipelineState_PIPELINE_STANDBY, ""); err != nil {
					pte := &ppsutil.PipelineTransitionError{}
					if errors.As(err, &pte) {
						if pte.Current == pps.PipelineState_PIPELINE_PAUSED {
							// pipeline is stopped, exit monitorPipeline (which pausing the
							// pipeline should also do). monitorPipeline will be called when
							// it transitions back to running
							// TODO(msteffen): this should happen in the pipeline
							// controller
							return nil
						} else if pte.Current != pps.PipelineState_PIPELINE_STANDBY {
							// it's fine if we were already in standby
							return errors.EnsureStack(err)
						}
					} else {
						return errors.EnsureStack(err)
					}
				}
				for {
					// finish span from previous loops
					tracing.FinishAnySpan(childSpan)
					childSpan = nil
					var ci *pfs.CommitInfo
					var ok bool
					select {
					case ci, ok = <-ciChan:
						if !ok {
							return nil // subscribeCommit exited, nothing left to do
						}
						if ci.Finished != nil {
							// commit is already finished; no need to leave standby
							continue
						}
						childSpan, ctx = extended.AddSpanToAnyPipelineTrace(oldCtx,
							pc.env.EtcdClient, pipelineInfo.Pipeline,
							"/pps.Master/MonitorPipeline/SpinUp",
							"commit", ci.Commit.Id)
						// An unfinished commit arrived: STANDBY -> RUNNING.
						if err := pc.psDriver.TransitionState(ctx,
							pipelineInfo.SpecCommit,
							[]pps.PipelineState{pps.PipelineState_PIPELINE_STANDBY},
							pps.PipelineState_PIPELINE_RUNNING, ""); err != nil {
							pte := &ppsutil.PipelineTransitionError{}
							if errors.As(err, &pte) && pte.Current == pps.PipelineState_PIPELINE_PAUSED {
								// pipeline is stopped, exit monitorPipeline (see above)
								return nil
							}
							return errors.EnsureStack(err)
						}
						// Stay running while commits are available and there's still job-related compaction to do
					running:
						for {
							if err := pc.blockStandby(pc.env.GetPachClient(ctx), ci.Commit); err != nil {
								return err
							}
							tracing.FinishAnySpan(childSpan)
							childSpan = nil
							select {
							case ci, ok = <-ciChan:
								if !ok {
									return nil // subscribeCommit exited, nothing left to do
								}
								childSpan, ctx = extended.AddSpanToAnyPipelineTrace(oldCtx,
									pc.env.EtcdClient, pipelineInfo.Pipeline,
									"/pps.Master/MonitorPipeline/WatchNext",
									"commit", ci.Commit.Id)
							default:
								// no queued commits left; go back to standby
								break running
							}
						}
						// Queue drained: RUNNING/CRASHING -> STANDBY again.
						if err := pc.psDriver.TransitionState(ctx,
							pipelineInfo.SpecCommit,
							[]pps.PipelineState{
								pps.PipelineState_PIPELINE_RUNNING,
								pps.PipelineState_PIPELINE_CRASHING,
							}, pps.PipelineState_PIPELINE_STANDBY, ""); err != nil {
							pte := &ppsutil.PipelineTransitionError{}
							if errors.As(err, &pte) && pte.Current == pps.PipelineState_PIPELINE_PAUSED {
								// pipeline is stopped; monitorPipeline will be called when it
								// transitions back to running
								// TODO(msteffen): this should happen in the pipeline
								// controller
								return nil
							}
							return errors.EnsureStack(err)
						}
					case <-ctx.Done():
						return errors.EnsureStack(context.Cause(ctx))
					}
				}
			}, backoff.NewInfiniteBackOff(),
				backoff.NotifyCtx(ctx, "monitorPipeline for "+pipelineInfo.Pipeline.String()))
		})
	}
	if err := eg.Wait(); err != nil {
		log.Info(ctx, "error in monitorPipeline", zap.Error(err))
	}
}
// blockStandby blocks until the given output commit and its corresponding
// meta commit reach the required state: FINISHED when pachw runs in the
// sidecar, FINISHING otherwise. It returns the first InspectCommit error, if
// any.
func (pc *pipelineController) blockStandby(pachClient *client.APIClient, commit *pfs.Commit) error {
	ctx := pachClient.Ctx()
	// Pick the commit state to wait for; with pachw in the sidecar we wait
	// for full FINISHED (presumably because compaction happens there —
	// TODO confirm), otherwise FINISHING suffices.
	waitState := pfs.CommitState_FINISHING
	if pc.env.PachwInSidecar {
		waitState = pfs.CommitState_FINISHED
	}
	// Wait on the output commit first, then its meta commit.
	for _, c := range []*pfs.Commit{commit, ppsutil.MetaCommit(commit)} {
		if _, err := pachClient.PfsAPIClient.InspectCommit(ctx, &pfs.InspectCommitRequest{
			Commit: c,
			Wait:   waitState,
		}); err != nil {
			return err
		}
	}
	return nil
}
// monitorCrashingPipeline is the body of the goroutine started by
// startCrashingMonitor. It periodically checks whether all of the pipeline's
// expected workers are up (both registered in etcd and ready per the RC) and,
// once they are, transitions the pipeline from CRASHING back to RUNNING and
// exits. It also exits when ctx is cancelled.
func (pc *pipelineController) monitorCrashingPipeline(ctx context.Context, pipelineInfo *pps.PipelineInfo) {
	ctx, cancelInner := pctx.WithCancel(ctx)
	// Release the child context on every exit path; without this, returning
	// because the *parent* ctx was cancelled would leak the child context
	// (cancelInner below only runs on the success path).
	defer cancelInner()
	if err := backoff.RetryUntilCancel(ctx, backoff.MustLoop(func() error {
		currRC, _, err := pc.getRC(ctx, pipelineInfo)
		if err != nil {
			return err
		}
		parallelism := int(*currRC.Spec.Replicas)
		workerStatus, err := workerserver.Status(ctx, pipelineInfo, pc.env.EtcdClient, pc.etcdPrefix, pc.env.Config.PPSWorkerPort)
		if err != nil {
			return errors.Wrap(err, "could not check if all workers are up")
		}
		// All workers must be registered AND reported ready by Kubernetes.
		if len(workerStatus) >= parallelism && int(currRC.Status.ReadyReplicas) >= parallelism {
			if err := pc.psDriver.TransitionState(ctx,
				pipelineInfo.SpecCommit,
				[]pps.PipelineState{pps.PipelineState_PIPELINE_CRASHING},
				pps.PipelineState_PIPELINE_RUNNING, ""); err != nil {
				return errors.Wrap(err, "could not transition pipeline to RUNNING")
			}
			cancelInner() // done--pipeline is out of CRASHING
		}
		return nil // loop again to check for new workers
	}), backoff.NewConstantBackOff(pc.crashingBackoff),
		backoff.NotifyContinue(fmt.Sprintf("monitorCrashingPipeline for %s", pipelineInfo.Pipeline)),
	); err != nil && ctx.Err() == nil {
		// retryUntilCancel should exit iff 'ctx' is cancelled, so this should be
		// unreachable (restart master if not)
		log.Error(ctx, "monitorCrashingPipeline is exiting prematurely which should not happen; restarting container...", zap.Error(err))
		os.Exit(10)
	}
}
// cronTick writes one tick into the cron input's repo: a file named after
// 'now' in RFC3339 format, on the master branch. If cron.Overwrite is set,
// the repo contents are deleted first so only the newest tick file remains.
func cronTick(pachClient *client.APIClient, now time.Time, cron *pps.CronInput) error {
	tickCommit := client.NewRepo(cron.Project, cron.Repo).NewCommit("master", "")
	writeTick := func(m client.ModifyFile) error {
		if cron.Overwrite {
			// Overwrite mode: clear out previous tick files first.
			if err := m.DeleteFile("/"); err != nil {
				return errors.Wrap(err, "DeleteFile(/)")
			}
		}
		// The tick file is empty; only its RFC3339 name carries information.
		name := now.Format(time.RFC3339)
		if err := m.PutFile(name, bytes.NewReader(nil)); err != nil {
			return errors.Wrapf(err, "PutFile(%s)", name)
		}
		return nil
	}
	if err := pachClient.WithModifyFileClient(tickCommit, writeTick); err != nil {
		return errors.Wrap(err, "WithModifyFileClient")
	}
	return nil
}
// makeCronCommits makes commits to a single cron input's repo. It's
// a helper function called by monitorPipeline. It sleeps until each scheduled
// tick and commits a tick file via cronTick, looping until ctx is cancelled
// or the schedule produces no further ticks.
func makeCronCommits(ctx context.Context, env Env, in *pps.Input) error {
	schedule, err := cronutil.ParseCronExpression(in.Cron.Spec)
	if err != nil {
		// Shouldn't happen, as the input is validated in CreatePipeline
		return errors.EnsureStack(err)
	}
	pachClient := env.GetPachClient(ctx)
	// Resume from the most recent tick already recorded in the cron repo.
	prev, err := getLatestCronTime(ctx, env, in)
	if err != nil {
		return errors.Wrap(err, "getLatestCronTime")
	}
	for {
		// Compute the next tick after the previous one per the cron schedule.
		next := schedule.Next(prev)
		if next.IsZero() {
			// zero time indicates there will never be another tick
			log.Debug(ctx, "no more scheduled ticks; exiting loop")
			return nil
		}
		log.Info(ctx, "waiting for next cron tick", zap.Time("tick", next), zap.Time("latestTick", prev))
		// Sleep until the tick time (or bail on cancellation).
		select {
		case <-ctx.Done():
			return errors.EnsureStack(context.Cause(ctx))
		case <-time.After(time.Until(next)):
		}
		if err := cronTick(pachClient, next, in.Cron); err != nil {
			return errors.Wrap(err, "cronTick")
		}
		log.Info(ctx, "cron tick committed", zap.Time("tick", next))
		prev = next
	}
}
// getLatestCronTime is a helper used by makeCronCommits. It figures out what
// 'in's most recently executed cron tick was and returns it (or, if no cron
// ticks are in 'in's cron repo, it returns the 'Start' time set in 'in.Cron'
// (typically set by 'pachctl extract'). If the configured start time is later
// than the newest tick file, the start time wins.
//
// Returns an error if the cron repo cannot be listed or the newest filename
// does not parse as an RFC3339 timestamp.
func getLatestCronTime(ctx context.Context, env Env, in *pps.Input) (retTime time.Time, retErr error) {
	pachClient := env.GetPachClient(ctx)
	defer log.Span(ctx, "getLatestCronTime")(zap.Timep("latest", &retTime), log.Errorp(&retErr))
	files, err := pachClient.ListFileAll(client.NewCommit(in.Cron.Project, in.Cron.Repo, "master", ""), "")
	if err != nil {
		// bail if cron repo is not accessible
		return time.Time{}, err
	}
	startTime := in.Cron.Start.AsTime()
	if len(files) == 0 {
		// no tick files yet: fall back to the configured cron start time
		return startTime, nil
	}
	// Take the name of the most recent file as the latest timestamp.
	// ListFile returns the files in lexicographical order, and the RFC3339
	// format goes from largest unit of time to smallest, so the most recent
	// file will be the last one.
	latestTime, err := time.Parse(time.RFC3339, path.Base(files[len(files)-1].File.Path))
	if err != nil {
		// bail if filename format is bad
		return time.Time{}, err //nolint:wrapcheck
	}
	// The start time may have been updated past the last recorded tick;
	// return whichever is later.
	if startTime.After(latestTime) {
		return startTime, nil
	}
	return latestTime, nil
}