Skip to content

Commit

Permalink
Better receiver names in validator and slasher modules (#8296)
Browse files Browse the repository at this point in the history
* validator

* slasher

* rename db to s for store

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
rkapka and prylabs-bulldozer[bot] committed Jan 20, 2021
1 parent 27847ee commit 4b14fa4
Show file tree
Hide file tree
Showing 39 changed files with 635 additions and 635 deletions.
20 changes: 10 additions & 10 deletions slasher/beaconclient/chain_data.go
Expand Up @@ -15,12 +15,12 @@ var syncStatusPollingInterval = time.Duration(params.BeaconConfig().SecondsPerSl

// ChainHead requests the latest beacon chain head
// from a beacon node via gRPC.
func (bs *Service) ChainHead(
func (s *Service) ChainHead(
ctx context.Context,
) (*ethpb.ChainHead, error) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ChainHead")
defer span.End()
res, err := bs.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
res, err := s.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil || res == nil {
return nil, errors.Wrap(err, "Could not retrieve chain head or got nil chain head")
}
Expand All @@ -29,29 +29,29 @@ func (bs *Service) ChainHead(

// GenesisValidatorsRoot requests or fetch from memory the beacon chain genesis
// validators root via gRPC.
func (bs *Service) GenesisValidatorsRoot(
func (s *Service) GenesisValidatorsRoot(
ctx context.Context,
) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "beaconclient.GenesisValidatorsRoot")
defer span.End()

if bs.genesisValidatorRoot == nil {
res, err := bs.nodeClient.GetGenesis(ctx, &ptypes.Empty{})
if s.genesisValidatorRoot == nil {
res, err := s.nodeClient.GetGenesis(ctx, &ptypes.Empty{})
if err != nil {
return nil, errors.Wrap(err, "could not retrieve genesis data")
}
if res == nil {
return nil, errors.Wrap(err, "nil genesis data")
}
bs.genesisValidatorRoot = res.GenesisValidatorsRoot
s.genesisValidatorRoot = res.GenesisValidatorsRoot
}
return bs.genesisValidatorRoot, nil
return s.genesisValidatorRoot, nil
}

// Poll the beacon node every syncStatusPollingInterval until the node
// is no longer syncing.
func (bs *Service) querySyncStatus(ctx context.Context) {
status, err := bs.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
func (s *Service) querySyncStatus(ctx context.Context) {
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
}
Expand All @@ -65,7 +65,7 @@ func (bs *Service) querySyncStatus(ctx context.Context) {
for {
select {
case <-ticker.C:
status, err := bs.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
}
Expand Down
4 changes: 2 additions & 2 deletions slasher/beaconclient/historical_data_retrieval.go
Expand Up @@ -10,7 +10,7 @@ import (

// RequestHistoricalAttestations requests all indexed attestations for a
// given epoch from a beacon node via gRPC.
func (bs *Service) RequestHistoricalAttestations(
func (s *Service) RequestHistoricalAttestations(
ctx context.Context,
epoch uint64,
) ([]*ethpb.IndexedAttestation, error) {
Expand All @@ -26,7 +26,7 @@ func (bs *Service) RequestHistoricalAttestations(
if res == nil {
res = &ethpb.ListIndexedAttestationsResponse{}
}
res, err = bs.beaconClient.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
res, err = s.beaconClient.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
QueryFilter: &ethpb.ListIndexedAttestationsRequest_Epoch{
Epoch: epoch,
},
Expand Down
42 changes: 21 additions & 21 deletions slasher/beaconclient/receivers.go
Expand Up @@ -24,10 +24,10 @@ var reconnectPeriod = 5 * time.Second
// ReceiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
func (bs *Service) ReceiveBlocks(ctx context.Context) {
func (s *Service) ReceiveBlocks(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveBlocks")
defer span.End()
stream, err := bs.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
stream, err := s.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks stream")
return
Expand All @@ -48,12 +48,12 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) {
switch e.Code() {
case codes.Canceled, codes.Internal, codes.Unavailable:
log.WithError(err).Infof("Trying to restart connection. rpc status: %v", e.Code())
err = bs.restartBeaconConnection(ctx)
err = s.restartBeaconConnection(ctx)
if err != nil {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = bs.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
stream, err = s.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Could not restart block stream")
return
Expand Down Expand Up @@ -83,23 +83,23 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) {
"root": fmt.Sprintf("%#x...", root[:8]),
}).Info("Received block from beacon node")
// We send the received block over the block feed.
bs.blockFeed.Send(res)
s.blockFeed.Send(res)
}
}

// ReceiveAttestations starts a gRPC client stream listener to obtain
// attestations from the beacon node. Upon receiving an attestation, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
func (bs *Service) ReceiveAttestations(ctx context.Context) {
func (s *Service) ReceiveAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveAttestations")
defer span.End()
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
stream, err := s.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Failed to retrieve attestations stream")
return
}

go bs.collectReceivedAttestations(ctx)
go s.collectReceivedAttestations(ctx)
for {
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
Expand All @@ -117,12 +117,12 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) {
switch e.Code() {
case codes.Canceled, codes.Internal, codes.Unavailable:
log.WithError(err).Infof("Trying to restart connection. rpc status: %v", e.Code())
err = bs.restartBeaconConnection(ctx)
err = s.restartBeaconConnection(ctx)
if err != nil {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
stream, err = s.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not restart attestation stream")
return
Expand All @@ -140,11 +140,11 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) {
if res == nil {
continue
}
bs.receivedAttestationsBuffer <- res
s.receivedAttestationsBuffer <- res
}
}

func (bs *Service) collectReceivedAttestations(ctx context.Context) {
func (s *Service) collectReceivedAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.collectReceivedAttestations")
defer span.End()

Expand All @@ -156,13 +156,13 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
select {
case <-ticker.C:
if len(atts) > 0 {
bs.collectedAttestationsBuffer <- atts
s.collectedAttestationsBuffer <- atts
atts = []*ethpb.IndexedAttestation{}
}
case att := <-bs.receivedAttestationsBuffer:
case att := <-s.receivedAttestationsBuffer:
atts = append(atts, att)
case collectedAtts := <-bs.collectedAttestationsBuffer:
if err := bs.slasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
case collectedAtts := <-s.collectedAttestationsBuffer:
if err := s.slasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
log.WithError(err).Error("Could not save indexed attestation")
continue
}
Expand All @@ -178,26 +178,26 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
"slot": att.Data.Slot,
"indices": att.AttestingIndices,
}).Debug("Sending attestation to detection service")
bs.attestationFeed.Send(att)
s.attestationFeed.Send(att)
}
case <-ctx.Done():
return
}
}
}

func (bs *Service) restartBeaconConnection(ctx context.Context) error {
func (s *Service) restartBeaconConnection(ctx context.Context) error {
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if bs.conn.GetState() == connectivity.TransientFailure || bs.conn.GetState() == connectivity.Idle {
log.Debugf("Connection status %v", bs.conn.GetState())
if s.conn.GetState() == connectivity.TransientFailure || s.conn.GetState() == connectivity.Idle {
log.Debugf("Connection status %v", s.conn.GetState())
log.Info("Beacon node is still down")
continue
}
status, err := bs.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
continue
Expand Down
50 changes: 25 additions & 25 deletions slasher/beaconclient/service.go
Expand Up @@ -104,36 +104,36 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {

// BlockFeed returns a feed other services in slasher can subscribe to
// blocks received via the beacon node through gRPC.
func (bs *Service) BlockFeed() *event.Feed {
return bs.blockFeed
func (s *Service) BlockFeed() *event.Feed {
return s.blockFeed
}

// AttestationFeed returns a feed other services in slasher can subscribe to
// attestations received via the beacon node through gRPC.
func (bs *Service) AttestationFeed() *event.Feed {
return bs.attestationFeed
func (s *Service) AttestationFeed() *event.Feed {
return s.attestationFeed
}

// ClientReadyFeed returns a feed other services in slasher can subscribe to
// to indicate when the gRPC connection is ready.
func (bs *Service) ClientReadyFeed() *event.Feed {
return bs.clientFeed
func (s *Service) ClientReadyFeed() *event.Feed {
return s.clientFeed
}

// Stop the beacon client service by closing the gRPC connection.
func (bs *Service) Stop() error {
bs.cancel()
func (s *Service) Stop() error {
s.cancel()
log.Info("Stopping service")
if bs.conn != nil {
return bs.conn.Close()
if s.conn != nil {
return s.conn.Close()
}
return nil
}

// Status returns an error if there exists a gRPC connection error
// in the service.
func (bs *Service) Status() error {
if bs.conn == nil {
func (s *Service) Status() error {
if s.conn == nil {
return errors.New("no connection to beacon RPC")
}
return nil
Expand All @@ -143,10 +143,10 @@ func (bs *Service) Status() error {
// a gRPC client connection with a beacon node, listening for
// streamed blocks/attestations, and submitting slashing operations
// after they are detected by other services in the slasher.
func (bs *Service) Start() {
func (s *Service) Start() {
var dialOpt grpc.DialOption
if bs.cert != "" {
creds, err := credentials.NewClientTLSFromFile(bs.cert, "")
if s.cert != "" {
creds, err := credentials.NewClientTLSFromFile(s.cert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
}
Expand All @@ -173,27 +173,27 @@ func (bs *Service) Start() {
grpcutils.LogGRPCRequests,
)),
}
conn, err := grpc.DialContext(bs.ctx, bs.provider, beaconOpts...)
conn, err := grpc.DialContext(s.ctx, s.provider, beaconOpts...)
if err != nil {
log.Fatalf("Could not dial endpoint: %s, %v", bs.provider, err)
log.Fatalf("Could not dial endpoint: %s, %v", s.provider, err)
}
bs.beaconDialOptions = beaconOpts
s.beaconDialOptions = beaconOpts
log.Info("Successfully started gRPC connection")
bs.conn = conn
bs.beaconClient = ethpb.NewBeaconChainClient(bs.conn)
bs.nodeClient = ethpb.NewNodeClient(bs.conn)
s.conn = conn
s.beaconClient = ethpb.NewBeaconChainClient(s.conn)
s.nodeClient = ethpb.NewNodeClient(s.conn)

// We poll for the sync status of the beacon node until it is fully synced.
bs.querySyncStatus(bs.ctx)
s.querySyncStatus(s.ctx)

// We notify other services in slasher that the beacon client is ready
// and the connection is active.
bs.clientFeed.Send(true)
s.clientFeed.Send(true)

// We register subscribers for any detected proposer/attester slashings
// in the slasher services that we can submit to the beacon node
// as they are found.
go bs.subscribeDetectedProposerSlashings(bs.ctx, bs.proposerSlashingsChan)
go bs.subscribeDetectedAttesterSlashings(bs.ctx, bs.attesterSlashingsChan)
go s.subscribeDetectedProposerSlashings(s.ctx, s.proposerSlashingsChan)
go s.subscribeDetectedAttesterSlashings(s.ctx, s.attesterSlashingsChan)

}
12 changes: 6 additions & 6 deletions slasher/beaconclient/submit.go
Expand Up @@ -15,15 +15,15 @@ import (
// slashing objects from the slasher runtime. Upon receiving
// a proposer slashing from the feed, we submit the object to the
// connected beacon node via a client RPC.
func (bs *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch chan *ethpb.ProposerSlashing) {
func (s *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch chan *ethpb.ProposerSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitProposerSlashing")
defer span.End()
sub := bs.proposerSlashingsFeed.Subscribe(ch)
sub := s.proposerSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := bs.beaconClient.SubmitProposerSlashing(ctx, slashing); err != nil {
if _, err := s.beaconClient.SubmitProposerSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-sub.Err():
Expand All @@ -40,17 +40,17 @@ func (bs *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch ch
// slashing objects from the slasher runtime. Upon receiving an
// attester slashing from the feed, we submit the object to the
// connected beacon node via a client RPC.
func (bs *Service) subscribeDetectedAttesterSlashings(ctx context.Context, ch chan *ethpb.AttesterSlashing) {
func (s *Service) subscribeDetectedAttesterSlashings(ctx context.Context, ch chan *ethpb.AttesterSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitAttesterSlashing")
defer span.End()
sub := bs.attesterSlashingsFeed.Subscribe(ch)
sub := s.attesterSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if slashing != nil && slashing.Attestation_1 != nil && slashing.Attestation_2 != nil {
slashableIndices := sliceutil.IntersectionUint64(slashing.Attestation_1.AttestingIndices, slashing.Attestation_2.AttestingIndices)
_, err := bs.beaconClient.SubmitAttesterSlashing(ctx, slashing)
_, err := s.beaconClient.SubmitAttesterSlashing(ctx, slashing)
if err == nil {
log.WithFields(logrus.Fields{
"sourceEpoch": slashing.Attestation_1.Data.Source.Epoch,
Expand Down
8 changes: 4 additions & 4 deletions slasher/beaconclient/validator_retrieval.go
Expand Up @@ -10,7 +10,7 @@ import (

// FindOrGetPublicKeys gets public keys from cache or request validators public
// keys from a beacon node via gRPC.
func (bs *Service) FindOrGetPublicKeys(
func (s *Service) FindOrGetPublicKeys(
ctx context.Context,
validatorIndices []uint64,
) (map[uint64][]byte, error) {
Expand All @@ -20,7 +20,7 @@ func (bs *Service) FindOrGetPublicKeys(
validators := make(map[uint64][]byte, len(validatorIndices))
notFound := 0
for _, validatorIdx := range validatorIndices {
pub, exists := bs.publicKeyCache.Get(validatorIdx)
pub, exists := s.publicKeyCache.Get(validatorIdx)
if exists {
validators[validatorIdx] = pub
continue
Expand All @@ -42,15 +42,15 @@ func (bs *Service) FindOrGetPublicKeys(
if notFound == 0 {
return validators, nil
}
vc, err := bs.beaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
vc, err := s.beaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
Indices: validatorIndices,
})
if err != nil {
return nil, errors.Wrapf(err, "could not request validators public key: %d", validatorIndices)
}
for _, v := range vc.ValidatorList {
validators[v.Index] = v.Validator.PublicKey
bs.publicKeyCache.Set(v.Index, v.Validator.PublicKey)
s.publicKeyCache.Set(v.Index, v.Validator.PublicKey)
}
log.Tracef(
"Retrieved validators id public key map: %v",
Expand Down

0 comments on commit 4b14fa4

Please sign in to comment.