/
controller.go
157 lines (137 loc) · 4.8 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package eventwatch
import (
"context"
"fmt"
"strings"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
)
const (
// eventAckAnnotationName is an annotation we place on event that was processed by this controller.
// This is used to not process same event multiple times.
eventAckAnnotationName = "eventwatch.openshift.io/last-seen-count"
)
// Controller observes the events in given informer namespaces and match them with configured event handlers.
// If the event reason and namespace match the configured, then the process() function is called in handler on that event.
// The event is then acknowledged via annotation update, so each interesting event is only processed once.
// The process() function will received the observed event and can update Prometheus counter or manage operator conditions.
type Controller struct {
events []eventHandler
eventClient corev1typed.EventsGetter
}
type eventHandler struct {
reasonPattern string
namespace string
process func(event *corev1.Event) error
}
type Builder struct {
eventConfig []eventHandler
}
// New returns a new event watch controller builder that allow to specify event handlers.
func New() *Builder {
return &Builder{}
}
// WithEventHandler add handler for event matching the namespace and the reason pattern.
// This can be called multiple times.
// The reason pattern can contain glob ("*") as prefix or suffix. If no glob is used a strict match is required
func (b *Builder) WithEventHandler(namespace, reasonPattern string, processEvent func(event *corev1.Event) error) *Builder {
b.eventConfig = append(b.eventConfig, eventHandler{
reasonPattern: reasonPattern,
namespace: namespace,
process: processEvent,
})
return b
}
// ToController returns a factory controller that can be run.
// The kubeInformersForTargetNamespace must have informer for namespaces which matching the registered event handlers.
// The event client is used to update/acknowledge events.
func (b *Builder) ToController(kubeInformersForTargetNamespace informers.SharedInformerFactory, eventClient corev1typed.EventsGetter, recorder events.Recorder) factory.Controller {
c := &Controller{
events: b.eventConfig,
eventClient: eventClient,
}
return factory.New().
WithSync(c.sync).
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
event, ok := obj.(*corev1.Event)
if !ok {
return ""
}
return eventKeyFunc(event.Namespace, event.Name, event.Reason)
}, kubeInformersForTargetNamespace.Core().V1().Events().Informer()).
ToController("EventWatchController", recorder)
}
func eventKeyFunc(namespace, name, reason string) string {
if len(namespace) == 0 || len(name) == 0 || len(reason) == 0 {
return ""
}
return strings.Join([]string{namespace, name, reason}, "/")
}
func decomposeEventKey(key string) (string, string, string, bool) {
parts := strings.Split(key, "/")
if len(parts) != 3 {
return "", "", "", false
}
return parts[0], parts[1], parts[2], true
}
func (c *Controller) getEventHandler(eventKey string) *eventHandler {
namespace, _, reason, ok := decomposeEventKey(eventKey)
if !ok {
return nil
}
for i := range c.events {
if c.events[i].namespace == namespace && reasonMatch(c.events[i].reasonPattern, reason) {
return &c.events[i]
}
}
return nil
}
func isAcknowledgedEvent(e *corev1.Event) bool {
if e.Annotations == nil {
return false
}
lastSeenCount, ok := e.Annotations[eventAckAnnotationName]
if !ok {
return false
}
return fmt.Sprintf("%d", e.Count) == lastSeenCount
}
func (c *Controller) sync(ctx context.Context, syncCtx factory.SyncContext) error {
eventHandler := c.getEventHandler(syncCtx.QueueKey())
if eventHandler == nil {
return nil
}
namespace, name, _, ok := decomposeEventKey(syncCtx.QueueKey())
if !ok {
klog.Errorf("Unexpected queue key %q", syncCtx.QueueKey())
return nil
}
event, err := c.eventClient.Events(namespace).Get(ctx, name, metav1.GetOptions{})
if errors.IsNotFound(err) {
klog.Errorf("Event not found %s/%s: %v", namespace, name, err)
return nil
}
if err != nil {
return err
}
if isAcknowledgedEvent(event) {
return nil
}
// acknowledge the event, so we won't process it multiple times
seenEvent := event.DeepCopy()
if seenEvent.Annotations == nil {
seenEvent.Annotations = map[string]string{}
}
seenEvent.Annotations[eventAckAnnotationName] = fmt.Sprintf("%d", seenEvent.Count)
if _, err := c.eventClient.Events(namespace).Update(ctx, seenEvent, metav1.UpdateOptions{}); err != nil {
return err
}
return eventHandler.process(seenEvent)
}