-
Notifications
You must be signed in to change notification settings - Fork 359
/
groupsnapshot_helper.go
833 lines (733 loc) · 38.9 KB
/
groupsnapshot_helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sidecar_controller
import (
"context"
"crypto/sha256"
"fmt"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
klog "k8s.io/klog/v2"
crdv1alpha1 "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumegroupsnapshot/v1alpha1"
crdv1 "github.com/kubernetes-csi/external-snapshotter/client/v7/apis/volumesnapshot/v1"
"github.com/kubernetes-csi/external-snapshotter/v7/pkg/utils"
)
func (ctrl *csiSnapshotSideCarController) storeGroupSnapshotContentUpdate(groupSnapshotContent interface{}) (bool, error) {
return utils.StoreObjectUpdate(ctrl.groupSnapshotContentStore, groupSnapshotContent, "groupsnapshotcontent")
}
// enqueueGroupSnapshotContentWork adds group snapshot content to given work queue.
func (ctrl *csiSnapshotSideCarController) enqueueGroupSnapshotContentWork(obj interface{}) {
// Beware of "xxx deleted" events
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
obj = unknown.Obj
}
if groupSnapshotContent, ok := obj.(*crdv1alpha1.VolumeGroupSnapshotContent); ok {
objName, err := cache.DeletionHandlingMetaNamespaceKeyFunc(groupSnapshotContent)
if err != nil {
klog.Errorf("failed to get key from object: %v, %v", err, groupSnapshotContent)
return
}
klog.V(5).Infof("enqueued %q for sync", objName)
ctrl.groupSnapshotContentQueue.Add(objName)
}
}
// groupSnapshotContentWorker processes items from groupSnapshotContentQueue.
// It must run only once, syncGroupSnapshotContent is not assured to be reentrant.
func (ctrl *csiSnapshotSideCarController) groupSnapshotContentWorker() {
keyObj, quit := ctrl.groupSnapshotContentQueue.Get()
if quit {
return
}
defer ctrl.groupSnapshotContentQueue.Done(keyObj)
if err := ctrl.syncGroupSnapshotContentByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
ctrl.groupSnapshotContentQueue.AddRateLimited(keyObj)
klog.V(4).Infof("Failed to sync group snapshot content %q, will retry again: %v", keyObj.(string), err)
return
}
// Finally, if no error occurs we forget this item so it does not
// get queued again until another change happens.
ctrl.groupSnapshotContentQueue.Forget(keyObj)
return
}
func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContentByKey(key string) error {
klog.V(5).Infof("syncGroupSnapshotContentByKey[%s]", key)
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.V(4).Infof("error getting name of groupSnapshotContent %q from informer: %v", key, err)
return nil
}
groupSnapshotContent, err := ctrl.groupSnapshotContentLister.Get(name)
// The group snapshot content still exists in informer cache, the event must
// have been add/update/sync
if err == nil {
if ctrl.isDriverMatch(groupSnapshotContent) {
err = ctrl.updateGroupSnapshotContentInInformerCache(groupSnapshotContent)
}
if err != nil {
// If error occurs we add this item back to the queue
return err
}
return nil
}
if !errors.IsNotFound(err) {
klog.V(2).Infof("error getting group snapshot content %q from informer: %v", key, err)
return nil
}
// The groupSnapshotContent is not in informer cache, the event must have been
// "delete"
groupSnapshotContentObj, found, err := ctrl.groupSnapshotContentStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting group snapshot content %q from cache: %v", key, err)
return nil
}
if !found {
// The controller has already processed the delete event and
// deleted the group snapshot content from its cache
klog.V(2).Infof("deletion of group snapshot content %q was already processed", key)
return nil
}
groupSnapshotContent, ok := groupSnapshotContentObj.(*crdv1alpha1.VolumeGroupSnapshotContent)
if !ok {
klog.Errorf("expected group snapshot content, got %+v", groupSnapshotContent)
return nil
}
ctrl.deleteGroupSnapshotContentInCacheStore(groupSnapshotContent)
return nil
}
// updateGroupSnapshotContentInInformerCache runs in worker thread and handles
// "group snapshot content added", "group snapshot content updated" and "periodic
// sync" events.
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentInInformerCache(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
// Store the new group snapshot content version in the cache and do not process
// it if this is an old version.
new, err := ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContent)
if err != nil {
klog.Errorf("%v", err)
}
if !new {
return nil
}
err = ctrl.syncGroupSnapshotContent(groupSnapshotContent)
if err != nil {
if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync group snapshot content %q: %+v", groupSnapshotContent.Name, err)
} else {
klog.Errorf("could not sync group snapshot content %q: %+v", groupSnapshotContent.Name, err)
}
return err
}
return nil
}
// deleteGroupSnapshotContentInCacheStore runs in worker thread and handles "group
// snapshot content deleted" event.
func (ctrl *csiSnapshotSideCarController) deleteGroupSnapshotContentInCacheStore(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) {
_ = ctrl.groupSnapshotContentStore.Delete(groupSnapshotContent)
klog.V(4).Infof("group snapshot content %q deleted", groupSnapshotContent.Name)
}
// syncGroupSnapshotContent deals with one key off the queue. It returns false when it's time to quit.
func (ctrl *csiSnapshotSideCarController) syncGroupSnapshotContent(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("synchronizing VolumeGroupSnapshotContent[%s]", groupSnapshotContent.Name)
if ctrl.shouldDeleteGroupSnapshotContent(groupSnapshotContent) {
klog.V(4).Infof("VolumeGroupSnapshotContent[%s]: the policy is %s", groupSnapshotContent.Name, groupSnapshotContent.Spec.DeletionPolicy)
if groupSnapshotContent.Spec.DeletionPolicy == crdv1.VolumeSnapshotContentDelete &&
groupSnapshotContent.Status != nil && groupSnapshotContent.Status.VolumeGroupSnapshotHandle != nil {
// issue a CSI deletion call if the group snapshot has not been deleted
// yet from underlying storage system. Note that the delete group snapshot
// operation will update groups snapshot content's GroupSnapshotHandle
// to nil upon a successful deletion. At this point, the finalizer on
// group snapshot content should NOT be removed to avoid leaking.
return ctrl.deleteCSIGroupSnapshotOperation(groupSnapshotContent)
}
// otherwise, either the snapshot has been deleted from the underlying
// storage system, or the deletion policy is Retain, remove the finalizer
// if there is one so that API server could delete the object if there is
// no other finalizer.
return ctrl.removeGroupSnapshotContentFinalizer(groupSnapshotContent)
}
if len(groupSnapshotContent.Spec.Source.VolumeHandles) != 0 && groupSnapshotContent.Status == nil {
klog.V(5).Infof("syncGroupSnapshotContent: Call CreateGroupSnapshot for group snapshot content %s", groupSnapshotContent.Name)
return ctrl.createGroupSnapshot(groupSnapshotContent)
}
// Skip checkandUpdateGroupSnapshotContentStatus() if ReadyToUse is already
// true. We don't want to keep calling CreateGroupSnapshot CSI methods over
// and over again for performance reasons.
var err error
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.ReadyToUse != nil && *groupSnapshotContent.Status.ReadyToUse == true {
// Try to remove AnnVolumeGroupSnapshotBeingCreated if it is not removed yet for some reason
_, err = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent)
return err
}
return ctrl.checkandUpdateGroupSnapshotContentStatus(groupSnapshotContent)
}
// removeGroupSnapshotContentFinalizer removes the VolumeGroupSnapshotContentFinalizer from a
// group snapshot content if there exists one.
func (ctrl csiSnapshotSideCarController) removeGroupSnapshotContentFinalizer(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
if !utils.ContainsString(groupSnapshotContent.ObjectMeta.Finalizers, utils.VolumeGroupSnapshotContentFinalizer) {
// the finalizer does not exit, return directly
return nil
}
var patches []utils.PatchOp
groupSnapshotContentClone := groupSnapshotContent.DeepCopy()
patches = append(patches,
utils.PatchOp{
Op: "replace",
Path: "/metadata/finalizers",
Value: utils.RemoveString(groupSnapshotContentClone.ObjectMeta.Finalizers, utils.VolumeGroupSnapshotContentFinalizer),
})
updatedGroupSnapshotContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContentClone, patches, ctrl.clientset)
if err != nil {
return newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
klog.V(5).Infof("Removed protection finalizer from volume group snapshot content %s", updatedGroupSnapshotContent.Name)
_, err = ctrl.storeGroupSnapshotContentUpdate(updatedGroupSnapshotContent)
if err != nil {
klog.Errorf("failed to update group snapshot content store %v", err)
}
return nil
}
// Delete a groupsnapshot: Ask the backend to remove the groupsnapshot device
func (ctrl *csiSnapshotSideCarController) deleteCSIGroupSnapshotOperation(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("deleteCSIGroupSnapshotOperation [%s] started", groupSnapshotContent.Name)
snapshotterCredentials, err := ctrl.GetCredentialsFromAnnotationForGroupSnapshot(groupSnapshotContent)
if err != nil {
ctrl.eventRecorder.Event(groupSnapshotContent, v1.EventTypeWarning, "GroupSnapshotDeleteError", "Failed to get snapshot credentials")
return fmt.Errorf("failed to get input parameters to delete group snapshot for group snapshot content %s: %q", groupSnapshotContent.Name, err)
}
var snapshotIDs []string
if groupSnapshotContent.Status != nil && len(groupSnapshotContent.Status.VolumeSnapshotContentRefList) != 0 {
for _, contentRef := range groupSnapshotContent.Status.VolumeSnapshotContentRefList {
snapshotContent, err := ctrl.contentLister.Get(contentRef.Name)
if err != nil {
return fmt.Errorf("failed to get snapshot content %s from snapshot content store: %v", contentRef.Name, err)
}
snapshotIDs = append(snapshotIDs, *snapshotContent.Status.SnapshotHandle)
}
}
err = ctrl.handler.DeleteGroupSnapshot(groupSnapshotContent, snapshotIDs, snapshotterCredentials)
if err != nil {
ctrl.eventRecorder.Event(groupSnapshotContent, v1.EventTypeWarning, "GroupSnapshotDeleteError", "Failed to delete group snapshot")
return fmt.Errorf("failed to delete group snapshot %#v, err: %v", groupSnapshotContent.Name, err)
}
// the group snapshot has been deleted from the underlying storage system, update
// group snapshot content status to remove the group snapshot handle etc.
newContent, err := ctrl.clearGroupSnapshotContentStatus(groupSnapshotContent.Name)
if err != nil {
ctrl.eventRecorder.Event(groupSnapshotContent, v1.EventTypeWarning, "GroupSnapshotDeleteError", "Failed to clear content status")
return err
}
// trigger syncGroupSnapshotContent
ctrl.updateGroupSnapshotContentInInformerCache(newContent)
return nil
}
// clearGroupSnapshotContentStatus resets all fields to nil related to a group snapshot
// in groupSnapshotContent.Status. On success, the latest version of the group snapshot
// content object will be returned.
func (ctrl *csiSnapshotSideCarController) clearGroupSnapshotContentStatus(
groupSnapshotContentName string) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.V(5).Infof("clearGroupSnapshotContentStatus content [%s]", groupSnapshotContentName)
// get the latest version from API server
groupSnapshotContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Get(context.TODO(), groupSnapshotContentName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get group snapshot content %s from api server: %v", groupSnapshotContentName, err)
}
if groupSnapshotContent.Status != nil {
groupSnapshotContent.Status.VolumeGroupSnapshotHandle = nil
groupSnapshotContent.Status.ReadyToUse = nil
groupSnapshotContent.Status.CreationTime = nil
groupSnapshotContent.Status.Error = nil
groupSnapshotContent.Status.VolumeSnapshotContentRefList = nil
}
newContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().UpdateStatus(context.TODO(), groupSnapshotContent, metav1.UpdateOptions{})
if err != nil {
return groupSnapshotContent, newControllerUpdateError(groupSnapshotContentName, err.Error())
}
return newContent, nil
}
func (ctrl *csiSnapshotSideCarController) GetCredentialsFromAnnotationForGroupSnapshot(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (map[string]string, error) {
// get secrets if VolumeGroupSnapshotClass specifies it
var snapshotterCredentials map[string]string
var err error
// Check if annotation exists
if metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnDeletionSecretRefName) && metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnDeletionSecretRefNamespace) {
annDeletionSecretName := groupSnapshotContent.Annotations[utils.AnnDeletionSecretRefName]
annDeletionSecretNamespace := groupSnapshotContent.Annotations[utils.AnnDeletionSecretRefNamespace]
snapshotterSecretRef := &v1.SecretReference{}
if annDeletionSecretName == "" || annDeletionSecretNamespace == "" {
return nil, fmt.Errorf("cannot retrieve secrets for group snapshot content %#v, err: secret name or namespace not specified", groupSnapshotContent.Name)
}
snapshotterSecretRef.Name = annDeletionSecretName
snapshotterSecretRef.Namespace = annDeletionSecretNamespace
snapshotterCredentials, err = utils.GetCredentials(ctrl.client, snapshotterSecretRef)
if err != nil {
// Continue with deletion, as the secret may have already been deleted.
klog.Errorf("Failed to get credentials for group snapshot content %s: %s", groupSnapshotContent.Name, err.Error())
return nil, fmt.Errorf("cannot get credentials for group snapshot content %#v", groupSnapshotContent.Name)
}
}
return snapshotterCredentials, nil
}
// shouldDeleteGroupSnapshotContent checks if groupSnapshotContent object should be deleted
// if DeletionTimestamp is set on the groupSnapshotContent
func (ctrl *csiSnapshotSideCarController) shouldDeleteGroupSnapshotContent(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) bool {
klog.V(5).Infof("Check if VolumeGroupSnapshotContent[%s] should be deleted.", groupSnapshotContent.Name)
if groupSnapshotContent.ObjectMeta.DeletionTimestamp == nil {
return false
}
// 1) shouldDeleteGroupSnapshot returns true if a content is not bound
// (VolumeGroupSnapshotRef == "") for pre-provisioned snapshot
if groupSnapshotContent.Spec.Source.GroupSnapshotHandles != nil && groupSnapshotContent.Spec.VolumeGroupSnapshotRef.UID == "" {
return true
}
// NOTE(xyang): Handle create snapshot timeout
// 2) shouldDeleteGroupSnapshotContent returns false if AnnVolumeGroupSnapshotBeingCreated
// annotation is set. This indicates a CreateGroupSnapshot CSI RPC has
// not responded with success or failure.
// We need to keep waiting for a response from the CSI driver.
if metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
return false
}
// 3) shouldDeleteGroupSnapshotContent returns true if AnnVolumeSnapshotBeingDeleted annotation is set
if metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingDeleted) {
return true
}
return false
}
// createGroupSnapshot starts new asynchronous operation to create group snapshot
func (ctrl *csiSnapshotSideCarController) createGroupSnapshot(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("createGroupSnapshot for group snapshot content [%s]: started", groupSnapshotContent.Name)
groupSnapshotContentObj, err := ctrl.createGroupSnapshotWrapper(groupSnapshotContent)
if err != nil {
ctrl.updateGroupSnapshotContentErrorStatusWithEvent(groupSnapshotContentObj, v1.EventTypeWarning, "GroupSnapshotCreationFailed", fmt.Sprintf("Failed to create group snapshot: %v", err))
klog.Errorf("createGroupSnapshot for groupSnapshotContent [%s]: error occurred in createGroupSnapshotWrapper: %v", groupSnapshotContent.Name, err)
return err
}
_, updateErr := ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContentObj)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("createGroupSnapshot for groupSnapshotContent [%s]: cannot update internal groupSnapshotContent cache: %v", groupSnapshotContent.Name, updateErr)
}
return nil
}
// This is a wrapper function for the group snapshot creation process.
func (ctrl *csiSnapshotSideCarController) createGroupSnapshotWrapper(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.Infof("createGroupSnapshotWrapper: Creating group snapshot for group snapshot content %s through the plugin ...", groupSnapshotContent.Name)
class, snapshotterCredentials, err := ctrl.getCSIGroupSnapshotInput(groupSnapshotContent)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to get input parameters to create group snapshot for group snapshot content %s: %q", groupSnapshotContent.Name, err)
}
// NOTE(xyang): handle create timeout
// Add an annotation to indicate the group snapshot creation request has been
// sent to the storage system and the controller is waiting for a response.
// The annotation will be removed after the storage system has responded with
// success or permanent failure. If the request times out, annotation will
// remain on the groupSnapshotContent to avoid potential leaking of a group snapshot resource on
// the storage system.
groupSnapshotContent, err = ctrl.setAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to add VolumeGroupSnapshotBeingCreated annotation on the group snapshot content %s: %q", groupSnapshotContent.Name, err)
}
parameters, err := utils.RemovePrefixedParameters(class.Parameters)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
}
if ctrl.extraCreateMetadata {
parameters[utils.PrefixedVolumeGroupSnapshotNameKey] = groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Name
parameters[utils.PrefixedVolumeGroupSnapshotNamespaceKey] = groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Namespace
parameters[utils.PrefixedVolumeGroupSnapshotContentNameKey] = groupSnapshotContent.Name
}
driverName, groupSnapshotID, snapshots, creationTime, readyToUse, err := ctrl.handler.CreateGroupSnapshot(groupSnapshotContent, parameters, snapshotterCredentials)
if err != nil {
// NOTE(xyang): handle create timeout
// If it is a final error, remove annotation to indicate
// storage system has responded with an error
klog.Infof("createGroupSnapshotWrapper: CreateGroupSnapshot for groupSnapshotContent %s returned error: %v", groupSnapshotContent.Name, err)
if isCSIFinalError(err) {
var removeAnnotationErr error
if groupSnapshotContent, removeAnnotationErr = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent); removeAnnotationErr != nil {
return groupSnapshotContent, fmt.Errorf("failed to remove VolumeGroupSnapshotBeingCreated annotation from the group snapshot content %s: %s", groupSnapshotContent.Name, removeAnnotationErr)
}
}
return groupSnapshotContent, fmt.Errorf("failed to take group snapshot of the volumes %s: %q", groupSnapshotContent.Spec.Source.VolumeHandles, err)
}
klog.V(5).Infof("Created group snapshot: driver %s, groupSnapshotId %s, creationTime %v, readyToUse %t", driverName, groupSnapshotID, creationTime, readyToUse)
if creationTime.IsZero() {
creationTime = time.Now()
}
// Create individual snapshots and snapshot contents
var snapshotContentNames []string
for _, snapshot := range snapshots {
volumeSnapshotContentName := GetSnapshotContentNameForVolumeGroupSnapshotContent(string(groupSnapshotContent.UID), snapshot.SourceVolumeId)
volumeSnapshotName := GetSnapshotNameForVolumeGroupSnapshotContent(string(groupSnapshotContent.UID), snapshot.SourceVolumeId)
volumeSnapshotNamespace := groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Namespace
volumeSnapshotContent := &crdv1.VolumeSnapshotContent{
ObjectMeta: metav1.ObjectMeta{
Name: volumeSnapshotContentName,
},
Spec: crdv1.VolumeSnapshotContentSpec{
VolumeSnapshotRef: v1.ObjectReference{
Kind: "VolumeSnapshots",
Name: volumeSnapshotName,
Namespace: volumeSnapshotNamespace,
},
DeletionPolicy: groupSnapshotContent.Spec.DeletionPolicy,
Driver: groupSnapshotContent.Spec.Driver,
Source: crdv1.VolumeSnapshotContentSource{
SnapshotHandle: &snapshot.SnapshotId,
},
// TODO: Populate this field when volume mode conversion is enabled by default
SourceVolumeMode: nil,
},
Status: &crdv1.VolumeSnapshotContentStatus{
VolumeGroupSnapshotHandle: &groupSnapshotContent.Name,
},
}
label := make(map[string]string)
label["volumeGroupSnapshotName"] = groupSnapshotContent.Spec.VolumeGroupSnapshotRef.Name
volumeSnapshot := &crdv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: volumeSnapshotName,
Namespace: volumeSnapshotNamespace,
Labels: label,
Finalizers: []string{utils.VolumeSnapshotInGroupFinalizer},
},
Spec: crdv1.VolumeSnapshotSpec{
Source: crdv1.VolumeSnapshotSource{
VolumeSnapshotContentName: &volumeSnapshotContentName,
},
},
}
vsc, err := ctrl.clientset.SnapshotV1().VolumeSnapshotContents().Create(context.TODO(), volumeSnapshotContent, metav1.CreateOptions{})
if err != nil {
return groupSnapshotContent, err
}
snapshotContentNames = append(snapshotContentNames, vsc.Name)
_, err = ctrl.clientset.SnapshotV1().VolumeSnapshots(volumeSnapshotNamespace).Create(context.TODO(), volumeSnapshot, metav1.CreateOptions{})
if err != nil {
return groupSnapshotContent, err
}
_, err = ctrl.updateSnapshotContentStatus(volumeSnapshotContent, snapshot.SnapshotId, snapshot.ReadyToUse, snapshot.CreationTime.AsTime().UnixNano(), snapshot.SizeBytes, groupSnapshotID)
if err != nil {
return groupSnapshotContent, err
}
}
newGroupSnapshotContent, err := ctrl.updateGroupSnapshotContentStatus(groupSnapshotContent, groupSnapshotID, readyToUse, creationTime.UnixNano(), snapshotContentNames)
if err != nil {
klog.Errorf("error updating status for volume group snapshot content %s: %v.", groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("error updating status for volume group snapshot content %s: %v", groupSnapshotContent.Name, err)
}
groupSnapshotContent = newGroupSnapshotContent
// NOTE(xyang): handle create timeout
// Remove annotation to indicate storage system has successfully
// cut the group snapshot
groupSnapshotContent, err = ctrl.removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent)
if err != nil {
return groupSnapshotContent, fmt.Errorf("failed to remove VolumeGroupSnapshotBeingCreated annotation on the groupSnapshotContent %s: %q", groupSnapshotContent.Name, err)
}
return groupSnapshotContent, nil
}
func (ctrl *csiSnapshotSideCarController) getCSIGroupSnapshotInput(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotClass, map[string]string, error) {
className := groupSnapshotContent.Spec.VolumeGroupSnapshotClassName
klog.V(5).Infof("getCSIGroupSnapshotInput for group snapshot content [%s]", groupSnapshotContent.Name)
var class *crdv1alpha1.VolumeGroupSnapshotClass
var err error
if className != nil {
class, err = ctrl.getGroupSnapshotClass(*className)
if err != nil {
klog.Errorf("getCSISnapshotInput failed to getClassFromVolumeGroupSnapshot %s", err)
return nil, nil, err
}
} else {
// If dynamic provisioning, return failure if no group snapshot class
if len(groupSnapshotContent.Spec.Source.VolumeHandles) != 0 {
klog.Errorf("failed to getCSISnapshotInput %s without a group snapshot class", groupSnapshotContent.Name)
return nil, nil, fmt.Errorf("failed to take group snapshot %s without a group snapshot class", groupSnapshotContent.Name)
}
// For pre-provisioned group snapshot, group snapshot class is not required
klog.V(5).Infof("getCSISnapshotInput for groupSnapshotContent [%s]: no VolumeGroupSnapshotClassName provided for pre-provisioned group snapshot", groupSnapshotContent.Name)
}
// TODO: Resolve snapshotting secret credentials.
return class, nil, nil
}
// getGroupSnapshotClass is a helper function to get group snapshot class from the class name.
func (ctrl *csiSnapshotSideCarController) getGroupSnapshotClass(className string) (*crdv1alpha1.VolumeGroupSnapshotClass, error) {
klog.V(5).Infof("getGroupSnapshotClass: VolumeGroupSnapshotClassName [%s]", className)
class, err := ctrl.groupSnapshotClassLister.Get(className)
if err != nil {
klog.Errorf("failed to retrieve group snapshot class %s from the informer: %q", className, err)
return nil, err
}
return class, nil
}
// setAnnVolumeGroupSnapshotBeingCreated sets VolumeGroupSnapshotBeingCreated annotation
// on VolumeGroupSnapshotContent
// If set, it indicates group snapshot is being created
func (ctrl *csiSnapshotSideCarController) setAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
// the annotation already exists, return directly
return groupSnapshotContent, nil
}
// Set AnnVolumeGroupSnapshotBeingCreated
// Combine existing annotations with the new annotations.
// If there are no existing annotations, we create a new map.
klog.V(5).Infof("setAnnVolumeGroupSnapshotBeingCreated: set annotation [%s:yes] on groupSnapshotContent [%s].", utils.AnnVolumeGroupSnapshotBeingCreated, groupSnapshotContent.Name)
patchedAnnotations := make(map[string]string)
for k, v := range groupSnapshotContent.GetAnnotations() {
patchedAnnotations[k] = v
}
patchedAnnotations[utils.AnnVolumeGroupSnapshotBeingCreated] = "yes"
var patches []utils.PatchOp
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/metadata/annotations",
Value: patchedAnnotations,
})
patchedGroupSnapshotContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContent, patches, ctrl.clientset)
if err != nil {
return groupSnapshotContent, newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
// update groupSnapshotContent if update is successful
groupSnapshotContent = patchedGroupSnapshotContent
_, err = ctrl.storeContentUpdate(groupSnapshotContent)
if err != nil {
klog.V(4).Infof("setAnnVolumeGroupSnapshotBeingCreated for groupSnapshotContent [%s]: cannot update internal cache %v", groupSnapshotContent.Name, err)
}
klog.V(5).Infof("setAnnVolumeGroupSnapshotBeingCreated: volume group snapshot content %+v", groupSnapshotContent)
return groupSnapshotContent, nil
}
// removeAnnVolumeGroupSnapshotBeingCreated removes the VolumeGroupSnapshotBeingCreated
// annotation from a groupSnapshotContent if there exists one.
func (ctrl csiSnapshotSideCarController) removeAnnVolumeGroupSnapshotBeingCreated(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
if !metav1.HasAnnotation(groupSnapshotContent.ObjectMeta, utils.AnnVolumeGroupSnapshotBeingCreated) {
// the annotation does not exist, return directly
return groupSnapshotContent, nil
}
groupSnapshotContentClone := groupSnapshotContent.DeepCopy()
annotationPatchPath := strings.ReplaceAll(utils.AnnVolumeGroupSnapshotBeingCreated, "/", "~1")
var patches []utils.PatchOp
patches = append(patches, utils.PatchOp{
Op: "remove",
Path: "/metadata/annotations/" + annotationPatchPath,
})
updatedGroupSnapshotContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContentClone, patches, ctrl.clientset)
if err != nil {
return groupSnapshotContent, newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
klog.V(5).Infof("Removed VolumeGroupSnapshotBeingCreated annotation from volume group snapshot content %s", groupSnapshotContent.Name)
_, err = ctrl.storeGroupSnapshotContentUpdate(updatedGroupSnapshotContent)
if err != nil {
klog.Errorf("failed to update groupSnapshotContent store %v", err)
}
return updatedGroupSnapshotContent, nil
}
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentStatus(
groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent,
groupSnapshotHandle string,
readyToUse bool,
createdAt int64,
snapshotContentNames []string) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
klog.V(5).Infof("updateGroupSnapshotContentStatus: updating VolumeGroupSnapshotContent [%s], groupSnapshotHandle %s, readyToUse %v, createdAt %v", groupSnapshotContent.Name, groupSnapshotHandle, readyToUse, createdAt)
groupSnapshotContentObj, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().Get(context.TODO(), groupSnapshotContent.Name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error get group snapshot content %s from api server: %v", groupSnapshotContent.Name, err)
}
var newStatus *crdv1alpha1.VolumeGroupSnapshotContentStatus
updated := false
if groupSnapshotContentObj.Status == nil {
newStatus = &crdv1alpha1.VolumeGroupSnapshotContentStatus{
VolumeGroupSnapshotHandle: &groupSnapshotHandle,
ReadyToUse: &readyToUse,
CreationTime: &createdAt,
}
for _, name := range snapshotContentNames {
newStatus.VolumeSnapshotContentRefList = append(newStatus.VolumeSnapshotContentRefList, v1.ObjectReference{
Kind: "VolumeSnapshotContent",
Name: name,
})
}
updated = true
} else {
newStatus = groupSnapshotContentObj.Status.DeepCopy()
if newStatus.VolumeGroupSnapshotHandle == nil {
newStatus.VolumeGroupSnapshotHandle = &groupSnapshotHandle
updated = true
}
if newStatus.ReadyToUse == nil || *newStatus.ReadyToUse != readyToUse {
newStatus.ReadyToUse = &readyToUse
updated = true
if readyToUse && newStatus.Error != nil {
newStatus.Error = nil
}
}
if newStatus.CreationTime == nil {
newStatus.CreationTime = &createdAt
updated = true
}
if len(newStatus.VolumeSnapshotContentRefList) == 0 {
for _, name := range snapshotContentNames {
newStatus.VolumeSnapshotContentRefList = append(newStatus.VolumeSnapshotContentRefList, v1.ObjectReference{
Kind: "VolumeSnapshotContent",
Name: name,
})
}
updated = true
}
}
if updated {
groupSnapshotContentClone := groupSnapshotContentObj.DeepCopy()
groupSnapshotContentClone.Status = newStatus
newContent, err := ctrl.clientset.GroupsnapshotV1alpha1().VolumeGroupSnapshotContents().UpdateStatus(context.TODO(), groupSnapshotContentClone, metav1.UpdateOptions{})
if err != nil {
return groupSnapshotContentObj, newControllerUpdateError(groupSnapshotContent.Name, err.Error())
}
return newContent, nil
}
return groupSnapshotContentObj, nil
}
// updateContentStatusWithEvent saves new groupSnapshotContent.Status to API server
// and emits given event on the groupSnapshotContent. It saves the status and emits
// the event only when the status has actually changed from the version saved in API server.
// Parameters:
//
// * groupSnapshotContent - group snapshot content to update
// * eventtype, reason, message - event to send, see EventRecorder.Event()
func (ctrl *csiSnapshotSideCarController) updateGroupSnapshotContentErrorStatusWithEvent(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent, eventtype, reason, message string) error {
klog.V(5).Infof("updateGroupSnapshotContentErrorStatusWithEvent[%s]", groupSnapshotContent.Name)
if groupSnapshotContent.Status != nil && groupSnapshotContent.Status.Error != nil && *groupSnapshotContent.Status.Error.Message == message {
klog.V(4).Infof("updateGroupSnapshotContentErrorStatusWithEvent[%s]: the same error %v is already set", groupSnapshotContent.Name, groupSnapshotContent.Status.Error)
return nil
}
var patches []utils.PatchOp
ready := false
groupSnapshotContentStatusError := &crdv1.VolumeSnapshotError{
Time: &metav1.Time{
Time: time.Now(),
},
Message: &message,
}
if groupSnapshotContent.Status == nil {
// Initialize status if nil
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status",
Value: &crdv1alpha1.VolumeGroupSnapshotContentStatus{
ReadyToUse: &ready,
Error: groupSnapshotContentStatusError,
},
})
} else {
// Patch status if non-nil
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status/error",
Value: groupSnapshotContentStatusError,
})
patches = append(patches, utils.PatchOp{
Op: "replace",
Path: "/status/readyToUse",
Value: &ready,
})
}
newContent, err := utils.PatchVolumeGroupSnapshotContent(groupSnapshotContent, patches, ctrl.clientset, "status")
// Emit the event even if the status update fails so that user can see the error
ctrl.eventRecorder.Event(newContent, eventtype, reason, message)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status failed %v", groupSnapshotContent.Name, err)
return err
}
_, err = ctrl.storeGroupSnapshotContentUpdate(newContent)
if err != nil {
klog.V(4).Infof("updating VolumeGroupSnapshotContent[%s] error status: cannot update internal cache %v", groupSnapshotContent.Name, err)
return err
}
return nil
}
// GetSnapshotNameForVolumeGroupSnapshotContent returns a unique snapshot name for a VolumeGroupSnapshotContent.
func GetSnapshotNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, pvUUID string) string {
return fmt.Sprintf("snapshot-%x-%s", sha256.Sum256([]byte(groupSnapshotContentUUID+pvUUID)), time.Now().Format("2006-01-02-3.4.5"))
}
// GetSnapshotContentNameForVolumeGroupSnapshotContent returns a unique content name for the
// passed in VolumeGroupSnapshotContent.
func GetSnapshotContentNameForVolumeGroupSnapshotContent(groupSnapshotContentUUID, pvUUID string) string {
return fmt.Sprintf("snapcontent-%x-%s", sha256.Sum256([]byte(groupSnapshotContentUUID+pvUUID)), time.Now().Format("2006-01-02-3.4.5"))
}
func (ctrl *csiSnapshotSideCarController) checkandUpdateGroupSnapshotContentStatus(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) error {
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatus[%s] started", groupSnapshotContent.Name)
groupSnapshotContentObj, err := ctrl.checkandUpdateGroupSnapshotContentStatusOperation(groupSnapshotContent)
if err != nil {
ctrl.updateGroupSnapshotContentErrorStatusWithEvent(groupSnapshotContentObj, v1.EventTypeWarning, "GroupSnapshotContentCheckandUpdateFailed", fmt.Sprintf("Failed to check and update group snapshot content: %v", err))
klog.Errorf("checkandUpdateGroupSnapshotContentStatus [%s]: error occurred %v", groupSnapshotContent.Name, err)
return err
}
_, updateErr := ctrl.storeGroupSnapshotContentUpdate(groupSnapshotContentObj)
if updateErr != nil {
// We will get a "group snapshot update" event soon, this is not a big error
klog.V(4).Infof("checkandUpdateGroupSnapshotContentStatus [%s]: cannot update internal cache: %v", groupSnapshotContent.Name, updateErr)
}
return nil
}
func (ctrl *csiSnapshotSideCarController) checkandUpdateGroupSnapshotContentStatusOperation(groupSnapshotContent *crdv1alpha1.VolumeGroupSnapshotContent) (*crdv1alpha1.VolumeGroupSnapshotContent, error) {
var err error
var creationTime time.Time
readyToUse := false
var driverName string
var groupSnapshotID string
var snapshotterListCredentials map[string]string
if groupSnapshotContent.Spec.Source.GroupSnapshotHandles != nil {
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatusOperation: call GetGroupSnapshotStatus for group snapshot which is pre-bound to group snapshot content [%s]", groupSnapshotContent.Name)
if groupSnapshotContent.Spec.VolumeGroupSnapshotClassName != nil {
class, err := ctrl.getGroupSnapshotClass(*groupSnapshotContent.Spec.VolumeGroupSnapshotClassName)
if err != nil {
klog.Errorf("Failed to get group snapshot class %s for group snapshot content %s: %v", *groupSnapshotContent.Spec.VolumeGroupSnapshotClassName, groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("failed to get group snapshot class %s for group snapshot content %s: %v", *groupSnapshotContent.Spec.VolumeGroupSnapshotClassName, groupSnapshotContent.Name, err)
}
snapshotterListSecretRef, err := utils.GetSecretReference(utils.SnapshotterListSecretParams, class.Parameters, groupSnapshotContent.GetObjectMeta().GetName(), nil)
if err != nil {
klog.Errorf("Failed to get secret reference for group snapshot content %s: %v", groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("failed to get secret reference for group snapshot content %s: %v", groupSnapshotContent.Name, err)
}
snapshotterListCredentials, err = utils.GetCredentials(ctrl.client, snapshotterListSecretRef)
if err != nil {
// Continue with deletion, as the secret may have already been deleted.
klog.Errorf("Failed to get credentials for group snapshot content %s: %v", groupSnapshotContent.Name, err)
return groupSnapshotContent, fmt.Errorf("failed to get credentials for group snapshot content %s: %v", groupSnapshotContent.Name, err)
}
}
snapshotIDs := groupSnapshotContent.Spec.Source.GroupSnapshotHandles.VolumeSnapshotHandles
readyToUse, creationTime, err = ctrl.handler.GetGroupSnapshotStatus(groupSnapshotContent, snapshotIDs, snapshotterListCredentials)
if err != nil {
klog.Errorf("checkandUpdateGroupSnapshotContentStatusOperation: failed to call get group snapshot status to check whether group snapshot is ready to use %q", err)
return groupSnapshotContent, err
}
driverName = groupSnapshotContent.Spec.Driver
groupSnapshotID = groupSnapshotContent.Spec.Source.GroupSnapshotHandles.VolumeGroupSnapshotHandle
klog.V(5).Infof("checkandUpdateGroupSnapshotContentStatusOperation: driver %s, groupSnapshotId %s, creationTime %v, readyToUse %t", driverName, groupSnapshotID, creationTime, readyToUse)
if creationTime.IsZero() {
creationTime = time.Now()
}
// TODO: Get a reference to snapshot contents for this volume group snapshot
updatedContent, err := ctrl.updateGroupSnapshotContentStatus(groupSnapshotContent, groupSnapshotID, readyToUse, creationTime.UnixNano(), []string{})
if err != nil {
return groupSnapshotContent, err
}
return updatedContent, nil
}
return ctrl.createGroupSnapshotWrapper(groupSnapshotContent)
}