Skip to content

Commit

Permalink
use randomized peers from the set of 20 peers with good latency
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak committed Nov 14, 2023
1 parent 18fec8f commit 0c85be5
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 50 deletions.
12 changes: 9 additions & 3 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (

cacheSize = 1000

RedundantPeers = 10
RedundantPeers = 20
)

var (
Expand Down Expand Up @@ -742,6 +742,12 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) {
f.hashToPeers.RegisterPeerHashes(peer, hashes)
}

func (f *Fetch) SelectBest(n int) []p2p.Peer {
return f.peers.SelectBest(n)
func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer {
// shuffle to split the load between peers with good latency.
// and it avoids sticky behavior, when temporarily faulty peer had good latency in the past.
peers := f.peers.SelectBest(n)
rand.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})

Check warning on line 751 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L750-L751

Added lines #L750 - L751 were not covered by tests
return peers
}
6 changes: 3 additions & 3 deletions syncer/data_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewDataFetch(

// PollMaliciousProofs polls all peers for malicious NodeIDs.
func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error {
peers := d.fetcher.SelectBest(fetch.RedundantPeers)
peers := d.fetcher.SelectBestShuffled(fetch.RedundantPeers)
logger := d.logger.WithContext(ctx)
req := &maliciousIDRequest{
peers: peers,
Expand Down Expand Up @@ -137,7 +137,7 @@ func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error {
// PollLayerData polls all peers for data in the specified layer.
func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers ...p2p.Peer) error {
if len(peers) == 0 {
peers = d.fetcher.SelectBest(fetch.RedundantPeers)
peers = d.fetcher.SelectBestShuffled(fetch.RedundantPeers)
}
if len(peers) == 0 {
return errNoPeers
Expand Down Expand Up @@ -471,7 +471,7 @@ func (d *DataFetch) updateAtxPeer(epoch types.EpochID, peer p2p.Peer) {

// GetEpochATXs fetches all ATXs published in the specified epoch from a peer.
func (d *DataFetch) GetEpochATXs(ctx context.Context, epoch types.EpochID) error {
peers := d.fetcher.SelectBest(fetch.RedundantPeers)
peers := d.fetcher.SelectBestShuffled(fetch.RedundantPeers)
if len(peers) == 0 {
return errNoPeers
}
Expand Down
10 changes: 5 additions & 5 deletions syncer/data_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestDataFetch_PollMaliciousIDs(t *testing.T) {
errUnknown := errors.New("unknown")
newTestDataFetchWithMocks := func(_ *testing.T, exits bool) *testDataFetch {
td := newTestDataFetch(t)
td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().GetMaliciousIDs(gomock.Any(), peers, gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ []p2p.Peer, okCB func([]byte, p2p.Peer), errCB func(error, p2p.Peer)) error {
for _, peer := range peers {
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestDataFetch_PollLayerData(t *testing.T) {
errUnknown := errors.New("unknown")
newTestDataFetchWithMocks := func(*testing.T) *testDataFetch {
td := newTestDataFetch(t)
td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()).
DoAndReturn(func(
_ context.Context,
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestDataFetch_PollLayerData_PeerErrors(t *testing.T) {
t.Run("only one peer has data", func(t *testing.T) {
t.Parallel()
td := newTestDataFetch(t)
td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()).
DoAndReturn(func(
_ context.Context,
Expand All @@ -212,7 +212,7 @@ func TestDataFetch_PollLayerData_PeerErrors(t *testing.T) {
t.Run("only one peer has empty layer", func(t *testing.T) {
t.Parallel()
td := newTestDataFetch(t)
td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()).
DoAndReturn(func(
_ context.Context,
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestDataFetch_GetEpochATXs(t *testing.T) {
ed := &fetch.EpochData{
AtxIDs: types.RandomActiveSet(11),
}
td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
if tc.getErr == nil {
td.mAtxCache.EXPECT().GetMissingActiveSet(epoch+1, ed.AtxIDs).Return(ed.AtxIDs[1:])
}
Expand Down
2 changes: 1 addition & 1 deletion syncer/find_fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (ff *ForkFinder) Purge(all bool, toPurge ...p2p.Peer) {
return
}

peers := ff.fetcher.SelectBest(fetch.RedundantPeers)
peers := ff.fetcher.SelectBestShuffled(fetch.RedundantPeers)
uniquePeers := make(map[p2p.Peer]struct{})
for _, p := range peers {
uniquePeers[p] = struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions syncer/find_fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestResynced(t *testing.T) {
tf.AddResynced(lid, hash)
require.False(t, tf.NeedResync(lid, hash))

tf.mFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{})
tf.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return([]p2p.Peer{})
tf.Purge(false)
require.True(t, tf.NeedResync(lid, hash))
}
Expand All @@ -65,7 +65,7 @@ func TestForkFinder_Purge(t *testing.T) {
for i := 1; i < numCached; i++ {
tf.UpdateAgreement(p2p.Peer(strconv.Itoa(i)), types.LayerID(uint32(i+1)), types.RandomHash(), time.Now())
}
tf.mFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{})
tf.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return([]p2p.Peer{})
require.Equal(t, numCached, tf.NumPeersCached())
tf.Purge(false)
require.Equal(t, 9, tf.NumPeersCached())
Expand Down
2 changes: 1 addition & 1 deletion syncer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type fetcher interface {
GetBlocks(context.Context, []types.BlockID) error
RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)

SelectBest(int) []p2p.Peer
SelectBestShuffled(int) []p2p.Peer
PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)
PeerMeshHashes(context.Context, p2p.Peer, *fetch.MeshHashRequest) (*fetch.MeshHashes, error)
}
Expand Down
48 changes: 24 additions & 24 deletions syncer/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion syncer/state_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (s *Syncer) layerOpinions(
ctx context.Context,
lid types.LayerID,
) ([]*peerOpinion, []*types.Certificate, error) {
peers := s.dataFetcher.SelectBest(fetch.RedundantPeers)
peers := s.dataFetcher.SelectBestShuffled(fetch.RedundantPeers)
if len(peers) == 0 {
return nil, nil, errNoPeers
}
Expand Down
14 changes: 7 additions & 7 deletions syncer/state_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestProcessLayers_MultiLayers(t *testing.T) {
ts.mTicker.advanceToLayer(current)

peers := test.GeneratePeerIDs(3)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers).AnyTimes()
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers).AnyTimes()
ts.mForkFinder.EXPECT().
UpdateAgreement(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes()
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestProcessLayers_OpinionsNotAdopted(t *testing.T) {
ts.syncer.setLastSyncedLayer(current.Sub(1))
ts.mTicker.advanceToLayer(current)
peers := test.GeneratePeerIDs(3)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers).AnyTimes()
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers).AnyTimes()

hasCert := false
for _, opn := range tc.opns {
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestProcessLayers_HareIsStillWorking(t *testing.T) {

ts.mLyrPatrol.EXPECT().IsHareInCharge(lastSynced).Return(false)
peers := test.GeneratePeerIDs(3)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().
PollLayerOpinions(gomock.Any(), lastSynced, true, peers).
Return(nil, nil, nil)
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestProcessLayers_HareTakesTooLong(t *testing.T) {
ts.mLyrPatrol.EXPECT().IsHareInCharge(lid).Return(false)
}
peers := test.GeneratePeerIDs(3)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().
PollLayerOpinions(gomock.Any(), lid, gomock.Any(), peers).
Return(nil, nil, nil)
Expand All @@ -315,7 +315,7 @@ func TestProcessLayers_OpinionsOptional(t *testing.T) {
ts.mTicker.advanceToLayer(lastSynced.Add(1))
ts.mLyrPatrol.EXPECT().IsHareInCharge(lastSynced).Return(false)
peers := test.GeneratePeerIDs(5)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().
PollLayerOpinions(gomock.Any(), lastSynced, true, peers).
Return(nil, nil, errors.New("meh"))
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestProcessLayers_MeshHashDiverged(t *testing.T) {

ts.mLyrPatrol.EXPECT().IsHareInCharge(instate).Return(false)
peers := test.GeneratePeerIDs(3)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().
PollLayerOpinions(gomock.Any(), instate, false, peers).
Return(opns, nil, nil)
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) {
for lid := instate; lid <= current; lid++ {
ts.mLyrPatrol.EXPECT().IsHareInCharge(lid)
peers := test.GeneratePeerIDs(3)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers)
ts.mDataFetcher.EXPECT().
PollLayerOpinions(gomock.Any(), lid, gomock.Any(), peers).
Return(opns, nil, nil)
Expand Down
3 changes: 2 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,12 @@ func (s *Syncer) synchronize(ctx context.Context) bool {
return true
}
// check that we have any peers
if len(s.dataFetcher.SelectBest(1)) == 0 {
if len(s.dataFetcher.SelectBestShuffled(1)) == 0 {
return false
}

if err := s.syncAtx(ctx); err != nil {
s.logger.With().Error("failed to sync atxs", log.Context(ctx), log.Err(err))
return false
}

Expand Down
4 changes: 2 additions & 2 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer {

func newSyncerWithoutPeriodicRuns(t *testing.T) *testSyncer {
ts := newTestSyncer(t, never)
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{"non-empty"}).AnyTimes()
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return([]p2p.Peer{"non-empty"}).AnyTimes()
return ts
}

Expand All @@ -146,7 +146,7 @@ func TestStartAndShutdown(t *testing.T) {
ts.syncer.Start()

ts.mForkFinder.EXPECT().Purge(false).AnyTimes()
ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(nil).AnyTimes()
ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(nil).AnyTimes()
require.Eventually(t, func() bool {
return ts.syncer.ListenToATXGossip() && ts.syncer.ListenToGossip() &&
ts.syncer.IsSynced(ctx)
Expand Down

0 comments on commit 0c85be5

Please sign in to comment.