forked from kubermatic/kubermatic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
informer_provider.go
95 lines (77 loc) · 3.13 KB
/
informer_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/*
Copyright 2020 The Kubermatic Kubernetes Platform contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbac
import (
"fmt"
"sync"
"time"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
// InformerProvider allows for storing shared informer factories for the given namespaces
// additionally it provides method for starting and waiting for all registered factories
type InformerProvider interface {
// KubeInformerFactoryFor registers a shared informer factory for the given namespace
KubeInformerFactoryFor(namespace string) kubeinformers.SharedInformerFactory
// StartInformers starts all registered factories
StartInformers(stopCh <-chan struct{})
// WaitForCachesToSync waits until caches from all factories are synced
WaitForCachesToSync(stopCh <-chan struct{}) error
}
// InformerProviderImpl simply holds namespaced factories
type InformerProviderImpl struct {
kubeClient kubernetes.Interface
kubeInformers map[string]kubeinformers.SharedInformerFactory
resync time.Duration
lock *sync.Mutex
started bool
}
// NewInformerProvider creates a new provider that
func NewInformerProvider(kubeClient kubernetes.Interface, resync time.Duration) *InformerProviderImpl {
return &InformerProviderImpl{kubeClient: kubeClient, resync: resync, kubeInformers: map[string]kubeinformers.SharedInformerFactory{}, lock: &sync.Mutex{}}
}
// KubeInformerFactoryFor registers a shared informer factory for the given namespace
func (p *InformerProviderImpl) KubeInformerFactoryFor(namespace string) kubeinformers.SharedInformerFactory {
p.lock.Lock()
defer p.lock.Unlock()
if informer, ok := p.kubeInformers[namespace]; ok {
return informer
}
if p.started {
// this is a programmer error
panic("Please register a factory before starting the provider call to StartInformers method")
}
p.kubeInformers[namespace] = kubeinformers.NewFilteredSharedInformerFactory(p.kubeClient, p.resync, namespace, nil)
return p.kubeInformers[namespace]
}
// StartInformers starts all registered factories
func (p *InformerProviderImpl) StartInformers(stopCh <-chan struct{}) {
p.lock.Lock()
defer p.lock.Unlock()
for _, informer := range p.kubeInformers {
informer.Start(stopCh)
}
p.started = true
}
// WaitForCachesToSync waits until caches from all factories are synced
func (p *InformerProviderImpl) WaitForCachesToSync(stopCh <-chan struct{}) error {
for _, informer := range p.kubeInformers {
infKubeSyncStatus := informer.WaitForCacheSync(stopCh)
for informerType, informerSynced := range infKubeSyncStatus {
if !informerSynced {
return fmt.Errorf("unable to sync caches for for informer %v", informerType)
}
}
}
return nil
}