Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #34242: Attach/detach should recover from a crash #39732

Merged
merged 1 commit into from
Apr 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
Expand All @@ -46,9 +47,14 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

Expand Down
156 changes: 143 additions & 13 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -102,12 +103,16 @@ func NewAttachDetachController(
// dropped pods so they are continuously processed until it is accepted or
// deleted (probably can't do this with sharedInformer), etc.
adc := &attachDetachController{
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
cloud: cloud,
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
}

if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil {
Expand Down Expand Up @@ -155,14 +160,12 @@ func NewAttachDetachController(
UpdateFunc: adc.podUpdate,
DeleteFunc: adc.podDelete,
})
adc.podsSynced = podInformer.Informer().HasSynced

nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
})
adc.nodesSynced = nodeInformer.Informer().HasSynced

return adc, nil
}
Expand All @@ -184,7 +187,10 @@ type attachDetachController struct {
pvLister corelisters.PersistentVolumeLister
pvsSynced kcache.InformerSynced

podsSynced kcache.InformerSynced
podLister corelisters.PodLister
podsSynced kcache.InformerSynced

nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced

// cloud provider used by volume host
Expand Down Expand Up @@ -239,12 +245,136 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
return
}

err := adc.populateActualStateOfWorld()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is controller.WaitForCacheSync() just few lines above. Does it mean that all the existing objects were already synced through podAdd() and nodeAdd() callbacks from the shared informers? They may be synced in a wrong order (podAdd before nodeAdd) and these populateActualStateOfWorld() and populateDesiredStateOfWorld() will just fix it, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I talked to @tsmetana and yes, these new world populators will fix states of worlds that could be inconsistent due to bad event ordering.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main issue is that we need to find volumes attached for the pods that were deleted during the controller downtime: this would not be solved by the WaitForCacheSync() -- we still would be missing the pod deletion event. And yes, in the case the events come in wrong order the ASW/DSW populators should help too.

if err != nil {
glog.Errorf("Error populating the actual state of world: %v", err)
}
err = adc.populateDesiredStateOfWorld()
if err != nil {
glog.Errorf("Error populating the desired state of world: %v", err)
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)

<-stopCh
}

func (adc *attachDetachController) populateActualStateOfWorld() error {
glog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.nodeLister.List(labels.Everything())
if err != nil {
return err
}

for _, node := range nodes {
nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached {
uniqueName := attachedVolume.Name
// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
// In such a case it should be detached in the first reconciliation cycle and the
// volume spec is not needed to detach a volume. If the volume is used by a pod, it
// its spec can be: this would happen during in the populateDesiredStateOfWorld which
// scans the pods and updates their volumes in the ActualStateOfWorld too.
err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
if err != nil {
glog.Errorf("Failed to mark the volume as attached: %v", err)
continue
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, true /* forceUnmount */)
if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists {
// Node specifies annotation indicating it should be managed by
// attach detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(types.NodeName(node.Name)) // Needed for DesiredStateOfWorld population
}
}
}
return nil
}

func (adc *attachDetachController) getNodeVolumeDevicePath(
volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
var devicePath string
var found bool
node, err := adc.nodeLister.Get(string(nodeName))
if err != nil {
return devicePath, err
}
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeName == attachedVolume.Name {
devicePath = attachedVolume.DevicePath
found = true
break
}
}
if !found {
err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
}

return devicePath, err
}

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
glog.V(5).Infof("Populating DesiredStateOfworld")

pods, err := adc.podLister.List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods {
podToAdd := pod
adc.podAdd(&podToAdd)
for _, podVolume := range podToAdd.Spec.Volumes {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The present in the ASW with no corresponding
// pod will be detached and the spec is irrelevant.
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
if err != nil {
glog.Errorf(
"Error creating spec for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
nodeName := types.NodeName(podToAdd.Spec.NodeName)
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || plugin == nil {
glog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
glog.Errorf(
"Failed to find unique name for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
if adc.actualStateOfWorld.VolumeNodeExists(volumeName, nodeName) {
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
glog.Errorf("Failed to find device path: %v", err)
continue
}
err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
if err != nil {
glog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
}
}
}
}

return nil
}

func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
Expand Down Expand Up @@ -308,7 +438,7 @@ func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName)
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
}

func (adc *attachDetachController) nodeDelete(obj interface{}) {
Expand All @@ -322,15 +452,15 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
glog.V(10).Infof("%v", err)
}

adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
}

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName, forceUnmount bool) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment to describe new boolean.

glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
Expand All @@ -341,7 +471,7 @@ func (adc *attachDetachController) processVolumesInUse(
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(
attachedVolume.VolumeName, nodeName, mounted)
attachedVolume.VolumeName, nodeName, mounted, forceUnmount)
if err != nil {
glog.Warningf(
"SetVolumeMountedByNode(%q, %q, %q) returned an error: %v",
Expand Down