Skip to content

Commit

Permalink
Extract common types from sync (#7843)
Browse files Browse the repository at this point in the history
* extract common types from sync

* fix tests

* simplify

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
farazdagi and prylabs-bulldozer[bot] committed Nov 18, 2020
1 parent fec4692 commit 7735a08
Show file tree
Hide file tree
Showing 17 changed files with 163 additions and 137 deletions.
6 changes: 5 additions & 1 deletion beacon-chain/p2p/types/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["types.go"],
srcs = [
"rpc_errors.go",
"rpc_goodbye_codes.go",
"types.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
Expand Down
15 changes: 15 additions & 0 deletions beacon-chain/p2p/types/rpc_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package types

import "errors"

var (
ErrWrongForkDigestVersion = errors.New("wrong fork digest version")
ErrInvalidEpoch = errors.New("invalid epoch")
ErrInvalidFinalizedRoot = errors.New("invalid finalized root")
ErrInvalidSequenceNum = errors.New("invalid sequence number provided")
ErrGeneric = errors.New("internal service error")
ErrInvalidParent = errors.New("mismatched parent root")
ErrRateLimited = errors.New("rate limited")
ErrIODeadline = errors.New("i/o deadline exceeded")
ErrInvalidRequest = errors.New("invalid range, step or count")
)
40 changes: 40 additions & 0 deletions beacon-chain/p2p/types/rpc_goodbye_codes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package types

// RPCGoodbyeCode represents goodbye code, used in sync package.
type RPCGoodbyeCode = SSZUint64

const (
// Spec defined codes.
GoodbyeCodeClientShutdown RPCGoodbyeCode = iota
GoodbyeCodeWrongNetwork
GoodbyeCodeGenericError

// Teku specific codes
GoodbyeCodeUnableToVerifyNetwork = RPCGoodbyeCode(128)

// Lighthouse specific codes
GoodbyeCodeTooManyPeers = RPCGoodbyeCode(129)
GoodbyeCodeBadScore = RPCGoodbyeCode(250)
GoodbyeCodeBanned = RPCGoodbyeCode(251)
)

// GoodbyeCodeMessages defines a mapping between goodbye codes and string messages.
var GoodbyeCodeMessages = map[RPCGoodbyeCode]string{
GoodbyeCodeClientShutdown: "client shutdown",
GoodbyeCodeWrongNetwork: "irrelevant network",
GoodbyeCodeGenericError: "fault/error",
GoodbyeCodeUnableToVerifyNetwork: "unable to verify network",
GoodbyeCodeTooManyPeers: "client has too many peers",
GoodbyeCodeBadScore: "peer score too low",
GoodbyeCodeBanned: "client banned this node",
}

// ErrToGoodbyeCode converts given error to RPC goodbye code.
func ErrToGoodbyeCode(err error) RPCGoodbyeCode {
switch err {
case ErrWrongForkDigestVersion:
return GoodbyeCodeWrongNetwork
default:
return GoodbyeCodeGenericError
}
}
13 changes: 0 additions & 13 deletions beacon-chain/sync/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,6 @@ import (
"github.com/sirupsen/logrus"
)

const genericError = "internal service error"
const rateLimitedError = "rate limited"
const reqError = "invalid range, step or count"
const seqError = "invalid sequence number provided"
const deadlineError = "i/o deadline exceeded"

var errWrongForkDigestVersion = errors.New("wrong fork digest version")
var errInvalidEpoch = errors.New("invalid epoch")
var errInvalidFinalizedRoot = errors.New("invalid finalized root")
var errInvalidSequenceNum = errors.New(seqError)
var errGeneric = errors.New(genericError)
var errInvalidParent = errors.New("mismatched parent root")

var responseCodeSuccess = byte(0x00)
var responseCodeInvalidRequest = byte(0x01)
var responseCodeServerError = byte(0x02)
Expand Down
14 changes: 7 additions & 7 deletions beacon-chain/sync/pending_blocks_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil"
Expand Down Expand Up @@ -150,9 +150,9 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin
if code == 0 {
t.Error("Expected a non-zero code")
}
if errMsg != errWrongForkDigestVersion.Error() {
t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(errWrongForkDigestVersion.Error()))
t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, errWrongForkDigestVersion.Error())
if errMsg != p2ptypes.ErrWrongForkDigestVersion.Error() {
t.Logf("Received error string len %d, wanted error string len %d", len(errMsg), len(p2ptypes.ErrWrongForkDigestVersion.Error()))
t.Errorf("Received unexpected message response in the stream: %s. Wanted %s.", errMsg, p2ptypes.ErrWrongForkDigestVersion.Error())
}
})

Expand Down Expand Up @@ -374,15 +374,15 @@ func TestService_BatchRootRequest(t *testing.T) {
require.NoError(t, err)

// Send in duplicated roots to also test deduplicaton.
sentRoots := p2pTypes.BeaconBlockByRootsReq{b2Root, b2Root, b3Root, b3Root, b4Root, b5Root}
expectedRoots := p2pTypes.BeaconBlockByRootsReq{b2Root, b3Root, b4Root, b5Root}
sentRoots := p2ptypes.BeaconBlockByRootsReq{b2Root, b2Root, b3Root, b3Root, b4Root, b5Root}
expectedRoots := p2ptypes.BeaconBlockByRootsReq{b2Root, b3Root, b4Root, b5Root}

pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
var out p2pTypes.BeaconBlockByRootsReq
var out p2ptypes.BeaconBlockByRootsReq
assert.NoError(t, p2.Encoding().DecodeWithMaxLength(stream, &out))
assert.DeepEqual(t, expectedRoots, out, "Did not receive expected message")
response := []*ethpb.SignedBeaconBlock{b2, b3, b4, b5}
Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/sync/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
)
Expand Down Expand Up @@ -81,8 +82,8 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
}
if amt > uint64(remaining) {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
writeErrorResponseToStream(responseCodeInvalidRequest, rateLimitedError, stream, l.p2p)
return errors.New(rateLimitedError)
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
Expand Down Expand Up @@ -42,8 +43,7 @@ func TestRateLimiter_ExceedCapacity(t *testing.T) {
code, errMsg, err := readStatusCodeNoDeadline(stream, p2.Encoding())
require.NoError(t, err, "could not read incoming stream")
assert.Equal(t, responseCodeInvalidRequest, code, "not equal response codes")
assert.Equal(t, rateLimitedError, errMsg, "not equal errors")

assert.Equal(t, p2ptypes.ErrRateLimited.Error(), errMsg, "not equal errors")
})
wg.Add(1)
stream, err := p1.BHost.NewStream(context.Background(), p2.PeerID(), protocol.ID(topic))
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/prysmaticlabs/prysm/shared/traceutil"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
// Check before hand that peer is valid.
if s.p2p.Peers().IsBad(stream.Conn().RemotePeer()) {
closeStream(stream, log)
if err := s.sendGoodByeAndDisconnect(ctx, codeBanned, stream.Conn().RemotePeer()); err != nil {
if err := s.sendGoodByeAndDisconnect(ctx, p2ptypes.GoodbyeCodeBanned, stream.Conn().RemotePeer()); err != nil {
log.Debugf("Could not disconnect from peer: %v", err)
}
return
Expand All @@ -100,7 +101,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
if baseTopic == p2p.RPCMetaDataTopic {
if err := handle(ctx, base, stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if err != errWrongForkDigestVersion {
if err != p2ptypes.ErrWrongForkDigestVersion {
log.WithError(err).Debug("Failed to handle p2p RPC")
}
traceutil.AnnotateError(span, err)
Expand All @@ -126,7 +127,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
}
if err := handle(ctx, msg.Interface(), stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if err != errWrongForkDigestVersion {
if err != p2ptypes.ErrWrongForkDigestVersion {
log.WithError(err).Debug("Failed to handle p2p RPC")
}
traceutil.AnnotateError(span, err)
Expand All @@ -140,7 +141,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
}
if err := handle(ctx, msg.Elem().Interface(), stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if err != errWrongForkDigestVersion {
if err != p2ptypes.ErrWrongForkDigestVersion {
log.WithError(err).Debug("Failed to handle p2p RPC")
}
traceutil.AnnotateError(span, err)
Expand Down
31 changes: 16 additions & 15 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -80,14 +81,14 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}

if endSlot-startSlot > rangeLimit {
s.writeErrorResponseToStream(responseCodeInvalidRequest, reqError, stream)
err := errors.New(reqError)
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidRequest.Error(), stream)
err := p2ptypes.ErrInvalidRequest
traceutil.AnnotateError(span, err)
return err
}

err := s.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, &prevRoot, stream)
if err != nil && !errors.Is(err, errInvalidParent) {
if err != nil && !errors.Is(err, p2ptypes.ErrInvalidParent) {
return err
}
// Reduce capacity of peer in the rate limiter first.
Expand All @@ -97,7 +98,7 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
// Exit in the event we have a disjoint chain to
// return.
if errors.Is(err, errInvalidParent) {
if errors.Is(err, p2ptypes.ErrInvalidParent) {
break
}

Expand Down Expand Up @@ -128,7 +129,7 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
blks, roots, err := s.db.Blocks(ctx, filter)
if err != nil {
log.WithError(err).Debug("Failed to retrieve blocks")
s.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
traceutil.AnnotateError(span, err)
return err
}
Expand All @@ -137,7 +138,7 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
genBlock, genRoot, err := s.retrieveGenesisBlock(ctx)
if err != nil {
log.WithError(err).Debug("Failed to retrieve genesis block")
s.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
traceutil.AnnotateError(span, err)
return err
}
Expand All @@ -148,15 +149,15 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
// we only return valid sets of blocks.
blks, roots, err = s.dedupBlocksAndRoots(blks, roots)
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
traceutil.AnnotateError(span, err)
return err
}
blks, roots = s.sortBlocksAndRoots(blks, roots)

blks, err = s.filterBlocks(ctx, blks, roots, prevRoot, step, startSlot)
if err != nil && err != errInvalidParent {
s.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
if err != nil && err != p2ptypes.ErrInvalidParent {
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
traceutil.AnnotateError(span, err)
return err
}
Expand All @@ -166,7 +167,7 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
}
if chunkErr := s.chunkWriter(stream, b); chunkErr != nil {
log.WithError(chunkErr).Debug("Failed to send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, genericError, stream)
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
traceutil.AnnotateError(span, chunkErr)
return chunkErr
}
Expand All @@ -189,20 +190,20 @@ func (s *Service) validateRangeRequest(r *pb.BeaconBlocksByRangeRequest) error {

// Ensure all request params are within appropriate bounds
if count == 0 || count > maxRequestBlocks {
return errors.New(reqError)
return p2ptypes.ErrInvalidRequest
}

if step == 0 || step > rangeLimit {
return errors.New(reqError)
return p2ptypes.ErrInvalidRequest
}

if startSlot > highestExpectedSlot {
return errors.New(reqError)
return p2ptypes.ErrInvalidRequest
}

endSlot := startSlot + (step * (count - 1))
if endSlot-startSlot > rangeLimit {
return errors.New(reqError)
return p2ptypes.ErrInvalidRequest
}
return nil
}
Expand All @@ -228,7 +229,7 @@ func (s *Service) filterBlocks(ctx context.Context, blks []*ethpb.SignedBeaconBl
if isRequestedSlotStep && isCanonical {
// Exit early if our valid block is non linear.
if parentValid && isSingular && !isLinear {
return newBlks, errInvalidParent
return newBlks, p2ptypes.ErrInvalidParent
}
newBlks = append(newBlks, blks[i])
// Set the previous root as the
Expand Down

0 comments on commit 7735a08

Please sign in to comment.