/
cnsvolumemetadata_controller.go
547 lines (508 loc) · 23 KB
/
cnsvolumemetadata_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cnsvolumemetadata
import (
"context"
"fmt"
"os"
"reflect"
"strconv"
"sync"
"time"
commonconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common/commonco"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
"github.com/davecgh/go-spew/spew"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
cnstypes "github.com/vmware/govmomi/cns/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
cnsoperatorapis "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator"
cnsv1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/apis/cnsoperator/cnsvolumemetadata/v1alpha1"
volumes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/volume"
cnsvsphere "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/vsphere"
csitypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/types"
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
cnsoperatortypes "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/types"
)
const (
defaultMaxWorkerThreadsToProcessCnsVolumeMetadata = 3
)
// backOffDuration is a map of cnsvolumemetadata name's to the time after which
// a request for this instance will be requeued. Initialized to 1 second for new
// instances and for instances whose latest reconcile operation succeeded.
// If the reconcile fails, backoff is incremented exponentially.
var (
backOffDuration map[string]time.Duration
backOffDurationMapMutex = sync.Mutex{}
)
// Add creates a new CnsVolumeMetadata Controller and adds it to the Manager,
// ConfigurationInfo, volumeManager and k8sclient. The Manager will set fields
// on the Controller and Start it when the Manager is Started.
func Add(mgr manager.Manager, clusterFlavor cnstypes.CnsClusterFlavor,
configInfo *commonconfig.ConfigurationInfo, volumeManager volumes.Manager) error {
// Initializes kubernetes client.
ctx, log := logger.GetNewContextWithLogger()
if clusterFlavor != cnstypes.CnsClusterFlavorWorkload {
log.Debug("Not initializing the CnsVolumeMetadata Controller as its a non-WCP CSI deployment")
return nil
}
k8sclient, err := k8s.NewClient(ctx)
if err != nil {
log.Errorf("Creating Kubernetes client failed. Err: %v", err)
return err
}
// eventBroadcaster broadcasts events on cnsvolumemetadata instances to the
// event sink.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: k8sclient.CoreV1().Events(""),
},
)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: cnsoperatorapis.GroupName})
return add(mgr, newReconciler(mgr, configInfo, volumeManager, k8sclient, recorder))
}
// newReconciler returns a new reconcile.Reconciler.
func newReconciler(mgr manager.Manager, configInfo *commonconfig.ConfigurationInfo, volumeManager volumes.Manager,
k8sclient kubernetes.Interface, recorder record.EventRecorder) reconcile.Reconciler {
return &ReconcileCnsVolumeMetadata{client: mgr.GetClient(), scheme: mgr.GetScheme(), configInfo: configInfo,
volumeManager: volumeManager, k8sclient: k8sclient, recorder: recorder}
}
// add adds a new Controller to mgr with r as the reconcile.Reconciler.
func add(mgr manager.Manager, r reconcile.Reconciler) error {
ctx, log := logger.GetNewContextWithLogger()
maxWorkerThreads := getMaxWorkerThreadsToReconcileCnsVolumeMetadata(ctx)
// Create a new controller.
c, err := controller.New("cnsvolumemetadata-controller", mgr,
controller.Options{Reconciler: r, MaxConcurrentReconciles: maxWorkerThreads})
if err != nil {
log.Errorf("failed to create new CnsVolumeMetadata controller with error: %+v", err)
return err
}
backOffDuration = make(map[string]time.Duration)
src := &source.Kind{Type: &cnsv1alpha1.CnsVolumeMetadata{}}
h := &handler.EnqueueRequestForObject{}
// Predicates are used to determine under which conditions
// the reconcile callback will be made for an instance.
pred := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
oldObj, ok := e.ObjectOld.(*cnsv1alpha1.CnsVolumeMetadata)
if oldObj == nil || !ok {
return false
}
newObj, ok := e.ObjectNew.(*cnsv1alpha1.CnsVolumeMetadata)
if newObj == nil || !ok {
return false
}
// Return true if finalizer or spec has changed.
// Return true if deletion timestamp is non-nil and the finalizer is still set.
// Return false for updates to any other fields.
// Finalizer is added and removed by CNS Operator.
return !(reflect.DeepEqual(oldObj.Finalizers, newObj.Finalizers) && reflect.DeepEqual(oldObj.Spec, newObj.Spec)) ||
(newObj.DeletionTimestamp != nil && newObj.Finalizers != nil)
},
DeleteFunc: func(e event.DeleteEvent) bool {
// Instances are deleted only after CNS Operator has removed its
// finalizer from that instance. No reconcile operations need to
// take place after the finalizer is removed.
return false
},
}
// Watch for changes to primary resource CnsVolumeMetadata.
err = c.Watch(src, h, pred)
if err != nil {
log.Errorf("failed to watch for changes to CnsVolumeMetadata resource with error: %+v", err)
return err
}
return nil
}
// blank assignment to verify that ReconcileCnsVolumeMetadata implements
// reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcileCnsVolumeMetadata{}
// ReconcileCnsVolumeMetadata reconciles a CnsVolumeMetadata object.
type ReconcileCnsVolumeMetadata struct {
client client.Client
scheme *runtime.Scheme
configInfo *commonconfig.ConfigurationInfo
volumeManager volumes.Manager
k8sclient kubernetes.Interface
recorder record.EventRecorder
}
// Reconcile reads that state of the cluster for a CnsVolumeMetadata object and
// makes changes on CNS based on the state read in the CnsVolumeMetadata.Spec.
// The Controller will requeue the Request to be processed again if the returned
// error is non-nil or Result.Requeue is true, otherwise upon completion it will
// remove the work from the queue.
func (r *ReconcileCnsVolumeMetadata) Reconcile(ctx context.Context,
request reconcile.Request) (reconcile.Result, error) {
log := logger.GetLogger(ctx)
isTKGSHAEnabled := commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.TKGsHA)
instance := &cnsv1alpha1.CnsVolumeMetadata{}
err := r.client.Get(ctx, request.NamespacedName, instance)
if err != nil {
if errors.IsNotFound(err) {
log.Infof("ReconcileCnsVolumeMetadata: Failed to get CnsVolumeMetadata instance %q. Ignoring request.",
request.Name)
return reconcile.Result{}, nil
}
// Error reading the object - requeue the request.
log.Errorf("ReconcileCnsVolumeMetadata: Error reading CnsVolumeMetadata instance "+
"with name: %q on namespace: %q. Err: %+v", request.Name, request.Namespace, err)
return reconcile.Result{}, err
}
log.Infof("ReconcileCnsVolumeMetadata: Received request for instance %q and type %q",
instance.Name, instance.Spec.EntityType)
// Initialize backOffDuration for the instance, if required.
backOffDurationMapMutex.Lock()
var timeout time.Duration
if _, exists := backOffDuration[instance.Name]; !exists {
backOffDuration[instance.Name] = time.Second
}
timeout = backOffDuration[instance.Name]
backOffDurationMapMutex.Unlock()
// Validate input instance fields.
if err = validateReconileRequest(instance); err != nil {
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to validate reconcile request with error: %v", err)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
// If deletion timestamp is set, instance is marked for deletion by k8s.
// Remove corresponding metadata from CNS.
// If the operation succeeds, remove the finalizer.
// If the operation fails, requeue the request.
if instance.DeletionTimestamp != nil {
if !r.updateCnsMetadata(ctx, instance, true, isTKGSHAEnabled) {
// Failed to update CNS.
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to delete entry in CNS for instance "+
"with name %q and entity type %q in the guest cluster %q. Requeuing request.",
instance.Spec.EntityName, instance.Spec.EntityType, instance.Spec.GuestClusterID)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
// Update instance.status fields with the errors per volume.
if err = r.client.Update(ctx, instance); err != nil {
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to update status for %q. "+
"Err: %v.", instance.Name, err)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
}
// updateCnsMetadata failed, so the request will be requeued.
return reconcile.Result{RequeueAfter: timeout}, err
}
// Remove finalizer as update on CNS was successful.
for index, finalizer := range instance.Finalizers {
if finalizer == cnsoperatortypes.CNSFinalizer {
log.Debugf("ReconcileCnsVolumeMetadata: Removing finalizer %q for instance %q", finalizer, instance.Name)
instance.Finalizers = append(instance.Finalizers[:index], instance.Finalizers[index+1:]...)
if err = r.client.Update(ctx, instance); err != nil {
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to remove finalizer %q for %q. "+
"Err: %v. Requeueing request.", finalizer, instance.Name, err)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
return reconcile.Result{RequeueAfter: timeout}, err
}
log.Debugf("ReconcileCnsVolumeMetadata: Successfully removed finalizer %q for instance %q",
finalizer, instance.Name)
}
}
// Cleanup instance entry from backOffDuration map.
backOffDurationMapMutex.Lock()
delete(backOffDuration, instance.Name)
backOffDurationMapMutex.Unlock()
return reconcile.Result{}, nil
}
// Deletion timestamp was not set.
// Instance was either created or updated on the supervisor API server.
isFinalizerSet := false
for _, finalizer := range instance.Finalizers {
if finalizer == cnsoperatortypes.CNSFinalizer {
isFinalizerSet = true
break
}
}
// Set finalizer if it was not set already on this instance.
if !isFinalizerSet {
instance.Finalizers = append(instance.Finalizers, cnsoperatortypes.CNSFinalizer)
if err = r.client.Update(ctx, instance); err != nil {
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to add finalizer %q for %q. "+
"Err: %v. Requeueing request.", cnsoperatortypes.CNSFinalizer, instance.Name, err)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
return reconcile.Result{RequeueAfter: timeout}, err
}
} else {
// Update CNS volume entry with instance's metadata.
if !r.updateCnsMetadata(ctx, instance, false, isTKGSHAEnabled) {
// Failed to update CNS.
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to update entry in CNS for instance "+
"with name %q and entity type %q in the guest cluster %q. Requeueing request.",
instance.Spec.EntityName, instance.Spec.EntityType, instance.Spec.GuestClusterID)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
// Update instance.status fields on supervisor API server and requeue
// the request.
_ = r.client.Update(ctx, instance)
return reconcile.Result{RequeueAfter: timeout}, nil
}
// Successfully updated CNS.
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Successfully updated entry in CNS for instance "+
"with name %q and entity type %q in the guest cluster %q.",
instance.Spec.EntityName, instance.Spec.EntityType, instance.Spec.GuestClusterID)
recordEvent(ctx, r, instance, v1.EventTypeNormal, msg)
// Update instance.status fields on supervisor API server.
if err = r.client.Update(ctx, instance); err != nil {
msg := fmt.Sprintf("ReconcileCnsVolumeMetadata: Failed to update status for %q. "+
"Err: %v. Requeueing request.", instance.Name, err)
recordEvent(ctx, r, instance, v1.EventTypeWarning, msg)
return reconcile.Result{RequeueAfter: timeout}, err
}
}
return reconcile.Result{}, nil
}
// updateCnsMetadata updates the volume entry on CNS.
// If deleteFlag is true, metadata is deleted for the given instance.
// Returns true if all updates on CNS succeeded, otherwise return false.
func (r *ReconcileCnsVolumeMetadata) updateCnsMetadata(ctx context.Context,
instance *cnsv1alpha1.CnsVolumeMetadata, deleteFlag bool, useSupervisorID bool) bool {
log := logger.GetLogger(ctx)
log.Debugf("ReconcileCnsVolumeMetadata: Calling updateCnsMetadata for instance %q with delete flag %v",
instance.Name, deleteFlag)
vCenter, err := cnsvsphere.GetVirtualCenterInstance(ctx, r.configInfo, false)
if err != nil {
log.Errorf("ReconcileCnsVolumeMetadata: Failed to get virtual center instance. Err: %v", err)
return false
}
if vCenter.Config == nil {
log.Errorf("ReconcileCnsVolumeMetadata: vcenter config is empty")
return false
}
host := vCenter.Config.Host
var entityReferences []cnstypes.CnsKubernetesEntityReference
for _, reference := range instance.Spec.EntityReferences {
clusterID := reference.ClusterID
if instance.Spec.EntityType == cnsv1alpha1.CnsOperatorEntityTypePV {
if useSupervisorID {
clusterID = r.configInfo.Cfg.Global.SupervisorID
} else {
clusterID = r.configInfo.Cfg.Global.ClusterID
}
}
entityReferences = append(entityReferences, cnsvsphere.CreateCnsKuberenetesEntityReference(
reference.EntityType, reference.EntityName, reference.Namespace, clusterID))
}
var volumeStatus []*cnsv1alpha1.CnsVolumeMetadataVolumeStatus
success := true
for index, volume := range instance.Spec.VolumeNames {
status := cnsv1alpha1.GetCnsOperatorVolumeStatus(volume, "")
status.Updated = true
volumeStatus = append(volumeStatus, &status)
// Get pvc object in the supervisor cluster that this instance refers to.
pvc, err := r.k8sclient.CoreV1().PersistentVolumeClaims(instance.Namespace).Get(ctx, volume, metav1.GetOptions{})
if err != nil {
log.Errorf("ReconcileCnsVolumeMetadata: Failed to get PVC %q in namespace %q. Err: %v",
volume, instance.Namespace, err)
if errors.IsNotFound(err) && deleteFlag {
log.Info("Assuming volume entry is deleted from CNS.")
continue
} else {
status.ErrorMessage = err.Error()
status.Updated = false
success = false
continue
}
}
// Get the corresponding pv object bound to the pvc.
pv, err := r.k8sclient.CoreV1().PersistentVolumes().Get(ctx, pvc.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
log.Errorf("ReconcileCnsVolumeMetadata: Failed to get PV %q. Err: %v", pvc.Spec.VolumeName, err)
status.ErrorMessage = err.Error()
status.Updated = false
success = false
continue
}
if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != csitypes.Name {
continue
}
var metadataList []cnstypes.BaseCnsEntityMetadata
metadata := cnsvsphere.GetCnsKubernetesEntityMetaData(instance.Spec.EntityName, instance.Spec.Labels,
deleteFlag, string(instance.Spec.EntityType), instance.Spec.Namespace, instance.Spec.GuestClusterID,
[]cnstypes.CnsKubernetesEntityReference{entityReferences[index]})
metadataList = append(metadataList, cnstypes.BaseCnsEntityMetadata(metadata))
cluster := cnsvsphere.GetContainerCluster(instance.Spec.GuestClusterID,
r.configInfo.Cfg.VirtualCenter[host].User, cnstypes.CnsClusterFlavorGuest,
instance.Spec.ClusterDistribution)
updateSpec := &cnstypes.CnsVolumeMetadataUpdateSpec{
VolumeId: cnstypes.CnsVolumeId{
Id: pv.Spec.CSI.VolumeHandle,
},
Metadata: cnstypes.CnsVolumeMetadata{
ContainerCluster: cluster,
ContainerClusterArray: []cnstypes.CnsContainerCluster{cluster},
EntityMetadata: metadataList,
},
}
log.Debugf("ReconcileCnsVolumeMetadata: Calling UpdateVolumeMetadata for "+
"volume %q of instance %q with updateSpec: %+v", volume, instance.Name, spew.Sdump(updateSpec))
if err := r.volumeManager.UpdateVolumeMetadata(ctx, updateSpec); err != nil {
if cnsvsphere.IsNotFoundError(err) && deleteFlag {
log.Infof("ReconcileCnsVolumeMetadata: volume ID %q not found in CNS meaning it was "+
"already deleted, thus returning success", updateSpec.VolumeId.Id)
continue
}
log.Errorf("ReconcileCnsVolumeMetadata: UpdateVolumeMetadata failed with err %v", err)
status.ErrorMessage = err.Error()
status.Updated = false
success = false
}
}
// Modify status field of instance.
// Update on API server will be made by the calling function.
instance.Status.VolumeStatus = nil
for _, status := range volumeStatus {
instance.Status.VolumeStatus = append(instance.Status.VolumeStatus, *status)
}
return success
}
// validateReconileRequest validates the fields of the request against the
// cnsvolumemetadata API. Returns an error if any validation fails.
func validateReconileRequest(req *cnsv1alpha1.CnsVolumeMetadata) error {
var err error
if req.Spec.EntityName == "" || req.Spec.EntityType == "" || req.Spec.GuestClusterID == "" {
return errors.NewBadRequest("EntityName, EntityType and GuestClusterID are required parameters.")
}
switch req.Spec.EntityType {
case cnsv1alpha1.CnsOperatorEntityTypePV:
if req.Spec.Namespace != "" {
err = errors.NewBadRequest("Namespace cannot be set for PERSISTENT_VOLUME instances")
}
if len(req.Spec.VolumeNames) != 1 || len(req.Spec.EntityReferences) != 1 {
err = errors.NewBadRequest(
"VolumeNames and EntityReferences should have length 1 for PERSISTENT_VOLUME instances")
}
for _, reference := range req.Spec.EntityReferences {
if reference.EntityType != string(cnsv1alpha1.CnsOperatorEntityTypePVC) {
err = errors.NewBadRequest(
"PERSISTENT_VOLUME instances can only refer to PERSISTENT_VOLUME_CLAIM instances")
}
if reference.ClusterID != "" {
err = errors.NewBadRequest("EntityReferences.ClusterID should be empty for PERSISTENT_VOLUME instances")
}
}
case cnsv1alpha1.CnsOperatorEntityTypePVC:
if req.Spec.Namespace == "" {
err = errors.NewBadRequest("Namespace should be set for PERSISTENT_VOLUME_CLAIM instances")
}
if len(req.Spec.VolumeNames) != 1 || len(req.Spec.EntityReferences) != 1 {
err = errors.NewBadRequest(
"VolumeNames and EntityReferences should have length 1 for PERSISTENT_VOLUME_CLAIM instances")
}
for _, reference := range req.Spec.EntityReferences {
if reference.EntityType != string(cnsv1alpha1.CnsOperatorEntityTypePV) {
err = errors.NewBadRequest(
"PERSISTENT_VOLUME_CLAIM instances can only refer to PERSISTENT_VOLUME instances")
}
if reference.ClusterID == "" {
err = errors.NewBadRequest(
"EntityReferences.ClusterID should not be empty for PERSISTENT_VOLUME_CLAIM instances")
}
}
case cnsv1alpha1.CnsOperatorEntityTypePOD:
if req.Spec.Namespace == "" {
err = errors.NewBadRequest("Namespace should be set for POD instances")
}
if req.Spec.Labels != nil {
err = errors.NewBadRequest("Labels cannot be set for POD instances")
}
if len(req.Spec.VolumeNames) == 0 || len(req.Spec.EntityReferences) == 0 {
err = errors.NewBadRequest(
"VolumeNames and EntityReferences should have length greater than 0 for POD instances")
}
for _, reference := range req.Spec.EntityReferences {
if reference.EntityType != string(cnsv1alpha1.CnsOperatorEntityTypePVC) {
err = errors.NewBadRequest("POD instances can only refer to PERSISTENT_VOLUME_CLAIM instances")
}
if reference.ClusterID == "" {
err = errors.NewBadRequest("EntityReferences.ClusterID should not be empty for POD instances")
}
}
default:
err = errors.NewBadRequest(fmt.Sprintf("Invalid entity type %q", req.Spec.EntityType))
}
return err
}
// recordEvent records the event, sets the backOffDuration for the instance
// appropriately and logs the message. backOffDuration is reset to 1 second
// on success and doubled on failure.
func recordEvent(ctx context.Context, r *ReconcileCnsVolumeMetadata,
instance *cnsv1alpha1.CnsVolumeMetadata, eventtype string, msg string) {
log := logger.GetLogger(ctx)
switch eventtype {
case v1.EventTypeWarning:
// Double backOff duration.
backOffDurationMapMutex.Lock()
backOffDuration[instance.Name] = backOffDuration[instance.Name] * 2
backOffDurationMapMutex.Unlock()
r.recorder.Event(instance, v1.EventTypeWarning, "UpdateFailed", msg)
log.Error(msg)
case v1.EventTypeNormal:
// Reset backOff duration to one second.
backOffDurationMapMutex.Lock()
backOffDuration[instance.Name] = time.Second
backOffDurationMapMutex.Unlock()
r.recorder.Event(instance, v1.EventTypeNormal, "UpdateSucceeded", msg)
log.Info(msg)
}
}
// getMaxWorkerThreadsToReconcileCnsVolumeMetadata returns the maximum number
// of worker threads which can be run to reconcile CnsVolumeMetadata instances.
// If environment variable WORKER_THREADS_VOLUME_METADATA is set and valid,
// return the value read from environment variable. Otherwise, use the default
// value.
func getMaxWorkerThreadsToReconcileCnsVolumeMetadata(ctx context.Context) int {
log := logger.GetLogger(ctx)
workerThreads := defaultMaxWorkerThreadsToProcessCnsVolumeMetadata
if v := os.Getenv("WORKER_THREADS_VOLUME_METADATA"); v != "" {
if value, err := strconv.Atoi(v); err == nil {
if value <= 0 {
log.Warnf("Maximum number of worker threads to run set in env variable WORKER_THREADS_VOLUME_METADATA %s is "+
"less than 1, will use the default value %d", v, defaultMaxWorkerThreadsToProcessCnsVolumeMetadata)
} else if value > defaultMaxWorkerThreadsToProcessCnsVolumeMetadata {
log.Warnf("Maximum number of worker threads to run set in env variable WORKER_THREADS_VOLUME_METADATA %s "+
"is greater than %d, will use the default value %d",
v, defaultMaxWorkerThreadsToProcessCnsVolumeMetadata, defaultMaxWorkerThreadsToProcessCnsVolumeMetadata)
} else {
workerThreads = value
log.Debugf("Maximum number of worker threads to run is set to %d", workerThreads)
}
} else {
log.Warnf("Maximum number of worker threads to run set in env variable WORKER_THREADS_VOLUME_METADATA %s "+
"is invalid, will use the default value %d", v, defaultMaxWorkerThreadsToProcessCnsVolumeMetadata)
}
} else {
log.Debugf("WORKER_THREADS_VOLUME_METADATA is not set. Picking the default value %d",
defaultMaxWorkerThreadsToProcessCnsVolumeMetadata)
}
return workerThreads
}