/
aggregator.go
119 lines (105 loc) · 3.58 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package k8s
import (
"context"
"fmt"
"time"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type IngressLister interface {
List() ([]v1beta1.Ingress, error)
}
type Aggregator struct {
factories []*informers.SharedInformerFactory
events chan SyncDataEvent
ingressStores []cache.Store
secretsStore []cache.Store
}
func (a *Aggregator) Events() chan SyncDataEvent {
return a.events
}
func (a *Aggregator) GetSecrets() ([]*v1.Secret, error) {
allSecrets := make([]*v1.Secret, 0)
for _, store := range a.secretsStore {
secrets := store.List()
for _, obj := range secrets {
secret, ok := obj.(*v1.Secret)
if !ok {
return nil, fmt.Errorf("unexpected object in store: %+v", obj)
}
allSecrets = append(allSecrets, secret)
}
}
return allSecrets, nil
}
// NewAggregator returns a new Aggregator initialized with resource informers
func NewAggregator(k8sClients []*kubernetes.Clientset, ctx context.Context, syncSecrets bool) *Aggregator {
a := Aggregator{
events: make(chan SyncDataEvent, watch.DefaultChanSize),
ingressStores: []cache.Store{},
secretsStore: []cache.Store{},
}
informersSynced := []cache.InformerSynced{}
for _, c := range k8sClients {
factory := informers.NewSharedInformerFactory(c, time.Minute)
ingressInformer := getIngressInformer(factory, c)
a.EventsIngresses(ctx, ingressInformer)
a.ingressStores = append(a.ingressStores, ingressInformer.GetStore())
a.factories = append(a.factories, &factory)
informersSynced = append(informersSynced, ingressInformer.HasSynced)
if syncSecrets {
tlsFilter := informers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.FieldSelector = "type=kubernetes.io/tls"
})
// using new factory here to apply filter to secrets lister only
// see https://github.com/kubernetes/kubernetes/issues/90262#issuecomment-671479190
secretsFactory := informers.NewSharedInformerFactoryWithOptions(c, time.Minute, tlsFilter)
secretsInformer := secretsFactory.Core().V1().Secrets().Informer()
a.EventsSecrets(ctx, secretsInformer)
a.secretsStore = append(a.secretsStore, secretsInformer.GetStore())
informersSynced = append(informersSynced, secretsInformer.HasSynced)
}
}
if !cache.WaitForCacheSync(ctx.Done(), informersSynced...) {
logrus.Panicf("Unable to populate caches")
}
return &a
}
func getIngressInformer(factory informers.SharedInformerFactory, clientSet *kubernetes.Clientset) (ingressInformer cache.SharedIndexInformer) {
for _, apiGroup := range []string{"networking.k8s.io/v1", "networking.k8s.io/v1beta1", "extensions/v1beta1"} {
resources, err := clientSet.ServerResourcesForGroupVersion(apiGroup)
if err != nil {
continue
}
for _, rs := range resources.APIResources {
if rs.Name == "ingresses" {
switch apiGroup {
case "networking.k8s.io/v1":
ingressInformer = factory.Networking().V1().Ingresses().Informer()
case "networking.k8s.io/v1beta1":
ingressInformer = factory.Networking().V1beta1().Ingresses().Informer()
case "extensions/v1beta1":
ingressInformer = factory.Extensions().V1beta1().Ingresses().Informer()
}
logrus.Infof("watching ingress resources of apiGroup %s", apiGroup)
}
}
if ingressInformer != nil {
break
}
}
return ingressInformer
}
// Run is the synchronization loop
func (a *Aggregator) Run() {
for {
a.events <- SyncDataEvent{SyncType: COMMAND}
time.Sleep(5 * time.Second)
}
}