Skip to content

Commit

Permalink
p2p/sync: Wrong genesis digest is used to register pubsub topic subsc…
Browse files Browse the repository at this point in the history
…riptions (race condition) (#5500)

* Return error when genesis time or genesis root is not set
* Merge refs/heads/master into fail-on-unset-fork-digest
* move to own helpers
* make it non public
* Merge branch 'fail-on-unset-fork-digest' of https://github.com/prysmaticlabs/geth-sharding into fail-on-unset-fork-digest
* lint
* fix
* return error
* fix tests and error
* Merge refs/heads/master into fail-on-unset-fork-digest
* first round of test fixes
* second round of fixes
* Merge branch 'fail-on-unset-fork-digest' of https://github.com/prysmaticlabs/geth-sharding into fail-on-unset-fork-digest
* lint
* Merge refs/heads/master into fail-on-unset-fork-digest
* gaz
* Merge branch 'fail-on-unset-fork-digest' of https://github.com/prysmaticlabs/geth-sharding into fail-on-unset-fork-digest
  • Loading branch information
prestonvanloon committed Apr 19, 2020
1 parent 0ed0cb5 commit 3377472
Show file tree
Hide file tree
Showing 24 changed files with 222 additions and 66 deletions.
16 changes: 16 additions & 0 deletions beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
)

Expand All @@ -19,6 +20,7 @@ import (
type ChainInfoFetcher interface {
HeadFetcher
FinalizationFetcher
GenesisFetcher
}

// TimeFetcher retrieves the Eth2 data that's related to time.
Expand All @@ -27,6 +29,11 @@ type TimeFetcher interface {
CurrentSlot() uint64
}

// GenesisFetcher retrieves the eth2 data related to its genesis.
type GenesisFetcher interface {
GenesisValidatorRoot() [32]byte
}

// HeadFetcher defines a common interface for methods in blockchain service which
// directly retrieves head related data.
type HeadFetcher interface {
Expand Down Expand Up @@ -178,6 +185,15 @@ func (s *Service) GenesisTime() time.Time {
return s.genesisTime
}

// GenesisValidatorRoot returns the genesis validator
// root of the chain.
func (s *Service) GenesisValidatorRoot() [32]byte {
if !s.hasHeadState() {
return [32]byte{}
}
return bytesutil.ToBytes32(s.head.state.GenesisValidatorRoot())
}

// CurrentFork retrieves the latest fork information of the beacon chain.
func (s *Service) CurrentFork() *pb.Fork {
if !s.hasHeadState() {
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type ChainService struct {
BlocksReceived []*ethpb.SignedBeaconBlock
Balance *precompute.Balance
Genesis time.Time
ValidatorsRoot [32]byte
Fork *pb.Fork
DB db.Database
stateNotifier statefeed.Notifier
Expand Down Expand Up @@ -217,6 +218,11 @@ func (ms *ChainService) GenesisTime() time.Time {
return ms.Genesis
}

// GenesisValidatorRoot mocks the same method in the chain service.
func (ms *ChainService) GenesisValidatorRoot() [32]byte {
return ms.ValidatorsRoot
}

// CurrentSlot mocks the same method in the chain service.
func (ms *ChainService) CurrentSlot() uint64 {
return uint64(time.Now().Unix()-ms.Genesis.Unix()) / params.BeaconConfig().SecondsPerSlot
Expand Down
7 changes: 4 additions & 3 deletions beacon-chain/core/helpers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers",
visibility = [
"//beacon-chain:__subpackages__",
"//endtoend/evaluators:__pkg__",
"//shared/benchutil/benchmark_files:__subpackages__",
"//shared/testutil:__pkg__",
"//shared/keystore:__pkg__",
"//shared/interop:__pkg__",
"//shared/keystore:__pkg__",
"//shared/p2putils:__pkg__",
"//shared/testutil:__pkg__",
"//slasher:__subpackages__",
"//tools:__subpackages__",
"//validator:__subpackages__",
"//endtoend/evaluators:__pkg__",
],
deps = [
"//beacon-chain/cache:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/iputils:go_default_library",
"//shared/p2putils:go_default_library",
"//shared/params:go_default_library",
"//shared/runutil:go_default_library",
"//shared/sliceutil:go_default_library",
Expand Down Expand Up @@ -114,6 +115,7 @@ go_test(
"//proto/beacon/p2p/v1:go_default_library",
"//proto/testing:go_default_library",
"//shared/iputils:go_default_library",
"//shared/p2putils:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
forkDigest, err := s.ForkDigest()
forkDigest, err := s.forkDigest()
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func TestService_Broadcast(t *testing.T) {
cfg: &Config{
Encoding: "ssz",
},
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
}

msg := &testpb.TestSimpleMessage{
Expand All @@ -38,7 +40,7 @@ func TestService_Broadcast(t *testing.T) {
topic := "/eth2/%x/testing"
// Set a test gossip mapping for testpb.TestSimpleMessage.
GossipTypeMapping[reflect.TypeOf(msg)] = topic
digest, err := p.ForkDigest()
digest, err := p.forkDigest()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -85,7 +87,10 @@ func TestService_Broadcast(t *testing.T) {
}

func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
p := Service{}
p := Service{
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
}
if err := p.Broadcast(context.Background(), &testpb.AddressBook{}); err != ErrMessageNotMapped {
t.Fatalf("Expected error %v, got %v", ErrMessageNotMapped, err)
}
Expand Down
11 changes: 9 additions & 2 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func TestCreateListener(t *testing.T) {
port := 1024
ipAddr, pkey := createAddrAndPrivKey(t)
s := &Service{
cfg: &Config{UDPPort: uint(port)},
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
cfg: &Config{UDPPort: uint(port)},
}
listener := s.createListener(ipAddr, pkey)
defer listener.Close()
Expand Down Expand Up @@ -130,7 +132,10 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) {
addr := net.ParseIP("invalidIP")
_, pkey := createAddrAndPrivKey(t)
s := &Service{}
s := &Service{
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
}
node, err := s.createLocalNode(pkey, addr, 0, 0)
if err != nil {
t.Fatal(err)
Expand All @@ -149,6 +154,8 @@ func TestMultiAddrConversion_OK(t *testing.T) {
TCPPort: 0,
UDPPort: 0,
},
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
}
listener := s.createListener(ipAddr, pkey)
defer listener.Close()
Expand Down
37 changes: 4 additions & 33 deletions beacon-chain/p2p/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
Expand All @@ -21,8 +21,8 @@ var eth2ENRKey = params.BeaconNetworkConfig().ETH2Key

// ForkDigest returns the current fork digest of
// the node.
func (s *Service) ForkDigest() ([4]byte, error) {
return createForkDigest(s.genesisTime, s.genesisValidatorsRoot)
func (s *Service) forkDigest() ([4]byte, error) {
return p2putils.CreateForkDigest(s.genesisTime, s.genesisValidatorsRoot)
}

// Compares fork ENRs between an incoming peer's record and our node's
Expand Down Expand Up @@ -72,35 +72,6 @@ func (s *Service) compareForkENR(record *enr.Record) error {
return nil
}

// Creates a fork digest from a genesis time and genesis
// validators root, utilizing the current slot to determine
// the active fork version in the node.
func createForkDigest(
genesisTime time.Time,
genesisValidatorsRoot []byte,
) ([4]byte, error) {
currentSlot := helpers.SlotsSince(genesisTime)
currentEpoch := helpers.SlotToEpoch(currentSlot)

// We retrieve a list of scheduled forks by epoch.
// We loop through the keys in this map to determine the current
// fork version based on the current, time-based epoch number
// since the genesis time.
currentForkVersion := params.BeaconConfig().GenesisForkVersion
scheduledForks := params.BeaconConfig().ForkVersionSchedule
for epoch, forkVersion := range scheduledForks {
if epoch <= currentEpoch {
currentForkVersion = forkVersion
}
}

digest, err := helpers.ComputeForkDigest(currentForkVersion, genesisValidatorsRoot)
if err != nil {
return [4]byte{}, err
}
return digest, nil
}

// Adds a fork entry as an ENR record under the eth2EnrKey for
// the local node. The fork entry is an ssz-encoded enrForkID type
// which takes into account the current fork version from the current
Expand All @@ -111,7 +82,7 @@ func addForkEntry(
genesisTime time.Time,
genesisValidatorsRoot []byte,
) (*enode.LocalNode, error) {
digest, err := createForkDigest(genesisTime, genesisValidatorsRoot)
digest, err := p2putils.CreateForkDigest(genesisTime, genesisValidatorsRoot)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/p2p/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -213,7 +214,7 @@ func TestDiscv5_AddRetrieveForkEntryENR(t *testing.T) {

genesisTime := time.Now()
genesisValidatorsRoot := make([]byte, 32)
digest, err := createForkDigest(genesisTime, make([]byte, 32))
digest, err := p2putils.CreateForkDigest(genesisTime, make([]byte, 32))
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type ConnectionHandler interface {
// EncodingProvider provides p2p network encoding.
type EncodingProvider interface {
Encoding() encoder.NetworkEncoding
ForkDigest() ([4]byte, error)
}

// PubSubProvider provides the p2p pubsub protocol.
Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type TestP2P struct {
pubsub *pubsub.PubSub
BroadcastCalled bool
DelaySend bool
Digest [4]byte
peers *peers.Status
LocalMetadata *pb.MetaData
}
Expand Down Expand Up @@ -257,7 +258,7 @@ func (p *TestP2P) RefreshENR(epoch uint64) {

// ForkDigest mocks the p2p func.
func (p *TestP2P) ForkDigest() ([4]byte, error) {
return [4]byte{}, nil
return p.Digest, nil
}

// Metadata mocks the peer's metadata.
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ go_library(
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/messagehandler:go_default_library",
"//shared/p2putils:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/runutil:go_default_library",
Expand Down
7 changes: 6 additions & 1 deletion beacon-chain/sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ var (
)

func (r *Service) updateMetrics() {
// do not update metrics if genesis time
// has not been initialized
if r.chain.GenesisTime().IsZero() {
return
}
// We update the dynamic subnet topics.
digest, err := r.p2p.ForkDigest()
digest, err := r.forkDigest()
if err != nil {
log.WithError(err).Errorf("Could not compute fork digest")
}
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/rpc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
return err
}

forkDigest, err := r.p2p.ForkDigest()
forkDigest, err := r.forkDigest()
if err != nil {
return err
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
return err
}

forkDigest, err := r.p2p.ForkDigest()
forkDigest, err := r.forkDigest()
if err != nil {
return err
}
Expand All @@ -200,7 +200,7 @@ func (r *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
}

func (r *Service) validateStatusMessage(msg *pb.Status, stream network.Stream) error {
forkDigest, err := r.p2p.ForkDigest()
forkDigest, err := r.forkDigest()
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 3377472

Please sign in to comment.