Skip to content

Commit

Permalink
Fix issue kubernetes#34242: Attach/detach should recover from a crash
Browse files Browse the repository at this point in the history
When the attach/detach controller crashes and a pod with attached PV is deleted
afterwards the controller will never detach the pod's attached volumes. To
prevent this the controller should try to recover the state from the nodes
status and figure out which volumes to detach. This requires some changes in the
volume providers too: the only information available from the nodes is the
volume name and the device path. The controller needs to find the correct volume
plugin and reconstruct the volume spec just from the name. This reuired a small
change also in the volume plugin interface.
  • Loading branch information
tsmetana committed Jan 19, 2017
1 parent f90bb17 commit 546bd27
Show file tree
Hide file tree
Showing 26 changed files with 202 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,62 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
glog.Infof("Starting Attach Detach Controller")

err := adc.populateActualStateOfWorld()
if err != nil {
glog.Errorf("error populating the actual state of world: %v", err)
}
err = adc.populateDesiredStateOfWorld()
if err != nil {
glog.Errorf("error populating the desired state of world: %v", err)
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)

<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
}

func (adc *attachDetachController) populateActualStateOfWorld() error {
glog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.kubeClient.Core().Nodes().List(v1.ListOptions{})
if err != nil {
return err
}

for i := range nodes.Items {
nodeName := types.NodeName(nodes.Items[i].Name)
for _, attachedVolume := range nodes.Items[i].Status.VolumesAttached {
uniqueName := string(attachedVolume.Name)
pluginName, volumeName := volumehelper.SplitUniqueName(uniqueName)
plug, err := adc.volumePluginMgr.FindPluginByName(pluginName)
if err != nil {
glog.Errorf("could not find plugin named %s for volume %s: %v", pluginName, volumeName, err)
continue
}
volumeSpec, err := plug.ConstructVolumeSpecFromName(volumeName)
if err != nil {
glog.Errorf("could not construct volume spec in plugin %s for volume %s: %v", pluginName, volumeName, err)
continue
}
adc.actualStateOfWorld.AddVolumeNode(volumeSpec, nodeName, attachedVolume.DevicePath)
}
}
return nil
}

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
glog.V(5).Infof("Populating DesiredStateOfworld")
pods, err := adc.kubeClient.Core().Pods(v1.NamespaceAll).List(v1.ListOptions{})
if err != nil {
return err
}
for _, pod := range pods.Items {
adc.podAdd(pod)
}

return nil
}

func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
Expand Down
12 changes: 12 additions & 0 deletions pkg/volume/aws_ebs/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ func getVolumeSource(
return nil, false, fmt.Errorf("Spec does not reference an AWS EBS volume type")
}

func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpecFromName(volName string) (*volume.Spec, error) {
awsVolume := &v1.Volume{
Name: volName,
VolumeSource: v1.VolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: volName,
},
},
}
return volume.NewSpecFromVolume(awsVolume), nil
}

func (plugin *awsElasticBlockStorePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
Expand Down
12 changes: 12 additions & 0 deletions pkg/volume/azure_dd/azure_dd.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ func (plugin *azureDataDiskPlugin) newUnmounterInternal(volName string, podUID t
}, nil
}

func (plugin *azureDataDiskPlugin) ConstructVolumeSpecFromName(volName string) (*volume.Spec, error) {
azVolume := &v1.Volume{
Name: volName,
VolumeSource: v1.VolumeSource{
AzureDisk: &v1.AzureDiskVolumeSource{
DiskName: volName,
},
},
}
return volume.NewSpecFromVolume(azVolume), nil
}

func (plugin *azureDataDiskPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/azure_file/azure_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (plugin *azureFilePlugin) newUnmounterInternal(volName string, podUID types
}}, nil
}

func (plugin *azureFilePlugin) ConstructVolumeSpecFromName(volName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volName, "")
}

func (plugin *azureFilePlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
azureVolume := &v1.Volume{
Name: volName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/cephfs/cephfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UI
}, nil
}

func (plugin *cephfsPlugin) ConstructVolumeSpecFromName(volName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volName, "")
}

func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
cephfsVolume := &v1.Volume{
Name: volumeName,
Expand Down
13 changes: 13 additions & 0 deletions pkg/volume/cinder/cinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,19 @@ func (plugin *cinderPlugin) getCloudProvider() (CinderProvider, error) {
}
}

func (plugin *cinderPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
// The volumeName and VolumeID are identical: see the GetVolumeName method
cinderVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
Cinder: &v1.CinderVolumeSource{
VolumeID: volumeName,
},
},
}
return volume.NewSpecFromVolume(cinderVolume), nil
}

func (plugin *cinderPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/configmap/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (plugin *configMapPlugin) NewUnmounter(volName string, podUID types.UID) (v
return &configMapVolumeUnmounter{&configMapVolume{volName, podUID, plugin, plugin.host.GetMounter(), plugin.host.GetWriter(), volume.MetricsNil{}}}, nil
}

func (plugin *configMapPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *configMapPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
configMapVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/downwardapi/downwardapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (plugin *downwardAPIPlugin) NewUnmounter(volName string, podUID types.UID)
}, nil
}

func (plugin *downwardAPIPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *downwardAPIPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
downwardAPIVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/empty_dir/empty_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (plugin *emptyDirPlugin) newUnmounterInternal(volName string, podUID types.
return ed, nil
}

func (plugin *emptyDirPlugin) ConstructVolumeSpecFromName(volName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volName, "")
}

func (plugin *emptyDirPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
emptyDirVolume := &v1.Volume{
Name: volName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/fc/fc.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func (plugin *fcPlugin) execCommand(command string, args []string) ([]byte, erro
return cmd.CombinedOutput()
}

func (plugin *fcPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *fcPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
fcVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/flexvolume/flexvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func (plugin *flexVolumePlugin) newUnmounterInternal(volName string, podUID type
}, nil
}

func (plugin *flexVolumePlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *flexVolumePlugin) ConstructVolumeSpec(volumeName, sourceName string) (*volume.Spec, error) {
flexVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/flocker/flocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (p *flockerPlugin) newUnmounterInternal(volName string, podUID types.UID, m
}}, nil
}

func (p *flockerPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return p.ConstructVolumeSpec(volumeName, "")
}

func (p *flockerPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
flockerVolume := &v1.Volume{
Name: volumeName,
Expand Down
12 changes: 12 additions & 0 deletions pkg/volume/gce_pd/gce_pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ func (plugin *gcePersistentDiskPlugin) newProvisionerInternal(options volume.Vol
}, nil
}

func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
gceVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
PDName: volumeName,
},
},
}
return volume.NewSpecFromVolume(gceVolume), nil
}

func (plugin *gcePersistentDiskPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/git_repo/git_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (plugin *gitRepoPlugin) NewUnmounter(volName string, podUID types.UID) (vol
}, nil
}

func (plugin *gitRepoPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *gitRepoPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
gitVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/glusterfs/glusterfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ func (plugin *glusterfsPlugin) execCommand(command string, args []string) ([]byt
return cmd.CombinedOutput()
}

func (plugin *glusterfsPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *glusterfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
glusterfsVolume := &v1.Volume{
Name: volumeName,
Expand Down
3 changes: 3 additions & 0 deletions pkg/volume/host_path/host_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu
return newProvisioner(options, plugin.host, plugin)
}

func (plugin *hostPathPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}
func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
hostPathVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/iscsi/iscsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (plugin *iscsiPlugin) execCommand(command string, args []string) ([]byte, e
return cmd.CombinedOutput()
}

func (plugin *iscsiPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *iscsiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
iscsiVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/nfs/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec, eventReco
return newRecycler(pvName, spec, eventRecorder, plugin.host, plugin.config)
}

func (plugin *nfsPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
nfsVolume := &v1.Volume{
Name: volumeName,
Expand Down
12 changes: 12 additions & 0 deletions pkg/volume/photon_pd/photon_pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ func (plugin *photonPersistentDiskPlugin) newUnmounterInternal(volName string, p
}}, nil
}

func (plugin *photonPersistentDiskPlugin) ConstructVolumeSpecFromName(volumeSpecName string) (*volume.Spec, error) {
photonPersistentDisk := &v1.Volume{
Name: volumeSpecName,
VolumeSource: v1.VolumeSource{
PhotonPersistentDisk: &v1.PhotonPersistentDiskVolumeSource{
PdID: volumeSpecName,
},
},
}
return volume.NewSpecFromVolume(photonPersistentDisk), nil
}

func (plugin *photonPersistentDiskPlugin) ConstructVolumeSpec(volumeSpecName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
Expand Down
6 changes: 6 additions & 0 deletions pkg/volume/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ type VolumePlugin interface {
// information from input. This function is used by volume manager to reconstruct
// volume spec by reading the volume directories from disk
ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error)

// ConstructVolumeSpecFromName constructs a volume spec based on the given volume
// name only. The spec may have incomplete information due to limited
// information from input. This function is used by volume manager to reconstruct
// volume spec by reading the volume directories from disk
ConstructVolumeSpecFromName(volumeName string) (*Spec, error)
}

// PersistentVolumePlugin is an extended interface of VolumePlugin and is used
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/quobyte/quobyte.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func getVolumeSource(spec *volume.Spec) (*v1.QuobyteVolumeSource, bool, error) {
return nil, false, fmt.Errorf("Spec does not reference a Quobyte volume type")
}

func (plugin *quobytePlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *quobytePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
quobyteVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/rbd/rbd.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (plugin *rbdPlugin) newUnmounterInternal(volName string, podUID types.UID,
}, nil
}

func (plugin *rbdPlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volumeName, "")
}

func (plugin *rbdPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
rbdVolume := &v1.Volume{
Name: volumeName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/secret/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (plugin *secretPlugin) NewUnmounter(volName string, podUID types.UID) (volu
}, nil
}

func (plugin *secretPlugin) ConstructVolumeSpecFromName(volName string) (*volume.Spec, error) {
return plugin.ConstructVolumeSpec(volName, "")
}

func (plugin *secretPlugin) ConstructVolumeSpec(volName, mountPath string) (*volume.Spec, error) {
secretVolume := &v1.Volume{
Name: volName,
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (plugin *FakeVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode
return []v1.PersistentVolumeAccessMode{}
}

func (plugin *FakeVolumePlugin) ConstructVolumeSpecFromName(volumeName string) (*Spec, error) {
return nil, nil
}

func (plugin *FakeVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*Spec, error) {
return nil, nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/volume/util/volumehelper/volumehelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package volumehelper

import (
"fmt"
"strings"

"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/volume"
Expand Down Expand Up @@ -87,3 +88,9 @@ func GetUniqueVolumeNameFromSpec(
volumeName),
nil
}

func SplitUniqueName(uniqueName string) (string, string) {
components := strings.SplitN(uniqueName, "/", 3)
pluginName := fmt.Sprintf("%s/%s", components[0], components[1])
return pluginName, components[2]
}
12 changes: 12 additions & 0 deletions pkg/volume/vsphere_volume/vsphere_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ func (plugin *vsphereVolumePlugin) newUnmounterInternal(volName string, podUID t
}}, nil
}

func (plugin *vsphereVolumePlugin) ConstructVolumeSpecFromName(volumeName string) (*volume.Spec, error) {
vsphereVolume := &v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
VsphereVolume: &v1.VsphereVirtualDiskVolumeSource{
VolumePath: volumeName,
},
},
}
return volume.NewSpecFromVolume(vsphereVolume), nil
}

func (plugin *vsphereVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
mounter := plugin.host.GetMounter()
pluginDir := plugin.host.GetPluginDir(plugin.GetPluginName())
Expand Down

0 comments on commit 546bd27

Please sign in to comment.