Skip to content

Commit

Permalink
Add Sync Checker (#13580)
Browse files Browse the repository at this point in the history
* fix it

* add it in

* typo

* fix tests

* fix tests

* export and add test

* preston's review
  • Loading branch information
nisdas authored and rkapka committed Feb 6, 2024
1 parent 6be9470 commit bc0ebb6
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 35 deletions.
16 changes: 4 additions & 12 deletions beacon-chain/blockchain/chain_info.go
Expand Up @@ -557,17 +557,9 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) {
return s.cfg.ForkChoiceStore.Slot(root)
}

// inRegularSync applies the following heuristics to decide if the node is in
// regular sync mode vs init sync mode using only forkchoice.
// It checks that the highest received block is behind the current time by at least 2 epochs
// and that it was imported at least one epoch late if both of these
// tests pass then the node is in init sync. The caller of this function MUST
// have a lock on forkchoice
// inRegularSync queries the initial sync service to
// determine if the node is in regular sync or is still
// syncing to the head of the chain.
func (s *Service) inRegularSync() bool {
currentSlot := s.CurrentSlot()
fc := s.cfg.ForkChoiceStore
if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch {
return true
}
return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch
return s.cfg.SyncChecker.Synced()
}
23 changes: 0 additions & 23 deletions beacon-chain/blockchain/chain_info_test.go
Expand Up @@ -593,26 +593,3 @@ func TestService_IsFinalized(t *testing.T) {
require.Equal(t, true, c.IsFinalized(ctx, br))
require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'}))
}

func TestService_inRegularSync(t *testing.T) {
ctx := context.Background()
c := &Service{cfg: &config{ForkChoiceStore: doublylinkedtree.New()}, head: &head{root: [32]byte{'b'}}}
ojc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
ofc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
st, blkRoot, err := prepareForkchoiceState(ctx, 100, [32]byte{'a'}, [32]byte{}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
st, blkRoot, err = prepareForkchoiceState(ctx, 128, [32]byte{'b'}, [32]byte{'a'}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())

c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
require.Equal(t, true, c.inRegularSync())

c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
c.cfg.ForkChoiceStore.SetGenesisTime(uint64(time.Now().Unix()))
require.Equal(t, true, c.inRegularSync())
}
7 changes: 7 additions & 0 deletions beacon-chain/blockchain/options.go
Expand Up @@ -198,3 +198,10 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option {
return nil
}
}

func WithSyncChecker(checker Checker) Option {
return func(s *Service) error {
s.cfg.SyncChecker = checker
return nil
}
}
7 changes: 7 additions & 0 deletions beacon-chain/blockchain/service.go
Expand Up @@ -93,6 +93,13 @@ type config struct {
BlockFetcher execution.POWBlockFetcher
FinalizedStateAtStartUp state.BeaconState
ExecutionEngineCaller execution.EngineCaller
SyncChecker Checker
}

// Checker is an interface used to determine if a node is in initial sync
// or regular sync.
type Checker interface {
Synced() bool
}

var ErrMissingClockSetter = errors.New("blockchain Service initialized without a startup.ClockSetter")
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/prysmaticlabs/prysm/v4/async/event"
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
Expand Down Expand Up @@ -118,6 +119,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithDepositCache(dc),
WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithSyncChecker(mock.MockChecker{}),
}
// append the variadic opts so they override the defaults by being processed afterwards
opts = append(defOpts, opts...)
Expand Down
8 changes: 8 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Expand Up @@ -180,6 +180,14 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed {
return mon.feed
}

// MockChecker is a mock sync checker.
type MockChecker struct{}

// Synced returns true.
func (_ MockChecker) Synced() bool {
return true
}

// ReceiveBlockInitialSync mocks ReceiveBlockInitialSync method in chain service.
func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte) error {
if s.State == nil {
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/node/node.go
Expand Up @@ -120,6 +120,7 @@ type BeaconNode struct {
BlobStorage *filesystem.BlobStorage
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
}

// New creates a new node instance, sets up configuration options, and registers
Expand Down Expand Up @@ -192,6 +193,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
}

beacon.initialSyncComplete = make(chan struct{})
beacon.syncChecker = &initialsync.SyncChecker{}
for _, opt := range opts {
if err := opt(beacon); err != nil {
return nil, err
Expand Down Expand Up @@ -674,6 +676,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithBlobStorage(b.BlobStorage),
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithSyncChecker(b.syncChecker),
)

blockchainService, err := blockchain.NewService(b.ctx, opts...)
Expand Down Expand Up @@ -767,6 +770,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {

opts := []initialsync.Option{
initialsync.WithVerifierWaiter(b.verifyInitWaiter),
initialsync.WithSyncChecker(b.syncChecker),
}
is := initialsync.NewService(b.ctx, &initialsync.Config{
DB: b.db,
Expand Down
25 changes: 25 additions & 0 deletions beacon-chain/node/node_test.go
Expand Up @@ -27,6 +27,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime"
"github.com/prysmaticlabs/prysm/v4/runtime/interop"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -91,6 +92,30 @@ func TestNodeStart_Ok(t *testing.T) {
require.LogsContain(t, hook, "Starting beacon node")
}

func TestNodeStart_SyncChecker(t *testing.T) {
hook := logTest.NewGlobal()
app := cli.App{}
tmp := fmt.Sprintf("%s/datadirtest2", t.TempDir())
set := flag.NewFlagSet("test", 0)
set.String("datadir", tmp, "node data directory")
set.String("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A", "fee recipient")
require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A"))

ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
require.NoError(t, err)
go func() {
node.Start()
}()
time.Sleep(3 * time.Second)
assert.NotNil(t, node.syncChecker.Svc)
node.Close()
require.LogsContain(t, hook, "Starting beacon node")
}

func TestNodeStart_Ok_registerDeterministicGenesisService(t *testing.T) {
numValidators := uint64(1)
hook := logTest.NewGlobal()
Expand Down
23 changes: 23 additions & 0 deletions beacon-chain/sync/initial-sync/service.go
Expand Up @@ -71,6 +71,29 @@ func WithVerifierWaiter(viw *verification.InitializerWaiter) Option {
}
}

// WithSyncChecker registers the initial sync service
// in the checker.
func WithSyncChecker(checker *SyncChecker) Option {
return func(service *Service) {
checker.Svc = service
}
}

// SyncChecker allows other services to check the current status of
// initial-sync and use that internally in their service.
type SyncChecker struct {
Svc *Service
}

// Synced returns the status of the service.
func (s *SyncChecker) Synced() bool {
if s.Svc == nil {
log.Warn("Calling sync checker with a nil service initialized")
return false
}
return s.Svc.Synced()
}

// NewService configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewService(ctx context.Context, cfg *Config, opts ...Option) *Service {
Expand Down
1 change: 1 addition & 0 deletions testing/spectest/shared/common/forkchoice/service.go
Expand Up @@ -72,6 +72,7 @@ func startChainService(t testing.TB,
blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()),
blockchain.WithClockSynchronizer(startup.NewClockSynchronizer()),
blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
blockchain.WithSyncChecker(mock.MockChecker{}),
)
service, err := blockchain.NewService(context.Background(), opts...)
require.NoError(t, err)
Expand Down

0 comments on commit bc0ebb6

Please sign in to comment.