Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sync Checker #13580

Merged
merged 9 commits into from Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 4 additions & 12 deletions beacon-chain/blockchain/chain_info.go
Expand Up @@ -557,17 +557,9 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) {
return s.cfg.ForkChoiceStore.Slot(root)
}

// inRegularSync applies the following heuristics to decide if the node is in
// regular sync mode vs init sync mode using only forkchoice.
// It checks that the highest received block is behind the current time by at least 2 epochs
// and that it was imported at least one epoch late if both of these
// tests pass then the node is in init sync. The caller of this function MUST
// have a lock on forkchoice
// inRegularSync queries the initial sync service to
// determine if the node is in regular sync or is still
// syncing to the head of the chain.
func (s *Service) inRegularSync() bool {
currentSlot := s.CurrentSlot()
fc := s.cfg.ForkChoiceStore
if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch {
return true
}
return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch
return s.cfg.SyncChecker.Synced()
}
23 changes: 0 additions & 23 deletions beacon-chain/blockchain/chain_info_test.go
Expand Up @@ -593,26 +593,3 @@ func TestService_IsFinalized(t *testing.T) {
require.Equal(t, true, c.IsFinalized(ctx, br))
require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'}))
}

func TestService_inRegularSync(t *testing.T) {
ctx := context.Background()
c := &Service{cfg: &config{ForkChoiceStore: doublylinkedtree.New()}, head: &head{root: [32]byte{'b'}}}
ojc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
ofc := &ethpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}
st, blkRoot, err := prepareForkchoiceState(ctx, 100, [32]byte{'a'}, [32]byte{}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())
c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
st, blkRoot, err = prepareForkchoiceState(ctx, 128, [32]byte{'b'}, [32]byte{'a'}, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot))
require.Equal(t, false, c.inRegularSync())

c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
require.Equal(t, true, c.inRegularSync())

c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot))))
c.cfg.ForkChoiceStore.SetGenesisTime(uint64(time.Now().Unix()))
require.Equal(t, true, c.inRegularSync())
}
7 changes: 7 additions & 0 deletions beacon-chain/blockchain/options.go
Expand Up @@ -198,3 +198,10 @@ func WithBlobStorage(b *filesystem.BlobStorage) Option {
return nil
}
}

func WithSyncChecker(checker Checker) Option {
return func(s *Service) error {
s.cfg.SyncChecker = checker
return nil
}
}
7 changes: 7 additions & 0 deletions beacon-chain/blockchain/service.go
Expand Up @@ -93,6 +93,13 @@ type config struct {
BlockFetcher execution.POWBlockFetcher
FinalizedStateAtStartUp state.BeaconState
ExecutionEngineCaller execution.EngineCaller
SyncChecker Checker
}

// Checker is an interface used to determine if a node is in initial sync
// or regular sync.
type Checker interface {
Synced() bool
}

var ErrMissingClockSetter = errors.New("blockchain Service initialized without a startup.ClockSetter")
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/setup_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"github.com/prysmaticlabs/prysm/v4/async/event"
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache/depositcache"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
Expand Down Expand Up @@ -118,6 +119,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq
WithDepositCache(dc),
WithTrackedValidatorsCache(cache.NewTrackedValidatorsCache()),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
WithSyncChecker(mock.MockChecker{}),
}
// append the variadic opts so they override the defaults by being processed afterwards
opts = append(defOpts, opts...)
Expand Down
8 changes: 8 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Expand Up @@ -180,6 +180,14 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed {
return mon.feed
}

// MockChecker is a mock sync checker.
type MockChecker struct{}

// Synced returns true.
func (_ MockChecker) Synced() bool {
return true
}

// ReceiveBlockInitialSync mocks ReceiveBlockInitialSync method in chain service.
func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte) error {
if s.State == nil {
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/node/node.go
Expand Up @@ -120,6 +120,7 @@ type BeaconNode struct {
BlobStorage *filesystem.BlobStorage
blobRetentionEpochs primitives.Epoch
verifyInitWaiter *verification.InitializerWaiter
syncChecker *initialsync.SyncChecker
}

// New creates a new node instance, sets up configuration options, and registers
Expand Down Expand Up @@ -192,6 +193,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
}

beacon.initialSyncComplete = make(chan struct{})
beacon.syncChecker = &initialsync.SyncChecker{}
for _, opt := range opts {
if err := opt(beacon); err != nil {
return nil, err
Expand Down Expand Up @@ -674,6 +676,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st
blockchain.WithBlobStorage(b.BlobStorage),
blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache),
blockchain.WithPayloadIDCache(b.payloadIDCache),
blockchain.WithSyncChecker(b.syncChecker),
)

blockchainService, err := blockchain.NewService(b.ctx, opts...)
Expand Down Expand Up @@ -767,6 +770,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error {

opts := []initialsync.Option{
initialsync.WithVerifierWaiter(b.verifyInitWaiter),
initialsync.WithSyncChecker(b.syncChecker),
}
is := initialsync.NewService(b.ctx, &initialsync.Config{
DB: b.db,
Expand Down
25 changes: 25 additions & 0 deletions beacon-chain/node/node_test.go
Expand Up @@ -27,6 +27,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime"
"github.com/prysmaticlabs/prysm/v4/runtime/interop"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -91,6 +92,30 @@ func TestNodeStart_Ok(t *testing.T) {
require.LogsContain(t, hook, "Starting beacon node")
}

func TestNodeStart_SyncChecker(t *testing.T) {
hook := logTest.NewGlobal()
app := cli.App{}
tmp := fmt.Sprintf("%s/datadirtest2", t.TempDir())
set := flag.NewFlagSet("test", 0)
set.String("datadir", tmp, "node data directory")
set.String("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A", "fee recipient")
require.NoError(t, set.Set("suggested-fee-recipient", "0x6e35733c5af9B61374A128e6F85f553aF09ff89A"))

ctx, cancel := newCliContextWithCancel(&app, set)
node, err := New(ctx, cancel, WithBlockchainFlagOptions([]blockchain.Option{}),
WithBuilderFlagOptions([]builder.Option{}),
WithExecutionChainOptions([]execution.Option{}),
WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)))
require.NoError(t, err)
go func() {
node.Start()
}()
time.Sleep(3 * time.Second)
assert.NotNil(t, node.syncChecker.Svc)
node.Close()
require.LogsContain(t, hook, "Starting beacon node")
}

func TestNodeStart_Ok_registerDeterministicGenesisService(t *testing.T) {
numValidators := uint64(1)
hook := logTest.NewGlobal()
Expand Down
23 changes: 23 additions & 0 deletions beacon-chain/sync/initial-sync/service.go
Expand Up @@ -71,6 +71,29 @@ func WithVerifierWaiter(viw *verification.InitializerWaiter) Option {
}
}

// WithSyncChecker registers the initial sync service
// in the checker.
func WithSyncChecker(checker *SyncChecker) Option {
return func(service *Service) {
checker.Svc = service
}
}

// SyncChecker allows other services to check the current status of
// initial-sync and use that internally in their service.
type SyncChecker struct {
Svc *Service
}
Comment on lines +84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checker having the full service as a dependency doesn't look right to me... It seems like the initial sync service already implements the SyncChecker interface, you can use it as the blockchain's service dependency. One service depending on another is more natural.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason you cant do it is because initial sync has a dependency on the blockchain service. It is the reason the checker has been implemented


// Synced returns the status of the service.
func (s *SyncChecker) Synced() bool {
if s.Svc == nil {
log.Warn("Calling sync checker with a nil service initialized")
return false
}
return s.Svc.Synced()
}

// NewService configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewService(ctx context.Context, cfg *Config, opts ...Option) *Service {
Expand Down
1 change: 1 addition & 0 deletions testing/spectest/shared/common/forkchoice/service.go
Expand Up @@ -72,6 +72,7 @@ func startChainService(t testing.TB,
blockchain.WithPayloadIDCache(cache.NewPayloadIDCache()),
blockchain.WithClockSynchronizer(startup.NewClockSynchronizer()),
blockchain.WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)),
blockchain.WithSyncChecker(mock.MockChecker{}),
)
service, err := blockchain.NewService(context.Background(), opts...)
require.NoError(t, err)
Expand Down