-
Notifications
You must be signed in to change notification settings - Fork 8
/
multicluster_work_queue.go
49 lines (42 loc) · 1.14 KB
/
multicluster_work_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package workqueue
import (
"sync"
"k8s.io/client-go/util/workqueue"
)
// MultiClusterQueues multiplexes queues across
// multiple k8s clusters.
type MultiClusterQueues struct {
queues map[string]workqueue.RateLimitingInterface
lock sync.RWMutex
}
// sets the queue for a cluster
func (s *MultiClusterQueues) Set(cluster string, queue workqueue.RateLimitingInterface) {
s.lock.Lock()
defer s.lock.Unlock()
if s.queues == nil {
s.queues = make(map[string]workqueue.RateLimitingInterface)
}
s.queues[cluster] = queue
}
// removes the queue for a cluster
func (s *MultiClusterQueues) Remove(cluster string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.queues, cluster)
}
// get the stored queues for the cluster
func (s *MultiClusterQueues) Get(cluster string) workqueue.RateLimitingInterface {
s.lock.RLock()
defer s.lock.RUnlock()
return s.queues[cluster]
}
// currently unused, useful for debugging
func (s *MultiClusterQueues) All() []workqueue.RateLimitingInterface {
s.lock.RLock()
defer s.lock.RUnlock()
var queues []workqueue.RateLimitingInterface
for _, queue := range s.queues {
queues = append(queues, queue)
}
return queues
}