/
event.go
104 lines (83 loc) · 2.17 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package autoscaler
import (
"fmt"
"sync"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type matchEventFunc func(event *corev1.Event) bool
type eventHandlerFunc func(event *corev1.Event)
type eventWatcher struct {
stopCh chan struct{}
informerFactory informers.SharedInformerFactory
eventInformer cache.SharedIndexInformer
startTime metav1.Time
eventHandlerLock sync.Mutex
eventHandlers []*eventHandler
}
type eventHandler struct {
sync.Mutex
matcher matchEventFunc
handler eventHandlerFunc
enabled bool
}
func newEventWatcher(clientset kubernetes.Interface) (*eventWatcher, error) {
w := eventWatcher{
stopCh: make(chan struct{}),
startTime: metav1.Now(),
informerFactory: informers.NewSharedInformerFactory(clientset, 0),
}
w.eventInformer = w.informerFactory.Core().V1().Events().Informer()
_, err := w.eventInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
event, ok := obj.(*corev1.Event)
if !ok {
panic("expected to get an of object of type corev1.Event")
}
if event.CreationTimestamp.Before(&w.startTime) {
return
}
w.eventHandlerLock.Lock()
defer w.eventHandlerLock.Unlock()
for _, h := range w.eventHandlers {
h.Lock()
if h.enabled && h.matcher(event) {
h.handler(event)
}
h.Unlock()
}
},
})
if err != nil {
return nil, fmt.Errorf("could not add event handler: %w", err)
}
return &w, nil
}
func (w *eventWatcher) run() bool {
w.informerFactory.Start(w.stopCh)
return cache.WaitForCacheSync(w.stopCh, w.eventInformer.HasSynced)
}
func (w *eventWatcher) stop() {
close(w.stopCh)
}
func (w *eventWatcher) onEvent(matcher matchEventFunc, handler eventHandlerFunc) *eventHandler {
h := &eventHandler{
matcher: matcher,
handler: handler,
}
w.eventHandlerLock.Lock()
defer w.eventHandlerLock.Unlock()
w.eventHandlers = append(w.eventHandlers, h)
return h
}
func (h *eventHandler) enable() {
h.Lock()
defer h.Unlock()
h.enabled = true
}
func matchAnyEvent(_ *corev1.Event) bool {
return true
}