Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix OpenStreamSync busy looping #2827

Merged
merged 3 commits into from Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 9 additions & 3 deletions streams_map_outgoing_bidi.go
Expand Up @@ -106,6 +106,7 @@ func (m *outgoingBidiStreamsMap) OpenStreamSync(ctx context.Context) (streamI, e
}
str := m.openStream()
delete(m.openQueue, queuePos)
m.lowestInQueue = queuePos + 1
m.unblockOpenSync()
return str, nil
}
Expand Down Expand Up @@ -172,8 +173,10 @@ func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) {
m.maxStream = num
m.blockedSent = false
m.unblockOpenSync()
// TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame
}

// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingBidiStreamsMap) unblockOpenSync() {
if len(m.openQueue) == 0 {
return
Expand All @@ -183,9 +186,12 @@ func (m *outgoingBidiStreamsMap) unblockOpenSync() {
if !ok { // entry was deleted because the context was canceled
continue
}
close(c)
m.openQueue[qp] = nil
m.lowestInQueue = qp + 1
// unblockOpenSync is called both from OpenStreamSync and from SetMaxStream.
// It's sufficient to only unblock OpenStreamSync once.
select {
case c <- struct{}{}:
default:
}
return
}
}
Expand Down
12 changes: 9 additions & 3 deletions streams_map_outgoing_generic.go
Expand Up @@ -104,6 +104,7 @@ func (m *outgoingItemsMap) OpenStreamSync(ctx context.Context) (item, error) {
}
str := m.openStream()
delete(m.openQueue, queuePos)
m.lowestInQueue = queuePos + 1
m.unblockOpenSync()
return str, nil
}
Expand Down Expand Up @@ -170,8 +171,10 @@ func (m *outgoingItemsMap) SetMaxStream(num protocol.StreamNum) {
m.maxStream = num
m.blockedSent = false
m.unblockOpenSync()
// TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame
}

// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingItemsMap) unblockOpenSync() {
if len(m.openQueue) == 0 {
return
Expand All @@ -181,9 +184,12 @@ func (m *outgoingItemsMap) unblockOpenSync() {
if !ok { // entry was deleted because the context was canceled
continue
}
close(c)
m.openQueue[qp] = nil
m.lowestInQueue = qp + 1
// unblockOpenSync is called both from OpenStreamSync and from SetMaxStream.
// It's sufficient to only unblock OpenStreamSync once.
select {
case c <- struct{}{}:
default:
}
return
}
}
Expand Down
192 changes: 184 additions & 8 deletions streams_map_outgoing_generic_test.go
Expand Up @@ -3,6 +3,11 @@ package quic
import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"sync"
"time"

"github.com/golang/mock/gomock"
"github.com/lucas-clemente/quic-go/internal/protocol"
Expand All @@ -18,6 +23,15 @@ var _ = Describe("Streams Map (outgoing)", func() {
mockSender *MockStreamSender
)

// waitForEnqueued waits until there are n go routines waiting on OpenStreamSync()
waitForEnqueued := func(n int) {
Eventually(func() int {
m.mutex.Lock()
defer m.mutex.Unlock()
return len(m.openQueue)
}, 50*time.Millisecond, 50*time.Microsecond).Should(Equal(n))
}

BeforeEach(func() {
newItem = func(num protocol.StreamNum) item {
return &mockGenericStream{num: num}
Expand Down Expand Up @@ -124,8 +138,8 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
close(done)
}()
waitForEnqueued(1)

Consistently(done).ShouldNot(BeClosed())
m.SetMaxStream(1)
Eventually(done).Should(BeClosed())
})
Expand All @@ -140,12 +154,12 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(err).To(MatchError("context canceled"))
close(done)
}()
waitForEnqueued(1)

Consistently(done).ShouldNot(BeClosed())
cancel()
Eventually(done).Should(BeClosed())

// make sure that the next stream openend is stream 1
// make sure that the next stream opened is stream 1
m.SetMaxStream(1000)
str, err := m.OpenStream()
Expect(err).ToNot(HaveOccurred())
Expand All @@ -162,7 +176,8 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
close(done1)
}()
Consistently(done1).ShouldNot(BeClosed())
waitForEnqueued(1)

done2 := make(chan struct{})
go func() {
defer GinkgoRecover()
Expand All @@ -171,7 +186,7 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
close(done2)
}()
Consistently(done2).ShouldNot(BeClosed())
waitForEnqueued(2)

m.SetMaxStream(1)
Eventually(done1).Should(BeClosed())
Expand All @@ -180,6 +195,45 @@ var _ = Describe("Streams Map (outgoing)", func() {
Eventually(done2).Should(BeClosed())
})

It("opens streams in the right order, when one of the contexts is canceled", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
done1 := make(chan struct{})
go func() {
defer GinkgoRecover()
str, err := m.OpenStreamSync(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
close(done1)
}()
waitForEnqueued(1)

done2 := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer GinkgoRecover()
_, err := m.OpenStreamSync(ctx)
Expect(err).To(MatchError(context.Canceled))
close(done2)
}()
waitForEnqueued(2)

done3 := make(chan struct{})
go func() {
defer GinkgoRecover()
str, err := m.OpenStreamSync(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(2)))
close(done3)
}()
waitForEnqueued(3)

cancel()
Eventually(done2).Should(BeClosed())
m.SetMaxStream(1000)
Eventually(done1).Should(BeClosed())
Eventually(done3).Should(BeClosed())
})

It("unblocks multiple OpenStreamSync calls at the same time", func() {
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
done := make(chan struct{})
Expand All @@ -195,14 +249,14 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(err).ToNot(HaveOccurred())
done <- struct{}{}
}()
Consistently(done).ShouldNot(Receive())
waitForEnqueued(2)
go func() {
defer GinkgoRecover()
_, err := m.OpenStreamSync(context.Background())
Expect(err).To(MatchError("test done"))
done <- struct{}{}
}()
Consistently(done).ShouldNot(Receive())
waitForEnqueued(3)

m.SetMaxStream(2)
Eventually(done).Should(Receive())
Expand All @@ -223,7 +277,7 @@ var _ = Describe("Streams Map (outgoing)", func() {
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(1)))
close(openedSync)
}()
Consistently(openedSync).ShouldNot(BeClosed())
waitForEnqueued(1)

start := make(chan struct{})
openend := make(chan struct{})
Expand Down Expand Up @@ -309,4 +363,126 @@ var _ = Describe("Streams Map (outgoing)", func() {
expectTooManyStreamsError(err)
})
})

Context("randomized tests", func() {
It("opens streams", func() {
rand.Seed(GinkgoRandomSeed())
const n = 100
fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n)

// TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
done := make(map[int]chan struct{})
for i := 1; i <= n; i++ {
c := make(chan struct{})
done[i] = c

go func(doneChan chan struct{}, id protocol.StreamNum) {
defer GinkgoRecover()
defer close(doneChan)
str, err := m.OpenStreamSync(context.Background())
Expect(err).ToNot(HaveOccurred())
Expect(str.(*mockGenericStream).num).To(Equal(id))
}(c, protocol.StreamNum(i))
waitForEnqueued(i)
}

var limit int
for limit < n {
limit += rand.Intn(n/5) + 1
fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit)
m.SetMaxStream(protocol.StreamNum(limit))
for i := 1; i <= n; i++ {
if i <= limit {
Eventually(done[i]).Should(BeClosed())
} else {
Expect(done[i]).ToNot(BeClosed())
}
}
str, err := m.OpenStream()
if limit <= n {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(errTooManyOpenStreams.Error()))
} else {
Expect(str.(*mockGenericStream).num).To(Equal(protocol.StreamNum(n + 1)))
}
}
})

It("opens streams, when some of them are getting canceled", func() {
rand.Seed(GinkgoRandomSeed())
const n = 100
fmt.Fprintf(GinkgoWriter, "Opening %d streams concurrently.\n", n)

// TODO(#2826): check stream limits sent in STREAMS_BLOCKED frames
mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()

ctx, cancel := context.WithCancel(context.Background())
streamsToCancel := make(map[protocol.StreamNum]struct{}) // used as a set
for i := 0; i < 10; i++ {
id := protocol.StreamNum(rand.Intn(n) + 1)
fmt.Fprintf(GinkgoWriter, "Canceling stream %d.\n", id)
streamsToCancel[id] = struct{}{}
}

streamWillBeCanceled := func(id protocol.StreamNum) bool {
_, ok := streamsToCancel[id]
return ok
}

var streamIDs []int
var mutex sync.Mutex
done := make(map[int]chan struct{})
for i := 1; i <= n; i++ {
c := make(chan struct{})
done[i] = c

go func(doneChan chan struct{}, id protocol.StreamNum) {
defer GinkgoRecover()
defer close(doneChan)
cont := context.Background()
if streamWillBeCanceled(id) {
cont = ctx
}
str, err := m.OpenStreamSync(cont)
if streamWillBeCanceled(id) {
Expect(err).To(MatchError(context.Canceled))
return
}
Expect(err).ToNot(HaveOccurred())
mutex.Lock()
streamIDs = append(streamIDs, int(str.(*mockGenericStream).num))
mutex.Unlock()
}(c, protocol.StreamNum(i))
waitForEnqueued(i)
}

cancel()
for id := range streamsToCancel {
Eventually(done[int(id)]).Should(BeClosed())
}
var limit int
numStreams := n - len(streamsToCancel)
for limit < numStreams {
limit += rand.Intn(n/5) + 1
fmt.Fprintf(GinkgoWriter, "Setting stream limit to %d.\n", limit)
m.SetMaxStream(protocol.StreamNum(limit))
l := limit
if l > numStreams {
l = numStreams
}
Eventually(func() int {
mutex.Lock()
defer mutex.Unlock()
return len(streamIDs)
}).Should(Equal(l))
// check that all stream IDs were used
Expect(streamIDs).To(HaveLen(l))
sort.Ints(streamIDs)
for i := 0; i < l; i++ {
Expect(streamIDs[i]).To(Equal(i + 1))
}
}
})
})
})
12 changes: 9 additions & 3 deletions streams_map_outgoing_uni.go
Expand Up @@ -106,6 +106,7 @@ func (m *outgoingUniStreamsMap) OpenStreamSync(ctx context.Context) (sendStreamI
}
str := m.openStream()
delete(m.openQueue, queuePos)
m.lowestInQueue = queuePos + 1
m.unblockOpenSync()
return str, nil
}
Expand Down Expand Up @@ -172,8 +173,10 @@ func (m *outgoingUniStreamsMap) SetMaxStream(num protocol.StreamNum) {
m.maxStream = num
m.blockedSent = false
m.unblockOpenSync()
// TODO(#2826): it might be necessary to send a STREAMS_BLOCKED frame
}

// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream
func (m *outgoingUniStreamsMap) unblockOpenSync() {
if len(m.openQueue) == 0 {
return
Expand All @@ -183,9 +186,12 @@ func (m *outgoingUniStreamsMap) unblockOpenSync() {
if !ok { // entry was deleted because the context was canceled
continue
}
close(c)
m.openQueue[qp] = nil
m.lowestInQueue = qp + 1
// unblockOpenSync is called both from OpenStreamSync and from SetMaxStream.
// It's sufficient to only unblock OpenStreamSync once.
select {
case c <- struct{}{}:
default:
}
return
}
}
Expand Down