Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix attach volume to instance repeatedly #44537

Merged
merged 2 commits into from
May 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 39 additions & 3 deletions pkg/cloudprovider/providers/openstack/openstack_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,14 @@ type VolumeCreateOpts struct {
Metadata map[string]string
}

func (volumes *VolumesV1) createVolume(opts VolumeCreateOpts) (string, string, error) {
const (
VolumeAvailableStatus = "available"
VolumeInUseStatus = "in-use"
VolumeDeletedStatus = "deleted"
VolumeErrorStatus = "error"
)

func (volumes *VolumesV1) createVolume(opts VolumeCreateOpts) (string, string, error) {
create_opts := volumes_v1.CreateOpts{
Name: opts.Name,
Size: opts.Size,
Expand All @@ -92,7 +98,6 @@ func (volumes *VolumesV1) createVolume(opts VolumeCreateOpts) (string, string, e
}

func (volumes *VolumesV2) createVolume(opts VolumeCreateOpts) (string, string, error) {

create_opts := volumes_v2.CreateOpts{
Name: opts.Name,
Size: opts.Size,
Expand Down Expand Up @@ -201,12 +206,33 @@ func (volumes *VolumesV2) deleteVolume(volumeName string) error {
return err
}

func (os *OpenStack) OperationPending(diskName string) (bool, string, error) {
volume, err := os.getVolume(diskName)
if err != nil {
return false, "", err
}
volumeStatus := volume.Status
if volumeStatus == VolumeErrorStatus {
glog.Errorf("status of volume %s is %s", diskName, volumeStatus)
return false, volumeStatus, nil
}
if volumeStatus == VolumeAvailableStatus || volumeStatus == VolumeInUseStatus || volumeStatus == VolumeDeletedStatus {
return false, volume.Status, nil
}
return true, volumeStatus, nil
}

// Attaches given cinder volume to the compute running kubelet
func (os *OpenStack) AttachDisk(instanceID string, diskName string) (string, error) {
volume, err := os.getVolume(diskName)
if err != nil {
return "", err
}
if volume.Status != VolumeAvailableStatus {
errmsg := fmt.Sprintf("volume %s status is %s, not %s, can not be attached to instance %s.", volume.Name, volume.Status, VolumeAvailableStatus, instanceID)
glog.Errorf(errmsg)
return "", errors.New(errmsg)
}
cClient, err := openstack.NewComputeV2(os.provider, gophercloud.EndpointOpts{
Region: os.region,
})
Expand Down Expand Up @@ -245,6 +271,11 @@ func (os *OpenStack) DetachDisk(instanceID string, partialDiskId string) error {
if err != nil {
return err
}
if volume.Status != VolumeInUseStatus {
errmsg := fmt.Sprintf("can not detach volume %s, its status is %s.", volume.Name, volume.Status)
glog.Errorf(errmsg)
return errors.New(errmsg)
}
cClient, err := openstack.NewComputeV2(os.provider, gophercloud.EndpointOpts{
Region: os.region,
})
Expand Down Expand Up @@ -369,6 +400,11 @@ func (os *OpenStack) GetAttachmentDiskPath(instanceID string, diskName string) (
if err != nil {
return "", err
}
if volume.Status != VolumeInUseStatus {
errmsg := fmt.Sprintf("can not get device path of volume %s, its status is %s.", volume.Name, volume.Status)
glog.Errorf(errmsg)
return "", errors.New(errmsg)
}
if volume.AttachedServerId != "" {
if instanceID == volume.AttachedServerId {
// Attachment[0]["device"] points to the device path
Expand All @@ -380,7 +416,7 @@ func (os *OpenStack) GetAttachmentDiskPath(instanceID string, diskName string) (
return "", errors.New(errMsg)
}
}
return "", fmt.Errorf("volume %s is not attached to %s", diskName, instanceID)
return "", fmt.Errorf("volume %s has not ServerId.", diskName)
}

// query if a volume is attached to a compute instance
Expand Down
48 changes: 43 additions & 5 deletions pkg/cloudprovider/providers/rackspace/rackspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)

const ProviderName = "rackspace"
const metaDataPath = "/media/configdrive/openstack/latest/meta_data.json"
const (
ProviderName = "rackspace"
MetaDataPath = "/media/configdrive/openstack/latest/meta_data.json"
VolumeAvailableStatus = "available"
VolumeInUseStatus = "in-use"
VolumeErrorStatus = "error"
)

var ErrNotFound = errors.New("Failed to find object")
var ErrMultipleResults = errors.New("Multiple results where only one expected")
Expand Down Expand Up @@ -146,16 +151,16 @@ func parseMetaData(file io.Reader) (string, error) {
metaData := MetaData{}
err = json.Unmarshal(metaDataBytes, &metaData)
if err != nil {
return "", fmt.Errorf("Cannot parse %s: %v", metaDataPath, err)
return "", fmt.Errorf("Cannot parse %s: %v", MetaDataPath, err)
}

return metaData.UUID, nil
}

func readInstanceID() (string, error) {
file, err := os.Open(metaDataPath)
file, err := os.Open(MetaDataPath)
if err != nil {
return "", fmt.Errorf("Cannot open %s: %v", metaDataPath, err)
return "", fmt.Errorf("Cannot open %s: %v", MetaDataPath, err)
}
defer file.Close()

Expand Down Expand Up @@ -487,13 +492,35 @@ func (rs *Rackspace) DeleteVolume(volumeName string) error {
return errors.New("unimplemented")
}

func (rs *Rackspace) OperationPending(diskName string) (bool, string, error) {
disk, err := rs.getVolume(diskName)
if err != nil {
return false, "", err
}
volumeStatus := disk.Status
if volumeStatus == VolumeErrorStatus {
glog.Errorf("status of volume %s is %s", diskName, volumeStatus)
return false, volumeStatus, nil
}
if volumeStatus == VolumeAvailableStatus || volumeStatus == VolumeInUseStatus {
return false, disk.Status, nil
}
return true, volumeStatus, nil
}

// Attaches given cinder volume to the compute running kubelet
func (rs *Rackspace) AttachDisk(instanceID string, diskName string) (string, error) {
disk, err := rs.getVolume(diskName)
if err != nil {
return "", err
}

if disk.Status != VolumeAvailableStatus {
errmsg := fmt.Sprintf("volume %s status is %s, not %s, can not be attached to instance %s.", disk.Name, disk.Status, VolumeAvailableStatus, instanceID)
glog.Errorf(errmsg)
return "", errors.New(errmsg)
}

compute, err := rs.getComputeClient()
if err != nil {
return "", err
Expand Down Expand Up @@ -589,6 +616,12 @@ func (rs *Rackspace) DetachDisk(instanceID string, partialDiskId string) error {
return err
}

if disk.Status != VolumeInUseStatus {
errmsg := fmt.Sprintf("can not detach volume %s, its status is %s.", disk.Name, disk.Status)
glog.Errorf(errmsg)
return errors.New(errmsg)
}

compute, err := rs.getComputeClient()
if err != nil {
return err
Expand Down Expand Up @@ -626,6 +659,11 @@ func (rs *Rackspace) GetAttachmentDiskPath(instanceID string, diskName string) (
if err != nil {
return "", err
}
if disk.Status != VolumeInUseStatus {
errmsg := fmt.Sprintf("can not get device path of volume %s, its status is %s.", disk.Name, disk.Status)
glog.Errorf(errmsg)
return "", errors.New(errmsg)
}
if len(disk.Attachments) > 0 && disk.Attachments[0]["server_id"] != nil {
if instanceID == disk.Attachments[0]["server_id"] {
// Attachment[0]["device"] points to the device path
Expand Down
26 changes: 22 additions & 4 deletions pkg/volume/cinder/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.Nod
return "", err
}

pending, volumeStatus, err := attacher.cinderProvider.OperationPending(volumeID)
if err != nil {
return "", err
}
if pending {
glog.Infof("status of volume %q is %s, wait it to be finished", volumeID, volumeStatus)
return "", nil
}

attached, err := attacher.cinderProvider.DiskIsAttached(volumeID, instanceid)
if err != nil {
// Log error and continue with attach
Expand All @@ -89,18 +98,27 @@ func (attacher *cinderDiskAttacher) Attach(spec *volume.Spec, nodeName types.Nod
if err == nil {
glog.Infof("Attach operation successful: volume %q attached to instance %q.", volumeID, instanceid)
} else {
glog.Infof("Attach volume %q to instance %q failed with %v", volumeID, instanceid, err)
glog.Infof("Attach volume %q to instance %q failed with: %v", volumeID, instanceid, err)
return "", err
}
}

devicePath, err := attacher.cinderProvider.GetAttachmentDiskPath(instanceid, volumeID)
// When volume's status is 'attaching', DiskIsAttached() return <false, nil>, and can't get device path.
// We should get device path next time and do not return err.
devicePath := ""
pending, _, err = attacher.cinderProvider.OperationPending(volumeID)
if err != nil {
glog.Infof("Attach volume %q to instance %q failed with %v", volumeID, instanceid, err)
return "", err
}
if !pending {
devicePath, err = attacher.cinderProvider.GetAttachmentDiskPath(instanceid, volumeID)
if err != nil {
glog.Infof("Can not get device path of volume %q which be attached to instance %q, failed with: %v", volumeID, instanceid, err)
return "", err
}
}

return devicePath, err
return devicePath, nil
}

func (attacher *cinderDiskAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
Expand Down
90 changes: 67 additions & 23 deletions pkg/volume/cinder/attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"k8s.io/apimachinery/pkg/types"
)

const VolumeStatusPending = "pending"
const VolumeStatusDone = "done"

func TestGetDeviceName_Volume(t *testing.T) {
plugin := newPlugin()
name := "my-cinder-volume"
Expand Down Expand Up @@ -88,6 +91,7 @@ type testcase struct {
// For fake GCE:
attach attachCall
detach detachCall
operationPending operationPendingCall
diskIsAttached diskIsAttachedCall
disksAreAttached disksAreAttachedCall
diskPath diskPathCall
Expand All @@ -104,6 +108,8 @@ type testcase struct {
func TestAttachDetach(t *testing.T) {
diskName := "disk"
instanceID := "instance"
pending := VolumeStatusPending
done := VolumeStatusDone
nodeName := types.NodeName("nodeName")
readOnly := false
spec := createVolSpec(diskName, readOnly)
Expand All @@ -115,11 +121,12 @@ func TestAttachDetach(t *testing.T) {
tests := []testcase{
// Successful Attach call
{
name: "Attach_Positive",
instanceID: instanceID,
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, nil},
attach: attachCall{diskName, instanceID, "", nil},
diskPath: diskPathCall{diskName, instanceID, "/dev/sda", nil},
name: "Attach_Positive",
instanceID: instanceID,
operationPending: operationPendingCall{diskName, false, done, nil},
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, nil},
attach: attachCall{diskName, instanceID, "", nil},
diskPath: diskPathCall{diskName, instanceID, "/dev/sda", nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
Expand All @@ -129,24 +136,39 @@ func TestAttachDetach(t *testing.T) {

// Disk is already attached
{
name: "Attach_Positive_AlreadyAttached",
instanceID: instanceID,
diskIsAttached: diskIsAttachedCall{diskName, instanceID, true, nil},
diskPath: diskPathCall{diskName, instanceID, "/dev/sda", nil},
name: "Attach_Positive_AlreadyAttached",
instanceID: instanceID,
operationPending: operationPendingCall{diskName, false, done, nil},
diskIsAttached: diskIsAttachedCall{diskName, instanceID, true, nil},
diskPath: diskPathCall{diskName, instanceID, "/dev/sda", nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedResult: "/dev/sda",
},

// Disk is attaching
{
name: "Attach_is_attaching",
instanceID: instanceID,
operationPending: operationPendingCall{diskName, true, pending, nil},
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
},
expectedResult: "",
},

// DiskIsAttached fails and Attach succeeds
{
name: "Attach_Positive_CheckFails",
instanceID: instanceID,
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
attach: attachCall{diskName, instanceID, "", nil},
diskPath: diskPathCall{diskName, instanceID, "/dev/sda", nil},
name: "Attach_Positive_CheckFails",
instanceID: instanceID,
operationPending: operationPendingCall{diskName, false, done, nil},
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
attach: attachCall{diskName, instanceID, "", nil},
diskPath: diskPathCall{diskName, instanceID, "/dev/sda", nil},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
Expand All @@ -156,10 +178,11 @@ func TestAttachDetach(t *testing.T) {

// Attach call fails
{
name: "Attach_Negative",
instanceID: instanceID,
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
attach: attachCall{diskName, instanceID, "/dev/sda", attachError},
name: "Attach_Negative",
instanceID: instanceID,
operationPending: operationPendingCall{diskName, false, done, nil},
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
attach: attachCall{diskName, instanceID, "/dev/sda", attachError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
Expand All @@ -169,11 +192,12 @@ func TestAttachDetach(t *testing.T) {

// GetAttachmentDiskPath call fails
{
name: "Attach_Negative_DiskPatchFails",
instanceID: instanceID,
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
attach: attachCall{diskName, instanceID, "", nil},
diskPath: diskPathCall{diskName, instanceID, "", diskPathError},
name: "Attach_Negative_DiskPatchFails",
instanceID: instanceID,
operationPending: operationPendingCall{diskName, false, done, nil},
diskIsAttached: diskIsAttachedCall{diskName, instanceID, false, diskCheckError},
attach: attachCall{diskName, instanceID, "", nil},
diskPath: diskPathCall{diskName, instanceID, "", diskPathError},
test: func(testcase *testcase) (string, error) {
attacher := newAttacher(testcase)
return attacher.Attach(spec, nodeName)
Expand Down Expand Up @@ -384,6 +408,13 @@ type detachCall struct {
ret error
}

type operationPendingCall struct {
diskName string
pending bool
volumeStatus string
ret error
}

type diskIsAttachedCall struct {
diskName, instanceID string
isAttached bool
Expand Down Expand Up @@ -453,6 +484,19 @@ func (testcase *testcase) DetachDisk(instanceID string, partialDiskId string) er
return expected.ret
}

func (testcase *testcase) OperationPending(diskName string) (bool, string, error) {
expected := &testcase.operationPending

if expected.volumeStatus == VolumeStatusPending {
glog.V(4).Infof("OperationPending call: %s, returning %v, %v, %v", diskName, expected.pending, expected.volumeStatus, expected.ret)
return true, expected.volumeStatus, expected.ret
}

glog.V(4).Infof("OperationPending call: %s, returning %v, %v, %v", diskName, expected.pending, expected.volumeStatus, expected.ret)

return false, expected.volumeStatus, expected.ret
}

func (testcase *testcase) DiskIsAttached(diskName, instanceID string) (bool, error) {
expected := &testcase.diskIsAttached

Expand Down