Skip to content

Commit

Permalink
download checkpoint sync origin blobs in init-sync (#13665)
Browse files Browse the repository at this point in the history
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
  • Loading branch information
kasey and kasey committed Feb 26, 2024
1 parent d9d2ee7 commit 0132c1b
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 4 deletions.
1 change: 1 addition & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Expand Up @@ -45,6 +45,7 @@ go_library(
"//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
Expand Down
111 changes: 107 additions & 4 deletions beacon-chain/sync/initial-sync/service.go
Expand Up @@ -5,22 +5,31 @@ package initialsync

import (
"context"
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/async/abool"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -58,6 +67,7 @@ type Service struct {
clock *startup.Clock
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
ctxMap sync.ContextByteVersions
}

// Option is a functional option for the initial-sync Service.
Expand Down Expand Up @@ -124,6 +134,13 @@ func (s *Service) Start() {
}
s.clock = clock
log.Info("Received state initialized event")
ctxMap, err := sync.ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
WithError(err).Error("unable to initialize context version map using genesis validator")
return
}
s.ctxMap = ctxMap

v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
if err != nil {
Expand Down Expand Up @@ -162,7 +179,15 @@ func (s *Service) Start() {
s.markSynced()
return
}
s.waitForMinimumPeers()
peers, err := s.waitForMinimumPeers()
if err != nil {
log.WithError(err).Error("Error waiting for minimum number of peers")
return
}
if err := s.fetchOriginBlobs(peers); err != nil {
log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
return
}
if err := s.roundRobinSync(gt); err != nil {
if errors.Is(s.ctx.Err(), context.Canceled) {
return
Expand Down Expand Up @@ -215,24 +240,30 @@ func (s *Service) Resync() error {
defer func() { s.synced.Set() }() // Reset it at the end of the method.
genesis := time.Unix(int64(headState.GenesisTime()), 0) // lint:ignore uintcast -- Genesis time will not exceed int64 in your lifetime.

s.waitForMinimumPeers()
_, err = s.waitForMinimumPeers()
if err != nil {
return err
}
if err = s.roundRobinSync(genesis); err != nil {
log = log.WithError(err)
}
log.WithField("slot", s.cfg.Chain.HeadSlot()).Info("Resync attempt complete")
return nil
}

func (s *Service) waitForMinimumPeers() {
func (s *Service) waitForMinimumPeers() ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
if s.ctx.Err() != nil {
return nil, s.ctx.Err()
}
cp := s.cfg.Chain.FinalizedCheckpt()
_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, cp.Epoch)
if len(peers) >= required {
break
return peers, nil
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
Expand All @@ -247,3 +278,75 @@ func (s *Service) markSynced() {
s.synced.Set()
close(s.cfg.InitialSyncComplete)
}

func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
return nil
}
blk, err := s.cfg.DB.Block(s.ctx, r)
if err != nil {
log.WithField("root", r).Error("Block for checkpoint sync origin root not found in db")
return err
}
if blk.Version() < version.Deneb {
return nil
}
cmts, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
return err
}
if len(cmts) == 0 {
return nil
}
rob, err := blocks.NewROBlockWithRoot(blk, r)
if err != nil {
return err
}
onDisk, err := s.cfg.BlobStorage.Indices(r)
if err != nil {
return errors.Wrapf(err, "error checking existing blobs for checkpoint sync bloc root %#x", r)
}
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
for i := range cmts {
if onDisk[i] {
continue
}
req = append(req, &eth.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
}
if len(req) == 0 {
log.WithField("nBlobs", len(cmts)).WithField("root", fmt.Sprintf("%#x", r)).Debug("All checkpoint block blobs are present")
return nil
}
shufflePeers(pids)
for i := range pids {
sidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
if err != nil {
continue
}
if len(sidecars) != len(req) {
continue
}
bv := newBlobBatchVerifier(s.newBlobVerifier)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {
return err
}
if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
continue
}
log.WithField("nBlobs", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
return nil
}
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
}

func shufflePeers(pids []peer.ID) {
rg := rand.NewGenerator()
rg.Shuffle(len(pids), func(i, j int) {
pids[i], pids[j] = pids[j], pids[i]
})
}

0 comments on commit 0132c1b

Please sign in to comment.