package rink
import (
"context"
"encoding/binary"
"fmt"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/dgryski/go-jump"
"github.com/luno/jettison/errors"
"github.com/luno/jettison/j"
"github.com/luno/jettison/log"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3/concurrency"
)
const modprefix = "modrole:"
// ModRole returns a role that will be mapped to a member by modulo operation
// instead of consistent hashing.
//
// This is useful if an even distribution of roles is important.
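//
// An illustrative sketch (not part of this package; s is an assumed
// *Scheduler and processShard an assumed caller-side function): a member can
// process only the shards it currently owns by polling with Get, and the
// modulo mapping spreads the shards evenly across members.
//
//	for shard := 0; shard < 10; shard++ {
//		if ctx, ok := s.Get(rink.ModRole(shard)); ok {
//			go processShard(ctx, shard)
//		}
//	}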
func ModRole(role int) string {
return fmt.Sprintf("%s%d", modprefix, role)
}
// New returns a new scheduler linked to the session and cluster prefix. It also starts the underlying cluster.
// Closing the session releases all etcd resources and results in asynchronous release of all scheduler golang resources.
// Note that Scheduler.Close is a synchronous alternative.
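//
// A minimal usage sketch (illustrative only; cli is an assumed, pre-configured
// etcd *clientv3.Client):
//
//	sess, err := concurrency.NewSession(cli)
//	if err != nil {
//		return err
//	}
//	s := rink.New(sess, "/myservice/scheduler")
//	defer s.Close()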
func New(sess *concurrency.Session, clusterPrefix string, opts ...Option) *Scheduler {
o := defaultOptions(sess.Lease())
for _, opt := range opts {
opt(o)
}
ctx, cancel := context.WithCancel(o.ctx)
c := &cluster{
ctx: ctx,
sess: sess,
clusterPrefix: clusterPrefix,
name: o.name,
rebalanceDelay: o.rebalanceDelay,
logger: o.logger,
}
s := &Scheduler{
ctx: ctx,
cancel: cancel,
sess: sess,
clusterPrefix: clusterPrefix,
name: o.name,
hasher: o.hasher,
logger: o.logger,
cond: sync.NewCond(new(sync.Mutex)),
roles: make(map[string]*roleCtx),
}
// Link the scheduler to the cluster.
c.callback = s.updateState
// Link root context/lifecycle to that of session.
go func() {
<-sess.Done()
s.logger.Debug(s.ctx, "rink session closed")
s.cancel() // Close root context
s.updateState(state{}) // Clear state
}()
// Start the cluster.
go startCluster(c)
return s
}
// Scheduler maps arbitrary roles to members of the cluster by consistent
// hashing to the mth of n members. It also maintains
// additional etcd mutex locks for each role this instance assumes.
// This ensures that overlapping roles are not possible.
type Scheduler struct {
ctx context.Context
cancel context.CancelFunc
clusterPrefix string
name string
sess *concurrency.Session
hasher hasher
logger logger
cond *sync.Cond
state state
roles map[string]*roleCtx
}
// Close closes the etcd session, which releases all etcd resources, or returns an error. Once the session is closed, it
// also synchronously releases all golang resources.
func (s *Scheduler) Close() error {
select {
case <-s.sess.Done():
// Session already closed.
default:
// Most important, first close the session removing all etcd keys.
err := s.sess.Close()
if errors.Is(err, rpctypes.ErrLeaseNotFound) {
// NoReturnErr: Session already closed.
} else if err != nil {
return err
}
}
s.cancel() // Close root context
s.updateState(state{}) // Clear state
return nil
}
// Info returns the current scheduler rank (m) and cluster size (n).
// A rank of -1 indicates this scheduler has joined the cluster but is waiting for a rank.
// A size of 0 indicates this scheduler has not joined a cluster yet or has been stopped and left the cluster.
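//
// A usage sketch (illustrative; s is an assumed *Scheduler):
//
//	rank, size := s.Info()
//	fmt.Printf("scheduler has rank %d in a cluster of %d members\n", rank, size)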
func (s *Scheduler) Info() (rank int, size int) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
return getInfo(s.state, s.name)
}
// Await blocks until this scheduler can assume the role and returns a role context.
// The context will be closed when the role is assigned to another member of the cluster.
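//
// A typical usage sketch (illustrative; s is an assumed *Scheduler):
//
//	ctx := s.Await("leader")
//	// Do leader-only work until ctx is closed, i.e. until the role is
//	// assigned to another member.
//	<-ctx.Done()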
func (s *Scheduler) Await(role string) context.Context {
for {
ctx, err := s.tryToGetRole(role)
if err != nil {
// NoReturnErr: Retry role creation
continue
}
return ctx
}
}
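// tryToGetRole blocks until the cluster state maps the role to this scheduler
// and then returns a role context, or an error if the etcd role mutex could
// not be locked.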
func (s *Scheduler) tryToGetRole(role string) (context.Context, error) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
for !hasRole(s.state, s.hasher, s.name, role) {
// Wait while we do not have the role.
//
// Note: Wait unlocks s.cond.L and locks it again before
// returning. So multiple goroutines can wait at the
// same time but only one will continue at a time.
s.cond.Wait()
}
// We have the role (and s.cond.L is locked)
// Can still return an error if we can't lock the role
return s.getOrCreateUnsafe(role)
}
// Get returns true and a role context if this scheduler can assume the role now.
// The context will be closed when the role is assigned to another member of the cluster.
// It returns false and a nil context if this scheduler cannot assume the role now.
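//
// A non-blocking usage sketch (illustrative; s is an assumed *Scheduler and
// doLeaderWork an assumed caller-side function):
//
//	if ctx, ok := s.Get("leader"); ok {
//		doLeaderWork(ctx)
//	}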
func (s *Scheduler) Get(role string) (context.Context, bool) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
if !hasRole(s.state, s.hasher, s.name, role) {
return nil, false
}
ctx, err := s.getOrCreateUnsafe(role)
if err != nil {
// NoReturnErr: Return false
return nil, false
}
return ctx, true
}
// getOrCreateUnsafe returns an existing or new role context for the provided role.
// If new, an etcd mutex is also locked for additional protection against
// overlapping roles. It is unsafe since it assumes the s.cond.L lock is held.
func (s *Scheduler) getOrCreateUnsafe(role string) (context.Context, error) {
if roleCtx, ok := s.roles[role]; ok {
return roleCtx.ctx, nil // Just return existing role context.
}
// FIXME(corver): Creating a new role context for each role will result
// in a resource leak if roles are unbounded. Maybe expose
// a `Release` method in the API.
roleCtx := newRoleCtx(s, role)
// Try to lock additional etcd role mutex.
if err := roleCtx.TryLock(); err != nil {
return nil, err
}
s.roles[role] = roleCtx
return roleCtx.ctx, nil
}
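// updateState replaces the scheduler's view of the cluster state, releases any
// roles this member no longer owns and wakes goroutines waiting in Await.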
func (s *Scheduler) updateState(next state) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
if len(next) > 0 && s.ctx.Err() != nil {
// Ignore racy cluster updates after close.
return
}
// Cancel roles we do not have anymore.
var toCancel []*roleCtx
for role, roleCtx := range s.roles {
if hasRole(next, s.hasher, s.name, role) {
// We still have this role!
continue
}
toCancel = append(toCancel, roleCtx)
}
for _, roleCtx := range toCancel {
roleCtx.Release()
delete(s.roles, roleCtx.role)
}
s.state = next
s.cond.Broadcast()
// Metrics and logging
rank, has := s.state[s.name]
if len(s.state) > 0 && !has {
rank = -1 // Use -1 to indicate waiting to join.
}
if len(s.state) > 0 {
s.logger.Debug(s.ctx, "rink state updated", j.MKV{
"state": summariseState(s.state, s.name), "my_rank": rank,
})
} else {
s.logger.Debug(s.ctx, "rink state cleared")
}
rankGauge.WithLabelValues(s.clusterPrefix).Set(float64(rank))
}
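// summariseState returns a human-readable summary of the cluster state with
// members ordered by rank and this member marked with "[me]".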
func summariseState(s state, me string) string {
members := make([]string, 0, len(s))
for m := range s {
members = append(members, m)
}
// Sort by rank
sort.Slice(members, func(i, j int) bool {
return s[members[i]] < s[members[j]]
})
summaries := make([]string, 0, len(members))
for _, m := range members {
isMe := ""
if m == me {
isMe = " [me]"
}
summaries = append(summaries, fmt.Sprintf("'%s'%s => rank %d", m, isMe, s[m]))
}
return strings.Join(summaries, ", ")
}
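// newRoleCtx returns a roleCtx with a context derived from the scheduler
// context, a (not yet locked) etcd mutex for the role and a role gauge metric.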
func newRoleCtx(r *Scheduler, role string) *roleCtx {
ctx, cancel := context.WithCancel(r.ctx)
ctx = log.ContextWith(ctx, j.KS("rink_role", role))
mutex := concurrency.NewMutex(r.sess, path.Join(r.clusterPrefix, "roles", role))
gauge := roleGauge.WithLabelValues(r.clusterPrefix, role)
return &roleCtx{
ctx: ctx,
cancel: cancel,
role: role,
logger: r.logger,
mutex: mutex,
gauge: gauge,
}
}
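// roleCtx couples a role's context with the etcd mutex and gauge metric that
// track it.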
type roleCtx struct {
mutex *concurrency.Mutex
role string
ctx context.Context
cancel context.CancelFunc
logger logger
gauge prometheus.Gauge
}
// TryLock can be called only once; if it errors, a new roleCtx must be created.
func (r *roleCtx) TryLock() error {
err := r.mutex.TryLock(r.ctx)
if err != nil {
r.cancel()
return err
}
r.logger.Info(r.ctx, "rink role locked")
r.gauge.Set(1)
return nil
}
// Release releases the role by canceling the role context and unlocking the role mutex.
func (r *roleCtx) Release() {
defer r.logger.Debug(r.ctx, "rink role released")
defer r.gauge.Set(0)
r.cancel()
// Try a quick synchronous mutex unlock (with fresh context).
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
defer cancel()
err := r.mutex.Unlock(ctx)
if err != nil {
r.logger.Error(ctx, errors.Wrap(err, "error unlocking role mutex"))
} else {
// It worked.
return
}
// Do not block, but keep on trying to unlock.
go func() {
reliably(r.ctx, r.logger, "unlock role", func() error {
return r.mutex.Unlock(context.Background()) // Unlock is idempotent.
})
}()
}
// hasRole returns true if the member (name) has the role.
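// Mod roles (see ModRole) map to the member whose rank equals the role integer
// modulo the cluster size; other roles map to a rank via a jump consistent
// hash of the role's hash key.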
func hasRole(state state, hasher hasher, name, role string) bool {
rank, ok := state[name]
if !ok {
return false
}
if i, ok := maybeModRole(role); ok {
return rank == i%len(state)
}
// Convert role string to uint64 hash key
h := hasher()
_, _ = h.Write([]byte(role))
b := h.Sum(nil)
key := binary.BigEndian.Uint64(b[len(b)-8:])
// Get the role rank (hash the key to a bucket)
rolerank := jump.Hash(key, len(state))
return rank == int(rolerank)%len(state)
}
// maybeModRole returns the integer to use and true if this role should be
// mapped with a modulo operation instead of consistent hashing.
func maybeModRole(role string) (int, bool) {
if !strings.HasPrefix(role, modprefix) {
return 0, false
}
i, err := strconv.Atoi(strings.TrimPrefix(role, modprefix))
if err != nil {
// NoReturnErr: Assume not a mod role
return 0, false
}
return i, true
}