Skip to content

Commit

Permalink
Node status updater now deletes the node entry in attach updates when…
Browse files Browse the repository at this point in the history
… node is missing in NodeInformer cache. Fixes kubernetes#42438.

- Added RemoveNodeFromAttachUpdates as part of node status updater operations.
  • Loading branch information
verult committed Jun 10, 2017
1 parent e04b145 commit 1f42790
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 7 deletions.
17 changes: 17 additions & 0 deletions pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ type ActualStateOfWorld interface {

// GetNodesToUpdateStatusFor returns the map of nodeNames to nodeToUpdateStatusFor
GetNodesToUpdateStatusFor() map[types.NodeName]nodeToUpdateStatusFor

// Removes the given node from the record of attach updates. The node's entire
// volumesToReportAsAttached list is removed.
RemoveNodeFromAttachUpdates(nodeName types.NodeName) error
}

// AttachedVolume represents a volume that is attached to a node.
Expand Down Expand Up @@ -254,6 +258,19 @@ func (asw *actualStateOfWorld) AddVolumeToReportAsAttached(
asw.addVolumeToReportAsAttached(volumeName, nodeName)
}

func (asw *actualStateOfWorld) RemoveNodeFromAttachUpdates(nodeName types.NodeName) error {
asw.Lock()
defer asw.Unlock()

_, nodeToUpdateExists := asw.nodesToUpdateStatusFor[nodeName]
if nodeToUpdateExists {
delete(asw.nodesToUpdateStatusFor, nodeName)
return nil
}
return fmt.Errorf("node %q does not exist in volumesToReportAsAttached list",
nodeName)
}

func (asw *actualStateOfWorld) AddVolumeNode(
volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) (v1.UniqueVolumeName, error) {
asw.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,89 @@ func Test_updateNodeStatusUpdateNeededError(t *testing.T) {
}
}

// Test_RemoveNodeFromAttachUpdates_Positive expects an entire node entry to be removed
// from nodesToUpdateStatusFor
func Test_RemoveNodeFromAttachUpdates_Positive(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
nodeName := types.NodeName("node-1")
nodeToUpdate := nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[v1.UniqueVolumeName]v1.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate

// Act
err := asw.RemoveNodeFromAttachUpdates(nodeName)

// Assert
if err != nil {
t.Fatalf("RemoveNodeFromAttachUpdates should not return error, but got: %v", err)
}
if len(asw.nodesToUpdateStatusFor) > 0 {
t.Fatal("nodesToUpdateStatusFor should be empty as its only entry has been deleted.")
}
}

// Test_RemoveNodeFromAttachUpdates_Negative_NodeDoesNotExist expects an error to be thrown
// when nodeName is not in nodesToUpdateStatusFor.
func Test_RemoveNodeFromAttachUpdates_Negative_NodeDoesNotExist(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}
nodeName := types.NodeName("node-1")
nodeToUpdate := nodeToUpdateStatusFor{
nodeName: nodeName,
statusUpdateNeeded: true,
volumesToReportAsAttached: make(map[v1.UniqueVolumeName]v1.UniqueVolumeName),
}
asw.nodesToUpdateStatusFor[nodeName] = nodeToUpdate

// Act
err := asw.RemoveNodeFromAttachUpdates("node-2")

// Assert
if err == nil {
t.Fatal("RemoveNodeFromAttachUpdates should return an error as the nodeName doesn't exist.")
}
if len(asw.nodesToUpdateStatusFor) != 1 {
t.Fatal("The length of nodesToUpdateStatusFor should not change because no operation was performed.")
}
}

// Test_RemoveNodeFromAttachUpdates_Negative_Empty expects an error to be thrown
// when a nodesToUpdateStatusFor is empty.
func Test_RemoveNodeFromAttachUpdates_Negative_Empty(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
asw := &actualStateOfWorld{
attachedVolumes: make(map[v1.UniqueVolumeName]attachedVolume),
nodesToUpdateStatusFor: make(map[types.NodeName]nodeToUpdateStatusFor),
volumePluginMgr: volumePluginMgr,
}

// Act
err := asw.RemoveNodeFromAttachUpdates("node-1")

// Assert
if err == nil {
t.Fatal("RemoveNodeFromAttachUpdates should return an error as nodeToUpdateStatusFor is empty.")
}
if len(asw.nodesToUpdateStatusFor) != 0 {
t.Fatal("The length of nodesToUpdateStatusFor should be 0.")
}
}

func verifyAttachedVolume(
t *testing.T,
attachedVolumes []AttachedVolume,
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/volume/attachdetach/statusupdater/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/util/strategicpatch",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/golang/glog"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
Expand Down Expand Up @@ -64,14 +65,20 @@ func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
for nodeName, attachedVolumes := range nodesToUpdate {
nodeObj, err := nsu.nodeLister.Get(string(nodeName))
if nodeObj == nil || err != nil {
// If node does not exist, its status cannot be updated, log error and
// reset flag statusUpdateNeeded back to true to indicate this node status
// needs to be updated again
if errors.IsNotFound(err) {
// If node does not exist, its status cannot be updated.
// Remove the node entry from the collection of attach updates, preventing the
// status updater from unnecessarily updating the node.
glog.V(2).Infof(
"Could not update node status. Failed to find node %q in NodeInformer cache. %v",
"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
nodeName,
err)
nsu.actualStateOfWorld.RemoveNodeFromAttachUpdates(nodeName)
continue
} else if err != nil {
// For all other errors, log error and reset flag statusUpdateNeeded
// back to true to indicate this node status needs to be updated again.
glog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/volume/util/operationexecutor/operation_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ type ActualStateOfWorldAttacherUpdater interface {
MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)

// Marks desire to detach the specified volume (remove the volume from the node's
// volumesToReportedAsAttached list)
// volumesToReportAsAttached list)
RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error

// Unmarks the desire to detach for the specified volume (add the volume back to
// the node's volumesToReportedAsAttached list)
// the node's volumesToReportAsAttached list)
AddVolumeToReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
}

Expand Down

0 comments on commit 1f42790

Please sign in to comment.