From 7668357782b4f070030c6113733d7e77be1f4003 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Sun, 13 Aug 2023 10:59:33 +0000 Subject: [PATCH] sync, blocks: request blocks when tortoise crossed local threshold (#4826) closes: https://github.com/spacemeshos/go-spacemesh/issues/4824 - first improvement is to register peers that supported blocks, otherwise we will be guessing from whom to request the block - and when mesh is called by sync or block builder it may send request to fetch blocks from registered peers. previously it would do so only in sync code path, which is less robust and might be delayed by fork finder --- common/types/missing.go | 33 ------------- mesh/mesh.go | 15 +++++- node/node.go | 7 ++- proposals/handler.go | 3 ++ proposals/handler_test.go | 3 ++ syncer/blockssync/blocks.go | 58 ++++++++++++++++++++++ syncer/blockssync/blocks_test.go | 84 ++++++++++++++++++++++++++++++++ syncer/blockssync/mocks.go | 50 +++++++++++++++++++ syncer/metrics.go | 7 --- syncer/state_syncer.go | 24 +-------- syncer/state_syncer_test.go | 30 ------------ 11 files changed, 219 insertions(+), 95 deletions(-) delete mode 100644 common/types/missing.go create mode 100644 syncer/blockssync/blocks.go create mode 100644 syncer/blockssync/blocks_test.go create mode 100644 syncer/blockssync/mocks.go diff --git a/common/types/missing.go b/common/types/missing.go deleted file mode 100644 index 632fe5a249..0000000000 --- a/common/types/missing.go +++ /dev/null @@ -1,33 +0,0 @@ -package types - -import ( - "fmt" - - "github.com/spacemeshos/go-spacemesh/log" -) - -type ErrorMissing struct { - MissingData -} - -func (e *ErrorMissing) Error() string { - return e.MissingData.String() -} - -type MissingData struct { - Blocks []BlockID -} - -func (m *MissingData) String() string { - return fmt.Sprintf("missing: blocks %v", m.Blocks) -} - -func (m *MissingData) MarshalLogObject(encoder log.ObjectEncoder) error { - encoder.AddArray("blocks", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error { - for _, block := range m.Blocks { - encoder.AppendString(block.String()) - } - return nil - })) - return nil -} diff --git a/mesh/mesh.go b/mesh/mesh.go index 28c3e00bb0..aef9971d4c 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -39,6 +39,8 @@ type Mesh struct { conState conservativeState trtl system.Tortoise + missingBlocks chan []types.BlockID + mu sync.Mutex // latestLayer is the latest layer this node had seen from blocks latestLayer atomic.Value @@ -64,6 +66,7 @@ func NewMesh(cdb *datastore.CachedDB, c layerClock, trtl system.Tortoise, exec * executor: exec, conState: state, nextProcessedLayers: make(map[types.LayerID]struct{}), + missingBlocks: make(chan []types.BlockID, 32), } msh.latestLayer.Store(types.LayerID(0)) msh.latestLayerInState.Store(types.LayerID(0)) @@ -131,6 +134,12 @@ func (msh *Mesh) LatestLayerInState() types.LayerID { return msh.latestLayerInState.Load().(types.LayerID) } +// MissingBlocks returns single consumer channel. +// Consumer by contract is responsible for downloading missing blocks. +func (msh *Mesh) MissingBlocks() <-chan []types.BlockID { + return msh.missingBlocks +} + // LatestLayer - returns the latest layer we saw from the network. func (msh *Mesh) LatestLayer() types.LayerID { return msh.latestLayer.Load().(types.LayerID) @@ -308,7 +317,11 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error { ) } if missing := missingBlocks(results); len(missing) > 0 { - return &types.ErrorMissing{MissingData: types.MissingData{Blocks: missing}} + select { + case <-ctx.Done(): + case msh.missingBlocks <- missing: + } + return fmt.Errorf("request missing blocks %v", missing) } if err := msh.ensureStateConsistent(ctx, results); err != nil { return err diff --git a/node/node.go b/node/node.go index c81135cb9e..36d4f17665 100644 --- a/node/node.go +++ b/node/node.go @@ -66,6 +66,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" "github.com/spacemeshos/go-spacemesh/timesync" timeCfg "github.com/spacemeshos/go-spacemesh/timesync/config" @@ -703,12 +704,16 @@ func (app *App) initServices(ctx context.Context) error { blocks.WithCertifierLogger(app.addLogger(BlockCertLogger, lg)), ) + flog := app.addLogger(Fetcher, lg) fetcher := fetch.NewFetch(app.cachedDB, msh, beaconProtocol, app.host, fetch.WithContext(ctx), fetch.WithConfig(app.Config.FETCH), - fetch.WithLogger(app.addLogger(Fetcher, lg)), + fetch.WithLogger(flog), ) fetcherWrapped.Fetcher = fetcher + app.eg.Go(func() error { + return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher) + }) patrol := layerpatrol.New() syncerConf := syncer.Config{ diff --git a/proposals/handler.go b/proposals/handler.go index c3c3ecdf4f..6afa638d19 100644 --- a/proposals/handler.go +++ b/proposals/handler.go @@ -190,6 +190,9 @@ func collectHashes(a any) []types.Hash32 { if b.RefBallot != types.EmptyBallotID { hashes = append(hashes, b.RefBallot.AsHash32()) } + for _, header := range b.Votes.Support { + hashes = append(hashes, header.ID.AsHash32()) + } return hashes } log.Fatal("unexpected type") diff --git a/proposals/handler_test.go b/proposals/handler_test.go index 49f9299aed..2ecb97a815 100644 --- a/proposals/handler_test.go +++ b/proposals/handler_test.go @@ -1282,6 +1282,9 @@ func TestCollectHashes(t *testing.T) { b := p.Ballot expected := []types.Hash32{b.RefBallot.AsHash32()} expected = append(expected, b.Votes.Base.AsHash32()) + for _, header := range b.Votes.Support { + expected = append(expected, header.ID.AsHash32()) + } require.ElementsMatch(t, expected, collectHashes(b)) expected = append(expected, types.TransactionIDsToHashes(p.TxIDs)...) diff --git a/syncer/blockssync/blocks.go b/syncer/blockssync/blocks.go new file mode 100644 index 0000000000..dc78f05360 --- /dev/null +++ b/syncer/blockssync/blocks.go @@ -0,0 +1,58 @@ +package blockssync + +import ( + "context" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" +) + +//go:generate mockgen -package=blockssync -destination=./mocks.go -source=./blocks.go + +type blockFetcher interface { + GetBlocks(context.Context, []types.BlockID) error +} + +// Sync requests last specified blocks in background. +func Sync(ctx context.Context, logger *zap.Logger, requests <-chan []types.BlockID, fetcher blockFetcher) error { + var ( + eg errgroup.Group + lastch = make(chan map[types.BlockID]struct{}) + ) + eg.Go(func() error { + var ( + send chan map[types.BlockID]struct{} + last map[types.BlockID]struct{} + ) + for { + select { + case <-ctx.Done(): + close(lastch) + return ctx.Err() + case req := <-requests: + if last == nil { + last = map[types.BlockID]struct{}{} + send = lastch + } + for _, id := range req { + last[id] = struct{}{} + } + case send <- last: + last = nil + send = nil + } + } + }) + for batch := range lastch { + blocks := make([]types.BlockID, 0, len(batch)) + for id := range batch { + blocks = append(blocks, id) + } + if err := fetcher.GetBlocks(ctx, blocks); err != nil { + logger.Warn("failed to fetch blocks", zap.Error(err)) + } + } + return eg.Wait() +} diff --git a/syncer/blockssync/blocks_test.go b/syncer/blockssync/blocks_test.go new file mode 100644 index 0000000000..c3cf9993ef --- /dev/null +++ b/syncer/blockssync/blocks_test.go @@ -0,0 +1,84 @@ +package blockssync + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" +) + +func TestCanBeAggregated(t *testing.T) { + var ( + ctrl = gomock.NewController(t) + fetch = NewMockblockFetcher(ctrl) + req = make(chan []types.BlockID, 10) + out = make(chan []types.BlockID, 10) + ctx, cancel = context.WithCancel(context.Background()) + eg errgroup.Group + ) + t.Cleanup(func() { + cancel() + eg.Wait() + }) + eg.Go(func() error { + return Sync(ctx, logtest.New(t).Zap(), req, fetch) + }) + fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error { + out <- blocks + return nil + }).AnyTimes() + first := []types.BlockID{{1}, {2}, {3}} + second := []types.BlockID{{2}, {3}, {4}} + req <- first + req <- second + rst1 := timedRead(t, out) + require.Subset(t, rst1, first) + if len(rst1) == len(first) { + require.Subset(t, timedRead(t, out), second) + } +} + +func TestErrorDoesntExit(t *testing.T) { + var ( + ctrl = gomock.NewController(t) + fetch = NewMockblockFetcher(ctrl) + req = make(chan []types.BlockID, 10) + out = make(chan []types.BlockID, 10) + ctx, cancel = context.WithCancel(context.Background()) + eg errgroup.Group + ) + t.Cleanup(func() { + cancel() + eg.Wait() + }) + eg.Go(func() error { + return Sync(ctx, logtest.New(t).Zap(), req, fetch) + }) + fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error { + out <- blocks + return errors.New("test") + }).AnyTimes() + first := []types.BlockID{{1}, {2}, {3}} + req <- first + require.Subset(t, timedRead(t, out), first) + req <- first + require.Subset(t, timedRead(t, out), first) +} + +func timedRead(tb testing.TB, blocks chan []types.BlockID) []types.BlockID { + delay := time.Second + select { + case v := <-blocks: + return v + case <-time.After(delay): + require.FailNow(tb, "timed out after", delay) + } + return nil +} diff --git a/syncer/blockssync/mocks.go b/syncer/blockssync/mocks.go new file mode 100644 index 0000000000..9f0187f59f --- /dev/null +++ b/syncer/blockssync/mocks.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./blocks.go + +// Package blockssync is a generated GoMock package. +package blockssync + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + types "github.com/spacemeshos/go-spacemesh/common/types" +) + +// MockblockFetcher is a mock of blockFetcher interface. +type MockblockFetcher struct { + ctrl *gomock.Controller + recorder *MockblockFetcherMockRecorder +} + +// MockblockFetcherMockRecorder is the mock recorder for MockblockFetcher. +type MockblockFetcherMockRecorder struct { + mock *MockblockFetcher +} + +// NewMockblockFetcher creates a new mock instance. +func NewMockblockFetcher(ctrl *gomock.Controller) *MockblockFetcher { + mock := &MockblockFetcher{ctrl: ctrl} + mock.recorder = &MockblockFetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockblockFetcher) EXPECT() *MockblockFetcherMockRecorder { + return m.recorder +} + +// GetBlocks mocks base method. +func (m *MockblockFetcher) GetBlocks(arg0 context.Context, arg1 []types.BlockID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBlocks", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetBlocks indicates an expected call of GetBlocks. +func (mr *MockblockFetcherMockRecorder) GetBlocks(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlocks", reflect.TypeOf((*MockblockFetcher)(nil).GetBlocks), arg0, arg1) +} diff --git a/syncer/metrics.go b/syncer/metrics.go index aeec569486..3b5e9e5bff 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -71,13 +71,6 @@ var ( []string{}, ).WithLabelValues() - blockRequested = metrics.NewCounter( - "block_requested", - namespace, - "number of missing block requested", - []string{}, - ).WithLabelValues() - syncedLayer = metrics.NewGauge( "layer", namespace, diff --git a/syncer/state_syncer.go b/syncer/state_syncer.go index 91405eef77..1b5c5c409c 100644 --- a/syncer/state_syncer.go +++ b/syncer/state_syncer.go @@ -102,7 +102,7 @@ func (s *Syncer) processLayers(ctx context.Context) error { } // even if it fails to fetch opinions, we still go ahead to ProcessLayer so that the tortoise // has a chance to count ballots and form its own opinions - if err := s.processWithRetry(ctx, lid); err != nil { + if err := s.mesh.ProcessLayer(ctx, lid); err != nil { s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err)) } } @@ -116,28 +116,6 @@ func (s *Syncer) processLayers(ctx context.Context) error { return nil } -func (s *Syncer) processWithRetry(ctx context.Context, lid types.LayerID) error { - for { - origerr := s.mesh.ProcessLayer(ctx, lid) - if origerr == nil { - return nil - } - var missing *types.ErrorMissing - if !errors.As(origerr, &missing) { - return origerr - } - s.logger.With().Debug("requesting missing blocks", - log.Context(ctx), - log.Inline(missing), - ) - err := s.dataFetcher.GetBlocks(ctx, missing.Blocks) - if err != nil { - return fmt.Errorf("%w: %s", origerr, err) - } - blockRequested.Add(float64(len(missing.Blocks))) - } -} - func (s *Syncer) needCert(ctx context.Context, lid types.LayerID) (bool, error) { cutoff := s.certCutoffLayer() if !lid.After(cutoff) { diff --git a/syncer/state_syncer_test.go b/syncer/state_syncer_test.go index dce288e01a..c65999266b 100644 --- a/syncer/state_syncer_test.go +++ b/syncer/state_syncer_test.go @@ -429,33 +429,3 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) { ts.mForkFinder.EXPECT().Purge(true) require.NoError(t, ts.syncer.processLayers(context.Background())) } - -func TestProcessLayers_SucceedOnRetry(t *testing.T) { - ts := newSyncerWithoutSyncTimer(t) - ts.syncer.setATXSynced() - current := types.GetEffectiveGenesis().Add(1) - ts.mTicker.advanceToLayer(current) - ts.syncer.setLastSyncedLayer(current) - - ts.mLyrPatrol.EXPECT().IsHareInCharge(gomock.Any()).Return(false).AnyTimes() - ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), gomock.Any()).AnyTimes() - ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), gomock.Any()).AnyTimes() - ts.mVm.EXPECT().Apply(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - ts.mVm.EXPECT().GetStateRoot().AnyTimes() - ts.mConState.EXPECT().UpdateCache(gomock.Any(), gomock.Any(), gomock.Any(), nil, nil).AnyTimes() - - missing := fixture.RLayers(fixture.RLayer(current, - fixture.RBlock(types.BlockID{1}, fixture.Hare()), - fixture.RBlock(types.BlockID{2}, fixture.Valid()), - )) - ts.mTortoise.EXPECT().Updates().Return(missing) - ts.mTortoise.EXPECT().Updates().Return(nil) - ts.mTortoise.EXPECT().Results(gomock.Any(), gomock.Any()).Return(missing, nil) - ts.mDataFetcher.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, got []types.BlockID) error { - missing[0].Blocks[0].Data = true - missing[0].Blocks[1].Data = true - return nil - }) - require.NoError(t, ts.syncer.processLayers(context.Background())) -}