Skip to content

Commit

Permalink
Fix issue kubernetes#34242: Attach/detach should recover from a crash
Browse files Browse the repository at this point in the history
When the attach/detach controller crashes and a pod with attached PV is deleted
afterwards the controller will never detach the pod's attached volumes. To
prevent this the controller should try to recover the state from the nodes
status and figure out which volumes to detach. This requires some changes in the
volume providers too: the only information available from the nodes is the
volume name and the device path. The controller needs to find the correct volume
plugin and reconstruct the volume spec just from the name. This reuired a small
change also in the volume plugin interface.
  • Loading branch information
tsmetana committed Feb 14, 2017
1 parent 739f4ff commit 5912b31
Show file tree
Hide file tree
Showing 34 changed files with 411 additions and 20 deletions.
5 changes: 5 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
Expand All @@ -44,9 +45,13 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
],
)

Expand Down
73 changes: 69 additions & 4 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -230,13 +231,77 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
// return
// }

err := adc.populateActualStateOfWorld()
if err != nil {
glog.Errorf("error populating the actual state of world: %v", err)
}
err = adc.populateDesiredStateOfWorld()
if err != nil {
glog.Errorf("error populating the desired state of world: %v", err)
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)

<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
}

func (adc *attachDetachController) populateActualStateOfWorld() error {
glog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.kubeClient.Core().Nodes().List(metav1.ListOptions{})
if err != nil {
return err
}

for i := range nodes.Items {
nodeName := types.NodeName(nodes.Items[i].Name)
for _, attachedVolume := range nodes.Items[i].Status.VolumesAttached {
uniqueName := string(attachedVolume.Name)
pluginName, volumeName := volumehelper.SplitUniqueName(uniqueName)
plug, err := adc.volumePluginMgr.FindPluginByName(pluginName)
if err != nil {
glog.Errorf("could not find plugin named %s for volume %s: %v", pluginName, volumeName, err)
continue
}
volumeSpec, err := plug.ConstructVolumeSpecFromName(volumeName)
if err != nil {
glog.Errorf("could not construct volume spec in plugin %s for volume %s: %v", pluginName, volumeName, err)
continue
}
_, err = adc.actualStateOfWorld.AddVolumeNode(volumeSpec, nodeName, attachedVolume.DevicePath)
if err != nil {
glog.Errorf("error adding node to ActualStateOfWorld: %v", err)
continue
}
adc.processVolumesInUse(nodeName, nodes.Items[i].Status.VolumesInUse, true)
}
}
return nil
}

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
glog.V(5).Infof("Populating DesiredStateOfworld")

nodes, err := adc.kubeClient.Core().Nodes().List(metav1.ListOptions{})
if err != nil {
return err
}
for i := range nodes.Items {
nodeName := types.NodeName(nodes.Items[i].Name)
adc.desiredStateOfWorld.AddNode(nodeName)
}

pods, err := adc.kubeClient.Core().Pods(v1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
return err
}
for _, pod := range pods.Items {
adc.podAdd(&pod)
}

return nil
}

func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
Expand Down Expand Up @@ -298,7 +363,7 @@ func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName)
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false)
}

func (adc *attachDetachController) nodeDelete(obj interface{}) {
Expand All @@ -312,7 +377,7 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
glog.V(10).Infof("%v", err)
}

adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false)
}

// processPodVolumes processes the volumes in the given pod and adds them to the
Expand Down Expand Up @@ -539,7 +604,7 @@ func (adc *attachDetachController) getPVSpecFromCache(name string, pvcReadOnly b
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName, forceUnmount bool) {
glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
Expand All @@ -550,7 +615,7 @@ func (adc *attachDetachController) processVolumesInUse(
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(
attachedVolume.VolumeName, nodeName, mounted)
attachedVolume.VolumeName, nodeName, mounted, forceUnmount)
if err != nil {
glog.Warningf(
"SetVolumeMountedByNode(%q, %q, %q) returned an error: %v",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ limitations under the License.
package attachdetach

import (
"fmt"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
)

Expand All @@ -47,3 +52,60 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}
}

func Test_AttachDetachControllerStateOfWolrdPopulators_Positive(t *testing.T) {
// Arrange
fakeKubeClient := controllervolumetesting.CreateTestClient()

adc := &attachDetachController{
kubeClient: fakeKubeClient,
cloud: nil,
}

// Act
plugins := controllervolumetesting.CreateTestPlugin()

if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil {
t.Fatalf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
}

adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)

err := adc.populateActualStateOfWorld()
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: <%v>", err)
}

err = adc.populateDesiredStateOfWorld()
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}

// Test the ActualStateOfWorld contains all the node volumes
nodes, err := adc.kubeClient.Core().Nodes().List(metav1.ListOptions{})
for i := range nodes.Items {
nodeName := types.NodeName(nodes.Items[i].Name)
for _, attachedVolume := range nodes.Items[i].Status.VolumesAttached {
found := adc.actualStateOfWorld.VolumeNodeExists(attachedVolume.Name, nodeName)
if !found {
t.Fatalf("Run failed with error. Node %s, volume %s not found", nodeName, attachedVolume.Name)
}
}
}

pods, err := adc.kubeClient.Core().Pods(v1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
t.Fatalf("Run failed with error. Expected: <no error> Actual: %v", err)
}
for _, pod := range pods.Items {
uniqueName := fmt.Sprintf("%s/%s", controllervolumetesting.TestPluginName, pod.Spec.Volumes[0].Name)
nodeName := types.NodeName(pod.Spec.NodeName)
found := adc.desiredStateOfWorld.VolumeExists(v1.UniqueVolumeName(uniqueName), nodeName)
if !found {
t.Fatalf("Run failed with error. Volume %s, node %s not found in DesiredStateOfWorld",
pod.Spec.Volumes[0].Name,
pod.Spec.NodeName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ActualStateOfWorld interface {
// returned.
// If no node with the name nodeName exists in list of attached nodes for
// the specified volume, an error is returned.
SetVolumeMountedByNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error
SetVolumeMountedByNode(volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool, forceUnmount bool) error

// SetNodeStatusUpdateNeeded sets statusUpdateNeeded for the specified
// node to true indicating the AttachedVolume field in the Node's Status
Expand Down Expand Up @@ -316,7 +316,7 @@ func (asw *actualStateOfWorld) AddVolumeNode(
}

func (asw *actualStateOfWorld) SetVolumeMountedByNode(
volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool) error {
volumeName v1.UniqueVolumeName, nodeName types.NodeName, mounted bool, forceUnmount bool) error {
asw.Lock()
defer asw.Unlock()

Expand All @@ -330,7 +330,7 @@ func (asw *actualStateOfWorld) SetVolumeMountedByNode(
nodeObj.mountedByNodeSetCount = nodeObj.mountedByNodeSetCount + 1
} else {
// Do not allow value to be reset unless it has been set at least once
if nodeObj.mountedByNodeSetCount == 0 {
if nodeObj.mountedByNodeSetCount == 0 && !forceUnmount {
return nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,8 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSet(t *testing.T) {
}

// Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)

// Assert
if setVolumeMountedErr1 != nil {
Expand Down Expand Up @@ -542,7 +542,7 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithoutInitialSet(t *testing.T) {
}

// Act
setVolumeMountedErr := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
setVolumeMountedErr := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)

// Assert
if setVolumeMountedErr != nil {
Expand Down Expand Up @@ -575,8 +575,8 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetAddVolumeNodeNotRes
}

// Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)
generatedVolumeName, addErr = asw.AddVolumeNode(volumeSpec, nodeName, devicePath)

// Assert
Expand Down Expand Up @@ -625,8 +625,8 @@ func Test_SetVolumeMountedByNode_Positive_UnsetWithInitialSetVerifyDetachRequest
expectedDetachRequestedTime := asw.GetAttachedVolumes()[0].DetachRequestedTime

// Act
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)

// Assert
if setVolumeMountedErr1 != nil {
Expand Down Expand Up @@ -767,8 +767,8 @@ func Test_RemoveVolumeFromReportAsAttached_Positive_UnsetWithInitialSetVolumeMou
if addErr != nil {
t.Fatalf("AddVolumeNode failed. Expected: <no error> Actual: <%v>", addErr)
}
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
setVolumeMountedErr1 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false)
setVolumeMountedErr2 := asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)
if setVolumeMountedErr1 != nil {
t.Fatalf("SetVolumeMountedByNode1 failed. Expected <no error> Actual: <%v>", setVolumeMountedErr1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
generatedVolumeName,
nodeName)
}
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)

// Assert
waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
Expand Down Expand Up @@ -302,8 +302,8 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
generatedVolumeName,
nodeName)
}
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, true /* mounted */, false)
asw.SetVolumeMountedByNode(generatedVolumeName, nodeName, false /* mounted */, false)

// Assert
verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
Expand Down

0 comments on commit 5912b31

Please sign in to comment.