Skip to content

Commit

Permalink
Merge pull request etcd-io#12538 from lzhfromustc/12_9_GoroutineLeak
Browse files Browse the repository at this point in the history
test: change channel operations to avoid potential goroutine leaks
  • Loading branch information
ptabor committed Feb 1, 2021
2 parents b5d1172 + f2a912a commit d6d03be
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 18 deletions.
6 changes: 3 additions & 3 deletions pkg/transport/timeout_dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (

func TestReadWriteTimeoutDialer(t *testing.T) {
stop := make(chan struct{})
defer func() {
stop <- struct{}{}
}()

ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("unexpected listen error: %v", err)
}
defer func() {
stop <- struct{}{}
}()
ts := testBlockingServer{ln, 2, stop}
go ts.Start(t)

Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func TestApplyRepeat(t *testing.T) {
// wait for conf change message
act, err := n.Wait(1)
// wait for stop message (async to avoid deadlock)
stopc := make(chan error)
stopc := make(chan error, 1)
go func() {
_, werr := n.Wait(1)
stopc <- werr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestDoubleBarrier(t *testing.T) {

b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
donec := make(chan struct{})
defer close(donec)
for i := 0; i < waiters-1; i++ {
go func() {
session, err := concurrency.NewSession(clus.RandClient())
Expand All @@ -48,17 +49,17 @@ func TestDoubleBarrier(t *testing.T) {
if err := bb.Enter(); err != nil {
t.Errorf("could not enter on barrier (%v)", err)
}
donec <- struct{}{}
<-donec
if err := bb.Leave(); err != nil {
t.Errorf("could not leave on barrier (%v)", err)
}
donec <- struct{}{}
<-donec
}()
}

time.Sleep(10 * time.Millisecond)
select {
case <-donec:
case donec <- struct{}{}:
t.Fatalf("barrier did not enter-wait")
default:
}
Expand All @@ -72,13 +73,13 @@ func TestDoubleBarrier(t *testing.T) {
select {
case <-timerC:
t.Fatalf("barrier enter timed out")
case <-donec:
case donec <- struct{}{}:
}
}

time.Sleep(10 * time.Millisecond)
select {
case <-donec:
case donec <- struct{}{}:
t.Fatalf("barrier did not leave-wait")
default:
}
Expand All @@ -89,7 +90,7 @@ func TestDoubleBarrier(t *testing.T) {
select {
case <-timerC:
t.Fatalf("barrier leave timed out")
case <-donec:
case donec <- struct{}{}:
}
}
}
Expand All @@ -100,6 +101,7 @@ func TestDoubleBarrierFailover(t *testing.T) {

waiters := 10
donec := make(chan struct{})
defer close(donec)

s0, err := concurrency.NewSession(clus.Client(0))
if err != nil {
Expand All @@ -118,7 +120,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
if berr := b.Enter(); berr != nil {
t.Errorf("could not enter on barrier (%v)", berr)
}
donec <- struct{}{}
<-donec
}()

for i := 0; i < waiters-1; i++ {
Expand All @@ -127,16 +129,16 @@ func TestDoubleBarrierFailover(t *testing.T) {
if berr := b.Enter(); berr != nil {
t.Errorf("could not enter on barrier (%v)", berr)
}
donec <- struct{}{}
<-donec
b.Leave()
donec <- struct{}{}
<-donec
}()
}

// wait for barrier enter to unblock
for i := 0; i < waiters; i++ {
select {
case <-donec:
case donec <- struct{}{}:
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for enter, %d", i)
}
Expand All @@ -148,7 +150,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
// join on rest of waiters
for i := 0; i < waiters-1; i++ {
select {
case <-donec:
case donec <- struct{}{}:
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for leave, %d", i)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/clientv3/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ func TestKVPutFailGetRetry(t *testing.T) {
t.Fatalf("got success on disconnected put, wanted error")
}

donec := make(chan struct{})
donec := make(chan struct{}, 1)
go func() {
// Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(context.TODO(), "foo")
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v2_http_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ func TestV2WatchKeyInDir(t *testing.T) {
tc := NewTestClient()

var body map[string]interface{}
c := make(chan bool)
c := make(chan bool, 1)

// Create an expiring directory
v := url.Values{}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ func TestV3WatchWithFilter(t *testing.T) {
t.Fatal(err)
}

recv := make(chan *pb.WatchResponse)
recv := make(chan *pb.WatchResponse, 1)
go func() {
// check received PUT
resp, rerr := ws.Recv()
Expand Down

0 comments on commit d6d03be

Please sign in to comment.