/
listwatch_merge.go
86 lines (74 loc) · 1.46 KB
/
listwatch_merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package listwatchext
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/watch"
"sync"
"time"
)
// BatchEventConsumer receives one batch of merged events. The map is keyed by
// the object key computed in EventMerger.Add ("namespace/name", or just "name"
// when the object has no namespace) and holds the most recent event per key.
// The merger replaces its own map before handing a batch over, so it does not
// touch the passed map afterwards.
type BatchEventConsumer func(merged map[string]watch.Event)

// EventMerger collapses a stream of watch events into periodic batches that
// contain only the latest event seen for each object.
type EventMerger struct {
	// merged holds the latest event per object key since the last flush.
	// It is read and written only while mutex is held.
	merged map[string]watch.Event
	// mutex guards merged.
	mutex sync.Mutex
	// interval is the period of the background flush ticker.
	interval time.Duration
	// consumer is invoked with every non-empty batch.
	consumer BatchEventConsumer
	// stopCh terminates the background flush loop once it becomes readable.
	stopCh <-chan struct{}
}
// NewEventMerger creates an EventMerger with an empty batch and starts its
// background flush loop before returning.
//
// interval is the period between flush attempts, consumer is called with each
// non-empty batch, and stopCh ends the background loop once it becomes
// readable.
func NewEventMerger(interval time.Duration, consumer BatchEventConsumer, stopCh <-chan struct{}) *EventMerger {
	merger := new(EventMerger)
	merger.merged = make(map[string]watch.Event)
	merger.interval = interval
	merger.consumer = consumer
	merger.stopCh = stopCh

	merger.start()
	return merger
}
// Add records e as the latest event for the object it carries, replacing any
// event previously stored under the same key in the current batch.
//
// Error and Bookmark events are ignored, as are events whose object cannot be
// read through meta.Accessor. The key is "namespace/name" for namespaced
// objects and "name" otherwise.
func (m *EventMerger) Add(e watch.Event) {
	switch e.Type {
	case watch.Error, watch.Bookmark:
		// These event types are not merged.
		return
	}

	accessor, err := meta.Accessor(e.Object)
	if err != nil {
		// TODO log ?
		return
	}

	key := accessor.GetName()
	if ns := accessor.GetNamespace(); ns != "" {
		key = ns + "/" + key
	}

	m.mutex.Lock()
	m.merged[key] = e
	m.mutex.Unlock()
}
// start launches the background goroutine that calls flush once per interval
// until stopCh becomes readable. Events still pending at that moment are not
// flushed.
//
// time.NewTicker panics when given a non-positive duration, and that panic
// would be raised inside the spawned goroutine where the caller cannot recover
// from it. A non-positive interval therefore falls back to one second instead
// of crashing the process.
func (m *EventMerger) start() {
	// fallbackInterval is the flush period used when the configured interval
	// is not positive.
	const fallbackInterval = time.Second

	interval := m.interval
	if interval <= 0 {
		interval = fallbackInterval
	}

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-m.stopCh:
				return
			case <-ticker.C:
				m.flush()
			}
		}
	}()
}
// flush hands the events accumulated since the previous flush to the consumer
// and starts a fresh, empty batch. It does nothing when no events are pending.
//
// The batch is swapped out while the mutex is held, but the consumer runs
// after the mutex is released, so Add is not blocked while the consumer works.
func (m *EventMerger) flush() {
	m.mutex.Lock()
	batch := m.merged
	if len(batch) > 0 {
		m.merged = make(map[string]watch.Event)
	}
	m.mutex.Unlock()

	if len(batch) == 0 {
		return
	}
	m.consumer(batch)
}