Skip to content

Commit

Permalink
Remove time-dependency (and flakiness) on the compactor test
Browse files Browse the repository at this point in the history
- introduces a most minima form of the Observable interface to modules/observable
  See: http://reactivex.io/documentation/observable.html
- make the compactor test asynchronous
  • Loading branch information
huitseeker committed Aug 2, 2021
1 parent 7f9593a commit 738f918
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 7 deletions.
22 changes: 22 additions & 0 deletions ledger/complete/wal/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"time"

"github.com/onflow/flow-go/module/lifecycle"
"github.com/onflow/flow-go/module/observable"
)

type Compactor struct {
checkpointer *Checkpointer
stopc chan struct{}
lm *lifecycle.LifecycleManager
sync.Mutex
observers map[observable.Observer]struct{}
interval time.Duration
checkpointDistance uint
checkpointsToKeep uint
Expand All @@ -26,13 +28,25 @@ func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpoint
return &Compactor{
checkpointer: checkpointer,
stopc: make(chan struct{}),
observers: make(map[observable.Observer]struct{}),
lm: lifecycle.NewLifecycleManager(),
interval: interval,
checkpointDistance: checkpointDistance,
checkpointsToKeep: checkpointsToKeep,
}
}

func (c *Compactor) Subscribe(observer observable.Observer) {
if _, exists := c.observers[observer]; !exists {
var void struct{}
c.observers[observer] = void
}
}

func (c *Compactor) Unsubscribe(observer observable.Observer) {
delete(c.observers, observer)
}

// Ready periodically fires Run function, every `interval`
func (c *Compactor) Ready() <-chan struct{} {
c.lm.OnStart(func() {
Expand All @@ -43,6 +57,9 @@ func (c *Compactor) Ready() <-chan struct{} {

func (c *Compactor) Done() <-chan struct{} {
c.lm.OnStop(func() {
for observer := range c.observers {
observer.OnComplete()
}
c.stopc <- struct{}{}
})
return c.lm.Stopped()
Expand Down Expand Up @@ -75,6 +92,11 @@ func (c *Compactor) Run() error {
return fmt.Errorf("cannot cleanup checkpoints: %w", err)
}

latest, _ := c.checkpointer.LatestCheckpoint()
for observer := range c.observers {
observer.OnNext(latest)
}

return nil
}

Expand Down
40 changes: 33 additions & 7 deletions ledger/complete/wal/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,28 @@ import (
"github.com/onflow/flow-go/utils/unittest"
)

// Compactor observer that waits until it gets notified of a
// latest checkpoint larger than fromBound
type CompactorObserver struct {
fromBound int
done chan struct{}
}

func (co *CompactorObserver) OnNext(val interface{}) {
res, ok := val.(int)
if ok {
new := res
fmt.Printf("checkpointed until %v\n", new)
if new >= co.fromBound {
co.done <- struct{}{}
}
}
}
func (co *CompactorObserver) OnError(err error) {}
func (co *CompactorObserver) OnComplete() {
close(co.done)
}

func Test_Compactor(t *testing.T) {

numInsPerStep := 2
Expand Down Expand Up @@ -53,6 +75,8 @@ func Test_Compactor(t *testing.T) {
require.NoError(t, err)

compactor := NewCompactor(checkpointer, 100*time.Millisecond, checkpointDistance, 1) //keep only latest checkpoint
co := CompactorObserver{fromBound: 9, done: make(chan struct{})}
compactor.Subscribe(&co)

// Run Compactor in background.
<-compactor.Ready()
Expand Down Expand Up @@ -86,13 +110,15 @@ func Test_Compactor(t *testing.T) {
savedData[rootHash] = data
}

assert.Eventually(t, func() bool {
from, to, err := checkpointer.NotCheckpointedSegments()
require.NoError(t, err)
// wait for the bound-checking observer to confirm checkpoints have been made
select {
case <-co.done:
}

from, to, err := checkpointer.NotCheckpointedSegments()
require.NoError(t, err)

return from == 10 && to == 10 //make sure there is
// this is disk-based operation after all, so give it big timeout
}, 20*time.Second, 100*time.Millisecond)
assert.True(t, from == 10 && to == 10, "from: %v, to: %v", from, to) //make sure there is no leftover

require.NoFileExists(t, path.Join(dir, "checkpoint.00000000"))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
Expand Down Expand Up @@ -246,7 +272,7 @@ func Test_Compactor_checkpointInterval(t *testing.T) {
require.NoError(t, err)
}

// assert precisely creation of checkpoint files
// assert creation of checkpoint files precisely
require.NoFileExists(t, path.Join(dir, bootstrap.FilenameWALRootCheckpoint))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000001"))
require.NoFileExists(t, path.Join(dir, "checkpoint.00000002"))
Expand Down

0 comments on commit 738f918

Please sign in to comment.