/
helper.go
360 lines (305 loc) · 13.7 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
// Copyright 2019 NetApp, Inc. All Rights Reserved.
package kubernetes
import (
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
k8sstoragev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/netapp/trident/config"
frontendcommon "github.com/netapp/trident/frontend/common"
"github.com/netapp/trident/frontend/csi"
"github.com/netapp/trident/frontend/csi/helpers"
"github.com/netapp/trident/storage"
)
/////////////////////////////////////////////////////////////////////////////
//
// This file contains the methods that implement the HybridPlugin interface.
//
/////////////////////////////////////////////////////////////////////////////
// GetVolumeConfig accepts the attributes of a volume being requested by
// the CSI provisioner, combines those with PVC and storage class info
// retrieved from the K8S API server, and returns a VolumeConfig structure
// as needed by Trident to create a new volume.
func (p *Plugin) GetVolumeConfig(
name string, sizeBytes int64, parameters map[string]string,
protocol config.Protocol, accessModes []config.AccessMode, volumeMode config.VolumeMode, fsType string,
) (*storage.VolumeConfig, error) {
// Kubernetes CSI passes us the name of what will become a new PV
pvName := name
fields := log.Fields{"Method": "GetVolumeConfig", "Type": "K8S helper", "name": pvName}
log.WithFields(fields).Debug(">>>> GetVolumeConfig")
defer log.WithFields(fields).Debug("<<<< GetVolumeConfig")
// Get the PVC corresponding to the new PV being provisioned
pvc, err := p.getPVCForCSIVolume(pvName)
if err != nil {
return nil, err
}
log.WithFields(log.Fields{
"name": pvc.Name,
"namespace": pvc.Namespace,
"UID": pvc.UID,
"size": pvc.Spec.Resources.Requests[v1.ResourceStorage],
"storageClass": getStorageClassForPVC(pvc),
}).Infof("Found PVC for requested volume %s.", pvName)
// Validate the PVC
if pvc.Status.Phase != v1.ClaimPending {
return nil, fmt.Errorf("PVC %s is not in Pending state", pvc.Name)
}
pvcSize, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage]
if !ok {
return nil, fmt.Errorf("PVC %s does not have a valid size", pvc.Name)
}
// Get the storage class requested by the PVC. It should always be set by this time,
// even if a default storage class was applied by the CSI provisioner.
scName := getStorageClassForPVC(pvc)
if scName == "" {
return nil, fmt.Errorf("PVC %s does not specify a storage class", pvc.Name)
}
// Get the cached storage class for this PVC
sc, err := p.getStorageClass(scName)
if err != nil {
return nil, err
}
log.WithField("name", sc.Name).Infof("Found storage class for requested volume %s.", pvName)
// Validate the storage class
if sc.Provisioner != csi.Provisioner {
return nil, fmt.Errorf("the provisioner for storage class %s is not %s", sc.Name, csi.Provisioner)
}
// Create the volume config
volumeConfig := getVolumeConfig(pvc.Spec.AccessModes, pvc.Spec.VolumeMode, pvName, pvcSize,
processPVCAnnotations(pvc, fsType), scName)
// Check if we're cloning a PVC, and if so, do some further validation
if cloneSourcePVName, err := p.getCloneSourceInfo(pvc); err != nil {
return nil, err
} else if cloneSourcePVName != "" {
volumeConfig.CloneSourceVolume = cloneSourcePVName
}
return volumeConfig, nil
}
// getPVCForCSIVolume accepts the name of a volume being requested by the CSI provisioner,
// extracts the PVC name from the volume name, and returns the PVC object as read from the
// Kubernetes API server. The method waits for the object to appear in cache, resyncs the
// cache if not found after an initial wait, and waits again after the resync. This strategy
// is necessary because CSI only provides us with the PVC's UID, and the Kubernetes API does
// not support querying objects by UID.
func (p *Plugin) getPVCForCSIVolume(name string) (*v1.PersistentVolumeClaim, error) {
// Get the PVC UID from the volume name. The CSI provisioner sidecar creates
// volume names of the form "pvc-<PVC_UID>".
pvcUID, err := getPVCUIDFromCSIVolumeName(name)
if err != nil {
return nil, err
}
// Get the cached PVC that started this workflow
pvc, err := p.waitForCachedPVCByUID(pvcUID, PreSyncCacheWaitPeriod)
if err != nil {
log.WithField("uid", pvcUID).Warningf("PVC not found in local cache: %v", err)
// Not found immediately, so re-sync and try again
if err = p.pvcIndexer.Resync(); err != nil {
return nil, fmt.Errorf("could not refresh local PVC cache: %v", err)
}
if pvc, err = p.waitForCachedPVCByUID(pvcUID, PostSyncCacheWaitPeriod); err != nil {
log.WithField("uid", pvcUID).Errorf("PVC not found in local cache after resync: %v", err)
return nil, fmt.Errorf("could not find PVC with UID %s: %v", pvcUID, err)
}
}
return pvc, nil
}
// getPVCUIDFromCSIVolumeName accepts the name of a CSI-requested volume and extracts
// the UID of the corresponding PVC using regex matching. This obviously assumes that
// the Kubernetes CSI provisioner continues this naming scheme indefinitely.
func getPVCUIDFromCSIVolumeName(volumeName string) (string, error) {
match := pvcRegex.FindStringSubmatch(volumeName)
paramsMap := make(map[string]string)
for i, name := range pvcRegex.SubexpNames() {
if i > 0 && i <= len(match) {
paramsMap[name] = match[i]
}
}
if uid, ok := paramsMap["uid"]; !ok {
return "", fmt.Errorf("volume name %s does not contain a uid", volumeName)
} else {
return uid, nil
}
}
// getStorageClass accepts the name of a storage class and returns the storage class object
// as read from the Kubernetes API server. The method waits for the object to appear in cache,
// resyncs the cache if not found after an initial wait, and waits again after the resync.
func (p *Plugin) getStorageClass(name string) (*k8sstoragev1.StorageClass, error) {
sc, err := p.waitForCachedStorageClassByName(name, PreSyncCacheWaitPeriod)
if err != nil {
log.WithField("name", name).Warningf("Storage class not found in local cache: %v", err)
// Not found immediately, so re-sync and try again
if err = p.scIndexer.Resync(); err != nil {
return nil, fmt.Errorf("could not refresh local storage class cache: %v", err)
}
if sc, err = p.waitForCachedStorageClassByName(name, PostSyncCacheWaitPeriod); err != nil {
log.WithField("name", name).Errorf("Storage class not found in local cache after resync: %v", err)
return nil, fmt.Errorf("could not find storage class %s: %v", name, err)
}
}
return sc, nil
}
// getCloneSourceInfo accepts the PVC of a volume being provisioned by CSI and inspects it
// for the annotations indicating a clone operation (of which CSI is unaware). If a clone is
// being created, the method completes several checks on the source PVC/PV and returns the
// name of the source PV as needed by Trident to clone a volume as well as an optional
// snapshot name (also potentially unknown to CSI). Note that these legacy clone annotations
// will be overridden if the VolumeContentSource is set in the CSI CreateVolume request.
func (p *Plugin) getCloneSourceInfo(clonePVC *v1.PersistentVolumeClaim) (string, error) {
// Check if this is a clone operation
annotations := processPVCAnnotations(clonePVC, "")
sourcePVCName := getAnnotation(annotations, AnnCloneFromPVC)
if sourcePVCName == "" {
return "", nil
}
// Check that the source PVC is in the same namespace.
sourcePVC, err := p.waitForCachedPVCByName(sourcePVCName, clonePVC.Namespace, PreSyncCacheWaitPeriod)
if err != nil {
log.WithFields(log.Fields{
"sourcePVCName": sourcePVCName,
"namespace": clonePVC.Namespace,
}).Errorf("Clone source PVC not found in local cache: %v", err)
return "", fmt.Errorf("cloning from a PVC requires both PVCs be in the same namespace: %v", err)
}
// Check that both source and clone PVCs have the same storage class
if getStorageClassForPVC(sourcePVC) != getStorageClassForPVC(clonePVC) {
log.WithFields(log.Fields{
"clonePVCName": clonePVC.Name,
"clonePVCNamespace": clonePVC.Namespace,
"clonePVCStorageClass": getStorageClassForPVC(clonePVC),
"sourcePVCName": sourcePVC.Name,
"sourcePVCNamespace": sourcePVC.Namespace,
"sourcePVCStorageClass": getStorageClassForPVC(sourcePVC),
}).Error("Cloning from a PVC requires both PVCs have the same storage class.")
return "", fmt.Errorf("cloning from a PVC requires both PVCs have the same storage class")
}
// Check that the source PVC has an associated PV
sourcePVName := sourcePVC.Spec.VolumeName
if sourcePVName == "" {
log.WithFields(log.Fields{
"sourcePVCName": sourcePVC.Name,
"sourcePVCNamespace": sourcePVC.Namespace,
}).Error("Cloning from a PVC requires the source to be bound to a PV.")
return "", fmt.Errorf("cloning from a PVC requires the source to be bound to a PV")
}
// Check that the clone size is <= the source size
sourcePVCSizeResource := sourcePVC.Spec.Resources.Requests[v1.ResourceStorage]
sourcePVCSize := sourcePVCSizeResource.Value()
clonePVCSizeResource := clonePVC.Spec.Resources.Requests[v1.ResourceStorage]
clonePVCSize := clonePVCSizeResource.Value()
if sourcePVCSize < clonePVCSize {
log.WithFields(log.Fields{
"sourcePVCSize": sourcePVCSize,
"clonePVCSize": clonePVCSize,
}).Error("requested PVC size is too large for the clone source")
return "", fmt.Errorf("requested PVC size is too large for the clone source")
}
return sourcePVName, nil
}
// GetSnapshotConfig accepts the attributes of a snapshot being requested by the CSI
// provisioner and returns a SnapshotConfig structure as needed by Trident to create a new snapshot.
func (p *Plugin) GetSnapshotConfig(volumeName, snapshotName string) (*storage.SnapshotConfig, error) {
return &storage.SnapshotConfig{
Version: config.OrchestratorAPIVersion,
Name: snapshotName,
VolumeName: volumeName,
}, nil
}
// RecordVolumeEvent accepts the name of a CSI volume (i.e. a PV name), finds the associated
// PVC, and posts and event message on the PVC object with the K8S API server.
func (p *Plugin) RecordVolumeEvent(name, eventType, reason, message string) {
log.WithFields(log.Fields{
"name": name,
"eventType": eventType,
"reason": reason,
"message": message,
}).Debug("Volume event.")
if pvc, err := p.getPVCForCSIVolume(name); err != nil {
log.WithField("error", err).Debug("Failed to find PVC for event.")
} else {
p.eventRecorder.Event(pvc, mapEventType(eventType), reason, message)
}
}
// mapEventType maps between K8S API event types and Trident CSI helper event types. The
// two sets of types may be identical, but the CSI helper interface should not be tightly
// coupled to Kubernetes.
func mapEventType(eventType string) string {
switch eventType {
case helpers.EventTypeNormal:
return v1.EventTypeNormal
case helpers.EventTypeWarning:
return v1.EventTypeWarning
default:
return v1.EventTypeWarning
}
}
// getVolumeConfig accepts the attributes of a new volume and assembles them into a
// VolumeConfig structure as needed by Trident to create a new volume.
func getVolumeConfig(
pvcAccessModes []v1.PersistentVolumeAccessMode, volumeMode *v1.PersistentVolumeMode, name string,
size resource.Quantity,
annotations map[string]string, storageClassName string,
) *storage.VolumeConfig {
var accessModes []config.AccessMode
for _, pvcAccessMode := range pvcAccessModes {
accessModes = append(accessModes, config.AccessMode(pvcAccessMode))
}
accessMode := frontendcommon.CombineAccessModes(accessModes)
if volumeMode == nil {
volumeModeVal := v1.PersistentVolumeFilesystem
volumeMode = &volumeModeVal
}
if getAnnotation(annotations, AnnFileSystem) == "" {
annotations[AnnFileSystem] = "ext4"
}
if getAnnotation(annotations, AnnNotManaged) == "" {
annotations[AnnNotManaged] = "false"
}
notManaged, err := strconv.ParseBool(getAnnotation(annotations, AnnNotManaged))
if err != nil {
log.Warnf("unable to parse notManaged annotation into bool; %v", err)
}
return &storage.VolumeConfig{
Name: name,
Size: fmt.Sprintf("%d", size.Value()),
Protocol: config.Protocol(getAnnotation(annotations, AnnProtocol)),
SnapshotPolicy: getAnnotation(annotations, AnnSnapshotPolicy),
SnapshotReserve: getAnnotation(annotations, AnnSnapshotReserve),
SnapshotDir: getAnnotation(annotations, AnnSnapshotDir),
ExportPolicy: getAnnotation(annotations, AnnExportPolicy),
UnixPermissions: getAnnotation(annotations, AnnUnixPermissions),
StorageClass: storageClassName,
BlockSize: getAnnotation(annotations, AnnBlockSize),
FileSystem: getAnnotation(annotations, AnnFileSystem),
SplitOnClone: getAnnotation(annotations, AnnSplitOnClone),
VolumeMode: config.VolumeMode(*volumeMode),
AccessMode: accessMode,
ImportOriginalName: getAnnotation(annotations, AnnImportOriginalName),
ImportBackendUUID: getAnnotation(annotations, AnnImportBackendUUID),
ImportNotManaged: notManaged,
}
}
// getAnnotation returns an annotation from a map, or an empty string if not found.
func getAnnotation(annotations map[string]string, key string) string {
if val, ok := annotations[key]; ok {
return val
}
return ""
}
// processPVCAnnotations returns the annotations from a PVC (ensuring a valid map even
// if empty). It also mixes in a Trident-standard fsType annotation using the value supplied
// *if* one isn't already set in the PVC annotation map.
func processPVCAnnotations(pvc *v1.PersistentVolumeClaim, fsType string) map[string]string {
annotations := pvc.Annotations
if annotations == nil {
annotations = make(map[string]string)
}
// Set the file system from the PVC annotations. If not present, fall back to the CSI request,
// which should have read the file system from the storage class (if specified there).
if _, found := annotations[AnnFileSystem]; !found && fsType != "" {
annotations[AnnFileSystem] = fsType
}
return annotations
}