Skip to content

Commit

Permalink
Revert "Expand stream's flow control in case of an active read. (grpc…
Browse files Browse the repository at this point in the history
…#1248)"

This reverts commit 6dff7c5.
  • Loading branch information
menghanl committed May 24, 2017
1 parent 6dff7c5 commit 8f0921a
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 591 deletions.
4 changes: 2 additions & 2 deletions rpc_util.go
Expand Up @@ -278,7 +278,7 @@ type parser struct {
// that the underlying io.Reader must not return an incompatible
// error.
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
if _, err := io.ReadFull(p.r, p.header[:]); err != nil {
return 0, nil, err
}

Expand All @@ -294,7 +294,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
if _, err := p.r.Read(msg); err != nil {
if _, err := io.ReadFull(p.r, msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
Expand Down
12 changes: 2 additions & 10 deletions rpc_util_test.go
Expand Up @@ -47,14 +47,6 @@ import (
"google.golang.org/grpc/transport"
)

type fullReader struct {
reader io.Reader
}

func (f fullReader) Read(p []byte) (int, error) {
return io.ReadFull(f.reader, p)
}

var _ CallOption = EmptyCallOption{} // ensure EmptyCallOption implements the interface

func TestSimpleParsing(t *testing.T) {
Expand All @@ -75,7 +67,7 @@ func TestSimpleParsing(t *testing.T) {
// Check that messages with length >= 2^24 are parsed.
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
buf := fullReader{bytes.NewReader(test.p)}
buf := bytes.NewReader(test.p)
parser := &parser{r: buf}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
Expand All @@ -87,7 +79,7 @@ func TestSimpleParsing(t *testing.T) {
func TestMultipleParsing(t *testing.T) {
// Set a byte stream consists of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
b := fullReader{bytes.NewReader(p)}
b := bytes.NewReader(p)
parser := &parser{r: b}

wantRecvs := []struct {
Expand Down
42 changes: 5 additions & 37 deletions test/end2end_test.go
Expand Up @@ -449,7 +449,6 @@ type test struct {
streamServerInt grpc.StreamServerInterceptor
unknownHandler grpc.StreamHandler
sc <-chan grpc.ServiceConfig
customCodec grpc.Codec
serverInitialWindowSize int32
serverInitialConnWindowSize int32
clientInitialWindowSize int32
Expand Down Expand Up @@ -556,9 +555,6 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
case "clientTimeoutCreds":
sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
}
if te.customCodec != nil {
sopts = append(sopts, grpc.CustomCodec(te.customCodec))
}
s := grpc.NewServer(sopts...)
te.srv = s
if te.e.httpHandler {
Expand Down Expand Up @@ -645,9 +641,6 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.perRPCCreds != nil {
opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
}
if te.customCodec != nil {
opts = append(opts, grpc.WithCodec(te.customCodec))
}
var err error
te.cc, err = grpc.Dial(te.srvAddr, opts...)
if err != nil {
Expand Down Expand Up @@ -3278,51 +3271,26 @@ func testServerStreamingConcurrent(t *testing.T, e env) {

}

func generatePayloadSizes() [][]int {
reqSizes := [][]int{
{27182, 8, 1828, 45904},
}

num8KPayloads := 1024
eightKPayloads := []int{}
for i := 0; i < num8KPayloads; i++ {
eightKPayloads = append(eightKPayloads, (1 << 13))
}
reqSizes = append(reqSizes, eightKPayloads)

num2MPayloads := 8
twoMPayloads := []int{}
for i := 0; i < num2MPayloads; i++ {
twoMPayloads = append(twoMPayloads, (1 << 21))
}
reqSizes = append(reqSizes, twoMPayloads)

return reqSizes
}

func TestClientStreaming(t *testing.T) {
defer leakCheck(t)()
for _, s := range generatePayloadSizes() {
for _, e := range listTestEnv() {
testClientStreaming(t, e, s)
}
for _, e := range listTestEnv() {
testClientStreaming(t, e)
}
}

func testClientStreaming(t *testing.T, e env, sizes []int) {
func testClientStreaming(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())

ctx, _ := context.WithTimeout(te.ctx, time.Second*30)
stream, err := tc.StreamingInputCall(ctx)
stream, err := tc.StreamingInputCall(te.ctx)
if err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
}

var sum int
for _, s := range sizes {
for _, s := range reqSizes {
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
if err != nil {
t.Fatal(err)
Expand Down
6 changes: 0 additions & 6 deletions test/servertester.go
Expand Up @@ -287,9 +287,3 @@ func (st *serverTester) writeRSTStream(streamID uint32, code http2.ErrCode) {
st.t.Fatalf("Error writing RST_STREAM: %v", err)
}
}

func (st *serverTester) writeDataPadded(streamID uint32, endStream bool, data, padding []byte) {
if err := st.fr.WriteDataPadded(streamID, endStream, data, padding); err != nil {
st.t.Fatalf("Error writing DATA with padding: %v", err)
}
}
45 changes: 1 addition & 44 deletions transport/control.go
Expand Up @@ -58,8 +58,6 @@ const (
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
// max window limit set by HTTP2 Specs.
maxWindowSize = math.MaxInt32
)

// The following defines various control items which could flow through
Expand Down Expand Up @@ -169,48 +167,14 @@ type inFlow struct {
// The amount of data the application has consumed but grpc has not sent
// window update for them. Used to reduce window update frequency.
pendingUpdate uint32
// delta is the extra window update given by receiver when an application
// is reading data bigger in size than the inFlow limit.
delta uint32
}

func (f *inFlow) maybeAdjust(n uint32) uint32 {
if n > uint32(math.MaxInt32) {
n = uint32(math.MaxInt32)
}
f.mu.Lock()
defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
// estUntransmittedData is the maximum number of bytes the sends might not have put
// on the wire yet. A value of 0 or less means that we have already received all or
// more bytes than the application is requesting to read.
estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
// This implies that unless we send a window update, the sender won't be able to send all the bytes
// for this message. Therefore we must send an update over the limit since there's an active read
// request from the application.
if estUntransmittedData > estSenderQuota {
// Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
if f.limit+n > maxWindowSize {
f.delta = maxWindowSize - f.limit
} else {
// Send a window update for the whole message and not just the difference between
// estUntransmittedData and estSenderQuota. This will be helpful in case the message
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
return f.delta
}
return 0
}

// onData is invoked when some data frame is received. It updates pendingData.
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
defer f.mu.Unlock()
f.pendingData += n
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
if f.pendingData+f.pendingUpdate > f.limit {
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
}
return nil
Expand All @@ -225,13 +189,6 @@ func (f *inFlow) onRead(n uint32) uint32 {
return 0
}
f.pendingData -= n
if n > f.delta {
n -= f.delta
f.delta = 0
} else {
f.delta -= n
n = 0
}
f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
wu := f.pendingUpdate
Expand Down
15 changes: 8 additions & 7 deletions transport/handler_server.go
Expand Up @@ -316,12 +316,13 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
req := ht.req

s := &Stream{
id: 0, // irrelevant
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
id: 0, // irrelevant
windowHandler: func(int) {}, // nothing
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
}
pr := &peer.Peer{
Addr: ht.RemoteAddr(),
Expand All @@ -332,7 +333,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
ctx = peer.NewContext(ctx, pr)
s.ctx = newContextWithStream(ctx, s)
s.trReader = &recvBufferReader{ctx: s.ctx, recv: s.buf}
s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}

// readerDone is closed when the Body.Read-ing goroutine exits.
readerDone := make(chan struct{})
Expand Down
44 changes: 12 additions & 32 deletions transport/http2_client.go
Expand Up @@ -173,9 +173,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
conn, err := dial(ctx, opts.Dialer, addr.Addr)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
}
return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
Expand All @@ -194,7 +194,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
temp := isTemporary(err)
return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
return nil, connectionErrorf(temp, err, "transport: %v", err)
}
isSecure = true
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
n, err := t.conn.Write(clientPreface)
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
if n != len(clientPreface) {
t.Close()
Expand All @@ -285,13 +285,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
}
if err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
go t.controller()
Expand All @@ -316,24 +316,18 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
headerChan: make(chan struct{}),
}
t.nextID += 2
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
s.windowHandler = func(n int) {
t.updateWindow(s, uint32(n))
}
// The client side stream context should have exactly the same life cycle with the user provided context.
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
// So we use the original context here instead of creating a copy.
s.ctx = ctx
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
goAway: s.goAway,
recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
s.dec = &recvBufferReader{
ctx: s.ctx,
goAway: s.goAway,
recv: s.buf,
}

return s
}

Expand Down Expand Up @@ -808,20 +802,6 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
return s, ok
}

// adjustWindow sends out extra window update over the initial window size
// of stream if the application is requesting data larger in size than
// the window.
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
s.mu.Lock()
defer s.mu.Unlock()
if s.state == streamDone {
return
}
if w := s.fc.maybeAdjust(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w})
}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
Expand Down

0 comments on commit 8f0921a

Please sign in to comment.