Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make addIndexers safe for sharedInformer #25671

Merged
merged 1 commit into from
May 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/client/cache/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Indexer interface {
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}

// IndexFunc knows how to provide an indexed value for an object.
Expand Down
4 changes: 4 additions & 0 deletions pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}

func (c *cache) AddIndexers(newIndexers Indexers) error {
return c.cacheStorage.AddIndexers(newIndexers)
}

// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/client/cache/thread_safe_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type ThreadSafeStore interface {
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}

// threadSafeMap implements ThreadSafeStore
Expand Down Expand Up @@ -195,6 +199,27 @@ func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
}

func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
c.lock.Lock()
defer c.lock.Unlock()

if len(c.items) > 0 {
return fmt.Errorf("cannot add indexers to running index")
}

oldKeys := sets.StringKeySet(c.indexers)
newKeys := sets.StringKeySet(newIndexers)

if oldKeys.HasAny(newKeys.List()...) {
return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
}

for k, v := range newIndexers {
c.indexers[k] = v
}
return nil
}

// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {
Expand Down
21 changes: 2 additions & 19 deletions pkg/controller/framework/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,7 @@ type SharedIndexInformer interface {
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
// be shared amongst all consumers.
func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
sharedInformer := &sharedIndexInformer{
processor: &sharedProcessor{},
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}),
listerWatcher: lw,
objectType: objType,
fullResyncPeriod: resyncPeriod,
}
return sharedInformer
return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
Expand Down Expand Up @@ -177,17 +170,7 @@ func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
return fmt.Errorf("informer has already started")
}

oldIndexers := s.indexer.GetIndexers()

for name, indexFunc := range oldIndexers {
if _, exist := indexers[name]; exist {
return fmt.Errorf("there is an index named %s already exist", name)
}
indexers[name] = indexFunc
}

s.indexer = cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return nil
return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() ControllerInterface {
Expand Down