diff --git a/common/types/missing.go b/common/types/missing.go deleted file mode 100644 index 632fe5a249..0000000000 --- a/common/types/missing.go +++ /dev/null @@ -1,33 +0,0 @@ -package types - -import ( - "fmt" - - "github.com/spacemeshos/go-spacemesh/log" -) - -type ErrorMissing struct { - MissingData -} - -func (e *ErrorMissing) Error() string { - return e.MissingData.String() -} - -type MissingData struct { - Blocks []BlockID -} - -func (m *MissingData) String() string { - return fmt.Sprintf("missing: blocks %v", m.Blocks) -} - -func (m *MissingData) MarshalLogObject(encoder log.ObjectEncoder) error { - encoder.AddArray("blocks", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error { - for _, block := range m.Blocks { - encoder.AppendString(block.String()) - } - return nil - })) - return nil -} diff --git a/mesh/mesh.go b/mesh/mesh.go index 28c3e00bb0..aef9971d4c 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -39,6 +39,8 @@ type Mesh struct { conState conservativeState trtl system.Tortoise + missingBlocks chan []types.BlockID + mu sync.Mutex // latestLayer is the latest layer this node had seen from blocks latestLayer atomic.Value @@ -64,6 +66,7 @@ func NewMesh(cdb *datastore.CachedDB, c layerClock, trtl system.Tortoise, exec * executor: exec, conState: state, nextProcessedLayers: make(map[types.LayerID]struct{}), + missingBlocks: make(chan []types.BlockID, 32), } msh.latestLayer.Store(types.LayerID(0)) msh.latestLayerInState.Store(types.LayerID(0)) @@ -131,6 +134,12 @@ func (msh *Mesh) LatestLayerInState() types.LayerID { return msh.latestLayerInState.Load().(types.LayerID) } +// MissingBlocks returns single consumer channel. +// Consumer by contract is responsible for downloading missing blocks. +func (msh *Mesh) MissingBlocks() <-chan []types.BlockID { + return msh.missingBlocks +} + // LatestLayer - returns the latest layer we saw from the network. func (msh *Mesh) LatestLayer() types.LayerID { return msh.latestLayer.Load().(types.LayerID) @@ -308,7 +317,11 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error { ) } if missing := missingBlocks(results); len(missing) > 0 { - return &types.ErrorMissing{MissingData: types.MissingData{Blocks: missing}} + select { + case <-ctx.Done(): + case msh.missingBlocks <- missing: + } + return fmt.Errorf("request missing blocks %v", missing) } if err := msh.ensureStateConsistent(ctx, results); err != nil { return err diff --git a/node/node.go b/node/node.go index c81135cb9e..36d4f17665 100644 --- a/node/node.go +++ b/node/node.go @@ -66,6 +66,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql/layers" dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/blockssync" "github.com/spacemeshos/go-spacemesh/system" "github.com/spacemeshos/go-spacemesh/timesync" timeCfg "github.com/spacemeshos/go-spacemesh/timesync/config" @@ -703,12 +704,16 @@ func (app *App) initServices(ctx context.Context) error { blocks.WithCertifierLogger(app.addLogger(BlockCertLogger, lg)), ) + flog := app.addLogger(Fetcher, lg) fetcher := fetch.NewFetch(app.cachedDB, msh, beaconProtocol, app.host, fetch.WithContext(ctx), fetch.WithConfig(app.Config.FETCH), - fetch.WithLogger(app.addLogger(Fetcher, lg)), + fetch.WithLogger(flog), ) fetcherWrapped.Fetcher = fetcher + app.eg.Go(func() error { + return blockssync.Sync(ctx, flog.Zap(), msh.MissingBlocks(), fetcher) + }) patrol := layerpatrol.New() syncerConf := syncer.Config{ diff --git a/proposals/handler.go b/proposals/handler.go index c3c3ecdf4f..6afa638d19 100644 --- a/proposals/handler.go +++ b/proposals/handler.go @@ -190,6 +190,9 @@ func collectHashes(a any) []types.Hash32 { if b.RefBallot != types.EmptyBallotID { hashes = append(hashes, b.RefBallot.AsHash32()) } + for _, header := range b.Votes.Support { + hashes = append(hashes, header.ID.AsHash32()) + } return hashes } log.Fatal("unexpected type") diff --git a/proposals/handler_test.go b/proposals/handler_test.go index 49f9299aed..2ecb97a815 100644 --- a/proposals/handler_test.go +++ b/proposals/handler_test.go @@ -1282,6 +1282,9 @@ func TestCollectHashes(t *testing.T) { b := p.Ballot expected := []types.Hash32{b.RefBallot.AsHash32()} expected = append(expected, b.Votes.Base.AsHash32()) + for _, header := range b.Votes.Support { + expected = append(expected, header.ID.AsHash32()) + } require.ElementsMatch(t, expected, collectHashes(b)) expected = append(expected, types.TransactionIDsToHashes(p.TxIDs)...) diff --git a/syncer/blockssync/blocks.go b/syncer/blockssync/blocks.go new file mode 100644 index 0000000000..dc78f05360 --- /dev/null +++ b/syncer/blockssync/blocks.go @@ -0,0 +1,58 @@ +package blockssync + +import ( + "context" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" +) + +//go:generate mockgen -package=blockssync -destination=./mocks.go -source=./blocks.go + +type blockFetcher interface { + GetBlocks(context.Context, []types.BlockID) error +} + +// Sync requests last specified blocks in background. +func Sync(ctx context.Context, logger *zap.Logger, requests <-chan []types.BlockID, fetcher blockFetcher) error { + var ( + eg errgroup.Group + lastch = make(chan map[types.BlockID]struct{}) + ) + eg.Go(func() error { + var ( + send chan map[types.BlockID]struct{} + last map[types.BlockID]struct{} + ) + for { + select { + case <-ctx.Done(): + close(lastch) + return ctx.Err() + case req := <-requests: + if last == nil { + last = map[types.BlockID]struct{}{} + send = lastch + } + for _, id := range req { + last[id] = struct{}{} + } + case send <- last: + last = nil + send = nil + } + } + }) + for batch := range lastch { + blocks := make([]types.BlockID, 0, len(batch)) + for id := range batch { + blocks = append(blocks, id) + } + if err := fetcher.GetBlocks(ctx, blocks); err != nil { + logger.Warn("failed to fetch blocks", zap.Error(err)) + } + } + return eg.Wait() +} diff --git a/syncer/blockssync/blocks_test.go b/syncer/blockssync/blocks_test.go new file mode 100644 index 0000000000..c3cf9993ef --- /dev/null +++ b/syncer/blockssync/blocks_test.go @@ -0,0 +1,84 @@ +package blockssync + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/log/logtest" +) + +func TestCanBeAggregated(t *testing.T) { + var ( + ctrl = gomock.NewController(t) + fetch = NewMockblockFetcher(ctrl) + req = make(chan []types.BlockID, 10) + out = make(chan []types.BlockID, 10) + ctx, cancel = context.WithCancel(context.Background()) + eg errgroup.Group + ) + t.Cleanup(func() { + cancel() + eg.Wait() + }) + eg.Go(func() error { + return Sync(ctx, logtest.New(t).Zap(), req, fetch) + }) + fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error { + out <- blocks + return nil + }).AnyTimes() + first := []types.BlockID{{1}, {2}, {3}} + second := []types.BlockID{{2}, {3}, {4}} + req <- first + req <- second + rst1 := timedRead(t, out) + require.Subset(t, rst1, first) + if len(rst1) == len(first) { + require.Subset(t, timedRead(t, out), second) + } +} + +func TestErrorDoesntExit(t *testing.T) { + var ( + ctrl = gomock.NewController(t) + fetch = NewMockblockFetcher(ctrl) + req = make(chan []types.BlockID, 10) + out = make(chan []types.BlockID, 10) + ctx, cancel = context.WithCancel(context.Background()) + eg errgroup.Group + ) + t.Cleanup(func() { + cancel() + eg.Wait() + }) + eg.Go(func() error { + return Sync(ctx, logtest.New(t).Zap(), req, fetch) + }) + fetch.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, blocks []types.BlockID) error { + out <- blocks + return errors.New("test") + }).AnyTimes() + first := []types.BlockID{{1}, {2}, {3}} + req <- first + require.Subset(t, timedRead(t, out), first) + req <- first + require.Subset(t, timedRead(t, out), first) +} + +func timedRead(tb testing.TB, blocks chan []types.BlockID) []types.BlockID { + delay := time.Second + select { + case v := <-blocks: + return v + case <-time.After(delay): + require.FailNow(tb, "timed out after", delay) + } + return nil +} diff --git a/syncer/blockssync/mocks.go b/syncer/blockssync/mocks.go new file mode 100644 index 0000000000..9f0187f59f --- /dev/null +++ b/syncer/blockssync/mocks.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./blocks.go + +// Package blockssync is a generated GoMock package. +package blockssync + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + types "github.com/spacemeshos/go-spacemesh/common/types" +) + +// MockblockFetcher is a mock of blockFetcher interface. +type MockblockFetcher struct { + ctrl *gomock.Controller + recorder *MockblockFetcherMockRecorder +} + +// MockblockFetcherMockRecorder is the mock recorder for MockblockFetcher. +type MockblockFetcherMockRecorder struct { + mock *MockblockFetcher +} + +// NewMockblockFetcher creates a new mock instance. +func NewMockblockFetcher(ctrl *gomock.Controller) *MockblockFetcher { + mock := &MockblockFetcher{ctrl: ctrl} + mock.recorder = &MockblockFetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockblockFetcher) EXPECT() *MockblockFetcherMockRecorder { + return m.recorder +} + +// GetBlocks mocks base method. +func (m *MockblockFetcher) GetBlocks(arg0 context.Context, arg1 []types.BlockID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBlocks", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetBlocks indicates an expected call of GetBlocks. +func (mr *MockblockFetcherMockRecorder) GetBlocks(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlocks", reflect.TypeOf((*MockblockFetcher)(nil).GetBlocks), arg0, arg1) +} diff --git a/syncer/metrics.go b/syncer/metrics.go index aeec569486..3b5e9e5bff 100644 --- a/syncer/metrics.go +++ b/syncer/metrics.go @@ -71,13 +71,6 @@ var ( []string{}, ).WithLabelValues() - blockRequested = metrics.NewCounter( - "block_requested", - namespace, - "number of missing block requested", - []string{}, - ).WithLabelValues() - syncedLayer = metrics.NewGauge( "layer", namespace, diff --git a/syncer/state_syncer.go b/syncer/state_syncer.go index 91405eef77..1b5c5c409c 100644 --- a/syncer/state_syncer.go +++ b/syncer/state_syncer.go @@ -102,7 +102,7 @@ func (s *Syncer) processLayers(ctx context.Context) error { } // even if it fails to fetch opinions, we still go ahead to ProcessLayer so that the tortoise // has a chance to count ballots and form its own opinions - if err := s.processWithRetry(ctx, lid); err != nil { + if err := s.mesh.ProcessLayer(ctx, lid); err != nil { s.logger.WithContext(ctx).With().Warning("mesh failed to process layer from sync", lid, log.Err(err)) } } @@ -116,28 +116,6 @@ func (s *Syncer) processLayers(ctx context.Context) error { return nil } -func (s *Syncer) processWithRetry(ctx context.Context, lid types.LayerID) error { - for { - origerr := s.mesh.ProcessLayer(ctx, lid) - if origerr == nil { - return nil - } - var missing *types.ErrorMissing - if !errors.As(origerr, &missing) { - return origerr - } - s.logger.With().Debug("requesting missing blocks", - log.Context(ctx), - log.Inline(missing), - ) - err := s.dataFetcher.GetBlocks(ctx, missing.Blocks) - if err != nil { - return fmt.Errorf("%w: %s", origerr, err) - } - blockRequested.Add(float64(len(missing.Blocks))) - } -} - func (s *Syncer) needCert(ctx context.Context, lid types.LayerID) (bool, error) { cutoff := s.certCutoffLayer() if !lid.After(cutoff) { diff --git a/syncer/state_syncer_test.go b/syncer/state_syncer_test.go index dce288e01a..c65999266b 100644 --- a/syncer/state_syncer_test.go +++ b/syncer/state_syncer_test.go @@ -429,33 +429,3 @@ func TestProcessLayers_NoHashResolutionForNewlySyncedNode(t *testing.T) { ts.mForkFinder.EXPECT().Purge(true) require.NoError(t, ts.syncer.processLayers(context.Background())) } - -func TestProcessLayers_SucceedOnRetry(t *testing.T) { - ts := newSyncerWithoutSyncTimer(t) - ts.syncer.setATXSynced() - current := types.GetEffectiveGenesis().Add(1) - ts.mTicker.advanceToLayer(current) - ts.syncer.setLastSyncedLayer(current) - - ts.mLyrPatrol.EXPECT().IsHareInCharge(gomock.Any()).Return(false).AnyTimes() - ts.mDataFetcher.EXPECT().PollLayerOpinions(gomock.Any(), gomock.Any()).AnyTimes() - ts.mTortoise.EXPECT().TallyVotes(gomock.Any(), gomock.Any()).AnyTimes() - ts.mVm.EXPECT().Apply(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - ts.mVm.EXPECT().GetStateRoot().AnyTimes() - ts.mConState.EXPECT().UpdateCache(gomock.Any(), gomock.Any(), gomock.Any(), nil, nil).AnyTimes() - - missing := fixture.RLayers(fixture.RLayer(current, - fixture.RBlock(types.BlockID{1}, fixture.Hare()), - fixture.RBlock(types.BlockID{2}, fixture.Valid()), - )) - ts.mTortoise.EXPECT().Updates().Return(missing) - ts.mTortoise.EXPECT().Updates().Return(nil) - ts.mTortoise.EXPECT().Results(gomock.Any(), gomock.Any()).Return(missing, nil) - ts.mDataFetcher.EXPECT().GetBlocks(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, got []types.BlockID) error { - missing[0].Blocks[0].Data = true - missing[0].Blocks[1].Data = true - return nil - }) - require.NoError(t, ts.syncer.processLayers(context.Background())) -}