Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix TODO: find and add active pods for dswp #42033

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions hack/.linted_packages
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ pkg/cloudprovider/providers/cloudstack
pkg/controller/volume/attachdetach/cache
pkg/controller/volume/attachdetach/populator
pkg/controller/volume/attachdetach/reconciler
pkg/controller/volume/attachdetach/util
pkg/conversion
pkg/conversion/queryparams
pkg/credentialprovider/aws
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/controller/volume/attachdetach/populator:go_default_library",
"//pkg/controller/volume/attachdetach/reconciler:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
Expand Down Expand Up @@ -67,6 +68,7 @@ filegroup(
"//pkg/controller/volume/attachdetach/reconciler:all-srcs",
"//pkg/controller/volume/attachdetach/statusupdater:all-srcs",
"//pkg/controller/volume/attachdetach/testing:all-srcs",
"//pkg/controller/volume/attachdetach/util:all-srcs",
],
tags = ["automanaged"],
)
237 changes: 15 additions & 222 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/populator"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/reconciler"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/util"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
Expand All @@ -62,6 +63,11 @@ const (
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute

// desiredStateOfWorldPopulatorListPodsRetryDuration is the amount of
// time the DesiredStateOfWorldPopulator loop waits between list pods
// calls.
desiredStateOfWorldPopulatorListPodsRetryDuration time.Duration = 3 * time.Minute
)

// AttachDetachController defines the operations supported by this controller.
Expand Down Expand Up @@ -137,8 +143,12 @@ func NewAttachDetachController(

adc.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
desiredStateOfWorldPopulatorLoopSleepPeriod,
desiredStateOfWorldPopulatorListPodsRetryDuration,
podInformer.Lister(),
adc.desiredStateOfWorld)
adc.desiredStateOfWorld,
&adc.volumePluginMgr,
pvcInformer.Lister(),
pvInformer.Lister())

podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.podAdd,
Expand Down Expand Up @@ -245,7 +255,8 @@ func (adc *attachDetachController) podAdd(obj interface{}) {
return
}

adc.processPodVolumes(pod, true /* addVolumes */)
util.ProcessPodVolumes(pod, true, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}

// GetDesiredStateOfWorld returns desired state of world associated with controller
Expand All @@ -264,7 +275,8 @@ func (adc *attachDetachController) podDelete(obj interface{}) {
return
}

adc.processPodVolumes(pod, false /* addVolumes */)
util.ProcessPodVolumes(pod, false, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}

func (adc *attachDetachController) nodeAdd(obj interface{}) {
Expand Down Expand Up @@ -313,225 +325,6 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}

// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world if addVolumes is true, otherwise it removes them.
func (adc *attachDetachController) processPodVolumes(
pod *v1.Pod, addVolumes bool) {
if pod == nil {
return
}

if len(pod.Spec.Volumes) <= 0 {
return
}

nodeName := types.NodeName(pod.Spec.NodeName)

if !adc.desiredStateOfWorld.NodeExists(nodeName) {
// If the node the pod is scheduled to does not exist in the desired
// state of the world data structure, that indicates the node is not
// yet managed by the controller. Therefore, ignore the pod.
// If the node is added to the list of managed nodes in the future,
// future adds and updates to the pod will be processed.
glog.V(10).Infof(
"Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.",
pod.Namespace,
pod.Name,
nodeName)
return
}

// Process volume spec for each volume defined in pod
for _, podVolume := range pod.Spec.Volumes {
volumeSpec, err := adc.createVolumeSpec(podVolume, pod.Namespace)
if err != nil {
glog.V(10).Infof(
"Error processing volume %q for pod %q/%q: %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}

attachableVolumePlugin, err :=
adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || attachableVolumePlugin == nil {
glog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}

uniquePodName := volumehelper.GetUniquePodName(pod)
if addVolumes {
// Add volume to desired state of world
_, err := adc.desiredStateOfWorld.AddPod(
uniquePodName, pod, volumeSpec, nodeName)
if err != nil {
glog.V(10).Infof(
"Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
}

} else {
// Remove volume from desired state of world
uniqueVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
attachableVolumePlugin, volumeSpec)
if err != nil {
glog.V(10).Infof(
"Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v",
podVolume.Name,
pod.Namespace,
pod.Name,
err)
continue
}
adc.desiredStateOfWorld.DeletePod(
uniquePodName, uniqueVolumeName, nodeName)
}
}

return
}

// createVolumeSpec creates and returns a mutatable volume.Spec object for the
// specified volume. It dereference any PVC to get PV objects, if needed.
func (adc *attachDetachController) createVolumeSpec(
podVolume v1.Volume, podNamespace string) (*volume.Spec, error) {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
glog.V(10).Infof(
"Found PVC, ClaimName: %q/%q",
podNamespace,
pvcSource.ClaimName)

// If podVolume is a PVC, fetch the real PV behind the claim
pvName, pvcUID, err := adc.getPVCFromCacheExtractPV(
podNamespace, pvcSource.ClaimName)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}

glog.V(10).Infof(
"Found bound PV for PVC (ClaimName %q/%q pvcUID %v): pvName=%q",
podNamespace,
pvcSource.ClaimName,
pvcUID,
pvName)

// Fetch actual PV object
volumeSpec, err := adc.getPVSpecFromCache(
pvName, pvcSource.ReadOnly, pvcUID)
if err != nil {
return nil, fmt.Errorf(
"error processing PVC %q/%q: %v",
podNamespace,
pvcSource.ClaimName,
err)
}

glog.V(10).Infof(
"Extracted volumeSpec (%v) from bound PV (pvName %q) and PVC (ClaimName %q/%q pvcUID %v)",
volumeSpec.Name,
pvName,
podNamespace,
pvcSource.ClaimName,
pvcUID)

return volumeSpec, nil
}

// Do not return the original volume object, since it's from the shared
// informer it may be mutated by another consumer.
clonedPodVolumeObj, err := api.Scheme.DeepCopy(&podVolume)
if err != nil || clonedPodVolumeObj == nil {
return nil, fmt.Errorf(
"failed to deep copy %q volume object. err=%v", podVolume.Name, err)
}

clonedPodVolume, ok := clonedPodVolumeObj.(*v1.Volume)
if !ok {
return nil, fmt.Errorf("failed to cast clonedPodVolume %#v to v1.Volume", clonedPodVolumeObj)
}

return volume.NewSpecFromVolume(clonedPodVolume), nil
}

// getPVCFromCacheExtractPV fetches the PVC object with the given namespace and
// name from the shared internal PVC store extracts the name of the PV it is
// pointing to and returns it.
// This method returns an error if a PVC object does not exist in the cache
// with the given namespace/name.
// This method returns an error if the PVC object's phase is not "Bound".
func (adc *attachDetachController) getPVCFromCacheExtractPV(namespace string, name string) (string, types.UID, error) {
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
if err != nil {
return "", "", fmt.Errorf("failed to find PVC %s/%s in PVCInformer cache: %v", namespace, name, err)
}

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
return "", "", fmt.Errorf(
"PVC %s/%s has non-bound phase (%q) or empty pvc.Spec.VolumeName (%q)",
namespace,
name,
pvc.Status.Phase,
pvc.Spec.VolumeName)
}

return pvc.Spec.VolumeName, pvc.UID, nil
}

// getPVSpecFromCache fetches the PV object with the given name from the shared
// internal PV store and returns a volume.Spec representing it.
// This method returns an error if a PV object does not exist in the cache with
// the given name.
// This method deep copies the PV object so the caller may use the returned
// volume.Spec object without worrying about it mutating unexpectedly.
func (adc *attachDetachController) getPVSpecFromCache(name string, pvcReadOnly bool, expectedClaimUID types.UID) (*volume.Spec, error) {
pv, err := adc.pvLister.Get(name)
if err != nil {
return nil, fmt.Errorf("failed to find PV %q in PVInformer cache: %v", name, err)
}

if pv.Spec.ClaimRef == nil {
return nil, fmt.Errorf(
"found PV object %q but it has a nil pv.Spec.ClaimRef indicating it is not yet bound to the claim",
name)
}

if pv.Spec.ClaimRef.UID != expectedClaimUID {
return nil, fmt.Errorf(
"found PV object %q but its pv.Spec.ClaimRef.UID (%q) does not point to claim.UID (%q)",
name,
pv.Spec.ClaimRef.UID,
expectedClaimUID)
}

// Do not return the object from the informer, since the store is shared it
// may be mutated by another consumer.
clonedPVObj, err := api.Scheme.DeepCopy(pv)
if err != nil || clonedPVObj == nil {
return nil, fmt.Errorf("failed to deep copy %q PV object. err=%v", name, err)
}

clonedPV, ok := clonedPVObj.(*v1.PersistentVolume)
if !ok {
return nil, fmt.Errorf("failed to cast %q clonedPV %#v to PersistentVolume", name, pv)
}

return volume.NewSpecFromPersistentVolume(clonedPV, pvcReadOnly), nil
}

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
Expand Down
22 changes: 22 additions & 0 deletions pkg/controller/volume/attachdetach/populator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)

go_library(
Expand All @@ -14,9 +15,12 @@ go_library(
deps = [
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/util:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
Expand All @@ -35,3 +39,21 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)

go_test(
name = "go_default_test",
srcs = ["desired_state_of_world_populator_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)