Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix startup type error in initializeCaches #28002

Merged
merged 1 commit into from
Jun 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 90 additions & 16 deletions pkg/controller/framework/fake_controller_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,41 @@ import (

func NewFakeControllerSource() *FakeControllerSource {
return &FakeControllerSource{
items: map[nnu]runtime.Object{},
broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
Items: map[nnu]runtime.Object{},
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}
}

func NewFakePVControllerSource() *FakePVControllerSource {
return &FakePVControllerSource{
FakeControllerSource{
Items: map[nnu]runtime.Object{},
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}}
}

func NewFakePVCControllerSource() *FakePVCControllerSource {
return &FakePVCControllerSource{
FakeControllerSource{
Items: map[nnu]runtime.Object{},
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}}
}

// FakeControllerSource implements listing/watching for testing.
type FakeControllerSource struct {
lock sync.RWMutex
items map[nnu]runtime.Object
Items map[nnu]runtime.Object
changes []watch.Event // one change per resourceVersion
broadcaster *watch.Broadcaster
Broadcaster *watch.Broadcaster
}

type FakePVControllerSource struct {
FakeControllerSource
}

type FakePVCControllerSource struct {
FakeControllerSource
}

// namespace, name, uid to be used as a key.
Expand Down Expand Up @@ -110,22 +134,19 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
key := f.key(accessor)
switch e.Type {
case watch.Added, watch.Modified:
f.items[key] = e.Object
f.Items[key] = e.Object
case watch.Deleted:
delete(f.items, key)
delete(f.Items, key)
}

if rand.Float64() < watchProbability {
f.broadcaster.Action(e.Type, e.Object)
f.Broadcaster.Action(e.Type, e.Object)
}
}

// List returns a list object, with its resource version set.
func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]runtime.Object, 0, len(f.items))
for _, obj := range f.items {
func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) {
list := make([]runtime.Object, 0, len(f.Items))
for _, obj := range f.Items {
// Must make a copy to allow clients to modify the object.
// Otherwise, if they make a change and write it back, they
// will inadvertently change our canonical copy (in
Expand All @@ -136,6 +157,17 @@ func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, er
}
list = append(list, objCopy.(runtime.Object))
}
return list, nil
}

// List returns a list object, with its resource version set.
func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list, err := f.getListItemsLocked()
if err != nil {
return nil, err
}
listObj := &api.List{}
if err := meta.SetList(listObj, list); err != nil {
return nil, err
Expand All @@ -149,6 +181,48 @@ func (f *FakeControllerSource) List(options api.ListOptions) (runtime.Object, er
return listObj, nil
}

// List returns a list object, with its resource version set.
func (f *FakePVControllerSource) List(options api.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list, err := f.FakeControllerSource.getListItemsLocked()
if err != nil {
return nil, err
}
listObj := &api.PersistentVolumeList{}
if err := meta.SetList(listObj, list); err != nil {
return nil, err
}
objMeta, err := api.ListMetaFor(listObj)
if err != nil {
return nil, err
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
return listObj, nil
}

// List returns a list object, with its resource version set.
func (f *FakePVCControllerSource) List(options api.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list, err := f.FakeControllerSource.getListItemsLocked()
if err != nil {
return nil, err
}
listObj := &api.PersistentVolumeClaimList{}
if err := meta.SetList(listObj, list); err != nil {
return nil, err
}
objMeta, err := api.ListMetaFor(listObj)
if err != nil {
return nil, err
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
return listObj, nil
}

// Watch returns a watch, which will be pre-populated with all changes
// after resourceVersion.
func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface, error) {
Expand All @@ -172,17 +246,17 @@ func (f *FakeControllerSource) Watch(options api.ListOptions) (watch.Interface,
}
changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)})
}
return f.broadcaster.WatchWithPrefix(changes), nil
return f.Broadcaster.WatchWithPrefix(changes), nil
} else if rc > len(f.changes) {
return nil, errors.New("resource version in the future not supported by this fake")
}
return f.broadcaster.Watch(), nil
return f.Broadcaster.Watch(), nil
}

// Shutdown closes the underlying broadcaster, waiting for events to be
// delivered. It's an error to call any method after calling shutdown. This is
// enforced by Shutdown() leaving f locked.
func (f *FakeControllerSource) Shutdown() {
f.lock.Lock() // Purposely no unlock.
f.broadcaster.Shutdown()
f.Broadcaster.Shutdown()
}
8 changes: 4 additions & 4 deletions pkg/controller/persistentvolume/controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
volumeList, ok := volumeListObj.(*api.List)
volumeList, ok := volumeListObj.(*api.PersistentVolumeList)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %+v", volumeListObj)
return
}
for _, volume := range volumeList.Items {
// Ignore template volumes from kubernetes 1.2
deleted := ctrl.upgradeVolumeFrom1_2(volume.(*api.PersistentVolume))
deleted := ctrl.upgradeVolumeFrom1_2(&volume)
if !deleted {
storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}
Expand All @@ -157,9 +157,9 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return
}
claimList, ok := claimListObj.(*api.List)
claimList, ok := claimListObj.(*api.PersistentVolumeClaimList)
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", volumeListObj)
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %+v", claimListObj)
return
}
for _, claim := range claimList.Items {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/persistentvolume/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func TestControllerSync(t *testing.T) {

// Initialize the controller
client := &fake.Clientset{}
volumeSource := framework.NewFakeControllerSource()
claimSource := framework.NewFakeControllerSource()
volumeSource := framework.NewFakePVControllerSource()
claimSource := framework.NewFakePVCControllerSource()
ctrl := newTestController(client, volumeSource, claimSource, true)
reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors)
for _, claim := range test.initialClaims {
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/persistentvolume/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ type volumeReactor struct {
changedObjects []interface{}
changedSinceLastSync int
ctrl *PersistentVolumeController
volumeSource *framework.FakeControllerSource
claimSource *framework.FakeControllerSource
volumeSource *framework.FakePVControllerSource
claimSource *framework.FakePVCControllerSource
lock sync.Mutex
errors []reactorError
}
Expand Down Expand Up @@ -542,7 +542,7 @@ func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) {
r.claimSource.Add(claim)
}

func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource, claimSource *framework.FakeControllerSource, errors []reactorError) *volumeReactor {
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource *framework.FakePVControllerSource, claimSource *framework.FakePVCControllerSource, errors []reactorError) *volumeReactor {
reactor := &volumeReactor{
volumes: make(map[string]*api.PersistentVolume),
claims: make(map[string]*api.PersistentVolumeClaim),
Expand All @@ -557,10 +557,10 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,

func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController {
if volumeSource == nil {
volumeSource = framework.NewFakeControllerSource()
volumeSource = framework.NewFakePVControllerSource()
}
if claimSource == nil {
claimSource = framework.NewFakeControllerSource()
claimSource = framework.NewFakePVCControllerSource()
}
ctrl := NewPersistentVolumeController(
kubeClient,
Expand Down