From e715339aab4ab5cb36ea26e2dd953a7c36e749a1 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Tue, 2 Jun 2020 04:26:42 +0800 Subject: [PATCH] Fix Status Messages During Genesis (#6046) * fix status messages * fix finalized checkpoint * remove code * Merge refs/heads/master into fixStatusMessages * Update beacon-chain/blockchain/service.go * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * fix * fix test * fix again * fix * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages * Merge refs/heads/master into fixStatusMessages --- beacon-chain/blockchain/chain_info.go | 19 -------- beacon-chain/blockchain/service.go | 4 +- beacon-chain/rpc/beacon/blocks.go | 42 +++++++++++++---- beacon-chain/rpc/beacon/blocks_test.go | 36 ++++++++++++++ beacon-chain/sync/rpc_status.go | 7 +++ beacon-chain/sync/rpc_status_test.go | 65 +++++++++++++++++++++++++- 6 files changed, 141 insertions(+), 32 deletions(-) diff --git a/beacon-chain/blockchain/chain_info.go b/beacon-chain/blockchain/chain_info.go index dd123d27487..2ef7f758252 100644 --- a/beacon-chain/blockchain/chain_info.go +++ b/beacon-chain/blockchain/chain_info.go @@ -1,7 +1,6 @@ package blockchain import ( - "bytes" "context" "time" @@ -79,12 +78,6 @@ func (s *Service) FinalizedCheckpt() *ethpb.Checkpoint { return ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} } - // If head state exists but there hasn't been a finalized check point, - // the check point's root should refer to genesis block root. - if bytes.Equal(s.finalizedCheckpt.Root, params.BeaconConfig().ZeroHash[:]) { - return ðpb.Checkpoint{Root: s.genesisRoot[:]} - } - return state.CopyCheckpoint(s.finalizedCheckpt) } @@ -94,12 +87,6 @@ func (s *Service) CurrentJustifiedCheckpt() *ethpb.Checkpoint { return ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} } - // If head state exists but there hasn't been a justified check point, - // the check point root should refer to genesis block root. - if bytes.Equal(s.justifiedCheckpt.Root, params.BeaconConfig().ZeroHash[:]) { - return ðpb.Checkpoint{Root: s.genesisRoot[:]} - } - return state.CopyCheckpoint(s.justifiedCheckpt) } @@ -109,12 +96,6 @@ func (s *Service) PreviousJustifiedCheckpt() *ethpb.Checkpoint { return ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} } - // If head state exists but there hasn't been a justified check point, - // the check point root should refer to genesis block root. - if bytes.Equal(s.prevJustifiedCheckpt.Root, params.BeaconConfig().ZeroHash[:]) { - return ðpb.Checkpoint{Root: s.genesisRoot[:]} - } - return state.CopyCheckpoint(s.prevJustifiedCheckpt) } diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 1e886c5d60d..5f1ec80189a 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -353,9 +353,9 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState *stateTrie.B return errors.Wrap(err, "could save genesis block root") } - genesisCheckpoint := ðpb.Checkpoint{Root: genesisBlkRoot[:]} + // Finalized checkpoint at genesis is a zero hash. + genesisCheckpoint := genesisState.FinalizedCheckpoint() - // Add the genesis block to the fork choice store. s.justifiedCheckpt = stateTrie.CopyCheckpoint(genesisCheckpoint) s.prevJustifiedCheckpt = stateTrie.CopyCheckpoint(genesisCheckpoint) s.bestJustifiedCheckpt = stateTrie.CopyCheckpoint(genesisCheckpoint) diff --git a/beacon-chain/rpc/beacon/blocks.go b/beacon-chain/rpc/beacon/blocks.go index b4d0df19f72..033540cc62f 100644 --- a/beacon-chain/rpc/beacon/blocks.go +++ b/beacon-chain/rpc/beacon/blocks.go @@ -15,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/pagination" + "github.com/prysmaticlabs/prysm/shared/params" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -246,24 +247,47 @@ func (bs *Server) chainHeadRetrieval(ctx context.Context) (*ethpb.ChainHead, err return nil, status.Errorf(codes.Internal, "Could not get head block root: %v", err) } + isGenesis := func(cp *ethpb.Checkpoint) bool { + return bytesutil.ToBytes32(cp.Root) == params.BeaconConfig().ZeroHash && cp.Epoch == 0 + } + // Retrieve genesis block in the event we have genesis checkpoints. + genBlock, err := bs.BeaconDB.GenesisBlock(ctx) + if err != nil || genBlock == nil || genBlock.Block == nil { + return nil, status.Error(codes.Internal, "Could not get genesis block") + } + + var b *ethpb.SignedBeaconBlock + finalizedCheckpoint := bs.FinalizationFetcher.FinalizedCheckpt() - b, err := bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(finalizedCheckpoint.Root)) - if err != nil || b == nil || b.Block == nil { - return nil, status.Error(codes.Internal, "Could not get finalized block") + if isGenesis(finalizedCheckpoint) { + b = genBlock + } else { + b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(finalizedCheckpoint.Root)) + if err != nil || b == nil || b.Block == nil { + return nil, status.Error(codes.Internal, "Could not get finalized block") + } } finalizedSlot := b.Block.Slot justifiedCheckpoint := bs.FinalizationFetcher.CurrentJustifiedCheckpt() - b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(justifiedCheckpoint.Root)) - if err != nil || b == nil || b.Block == nil { - return nil, status.Error(codes.Internal, "Could not get justified block") + if isGenesis(justifiedCheckpoint) { + b = genBlock + } else { + b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(justifiedCheckpoint.Root)) + if err != nil || b == nil || b.Block == nil { + return nil, status.Error(codes.Internal, "Could not get justified block") + } } justifiedSlot := b.Block.Slot prevJustifiedCheckpoint := bs.FinalizationFetcher.PreviousJustifiedCheckpt() - b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(prevJustifiedCheckpoint.Root)) - if err != nil || b == nil || b.Block == nil { - return nil, status.Error(codes.Internal, "Could not get prev justified block") + if isGenesis(prevJustifiedCheckpoint) { + b = genBlock + } else { + b, err = bs.BeaconDB.Block(ctx, bytesutil.ToBytes32(prevJustifiedCheckpoint.Root)) + if err != nil || b == nil || b.Block == nil { + return nil, status.Error(codes.Internal, "Could not get prev justified block") + } } prevJustifiedSlot := b.Block.Slot diff --git a/beacon-chain/rpc/beacon/blocks_test.go b/beacon-chain/rpc/beacon/blocks_test.go index 09955d4b398..6ce537e1488 100644 --- a/beacon-chain/rpc/beacon/blocks_test.go +++ b/beacon-chain/rpc/beacon/blocks_test.go @@ -374,6 +374,18 @@ func TestServer_GetChainHead_NoFinalizedBlock(t *testing.T) { t.Fatal(err) } + genBlock := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 0, ParentRoot: []byte{'G'}}} + if err := db.SaveBlock(context.Background(), genBlock); err != nil { + t.Fatal(err) + } + gRoot, err := stateutil.BlockRoot(genBlock.Block) + if err != nil { + t.Fatal(err) + } + if err := db.SaveGenesisBlockRoot(context.Background(), gRoot); err != nil { + t.Fatal(err) + } + bs := &Server{ BeaconDB: db, HeadFetcher: &chainMock.ChainService{Block: ðpb.SignedBeaconBlock{}, State: s}, @@ -405,6 +417,18 @@ func TestServer_GetChainHead_NoHeadBlock(t *testing.T) { func TestServer_GetChainHead(t *testing.T) { db := dbTest.SetupDB(t) + genBlock := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 0, ParentRoot: []byte{'G'}}} + if err := db.SaveBlock(context.Background(), genBlock); err != nil { + t.Fatal(err) + } + gRoot, err := stateutil.BlockRoot(genBlock.Block) + if err != nil { + t.Fatal(err) + } + if err := db.SaveGenesisBlockRoot(context.Background(), gRoot); err != nil { + t.Fatal(err) + } + finalizedBlock := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1, ParentRoot: []byte{'A'}}} if err := db.SaveBlock(context.Background(), finalizedBlock); err != nil { t.Fatal(err) @@ -522,6 +546,18 @@ func TestServer_StreamChainHead_ContextCanceled(t *testing.T) { func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) { db := dbTest.SetupDB(t) + genBlock := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 0, ParentRoot: []byte{'G'}}} + if err := db.SaveBlock(context.Background(), genBlock); err != nil { + t.Fatal(err) + } + gRoot, err := stateutil.BlockRoot(genBlock.Block) + if err != nil { + t.Fatal(err) + } + if err := db.SaveGenesisBlockRoot(context.Background(), gRoot); err != nil { + t.Fatal(err) + } + finalizedBlock := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1, ParentRoot: []byte{'A'}}} if err := db.SaveBlock(context.Background(), finalizedBlock); err != nil { t.Fatal(err) diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 2162d1b22df..b19eee894d3 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -250,6 +250,7 @@ func (r *Service) validateStatusMessage(ctx context.Context, msg *pb.Status, str } genesis := r.chain.GenesisTime() finalizedEpoch := r.chain.FinalizedCheckpt().Epoch + finalizedRoot := r.chain.FinalizedCheckpt().Root maxEpoch := slotutil.EpochsSinceGenesis(genesis) // It would take a minimum of 2 epochs to finalize a // previous epoch @@ -265,6 +266,12 @@ func (r *Service) validateStatusMessage(ctx context.Context, msg *pb.Status, str if finalizedEpoch < msg.FinalizedEpoch { return nil } + finalizedAtGenesis := (finalizedEpoch == msg.FinalizedEpoch) && finalizedEpoch == 0 + rootIsEqual := bytes.Equal(finalizedRoot, msg.FinalizedRoot) + // If both peers are at genesis with the same root hash, then exit. + if finalizedAtGenesis && rootIsEqual { + return nil + } if !r.db.IsFinalizedBlock(context.Background(), bytesutil.ToBytes32(msg.FinalizedRoot)) { return errInvalidFinalizedRoot } diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index 11bcc85d0e1..2f9a2d20f24 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -24,7 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/testutil" ) -func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { +func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { p1 := p2ptest.NewTestP2P(t) p2 := p2ptest.NewTestP2P(t) p1.Connect(p2) @@ -99,7 +99,68 @@ func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { } } -func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) { +func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + if len(p1.Host.Network().Peers()) != 1 { + t.Error("Expected peers to be connected") + } + root := [32]byte{} + + r := &Service{p2p: p1, + chain: &mock.ChainService{ + Fork: &pb.Fork{ + PreviousVersion: params.BeaconConfig().GenesisForkVersion, + CurrentVersion: params.BeaconConfig().GenesisForkVersion, + }, + FinalizedCheckPoint: ðpb.Checkpoint{ + Epoch: 0, + Root: params.BeaconConfig().ZeroHash[:], + }, + Genesis: time.Now(), + ValidatorsRoot: [32]byte{'A'}, + }} + pcl := protocol.ID("/testing") + + var wg sync.WaitGroup + wg.Add(1) + p2.Host.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + expectSuccess(t, r, stream) + out := &pb.Status{} + if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + t.Fatal(err) + } + if !bytes.Equal(out.FinalizedRoot, root[:]) { + t.Errorf("Expected finalized root of %#x but got %#x", root, out.FinalizedRoot) + } + }) + + stream1, err := p1.Host.NewStream(context.Background(), p2.Host.ID(), pcl) + if err != nil { + t.Fatal(err) + } + digest, err := r.forkDigest() + if err != nil { + t.Fatal(err) + } + + err = r.statusRPCHandler(context.Background(), &pb.Status{ForkDigest: digest[:], FinalizedRoot: params.BeaconConfig().ZeroHash[:]}, stream1) + if err != nil { + t.Errorf("Expected no error but got %v", err) + } + + if testutil.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + + if len(p1.Host.Network().Peers()) != 1 { + t.Error("handler disconnected with peer") + } +} + +func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) { p1 := p2ptest.NewTestP2P(t) p2 := p2ptest.NewTestP2P(t) p1.Connect(p2)