[ledger] Ledger cleanups + make a test async #1073

Merged · 7 commits · Aug 6, 2021
14 changes: 2 additions & 12 deletions ledger/common/encoding/encoding.go
@@ -268,11 +268,7 @@ func DecodeValue(encodedValue []byte) (ledger.Value, error) {
return nil, err
}

return decodeValue(rest)
}

func decodeValue(inp []byte) (ledger.Value, error) {
return ledger.Value(inp), nil
return rest, nil
}

// EncodePath encodes a path into a byte slice
@@ -406,13 +402,7 @@ func decodePayload(inp []byte) (*ledger.Payload, error) {
return nil, fmt.Errorf("error decoding payload: %w", err)
}

// decode value
value, err := decodeValue(encValue)
if err != nil {
return nil, fmt.Errorf("error decoding payload: %w", err)
}

return &ledger.Payload{Key: *key, Value: value}, nil
return &ledger.Payload{Key: *key, Value: encValue}, nil
}

// EncodeTrieUpdate encodes a trie update struct
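The inlined decodeValue helper was a pure type conversion, and Go's assignability rules make even that conversion unnecessary: a plain []byte may be returned where a named byte-slice type is expected. A minimal sketch, assuming ledger.Value is a named []byte type (the removed decodeValue body, `return ledger.Value(inp), nil`, suggests exactly that):

```go
package main

import "fmt"

// Value mirrors ledger.Value under the assumption that it is a named
// byte-slice type; this is a stand-in for illustration, not the real type.
type Value []byte

// DecodeValue returns the remaining bytes directly. Because []byte is an
// unnamed type sharing Value's underlying type, Go permits the return
// without an explicit conversion, so the removed wrapper added nothing.
func DecodeValue(rest []byte) (Value, error) {
	return rest, nil
}

func main() {
	v, err := DecodeValue([]byte("payload"))
	fmt.Printf("%s %v\n", v, err) // payload <nil>
}
```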
6 changes: 3 additions & 3 deletions ledger/common/encoding/encoding_test.go
@@ -26,7 +26,7 @@ func Test_KeyPartEncodingDecoding(t *testing.T) {
require.Error(t, err)

// test wrong version decoding
encoded[0] = byte(uint8(1))
encoded[0] = uint8(1)
_, err = encoding.DecodeKeyPart(encoded)
require.Error(t, err)
}
@@ -96,12 +96,12 @@ func Test_TrieUpdateEncodingDecoding(t *testing.T) {
kp1 := ledger.NewKeyPart(uint16(1), []byte("key 1 part 1"))
kp2 := ledger.NewKeyPart(uint16(22), []byte("key 1 part 2"))
k1 := ledger.NewKey([]ledger.KeyPart{kp1, kp2})
pl1 := ledger.NewPayload(k1, ledger.Value([]byte{'A'}))
pl1 := ledger.NewPayload(k1, []byte{'A'})

p2 := utils.PathByUint16(2)
kp3 := ledger.NewKeyPart(uint16(1), []byte("key2 part 1"))
k2 := ledger.NewKey([]ledger.KeyPart{kp3})
pl2 := ledger.NewPayload(k2, ledger.Value([]byte{'B'}))
pl2 := ledger.NewPayload(k2, []byte{'B'})

tu := &ledger.TrieUpdate{
RootHash: utils.RootHashFixture(),
6 changes: 3 additions & 3 deletions ledger/common/hash/hash_test.go
@@ -36,7 +36,7 @@ func TestHash(t *testing.T) {
_, _ = hasher.Write(path[:])
_, _ = hasher.Write(value)
expected := hasher.Sum(nil)
assert.Equal(t, []byte(expected), []byte(h[:]))
assert.Equal(t, expected, h[:])
}
})

@@ -52,7 +52,7 @@ func TestHash(t *testing.T) {
_, _ = hasher.Write(h1[:])
_, _ = hasher.Write(h2[:])
expected := hasher.Sum(nil)
assert.Equal(t, []byte(expected), []byte(h[:]))
assert.Equal(t, expected, h[:])
}
})
}
@@ -62,7 +62,7 @@ func Test_GetDefaultHashForHeight(t *testing.T) {
hasher := cryhash.NewSHA3_256()
defaultLeafHash := hasher.ComputeHash([]byte("default:"))
expected := ledger.GetDefaultHashForHeight(0)
assert.Equal(t, []byte(expected[:]), []byte(defaultLeafHash))
assert.Equal(t, expected[:], []byte(defaultLeafHash))

l1 := hash.HashInterNode(ledger.GetDefaultHashForHeight(0), ledger.GetDefaultHashForHeight(0))
assert.Equal(t, l1, ledger.GetDefaultHashForHeight(1))
29 changes: 4 additions & 25 deletions ledger/common/utils/testutils.go
@@ -36,7 +36,7 @@ func Uint64ToBinary(integer uint64) []byte {

// AppendUint8 appends the value byte to the input slice
func AppendUint8(input []byte, value uint8) []byte {
return append(input, byte(value))
return append(input, value)
}

// AppendUint16 appends the value bytes to the input slice (big endian)
@@ -93,7 +93,7 @@ func ReadUint8(input []byte) (value uint8, rest []byte, err error) {
if len(input) < 1 {
return 0, input, fmt.Errorf("input size (%d) is too small to read a uint8", len(input))
}
return uint8(input[0]), input[1:], nil
return input[0], input[1:], nil
}

// ReadUint16 reads a uint16 from the input and returns the rest
@@ -131,17 +131,6 @@ func ReadShortData(input []byte) (data []byte, rest []byte, err error) {
return
}

// ReadLongData read data shorter than 32MB and return the rest of bytes
func ReadLongData(input []byte) (data []byte, rest []byte, err error) {
var size uint32
size, rest, err = ReadUint32(input)
if err != nil {
return nil, rest, err
}
data = rest[:size]
return
}

// ReadShortDataFromReader reads data shorter than 16kB from reader
func ReadShortDataFromReader(reader io.Reader) ([]byte, error) {
buf, err := ReadFromBuffer(reader, 2)
@@ -247,7 +236,7 @@ func PathByUint16LeftPadded(inp uint16) l.Path {

// KeyPartFixture returns a key part fixture
func KeyPartFixture(typ uint16, val string) l.KeyPart {
kp1t := uint16(typ)
kp1t := typ
kp1v := []byte(val)
return l.NewKeyPart(kp1t, kp1v)
}
@@ -307,7 +296,7 @@ func TrieBatchProofFixture() (*l.TrieBatchProof, l.State) {
bp := l.NewTrieBatchProof()
bp.Proofs = append(bp.Proofs, p)
bp.Proofs = append(bp.Proofs, p)
return bp, l.State(s)
return bp, s
}

// RandomPathsRandLen generate m random paths.
@@ -407,13 +396,3 @@ func RandomUniqueKeys(n, m, minByteSize, maxByteSize int) []l.Key {
}
return keys
}

// RandomUniqueKeysRandomN generate n (0<n<maxN) random keys (each m random key part),
func RandomUniqueKeysRandomN(maxN, m, minByteSize, maxByteSize int) []l.Key {
numberOfKeys := rand.Intn(maxN) + 1
// at least return 1 keys
if numberOfKeys == 0 {
numberOfKeys = 1
}
return RandomUniqueKeys(numberOfKeys, m, minByteSize, maxByteSize)
}
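The deleted RandomUniqueKeysRandomN also carried an unreachable guard: rand.Intn(maxN) returns a value in [0, maxN), so adding 1 always yields at least 1 and the numberOfKeys == 0 branch could never fire. A minimal sketch of that invariant (assuming maxN > 0; rand.Intn panics otherwise):

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// rand.Intn(n) panics for n <= 0 and otherwise returns an int in [0, n),
	// so adding 1 yields a value in [1, n]; the zero check in the removed
	// helper was dead code.
	for i := 0; i < 5; i++ {
		numberOfKeys := rand.Intn(10) + 1
		fmt.Println(numberOfKeys >= 1 && numberOfKeys <= 10) // always true
	}
}
```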
2 changes: 1 addition & 1 deletion ledger/complete/ledger.go
@@ -211,7 +211,7 @@ func (l *Ledger) Prove(query *ledger.Query) (proof ledger.Proof, err error) {
l.metrics.ProofSize(uint32(len(proofToGo) / len(paths)))
}

return ledger.Proof(proofToGo), err
return proofToGo, err
}

// MemSize return the amount of memory used by ledger
4 changes: 2 additions & 2 deletions ledger/complete/ledger_test.go
@@ -357,9 +357,9 @@ func TestLedgerFunctionality(t *testing.T) {
for s := range histStorage {
value := histStorage[s]
var state ledger.State
copy(state[:], []byte(s[:stateComSize]))
copy(state[:], s[:stateComSize])
enk := []byte(s[stateComSize:])
key, err := encoding.DecodeKey([]byte(enk))
key, err := encoding.DecodeKey(enk)
assert.NoError(t, err)
query, err := ledger.NewQuery(state, []ledger.Key{*key})
assert.NoError(t, err)
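The dropped conversions in this hunk rely on a built-in special case: copy accepts a string as its source when the destination is a []byte, and slicing a string yields a string, so no intermediate []byte allocation is needed. A small sketch, where the key string and prefix length are illustrative stand-ins for the test's values:

```go
package main

import "fmt"

func main() {
	// s and stateComSize stand in for the test's map key string and
	// state-commitment prefix length; both are hypothetical values.
	s := "0123456789abcdefENCODED-KEY-BYTES"
	const stateComSize = 16

	var state [16]byte
	// copy(dst []byte, src string) is a built-in special form, so the
	// explicit []byte(...) conversion the old code used was redundant
	// and cost an extra allocation.
	n := copy(state[:], s[:stateComSize])
	fmt.Println(n, string(state[:])) // 16 0123456789abcdef
}
```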
2 changes: 1 addition & 1 deletion ledger/complete/mtrie/forest.go
@@ -178,7 +178,7 @@ func (f *Forest) Update(u *ledger.TrieUpdate) (ledger.RootHash, error) {
return emptyHash, fmt.Errorf("adding updated trie to forest failed: %w", err)
}

return ledger.RootHash(newTrie.RootHash()), nil
return newTrie.RootHash(), nil
}

// Proofs returns a batch proof for the given paths.
69 changes: 41 additions & 28 deletions ledger/complete/wal/compactor.go
@@ -5,14 +5,17 @@ import (
"io"
"sync"
"time"

"github.com/onflow/flow-go/module/lifecycle"
"github.com/onflow/flow-go/module/observable"
)

type Compactor struct {
checkpointer *Checkpointer
done chan struct{}
stopc chan struct{}
wg sync.WaitGroup
lm *lifecycle.LifecycleManager
[Contributor review comment on the lm field: 🥳]

sync.Mutex
observers map[observable.Observer]struct{}
interval time.Duration
checkpointDistance uint
checkpointsToKeep uint
@@ -24,48 +27,49 @@ func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpoint
}
return &Compactor{
checkpointer: checkpointer,
done: make(chan struct{}),
stopc: make(chan struct{}),
observers: make(map[observable.Observer]struct{}),
lm: lifecycle.NewLifecycleManager(),
interval: interval,
checkpointDistance: checkpointDistance,
checkpointsToKeep: checkpointsToKeep,
}
}

// Ready periodically fires Run function, every `interval`
// If called more then once, behaviour is undefined.
func (c *Compactor) Ready() <-chan struct{} {
ch := make(chan struct{})
func (c *Compactor) Subscribe(observer observable.Observer) {
var void struct{}
c.observers[observer] = void
}

c.wg.Add(1)
go c.start()
func (c *Compactor) Unsubscribe(observer observable.Observer) {
delete(c.observers, observer)
}

defer close(ch)
return ch
// Ready periodically fires Run function, every `interval`
func (c *Compactor) Ready() <-chan struct{} {
c.lm.OnStart(func() {
go c.start()
})
return c.lm.Started()
}

func (c *Compactor) Done() <-chan struct{} {
c.stopc <- struct{}{}

ch := make(chan struct{})

go func() {
c.wg.Wait()
close(ch)
}()

return ch
c.lm.OnStop(func() {
for observer := range c.observers {
observer.OnComplete()
}
c.stopc <- struct{}{}
[Contributor review comment: You may be interested in #1077, which will allow you to replace c.stopc as well :)]

})
return c.lm.Stopped()
}

func (c *Compactor) start() {

for {
//TODO Log error
_ = c.Run()

select {
case <-c.stopc:
c.wg.Done()
return
case <-time.After(c.interval):
}
@@ -76,7 +80,7 @@ func (c *Compactor) Run() error {
c.Lock()
defer c.Unlock()

err := c.createCheckpoints()
newLatestCheckpoint, err := c.createCheckpoints()
if err != nil {
return fmt.Errorf("cannot create checkpoints: %w", err)
}
@@ -86,17 +90,25 @@
return fmt.Errorf("cannot cleanup checkpoints: %w", err)
}

if newLatestCheckpoint > 0 {
for observer := range c.observers {
observer.OnNext(newLatestCheckpoint)
}
}

return nil
}

func (c *Compactor) createCheckpoints() error {
func (c *Compactor) createCheckpoints() (int, error) {
from, to, err := c.checkpointer.NotCheckpointedSegments()
if err != nil {
return fmt.Errorf("cannot get latest checkpoint: %w", err)
return -1, fmt.Errorf("cannot get latest checkpoint: %w", err)
}

fmt.Printf("creating a checkpoint from segment %d to segment %d\n", from, to)

// we only return a positive value if the latest checkpoint index has changed
newLatestCheckpoint := -1
// more then one segment means we can checkpoint safely up to `to`-1
// presumably last segment is being written to
if to-from > int(c.checkpointDistance) {
@@ -107,10 +119,11 @@
return c.checkpointer.CheckpointWriter(checkpointNumber)
})
if err != nil {
return fmt.Errorf("error creating checkpoint (%d): %w", checkpointNumber, err)
return -1, fmt.Errorf("error creating checkpoint (%d): %w", checkpointNumber, err)
}
newLatestCheckpoint = checkpointNumber
}
return nil
return newLatestCheckpoint, nil
}

func (c *Compactor) cleanupCheckpoints() error {
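The compactor now delegates startup and shutdown to lifecycle.LifecycleManager and broadcasts new checkpoint numbers to subscribers. A self-contained sketch of the same subscribe/notify shape; the Observer interface below mirrors the OnNext/OnError/OnComplete methods the test implements, and everything else is illustrative rather than flow-go's actual module API:

```go
package main

import "fmt"

// Observer mirrors the three-method shape the Compactor notifies
// (per the CompactorObserver in the test diff below).
type Observer interface {
	OnNext(val interface{})
	OnError(err error)
	OnComplete()
}

// notifier is a cut-down, hypothetical stand-in for the Compactor's
// observer registry; it is not the real type.
type notifier struct {
	observers map[Observer]struct{}
}

func newNotifier() *notifier {
	return &notifier{observers: make(map[Observer]struct{})}
}

func (n *notifier) Subscribe(o Observer)   { n.observers[o] = struct{}{} }
func (n *notifier) Unsubscribe(o Observer) { delete(n.observers, o) }

// run models Compactor.Run: broadcast only when a new checkpoint was made.
func (n *notifier) run(newLatestCheckpoint int) {
	if newLatestCheckpoint > 0 {
		for o := range n.observers {
			o.OnNext(newLatestCheckpoint)
		}
	}
}

// shutdown models the OnStop hook: tell observers the stream is finished.
func (n *notifier) shutdown() {
	for o := range n.observers {
		o.OnComplete()
	}
}

type printObserver struct{}

func (printObserver) OnNext(val interface{}) { fmt.Println("checkpoint:", val) }
func (printObserver) OnError(err error)      { fmt.Println("error:", err) }
func (printObserver) OnComplete()            { fmt.Println("done") }

func main() {
	n := newNotifier()
	n.Subscribe(printObserver{})
	n.run(-1) // no new checkpoint, observers stay quiet
	n.run(10) // prints "checkpoint: 10"
	n.shutdown()
}
```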
44 changes: 36 additions & 8 deletions ledger/complete/wal/compactor_test.go
@@ -21,6 +21,27 @@ import (
"github.com/onflow/flow-go/utils/unittest"
)

// Compactor observer that waits until it gets notified of a
// latest checkpoint larger than fromBound
type CompactorObserver struct {
fromBound int
done chan struct{}
}

func (co *CompactorObserver) OnNext(val interface{}) {
res, ok := val.(int)
if ok {
new := res
if new >= co.fromBound {
co.done <- struct{}{}
}
}
}
func (co *CompactorObserver) OnError(err error) {}
func (co *CompactorObserver) OnComplete() {
close(co.done)
}

func Test_Compactor(t *testing.T) {

numInsPerStep := 2
@@ -53,6 +74,8 @@ func Test_Compactor(t *testing.T) {
require.NoError(t, err)

compactor := NewCompactor(checkpointer, 100*time.Millisecond, checkpointDistance, 1) //keep only latest checkpoint
co := CompactorObserver{fromBound: 9, done: make(chan struct{})}
compactor.Subscribe(&co)

// Run Compactor in background.
<-compactor.Ready()
@@ -86,13 +109,18 @@
savedData[rootHash] = data
}

assert.Eventually(t, func() bool {
from, to, err := checkpointer.NotCheckpointedSegments()
require.NoError(t, err)
// wait for the bound-checking observer to confirm checkpoints have been made
select {
case <-co.done:
// continue
case <-time.After(20 * time.Second):
assert.FailNow(t, "timed out")
}

from, to, err := checkpointer.NotCheckpointedSegments()
require.NoError(t, err)

return from == 10 && to == 10 //make sure there is
// this is disk-based operation after all, so give it big timeout
}, 20*time.Second, 100*time.Millisecond)
assert.True(t, from == 10 && to == 10, "from: %v, to: %v", from, to) //make sure there is no leftover

require.NoFileExists(t, path.Join(dir, "checkpoint.00000000"))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
@@ -242,11 +270,11 @@ func Test_Compactor_checkpointInterval(t *testing.T) {
require.FileExists(t, path.Join(dir, NumberToFilenamePart(i)))

// run checkpoint creation after every file
err = compactor.createCheckpoints()
_, err = compactor.createCheckpoints()
require.NoError(t, err)
}

// assert precisely creation of checkpoint files
// assert creation of checkpoint files precisely
require.NoFileExists(t, path.Join(dir, bootstrap.FilenameWALRootCheckpoint))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000002"))
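With the observer in place, the test can block on a channel instead of polling with assert.Eventually. The select-with-timeout pattern in isolation (a sketch; the sleeping goroutine stands in for the compactor's background checkpointing):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})

	// Stand-in for the Compactor: signal once the checkpoint bound is hit.
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend work
		done <- struct{}{}
	}()

	// The async pattern the test adopts: block until notified, with a
	// generous timeout so a wedged compactor fails the test instead of
	// hanging it forever.
	select {
	case <-done:
		fmt.Println("checkpoint observed")
	case <-time.After(20 * time.Second):
		fmt.Println("timed out")
	}
}
```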