Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

volume controller: Add cache with the latest version of PVs and PVCs #25881

Merged
merged 1 commit into from
May 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 41 additions & 3 deletions pkg/controller/persistentvolume/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,8 @@ const createProvisionedPVInterval = 10 * time.Second
// framework.Controllers that watch PerstentVolume and PersistentVolumeClaim
// changes.
type PersistentVolumeController struct {
volumes persistentVolumeOrderedIndex
volumeController *framework.Controller
volumeControllerStopCh chan struct{}
claims cache.Store
claimController *framework.Controller
claimControllerStopCh chan struct{}
kubeClient clientset.Interface
Expand All @@ -119,6 +117,14 @@ type PersistentVolumeController struct {
provisioner vol.ProvisionableVolumePlugin
clusterName string

// Cache of the last known version of volumes and claims. This cache is
// thread safe as long as the volumes/claims there are not modified, they
// must be cloned before any modification. These caches get updated both by
// "xxx added/updated/deleted" events from etcd and by the controller when
// it saves newer version to etcd.
volumes persistentVolumeOrderedIndex
claims cache.Store

// PersistentVolumeController keeps track of long running operations and
// makes sure it won't start the same operation twice in parallel.
// Each operation is identified by unique operationName.
Expand Down Expand Up @@ -507,6 +513,11 @@ func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVo
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s failed: %v", claimToClaimKey(claim), phase, err)
return newClaim, err
}
_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
if err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return newClaim, err
}
glog.V(2).Infof("claim %q entered phase %q", claimToClaimKey(claim), phase)
return newClaim, nil
}
Expand Down Expand Up @@ -559,6 +570,11 @@ func (ctrl *PersistentVolumeController) updateVolumePhase(volume *api.Persistent
glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
return newVol, err
}
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return newVol, err
}
glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
return newVol, err
}
Expand Down Expand Up @@ -639,6 +655,11 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *api.Persistent
glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volume.Name, claimToClaimKey(claim), err)
return newVol, err
}
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return newVol, err
}
glog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", newVol.Name, claimToClaimKey(claim))
return newVol, nil
}
Expand Down Expand Up @@ -696,6 +717,11 @@ func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *api.PersistentV
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err)
return newClaim, err
}
_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
if err != nil {
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return newClaim, err
}
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: bound to %q", claimToClaimKey(claim), volume.Name)
return newClaim, nil
}
Expand Down Expand Up @@ -785,6 +811,11 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
return err
}
_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
if err != nil {
glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
return err
}
glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)

// Update the status
Expand Down Expand Up @@ -1124,9 +1155,16 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
if _, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
var newVol *api.PersistentVolume
if newVol, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
// Save succeeded.
glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))

_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
if err != nil {
// We will get an "volume added" event soon, this is not a big error
glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, err)
}
break
}
// Save failed, try again after a while.
Expand Down
105 changes: 103 additions & 2 deletions pkg/controller/persistentvolume/controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package persistentvolume

import (
"fmt"
"strconv"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
Expand Down Expand Up @@ -58,6 +60,8 @@ func NewPersistentVolumeController(
}

controller := &PersistentVolumeController{
volumes: newPersistentVolumeOrderedIndex(),
claims: cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc),
kubeClient: kubeClient,
eventRecorder: eventRecorder,
runningOperations: make(map[string]bool),
Expand Down Expand Up @@ -97,7 +101,7 @@ func NewPersistentVolumeController(
}
}

controller.volumes.store, controller.volumeController = framework.NewIndexerInformer(
_, controller.volumeController = framework.NewIndexerInformer(
volumeSource,
&api.PersistentVolume{},
syncPeriod,
Expand All @@ -108,7 +112,7 @@ func NewPersistentVolumeController(
},
cache.Indexers{"accessmodes": accessModesIndexFunc},
)
controller.claims, controller.claimController = framework.NewInformer(
_, controller.claimController = framework.NewInformer(
claimSource,
&api.PersistentVolumeClaim{},
syncPeriod,
Expand All @@ -124,6 +128,16 @@ func NewPersistentVolumeController(
// addVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := storeObjectUpdate(ctrl.volumes.store, obj, "volume")
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}

if !ctrl.isFullySynced() {
return
}
Expand All @@ -147,6 +161,16 @@ func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
// updateVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
// Store the new volume version in the cache and do not process it if this
// is an old version.
new, err := storeObjectUpdate(ctrl.volumes.store, newObj, "volume")
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}

if !ctrl.isFullySynced() {
return
}
Expand All @@ -170,6 +194,8 @@ func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{})
// deleteVolume is callback from framework.Controller watching PersistentVolume
// events.
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
_ = ctrl.volumes.store.Delete(obj)

if !ctrl.isFullySynced() {
return
}
Expand Down Expand Up @@ -218,6 +244,16 @@ func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
// addClaim is callback from framework.Controller watching PersistentVolumeClaim
// events.
func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
new, err := storeObjectUpdate(ctrl.claims, obj, "claim")
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}

if !ctrl.isFullySynced() {
return
}
Expand All @@ -241,6 +277,16 @@ func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
// updateClaim is callback from framework.Controller watching PersistentVolumeClaim
// events.
func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
// Store the new claim version in the cache and do not process it if this is
// an old version.
new, err := storeObjectUpdate(ctrl.claims, newObj, "claim")
if err != nil {
glog.Errorf("%v", err)
}
if !new {
return
}

if !ctrl.isFullySynced() {
return
}
Expand All @@ -264,6 +310,8 @@ func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{})
// deleteClaim is callback from framework.Controller watching PersistentVolumeClaim
// events.
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
_ = ctrl.claims.Delete(obj)

if !ctrl.isFullySynced() {
return
}
Expand Down Expand Up @@ -388,3 +436,56 @@ func isVolumeBoundToClaim(volume *api.PersistentVolume, claim *api.PersistentVol
}
return true
}

// storeObjectUpdate updates given cache with a new object version from Informer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be a little clearer to wrap this with storeClaimUpdate and storeVolumeUpdate

// callback (i.e. with events from etcd) or with an object modified by the
// controller itself. Returns "true", if the cache was updated, false if the
// object is an old version and should be ignored.
func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
objAccessor, err := meta.Accessor(obj)
if err != nil {
return false, fmt.Errorf("Error reading cache of %s: %v", className, err)
}
objName := objAccessor.GetNamespace() + "/" + objAccessor.GetName()

oldObj, found, err := store.Get(obj)
if err != nil {
return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
}

if !found {
// This is a new object
glog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
if err = store.Add(obj); err != nil {
return false, fmt.Errorf("Error adding %s %q to controller cache: %v", className, objName, err)
}
return true, nil
}

oldObjAccessor, err := meta.Accessor(oldObj)
if err != nil {
return false, err
}

objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return false, fmt.Errorf("Error parsing ResourceVersion %q of %s %q: %s", objAccessor.GetResourceVersion(), className, objName, err)
}
oldObjResourceVersion, err := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
if err != nil {
return false, fmt.Errorf("Error parsing old ResourceVersion %q of %s %q: %s", oldObjAccessor.GetResourceVersion(), className, objName, err)
}

// Throw away only older version, let the same version pass - we do want to
// get periodic sync events.
if oldObjResourceVersion > objResourceVersion {
glog.V(4).Infof("storeObjectUpdate: ignoring %s %q version %s", className, objName, objAccessor.GetResourceVersion())
return false, nil
}

glog.V(4).Infof("storeObjectUpdate updating %s %q with version %s", className, objName, objAccessor.GetResourceVersion())
if err = store.Update(obj); err != nil {
return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err)
}
return true, nil
}
69 changes: 69 additions & 0 deletions pkg/controller/persistentvolume/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
Expand Down Expand Up @@ -159,3 +160,71 @@ func TestControllerSync(t *testing.T) {
evaluateTestResults(ctrl, reactor, test, t)
}
}

func storeVersion(t *testing.T, prefix string, c cache.Store, version string, expectedReturn bool) {
pv := newVolume("pvName", "1Gi", "", "", api.VolumeAvailable, api.PersistentVolumeReclaimDelete)
pv.ResourceVersion = version
ret, err := storeObjectUpdate(c, pv, "volume")
if err != nil {
t.Errorf("%s: expected storeObjectUpdate to succeed, got: %v", prefix, err)
}
if expectedReturn != ret {
t.Errorf("%s: expected storeObjectUpdate to return %v, got: %v", prefix, expectedReturn, ret)
}

// find the stored version

pvObj, found, err := c.GetByKey("pvName")
if err != nil {
t.Errorf("expected volume 'pvName' in the cache, got error instead: %v", err)
}
if !found {
t.Errorf("expected volume 'pvName' in the cache but it was not found")
}
pv, ok := pvObj.(*api.PersistentVolume)
if !ok {
t.Errorf("expected volume in the cache, got different object instead: %+v", pvObj)
}

if ret {
if pv.ResourceVersion != version {
t.Errorf("expected volume with version %s in the cache, got %s instead", version, pv.ResourceVersion)
}
} else {
if pv.ResourceVersion == version {
t.Errorf("expected volume with version other than %s in the cache, got %s instead", version, pv.ResourceVersion)
}
}
}

// TestControllerCache tests func storeObjectUpdate()
func TestControllerCache(t *testing.T) {
// Cache under test
c := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)

// Store new PV
storeVersion(t, "Step1", c, "1", true)
// Store the same PV
storeVersion(t, "Step2", c, "1", true)
// Store newer PV
storeVersion(t, "Step3", c, "2", true)
// Store older PV - simulating old "PV updated" event or periodic sync with
// old data
storeVersion(t, "Step4", c, "1", false)
// Store newer PV - test integer parsing ("2" > "10" as string,
// while 2 < 10 as integers)
storeVersion(t, "Step5", c, "10", true)
}

func TestControllerCacheParsingError(t *testing.T) {
c := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)
// There must be something in the cache to compare with
storeVersion(t, "Step1", c, "1", true)

pv := newVolume("pvName", "1Gi", "", "", api.VolumeAvailable, api.PersistentVolumeReclaimDelete)
pv.ResourceVersion = "xxx"
_, err := storeObjectUpdate(c, pv, "volume")
if err == nil {
t.Errorf("Expected parsing error, got nil instead")
}
}
4 changes: 4 additions & 0 deletions pkg/controller/persistentvolume/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type persistentVolumeOrderedIndex struct {
store cache.Indexer
}

func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
}

// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
func accessModesIndexFunc(obj interface{}) ([]string, error) {
if pv, ok := obj.(*api.PersistentVolume); ok {
Expand Down
5 changes: 0 additions & 5 deletions pkg/controller/persistentvolume/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/cache"
)

func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
}

func TestMatchVolume(t *testing.T) {
volList := newPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
Expand Down