/
controlled_pods_indexer.go
259 lines (226 loc) · 7.65 KB
/
controlled_pods_indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"fmt"
"sync"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
const (
controllerUIDIndex = "controllerUID"
)
// ControlledPodsIndexer is able to efficiently find pods with ownerReference pointing a given controller object.
// For Deployments, it performs indirect lookup with ReplicaSets in the middle.
type ControlledPodsIndexer struct {
podsIndexer cache.Indexer
podsSynced cache.InformerSynced
rsSynced cache.InformerSynced
rsIndexer cache.Indexer
// lock is a lock for accessing rsPendingDeletion.
lock sync.Mutex
// rsPendingDeletion are replicasets that have been deleted, but there are still pods referencing them,
// so we have to postpone deletion from `rsIndexer`. They should be deleted as soon as the last pod
// referencing it is deleted.
rsPendingDeletion map[types.UID]bool
}
// ReplicaSetState stores information relevant to a specific ReplicaSet object,
// i.e. how many pods it owns exist, whether the RS object itself exists
// and its latest known owner's UID.
type ReplicaSetState struct {
NumPods int
Exists bool
}
// UIDSet is a collection of ReplicaSet objects UIDs.
type UIDSet map[types.UID]bool
func deletionHandlingUIDKeyFunc(obj interface{}) (string, error) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return string(getObjUID(obj)), nil
}
// NewControlledPodsIndexer creates a new ControlledPodsIndexer instance.
func NewControlledPodsIndexer(podsInformer coreinformers.PodInformer, rsInformer appsinformers.ReplicaSetInformer) (*ControlledPodsIndexer, error) {
if err := podsInformer.Informer().AddIndexers(cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc}); err != nil {
return nil, fmt.Errorf("failed to register indexer: %w", err)
}
// We need a separate storage from rsInformer as we postpone deletion until all pods are removed.
rsIndexer := cache.NewIndexer(deletionHandlingUIDKeyFunc, cache.Indexers{controllerUIDIndex: controllerUIDIndexFunc})
cpi := &ControlledPodsIndexer{
podsIndexer: podsInformer.Informer().GetIndexer(),
podsSynced: podsInformer.Informer().HasSynced,
rsIndexer: rsIndexer,
rsSynced: rsInformer.Informer().HasSynced,
rsPendingDeletion: make(map[types.UID]bool),
}
podsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
oldOwnerUID, _ := getControllerInfo(oldObj)
newOwnerUID, _ := getControllerInfo(newObj)
if oldOwnerUID == newOwnerUID {
return
}
cpi.lock.Lock()
defer cpi.lock.Unlock()
if err := cpi.clearRSDataIfPossibleLocked(oldOwnerUID); err != nil {
klog.Errorf("error while deleting %v: %v", oldOwnerUID, err)
}
},
DeleteFunc: func(obj interface{}) {
ownerUID, _ := getControllerInfo(obj)
cpi.lock.Lock()
defer cpi.lock.Unlock()
if err := cpi.clearRSDataIfPossibleLocked(ownerUID); err != nil {
klog.Errorf("error while deleting %v: %v", ownerUID, err)
}
},
},
)
rsInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if err := rsIndexer.Add(obj); err != nil {
klog.Errorf("error while adding %v: %v", obj, err)
}
},
UpdateFunc: func(_, newObj interface{}) {
if err := rsIndexer.Update(newObj); err != nil {
klog.Errorf("error while updating %v: %v", newObj, err)
}
},
DeleteFunc: func(obj interface{}) {
rsUID := getObjUID(obj)
cpi.lock.Lock()
defer cpi.lock.Unlock()
cpi.rsPendingDeletion[rsUID] = true
if err := cpi.clearRSDataIfPossibleLocked(rsUID); err != nil {
klog.Errorf("error while deleting %v: %v", rsUID, err)
}
},
},
)
return cpi, nil
}
func getControllerInfo(obj interface{}) (types.UID, string) {
metaAccessor, err := meta.Accessor(obj)
if err != nil {
return "", ""
}
controller := metav1.GetControllerOf(metaAccessor)
if controller == nil {
return "", ""
}
return controller.UID, controller.Kind
}
func getObjUID(obj interface{}) types.UID {
metaAccessor, err := meta.Accessor(obj)
if err != nil {
return ""
}
return metaAccessor.GetUID()
}
func (p *ControlledPodsIndexer) clearRSDataIfPossibleLocked(rsUID types.UID) error {
if !p.rsPendingDeletion[rsUID] {
return nil
}
pods, err := p.appendPodsControlledBy(nil, rsUID)
if err != nil {
return fmt.Errorf("failed to list pods for %q: %w", rsUID, err)
}
if len(pods) != 0 {
return nil
}
delete(p.rsPendingDeletion, rsUID)
obj, exists, err := p.rsIndexer.GetByKey(string(rsUID))
if err != nil {
return err
}
if !exists {
return nil
}
return p.rsIndexer.Delete(obj)
}
func controllerUIDIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return nil, fmt.Errorf("object has no meta: %v", err)
}
controllerRef := metav1.GetControllerOf(meta)
if controllerRef == nil {
return []string{}, nil
}
return []string{string(controllerRef.UID)}, nil
}
// WaitForCacheSync waits for all required informers to be initialized.
func (p *ControlledPodsIndexer) WaitForCacheSync(ctx context.Context) bool {
return cache.WaitForNamedCacheSync("PodsIndexer", ctx.Done(), p.podsSynced, p.rsSynced)
}
// PodsControlledBy returns pods controlled by a given controller object.
func (p *ControlledPodsIndexer) PodsControlledBy(obj interface{}) ([]*corev1.Pod, error) {
metaAccessor, err := meta.Accessor(obj)
if err != nil {
return nil, fmt.Errorf("object has no meta: %w", err)
}
typeAccessor, err := meta.TypeAccessor(obj)
if err != nil {
return nil, fmt.Errorf("object has unknown type: %w", err)
}
var podOwners []types.UID
switch typeAccessor.GetKind() {
case "Deployment":
replicaSets, err := p.rsIndexer.ByIndex(controllerUIDIndex, string(metaAccessor.GetUID()))
if err != nil {
return nil, fmt.Errorf("failed to get replicasets controlled by %v: %w", metaAccessor.GetUID(), err)
}
for _, replicaSet := range replicaSets {
replicaSet, ok := replicaSet.(*appsv1.ReplicaSet)
if !ok {
return nil, fmt.Errorf("expected *appsv1.ReplicaSet; got: %T", replicaSet)
}
podOwners = append(podOwners, replicaSet.GetUID())
}
default:
podOwners = append(podOwners, metaAccessor.GetUID())
}
var res []*corev1.Pod
for _, podOwner := range podOwners {
res, err = p.appendPodsControlledBy(res, podOwner)
if err != nil {
return nil, fmt.Errorf("failed to get pods controlled by %v: %w", podOwner, err)
}
}
return res, nil
}
func (p *ControlledPodsIndexer) appendPodsControlledBy(res []*corev1.Pod, uid types.UID) ([]*corev1.Pod, error) {
objs, err := p.podsIndexer.ByIndex(controllerUIDIndex, string(uid))
if err != nil {
return nil, fmt.Errorf("method ByIndex failed: %w", err)
}
for _, obj := range objs {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("expected *corev1.Pod; got: %T", obj)
}
res = append(res, pod)
}
return res, nil
}