/
start.go
88 lines (71 loc) · 2.63 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package secondarywatch
import (
"context"
"fmt"
"reflect"
"time"
"github.com/openshift/cluster-resource-override-admission-operator/pkg/runtime"
"k8s.io/client-go/informers"
)
type Options struct {
Client *runtime.Client
ResyncPeriod time.Duration
Namespace string
}
// StarterFunc refers to a function that can be called to start watch on secondary resources.
type StarterFunc func(enqueuer runtime.Enqueuer, shutdown context.Context) error
func (s StarterFunc) Start(enqueuer runtime.Enqueuer, shutdown context.Context) error {
return s(enqueuer, shutdown)
}
// New sets up watch on secondary resources.
// The function returns lister(s) that can be used to query secondary resources
// and a StarterFunc that can be called to start the watch.
func New(options *Options) (lister *Lister, startFunc StarterFunc) {
option := informers.WithNamespace(options.Namespace)
factory := informers.NewSharedInformerFactoryWithOptions(options.Client.Kubernetes, options.ResyncPeriod, option)
deployment := factory.Apps().V1().Deployments()
daemonset := factory.Apps().V1().DaemonSets()
pod := factory.Core().V1().Pods()
configmap := factory.Core().V1().ConfigMaps()
service := factory.Core().V1().Services()
secret := factory.Core().V1().Secrets()
serviceaccount := factory.Core().V1().ServiceAccounts()
webhook := factory.Admissionregistration().V1().MutatingWebhookConfigurations()
startFunc = func(enqueuer runtime.Enqueuer, shutdown context.Context) error {
handler := newResourceEventHandler(enqueuer)
deployment.Informer().AddEventHandler(handler)
daemonset.Informer().AddEventHandler(handler)
pod.Informer().AddEventHandler(handler)
configmap.Informer().AddEventHandler(handler)
service.Informer().AddEventHandler(handler)
secret.Informer().AddEventHandler(handler)
serviceaccount.Informer().AddEventHandler(handler)
webhook.Informer().AddEventHandler(handler)
factory.Start(shutdown.Done())
status := factory.WaitForCacheSync(shutdown.Done())
if names := check(status); len(names) > 0 {
return fmt.Errorf("WaitForCacheSync did not successfully complete resources=%s", names)
}
return nil
}
lister = &Lister{
deployment: deployment.Lister(),
daemonset: daemonset.Lister(),
pod: pod.Lister(),
configmap: configmap.Lister(),
service: service.Lister(),
secret: secret.Lister(),
serviceaccount: serviceaccount.Lister(),
webhook: webhook.Lister(),
}
return
}
func check(status map[reflect.Type]bool) []string {
names := make([]string, 0)
for objType, synced := range status {
if !synced {
names = append(names, objType.Name())
}
}
return names
}