Switch pv controller to shared informer #41273

Merged
merged 1 commit on Feb 17, 2017
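This PR moves the persistentvolume (PV) controller off its private ListerWatcher/Reflector caches and onto the controller-manager's shared informers, so each resource is watched once per process instead of once per controller. For orientation, a minimal sketch of the shared-informer pattern the PR adopts; it uses current client-go import paths rather than the vendored 2017 paths in the diff below, and the handler body and kubeconfig path are illustrative only.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (path is illustrative).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// One factory per process; every consumer of PersistentVolumes shares
	// a single watch and in-memory cache.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	pvInformer := factory.Core().V1().PersistentVolumes()

	// Controllers register event handlers instead of running private reflectors.
	pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pv := obj.(*v1.PersistentVolume)
			fmt.Println("volume added:", pv.Name)
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)

	// Do no work until the initial LIST has populated the shared cache.
	if !cache.WaitForCacheSync(stopCh, pvInformer.Informer().HasSynced) {
		panic("cache failed to sync")
	}
	<-stopCh // block forever; a real binary would wire this to signal handling
}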
3 changes: 3 additions & 0 deletions cmd/kube-controller-manager/app/controllermanager.go
@@ -483,6 +483,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
Cloud: cloud,
ClusterName: s.ClusterName,
VolumeInformer: newSharedInformers.Core().V1().PersistentVolumes(),
ClaimInformer: newSharedInformers.Core().V1().PersistentVolumeClaims(),
ClassInformer: newSharedInformers.Storage().V1beta1().StorageClasses(),
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
}
volumeController := persistentvolumecontroller.NewController(params)
13 changes: 9 additions & 4 deletions pkg/controller/volume/persistentvolume/BUILD
@@ -23,6 +23,10 @@ go_library(
"//pkg/apis/storage/v1beta1:go_default_library",
"//pkg/apis/storage/v1beta1/util:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/storage/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/storage/v1beta1:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
@@ -34,10 +38,9 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
@@ -67,7 +70,9 @@ go_test(
"//pkg/apis/storage/v1beta1/util:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/listers/storage/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/volume:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
@@ -76,6 +81,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/diff",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
@@ -94,7 +100,6 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/controller/volume/persistentvolume/options:all-srcs",
"//pkg/controller/volume/persistentvolume/testing:all-srcs",
],
tags = ["automanaged"],
)
130 changes: 67 additions & 63 deletions pkg/controller/volume/persistentvolume/framework_test.go
@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@@ -45,7 +46,9 @@ import (
storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
fcache "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/v1beta1"
"k8s.io/kubernetes/pkg/controller"
vol "k8s.io/kubernetes/pkg/volume"
)

@@ -112,9 +115,10 @@ var noerrors = []reactorError{}
// is updated first and claim.Phase second. This queue will then contain both
// updates as separate entries.
// - Number of changes since the last call to volumeReactor.syncAll().
// - Optionally, volume and claim event sources. When set, all changed
// volumes/claims are sent as Modify event to these sources. These sources can
// be linked back to the controller watcher as "volume/claim updated" events.
// - Optionally, volume and claim fake watchers which should be the same ones
// used by the controller. Any time an event function like deleteVolumeEvent
// is called to simulate an event, the reactor's stores are updated and the
// controller is sent the event via the fake watcher.
// - Optionally, a list of errors that should be returned by the reactor, simulating
// etcd / API server failures. These errors are evaluated in order and every
// error is returned only once. I.e. when the reactor finds matching
@@ -126,8 +130,8 @@ type volumeReactor struct {
changedObjects []interface{}
changedSinceLastSync int
ctrl *PersistentVolumeController
volumeSource *fcache.FakePVControllerSource
claimSource *fcache.FakePVCControllerSource
fakeVolumeWatch *watch.FakeWatcher
fakeClaimWatch *watch.FakeWatcher
lock sync.Mutex
errors []reactorError
}
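The fake watchers referenced here come from k8s.io/apimachinery/pkg/watch: watch.NewFake() gives a watch.Interface whose events the test pushes by hand. A minimal standalone sketch of that flow (current import paths; the object name is illustrative):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	fw := watch.NewFake()

	// A test goroutine plays etcd: it pushes events into the fake watcher
	// the same way deleteVolumeEvent does above. Sends block until the
	// consumer reads, because the fake's channel is unbuffered.
	go func() {
		pv := &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "pv-1"}}
		fw.Add(pv)
		fw.Delete(pv)
		fw.Stop()
	}()

	// The controller side consumes ResultChan(); the loop ends when the
	// watcher is stopped and the channel closes.
	for ev := range fw.ResultChan() {
		fmt.Println(ev.Type, ev.Object.(*v1.PersistentVolume).Name)
	}
}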
@@ -176,9 +180,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
}

// Store the updated object to appropriate places.
if r.volumeSource != nil {
r.volumeSource.Add(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
@@ -203,9 +204,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
}

// Store the updated object to appropriate places.
if r.volumeSource != nil {
r.volumeSource.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
@@ -231,9 +229,6 @@

// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
if r.claimSource != nil {
r.claimSource.Modify(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
glog.V(4).Infof("saved updated claim %s", claim.Name)
@@ -513,9 +508,11 @@ func (r *volumeReactor) deleteVolumeEvent(volume *v1.PersistentVolume) {

// Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
clone, _ := api.Scheme.DeepCopy(volume)
volumeClone := clone.(*v1.PersistentVolume)
r.volumeSource.Delete(volumeClone)
if r.fakeVolumeWatch != nil {
clone, _ := api.Scheme.DeepCopy(volume)
volumeClone := clone.(*v1.PersistentVolume)
r.fakeVolumeWatch.Delete(volumeClone)
}
}

// deleteClaimEvent simulates that a claim has been deleted in etcd and the
@@ -529,9 +526,11 @@ func (r *volumeReactor) deleteClaimEvent(claim *v1.PersistentVolumeClaim) {

// Generate deletion event. Cloned claim is needed to prevent races (and we
// would get a clone from etcd too).
clone, _ := api.Scheme.DeepCopy(claim)
claimClone := clone.(*v1.PersistentVolumeClaim)
r.claimSource.Delete(claimClone)
if r.fakeClaimWatch != nil {
clone, _ := api.Scheme.DeepCopy(claim)
claimClone := clone.(*v1.PersistentVolumeClaim)
r.fakeClaimWatch.Delete(claimClone)
}
}

// addVolumeEvent simulates that a volume has been added in etcd and the
@@ -543,7 +542,9 @@ func (r *volumeReactor) addVolumeEvent(volume *v1.PersistentVolume) {
r.volumes[volume.Name] = volume
// Generate event. No cloning is needed, this volume is not stored in the
// controller cache yet.
r.volumeSource.Add(volume)
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Add(volume)
}
}

// modifyVolumeEvent simulates that a volume has been modified in etcd and the
@@ -555,9 +556,11 @@ func (r *volumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) {
r.volumes[volume.Name] = volume
// Generate modification event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
clone, _ := api.Scheme.DeepCopy(volume)
volumeClone := clone.(*v1.PersistentVolume)
r.volumeSource.Modify(volumeClone)
if r.fakeVolumeWatch != nil {
clone, _ := api.Scheme.DeepCopy(volume)
volumeClone := clone.(*v1.PersistentVolume)
r.fakeVolumeWatch.Modify(volumeClone)
}
}

// addClaimEvent simulates that a claim has been added in etcd and the
@@ -569,45 +572,49 @@ func (r *volumeReactor) addClaimEvent(claim *v1.PersistentVolumeClaim) {
r.claims[claim.Name] = claim
// Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet.
r.claimSource.Add(claim)
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Add(claim)
}
}

func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource *fcache.FakePVControllerSource, claimSource *fcache.FakePVCControllerSource, errors []reactorError) *volumeReactor {
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []reactorError) *volumeReactor {
reactor := &volumeReactor{
volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim),
ctrl: ctrl,
volumeSource: volumeSource,
claimSource: claimSource,
errors: errors,
volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim),
ctrl: ctrl,
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
}
client.AddReactor("*", "*", reactor.React)
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)

return reactor
}
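Note the reactor is now registered per verb/resource pair instead of the old catch-all ("*", "*"), presumably so that other traffic against the same fake clientset falls through to its default handling. A sketch of how such a reactor intercepts calls, using a recent client-go testing API (names illustrative):

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

func main() {
	client := fake.NewSimpleClientset()

	// PrependReactor, because NewSimpleClientset already installs a
	// catch-all object-tracker reactor; anything our reactor does not
	// match still falls through to that tracker.
	client.PrependReactor("create", "persistentvolumes",
		func(action core.Action) (bool, runtime.Object, error) {
			pv := action.(core.CreateAction).GetObject().(*v1.PersistentVolume)
			fmt.Println("reactor saw create of", pv.Name)
			return true, pv, nil // handled=true short-circuits the default behavior
		})

	_, err := client.CoreV1().PersistentVolumes().Create(context.TODO(),
		&v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: "pv-1"}},
		metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
}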
func alwaysReady() bool { return true }

func newTestController(kubeClient clientset.Interface, volumeSource, claimSource, classSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController {
if volumeSource == nil {
volumeSource = fcache.NewFakePVControllerSource()
}
if claimSource == nil {
claimSource = fcache.NewFakePVCControllerSource()
func newTestController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, enableDynamicProvisioning bool) *PersistentVolumeController {
if informerFactory == nil {
informerFactory = informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
}
if classSource == nil {
classSource = fcache.NewFakeControllerSource()
}

params := ControllerParameters{
KubeClient: kubeClient,
SyncPeriod: 5 * time.Second,
VolumePlugins: []vol.VolumePlugin{},
VolumeSource: volumeSource,
ClaimSource: claimSource,
ClassSource: classSource,
VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ClassInformer: informerFactory.Storage().V1beta1().StorageClasses(),
EventRecorder: record.NewFakeRecorder(1000),
EnableDynamicProvisioning: enableDynamicProvisioning,
}
ctrl := NewController(params)

ctrl.volumeListerSynced = alwaysReady
ctrl.claimListerSynced = alwaysReady
ctrl.classListerSynced = alwaysReady
// Speed up the test
ctrl.createProvisionedPVInterval = 5 * time.Millisecond
return ctrl
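newTestController now takes a whole SharedInformerFactory and then stubs every ListerSynced with alwaysReady, because nothing starts the informers in these unit tests. For contrast, a standalone sketch of the same factory driven by a fake clientset and actually started (current import paths; the seeded volume is illustrative):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed the fake clientset; after the factory syncs, the lister serves
	// this object from the shared cache.
	client := fake.NewSimpleClientset(&v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "pv-1"},
	})
	// Resync 0 disables periodic resync, mirroring controller.NoResyncPeriodFunc().
	factory := informers.NewSharedInformerFactory(client, 0)
	pvInformer := factory.Core().V1().PersistentVolumes()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	pvs, err := pvInformer.Lister().List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, pv := range pvs {
		fmt.Println("listed:", pv.Name)
	}
}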
@@ -924,7 +931,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag

// Initialize the controller
client := &fake.Clientset{}
ctrl := newTestController(client, nil, nil, nil, true)
ctrl := newTestController(client, nil, true)
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims {
ctrl.claims.Add(claim)
Expand All @@ -935,14 +942,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
reactor.volumes[volume.Name] = volume
}

// Convert classes to []interface{} and forcefully inject them into
// controller.
storageClassPtrs := make([]interface{}, len(storageClasses))
for i, s := range storageClasses {
storageClassPtrs[i] = s
// Inject classes into controller via a custom lister.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, class := range storageClasses {
indexer.Add(class)
}
// 1 is the resource version
ctrl.classes.Replace(storageClassPtrs, "1")
ctrl.classLister = storagelisters.NewStorageClassLister(indexer)

// Run the tested functions
err := test.test(ctrl, reactor, test)
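The lister injection above works because a lister is just a typed read view over a cache.Indexer, so a test can populate the indexer directly instead of faking a watch. A standalone sketch (current storage/v1 import paths; the class is illustrative):

package main

import (
	"fmt"

	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	storagelisters "k8s.io/client-go/listers/storage/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Tests can hand a controller a fully populated "cache" without ever
	// running an informer: fill an indexer, wrap it in a lister.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	if err := indexer.Add(&storagev1.StorageClass{
		ObjectMeta:  metav1.ObjectMeta{Name: "gold"},
		Provisioner: "kubernetes.io/fake-plugin",
	}); err != nil {
		panic(err)
	}

	lister := storagelisters.NewStorageClassLister(indexer)
	class, err := lister.Get("gold")
	if err != nil {
		panic(err)
	}
	fmt.Println(class.Name, "provisioned by", class.Provisioner)
}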
@@ -980,15 +985,14 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s

// Initialize the controller
client := &fake.Clientset{}
ctrl := newTestController(client, nil, nil, nil, true)
ctrl := newTestController(client, nil, true)

// Convert classes to []interface{} and forcefully inject them into
// controller.
storageClassPtrs := make([]interface{}, len(storageClasses))
for i, s := range storageClasses {
storageClassPtrs[i] = s
// Inject classes into controller via a custom lister.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for _, class := range storageClasses {
indexer.Add(class)
}
ctrl.classes.Replace(storageClassPtrs, "1")
ctrl.classLister = storagelisters.NewStorageClassLister(indexer)

reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims {
2 changes: 1 addition & 1 deletion pkg/controller/volume/persistentvolume/provision_test.go
@@ -422,7 +422,7 @@ func TestProvisionMultiSync(t *testing.T) {

// When provisioning is disabled, provisioning a claim should instantly return nil
func TestDisablingDynamicProvisioner(t *testing.T) {
ctrl := newTestController(nil, nil, nil, nil, false)
ctrl := newTestController(nil, nil, false)
retVal := ctrl.provisionClaim(nil)
if retVal != nil {
t.Errorf("Expected nil return but got %v", retVal)
27 changes: 10 additions & 17 deletions pkg/controller/volume/persistentvolume/pv_controller.go
@@ -31,6 +31,8 @@ import (
storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume"
@@ -146,14 +148,13 @@ const createProvisionedPVInterval = 10 * time.Second
// cache.Controllers that watch PersistentVolume and PersistentVolumeClaim
// changes.
type PersistentVolumeController struct {
volumeController cache.Controller
volumeInformer cache.Indexer
volumeSource cache.ListerWatcher
claimController cache.Controller
claimInformer cache.Store
claimSource cache.ListerWatcher
classReflector *cache.Reflector
classSource cache.ListerWatcher
volumeLister corelisters.PersistentVolumeLister
volumeListerSynced cache.InformerSynced
claimLister corelisters.PersistentVolumeClaimLister
claimListerSynced cache.InformerSynced
classLister storagelisters.StorageClassLister
classListerSynced cache.InformerSynced

kubeClient clientset.Interface
eventRecorder record.EventRecorder
cloud cloudprovider.Interface
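This struct change is the heart of the PR: the controller's reflector, store, and ListerWatcher fields collapse into lister/synced pairs fed by the shared caches. The pairing convention, shown as a hypothetical minimal controller (FooController and its fields are invented for illustration):

package foo

import (
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// FooController illustrates the pairing: every lister travels with the
// HasSynced func of the informer that feeds it.
type FooController struct {
	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced
}

// Run does no work until the shared cache behind the lister has finished
// its initial LIST; acting earlier would mean acting on an empty world.
func (c *FooController) Run(stopCh <-chan struct{}) {
	if !cache.WaitForCacheSync(stopCh, c.podListerSynced) {
		return
	}
	// ...start workers...
	<-stopCh
}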
@@ -182,7 +183,6 @@ type PersistentVolumeController struct {
// have been already written.
volumes persistentVolumeOrderedIndex
claims cache.Store
classes cache.Store

// Work queues of claims and volumes to process. Every queue should have
// exactly one worker thread, especially syncClaim() is not reentrant.
@@ -1464,17 +1464,10 @@ func (ctrl *PersistentVolumeController) findProvisionablePlugin(claim *v1.Persis
// provisionClaim() which leads here is never called with claimClass=="", we
// can save some checks.
claimClass := storageutil.GetClaimStorageClass(claim)
classObj, found, err := ctrl.classes.GetByKey(claimClass)
class, err := ctrl.classLister.Get(claimClass)
if err != nil {
return nil, nil, err
}
if !found {
return nil, nil, fmt.Errorf("StorageClass %q not found", claimClass)
}
class, ok := classObj.(*storage.StorageClass)
if !ok {
return nil, nil, fmt.Errorf("Cannot convert object to StorageClass: %+v", classObj)
}

// Find a plugin for the class
plugin, err := ctrl.volumePluginMgr.FindProvisionablePluginByName(class.Provisioner)
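The simplification in the last hunk falls out of the lister API: a cache miss surfaces as a standard NotFound apierror instead of the old (obj, found, err) triple plus a type assertion. A small sketch (current import paths; the class name is illustrative):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	storagelisters "k8s.io/client-go/listers/storage/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// An empty indexer stands in for a cache that has not seen the class.
	lister := storagelisters.NewStorageClassLister(
		cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}))

	// The miss is a typed apierror, so callers need neither a found bool
	// nor a type assertion on an interface{}.
	_, err := lister.Get("no-such-class")
	if apierrors.IsNotFound(err) {
		fmt.Println("storage class not found:", err)
	}
}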