From 23c29b4df5060da4fec28b0e85ba1f3a7679df9b Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Fri, 10 May 2024 14:01:54 +0400 Subject: [PATCH] Refactor activeset handling in the proposal code --- proposals/handler.go | 11 +- proposals/handler_test.go | 288 ++++++++++++++++++++------------------ 2 files changed, 155 insertions(+), 144 deletions(-) diff --git a/proposals/handler.go b/proposals/handler.go index ab50a1de43..9839ca8b1b 100644 --- a/proposals/handler.go +++ b/proposals/handler.go @@ -540,13 +540,8 @@ func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint // we don't have to wait on it in case the context is canceled ch = make(chan uint64, 1) h.pendingWeightCalc[id] = append(chs, ch) - } else { - // mark calculation as running - h.pendingWeightCalc[id] = nil - } - h.weightCalcLock.Unlock() + h.weightCalcLock.Unlock() - if exists { // need to wait for the calculation which is already running to finish select { case <-ctx.Done(): @@ -562,6 +557,10 @@ func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint } return totalWeight, nil } + } else { + // mark calculation as running + h.pendingWeightCalc[id] = nil + h.weightCalcLock.Unlock() } success := false diff --git a/proposals/handler_test.go b/proposals/handler_test.go index 57c0eb0f79..2fbff099f3 100644 --- a/proposals/handler_test.go +++ b/proposals/handler_test.go @@ -1407,74 +1407,46 @@ func gproposal(t *testing.T, signer *signing.EdSigner, atxid types.ATXID, return &p } -func TestHandleSyncedProposalActiveSet(t *testing.T) { - // TBD: test concurrent fetches - // TBD: test failures - // TBD: test cancellations - +type asTestHandler struct { + *testing.T + *testHandler + lid types.LayerID + set types.ATXIDList + p []*types.Proposal + pid p2p.Peer + startCh chan struct{} + contCh chan error +} + +func createASTestHandler(t *testing.T) *asTestHandler { signer, err := signing.NewEdSigner() require.NoError(t, err) - lid := types.LayerID(20) - sets := []types.ATXIDList{ - {{1}, {2}, {3}}, - {{1}, {2}, {4}, {5}}, - {{2}, {4}, {5}, {6}, {7}}, - {{2}, {4}, {5}, {6}, {7}, {8}, {9}}, - } - th := createTestHandler(t) - p := make([]*types.Proposal, 8) - for n := range p { - p[n] = gproposal(t, signer, types.ATXID{byte(n + 1)}, lid, &types.EpochData{ - ActiveSetHash: sets[n/2].Hash(), + th := &asTestHandler{ + T: t, + testHandler: createTestHandler(t), + lid: types.LayerID(20), + set: types.ATXIDList{{1}, {2}, {3}}, + pid: p2p.Peer("any"), + startCh: make(chan struct{}), + contCh: make(chan error), + } + th.p = []*types.Proposal{ + gproposal(t, signer, types.ATXID{1}, th.lid, &types.EpochData{ + ActiveSetHash: th.set.Hash(), Beacon: types.Beacon{1}, - }) - th.mconsumer.EXPECT().IsKnown(p[n].Layer, p[n].ID()).AnyTimes() + }), + gproposal(t, signer, types.ATXID{2}, th.lid, &types.EpochData{ + ActiveSetHash: th.set.Hash(), + Beacon: types.Beacon{1}, + }), } - pid := p2p.Peer("any") - th.mclock.EXPECT().CurrentLayer().Return(lid).AnyTimes() - th.mm.EXPECT().ProcessedLayer().Return(lid - 2).AnyTimes() + th.mclock.EXPECT().CurrentLayer().Return(th.lid).AnyTimes() + th.mm.EXPECT().ProcessedLayer().Return(th.lid - 2).AnyTimes() th.mclock.EXPECT().LayerToTime(gomock.Any()).AnyTimes() - th.mf.EXPECT().RegisterPeerHashes(pid, gomock.Any()).AnyTimes() - type asReq struct { - id types.Hash32 - err error - } - startChs := make(map[types.Hash32]chan struct{}) - asCh := make(chan asReq, 1) // buffered, no wait - for _, set := range sets { - th.mf.EXPECT().GetActiveSet(gomock.Any(), set.Hash()).DoAndReturn( - func(ctx context.Context, got types.Hash32) error { - startCh, found := startChs[got] - if found { - select { - case <-ctx.Done(): - return ctx.Err() - case startCh <- struct{}{}: - } - } - var req asReq - select { - case <-ctx.Done(): - return ctx.Err() - case req = <-asCh: - } - require.Equal(t, got, req.id) - if req.err != nil { - return req.err - } - require.NoError(t, activesets.Add(th.db, got, &types.EpochActiveSet{ - Epoch: lid.GetEpoch(), - Set: set, - })) - for _, id := range set { - th.atxsdata.AddAtx(lid.GetEpoch(), id, &atxsdata.ATX{Node: types.NodeID{1}}) - } - return nil - }, - ).AnyTimes() - } + th.mf.EXPECT().RegisterPeerHashes(th.pid, gomock.Any()).AnyTimes() + th.mf.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).AnyTimes() th.mf.EXPECT().GetBallots(gomock.Any(), gomock.Any()).AnyTimes() th.mockSet.decodeAnyBallots() @@ -1482,111 +1454,151 @@ func TestHandleSyncedProposalActiveSet(t *testing.T) { th.mm.EXPECT().AddBallot(gomock.Any(), gomock.Any()).AnyTimes() th.mm.EXPECT().AddTXsFromProposal(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - t.Run("non-concurrent fetching of ActiveSets", func(t *testing.T) { - asCh <- asReq{id: sets[0].Hash()} - th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0])) - err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0])) - require.NoError(t, err) + return th +} + +func (th *asTestHandler) expectIsKnown(n int) { + th.mconsumer.EXPECT().IsKnown(th.p[n].Layer, th.p[n].ID()) +} + +func (th *asTestHandler) expectProposal(n int) { + th.expectIsKnown(n) + th.mconsumer.EXPECT().OnProposal(gomock.Eq(th.p[n])) +} + +func (th *asTestHandler) blockOnGetActiveSet(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case th.startCh <- struct{}{}: + } - th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1])) - err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1])) - require.NoError(t, err) + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-th.contCh: + return err + } +} + +func (th *asTestHandler) waitForFetchToStart() { + <-th.startCh +} + +func (th *asTestHandler) continueFetching(err error) { + th.contCh <- err +} + +func (th *asTestHandler) expectGetActiveSet(block bool) { + th.mf.EXPECT().GetActiveSet(gomock.Any(), th.set.Hash()).DoAndReturn( + func(ctx context.Context, got types.Hash32) error { + if block { + if err := th.blockOnGetActiveSet(ctx); err != nil { + return err + } + } + require.NoError(th, activesets.Add(th.db, got, &types.EpochActiveSet{ + Epoch: th.lid.GetEpoch(), + Set: th.set, + })) + for _, id := range th.set { + th.atxsdata.AddAtx(th.lid.GetEpoch(), id, &atxsdata.ATX{Node: types.NodeID{1}}) + } + return nil + }, + ) +} + +func (th *asTestHandler) handleSyncedProposal(ctx context.Context, n int) error { + return th.HandleSyncedProposal( + ctx, th.p[n].ID().AsHash32(), th.pid, codec.MustEncode(th.p[n])) +} + +func (th *asTestHandler) waitForSubscription() { + require.Eventually(th, func() bool { + th.weightCalcLock.Lock() + defer th.weightCalcLock.Unlock() + return len(th.pendingWeightCalc[th.set.Hash()]) != 0 + }, 10*time.Second, 10*time.Millisecond) +} + +func TestHandleSyncedProposalActiveSet(t *testing.T) { + ctx := context.Background() + + t.Run("non-concurrent fetch", func(t *testing.T) { + th := createASTestHandler(t) + th.expectProposal(0) + th.expectGetActiveSet(false) + require.NoError(t, th.handleSyncedProposal(ctx, 0)) + + th.expectProposal(1) + // ActiveSet not fetched again here + require.NoError(t, th.handleSyncedProposal(ctx, 1)) }) - t.Run("concurrent fetching of ActiveSets", func(t *testing.T) { - startCh := make(chan struct{}) - startChs[sets[1].Hash()] = startCh + t.Run("concurrent fetch", func(t *testing.T) { + th := createASTestHandler(t) + th.expectProposal(0) + th.expectGetActiveSet(true) var eg errgroup.Group - th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[2])) - eg.Go(func() error { - // blocks till we send smth on asCh - return th.HandleSyncedProposal(context.Background(), p[2].ID().AsHash32(), pid, codec.MustEncode(p[2])) - }) - <-startCh - // at this point, the fetcher for the activeset is started, but blocked - th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[3])) - eg.Go(func() error { - // need to fetch the same ActiveSet - return th.HandleSyncedProposal(context.Background(), p[3].ID().AsHash32(), pid, codec.MustEncode(p[3])) - }) - - asCh <- asReq{id: sets[1].Hash()} // unblock fetching of the ActiveSet + eg.Go(func() error { return th.handleSyncedProposal(ctx, 0) }) + th.waitForFetchToStart() + th.expectProposal(1) + eg.Go(func() error { return th.handleSyncedProposal(ctx, 1) }) + th.waitForSubscription() + th.continueFetching(nil) require.NoError(t, eg.Wait()) }) - t.Run("ActiveSet fetch failure and refetch", func(t *testing.T) { - startCh := make(chan struct{}) - startChs[sets[2].Hash()] = startCh + t.Run("fetch failure and refetch", func(t *testing.T) { + th := createASTestHandler(t) + th.expectIsKnown(0) + th.expectGetActiveSet(true) var eg errgroup.Group eg.Go(func() error { - require.Error(t, th.HandleSyncedProposal(context.Background(), - p[4].ID().AsHash32(), pid, codec.MustEncode(p[4]))) + require.Error(t, th.handleSyncedProposal(ctx, 0)) return nil }) - <-startCh - // at this point, the fetcher for the activeset is started, but blocked + th.waitForFetchToStart() + th.expectIsKnown(1) eg.Go(func() error { - require.Error(t, th.HandleSyncedProposal(context.Background(), - p[5].ID().AsHash32(), pid, codec.MustEncode(p[5]))) + require.Error(t, th.handleSyncedProposal(ctx, 1)) return nil }) - - // wait till the 2nd request has subscribed - require.Eventually(t, func() bool { - th.weightCalcLock.Lock() - defer th.weightCalcLock.Unlock() - return len(th.pendingWeightCalc[sets[2].Hash()]) != 0 - }, 10*time.Second, 10*time.Millisecond) - asCh <- asReq{id: sets[2].Hash(), err: errors.New("foobar")} + th.waitForSubscription() + th.continueFetching(errors.New("fail")) require.NoError(t, eg.Wait()) - // refetch after failure - th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[5])) - var eg1 errgroup.Group - eg1.Go(func() error { - return th.HandleSyncedProposal(context.Background(), - p[5].ID().AsHash32(), pid, codec.MustEncode(p[5])) - }) - <-startCh - asCh <- asReq{id: sets[2].Hash()} - require.NoError(t, eg1.Wait()) + // refetch + th.expectProposal(0) + th.expectGetActiveSet(false) + require.NoError(t, th.handleSyncedProposal(ctx, 0)) }) - t.Run("ActiveSet fetch cancel and refetch", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - startCh := make(chan struct{}) - startChs[sets[3].Hash()] = startCh + t.Run("cancel fetch and refetch", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + th := createASTestHandler(t) + th.expectIsKnown(0) + th.expectGetActiveSet(true) var eg errgroup.Group eg.Go(func() error { - require.Error(t, th.HandleSyncedProposal(ctx, p[6].ID().AsHash32(), pid, codec.MustEncode(p[6]))) + require.ErrorIs(t, th.handleSyncedProposal(ctx, 0), context.Canceled) return nil }) - <-startCh - // at this point, the fetcher for the activeset is started, but blocked + th.waitForFetchToStart() + th.expectIsKnown(1) eg.Go(func() error { - require.Error(t, th.HandleSyncedProposal(ctx, p[7].ID().AsHash32(), pid, codec.MustEncode(p[7]))) + require.ErrorIs(t, th.handleSyncedProposal(ctx, 1), context.Canceled) return nil }) - - // wait till the 2nd request has subscribed - require.Eventually(t, func() bool { - th.weightCalcLock.Lock() - defer th.weightCalcLock.Unlock() - return len(th.pendingWeightCalc[sets[3].Hash()]) != 0 - }, 10*time.Second, 10*time.Millisecond) + th.waitForSubscription() cancel() require.NoError(t, eg.Wait()) - // refetch after cancel - th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[7])) - var eg1 errgroup.Group - eg1.Go(func() error { - return th.HandleSyncedProposal(context.Background(), - p[7].ID().AsHash32(), pid, codec.MustEncode(p[7])) - }) - <-startCh - asCh <- asReq{id: sets[3].Hash()} - require.NoError(t, eg1.Wait()) + // refetch + th.expectProposal(0) + th.expectGetActiveSet(false) + require.NoError(t, th.handleSyncedProposal(ctx, 0)) }) }