-
Notifications
You must be signed in to change notification settings - Fork 7
/
controller.go
372 lines (322 loc) · 9.84 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
package service
import (
"math/rand"
"os"
"path"
"path/filepath"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/akutz/gofsutil"
"github.com/container-storage-interface/spec/lib/go/csi"
csiutils "github.com/rexray/gocsi/utils"
)
// CreateVolume creates the directory-backed volume named req.Name beneath
// s.vol, or, when the volume's info file already exists, validates the
// request against the stored volume and returns the stored volume.
//
// New volume: the volume directory is created if missing, a capacity is
// chosen from the request's CapacityRange, and the volume info is saved.
// The capacity is RequiredBytes when no larger limit is given; otherwise it
// is a random value in the inclusive range [RequiredBytes, LimitBytes].
//
// Existing volume: the stored capacity, parameters and capabilities must be
// compatible with the request, otherwise codes.AlreadyExists is returned.
//
// A LimitBytes of zero is treated as "no upper limit". A non-zero
// LimitBytes smaller than RequiredBytes is rejected with
// codes.InvalidArgument before anything is written to disk.
func (s *service) CreateVolume(
	ctx context.Context,
	req *csi.CreateVolumeRequest) (
	*csi.CreateVolumeResponse, error) {

	// Reject an inverted capacity range up front so that no directory is
	// left behind for a request that can never be satisfied.
	if cr := req.CapacityRange; cr != nil &&
		cr.LimitBytes != 0 && cr.LimitBytes < cr.RequiredBytes {
		return nil, status.Errorf(
			codes.InvalidArgument,
			"limit bytes %d less than required bytes %d",
			cr.LimitBytes, cr.RequiredBytes)
	}

	// Get the path to the volume directory and create it if necessary.
	volPath := path.Join(s.vol, req.Name)
	if ok, err := fileExists(volPath); !ok {
		if err != nil {
			return nil, status.Errorf(codes.NotFound, "%s: %v", volPath, err)
		}

		// Create the volume's virtual directory if it does not exist.
		if err := os.MkdirAll(volPath, 0755); err != nil {
			return nil, status.Errorf(codes.Internal, "mkdir failed: %v", err)
		}
	}

	// Get the path to the volume's info file.
	volInfoPath := path.Join(volPath, infoFileName)
	if ok, err := fileExists(volInfoPath); !ok {
		if err != nil {
			return nil, status.Errorf(
				codes.NotFound, "%s: %v", volInfoPath, err)
		}

		// Assign the volume info structure that is saved to disk.
		vol := volumeInfo{
			CreateVolumeRequest: *req,
			path:                volPath,
			infoPath:            volInfoPath,
		}

		// Figure out the volume's capacity.
		if cr := vol.CapacityRange; cr != nil {
			vol.capacityBytes = cr.RequiredBytes

			// Only randomize when the limit leaves room above the
			// required size. A zero (unset) limit never enters here,
			// which also avoids rand.Int63n panicking on n <= 0.
			if cr.LimitBytes > cr.RequiredBytes {
				const maxInt63 = 1<<63 - 1

				span := cr.LimitBytes - cr.RequiredBytes
				var extra uint64
				if span >= maxInt63 {
					// span+1 would not fit in an int64; Int63 already
					// yields a value in [0, 1<<63), which is <= span.
					extra = uint64(rand.Int63())
				} else {
					// +1 makes the upper bound inclusive.
					extra = uint64(rand.Int63n(int64(span) + 1))
				}
				vol.capacityBytes = cr.RequiredBytes + extra
			}
		}

		// Save the volume info to disk.
		if err := vol.save(); err != nil {
			return nil, err
		}

		return &csi.CreateVolumeResponse{
			VolumeInfo: vol.toCSIVolInfo(),
		}, nil
	}

	// Create a new volumeInfo object and try to load its contents
	// from disk.
	vol := &volumeInfo{path: volPath, infoPath: volInfoPath}
	if err := vol.load(); err != nil {
		return nil, err
	}

	// Validate request capacity range against existing size.
	if cr := req.CapacityRange; cr != nil {
		if vol.capacityBytes < cr.RequiredBytes {
			return nil, status.Errorf(
				codes.AlreadyExists,
				"required bytes exceeds existing: %d", vol.capacityBytes)
		}
		// A zero limit means no upper bound was requested, so there is
		// nothing to compare the existing size against.
		if cr.LimitBytes != 0 && vol.capacityBytes > cr.LimitBytes {
			return nil, status.Errorf(
				codes.AlreadyExists,
				"limit bytes less than existing: %d", vol.capacityBytes)
		}
	}

	// Validate request parameters against existing parameters. The maps
	// must have the same size and every requested value must match.
	if len(req.Parameters) != len(vol.Parameters) {
		return nil, status.Error(
			codes.AlreadyExists,
			"requested params exceed existing")
	}
	for k, v := range req.Parameters {
		if v != vol.Parameters[k] {
			return nil, status.Error(
				codes.AlreadyExists,
				"requested params != existing")
		}
	}

	// Validate request capabilities against existing capabilities.
	if ok, err := csiutils.AreVolumeCapabilitiesCompatible(
		req.VolumeCapabilities, vol.VolumeCapabilities); !ok {
		if err != nil {
			return nil, err
		}
		return nil, status.Error(
			codes.AlreadyExists,
			"requested capabilities incompatible w existing")
	}

	return &csi.CreateVolumeResponse{
		VolumeInfo: vol.toCSIVolInfo(),
	}, nil
}
// DeleteVolume removes the directory for req.VolumeId beneath s.vol,
// including everything inside it.
//
// Only an error from the existence check is reported (as codes.NotFound);
// the boolean result is deliberately not inspected, so a volume directory
// that is already gone falls through to os.RemoveAll, which returns nil
// for a path that does not exist.
func (s *service) DeleteVolume(
	ctx context.Context,
	req *csi.DeleteVolumeRequest) (
	*csi.DeleteVolumeResponse, error) {

	target := path.Join(s.vol, req.VolumeId)

	_, statErr := fileExists(target)
	if statErr != nil {
		return nil, status.Errorf(codes.NotFound, "%s: %v", target, statErr)
	}

	// Remove the "volume" directory tree.
	rmErr := os.RemoveAll(target)
	if rmErr != nil {
		return nil, status.Errorf(
			codes.Internal, "delete failed: %s: %v", target, rmErr)
	}

	return &csi.DeleteVolumeResponse{}, nil
}
// ControllerPublishVolume bind mounts the volume's directory onto a device
// directory named after req.VolumeId beneath s.dev and returns that device
// path under the "path" key of the publish info.
//
// The requested capability must be compatible with the capabilities stored
// for the volume. The device directory is created when missing, and the
// bind mount is skipped when the mount table already lists the volume
// directory mounted at the device directory.
func (s *service) ControllerPublishVolume(
	ctx context.Context,
	req *csi.ControllerPublishVolumeRequest) (
	*csi.ControllerPublishVolumeResponse, error) {

	// Load the stored info for the volume being published.
	info, err := s.getVolumeInfo(req.VolumeId)
	if err != nil {
		return nil, err
	}

	// The requested capability has to be compatible with what the volume
	// was created with.
	compatible, capErr := csiutils.IsVolumeCapabilityCompatible(
		req.VolumeCapability, info.VolumeCapabilities)
	if !compatible {
		if capErr != nil {
			return nil, capErr
		}
		return nil, status.Error(
			codes.InvalidArgument, "invalid volume capability")
	}

	// Make sure the device directory exists.
	target := path.Join(s.dev, req.VolumeId)
	exists, statErr := fileExists(target)
	if statErr != nil {
		return nil, status.Errorf(codes.NotFound, "%s: %v", target, statErr)
	}
	if !exists {
		if mkErr := os.MkdirAll(target, 0755); mkErr != nil {
			return nil, status.Errorf(
				codes.Internal, "mkdir failed: %s: %v", target, mkErr)
		}
	}

	// Consult the mount table to find out whether the volume directory is
	// already bind mounted to the device directory.
	mounts, mntErr := getMounts(ctx)
	if mntErr != nil {
		return nil, status.Errorf(
			codes.Internal, "failed to get mount info: %v", mntErr)
	}

	alreadyMounted := false
	for _, m := range mounts {
		// Both the source and the target must match for the entry to
		// count as this volume's publish mount.
		if m.Source != info.path || m.Path != target {
			continue
		}
		alreadyMounted = true
		break
	}

	if !alreadyMounted {
		bindErr := gofsutil.BindMount(ctx, info.path, target)
		if bindErr != nil {
			return nil, status.Errorf(
				codes.Internal, "bind mount failed: src=%s, tgt=%s: %v",
				info.path, target, bindErr)
		}
	}

	rep := &csi.ControllerPublishVolumeResponse{
		PublishVolumeInfo: map[string]string{"path": target},
	}
	return rep, nil
}
// ControllerUnpublishVolume undoes a publish: it unmounts the device
// directory for req.VolumeId when the mount table shows the volume
// directory mounted there, then removes the device directory if it exists.
//
// codes.NotFound is returned when the volume directory itself is missing.
func (s *service) ControllerUnpublishVolume(
	ctx context.Context,
	req *csi.ControllerUnpublishVolumeRequest) (
	*csi.ControllerUnpublishVolumeResponse, error) {

	// The volume directory must exist.
	source := path.Join(s.vol, req.VolumeId)
	present, statErr := fileExists(source)
	if !present {
		if statErr != nil {
			return nil, status.Errorf(codes.NotFound, "%s: %v", source, statErr)
		}
		return nil, status.Error(codes.NotFound, source)
	}

	target := path.Join(s.dev, req.VolumeId)

	// Read the mount table.
	mounts, mntErr := getMounts(ctx)
	if mntErr != nil {
		return nil, status.Errorf(
			codes.Internal, "failed to get mount info: %v", mntErr)
	}

	// Unmount the device directory once for every mount entry whose source
	// is the volume directory and whose path is the device directory.
	for _, m := range mounts {
		if m.Source != source || m.Path != target {
			continue
		}
		if umErr := gofsutil.Unmount(ctx, target); umErr != nil {
			return nil, status.Errorf(codes.Internal,
				"failed to unmount device dir: %s: %v", target, umErr)
		}
	}

	// Remove the device directory when it is still around.
	devExists, devErr := fileExists(target)
	if devErr != nil {
		return nil, status.Errorf(codes.NotFound, "%s: %v", target, devErr)
	}
	if devExists {
		if rmErr := os.RemoveAll(target); rmErr != nil {
			return nil, status.Errorf(codes.Internal,
				"failed to remove device dir: %s: %v", target, rmErr)
		}
	}

	return &csi.ControllerUnpublishVolumeResponse{}, nil
}
// ValidateVolumeCapabilities reports whether the capabilities in the
// request are compatible with those stored for the volume req.VolumeId.
//
// Incompatibility is not an error: the response carries Supported=false
// and an explanatory Message. An error is returned only when the volume
// info cannot be obtained or when the compatibility check reports
// incompatibility together with an error.
func (s *service) ValidateVolumeCapabilities(
	ctx context.Context,
	req *csi.ValidateVolumeCapabilitiesRequest) (
	*csi.ValidateVolumeCapabilitiesResponse, error) {

	info, err := s.getVolumeInfo(req.VolumeId)
	if err != nil {
		return nil, err
	}

	compatible, capErr := csiutils.AreVolumeCapabilitiesCompatible(
		req.VolumeCapabilities, info.VolumeCapabilities)

	// An error only matters when the check also came back negative.
	if !compatible && capErr != nil {
		return nil, capErr
	}

	rep := &csi.ValidateVolumeCapabilitiesResponse{Supported: compatible}
	if !compatible {
		rep.Message = "incompatible volume capabilities"
	}
	return rep, nil
}
// ListVolumes returns one entry for every volume info file matched by the
// s.volGlob pattern. Each matched file is loaded into a volumeInfo whose
// path is the file's parent directory; a failure to load any of them
// aborts the listing with that error.
func (s *service) ListVolumes(
	ctx context.Context,
	req *csi.ListVolumesRequest) (
	*csi.ListVolumesResponse, error) {

	matches, globErr := filepath.Glob(s.volGlob)
	if globErr != nil {
		return nil, status.Errorf(codes.Internal,
			"failed to list volume dir: %s: %v", s.volGlob, globErr)
	}

	entries := make([]*csi.ListVolumesResponse_Entry, 0, len(matches))
	for _, infoFile := range matches {
		vol := volumeInfo{
			path:     path.Dir(infoFile),
			infoPath: infoFile,
		}
		if loadErr := vol.load(); loadErr != nil {
			return nil, loadErr
		}
		entries = append(entries, &csi.ListVolumesResponse_Entry{
			VolumeInfo: vol.toCSIVolInfo(),
		})
	}

	return &csi.ListVolumesResponse{Entries: entries}, nil
}
// GetCapacity is not supported by this service; it always returns a
// codes.Unimplemented status with an empty message and no response.
func (s *service) GetCapacity(
	ctx context.Context,
	req *csi.GetCapacityRequest) (
	*csi.GetCapacityResponse, error) {

	unimplemented := status.Error(codes.Unimplemented, "")
	return nil, unimplemented
}
// ControllerGetCapabilities advertises the controller RPC capabilities of
// this service, in order: create/delete volume, publish/unpublish volume
// and list volumes.
func (s *service) ControllerGetCapabilities(
	ctx context.Context,
	req *csi.ControllerGetCapabilitiesRequest) (
	*csi.ControllerGetCapabilitiesResponse, error) {

	// rpcCap wraps an RPC descriptor in the capability envelope.
	rpcCap := func(
		rpc *csi.ControllerServiceCapability_RPC) *csi.ControllerServiceCapability {
		return &csi.ControllerServiceCapability{
			Type: &csi.ControllerServiceCapability_Rpc{Rpc: rpc},
		}
	}

	caps := []*csi.ControllerServiceCapability{
		rpcCap(&csi.ControllerServiceCapability_RPC{
			Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
		}),
		rpcCap(&csi.ControllerServiceCapability_RPC{
			Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
		}),
		rpcCap(&csi.ControllerServiceCapability_RPC{
			Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
		}),
	}

	return &csi.ControllerGetCapabilitiesResponse{Capabilities: caps}, nil
}
// ControllerProbe performs no checks; it always succeeds with an empty
// response.
func (s *service) ControllerProbe(
	ctx context.Context,
	req *csi.ControllerProbeRequest) (
	*csi.ControllerProbeResponse, error) {

	rep := &csi.ControllerProbeResponse{}
	return rep, nil
}