/
cnsvolumeinfoservice.go
307 lines (269 loc) · 12.4 KB
/
cnsvolumeinfoservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package cnsvolumeinfo
import (
"context"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/common"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
cnsvolumeinfoconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeinfo/config"
cnsvolumeinfov1alpha1 "sigs.k8s.io/vsphere-csi-driver/v3/pkg/internalapis/cnsvolumeinfo/v1alpha1"
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
)
type volumeInfo struct {
// volumeInfoInformer is the informer for VolumeInfo CRs
volumeInfoInformer cache.SharedIndexInformer
// k8sClient helps operate on CnsVolumeInfo custom resource.
k8sClient client.Client
}
var (
// volumeInfoServiceInstance is instance of volumeInfo and implements
// interface for VolumeInfoService.
volumeInfoServiceInstance *volumeInfo
// csiNamespace is the namespace on which vSphere CSI Driver is running
csiNamespace = common.GetCSINamespace()
)
const (
// CRDGroupName represent the group of cnsvolumeinfo CRD.
CRDGroupName = "cns.vmware.com"
// FileVolumePrefix represents the prefix for RWX volume's volumeHandle.
FileVolumePrefix = "file:"
)
// VolumeInfoService exposes interfaces to support Operate on cnsvolumeinfo CR
// for multi vCenter CSI topology feature.
// It will maintain internal state to map volume id to vCenter
type VolumeInfoService interface {
// GetvCenterForVolumeID return vCenter for the given VolumeID
GetvCenterForVolumeID(ctx context.Context, volumeID string) (string, error)
// CreateVolumeInfo creates VolumeInfo CR to persist VolumeID to vCenter mapping
CreateVolumeInfo(ctx context.Context, volumeID string, vCenter string) error
// CreateVolumeInfoWithPolicyInfo creates VolumeInfo CR to persist VolumeID,
// pvcnamespace, storage policy info and vCenter details
CreateVolumeInfoWithPolicyInfo(ctx context.Context, volumeID, pvcnamespace, storagePolicyId,
storageClassName, vCenter string, capacity *resource.Quantity) error
// DeleteVolumeInfo deletes VolumeInfo CR for the given VolumeID
DeleteVolumeInfo(ctx context.Context, volumeID string) error
// ListAllVolumeInfos lists all the VolumeInfo CRs present in the cluster
ListAllVolumeInfos() []interface{}
// VolumeInfoCrExistsForVolume returns true if VolumeInfo CR for
// a given volume exists
VolumeInfoCrExistsForVolume(ctx context.Context, volumeID string) (bool, error)
// GetVolumeInfoForVolumeID fetches VolumeInfo CR for the given VolumeID and returns cnsvolumeinfo object
GetVolumeInfoForVolumeID(ctx context.Context, volumeID string) (*cnsvolumeinfov1alpha1.CNSVolumeInfo, error)
// PatchVolumeInfo patches the CNSVolumeInfo instance associated with volumeID in given parameters.
PatchVolumeInfo(ctx context.Context, volumeID string, patchBytes []byte) error
}
// InitVolumeInfoService returns the singleton VolumeInfoService.
func InitVolumeInfoService(ctx context.Context) (VolumeInfoService, error) {
log := logger.GetLogger(ctx)
if volumeInfoServiceInstance == nil {
log.Info("Initializing volumeInfo service...")
// This is idempotent if CRD is pre-created then we continue with
// initialization of volumeInfoServiceInstance.
volumeInfoServiceInitErr := k8s.CreateCustomResourceDefinitionFromManifest(ctx,
cnsvolumeinfoconfig.EmbedCnsVolumeInfoFile, cnsvolumeinfoconfig.EmbedCnsVolumeInfoFileName)
if volumeInfoServiceInitErr != nil {
return nil, logger.LogNewErrorf(log, "failed to create volume info CRD. Error: %v",
volumeInfoServiceInitErr)
}
config, volumeInfoServiceInitErr := k8s.GetKubeConfig(ctx)
if volumeInfoServiceInitErr != nil {
return nil, logger.LogNewErrorf(log, "failed to get kubeconfig. err: %v", volumeInfoServiceInitErr)
}
volumeInfoServiceInstance = &volumeInfo{}
volumeInfoServiceInstance.k8sClient, volumeInfoServiceInitErr =
k8s.NewClientForGroup(ctx, config, CRDGroupName)
if volumeInfoServiceInitErr != nil {
volumeInfoServiceInstance = nil
return nil, logger.LogNewErrorf(log, "failed to create k8sClient for volumeinfo service. "+
"Err: %v", volumeInfoServiceInitErr)
}
log.Infof("Starting Informer for cnsvolumeinfo")
informer, err := k8s.GetDynamicInformer(ctx, cnsvolumeinfov1alpha1.SchemeGroupVersion.Group,
cnsvolumeinfov1alpha1.SchemeGroupVersion.Version, "cnsvolumeinfoes",
csiNamespace, config, true)
if err != nil {
return nil, logger.LogNewErrorf(log, "failed to create dynamic informer for cnsvolumeinfoes "+
"CRD. Err: %v", err)
}
volumeInfoServiceInstance.volumeInfoInformer = informer.Informer()
go func() {
stopCh := make(chan struct{})
informer.Informer().Run(stopCh)
}()
log.Info("volumeInfo service initialized")
}
return volumeInfoServiceInstance, nil
}
// ListAllVolumeInfos lists all the VolumeInfo CRs present in the cluster
func (volumeInfo *volumeInfo) ListAllVolumeInfos() []interface{} {
volumeInfoCrs := volumeInfo.volumeInfoInformer.GetStore().List()
return volumeInfoCrs
}
// VolumeInfoCrExistsForVolume returns true if VolumeInfo CR for
// a given volume exists
func (volumeInfo *volumeInfo) VolumeInfoCrExistsForVolume(ctx context.Context, volumeID string) (bool, error) {
log := logger.GetLogger(ctx)
volumeInfoCrName := getCnsVolumeInfoCrName(ctx, volumeID)
key := csiNamespace + "/" + volumeInfoCrName
_, found, err := volumeInfo.volumeInfoInformer.GetStore().GetByKey(key)
if err != nil {
return false, logger.LogNewErrorf(log, "failed to find vCenter for VolumeID: %q", volumeID)
}
if !found {
log.Debugf("VolumeInfo CR for volume %s not found", volumeID)
return false, nil
}
return true, nil
}
// GetvCenterForVolumeID return vCenter for the given VolumeID
func (volumeInfo *volumeInfo) GetvCenterForVolumeID(ctx context.Context, volumeID string) (string, error) {
log := logger.GetLogger(ctx)
// Since CNSVolumeInfo is namespaced CR, we need to prefix "namespace-name/" to obtain value from the store
volumeInfoCrName := getCnsVolumeInfoCrName(ctx, volumeID)
key := csiNamespace + "/" + volumeInfoCrName
info, found, err := volumeInfo.volumeInfoInformer.GetStore().GetByKey(key)
if err != nil || !found {
return "", logger.LogNewErrorf(log, "Could not find vCenter for VolumeID: %q", volumeID)
}
cnsvolumeinfo := &cnsvolumeinfov1alpha1.CNSVolumeInfo{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(info.(*unstructured.Unstructured).Object,
&cnsvolumeinfo)
if err != nil {
return "", logger.LogNewErrorf(log, "failed to parse cnsvolumeinfo object: %v, err: %v", info, err)
}
log.Infof("Volume ID %q is associated with VC %q", volumeID, cnsvolumeinfo.Spec.VCenterServer)
return cnsvolumeinfo.Spec.VCenterServer, nil
}
// CreateVolumeInfo creates VolumeInfo CR to persist VolumeID to vCenter mapping
func (volumeInfo *volumeInfo) CreateVolumeInfo(ctx context.Context, volumeID string, vCenter string) error {
log := logger.GetLogger(ctx)
log.Infof("creating cnsvolumeinfo for volumeID: %q and vCenter: %q mapping in the namespace: %q",
volumeID, vCenter, csiNamespace)
volumeInfoCrName := getCnsVolumeInfoCrName(ctx, volumeID)
cnsvolumeinfo := cnsvolumeinfov1alpha1.CNSVolumeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: volumeInfoCrName,
Namespace: csiNamespace,
},
Spec: cnsvolumeinfov1alpha1.CNSVolumeInfoSpec{
VolumeID: volumeID,
VCenterServer: vCenter,
},
}
err := volumeInfo.k8sClient.Create(ctx, &cnsvolumeinfo)
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return logger.LogNewErrorf(log, "failed to create CR for cnsvolumeInfo %v in the namespace: %q. "+
"Error: %v", cnsvolumeinfo, csiNamespace, err)
}
log.Infof("cnsvolumeInfo CR already exists for VolumeID: %q", volumeID)
return nil
}
log.Infof("Successfully created CNSVolumeInfo CR for volumeID: %q and "+
"vCenter: %q mapping in the namespace: %q", volumeID, vCenter, csiNamespace)
return nil
}
// CreateVolumeInfoWithPolicyInfo creates VolumeInfo CR to persist VolumeID to Storage policy mapping
func (volumeInfo *volumeInfo) CreateVolumeInfoWithPolicyInfo(ctx context.Context, volumeID string,
namespace, storagePolicyId, storageClassName, vCenter string, capacity *resource.Quantity) error {
log := logger.GetLogger(ctx)
log.Infof("creating cnsvolumeinfo for volumeID: %q, StoragePolicyID: %q, "+
"StorageClassName: %q, vCenter: %q, Capacity: %+v in the namespace: %q",
volumeID, storagePolicyId, storageClassName, vCenter, *capacity, csiNamespace)
volumeInfoCrName := getCnsVolumeInfoCrName(ctx, volumeID)
cnsvolumeinfo := cnsvolumeinfov1alpha1.CNSVolumeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: volumeInfoCrName,
Namespace: csiNamespace,
},
Spec: cnsvolumeinfov1alpha1.CNSVolumeInfoSpec{
VolumeID: volumeID,
Namespace: namespace,
VCenterServer: vCenter,
StoragePolicyID: storagePolicyId,
StorageClassName: storageClassName,
Capacity: capacity,
},
}
err := volumeInfo.k8sClient.Create(ctx, &cnsvolumeinfo)
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return logger.LogNewErrorf(log, "failed to create CR for CnsVolumeInfo %v in the namespace: %q. "+
"Error: %v", cnsvolumeinfo, csiNamespace, err)
}
log.Infof("cnsvolumeInfo CR already exists for VolumeID: %q", volumeID)
return nil
}
log.Infof("Successfully created CNSVolumeInfo CR for volumeID: %q, StoragePolicyID: %q, "+
"StorageClassName: %q, vCenter: %q, Capacity: %+v mapping in the namespace: %q",
volumeID, storagePolicyId, storageClassName, vCenter, *capacity, csiNamespace)
return nil
}
// DeleteVolumeInfo deletes VolumeInfo CR for the given VolumeID
func (volumeInfo *volumeInfo) DeleteVolumeInfo(ctx context.Context, volumeID string) error {
log := logger.GetLogger(ctx)
volumeInfoCrName := getCnsVolumeInfoCrName(ctx, volumeID)
object := cnsvolumeinfov1alpha1.CNSVolumeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: volumeInfoCrName,
Namespace: csiNamespace,
},
}
err := volumeInfo.k8sClient.Delete(ctx, &object)
if err != nil {
if apierrors.IsNotFound(err) {
log.Infof("volumeInfoCR is already deleted for volumeID: %q", volumeID)
return nil
}
return logger.LogNewErrorf(log, "failed to delete volumeInfo CR for volumeID: %q "+
"from namespace: %q", volumeID, csiNamespace)
}
log.Infof("Successfully deleted CNSVolumeInfo CR for volumeID: %q from namespace: %q",
volumeID, csiNamespace)
return nil
}
// GetVolumeInfoForVolumeID return cnsVolumeInfo for the given VolumeID
func (volumeInfo *volumeInfo) GetVolumeInfoForVolumeID(ctx context.Context, volumeID string) (
*cnsvolumeinfov1alpha1.CNSVolumeInfo, error) {
log := logger.GetLogger(ctx)
// Since CNSVolumeInfo is namespaced CR, we need to prefix "namespace-name/" to obtain value from the store
volumeInfoCrName := getCnsVolumeInfoCrName(ctx, volumeID)
key := csiNamespace + "/" + volumeInfoCrName
info, found, err := volumeInfo.volumeInfoInformer.GetStore().GetByKey(key)
if err != nil || !found {
return nil, logger.LogNewErrorf(log, "Could not find CnsVolumeInfo instance for volumeID: %q", volumeID)
}
cnsVolumeInfo := &cnsvolumeinfov1alpha1.CNSVolumeInfo{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(info.(*unstructured.Unstructured).Object,
&cnsVolumeInfo)
if err != nil {
return nil, logger.LogNewErrorf(log, "failed to parse cnsVolumeInfo object: %v, err: %v", info, err)
}
return cnsVolumeInfo, nil
}
// PatchVolumeInfo patches the CNSVolumeInfo instance associated with volumeID in given parameters.
func (volumeInfo *volumeInfo) PatchVolumeInfo(ctx context.Context, volumeID string, patchBytes []byte) error {
log := logger.GetLogger(ctx)
volumeInfoInstance, err := volumeInfo.GetVolumeInfoForVolumeID(ctx, volumeID)
if err != nil {
return logger.LogNewErrorf(log, "failed to fetch CnsVolumeInfo instance for volumeID: %q", volumeID)
}
return volumeInfo.k8sClient.Patch(ctx, volumeInfoInstance, client.RawPatch(types.MergePatchType, patchBytes))
}
// getCnsVolumeInfoCrName replaces "file:" with "file-" as K8s only allows alphanumeric and "-" in object name."
func getCnsVolumeInfoCrName(ctx context.Context, volumeID string) string {
log := logger.GetLogger(ctx)
if strings.HasPrefix(volumeID, FileVolumePrefix) {
log.Debugf("File volume observed %s", volumeID)
volumeInfoCrName := strings.Replace(volumeID, ":", "-", 1)
return volumeInfoCrName
}
return volumeID
}