Skip to content

Commit

Permalink
Merge pull request #66863 from cofyc/fix64549
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

attachdetach controller: attach volumes immediately when Pod's PVCs are bound

**What this PR does / why we need it**:

Let attachdetach controller to attach volumes immediately when Pod's PVCs are bound.

Current attachdetach controller calls `util.ProcessPodVolume` to add pod volumes into `desiredStateOfWorld` on these events:

- podAdd event
- podUpdate event
- podDelete event
- periodical `desiredStateOfWorldPopulator.findAndAddActivePod`

But if a pod is created with PVCs not bound, no volumes will be added into `desiredStateOfWorld` [because PVCs not bound](https://github.com/kubernetes/kubernetes/blob/v1.12.0-alpha.0/pkg/controller/volume/attachdetach/util/util.go#L99). When pv controller binds PVCs successfully, attachdetach controller will not add pod volumes immediately because it does not watch on PVC events.

It will wait until a pod update event is triggered (normally will not happen because no new status will be reported by kubelet) or `desiredStateOfWorldPopulator.findAndAddActivePod` is called (maybe 0~3 minutes later, see [timer configs](https://github.com/kubernetes/kubernetes/blob/v1.12.0-alpha.0/pkg/controller/volume/attachdetach/attach_detach_controller.go)).

In bad case, pod start time will be very long (~3 minutes + ~2 minutes (kubelet max exponential backoff)), for example: #64549 (comment).

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #64549

**Special notes for your reviewer**:

**Release note**:

```release-note
attachdetach controller attaches volumes immediately when Pod's PVCs are bound
```
  • Loading branch information
Kubernetes Submit Queue committed Aug 14, 2018
2 parents d0439d4 + 17ab4c3 commit cd786bd
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ go_library(
"//pkg/volume/util/volumepathhandler:go_default_library",
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
Expand Down
127 changes: 127 additions & 0 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,19 @@ import (
"github.com/golang/glog"
authenticationv1 "k8s.io/api/authentication/v1"
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
Expand Down Expand Up @@ -125,9 +128,11 @@ func NewAttachDetachController(
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
podIndexer: podInformer.Informer().GetIndexer(),
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"),
}

if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil {
Expand Down Expand Up @@ -179,15 +184,54 @@ func NewAttachDetachController(
DeleteFunc: adc.podDelete,
})

// This custom indexer will index pods by its PVC keys. Then we don't need
// to iterate all pods every time to find pods which reference given PVC.
adc.podIndexer.AddIndexers(kcache.Indexers{
pvcKeyIndex: indexByPVCKey,
})

nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
})

pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
adc.enqueuePVC(obj)
},
UpdateFunc: func(old, new interface{}) {
adc.enqueuePVC(new)
},
})

return adc, nil
}

const (
pvcKeyIndex string = "pvcKey"
)

// indexByPVCKey returns PVC keys for given pod. Note that the index is only
// used for attaching, so we are only interested in active pods with nodeName
// set.
func indexByPVCKey(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) {
return []string{}, nil
}
keys := []string{}
for _, podVolume := range pod.Spec.Volumes {
if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil {
keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName))
}
}
return keys, nil
}

type attachDetachController struct {
// kubeClient is the kube API client used by volumehost to communicate with
// the API server.
Expand All @@ -207,6 +251,7 @@ type attachDetachController struct {

podLister corelisters.PodLister
podsSynced kcache.InformerSynced
podIndexer kcache.Indexer

nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced
Expand Down Expand Up @@ -251,10 +296,14 @@ type attachDetachController struct {

// recorder is used to record events in the API server
recorder record.EventRecorder

// pvcQueue is used to queue pvc objects
pvcQueue workqueue.RateLimitingInterface
}

func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash()
defer adc.pvcQueue.ShutDown()

glog.Infof("Starting attach detach controller")
defer glog.Infof("Shutting down attach detach controller")
Expand All @@ -273,6 +322,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)
go wait.Until(adc.pvcWorker, time.Second, stopCh)
metrics.Register(adc.pvcLister, adc.pvLister, adc.podLister, &adc.volumePluginMgr)

<-stopCh
Expand Down Expand Up @@ -486,6 +536,83 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}

func (adc *attachDetachController) enqueuePVC(obj interface{}) {
key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
adc.pvcQueue.Add(key)
}

// pvcWorker processes items from pvcQueue
func (adc *attachDetachController) pvcWorker() {
for adc.processNextItem() {
}
}

func (adc *attachDetachController) processNextItem() bool {
keyObj, shutdown := adc.pvcQueue.Get()
if shutdown {
return false
}
defer adc.pvcQueue.Done(keyObj)

if err := adc.syncPVCByKey(keyObj.(string)); err != nil {
// Rather than wait for a full resync, re-add the key to the
// queue to be processed.
adc.pvcQueue.AddRateLimited(keyObj)
runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err))
return true
}

// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
adc.pvcQueue.Forget(keyObj)
return true
}

func (adc *attachDetachController) syncPVCByKey(key string) error {
glog.V(5).Infof("syncPVCByKey[%s]", key)
namespace, name, err := kcache.SplitMetaNamespaceKey(key)
if err != nil {
glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err)
return nil
}
pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name)
if apierrors.IsNotFound(err) {
glog.V(4).Infof("error getting pvc %q from informer: %v", key, err)
return nil
}
if err != nil {
return err
}

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
// Skip unbound PVCs.
return nil
}

objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key)
if err != nil {
return err
}
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
volumeActionFlag := util.DetermineVolumeAction(
pod,
adc.desiredStateOfWorld,
true /* default volume action */)

util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister)
}
return nil
}

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
Expand Down
1 change: 1 addition & 0 deletions test/integration/volume/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_test(
"//pkg/controller/volume/attachdetach:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/persistentvolume/options:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util:go_default_library",
Expand Down
Loading

0 comments on commit cd786bd

Please sign in to comment.