From 0c85be58feefe133a27b33ae920c7ffd9ad2f713 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 14 Nov 2023 08:41:20 +0100 Subject: [PATCH] use randomized peers from the set of 20 peers with good latency --- fetch/fetch.go | 12 +++++++--- syncer/data_fetch.go | 6 ++--- syncer/data_fetch_test.go | 10 ++++---- syncer/find_fork.go | 2 +- syncer/find_fork_test.go | 4 ++-- syncer/interface.go | 2 +- syncer/mocks/mocks.go | 48 ++++++++++++++++++------------------- syncer/state_syncer.go | 2 +- syncer/state_syncer_test.go | 14 +++++------ syncer/syncer.go | 3 ++- syncer/syncer_test.go | 4 ++-- 11 files changed, 57 insertions(+), 50 deletions(-) diff --git a/fetch/fetch.go b/fetch/fetch.go index a880e695ed..206dfc6436 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -31,7 +31,7 @@ const ( cacheSize = 1000 - RedundantPeers = 10 + RedundantPeers = 20 ) var ( @@ -742,6 +742,12 @@ func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) { f.hashToPeers.RegisterPeerHashes(peer, hashes) } -func (f *Fetch) SelectBest(n int) []p2p.Peer { - return f.peers.SelectBest(n) +func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer { + // shuffle to split the load between peers with good latency. + // it also avoids sticky behavior when a temporarily faulty peer had good latency in the past. + peers := f.peers.SelectBest(n) + rand.Shuffle(len(peers), func(i, j int) { + peers[i], peers[j] = peers[j], peers[i] + }) + return peers } diff --git a/syncer/data_fetch.go b/syncer/data_fetch.go index 1fb034d05e..41d07722f7 100644 --- a/syncer/data_fetch.go +++ b/syncer/data_fetch.go @@ -83,7 +83,7 @@ func NewDataFetch( // PollMaliciousProofs polls all peers for malicious NodeIDs. 
func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error { - peers := d.fetcher.SelectBest(fetch.RedundantPeers) + peers := d.fetcher.SelectBestShuffled(fetch.RedundantPeers) logger := d.logger.WithContext(ctx) req := &maliciousIDRequest{ peers: peers, @@ -137,7 +137,7 @@ func (d *DataFetch) PollMaliciousProofs(ctx context.Context) error { // PollLayerData polls all peers for data in the specified layer. func (d *DataFetch) PollLayerData(ctx context.Context, lid types.LayerID, peers ...p2p.Peer) error { if len(peers) == 0 { - peers = d.fetcher.SelectBest(fetch.RedundantPeers) + peers = d.fetcher.SelectBestShuffled(fetch.RedundantPeers) } if len(peers) == 0 { return errNoPeers @@ -471,7 +471,7 @@ func (d *DataFetch) updateAtxPeer(epoch types.EpochID, peer p2p.Peer) { // GetEpochATXs fetches all ATXs published in the specified epoch from a peer. func (d *DataFetch) GetEpochATXs(ctx context.Context, epoch types.EpochID) error { - peers := d.fetcher.SelectBest(fetch.RedundantPeers) + peers := d.fetcher.SelectBestShuffled(fetch.RedundantPeers) if len(peers) == 0 { return errNoPeers } diff --git a/syncer/data_fetch_test.go b/syncer/data_fetch_test.go index b414c4ec63..67c1f5d421 100644 --- a/syncer/data_fetch_test.go +++ b/syncer/data_fetch_test.go @@ -103,7 +103,7 @@ func TestDataFetch_PollMaliciousIDs(t *testing.T) { errUnknown := errors.New("unknown") newTestDataFetchWithMocks := func(_ *testing.T, exits bool) *testDataFetch { td := newTestDataFetch(t) - td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetMaliciousIDs(gomock.Any(), peers, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ []p2p.Peer, okCB func([]byte, p2p.Peer), errCB func(error, p2p.Peer)) error { for _, peer := range peers { @@ -144,7 +144,7 @@ func TestDataFetch_PollLayerData(t *testing.T) { errUnknown := errors.New("unknown") newTestDataFetchWithMocks := func(*testing.T) 
*testDataFetch { td := newTestDataFetch(t) - td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()). DoAndReturn(func( _ context.Context, @@ -189,7 +189,7 @@ func TestDataFetch_PollLayerData_PeerErrors(t *testing.T) { t.Run("only one peer has data", func(t *testing.T) { t.Parallel() td := newTestDataFetch(t) - td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()). DoAndReturn(func( _ context.Context, @@ -212,7 +212,7 @@ func TestDataFetch_PollLayerData_PeerErrors(t *testing.T) { t.Run("only one peer has empty layer", func(t *testing.T) { t.Parallel() td := newTestDataFetch(t) - td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) td.mFetcher.EXPECT().GetLayerData(gomock.Any(), peers, layerID, gomock.Any(), gomock.Any()). 
DoAndReturn(func( _ context.Context, @@ -377,7 +377,7 @@ func TestDataFetch_GetEpochATXs(t *testing.T) { ed := &fetch.EpochData{ AtxIDs: types.RandomActiveSet(11), } - td.mFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + td.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) if tc.getErr == nil { td.mAtxCache.EXPECT().GetMissingActiveSet(epoch+1, ed.AtxIDs).Return(ed.AtxIDs[1:]) } diff --git a/syncer/find_fork.go b/syncer/find_fork.go index 48c15f1fba..a61c8676cf 100644 --- a/syncer/find_fork.go +++ b/syncer/find_fork.go @@ -82,7 +82,7 @@ func (ff *ForkFinder) Purge(all bool, toPurge ...p2p.Peer) { return } - peers := ff.fetcher.SelectBest(fetch.RedundantPeers) + peers := ff.fetcher.SelectBestShuffled(fetch.RedundantPeers) uniquePeers := make(map[p2p.Peer]struct{}) for _, p := range peers { uniquePeers[p] = struct{}{} diff --git a/syncer/find_fork_test.go b/syncer/find_fork_test.go index d5b110c9c6..88990d74a9 100644 --- a/syncer/find_fork_test.go +++ b/syncer/find_fork_test.go @@ -48,7 +48,7 @@ func TestResynced(t *testing.T) { tf.AddResynced(lid, hash) require.False(t, tf.NeedResync(lid, hash)) - tf.mFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{}) + tf.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return([]p2p.Peer{}) tf.Purge(false) require.True(t, tf.NeedResync(lid, hash)) } @@ -65,7 +65,7 @@ func TestForkFinder_Purge(t *testing.T) { for i := 1; i < numCached; i++ { tf.UpdateAgreement(p2p.Peer(strconv.Itoa(i)), types.LayerID(uint32(i+1)), types.RandomHash(), time.Now()) } - tf.mFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{}) + tf.mFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return([]p2p.Peer{}) require.Equal(t, numCached, tf.NumPeersCached()) tf.Purge(false) require.Equal(t, 9, tf.NumPeersCached()) diff --git a/syncer/interface.go b/syncer/interface.go index 5f34ad8cb6..72cb05abd0 100644 --- a/syncer/interface.go +++ b/syncer/interface.go @@ -69,7 +69,7 @@ type fetcher interface { 
GetBlocks(context.Context, []types.BlockID) error RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32) - SelectBest(int) []p2p.Peer + SelectBestShuffled(int) []p2p.Peer PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error) PeerMeshHashes(context.Context, p2p.Peer, *fetch.MeshHashRequest) (*fetch.MeshHashes, error) } diff --git a/syncer/mocks/mocks.go b/syncer/mocks/mocks.go index 38b28faa62..92d79a4a19 100644 --- a/syncer/mocks/mocks.go +++ b/syncer/mocks/mocks.go @@ -801,40 +801,40 @@ func (c *fetchLogicRegisterPeerHashesCall) DoAndReturn(f func(p2p.Peer, []types. return c } -// SelectBest mocks base method. -func (m *MockfetchLogic) SelectBest(arg0 int) []p2p.Peer { +// SelectBestShuffled mocks base method. +func (m *MockfetchLogic) SelectBestShuffled(arg0 int) []p2p.Peer { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SelectBest", arg0) + ret := m.ctrl.Call(m, "SelectBestShuffled", arg0) ret0, _ := ret[0].([]p2p.Peer) return ret0 } -// SelectBest indicates an expected call of SelectBest. -func (mr *MockfetchLogicMockRecorder) SelectBest(arg0 any) *fetchLogicSelectBestCall { +// SelectBestShuffled indicates an expected call of SelectBestShuffled. 
+func (mr *MockfetchLogicMockRecorder) SelectBestShuffled(arg0 any) *fetchLogicSelectBestShuffledCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBest", reflect.TypeOf((*MockfetchLogic)(nil).SelectBest), arg0) - return &fetchLogicSelectBestCall{Call: call} + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBestShuffled", reflect.TypeOf((*MockfetchLogic)(nil).SelectBestShuffled), arg0) + return &fetchLogicSelectBestShuffledCall{Call: call} } -// fetchLogicSelectBestCall wrap *gomock.Call -type fetchLogicSelectBestCall struct { +// fetchLogicSelectBestShuffledCall wrap *gomock.Call +type fetchLogicSelectBestShuffledCall struct { *gomock.Call } // Return rewrite *gomock.Call.Return -func (c *fetchLogicSelectBestCall) Return(arg0 []p2p.Peer) *fetchLogicSelectBestCall { +func (c *fetchLogicSelectBestShuffledCall) Return(arg0 []p2p.Peer) *fetchLogicSelectBestShuffledCall { c.Call = c.Call.Return(arg0) return c } // Do rewrite *gomock.Call.Do -func (c *fetchLogicSelectBestCall) Do(f func(int) []p2p.Peer) *fetchLogicSelectBestCall { +func (c *fetchLogicSelectBestShuffledCall) Do(f func(int) []p2p.Peer) *fetchLogicSelectBestShuffledCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *fetchLogicSelectBestCall) DoAndReturn(f func(int) []p2p.Peer) *fetchLogicSelectBestCall { +func (c *fetchLogicSelectBestShuffledCall) DoAndReturn(f func(int) []p2p.Peer) *fetchLogicSelectBestShuffledCall { c.Call = c.Call.DoAndReturn(f) return c } @@ -1281,40 +1281,40 @@ func (c *fetcherRegisterPeerHashesCall) DoAndReturn(f func(p2p.Peer, []types.Has return c } -// SelectBest mocks base method. -func (m *Mockfetcher) SelectBest(arg0 int) []p2p.Peer { +// SelectBestShuffled mocks base method. 
+func (m *Mockfetcher) SelectBestShuffled(arg0 int) []p2p.Peer { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SelectBest", arg0) + ret := m.ctrl.Call(m, "SelectBestShuffled", arg0) ret0, _ := ret[0].([]p2p.Peer) return ret0 } -// SelectBest indicates an expected call of SelectBest. -func (mr *MockfetcherMockRecorder) SelectBest(arg0 any) *fetcherSelectBestCall { +// SelectBestShuffled indicates an expected call of SelectBestShuffled. +func (mr *MockfetcherMockRecorder) SelectBestShuffled(arg0 any) *fetcherSelectBestShuffledCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBest", reflect.TypeOf((*Mockfetcher)(nil).SelectBest), arg0) - return &fetcherSelectBestCall{Call: call} + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBestShuffled", reflect.TypeOf((*Mockfetcher)(nil).SelectBestShuffled), arg0) + return &fetcherSelectBestShuffledCall{Call: call} } -// fetcherSelectBestCall wrap *gomock.Call -type fetcherSelectBestCall struct { +// fetcherSelectBestShuffledCall wrap *gomock.Call +type fetcherSelectBestShuffledCall struct { *gomock.Call } // Return rewrite *gomock.Call.Return -func (c *fetcherSelectBestCall) Return(arg0 []p2p.Peer) *fetcherSelectBestCall { +func (c *fetcherSelectBestShuffledCall) Return(arg0 []p2p.Peer) *fetcherSelectBestShuffledCall { c.Call = c.Call.Return(arg0) return c } // Do rewrite *gomock.Call.Do -func (c *fetcherSelectBestCall) Do(f func(int) []p2p.Peer) *fetcherSelectBestCall { +func (c *fetcherSelectBestShuffledCall) Do(f func(int) []p2p.Peer) *fetcherSelectBestShuffledCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *fetcherSelectBestCall) DoAndReturn(f func(int) []p2p.Peer) *fetcherSelectBestCall { +func (c *fetcherSelectBestShuffledCall) DoAndReturn(f func(int) []p2p.Peer) *fetcherSelectBestShuffledCall { c.Call = c.Call.DoAndReturn(f) return c } diff --git a/syncer/state_syncer.go b/syncer/state_syncer.go index 
7a8029d779..09c1bdd926 100644 --- a/syncer/state_syncer.go +++ b/syncer/state_syncer.go @@ -148,7 +148,7 @@ func (s *Syncer) layerOpinions( ctx context.Context, lid types.LayerID, ) ([]*peerOpinion, []*types.Certificate, error) { - peers := s.dataFetcher.SelectBest(fetch.RedundantPeers) + peers := s.dataFetcher.SelectBestShuffled(fetch.RedundantPeers) if len(peers) == 0 { return nil, nil, errNoPeers } diff --git a/syncer/state_syncer_test.go b/syncer/state_syncer_test.go index eea4abb49d..d06906d2fa 100644 --- a/syncer/state_syncer_test.go +++ b/syncer/state_syncer_test.go @@ -44,7 +44,7 @@ func TestProcessLayers_MultiLayers(t *testing.T) { ts.mTicker.advanceToLayer(current) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers).AnyTimes() ts.mForkFinder.EXPECT(). UpdateAgreement(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes() @@ -161,7 +161,7 @@ func TestProcessLayers_OpinionsNotAdopted(t *testing.T) { ts.syncer.setLastSyncedLayer(current.Sub(1)) ts.mTicker.advanceToLayer(current) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers).AnyTimes() hasCert := false for _, opn := range tc.opns { @@ -263,7 +263,7 @@ func TestProcessLayers_HareIsStillWorking(t *testing.T) { ts.mLyrPatrol.EXPECT().IsHareInCharge(lastSynced).Return(false) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT(). PollLayerOpinions(gomock.Any(), lastSynced, true, peers). 
Return(nil, nil, nil) @@ -292,7 +292,7 @@ func TestProcessLayers_HareTakesTooLong(t *testing.T) { ts.mLyrPatrol.EXPECT().IsHareInCharge(lid).Return(false) } peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT(). PollLayerOpinions(gomock.Any(), lid, gomock.Any(), peers). Return(nil, nil, nil) @@ -315,7 +315,7 @@ func TestProcessLayers_OpinionsOptional(t *testing.T) { ts.mTicker.advanceToLayer(lastSynced.Add(1)) ts.mLyrPatrol.EXPECT().IsHareInCharge(lastSynced).Return(false) peers := test.GeneratePeerIDs(5) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT(). PollLayerOpinions(gomock.Any(), lastSynced, true, peers). Return(nil, nil, errors.New("meh")) @@ -381,7 +381,7 @@ func TestProcessLayers_MeshHashDiverged(t *testing.T) { ts.mLyrPatrol.EXPECT().IsHareInCharge(instate).Return(false) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT(). PollLayerOpinions(gomock.Any(), instate, false, peers). Return(opns, nil, nil) @@ -508,7 +508,7 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) { for lid := instate; lid <= current; lid++ { ts.mLyrPatrol.EXPECT().IsHareInCharge(lid) peers := test.GeneratePeerIDs(3) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(peers) + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(peers) ts.mDataFetcher.EXPECT(). PollLayerOpinions(gomock.Any(), lid, gomock.Any(), peers). 
Return(opns, nil, nil) diff --git a/syncer/syncer.go b/syncer/syncer.go index 79f70ff046..629665838a 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -370,11 +370,12 @@ func (s *Syncer) synchronize(ctx context.Context) bool { return true } // check that we have any peers - if len(s.dataFetcher.SelectBest(1)) == 0 { + if len(s.dataFetcher.SelectBestShuffled(1)) == 0 { return false } if err := s.syncAtx(ctx); err != nil { + s.logger.With().Error("failed to sync atxs", log.Context(ctx), log.Err(err)) return false } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index a881339b30..5aa14883d0 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -124,7 +124,7 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer { func newSyncerWithoutPeriodicRuns(t *testing.T) *testSyncer { ts := newTestSyncer(t, never) - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return([]p2p.Peer{"non-empty"}).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return([]p2p.Peer{"non-empty"}).AnyTimes() return ts } @@ -146,7 +146,7 @@ func TestStartAndShutdown(t *testing.T) { ts.syncer.Start() ts.mForkFinder.EXPECT().Purge(false).AnyTimes() - ts.mDataFetcher.EXPECT().SelectBest(gomock.Any()).Return(nil).AnyTimes() + ts.mDataFetcher.EXPECT().SelectBestShuffled(gomock.Any()).Return(nil).AnyTimes() require.Eventually(t, func() bool { return ts.syncer.ListenToATXGossip() && ts.syncer.ListenToGossip() && ts.syncer.IsSynced(ctx)