Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not panic if initial sync fails #4477

Merged
merged 6 commits into the base branch from the source branch on Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 6 additions & 7 deletions beacon-chain/p2p/peers/status.go
Expand Up @@ -317,17 +317,16 @@ func (p *Status) Decay() {
}
}

// BestFinalized returns the highest finalized epoch that is agreed upon by the majority of
// peers. This method may not return the absolute highest finalized, but the finalized epoch in
// which most peers can serve blocks. Ideally, all peers would be reporting the same finalized
// epoch.
// Returns the best finalized root, epoch number, and peers that agree.
func (p *Status) BestFinalized(maxPeers int) ([]byte, uint64, []peer.ID) {
// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed upon by the majority of peers.
// This method may not return the absolute highest finalized, but the finalized epoch in which most peers can serve blocks.
// Ideally, all peers would be reporting the same finalized epoch.
// Returns the best finalized root, epoch number, and list of peers that agree.
func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch uint64) ([]byte, uint64, []peer.ID) {
finalized := make(map[[32]byte]uint64)
rootToEpoch := make(map[[32]byte]uint64)
for _, pid := range p.Connected() {
peerChainState, err := p.ChainState(pid)
if err == nil && peerChainState != nil {
if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch >= ourFinalizedEpoch {
r := bytesutil.ToBytes32(peerChainState.FinalizedRoot)
finalized[r]++
rootToEpoch[r] = peerChainState.FinalizedEpoch
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/peers/status_test.go
Expand Up @@ -378,7 +378,7 @@ func TestBestPeer(t *testing.T) {
FinalizedEpoch: 3,
FinalizedRoot: junkRoot[:],
})
retRoot, retEpoch, _ := p.BestFinalized(15)
retRoot, retEpoch, _ := p.BestFinalized(15, 0)
if !bytes.Equal(retRoot, expectedRoot[:]) {
t.Errorf("Incorrect Finalized Root retrieved; wanted %v but got %v", expectedRoot, retRoot)
}
Expand All @@ -400,7 +400,7 @@ func TestBestFinalized_returnsMaxValue(t *testing.T) {
})
}

_, _, pids := p.BestFinalized(maxPeers)
_, _, pids := p.BestFinalized(maxPeers, 0)
if len(pids) != maxPeers {
t.Fatalf("returned wrong number of peers, wanted %d, got %d", maxPeers, len(pids))
}
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/initial-sync/round_robin.go
Expand Up @@ -46,7 +46,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
var lastEmptyRequests int
// Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < helpers.StartSlot(s.highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
if len(peers) == 0 {
log.Warn("No peers; waiting for reconnect")
time.Sleep(refreshTime)
Expand Down Expand Up @@ -120,7 +120,6 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
}()

resp, err := s.requestBlocks(ctx, req, pid)
log.WithField("peer", pid.Pretty()).Debugf("Received %d blocks", len(resp))
if err != nil {
// fail over to other peers by splitting this requests evenly across them.
ps := append(peers[:i], peers[i+1:]...)
Expand All @@ -141,6 +140,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
return
}
}
log.WithField("peer", pid).WithField("count", len(resp)).Debug("Received blocks")
blocksChan <- resp
}(i, pid)
}
Expand Down Expand Up @@ -225,13 +225,13 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
// we receive there after must build on the finalized chain or be considered invalid during
// fork choice resolution / block processing.
best := s.bestPeer()
root, _, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
root, _, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)

// if no best peer exists, retry until a new best peer is found.
for len(best) == 0 {
time.Sleep(refreshTime)
best = s.bestPeer()
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
}
for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
Expand Down Expand Up @@ -297,7 +297,7 @@ func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRa
// highestFinalizedEpoch as reported by peers. This is the absolute highest finalized epoch as
// reported by peers.
func (s *Service) highestFinalizedEpoch() uint64 {
_, epoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
_, epoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
return epoch
}

Expand Down
32 changes: 18 additions & 14 deletions beacon-chain/sync/initial-sync/service.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -114,12 +115,10 @@ func (s *Service) Start() {
return
}
s.waitForMinimumPeers()
if err := s.roundRobinSync(genesis); err != nil {
panic(err)
if err := s.roundRobinSync(genesis); err == nil {
log.Infof("Synced up to slot %d", s.chain.HeadSlot())
s.synced = true
}

log.Infof("Synced up to slot %d", s.chain.HeadSlot())
s.synced = true
}

// Stop initial sync.
Expand Down Expand Up @@ -152,25 +151,30 @@ func (s *Service) Resync() error {
genesis := time.Unix(int64(headState.GenesisTime), 0)

s.waitForMinimumPeers()
if err := s.roundRobinSync(genesis); err != nil {
return errors.Wrap(err, "could not retrieve head state")
err = s.roundRobinSync(genesis)
if err == nil {
s.synced = true
} else {
log = log.WithError(err)
}
log.Infof("Synced up to slot %d", s.chain.HeadSlot())
log.WithField("synced", s.synced).WithField("slot", s.chain.HeadSlot()).Info("Resync attempt complete")

s.synced = true
return nil
}

func (s *Service) waitForMinimumPeers() {
// Every 5 sec, report handshake count.
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
count := len(s.p2p.Peers().Connected())
if count >= flags.Get().MinimumSyncPeers {
_, _, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
if len(peers) >= required {
break
}
log.WithFields(logrus.Fields{
"valid handshakes": count,
"required handshakes": flags.Get().MinimumSyncPeers}).Info("Waiting for enough peer handshakes before syncing")
"suitable": len(peers),
"required": required}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}
7 changes: 3 additions & 4 deletions beacon-chain/sync/rpc_status.go
Expand Up @@ -36,14 +36,13 @@ func (r *Service) maintainPeerStatuses() {
}
}
}
if !r.initialSync.Syncing() {
_, highestEpoch, _ := r.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync)
for !r.initialSync.Syncing() {
_, highestEpoch, _ := r.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, r.chain.HeadSlot()/params.BeaconConfig().SlotsPerEpoch)
if helpers.StartSlot(highestEpoch) > r.chain.HeadSlot() {
numberOfTimesResyncedCounter.Inc()
r.clearPendingSlots()
// block until we can resync the node
if err := r.initialSync.Resync(); err != nil {
log.Errorf("Could not Resync Chain: %v", err)
log.Errorf("Could not resync chain: %v", err)
}
}
}
Expand Down