Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge attach and detach common func #80420

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
166 changes: 82 additions & 84 deletions pkg/volume/csi/csi_attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"k8s.io/klog"

v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -51,6 +51,8 @@ type csiAttacher struct {
csiClient csiClient
}

type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)

// volume.Attacher methods
var _ volume.Attacher = &csiAttacher{}

Expand Down Expand Up @@ -148,79 +150,18 @@ func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, tim
}

func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {

klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
}
successful, err := verifyAttachmentStatus(attach, volumeHandle)
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
if err != nil {
return "", err
}
if successful {
return attachID, nil
}

watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return "", fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}

ch := watcher.ResultChan()
defer watcher.Stop()

for {
select {
case event, ok := <-ch:
if !ok {
klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
return "", errors.New("volume attachment watch channel had been closed")
}

switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
successful, err := verifyAttachmentStatus(attach, volumeHandle)
if err != nil {
return "", err
}
if successful {
return attachID, nil
}
case watch.Deleted:
// if deleted, fail fast
klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", attachID))
return "", errors.New("volume attachment has been deleted")

case watch.Error:
klog.Warningf("waitForVolumeAttachmentInternal received watch error: %v", event)
}

case <-timer.C:
klog.Error(log("attacher.WaitForAttach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return "", fmt.Errorf("attachment timeout for volume %v", volumeHandle)
}
}
}

func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
// if being deleted, fail fast
if attachment.GetDeletionTimestamp() != nil {
klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
return false, errors.New("volume attachment is being deleted")
}
// attachment OK
if attachment.Status.Attached {
return true, nil
}
// driver reports attach error
attachErr := attachment.Status.AttachError
if attachErr != nil {
klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return false, errors.New(attachErr.Message)
}
return false, nil
return attach.Name, nil
}

func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
Expand Down Expand Up @@ -318,8 +259,8 @@ func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMo
}
if err = saveVolumeData(dataDir, volDataFileName, data); err != nil {
klog.Error(log("failed to save volume info data: %v", err))
if cleanerr := os.RemoveAll(dataDir); cleanerr != nil {
klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanerr))
if cleanErr := os.RemoveAll(dataDir); cleanErr != nil {
klog.Error(log("failed to remove dir after error [%s]: %v", dataDir, cleanErr))
}
return err
}
Expand Down Expand Up @@ -445,7 +386,8 @@ func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
}

klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
return c.waitForVolumeDetachment(volID, attachID)
err := c.waitForVolumeDetachment(volID, attachID)
return err
}

func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) error {
Expand All @@ -458,7 +400,8 @@ func (c *csiAttacher) waitForVolumeDetachment(volumeHandle, attachID string) err
return c.waitForVolumeDetachmentInternal(volumeHandle, attachID, timer, timeout)
}

func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) error {
func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID string, timer *time.Timer,
timeout time.Duration) error {
klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
attach, err := c.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
Expand All @@ -469,16 +412,26 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str
}
return errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
}
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyDetachmentStatus)
if err != nil {
return err
}
return err
}

func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
successful, err := verifyStatus(attach, volumeHandle)
if err != nil {
return err
}
if successful {
return nil
}

watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(meta.SingleObject(meta.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
if err != nil {
return errors.New(log("watch error:%v for volume %v", err, volumeHandle))
return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
}

ch := watcher.ResultChan()
Expand All @@ -494,24 +447,30 @@ func (c *csiAttacher) waitForVolumeDetachmentInternal(volumeHandle, attachID str
switch event.Type {
case watch.Added, watch.Modified:
attach, _ := event.Object.(*storage.VolumeAttachment)
// driver reports attach error
detachErr := attach.Status.DetachError
if detachErr != nil {
klog.Error(log("detachment for VolumeAttachment [%v] for volume [%s] failed: %v", attachID, volumeHandle, detachErr.Message))
return errors.New(detachErr.Message)
successful, err := verifyStatus(attach, volumeHandle)
if err != nil {
return err
}
if successful {
return nil
}
case watch.Deleted:
//object deleted
// set attach nil to get different results
// for detachment, a deleted event means successful detachment, should return success
// for attachment, should return fail
if successful, err := verifyStatus(nil, volumeHandle); !successful {
return err
}
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
return nil

case watch.Error:
klog.Warningf("waitForVolumeDetachmentInternal received watch error: %v", event)
klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
}

case <-timer.C:
klog.Error(log("detacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("detachment timeout for volume %v", volumeHandle)
klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
}
}
}
Expand Down Expand Up @@ -638,3 +597,42 @@ func getDriverAndVolNameFromDeviceMountPath(k8s kubernetes.Interface, deviceMoun

return csiSource.Driver, csiSource.VolumeHandle, nil
}

func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
// when we received a deleted event during attachment, fail fast
if attachment == nil {
klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", volumeHandle))
return false, errors.New("volume attachment has been deleted")
}
// if being deleted, fail fast
if attachment.GetDeletionTimestamp() != nil {
klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
return false, errors.New("volume attachment is being deleted")
}
// attachment OK
if attachment.Status.Attached {
return true, nil
}
// driver reports attach error
attachErr := attachment.Status.AttachError
if attachErr != nil {
klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
return false, errors.New(attachErr.Message)
}
return false, nil
}

func verifyDetachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
// when we received a deleted event during detachment
// it means we have successfully detached it.
if attachment == nil {
return true, nil
}
// driver reports detach error
detachErr := attachment.Status.DetachError
if detachErr != nil {
klog.Error(log("detachment for VolumeAttachment for volume [%s] failed: %v", volumeHandle, detachErr.Message))
return false, errors.New(detachErr.Message)
}
return false, nil
}