Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Metadata And Ping RPC methods #5271

Merged
merged 24 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var appFlags = []cli.Flag{
cmd.P2PHostDNS,
cmd.P2PMaxPeers,
cmd.P2PPrivKey,
cmd.P2PMetadata,
cmd.P2PWhitelist,
cmd.P2PEncoding,
cmd.DataDirFlag,
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (b *BeaconNode) registerP2P(ctx *cli.Context) error {
HostAddress: ctx.String(cmd.P2PHost.Name),
HostDNS: ctx.String(cmd.P2PHostDNS.Name),
PrivateKey: ctx.String(cmd.P2PPrivKey.Name),
MetaDataDir: ctx.String(cmd.P2PMetadata.Name),
TCPPort: ctx.Uint(cmd.P2PTCPPort.Name),
UDPPort: ctx.Uint(cmd.P2PUDPPort.Name),
MaxPeers: ctx.Uint(cmd.P2PMaxPeers.Name),
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Config struct {
HostDNS string
PrivateKey string
DataDir string
MetaDataDir string
TCPPort uint
UDPPort uint
MaxPeers uint
Expand Down
10 changes: 9 additions & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)

// P2P represents the full p2p interface composed of all of the sub-interfaces.
Expand All @@ -21,6 +22,7 @@ type P2P interface {
Sender
ConnectionHandler
PeersProvider
MetadataProvider
}

// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
Expand Down Expand Up @@ -60,10 +62,16 @@ type PeerManager interface {

// Sender abstracts the sending functionality from libp2p.
type Sender interface {
Send(context.Context, interface{}, peer.ID) (network.Stream, error)
Send(context.Context, interface{}, string, peer.ID) (network.Stream, error)
}

// PeersProvider abstracts obtaining our current list of known peers status.
type PeersProvider interface {
Peers() *peers.Status
}

// MetadataProvider returns the metadata related information for the local peer.
type MetadataProvider interface {
Metadata() *pb.MetaData
MetadataSeq() uint64
}
37 changes: 21 additions & 16 deletions beacon-chain/p2p/rpc_topic_mappings.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
package p2p

import (
"reflect"

p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)

const (
// RPCStatusTopic defines the topic for the status rpc method.
RPCStatusTopic = "/eth2/beacon_chain/req/status/1"
// RPCGoodByeTopic defines the topic for the goodbye rpc method.
RPCGoodByeTopic = "/eth2/beacon_chain/req/goodbye/1"
// RPCBlocksByRangeTopic defines the topic for the blocks by range rpc method.
RPCBlocksByRangeTopic = "/eth2/beacon_chain/req/beacon_blocks_by_range/1"
// RPCBlocksByRootTopic defines the topic for the blocks by root rpc method.
RPCBlocksByRootTopic = "/eth2/beacon_chain/req/beacon_blocks_by_root/1"
// RPCPingTopic defines the topic for the ping rpc method.
RPCPingTopic = "/eth2/beacon_chain/req/ping/1"
// RPCMetaDataTopic defines the topic for the metadata rpc method.
RPCMetaDataTopic = "/eth2/beacon_chain/req/metadata/1"
)

// RPCTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup. These mappings should be used for outbound sending only. Peers may respond
// with a different message type as defined by the p2p protocol.
var RPCTopicMappings = map[string]interface{}{
"/eth2/beacon_chain/req/status/1": &p2ppb.Status{},
"/eth2/beacon_chain/req/goodbye/1": new(uint64),
"/eth2/beacon_chain/req/beacon_blocks_by_range/1": &p2ppb.BeaconBlocksByRangeRequest{},
"/eth2/beacon_chain/req/beacon_blocks_by_root/1": [][32]byte{},
}

// RPCTypeMapping is the inverse of RPCTopicMappings so that an arbitrary protobuf message
// can be mapped to a protocol ID string.
var RPCTypeMapping = make(map[reflect.Type]string)

func init() {
for k, v := range RPCTopicMappings {
RPCTypeMapping[reflect.TypeOf(v)] = k
}
RPCStatusTopic: &p2ppb.Status{},
RPCGoodByeTopic: new(uint64),
RPCBlocksByRangeTopic: &p2ppb.BeaconBlocksByRangeRequest{},
RPCBlocksByRootTopic: [][32]byte{},
RPCPingTopic: new(uint64),
RPCMetaDataTopic: new(interface{}),
}
14 changes: 8 additions & 6 deletions beacon-chain/p2p/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"context"
"reflect"
"time"

"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -14,10 +13,10 @@ import (

// Send a message to a specific peer. The returned stream may be used for reading, but has been
// closed for writing.
func (s *Service) Send(ctx context.Context, message interface{}, pid peer.ID) (network.Stream, error) {
func (s *Service) Send(ctx context.Context, message interface{}, topic string, pid peer.ID) (network.Stream, error) {
ctx, span := trace.StartSpan(ctx, "p2p.Send")
defer span.End()
topic := RPCTypeMapping[reflect.TypeOf(message)] + s.Encoding().ProtocolSuffix()
topic = topic + s.Encoding().ProtocolSuffix()
span.AddAttributes(trace.StringAttribute("topic", topic))

// TTFB_TIME (5s) + RESP_TIMEOUT (10s).
Expand All @@ -38,9 +37,12 @@ func (s *Service) Send(ctx context.Context, message interface{}, pid peer.ID) (n
traceutil.AnnotateError(span, err)
return nil, err
}
if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
// do not encode anything if we are sending a metadata request
if topic != RPCMetaDataTopic {
if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
return nil, err
}
}

// Close stream for writing.
Expand Down
6 changes: 1 addition & 5 deletions beacon-chain/p2p/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"context"
"reflect"
"sync"
"testing"
"time"
Expand All @@ -29,9 +28,6 @@ func TestService_Send(t *testing.T) {
Bar: 55,
}

// Register testing topic.
RPCTypeMapping[reflect.TypeOf(msg)] = "/testing/1"

// Register external listener which will repeat the message back.
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -50,7 +46,7 @@ func TestService_Send(t *testing.T) {
wg.Done()
})

stream, err := svc.Send(context.Background(), msg, p2.Host.ID())
stream, err := svc.Send(context.Background(), msg, "/testing/1", p2.Host.ID())
if err != nil {
t.Fatal(err)
}
Expand Down
22 changes: 20 additions & 2 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (
"strings"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/dgraph-io/ristretto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
Expand All @@ -27,10 +26,13 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -69,6 +71,7 @@ type Service struct {
peers *peers.Status
genesisTime time.Time
genesisValidatorsRoot []byte
metaData *pb.MetaData
}

// NewService initializes a new p2p service compatible with shared.Service interface. No
Expand Down Expand Up @@ -101,6 +104,11 @@ func NewService(cfg *Config) (*Service, error) {
log.WithError(err).Error("Failed to generate p2p private key")
return nil, err
}
s.metaData, err = metaDataFromConfig(s.cfg)
if err != nil {
log.WithError(err).Error("Failed to create peer metadata")
return nil, err
}

opts := buildOptions(s.cfg, ipAddr, s.privKey)
h, err := libp2p.New(s.ctx, opts...)
Expand Down Expand Up @@ -327,6 +335,16 @@ func (s *Service) Peers() *peers.Status {
return s.peers
}

// Metadata returns a copy of the peer's metadata.
func (s *Service) Metadata() *pb.MetaData {
return proto.Clone(s.metaData).(*pb.MetaData)
}

// MetadataSeq returns the metadata sequence number
terencechain marked this conversation as resolved.
Show resolved Hide resolved
func (s *Service) MetadataSeq() uint64 {
return s.metaData.SeqNumber
}

// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee id's for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee id's.
Expand Down
22 changes: 18 additions & 4 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var TopicMappings = map[reflect.Type]string{
reflect.TypeOf(new(uint64)): "/eth2/beacon_chain/req/goodbye/1",
reflect.TypeOf(&pb.BeaconBlocksByRangeRequest{}): "/eth2/beacon_chain/req/beacon_blocks_by_range/1",
reflect.TypeOf([][32]byte{}): "/eth2/beacon_chain/req/beacon_blocks_by_root/1",
reflect.TypeOf(new(uint64)): "/eth2/beacon_chain/req/ping/1/",
}

// TestP2P represents a p2p implementation that can be used for testing.
Expand All @@ -39,6 +40,7 @@ type TestP2P struct {
BroadcastCalled bool
DelaySend bool
peers *peers.Status
LocalMetadata *pb.MetaData
}

// NewTestP2P initializes a new p2p test service.
Expand Down Expand Up @@ -198,8 +200,8 @@ func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID
}

// Send a message to a specific peer.
func (p *TestP2P) Send(ctx context.Context, msg interface{}, pid peer.ID) (network.Stream, error) {
protocol := TopicMappings[reflect.TypeOf(msg)]
func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid peer.ID) (network.Stream, error) {
protocol := topic
if protocol == "" {
return nil, fmt.Errorf("protocol doesnt exist for proto message: %v", msg)
}
Expand All @@ -208,8 +210,10 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, pid peer.ID) (netwo
return nil, err
}

if _, err := p.Encoding().EncodeWithLength(stream, msg); err != nil {
return nil, err
if topic != "/eth2/beacon_chain/req/metadata/1" {
if _, err := p.Encoding().EncodeWithLength(stream, msg); err != nil {
return nil, err
}
}

// Close stream for writing.
Expand Down Expand Up @@ -248,3 +252,13 @@ func (p *TestP2P) RefreshENR(epoch uint64) {
func (p *TestP2P) ForkDigest() ([4]byte, error) {
return [4]byte{}, nil
}

// Metadata mocks the peer's metadata.
func (p *TestP2P) Metadata() *pb.MetaData {
return proto.Clone(p.LocalMetadata).(*pb.MetaData)
}

// MetadataSeq mocks metadata sequence number.
func (p *TestP2P) MetadataSeq() uint64 {
return p.LocalMetadata.SeqNumber
}
41 changes: 41 additions & 0 deletions beacon-chain/p2p/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/iputils"
)

const keyPath = "network-keys"
const metaDataPath = "metaData"

func convertFromInterfacePrivKey(privkey crypto.PrivKey) *ecdsa.PrivateKey {
typeAssertedKey := (*ecdsa.PrivateKey)((*btcec.PrivateKey)(privkey.(*crypto.Secp256k1PrivateKey)))
Expand Down Expand Up @@ -83,6 +86,44 @@ func retrievePrivKeyFromFile(path string) (*ecdsa.PrivateKey, error) {
return convertFromInterfacePrivKey(unmarshalledKey), nil
}

func metaDataFromConfig(cfg *Config) (*pbp2p.MetaData, error) {
defaultKeyPath := path.Join(cfg.DataDir, metaDataPath)
metaDataPath := cfg.MetaDataDir

_, err := os.Stat(defaultKeyPath)
defaultMetadataExist := !os.IsNotExist(err)
if err != nil && defaultMetadataExist {
return nil, err
}
if metaDataPath == "" && !defaultMetadataExist {
metaData := &pbp2p.MetaData{
SeqNumber: 0,
Attnets: bitfield.NewBitvector64(),
}
dst, err := metaData.Marshal()
if err != nil {
return nil, err
}
if err = ioutil.WriteFile(defaultKeyPath, dst, 0600); err != nil {
return nil, err
}
return metaData, nil
}
if defaultMetadataExist && metaDataPath == "" {
metaDataPath = defaultKeyPath
}
src, err := ioutil.ReadFile(metaDataPath)
if err != nil {
log.WithError(err).Error("Error reading metadata from file")
return nil, err
}
metaData := &pbp2p.MetaData{}
if err := metaData.Unmarshal(src); err != nil {
return nil, err
}
return metaData, nil
}

func ipAddr() net.IP {
ip, err := iputils.ExternalIPv4()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/state/stateutil/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package stateutil_test
import (
"testing"

"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/testutil"
)

Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ go_library(
"rpc_beacon_blocks_by_root.go",
"rpc_chunked_response.go",
"rpc_goodbye.go",
"rpc_metadata.go",
"rpc_ping.go",
"rpc_status.go",
"service.go",
"subscriber.go",
Expand Down Expand Up @@ -93,6 +95,8 @@ go_test(
"rpc_beacon_blocks_by_range_test.go",
"rpc_beacon_blocks_by_root_test.go",
"rpc_goodbye_test.go",
"rpc_metadata_test.go",
"rpc_ping_test.go",
"rpc_status_test.go",
"rpc_test.go",
"subscriber_beacon_aggregate_proof_test.go",
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/sync/initial-sync-old/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
Expand Down Expand Up @@ -304,7 +305,7 @@ func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRa
"count": req.Count,
"step": req.Step,
}).Debug("Requesting blocks")
stream, err := s.p2p.Send(ctx, req, pid)
stream, err := s.p2p.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to peer")
}
Expand Down
Loading