Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix activeset weight calc performance in the proposal handler #5923

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ through v1.5.x first ([#5907](https://github.com/spacemeshos/go-spacemesh/pull/5

* [#5911](https://github.com/spacemeshos/go-spacemesh/pull/5911) Avoid pulling poet proof multiple times in 1:N setups

* [#5923](https://github.com/spacemeshos/go-spacemesh/pull/5923) Fix high memory consumption and performance issues
in the proposal handler

## (v1.5.0)

### Upgrade information
Expand Down
163 changes: 111 additions & 52 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"context"
"errors"
"fmt"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
Expand Down Expand Up @@ -48,16 +49,18 @@
logger log.Log
cfg Config

db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
clock layerClock
db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
weightCalcLock sync.Mutex
pendingWeightCalc map[types.Hash32][]chan uint64
clock layerClock

proposals proposalsConsumer
}
Expand Down Expand Up @@ -123,18 +126,19 @@
panic(err)
}
b := &Handler{
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
clock: clock,
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
pendingWeightCalc: make(map[types.Hash32][]chan uint64),
clock: clock,
}
for _, opt := range opts {
opt(b)
Expand Down Expand Up @@ -519,6 +523,88 @@
return decoded, nil
}

func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
h.weightCalcLock.Lock()
totalWeight, exists := h.activeSets.Get(id)
if exists {
h.weightCalcLock.Unlock()
return totalWeight, nil
}

var ch chan uint64
chs, exists := h.pendingWeightCalc[id]
if exists {
// The calculation is running or the activeset is being fetched,
// subscribe.
// Avoid any blocking on the channel by making it buffered, also so that
// we don't have to wait on it in case the context is canceled
ch = make(chan uint64, 1)
h.pendingWeightCalc[id] = append(chs, ch)
} else {
// mark calculation as running
h.pendingWeightCalc[id] = nil
}
h.weightCalcLock.Unlock()

if exists {
// need to wait for the calculation which is already running to finish
select {
case <-ctx.Done():
return 0, ctx.Err()
case totalWeight, ok := <-ch:
if !ok {
// Channel closed, fetch / calculation failed.
// The actual error will be logged by the initiator of the
// initial fetch / calculation, let's not make an
// impression it happened multiple times and use a simpler
// message
return totalWeight, errors.New("error getting activeset weight")
}
return totalWeight, nil

Check warning on line 563 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L563

Added line #L563 was not covered by tests
}
poszu marked this conversation as resolved.
Show resolved Hide resolved
}

success := false
defer func() {
h.weightCalcLock.Lock()
// this is guaranteed not to block b/c each channel is buffered
for _, ch := range h.pendingWeightCalc[id] {
if success {
ch <- totalWeight

Check warning on line 573 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L573

Added line #L573 was not covered by tests
poszu marked this conversation as resolved.
Show resolved Hide resolved
}
close(ch)
}
delete(h.pendingWeightCalc, id)
h.weightCalcLock.Unlock()
}()

if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, id)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)

Check warning on line 589 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L589

Added line #L589 was not covered by tests
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set",
set.Set[i].ShortString(),
)

Check warning on line 598 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L595-L598

Added lines #L595 - L598 were not covered by tests
}
}
totalWeight = computed
h.activeSets.Add(id, totalWeight)
success = true // totalWeight will be sent to the subscribers

return totalWeight, nil
}

func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot) (uint64, error) {
//nolint:nestif
if b.RefBallot == types.EmptyBallotID {
Expand All @@ -534,36 +620,9 @@
epoch-- // download activesets in the previous epoch too
}
if b.Layer.GetEpoch() >= epoch {
var exists bool
totalWeight, exists := h.activeSets.Get(b.EpochData.ActiveSetHash)
if !exists {
if err := h.fetcher.GetActiveSet(ctx, b.EpochData.ActiveSetHash); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, b.EpochData.ActiveSetHash)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf(
"%w: empty active set ballot %s",
pubsub.ErrValidationReject,
b.ID().String(),
)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set ballot %s",
set.Set[i].ShortString(),
b.ID().String(),
)
}
}
totalWeight = computed
h.activeSets.Add(b.EpochData.ActiveSetHash, totalWeight)
totalWeight, err := h.getActiveSetWeight(ctx, b.EpochData.ActiveSetHash)
if err != nil {
return 0, fmt.Errorf("ballot %s: %w", b.ID().String(), err)
}
return totalWeight, nil
}
Expand Down
Loading
Loading