diff --git a/fetch/fetch.go b/fetch/fetch.go index 3ba27f41cf..3286dd605b 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -437,7 +437,6 @@ func (f *Fetch) send(requests []RequestMessage) { } peer2batches := f.organizeRequests(requests) - for peer, peerBatches := range peer2batches { for _, reqs := range peerBatches { batch := &batchInfo{ @@ -457,8 +456,22 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req peer2requests := make(map[p2p.Peer][]RequestMessage) peers := f.host.GetPeers() if len(peers) == 0 { - f.logger.Info("cannot send fetch: no peers found") - // in loop() we will try again after the batchTimeout + f.logger.Info("cannot send batch: no peers found") + f.mu.Lock() + defer f.mu.Unlock() + errNoPeer := errors.New("no peers") + for _, msg := range requests { + if req, ok := f.ongoing[msg.Hash]; ok { + req.promise.err = errNoPeer + close(req.promise.completed) + delete(f.ongoing, req.hash) + } else { + f.logger.With().Error("ongoing request missing", + log.Stringer("hash", msg.Hash), + log.String("hint", string(msg.Hint)), + ) + } + } return nil } diff --git a/mesh/mesh.go b/mesh/mesh.go index 63d72dd8cd..11c1ff0610 100644 --- a/mesh/mesh.go +++ b/mesh/mesh.go @@ -294,7 +294,8 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error { msh.pendingUpdates.min = types.MinLayer(msh.pendingUpdates.min, results[0].Layer) msh.pendingUpdates.max = types.MaxLayer(msh.pendingUpdates.max, results[len(results)-1].Layer) } - if next := msh.LatestLayerInState() + 1; next < msh.pendingUpdates.min { + next := msh.LatestLayerInState() + 1 + if next < msh.pendingUpdates.min { msh.pendingUpdates.min = next pending = true } @@ -318,34 +319,55 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error { })), ) } - if missing := missingBlocks(results); len(missing) > 0 { + applicable, missing := filterMissing(results, next) + if len(missing) > 0 { select { case <-ctx.Done(): case msh.missingBlocks <- missing: } - return fmt.Errorf("%w: request missing blocks %v", ErrMissingBlock, missing) + if len(applicable) == 0 { + return fmt.Errorf("%w: request missing blocks %v", ErrMissingBlock, missing) + } } - if err := msh.ensureStateConsistent(ctx, results); err != nil { + + if err := msh.ensureStateConsistent(ctx, applicable); err != nil { return err } - if err := msh.applyResults(ctx, results); err != nil { + if err := msh.applyResults(ctx, applicable); err != nil { return err } - msh.pendingUpdates.min = 0 - msh.pendingUpdates.max = 0 + if len(missing) > 0 { + msh.pendingUpdates.min = msh.LatestLayerInState() + } else { + msh.pendingUpdates.min = 0 + msh.pendingUpdates.max = 0 + } return nil } -func missingBlocks(results []result.Layer) []types.BlockID { - var response []types.BlockID - for _, layer := range results { +func filterMissing(results []result.Layer, next types.LayerID) ([]result.Layer, []types.BlockID) { + var ( + missing []types.BlockID + index = -1 + ) + for i, layer := range results { for _, block := range layer.Blocks { if (block.Valid || block.Hare || block.Local) && !block.Data { - response = append(response, block.Header.ID) + missing = append(missing, block.Header.ID) + if index == -1 { + index = i + } } } } - return response + if index >= 0 { + firstMissing := results[index].Layer + if firstMissing <= next { + return nil, missing + } + return results[:index], missing + } + return results, nil } func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error { @@ -409,6 +431,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error msh.logger.With().Debug("state persisted", log.Context(ctx), + log.Stringer("layer", layer.Layer), log.Stringer("applied", target), ) if layer.Layer > msh.LatestLayerInState() { diff --git a/mesh/mesh_test.go b/mesh/mesh_test.go index 574dd23b5c..3e24387854 100644 --- a/mesh/mesh_test.go +++ b/mesh/mesh_test.go @@ -375,8 +375,8 @@ func TestProcessLayer(t *testing.T) { // outputs err string - executed []types.BlockID - applied []types.BlockID + executed map[types.LayerID]types.BlockID + applied map[types.LayerID]types.BlockID validity map[types.BlockID]bool } type testCase struct { @@ -395,8 +395,8 @@ func TestProcessLayer(t *testing.T) { rblock(idg("1"), fixture.Good()), rblock(idg("2"), fixture.Data(), fixture.Invalid())), ), - executed: []types.BlockID{idg("1")}, - applied: []types.BlockID{idg("1")}, + executed: map[types.LayerID]types.BlockID{start: idg("1")}, + applied: map[types.LayerID]types.BlockID{start: idg("1")}, validity: map[types.BlockID]bool{ idg("1"): true, idg("2"): false, @@ -404,6 +404,29 @@ func TestProcessLayer(t *testing.T) { }, }, }, + { + "missing but can make progress", + []call{ + { + updates: rlayers( + rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())), + rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())), + rlayer(start+2, rblock(idg("3"), fixture.Valid())), + rlayer(start+3, rblock(idg("4"), fixture.Valid(), fixture.Data())), + ), + executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, + }, + { + results: rlayers( + rlayer(start+2, rblock(idg("3"), fixture.Valid(), fixture.Data())), + rlayer(start+3, rblock(idg("4"), fixture.Valid(), fixture.Data())), + ), + executed: map[types.LayerID]types.BlockID{start + 2: idg("3"), start + 3: idg("4")}, + applied: map[types.LayerID]types.BlockID{start + 2: idg("3"), start + 3: idg("4")}, + }, + }, + }, { "missing valid", []call{ @@ -417,8 +440,8 @@ func TestProcessLayer(t *testing.T) { results: rlayers( rlayer(start, rblock(idg("1"), fixture.Data(), fixture.Valid())), ), - executed: []types.BlockID{idg("1")}, - applied: []types.BlockID{idg("1")}, + executed: map[types.LayerID]types.BlockID{start: idg("1")}, + applied: map[types.LayerID]types.BlockID{start: idg("1")}, validity: map[types.BlockID]bool{ idg("1"): true, }, @@ -438,8 +461,8 @@ func TestProcessLayer(t *testing.T) { results: rlayers( rlayer(start, rblock(idg("1"), fixture.Invalid())), ), - executed: []types.BlockID{{}}, - applied: []types.BlockID{{}}, + executed: map[types.LayerID]types.BlockID{start: {}}, + applied: map[types.LayerID]types.BlockID{start: {}}, }, }, }, @@ -450,15 +473,15 @@ func TestProcessLayer(t *testing.T) { updates: rlayers( rlayer(start), ), - executed: []types.BlockID{{}}, - applied: []types.BlockID{{0}}, + executed: map[types.LayerID]types.BlockID{start: {}}, + applied: map[types.LayerID]types.BlockID{start: {0}}, }, { updates: []result.Layer{ rlayer(start, rblock(idg("2"), fixture.Valid(), fixture.Data())), }, - executed: []types.BlockID{idg("2")}, - applied: []types.BlockID{idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("2")}, }, }, }, @@ -469,14 +492,14 @@ func TestProcessLayer(t *testing.T) { updates: rlayers( rlayer(start, rblock(idg("1"), fixture.Hare(), fixture.Data())), ), - executed: []types.BlockID{idg("1")}, - applied: []types.BlockID{idg("1")}, + executed: map[types.LayerID]types.BlockID{start: idg("1")}, + applied: map[types.LayerID]types.BlockID{start: idg("1")}, }, { updates: []result.Layer{ rlayer(start, rblock(idg("1"), fixture.Hare(), fixture.Data(), fixture.Valid())), }, - applied: []types.BlockID{idg("1")}, + applied: map[types.LayerID]types.BlockID{start: idg("1")}, }, }, }, @@ -487,15 +510,15 @@ func TestProcessLayer(t *testing.T) { updates: rlayers( rlayer(start, rblock(idg("1"), fixture.Hare(), fixture.Data())), ), - executed: []types.BlockID{idg("1")}, - applied: []types.BlockID{idg("1")}, + executed: map[types.LayerID]types.BlockID{start: idg("1")}, + applied: map[types.LayerID]types.BlockID{start: idg("1")}, }, { updates: rlayers( rlayer(start.Add(1), rblock(idg("2"), fixture.Hare(), fixture.Data())), ), - executed: []types.BlockID{idg("2")}, - applied: []types.BlockID{idg("1"), idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, }, }, }, @@ -506,15 +529,15 @@ func TestProcessLayer(t *testing.T) { updates: rlayers( rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())), ), - executed: []types.BlockID{idg("1")}, - applied: []types.BlockID{idg("1")}, + executed: map[types.LayerID]types.BlockID{start: idg("1")}, + applied: map[types.LayerID]types.BlockID{start: idg("1")}, }, { updates: rlayers( rlayer(start, rblock(idg("1"), fixture.Invalid(), fixture.Hare(), fixture.Data())), ), - executed: []types.BlockID{{0}}, - applied: []types.BlockID{{0}}, + executed: map[types.LayerID]types.BlockID{start: {0}}, + applied: map[types.LayerID]types.BlockID{start: {0}}, }, }, }, @@ -526,16 +549,16 @@ func TestProcessLayer(t *testing.T) { rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())), rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())), ), - executed: []types.BlockID{idg("1"), idg("2")}, - applied: []types.BlockID{idg("1"), idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, }, { updates: rlayers( rlayer(start, rblock(idg("1"), fixture.Invalid(), fixture.Data())), rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())), ), - executed: []types.BlockID{types.EmptyBlockID, idg("2")}, - applied: []types.BlockID{types.EmptyBlockID, idg("2")}, + executed: map[types.LayerID]types.BlockID{start: types.EmptyBlockID, start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: types.EmptyBlockID, start + 1: idg("2")}, }, }, }, @@ -547,8 +570,8 @@ func TestProcessLayer(t *testing.T) { rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())), rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())), ), - executed: []types.BlockID{idg("1"), idg("2")}, - applied: []types.BlockID{idg("1"), idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, }, { updates: rlayers( @@ -570,8 +593,8 @@ func TestProcessLayer(t *testing.T) { rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())), ), - executed: []types.BlockID{idg("3"), idg("2")}, - applied: []types.BlockID{idg("3"), idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("3"), start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("3"), start + 1: idg("2")}, }, }, }, @@ -592,8 +615,8 @@ func TestProcessLayer(t *testing.T) { rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())), rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())), ), - executed: []types.BlockID{idg("1"), idg("2")}, - applied: []types.BlockID{idg("1"), idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, }, }, }, @@ -609,8 +632,8 @@ func TestProcessLayer(t *testing.T) { updates: rlayers( fixture.RLayer(start), ), - executed: []types.BlockID{{}}, - applied: []types.BlockID{{}}, + executed: map[types.LayerID]types.BlockID{start: {}}, + applied: map[types.LayerID]types.BlockID{start: {}}, }, }, }, @@ -631,8 +654,8 @@ func TestProcessLayer(t *testing.T) { fixture.RBlock(fixture.IDGen("2"), fixture.Valid(), fixture.Data()), ), ), - executed: []types.BlockID{idg("1"), idg("2")}, - applied: []types.BlockID{idg("1"), idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")}, }, }, }, @@ -648,15 +671,15 @@ func TestProcessLayer(t *testing.T) { rblock(fixture.IDGen("2"), fixture.Invalid(), fixture.Data()), ), ), - executed: []types.BlockID{{}, {}}, - applied: []types.BlockID{{}, {}}, + executed: map[types.LayerID]types.BlockID{start: {}, start + 1: {}}, + applied: map[types.LayerID]types.BlockID{start: {}, start + 1: {}}, }, { updates: rlayers( rlayer(start, rblock(fixture.IDGen("1"), fixture.Invalid(), fixture.Data()), )), - applied: []types.BlockID{{}, {}}, + applied: map[types.LayerID]types.BlockID{start: {}, start + 1: {}}, }, { updates: rlayers( @@ -664,8 +687,8 @@ func TestProcessLayer(t *testing.T) { rblock(fixture.IDGen("2"), fixture.Valid(), fixture.Data()), ), ), - executed: []types.BlockID{idg("2")}, - applied: []types.BlockID{{}, idg("2")}, + executed: map[types.LayerID]types.BlockID{start: idg("2")}, + applied: map[types.LayerID]types.BlockID{start: {}, start + 1: idg("2")}, }, }, }, @@ -698,10 +721,10 @@ func TestProcessLayer(t *testing.T) { } else { require.NoError(t, err) } - for i := range c.applied { - applied, err := layers.GetApplied(tm.cdb, start.Add(uint32(i))) + for lid, bid := range c.applied { + applied, err := layers.GetApplied(tm.cdb, lid) require.NoError(t, err) - require.Equal(t, c.applied[i], applied) + require.Equal(t, bid, applied) } for bid, valid := range c.validity { stored, err := blocks.IsValid(tm.cdb, bid) diff --git a/syncer/syncer.go b/syncer/syncer.go index c9317498c0..b51c2f7e53 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -166,7 +166,7 @@ func NewSyncer( } s.syncTimer = time.NewTicker(s.cfg.Interval) - s.validateTimer = time.NewTicker(s.cfg.Interval * 2) + s.validateTimer = time.NewTicker(s.cfg.Interval) if s.dataFetcher == nil { s.dataFetcher = NewDataFetch(mesh, fetcher, cdb, cache, s.logger) }