Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attachdetach controller: attach volumes immediately when Pod's PVCs are bound #66863

Merged
merged 2 commits into from Aug 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Expand Up @@ -26,16 +26,19 @@ go_library(
"//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
Expand Down
127 changes: 127 additions & 0 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Expand Up @@ -26,16 +26,19 @@ import (
"github.com/golang/glog"
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
Expand Down Expand Up @@ -125,9 +128,11 @@ func NewAttachDetachController(
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
podIndexer: podInformer.Informer().GetIndexer(),
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
}

if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
Expand Down Expand Up @@ -179,15 +184,54 @@ func NewAttachDetachController(
DeleteFunc: adc.podDelete,
})

// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
adc.podIndexer.AddIndexers(kcache.Indexers{
pvcKeyIndex: indexByPVCKey,
})

nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
})

pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
adc.enqueuePVC(obj)
},
UpdateFunc: func(old, new interface{}) {
adc.enqueuePVC(new)
},
})

return adc, nil
}

const (
pvcKeyIndex string = "pvcKey"
)

// indexByPVCKey returns PVC keys for given pod. Note that the index is only
// used for attaching, so we are only interested in active pods with nodeName
// set.
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}

type attachDetachController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
Expand All @@ -207,6 +251,7 @@ type attachDetachController struct {

podLister corelisters.PodLister
podsSynced kcache.InformerSynced
podIndexer kcache.Indexer

nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced
Expand Down Expand Up @@ -251,10 +296,14 @@ type attachDetachController struct {

// recorder is used to record events in the API server
recorder record.EventRecorder

// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
}

func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer adc.pvcQueue.ShutDown()

glog.Infof("Starting attach detach controller")
defer glog.Infof("Shutting down attach detach controller")
Expand All @@ -273,6 +322,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
go wait.Until(adc.pvcWorker, time.Second, stopCh)
metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)

<-stopCh
Expand Down Expand Up @@ -486,6 +536,83 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}

func (adc *attachDetachController) enqueuePVC(obj interface{}) {
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious: Why runtime.HandleError? Why not just glog.Errorf?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed https://github.com/kubernetes/kubernetes/blob/v1.12.0-alpha.1/pkg/controller/daemon/daemon_controller.go#L316, and found sample-controller uses runtime.HandleError to log errors too. Perhaps because runtime.HandleError is dedicated to log non-user facing ignored errors and can be extended by adding error handlers.

return
}
adc.pvcQueue.Add(key)
}

// pvcWorker processes items from pvcQueue
func (adc *attachDetachController) pvcWorker() {
for adc.processNextItem() {
}
}

func (adc *attachDetachController) processNextItem() bool {
keyObj, shutdown := adc.pvcQueue.Get()
if shutdown {
return false
}
defer adc.pvcQueue.Done(keyObj)

if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
adc.pvcQueue.AddRateLimited(keyObj)
runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
return true
}

// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
adc.pvcQueue.Forget(keyObj)
return true
}

func (adc *attachDetachController) syncPVCByKey(key string) error {
glog.V(5).Infof("syncPVCByKey[%s]", key)
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
return nil
}
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
if apierrors.IsNotFound(err) {
glog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
return nil
}
if err != nil {
return err
}

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
// Skip unbound PVCs.
return nil
}

objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to verify, if there are multiple pods using the same PVC, this will return all pods? If so make sure there is a test case for that.

Copy link
Member Author

@cofyc cofyc Aug 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, test has been updated. Created more pvc notbound pods for testing in 74cfffc.

if err != nil {
return err
}
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
volumeActionFlag := util.DetermineVolumeAction(
pod,
adc.desiredStateOfWorld,
true /* default volume action */)

util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}
return nil
}

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
Expand Down
1 change: 1 addition & 0 deletions test/integration/volume/BUILD
Expand Up @@ -20,6 +20,7 @@ go_test(
"//pkg/controller/volume/attachdetach:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/persistentvolume/options:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
Expand Down