/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package offers

import (
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/golang/glog"
	mesos "github.com/mesos/mesos-go/mesosproto"

	"k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
	"k8s.io/kubernetes/contrib/mesos/pkg/proc"
	"k8s.io/kubernetes/contrib/mesos/pkg/queue"
	"k8s.io/kubernetes/contrib/mesos/pkg/runtime"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/util/sets"
)

const (
	offerListenerMaxAge = 12              // max number of times we'll attempt to fit an offer to a listener before requiring them to re-register themselves
	offerIdCacheTTL     = 1 * time.Second // determines expiration of cached offer ids, used in listener notification

	// deferredDeclineTtlFactor, multiplied by the offer TTL, determines how long to wait
	// before attempting to decline previously claimed offers that were subsequently
	// deleted, then released. see offerStorage.Delete
	deferredDeclineTtlFactor = 2

	notifyListenersDelay = 0 // delay between offer listener notification attempts
)

type Filter func(*mesos.Offer) bool
type Registry interface {
// Initialize the instance, spawning necessary housekeeping go routines.
Init(<-chan struct{})
// Add offers to this registry, rejecting those that are deemed incompatible.
Add([]*mesos.Offer)
// Listen for arriving offers that are acceptable to the filter, sending
// a signal on (by closing) the returned channel. A listener will only
// ever be notified once, if at all.
Listen(id string, f Filter) <-chan struct{}
// invoked when offers are rescinded or expired
Delete(string, metrics.OfferDeclinedReason)
// Get returns the offer registered under the given offer ID; the boolean
// reports whether such an offer was found.
Get(offerId string) (Perishable, bool)
// iterate through non-expired offers in this registry
Walk(Walker) error
// invalidate one or all (when offerId="") offers; offers are not declined,
// but are simply flagged as expired in the offer history
Invalidate(offerId string)
// invalidate all offers associated with the slave identified by slaveId.
InvalidateForSlave(slaveId string)
}
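// The sketch below is illustrative only (not part of the original file): it shows
// the intended Registry lifecycle described above. The done channel, the incoming
// offer slice, the listener id, and the hostname check are placeholders supplied
// by a hypothetical caller.
func exampleRegistryLifecycle(r Registry, done <-chan struct{}, incoming []*mesos.Offer) {
	r.Init(done)    // spawn housekeeping goroutines
	r.Add(incoming) // incompatible offers are declined automatically

	// wait for an offer acceptable to a (placeholder) filter; the returned channel
	// is closed upon notification, nothing is ever sent on it
	notified := r.Listen("example-listener", func(o *mesos.Offer) bool {
		return o.GetHostname() == "example-host" // placeholder predicate
	})
	select {
	case <-notified:
		// a matching offer should now be retrievable via Get or Walk
	case <-done:
		// shutting down before any acceptable offer arrived
	}
}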
// callback that is invoked during a walk through a series of live offers,
// returning with stop=true (or err != nil) if the walk should stop prematurely.
type Walker func(offer Perishable) (stop bool, err error)
type RegistryConfig struct {
DeclineOffer func(offerId string) <-chan error // tell Mesos that we're declining the offer
Compat func(*mesos.Offer) bool // returns true if offer is compatible; incompatible offers are declined
TTL time.Duration // determines a perishable offer's expiration deadline: now+ttl
LingerTTL time.Duration // if zero, offers will not linger in the FIFO past their expiration deadline
ListenerDelay time.Duration // specifies the sleep time between offer listener notifications
}
type offerStorage struct {
RegistryConfig
offers *cache.FIFO // collection of Perishable, both live and expired
listeners *queue.DelayFIFO // collection of *offerListener
delayed *queue.DelayQueue // deadline-oriented offer-event queue
slaves *slaveStorage // slave to offer mappings
}
type liveOffer struct {
*mesos.Offer
expiration time.Time
acquired int32 // 1 = acquired, 0 = free
}
type expiredOffer struct {
offerSpec
deadline time.Time
}
// subset of mesos.OfferInfo useful for recordkeeping
type offerSpec struct {
id string
hostname string
}
// offers that may perish (all of them?) implement this interface.
// callers may expect to access these funcs concurrently so implementations
// must provide their own form of synchronization around mutable state.
type Perishable interface {
// returns true if this offer has expired
HasExpired() bool
// if not yet expired, return mesos offer details; otherwise nil
Details() *mesos.Offer
// mark this offer as acquired, returning true if it was previously unacquired. thread-safe.
Acquire() bool
// mark this offer as un-acquired. thread-safe.
Release()
// expire or delete this offer from storage
age(s *offerStorage)
// return a unique identifier for this offer
Id() string
// return the slave host for this offer
Host() string
addTo(*queue.DelayQueue)
}
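// A minimal sketch (not from the original code) of the Acquire/Release contract
// described above: a caller claims an offer before using it and must Release the
// claim if its launch attempt fails, so that the offer can later be declined or
// reused. The launch function is a hypothetical placeholder.
func exampleClaimOffer(r Registry, launch func(*mesos.Offer) error) error {
	return r.Walk(func(offer Perishable) (bool, error) {
		if !offer.Acquire() {
			return false, nil // already claimed elsewhere; keep walking
		}
		if err := launch(offer.Details()); err != nil {
			offer.Release() // free the claim; Delete may still decline the offer later
			return false, nil
		}
		return true, nil // claimed and consumed; stop the walk
	})
}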
func (e *expiredOffer) addTo(q *queue.DelayQueue) {
q.Add(e)
}
func (e *expiredOffer) Id() string {
return e.id
}
func (e *expiredOffer) Host() string {
return e.hostname
}
func (e *expiredOffer) HasExpired() bool {
return true
}
func (e *expiredOffer) Details() *mesos.Offer {
return nil
}
func (e *expiredOffer) Acquire() bool {
return false
}
func (e *expiredOffer) Release() {}
func (e *expiredOffer) age(s *offerStorage) {
log.V(3).Infof("Delete lingering offer: %v", e.id)
s.offers.Delete(e)
s.slaves.deleteOffer(e.id)
}
// return the time left to linger
func (e *expiredOffer) GetDelay() time.Duration {
return e.deadline.Sub(time.Now())
}
func (to *liveOffer) HasExpired() bool {
return time.Now().After(to.expiration)
}
func (to *liveOffer) Details() *mesos.Offer {
return to.Offer
}
func (to *liveOffer) Acquire() (acquired bool) {
if acquired = atomic.CompareAndSwapInt32(&to.acquired, 0, 1); acquired {
metrics.OffersAcquired.WithLabelValues(to.Host()).Inc()
}
return
}
func (to *liveOffer) Release() {
if released := atomic.CompareAndSwapInt32(&to.acquired, 1, 0); released {
metrics.OffersReleased.WithLabelValues(to.Host()).Inc()
}
}
func (to *liveOffer) age(s *offerStorage) {
s.Delete(to.Id(), metrics.OfferExpired)
}
func (to *liveOffer) Id() string {
return to.Offer.Id.GetValue()
}
func (to *liveOffer) Host() string {
return to.Offer.GetHostname()
}
func (to *liveOffer) addTo(q *queue.DelayQueue) {
q.Add(to)
}
// return the time remaining before the offer expires
func (to *liveOffer) GetDelay() time.Duration {
return to.expiration.Sub(time.Now())
}
func CreateRegistry(c RegistryConfig) Registry {
metrics.Register()
return &offerStorage{
RegistryConfig: c,
offers: cache.NewFIFO(cache.KeyFunc(func(v interface{}) (string, error) {
if perishable, ok := v.(Perishable); !ok {
return "", fmt.Errorf("expected perishable offer, not '%+v'", v)
} else {
return perishable.Id(), nil
}
})),
listeners: queue.NewDelayFIFO(),
delayed: queue.NewDelayQueue(),
slaves: newSlaveStorage(),
}
}
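// A sketch of how a scheduler component might wire up a registry. It is
// illustrative only: the declineViaDriver callback and the concrete durations are
// assumptions, not values taken from the original scheduler configuration.
func exampleNewRegistry(declineViaDriver func(offerId string) <-chan error) Registry {
	return CreateRegistry(RegistryConfig{
		DeclineOffer: declineViaDriver, // hypothetical driver-backed decline
		Compat: func(o *mesos.Offer) bool {
			// a real Compat predicate inspects resources/attributes; this
			// placeholder merely requires a hostname
			return o.GetHostname() != ""
		},
		TTL:           30 * time.Second, // assumed offer lifetime
		LingerTTL:     2 * time.Minute,  // assumed linger window after expiration
		ListenerDelay: 1 * time.Second,  // assumed delay between listener retries
	})
}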
func (s *offerStorage) declineOffer(offerId, hostname string, reason metrics.OfferDeclinedReason) {
//TODO(jdef) might be nice to spec an abort chan here
runtime.Signal(proc.OnError(s.DeclineOffer(offerId), func(err error) {
log.Warningf("decline failed for offer id %v: %v", offerId, err)
}, nil)).Then(func() {
metrics.OffersDeclined.WithLabelValues(hostname, string(reason)).Inc()
})
}
func (s *offerStorage) Add(offers []*mesos.Offer) {
now := time.Now()
for _, offer := range offers {
if !s.Compat(offer) {
//TODO(jdef) would be nice to batch these up
offerId := offer.Id.GetValue()
log.V(3).Infof("Declining incompatible offer %v", offerId)
s.declineOffer(offerId, offer.GetHostname(), metrics.OfferCompat)
continue
}
timed := &liveOffer{
Offer: offer,
expiration: now.Add(s.TTL),
acquired: 0,
}
log.V(3).Infof("Receiving offer %v", timed.Id())
s.offers.Add(timed)
s.delayed.Add(timed)
s.slaves.add(offer.SlaveId.GetValue(), timed.Id())
metrics.OffersReceived.WithLabelValues(timed.Host()).Inc()
}
}
// delete an offer from storage, implicitly expiring it
func (s *offerStorage) Delete(offerId string, reason metrics.OfferDeclinedReason) {
if offer, ok := s.Get(offerId); ok {
log.V(3).Infof("Deleting offer %v", offerId)
// attempt to block others from consuming the offer. if it's already been
// claimed and is not yet lingering then don't decline it - just mark it as
// expired in the history: allow a prior claimant to attempt to launch with it
notYetClaimed := offer.Acquire()
if offer.Details() != nil {
if notYetClaimed {
log.V(3).Infof("Declining offer %v", offerId)
s.declineOffer(offerId, offer.Host(), reason)
} else {
// some pod has acquired this and may attempt to launch a task with it
// failed schedule/launch attempts are required to Release() any claims on the offer
// TODO(jdef): not sure what a good value is here. the goal is to provide a
// launchTasks (driver) operation enough time to complete so that we don't end
// up declining an offer that we're actually attempting to use.
time.AfterFunc(deferredDeclineTtlFactor*s.TTL, func() {
// at this point the offer is in one of five states:
// a) permanently deleted: expired due to timeout
// b) permanently deleted: expired due to having been rescinded
// c) lingering: expired due to timeout
// d) lingering: expired due to having been rescinded
// e) claimed: task launched and is using resources from this offer
// we want to **avoid** declining an offer that's claimed: attempt to acquire
if offer.Acquire() {
// previously claimed offer was released, perhaps due to a launch
// failure, so we should attempt to decline
log.V(3).Infof("attempting to decline (previously claimed) offer %v", offerId)
s.declineOffer(offerId, offer.Host(), reason)
}
})
}
}
s.expireOffer(offer)
} // else, ignore offers not in the history
}
func (s *offerStorage) InvalidateForSlave(slaveId string) {
offerIds := s.slaves.deleteSlave(slaveId)
for oid := range offerIds {
s.invalidateOne(oid)
}
}
// if offerId == "" then expire all known, live offers, otherwise only the offer indicated
func (s *offerStorage) Invalidate(offerId string) {
if offerId != "" {
s.invalidateOne(offerId)
return
}
obj := s.offers.List()
for _, o := range obj {
offer, ok := o.(Perishable)
if !ok {
log.Errorf("Expected perishable offer, not %v", o)
continue
}
offer.Acquire() // attempt to block others from using it
s.expireOffer(offer)
// don't decline, we already know that it's an invalid offer
}
}
func (s *offerStorage) invalidateOne(offerId string) {
if offer, ok := s.Get(offerId); ok {
offer.Acquire() // attempt to block others from using it
s.expireOffer(offer)
// don't decline, we already know that it's an invalid offer
}
}
// Walk the collection of offers. The walk stops either as indicated by the
// Walker or when the end of the offer list is reached. Expired offers are
// never passed to a Walker.
func (s *offerStorage) Walk(w Walker) error {
for _, v := range s.offers.List() {
offer, ok := v.(Perishable)
if !ok {
// unexpected object in the offer store; skip it
continue
}
if offer.HasExpired() {
// never pass expired offers to walkers
continue
}
if stop, err := w(offer); err != nil {
return err
} else if stop {
return nil
}
}
return nil
}
// Expired records an offer that is no longer live; the returned record lingers
// in storage for ttl before being reaped.
func Expired(offerId, hostname string, ttl time.Duration) *expiredOffer {
return &expiredOffer{offerSpec{id: offerId, hostname: hostname}, time.Now().Add(ttl)}
}
func (s *offerStorage) expireOffer(offer Perishable) {
// the offer may or may not be expired due to TTL so check for details
// since that's a more reliable determinant of lingering status
if details := offer.Details(); details != nil {
// recently expired, should linger
offerId := details.Id.GetValue()
log.V(3).Infof("Expiring offer %v", offerId)
if s.LingerTTL > 0 {
log.V(3).Infof("offer will linger: %v", offerId)
expired := Expired(offerId, offer.Host(), s.LingerTTL)
s.offers.Update(expired)
s.delayed.Add(expired)
} else {
log.V(3).Infof("Permanently deleting offer %v", offerId)
s.offers.Delete(offerId)
s.slaves.deleteOffer(offerId)
}
} // else, it's still lingering...
}
func (s *offerStorage) Get(id string) (Perishable, bool) {
if obj, ok, _ := s.offers.GetByKey(id); !ok {
return nil, false
} else {
to, ok := obj.(Perishable)
if !ok {
log.Errorf("invalid offer object in fifo '%v'", obj)
}
return to, ok
}
}
type offerListener struct {
id string
accepts Filter
notify chan<- struct{}
age int
deadline time.Time
sawVersion uint64
}
func (l *offerListener) GetUID() string {
return l.id
}
func (l *offerListener) Deadline() (time.Time, bool) {
return l.deadline, true
}
// register a listener for new offers, which we'll notify when an acceptable
// offer arrives. notification is delivered by closing the returned channel;
// nothing is ever sent on it.
func (s *offerStorage) Listen(id string, f Filter) <-chan struct{} {
if f == nil {
return nil
}
ch := make(chan struct{})
listen := &offerListener{
id: id,
accepts: f,
notify: ch,
deadline: time.Now().Add(s.ListenerDelay),
}
log.V(3).Infof("Registering offer listener %s", listen.id)
s.listeners.Offer(listen, queue.ReplaceExisting)
return ch
}
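// An illustrative Filter constructor (not part of the original file) that accepts
// offers advertising at least minCPU cpus. It assumes the standard mesos-go scalar
// resource accessors (GetResources, GetName, GetScalar, GetValue).
func exampleCPUFilter(minCPU float64) Filter {
	return func(o *mesos.Offer) bool {
		for _, res := range o.GetResources() {
			if res.GetName() == "cpus" && res.GetScalar().GetValue() >= minCPU {
				return true
			}
		}
		return false
	}
}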
func (s *offerStorage) ageOffers() {
obj := s.delayed.Pop()
offer, ok := obj.(Perishable)
if !ok {
log.Errorf("Expected Perishable, not %v", obj)
return
}
if details := offer.Details(); details != nil && !offer.HasExpired() {
// live offer has not expired yet: timed out early
// FWIW: early timeouts are more frequent when GOMAXPROCS is > 1
offer.addTo(s.delayed)
} else {
offer.age(s)
}
}
func (s *offerStorage) nextListener() *offerListener {
obj := s.listeners.Pop(queue.WithoutCancel())
if listen, ok := obj.(*offerListener); !ok {
//programming error
panic(fmt.Sprintf("unexpected listener object %v", obj))
} else {
return listen
}
}
// notify listeners if we find an acceptable offer for them. listeners
// are garbage collected after a certain age (see offerListenerMaxAge).
// ids lists offer IDs that are retrievable from offer storage.
func (s *offerStorage) notifyListeners(ids func() (sets.String, uint64)) {
listener := s.nextListener() // blocking
offerIds, version := ids()
if listener.sawVersion == version {
// no changes to offer list, avoid growing older - just wait for new offers to arrive
listener.deadline = time.Now().Add(s.ListenerDelay)
s.listeners.Offer(listener, queue.KeepExisting)
return
}
listener.sawVersion = version
// notify if we find an acceptable offer
for id := range offerIds {
if offer, ok := s.Get(id); !ok || offer.HasExpired() {
continue
} else if listener.accepts(offer.Details()) {
log.V(3).Infof("Notifying offer listener %s", listener.id)
close(listener.notify)
return
}
}
// no interesting offers found, re-queue the listener
listener.age++
if listener.age < offerListenerMaxAge {
listener.deadline = time.Now().Add(s.ListenerDelay)
s.listeners.Offer(listener, queue.KeepExisting)
} else {
// garbage collection is as simple as not re-adding the listener to the queue
log.V(3).Infof("garbage collecting offer listener %s", listener.id)
}
}
func (s *offerStorage) Init(done <-chan struct{}) {
// zero delay, reap offers as soon as they expire
go runtime.Until(s.ageOffers, 0, done)
// cached offer ids for the purposes of listener notification
idCache := &stringsCache{
refill: func() sets.String {
result := sets.NewString()
for _, v := range s.offers.List() {
if offer, ok := v.(Perishable); ok {
result.Insert(offer.Id())
}
}
return result
},
ttl: offerIdCacheTTL,
}
go runtime.Until(func() { s.notifyListeners(idCache.Strings) }, notifyListenersDelay, done)
}
type stringsCache struct {
expiresAt time.Time
cached sets.String
ttl time.Duration
refill func() sets.String
version uint64
}
// not thread-safe
func (c *stringsCache) Strings() (sets.String, uint64) {
now := time.Now()
if c.expiresAt.Before(now) {
old := c.cached
c.cached = c.refill()
c.expiresAt = now.Add(c.ttl)
if !reflect.DeepEqual(old, c.cached) {
c.version++
}
}
return c.cached, c.version
}
type slaveStorage struct {
sync.Mutex
index map[string]string // map offerId to slaveId
}
func newSlaveStorage() *slaveStorage {
return &slaveStorage{
index: make(map[string]string),
}
}
// create a mapping between a slave and an offer
func (self *slaveStorage) add(slaveId, offerId string) {
self.Lock()
defer self.Unlock()
self.index[offerId] = slaveId
}
// delete the slave-offer mappings for slaveId, returns the IDs of the offers that were unmapped
func (self *slaveStorage) deleteSlave(slaveId string) sets.String {
offerIds := sets.NewString()
self.Lock()
defer self.Unlock()
for oid, sid := range self.index {
if sid == slaveId {
offerIds.Insert(oid)
delete(self.index, oid)
}
}
return offerIds
}
// delete the slave-offer mapping for offerId
func (self *slaveStorage) deleteOffer(offerId string) {
self.Lock()
defer self.Unlock()
delete(self.index, offerId)
}