Skip to content

Commit

Permalink
Merge 72b6c04 into 454fdb4
Browse files Browse the repository at this point in the history
  • Loading branch information
AndyXiangLi committed Jun 8, 2021
2 parents 454fdb4 + 72b6c04 commit 0746057
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 65 deletions.
3 changes: 3 additions & 0 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,9 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedSt
},
}

if ctx.Err() != nil {
return false, fmt.Errorf("context error while checking volume status")
}
volume, err := c.getVolume(ctx, request)
if err != nil {
// The VolumeNotFound error is special -- we don't need to wait for it to repeat
Expand Down
192 changes: 140 additions & 52 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,21 @@ func newControllerService(driverOptions *DriverOptions) controllerService {

func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.V(4).Infof("CreateVolume: called with args %+v", *req)
volName := req.GetName()
if len(volName) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume name not provided")
if err := validateCreateVolumeRequest(req); err != nil {
return nil, err
}

volSizeBytes, err := getVolSizeBytes(req)
if err != nil {
return nil, err
}
volName := req.GetName()

volCaps := req.GetVolumeCapabilities()
if len(volCaps) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities not provided")
}

if !isValidVolumeCapabilities(volCaps) {
modes := util.GetAccessModes(volCaps)
stringModes := strings.Join(*modes, ", ")
errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported."
return nil, status.Error(codes.InvalidArgument, errString)
// check if a request is already in-flight
if ok := d.inFlight.Insert(volName); !ok {
msg := fmt.Sprintf("Create volume request for %s is already in progress", volName)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(volName)

disk, err := d.cloud.GetDiskByName(ctx, volName, volSizeBytes)
if err != nil {
Expand Down Expand Up @@ -217,13 +211,6 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
return newCreateVolumeResponse(disk), nil
}

// check if a request is already in-flight because the CreateVolume API is not idempotent
if ok := d.inFlight.Insert(req.String()); !ok {
msg := fmt.Sprintf("Create volume request for %s is already in progress", volName)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(req.String())

// create a new volume
zone := pickAvailabilityZone(req.GetAccessibilityRequirements())
outpostArn := getOutpostArn(req.GetAccessibilityRequirements())
Expand Down Expand Up @@ -264,12 +251,40 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol
return newCreateVolumeResponse(disk), nil
}

func validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
volName := req.GetName()
if len(volName) == 0 {
return status.Error(codes.InvalidArgument, "Volume name not provided")
}

volCaps := req.GetVolumeCapabilities()
if len(volCaps) == 0 {
return status.Error(codes.InvalidArgument, "Volume capabilities not provided")
}

if !isValidVolumeCapabilities(volCaps) {
modes := util.GetAccessModes(volCaps)
stringModes := strings.Join(*modes, ", ")
errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported."
return status.Error(codes.InvalidArgument, errString)
}
return nil
}

func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
klog.V(4).Infof("DeleteVolume: called with args: %+v", *req)
if err := validateDeleteVolumeRequest(req); err != nil {
return nil, err
}

volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")

// check if a request is already in-flight
if ok := d.inFlight.Insert(volumeID); !ok {
msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, volumeID)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(volumeID)

if _, err := d.cloud.DeleteDisk(ctx, volumeID); err != nil {
if err == cloud.ErrNotFound {
Expand All @@ -282,30 +297,30 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}

func validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
if len(req.GetVolumeId()) == 0 {
return status.Error(codes.InvalidArgument, "Volume ID not provided")
}
return nil
}

func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
klog.V(4).Infof("ControllerPublishVolume: called with args %+v", *req)
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
if err := validateControllerPublishVolumeRequest(req); err != nil {
return nil, err
}

volumeID := req.GetVolumeId()
nodeID := req.GetNodeId()
if len(nodeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")
}

volCap := req.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
}

caps := []*csi.VolumeCapability{volCap}
if !isValidVolumeCapabilities(caps) {
modes := util.GetAccessModes(caps)
stringModes := strings.Join(*modes, ", ")
errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported."
return nil, status.Error(codes.InvalidArgument, errString)
// check if a request is already in-flight, eventually we need to support the ability to publish the same volume onto different nodes concurrently
// Use nodeID/VolumeId as index to prevent operation like attach and detach same volume on same node at same time.
lockedVolumeIdNodeId := fmt.Sprintf("%s/%s", nodeID, volumeID)
if ok := d.inFlight.Insert(lockedVolumeIdNodeId); !ok {
msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, lockedVolumeIdNodeId)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(lockedVolumeIdNodeId)

if !d.cloud.IsExistInstance(ctx, nodeID) {
return nil, status.Errorf(codes.NotFound, "Instance %q not found", nodeID)
Expand Down Expand Up @@ -333,17 +348,47 @@ func (d *controllerService) ControllerPublishVolume(ctx context.Context, req *cs
return &csi.ControllerPublishVolumeResponse{PublishContext: pvInfo}, nil
}

func validateControllerPublishVolumeRequest(req *csi.ControllerPublishVolumeRequest) error {
if len(req.GetVolumeId()) == 0 {
return status.Error(codes.InvalidArgument, "Volume ID not provided")
}

if len(req.GetNodeId()) == 0 {
return status.Error(codes.InvalidArgument, "Node ID not provided")
}

volCap := req.GetVolumeCapability()
if volCap == nil {
return status.Error(codes.InvalidArgument, "Volume capability not provided")
}

caps := []*csi.VolumeCapability{volCap}
if !isValidVolumeCapabilities(caps) {
modes := util.GetAccessModes(caps)
stringModes := strings.Join(*modes, ", ")
errString := "Volume capabilities " + stringModes + " not supported. Only AccessModes[ReadWriteOnce] supported."
return status.Error(codes.InvalidArgument, errString)
}
return nil
}

func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
klog.V(4).Infof("ControllerUnpublishVolume: called with args %+v", *req)
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
if err := validateControllerUnpublishVolumeRequest(req); err != nil {
return nil, err
}

volumeID := req.GetVolumeId()
nodeID := req.GetNodeId()
if len(nodeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Node ID not provided")

// check if a request is already in-flight, eventually we need to support the ability to publish the same volume onto different nodes concurrently
// Use nodeID/VolumeId as index to prevent operation like attach and detach same volume on same node at same time.
lockedVolumeIdNodeId := fmt.Sprintf("%s/%s", nodeID, volumeID)
if ok := d.inFlight.Insert(lockedVolumeIdNodeId); !ok {
msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, lockedVolumeIdNodeId)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(lockedVolumeIdNodeId)

if err := d.cloud.DetachDisk(ctx, volumeID, nodeID); err != nil {
if err == cloud.ErrNotFound {
Expand All @@ -356,6 +401,18 @@ func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, req *
return &csi.ControllerUnpublishVolumeResponse{}, nil
}

func validateControllerUnpublishVolumeRequest(req *csi.ControllerUnpublishVolumeRequest) error {
if len(req.GetVolumeId()) == 0 {
return status.Error(codes.InvalidArgument, "Volume ID not provided")
}

if len(req.GetNodeId()) == 0 {
return status.Error(codes.InvalidArgument, "Node ID not provided")
}

return nil
}

func (d *controllerService) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
klog.V(4).Infof("ControllerGetCapabilities: called with args %+v", *req)
var caps []*csi.ControllerServiceCapability
Expand Down Expand Up @@ -489,15 +546,20 @@ func isValidVolumeContext(volContext map[string]string) bool {

func (d *controllerService) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
klog.V(4).Infof("CreateSnapshot: called with args %+v", req)
snapshotName := req.GetName()
if len(snapshotName) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot name not provided")
if err := validateCreateSnapshotRequest(req); err != nil {
return nil, err
}

snapshotName := req.GetName()
volumeID := req.GetSourceVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot volume source ID not provided")

// check if a request is already in-flight
if ok := d.inFlight.Insert(snapshotName); !ok {
msg := fmt.Sprintf(internal.VolumeOperationAlreadyExistsErrorMsg, snapshotName)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(snapshotName)

snapshot, err := d.cloud.GetSnapshotByName(ctx, snapshotName)
if err != nil && err != cloud.ErrNotFound {
klog.Errorf("Error looking for the snapshot %s: %v", snapshotName, err)
Expand Down Expand Up @@ -535,12 +597,31 @@ func (d *controllerService) CreateSnapshot(ctx context.Context, req *csi.CreateS
return newCreateSnapshotResponse(snapshot)
}

func validateCreateSnapshotRequest(req *csi.CreateSnapshotRequest) error {
if len(req.GetName()) == 0 {
return status.Error(codes.InvalidArgument, "Snapshot name not provided")
}

if len(req.GetSourceVolumeId()) == 0 {
return status.Error(codes.InvalidArgument, "Snapshot volume source ID not provided")
}
return nil
}

func (d *controllerService) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
klog.V(4).Infof("DeleteSnapshot: called with args %+v", req)
if err := validateDeleteSnapshotRequest(req); err != nil {
return nil, err
}

snapshotID := req.GetSnapshotId()
if len(snapshotID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot ID not provided")

// check if a request is already in-flight
if ok := d.inFlight.Insert(snapshotID); !ok {
msg := fmt.Sprintf("DeleteSnapshot for Snapshot %s is already in progress", snapshotID)
return nil, status.Error(codes.Aborted, msg)
}
defer d.inFlight.Delete(snapshotID)

if _, err := d.cloud.DeleteSnapshot(ctx, snapshotID); err != nil {
if err == cloud.ErrNotFound {
Expand All @@ -553,6 +634,13 @@ func (d *controllerService) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return &csi.DeleteSnapshotResponse{}, nil
}

func validateDeleteSnapshotRequest(req *csi.DeleteSnapshotRequest) error {
if len(req.GetSnapshotId()) == 0 {
return status.Error(codes.InvalidArgument, "Snapshot ID not provided")
}
return nil
}

func (d *controllerService) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
klog.V(4).Infof("ListSnapshots: called with args %+v", req)
var snapshots []*cloud.Snapshot
Expand Down

0 comments on commit 0746057

Please sign in to comment.