-
Notifications
You must be signed in to change notification settings - Fork 828
/
multi-cluster-manager.go
156 lines (130 loc) · 4.88 KB
/
multi-cluster-manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package typedmanager
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/karmada-io/karmada/pkg/util/fedinformer"
)
// Package-level state backing the shared MultiClusterInformerManager singleton.
var (
	// instance is the shared manager; it is assigned inside GetInstance.
	instance MultiClusterInformerManager
	// once guards the one-time construction of instance.
	once sync.Once
	// stopOnce ensures stopCh is closed at most once by StopInstance.
	stopOnce sync.Once
	// stopCh is handed to the shared manager on construction and closed by
	// StopInstance. It is initialized at declaration instead of in an init()
	// function, so it is already non-nil while package-level variables are
	// being initialized and no separate init step is needed.
	stopCh = make(chan struct{})
)
// GetInstance returns the process-wide shared MultiClusterInformerManager.
// The manager is built on the first call, wired to the package stop channel and
// to the node and pod transform functions; later calls return the same value.
func GetInstance() MultiClusterInformerManager {
	once.Do(func() {
		instance = NewMultiClusterInformerManager(
			stopCh,
			map[schema.GroupVersionResource]cache.TransformFunc{
				nodeGVR: fedinformer.NodeTransformFunc,
				podGVR:  fedinformer.PodTransformFunc,
			},
		)
	})
	return instance
}
// StopInstance will stop the shared MultiClusterInformerManager instance.
func StopInstance() {
stopOnce.Do(func() {
close(stopCh)
})
}
// MultiClusterInformerManager manages typed shared informers for all resources,
// including Kubernetes resources and custom resources defined by
// CustomResourceDefinition, across multiple clusters.
type MultiClusterInformerManager interface {
	// ForCluster builds an informer manager for the given cluster.
	ForCluster(cluster string, client kubernetes.Interface, defaultResync time.Duration) SingleClusterInformerManager

	// GetSingleClusterManager returns the informer manager of the given cluster.
	// The manager must have been created beforehand; otherwise nil is returned.
	GetSingleClusterManager(cluster string) SingleClusterInformerManager

	// IsManagerExist reports whether an informer manager has already been
	// created for the given cluster.
	IsManagerExist(cluster string) bool

	// Start runs all informers of the given cluster.
	// It should be called after 'ForCluster'; otherwise it is a no-op.
	Start(cluster string)

	// Stop stops all informers of the given cluster and removes the cluster
	// from the set of informer managers.
	Stop(cluster string)

	// WaitForCacheSync waits for all caches of the given cluster to populate.
	// It should be called after 'ForCluster'; otherwise it is a no-op.
	WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool

	// WaitForCacheSyncWithTimeout waits for all caches of the given cluster to
	// populate, giving up after cacheSyncTimeout.
	// It should be called after 'ForCluster'; otherwise it is a no-op.
	WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool
}
// NewMultiClusterInformerManager constructs a new multiClusterInformerManagerImpl
// with an empty set of per-cluster managers. stopCh and transformFuncs are stored
// and later passed to every per-cluster manager the returned value creates.
func NewMultiClusterInformerManager(stopCh <-chan struct{}, transformFuncs map[schema.GroupVersionResource]cache.TransformFunc) MultiClusterInformerManager {
	impl := new(multiClusterInformerManagerImpl)
	impl.managers = map[string]SingleClusterInformerManager{}
	impl.transformFuncs = transformFuncs
	impl.stopCh = stopCh
	return impl
}
type multiClusterInformerManagerImpl struct {
managers map[string]SingleClusterInformerManager
transformFuncs map[schema.GroupVersionResource]cache.TransformFunc
stopCh <-chan struct{}
lock sync.RWMutex
}
// getManager looks up the informer manager registered for cluster while holding
// the read lock. The boolean reports whether an entry exists.
func (m *multiClusterInformerManagerImpl) getManager(cluster string) (SingleClusterInformerManager, bool) {
	m.lock.RLock()
	found, ok := m.managers[cluster]
	m.lock.RUnlock()

	return found, ok
}
// ForCluster returns the informer manager for cluster. If one is already
// registered it is returned unchanged (client and defaultResync are ignored in
// that case); otherwise a new manager is created, registered and returned.
func (m *multiClusterInformerManagerImpl) ForCluster(cluster string, client kubernetes.Interface, defaultResync time.Duration) SingleClusterInformerManager {
	m.lock.Lock()
	defer m.lock.Unlock()

	current, registered := m.managers[cluster]
	if !registered {
		// First request for this cluster: build the manager and remember it.
		current = NewSingleClusterInformerManager(client, defaultResync, m.stopCh, m.transformFuncs)
		m.managers[cluster] = current
	}
	return current
}
// GetSingleClusterManager returns the informer manager registered for cluster,
// or nil when none has been registered.
func (m *multiClusterInformerManagerImpl) GetSingleClusterManager(cluster string) SingleClusterInformerManager {
	found, ok := m.getManager(cluster)
	if !ok {
		return nil
	}
	return found
}
// IsManagerExist reports whether an informer manager is currently registered
// for cluster.
func (m *multiClusterInformerManagerImpl) IsManagerExist(cluster string) bool {
	m.lock.RLock()
	defer m.lock.RUnlock()

	_, registered := m.managers[cluster]
	return registered
}
// Start calls Start on the informer manager registered for cluster.
// If no manager has been created for the cluster, it does nothing.
func (m *multiClusterInformerManagerImpl) Start(cluster string) {
	if found, ok := m.getManager(cluster); ok {
		found.Start()
	}
}
// Stop calls Stop on the informer manager registered for cluster and removes
// the cluster from the set of managers. It does nothing when no manager is
// registered for the cluster.
//
// The lookup and the removal are performed under a single write lock, so the
// entry that is deleted is exactly the manager that gets stopped. Looking the
// manager up under a read lock and deleting under a separate, later write lock
// leaves a window in which a concurrent Stop followed by ForCluster can
// register a fresh manager that the late delete would then drop without it
// ever being stopped.
func (m *multiClusterInformerManagerImpl) Stop(cluster string) {
	m.lock.Lock()
	manager, exist := m.managers[cluster]
	if exist {
		delete(m.managers, cluster)
	}
	m.lock.Unlock()

	if !exist {
		return
	}

	// The manager is no longer reachable through m.managers, so it is stopped
	// without holding m.lock across the call.
	manager.Stop()
}
// WaitForCacheSync delegates to WaitForCacheSync on the informer manager
// registered for cluster and returns its result. It returns nil when no manager
// is registered for the cluster.
func (m *multiClusterInformerManagerImpl) WaitForCacheSync(cluster string) map[schema.GroupVersionResource]bool {
	if found, ok := m.getManager(cluster); ok {
		return found.WaitForCacheSync()
	}
	return nil
}
// WaitForCacheSyncWithTimeout delegates to WaitForCacheSyncWithTimeout on the
// informer manager registered for cluster, passing cacheSyncTimeout through, and
// returns its result. It returns nil when no manager is registered for the cluster.
func (m *multiClusterInformerManagerImpl) WaitForCacheSyncWithTimeout(cluster string, cacheSyncTimeout time.Duration) map[schema.GroupVersionResource]bool {
	if found, ok := m.getManager(cluster); ok {
		return found.WaitForCacheSyncWithTimeout(cacheSyncTimeout)
	}
	return nil
}