// controller.go
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resourceclaim
import (
"context"
"errors"
"fmt"
"strings"
"time"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
v1informers "k8s.io/client-go/informers/core/v1"
resourcev1alpha2informers "k8s.io/client-go/informers/resource/v1alpha2"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
"k8s.io/utils/pointer"
)
const (
// podResourceClaimIndex is the lookup name for the index function which indexes pods by the ResourceClaims they reference.
podResourceClaimIndex = "pod-resource-claim-index"
// podResourceClaimAnnotation is the special annotation that generated
// ResourceClaims get. Its value is the pod.spec.resourceClaims[].name
// for which it was generated. This is used only inside the controller
// and not documented as part of the Kubernetes API.
podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"
// claimPodOwnerIndex is used to find ResourceClaims which have
// a specific pod as owner. Values for this index are the pod UID.
claimPodOwnerIndex = "claim-pod-owner-index"
// Field manager used to update the pod status.
fieldManager = "ResourceClaimController"
maxUIDCacheEntries = 500
)
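// Illustrative example (hypothetical names): a ResourceClaim generated for
// pod claim "my-gpu" carries the annotation
//
//	resource.kubernetes.io/pod-claim-name: my-gpu
//
// which findPodResourceClaim uses to map an orphaned claim back to the
// pod.spec.resourceClaims entry it was generated for.
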
// Controller creates ResourceClaims for ResourceClaimTemplates in a pod spec.
type Controller struct {
// kubeClient is the kube API client used to communicate with the API
// server.
kubeClient clientset.Interface
// claimLister is the shared ResourceClaim lister used to fetch and store ResourceClaim
// objects from the API server. It is shared with other controllers and
// therefore the ResourceClaim objects in its store should be treated as immutable.
claimLister resourcev1alpha2listers.ResourceClaimLister
claimsSynced cache.InformerSynced
claimCache cache.MutationCache
// podLister is the shared Pod lister used to fetch Pod
// objects from the API server. It is shared with other controllers and
// therefore the Pod objects in its store should be treated as immutable.
podLister v1listers.PodLister
podSynced cache.InformerSynced
// podSchedulingList is the shared PodSchedulingContext lister used to
// fetch scheduling objects from the API server. It is shared with other
// controllers and therefore the objects in its store should be treated
// as immutable.
podSchedulingLister resourcev1alpha2listers.PodSchedulingContextLister
podSchedulingSynced cache.InformerSynced
// templateLister is the shared ResourceClaimTemplate lister used to
// fetch template objects from the API server. It is shared with other
// controllers and therefore the objects in its store should be treated
// as immutable.
templateLister resourcev1alpha2listers.ResourceClaimTemplateLister
templatesSynced cache.InformerSynced
// podIndexer has the common PodResourceClaim indexer installed to
// limit iteration over pods to those of interest.
podIndexer cache.Indexer
// recorder is used to record events in the API server
recorder record.EventRecorder
queue workqueue.RateLimitingInterface
// The deletedObjects cache keeps track of Pods for which we know that
// they have existed and have been removed. For those we can be sure
// that a ReservedFor entry needs to be removed.
deletedObjects *uidCache
}
const (
claimKeyPrefix = "claim:"
podKeyPrefix = "pod:"
)
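// Illustrative example (hypothetical namespace and object names): the work
// queue keys produced by enqueuePod and enqueueResourceClaim look like
//
//	pod:default/my-pod
//	claim:default/my-pod-my-claim-abc12
//
// and are split back into prefix, namespace and name by syncHandler.
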
// NewController creates a ResourceClaim controller.
func NewController(
logger klog.Logger,
kubeClient clientset.Interface,
podInformer v1informers.PodInformer,
podSchedulingInformer resourcev1alpha2informers.PodSchedulingContextInformer,
claimInformer resourcev1alpha2informers.ResourceClaimInformer,
templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) {
ec := &Controller{
kubeClient: kubeClient,
podLister: podInformer.Lister(),
podIndexer: podInformer.Informer().GetIndexer(),
podSynced: podInformer.Informer().HasSynced,
podSchedulingLister: podSchedulingInformer.Lister(),
podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
claimLister: claimInformer.Lister(),
claimsSynced: claimInformer.Informer().HasSynced,
templateLister: templateInformer.Lister(),
templatesSynced: templateInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
deletedObjects: newUIDCache(maxUIDCacheEntries),
}
metrics.RegisterMetrics()
if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ec.enqueuePod(logger, obj, false)
},
UpdateFunc: func(old, updated interface{}) {
ec.enqueuePod(logger, updated, false)
},
DeleteFunc: func(obj interface{}) {
ec.enqueuePod(logger, obj, true)
},
}); err != nil {
return nil, err
}
if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
logger.V(6).Info("new claim", "claimDump", obj)
ec.enqueueResourceClaim(logger, obj, false)
},
UpdateFunc: func(old, updated interface{}) {
logger.V(6).Info("updated claim", "claimDump", updated)
ec.enqueueResourceClaim(logger, updated, false)
},
DeleteFunc: func(obj interface{}) {
logger.V(6).Info("deleted claim", "claimDump", obj)
ec.enqueueResourceClaim(logger, obj, true)
},
}); err != nil {
return nil, err
}
if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
}
// The mutation cache acts as an additional layer on top of the informer
// cache: after a create made by the controller, it returns that
// object until the informer catches up. That is necessary
// when a ResourceClaim got created, updating the pod status fails,
// and then a retry occurs before the informer cache is updated.
// In that scenario, the controller would create another claim
// instead of continuing with the existing one.
claimInformerCache := claimInformer.Informer().GetIndexer()
if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil {
return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
}
ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache,
// Very long time to live, unlikely to be needed because
// the informer cache should get updated soon.
time.Hour,
// Allow storing objects not in the underlying cache - that's the point...
// It's safe because in case of a race (claim is in mutation cache, claim
// gets deleted, controller updates status based on mutation cache) the
// "bad" pod status will get detected and fixed when the informer catches up.
true,
)
return ec, nil
}
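// Illustrative wiring sketch (not part of this file; the shared informer
// factory construction below is an assumption about the caller, not taken
// from this package):
//
//	factory := informers.NewSharedInformerFactory(kubeClient, 0)
//	ctrl, err := NewController(klog.FromContext(ctx), kubeClient,
//		factory.Core().V1().Pods(),
//		factory.Resource().V1alpha2().PodSchedulingContexts(),
//		factory.Resource().V1alpha2().ResourceClaims(),
//		factory.Resource().V1alpha2().ResourceClaimTemplates())
//	if err != nil {
//		// handle error
//	}
//	factory.Start(ctx.Done())
//	go ctrl.Run(ctx, 2)
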
func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
pod, ok := obj.(*v1.Pod)
if !ok {
// Not a pod?!
logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
return
}
if len(pod.Spec.ResourceClaims) == 0 {
// Nothing to do for it at all.
return
}
if deleted {
logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod))
ec.deletedObjects.Add(pod.UID)
}
logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
// Release reservations of a deleted or completed pod?
if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims {
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, _, err := resourceclaim.Name(pod, &podClaim)
switch {
case err != nil:
// Either the claim was not created (nothing to do here) or
// the API changed. The latter will also get reported elsewhere,
// so here it's just a debug message.
logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason)
case claimName != nil:
key := claimKeyPrefix + pod.Namespace + "/" + *claimName
logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason)
ec.queue.Add(key)
default:
// Nothing to do, claim wasn't generated.
logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason)
}
}
}
needsWork, reason := ec.podNeedsWork(pod)
if needsWork {
logger.V(6).Info("enqueuing pod", "pod", klog.KObj(pod), "reason", reason)
ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
return
}
logger.V(6).Info("not enqueuing pod", "pod", klog.KObj(pod), "reason", reason)
}
func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) {
if deleted {
return false, "pod got removed"
}
if podutil.IsPodTerminal(pod) {
return false, "pod has terminated"
}
if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
return false, "pod got deleted before scheduling"
}
// Still needs claims.
return true, "pod might run"
}
// podNeedsWork checks whether a new or modified pod needs to be processed
// further by a worker. It returns a boolean with the result and an explanation
// for it.
func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
if pod.DeletionTimestamp != nil {
// Nothing else to do for the pod.
return false, "pod is deleted"
}
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
if err != nil {
return true, err.Error()
}
// If the claimName is nil, then it has been determined before
// that the claim is not needed.
if claimName == nil {
return false, "claim is not needed"
}
claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
if apierrors.IsNotFound(err) {
if podClaim.Source.ResourceClaimTemplateName != nil {
return true, "must create ResourceClaim from template"
}
// User needs to create claim.
return false, "claim is missing and must be created by user"
}
if err != nil {
// Shouldn't happen.
return true, fmt.Sprintf("internal error while checking for claim: %v", err)
}
if checkOwner &&
resourceclaim.IsForPod(pod, claim) != nil {
// Cannot proceed with the pod unless that other claim gets deleted.
return false, "conflicting claim needs to be removed by user"
}
// This check skips over the reasons below that only apply
// when a pod has been scheduled already. We need to keep checking
// for more claims that might need to be created.
if pod.Spec.NodeName == "" {
continue
}
// Create PodSchedulingContext if the pod got scheduled without triggering
// delayed allocation.
//
// These can happen when:
// - a user created a pod with spec.nodeName set, perhaps for testing
// - some scheduler was used which is unaware of DRA
// - DRA was not enabled in kube-scheduler (version skew, configuration)
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
claim.Status.Allocation == nil {
scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
if apierrors.IsNotFound(err) {
return true, "need to create PodSchedulingContext for scheduled pod"
}
if err != nil {
// Shouldn't happen.
return true, fmt.Sprintf("internal error while checking for PodSchedulingContext: %v", err)
}
if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
// Need to update PodSchedulingContext.
return true, "need to update PodSchedulingContext for scheduled pod"
}
}
if claim.Status.Allocation != nil &&
!resourceclaim.IsReservedForPod(pod, claim) &&
resourceclaim.CanBeReserved(claim) {
// Need to reserve it.
return true, "need to reserve claim for pod"
}
}
return false, "nothing to do"
}
func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
if !ok {
return
}
if !deleted {
// When starting up, we have to check all claims to find those with
// stale pods in ReservedFor. During an update, a pod might get added
// that already no longer exists.
key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
logger.V(6).Info("enqueuing new or updated claim", "claim", klog.KObj(claim), "key", key)
ec.queue.Add(key)
} else {
logger.V(6).Info("not enqueuing deleted claim", "claim", klog.KObj(claim))
}
// Also check whether this causes work for any of the currently
// known pods which use the ResourceClaim.
objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
if err != nil {
logger.Error(err, "listing pods from cache")
return
}
if len(objs) == 0 {
logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim))
return
}
for _, obj := range objs {
ec.enqueuePod(logger, obj, false)
}
}
func (ec *Controller) Run(ctx context.Context, workers int) {
defer runtime.HandleCrash()
defer ec.queue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting resource claim controller")
defer logger.Info("Shutting down resource claim controller")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")})
ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
defer eventBroadcaster.Shutdown()
if !cache.WaitForNamedCacheSync("resource_claim", ctx.Done(), ec.podSynced, ec.claimsSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
}
<-ctx.Done()
}
func (ec *Controller) runWorker(ctx context.Context) {
for ec.processNextWorkItem(ctx) {
}
}
func (ec *Controller) processNextWorkItem(ctx context.Context) bool {
key, shutdown := ec.queue.Get()
if shutdown {
return false
}
defer ec.queue.Done(key)
err := ec.syncHandler(ctx, key.(string))
if err == nil {
ec.queue.Forget(key)
return true
}
runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
ec.queue.AddRateLimited(key)
return true
}
// syncHandler is invoked for each work item which might need to be processed.
// If an error is returned from this function, the item will be requeued.
func (ec *Controller) syncHandler(ctx context.Context, key string) error {
sep := strings.Index(key, ":")
if sep < 0 {
return fmt.Errorf("unexpected key: %s", key)
}
prefix, object := key[0:sep+1], key[sep+1:]
namespace, name, err := cache.SplitMetaNamespaceKey(object)
if err != nil {
return err
}
switch prefix {
case podKeyPrefix:
return ec.syncPod(ctx, namespace, name)
case claimKeyPrefix:
return ec.syncClaim(ctx, namespace, name)
default:
return fmt.Errorf("unexpected key prefix: %s", prefix)
}
}
func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name))
ctx = klog.NewContext(ctx, logger)
pod, err := ec.podLister.Pods(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("nothing to do for pod, it is gone")
return nil
}
return err
}
// Ignore pods which are already getting deleted.
if pod.DeletionTimestamp != nil {
logger.V(5).Info("nothing to do for pod, it is marked for deletion")
return nil
}
var newPodClaims map[string]string
for _, podClaim := range pod.Spec.ResourceClaims {
if err := ec.handleClaim(ctx, pod, podClaim, &newPodClaims); err != nil {
if ec.recorder != nil {
ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
}
return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
}
}
if newPodClaims != nil {
// Patch the pod status with the new information about
// generated ResourceClaims.
statuses := make([]*corev1apply.PodResourceClaimStatusApplyConfiguration, 0, len(newPodClaims))
for podClaimName, resourceClaimName := range newPodClaims {
statuses = append(statuses, corev1apply.PodResourceClaimStatus().WithName(podClaimName).WithResourceClaimName(resourceClaimName))
}
podApply := corev1apply.Pod(name, namespace).WithStatus(corev1apply.PodStatus().WithResourceClaimStatuses(statuses...))
if _, err := ec.kubeClient.CoreV1().Pods(namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
return fmt.Errorf("update pod %s/%s ResourceClaimStatuses: %v", namespace, name, err)
}
}
if pod.Spec.NodeName == "" {
// Scheduler will handle PodSchedulingContext and reservations.
logger.V(5).Info("nothing to do for pod, scheduler will deal with it")
return nil
}
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
if err != nil {
return err
}
// If nil, then it has been determined that the claim is not needed
// and can be skipped.
if claimName == nil {
continue
}
claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("retrieve claim: %v", err)
}
if checkOwner {
if err := resourceclaim.IsForPod(pod, claim); err != nil {
return err
}
}
if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
claim.Status.Allocation == nil {
logger.V(5).Info("create PodSchedulingContext because claim needs to be allocated", "resourceClaim", klog.KObj(claim))
return ec.ensurePodSchedulingContext(ctx, pod)
}
if claim.Status.Allocation != nil &&
!resourceclaim.IsReservedForPod(pod, claim) &&
resourceclaim.CanBeReserved(claim) {
logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim))
if err := ec.reserveForPod(ctx, pod, claim); err != nil {
return err
}
}
}
return nil
}
// handleClaim is invoked for each resource claim of a pod.
func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name)
ctx = klog.NewContext(ctx, logger)
logger.V(5).Info("checking", "podClaim", podClaim.Name)
// resourceclaim.Name checks for the situation that the client doesn't
// know some future addition to the API. Therefore it gets called here
// even if there is no template to work on, because if some new field
// gets added, the expectation might be that the controller does
// something for it.
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim)
switch {
case errors.Is(err, resourceclaim.ErrClaimNotFound):
// Continue below.
case err != nil:
return fmt.Errorf("checking for claim before creating it: %v", err)
case claimName == nil:
// Nothing to do, no claim needed.
return nil
case *claimName != "":
claimName := *claimName
// The ResourceClaim should exist because it is recorded in the pod.status.resourceClaimStatuses,
// but perhaps it was deleted accidentally. In that case we re-create it.
claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if claim != nil {
var err error
if mustCheckOwner {
err = resourceclaim.IsForPod(pod, claim)
}
if err == nil {
// Already created, nothing more to do.
logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
return nil
}
logger.Error(err, "claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName)
}
}
templateName := podClaim.Source.ResourceClaimTemplateName
if templateName == nil {
// Nothing to do.
return nil
}
// Before we create a new ResourceClaim, check if there is an orphaned one.
// This covers the case that the controller has created it, but then fails
// before it can update the pod status.
claim, err := ec.findPodResourceClaim(pod, podClaim)
if err != nil {
return fmt.Errorf("finding ResourceClaim for claim %s in pod %s/%s failed: %v", podClaim.Name, pod.Namespace, pod.Name, err)
}
if claim == nil {
template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName)
if err != nil {
return fmt.Errorf("resource claim template %q: %v", *templateName, err)
}
// Create the ResourceClaim with pod as owner, with a generated name that uses
// <pod>-<claim name> as base.
isTrue := true
annotations := template.Spec.ObjectMeta.Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations[podResourceClaimAnnotation] = podClaim.Name
generateName := pod.Name + "-" + podClaim.Name
maxBaseLen := 57 // Leave space for hyphen and 5 random characters in a name with 63 characters.
if len(generateName) > maxBaseLen {
// We could leave truncation to the apiserver, but as
// it removes at the end, we would lose everything
// from the pod claim name when the pod name is long.
// We can do better and truncate both strings,
// proportional to their length.
generateName = pod.Name[0:len(pod.Name)*maxBaseLen/len(generateName)] +
"-" +
podClaim.Name[0:len(podClaim.Name)*maxBaseLen/len(generateName)]
}
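// Worked example with hypothetical lengths: a 60 character pod name and a
// 30 character pod claim name give len(generateName) == 91, so the pod name
// is truncated to 60*57/91 = 37 characters and the claim name to
// 30*57/91 = 18, for a 56 character base that leaves room for the suffix.
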
claim = &resourcev1alpha2.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
GenerateName: generateName,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: &isTrue,
BlockOwnerDeletion: &isTrue,
},
},
Annotations: annotations,
Labels: template.Spec.ObjectMeta.Labels,
},
Spec: template.Spec.Spec,
}
metrics.ResourceClaimCreateAttempts.Inc()
claimName := claim.Name
claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
if err != nil {
metrics.ResourceClaimCreateFailures.Inc()
return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
}
ec.claimCache.Mutation(claim)
}
// Remember the new ResourceClaim for a batch PodStatus update in our caller.
if *newPodClaims == nil {
*newPodClaims = make(map[string]string)
}
(*newPodClaims)[podClaim.Name] = claim.Name
return nil
}
// findPodResourceClaim looks for an existing ResourceClaim with the right
// annotation (ties it to the pod claim) and the right ownership (ties it to
// the pod).
func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
// Only claims owned by the pod will get returned here.
claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID))
if err != nil {
return nil, err
}
deterministicName := pod.Name + "-" + podClaim.Name // Kubernetes <= 1.27 behavior.
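// For example (hypothetical names): pod "web" with pod claim "gpu" got a
// claim literally named "web-gpu" in those releases, without a random suffix.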
for _, claimObj := range claims {
claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim)
if !ok {
return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj)
}
podClaimName, ok := claim.Annotations[podResourceClaimAnnotation]
if ok && podClaimName != podClaim.Name {
continue
}
// No annotation? It might be a ResourceClaim created for
// the pod with a previous Kubernetes release where the
// ResourceClaim name was deterministic, in which case
// we have to use it and update the new pod status
// field accordingly.
if !ok && claim.Name != deterministicName {
continue
}
// Pick the first one that matches. There shouldn't be more than one. If there is,
// then all others will be ignored until the pod gets deleted. Then they also get
// cleaned up.
return claim, nil
}
return nil, nil
}
func (ec *Controller) ensurePodSchedulingContext(ctx context.Context, pod *v1.Pod) error {
scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("retrieve PodSchedulingContext: %v", err)
}
if scheduling == nil {
scheduling = &resourcev1alpha2.PodSchedulingContext{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
Controller: pointer.Bool(true),
},
},
},
Spec: resourcev1alpha2.PodSchedulingContextSpec{
SelectedNode: pod.Spec.NodeName,
// There is no need for negotiation about
// potential and suitable nodes anymore, so
// PotentialNodes can be left empty.
},
}
if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Create(ctx, scheduling, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create PodSchedulingContext: %v", err)
}
return nil
}
if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
scheduling := scheduling.DeepCopy()
scheduling.Spec.SelectedNode = pod.Spec.NodeName
if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Update(ctx, scheduling, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("update spec.selectedNode in PodSchedulingContext: %v", err)
}
}
return nil
}
func (ec *Controller) reserveForPod(ctx context.Context, pod *v1.Pod, claim *resourcev1alpha2.ResourceClaim) error {
claim = claim.DeepCopy()
claim.Status.ReservedFor = append(claim.Status.ReservedFor,
resourcev1alpha2.ResourceClaimConsumerReference{
Resource: "pods",
Name: pod.Name,
UID: pod.UID,
})
if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("reserve claim for pod: %v", err)
}
return nil
}
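// Illustrative example (hypothetical values): reserving a claim for pod "web"
// appends a status.reservedFor entry with resource "pods", name "web" and the
// pod's UID; syncClaim later prunes such entries again once the pod is gone
// or has finished running.
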
func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name))
ctx = klog.NewContext(ctx, logger)
claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("nothing to do for claim, it is gone")
return nil
}
return err
}
// Check if the ReservedFor entries are all still valid.
valid := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
for _, reservedFor := range claim.Status.ReservedFor {
if reservedFor.APIGroup == "" &&
reservedFor.Resource == "pods" {
// A pod falls into one of three categories:
// - we have it in our cache -> don't remove it until we are told that it got removed
// - we don't have it in our cache anymore, but we have seen it before -> it was deleted, remove it
// - not in our cache, not seen -> double-check with API server before removal
keepEntry := true
// Tracking deleted pods in the LRU cache is an
// optimization. Without this cache, the code would
// have to do the API call below for every deleted pod
// to ensure that the pod really doesn't exist. With
// the cache, most of the time the pod will be recorded
// as deleted and the API call can be avoided.
if ec.deletedObjects.Has(reservedFor.UID) {
// We know that the pod was deleted. This is
// easy to check and thus is done first.
keepEntry = false
} else {
pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name)
switch {
case err != nil && !apierrors.IsNotFound(err):
return err
case err != nil:
// We might not have it in our informer cache
// yet. Removing the pod while the scheduler is
// scheduling it would be bad. We have to be
// absolutely sure and thus have to check with
// the API server.
pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if pod == nil || pod.UID != reservedFor.UID {
logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
keepEntry = false
}
case pod.UID != reservedFor.UID:
logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
keepEntry = false
case isPodDone(pod):
logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
keepEntry = false
}
}
if keepEntry {
valid = append(valid, reservedFor)
}
continue
}
// TODO: support generic object lookup
return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor)
}
logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid))
if len(valid) < len(claim.Status.ReservedFor) {
// TODO (#113700): patch
claim := claim.DeepCopy()
claim.Status.ReservedFor = valid
// When a ResourceClaim uses delayed allocation, then it makes sense to
// deallocate the claim as soon as the last consumer stops using
// it. This ensures that the claim can be allocated again as needed by
// some future consumer instead of trying to schedule that consumer
// onto the node that was chosen for the previous consumer. It also
// releases the underlying resources for use by other claims.
//
// This has to be triggered by the transition from "was being used" to
// "is not used anymore" because a DRA driver is not required to set
// `status.reservedFor` together with `status.allocation`, i.e. a claim
// that is "currently unused" should not get deallocated.
//
// This does not matter for claims that were created for a pod. For
// those, the resource claim controller will trigger deletion when the
// pod is done. However, it doesn't hurt to also trigger deallocation
// for such claims and not checking for them keeps this code simpler.
if len(valid) == 0 &&
claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer {
claim.Status.DeallocationRequested = true
}
_, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return err
}
}
if len(valid) == 0 {
// Claim is not reserved. If it was generated for a pod and
// that pod is not going to run, the claim can be
// deleted. Normally the garbage collector does that, but the
// pod itself might not get deleted for a while.
podName, podUID := owningPod(claim)
if podName != "" {
pod, err := ec.podLister.Pods(claim.Namespace).Get(podName)
switch {
case err == nil:
// Pod already replaced or not going to run?
if pod.UID != podUID || isPodDone(pod) {
// We are certain that the owning pod is not going to need
// the claim and therefore remove the claim.
logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod))
err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("delete claim: %v", err)
}
} else {
logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod)
}
case apierrors.IsNotFound(err):
// We might not know the pod *yet*. Instead of doing an expensive API call,
// let the garbage collector handle the case that the pod is truly gone.
logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName))
default:
return fmt.Errorf("lookup pod: %v", err)
}
} else {
logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim))
}
}
return nil
}
func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) {
for _, owner := range claim.OwnerReferences {
if pointer.BoolDeref(owner.Controller, false) &&
owner.APIVersion == "v1" &&
owner.Kind == "Pod" {
return owner.Name, owner.UID
}
}
return "", ""
}
// podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (=
// namespace/name) for the ResourceClaims referenced by a given pod, either
// directly or through a ResourceClaimTemplate.
func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
keys := []string{}
for _, podClaim := range pod.Spec.ResourceClaims {
claimName, _, err := resourceclaim.Name(pod, &podClaim)
if err != nil || claimName == nil {
// Index functions are not supposed to fail, the caller will panic.
// For both error reasons (claim not created yet, unknown API)
// we simply don't index.
continue
}
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName))
}
return keys, nil
}
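// Illustrative example (hypothetical names): a pod "web" in namespace
// "default" whose pod claim resolved to the generated ResourceClaim
// "web-gpu-abc12" is indexed under the key "default/web-gpu-abc12", which is
// how enqueueResourceClaim finds the pods affected by a claim event.
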
// isPodDone returns true if it is certain that none of the containers are running and never will run.
func isPodDone(pod *v1.Pod) bool {
return podutil.IsPodPhaseTerminal(pod.Status.Phase) ||
// Deleted and not scheduled:
pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
}
// claimPodOwnerIndexFunc is an index function that returns the pod UIDs of
// all pods which own the resource claim. Should only be one, though.
func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
if !ok {
return nil, nil
}
var keys []string
for _, owner := range claim.OwnerReferences {
if owner.Controller != nil &&
*owner.Controller &&
owner.APIVersion == "v1" &&
owner.Kind == "Pod" {
keys = append(keys, string(owner.UID))
}
}
return keys, nil
}