Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/watch: fix potential deadlock #1321

Merged
merged 2 commits into from
Sep 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 20 additions & 8 deletions pkg/watch/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ func (m *Mux) Watch() Interface {
id := m.nextWatcher
m.nextWatcher++
w := &muxWatcher{
result: make(chan Event),
id: id,
m: m,
result: make(chan Event),
stopped: make(chan struct{}),
id: id,
m: m,
}
m.watchers[id] = w
return w
Expand Down Expand Up @@ -114,20 +115,28 @@ func (m *Mux) loop() {
m.closeAll()
}

var testHookMuxDistribute = func() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to repro the race without this, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can sleep in the test if you'd prefer. That way the mutex should be held in distribute but it isn't guaranteed to be.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, can't you just not read from the watch? Wouldn't that guarantee that the mutex is held?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, if you do that then you have no way of knowing if it's even tried to distribute a message yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. I just had an idea. Make two watchers and receive on both of them in a
single select. Then when one gets an event, Stop the other one.
On Sep 15, 2014 6:24 PM, "Daniel Smith" notifications@github.com wrote:

In pkg/watch/mux.go:

@@ -114,20 +115,28 @@ func (m *Mux) loop() {
m.closeAll()
}

+var testHookMuxDistribute = func() {}

Oh, I see, if you do that then you have no way of knowing if it's even
tried to distribute a message yet.


Reply to this email directly or view it on GitHub
https://github.com/GoogleCloudPlatform/kubernetes/pull/1321/files#r17579206
.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, great idea! Can you make another PR since I just merged this one? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.


// distribute sends event to all watchers. Blocking.
func (m *Mux) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
testHookMuxDistribute()
for _, w := range m.watchers {
w.result <- event
select {
case w.result <- event:
case <-w.stopped:
}
}
}

// muxWatcher handles a single watcher of a mux
type muxWatcher struct {
result chan Event
id int64
m *Mux
result chan Event
stopped chan struct{}
stop sync.Once
id int64
m *Mux
}

// ResultChan returns a channel to use for waiting on events.
Expand All @@ -137,5 +146,8 @@ func (mw *muxWatcher) ResultChan() <-chan Event {

// Stop stops watching and removes mw from its list.
func (mw *muxWatcher) Stop() {
mw.m.stopWatching(mw.id)
mw.stop.Do(func() {
close(mw.stopped)
mw.m.stopWatching(mw.id)
})
}
22 changes: 22 additions & 0 deletions pkg/watch/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"sync"
"testing"
"time"
)

type myType struct {
Expand Down Expand Up @@ -91,3 +92,24 @@ func TestMuxWatcherClose(t *testing.T) {
w.Stop()
w2.Stop()
}

func TestMuxWatcherStopDeadlock(t *testing.T) {
defer func(fn func()) { testHookMuxDistribute = fn }(testHookMuxDistribute)
sig, done := make(chan bool), make(chan bool)
testHookMuxDistribute = func() { sig <- true }
m := NewMux(0)
go func(w Interface) {
// Imagine this goroutine was receiving from w.ResultChan()
// until it received some signal and stopped watching.
<-sig
w.Stop()
close(done)
}(m.Watch())
m.Action(Added, &myType{})
select {
case <-time.After(5 * time.Second):
t.Error("timeout: deadlocked")
case <-done:
}
m.Shutdown()
}