Skip to content

Commit

Permalink
Fix Status Messages During Genesis (#6046)
Browse files Browse the repository at this point in the history
* fix status messages
* fix finalized checkpoint
* remove code
* Merge refs/heads/master into fixStatusMessages
* Update beacon-chain/blockchain/service.go
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* fix
* fix test
* fix again
* fix
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
* Merge refs/heads/master into fixStatusMessages
  • Loading branch information
nisdas committed Jun 1, 2020
1 parent 4ed0e43 commit e715339
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 32 deletions.
19 changes: 0 additions & 19 deletions beacon-chain/blockchain/chain_info.go
@@ -1,7 +1,6 @@
package blockchain

import (
"bytes"
"context"
"time"

Expand Down Expand Up @@ -79,12 +78,6 @@ func (s *Service) FinalizedCheckpt() *ethpb.Checkpoint {
return &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
}

// If head state exists but there hasn't been a finalized check point,
// the check point's root should refer to genesis block root.
if bytes.Equal(s.finalizedCheckpt.Root, params.BeaconConfig().ZeroHash[:]) {
return &ethpb.Checkpoint{Root: s.genesisRoot[:]}
}

return state.CopyCheckpoint(s.finalizedCheckpt)
}

Expand All @@ -94,12 +87,6 @@ func (s *Service) CurrentJustifiedCheckpt() *ethpb.Checkpoint {
return &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
}

// If head state exists but there hasn't been a justified check point,
// the check point root should refer to genesis block root.
if bytes.Equal(s.justifiedCheckpt.Root, params.BeaconConfig().ZeroHash[:]) {
return &ethpb.Checkpoint{Root: s.genesisRoot[:]}
}

return state.CopyCheckpoint(s.justifiedCheckpt)
}

Expand All @@ -109,12 +96,6 @@ func (s *Service) PreviousJustifiedCheckpt() *ethpb.Checkpoint {
return &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
}

// If head state exists but there hasn't been a justified check point,
// the check point root should refer to genesis block root.
if bytes.Equal(s.prevJustifiedCheckpt.Root, params.BeaconConfig().ZeroHash[:]) {
return &ethpb.Checkpoint{Root: s.genesisRoot[:]}
}

return state.CopyCheckpoint(s.prevJustifiedCheckpt)
}

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/service.go
Expand Up @@ -353,9 +353,9 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState *stateTrie.B
return errors.Wrap(err, "could save genesis block root")
}

genesisCheckpoint := &ethpb.Checkpoint{Root: genesisBlkRoot[:]}
// Finalized checkpoint at genesis is a zero hash.
genesisCheckpoint := genesisState.FinalizedCheckpoint()

// Add the genesis block to the fork choice store.
s.justifiedCheckpt = stateTrie.CopyCheckpoint(genesisCheckpoint)
s.prevJustifiedCheckpt = stateTrie.CopyCheckpoint(genesisCheckpoint)
s.bestJustifiedCheckpt = stateTrie.CopyCheckpoint(genesisCheckpoint)
Expand Down
42 changes: 33 additions & 9 deletions beacon-chain/rpc/beacon/blocks.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/pagination"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -246,24 +247,47 @@ func (bs *Server) chainHeadRetrieval(ctx context.Context) (*ethpb.ChainHead, err
return nil, status.Errorf(codes.Internal, "Could not get head block root: %v", err)
}

isGenesis := func(cp *ethpb.Checkpoint) bool {
return bytesutil.ToBytes32(cp.Root) == params.BeaconConfig().ZeroHash && cp.Epoch == 0
}
// Retrieve genesis block in the event we have genesis checkpoints.
genBlock, err := bs.BeaconDB.GenesisBlock(ctx)
if err != nil || genBlock == nil || genBlock.Block == nil {
return nil, status.Error(codes.Internal, "Could not get genesis block")
}

var b *ethpb.SignedBeaconBlock

finalizedCheckpoint := bs.FinalizationFetcher.FinalizedCheckpt()
b, err := bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(finalizedCheckpoint.Root))
if err != nil || b == nil || b.Block == nil {
return nil, status.Error(codes.Internal, "Could not get finalized block")
if isGenesis(finalizedCheckpoint) {
b = genBlock
} else {
b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(finalizedCheckpoint.Root))
if err != nil || b == nil || b.Block == nil {
return nil, status.Error(codes.Internal, "Could not get finalized block")
}
}
finalizedSlot := b.Block.Slot

justifiedCheckpoint := bs.FinalizationFetcher.CurrentJustifiedCheckpt()
b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(justifiedCheckpoint.Root))
if err != nil || b == nil || b.Block == nil {
return nil, status.Error(codes.Internal, "Could not get justified block")
if isGenesis(justifiedCheckpoint) {
b = genBlock
} else {
b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(justifiedCheckpoint.Root))
if err != nil || b == nil || b.Block == nil {
return nil, status.Error(codes.Internal, "Could not get justified block")
}
}
justifiedSlot := b.Block.Slot

prevJustifiedCheckpoint := bs.FinalizationFetcher.PreviousJustifiedCheckpt()
b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(prevJustifiedCheckpoint.Root))
if err != nil || b == nil || b.Block == nil {
return nil, status.Error(codes.Internal, "Could not get prev justified block")
if isGenesis(prevJustifiedCheckpoint) {
b = genBlock
} else {
b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(prevJustifiedCheckpoint.Root))
if err != nil || b == nil || b.Block == nil {
return nil, status.Error(codes.Internal, "Could not get prev justified block")
}
}
prevJustifiedSlot := b.Block.Slot

Expand Down
36 changes: 36 additions & 0 deletions beacon-chain/rpc/beacon/blocks_test.go
Expand Up @@ -374,6 +374,18 @@ func TestServer_GetChainHead_NoFinalizedBlock(t *testing.T) {
t.Fatal(err)
}

genBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 0, ParentRoot: []byte{'G'}}}
if err := db.SaveBlock(context.Background(), genBlock); err != nil {
t.Fatal(err)
}
gRoot, err := stateutil.BlockRoot(genBlock.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveGenesisBlockRoot(context.Background(), gRoot); err != nil {
t.Fatal(err)
}

bs := &Server{
BeaconDB: db,
HeadFetcher: &chainMock.ChainService{Block: &ethpb.SignedBeaconBlock{}, State: s},
Expand Down Expand Up @@ -405,6 +417,18 @@ func TestServer_GetChainHead_NoHeadBlock(t *testing.T) {
func TestServer_GetChainHead(t *testing.T) {
db := dbTest.SetupDB(t)

genBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 0, ParentRoot: []byte{'G'}}}
if err := db.SaveBlock(context.Background(), genBlock); err != nil {
t.Fatal(err)
}
gRoot, err := stateutil.BlockRoot(genBlock.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveGenesisBlockRoot(context.Background(), gRoot); err != nil {
t.Fatal(err)
}

finalizedBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: []byte{'A'}}}
if err := db.SaveBlock(context.Background(), finalizedBlock); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -522,6 +546,18 @@ func TestServer_StreamChainHead_ContextCanceled(t *testing.T) {
func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
db := dbTest.SetupDB(t)

genBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 0, ParentRoot: []byte{'G'}}}
if err := db.SaveBlock(context.Background(), genBlock); err != nil {
t.Fatal(err)
}
gRoot, err := stateutil.BlockRoot(genBlock.Block)
if err != nil {
t.Fatal(err)
}
if err := db.SaveGenesisBlockRoot(context.Background(), gRoot); err != nil {
t.Fatal(err)
}

finalizedBlock := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: []byte{'A'}}}
if err := db.SaveBlock(context.Background(), finalizedBlock); err != nil {
t.Fatal(err)
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/sync/rpc_status.go
Expand Up @@ -250,6 +250,7 @@ func (r *Service) validateStatusMessage(ctx context.Context, msg *pb.Status, str
}
genesis := r.chain.GenesisTime()
finalizedEpoch := r.chain.FinalizedCheckpt().Epoch
finalizedRoot := r.chain.FinalizedCheckpt().Root
maxEpoch := slotutil.EpochsSinceGenesis(genesis)
// It would take a minimum of 2 epochs to finalize a
// previous epoch
Expand All @@ -265,6 +266,12 @@ func (r *Service) validateStatusMessage(ctx context.Context, msg *pb.Status, str
if finalizedEpoch < msg.FinalizedEpoch {
return nil
}
finalizedAtGenesis := (finalizedEpoch == msg.FinalizedEpoch) && finalizedEpoch == 0
rootIsEqual := bytes.Equal(finalizedRoot, msg.FinalizedRoot)
// If both peers are at genesis with the same root hash, then exit.
if finalizedAtGenesis && rootIsEqual {
return nil
}
if !r.db.IsFinalizedBlock(context.Background(), bytesutil.ToBytes32(msg.FinalizedRoot)) {
return errInvalidFinalizedRoot
}
Expand Down
65 changes: 63 additions & 2 deletions beacon-chain/sync/rpc_status_test.go
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil"
)

func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
Expand Down Expand Up @@ -99,7 +99,68 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
}
}

func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) {
func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.Host.Network().Peers()) != 1 {
t.Error("Expected peers to be connected")
}
root := [32]byte{}

r := &Service{p2p: p1,
chain: &mock.ChainService{
Fork: &pb.Fork{
PreviousVersion: params.BeaconConfig().GenesisForkVersion,
CurrentVersion: params.BeaconConfig().GenesisForkVersion,
},
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
Root: params.BeaconConfig().ZeroHash[:],
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}}
pcl := protocol.ID("/testing")

var wg sync.WaitGroup
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
expectSuccess(t, r, stream)
out := &pb.Status{}
if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil {
t.Fatal(err)
}
if !bytes.Equal(out.FinalizedRoot, root[:]) {
t.Errorf("Expected finalized root of %#x but got %#x", root, out.FinalizedRoot)
}
})

stream1, err := p1.Host.NewStream(context.Background(), p2.Host.ID(), pcl)
if err != nil {
t.Fatal(err)
}
digest, err := r.forkDigest()
if err != nil {
t.Fatal(err)
}

err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: digest[:], FinalizedRoot: params.BeaconConfig().ZeroHash[:]}, stream1)
if err != nil {
t.Errorf("Expected no error but got %v", err)
}

if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}

if len(p1.Host.Network().Peers()) != 1 {
t.Error("handler disconnected with peer")
}
}

func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
Expand Down

0 comments on commit e715339

Please sign in to comment.