Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix activeset weight calc performance in the proposal handler #5923

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ through v1.5.x first ([#5907](https://github.com/spacemeshos/go-spacemesh/pull/5

* [#5911](https://github.com/spacemeshos/go-spacemesh/pull/5911) Avoid pulling poet proof multiple times in 1:N setups

* [#5923](https://github.com/spacemeshos/go-spacemesh/pull/5923) Fix high memory consumption and performance issues
in the proposal handler

## (v1.5.0)

### Upgrade information
Expand Down
162 changes: 110 additions & 52 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
Expand Down Expand Up @@ -48,16 +49,18 @@ type Handler struct {
logger log.Log
cfg Config

db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
clock layerClock
db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
weightCalcLock sync.Mutex
pendingWeightCalc map[types.Hash32][]chan uint64
clock layerClock

proposals proposalsConsumer
}
Expand Down Expand Up @@ -123,18 +126,19 @@ func NewHandler(
panic(err)
}
b := &Handler{
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
clock: clock,
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
pendingWeightCalc: make(map[types.Hash32][]chan uint64),
clock: clock,
}
for _, opt := range opts {
opt(b)
Expand Down Expand Up @@ -519,6 +523,87 @@ func (h *Handler) checkBallotSyntacticValidity(
return decoded, nil
}

func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
h.weightCalcLock.Lock()
totalWeight, exists := h.activeSets.Get(id)
if exists {
h.weightCalcLock.Unlock()
return totalWeight, nil
}

var ch chan uint64
chs, exists := h.pendingWeightCalc[id]
if exists {
// The calculation is running or the activeset is being fetched,
// subscribe.
// Avoid any blocking on the channel by making it buffered, also so that
// we don't have to wait on it in case the context is canceled
ch = make(chan uint64, 1)
h.pendingWeightCalc[id] = append(chs, ch)
h.weightCalcLock.Unlock()

// need to wait for the calculation which is already running to finish
select {
case <-ctx.Done():
return 0, ctx.Err()
case totalWeight, ok := <-ch:
if !ok {
// Channel closed, fetch / calculation failed.
// The actual error will be logged by the initiator of the
// initial fetch / calculation, let's not make an
// impression it happened multiple times and use a simpler
// message
return totalWeight, errors.New("error getting activeset weight")
}
return totalWeight, nil
}
}

// mark calculation as running
h.pendingWeightCalc[id] = nil
h.weightCalcLock.Unlock()

success := false
defer func() {
h.weightCalcLock.Lock()
// this is guaranteed not to block b/c each channel is buffered
for _, ch := range h.pendingWeightCalc[id] {
if success {
ch <- totalWeight
poszu marked this conversation as resolved.
Show resolved Hide resolved
}
close(ch)
}
delete(h.pendingWeightCalc, id)
h.weightCalcLock.Unlock()
}()

if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, id)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set",
set.Set[i].ShortString(),
)
}
}
totalWeight = computed
h.activeSets.Add(id, totalWeight)
success = true // totalWeight will be sent to the subscribers

return totalWeight, nil
}
Comment on lines +526 to +605
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit late to the discussion, but I feel like this could have been done more simply using the golang.org/x/sync/singleflight package:

// getActiveSetWeight returns the total weight of the active set identified by
// id, sharing a single fetch / calculation between concurrent callers for the
// same id through h.activeSetGroup.
//
// The shared work runs with the ctx of the caller that started it. Every
// caller stops waiting as soon as its own ctx is done and then returns
// ctx.Err(); otherwise it returns the result or the error of the shared work.
func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
	// DoChan is used instead of Do so that a waiting caller can give up when
	// its own context is canceled. No Forget call is made on failure:
	// singleflight drops the key once the call completes, so the next caller
	// retries anyway, and a late Forget could evict a newer in-flight call.
	resCh := h.activeSetGroup.DoChan(id.String(), func() (any, error) {
		if totalWeight, exists := h.activeSets.Get(id); exists {
			return totalWeight, nil
		}

		if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
			return uint64(0), err
		}
		set, err := activesets.Get(h.db, id)
		if err != nil {
			return uint64(0), err
		}
		if len(set.Set) == 0 {
			return uint64(0), fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)
		}

		computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
		for i := range used {
			if !used[i] {
				return uint64(0), fmt.Errorf("missing atx %s in active set", set.Set[i])
			}
		}
		h.activeSets.Add(id, computed)
		return computed, nil
	})

	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case res := <-resCh:
		if res.Err != nil {
			return 0, res.Err
		}
		// On success the shared function always returns a uint64.
		return res.Val.(uint64), nil
	}
}

No mutex or channels needed 🙂


func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot) (uint64, error) {
//nolint:nestif
if b.RefBallot == types.EmptyBallotID {
Expand All @@ -534,36 +619,9 @@ func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot)
epoch-- // download activesets in the previous epoch too
}
if b.Layer.GetEpoch() >= epoch {
var exists bool
totalWeight, exists := h.activeSets.Get(b.EpochData.ActiveSetHash)
if !exists {
if err := h.fetcher.GetActiveSet(ctx, b.EpochData.ActiveSetHash); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, b.EpochData.ActiveSetHash)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf(
"%w: empty active set ballot %s",
pubsub.ErrValidationReject,
b.ID().String(),
)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set ballot %s",
set.Set[i].ShortString(),
b.ID().String(),
)
}
}
totalWeight = computed
h.activeSets.Add(b.EpochData.ActiveSetHash, totalWeight)
totalWeight, err := h.getActiveSetWeight(ctx, b.EpochData.ActiveSetHash)
if err != nil {
return 0, fmt.Errorf("ballot %s: %w", b.ID().String(), err)
}
return totalWeight, nil
}
Expand Down