Skip to content

Commit

Permalink
Merge pull request #66971 from tnozicka/informer-watcher
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

#50102 Task 2: Add UntilWithSync

**What this PR does / why we need it**:
This is a split off from kubernetes/kubernetes#50102 to go in smaller pieces.

Introduces UntilWithSync based on informer.

**Needs kubernetes/kubernetes#66906 first**
/hold

**Release note**:
```release-note
NONE
```

/priority important-soon
/kind bug
(bug after the main PR which is this split from)

Kubernetes-commit: c4f355a2ad9692f5459541d4e4d94fcbc5f7d946
  • Loading branch information
k8s-publishing-bot committed Sep 6, 2018
2 parents bdc400e + dbf9709 commit d91c23d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
47 changes: 47 additions & 0 deletions pkg/watch/watch.go
Expand Up @@ -268,3 +268,50 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
}
}
}

// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
type ProxyWatcher struct {
result chan Event
stopCh chan struct{}

mutex sync.Mutex
stopped bool
}

var _ Interface = &ProxyWatcher{}

// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
func NewProxyWatcher(ch chan Event) *ProxyWatcher {
return &ProxyWatcher{
result: ch,
stopCh: make(chan struct{}),
stopped: false,
}
}

// Stop implements Interface
func (pw *ProxyWatcher) Stop() {
pw.mutex.Lock()
defer pw.mutex.Unlock()
if !pw.stopped {
pw.stopped = true
close(pw.stopCh)
}
}

// Stopping returns true if Stop() has been called
func (pw *ProxyWatcher) Stopping() bool {
pw.mutex.Lock()
defer pw.mutex.Unlock()
return pw.stopped
}

// ResultChan implements Interface
func (pw *ProxyWatcher) ResultChan() <-chan Event {
return pw.result
}

// StopChan returns stop channel
func (pw *ProxyWatcher) StopChan() <-chan struct{} {
return pw.stopCh
}
38 changes: 38 additions & 0 deletions pkg/watch/watch_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package watch_test

import (
"reflect"
"testing"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -135,3 +136,40 @@ func TestEmpty(t *testing.T) {
t.Errorf("unexpected result channel result")
}
}

func TestProxyWatcher(t *testing.T) {
events := []Event{
{Added, testType("foo")},
{Modified, testType("qux")},
{Modified, testType("bar")},
{Deleted, testType("bar")},
{Error, testType("error: blah")},
}

ch := make(chan Event, len(events))
w := NewProxyWatcher(ch)

for _, e := range events {
ch <- e
}

for _, e := range events {
g := <-w.ResultChan()
if !reflect.DeepEqual(e, g) {
t.Errorf("Expected %#v, got %#v", e, g)
continue
}
}

w.Stop()

select {
// Closed channel always reads immediately
case <-w.StopChan():
default:
t.Error("Channel isn't closed")
}

// Test double close
w.Stop()
}

0 comments on commit d91c23d

Please sign in to comment.