Skip to content

Commit

Permalink
Remove time-dependency (and flakiness) on the compactor test
Browse files Browse the repository at this point in the history
- introduces a most minimal form of the Observable interface to module/observable
  See: http://reactivex.io/documentation/observable.html
- make the compactor test asynchronous
  • Loading branch information
huitseeker committed Aug 4, 2021
1 parent 7f9593a commit ec51229
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 13 deletions.
34 changes: 29 additions & 5 deletions ledger/complete/wal/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"time"

"github.com/onflow/flow-go/module/lifecycle"
"github.com/onflow/flow-go/module/observable"
)

type Compactor struct {
checkpointer *Checkpointer
stopc chan struct{}
lm *lifecycle.LifecycleManager
sync.Mutex
observers map[observable.Observer]struct{}
interval time.Duration
checkpointDistance uint
checkpointsToKeep uint
Expand All @@ -26,13 +28,23 @@ func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpoint
return &Compactor{
checkpointer: checkpointer,
stopc: make(chan struct{}),
observers: make(map[observable.Observer]struct{}),
lm: lifecycle.NewLifecycleManager(),
interval: interval,
checkpointDistance: checkpointDistance,
checkpointsToKeep: checkpointsToKeep,
}
}

// Subscribe registers the given observer so that it is notified of newly
// created checkpoints (OnNext) and of Compactor shutdown (OnComplete).
//
// NOTE(review): the observers map is written here without holding the
// Compactor mutex, while Run iterates it under the lock; presumably callers
// are expected to Subscribe before Ready() starts the run loop — confirm
// before calling this concurrently with a running Compactor.
func (c *Compactor) Subscribe(observer observable.Observer) {
	// Idiomatic empty-struct set insertion; the value carries no data.
	c.observers[observer] = struct{}{}
}

// Unsubscribe removes the given observer from the set of subscribers, so it
// will receive no further notifications. Removing an observer that was never
// subscribed is a no-op (delete on a missing map key does nothing).
func (c *Compactor) Unsubscribe(observer observable.Observer) {
	delete(c.observers, observer)
}

// Ready periodically fires Run function, every `interval`
func (c *Compactor) Ready() <-chan struct{} {
c.lm.OnStart(func() {
Expand All @@ -43,6 +55,9 @@ func (c *Compactor) Ready() <-chan struct{} {

// Done shuts the Compactor down: every subscribed observer is told the stream
// of checkpoint notifications has ended (OnComplete), and the run loop is
// signalled to stop. The returned channel is closed once shutdown completes.
func (c *Compactor) Done() <-chan struct{} {
	c.lm.OnStop(func() {
		// Notify observers before stopping, so OnComplete is the last event
		// they receive.
		for observer := range c.observers {
			observer.OnComplete()
		}
		// Unblocking send to the run loop's stop channel; this blocks until
		// the loop receives it — presumably Ready() was called first so the
		// loop is running. TODO confirm Done without Ready cannot deadlock
		// here (depends on lifecycle.LifecycleManager semantics).
		c.stopc <- struct{}{}
	})
	return c.lm.Stopped()
}
Expand All @@ -65,7 +80,7 @@ func (c *Compactor) Run() error {
c.Lock()
defer c.Unlock()

err := c.createCheckpoints()
newLatestCheckpoint, err := c.createCheckpoints()
if err != nil {
return fmt.Errorf("cannot create checkpoints: %w", err)
}
Expand All @@ -75,17 +90,25 @@ func (c *Compactor) Run() error {
return fmt.Errorf("cannot cleanup checkpoints: %w", err)
}

if newLatestCheckpoint > 0 {
for observer := range c.observers {
observer.OnNext(newLatestCheckpoint)
}
}

return nil
}

func (c *Compactor) createCheckpoints() error {
func (c *Compactor) createCheckpoints() (int, error) {
from, to, err := c.checkpointer.NotCheckpointedSegments()
if err != nil {
return fmt.Errorf("cannot get latest checkpoint: %w", err)
return -1, fmt.Errorf("cannot get latest checkpoint: %w", err)
}

fmt.Printf("creating a checkpoint from segment %d to segment %d\n", from, to)

// we only return a positive value if the latest checkpoint index has changed
newLatestCheckpoint := -1
// more than one segment means we can checkpoint safely up to `to`-1
// presumably last segment is being written to
if to-from > int(c.checkpointDistance) {
Expand All @@ -96,10 +119,11 @@ func (c *Compactor) createCheckpoints() error {
return c.checkpointer.CheckpointWriter(checkpointNumber)
})
if err != nil {
return fmt.Errorf("error creating checkpoint (%d): %w", checkpointNumber, err)
return -1, fmt.Errorf("error creating checkpoint (%d): %w", checkpointNumber, err)
}
newLatestCheckpoint = checkpointNumber
}
return nil
return newLatestCheckpoint, nil
}

func (c *Compactor) cleanupCheckpoints() error {
Expand Down
44 changes: 36 additions & 8 deletions ledger/complete/wal/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,27 @@ import (
"github.com/onflow/flow-go/utils/unittest"
)

// CompactorObserver is a Compactor observer used by tests: it signals on its
// done channel the first time it is notified of a latest-checkpoint index at
// or above fromBound, and closes done when the Compactor completes.
type CompactorObserver struct {
	fromBound int           // minimum checkpoint index we are waiting for
	done      chan struct{} // receives a signal when fromBound is reached; closed on OnComplete
}

// OnNext handles a new-checkpoint notification. Values that are not ints are
// ignored; an int at or above fromBound triggers a (blocking) send on done.
func (co *CompactorObserver) OnNext(val interface{}) {
	// Renamed from `new`, which shadowed the Go builtin of the same name.
	latestCheckpoint, ok := val.(int)
	if !ok {
		return
	}
	if latestCheckpoint >= co.fromBound {
		co.done <- struct{}{}
	}
}

// OnError is required by the observer interface; errors are ignored here.
func (co *CompactorObserver) OnError(err error) {}

// OnComplete closes done, releasing anyone still waiting on it.
func (co *CompactorObserver) OnComplete() {
	close(co.done)
}

func Test_Compactor(t *testing.T) {

numInsPerStep := 2
Expand Down Expand Up @@ -53,6 +74,8 @@ func Test_Compactor(t *testing.T) {
require.NoError(t, err)

compactor := NewCompactor(checkpointer, 100*time.Millisecond, checkpointDistance, 1) //keep only latest checkpoint
co := CompactorObserver{fromBound: 9, done: make(chan struct{})}
compactor.Subscribe(&co)

// Run Compactor in background.
<-compactor.Ready()
Expand Down Expand Up @@ -86,13 +109,18 @@ func Test_Compactor(t *testing.T) {
savedData[rootHash] = data
}

assert.Eventually(t, func() bool {
from, to, err := checkpointer.NotCheckpointedSegments()
require.NoError(t, err)
// wait for the bound-checking observer to confirm checkpoints have been made
select {
case <-co.done:
// continue
case <-time.After(20 * time.Second):
assert.FailNow(t, "timed out")
}

from, to, err := checkpointer.NotCheckpointedSegments()
require.NoError(t, err)

return from == 10 && to == 10 //make sure there is
// this is disk-based operation after all, so give it big timeout
}, 20*time.Second, 100*time.Millisecond)
assert.True(t, from == 10 && to == 10, "from: %v, to: %v", from, to) //make sure there is no leftover

require.NoFileExists(t, path.Join(dir, "checkpoint.00000000"))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
Expand Down Expand Up @@ -242,11 +270,11 @@ func Test_Compactor_checkpointInterval(t *testing.T) {
require.FileExists(t, path.Join(dir, NumberToFilenamePart(i)))

// run checkpoint creation after every file
err = compactor.createCheckpoints()
_, err = compactor.createCheckpoints()
require.NoError(t, err)
}

// assert precisely creation of checkpoint files
// assert creation of checkpoint files precisely
require.NoFileExists(t, path.Join(dir, bootstrap.FilenameWALRootCheckpoint))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000002"))
Expand Down

0 comments on commit ec51229

Please sign in to comment.