Skip to content

Commit

Permalink
Address Review of v0.12 (#6179)
Browse files Browse the repository at this point in the history
* rem outdated tests

* remove redundant

* refactor and cleanup

* comment

* terence's review

* Update beacon-chain/rpc/validator/attester.go

* Update beacon-chain/rpc/validator/attester.go

* Update beacon-chain/rpc/validator/attester.go

* Update beacon-chain/rpc/validator/attester.go

* benches passing

* fix up attester test

Co-authored-by: rauljordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
  • Loading branch information
4 people committed Jun 9, 2020
1 parent 132785f commit a75480d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 34 deletions.
43 changes: 16 additions & 27 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ import (
// GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")

const attestationSubnetTopicFormat = "/eth2/%x/beacon_attestation_%d"

// Broadcast a message to the p2p network.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
forkDigest, err := s.forkDigest()
if err != nil {
err := errors.Wrap(err, "could not retrieve fork digest")
traceutil.AnnotateError(span, err)
return err
}

Expand All @@ -32,29 +36,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
traceutil.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped
}
topic = fmt.Sprintf(topic, forkDigest)

span.AddAttributes(trace.StringAttribute("topic", topic))

buf := new(bytes.Buffer)
if _, err := s.Encoding().EncodeGossip(buf, msg); err != nil {
err := errors.Wrap(err, "could not encode message")
traceutil.AnnotateError(span, err)
return err
}

if span.IsRecordingEvents() {
id := hashutil.FastSum64(buf.Bytes())
messageLen := int64(buf.Len())
span.AddMessageSendEvent(int64(id), messageLen /*uncompressed*/, messageLen /*compressed*/)
}

if err := s.pubsub.Publish(topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
err := errors.Wrap(err, "could not publish message")
traceutil.AnnotateError(span, err)
return err
}
return nil
return s.broadcastObject(ctx, msg, fmt.Sprintf(topic, forkDigest))
}

// BroadcastAttestation broadcasts an attestation to the p2p network.
Expand All @@ -63,13 +45,22 @@ func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *
defer span.End()
forkDigest, err := s.forkDigest()
if err != nil {
err := errors.Wrap(err, "could not retrieve fork digest")
traceutil.AnnotateError(span, err)
return err
}
topic := attestationToTopic(subnet, forkDigest)
return s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest))
}

// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj interface{}, topic string) error {
_, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()

span.AddAttributes(trace.StringAttribute("topic", topic))

buf := new(bytes.Buffer)
if _, err := s.Encoding().EncodeGossip(buf, att); err != nil {
if _, err := s.Encoding().EncodeGossip(buf, obj); err != nil {
err := errors.Wrap(err, "could not encode message")
traceutil.AnnotateError(span, err)
return err
Expand All @@ -89,8 +80,6 @@ func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *
return nil
}

const attestationSubnetTopicFormat = "/eth2/%x/beacon_attestation_%d"

func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, subnet)
}
17 changes: 12 additions & 5 deletions beacon-chain/rpc/validator/attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation
// ProposeAttestation is a function called by an attester to vote
// on a block via an attestation object as defined in the Ethereum Serenity specification.
func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation) (*ethpb.AttestResponse, error) {
ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
defer span.End()

if _, err := bls.SignatureFromBytes(att.Signature); err != nil {
return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature")
}
Expand Down Expand Up @@ -195,11 +198,14 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation

// SubscribeCommitteeSubnets subscribes to the committee ID subnet given subscribe request.
func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.CommitteeSubnetsSubscribeRequest) (*ptypes.Empty, error) {
ctx, span := trace.StartSpan(ctx, "AttesterServer.SubscribeCommitteeSubnets")
defer span.End()

if len(req.Slots) != len(req.CommitteeIds) && len(req.CommitteeIds) != len(req.IsAggregator) {
return nil, status.Error(codes.InvalidArgument, "request fields are not the same length")
}
if len(req.Slots) == 0 {
return &ptypes.Empty{}, nil
return nil, status.Error(codes.InvalidArgument, "no attester slots provided")
}

fetchValsLen := func(slot uint64) (uint64, error) {
Expand All @@ -211,19 +217,20 @@ func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *ethpb.Comm
return uint64(len(vals)), nil
}

// run first request
// Request the head validator indices of epoch represented by the first requested
// slot.
currValsLen, err := fetchValsLen(req.Slots[0])
if err != nil {
return nil, err
return nil, status.Errorf(codes.Internal, "Could not retrieve head validator length: %v", err)
}
currEpoch := helpers.SlotToEpoch(req.Slots[0])

for i := 0; i < len(req.Slots); i++ {
// if epoch has changed, re-request active validators
// If epoch has changed, re-request active validators length
if currEpoch != helpers.SlotToEpoch(req.Slots[i]) {
currValsLen, err = fetchValsLen(req.Slots[i])
if err != nil {
return nil, err
return nil, status.Errorf(codes.Internal, "Could not retrieve head validator length: %v", err)
}
currEpoch = helpers.SlotToEpoch(req.Slots[i])
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/rpc/validator/attester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,8 @@ func TestServer_SubscribeCommitteeSubnets_NoSlots(t *testing.T) {
CommitteeIds: nil,
IsAggregator: nil,
})
if err != nil {
t.Fatal(err)
if err == nil || !strings.Contains(err.Error(), "no attester slots provided") {
t.Fatalf("Expected no attester slots provided error, received: %v", err)
}
}

Expand Down

0 comments on commit a75480d

Please sign in to comment.