master.go
package server

import (
	"context"
	"fmt"
	"path"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
	v1 "k8s.io/api/core/v1"

	"github.com/pachyderm/pachyderm/v2/src/internal/backoff"
	"github.com/pachyderm/pachyderm/v2/src/internal/collection"
	"github.com/pachyderm/pachyderm/v2/src/internal/dlock"
	"github.com/pachyderm/pachyderm/v2/src/internal/errors"
	"github.com/pachyderm/pachyderm/v2/src/internal/log"
	middleware_auth "github.com/pachyderm/pachyderm/v2/src/internal/middleware/auth"
	"github.com/pachyderm/pachyderm/v2/src/internal/pachsql"
	"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
	"github.com/pachyderm/pachyderm/v2/src/internal/ppsutil"
	"github.com/pachyderm/pachyderm/v2/src/internal/tracing"
	"github.com/pachyderm/pachyderm/v2/src/pfs"
	"github.com/pachyderm/pachyderm/v2/src/pps"
)

const (
	masterLockPath = "_master_lock"
	maxErrCount    = 3 // gives all retried operations ~4.5s total to finish
)

var (
	failures = map[string]bool{
		// from k8s.io/kubernetes/pkg/kubelet/images/types.go
		"ErrImagePull":              true,
		"InvalidImageName":          true,
		"ImagePullBackOff":          true,
		v1.PodReasonUnschedulable:   true,
		// from k8s.io/kubernetes/pkg/kubelet/kuberuntime/kuberuntime_container.go
		"CreateContainerConfigError": true,
		"CreateContainerError":       true,
	}

	zero     int32 // used to turn down RCs in scaleDownWorkersForPipeline
	falseVal bool  // used to delete RCs in deletePipelineResources and restartPipeline()
)

type pipelineKey string

func toKey(p *pps.Pipeline) pipelineKey {
	return pipelineKey(fmt.Sprintf("%s/%s", p.GetProject().GetName(), p.GetName()))
}

func fromKey(k pipelineKey) (*pps.Pipeline, error) {
	parts := strings.Split(string(k), "/")
	// TODO: this will need to change when hierarchical projects are
	// enabled; then keys with more parts will be permissible
	if len(parts) != 2 {
		return nil, errors.Errorf("invalid pipeline key %s", k)
	}
	return newPipeline(parts[0], parts[1]), nil
}

func newPipeline(projectName, pipelineName string) *pps.Pipeline {
	return &pps.Pipeline{
		Project: &pfs.Project{Name: projectName},
		Name:    pipelineName,
	}
}
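
// Illustrative round trip, assuming a hypothetical project "myproject" and
// pipeline "edges" (not taken from real call sites):
//
//	k := toKey(newPipeline("myproject", "edges")) // pipelineKey("myproject/edges")
//	p, err := fromKey(k)                          // p.GetProject().GetName() == "myproject", p.GetName() == "edges", err == nil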

type pipelineEvent struct {
	pipeline  *pps.Pipeline
	timestamp time.Time
}

type stepError struct {
	error
	retry        bool
	failPipeline bool
}

func newRetriableError(err error, message string) error {
	retry, failPipeline := true, true
	if errors.Is(err, context.Canceled) {
		retry = false
		failPipeline = false
	}
	if errors.Is(err, context.DeadlineExceeded) {
		retry = true
		failPipeline = false
	}
	return stepError{
		error:        errors.Wrap(err, message),
		retry:        retry,
		failPipeline: failPipeline,
	}
}

func (s stepError) Unwrap() error {
	return s.error
}
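
// A hedged usage sketch (doStep and the message are illustrative, not real
// call sites): a pipelineController step can classify an error so callers know
// whether to retry and whether to mark the pipeline as failed:
//
//	if err := doStep(ctx); err != nil {
//		return newRetriableError(err, "could not refresh worker state")
//	}
//
// With this classification, context.Canceled is neither retried nor treated as
// a pipeline failure, context.DeadlineExceeded is retried without failing the
// pipeline, and any other error is both retried and counted against the pipeline.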

type ppsMaster struct {
	env Env
	// for checking worker status during crashing monitor
	etcdPrefix string
	// masterCtx is a context that is cancelled if
	// the current pps master loses its master status
	masterCtx context.Context
	// fields for the pollPipelines, pollPipelinePods, and watchPipelines goros
	pollPipelinesMu sync.Mutex
	pollCancel      func() // protected by pollPipelinesMu
	pollPodsCancel  func() // protected by pollPipelinesMu
	watchCancel     func() // protected by pollPipelinesMu
	pcMgr           *pcManager
	kd              InfraDriver
	sd              PipelineStateDriver
	// channel through which pipeline events are passed
	eventCh chan *pipelineEvent

	scaleUpInterval time.Duration
	crashingBackoff time.Duration
}

func newMaster(ctx context.Context, env Env, etcdPrefix string, kd InfraDriver, sd PipelineStateDriver) *ppsMaster {
	return &ppsMaster{
		masterCtx:       ctx,
		env:             env,
		etcdPrefix:      etcdPrefix,
		pcMgr:           newPcManager(),
		kd:              kd,
		sd:              sd,
		scaleUpInterval: time.Second * 30,
		crashingBackoff: time.Second * 15,
	}
}

// The master process is responsible for creating/deleting workers as
// pipelines are created/removed.
func (a *apiServer) master(ctx context.Context) {
	masterLock := dlock.NewDLock(a.env.EtcdClient, path.Join(a.etcdPrefix, masterLockPath))
	backoff.RetryUntilCancel(ctx, func() error { //nolint:errcheck
		ctx, cancel := pctx.WithCancel(pctx.Child(ctx, "master", pctx.WithServerID()))
		// set internal auth for basic operations
		ctx = middleware_auth.AsInternalUser(ctx, "pps-master")
		defer cancel()
		ctx, err := masterLock.Lock(ctx)
		if err != nil {
			return errors.EnsureStack(err)
		}
		defer masterLock.Unlock(ctx) //nolint:errcheck
		log.Info(ctx, "PPS master: launching master process")
		kd := newKubeDriver(a.env.KubeClient, a.env.Config)
		sd := newPipelineStateDriver(a.env.DB, a.pipelines, a.txnEnv, a.env.PFSServer)
		m := newMaster(ctx, a.env, a.etcdPrefix, kd, sd)
		m.run()
		return errors.Wrapf(context.Cause(ctx), "ppsMaster.run() exited unexpectedly")
	}, backoff.NewInfiniteBackOff(), func(err error, d time.Duration) error {
		log.Error(ctx, "PPS master: error running the master process; retrying",
			zap.Error(err), zap.Duration("retryIn", d))
		return nil
	})
}
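
// Note: masterLock is a distributed lock (backed by etcd), so at most one
// pachd replica is expected to run the PPS master loop at a time; whenever the
// inner function returns an error, RetryUntilCancel waits out the backoff and
// tries to re-acquire the lock and relaunch the loop.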

func (m *ppsMaster) setPipelineCrashing(ctx context.Context, specCommit *pfs.Commit, reason string) error {
	if err := m.sd.SetState(ctx, specCommit, pps.PipelineState_PIPELINE_CRASHING, reason); err != nil {
		return errors.Wrapf(err, "failed to set pipeline to crashing state")
	}
	return nil
}

// run() ingests pipeline events from `m.eventCh` that are generated by watching the DB and
// from polling k8s. It then distributes the work to pipelineController goroutines that refresh
// the worker(s) state for each pipeline.
//
// Notes:
//   - Since each event `e` instructs the master to set a particular pipeline `p` to its most
//     recently declared state, and because pipelines don't share k8s resources, we can run a
//     single goroutine for each `p` to increase throughput across pipelines.
//
//   - In the case where many events are queued for a given pipeline, we can skip to the most
//     recent event in the next `step` the pipelineController executes. This is done using
//     `pipelineController.Bump()`, i.e. when an active pipelineController completes execution,
//     it will re-execute with the most recently declared state if it has been bumped.
func (m *ppsMaster) run() {
	// close m.eventCh after all cancels have returned and therefore all pollers
	// (which are what write to m.eventCh) have exited
	m.eventCh = make(chan *pipelineEvent, 1)
	defer close(m.eventCh)
	defer m.cancelPCs()
	// start pollers in the background--cancel functions ensure poll/monitor
	// goroutines all definitely stop (either because cancelXYZ returns or because
	// the binary panics)
	m.startPipelinePoller()
	defer m.cancelPipelinePoller()
	m.startPipelinePodsPoller()
	defer m.cancelPipelinePodsPoller()
	m.startPipelineWatcher()
	defer m.cancelPipelineWatcher()
eventLoop:
	for {
		select {
		case e := <-m.eventCh:
			func(e *pipelineEvent) {
				log.Debug(m.masterCtx, "pipelineEvent", zap.Stringer("pipeline", e.pipeline))
				m.pcMgr.Lock()
				defer m.pcMgr.Unlock()
				key := toKey(e.pipeline)
				if pc, ok := m.pcMgr.pcs[key]; ok {
					pc.Bump(e.timestamp) // raises flag in pipelineController to run again whenever it finishes
				} else {
					// pc's ctx is cancelled in pipelineController.tryFinish(), to avoid leaking resources
					pcCtx, pcCancel := pctx.WithCancel(m.masterCtx)
					pc = m.newPipelineController(pcCtx, pcCancel, e.pipeline)
					m.pcMgr.pcs[key] = pc
					go pc.Start(e.timestamp)
				}
			}(e)
		case <-m.masterCtx.Done():
			break eventLoop
		}
	}
}
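
// Illustrative coalescing behavior for a hypothetical pipeline "p": if several
// events for "p" arrive while its pipelineController is mid-step, each one
// bumps the existing controller rather than spawning a new goroutine; when the
// current step finishes, the controller re-runs once against the most recently
// declared state, so intermediate states are effectively skipped.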

// setPipelineState is a PPS-master-specific helper that wraps
// ppsutil.SetPipelineState in a trace
func setPipelineState(ctx context.Context, db *pachsql.DB, pipelines collection.PostgresCollection, specCommit *pfs.Commit, state pps.PipelineState, reason string) (retErr error) {
	log.Debug(ctx, "set pipeline state", zap.String("pipeline", specCommit.GetBranch().GetRepo().GetName()), zap.Stringer("state", state))
	span, ctx := tracing.AddSpanToAnyExisting(ctx,
		"/pps.Master/SetPipelineState", "project", specCommit.Repo.Project.GetName(), "pipeline", specCommit.Repo.Name, "new-state", state)
	defer func() {
		tracing.TagAnySpan(span, "err", retErr)
		tracing.FinishAnySpan(span)
	}()
	return ppsutil.SetPipelineState(ctx, db, pipelines,
		specCommit, nil, state, reason)
}

func (m *ppsMaster) cancelPCs() {
	m.pcMgr.Lock()
	defer m.pcMgr.Unlock()
	for _, pc := range m.pcMgr.pcs {
		pc.cancel()
	}
}