Skip to content

Commit

Permalink
Fix issue kubernetes#34242: Attach/detach should recover from a crash
Browse files Browse the repository at this point in the history
When the attach/detach controller crashes and a pod with attached PV is deleted
afterwards the controller will never detach the pod's attached volumes. To
prevent this the controller should try to recover the state from the nodes
status.
  • Loading branch information
tsmetana committed Apr 20, 2017
1 parent 53258ba commit 852c44a
Show file tree
Hide file tree
Showing 13 changed files with 845 additions and 128 deletions.
6 changes: 6 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
Expand All @@ -46,9 +47,14 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

Expand Down
156 changes: 143 additions & 13 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -102,12 +103,16 @@ func NewAttachDetachController(
// dropped pods so they are continuously processed until it is accepted or
// deleted (probably can't do this with sharedInformer), etc.
adc := &attachDetachController{
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
cloud: cloud,
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
}

if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil {
Expand Down Expand Up @@ -155,14 +160,12 @@ func NewAttachDetachController(
UpdateFunc: adc.podUpdate,
DeleteFunc: adc.podDelete,
})
adc.podsSynced = podInformer.Informer().HasSynced

nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
})
adc.nodesSynced = nodeInformer.Informer().HasSynced

return adc, nil
}
Expand All @@ -184,7 +187,10 @@ type attachDetachController struct {
pvLister corelisters.PersistentVolumeLister
pvsSynced kcache.InformerSynced

podsSynced kcache.InformerSynced
podLister corelisters.PodLister
podsSynced kcache.InformerSynced

nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced

// cloud provider used by volume host
Expand Down Expand Up @@ -239,12 +245,136 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
return
}

err := adc.populateActualStateOfWorld()
if err != nil {
glog.Errorf("Error populating the actual state of world: %v", err)
}
err = adc.populateDesiredStateOfWorld()
if err != nil {
glog.Errorf("Error populating the desired state of world: %v", err)
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)

<-stopCh
}

func (adc *attachDetachController) populateActualStateOfWorld() error {
glog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.nodeLister.List(labels.Everything())
if err != nil {
return err
}

for _, node := range nodes {
nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached {
uniqueName := attachedVolume.Name
// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
// In such a case it should be detached in the first reconciliation cycle and the
// volume spec is not needed to detach a volume. If the volume is used by a pod, it
// its spec can be: this would happen during in the populateDesiredStateOfWorld which
// scans the pods and updates their volumes in the ActualStateOfWorld too.
err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
if err != nil {
glog.Errorf("Failed to mark the volume as attached: %v", err)
continue
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, true /* forceUnmount */)
if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists {
// Node specifies annotation indicating it should be managed by
// attach detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(types.NodeName(node.Name)) // Needed for DesiredStateOfWorld population
}
}
}
return nil
}

func (adc *attachDetachController) getNodeVolumeDevicePath(
volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
var devicePath string
var found bool
node, err := adc.nodeLister.Get(string(nodeName))
if err != nil {
return devicePath, err
}
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeName == attachedVolume.Name {
devicePath = attachedVolume.DevicePath
found = true
break
}
}
if !found {
err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
}

return devicePath, err
}

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
glog.V(5).Infof("Populating DesiredStateOfworld")

pods, err := adc.podLister.List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods {
podToAdd := pod
adc.podAdd(&podToAdd)
for _, podVolume := range podToAdd.Spec.Volumes {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The present in the ASW with no corresponding
// pod will be detached and the spec is irrelevant.
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
if err != nil {
glog.Errorf(
"Error creating spec for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
nodeName := types.NodeName(podToAdd.Spec.NodeName)
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || plugin == nil {
glog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
glog.Errorf(
"Failed to find unique name for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
if adc.actualStateOfWorld.VolumeNodeExists(volumeName, nodeName) {
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
glog.Errorf("Failed to find device path: %v", err)
continue
}
err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
if err != nil {
glog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
}
}
}
}

return nil
}

func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
Expand Down Expand Up @@ -308,7 +438,7 @@ func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName)
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
}

func (adc *attachDetachController) nodeDelete(obj interface{}) {
Expand All @@ -322,15 +452,15 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
glog.V(10).Infof("%v", err)
}

adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
}

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName, forceUnmount bool) {
glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
Expand All @@ -341,7 +471,7 @@ func (adc *attachDetachController) processVolumesInUse(
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(
attachedVolume.VolumeName, nodeName, mounted)
attachedVolume.VolumeName, nodeName, mounted, forceUnmount)
if err != nil {
glog.Warningf(
"SetVolumeMountedByNode(%q, %q, %q) returned an error: %v",
Expand Down

0 comments on commit 852c44a

Please sign in to comment.