Skip to content
This repository has been archived by the owner on Dec 7, 2019. It is now read-only.

Commit

Permalink
fix transport tests for quic
Browse files Browse the repository at this point in the history
1. Some transports won't open streams till we write.
2. Don't close the underlying connection till we're *actually* done with it.
  • Loading branch information
Stebalien committed Feb 8, 2019
1 parent 8f3a996 commit 22414bc
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 51 deletions.
100 changes: 68 additions & 32 deletions test/stream.go
Expand Up @@ -277,52 +277,83 @@ func SubtestStreamOpenStress(t *testing.T, ta, tb tpt.Transport, maddr ma.Multia
defer l.Close()

count := 10000
go func() {
c, err := l.Accept()
checkErr(t, err)
stress := func() {
for i := 0; i < count; i++ {
s, err := c.OpenStream()
if err != nil {
panic(err)
}
fullClose(t, s)
}
}
workers := 5

go stress()
go stress()
go stress()
go stress()
go stress()
}()
var (
connA, connB tpt.Conn
)

b, err := tb.Dial(context.Background(), l.Multiaddr(), peerA)
accepted := make(chan error, 1)
go func() {
var err error
connA, err = l.Accept()
accepted <- err
}()
connB, err = tb.Dial(context.Background(), l.Multiaddr(), peerA)
checkErr(t, err)
checkErr(t, <-accepted)

time.Sleep(time.Millisecond * 50)
defer func() {
if connA != nil {
connA.Close()
}
if connB != nil {
connB.Close()
}
}()

recv := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
str, err := b.AcceptStream()
defer wg.Done()
for j := 0; j < workers; j++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < count; i++ {
s, err := connA.OpenStream()
if err != nil {
t.Error(err)
return
}
wg.Add(1)
go func() {
defer wg.Done()
fullClose(t, s)
}()
}
}()
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < count*workers; i++ {
str, err := connB.AcceptStream()
if err != nil {
break
}
wg.Add(1)
go func() {
recv <- struct{}{}
defer wg.Done()
fullClose(t, str)
}()
}
}()

limit := time.After(time.Second * 10)
for i := 0; i < count*5; i++ {
select {
case <-recv:
case <-limit:
t.Fatal("timed out receiving streams")
}
timeout := time.After(time.Second * 10)
done := make(chan struct{})

go func() {
wg.Wait()
close(done)
}()

select {
case <-timeout:
t.Fatal("timed out receiving streams")
case <-done:
}
}

Expand All @@ -339,9 +370,14 @@ func SubtestStreamReset(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr,
if err != nil {
panic(err)
}

// Some transports won't open the stream until we write. That's
// fine.
s.Write([]byte("foo"))

time.Sleep(time.Millisecond * 50)

_, err = s.Write([]byte("foo"))
_, err = s.Write([]byte("bar"))
if err == nil {
t.Error("should have failed to write")
}
Expand Down
62 changes: 43 additions & 19 deletions test/transport.go
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"sync"
"testing"
Expand Down Expand Up @@ -63,52 +62,70 @@ func SubtestBasic(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA
}
defer list.Close()

done := make(chan struct{})
var (
connA, connB tpt.Conn
done = make(chan struct{})
)
defer func() {
<-done
if connA != nil {
connA.Close()
}
if connB != nil {
connB.Close()
}
}()

go func() {
defer close(done)
c, err := list.Accept()
var err error
connB, err = list.Accept()
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
s, err := c.AcceptStream()
s, err := connB.AcceptStream()
if err != nil {
c.Close()
t.Fatal(err)
t.Error(err)
return
}

buf := make([]byte, len(testData))
_, err = io.ReadFull(s, buf)
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}

if !bytes.Equal(testData, buf) {
t.Errorf("expected %s, got %s", testData, buf)
}

n, err := s.Write(testData)
if err != nil {
t.Fatal(err)
t.Error(err)
return
}
s.Close()

if n != len(testData) {
t.Fatal(err)
t.Error(err)
return
}

err = s.Close()
if err != nil {
t.Error(err)
}
}()

if !tb.CanDial(list.Multiaddr()) {
t.Error("CanDial should have returned true")
}

c, err := tb.Dial(ctx, list.Multiaddr(), peerA)
connA, err = tb.Dial(ctx, list.Multiaddr(), peerA)
if err != nil {
t.Fatal(err)
}
defer c.Close()

s, err := c.OpenStream()
s, err := connA.OpenStream()
if err != nil {
t.Fatal(err)
}
Expand All @@ -123,13 +140,20 @@ func SubtestBasic(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA
t.Fatalf("failed to write enough data (a->b)")
return
}
err = s.Close()
if err != nil {
t.Fatal(err)
return
}

buf := make([]byte, len(testData))
_, err = io.ReadFull(s, buf)
buf, err := ioutil.ReadAll(s)
if err != nil {
t.Fatal(err)
return
}
if !bytes.Equal(testData, buf) {
t.Errorf("expected %s, got %s", testData, buf)
}
}

func SubtestPingPong(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID) {
Expand Down

0 comments on commit 22414bc

Please sign in to comment.