Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions share/tunnel/wg.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
package tunnel

import (
"sync"
"sync/atomic"
)
import "sync"

type waitGroup struct {
mu sync.Mutex
inner sync.WaitGroup
n int32
n int
}

func (w *waitGroup) Add(n int) {
atomic.AddInt32(&w.n, int32(n))
w.mu.Lock()
w.n += n
w.inner.Add(n)
w.mu.Unlock()
Comment on lines +13 to +15
}

func (w *waitGroup) Done() {
if n := atomic.LoadInt32(&w.n); n > 0 && atomic.CompareAndSwapInt32(&w.n, n, n-1) {
w.mu.Lock()
if w.n > 0 {
w.n--
w.inner.Done()
}
w.mu.Unlock()
}

func (w *waitGroup) DoneAll() {
for atomic.LoadInt32(&w.n) > 0 {
w.Done()
w.mu.Lock()
for w.n > 0 {
w.n--
w.inner.Done()
}
w.mu.Unlock()
Comment on lines +28 to +33
}

func (w *waitGroup) Wait() {
Expand Down
63 changes: 63 additions & 0 deletions share/tunnel/wg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package tunnel

import (
"sync"
"testing"
"time"
)

// TestWaitGroupAddDoneAllRace stresses concurrent Add/DoneAll to reproduce the
// race fixed in #585. Without the mutex, Add() is non-atomic between bumping
// the counter and the inner sync.WaitGroup, so DoneAll()'s loop can call
// inner.Done() before inner.Add() runs and panic with "negative WaitGroup
// counter".
func TestWaitGroupAddDoneAllRace(t *testing.T) {
var wg waitGroup
stop := make(chan struct{})
var workers sync.WaitGroup
for i := 0; i < 8; i++ {
workers.Add(2)
go func() {
defer workers.Done()
for {
select {
case <-stop:
return
default:
wg.Add(1)
}
}
}()
go func() {
defer workers.Done()
for {
select {
case <-stop:
return
default:
wg.DoneAll()
}
}
}()
}
time.Sleep(500 * time.Millisecond)
close(stop)
workers.Wait()
wg.DoneAll()
wg.Wait()
}

func TestWaitGroupDoneAllDrains(t *testing.T) {
var wg waitGroup
wg.Add(3)
wg.DoneAll()
wg.Wait()
}

func TestWaitGroupDoneIsIdempotent(t *testing.T) {
var wg waitGroup
wg.Add(1)
wg.Done()
wg.Done() // extra Done must be a no-op, not a panic
wg.Wait()
}