Skip to content

Commit

Permalink
Fix activeset weight calc performance in the proposal handler
Browse files Browse the repository at this point in the history
This fixes multiple concurrent retrievals of the same activeset from
the database that were causing high CPU and memory usage.

Fixes #5765
  • Loading branch information
ivan4th committed May 9, 2024
1 parent 06ec6d0 commit ec063da
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 79 deletions.
163 changes: 111 additions & 52 deletions proposals/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
Expand Down Expand Up @@ -48,16 +49,18 @@ type Handler struct {
logger log.Log
cfg Config

db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
clock layerClock
db *sql.Database
atxsdata *atxsdata.Data
activeSets *lru.Cache[types.Hash32, uint64]
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
fetcher system.Fetcher
mesh meshProvider
validator eligibilityValidator
tortoise tortoiseProvider
weightCalcLock sync.Mutex
pendingWeightCalc map[types.Hash32][]chan uint64
clock layerClock

proposals proposalsConsumer
}
Expand Down Expand Up @@ -123,18 +126,19 @@ func NewHandler(
panic(err)
}
b := &Handler{
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
clock: clock,
logger: log.NewNop(),
cfg: defaultConfig(),
db: db,
atxsdata: atxsdata,
proposals: proposals,
activeSets: activeSets,
edVerifier: edVerifier,
publisher: p,
fetcher: f,
mesh: m,
tortoise: tortoise,
pendingWeightCalc: make(map[types.Hash32][]chan uint64),
clock: clock,
}
for _, opt := range opts {
opt(b)
Expand Down Expand Up @@ -519,6 +523,88 @@ func (h *Handler) checkBallotSyntacticValidity(
return decoded, nil
}

func (h *Handler) getActiveSetWeight(ctx context.Context, id types.Hash32) (uint64, error) {
h.weightCalcLock.Lock()
totalWeight, exists := h.activeSets.Get(id)
if exists {
h.weightCalcLock.Unlock()
return totalWeight, nil
}

var ch chan uint64
chs, exists := h.pendingWeightCalc[id]
if exists {
// The calculation is running or the activeset is being fetched,
// subscribe.
// Avoid any blocking on the channel by making it buffered, also so that
// we don't have to wait on it in case the context is canceled
ch = make(chan uint64, 1)
h.pendingWeightCalc[id] = append(chs, ch)
} else {
// mark calculation as running
h.pendingWeightCalc[id] = nil
}
h.weightCalcLock.Unlock()

if exists {
// need to wait for the calculation which is already running to finish
select {
case <-ctx.Done():
return 0, ctx.Err()
case totalWeight, ok := <-ch:
if !ok {
// Channel closed, fetch / calculation failed.
// The actual error will be logged by the initiator of the
// initial fetch / calculation, let's not make an
// impression it happened multiple times and use a simpler
// message
return totalWeight, errors.New("error getting activeset weight")
}
return totalWeight, nil

Check warning on line 563 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L563

Added line #L563 was not covered by tests
}
}

success := false
defer func() {
h.weightCalcLock.Lock()
// this is guaranteed not to block b/c each channel is buffered
for _, ch := range h.pendingWeightCalc[id] {
if success {
ch <- totalWeight

Check warning on line 573 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L573

Added line #L573 was not covered by tests
}
close(ch)
}
delete(h.pendingWeightCalc, id)
h.weightCalcLock.Unlock()
}()

if err := h.fetcher.GetActiveSet(ctx, id); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, id)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf("%w: empty active set", pubsub.ErrValidationReject)

Check warning on line 589 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L589

Added line #L589 was not covered by tests
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set",
set.Set[i].ShortString(),
)

Check warning on line 598 in proposals/handler.go

View check run for this annotation

Codecov / codecov/patch

proposals/handler.go#L595-L598

Added lines #L595 - L598 were not covered by tests
}
}
totalWeight = computed
h.activeSets.Add(id, totalWeight)
success = true // totalWeight will be sent to the subscribers

return totalWeight, nil
}

func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot) (uint64, error) {
//nolint:nestif
if b.RefBallot == types.EmptyBallotID {
Expand All @@ -534,36 +620,9 @@ func (h *Handler) checkBallotDataIntegrity(ctx context.Context, b *types.Ballot)
epoch-- // download activesets in the previous epoch too
}
if b.Layer.GetEpoch() >= epoch {
var exists bool
totalWeight, exists := h.activeSets.Get(b.EpochData.ActiveSetHash)
if !exists {
if err := h.fetcher.GetActiveSet(ctx, b.EpochData.ActiveSetHash); err != nil {
return 0, err
}
set, err := activesets.Get(h.db, b.EpochData.ActiveSetHash)
if err != nil {
return 0, err
}
if len(set.Set) == 0 {
return 0, fmt.Errorf(
"%w: empty active set ballot %s",
pubsub.ErrValidationReject,
b.ID().String(),
)
}

computed, used := h.atxsdata.WeightForSet(set.Epoch, set.Set)
for i := range used {
if !used[i] {
return 0, fmt.Errorf(
"missing atx %s in active set ballot %s",
set.Set[i].ShortString(),
b.ID().String(),
)
}
}
totalWeight = computed
h.activeSets.Add(b.EpochData.ActiveSetHash, totalWeight)
totalWeight, err := h.getActiveSetWeight(ctx, b.EpochData.ActiveSetHash)
if err != nil {
return 0, fmt.Errorf("ballot %s: %w", b.ID().String(), err)
}
return totalWeight, nil
}
Expand Down

0 comments on commit ec063da

Please sign in to comment.