Refactor activeset handling in the proposal code
ivan4th committed May 10, 2024
1 parent 8f45fad commit 23c29b4
Showing 2 changed files with 155 additions and 144 deletions.
11 changes: 5 additions & 6 deletions proposals/handler.go
@@ -540,13 +540,8 @@ func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint
 		// we don't have to wait on it in case the context is canceled
 		ch = make(chan uint64, 1)
 		h.pendingWeightCalc[id] = append(chs, ch)
-	} else {
-		// mark calculation as running
-		h.pendingWeightCalc[id] = nil
-	}
-	h.weightCalcLock.Unlock()
+		h.weightCalcLock.Unlock()
 
-	if exists {
 		// need to wait for the calculation which is already running to finish
 		select {
 		case <-ctx.Done():
@@ -562,6 +557,10 @@ func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint
 			}
 			return totalWeight, nil
 		}
+	} else {
+		// mark calculation as running
+		h.pendingWeightCalc[id] = nil
+		h.weightCalcLock.Unlock()
+	}
 
 	success := false
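The two hunks above show only fragments of getActiveSetWeight. As a reading aid, here is a minimal sketch (not the repository's code) of the control flow that results from this refactor: the mutex is now released inside each branch, a caller that finds a calculation already pending subscribes to a buffered channel and waits, while the first caller marks the calculation as running and computes the weight itself. The handler type, the calc callback, the error message, and the close-on-failure fan-out at the end are simplified assumptions for illustration, not taken from the diff.

package proposals

import (
	"context"
	"errors"
	"sync"
)

// Simplified stand-ins for the real handler's fields and dependencies.
type setHash [32]byte

type handler struct {
	weightCalcLock    sync.Mutex
	pendingWeightCalc map[setHash][]chan uint64
}

// getActiveSetWeight sketches the post-refactor flow: the first caller for a
// given active-set hash marks the calculation as running and performs it;
// every concurrent caller for the same hash subscribes to a buffered channel
// and waits for that result instead of fetching the set again.
func (h *handler) getActiveSetWeight(
	ctx context.Context, id setHash, calc func(context.Context) (uint64, error),
) (uint64, error) {
	h.weightCalcLock.Lock()
	if h.pendingWeightCalc == nil {
		// lazily initialized here only to keep the sketch self-contained
		h.pendingWeightCalc = make(map[setHash][]chan uint64)
	}
	chs, exists := h.pendingWeightCalc[id]
	if exists {
		// buffered so the calculating goroutine never blocks on a waiter
		// whose context was canceled
		ch := make(chan uint64, 1)
		h.pendingWeightCalc[id] = append(chs, ch)
		h.weightCalcLock.Unlock()

		// wait for the calculation that is already running to finish
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case totalWeight, ok := <-ch:
			if !ok {
				return 0, errors.New("active set weight calculation failed")
			}
			return totalWeight, nil
		}
	} else {
		// mark the calculation as running
		h.pendingWeightCalc[id] = nil
		h.weightCalcLock.Unlock()
	}

	totalWeight, err := calc(ctx)

	// fan the result out to subscribers that arrived in the meantime and
	// clear the pending entry so a failed fetch can be retried later
	h.weightCalcLock.Lock()
	for _, ch := range h.pendingWeightCalc[id] {
		if err != nil {
			close(ch) // closing the channel signals failure to the waiter
		} else {
			ch <- totalWeight
		}
	}
	delete(h.pendingWeightCalc, id)
	h.weightCalcLock.Unlock()
	return totalWeight, err
}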
288 changes: 150 additions & 138 deletions proposals/handler_test.go
@@ -1407,186 +1407,198 @@ func gproposal(t *testing.T, signer *signing.EdSigner, atxid types.ATXID,
return &p
}

func TestHandleSyncedProposalActiveSet(t *testing.T) {
// TBD: test concurrent fetches
// TBD: test failures
// TBD: test cancellations

type asTestHandler struct {
*testing.T
*testHandler
lid types.LayerID
set types.ATXIDList
p []*types.Proposal
pid p2p.Peer
startCh chan struct{}
contCh chan error
}

func createASTestHandler(t *testing.T) *asTestHandler {
signer, err := signing.NewEdSigner()
require.NoError(t, err)

lid := types.LayerID(20)
sets := []types.ATXIDList{
{{1}, {2}, {3}},
{{1}, {2}, {4}, {5}},
{{2}, {4}, {5}, {6}, {7}},
{{2}, {4}, {5}, {6}, {7}, {8}, {9}},
}
th := createTestHandler(t)
p := make([]*types.Proposal, 8)
for n := range p {
p[n] = gproposal(t, signer, types.ATXID{byte(n + 1)}, lid, &types.EpochData{
ActiveSetHash: sets[n/2].Hash(),
th := &asTestHandler{
T: t,
testHandler: createTestHandler(t),
lid: types.LayerID(20),
set: types.ATXIDList{{1}, {2}, {3}},
pid: p2p.Peer("any"),
startCh: make(chan struct{}),
contCh: make(chan error),
}
th.p = []*types.Proposal{
gproposal(t, signer, types.ATXID{1}, th.lid, &types.EpochData{
ActiveSetHash: th.set.Hash(),
Beacon: types.Beacon{1},
})
th.mconsumer.EXPECT().IsKnown(p[n].Layer, p[n].ID()).AnyTimes()
}),
gproposal(t, signer, types.ATXID{2}, th.lid, &types.EpochData{
ActiveSetHash: th.set.Hash(),
Beacon: types.Beacon{1},
}),
}
pid := p2p.Peer("any")

th.mclock.EXPECT().CurrentLayer().Return(lid).AnyTimes()
th.mm.EXPECT().ProcessedLayer().Return(lid - 2).AnyTimes()
th.mclock.EXPECT().CurrentLayer().Return(th.lid).AnyTimes()
th.mm.EXPECT().ProcessedLayer().Return(th.lid - 2).AnyTimes()
th.mclock.EXPECT().LayerToTime(gomock.Any()).AnyTimes()
th.mf.EXPECT().RegisterPeerHashes(pid, gomock.Any()).AnyTimes()
type asReq struct {
id types.Hash32
err error
}
startChs := make(map[types.Hash32]chan struct{})
asCh := make(chan asReq, 1) // buffered, no wait
for _, set := range sets {
th.mf.EXPECT().GetActiveSet(gomock.Any(), set.Hash()).DoAndReturn(
func(ctx context.Context, got types.Hash32) error {
startCh, found := startChs[got]
if found {
select {
case <-ctx.Done():
return ctx.Err()
case startCh <- struct{}{}:
}
}
var req asReq
select {
case <-ctx.Done():
return ctx.Err()
case req = <-asCh:
}
require.Equal(t, got, req.id)
if req.err != nil {
return req.err
}
require.NoError(t, activesets.Add(th.db, got, &types.EpochActiveSet{
Epoch: lid.GetEpoch(),
Set: set,
}))
for _, id := range set {
th.atxsdata.AddAtx(lid.GetEpoch(), id, &atxsdata.ATX{Node: types.NodeID{1}})
}
return nil
},
).AnyTimes()
}
th.mf.EXPECT().RegisterPeerHashes(th.pid, gomock.Any()).AnyTimes()

th.mf.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).AnyTimes()
th.mf.EXPECT().GetBallots(gomock.Any(), gomock.Any()).AnyTimes()
th.mockSet.decodeAnyBallots()
th.mv.EXPECT().CheckEligibility(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
th.mm.EXPECT().AddBallot(gomock.Any(), gomock.Any()).AnyTimes()
th.mm.EXPECT().AddTXsFromProposal(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

t.Run("non-concurrent fetching of ActiveSets", func(t *testing.T) {
asCh <- asReq{id: sets[0].Hash()}
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[0]))
err = th.HandleSyncedProposal(context.Background(), p[0].ID().AsHash32(), pid, codec.MustEncode(p[0]))
require.NoError(t, err)
return th
}

func (th *asTestHandler) expectIsKnown(n int) {
th.mconsumer.EXPECT().IsKnown(th.p[n].Layer, th.p[n].ID())
}

func (th *asTestHandler) expectProposal(n int) {
th.expectIsKnown(n)
th.mconsumer.EXPECT().OnProposal(gomock.Eq(th.p[n]))
}

func (th *asTestHandler) blockOnGetActiveSet(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case th.startCh <- struct{}{}:
}

th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[1]))
err = th.HandleSyncedProposal(context.Background(), p[1].ID().AsHash32(), pid, codec.MustEncode(p[1]))
require.NoError(t, err)
select {
case <-ctx.Done():
return ctx.Err()
case err := <-th.contCh:
return err
}
}

func (th *asTestHandler) waitForFetchToStart() {
<-th.startCh
}

func (th *asTestHandler) continueFetching(err error) {
th.contCh <- err
}

func (th *asTestHandler) expectGetActiveSet(block bool) {
th.mf.EXPECT().GetActiveSet(gomock.Any(), th.set.Hash()).DoAndReturn(
func(ctx context.Context, got types.Hash32) error {
if block {
if err := th.blockOnGetActiveSet(ctx); err != nil {
return err
}
}
require.NoError(th, activesets.Add(th.db, got, &types.EpochActiveSet{
Epoch: th.lid.GetEpoch(),
Set: th.set,
}))
for _, id := range th.set {
th.atxsdata.AddAtx(th.lid.GetEpoch(), id, &atxsdata.ATX{Node: types.NodeID{1}})
}
return nil
},
)
}

func (th *asTestHandler) handleSyncedProposal(ctx context.Context, n int) error {
return th.HandleSyncedProposal(
ctx, th.p[n].ID().AsHash32(), th.pid, codec.MustEncode(th.p[n]))
}

func (th *asTestHandler) waitForSubscription() {
require.Eventually(th, func() bool {
th.weightCalcLock.Lock()
defer th.weightCalcLock.Unlock()
return len(th.pendingWeightCalc[th.set.Hash()]) != 0
}, 10*time.Second, 10*time.Millisecond)
}

func TestHandleSyncedProposalActiveSet(t *testing.T) {
ctx := context.Background()

t.Run("non-concurrent fetch", func(t *testing.T) {
th := createASTestHandler(t)
th.expectProposal(0)
th.expectGetActiveSet(false)
require.NoError(t, th.handleSyncedProposal(ctx, 0))

th.expectProposal(1)
// ActiveSet not fetched again here
require.NoError(t, th.handleSyncedProposal(ctx, 1))
})

t.Run("concurrent fetching of ActiveSets", func(t *testing.T) {
startCh := make(chan struct{})
startChs[sets[1].Hash()] = startCh
t.Run("concurrent fetch", func(t *testing.T) {
th := createASTestHandler(t)
th.expectProposal(0)
th.expectGetActiveSet(true)
var eg errgroup.Group
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[2]))
eg.Go(func() error {
// blocks till we send smth on asCh
return th.HandleSyncedProposal(context.Background(), p[2].ID().AsHash32(), pid, codec.MustEncode(p[2]))
})
<-startCh
// at this point, the fetcher for the activeset is started, but blocked
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[3]))
eg.Go(func() error {
// need to fetch the same ActiveSet
return th.HandleSyncedProposal(context.Background(), p[3].ID().AsHash32(), pid, codec.MustEncode(p[3]))
})

asCh <- asReq{id: sets[1].Hash()} // unblock fetching of the ActiveSet
eg.Go(func() error { return th.handleSyncedProposal(ctx, 0) })
th.waitForFetchToStart()
th.expectProposal(1)
eg.Go(func() error { return th.handleSyncedProposal(ctx, 1) })
th.waitForSubscription()
th.continueFetching(nil)
require.NoError(t, eg.Wait())
})

t.Run("ActiveSet fetch failure and refetch", func(t *testing.T) {
startCh := make(chan struct{})
startChs[sets[2].Hash()] = startCh
t.Run("fetch failure and refetch", func(t *testing.T) {
th := createASTestHandler(t)
th.expectIsKnown(0)
th.expectGetActiveSet(true)
var eg errgroup.Group
eg.Go(func() error {
require.Error(t, th.HandleSyncedProposal(context.Background(),
p[4].ID().AsHash32(), pid, codec.MustEncode(p[4])))
require.Error(t, th.handleSyncedProposal(ctx, 0))
return nil
})
<-startCh
// at this point, the fetcher for the activeset is started, but blocked
th.waitForFetchToStart()
th.expectIsKnown(1)
eg.Go(func() error {
require.Error(t, th.HandleSyncedProposal(context.Background(),
p[5].ID().AsHash32(), pid, codec.MustEncode(p[5])))
require.Error(t, th.handleSyncedProposal(ctx, 1))
return nil
})

// wait till the 2nd request has subscribed
require.Eventually(t, func() bool {
th.weightCalcLock.Lock()
defer th.weightCalcLock.Unlock()
return len(th.pendingWeightCalc[sets[2].Hash()]) != 0
}, 10*time.Second, 10*time.Millisecond)
asCh <- asReq{id: sets[2].Hash(), err: errors.New("foobar")}
th.waitForSubscription()
th.continueFetching(errors.New("fail"))
require.NoError(t, eg.Wait())

// refetch after failure
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[5]))
var eg1 errgroup.Group
eg1.Go(func() error {
return th.HandleSyncedProposal(context.Background(),
p[5].ID().AsHash32(), pid, codec.MustEncode(p[5]))
})
<-startCh
asCh <- asReq{id: sets[2].Hash()}
require.NoError(t, eg1.Wait())
// refetch
th.expectProposal(0)
th.expectGetActiveSet(false)
require.NoError(t, th.handleSyncedProposal(ctx, 0))
})

t.Run("ActiveSet fetch cancel and refetch", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
startCh := make(chan struct{})
startChs[sets[3].Hash()] = startCh
t.Run("cancel fetch and refetch", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
th := createASTestHandler(t)
th.expectIsKnown(0)
th.expectGetActiveSet(true)
var eg errgroup.Group
eg.Go(func() error {
require.Error(t, th.HandleSyncedProposal(ctx, p[6].ID().AsHash32(), pid, codec.MustEncode(p[6])))
require.ErrorIs(t, th.handleSyncedProposal(ctx, 0), context.Canceled)
return nil
})
<-startCh
// at this point, the fetcher for the activeset is started, but blocked
th.waitForFetchToStart()
th.expectIsKnown(1)
eg.Go(func() error {
require.Error(t, th.HandleSyncedProposal(ctx, p[7].ID().AsHash32(), pid, codec.MustEncode(p[7])))
require.ErrorIs(t, th.handleSyncedProposal(ctx, 1), context.Canceled)
return nil
})

// wait till the 2nd request has subscribed
require.Eventually(t, func() bool {
th.weightCalcLock.Lock()
defer th.weightCalcLock.Unlock()
return len(th.pendingWeightCalc[sets[3].Hash()]) != 0
}, 10*time.Second, 10*time.Millisecond)
th.waitForSubscription()
cancel()
require.NoError(t, eg.Wait())

// refetch after cancel
th.mconsumer.EXPECT().OnProposal(gomock.Eq(p[7]))
var eg1 errgroup.Group
eg1.Go(func() error {
return th.HandleSyncedProposal(context.Background(),
p[7].ID().AsHash32(), pid, codec.MustEncode(p[7]))
})
<-startCh
asCh <- asReq{id: sets[3].Hash()}
require.NoError(t, eg1.Wait())
// refetch
th.expectProposal(0)
th.expectGetActiveSet(false)
require.NoError(t, th.handleSyncedProposal(ctx, 0))
})
}

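The refactored test coordinates its goroutines with a small handshake: the mocked GetActiveSet announces on startCh that the fetch has started, then blocks until the test sends a result on contCh (see blockOnGetActiveSet, waitForFetchToStart, and continueFetching above). Below is a self-contained sketch of that pattern with illustrative names that are not the repository's.

package main

import (
	"context"
	"fmt"
)

// fetchGate pauses a mocked fetch mid-flight so a test can set up a second,
// concurrent request before deciding whether the first one succeeds or fails.
type fetchGate struct {
	startCh chan struct{} // the fetch signals "I have started"
	contCh  chan error    // the test replies "continue with this result"
}

func newFetchGate() *fetchGate {
	return &fetchGate{startCh: make(chan struct{}), contCh: make(chan error)}
}

// block is called from inside the mocked fetch: announce the start, then wait
// for the test to release it (or for the context to be canceled).
func (g *fetchGate) block(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case g.startCh <- struct{}{}:
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-g.contCh:
		return err
	}
}

func (g *fetchGate) waitForFetchToStart()       { <-g.startCh }
func (g *fetchGate) continueFetching(err error) { g.contCh <- err }

func main() {
	g := newFetchGate()
	done := make(chan error, 1)
	go func() { done <- g.block(context.Background()) }() // the "mocked fetch"
	g.waitForFetchToStart() // the fetch is in flight but paused here
	g.continueFetching(nil) // release it with a success
	fmt.Println(<-done)     // prints <nil>
}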
