Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Initial Sync Into its Own Package #404

Merged
merged 18 commits into from
Aug 22, 2018
Merged
1 change: 1 addition & 0 deletions beacon-chain/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//beacon-chain/rpc:go_default_library",
"//beacon-chain/simulator:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/initial-sync:go_default_library",
"//beacon-chain/utils:go_default_library",
"//shared:go_default_library",
"//shared/cmd:go_default_library",
Expand Down
25 changes: 25 additions & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/rpc"
"github.com/prysmaticlabs/prysm/beacon-chain/simulator"
rbcsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
initialsync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/cmd"
Expand Down Expand Up @@ -76,6 +77,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}

if err := beacon.registerInitialSyncService(); err != nil {
return nil, err
}

if err := beacon.registerRPCService(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -204,6 +209,26 @@ func (b *BeaconNode) registerSyncService() error {
return b.services.RegisterService(syncService)
}

func (b *BeaconNode) registerInitialSyncService() error {
var p2pService *p2p.Server
if err := b.services.FetchService(&p2pService); err != nil {
return err
}

var chainService *blockchain.ChainService
if err := b.services.FetchService(&chainService); err != nil {
return err
}

var syncService *rbcsync.Service
if err := b.services.FetchService(&syncService); err != nil {
return err
}

initialSyncService := initialsync.NewInitialSyncService(context.Background(), initialsync.DefaultConfig(), p2pService, chainService, syncService)
return b.services.RegisterService(initialSyncService)
}

func (b *BeaconNode) registerSimulatorService(ctx *cli.Context) error {
if !ctx.GlobalBool(utils.SimulatorFlag.Name) {
return nil
Expand Down
28 changes: 28 additions & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/types:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/p2p:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["service_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/types:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/p2p:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
275 changes: 275 additions & 0 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
// Package initialsync is run by the beacon node when the local chain is
// behind the network's longest chain. Initial sync works as follows:
// The node requests for the slot number of the most recent finalized block.
// The node then builds from the most recent finalized block by requesting for subsequent
// blocks by slot number. Once the service detects that the local chain is caught up with
// the network, the service hands over control to the regular sync service.
// Note: The behavior of initialsync will likely change as the specification changes.
// The most significant and highly probable change will be determining where to sync from.
// The beacon chain may sync from a block in the pasts X months in order to combat long-range attacks
// (see here: https://github.com/ethereum/wiki/wiki/Proof-of-Stake-FAQs#what-is-weak-subjectivity)
package initialsync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add package level comment?


import (
"context"
"errors"
"fmt"
"time"

"github.com/prysmaticlabs/prysm/beacon-chain/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "initialsync")

// Config defines the configurable properties of InitialSync.
//
type Config struct {
SyncPollingInterval time.Duration
BlockBufferSize int
CrystallizedStateBufferSize int
}

// DefaultConfig provides the default configuration for a sync service.
// SyncPollingInterval determines how frequently the service checks that initial sync is complete.
// BlockBufferSize determines that buffer size of the `blockBuf` channel.
// CrystallizedStateBufferSize determines the buffer size of thhe `crystallizedStateBuf` channel.
func DefaultConfig() Config {
return Config{
SyncPollingInterval: 1 * time.Second,
BlockBufferSize: 100,
CrystallizedStateBufferSize: 100,
}
}

// ChainService is the interface for the blockchain package's ChainService struct.
type ChainService interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see this was what you meant by defining small interfaces within the packages themselves instead of relying on a single interfaces.go file when we only need a subset of methods. I like this pattern and we can split up the large interfaces we have in the types package into something like this.

HasStoredState() (bool, error)
SaveBlock(*types.Block) error
}

// SyncService is the interface for the Sync service.
// InitialSync calls `Start` when initial sync completes.
type SyncService interface {
Start()
}

// InitialSync defines the main class in this package.
// See the package comments for a general description of the service's functions.
type InitialSync struct {
ctx context.Context
cancel context.CancelFunc
p2p types.P2P
chainService ChainService
syncService SyncService
blockBuf chan p2p.Message
crystallizedStateBuf chan p2p.Message
currentSlotNumber uint64
syncPollingInterval time.Duration
initialCrystallizedStateHash [32]byte
}

// NewInitialSyncService constructs a new InitialSyncService.
// This method is normally called by the main node.
func NewInitialSyncService(ctx context.Context,
cfg Config,
beaconp2p types.P2P,
chainService ChainService,
syncService SyncService,
) *InitialSync {
ctx, cancel := context.WithCancel(ctx)

blockBuf := make(chan p2p.Message, cfg.BlockBufferSize)
crystallizedStateBuf := make(chan p2p.Message, cfg.CrystallizedStateBufferSize)

return &InitialSync{
ctx: ctx,
cancel: cancel,
p2p: beaconp2p,
chainService: chainService,
syncService: syncService,
blockBuf: blockBuf,
crystallizedStateBuf: crystallizedStateBuf,
syncPollingInterval: cfg.SyncPollingInterval,
}
}

// Start begins the goroutine.
func (s *InitialSync) Start() {
stored, err := s.chainService.HasStoredState()
if err != nil {
log.Errorf("error retrieving stored state: %v", err)
return
}

if stored {
// TODO: Bail out of the sync service if the chain is only partially synced.
log.Info("Chain state detected, exiting initial sync")
return
}

go func() {
ticker := time.NewTicker(s.syncPollingInterval)
s.run(ticker.C)
ticker.Stop()
}()
}

// Stop kills the initial sync goroutine.
func (s *InitialSync) Stop() error {
log.Info("Stopping service")
s.cancel()
return nil
}

// run is the main goroutine for the initial sync service.
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout.
// It is assumed that the goroutine `run` is only called once per instance.
func (s *InitialSync) run(delaychan <-chan time.Time) {
blockSub := s.p2p.Subscribe(pb.BeaconBlockResponse{}, s.blockBuf)
crystallizedStateSub := s.p2p.Subscribe(pb.CrystallizedStateResponse{}, s.crystallizedStateBuf)
defer func() {
blockSub.Unsubscribe()
crystallizedStateSub.Unsubscribe()
close(s.blockBuf)
close(s.crystallizedStateBuf)
}()

highestObservedSlot := uint64(0)

for {
select {
case <-s.ctx.Done():
log.Debug("Exiting goroutine")
return
case <-delaychan:
if highestObservedSlot == s.currentSlotNumber {
log.Info("Exiting initial sync and starting normal sync")
// TODO: Resume sync after completion of initial sync.
// See comment in Sync service's Start function for explanation.
return
}
case msg := <-s.blockBuf:
data, ok := msg.Data.(*pb.BeaconBlockResponse)
// TODO: Handle this at p2p layer.
if !ok {
log.Error("Received malformed beacon block p2p message")
continue
}

if data.Block.GetSlotNumber() > highestObservedSlot {
highestObservedSlot = data.Block.GetSlotNumber()
}

if s.currentSlotNumber == 0 {
if s.initialCrystallizedStateHash != [32]byte{} {
continue
}
if err := s.setBlockForInitialSync(data); err != nil {
log.Errorf("Could not set block for initial sync: %v", err)
}
if err := s.requestCrystallizedStateFromPeer(data, msg.Peer); err != nil {
log.Errorf("Could not request crystallized state from peer: %v", err)
}

continue
}

if data.Block.GetSlotNumber() != (s.currentSlotNumber + 1) {
continue
}

if err := s.validateAndSaveNextBlock(data); err != nil {
log.Errorf("Unable to save block: %v", err)
}
s.requestNextBlock()
case msg := <-s.crystallizedStateBuf:
data, ok := msg.Data.(*pb.CrystallizedStateResponse)
// TODO: Handle this at p2p layer.
if !ok {
log.Error("Received malformed crystallized state p2p message")
continue
}

if s.initialCrystallizedStateHash == [32]byte{} {
continue
}

crystallizedState := types.NewCrystallizedState(data.CrystallizedState)
hash, err := crystallizedState.Hash()
if err != nil {
log.Errorf("Unable to hash crytsallized state: %v", err)
}

if hash != s.initialCrystallizedStateHash {
continue
}

s.currentSlotNumber = crystallizedState.LastFinalizedSlot()
s.requestNextBlock()
crystallizedStateSub.Unsubscribe()
}
}
}

// requestCrystallizedStateFromPeer sends a request to a peer for the corresponding crystallized state
// for a beacon block.
func (s *InitialSync) requestCrystallizedStateFromPeer(data *pb.BeaconBlockResponse, peer p2p.Peer) error {
block := types.NewBlock(data.Block)
h := block.CrystallizedStateHash()
log.Debugf("Successfully processed incoming block with crystallized state hash: %x", h)
s.p2p.Send(&pb.CrystallizedStateRequest{Hash: h[:]}, peer)
return nil
}

// setBlockForInitialSync sets the first received block as the base finalized
// block for initial sync.
func (s *InitialSync) setBlockForInitialSync(data *pb.BeaconBlockResponse) error {
block := types.NewBlock(data.Block)

h, err := block.Hash()
if err != nil {
return err
}
log.WithField("blockhash", fmt.Sprintf("%x", h)).Debug("Crystallized state hash exists locally")

if err := s.writeBlockToDB(block); err != nil {
return err
}

s.initialCrystallizedStateHash = block.CrystallizedStateHash()

log.Infof("Saved block with hash %x for initial sync", h)
return nil
}

// requestNextBlock broadcasts a request for a block with the next slotnumber.
func (s *InitialSync) requestNextBlock() {
s.p2p.Broadcast(&pb.BeaconBlockRequestBySlotNumber{SlotNumber: (s.currentSlotNumber + 1)})
}

// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
func (s *InitialSync) validateAndSaveNextBlock(data *pb.BeaconBlockResponse) error {
block := types.NewBlock(data.Block)

if s.currentSlotNumber == uint64(0) {
return errors.New("invalid slot number for syncing")
}

if (s.currentSlotNumber + 1) == block.SlotNumber() {

if err := s.writeBlockToDB(block); err != nil {
return err
}
s.currentSlotNumber = block.SlotNumber()
}
return nil
}

// writeBlockToDB saves the corresponding block to the local DB.
func (s *InitialSync) writeBlockToDB(block *types.Block) error {
return s.chainService.SaveBlock(block)
}
Loading