Skip to content

Commit

Permalink
Merge pull request kubernetes#32242 from jingxu97/bug-wrongvolume-9-2
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Fix race condition in updating attached volume between master and node

This PR tries to fix issue kubernetes#29324. The cause of this issue is that a race
condition happens when marking volumes as attached for node status. This
PR tries to clean up the logic of when and where to mark volumes as
attached/detached. Basically, the workflow is as follows:
1. When a volume is attached successfully, the volume and node info is
added into nodesToUpdateStatusFor to mark the volume as attached to the
node.
2. When a detach request comes in, it is first checked whether it is safe to
detach now. If the check passes, the volume is removed from volumesToReportAsAttached
to indicate that the volume is no longer considered attached.
Afterwards, the reconciler tries to update the node status and trigger the detach
operation. If either of these operations fails, the volume is added back to
the volumesToReportAsAttached list, showing that it is still attached.

These steps should make sure that kubelet gets the right (though possibly
outdated) information about which volumes are attached. It also
guarantees that if a detach operation is pending, kubelet should not
trigger any mount operations.
  • Loading branch information
Kubernetes Submit Queue committed Sep 12, 2016
2 parents 32a3296 + efaceb2 commit 6a9a93d
Show file tree
Hide file tree
Showing 6 changed files with 383 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ func (adc *attachDetachController) getPVSpecFromCache(
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName string, volumesInUse []api.UniqueVolumeName) {
glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
for _, volumeInUse := range volumesInUse {
Expand Down
200 changes: 121 additions & 79 deletions pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
Expand All @@ -48,8 +50,6 @@ type ActualStateOfWorld interface {
// indicating the specified volume is attached to the specified node.
// A unique volume name is generated from the volumeSpec and returned on
// success.
// If the volume/node combo already exists, the detachRequestedTime is reset
// to zero.
// If volumeSpec is not an attachable volume plugin, an error is returned.
// If no volume with the name volumeName exists in the store, the volume is
// added.
Expand All @@ -66,22 +66,22 @@ type ActualStateOfWorld interface {
// the specified volume, an error is returned.
SetVolumeMountedByNode(volumeName api.UniqueVolumeName, nodeName string, mounted bool) error

// MarkDesireToDetach returns the difference between the current time and
// the DetachRequestedTime for the given volume/node combo. If the
// DetachRequestedTime is zero, it is set to the current time.
// If no volume with the name volumeName exists in the store, an error is
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
MarkDesireToDetach(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error)

// ResetNodeStatusUpdateNeeded resets statusUpdateNeeded for the specified
// node to false indicating the AttachedVolume field of the Node's Status
// object has been updated.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
ResetNodeStatusUpdateNeeded(nodeName string) error

// ResetDetachRequestTime resets the detachRequestTime to 0 which indicates there is no detach
// request any more for the volume
ResetDetachRequestTime(volumeName api.UniqueVolumeName, nodeName string)

// SetDetachRequestTime sets the detachRequestedTime to the current time if there is no
// previous request (the previous detachRequestedTime is zero) and returns the time elapsed
// since the detach was first requested.
SetDetachRequestTime(volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error)

// DeleteVolumeNode removes the given volume and node from the underlying
// store indicating the specified volume is no longer attached to the
// specified node.
Expand Down Expand Up @@ -126,9 +126,9 @@ type AttachedVolume struct {

// DetachRequestedTime is used to capture the desire to detach this volume.
// When the volume is newly created this value is set to time zero.
// It is set to current time, when MarkDesireToDetach(...) is called, if it
// It is set to current time, when SetDetachRequestTime(...) is called, if it
// was previously set to zero (otherwise its value remains the same).
// It is reset to zero on AddVolumeNode(...) calls.
// It is reset to zero on ResetDetachRequestTime(...) calls.
DetachRequestedTime time.Time
}

Expand Down Expand Up @@ -234,6 +234,20 @@ func (asw *actualStateOfWorld) MarkVolumeAsDetached(
asw.DeleteVolumeNode(volumeName, nodeName)
}

// RemoveVolumeFromReportAsAttached removes volumeName from the
// volumesToReportAsAttached list of the node nodeName.
// It holds the asw lock for the duration of the call and delegates to
// removeVolumeFromReportAsAttached, returning whatever error that helper
// returns.
func (asw *actualStateOfWorld) RemoveVolumeFromReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) error {
asw.Lock()
defer asw.Unlock()
return asw.removeVolumeFromReportAsAttached(volumeName, nodeName)
}

// AddVolumeToReportAsAttached adds volumeName to the
// volumesToReportAsAttached list of the node nodeName.
// It holds the asw lock for the duration of the call and delegates to
// addVolumeToReportAsAttached. Nothing is returned, so the caller cannot
// tell whether the volume was actually added.
func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) {
asw.Lock()
defer asw.Unlock()
asw.addVolumeToReportAsAttached(volumeName, nodeName)
}

func (asw *actualStateOfWorld) AddVolumeNode(
volumeSpec *volume.Spec, nodeName string, devicePath string) (api.UniqueVolumeName, error) {
asw.Lock()
Expand Down Expand Up @@ -267,7 +281,7 @@ func (asw *actualStateOfWorld) AddVolumeNode(
asw.attachedVolumes[volumeName] = volumeObj
}

nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
_, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
// Create object if it doesn't exist.
volumeObj.nodesAttachedTo[nodeName] = nodeAttachedTo{
Expand All @@ -276,53 +290,24 @@ func (asw *actualStateOfWorld) AddVolumeNode(
mountedByNodeSetCount: 0,
detachRequestedTime: time.Time{},
}
} else if !nodeObj.detachRequestedTime.IsZero() {
// Reset detachRequestedTime values if object exists and time is non-zero
nodeObj.detachRequestedTime = time.Time{}
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}

nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if !nodeToUpdateExists {
// Create object if it doesn't exist
nodeToUpdate = nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if !nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
nodeToUpdate.volumesToReportAsAttached[volumeName] = volumeName
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
} else {
glog.V(5).Infof("Volume %q is already added to attachedVolume list to the node %q",
volumeName,
nodeName)
}

asw.addVolumeToReportAsAttached(volumeName, nodeName)
return volumeName, nil
}

func (asw *actualStateOfWorld) SetVolumeMountedByNode(
volumeName api.UniqueVolumeName, nodeName string, mounted bool) error {
asw.Lock()
defer asw.Unlock()
volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return fmt.Errorf(
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) volumeName does not exist",
volumeName,
nodeName,
mounted)
}

nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return fmt.Errorf(
"failed to SetVolumeMountedByNode(volumeName=%v, nodeName=%q, mounted=%v) nodeName does not exist",
volumeName,
nodeName,
mounted)
volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName)
if err != nil {
return fmt.Errorf("Failed to SetVolumeMountedByNode with error: %v", err)
}

if mounted {
Expand All @@ -337,37 +322,70 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode(

nodeObj.mountedByNode = mounted
volumeObj.nodesAttachedTo[nodeName] = nodeObj

glog.V(4).Infof("SetVolumeMountedByNode volume %v to the node %q mounted %q",
volumeName,
nodeName,
mounted)
return nil
}

func (asw *actualStateOfWorld) MarkDesireToDetach(
volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) {
func (asw *actualStateOfWorld) ResetDetachRequestTime(
volumeName api.UniqueVolumeName, nodeName string) {
asw.Lock()
defer asw.Unlock()

volumeObj, volumeExists := asw.attachedVolumes[volumeName]
if !volumeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) volumeName does not exist",
volumeName,
nodeName)
volumeObj, nodeObj, err := asw.getNodeAndVolume(volumeName, nodeName)
if err != nil {
glog.Errorf("Failed to ResetDetachRequestTime with error: %v", err)
return
}
nodeObj.detachRequestedTime = time.Time{}
volumeObj.nodesAttachedTo[nodeName] = nodeObj
}

nodeObj, nodeExists := volumeObj.nodesAttachedTo[nodeName]
if !nodeExists {
return time.Millisecond * 0, fmt.Errorf(
"failed to MarkDesireToDetach(volumeName=%v, nodeName=%q) nodeName does not exist",
volumeName,
nodeName)
}
// SetDetachRequestTime records when a detach was first requested for the
// given volume/node pair and reports how long ago that was.
//
// If the stored detachRequestedTime is zero, it is set to the current time
// and written back to the volume's nodesAttachedTo entry; otherwise the
// stored time is left untouched. The returned duration is the time elapsed
// since the (possibly just recorded) request time.
//
// An error is returned when getNodeAndVolume cannot find the volume/node
// pair. The asw lock is held for the whole call.
func (asw *actualStateOfWorld) SetDetachRequestTime(
	volumeName api.UniqueVolumeName, nodeName string) (time.Duration, error) {
	asw.Lock()
	defer asw.Unlock()

	attached, node, lookupErr := asw.getNodeAndVolume(volumeName, nodeName)
	if lookupErr != nil {
		return 0, fmt.Errorf("Failed to set detach request time with error: %v", lookupErr)
	}

	requestedAt := node.detachRequestedTime
	if requestedAt.IsZero() {
		// First detach request for this pair: stamp it with the current time
		// and persist the updated node entry.
		requestedAt = time.Now()
		node.detachRequestedTime = requestedAt
		attached.nodesAttachedTo[nodeName] = node
		glog.V(4).Infof("Set detach request time to current time for volume %v on node %q",
			volumeName,
			nodeName)
	}

	return time.Since(requestedAt), nil
}

// getNodeAndVolume looks up volumeName in asw.attachedVolumes and nodeName
// in that volume's nodesAttachedTo map, returning both entries.
// If either lookup misses, zero-value objects are returned together with an
// error stating that the volume is no longer attached to the node.
// This is an internal function: it does no locking itself, so the caller
// should acquire and release the lock.
func (asw *actualStateOfWorld) getNodeAndVolume(
	volumeName api.UniqueVolumeName, nodeName string) (attachedVolume, nodeAttachedTo, error) {

	var node nodeAttachedTo
	vol, found := asw.attachedVolumes[volumeName]
	if found {
		// Only consult the per-node map once the volume itself is known.
		node, found = vol.nodesAttachedTo[nodeName]
	}
	if !found {
		return attachedVolume{}, nodeAttachedTo{}, fmt.Errorf("volume %v is no longer attached to the node %q",
			volumeName,
			nodeName)
	}

	return vol, node, nil
}

// Remove the volumeName from the node's volumesToReportAsAttached list
// This is an internal function and caller should acquire and release the lock
func (asw *actualStateOfWorld) removeVolumeFromReportAsAttached(
volumeName api.UniqueVolumeName, nodeName string) error {

// Remove volume from volumes to report as attached
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
_, nodeToUpdateVolumeExists :=
Expand All @@ -376,10 +394,43 @@ func (asw *actualStateOfWorld) MarkDesireToDetach(
nodeToUpdate.statusUpdateNeeded = true
delete(nodeToUpdate.volumesToReportAsAttached, volumeName)
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
return nil
}
}
return fmt.Errorf("volume %q or node %q does not exist in volumesToReportAsAttached list",
volumeName,
nodeName)

return time.Since(volumeObj.nodesAttachedTo[nodeName].detachRequestedTime), nil
}

// addVolumeToReportAsAttached adds volumeName to the node's
// volumesToReportAsAttached list and flags the node as needing a status
// update.
// Nothing is done if the volume/node pair is not found by getNodeAndVolume,
// or if the volume is already in the node's list. A nodesToUpdateStatusFor
// entry is created for the node when none exists yet.
// This is an internal function: it does no locking itself, so the caller
// should acquire and release the lock.
func (asw *actualStateOfWorld) addVolumeToReportAsAttached(
	volumeName api.UniqueVolumeName, nodeName string) {
	// Skip volumes that are no longer in the attachedVolumes list for this node.
	_, _, lookupErr := asw.getNodeAndVolume(volumeName, nodeName)
	if lookupErr != nil {
		glog.V(4).Infof("Volume %q is no longer attached to node %q", volumeName, nodeName)
		return
	}

	entry, known := asw.nodesToUpdateStatusFor[nodeName]
	if !known {
		// First volume reported for this node: start with an empty list.
		entry = nodeToUpdateStatusFor{
			nodeName:                  nodeName,
			statusUpdateNeeded:        true,
			volumesToReportAsAttached: make(map[api.UniqueVolumeName]api.UniqueVolumeName),
		}
		asw.nodesToUpdateStatusFor[nodeName] = entry
		glog.V(4).Infof("Add new node %q to nodesToUpdateStatusFor", nodeName)
	}

	if _, reported := entry.volumesToReportAsAttached[volumeName]; reported {
		return
	}

	// entry is a copy of the map value, so store it back after mutating it.
	entry.statusUpdateNeeded = true
	entry.volumesToReportAsAttached[volumeName] = volumeName
	asw.nodesToUpdateStatusFor[nodeName] = entry
	glog.V(4).Infof("Report volume %q as attached to node %q", volumeName, nodeName)
}

func (asw *actualStateOfWorld) ResetNodeStatusUpdateNeeded(
Expand Down Expand Up @@ -419,16 +470,7 @@ func (asw *actualStateOfWorld) DeleteVolumeNode(
}

// Remove volume from volumes to report as attached
nodeToUpdate, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
_, nodeToUpdateVolumeExists :=
nodeToUpdate.volumesToReportAsAttached[volumeName]
if nodeToUpdateVolumeExists {
nodeToUpdate.statusUpdateNeeded = true
delete(nodeToUpdate.volumesToReportAsAttached, volumeName)
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate
}
}
asw.removeVolumeFromReportAsAttached(volumeName, nodeName)
}

func (asw *actualStateOfWorld) VolumeNodeExists(
Expand Down

0 comments on commit 6a9a93d

Please sign in to comment.