-
Notifications
You must be signed in to change notification settings - Fork 975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Check Last Finalized Epoch #357
Changes from 43 commits
e448b3b
cc8d23a
9cb52a2
1fc7f15
6dcc47d
d809dc6
aebfbfa
630b2f3
843c476
7ad70ed
d6c15d4
f5ea3ff
704318e
8705194
2d4bdb1
6515947
7fa2847
bef7596
214e955
49836a6
6bc7952
b8f2823
cfddb90
1f0a05e
ce94ba3
52bcccd
71c06ba
be40cee
2fe3979
5648ca7
659f88f
d4cd0dd
b055e68
d8b8c60
5a64d17
7a9cd8d
6abf6d6
0e849d2
90c44ba
937bda6
acd4696
c1f2ea9
c304cb2
e3ebefd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package sync | |
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/prysmaticlabs/prysm/beacon-chain/types" | ||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" | ||
|
@@ -25,16 +26,21 @@ var log = logrus.WithField("prefix", "sync") | |
// * Drop peers that send invalid data | ||
// * Throttle incoming requests | ||
type Service struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
p2p types.P2P | ||
chainService types.ChainService | ||
announceBlockHashBuf chan p2p.Message | ||
blockBuf chan p2p.Message | ||
announceCrystallizedHashBuf chan p2p.Message | ||
crystallizedStateBuf chan p2p.Message | ||
announceActiveHashBuf chan p2p.Message | ||
activeStateBuf chan p2p.Message | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
p2p types.P2P | ||
chainService types.ChainService | ||
announceBlockHashBuf chan p2p.Message | ||
blockBuf chan p2p.Message | ||
announceCrystallizedHashBuf chan p2p.Message | ||
crystallizedStateBuf chan p2p.Message | ||
announceActiveHashBuf chan p2p.Message | ||
activeStateBuf chan p2p.Message | ||
syncMode Mode | ||
currentSlotNumber uint64 | ||
highestObservedSlot uint64 | ||
syncPollingInterval time.Duration | ||
initialCrystallizedStateHash [32]byte | ||
} | ||
|
||
// Config allows the channel's buffer sizes to be changed. | ||
|
@@ -45,42 +51,82 @@ type Config struct { | |
ActiveStateBufferSize int | ||
CrystallizedStateHashBufferSize int | ||
CrystallizedStateBufferSize int | ||
SyncMode Mode | ||
CurrentSlotNumber uint64 | ||
HighestObservedSlot uint64 | ||
SyncPollingInterval time.Duration | ||
} | ||
|
||
// Mode refers to the type for the sync mode of the client. | ||
type Mode int | ||
|
||
// This specifies the different sync modes. | ||
const ( | ||
SyncModeInitial Mode = 0 | ||
SyncModeDefault Mode = 1 | ||
) | ||
|
||
// DefaultConfig provides the default configuration for a sync service. | ||
func DefaultConfig() Config { | ||
|
||
return Config{ | ||
BlockHashBufferSize: 100, | ||
BlockBufferSize: 100, | ||
ActiveStateHashBufferSize: 100, | ||
ActiveStateBufferSize: 100, | ||
CrystallizedStateHashBufferSize: 100, | ||
CrystallizedStateBufferSize: 100, | ||
SyncMode: SyncModeDefault, | ||
CurrentSlotNumber: 0, | ||
HighestObservedSlot: 0, | ||
SyncPollingInterval: time.Second, | ||
} | ||
} | ||
|
||
// NewSyncService accepts a context and returns a new Service. | ||
func NewSyncService(ctx context.Context, cfg Config, beaconp2p types.P2P, cs types.ChainService) *Service { | ||
|
||
ctx, cancel := context.WithCancel(ctx) | ||
stored, err := cs.HasStoredState() | ||
|
||
if err != nil { | ||
log.Errorf("error retrieving stored state: %v", err) | ||
} | ||
|
||
if !stored { | ||
cfg.SyncMode = SyncModeInitial | ||
} | ||
|
||
return &Service{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
p2p: beaconp2p, | ||
chainService: cs, | ||
announceBlockHashBuf: make(chan p2p.Message, cfg.BlockHashBufferSize), | ||
blockBuf: make(chan p2p.Message, cfg.BlockBufferSize), | ||
announceCrystallizedHashBuf: make(chan p2p.Message, cfg.ActiveStateHashBufferSize), | ||
crystallizedStateBuf: make(chan p2p.Message, cfg.ActiveStateBufferSize), | ||
announceActiveHashBuf: make(chan p2p.Message, cfg.CrystallizedStateHashBufferSize), | ||
activeStateBuf: make(chan p2p.Message, cfg.CrystallizedStateBufferSize), | ||
ctx: ctx, | ||
cancel: cancel, | ||
p2p: beaconp2p, | ||
chainService: cs, | ||
announceBlockHashBuf: make(chan p2p.Message, cfg.BlockHashBufferSize), | ||
blockBuf: make(chan p2p.Message, cfg.BlockBufferSize), | ||
announceCrystallizedHashBuf: make(chan p2p.Message, cfg.ActiveStateHashBufferSize), | ||
crystallizedStateBuf: make(chan p2p.Message, cfg.ActiveStateBufferSize), | ||
announceActiveHashBuf: make(chan p2p.Message, cfg.CrystallizedStateHashBufferSize), | ||
activeStateBuf: make(chan p2p.Message, cfg.CrystallizedStateBufferSize), | ||
syncMode: cfg.SyncMode, | ||
currentSlotNumber: cfg.CurrentSlotNumber, | ||
highestObservedSlot: cfg.HighestObservedSlot, | ||
syncPollingInterval: cfg.SyncPollingInterval, | ||
initialCrystallizedStateHash: [32]byte{}, | ||
} | ||
} | ||
|
||
// Start begins the block processing goroutine. | ||
func (ss *Service) Start() { | ||
log.Info("Starting service") | ||
go ss.run(ss.ctx.Done()) | ||
switch ss.syncMode { | ||
case 0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps enum instead of int would be good for the type of sync |
||
log.Info("Starting initial sync") | ||
go ss.initialSync(time.NewTicker(ss.syncPollingInterval).C, ss.ctx.Done()) | ||
case 1: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can remove this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done, removed it |
||
go ss.run(ss.ctx.Done()) | ||
default: | ||
go ss.run(ss.ctx.Done()) | ||
|
||
} | ||
} | ||
|
||
// Stop kills the block processing goroutine, but does not wait until the goroutine exits. | ||
|
@@ -197,6 +243,156 @@ func (ss *Service) ReceiveActiveState(data *pb.ActiveState) error { | |
return nil | ||
} | ||
|
||
// RequestCrystallizedStateFromPeer sends a request to a peer for the corresponding crystallized state | ||
// for a beacon block. | ||
func (ss *Service) RequestCrystallizedStateFromPeer(data *pb.BeaconBlockResponse, peer p2p.Peer) error { | ||
block, err := types.NewBlock(data.Block) | ||
if err != nil { | ||
return fmt.Errorf("could not instantiate new block from proto: %v", err) | ||
} | ||
h := block.CrystallizedStateHash() | ||
log.Debugf("Successfully processed incoming block with crystallized state hash: %x", h) | ||
ss.p2p.Send(&pb.CrystallizedStateRequest{Hash: h[:]}, peer) | ||
return nil | ||
} | ||
|
||
// SetBlockForInitialSync sets the first received block as the base finalized | ||
// block for initial sync. | ||
func (ss *Service) SetBlockForInitialSync(data *pb.BeaconBlockResponse) error { | ||
|
||
block, err := types.NewBlock(data.Block) | ||
if err != nil { | ||
return fmt.Errorf("could not instantiate new block from proto: %v", err) | ||
} | ||
|
||
h, err := block.Hash() | ||
if err != nil { | ||
return err | ||
} | ||
log.WithField("Block received with hash", fmt.Sprintf("0x%x", h)).Debug("Crystallized state hash exists locally") | ||
|
||
if err := ss.writeBlockToDB(block); err != nil { | ||
return err | ||
} | ||
|
||
ss.initialCrystallizedStateHash = block.CrystallizedStateHash() | ||
|
||
log.Infof("Saved block with hash 0%x for initial sync", h) | ||
return nil | ||
} | ||
|
||
// requestNextBlock broadcasts a request for a block with the next slotnumber. | ||
func (ss *Service) requestNextBlock() { | ||
ss.p2p.Broadcast(&pb.BeaconBlockRequestBySlotNumber{SlotNumber: (ss.currentSlotNumber + 1)}) | ||
} | ||
|
||
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher | ||
// routine can be added to the chain. | ||
func (ss *Service) validateAndSaveNextBlock(data *pb.BeaconBlockResponse) error { | ||
block, err := types.NewBlock(data.Block) | ||
if err != nil { | ||
return fmt.Errorf("could not instantiate new block from proto: %v", err) | ||
} | ||
|
||
if ss.currentSlotNumber == uint64(0) { | ||
return fmt.Errorf("invalid slot number for syncing") | ||
} | ||
|
||
if (ss.currentSlotNumber + 1) == block.SlotNumber() { | ||
|
||
if err := ss.writeBlockToDB(block); err != nil { | ||
return err | ||
} | ||
ss.currentSlotNumber = block.SlotNumber() | ||
} | ||
return nil | ||
} | ||
|
||
// writeBlockToDB saves the corresponding block to the local DB. | ||
func (ss *Service) writeBlockToDB(block *types.Block) error { | ||
return ss.chainService.SaveBlock(block) | ||
} | ||
|
||
func (ss *Service) initialSync(delaychan <-chan time.Time, done <-chan struct{}) { | ||
blockSub := ss.p2p.Subscribe(pb.BeaconBlockResponse{}, ss.blockBuf) | ||
crystallizedStateSub := ss.p2p.Subscribe(pb.CrystallizedStateResponse{}, ss.crystallizedStateBuf) | ||
|
||
defer blockSub.Unsubscribe() | ||
defer crystallizedStateSub.Unsubscribe() | ||
for { | ||
select { | ||
case <-done: | ||
log.Infof("Exiting goroutine") | ||
return | ||
case <-delaychan: | ||
if ss.highestObservedSlot == ss.currentSlotNumber { | ||
log.Infof("Exiting initial sync and starting normal sync") | ||
go ss.run(ss.ctx.Done()) | ||
return | ||
} | ||
case msg := <-ss.blockBuf: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be in the for loop inside |
||
data, ok := msg.Data.(*pb.BeaconBlockResponse) | ||
// TODO: Handle this at p2p layer. | ||
if !ok { | ||
log.Errorf("Received malformed beacon block p2p message") | ||
continue | ||
} | ||
|
||
if data.Block.GetSlotNumber() > ss.highestObservedSlot { | ||
ss.highestObservedSlot = data.Block.GetSlotNumber() | ||
} | ||
|
||
if ss.currentSlotNumber == 0 { | ||
if ss.initialCrystallizedStateHash != [32]byte{} { | ||
continue | ||
} | ||
if err := ss.SetBlockForInitialSync(data); err != nil { | ||
log.Errorf("Could not set block for initial sync: %v", err) | ||
} | ||
if err := ss.RequestCrystallizedStateFromPeer(data, msg.Peer); err != nil { | ||
log.Errorf("Could not request crystallized state from peer: %v", err) | ||
} | ||
|
||
continue | ||
} | ||
|
||
if data.Block.GetSlotNumber() != (ss.currentSlotNumber + 1) { | ||
continue | ||
} | ||
|
||
if err := ss.validateAndSaveNextBlock(data); err != nil { | ||
log.Errorf("Unable to save block: %v", err) | ||
} | ||
ss.requestNextBlock() | ||
case msg := <-ss.crystallizedStateBuf: | ||
data, ok := msg.Data.(*pb.CrystallizedStateResponse) | ||
// TODO: Handle this at p2p layer. | ||
if !ok { | ||
log.Errorf("Received malformed crystallized state p2p message") | ||
continue | ||
} | ||
|
||
if ss.initialCrystallizedStateHash == [32]byte{} { | ||
continue | ||
} | ||
|
||
crystallizedState := types.NewCrystallizedState(data.CrystallizedState) | ||
hash, err := crystallizedState.Hash() | ||
if err != nil { | ||
log.Errorf("Unable to hash crytsallized state: %v", err) | ||
} | ||
|
||
if hash != ss.initialCrystallizedStateHash { | ||
continue | ||
} | ||
|
||
ss.currentSlotNumber = crystallizedState.LastFinalizedEpoch() | ||
ss.requestNextBlock() | ||
crystallizedStateSub.Unsubscribe() | ||
} | ||
} | ||
} | ||
|
||
func (ss *Service) run(done <-chan struct{}) { | ||
announceBlockHashSub := ss.p2p.Subscribe(pb.BeaconBlockHashAnnounce{}, ss.announceBlockHashBuf) | ||
blockSub := ss.p2p.Subscribe(pb.BeaconBlockResponse{}, ss.blockBuf) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So basically just checks if there is anything related to state that has been persisted before