Skip to content

Commit

Permalink
scrutinizer: add envelope height cache counter
Browse files Browse the repository at this point in the history
Performing db.Count(...) is very expensive since internally badgerhold
will iterate over all matching keys. To fix that two new caches are introduced:

- LRU cache of 1024 elements for storing processId=>height
- uint64 counter for storing the total height of envelopes

Both cache counters are incremented on each Commit().

Signed-off-by: p4u <pau@dabax.net>
  • Loading branch information
p4u committed Apr 19, 2021
1 parent 506efb5 commit 7737f02
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 11 deletions.
34 changes: 29 additions & 5 deletions vochain/scrutinizer/scrutinizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package scrutinizer

import (
"bytes"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru"
"github.com/timshannon/badgerhold/v3"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
Expand All @@ -21,6 +24,8 @@ const (
// MaxEnvelopeListSize is the maximum number of envelopes a process can store.
// 8.3M seems enough for now
MaxEnvelopeListSize = 32 << 18

countEnvelopeCacheSize = 1024
)

// EventListener is an interface used for executing custom functions during the
Expand Down Expand Up @@ -54,6 +59,10 @@ type Scrutinizer struct {
// eventListeners is the list of external callbacks that will be executed by the scrutinizer
eventListeners []EventListener
db *badgerhold.Store
// envelopeHeightCache and countTotalEnvelopes are in memory counters that helps reducing the
// access time when GenEnvelopeHeight() is called.
envelopeHeightCache *lru.Cache
countTotalEnvelopes *uint64

// addVoteLock is used to avoid Transaction Conflicts on the KV database.
// It is not critical and the code should be able to recover from a Conflict, but we
Expand Down Expand Up @@ -83,6 +92,16 @@ func NewScrutinizer(dbPath string, app *vochain.BaseApplication) (*Scrutinizer,
return nil, err
}
s.App.State.AddEventListener(s)
if s.envelopeHeightCache, err = lru.New(countEnvelopeCacheSize); err != nil {
return nil, err
}
c, err := s.db.Count(&VoteReference{}, &badgerhold.Query{})
if err != nil {
return nil, fmt.Errorf("could not count the total envelopes: %w", err)
}
log.Infof("indexer have %d envelopes stored", c)
s.countTotalEnvelopes = new(uint64)
*s.countTotalEnvelopes = uint64(c)
return s, nil
}

Expand Down Expand Up @@ -185,10 +204,7 @@ func (s *Scrutinizer) Commit(height uint32) error {
log.Errorf("commit: cannot create new empty process: %v", err)
continue
}
if live, err := s.isOpenProcess(p.ProcessID); err != nil {
log.Errorf("cannot check if process is live results: %v", err)
} else if live && !s.App.IsSynchronizing() {
// Only add live processes if the vochain is not synchronizing
if !s.App.IsSynchronizing() {
s.addProcessToLiveResults(p.ProcessID)
}
}
Expand Down Expand Up @@ -236,6 +252,8 @@ func (s *Scrutinizer) Commit(height uint32) error {
s.indexTxLock.Unlock()
log.Infof("indexed %d new envelopes, took %s",
len(s.voteIndexPool), time.Since(startTime))
// Add the envelopes to the countTotalEnvelopes var
atomic.AddUint64(s.countTotalEnvelopes, uint64(len(s.voteIndexPool)))
}
txn.Discard()

Expand All @@ -244,6 +262,12 @@ func (s *Scrutinizer) Commit(height uint32) error {
startTime = time.Now()

for pid, votes := range s.votePool {
// Update the cache of envelopes height by process ID (if exist)
if vcount, ok := s.envelopeHeightCache.Get(pid); ok {
c := vcount.(uint64) + uint64(len(votes))
s.envelopeHeightCache.Add(pid, c)
}
// Get the process information
proc, err := s.ProcessInfo([]byte(pid))
if err != nil {
log.Warnf("cannot get process %x", []byte(pid))
Expand All @@ -267,7 +291,7 @@ func (s *Scrutinizer) Commit(height uint32) error {
nvotes++
}
}
p := []byte(pid)
p := []byte(pid) // make a copy
go func() {
// The commit is run async because it might be blocking (due the Mutex locks)
// and since this is the more costly operation, avoids affecting the consensus time.
Expand Down
22 changes: 20 additions & 2 deletions vochain/scrutinizer/scrutinizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,13 +625,31 @@ func TestCountVotes(t *testing.T) {
app.SetTestingMethods()
pid := util.RandomBytes(32)

err = app.State.AddProcess(&models.Process{
ProcessId: pid,
EnvelopeType: &models.EnvelopeType{EncryptedVotes: false},
Status: models.ProcessStatus_READY,
Mode: &models.ProcessMode{AutoStart: true},
BlockCount: 10,
VoteOptions: &models.ProcessVoteOptions{
MaxCount: 5,
MaxValue: 1,
MaxTotalCost: 3,
CostExponent: 1,
},
})
qt.Assert(t, err, qt.IsNil)
err = sc.newEmptyProcess(pid)
qt.Assert(t, err, qt.IsNil)

// Add 100 votes
vp, err := json.Marshal(types.VotePackage{
Nonce: fmt.Sprintf("%x", util.RandomHex(32)),
Votes: []int{1, 1, 1},
})
qt.Assert(t, err, qt.IsNil)
sc.Rollback()
sc.addProcessToLiveResults(pid)
for i := 0; i < 100; i++ {
v := &models.Vote{ProcessId: pid, VotePackage: vp, Nullifier: util.RandomBytes(32)}
// Add votes to votePool with i as txIndex
Expand All @@ -651,11 +669,11 @@ func TestCountVotes(t *testing.T) {
// Test envelope height for this PID
height, err := sc.GetEnvelopeHeight(pid)
qt.Assert(t, err, qt.IsNil)
qt.Assert(t, height, qt.CmpEquals(), 101)
qt.Assert(t, height, qt.CmpEquals(), uint64(101))
// Test global envelope height
height, err = sc.GetEnvelopeHeight([]byte{})
qt.Assert(t, err, qt.IsNil)
qt.Assert(t, height, qt.CmpEquals(), 101)
qt.Assert(t, height, qt.CmpEquals(), uint64(101))

ref, err := sc.GetEnvelopeReference(nullifier)
qt.Assert(t, err, qt.IsNil)
Expand Down
17 changes: 14 additions & 3 deletions vochain/scrutinizer/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,25 @@ func (s *Scrutinizer) GetEnvelopes(processId []byte) ([]*models.VoteEnvelope, []

// GetEnvelopeHeight returns the number of envelopes for a processId.
// If processId is empty, returns the total number of envelopes.
func (s *Scrutinizer) GetEnvelopeHeight(processId []byte) (int, error) {
func (s *Scrutinizer) GetEnvelopeHeight(processId []byte) (uint64, error) {
// TODO: Warning, int can overflow
if len(processId) > 0 {
return s.db.Count(&VoteReference{},
cc, ok := s.envelopeHeightCache.Get(string(processId))
if ok {
return cc.(uint64), nil
}
c, err := s.db.Count(&VoteReference{},
badgerhold.Where("ProcessID").Eq(processId).Index("ProcessID"))
if err != nil {
return 0, err
}
c64 := uint64(c)
s.envelopeHeightCache.Add(string(processId), c64)
return c64, nil
}

// If no processId is provided, count all envelopes
return s.db.Count(&VoteReference{}, &badgerhold.Query{})
return atomic.LoadUint64(s.countTotalEnvelopes), nil
}

// ComputeResult process a finished voting, compute the results and saves it in the Storage.
Expand Down
1 change: 0 additions & 1 deletion vochain/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ func (v *State) AddVote(vote *models.Vote) error {
if err != nil {
return fmt.Errorf("cannot marshal vote")
}
// TODO: newVoteBytes = hash(newVoteBytes)
v.Lock()
err = v.Store.Tree(VoteTree).Add(vid, ethereum.HashRaw(newVoteBytes))
v.Unlock()
Expand Down

0 comments on commit 7737f02

Please sign in to comment.