Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

performance: cache the ID for Header entities #1279

Merged
merged 22 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func Test_AsyncUploader(t *testing.T) {
require.Equal(t, 3, callCount)
})

time.Sleep(1 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this added by accident?


jwinkler2083233 marked this conversation as resolved.
Show resolved Hide resolved
t.Run("stopping component stops retrying", func(t *testing.T) {

callCount := 0
Expand Down
6 changes: 5 additions & 1 deletion ledger/complete/wal/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func Test_Compactor(t *testing.T) {
select {
case <-co.done:
// continue
case <-time.After(20 * time.Second):
case <-time.After(60 * time.Second):
jwinkler2083233 marked this conversation as resolved.
Show resolved Hide resolved
assert.FailNow(t, "timed out")
}

Expand All @@ -138,6 +138,8 @@ func Test_Compactor(t *testing.T) {
require.NoError(t, err)
})

time.Sleep(2 * time.Second)

t.Run("remove unnecessary files", func(t *testing.T) {
// Remove all files apart from target checkpoint and WAL segments ahead of it
// We know their names, so just hardcode them
Expand All @@ -159,6 +161,8 @@ func Test_Compactor(t *testing.T) {
f2, err := mtrie.NewForest(size*10, metricsCollector, func(tree *trie.MTrie) error { return nil })
require.NoError(t, err)

time.Sleep(2 * time.Second)

t.Run("load data from checkpoint and WAL", func(t *testing.T) {
wal2, err := NewDiskWAL(zerolog.Nop(), nil, metrics.NewNoopCollector(), dir, size*10, pathByteSize, 32*1024)
require.NoError(t, err)
Expand Down
51 changes: 50 additions & 1 deletion model/flow/header.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package flow

import (
"bytes"
"encoding/json"
"sync"
"time"

"github.com/fxamacker/cbor/v2"
Expand Down Expand Up @@ -72,6 +74,10 @@ func (h Header) Fingerprint() []byte {
return fingerprint.Fingerprint(h.Body())
}

var mutexHeader sync.Mutex
var previdHeader Identifier
var prevHeader Header

// ID returns a unique ID to singularly identify the header and its block
// within the flow system.
func (h Header) ID() Identifier {
Expand All @@ -80,7 +86,50 @@ func (h Header) ID() Identifier {
if h.Timestamp.Location() != time.UTC {
h.Timestamp = h.Timestamp.UTC()
}
return MakeID(h)

mutexHeader.Lock()

// unlock at the return
defer mutexHeader.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this change. Wouldn't this lock block two different threads computing IDs of two headers?

If engine A is calling header1.ID(), and engine B is calling header2.ID(), before this change, they can run concurrently, but with this change, one engine has to be blocked and wait until the other finishes its call. no?


// compare these elements individually
if prevHeader.ParentVoterIDs != nil &&
prevHeader.ParentVoterSigData != nil &&
prevHeader.ProposerSigData != nil &&
len(h.ParentVoterIDs) == len(prevHeader.ParentVoterIDs) &&
len(h.ParentVoterSigData) == len(prevHeader.ParentVoterSigData) &&
len(h.ProposerSigData) == len(prevHeader.ProposerSigData) {
bNotEqual := false

for i, v := range h.ParentVoterIDs {
if v == prevHeader.ParentVoterIDs[i] {
continue
}
bNotEqual = true
break
}
if !bNotEqual &&
h.ChainID == prevHeader.ChainID &&
h.Timestamp == prevHeader.Timestamp &&
h.Height == prevHeader.Height &&
h.ParentID == prevHeader.ParentID &&
h.View == prevHeader.View &&
h.PayloadHash == prevHeader.PayloadHash &&
bytes.Equal(h.ProposerSigData, prevHeader.ProposerSigData) &&
bytes.Equal(h.ParentVoterSigData, prevHeader.ParentVoterSigData) &&
h.ProposerID == prevHeader.ProposerID {

// cache hit, return the previous identifier
return previdHeader
}
}
jwinkler2083233 marked this conversation as resolved.
Show resolved Hide resolved

previdHeader = MakeID(h)

// store a reference to the Header entity data
prevHeader = h

return previdHeader
}

// Checksum returns the checksum of the header.
Expand Down