Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable look-up by secondary index in cache #4125

Merged
merged 3 commits into from
Feb 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions pkg/client/cache/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"fmt"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
Store
// Retrieve list of objects that match on the named indexing function
Index(indexName string, obj interface{}) ([]interface{}, error)
}

// IndexFunc knows how to provide an indexed value for an object.
type IndexFunc func(obj interface{}) (string, error)

// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) (string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
return meta.Namespace(), nil
}

// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]util.StringSet

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index
103 changes: 102 additions & 1 deletion pkg/client/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// Store is a generic object storage interface. Reflector knows how to watch a server
Expand Down Expand Up @@ -65,6 +66,10 @@ type cache struct {
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}

// Add inserts an item into the cache.
Expand All @@ -73,9 +78,66 @@ func (c *cache) Add(obj interface{}) error {
if err != nil {
return fmt.Errorf("couldn't create key for object: %v", err)
}
// keep a pointer to whatever could have been there previously
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj)
return nil
}

// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *cache) updateIndices(oldObj interface{}, newObj interface{}) error {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj)
}
key, err := c.keyFunc(newObj)
if err != nil {
return err
}
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(newObj)
if err != nil {
return err
}
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
set := index[indexValue]
if set == nil {
set = util.StringSet{}
index[indexValue] = set
}
set.Insert(key)
}
return nil
}

// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func (c *cache) deleteFromIndices(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return err
}
for name, indexFunc := range c.indexers {
indexValue, err := indexFunc(obj)
if err != nil {
return err
}
index := c.indices[name]
if index != nil {
set := index[indexValue]
if set != nil {
set.Delete(key)
}
}
}
return nil
}

Expand All @@ -87,7 +149,9 @@ func (c *cache) Update(obj interface{}) error {
}
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key]
c.items[key] = obj
c.updateIndices(oldObject, obj)
return nil
}

Expand All @@ -100,6 +164,7 @@ func (c *cache) Delete(obj interface{}) error {
c.lock.Lock()
defer c.lock.Unlock()
delete(c.items, key)
c.deleteFromIndices(obj)
return nil
}

Expand All @@ -115,6 +180,30 @@ func (c *cache) List() []interface{} {
return list
}

// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

indexKey, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for _, key := range set.List() {
list = append(list, c.items[key])
}
return list, nil
}

// Get returns the requested item, or sets exists=false.
// Get is completely threadsafe as long as you treat all items as immutable.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
Expand Down Expand Up @@ -150,10 +239,22 @@ func (c *cache) Replace(list []interface{}) error {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you build the index before the lock and then swap it in to limit contention? (Similar to above)

// rebuild any index
c.indices = Indices{}
for _, item := range c.items {
c.updateIndices(nil, item)
}

return nil
}

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc) Store {
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc}
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: Indexers{}, indices: Indices{}}
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{items: map[string]interface{}{}, keyFunc: keyFunc, indexers: indexers, indices: Indices{}}
}
47 changes: 47 additions & 0 deletions pkg/client/cache/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,53 @@ func doTestStore(t *testing.T, store Store) {
}
}

// Test public interface
func doTestIndex(t *testing.T, indexer Indexer) {
mkObj := func(id string, val string) testStoreObject {
return testStoreObject{id: id, val: val}
}

// Test Index
expected := map[string]util.StringSet{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: inline declaration?

expected["b"] = util.NewStringSet("a", "c")
expected["f"] = util.NewStringSet("e")
expected["h"] = util.NewStringSet("g")
indexer.Add(mkObj("a", "b"))
indexer.Add(mkObj("c", "b"))
indexer.Add(mkObj("e", "f"))
indexer.Add(mkObj("g", "h"))
{
for k, v := range expected {
found := util.StringSet{}
indexResults, err := indexer.Index("by_val", mkObj("", k))
if err != nil {
t.Errorf("Unexpected error %v", err)
}
for _, item := range indexResults {
found.Insert(item.(testStoreObject).id)
}
items := v.List()
if !found.HasAll(items...) {
t.Errorf("missing items, index %s, expected %v but found %v", k, items, found.List())
}
}
}
}

func testStoreKeyFunc(obj interface{}) (string, error) {
return obj.(testStoreObject).id, nil
}

func testStoreIndexFunc(obj interface{}) (string, error) {
return obj.(testStoreObject).val, nil
}

func testStoreIndexers() Indexers {
indexers := Indexers{}
indexers["by_val"] = testStoreIndexFunc
return indexers
}

type testStoreObject struct {
id string
val string
Expand All @@ -107,3 +150,7 @@ func TestUndeltaStore(t *testing.T) {
nop := func([]interface{}) {}
doTestStore(t, NewUndeltaStore(nop, testStoreKeyFunc))
}

func TestIndex(t *testing.T) {
doTestIndex(t, NewIndexer(testStoreKeyFunc, testStoreIndexers()))
}