-
Notifications
You must be signed in to change notification settings - Fork 38.7k
/
stateful_pod_control.go
345 lines (313 loc) · 13.8 KB
/
stateful_pod_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statefulset
import (
"context"
"fmt"
"strings"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorutils "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
)
// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
// with a clientset for writes and listers for reads; for tests we provide stubs.
type StatefulPodControlObjectManager interface {
CreatePod(ctx context.Context, pod *v1.Pod) error
GetPod(namespace, podName string) (*v1.Pod, error)
UpdatePod(pod *v1.Pod) error
DeletePod(pod *v1.Pod) error
CreateClaim(claim *v1.PersistentVolumeClaim) error
GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
UpdateClaim(claim *v1.PersistentVolumeClaim) error
}
// StatefulPodControl defines the interface that StatefulSetController uses to create, update, and delete Pods,
// and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
// implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
// Manipulation of objects is provided through objectMgr, which allows the k8s API to be mocked out for testing.
type StatefulPodControl struct {
objectMgr StatefulPodControlObjectManager
recorder record.EventRecorder
}
// NewStatefulPodControl constructs a StatefulPodControl using a realStatefulPodControlObjectManager with the given
// clientset, listers and EventRecorder.
func NewStatefulPodControl(
client clientset.Interface,
podLister corelisters.PodLister,
claimLister corelisters.PersistentVolumeClaimLister,
recorder record.EventRecorder,
) *StatefulPodControl {
return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder}
}
// NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder.
func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl {
return &StatefulPodControl{om, recorder}
}
// realStatefulPodControlObjectManager uses a clientset.Interface and listers.
type realStatefulPodControlObjectManager struct {
client clientset.Interface
podLister corelisters.PodLister
claimLister corelisters.PersistentVolumeClaimLister
}
func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
_, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
return err
}
func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
return om.podLister.Pods(namespace).Get(podName)
}
func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error {
_, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
return err
}
func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error {
return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}
func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{})
return err
}
func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName)
}
func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
return err
}
func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
// Create the Pod's PVCs prior to creating the Pod
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("create", set, pod, err)
return err
}
// If we created the PVCs attempt to create the Pod
err := spc.objectMgr.CreatePod(ctx, pod)
// sink already exists errors
if apierrors.IsAlreadyExists(err) {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Set PVC policy as much as is possible at this point.
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
}
spc.recordPodEvent("create", set, pod, err)
return err
}
func (spc *StatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
attemptedUpdate := false
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// assume the Pod is consistent
consistent := true
// if the Pod does not conform to its identity, update the identity and dirty the Pod
if !identityMatches(set, pod) {
updateIdentity(set, pod)
consistent = false
}
// if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
// dirty the Pod, and create any missing PVCs
if !storageMatches(set, pod) {
updateStorage(set, pod)
consistent = false
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC
// and dirty the pod.
if match, err := spc.ClaimsMatchRetentionPolicy(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
} else if !match {
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
consistent = false
}
}
// if the Pod is not dirty, do nothing
if consistent {
return nil
}
attemptedUpdate = true
// commit the update, retrying on conflicts
updateErr := spc.objectMgr.UpdatePod(pod)
if updateErr == nil {
return nil
}
if updated, err := spc.objectMgr.GetPod(set.Namespace, pod.Name); err == nil {
// make a copy so we don't mutate the shared cache
pod = updated.DeepCopy()
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s: %w", set.Namespace, pod.Name, err))
}
return updateErr
})
if attemptedUpdate {
spc.recordPodEvent("update", set, pod, err)
}
return err
}
func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
err := spc.objectMgr.DeletePod(pod)
spc.recordPodEvent("delete", set, pod, err)
return err
}
// ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy.
// An error is returned if something is not consistent. This is expected if the pod is being otherwise updated,
// but a problem otherwise (see usage of this method in UpdateStatefulPod).
func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
ordinal := getOrdinal(pod)
templates := set.Spec.VolumeClaimTemplates
for i := range templates {
claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
switch {
case apierrors.IsNotFound(err):
klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration", claimName)
case err != nil:
return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
default:
if !claimOwnerMatchesSetAndPod(claim, set, pod) {
return false, nil
}
}
}
return true, nil
}
// UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set.
func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(set *apps.StatefulSet, pod *v1.Pod) error {
ordinal := getOrdinal(pod)
templates := set.Spec.VolumeClaimTemplates
for i := range templates {
claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
switch {
case apierrors.IsNotFound(err):
klog.V(4).Infof("Expected claim %s missing, continuing to pick up in next iteration.")
case err != nil:
return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)
default:
if !claimOwnerMatchesSetAndPod(claim, set, pod) {
needsUpdate := updateClaimOwnerRefForSetAndPod(claim, set, pod)
if needsUpdate {
err := spc.objectMgr.UpdateClaim(claim)
if err != nil {
return fmt.Errorf("Could not update claim %s for delete policy ownerRefs: %w", claimName, err)
}
}
}
}
}
return nil
}
// PodClaimIsStale returns true for a stale PVC that should block pod creation. If the scaling
// policy is deletion, and a PVC has an ownerRef that does not match the pod, the PVC is stale. This
// includes pods whose UID has not been created.
func (spc *StatefulPodControl) PodClaimIsStale(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
policy := getPersistentVolumeClaimRetentionPolicy(set)
if policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType {
// PVCs are meant to be reused and so can't be stale.
return false, nil
}
for _, claim := range getPersistentVolumeClaims(set, pod) {
pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
switch {
case apierrors.IsNotFound(err):
// If the claim doesn't exist yet, it can't be stale.
continue
case err != nil:
return false, err
case err == nil:
// A claim is stale if it doesn't match the pod's UID, including if the pod has no UID.
if hasStaleOwnerRef(pvc, pod) {
return true, nil
}
}
}
return false, nil
}
// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
// have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
func (spc *StatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
strings.ToLower(verb), pod.Name, set.Name)
spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
strings.ToLower(verb), pod.Name, set.Name, err)
spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
}
}
// recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
// nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
// reason of v1.EventTypeWarning.
func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
if err == nil {
reason := fmt.Sprintf("Successful%s", strings.Title(verb))
message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
strings.ToLower(verb), claim.Name, pod.Name, set.Name)
spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
} else {
reason := fmt.Sprintf("Failed%s", strings.Title(verb))
message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
}
}
// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
// set's Spec.
func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
var errs []error
for _, claim := range getPersistentVolumeClaims(set, pod) {
pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
switch {
case apierrors.IsNotFound(err):
err := spc.objectMgr.CreateClaim(&claim)
if err != nil {
errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
}
if err == nil || !apierrors.IsAlreadyExists(err) {
spc.recordClaimEvent("create", set, pod, &claim, err)
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve PVC %s: %s", claim.Name, err))
spc.recordClaimEvent("create", set, pod, &claim, err)
case err == nil:
if pvc.DeletionTimestamp != nil {
errs = append(errs, fmt.Errorf("pvc %s is being deleted", claim.Name))
}
}
// TODO: Check resource requirements and accessmodes, update if necessary
}
return errorutils.NewAggregate(errs)
}