Skip to content

Commit

Permalink
Add index to activationStatus and MultipleValidatorStatus response (#…
Browse files Browse the repository at this point in the history
…6026)

* Cleanup val status

* More cleanup

* Fix tests

* Fix tests again

* Change to nonExistentIndice

* Fix test

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
0xKiwi and prylabs-bulldozer[bot] committed May 29, 2020
1 parent 213f8ef commit d35531c
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 51 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Expand Up @@ -1333,7 +1333,7 @@ go_repository(

go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "3444ffb75440161e4d6b9d38edef65212474cc48",
commit = "f6aa45b3661f7dabdadf58f21b83b5125311c060",
importpath = "github.com/prysmaticlabs/ethereumapis",
)

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/rpc/beacon/validators_stream.go
Expand Up @@ -515,7 +515,7 @@ func (is *infostream) depositQueueTimestamp(eth1BlockNumber *big.Int) (uint64, e
}
is.eth1BlocktimesMutex.Unlock()

followTime := time.Duration(params.BeaconConfig().Eth1FollowDistance*params.BeaconConfig().GoerliBlockTime) * time.Second
followTime := time.Duration(params.BeaconConfig().Eth1FollowDistance*params.BeaconConfig().SecondsPerETH1Block) * time.Second
eth1UnixTime := time.Unix(int64(blockTimestamp), 0).Add(followTime)

period := params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().EpochsPerEth1VotingPeriod
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/rpc/validator/assignments.go
Expand Up @@ -146,11 +146,11 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
idx, ok := s.ValidatorIndexByPubkey(bytesutil.ToBytes48(pubKey))
if ok {
assignment.ValidatorIndex = idx
assignment.Status = vs.assignmentStatus(idx, s)
assignment.Status = assignmentStatus(s, idx)
assignment.ProposerSlots = proposerIndexToSlots[idx]

nextAssignment.ValidatorIndex = idx
nextAssignment.Status = vs.assignmentStatus(idx, s)
nextAssignment.Status = assignmentStatus(s, idx)
nextAssignment.ProposerSlots = nextProposerIndexToSlots[idx]

ca, ok := committeeAssignments[idx]
Expand All @@ -170,7 +170,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
}
} else {
// If the validator isn't in the beacon state, try finding their deposit to determine their status.
vStatus := vs.validatorStatus(ctx, pubKey, s)
vStatus, _ := vs.validatorStatus(ctx, s, pubKey)
assignment.Status = vStatus.Status
}
validatorAssignments = append(validatorAssignments, assignment)
Expand Down
11 changes: 9 additions & 2 deletions beacon-chain/rpc/validator/server_test.go
Expand Up @@ -188,15 +188,19 @@ func TestWaitForActivation_ValidatorOriginallyExists(t *testing.T) {
mockChainStream.EXPECT().Send(
&ethpb.ValidatorActivationResponse{
Statuses: []*ethpb.ValidatorActivationResponse_Status{
{PublicKey: pubKey1,
{
PublicKey: pubKey1,
Status: &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_ACTIVE,
},
Index: 0,
},
{PublicKey: pubKey2,
{
PublicKey: pubKey2,
Status: &ethpb.ValidatorStatusResponse{
ActivationEpoch: params.BeaconConfig().FarFutureEpoch,
},
Index: nonExistentIndex,
},
},
},
Expand Down Expand Up @@ -273,6 +277,7 @@ func TestWaitForActivation_MultipleStatuses(t *testing.T) {
Status: ethpb.ValidatorStatus_ACTIVE,
ActivationEpoch: 1,
},
Index: 0,
},
{
PublicKey: pubKey2,
Expand All @@ -281,12 +286,14 @@ func TestWaitForActivation_MultipleStatuses(t *testing.T) {
ActivationEpoch: params.BeaconConfig().FarFutureEpoch,
PositionInActivationQueue: 1,
},
Index: 1,
},
{
PublicKey: pubKey3,
Status: &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_EXITED,
},
Index: 2,
},
},
},
Expand Down
87 changes: 48 additions & 39 deletions beacon-chain/rpc/validator/status.go
Expand Up @@ -18,6 +18,7 @@ import (
)

var errPubkeyDoesNotExist = errors.New("pubkey does not exist")
var nonExistentIndex = ^uint64(0)

// ValidatorStatus returns the validator status of the current epoch.
// The status response can be one of the following:
Expand All @@ -30,19 +31,22 @@ var errPubkeyDoesNotExist = errors.New("pubkey does not exist")
// UNKNOWN_STATUS - validator does not have a known status in the network.
func (vs *Server) ValidatorStatus(
ctx context.Context,
req *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
req *ethpb.ValidatorStatusRequest,
) (*ethpb.ValidatorStatusResponse, error) {
headState, err := vs.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get head state")
}
return vs.validatorStatus(ctx, req.PublicKey, headState), nil
vStatus, _ := vs.validatorStatus(ctx, headState, req.PublicKey)
return vStatus, nil
}

// MultipleValidatorStatus is the same as ValidatorStatus. Supports retrieval of multiple
// validator statuses. Takes a list of public keys or a list of validator indices.
func (vs *Server) MultipleValidatorStatus(
ctx context.Context,
req *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
req *ethpb.MultipleValidatorStatusRequest,
) (*ethpb.MultipleValidatorStatusResponse, error) {
if vs.SyncChecker.Syncing() {
return nil, status.Errorf(codes.Unavailable, "Syncing to latest head, not ready to respond")
}
Expand All @@ -51,89 +55,98 @@ func (vs *Server) MultipleValidatorStatus(
return nil, status.Error(codes.Internal, "Could not get head state")
}
responseCap := len(req.PublicKeys) + len(req.Indices)
pubkeys := make([][]byte, 0, responseCap)
pubKeys := make([][]byte, 0, responseCap)
filtered := make(map[[48]byte]bool)
filtered[[48]byte{}] = true // Filter out keys with all zeros.
// Filter out duplicate public keys.
for _, pubKey := range req.PublicKeys {
pubkeyBytes := bytesutil.ToBytes48(pubKey)
if !filtered[pubkeyBytes] {
pubkeys = append(pubkeys, pubKey)
pubKeys = append(pubKeys, pubKey)
filtered[pubkeyBytes] = true
}
}
// Convert indices to public keys.
for _, idx := range req.Indices {
pubkeyBytes := headState.PubkeyAtIndex(uint64(idx))
if !filtered[pubkeyBytes] {
pubkeys = append(pubkeys, pubkeyBytes[:])
pubKeys = append(pubKeys, pubkeyBytes[:])
filtered[pubkeyBytes] = true
}
}
// Fetch statuses from beacon state.
statuses := make([]*ethpb.ValidatorStatusResponse, len(pubkeys))
for i, pubKey := range pubkeys {
statuses[i] = vs.validatorStatus(ctx, pubKey, headState)
statuses := make([]*ethpb.ValidatorStatusResponse, len(pubKeys))
indices := make([]uint64, len(pubKeys))
for i, pubKey := range pubKeys {
statuses[i], indices[i] = vs.validatorStatus(ctx, headState, pubKey)
}

return &ethpb.MultipleValidatorStatusResponse{
PublicKeys: pubkeys,
PublicKeys: pubKeys,
Statuses: statuses,
Indices: indices,
}, nil
}

// activationStatus returns the validator status response for the set of validators
// requested by their pub keys.
func (vs *Server) activationStatus(
ctx context.Context,
pubkeys [][]byte,
pubKeys [][]byte,
) (bool, []*ethpb.ValidatorActivationResponse_Status, error) {
headState, err := vs.HeadFetcher.HeadState(ctx)
if err != nil {
return false, nil, err
}
activeValidatorExists := false
statusResponses := make([]*ethpb.ValidatorActivationResponse_Status, len(pubkeys))
for i, key := range pubkeys {
statusResponses := make([]*ethpb.ValidatorActivationResponse_Status, len(pubKeys))
for i, pubKey := range pubKeys {
if ctx.Err() != nil {
return false, nil, ctx.Err()
}
status := vs.validatorStatus(ctx, key, headState)
if status == nil {
vStatus, idx := vs.validatorStatus(ctx, headState, pubKey)
if vStatus == nil {
continue
}
resp := &ethpb.ValidatorActivationResponse_Status{
Status: status,
PublicKey: key,
Status: vStatus,
PublicKey: pubKey,
Index: idx,
}
statusResponses[i] = resp
if status.Status == ethpb.ValidatorStatus_ACTIVE {
if vStatus.Status == ethpb.ValidatorStatus_ACTIVE {
activeValidatorExists = true
}
}

return activeValidatorExists, statusResponses, nil
}

func (vs *Server) validatorStatus(ctx context.Context, pubKey []byte, headState *stateTrie.BeaconState) *ethpb.ValidatorStatusResponse {
// validatorStatus searches for the requested validator's state and deposit to retrieve its inclusion estimate. Also returns the validators index.
func (vs *Server) validatorStatus(
ctx context.Context,
headState *stateTrie.BeaconState,
pubKey []byte,
) (*ethpb.ValidatorStatusResponse, uint64) {
ctx, span := trace.StartSpan(ctx, "validatorServer.validatorStatus")
defer span.End()

// Using ^0 as the default value for index, in case the validators index cannot be determined.
resp := &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_UNKNOWN_STATUS,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch,
}
vStatus, idx, err := vs.retrieveStatusFromState(ctx, pubKey, headState)
vStatus, idx, err := retrieveStatusForPubKey(headState, pubKey)
if err != nil && err != errPubkeyDoesNotExist {
traceutil.AnnotateError(span, err)
return resp
return resp, nonExistentIndex
}
resp.Status = vStatus
if err != errPubkeyDoesNotExist {
val, err := headState.ValidatorAtIndexReadOnly(idx)
if err != nil {
traceutil.AnnotateError(span, err)
return resp
return resp, idx
}
resp.ActivationEpoch = val.ActivationEpoch()
}
Expand All @@ -144,30 +157,31 @@ func (vs *Server) validatorStatus(ctx context.Context, pubKey []byte, headState
// If no connection to ETH1, the deposit block number or position in queue cannot be determined.
if !vs.Eth1InfoFetcher.IsConnectedToETH1() {
log.Warn("Not connected to ETH1. Cannot determine validator ETH1 deposit block number")
return resp
return resp, nonExistentIndex
}
_, eth1BlockNumBigInt := vs.DepositFetcher.DepositByPubkey(ctx, pubKey)
if eth1BlockNumBigInt == nil { // No deposit found in ETH1.
return resp
return resp, nonExistentIndex
}

// Mark a validator as DEPOSITED if their deposit is visible.
resp.Status = ethpb.ValidatorStatus_DEPOSITED

resp.Eth1DepositBlockNumber = eth1BlockNumBigInt.Uint64()

depositBlockSlot, err := vs.depositBlockSlot(ctx, eth1BlockNumBigInt, headState)
depositBlockSlot, err := vs.depositBlockSlot(ctx, headState, eth1BlockNumBigInt)
if err != nil {
return resp
return resp, nonExistentIndex
}
resp.DepositInclusionSlot = depositBlockSlot
return resp, nonExistentIndex
// Deposited and Pending mean the validator has been put into the state.
case ethpb.ValidatorStatus_DEPOSITED, ethpb.ValidatorStatus_PENDING:
var lastActivatedValidatorIdx uint64
for j := headState.NumValidators() - 1; j >= 0; j-- {
val, err := headState.ValidatorAtIndexReadOnly(uint64(j))
if err != nil {
return resp
return resp, idx
}
if helpers.IsActiveValidatorUsingTrie(val, helpers.CurrentEpoch(headState)) {
lastActivatedValidatorIdx = uint64(j)
Expand All @@ -178,29 +192,24 @@ func (vs *Server) validatorStatus(ctx context.Context, pubKey []byte, headState
if lastActivatedValidatorIdx < idx {
resp.PositionInActivationQueue = idx - lastActivatedValidatorIdx
}
return resp, idx
default:
return resp
return resp, idx
}

return resp
}

func (vs *Server) retrieveStatusFromState(
ctx context.Context,
pubKey []byte,
headState *stateTrie.BeaconState,
) (ethpb.ValidatorStatus, uint64, error) {
func retrieveStatusForPubKey(headState *stateTrie.BeaconState, pubKey []byte) (ethpb.ValidatorStatus, uint64, error) {
if headState == nil {
return ethpb.ValidatorStatus_UNKNOWN_STATUS, 0, errors.New("head state does not exist")
}
idx, ok := headState.ValidatorIndexByPubkey(bytesutil.ToBytes48(pubKey))
if !ok || int(idx) >= headState.NumValidators() {
return ethpb.ValidatorStatus_UNKNOWN_STATUS, 0, errPubkeyDoesNotExist
}
return vs.assignmentStatus(idx, headState), idx, nil
return assignmentStatus(headState, idx), idx, nil
}

func (vs *Server) assignmentStatus(validatorIdx uint64, beaconState *stateTrie.BeaconState) ethpb.ValidatorStatus {
func assignmentStatus(beaconState *stateTrie.BeaconState, validatorIdx uint64) ethpb.ValidatorStatus {
validator, err := beaconState.ValidatorAtIndexReadOnly(validatorIdx)
if err != nil {
return ethpb.ValidatorStatus_UNKNOWN_STATUS
Expand Down Expand Up @@ -229,13 +238,13 @@ func (vs *Server) assignmentStatus(validatorIdx uint64, beaconState *stateTrie.B
return ethpb.ValidatorStatus_EXITED
}

func (vs *Server) depositBlockSlot(ctx context.Context, eth1BlockNumBigInt *big.Int, beaconState *stateTrie.BeaconState) (uint64, error) {
func (vs *Server) depositBlockSlot(ctx context.Context, beaconState *stateTrie.BeaconState, eth1BlockNumBigInt *big.Int) (uint64, error) {
var depositBlockSlot uint64
blockTimeStamp, err := vs.BlockFetcher.BlockTimeByHeight(ctx, eth1BlockNumBigInt)
if err != nil {
return 0, err
}
followTime := time.Duration(params.BeaconConfig().Eth1FollowDistance*params.BeaconConfig().GoerliBlockTime) * time.Second
followTime := time.Duration(params.BeaconConfig().Eth1FollowDistance*params.BeaconConfig().SecondsPerETH1Block) * time.Second
eth1UnixTime := time.Unix(int64(blockTimeStamp), 0).Add(followTime)
period := params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().EpochsPerEth1VotingPeriod
votingPeriod := time.Duration(period*params.BeaconConfig().SecondsPerSlot) * time.Second
Expand Down
17 changes: 14 additions & 3 deletions beacon-chain/rpc/validator/status_test.go
Expand Up @@ -621,21 +621,33 @@ func TestActivationStatus_OK(t *testing.T) {
t.Errorf("Validator with pubkey %#x is not activated and instead has this status: %s",
response[0].PublicKey, response[0].Status.Status.String())
}
if response[0].Index != 0 {
t.Errorf("Validator with pubkey %#x is expected to have index %d, received %d", response[0].PublicKey, 0, response[0].Index)
}

if response[1].Status.Status != ethpb.ValidatorStatus_ACTIVE {
t.Errorf("Validator with pubkey %#x was activated when not supposed to",
response[1].PublicKey)
}
if response[1].Index != 1 {
t.Errorf("Validator with pubkey %#x is expected to have index %d, received %d", response[1].PublicKey, 1, response[1].Index)
}

if response[2].Status.Status != ethpb.ValidatorStatus_DEPOSITED {
t.Errorf("Validator with pubkey %#x is not unknown and instead has this status: %s",
response[2].PublicKey, response[2].Status.Status.String())
}
if response[2].Index != params.BeaconConfig().FarFutureEpoch {
t.Errorf("Validator with pubkey %#x is expected to have index %d, received %d", response[2].PublicKey, params.BeaconConfig().FarFutureEpoch, response[2].Index)
}

if response[3].Status.Status != ethpb.ValidatorStatus_DEPOSITED {
t.Errorf("Validator with pubkey %#x was not deposited and has this status: %s",
response[3].PublicKey, response[3].Status.Status.String())
}
if response[3].Index != 2 {
t.Errorf("Validator with pubkey %#x is expected to have index %d, received %d", response[3].PublicKey, 2, response[3].Index)
}
}

func TestValidatorStatus_CorrectActivationQueue(t *testing.T) {
Expand Down Expand Up @@ -817,7 +829,7 @@ func TestDepositBlockSlotAfterGenesisTime(t *testing.T) {

eth1BlockNumBigInt := big.NewInt(1000000)

resp, err := vs.depositBlockSlot(context.Background(), eth1BlockNumBigInt, state)
resp, err := vs.depositBlockSlot(context.Background(), state, eth1BlockNumBigInt)
if err != nil {
t.Fatalf("Could not get the deposit block slot %v", err)
}
Expand Down Expand Up @@ -891,8 +903,7 @@ func TestDepositBlockSlotBeforeGenesisTime(t *testing.T) {
}

eth1BlockNumBigInt := big.NewInt(1000000)

resp, err := vs.depositBlockSlot(context.Background(), eth1BlockNumBigInt, state)
resp, err := vs.depositBlockSlot(context.Background(), state, eth1BlockNumBigInt)
if err != nil {
t.Fatalf("Could not get the deposit block slot %v", err)
}
Expand Down
2 changes: 0 additions & 2 deletions shared/params/config.go
Expand Up @@ -97,7 +97,6 @@ type BeaconChainConfig struct {
ValidatorPrivkeyFileName string // ValidatorPrivKeyFileName specifies the string name of a validator private key file.
WithdrawalPrivkeyFileName string // WithdrawalPrivKeyFileName specifies the string name of a withdrawal private key file.
RPCSyncCheck time.Duration // Number of seconds to query the sync service, to find out if the node is synced or not.
GoerliBlockTime uint64 // GoerliBlockTime is the number of seconds on avg a Goerli block is created.
EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature.
DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request.
MaxPeersToSync int // MaxPeersToSync describes the limit for number of peers in round robin sync.
Expand Down Expand Up @@ -199,7 +198,6 @@ var defaultBeaconConfig = &BeaconChainConfig{
WithdrawalPrivkeyFileName: "/shardwithdrawalkey",
ValidatorPrivkeyFileName: "/validatorprivatekey",
RPCSyncCheck: 1,
GoerliBlockTime: 14, // 14 seconds on average for a goerli block to be created.
EmptySignature: [96]byte{},
DefaultPageSize: 250,
MaxPeersToSync: 15,
Expand Down

0 comments on commit d35531c

Please sign in to comment.