From ec5122999b77f7f3faac1da44070635bc2e1ae7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Garillot?= Date: Mon, 2 Aug 2021 17:38:32 -0400 Subject: [PATCH] Remove time-dependency (and flakiness) on the compactor test - introduces a most minima form of the Observable interface to modules/observable See: http://reactivex.io/documentation/observable.html - make the compactor test asynchronous --- ledger/complete/wal/compactor.go | 34 ++++++++++++++++++--- ledger/complete/wal/compactor_test.go | 44 ++++++++++++++++++++++----- 2 files changed, 65 insertions(+), 13 deletions(-) diff --git a/ledger/complete/wal/compactor.go b/ledger/complete/wal/compactor.go index ad380ffc47e..3f9365fc78f 100644 --- a/ledger/complete/wal/compactor.go +++ b/ledger/complete/wal/compactor.go @@ -7,6 +7,7 @@ import ( "time" "github.com/onflow/flow-go/module/lifecycle" + "github.com/onflow/flow-go/module/observable" ) type Compactor struct { @@ -14,6 +15,7 @@ type Compactor struct { stopc chan struct{} lm *lifecycle.LifecycleManager sync.Mutex + observers map[observable.Observer]struct{} interval time.Duration checkpointDistance uint checkpointsToKeep uint @@ -26,6 +28,7 @@ func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpoint return &Compactor{ checkpointer: checkpointer, stopc: make(chan struct{}), + observers: make(map[observable.Observer]struct{}), lm: lifecycle.NewLifecycleManager(), interval: interval, checkpointDistance: checkpointDistance, @@ -33,6 +36,15 @@ func NewCompactor(checkpointer *Checkpointer, interval time.Duration, checkpoint } } +func (c *Compactor) Subscribe(observer observable.Observer) { + var void struct{} + c.observers[observer] = void +} + +func (c *Compactor) Unsubscribe(observer observable.Observer) { + delete(c.observers, observer) +} + // Ready periodically fires Run function, every `interval` func (c *Compactor) Ready() <-chan struct{} { c.lm.OnStart(func() { @@ -43,6 +55,9 @@ func (c *Compactor) Ready() <-chan struct{} { func (c *Compactor) Done() <-chan struct{} { c.lm.OnStop(func() { + for observer := range c.observers { + observer.OnComplete() + } c.stopc <- struct{}{} }) return c.lm.Stopped() @@ -65,7 +80,7 @@ func (c *Compactor) Run() error { c.Lock() defer c.Unlock() - err := c.createCheckpoints() + newLatestCheckpoint, err := c.createCheckpoints() if err != nil { return fmt.Errorf("cannot create checkpoints: %w", err) } @@ -75,17 +90,25 @@ func (c *Compactor) Run() error { return fmt.Errorf("cannot cleanup checkpoints: %w", err) } + if newLatestCheckpoint > 0 { + for observer := range c.observers { + observer.OnNext(newLatestCheckpoint) + } + } + return nil } -func (c *Compactor) createCheckpoints() error { +func (c *Compactor) createCheckpoints() (int, error) { from, to, err := c.checkpointer.NotCheckpointedSegments() if err != nil { - return fmt.Errorf("cannot get latest checkpoint: %w", err) + return -1, fmt.Errorf("cannot get latest checkpoint: %w", err) } fmt.Printf("creating a checkpoint from segment %d to segment %d\n", from, to) + // we only return a positive value if the latest checkpoint index has changed + newLatestCheckpoint := -1 // more then one segment means we can checkpoint safely up to `to`-1 // presumably last segment is being written to if to-from > int(c.checkpointDistance) { @@ -96,10 +119,11 @@ func (c *Compactor) createCheckpoints() error { return c.checkpointer.CheckpointWriter(checkpointNumber) }) if err != nil { - return fmt.Errorf("error creating checkpoint (%d): %w", checkpointNumber, err) + return -1, fmt.Errorf("error creating checkpoint (%d): %w", checkpointNumber, err) } + newLatestCheckpoint = checkpointNumber } - return nil + return newLatestCheckpoint, nil } func (c *Compactor) cleanupCheckpoints() error { diff --git a/ledger/complete/wal/compactor_test.go b/ledger/complete/wal/compactor_test.go index 0bc76f35a5a..2f9ec750f70 100644 --- a/ledger/complete/wal/compactor_test.go +++ b/ledger/complete/wal/compactor_test.go @@ -21,6 +21,27 @@ import ( "github.com/onflow/flow-go/utils/unittest" ) +// Compactor observer that waits until it gets notified of a +// latest checkpoint larger than fromBound +type CompactorObserver struct { + fromBound int + done chan struct{} +} + +func (co *CompactorObserver) OnNext(val interface{}) { + res, ok := val.(int) + if ok { + new := res + if new >= co.fromBound { + co.done <- struct{}{} + } + } +} +func (co *CompactorObserver) OnError(err error) {} +func (co *CompactorObserver) OnComplete() { + close(co.done) +} + func Test_Compactor(t *testing.T) { numInsPerStep := 2 @@ -53,6 +74,8 @@ func Test_Compactor(t *testing.T) { require.NoError(t, err) compactor := NewCompactor(checkpointer, 100*time.Millisecond, checkpointDistance, 1) //keep only latest checkpoint + co := CompactorObserver{fromBound: 9, done: make(chan struct{})} + compactor.Subscribe(&co) // Run Compactor in background. <-compactor.Ready() @@ -86,13 +109,18 @@ func Test_Compactor(t *testing.T) { savedData[rootHash] = data } - assert.Eventually(t, func() bool { - from, to, err := checkpointer.NotCheckpointedSegments() - require.NoError(t, err) + // wait for the bound-checking observer to confirm checkpoints have been made + select { + case <-co.done: + // continue + case <-time.After(20 * time.Second): + assert.FailNow(t, "timed out") + } + + from, to, err := checkpointer.NotCheckpointedSegments() + require.NoError(t, err) - return from == 10 && to == 10 //make sure there is - // this is disk-based operation after all, so give it big timeout - }, 20*time.Second, 100*time.Millisecond) + assert.True(t, from == 10 && to == 10, "from: %v, to: %v", from, to) //make sure there is no leftover require.NoFileExists(t, path.Join(dir, "checkpoint.00000000")) require.NoFileExists(t, path.Join(dir, "checkpoint.00000001")) @@ -242,11 +270,11 @@ func Test_Compactor_checkpointInterval(t *testing.T) { require.FileExists(t, path.Join(dir, NumberToFilenamePart(i))) // run checkpoint creation after every file - err = compactor.createCheckpoints() + _, err = compactor.createCheckpoints() require.NoError(t, err) } - // assert precisely creation of checkpoint files + // assert creation of checkpoint files precisely require.NoFileExists(t, path.Join(dir, bootstrap.FilenameWALRootCheckpoint)) require.NoFileExists(t, path.Join(dir, "checkpoint.00000001")) require.NoFileExists(t, path.Join(dir, "checkpoint.00000002"))