forked from kubernetes-sigs/aws-load-balancer-controller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
secrets_manager.go
133 lines (117 loc) · 3.85 KB
/
secrets_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package k8s
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sync"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// SecretsManager tracks the Secret resources the controller depends on and
// keeps a watch open for each of them.
type SecretsManager interface {
	// MonitorSecrets declares the secrets referenced by the ingress group
	// identified by ingressGroupID. Implementations are expected to maintain
	// one watch per secret for as long as some group still references it.
	MonitorSecrets(ingressGroupID string, secrets []types.NamespacedName)
}
// NewSecretsManager builds a secrets manager that talks to the API server
// through clientSet, hands secretsEventChan to every store it creates, and logs
// through logger. The returned manager starts with no secrets being watched.
func NewSecretsManager(clientSet kubernetes.Interface, secretsEventChan chan<- event.GenericEvent, logger logr.Logger) *defaultSecretsManager {
	// The mutex field is left at its zero value, which is an unlocked mutex.
	return &defaultSecretsManager{
		clientSet:        clientSet,
		secretsEventChan: secretsEventChan,
		logger:           logger,
		secretMap:        map[types.NamespacedName]*secretItem{},
	}
}
// Compile-time check that *defaultSecretsManager implements SecretsManager.
var _ SecretsManager = (*defaultSecretsManager)(nil)
// defaultSecretsManager is the SecretsManager implementation that runs one
// reflector per monitored secret.
type defaultSecretsManager struct {
	// mutex serializes MonitorSecrets calls and guards secretMap.
	mutex sync.Mutex
	// secretMap holds the watch state of every secret currently monitored,
	// keyed by the secret's namespace/name.
	secretMap map[types.NamespacedName]*secretItem
	// secretsEventChan is passed to each store created by newStore.
	secretsEventChan chan<- event.GenericEvent
	// clientSet is used to list and watch secrets.
	clientSet kubernetes.Interface
	// queue is not set by NewSecretsManager and is not read anywhere in this
	// file. NOTE(review): possibly unused — confirm against the rest of the package.
	queue workqueue.RateLimitingInterface
	// logger receives the manager's diagnostic output.
	logger logr.Logger
}
// secretItem is the per-secret watch state kept in defaultSecretsManager.secretMap.
type secretItem struct {
	// store is the cache.Store the reflector writes into.
	store cache.Store
	// rt is the reflector that lists and watches the single secret.
	rt *cache.Reflector
	// ingresses is the set of ingress group IDs that currently reference the secret.
	ingresses sets.String
	// stopCh is handed to the reflector's Run and closed by stopReflector.
	stopCh chan struct{}
}
// MonitorSecrets reconciles the watches for one ingress group.
//
// secrets is treated as the complete list of secrets that ingressGroupID
// references. Each listed secret gets a watch (started here if none exists) and
// has ingressGroupID recorded against it. For every other tracked secret,
// ingressGroupID is removed, and the watch is stopped once no group references
// the secret any longer. The manager's mutex is held for the whole call.
func (m *defaultSecretsManager) MonitorSecrets(ingressGroupID string, secrets []types.NamespacedName) {
	m.logger.V(1).Info("Monitoring secrets", "groupID", ingressGroupID, "secrets", secrets)

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Register the group against every requested secret, starting watches on demand.
	wanted := sets.String{}
	for i := range secrets {
		key := secrets[i]
		wanted.Insert(key.String())

		tracked, ok := m.secretMap[key]
		if !ok {
			m.logger.V(1).Info("secret is not being monitored, adding watch", "item", key)
			tracked = m.newReflector(key.Namespace, key.Name)
			m.secretMap[key] = tracked
		}
		tracked.ingresses.Insert(ingressGroupID)
	}

	// Garbage collection: detach the group from secrets it no longer uses and
	// collect those left without any referencing group.
	var orphaned []types.NamespacedName
	for key, tracked := range m.secretMap {
		if wanted.Has(key.String()) {
			continue
		}
		// Removing an ID that is not in the set is a no-op.
		tracked.ingresses.Delete(ingressGroupID)
		if tracked.ingresses.Len() > 0 {
			continue
		}
		orphaned = append(orphaned, key)
	}

	for _, key := range orphaned {
		m.logger.V(1).Info("secret no longer needs monitoring, stopping the watch", "item", key)
		m.secretMap[key].stopReflector()
		delete(m.secretMap, key)
	}
}
// newReflector creates the watch state for the secret namespace/name and
// starts its reflector in a new goroutine.
//
// List and watch requests are scoped to the given namespace and restricted to
// the one secret through a "metadata.name" field selector. The returned item
// has an empty ingress set; the goroutine runs until the item's stopCh is
// closed (see stopReflector).
func (m *defaultSecretsManager) newReflector(namespace, name string) *secretItem {
	nameSelector := fields.Set{"metadata.name": name}.AsSelector().String()

	listWatch := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = nameSelector
			return m.clientSet.CoreV1().Secrets(namespace).List(context.TODO(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = nameSelector
			return m.clientSet.CoreV1().Secrets(namespace).Watch(context.TODO(), options)
		},
	}

	item := &secretItem{
		store:     m.newStore(),
		ingresses: sets.String{},
		stopCh:    make(chan struct{}),
	}
	// A resync period of 0 is passed to the reflector.
	item.rt = cache.NewNamedReflector(
		fmt.Sprintf("secret-%s/%s", namespace, name),
		listWatch,
		&corev1.Secret{},
		item.store,
		0,
	)

	go item.startReflector()
	return item
}
// newStore returns a fresh SecretsStore wired to the manager's event channel
// and logger, keyed with cache.MetaNamespaceKeyFunc.
// NOTE(review): NewSecretsStore is defined outside this file; presumably the
// store publishes secret changes on secretsEventChan — confirm there.
func (m *defaultSecretsManager) newStore() *SecretsStore {
	store := NewSecretsStore(m.secretsEventChan, cache.MetaNamespaceKeyFunc, m.logger)
	return store
}
// stopReflector closes stopCh, the channel handed to the reflector's Run in
// startReflector. It must be called at most once per item: closing an
// already-closed channel panics.
func (s *secretItem) stopReflector() {
	close(s.stopCh)
}
// startReflector runs the item's reflector with stopCh as its stop channel and
// returns when Run returns. newReflector invokes it in its own goroutine.
func (s *secretItem) startReflector() {
	stop := s.stopCh
	s.rt.Run(stop)
}