This repository has been archived by the owner on Jul 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 277
/
client.go
141 lines (113 loc) · 5.42 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package reconciler
import (
	"fmt"
	"strconv"

	clientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	customResourceDefinitionInformer "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
	internalinterfaces "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/internalinterfaces"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"

	"github.com/openservicemesh/osm/pkg/constants"
	"github.com/openservicemesh/osm/pkg/errcode"
	"github.com/openservicemesh/osm/pkg/k8s"
)
// NewReconcilerClient builds a reconciler client for osm managed resources,
// initializes the requested informers, starts them and waits for their caches
// to sync (see run).
//
// When selectInformers is empty, the mutating and validating webhook informers
// are initialized, and the CRD informer is added only if apiServerClient is
// non-nil. Every key passed in selectInformers must be one of CrdInformerKey,
// MutatingWebhookInformerKey or ValidatingWebhookInformerKey; an unknown key
// yields an error before any informer is initialized.
//
// The stop channel is passed on to run. A non-nil error is returned when an
// unknown informer key is requested or when run fails.
func NewReconcilerClient(kubeClient kubernetes.Interface, apiServerClient clientset.Interface, meshName, osmVersion string, stop chan struct{}, selectInformers ...k8s.InformerKey) error {
	// Initialize client object
	c := client{
		kubeClient:      kubeClient,
		meshName:        meshName,
		osmVersion:      osmVersion,
		apiServerClient: apiServerClient,
		informers:       informerCollection{},
	}

	// Maps each supported informer key to the method that sets that informer up.
	informerInitHandlerMap := map[k8s.InformerKey]func(){
		CrdInformerKey:               c.initCustomResourceDefinitionMonitor,
		MutatingWebhookInformerKey:   c.initMutatingWebhookConfigurationMonitor,
		ValidatingWebhookInformerKey: c.initValidatingWebhookConfigurationMonitor,
	}

	// If specific informers are not selected to be initialized, initialize all informers
	if len(selectInformers) == 0 {
		informers := []k8s.InformerKey{MutatingWebhookInformerKey, ValidatingWebhookInformerKey}
		// initialize informer for CRDs only if the apiServerClient is not nil
		if apiServerClient != nil {
			informers = append(informers, CrdInformerKey)
		}
		selectInformers = informers
	}

	// Resolve every key up front: indexing the map with an unknown key returns a
	// nil func, and calling it would panic. Validating first also guarantees no
	// informer is set up when the request is invalid.
	initHandlers := make([]func(), 0, len(selectInformers))
	for _, informer := range selectInformers {
		initHandler, ok := informerInitHandlerMap[informer]
		if !ok {
			return fmt.Errorf("unsupported reconciler informer key: %v", informer)
		}
		initHandlers = append(initHandlers, initHandler)
	}
	for _, initHandler := range initHandlers {
		initHandler()
	}

	if err := c.run(stop); err != nil {
		log.Error().Err(err).Str(errcode.Kind, errcode.GetErrCodeWithMetric(errcode.ErrStartingReconciler)).
			Msg("Could not start osm reconciler client")
		return err
	}

	return nil
}
// initCustomResourceDefinitionMonitor sets up CustomResourceDefinition monitoring.
// It creates a filtered CRD informer whose list options select objects labelled
// with constants.OSMAppNameLabelKey=constants.OSMAppNameLabelValue and
// constants.ReconcileLabel="true", stores it under CrdInformerKey and attaches
// the CRD event handler to it.
func (c *client) initCustomResourceDefinitionMonitor() {
	selector := fields.SelectorFromSet(map[string]string{
		constants.OSMAppNameLabelKey: constants.OSMAppNameLabelValue,
		constants.ReconcileLabel:     strconv.FormatBool(true),
	}).String()

	// Restrict list calls issued by the informer to the labelled CRDs.
	tweak := internalinterfaces.TweakListOptionsFunc(func(listOpts *metav1.ListOptions) {
		listOpts.LabelSelector = selector
	})

	crdInformer := customResourceDefinitionInformer.NewFilteredCustomResourceDefinitionInformer(
		c.apiServerClient,
		k8s.DefaultKubeEventResyncInterval,
		cache.Indexers{nameIndex: metaNameIndexFunc},
		tweak,
	)

	// Register the informer first, then hook up its event handler.
	c.informers[CrdInformerKey] = crdInformer
	crdInformer.AddEventHandler(c.crdEventHandler())
}
// initMutatingWebhookConfigurationMonitor sets up mutating webhook monitoring.
// It builds a shared informer factory whose list options select objects labelled
// with constants.OSMAppNameLabelKey=constants.OSMAppNameLabelValue,
// constants.OSMAppInstanceLabelKey=<this client's meshName> and
// constants.ReconcileLabel="true", stores the MutatingWebhookConfigurations
// informer under MutatingWebhookInformerKey and attaches the mutating webhook
// event handler to it.
func (c *client) initMutatingWebhookConfigurationMonitor() {
	selector := fields.SelectorFromSet(map[string]string{
		constants.OSMAppNameLabelKey:     constants.OSMAppNameLabelValue,
		constants.OSMAppInstanceLabelKey: c.meshName,
		constants.ReconcileLabel:         strconv.FormatBool(true),
	}).String()

	// Restrict list calls issued by the factory's informers to the labelled objects.
	tweak := informers.WithTweakListOptions(func(listOpts *metav1.ListOptions) {
		listOpts.LabelSelector = selector
	})
	factory := informers.NewSharedInformerFactoryWithOptions(c.kubeClient, k8s.DefaultKubeEventResyncInterval, tweak)
	mwhcInformer := factory.Admissionregistration().V1().MutatingWebhookConfigurations().Informer()

	// Register the informer first, then hook up its event handler.
	c.informers[MutatingWebhookInformerKey] = mwhcInformer
	mwhcInformer.AddEventHandler(c.mutatingWebhookEventHandler())
}
// initValidatingWebhookConfigurationMonitor sets up validating webhook monitoring.
// It builds a shared informer factory whose list options select objects labelled
// with constants.OSMAppNameLabelKey=constants.OSMAppNameLabelValue,
// constants.OSMAppInstanceLabelKey=<this client's meshName> and
// constants.ReconcileLabel="true", stores the ValidatingWebhookConfigurations
// informer under ValidatingWebhookInformerKey and attaches the validating
// webhook event handler to it.
func (c *client) initValidatingWebhookConfigurationMonitor() {
	selector := fields.SelectorFromSet(map[string]string{
		constants.OSMAppNameLabelKey:     constants.OSMAppNameLabelValue,
		constants.OSMAppInstanceLabelKey: c.meshName,
		constants.ReconcileLabel:         strconv.FormatBool(true),
	}).String()

	// Restrict list calls issued by the factory's informers to the labelled objects.
	tweak := informers.WithTweakListOptions(func(listOpts *metav1.ListOptions) {
		listOpts.LabelSelector = selector
	})
	factory := informers.NewSharedInformerFactoryWithOptions(c.kubeClient, k8s.DefaultKubeEventResyncInterval, tweak)
	vwhcInformer := factory.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer()

	// Register the informer first, then hook up its event handler.
	c.informers[ValidatingWebhookInformerKey] = vwhcInformer
	vwhcInformer.AddEventHandler(c.validatingWebhookEventHandler())
}
// run starts every non-nil informer registered on the client in its own
// goroutine, passing stop to each, and then waits via cache.WaitForCacheSync
// for all of their caches to sync.
//
// It returns errInitInformers when the informer collection is nil or empty,
// errSyncingCaches when cache.WaitForCacheSync returns false, and nil otherwise.
func (c *client) run(stop <-chan struct{}) error {
	log.Info().Msg("OSM reconciler client started")

	// len covers both a nil and an empty collection. A nil-only check would let
	// an empty collection through and report a successful sync of nothing.
	if len(c.informers) == 0 {
		log.Error().Err(errInitInformers).Msg("No resources added to reconciler's informer")
		return errInitInformers
	}

	hasSynced := make([]cache.InformerSynced, 0, len(c.informers))
	names := make([]string, 0, len(c.informers))

	for name, informer := range c.informers {
		// Skip keys that were registered without an informer.
		if informer == nil {
			continue
		}

		log.Info().Msgf("Starting reconciler informer: %s", name)
		go informer.Run(stop)
		names = append(names, string(name))
		hasSynced = append(hasSynced, informer.HasSynced)
	}

	log.Info().Msg("Waiting for reconciler informer's cache to sync")
	if !cache.WaitForCacheSync(stop, hasSynced...) {
		return errSyncingCaches
	}

	log.Info().Msgf("Cache sync finished for reconciler informer : %v", names)
	return nil
}