-
Notifications
You must be signed in to change notification settings - Fork 975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract Initial Sync Into its Own Package #404
Merged
nisdas
merged 18 commits into
prysmaticlabs:master
from
rawfalafel:extract-initial-sync
Aug 22, 2018
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
c9a9fa0
Initial commit to extract initial sync code
d208396
Add stop/resume of sync/intialsync
dcad819
bazel run
1f7996c
gometalinter
24a5c88
add tests
af86ba7
cleanup
b6a2227
Merge remote-tracking branch 'origin/master' into extract-initial-sync
378cc44
Merge branch 'master' into extract-initial-sync
prestonvanloon 3a424ba
build fixes
a40a189
use infof rather than errorf if sync is not in correct state
765dd26
Add TODO comment detailing eventual completion of transition from ini…
5c18bcd
fix build
92a5305
pr comments
923bfee
merge origin/master
562eedf
fix tests
c009958
Merge remote-tracking branch 'origin/master' into extract-initial-sync
8c568db
bump
4c0fe06
Merge branch 'master' into extract-initial-sync
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||
|
||
go_library( | ||
name = "go_default_library", | ||
srcs = ["service.go"], | ||
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync", | ||
visibility = ["//beacon-chain:__subpackages__"], | ||
deps = [ | ||
"//beacon-chain/types:go_default_library", | ||
"//proto/beacon/p2p/v1:go_default_library", | ||
"//shared/p2p:go_default_library", | ||
"@com_github_sirupsen_logrus//:go_default_library", | ||
], | ||
) | ||
|
||
go_test( | ||
name = "go_default_test", | ||
srcs = ["service_test.go"], | ||
embed = [":go_default_library"], | ||
deps = [ | ||
"//beacon-chain/types:go_default_library", | ||
"//proto/beacon/p2p/v1:go_default_library", | ||
"//shared/p2p:go_default_library", | ||
"//shared/testutil:go_default_library", | ||
"@com_github_ethereum_go_ethereum//event:go_default_library", | ||
"@com_github_sirupsen_logrus//hooks/test:go_default_library", | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,275 @@ | ||
// Package initialsync is run by the beacon node when the local chain is | ||
// behind the network's longest chain. Initial sync works as follows: | ||
// The node requests for the slot number of the most recent finalized block. | ||
// The node then builds from the most recent finalized block by requesting for subsequent | ||
// blocks by slot number. Once the service detects that the local chain is caught up with | ||
// the network, the service hands over control to the regular sync service. | ||
// Note: The behavior of initialsync will likely change as the specification changes. | ||
// The most significant and highly probable change will be determining where to sync from. | ||
// The beacon chain may sync from a block in the pasts X months in order to combat long-range attacks | ||
// (see here: https://github.com/ethereum/wiki/wiki/Proof-of-Stake-FAQs#what-is-weak-subjectivity) | ||
package initialsync | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/prysmaticlabs/prysm/beacon-chain/types" | ||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" | ||
"github.com/prysmaticlabs/prysm/shared/p2p" | ||
"github.com/sirupsen/logrus" | ||
) | ||
|
||
var log = logrus.WithField("prefix", "initialsync") | ||
|
||
// Config defines the configurable properties of InitialSync. | ||
// | ||
type Config struct { | ||
SyncPollingInterval time.Duration | ||
BlockBufferSize int | ||
CrystallizedStateBufferSize int | ||
} | ||
|
||
// DefaultConfig provides the default configuration for a sync service. | ||
// SyncPollingInterval determines how frequently the service checks that initial sync is complete. | ||
// BlockBufferSize determines that buffer size of the `blockBuf` channel. | ||
// CrystallizedStateBufferSize determines the buffer size of thhe `crystallizedStateBuf` channel. | ||
func DefaultConfig() Config { | ||
return Config{ | ||
SyncPollingInterval: 1 * time.Second, | ||
BlockBufferSize: 100, | ||
CrystallizedStateBufferSize: 100, | ||
} | ||
} | ||
|
||
// ChainService is the interface for the blockchain package's ChainService struct. | ||
type ChainService interface { | ||
HasStoredState() (bool, error) | ||
SaveBlock(*types.Block) error | ||
} | ||
|
||
// SyncService is the interface for the Sync service. | ||
// InitialSync calls `Start` when initial sync completes. | ||
type SyncService interface { | ||
Start() | ||
} | ||
|
||
// InitialSync defines the main class in this package. | ||
// See the package comments for a general description of the service's functions. | ||
type InitialSync struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
p2p types.P2P | ||
chainService ChainService | ||
syncService SyncService | ||
blockBuf chan p2p.Message | ||
crystallizedStateBuf chan p2p.Message | ||
currentSlotNumber uint64 | ||
syncPollingInterval time.Duration | ||
initialCrystallizedStateHash [32]byte | ||
} | ||
|
||
// NewInitialSyncService constructs a new InitialSyncService. | ||
// This method is normally called by the main node. | ||
func NewInitialSyncService(ctx context.Context, | ||
cfg Config, | ||
beaconp2p types.P2P, | ||
chainService ChainService, | ||
syncService SyncService, | ||
) *InitialSync { | ||
ctx, cancel := context.WithCancel(ctx) | ||
|
||
blockBuf := make(chan p2p.Message, cfg.BlockBufferSize) | ||
crystallizedStateBuf := make(chan p2p.Message, cfg.CrystallizedStateBufferSize) | ||
|
||
return &InitialSync{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
p2p: beaconp2p, | ||
chainService: chainService, | ||
syncService: syncService, | ||
blockBuf: blockBuf, | ||
crystallizedStateBuf: crystallizedStateBuf, | ||
syncPollingInterval: cfg.SyncPollingInterval, | ||
} | ||
} | ||
|
||
// Start begins the goroutine. | ||
func (s *InitialSync) Start() { | ||
stored, err := s.chainService.HasStoredState() | ||
if err != nil { | ||
log.Errorf("error retrieving stored state: %v", err) | ||
return | ||
} | ||
|
||
if stored { | ||
// TODO: Bail out of the sync service if the chain is only partially synced. | ||
log.Info("Chain state detected, exiting initial sync") | ||
return | ||
} | ||
|
||
go func() { | ||
ticker := time.NewTicker(s.syncPollingInterval) | ||
s.run(ticker.C) | ||
ticker.Stop() | ||
}() | ||
} | ||
|
||
// Stop kills the initial sync goroutine. | ||
func (s *InitialSync) Stop() error { | ||
log.Info("Stopping service") | ||
s.cancel() | ||
return nil | ||
} | ||
|
||
// run is the main goroutine for the initial sync service. | ||
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout. | ||
// It is assumed that the goroutine `run` is only called once per instance. | ||
func (s *InitialSync) run(delaychan <-chan time.Time) { | ||
blockSub := s.p2p.Subscribe(pb.BeaconBlockResponse{}, s.blockBuf) | ||
crystallizedStateSub := s.p2p.Subscribe(pb.CrystallizedStateResponse{}, s.crystallizedStateBuf) | ||
defer func() { | ||
blockSub.Unsubscribe() | ||
crystallizedStateSub.Unsubscribe() | ||
close(s.blockBuf) | ||
close(s.crystallizedStateBuf) | ||
}() | ||
|
||
highestObservedSlot := uint64(0) | ||
|
||
for { | ||
select { | ||
case <-s.ctx.Done(): | ||
log.Debug("Exiting goroutine") | ||
return | ||
case <-delaychan: | ||
if highestObservedSlot == s.currentSlotNumber { | ||
log.Info("Exiting initial sync and starting normal sync") | ||
// TODO: Resume sync after completion of initial sync. | ||
// See comment in Sync service's Start function for explanation. | ||
return | ||
} | ||
case msg := <-s.blockBuf: | ||
data, ok := msg.Data.(*pb.BeaconBlockResponse) | ||
// TODO: Handle this at p2p layer. | ||
if !ok { | ||
log.Error("Received malformed beacon block p2p message") | ||
continue | ||
} | ||
|
||
if data.Block.GetSlotNumber() > highestObservedSlot { | ||
highestObservedSlot = data.Block.GetSlotNumber() | ||
} | ||
|
||
if s.currentSlotNumber == 0 { | ||
if s.initialCrystallizedStateHash != [32]byte{} { | ||
continue | ||
} | ||
if err := s.setBlockForInitialSync(data); err != nil { | ||
log.Errorf("Could not set block for initial sync: %v", err) | ||
} | ||
if err := s.requestCrystallizedStateFromPeer(data, msg.Peer); err != nil { | ||
log.Errorf("Could not request crystallized state from peer: %v", err) | ||
} | ||
|
||
continue | ||
} | ||
|
||
if data.Block.GetSlotNumber() != (s.currentSlotNumber + 1) { | ||
continue | ||
} | ||
|
||
if err := s.validateAndSaveNextBlock(data); err != nil { | ||
log.Errorf("Unable to save block: %v", err) | ||
} | ||
s.requestNextBlock() | ||
case msg := <-s.crystallizedStateBuf: | ||
data, ok := msg.Data.(*pb.CrystallizedStateResponse) | ||
// TODO: Handle this at p2p layer. | ||
if !ok { | ||
log.Error("Received malformed crystallized state p2p message") | ||
continue | ||
} | ||
|
||
if s.initialCrystallizedStateHash == [32]byte{} { | ||
continue | ||
} | ||
|
||
crystallizedState := types.NewCrystallizedState(data.CrystallizedState) | ||
hash, err := crystallizedState.Hash() | ||
if err != nil { | ||
log.Errorf("Unable to hash crytsallized state: %v", err) | ||
} | ||
|
||
if hash != s.initialCrystallizedStateHash { | ||
continue | ||
} | ||
|
||
s.currentSlotNumber = crystallizedState.LastFinalizedSlot() | ||
s.requestNextBlock() | ||
crystallizedStateSub.Unsubscribe() | ||
} | ||
} | ||
} | ||
|
||
// requestCrystallizedStateFromPeer sends a request to a peer for the corresponding crystallized state | ||
// for a beacon block. | ||
func (s *InitialSync) requestCrystallizedStateFromPeer(data *pb.BeaconBlockResponse, peer p2p.Peer) error { | ||
block := types.NewBlock(data.Block) | ||
h := block.CrystallizedStateHash() | ||
log.Debugf("Successfully processed incoming block with crystallized state hash: %x", h) | ||
s.p2p.Send(&pb.CrystallizedStateRequest{Hash: h[:]}, peer) | ||
return nil | ||
} | ||
|
||
// setBlockForInitialSync sets the first received block as the base finalized | ||
// block for initial sync. | ||
func (s *InitialSync) setBlockForInitialSync(data *pb.BeaconBlockResponse) error { | ||
block := types.NewBlock(data.Block) | ||
|
||
h, err := block.Hash() | ||
if err != nil { | ||
return err | ||
} | ||
log.WithField("blockhash", fmt.Sprintf("%x", h)).Debug("Crystallized state hash exists locally") | ||
|
||
if err := s.writeBlockToDB(block); err != nil { | ||
return err | ||
} | ||
|
||
s.initialCrystallizedStateHash = block.CrystallizedStateHash() | ||
|
||
log.Infof("Saved block with hash %x for initial sync", h) | ||
return nil | ||
} | ||
|
||
// requestNextBlock broadcasts a request for a block with the next slotnumber. | ||
func (s *InitialSync) requestNextBlock() { | ||
s.p2p.Broadcast(&pb.BeaconBlockRequestBySlotNumber{SlotNumber: (s.currentSlotNumber + 1)}) | ||
} | ||
|
||
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher | ||
// routine can be added to the chain. | ||
func (s *InitialSync) validateAndSaveNextBlock(data *pb.BeaconBlockResponse) error { | ||
block := types.NewBlock(data.Block) | ||
|
||
if s.currentSlotNumber == uint64(0) { | ||
return errors.New("invalid slot number for syncing") | ||
} | ||
|
||
if (s.currentSlotNumber + 1) == block.SlotNumber() { | ||
|
||
if err := s.writeBlockToDB(block); err != nil { | ||
return err | ||
} | ||
s.currentSlotNumber = block.SlotNumber() | ||
} | ||
return nil | ||
} | ||
|
||
// writeBlockToDB saves the corresponding block to the local DB. | ||
func (s *InitialSync) writeBlockToDB(block *types.Block) error { | ||
return s.chainService.SaveBlock(block) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add package level comment?