generated from kedacore/github-template
/
endpoints_cache_informer.go
135 lines (121 loc) · 3.17 KB
/
endpoints_cache_informer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package k8s
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
infcorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// InformerBackedEndpointsCache serves v1.Endpoints lookups from an
// EndpointsInformer and forwards the informer's add/update/delete events to
// watchers through a watch.Broadcaster.
type InformerBackedEndpointsCache struct {
// lggr records events that could not be forwarded to watchers.
lggr logr.Logger
// endpointsInformer supplies the lister read by Get and MarshalJSON and the
// shared informer that Start runs.
endpointsInformer infcorev1.EndpointsInformer
// bcaster distributes informer events to the watches handed out by Watch.
bcaster *watch.Broadcaster
}
// MarshalJSON lists every Endpoints object the informer's lister currently
// holds and returns the JSON encoding of that list. An error from the lister
// is returned unchanged.
func (i *InformerBackedEndpointsCache) MarshalJSON() ([]byte, error) {
	allEndpoints, listErr := i.endpointsInformer.Lister().List(labels.Everything())
	if listErr != nil {
		return nil, listErr
	}
	return json.Marshal(&allEndpoints)
}
// Start runs the underlying shared informer, handing it ctx's Done channel as
// the stop channel. It returns when the informer's Run call returns.
func (i *InformerBackedEndpointsCache) Start(ctx context.Context) {
	sharedInformer := i.endpointsInformer.Informer()
	stopCh := ctx.Done()
	sharedInformer.Run(stopCh)
}
// Get looks up the Endpoints object called name in namespace ns through the
// informer's lister and returns a copy of the struct. When the lister reports
// an error, that error is returned together with a zero-value Endpoints.
func (i *InformerBackedEndpointsCache) Get(
	ns,
	name string,
) (v1.Endpoints, error) {
	nsLister := i.endpointsInformer.Lister().Endpoints(ns)
	found, err := nsLister.Get(name)
	if err == nil {
		return *found, nil
	}
	return v1.Endpoints{}, err
}
// Watch returns a watch.Interface that delivers only the broadcast events
// whose object is the Endpoints called name in namespace ns.
//
// An error from registering with the broadcaster is returned unchanged.
func (i *InformerBackedEndpointsCache) Watch(
	ns,
	name string,
) (watch.Interface, error) {
	watched, err := i.bcaster.Watch()
	if err != nil {
		return nil, err
	}
	return watch.Filter(watched, func(e watch.Event) (watch.Event, bool) {
		// Use a checked assertion and a nil guard: an event whose payload is
		// not a usable *v1.Endpoints is dropped instead of panicking inside
		// the filter.
		endpoints, ok := e.Object.(*v1.Endpoints)
		if !ok || endpoints == nil {
			return e, false
		}
		return e, endpoints.Namespace == ns && endpoints.Name == name
	}), nil
}
// addEvtHandler is registered as the informer's AddFunc. It forwards obj to
// the broadcaster as a watch.Added event.
//
// Objects that are not *v1.Endpoints are logged and dropped. A failure to
// broadcast is logged; nothing is returned to the informer.
func (i *InformerBackedEndpointsCache) addEvtHandler(obj interface{}) {
	endpoints, ok := obj.(*v1.Endpoints)
	if !ok {
		// Report the dynamic type: for a type mismatch that is the useful
		// part, and it avoids dumping an arbitrary object into the log.
		i.lggr.Error(
			fmt.Errorf("informer expected endpoints, got %T", obj),
			"not forwarding event",
		)
		return
	}
	if err := i.bcaster.Action(watch.Added, endpoints); err != nil {
		i.lggr.Error(err, "error broadcasting endpoints added event")
	}
}
// updateEvtHandler is registered as the informer's UpdateFunc. It ignores the
// previous object and forwards newObj to the broadcaster as a watch.Modified
// event.
//
// Objects that are not *v1.Endpoints are logged and dropped. A failure to
// broadcast is logged; nothing is returned to the informer.
func (i *InformerBackedEndpointsCache) updateEvtHandler(_, newObj interface{}) {
	endpoints, ok := newObj.(*v1.Endpoints)
	if !ok {
		// Report the dynamic type: for a type mismatch that is the useful
		// part, and it avoids dumping an arbitrary object into the log.
		i.lggr.Error(
			fmt.Errorf("informer expected endpoints, got %T", newObj),
			"not forwarding event",
		)
		return
	}
	if err := i.bcaster.Action(watch.Modified, endpoints); err != nil {
		i.lggr.Error(err, "error broadcasting endpoints modified event")
	}
}
// deleteEvtHandler is registered as the informer's DeleteFunc. It forwards
// the deleted object to the broadcaster as a watch.Deleted event.
//
// Objects that are not *v1.Endpoints (after tombstone unwrapping) are logged
// and dropped. A failure to broadcast is logged; nothing is returned to the
// informer.
func (i *InformerBackedEndpointsCache) deleteEvtHandler(obj interface{}) {
	// A delete handler may receive a cache.DeletedFinalStateUnknown tombstone
	// wrapping the last known object instead of the object itself. Unwrap it
	// so the deletion still reaches watchers.
	if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
		obj = tombstone.Obj
	}
	endpoints, ok := obj.(*v1.Endpoints)
	if !ok {
		// Report the dynamic type: for a type mismatch that is the useful
		// part, and it avoids dumping an arbitrary object into the log.
		i.lggr.Error(
			fmt.Errorf("informer expected endpoints, got %T", obj),
			"not forwarding event",
		)
		return
	}
	if err := i.bcaster.Action(watch.Deleted, endpoints); err != nil {
		i.lggr.Error(err, "error broadcasting endpoints deleted event")
	}
}
// NewInformerBackedEndpointsCache builds an InformerBackedEndpointsCache on
// top of a shared informer factory created from cl and defaultResync.
//
// The cache's add, update and delete handlers are registered on the Endpoints
// informer. If registration fails the error is logged through lggr and the
// cache is still returned. The broadcaster is created with a queue length of
// 0 and the watch.WaitIfChannelFull policy.
func NewInformerBackedEndpointsCache(
	lggr logr.Logger,
	cl kubernetes.Interface,
	defaultResync time.Duration,
) *InformerBackedEndpointsCache {
	informer := informers.NewSharedInformerFactory(cl, defaultResync).
		Core().
		V1().
		Endpoints()

	c := &InformerBackedEndpointsCache{
		lggr:              lggr,
		bcaster:           watch.NewBroadcaster(0, watch.WaitIfChannelFull),
		endpointsInformer: informer,
	}

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addEvtHandler,
		UpdateFunc: c.updateEvtHandler,
		DeleteFunc: c.deleteEvtHandler,
	}
	if _, err := c.endpointsInformer.Informer().AddEventHandler(handlers); err != nil {
		lggr.Error(err, "error creating backend informer")
	}
	return c
}