Skip to content

Commit

Permalink
QSP 32: Add Appropriate Stream Deadlines for RPC Requests (#6583)
Browse files Browse the repository at this point in the history
* add no deadlines
* Merge branch 'master' into fixDeadlines
* nogo
* Merge refs/heads/master into fixDeadlines
* Merge refs/heads/master into fixDeadlines
  • Loading branch information
nisdas committed Jul 13, 2020
1 parent 79fbaae commit 27577bc
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 20 deletions.
15 changes: 2 additions & 13 deletions beacon-chain/p2p/sender.go
Expand Up @@ -2,12 +2,10 @@ package p2p

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"go.opencensus.io/trace"
)
Expand All @@ -23,24 +21,15 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
topic := baseTopic + s.Encoding().ProtocolSuffix()
span.AddAttributes(trace.StringAttribute("topic", topic))

// TTFB_TIME (5s) + RESP_TIMEOUT (10s).
var deadline = params.BeaconNetworkConfig().TtfbTimeout + params.BeaconNetworkConfig().RespTimeout
ctx, cancel := context.WithTimeout(ctx, deadline)
// Apply max dial timeout when opening a new stream.
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()

stream, err := s.host.NewStream(ctx, pid, protocol.ID(topic))
if err != nil {
traceutil.AnnotateError(span, err)
return nil, err
}
if err := stream.SetReadDeadline(time.Now().Add(deadline)); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
}
if err := stream.SetWriteDeadline(time.Now().Add(deadline)); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic == RPCMetaDataTopic {
return stream, nil
Expand Down
34 changes: 32 additions & 2 deletions beacon-chain/sync/error.go
Expand Up @@ -3,10 +3,11 @@ package sync
import (
"bytes"
"errors"
"io"

"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
)

const genericError = "internal service error"
Expand Down Expand Up @@ -35,7 +36,36 @@ func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error
}

// ReadStatusCode response from a RPC stream.
func ReadStatusCode(stream io.Reader, encoding encoder.NetworkEncoding) (uint8, string, error) {
func ReadStatusCode(stream network.Stream, encoding encoder.NetworkEncoding) (uint8, string, error) {
// Set ttfb deadline.
SetStreamReadDeadline(stream, params.BeaconNetworkConfig().TtfbTimeout)
b := make([]byte, 1)
_, err := stream.Read(b)
if err != nil {
return 0, "", err
}

if b[0] == responseCodeSuccess {
// Set response deadline on a successful response code.
SetStreamReadDeadline(stream, params.BeaconNetworkConfig().RespTimeout)

return 0, "", nil
}

// Set response deadline, when reading error message.
SetStreamReadDeadline(stream, params.BeaconNetworkConfig().RespTimeout)
msg := &pb.ErrorResponse{
Message: []byte{},
}
if err := encoding.DecodeWithMaxLength(stream, msg); err != nil {
return 0, "", err
}

return b[0], string(msg.Message), nil
}

// reads data from the stream without applying any timeouts.
func readStatusCodeNoDeadline(stream network.Stream, encoding encoder.NetworkEncoding) (uint8, string, error) {
b := make([]byte, 1)
_, err := stream.Read(b)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/sync/initial-sync/blocks_fetcher.go
Expand Up @@ -330,7 +330,8 @@ func (f *blocksFetcher) requestBlocks(

resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
for i := uint64(0); ; i++ {
blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p)
isFirstChunk := i == 0
blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p, isFirstChunk)
if err == io.EOF {
break
}
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Expand Up @@ -31,7 +31,8 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots
}
}()
for i := 0; i < len(blockRoots); i++ {
blk, err := ReadChunkedBlock(stream, s.p2p)
isFirstChunk := i == 0
blk, err := ReadChunkedBlock(stream, s.p2p, isFirstChunk)
// Return error until #6408 is resolved.
if err == io.EOF {
return err
Expand Down Expand Up @@ -79,7 +80,8 @@ func (s *Service) sendRecentBeaconBlocksRequestFallback(ctx context.Context, blo
}
}()
for i := 0; i < len(blockRoots); i++ {
blk, err := ReadChunkedBlock(stream, s.p2p)
isFirstChunk := i == 0
blk, err := ReadChunkedBlock(stream, s.p2p, isFirstChunk)
if err == io.EOF {
break
}
Expand Down
23 changes: 21 additions & 2 deletions beacon-chain/sync/rpc_chunked_response.go
Expand Up @@ -29,19 +29,38 @@ func WriteChunk(stream libp2pcore.Stream, encoding encoder.NetworkEncoding, msg

// ReadChunkedBlock handles each response chunk that is sent by the
// peer and converts it into a beacon block.
func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBeaconBlock, error) {
func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P, isFirstChunk bool) (*eth.SignedBeaconBlock, error) {
// Handle deadlines differently for first chunk
if isFirstChunk {
return readFirstChunkedBlock(stream, p2p)
}
blk := &eth.SignedBeaconBlock{}
if err := readResponseChunk(stream, p2p, blk); err != nil {
return nil, err
}
return blk, nil
}

// readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to
// it.
func readFirstChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBeaconBlock, error) {
blk := &eth.SignedBeaconBlock{}
code, errMsg, err := ReadStatusCode(stream, p2p.Encoding())
if err != nil {
return nil, err
}
if code != 0 {
return nil, errors.New(errMsg)
}
err = p2p.Encoding().DecodeWithMaxLength(stream, blk)
return blk, err
}

// readResponseChunk reads the response from the stream and decodes it into the
// provided message type.
func readResponseChunk(stream libp2pcore.Stream, p2p p2p.P2P, to interface{}) error {
SetStreamReadDeadline(stream, respTimeout)
code, errMsg, err := ReadStatusCode(stream, p2p.Encoding())
code, errMsg, err := readStatusCodeNoDeadline(stream, p2p.Encoding())
if err != nil {
return err
}
Expand Down

0 comments on commit 27577bc

Please sign in to comment.