Skip to content

Commit

Permalink
Check Last Finalized Epoch (#357)
Browse files Browse the repository at this point in the history
* Adding proto topics

* Adding crystallized state request

* Reverting shanges to proto

* Adding db checks

* get crystallised state

* Finalize epoch for mapping

* more changes

* cleaning up main routine

* adding tests

* fix test

* Adding ability to save blocks

* Adding block fetcher

* Another test for setting finalized epoch

* adding final tests

* finished tests

* adding comments

* gazelle

* Making requested changes

* Fixing lint

* stop sync from exiting

* fixing lint

* lint

* Adding new request type to proto

* Making changes to block/state requests

* Change tests

* fixing error messages

* gazelle and lint

* adding back crystallised state

* fix tests

* Fixing merge conflicts

* Addressing review comments

* Changing back to one service

* removing  case
  • Loading branch information
Nishant Das committed Aug 7, 2018
1 parent 36965be commit 2cf7fa0
Show file tree
Hide file tree
Showing 11 changed files with 722 additions and 202 deletions.
16 changes: 16 additions & 0 deletions beacon-chain/blockchain/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,3 +346,19 @@ func (b *BeaconChain) resetAttesterBitfield() {
newbitfields := make([]byte, b.CrystallizedState().ActiveValidatorsLength()/8)
b.state.ActiveState.SetAttesterBitfield(newbitfields)
}

func (b *BeaconChain) saveBlock(block *types.Block) error {
encodedState, err := block.Marshal()
if err != nil {
return err
}
hash, err := block.Hash()
if err != nil {
return err
}

return b.db.Put(hash[:], encodedState)
}

// Slashing Condtions
// TODO: Implement all the conditions and add in the methods once the spec is updated
25 changes: 25 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ func (c *ChainService) Stop() error {
return nil
}

// HasStoredState checks if there is any Crystallized/Active State or blocks(not implemented) are
// persisted to the db.
func (c *ChainService) HasStoredState() (bool, error) {

hasActive, err := c.beaconDB.DB().Has([]byte(activeStateLookupKey))
if err != nil {
return false, err
}
hasCrystallized, err := c.beaconDB.DB().Has([]byte(crystallizedStateLookupKey))
if err != nil {
return false, err
}
if !hasActive || !hasCrystallized {
return false, nil
}

return true, nil
}

// ProcessedBlockHashes by the chain service.
func (c *ChainService) ProcessedBlockHashes() [][32]byte {
return c.processedBlockHashes
Expand Down Expand Up @@ -100,6 +119,12 @@ func (c *ChainService) ProcessBlock(block *types.Block) error {
return nil
}

// SaveBlock is a mock which saves a block to the local db using the
// blockhash as the key.
func (c *ChainService) SaveBlock(block *types.Block) error {
return c.chain.saveBlock(block)
}

// ProcessCrystallizedState accepts a new crystallized state object for inclusion in the chain.
func (c *ChainService) ProcessCrystallizedState(state *types.CrystallizedState) error {
h, err := state.Hash()
Expand Down
240 changes: 217 additions & 23 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand All @@ -25,16 +26,21 @@ var log = logrus.WithField("prefix", "sync")
// * Drop peers that send invalid data
// * Throttle incoming requests
type Service struct {
ctx context.Context
cancel context.CancelFunc
p2p types.P2P
chainService types.ChainService
announceBlockHashBuf chan p2p.Message
blockBuf chan p2p.Message
announceCrystallizedHashBuf chan p2p.Message
crystallizedStateBuf chan p2p.Message
announceActiveHashBuf chan p2p.Message
activeStateBuf chan p2p.Message
ctx context.Context
cancel context.CancelFunc
p2p types.P2P
chainService types.ChainService
announceBlockHashBuf chan p2p.Message
blockBuf chan p2p.Message
announceCrystallizedHashBuf chan p2p.Message
crystallizedStateBuf chan p2p.Message
announceActiveHashBuf chan p2p.Message
activeStateBuf chan p2p.Message
syncMode Mode
currentSlotNumber uint64
highestObservedSlot uint64
syncPollingInterval time.Duration
initialCrystallizedStateHash [32]byte
}

// Config allows the channel's buffer sizes to be changed.
Expand All @@ -45,42 +51,80 @@ type Config struct {
ActiveStateBufferSize int
CrystallizedStateHashBufferSize int
CrystallizedStateBufferSize int
SyncMode Mode
CurrentSlotNumber uint64
HighestObservedSlot uint64
SyncPollingInterval time.Duration
}

// Mode refers to the type for the sync mode of the client.
type Mode int

// This specifies the different sync modes.
const (
SyncModeInitial Mode = 0
SyncModeDefault Mode = 1
)

// DefaultConfig provides the default configuration for a sync service.
func DefaultConfig() Config {

return Config{
BlockHashBufferSize: 100,
BlockBufferSize: 100,
ActiveStateHashBufferSize: 100,
ActiveStateBufferSize: 100,
CrystallizedStateHashBufferSize: 100,
CrystallizedStateBufferSize: 100,
SyncMode: SyncModeDefault,
CurrentSlotNumber: 0,
HighestObservedSlot: 0,
SyncPollingInterval: time.Second,
}
}

// NewSyncService accepts a context and returns a new Service.
func NewSyncService(ctx context.Context, cfg Config, beaconp2p types.P2P, cs types.ChainService) *Service {

ctx, cancel := context.WithCancel(ctx)
stored, err := cs.HasStoredState()

if err != nil {
log.Errorf("error retrieving stored state: %v", err)
}

if !stored {
cfg.SyncMode = SyncModeInitial
}

return &Service{
ctx: ctx,
cancel: cancel,
p2p: beaconp2p,
chainService: cs,
announceBlockHashBuf: make(chan p2p.Message, cfg.BlockHashBufferSize),
blockBuf: make(chan p2p.Message, cfg.BlockBufferSize),
announceCrystallizedHashBuf: make(chan p2p.Message, cfg.ActiveStateHashBufferSize),
crystallizedStateBuf: make(chan p2p.Message, cfg.ActiveStateBufferSize),
announceActiveHashBuf: make(chan p2p.Message, cfg.CrystallizedStateHashBufferSize),
activeStateBuf: make(chan p2p.Message, cfg.CrystallizedStateBufferSize),
ctx: ctx,
cancel: cancel,
p2p: beaconp2p,
chainService: cs,
announceBlockHashBuf: make(chan p2p.Message, cfg.BlockHashBufferSize),
blockBuf: make(chan p2p.Message, cfg.BlockBufferSize),
announceCrystallizedHashBuf: make(chan p2p.Message, cfg.ActiveStateHashBufferSize),
crystallizedStateBuf: make(chan p2p.Message, cfg.ActiveStateBufferSize),
announceActiveHashBuf: make(chan p2p.Message, cfg.CrystallizedStateHashBufferSize),
activeStateBuf: make(chan p2p.Message, cfg.CrystallizedStateBufferSize),
syncMode: cfg.SyncMode,
currentSlotNumber: cfg.CurrentSlotNumber,
highestObservedSlot: cfg.HighestObservedSlot,
syncPollingInterval: cfg.SyncPollingInterval,
initialCrystallizedStateHash: [32]byte{},
}
}

// Start begins the block processing goroutine.
func (ss *Service) Start() {
log.Info("Starting service")
go ss.run(ss.ctx.Done())
switch ss.syncMode {
case 0:
log.Info("Starting initial sync")
go ss.initialSync(time.NewTicker(ss.syncPollingInterval).C, ss.ctx.Done())
default:
go ss.run(ss.ctx.Done())

}
}

// Stop kills the block processing goroutine, but does not wait until the goroutine exits.
Expand Down Expand Up @@ -197,6 +241,156 @@ func (ss *Service) ReceiveActiveState(data *pb.ActiveState) error {
return nil
}

// RequestCrystallizedStateFromPeer sends a request to a peer for the corresponding crystallized state
// for a beacon block.
func (ss *Service) RequestCrystallizedStateFromPeer(data *pb.BeaconBlockResponse, peer p2p.Peer) error {
block, err := types.NewBlock(data.Block)
if err != nil {
return fmt.Errorf("could not instantiate new block from proto: %v", err)
}
h := block.CrystallizedStateHash()
log.Debugf("Successfully processed incoming block with crystallized state hash: %x", h)
ss.p2p.Send(&pb.CrystallizedStateRequest{Hash: h[:]}, peer)
return nil
}

// SetBlockForInitialSync sets the first received block as the base finalized
// block for initial sync.
func (ss *Service) SetBlockForInitialSync(data *pb.BeaconBlockResponse) error {

block, err := types.NewBlock(data.Block)
if err != nil {
return fmt.Errorf("could not instantiate new block from proto: %v", err)
}

h, err := block.Hash()
if err != nil {
return err
}
log.WithField("Block received with hash", fmt.Sprintf("0x%x", h)).Debug("Crystallized state hash exists locally")

if err := ss.writeBlockToDB(block); err != nil {
return err
}

ss.initialCrystallizedStateHash = block.CrystallizedStateHash()

log.Infof("Saved block with hash 0%x for initial sync", h)
return nil
}

// requestNextBlock broadcasts a request for a block with the next slotnumber.
func (ss *Service) requestNextBlock() {
ss.p2p.Broadcast(&pb.BeaconBlockRequestBySlotNumber{SlotNumber: (ss.currentSlotNumber + 1)})
}

// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
func (ss *Service) validateAndSaveNextBlock(data *pb.BeaconBlockResponse) error {
block, err := types.NewBlock(data.Block)
if err != nil {
return fmt.Errorf("could not instantiate new block from proto: %v", err)
}

if ss.currentSlotNumber == uint64(0) {
return fmt.Errorf("invalid slot number for syncing")
}

if (ss.currentSlotNumber + 1) == block.SlotNumber() {

if err := ss.writeBlockToDB(block); err != nil {
return err
}
ss.currentSlotNumber = block.SlotNumber()
}
return nil
}

// writeBlockToDB saves the corresponding block to the local DB.
func (ss *Service) writeBlockToDB(block *types.Block) error {
return ss.chainService.SaveBlock(block)
}

func (ss *Service) initialSync(delaychan <-chan time.Time, done <-chan struct{}) {
blockSub := ss.p2p.Subscribe(pb.BeaconBlockResponse{}, ss.blockBuf)
crystallizedStateSub := ss.p2p.Subscribe(pb.CrystallizedStateResponse{}, ss.crystallizedStateBuf)

defer blockSub.Unsubscribe()
defer crystallizedStateSub.Unsubscribe()
for {
select {
case <-done:
log.Infof("Exiting goroutine")
return
case <-delaychan:
if ss.highestObservedSlot == ss.currentSlotNumber {
log.Infof("Exiting initial sync and starting normal sync")
go ss.run(ss.ctx.Done())
return
}
case msg := <-ss.blockBuf:
data, ok := msg.Data.(*pb.BeaconBlockResponse)
// TODO: Handle this at p2p layer.
if !ok {
log.Errorf("Received malformed beacon block p2p message")
continue
}

if data.Block.GetSlotNumber() > ss.highestObservedSlot {
ss.highestObservedSlot = data.Block.GetSlotNumber()
}

if ss.currentSlotNumber == 0 {
if ss.initialCrystallizedStateHash != [32]byte{} {
continue
}
if err := ss.SetBlockForInitialSync(data); err != nil {
log.Errorf("Could not set block for initial sync: %v", err)
}
if err := ss.RequestCrystallizedStateFromPeer(data, msg.Peer); err != nil {
log.Errorf("Could not request crystallized state from peer: %v", err)
}

continue
}

if data.Block.GetSlotNumber() != (ss.currentSlotNumber + 1) {
continue
}

if err := ss.validateAndSaveNextBlock(data); err != nil {
log.Errorf("Unable to save block: %v", err)
}
ss.requestNextBlock()
case msg := <-ss.crystallizedStateBuf:
data, ok := msg.Data.(*pb.CrystallizedStateResponse)
// TODO: Handle this at p2p layer.
if !ok {
log.Errorf("Received malformed crystallized state p2p message")
continue
}

if ss.initialCrystallizedStateHash == [32]byte{} {
continue
}

crystallizedState := types.NewCrystallizedState(data.CrystallizedState)
hash, err := crystallizedState.Hash()
if err != nil {
log.Errorf("Unable to hash crytsallized state: %v", err)
}

if hash != ss.initialCrystallizedStateHash {
continue
}

ss.currentSlotNumber = crystallizedState.LastFinalizedEpoch()
ss.requestNextBlock()
crystallizedStateSub.Unsubscribe()
}
}
}

func (ss *Service) run(done <-chan struct{}) {
announceBlockHashSub := ss.p2p.Subscribe(pb.BeaconBlockHashAnnounce{}, ss.announceBlockHashBuf)
blockSub := ss.p2p.Subscribe(pb.BeaconBlockResponse{}, ss.blockBuf)
Expand Down
Loading

0 comments on commit 2cf7fa0

Please sign in to comment.