/
listmanager.go
626 lines (529 loc) · 16.5 KB
/
listmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
package frontend
import (
"fmt"
klogv2 "k8s.io/klog/v2"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/khenidak/london/pkg/backend"
"github.com/khenidak/london/pkg/config"
"github.com/khenidak/london/pkg/types"
)
// The list manager is a cache of lists.
// Each list holds the keys that match a prefix, e.g. /x/y/z.
// listManager is a "current" view of lists. It does not include
// older versions.
// Items in a list get updated via two mechanisms:
// 1. loading events and updating internal state (mgmt loop)
// 2. direct notification of an action that happened on the backend. That ensures
// that read-after-write will always return the updated version.

// maxFail is the number of consecutive failed event loads at which
// mgmtLoadEventsIteration panics instead of continuing to serve stale data.
const maxFail = 4
// listManager keeps one cached list per key prefix and tracks the range of
// event revisions (low/high watermarks) it has read from the backend.
type listManager struct {
// highest mod revision seen so far among the events loaded from the
// store; the next event load asks for revisions above it
revHighWatermark int64
// lowest (compacted) revision; events at or below it are no longer
// served. Raised by the compaction iteration and by notifyCompact
revLowWatermark int64
// number of consecutive failed event loads; reset to zero on success.
// When it reaches maxFail we panic, to avoid responding with stale data
errorCount int
config *config.Config
be backend.Backend
lists sync.Map // key prefix (string) -> *list
}
// list is the cached state for a single key prefix: the current records
// under that prefix plus the recent events that touched them.
type list struct {
// loadLock serializes the one-time load from the backend (see load)
loadLock sync.Mutex
// events is an ordered list of event slices, newest first.
// All watchers read from this cached set of events.
// The events get updated in two ways:
// - the events loop
// - notification on write from the front end.
// * events are subject to compaction (from memory)
// eventsLock guards events.
eventsLock sync.RWMutex
events [][]types.Record
// firstRead is set once load has populated the list from the backend
firstRead bool
// prefix is the key prefix this list was created for
prefix string
items sync.Map // record key (string) -> *listItem
}
// listItem is a single cached record. lock guards r, which is replaced in
// place when a record with a higher mod revision is fed to the list.
type listItem struct {
lock sync.Mutex
r types.Record
}
// newlistManager builds a listManager on top of be, primes its lists with
// every record the backend currently holds and starts the management loop
// in a background goroutine.
//
// revLowWatermark is the last mod revision marked as compacted. When it is
// zero, the low watermark is derived as currentRev - config.MaxEventCount
// (never below zero). The high watermark starts at the low watermark.
//
// An error is returned when the initial listing from the backend fails.
func newlistManager(config *config.Config, be backend.Backend, revLowWatermark int64) (*listManager, error) {
	klogv2.Infof("list manager: creating with low watermark: %v", revLowWatermark)

	// everything currently in the store is used to prime the cache
	currentRev, records, err := be.ListAllCurrent()
	if err != nil {
		return nil, err
	}

	// work out where the watermarks start
	low := revLowWatermark
	if low == 0 {
		low = currentRev - int64(config.MaxEventCount)
		if low < 0 {
			low = 0
		}
	}

	lm := &listManager{
		config:           config,
		be:               be,
		revLowWatermark:  low,
		revHighWatermark: low,
	}
	klogv2.Infof("List manager: event load will start at: %v with max in memory events:%v", lm.revHighWatermark, config.MaxEventCount)

	// group the records by prefix so each list is looked up only once
	grouped := make(map[string][]types.Record)
	for _, rec := range records {
		p := prefixFromKey(string(rec.Key()))
		grouped[p] = append(grouped[p], rec)
	}

	for p, recs := range grouped {
		// the error is ignored on purpose: getList only fails when it is
		// asked to load from the backend, which is not the case here
		l, _ := lm.getList(p, false)
		l.feedRecords(recs)
	}

	go lm.mgmtLoop()
	return lm, nil
}
// newListManagerForTest is for tests only. It returns a listManager that is
// neither primed from the backend nor running the management loop; tests
// drive the iterations manually.
func newListManagerForTest(config *config.Config, be backend.Backend) *listManager {
	return &listManager{config: config, be: be}
}
// mgmtLoop is responsible for reading events from the backend and using
// them to update internal state. It runs until the runtime context of the
// configuration is done.
//
// Every 50ms tick loads new events. At most once per minute the same tick
// also drops compacted events from memory.
func (lm *listManager) mgmtLoop() {
	ticker := time.NewTicker(time.Millisecond * 50)
	// release the ticker once the loop exits
	defer ticker.Stop()

	stopCh := lm.config.Runtime.Context.Done()
	lastCompact := time.Now().UTC()
	for {
		select {
		case <-stopCh:
			// done here
			return
		case <-ticker.C:
			lm.mgmtLoadEventsIteration()

			// getting the events is more important than filtering out
			// compacted events, even at the cost of some more memory
			// usage, so compaction only runs after the load.
			window := lastCompact.Add(time.Minute * 1)
			nowTime := time.Now().UTC()
			if !nowTime.After(window) {
				continue
			}

			start := time.Now()
			klogv2.Errorf("list-manager: (event compact) did not run in the last 1m.. running")
			lastCompact = time.Now().UTC()
			lm.mgmtFilterCompactedEventsIteration()
			duration := time.Since(start)
			// in theory we need to keep that under 250ms
			klogv2.Errorf("list-manager: (event compact) ran for %v", duration)
		}
	}
}
// mgmtFilterCompactedEventsIteration moves the low watermark forward and
// trims the cached events of every list accordingly.
//
// When the span between the high and low watermarks has reached
// config.MaxEventCount, the new low watermark is computed from memory alone
// (high watermark - MaxEventCount). Otherwise the compacted revision is read
// from the backend, and the run is skipped when that read fails or when the
// revision is not ahead of the cached low watermark.
func (lm *listManager) mgmtFilterCompactedEventsIteration() {
	maxEvents := int64(lm.config.MaxEventCount)

	var compactedRev int64
	if lm.revHighWatermark-lm.revLowWatermark >= maxEvents {
		// too much is held in memory: compact regardless of the DB state.
		// The limit comes from configuration; tune it with memory
		// consumption and functional correctness in mind.
		compactedRev = lm.revHighWatermark - maxEvents
		klogv2.Infof("list-manager: events in memory is > %v compacting in memory events (not depending on DB)", lm.config.MaxEventCount)
	} else {
		// memory is under the limit, only follow a compaction of the DB
		rev, err := lm.be.GetCompactedRev(true)
		if err != nil {
			klogv2.Errorf("list-manager: (event compact) failed to read compacted rev from store with err:%v. will try again later", err)
			return
		}
		if rev <= lm.revLowWatermark {
			klogv2.Error("list-manager: (event compact) compacted rev less or equal to cached compacted rev, ignoring this run")
			return
		}
		compactedRev = rev
	}

	lm.revLowWatermark = compactedRev

	// Lists are compacted one after the other instead of in goroutines: that
	// avoids write-locking the events of all lists at the same time, which
	// would delay every reader. Each pass is a short in-memory loop.
	lm.lists.Range(func(_, v interface{}) bool {
		v.(*list).compactEvents(lm.revLowWatermark)
		return true
	})
}
// mgmtLoadEventsIteration fetches the events above the high watermark and
// hands them, grouped by prefix, to the matching lists.
//
// A failed fetch increments the error count and returns; once the count
// reaches maxFail the function panics. A successful fetch resets the count.
func (lm *listManager) mgmtLoadEventsIteration() {
	events, err := lm.be.ListEvents(lm.revHighWatermark + 1)
	if err != nil {
		lm.errorCount++
		if lm.errorCount == maxFail {
			panic(fmt.Sprintf("list cache loop has failed %v/%v times, crash and reload (err:%v)", lm.errorCount, maxFail, err))
		}
		// failed, but still under max fail
		klogv2.Errorf("list-manager:failed to read events with err:%v -- still under max fail %v/%v", err, lm.errorCount, maxFail)
		return
	}
	lm.errorCount = 0

	// group the events by prefix so each list is looked up only once
	grouped := make(map[string][]types.Record)
	for _, ev := range events {
		// move the high watermark forward if this event is newer
		lm.trySetLastRev(ev.ModRevision())

		p := prefixFromKey(string(ev.Key()))
		grouped[p] = append(grouped[p], ev)
	}

	for p, evs := range grouped {
		// the error is ignored on purpose: getList only fails when it is
		// asked to load from the backend, which is not the case here
		l, _ := lm.getList(p, false)
		l.processEvents(evs)
	}
}
// trySetLastRev raises the high watermark to newRev; revisions that are not
// above the current high watermark are ignored.
func (lm *listManager) trySetLastRev(newRev int64) {
	if newRev <= lm.revHighWatermark {
		return
	}
	lm.revHighWatermark = newRev
}
// list returns the current view of every cached list whose prefix starts
// with prefix, as a single slice sorted by key in descending order. The
// returned error is always nil.
func (lm *listManager) list(prefix string) ([]types.Record, error) {
	/* Known limitation: if no matching list is cached yet - whether or not
	   the data exists in the store - the result is empty. That is the case
	   before the first run of the events loop.

	   TODO: lm needs to declare READINESS, e.g. signal READY once the first
	   event loop iteration happened. Until then all list() calls return
	   empty results. The simplest, most efficient solution is to run one
	   event loop iteration in sync mode (as part of NewListManager()).

	   We can not simply load everything for the requested prefix since
	   -- it may not exist as a prefix in our lists
	   -- we are not sure if the request is for
	      -- a base prefix (i.e. all-namespaces) such as /registry/leases
	      -- a specific namespace such as /registry/pods/default/
	*/
	var records []types.Record
	lm.lists.Range(func(k, v interface{}) bool {
		listPrefix := k.(string)
		cached := v.(*list)
		if !strings.HasPrefix(listPrefix, prefix) {
			return true
		}

		// records of this list go in front of what was collected so far
		fromList := cached.toRecords()
		merged := make([]types.Record, 0, len(records)+len(fromList))
		merged = append(merged, fromList...)
		records = append(merged, records...)
		return true
	})

	sort.Slice(records, func(a, b int) bool {
		return string(records[a].Key()) > string(records[b].Key())
	})
	return records, nil
}
// events collects, from every cached list whose prefix starts with prefix,
// the cached events with a mod revision above newerThanRev (and above the
// low watermark). The result is ordered old to new. The returned error is
// always nil.
func (lm *listManager) events(prefix string, newerThanRev int64) ([]types.Record, error) {
	var records []types.Record
	lm.lists.Range(func(k, v interface{}) bool {
		listPrefix := k.(string)
		cached := v.(*list)
		if !strings.HasPrefix(listPrefix, prefix) {
			return true
		}

		// events of this list go in front of what was collected so far
		fromList := cached.getEvents(newerThanRev, lm.revLowWatermark)
		merged := make([]types.Record, 0, len(records)+len(fromList))
		merged = append(merged, fromList...)
		records = append(merged, records...)
		return true
	})

	// lists keep their events new to old, callers expect old to new
	sort.Slice(records, func(a, b int) bool {
		return records[a].ModRevision() < records[b].ModRevision()
	})
	return records, nil
}
// getList returns the list cached for prefix, creating an empty one when
// none exists yet.
//
// When loadIfNotLoaded is true the list is also populated from the backend
// (a no-op if that already happened); a failure of that load is the only
// way this function returns an error, in which case the list is nil.
func (lm *listManager) getList(prefix string, loadIfNotLoaded bool) (*list, error) {
	// Fast path: the list usually exists already, so look it up before
	// allocating a candidate for LoadOrStore.
	actualI, found := lm.lists.Load(prefix)
	if !found {
		// LoadOrStore keeps whichever list won if another goroutine
		// created one in the meantime.
		actualI, _ = lm.lists.LoadOrStore(prefix, &list{
			prefix: prefix,
			events: make([][]types.Record, 0),
		})
	}
	actual := actualI.(*list)

	if loadIfNotLoaded {
		if err := actual.load(lm.be); err != nil {
			return nil, err
		}
	}
	return actual, nil
}
// compactEvents drops cached events whose mod revision is below olderThan.
//
// Cached events are kept newest first, so the cache is cut at the first
// event found below olderThan: that event and everything stored after it is
// removed. Nothing changes when no such event exists.
func (l *list) compactEvents(olderThan int64) {
	l.eventsLock.Lock()
	defer l.eventsLock.Unlock()

	for outer, batch := range l.events {
		for inner, ev := range batch {
			if ev.ModRevision() >= olderThan {
				continue
			}

			if inner == 0 {
				// this batch is entirely stale: keep only the batches
				// before it (re-slicing saves on re-alloc)
				l.events = l.events[:outer]
				return
			}

			// this batch is partially stale: keep its fresh head and drop
			// every batch after it
			l.events[outer] = batch[:inner]
			l.events = l.events[:outer+1]
			return
		}
	}
}
// addEvents puts eventRecords, as one batch, at the head of the cached
// events. It does not check how old the events are; the caller is expected
// to pass events newer than the ones already cached.
func (l *list) addEvents(eventRecords []types.Record) {
	l.eventsLock.Lock()
	defer l.eventsLock.Unlock()

	// new batch first, then the existing batches in their current order
	batches := make([][]types.Record, len(l.events)+1)
	batches[0] = eventRecords
	copy(batches[1:], l.events)
	l.events = batches
}
// processEvents applies eventRecords to the items of the list and then
// caches the events for later use by watchers.
//
// eventRecords is sorted in place, newest first. An empty slice is a no-op.
func (l *list) processEvents(eventRecords []types.Record) {
	if len(eventRecords) == 0 {
		return
	}

	// keep them ordered new --> old so watchers don't need to wade through
	// a long list
	sort.Slice(eventRecords, func(a, b int) bool {
		return eventRecords[a].ModRevision() > eventRecords[b].ModRevision()
	})

	// state changes must be applied old --> new, hence the backward walk
	for i := len(eventRecords) - 1; i >= 0; i-- {
		ev := eventRecords[i]
		switch {
		case ev.IsCreateEvent():
			l.inserted(ev)
		case ev.IsDeleteEvent():
			l.deleted(ev)
		default:
			l.updated(ev)
		}
	}

	// make the events available to watchers, if any
	l.addEvents(eventRecords)
}
// getEvents returns the cached events, newest first, up to (excluding) the
// first event whose mod revision is not above newerThanRev or not above
// revLowWatermark, i.e. already seen by the caller or compacted.
func (l *list) getEvents(newerThanRev, revLowWatermark int64) []types.Record {
	out := make([]types.Record, 0, 128)

	l.eventsLock.RLock()
	defer l.eventsLock.RUnlock()

	for _, batch := range l.events {
		for _, ev := range batch {
			// events are walked new to old, so the first one that is too
			// old ends the scan
			if ev.ModRevision() <= newerThanRev || ev.ModRevision() <= revLowWatermark {
				return out
			}
			out = append(out, ev)
		}
	}
	return out
}
// toRecords returns the records currently held by the list. It is one pass
// over the items, which is a lot cheaper and faster than going to the store.
func (l *list) toRecords() []types.Record {
	// start with room for 128 records to avoid early re-alloc + copy
	out := make([]types.Record, 0, 128)
	l.items.Range(func(_, v interface{}) bool {
		item := v.(*listItem)

		// read the record under the item lock
		item.lock.Lock()
		rec := item.r
		item.lock.Unlock()

		out = append(out, rec)
		return true // keep going
	})
	return out
}
// load populates the list from the backend the first time it is called.
// Later calls, and calls that waited while another goroutine was loading,
// return nil without touching the backend.
//
// A backend failure is returned as is and leaves the list unloaded, so the
// next call tries again.
func (l *list) load(be backend.Backend) error {
	// firstRead is only read and written while holding loadLock. Checking
	// it before taking the lock would race with the write below.
	l.loadLock.Lock()
	defer l.loadLock.Unlock()

	if l.firstRead {
		return nil
	}

	// This is the only time we list from the backend to populate our
	// internal state. The rest is done via events and notifications.
	klogv2.V(0).Infof("Expensive: cold list for prefix:%v", l.prefix)
	_, records, err := be.ListForPrefix(l.prefix)
	if err != nil {
		return err
	}

	l.feedRecords(records)
	l.firstRead = true
	return nil
}
// feedRecords safely adds records to the list. A record whose key is not
// cached yet is stored as is; for a key that is already cached the record
// with the higher mod revision wins.
func (l *list) feedRecords(records []types.Record) {
	for _, r := range records {
		item := &listItem{r: r}
		currentI, loaded := l.items.LoadOrStore(string(r.Key()), item)
		if !loaded {
			// the key was not cached, the new item is stored: done
			continue
		}

		// The key is already cached: compare, new wins. The item lock is
		// scoped to this single record. Deferring the unlock at function
		// level would hold every lock until the whole batch is done, and
		// block forever on a key that shows up again later in records.
		current := currentI.(*listItem)
		func() {
			current.lock.Lock()
			defer current.lock.Unlock()
			if r.ModRevision() > current.r.ModRevision() {
				klogv2.V(8).Infof("REPLACE %v:%v=>%v ", string(current.r.Key()), current.r.ModRevision(), r.ModRevision())
				current.r = r
			}
		}()
	}
}
// getRecord returns the record cached under key, or nil when the list has
// no item for that key.
func (l *list) getRecord(key string) types.Record {
	v, found := l.items.Load(key)
	if !found {
		return nil
	}

	// read the record under the item lock
	item := v.(*listItem)
	item.lock.Lock()
	rec := item.r
	item.lock.Unlock()
	return rec
}
// getRecord returns the cached record for key, or nil when it is not
// cached. The list for the key's prefix is created (empty) if it does not
// exist yet; nothing is loaded from the backend.
func (lm *listManager) getRecord(key string) types.Record {
	// the error is ignored on purpose: getList only fails when it is asked
	// to load from the backend
	l, _ := lm.getList(prefixFromKey(key), false)
	return l.getRecord(key)
}
// inserted records the creation of r by feeding it to the list as a batch
// of one.
func (l *list) inserted(r types.Record) {
	created := []types.Record{r}
	l.feedRecords(created)
}
// updated records the new version r of a record by feeding it to the list
// as a batch of one.
func (l *list) updated(r types.Record) {
	changed := []types.Record{r}
	l.feedRecords(changed)
}
// deleted removes the item cached under the key of r, if any.
func (l *list) deleted(r types.Record) {
	l.deletedByKey(string(r.Key()))
}
// deletedByKey removes the item cached under key, if any.
func (l *list) deletedByKey(key string) {
l.items.Delete(key)
}
// notifyCompact raises the low watermark to compactRev. A revision that is
// not above the current low watermark is ignored.
func (lm *listManager) notifyCompact(compactRev int64) {
	// The watermark is accessed without a lock; this assumes a 64-bit
	// platform, where the int64 is not subject to torn reads.
	if compactRev <= lm.revLowWatermark {
		return
	}
	lm.revLowWatermark = compactRev
}
// notifyInserted is used by the front end to tell the listManager that the
// record r was inserted.
func (lm *listManager) notifyInserted(r types.Record) {
	// the error is ignored on purpose: getList only fails when it is asked
	// to load from the backend
	l, _ := lm.getList(prefixFromKey(string(r.Key())), false)
	l.inserted(r)
}
// notifyDeleted is used by the front end to tell the listManager that the
// record r was deleted.
func (lm *listManager) notifyDeleted(r types.Record) {
	// the error is ignored on purpose: getList only fails when it is asked
	// to load from the backend
	l, _ := lm.getList(prefixFromKey(string(r.Key())), false)
	l.deleted(r)
}
// notifyDeletedKey is used by the front end to tell the listManager that
// the record stored under key was deleted.
func (lm *listManager) notifyDeletedKey(key string) {
	// the error is ignored on purpose: getList only fails when it is asked
	// to load from the backend
	l, _ := lm.getList(prefixFromKey(key), false)
	l.deletedByKey(key)
}
// notifyUpdated is used by the front end to tell the listManager that a
// record was updated to r. oldRecord is currently not used.
func (lm *listManager) notifyUpdated(oldRecord, r types.Record) {
	// the error is ignored on purpose: getList only fails when it is asked
	// to load from the backend
	l, _ := lm.getList(prefixFromKey(string(r.Key())), false)
	l.updated(r)
}
// prefixFromKey derives the list prefix of a key: for a key that looks like
// /x/y/z/<object> it returns /x/y/z/. A result of "./" is mapped to "/".
//
// NOTE(review): filepath.Dir follows the path conventions of the host OS
// while keys are slash separated - confirm this never runs where the
// separator is not "/".
func prefixFromKey(key string) string {
	if prefix := suffixedKey(filepath.Dir(key)); prefix != "./" {
		return prefix
	}
	return "/"
}