Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename watch.Mux -> watch.Broadcaster #2748

Merged
merged 1 commit into from
Dec 4, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func GetEvents(f func(*api.Event)) watch.Interface {

const queueLen = 1000

var events = watch.NewMux(queueLen)
var events = watch.NewBroadcaster(queueLen)

// Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/event/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestRESTWatch(t *testing.T) {
t.Fatalf("Unexpected error %v", err)
}
go func() {
reg.Mux.Action(watch.Added, eventA)
reg.Broadcaster.Action(watch.Added, eventA)
}()
got := <-wi.ResultChan()
if e, a := eventA, got.Object; !reflect.DeepEqual(e, a) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/registry/registrytest/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type GenericRegistry struct {
ObjectList runtime.Object
sync.Mutex

Mux *watch.Mux
Broadcaster *watch.Broadcaster
}

func NewGeneric(list runtime.Object) *GenericRegistry {
return &GenericRegistry{
ObjectList: list,
Mux: watch.NewMux(0),
ObjectList: list,
Broadcaster: watch.NewBroadcaster(0),
}
}

Expand All @@ -54,7 +54,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje

func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.Mux.Watch(), nil
return r.Broadcaster.Watch(), nil
}

func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error) {
Expand All @@ -67,21 +67,21 @@ func (r *GenericRegistry) Create(ctx api.Context, id string, obj runtime.Object)
r.Lock()
defer r.Unlock()
r.Object = obj
r.Mux.Action(watch.Added, obj)
r.Broadcaster.Action(watch.Added, obj)
return r.Err
}

func (r *GenericRegistry) Update(ctx api.Context, id string, obj runtime.Object) error {
r.Lock()
defer r.Unlock()
r.Object = obj
r.Mux.Action(watch.Modified, obj)
r.Broadcaster.Action(watch.Modified, obj)
return r.Err
}

func (r *GenericRegistry) Delete(ctx api.Context, id string) error {
r.Lock()
defer r.Unlock()
r.Mux.Action(watch.Deleted, r.Object)
r.Broadcaster.Action(watch.Deleted, r.Object)
return r.Err
}
16 changes: 8 additions & 8 deletions pkg/registry/registrytest/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ type PodRegistry struct {
Pods *api.PodList
sync.Mutex

mux *watch.Mux
broadcaster *watch.Broadcaster
}

func NewPodRegistry(pods *api.PodList) *PodRegistry {
return &PodRegistry{
Pods: pods,
mux: watch.NewMux(0),
Pods: pods,
broadcaster: watch.NewBroadcaster(0),
}
}

Expand Down Expand Up @@ -64,8 +64,8 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api.
}

func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.mux.Watch(), nil
// TODO: wire filter down into the broadcaster; it needs access to current and previous state :(
return r.broadcaster.Watch(), nil
}

func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) {
Expand All @@ -78,21 +78,21 @@ func (r *PodRegistry) CreatePod(ctx api.Context, pod *api.Pod) error {
r.Lock()
defer r.Unlock()
r.Pod = pod
r.mux.Action(watch.Added, pod)
r.broadcaster.Action(watch.Added, pod)
return r.Err
}

func (r *PodRegistry) UpdatePod(ctx api.Context, pod *api.Pod) error {
r.Lock()
defer r.Unlock()
r.Pod = pod
r.mux.Action(watch.Modified, pod)
r.broadcaster.Action(watch.Modified, pod)
return r.Err
}

func (r *PodRegistry) DeletePod(ctx api.Context, podId string) error {
r.Lock()
defer r.Unlock()
r.mux.Action(watch.Deleted, r.Pod)
r.broadcaster.Action(watch.Deleted, r.Pod)
return r.Err
}
44 changes: 22 additions & 22 deletions pkg/watch/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)

// Mux distributes event notifications among any number of watchers. Every event
// Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher.
type Mux struct {
type Broadcaster struct {
lock sync.Mutex

watchers map[int64]*muxWatcher
watchers map[int64]*broadcasterWatcher
nextWatcher int64

incoming chan Event
}

// NewMux creates a new Mux. queueLength is the maximum number of events to queue.
// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue.
// When queueLength is 0, Action will block until any prior event has been
// completely distributed. It is guaranteed that events will be distibuted in the
// order in which they ocurr, but the order in which a single event is distributed
// among all of the watchers is unspecified.
func NewMux(queueLength int) *Mux {
m := &Mux{
watchers: map[int64]*muxWatcher{},
func NewBroadcaster(queueLength int) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, queueLength),
}
go m.loop()
Expand All @@ -50,12 +50,12 @@ func NewMux(queueLength int) *Mux {
// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Mux) Watch() Interface {
func (m *Broadcaster) Watch() Interface {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
w := &muxWatcher{
w := &broadcasterWatcher{
result: make(chan Event),
stopped: make(chan struct{}),
id: id,
Expand All @@ -66,7 +66,7 @@ func (m *Mux) Watch() Interface {
}

// stopWatching stops the given watcher and removes it from the list.
func (m *Mux) stopWatching(id int64) {
func (m *Broadcaster) stopWatching(id int64) {
m.lock.Lock()
defer m.lock.Unlock()
w, ok := m.watchers[id]
Expand All @@ -79,32 +79,32 @@ func (m *Mux) stopWatching(id int64) {
}

// closeAll disconnects all watchers (presumably in response to a Shutdown call).
func (m *Mux) closeAll() {
func (m *Broadcaster) closeAll() {
m.lock.Lock()
defer m.lock.Unlock()
for _, w := range m.watchers {
close(w.result)
}
// Delete everything from the map, since presence/absence in the map is used
// by stopWatching to avoid double-closing the channel.
m.watchers = map[int64]*muxWatcher{}
m.watchers = map[int64]*broadcasterWatcher{}
}

// Action distributes the given event among all watchers.
func (m *Mux) Action(action EventType, obj runtime.Object) {
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj}
}

// Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action after calling Shutdown.
func (m *Mux) Shutdown() {
func (m *Broadcaster) Shutdown() {
close(m.incoming)
}

// loop recieves from m.incoming and distributes to all watchers.
func (m *Mux) loop() {
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Mux.
// bug in watch.Broadcaster.
for {
event, ok := <-m.incoming
if !ok {
Expand All @@ -116,7 +116,7 @@ func (m *Mux) loop() {
}

// distribute sends event to all watchers. Blocking.
func (m *Mux) distribute(event Event) {
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
for _, w := range m.watchers {
Expand All @@ -127,22 +127,22 @@ func (m *Mux) distribute(event Event) {
}
}

// muxWatcher handles a single watcher of a mux
type muxWatcher struct {
// broadcasterWatcher handles a single watcher of a broadcaster
type broadcasterWatcher struct {
result chan Event
stopped chan struct{}
stop sync.Once
id int64
m *Mux
m *Broadcaster
}

// ResultChan returns a channel to use for waiting on events.
func (mw *muxWatcher) ResultChan() <-chan Event {
func (mw *broadcasterWatcher) ResultChan() <-chan Event {
return mw.result
}

// Stop stops watching and removes mw from its list.
func (mw *muxWatcher) Stop() {
func (mw *broadcasterWatcher) Stop() {
mw.stop.Do(func() {
close(mw.stopped)
mw.m.stopWatching(mw.id)
Expand Down
16 changes: 8 additions & 8 deletions pkg/watch/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ type myType struct {

func (*myType) IsAnAPIObject() {}

func TestMux(t *testing.T) {
func TestBroadcaster(t *testing.T) {
table := []Event{
{Added, &myType{"foo", "hello world 1"}},
{Added, &myType{"bar", "hello world 2"}},
{Modified, &myType{"foo", "goodbye world 3"}},
{Deleted, &myType{"bar", "hello world 4"}},
}

// The mux we're testing
m := NewMux(0)
// The broadcaster we're testing
m := NewBroadcaster(0)

// Add a bunch of watchers
const testWatchers = 2
Expand Down Expand Up @@ -76,8 +76,8 @@ func TestMux(t *testing.T) {
wg.Wait()
}

func TestMuxWatcherClose(t *testing.T) {
m := NewMux(0)
func TestBroadcasterWatcherClose(t *testing.T) {
m := NewBroadcaster(0)
w := m.Watch()
w2 := m.Watch()
w.Stop()
Expand All @@ -93,11 +93,11 @@ func TestMuxWatcherClose(t *testing.T) {
w2.Stop()
}

func TestMuxWatcherStopDeadlock(t *testing.T) {
func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
done := make(chan bool)
m := NewMux(0)
m := NewBroadcaster(0)
go func(w0, w1 Interface) {
// We know Mux is in the distribute loop once one watcher receives
// We know Broadcaster is in the distribute loop once one watcher receives
// an event. Stop the other watcher while distribute is trying to
// send to it.
select {
Expand Down