Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bundle small writes on streams #2538

Merged
merged 1 commit into from May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 18 additions & 8 deletions send_stream.go
Expand Up @@ -117,13 +117,19 @@ func (s *sendStream) Write(p []byte) (int, error) {
// When the user now calls Close(), this is much more likely to happen before we popped that last STREAM frame,
// allowing us to set the FIN bit on that frame (instead of sending an empty STREAM frame with FIN).
if s.canBufferStreamFrame() && len(s.dataForWriting) > 0 {
f := wire.GetStreamFrame()
f.Offset = s.writeOffset
f.StreamID = s.streamID
f.DataLenPresent = true
f.Data = f.Data[:len(s.dataForWriting)]
copy(f.Data, s.dataForWriting)
s.nextFrame = f
if s.nextFrame == nil {
f := wire.GetStreamFrame()
f.Offset = s.writeOffset
f.StreamID = s.streamID
f.DataLenPresent = true
f.Data = f.Data[:len(s.dataForWriting)]
copy(f.Data, s.dataForWriting)
s.nextFrame = f
} else {
l := len(s.nextFrame.Data)
s.nextFrame.Data = s.nextFrame.Data[:l+len(s.dataForWriting)]
copy(s.nextFrame.Data[l:], s.dataForWriting)
}
s.dataForWriting = nil
bytesWritten = len(p)
copied = true
Expand Down Expand Up @@ -176,7 +182,11 @@ func (s *sendStream) Write(p []byte) (int, error) {
}

func (s *sendStream) canBufferStreamFrame() bool {
return s.nextFrame == nil && protocol.ByteCount(len(s.dataForWriting)) <= protocol.MaxReceivePacketSize
var l protocol.ByteCount
if s.nextFrame != nil {
l = s.nextFrame.DataLen()
}
return l+protocol.ByteCount(len(s.dataForWriting)) <= protocol.MaxReceivePacketSize
}

// popStreamFrame returns the next STREAM frame that is supposed to be sent on this stream
Expand Down
23 changes: 23 additions & 0 deletions send_stream_test.go
Expand Up @@ -124,6 +124,29 @@ var _ = Describe("Send Stream", func() {
Eventually(done).Should(BeClosed())
})

It("bundles small writes", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
mockSender.EXPECT().onHasStreamData(streamID).Times(2)
n, err := strWithTimeout.Write([]byte("foo"))
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(3))
n, err = strWithTimeout.Write([]byte("bar"))
Expect(err).ToNot(HaveOccurred())
Expect(n).To(Equal(3))
close(done)
}()
Eventually(done).Should(BeClosed()) // both Write calls returned without any data having been dequeued yet
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
frame, _ := str.popStreamFrame(protocol.MaxByteCount)
f := frame.Frame.(*wire.StreamFrame)
Expect(f.Offset).To(BeZero())
Expect(f.FinBit).To(BeFalse())
Expect(f.Data).To(Equal([]byte("foobar")))
})

It("writes and gets data in multiple turns, for large writes", func() {
mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount).Times(5)
var totalBytesSent protocol.ByteCount
Expand Down