forked from kubernetes-sigs/aws-load-balancer-controller
/
ingress_events.go
92 lines (78 loc) · 3.35 KB
/
ingress_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package eventhandlers
import (
"context"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"github.com/sonal-chauhan/aws-load-balancer-controller/pkg/ingress"
"github.com/sonal-chauhan/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
func NewEnqueueRequestsForIngressEvent(groupLoader ingress.GroupLoader, eventRecorder record.EventRecorder,
logger logr.Logger) *enqueueRequestsForIngressEvent {
return &enqueueRequestsForIngressEvent{
groupLoader: groupLoader,
eventRecorder: eventRecorder,
logger: logger,
}
}
var _ handler.EventHandler = (*enqueueRequestsForIngressEvent)(nil)
type enqueueRequestsForIngressEvent struct {
groupLoader ingress.GroupLoader
eventRecorder record.EventRecorder
logger logr.Logger
}
func (h *enqueueRequestsForIngressEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
h.enqueueIfBelongsToGroup(queue, e.Object.(*networking.Ingress))
}
func (h *enqueueRequestsForIngressEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
ingOld := e.ObjectOld.(*networking.Ingress)
ingNew := e.ObjectNew.(*networking.Ingress)
// we only care below update event:
// 1. Ingress annotation updates
// 2. Ingress spec updates
// 3. Ingress deletion
if !equality.Semantic.DeepEqual(ingOld.ResourceVersion, ingNew.ResourceVersion) {
if equality.Semantic.DeepEqual(ingOld.Annotations, ingNew.Annotations) &&
equality.Semantic.DeepEqual(ingOld.Spec, ingNew.Spec) &&
equality.Semantic.DeepEqual(ingOld.DeletionTimestamp.IsZero(), ingNew.DeletionTimestamp.IsZero()) {
return
}
}
h.enqueueIfBelongsToGroup(queue, ingNew)
}
func (h *enqueueRequestsForIngressEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
// since we'll always attach an finalizer before doing any reconcile action,
// user triggered delete action will actually be an update action with deletionTimestamp set,
// which will be handled by update event handler.
// so we'll just ignore delete events to avoid unnecessary reconcile call.
}
func (h *enqueueRequestsForIngressEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {
h.enqueueIfBelongsToGroup(queue, e.Object.(*networking.Ingress))
}
func (h *enqueueRequestsForIngressEvent) enqueueIfBelongsToGroup(queue workqueue.RateLimitingInterface, ing *networking.Ingress) {
ctx := context.Background()
ingKey := k8s.NamespacedName(ing)
groupIDsSet := make(map[ingress.GroupID]struct{})
groupIDsPendingFinalization := h.groupLoader.LoadGroupIDsPendingFinalization(ctx, ing)
for _, groupID := range groupIDsPendingFinalization {
groupIDsSet[groupID] = struct{}{}
}
if groupID, err := h.groupLoader.LoadGroupIDIfAny(ctx, ing); err != nil {
h.eventRecorder.Event(ing, corev1.EventTypeWarning, k8s.IngressEventReasonFailedLoadGroupID, fmt.Sprintf("failed load groupID due to %v", err))
} else if groupID != nil {
groupIDsSet[*groupID] = struct{}{}
}
for groupID, _ := range groupIDsSet {
h.logger.V(1).Info("enqueue ingressGroup for ingress event",
"ingress", ingKey.String(),
"ingressGroup", groupID,
)
queue.Add(ingress.EncodeGroupIDToReconcileRequest(groupID))
}
}