Common cycle manager #3025

Merged
merged 8 commits on May 22, 2023
6 changes: 3 additions & 3 deletions adapters/repos/db/backup_integration_test.go
@@ -199,10 +199,10 @@ func TestBackup_BucketLevel(t *testing.T) {
objBucket := shard.store.Bucket("objects")
require.NotNil(t, objBucket)

- err := objBucket.PauseCompaction(ctx)
+ err := shard.store.PauseCompaction(ctx)
require.Nil(t, err)

- err = objBucket.FlushMemtable(ctx)
+ err = objBucket.FlushMemtable()
require.Nil(t, err)

files, err := objBucket.ListFiles(ctx)
@@ -242,7 +242,7 @@ func TestBackup_BucketLevel(t *testing.T) {
assert.Contains(t, exts, ".bloom") // matches both bloom filters (primary+secondary)
})

- err = objBucket.ResumeCompaction(ctx)
+ err = shard.store.ResumeCompaction(ctx)
require.Nil(t, err)
})

7 changes: 1 addition & 6 deletions adapters/repos/db/inverted_reindexer.go
@@ -144,7 +144,7 @@ func (r *ShardInvertedReindexer) doTask(ctx context.Context, task ShardInvertedR
}
tempBucketName := helpers.TempBucketFromBucketName(bucketsToReindex[i])
tempBucket := r.shard.store.Bucket(tempBucketName)
- tempBucket.FlushMemtable(ctx)
+ tempBucket.FlushMemtable()
tempBucket.UpdateStatus(storagestate.StatusReadOnly)

if reindexProperties[i].NewIndex {
@@ -229,11 +229,6 @@ func (r *ShardInvertedReindexer) createTempBucket(ctx context.Context, name stri
if err := r.shard.store.CreateBucket(ctx, tempName, bucketOptions...); err != nil {
return errors.Wrapf(err, "failed creating temp bucket '%s'", tempName)
}

- // no point starting compaction until bucket successfully populated and plugged in
- if err := r.shard.store.Bucket(tempName).PauseCompaction(ctx); err != nil {
- return errors.Wrapf(err, "failed pausing compaction for temp bucket '%s'", tempName)
- }
return nil
}

14 changes: 6 additions & 8 deletions adapters/repos/db/lsmkv/bucket.go
@@ -60,7 +60,7 @@ type Bucket struct {
// for backward compatibility
legacyMapSortingBeforeCompaction bool

- flushCycle *cyclemanager.CycleManager
+ unregisterFlush cyclemanager.UnregisterFunc

status storagestate.Status
statusLock sync.RWMutex
@@ -82,7 +82,8 @@ type Bucket struct {
// [Store]. In this case the [Store] can manage buckets for you, using methods
// such as CreateOrLoadBucket().
func NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger,
- metrics *Metrics, opts ...BucketOption,
+ metrics *Metrics, compactionCycle cyclemanager.CycleManager,
+ flushCycle cyclemanager.CycleManager, opts ...BucketOption,
) (*Bucket, error) {
beforeAll := time.Now()
defaultMemTableThreshold := uint64(10 * 1024 * 1024)
@@ -116,7 +117,7 @@ func NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogg
}

sg, err := newSegmentGroup(dir, logger, b.legacyMapSortingBeforeCompaction,
- metrics, b.strategy, b.monitorCount)
+ metrics, b.strategy, b.monitorCount, compactionCycle)
if err != nil {
return nil, errors.Wrap(err, "init disk segments")
}
@@ -156,10 +157,7 @@ func NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogg
return nil, err
}

- b.flushCycle = cyclemanager.New(
- cyclemanager.MemtableFlushCycleTicker(),
- b.flushAndSwitchIfThresholdsMet)
- b.flushCycle.Start()
+ b.unregisterFlush = flushCycle.Register(b.flushAndSwitchIfThresholdsMet)

b.metrics.TrackStartupBucket(beforeAll)

@@ -706,7 +704,7 @@ func (b *Bucket) Shutdown(ctx context.Context) error {
return err
}

- if err := b.flushCycle.StopAndWait(ctx); err != nil {
+ if err := b.unregisterFlush(ctx); err != nil {
return errors.Wrap(ctx.Err(), "long-running flush in progress")
}

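
The change above is the heart of this PR: a Bucket no longer owns its flush goroutine (previously cyclemanager.New plus Start/StopAndWait); it registers a callback with a shared flush cycle and keeps the returned UnregisterFunc for Shutdown. The sketch below illustrates that registration pattern; it is a simplified stand-in under assumed names, not the actual weaviate/entities/cyclemanager API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// UnregisterFunc mirrors the shape stored in Bucket.unregisterFlush:
// it removes a callback and reports whether the caller's context expired.
type UnregisterFunc func(ctx context.Context) error

// CycleManager runs all registered callbacks on one shared ticker, so N
// buckets share a single background goroutine instead of running N of them.
type CycleManager struct {
	mu        sync.Mutex
	callbacks map[int]func()
	nextID    int
	stop      chan struct{}
}

func NewCycleManager(interval time.Duration) *CycleManager {
	cm := &CycleManager{callbacks: map[int]func(){}, stop: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-cm.stop:
				return
			case <-ticker.C:
				cm.mu.Lock()
				for _, cb := range cm.callbacks {
					cb() // e.g. a bucket's flushAndSwitchIfThresholdsMet
				}
				cm.mu.Unlock()
			}
		}
	}()
	return cm
}

// Register adds a callback and returns the function that removes it again,
// the same shape as b.unregisterFlush = flushCycle.Register(...).
func (cm *CycleManager) Register(cb func()) UnregisterFunc {
	cm.mu.Lock()
	defer cm.mu.Unlock()
	id := cm.nextID
	cm.nextID++
	cm.callbacks[id] = cb
	return func(ctx context.Context) error {
		cm.mu.Lock()
		defer cm.mu.Unlock()
		delete(cm.callbacks, id)
		return ctx.Err()
	}
}

func main() {
	cm := NewCycleManager(50 * time.Millisecond)
	unregister := cm.Register(func() { fmt.Println("flush if thresholds met") })
	time.Sleep(120 * time.Millisecond)
	fmt.Println("unregister:", unregister(context.Background()))
	close(cm.stop)
}
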
46 changes: 3 additions & 43 deletions adapters/repos/db/lsmkv/bucket_backup.go
@@ -17,51 +17,21 @@ import (
"path/filepath"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaviate/weaviate/entities/storagestate"
"github.com/weaviate/weaviate/usecases/monitoring"
)

- // PauseCompaction waits for all ongoing compactions to finish,
- // then makes sure that no new compaction can be started.
- //
- // This is a preparatory stage for creating backups.
- //
- // A timeout should be specified for the input context as some
- // compactions are long-running, in which case it may be better
- // to fail the backup attempt and retry later, than to block
- // indefinitely.
- func (b *Bucket) PauseCompaction(ctx context.Context) error {
- metric, err := monitoring.GetMetrics().BucketPauseDurations.GetMetricWithLabelValues(b.dir)
- if err == nil {
- b.pauseTimer = prometheus.NewTimer(metric)
- }
-
- if err := b.disk.compactionCycle.StopAndWait(ctx); err != nil {
- return errors.Wrap(err, "long-running compaction in progress")
- }
- return nil
- }
-
// FlushMemtable flushes any active memtable and returns only once the memtable
// has been fully flushed and a stable state on disk has been reached.
- //
- // This is a preparatory stage for creating backups.
- //
- // A timeout should be specified for the input context as some
- // flushes are long-running, in which case it may be better
- // to fail the backup attempt and retry later, than to block
- // indefinitely.
- func (b *Bucket) FlushMemtable(ctx context.Context) error {
+ // Method should be run only if flushCycle is not running
+ // (was not started, is stopped, or noop impl is provided)
+ func (b *Bucket) FlushMemtable() error {
if b.isReadOnly() {
return errors.Wrap(storagestate.ErrStatusReadOnly, "flush memtable")
}

- if err := b.flushCycle.StopAndWait(ctx); err != nil {
- return errors.Wrap(ctx.Err(), "long-running memtable flush in progress")
- }
-
- defer b.flushCycle.Start()
// this lock does not currently _need_ to be
// obtained, as the only other place that
// grabs this lock is the flush cycle, which
@@ -122,13 +92,3 @@ func (b *Bucket) ListFiles(ctx context.Context) ([]string, error) {

return files, nil
}

- // ResumeCompaction starts the compaction cycle again.
- // It errors if compactions were not paused
- func (b *Bucket) ResumeCompaction(ctx context.Context) error {
- b.disk.compactionCycle.Start()
- if b.pauseTimer != nil {
- b.pauseTimer.ObserveDuration()
- }
- return nil
- }
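
With PauseCompaction and ResumeCompaction removed from Bucket, backup callers now pause compaction at the store level and flush each bucket without a context, as the integration test in the first file shows. Below is a minimal sketch of the resulting sequence, using illustrative interfaces in place of the concrete lsmkv types.

package lsmkvsketch

import "context"

// Store and Bucket are illustrative interfaces covering only the calls made
// in backup_integration_test.go; the concrete types live in the lsmkv package.
type Store interface {
	PauseCompaction(ctx context.Context) error
	ResumeCompaction(ctx context.Context) error
}

type Bucket interface {
	FlushMemtable() error // per the new doc comment, only valid while no flush cycle runs
	ListFiles(ctx context.Context) ([]string, error)
}

// BackupBucket sketches the order of operations: pause compaction for the
// whole store, flush the bucket to reach a stable on-disk state, list the
// resulting files, then resume compaction.
func BackupBucket(ctx context.Context, store Store, bucket Bucket) ([]string, error) {
	if err := store.PauseCompaction(ctx); err != nil {
		return nil, err // e.g. a long-running compaction outlived the context
	}
	defer store.ResumeCompaction(ctx) // sketch only; a real caller would check the error

	if err := bucket.FlushMemtable(); err != nil {
		return nil, err // e.g. the bucket is read-only
	}
	return bucket.ListFiles(ctx)
}
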
175 changes: 13 additions & 162 deletions adapters/repos/db/lsmkv/bucket_backup_test.go
@@ -14,131 +14,31 @@ package lsmkv
import (
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/entities/cyclemanager"
"github.com/weaviate/weaviate/entities/storagestate"
)

- func TestBackup_PauseCompaction(t *testing.T) {
- t.Run("assert that context timeout works for long compactions", func(t *testing.T) {
- ctx := context.Background()
-
- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
- require.Nil(t, err)
-
- ctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
- defer cancel()
-
- err = b.PauseCompaction(ctx)
- require.NotNil(t, err)
- assert.Equal(t, "long-running compaction in progress: context deadline exceeded", err.Error())
-
- err = b.Shutdown(context.Background())
- require.Nil(t, err)
- })
-
- t.Run("assert compaction is successfully paused", func(t *testing.T) {
- ctx := context.Background()
-
- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
- require.Nil(t, err)
-
- ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
- defer cancel()
-
- t.Run("insert contents into bucket", func(t *testing.T) {
- for i := 0; i < 10; i++ {
- err := b.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
- require.Nil(t, err)
- }
- })
-
- err = b.PauseCompaction(ctx)
- assert.Nil(t, err)
-
- err = b.Shutdown(context.Background())
- require.Nil(t, err)
- })
- }
-
- func TestBackup_FlushMemtable(t *testing.T) {
- t.Run("assert that context timeout works for long flushes", func(t *testing.T) {
- ctx := context.Background()
-
- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
- require.Nil(t, err)
-
- ctx, cancel := context.WithTimeout(ctx, time.Nanosecond)
- defer cancel()
-
- err = b.FlushMemtable(ctx)
- require.NotNil(t, err)
- assert.Equal(t, "long-running memtable flush in progress: context deadline exceeded", err.Error())
-
- err = b.Shutdown(context.Background())
- require.Nil(t, err)
- })
-
- t.Run("assert that flushes run successfully", func(t *testing.T) {
- ctx := context.Background()
-
- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
- require.Nil(t, err)
-
- t.Run("insert contents into bucket", func(t *testing.T) {
- for i := 0; i < 10; i++ {
- err := b.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
- require.Nil(t, err)
- }
- })
-
- ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
- defer cancel()
-
- err = b.FlushMemtable(ctx)
- assert.Nil(t, err)
-
- err = b.Shutdown(context.Background())
- require.Nil(t, err)
- })
-
+ func TestBucketBackup_FlushMemtable(t *testing.T) {
t.Run("assert that readonly bucket fails to flush", func(t *testing.T) {
ctx := context.Background()
+ dirName := t.TempDir()

- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
+ b, err := NewBucket(ctx, dirName, dirName, logrus.New(), nil,
+ cyclemanager.NewNoop(), cyclemanager.NewNoop(),
+ WithStrategy(StrategyReplace))
require.Nil(t, err)

b.UpdateStatus(storagestate.StatusReadOnly)

- ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
- defer cancel()
-
- err = b.FlushMemtable(ctx)
+ err = b.FlushMemtable()
require.NotNil(t, err)

expectedErr := errors.Wrap(storagestate.ErrStatusReadOnly, "flush memtable")
assert.EqualError(t, expectedErr, err.Error())

@@ -147,21 +47,21 @@ func TestBackup_FlushMemtable(t *testing.T) {
})
}

- func TestBackup_ListFiles(t *testing.T) {
+ func TestBucketBackup_ListFiles(t *testing.T) {
ctx := context.Background()
+ dirName := t.TempDir()

- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
+ b, err := NewBucket(ctx, dirName, dirName, logrus.New(), nil,
+ cyclemanager.NewNoop(), cyclemanager.NewNoop(),
+ WithStrategy(StrategyReplace))
require.Nil(t, err)

t.Run("insert contents into bucket", func(t *testing.T) {
for i := 0; i < 10; i++ {
err := b.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
require.Nil(t, err)
}
- b.FlushMemtable(ctx) // flush memtable to generate .db files
+ b.FlushMemtable() // flush memtable to generate .db files
})

t.Run("assert expected bucket contents", func(t *testing.T) {
@@ -181,52 +81,3 @@ func TestBackup_ListFiles(t *testing.T) {
err = b.Shutdown(context.Background())
require.Nil(t, err)
}

- func TestBackup_ResumeCompaction(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- dirName := makeTestDir(t)
- defer removeTestDir(t, dirName)
-
- b, err := NewBucket(ctx, dirName, "", logrus.New(), nil, WithStrategy(StrategyReplace))
- require.Nil(t, err)
-
- t.Run("insert contents into bucket", func(t *testing.T) {
- for i := 0; i < 10; i++ {
- err := b.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
- require.Nil(t, err)
- }
- })
-
- t.Run("assert compaction restarts after pausing", func(t *testing.T) {
- err = b.PauseCompaction(ctx)
- require.Nil(t, err)
-
- err = b.ResumeCompaction(ctx)
- assert.Nil(t, err)
-
- t.Run("assert cycle restarts", func(t *testing.T) {
- assert.True(t, b.flushCycle.Running())
- assert.True(t, b.disk.compactionCycle.Running())
- })
- })
-
- err = b.Shutdown(context.Background())
- require.Nil(t, err)
- }
-
- func makeTestDir(t *testing.T) string {
- rand.Seed(time.Now().UnixNano())
- dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
- if err := os.MkdirAll(dirName, 0o777); err != nil {
- t.Fatalf("failed to make test dir '%s': %s", dirName, err)
- }
- return dirName
- }
-
- func removeTestDir(t *testing.T, dirName string) {
- if err := os.RemoveAll(dirName); err != nil {
- t.Errorf("failed to remove test dir '%s': %s", dirName, err)
- }
- }
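
The rewritten tests pass cyclemanager.NewNoop() for both the compaction and flush cycles, which, per the new FlushMemtable doc comment, is one of the states in which calling FlushMemtable() directly is safe. Below is a rough, assumed stand-in for what a noop cycle provides; it is an illustrative guess, not the real cyclemanager.NewNoop() implementation.

package main

import (
	"context"
	"fmt"
)

// noopCycle accepts registrations but never invokes the callbacks, so a
// bucket built with it gets no background flushing or compaction.
type noopCycle struct{}

func (noopCycle) Register(cb func()) func(context.Context) error {
	return func(context.Context) error { return nil } // nothing to stop or wait for
}

func main() {
	var c noopCycle
	unregister := c.Register(func() { fmt.Println("never runs") })
	fmt.Println(unregister(context.Background())) // <nil>
}
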
4 changes: 3 additions & 1 deletion adapters/repos/db/lsmkv/bucket_test.go
@@ -18,12 +18,14 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/entities/cyclemanager"
)

func TestBucket_WasDeleted(t *testing.T) {
tmpDir := t.TempDir()
logger, _ := test.NewNullLogger()
- b, err := NewBucket(context.Background(), tmpDir, "", logger, nil)
+ b, err := NewBucket(context.Background(), tmpDir, "", logger, nil,
+ cyclemanager.NewNoop(), cyclemanager.NewNoop())
require.Nil(t, err)

var (