Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support namespacing in cache.Store #3810

Merged
merged 1 commit into from
Jan 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 41 additions & 27 deletions pkg/client/cache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package cache

import (
"fmt"
"sync"

"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// FIFO receives adds and updates from a Reflector, and puts them in a queue for
Expand All @@ -33,32 +32,45 @@ type FIFO struct {
// We depend on the property that items in the set are in the queue and vice versa.
items map[string]interface{}
queue []string
// keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic.
keyFunc KeyFunc
}

// Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set.
func (f *FIFO) Add(id string, obj interface{}) {
func (f *FIFO) Add(obj interface{}) error {
id, err := f.keyFunc(obj)
if err != nil {
return fmt.Errorf("couldn't create key for object: %v", err)
}
f.lock.Lock()
defer f.lock.Unlock()
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = obj
f.cond.Broadcast()
return nil
}

// Update is the same as Add in this implementation.
func (f *FIFO) Update(id string, obj interface{}) {
f.Add(id, obj)
func (f *FIFO) Update(obj interface{}) error {
return f.Add(obj)
}

// Delete removes an item. It doesn't add it to the queue, because
// this implementation assumes the consumer only cares about the objects,
// not the order in which they were created/added.
func (f *FIFO) Delete(id string) {
func (f *FIFO) Delete(obj interface{}) error {
id, err := f.keyFunc(obj)
if err != nil {
return fmt.Errorf("couldn't create key for object: %v", err)
}
f.lock.Lock()
defer f.lock.Unlock()
delete(f.items, id)
return err
}

// List returns a list of all the items.
Expand All @@ -72,25 +84,16 @@ func (f *FIFO) List() []interface{} {
return list
}

// ContainedIDs returns a util.StringSet containing all IDs of the stored items.
// This is a snapshot of a moment in time, and one should keep in mind that
// other go routines can add or remove items after you call this.
func (c *FIFO) ContainedIDs() util.StringSet {
c.lock.RLock()
defer c.lock.RUnlock()
set := util.StringSet{}
for id := range c.items {
set.Insert(id)
}
return set
}

// Get returns the requested item, or sets exists=false.
func (f *FIFO) Get(id string) (item interface{}, exists bool) {
func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
id, err := f.keyFunc(obj)
if err != nil {
return nil, false, fmt.Errorf("couldn't create key for object: %v", err)
}
f.lock.RLock()
defer f.lock.RUnlock()
item, exists = f.items[id]
return item, exists
return item, exists, nil
}

// Pop waits until an item is ready and returns it. If multiple items are
Expand Down Expand Up @@ -120,25 +123,36 @@ func (f *FIFO) Pop() interface{} {
// 'f' takes ownersip of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *FIFO) Replace(idToObj map[string]interface{}) {
func (f *FIFO) Replace(list []interface{}) error {
items := map[string]interface{}{}
for _, item := range list {
key, err := f.keyFunc(item)
if err != nil {
return fmt.Errorf("couldn't create key for object: %v", err)
}
items[key] = item
}

f.lock.Lock()
defer f.lock.Unlock()
f.items = idToObj
f.items = items
f.queue = f.queue[:0]
for id := range idToObj {
for id := range items {
f.queue = append(f.queue, id)
}
if len(f.queue) > 0 {
f.cond.Broadcast()
}
return nil
}

// NewFIFO returns a Store which can be used to queue up items to
// process.
func NewFIFO() *FIFO {
func NewFIFO(keyFunc KeyFunc) *FIFO {
f := &FIFO{
items: map[string]interface{}{},
queue: []string{},
items: map[string]interface{}{},
queue: []string{},
keyFunc: keyFunc,
}
f.cond.L = &f.lock
return f
Expand Down
93 changes: 59 additions & 34 deletions pkg/client/cache/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,43 @@ import (
"time"
)

func testFifoObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testFifoObject).name, nil
}

type testFifoObject struct {
name string
val interface{}
}

func TestFIFO_basic(t *testing.T) {
f := NewFIFO()
mkObj := func(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}

f := NewFIFO(testFifoObjectKeyFunc)
const amount = 500
go func() {
for i := 0; i < amount; i++ {
f.Add(string([]rune{'a', rune(i)}), i+1)
f.Add(mkObj(string([]rune{'a', rune(i)}), i+1))
}
}()
go func() {
for u := uint(0); u < amount; u++ {
f.Add(string([]rune{'b', rune(u)}), u+1)
for u := uint64(0); u < amount; u++ {
f.Add(mkObj(string([]rune{'b', rune(u)}), u+1))
}
}()

lastInt := int(0)
lastUint := uint(0)
lastUint := uint64(0)
for i := 0; i < amount*2; i++ {
switch obj := f.Pop().(type) {
switch obj := f.Pop().(testFifoObject).val.(type) {
case int:
if obj <= lastInt {
t.Errorf("got %v (int) out of order, last was %v", obj, lastInt)
}
lastInt = obj
case uint:
case uint64:
if obj <= lastUint {
t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint)
} else {
Expand All @@ -57,81 +70,93 @@ func TestFIFO_basic(t *testing.T) {
}

func TestFIFO_addUpdate(t *testing.T) {
f := NewFIFO()
f.Add("foo", 10)
f.Update("foo", 15)
got := make(chan int, 2)
mkObj := func(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}

f := NewFIFO(testFifoObjectKeyFunc)
f.Add(mkObj("foo", 10))
f.Update(mkObj("foo", 15))
got := make(chan testFifoObject, 2)
go func() {
for {
got <- f.Pop().(int)
got <- f.Pop().(testFifoObject)
}
}()

first := <-got
if e, a := 15, first; e != a {
if e, a := 15, first.val; e != a {
t.Errorf("Didn't get updated value (%v), got %v", e, a)
}
select {
case unexpected := <-got:
t.Errorf("Got second value %v", unexpected)
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists := f.Get("foo")
_, exists, _ := f.Get(mkObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
}

func TestFIFO_addReplace(t *testing.T) {
f := NewFIFO()
f.Add("foo", 10)
f.Replace(map[string]interface{}{"foo": 15})
got := make(chan int, 2)
mkObj := func(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}

f := NewFIFO(testFifoObjectKeyFunc)
f.Add(mkObj("foo", 10))
f.Replace([]interface{}{mkObj("foo", 15)})
got := make(chan testFifoObject, 2)
go func() {
for {
got <- f.Pop().(int)
got <- f.Pop().(testFifoObject)
}
}()

first := <-got
if e, a := 15, first; e != a {
if e, a := 15, first.val; e != a {
t.Errorf("Didn't get updated value (%v), got %v", e, a)
}
select {
case unexpected := <-got:
t.Errorf("Got second value %v", unexpected)
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists := f.Get("foo")
_, exists, _ := f.Get(mkObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
}

func TestFIFO_detectLineJumpers(t *testing.T) {
f := NewFIFO()
mkObj := func(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}

f := NewFIFO(testFifoObjectKeyFunc)

f.Add("foo", 10)
f.Add("bar", 1)
f.Add("foo", 11)
f.Add("foo", 13)
f.Add("zab", 30)
f.Add(mkObj("foo", 10))
f.Add(mkObj("bar", 1))
f.Add(mkObj("foo", 11))
f.Add(mkObj("foo", 13))
f.Add(mkObj("zab", 30))

if e, a := 13, f.Pop().(int); a != e {
if e, a := 13, f.Pop().(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a)
}

f.Add("foo", 14) // ensure foo doesn't jump back in line
f.Add(mkObj("foo", 14)) // ensure foo doesn't jump back in line

if e, a := 1, f.Pop().(int); a != e {
if e, a := 1, f.Pop().(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a)
}

if e, a := 30, f.Pop().(int); a != e {
if e, a := 30, f.Pop().(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a)
}

if e, a := 14, f.Pop().(int); a != e {
if e, a := 14, f.Pop().(testFifoObject).val; a != e {
t.Fatalf("expected %d, got %d", e, a)
}
}
13 changes: 10 additions & 3 deletions pkg/client/cache/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,17 @@ func (s *StoreToNodeLister) List() (machines api.NodeList, err error) {
// rather than a method of StoreToNodeLister.
// GetNodeInfo returns cached data for the minion 'id'.
func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) {
if minion, ok := s.Get(id); ok {
return minion.(*api.Node), nil
minion, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smarterclayton So, this replaces all the interface changes, only if Nodes have an empty namespace. If they are stored with some uniform non-blank namespace, either:

  1. That namespace would have to be hard-coded here or,
  2. The Node would need to propagate down to here via the interfaces.

Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: If Node is namespaced uniformly, it could be hard-coded here. If Node is namespaced ad-hoc, the namespace or Node would have to become an arg of the interfaces to provide cache query context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nodes don't have namespaces, and won't. So until that happens, node name is sufficient.

Maybe not understanding.

----- Original Message -----

@@ -69,10 +69,17 @@ func (s _StoreToNodeLister) List() (machines
api.NodeList, err error) {
// rather than a method of StoreToNodeLister.
// GetNodeInfo returns cached data for the minion 'id'.
func (s *StoreToNodeLister) GetNodeInfo(id string) (_api.Node, error) {

  • if minion, ok := s.Get(id); ok {
  •   return minion.(*api.Node), nil
    
  • minion, exists, err := s.Get(&api.Node{ObjectMeta: api.ObjectMeta{Name:
    id}})

@smarterclayton So, this replaces all the interface changes, only if Nodes
have an empty namespace. If they are stored with some uniform non-blank
namespace, either:

  1. That namespace would have to be hard-coded here or,
  2. The Node would need to propagate down to here via the interfaces.

Thoughts?


Reply to this email directly or view it on GitHub:
https://github.com/GoogleCloudPlatform/kubernetes/pull/3810/files#r23789908

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're on the same page. I've got e2e running again now.


if err != nil {
return nil, fmt.Errorf("error retrieving minion '%v' from cache: %v", id, err)
}

if !exists {
return nil, fmt.Errorf("minion '%v' is not in cache", id)
}
return nil, fmt.Errorf("minion '%v' is not in cache", id)

return minion.(*api.Node), nil
}

// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
Expand Down
8 changes: 4 additions & 4 deletions pkg/client/cache/listers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
)

func TestStoreToMinionLister(t *testing.T) {
store := NewStore()
store := NewStore(MetaNamespaceKeyFunc)
ids := util.NewStringSet("foo", "bar", "baz")
for id := range ids {
store.Add(id, &api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
store.Add(&api.Node{ObjectMeta: api.ObjectMeta{Name: id}})
}
sml := StoreToNodeLister{store}

Expand All @@ -46,10 +46,10 @@ func TestStoreToMinionLister(t *testing.T) {
}

func TestStoreToPodLister(t *testing.T) {
store := NewStore()
store := NewStore(MetaNamespaceKeyFunc)
ids := []string{"foo", "bar", "baz"}
for _, id := range ids {
store.Add(id, &api.Pod{
store.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{
Name: id,
Labels: map[string]string{"name": id},
Expand Down
15 changes: 6 additions & 9 deletions pkg/client/cache/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// one object at a time.
type Enumerator interface {
Len() int
Get(index int) (ID string, object interface{})
Get(index int) (object interface{})
}

// GetFunc should return an enumerator that you wish the Poller to proccess.
Expand Down Expand Up @@ -76,14 +76,11 @@ func (p *Poller) run() {
}

func (p *Poller) sync(e Enumerator) {
current := p.store.ContainedIDs()
items := []interface{}{}
for i := 0; i < e.Len(); i++ {
id, object := e.Get(i)
p.store.Update(id, object)
current.Delete(id)
}
// Delete all the objects not found.
for id := range current {
p.store.Delete(id)
object := e.Get(i)
items = append(items, object)
}

p.store.Replace(items)
}