Skip to content

Commit

Permalink
Write Unit test to imitate Panic
Browse files Browse the repository at this point in the history
There was a race creating a panic with shutting down
an eventbroadcaster and it's associated watchers. This
test exposes it.

Signed-off-by: Andrew Stoycos <astoycos@redhat.com>
  • Loading branch information
astoycos committed May 16, 2022
1 parent 30adcd0 commit 2d614a1
Showing 1 changed file with 45 additions and 2 deletions.
47 changes: 45 additions & 2 deletions staging/src/k8s.io/apimachinery/pkg/watch/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package watch_test
package watch

import (
"reflect"
Expand All @@ -26,7 +26,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
. "k8s.io/apimachinery/pkg/watch"
)

type myType struct {
Expand Down Expand Up @@ -246,3 +245,47 @@ func TestBroadcasterSendEventAfterShutdown(t *testing.T) {
assert.Equal(t, sendOnClosed, false, "ActionOrDrop should return false if broadcaster is already shutdown")
assert.EqualError(t, err, "broadcaster already stopped", "ActionOrDrop should report error id broadcaster is shutdown")
}

// Test this since we see usage patterns where the broadcaster and watchers are
// stopped simultaneously leading to races.
func TestBroadcasterShutdownRace(t *testing.T) {
m := NewBroadcaster(1, WaitIfChannelFull)
stopCh := make(chan struct{})

// Add a bunch of watchers
const testWatchers = 2
for i := 0; i < testWatchers; i++ {
i := i

_, err := m.Watch()
if err != nil {
t.Fatalf("Unable start event watcher: '%v' (will not retry!)", err)
}
// This is how we force the watchers to close down independently of the
// eventbroadcaster, see real usage pattern in startRecordingEvents()
go func() {
<-stopCh
t.Log("Stopping Watchers")
m.stopWatching(int64(i))
}()
}

event := Event{Type: Added, Object: &myType{"foo", "hello world"}}
err := m.Action(event.Type, event.Object)
if err != nil {
t.Fatalf("error sending event: %v", err)
}

// Manually simulate m.Shutdown() but change it to force a race scenario
// 1. Close watcher stopchannel, so watchers are closed independently of the
// eventBroadcaster
// 2. Shutdown the m.incoming slightly Before m.stopped so that the watcher's
// call of Blockqueue can pass the m.stopped check.
m.blockQueue(func() {
close(stopCh)
close(m.incoming)
time.Sleep(1 * time.Millisecond)
close(m.stopped)
})
m.distributing.Wait()
}

0 comments on commit 2d614a1

Please sign in to comment.