Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Federated ingress unit test fix #34325

Merged
merged 2 commits into from
Oct 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions federation/pkg/federation-controller/util/test/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ import (
// A structure that distributes eventes to multiple watchers.
type WatcherDispatcher struct {
sync.Mutex
watchers []*watch.FakeWatcher
watchers []*watch.RaceFreeFakeWatcher
eventsSoFar []*watch.Event
}

func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) {
func (wd *WatcherDispatcher) register(watcher *watch.RaceFreeFakeWatcher) {
wd.Lock()
defer wd.Unlock()
wd.watchers = append(wd.watchers, watcher)
Expand All @@ -50,6 +50,14 @@ func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) {
}
}

func (wd *WatcherDispatcher) Stop() {
wd.Lock()
defer wd.Unlock()
for _, watcher := range wd.watchers {
watcher.Stop()
}
}

func copy(obj runtime.Object) runtime.Object {
objCopy, err := api.Scheme.DeepCopy(obj)
if err != nil {
Expand Down Expand Up @@ -126,12 +134,12 @@ func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object)
// All subsequent requests for a watch on the client will result in returning this fake watcher.
func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
dispatcher := &WatcherDispatcher{
watchers: make([]*watch.FakeWatcher, 0),
watchers: make([]*watch.RaceFreeFakeWatcher, 0),
eventsSoFar: make([]*watch.Event, 0),
}

client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
watcher := watch.NewFakeWithChanSize(100)
watcher := watch.NewRaceFreeFake()
dispatcher.register(watcher)
return true, watcher, nil
})
Expand Down
119 changes: 118 additions & 1 deletion pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package watch

import (
"fmt"
"sync"

"k8s.io/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -44,6 +45,8 @@ const (
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Error EventType = "ERROR"

DefaultChanSize int32 = 100
)

// Event represents a single event to a watched resource.
Expand Down Expand Up @@ -91,7 +94,7 @@ func NewFake() *FakeWatcher {
}
}

func NewFakeWithChanSize(size int) *FakeWatcher {
func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
return &FakeWatcher{
result: make(chan Event, size),
}
Expand Down Expand Up @@ -150,3 +153,117 @@ func (f *FakeWatcher) Error(errValue runtime.Object) {
func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
f.result <- Event{action, obj}
}

// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type RaceFreeFakeWatcher struct {
result chan Event
Stopped bool
sync.Mutex
}

func NewRaceFreeFake() *RaceFreeFakeWatcher {
return &RaceFreeFakeWatcher{
result: make(chan Event, DefaultChanSize),
}
}

// Stop implements Interface.Stop().
func (f *RaceFreeFakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
if !f.Stopped {
glog.V(4).Infof("Stopping fake watcher.")
close(f.result)
f.Stopped = true
}
}

func (f *RaceFreeFakeWatcher) IsStopped() bool {
f.Lock()
defer f.Unlock()
return f.Stopped
}

// Reset prepares the watcher to be reused.
func (f *RaceFreeFakeWatcher) Reset() {
f.Lock()
defer f.Unlock()
f.Stopped = false
f.result = make(chan Event, DefaultChanSize)
}

func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
f.Lock()
defer f.Unlock()
return f.result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you also lock here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can potentially race with Reset().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

// Add sends an add event.
func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
f.Lock()
defer f.Unlock()
if !f.Stopped {
select {
case f.result <- Event{Added, obj}:
return
default:
panic(fmt.Errorf("channel full"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this. Why not just:

f.result <- Event{Added, obj}

Copy link
Member

@wojtek-t wojtek-t Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same for Modify and Delete, Error and Action.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because f.resut <- may block if the buffer is full and wait until the test fail due to timeout.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree with that. If you have running only producer and not consumer in your test, then the test is wrong, not the framework.

}
}
}

// Modify sends a modify event.
func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
f.Lock()
defer f.Unlock()
if !f.Stopped {
select {
case f.result <- Event{Modified, obj}:
return
default:
panic(fmt.Errorf("channel full"))
}
}
}

// Delete sends a delete event.
func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
f.Lock()
defer f.Unlock()
if !f.Stopped {
select {
case f.result <- Event{Deleted, lastValue}:
return
default:
panic(fmt.Errorf("channel full"))
}
}
}

// Error sends an Error event.
func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
f.Lock()
defer f.Unlock()
if !f.Stopped {
select {
case f.result <- Event{Error, errValue}:
return
default:
panic(fmt.Errorf("channel full"))
}
}
}

// Action sends an event of the requested type, for table-based testing.
func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
f.Lock()
defer f.Unlock()
if !f.Stopped {
select {
case f.result <- Event{action, obj}:
return
default:
panic(fmt.Errorf("channel full"))
}
}
}
47 changes: 47 additions & 0 deletions pkg/watch/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,53 @@ func TestFake(t *testing.T) {
consumer(f)
}

func TestRaceFreeFake(t *testing.T) {
f := NewRaceFreeFake()

table := []struct {
t EventType
s testType
}{
{Added, testType("foo")},
{Modified, testType("qux")},
{Modified, testType("bar")},
{Deleted, testType("bar")},
{Error, testType("error: blah")},
}

// Prove that f implements Interface by phrasing this as a function.
consumer := func(w Interface) {
for _, expect := range table {
got, ok := <-w.ResultChan()
if !ok {
t.Fatalf("closed early")
}
if e, a := expect.t, got.Type; e != a {
t.Fatalf("Expected %v, got %v", e, a)
}
if a, ok := got.Object.(testType); !ok || a != expect.s {
t.Fatalf("Expected %v, got %v", expect.s, a)
}
}
_, stillOpen := <-w.ResultChan()
if stillOpen {
t.Fatal("Never stopped")
}
}

sender := func() {
f.Add(testType("foo"))
f.Action(Modified, testType("qux"))
f.Modify(testType("bar"))
f.Delete(testType("bar"))
f.Error(testType("error: blah"))
f.Stop()
}

go sender()
consumer(f)
}

func TestEmpty(t *testing.T) {
w := NewEmptyWatch()
_, ok := <-w.ResultChan()
Expand Down