diff --git a/beacon-chain/core/helpers/validators.go b/beacon-chain/core/helpers/validators.go index 0a7777658e4..0a1605307cb 100644 --- a/beacon-chain/core/helpers/validators.go +++ b/beacon-chain/core/helpers/validators.go @@ -254,7 +254,7 @@ func Domain(fork *pb.Fork, epoch uint64, domainType []byte) uint64 { } // IsEligibleForActivationQueue checks if the validator is eligible to -// be places into the activation queue. +// be placed into the activation queue. // // Spec pseudocode definition: // def is_eligible_for_activation_queue(validator: Validator) -> bool: @@ -266,8 +266,19 @@ func Domain(fork *pb.Fork, epoch uint64, domainType []byte) uint64 { // and validator.effective_balance == MAX_EFFECTIVE_BALANCE // ) func IsEligibleForActivationQueue(validator *ethpb.Validator) bool { - return validator.ActivationEligibilityEpoch == params.BeaconConfig().FarFutureEpoch && - validator.EffectiveBalance == params.BeaconConfig().MaxEffectiveBalance + return isEligibileForActivationQueue(validator.ActivationEligibilityEpoch, validator.EffectiveBalance) +} + +// IsEligibleForActivationQueueUsingTrie checks if the read-only validator is eligible to +// be placed into the activation queue. +func IsEligibleForActivationQueueUsingTrie(validator *stateTrie.ReadOnlyValidator) bool { + return isEligibileForActivationQueue(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance()) +} + +// isEligibleForActivationQueue carries out the logic for IsEligibleForActivationQueue* +func isEligibileForActivationQueue(activationEligibilityEpoch uint64, effectiveBalance uint64) bool { + return activationEligibilityEpoch == params.BeaconConfig().FarFutureEpoch && + effectiveBalance == params.BeaconConfig().MaxEffectiveBalance } // IsEligibleForActivation checks if the validator is eligible for activation. @@ -285,6 +296,20 @@ func IsEligibleForActivationQueue(validator *ethpb.Validator) bool { // ) func IsEligibleForActivation(state *stateTrie.BeaconState, validator *ethpb.Validator) bool { finalizedEpoch := state.FinalizedCheckpointEpoch() - return validator.ActivationEligibilityEpoch <= finalizedEpoch && - validator.ActivationEpoch == params.BeaconConfig().FarFutureEpoch + return isEligibileForActivation(validator.ActivationEligibilityEpoch, validator.ActivationEpoch, finalizedEpoch) +} + +// IsEligibleForActivationUsingTrie checks if the validator is eligible for activation. +func IsEligibleForActivationUsingTrie(state *stateTrie.BeaconState, validator *stateTrie.ReadOnlyValidator) bool { + cpt := state.FinalizedCheckpoint() + if cpt == nil { + return false + } + return isEligibileForActivation(validator.ActivationEligibilityEpoch(), validator.ActivationEpoch(), cpt.Epoch) +} + +// isEligibleForActivation carries out the logic for IsEligibleForActivation* +func isEligibileForActivation(activationEligibilityEpoch uint64, activationEpoch uint64, finalizedEpoch uint64) bool { + return activationEligibilityEpoch <= finalizedEpoch && + activationEpoch == params.BeaconConfig().FarFutureEpoch } diff --git a/beacon-chain/rpc/beacon/BUILD.bazel b/beacon-chain/rpc/beacon/BUILD.bazel index 54c25778f67..890fcd6410d 100644 --- a/beacon-chain/rpc/beacon/BUILD.bazel +++ b/beacon-chain/rpc/beacon/BUILD.bazel @@ -11,11 +11,13 @@ go_library( "server.go", "slashings.go", "validators.go", + "validators_stream.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/beacon", visibility = ["//beacon-chain:__subpackages__"], deps = [ "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/cache/depositcache:go_default_library", "//beacon-chain/core/epoch/precompute:go_default_library", "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/block:go_default_library", @@ -29,18 +31,22 @@ go_library( "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/powchain:go_default_library", + "//beacon-chain/state:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/attestationutil:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/event:go_default_library", "//shared/hashutil:go_default_library", "//shared/pagination:go_default_library", "//shared/params:go_default_library", "//shared/sliceutil:go_default_library", "//shared/slotutil:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", + "@com_github_patrickmn_go_cache//:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", "@org_golang_google_grpc//codes:go_default_library", "@org_golang_google_grpc//status:go_default_library", ], @@ -55,6 +61,7 @@ go_test( "committees_test.go", "config_test.go", "slashings_test.go", + "validators_stream_test.go", "validators_test.go", ], embed = [":go_default_library"], diff --git a/beacon-chain/rpc/beacon/server.go b/beacon-chain/rpc/beacon/server.go index 2f784d2af4e..844a0e36caa 100644 --- a/beacon-chain/rpc/beacon/server.go +++ b/beacon-chain/rpc/beacon/server.go @@ -6,6 +6,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" @@ -26,6 +27,8 @@ type Server struct { HeadFetcher blockchain.HeadFetcher FinalizationFetcher blockchain.FinalizationFetcher ParticipationFetcher blockchain.ParticipationFetcher + DepositFetcher depositcache.DepositFetcher + BlockFetcher powchain.POWBlockFetcher GenesisTimeFetcher blockchain.TimeFetcher StateNotifier statefeed.Notifier BlockNotifier blockfeed.Notifier diff --git a/beacon-chain/rpc/beacon/validators.go b/beacon-chain/rpc/beacon/validators.go index 673be21c01c..a391fa54e89 100644 --- a/beacon-chain/rpc/beacon/validators.go +++ b/beacon-chain/rpc/beacon/validators.go @@ -629,12 +629,6 @@ func (bs *Server) GetValidatorPerformance( }, nil } -// StreamValidatorsInfo streams out important validator information and metadata -// each epoch. -func (bs *Server) StreamValidatorsInfo(stream ethpb.BeaconChain_StreamValidatorsInfoServer) error { - return status.Error(codes.Unimplemented, "Unimplemented") -} - // Determines whether a validator has already exited. func validatorHasExited(validator *ethpb.Validator, currentEpoch uint64) bool { farFutureEpoch := params.BeaconConfig().FarFutureEpoch diff --git a/beacon-chain/rpc/beacon/validators_stream.go b/beacon-chain/rpc/beacon/validators_stream.go new file mode 100644 index 00000000000..ddd16f0852d --- /dev/null +++ b/beacon-chain/rpc/beacon/validators_stream.go @@ -0,0 +1,470 @@ +package beacon + +import ( + "context" + "fmt" + "io" + "math/big" + "sort" + "sync" + "time" + + cache "github.com/patrickmn/go-cache" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/db" + "github.com/prysmaticlabs/prysm/beacon-chain/powchain" + "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/shared/params" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// infostream is a struct for each instance of the infostream created by a client connection. +type infostream struct { + ctx context.Context + headFetcher blockchain.HeadFetcher + depositFetcher depositcache.DepositFetcher + blockFetcher powchain.POWBlockFetcher + beaconDB db.ReadOnlyDatabase + pubKeys [][]byte + pubKeysMutex *sync.RWMutex + stateChannel chan *feed.Event + stateSub event.Subscription + eth1Deposits *cache.Cache + eth1DepositsMutex *sync.RWMutex + currentEpoch uint64 + stream ethpb.BeaconChain_StreamValidatorsInfoServer + genesisTime uint64 +} + +// eth1Deposit contains information about a deposit made on the Ethereum 1 chain. +type eth1Deposit struct { + block *big.Int + data *ethpb.Deposit_Data +} + +// StreamValidatorsInfo returns a stream of information for given validators. +// Validators are supplied dynamically by the client, and can be added, removed and reset at any time. +// Information about the current set of validators is supplied as soon as the end-of-epoch accounting has been processed, +// providing a near real-time view of the state of the validators. +// Note that this will stream information whilst syncing; this is intended, to allow for complete validator state capture +// over time. If this is not required then the client can either wait until the beacon node is synced, or filter results +// based on the epoch value in the returned validator info. +func (bs *Server) StreamValidatorsInfo(stream ethpb.BeaconChain_StreamValidatorsInfoServer) error { + stateChannel := make(chan *feed.Event, 1) + epochDuration := time.Duration(params.BeaconConfig().SecondsPerSlot*params.BeaconConfig().SlotsPerEpoch) * time.Second + + // Fetch our current epoch. + headState, err := bs.HeadFetcher.HeadState(bs.Ctx) + if err != nil { + return status.Error(codes.Internal, "Could not access head state") + } + if headState == nil { + return status.Error(codes.Internal, "Not ready to serve information") + } + + // Create an infostream struct. This will track relevant state for the stream. + infostream := &infostream{ + ctx: bs.Ctx, + headFetcher: bs.HeadFetcher, + depositFetcher: bs.DepositFetcher, + blockFetcher: bs.BlockFetcher, + beaconDB: bs.BeaconDB, + pubKeys: make([][]byte, 0), + pubKeysMutex: &sync.RWMutex{}, + stateChannel: stateChannel, + stateSub: bs.StateNotifier.StateFeed().Subscribe(stateChannel), + eth1Deposits: cache.New(epochDuration, epochDuration*2), + eth1DepositsMutex: &sync.RWMutex{}, + currentEpoch: headState.Slot() / params.BeaconConfig().SlotsPerEpoch, + stream: stream, + genesisTime: headState.GenesisTime(), + } + defer infostream.stateSub.Unsubscribe() + + return infostream.handleConnection() +} + +// handleConnection handles the two-way connection between client and server. +func (is *infostream) handleConnection() error { + // Handle messages from client. + go func() { + for { + msg, err := is.stream.Recv() + if err == io.EOF { + return + } + if err != nil { + // Errors handle elsewhere + select { + case <-is.stream.Context().Done(): + return + case <-is.ctx.Done(): + return + case <-is.stateSub.Err(): + return + default: + } + log.WithError(err).Debug("Receive from validators stream listener failed; client probably closed connection") + return + } + is.handleMessage(msg) + } + }() + // Send responses at the end of every epoch. + for { + select { + case event := <-is.stateChannel: + if event.Type == statefeed.BlockProcessed { + go is.handleBlockProcessed() + } + case <-is.stateSub.Err(): + return status.Error(codes.Aborted, "Subscriber closed") + case <-is.ctx.Done(): + return status.Error(codes.Canceled, "Service context canceled") + case <-is.stream.Context().Done(): + return status.Error(codes.Canceled, "Stream context canceled") + } + } +} + +// handleMessage handles a message from the infostream client, updating the list of keys. +func (is *infostream) handleMessage(msg *ethpb.ValidatorChangeSet) { + var err error + switch msg.Action { + case ethpb.SetAction_ADD_VALIDATOR_KEYS: + err = is.handleAddValidatorKeys(msg.PublicKeys) + case ethpb.SetAction_REMOVE_VALIDATOR_KEYS: + is.handleRemoveValidatorKeys(msg.PublicKeys) + case ethpb.SetAction_SET_VALIDATOR_KEYS: + err = is.handleSetValidatorKeys(msg.PublicKeys) + } + if err != nil { + log.WithError(err).Debug("Error handling request; closing stream") + is.stream.Context().Done() + } +} + +// handleAddValidatorKeys handles a request to add validator keys. +func (is *infostream) handleAddValidatorKeys(reqPubKeys [][]byte) error { + is.pubKeysMutex.Lock() + // Create existence map to ensure we don't duplicate keys. + pubKeysMap := make(map[[48]byte]bool, len(is.pubKeys)) + for _, pubKey := range is.pubKeys { + pubKeysMap[bytesutil.ToBytes48(pubKey)] = true + } + addedPubKeys := make([][]byte, 0, len(reqPubKeys)) + for _, pubKey := range reqPubKeys { + if _, exists := pubKeysMap[bytesutil.ToBytes48(pubKey)]; !exists { + is.pubKeys = append(is.pubKeys, pubKey) + addedPubKeys = append(addedPubKeys, pubKey) + } + } + is.pubKeysMutex.Unlock() + // Send immediate info for the new validators. + return is.sendValidatorsInfo(addedPubKeys) +} + +// handleSetValidatorKeys handles a request to set validator keys. +func (is *infostream) handleSetValidatorKeys(reqPubKeys [][]byte) error { + is.pubKeysMutex.Lock() + is.pubKeys = make([][]byte, 0, len(reqPubKeys)) + for _, pubKey := range reqPubKeys { + is.pubKeys = append(is.pubKeys, pubKey) + } + is.pubKeysMutex.Unlock() + // Send immediate info for the new validators. + return is.sendValidatorsInfo(is.pubKeys) +} + +// handleRemoveValidatorKeys handles a request to remove validator keys. +func (is *infostream) handleRemoveValidatorKeys(reqPubKeys [][]byte) { + is.pubKeysMutex.Lock() + // Create existence map to track what we have to delete. + pubKeysMap := make(map[[48]byte]bool, len(reqPubKeys)) + for _, pubKey := range reqPubKeys { + pubKeysMap[bytesutil.ToBytes48(pubKey)] = true + } + max := len(is.pubKeys) + for i := 0; i < max; i++ { + if _, exists := pubKeysMap[bytesutil.ToBytes48(is.pubKeys[i])]; exists { + copy(is.pubKeys[i:], is.pubKeys[i+1:]) + is.pubKeys = is.pubKeys[:len(is.pubKeys)-1] + i-- + max-- + } + } + is.pubKeysMutex.Unlock() +} + +// sendValidatorsInfo sends validator info for a specific set of public keys. +func (is *infostream) sendValidatorsInfo(pubKeys [][]byte) error { + validators, err := is.generateValidatorsInfo(pubKeys) + if err != nil { + return err + } + for _, validator := range validators { + if err := is.stream.Send(validator); err != nil { + return err + } + } + return nil +} + +// generateValidatorsInfo generates the validator info for a set of public keys. +func (is *infostream) generateValidatorsInfo(pubKeys [][]byte) ([]*ethpb.ValidatorInfo, error) { + if is.headFetcher == nil { + return nil, status.Error(codes.Internal, "No head fetcher") + } + headState, err := is.headFetcher.HeadState(is.ctx) + if err != nil { + return nil, status.Error(codes.Internal, "Could not access head state") + } + if headState == nil { + return nil, status.Error(codes.Internal, "Not ready to serve information") + } + epoch := headState.Slot() / params.BeaconConfig().SlotsPerEpoch + if epoch == 0 { + // Not reporting, but no error. + return nil, nil + } + // We are reporting on the state at the end of the *previous* epoch. + epoch-- + + validators := headState.ValidatorsReadOnly() + res := make([]*ethpb.ValidatorInfo, 0, len(pubKeys)) + for _, pubKey := range pubKeys { + info, err := is.generateValidatorInfo(pubKey, validators, headState, epoch) + if err != nil { + return nil, err + } + res = append(res, info) + } + + // Calculate activation time for pending validators (if there are any). + is.calculateActivationTimeForPendingValidators(res, validators, headState, epoch) + + return res, nil +} + +// generateValidatorInfo generates the validator info for a public key. +func (is *infostream) generateValidatorInfo(pubKey []byte, validators []*state.ReadOnlyValidator, headState *state.BeaconState, epoch uint64) (*ethpb.ValidatorInfo, error) { + info := ðpb.ValidatorInfo{ + PublicKey: pubKey, + Epoch: epoch, + Status: ethpb.ValidatorStatus_UNKNOWN_STATUS, + } + + // Index + var ok bool + var err error + info.Index, ok, err = is.beaconDB.ValidatorIndex(is.ctx, pubKey) + if err != nil { + return nil, status.Error(codes.Internal, "Failed to obtain validator index") + } + if !ok { + // We don't know of this validator; it's either a pending deposit or totally unknown. + return is.generatePendingValidatorInfo(info) + } + validator := validators[info.Index] + + // Status and progression timestamp + info.Status, info.TransitionTimestamp = is.calculateStatusAndTransition(validator, helpers.CurrentEpoch(headState)) + + // Balance + info.Balance = headState.Balances()[info.Index] + + // Effective balance (for attesting states) + if info.Status == ethpb.ValidatorStatus_ACTIVE || + info.Status == ethpb.ValidatorStatus_SLASHING || + info.Status == ethpb.ValidatorStatus_EXITING { + info.EffectiveBalance = validator.EffectiveBalance() + } + + return info, nil +} + +// generatePendingValidatorInfo generates the validator info for a pending (or unknown) key. +func (is *infostream) generatePendingValidatorInfo(info *ethpb.ValidatorInfo) (*ethpb.ValidatorInfo, error) { + key := fmt.Sprintf("%s", info.PublicKey) + var deposit *eth1Deposit + is.eth1DepositsMutex.Lock() + if fetchedDeposit, exists := is.eth1Deposits.Get(key); exists { + deposit = fetchedDeposit.(*eth1Deposit) + } else { + fetchedDeposit, eth1BlockNumber := is.depositFetcher.DepositByPubkey(is.ctx, info.PublicKey) + if fetchedDeposit == nil { + deposit = ð1Deposit{} + is.eth1Deposits.Set(key, deposit, cache.DefaultExpiration) + } else { + deposit = ð1Deposit{ + block: eth1BlockNumber, + data: fetchedDeposit.Data, + } + is.eth1Deposits.Set(key, deposit, cache.DefaultExpiration) + } + } + is.eth1DepositsMutex.Unlock() + if deposit.block != nil { + info.Status = ethpb.ValidatorStatus_DEPOSITED + if queueTimestamp, err := is.depositQueueTimestamp(deposit.block); err != nil { + log.WithError(err).Error("Failed to obtain queue activation timestamp") + } else { + info.TransitionTimestamp = queueTimestamp + } + info.Balance = deposit.data.Amount + } + return info, nil +} + +func (is *infostream) calculateActivationTimeForPendingValidators(res []*ethpb.ValidatorInfo, validators []*state.ReadOnlyValidator, headState *state.BeaconState, epoch uint64) { + // pendingValidatorsMap is map from the validator pubkey to the index in our return array + pendingValidatorsMap := make(map[[48]byte]int) + for i, info := range res { + if info.Status == ethpb.ValidatorStatus_PENDING { + pendingValidatorsMap[bytesutil.ToBytes48(info.PublicKey)] = i + } + } + if len(pendingValidatorsMap) == 0 { + // Nothing to do. + return + } + + // Fetch the list of pending validators; count the number of attesting validators. + numAttestingValidators := uint64(0) + pendingValidators := make([]uint64, 0, len(validators)) + for _, validator := range validators { + if helpers.IsEligibleForActivationUsingTrie(headState, validator) { + pubKey := validator.PublicKey() + validatorIndex, ok, err := is.beaconDB.ValidatorIndex(is.ctx, pubKey[:]) + if err == nil && ok { + pendingValidators = append(pendingValidators, validatorIndex) + } + } + if helpers.IsActiveValidatorUsingTrie(validator, epoch) { + numAttestingValidators++ + } + } + + sortableIndices := &indicesSorter{ + validators: validators, + indices: pendingValidators, + } + sort.Sort(sortableIndices) + + sortedIndices := sortableIndices.indices + + // Loop over epochs, roughly simulating progression. + for curEpoch := epoch + 1; len(sortedIndices) > 0 && len(pendingValidators) > 0; curEpoch++ { + toProcess, _ := helpers.ValidatorChurnLimit(numAttestingValidators) + if toProcess > uint64(len(sortedIndices)) { + toProcess = uint64(len(sortedIndices)) + } + for i := uint64(0); i < toProcess; i++ { + validator := validators[sortedIndices[i]] + if index, exists := pendingValidatorsMap[validator.PublicKey()]; exists { + res[index].TransitionTimestamp = is.epochToTimestamp(helpers.DelayedActivationExitEpoch(curEpoch)) + delete(pendingValidatorsMap, validator.PublicKey()) + } + numAttestingValidators++ + } + sortedIndices = sortedIndices[toProcess:] + } +} + +// handleBlockProcessed handles the situation where a block has been processed by the Prysm server. +func (is *infostream) handleBlockProcessed() { + headState, err := is.headFetcher.HeadState(is.ctx) + if err != nil { + log.Warn("Could not access head state for infostream") + return + } + if headState == nil { + // We aren't ready to serve information + return + } + blockEpoch := headState.Slot() / params.BeaconConfig().SlotsPerEpoch + if blockEpoch == is.currentEpoch { + // Epoch hasn't changed, nothing to report yet. + return + } + is.currentEpoch = blockEpoch + if err := is.sendValidatorsInfo(is.pubKeys); err != nil { + // Client probably disconnected. + log.WithError(err).Debug("Failed to send infostream response") + } +} + +type indicesSorter struct { + validators []*state.ReadOnlyValidator + indices []uint64 +} + +func (s indicesSorter) Len() int { return len(s.indices) } +func (s indicesSorter) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices[j], s.indices[i] } +func (s indicesSorter) Less(i, j int) bool { + if s.validators[s.indices[i]].ActivationEligibilityEpoch() == s.validators[s.indices[j]].ActivationEligibilityEpoch() { + return s.indices[i] < s.indices[j] + } + return s.validators[s.indices[i]].ActivationEligibilityEpoch() < s.validators[s.indices[j]].ActivationEligibilityEpoch() +} + +func (is *infostream) calculateStatusAndTransition(validator *state.ReadOnlyValidator, currentEpoch uint64) (ethpb.ValidatorStatus, uint64) { + farFutureEpoch := params.BeaconConfig().FarFutureEpoch + + if validator == nil { + return ethpb.ValidatorStatus_UNKNOWN_STATUS, 0 + } + + if currentEpoch < validator.ActivationEligibilityEpoch() { + if helpers.IsEligibleForActivationQueueUsingTrie(validator) { + return ethpb.ValidatorStatus_DEPOSITED, is.epochToTimestamp(validator.ActivationEligibilityEpoch()) + } + return ethpb.ValidatorStatus_DEPOSITED, 0 + } + if currentEpoch < validator.ActivationEpoch() { + return ethpb.ValidatorStatus_PENDING, is.epochToTimestamp(validator.ActivationEpoch()) + } + if validator.ExitEpoch() == farFutureEpoch { + return ethpb.ValidatorStatus_ACTIVE, 0 + } + if currentEpoch < validator.ExitEpoch() { + if validator.Slashed() { + return ethpb.ValidatorStatus_SLASHING, is.epochToTimestamp(validator.ExitEpoch()) + } + return ethpb.ValidatorStatus_EXITING, is.epochToTimestamp(validator.ExitEpoch()) + } + return ethpb.ValidatorStatus_EXITED, is.epochToTimestamp(validator.WithdrawableEpoch()) +} + +// epochToTimestamp converts an epoch number to a timestamp. +func (is *infostream) epochToTimestamp(epoch uint64) uint64 { + return is.genesisTime + epoch*params.BeaconConfig().SecondsPerSlot*params.BeaconConfig().SlotsPerEpoch +} + +// depositQueueTimestamp calculates the timestamp for exit of the validator from the deposit queue. +func (is *infostream) depositQueueTimestamp(eth1BlockNumber *big.Int) (uint64, error) { + blockTimeStamp, err := is.blockFetcher.BlockTimeByHeight(is.ctx, eth1BlockNumber) + if err != nil { + return 0, err + } + followTime := time.Duration(params.BeaconConfig().Eth1FollowDistance*params.BeaconConfig().GoerliBlockTime) * time.Second + eth1UnixTime := time.Unix(int64(blockTimeStamp), 0).Add(followTime) + + votingPeriod := time.Duration(params.BeaconConfig().SlotsPerEth1VotingPeriod*params.BeaconConfig().SecondsPerSlot) * time.Second + activationTime := eth1UnixTime.Add(votingPeriod) + eth2Genesis := time.Unix(int64(is.genesisTime), 0) + + if eth2Genesis.After(activationTime) { + return is.genesisTime, nil + } + return uint64(activationTime.Unix()), nil +} diff --git a/beacon-chain/rpc/beacon/validators_stream_test.go b/beacon-chain/rpc/beacon/validators_stream_test.go new file mode 100644 index 00000000000..d94de08420a --- /dev/null +++ b/beacon-chain/rpc/beacon/validators_stream_test.go @@ -0,0 +1,165 @@ +package beacon + +import ( + "sync" + "testing" + + "github.com/prysmaticlabs/prysm/shared/params" +) + +func TestInfostream_EpochToTimestamp(t *testing.T) { + params.UseMainnetConfig() + tests := []struct { + name string + epoch uint64 + timestamp uint64 + }{ + { + name: "Genesis", + epoch: 0, + timestamp: 0, + }, + { + name: "One", + epoch: 1, + timestamp: 384, + }, + { + name: "Two", + epoch: 2, + timestamp: 768, + }, + { + name: "OneHundred", + epoch: 100, + timestamp: 38400, + }, + } + + is := &infostream{} + for _, test := range tests { + timestamp := is.epochToTimestamp(test.epoch) + if timestamp != test.timestamp { + t.Errorf("Incorrect timestamp: expected %v, received %v", test.timestamp, timestamp) + } + } +} + +func TestInfostream_HandleSetValidatorKeys(t *testing.T) { + params.UseMainnetConfig() + tests := []struct { + name string + reqPubKeys [][]byte + }{ + { + name: "None", + }, + { + name: "One", + reqPubKeys: [][]byte{{0x01}}, + }, + { + name: "Two", + reqPubKeys: [][]byte{{0x01}, {0x02}}, + }, + } + + is := &infostream{ + pubKeysMutex: &sync.RWMutex{}, + pubKeys: make([][]byte, 0), + } + for _, test := range tests { + is.handleSetValidatorKeys(test.reqPubKeys) + if len(is.pubKeys) != len(test.reqPubKeys) { + t.Errorf("Incorrect number of keys: expected %v, received %v", len(test.reqPubKeys), len(is.pubKeys)) + } + } +} + +func TestInfostream_HandleAddValidatorKeys(t *testing.T) { + params.UseMainnetConfig() + tests := []struct { + name string + initialPubKeys [][]byte + reqPubKeys [][]byte + finalLen int + }{ + { + name: "None", + finalLen: 0, + }, + { + name: "NoneAddOne", + reqPubKeys: [][]byte{{0x01}}, + finalLen: 1, + }, + { + name: "OneAddOne", + initialPubKeys: [][]byte{{0x01}}, + reqPubKeys: [][]byte{{0x02}}, + finalLen: 2, + }, + { + name: "Duplicate", + initialPubKeys: [][]byte{{0x01}}, + reqPubKeys: [][]byte{{0x01}}, + finalLen: 1, + }, + } + + is := &infostream{ + pubKeysMutex: &sync.RWMutex{}, + pubKeys: make([][]byte, 0), + } + for _, test := range tests { + is.handleSetValidatorKeys(test.initialPubKeys) + is.handleAddValidatorKeys(test.reqPubKeys) + if len(is.pubKeys) != test.finalLen { + t.Errorf("Incorrect number of keys: expected %v, received %v", len(is.pubKeys), test.finalLen) + } + } +} + +func TestInfostream_HandleRemoveValidatorKeys(t *testing.T) { + params.UseMainnetConfig() + tests := []struct { + name string + initialPubKeys [][]byte + reqPubKeys [][]byte + finalLen int + }{ + { + name: "None", + finalLen: 0, + }, + { + name: "OneRemoveNone", + initialPubKeys: [][]byte{{0x01}}, + finalLen: 1, + }, + { + name: "NoneRemoveOne", + initialPubKeys: [][]byte{}, + reqPubKeys: [][]byte{{0x01}}, + finalLen: 0, + }, + { + name: "TwoRemoveOne", + initialPubKeys: [][]byte{{0x01, 0x02}}, + reqPubKeys: [][]byte{{0x01}}, + finalLen: 1, + }, + } + + is := &infostream{ + pubKeysMutex: &sync.RWMutex{}, + pubKeys: make([][]byte, 0), + } + for _, test := range tests { + is.handleSetValidatorKeys(test.initialPubKeys) + is.handleRemoveValidatorKeys(test.reqPubKeys) + if len(is.pubKeys) != test.finalLen { + t.Errorf("Incorrect number of keys: expected %v, received %v", len(is.pubKeys), test.finalLen) + } + } +} diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 0ca8e146324..49a6faacb8c 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -251,6 +251,8 @@ func (s *Service) Start() { FinalizationFetcher: s.finalizationFetcher, ParticipationFetcher: s.participationFetcher, ChainStartFetcher: s.chainStartFetcher, + DepositFetcher: s.depositFetcher, + BlockFetcher: s.powChainService, CanonicalStateChan: s.canonicalStateChan, GenesisTimeFetcher: s.genesisTimeFetcher, StateNotifier: s.stateNotifier,