
Commit

addresses code review feedback
aliszka committed May 18, 2023
1 parent 674aaa8 commit 54ccdce
Showing 6 changed files with 277 additions and 197 deletions.
adapters/repos/db/lsmkv/store_backup_test.go: 272 changes (172 additions, 100 deletions)
@@ -21,6 +21,7 @@ import (
   "github.com/sirupsen/logrus/hooks/test"
   "github.com/stretchr/testify/assert"
   "github.com/stretchr/testify/require"
+  "github.com/weaviate/weaviate/entities/errorcompounder"
   "github.com/weaviate/weaviate/entities/storagestate"
 )

@@ -29,50 +30,70 @@ func TestStoreBackup_PauseCompaction(t *testing.T) {
   ctx := context.Background()

   t.Run("assert that context timeout works for long compactions", func(t *testing.T) {
-    dirName := t.TempDir()
-
-    store, err := New(dirName, dirName, logger, nil)
-    require.Nil(t, err)
-
-    err = store.CreateOrLoadBucket(ctx, "test_bucket")
-    require.Nil(t, err)
-
-    canceledCtx, cancel := context.WithTimeout(ctx, time.Nanosecond)
-    defer cancel()
-
-    err = store.PauseCompaction(canceledCtx)
-    require.NotNil(t, err)
-    assert.Equal(t, "long-running compaction in progress: context deadline exceeded", err.Error())
-
-    err = store.Shutdown(ctx)
-    require.Nil(t, err)
+    for _, buckets := range [][]string{
+      {"test_bucket"},
+      {"test_bucket1", "test_bucket2"},
+      {"test_bucket1", "test_bucket2", "test_bucket3", "test_bucket4", "test_bucket5"},
+    } {
+      t.Run(fmt.Sprintf("with %d buckets", len(buckets)), func(t *testing.T) {
+        dirName := t.TempDir()
+
+        store, err := New(dirName, dirName, logger, nil)
+        require.Nil(t, err)
+
+        for _, bucket := range buckets {
+          err = store.CreateOrLoadBucket(ctx, bucket)
+          require.Nil(t, err)
+        }
+
+        expiredCtx, cancel := context.WithDeadline(ctx, time.Now())
+        defer cancel()
+
+        err = store.PauseCompaction(expiredCtx)
+        require.NotNil(t, err)
+        assert.Equal(t, "long-running compaction in progress: context deadline exceeded", err.Error())
+
+        err = store.Shutdown(ctx)
+        require.Nil(t, err)
+      })
+    }
   })

t.Run("assert compaction is successfully paused", func(t *testing.T) {
dirName := t.TempDir()

store, err := New(dirName, dirName, logger, nil)
require.Nil(t, err)
for _, buckets := range [][]string{
{"test_bucket"},
{"test_bucket1", "test_bucket2"},
{"test_bucket1", "test_bucket2", "test_bucket3", "test_bucket4", "test_bucket5"},
} {
t.Run(fmt.Sprintf("with %d buckets", len(buckets)), func(t *testing.T) {
dirName := t.TempDir()

store, err := New(dirName, dirName, logger, nil)
require.Nil(t, err)

err = store.CreateOrLoadBucket(ctx, "test_bucket")
require.Nil(t, err)
for _, bucket := range buckets {
err = store.CreateOrLoadBucket(ctx, bucket)
require.Nil(t, err)

cancelableCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
t.Run("insert contents into bucket", func(t *testing.T) {
bucket := store.Bucket(bucket)
for i := 0; i < 10; i++ {
err := bucket.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
require.Nil(t, err)
}
})
}

t.Run("insert contents into bucket", func(t *testing.T) {
bucket := store.Bucket("test_bucket")
for i := 0; i < 10; i++ {
err := bucket.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
require.Nil(t, err)
}
})
expirableCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

err = store.PauseCompaction(cancelableCtx)
assert.Nil(t, err)
err = store.PauseCompaction(expirableCtx)
assert.Nil(t, err)

err = store.Shutdown(context.Background())
require.Nil(t, err)
err = store.Shutdown(context.Background())
require.Nil(t, err)
})
}
})
}

@@ -81,35 +102,45 @@ func TestStoreBackup_ResumeCompaction(t *testing.T) {
   ctx := context.Background()

   t.Run("assert compaction restarts after pausing", func(t *testing.T) {
-    dirName := t.TempDir()
-
-    store, err := New(dirName, dirName, logger, nil)
-    require.Nil(t, err)
-
-    err = store.CreateOrLoadBucket(ctx, "test_bucket")
-    require.Nil(t, err)
-
-    t.Run("insert contents into bucket", func(t *testing.T) {
-      bucket := store.Bucket("test_bucket")
-      for i := 0; i < 10; i++ {
-        err := bucket.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
-        require.Nil(t, err)
-      }
-    })
-
-    cancelableCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
-    defer cancel()
-
-    err = store.PauseCompaction(cancelableCtx)
-    require.Nil(t, err)
-
-    err = store.ResumeCompaction(ctx)
-    require.Nil(t, err)
-
-    assert.True(t, store.compactionCycle.Running())
-
-    err = store.Shutdown(ctx)
-    require.Nil(t, err)
+    for _, buckets := range [][]string{
+      {"test_bucket"},
+      {"test_bucket1", "test_bucket2"},
+      {"test_bucket1", "test_bucket2", "test_bucket3", "test_bucket4", "test_bucket5"},
+    } {
+      t.Run(fmt.Sprintf("with %d buckets", len(buckets)), func(t *testing.T) {
+        dirName := t.TempDir()
+
+        store, err := New(dirName, dirName, logger, nil)
+        require.Nil(t, err)
+
+        for _, bucket := range buckets {
+          err = store.CreateOrLoadBucket(ctx, bucket)
+          require.Nil(t, err)
+
+          t.Run("insert contents into bucket", func(t *testing.T) {
+            bucket := store.Bucket(bucket)
+            for i := 0; i < 10; i++ {
+              err := bucket.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
+              require.Nil(t, err)
+            }
+          })
+        }
+
+        expirableCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+        defer cancel()
+
+        err = store.PauseCompaction(expirableCtx)
+        require.Nil(t, err)
+
+        err = store.ResumeCompaction(expirableCtx)
+        require.Nil(t, err)
+
+        assert.True(t, store.compactionCycle.Running())
+
+        err = store.Shutdown(ctx)
+        require.Nil(t, err)
+      })
+    }
   })
 }

@@ -118,72 +149,113 @@ func TestStoreBackup_FlushMemtable(t *testing.T) {
   ctx := context.Background()

   t.Run("assert that context timeout works for long flushes", func(t *testing.T) {
-    dirName := t.TempDir()
-
-    store, err := New(dirName, dirName, logger, nil)
-    require.Nil(t, err)
-
-    err = store.CreateOrLoadBucket(ctx, "test_bucket")
-    require.Nil(t, err)
-
-    canceledCtx, cancel := context.WithTimeout(ctx, time.Nanosecond)
-    defer cancel()
-
-    err = store.FlushMemtables(canceledCtx)
-    require.NotNil(t, err)
-    assert.Equal(t, "long-running memtable flush in progress: context deadline exceeded", err.Error())
-
-    err = store.Shutdown(ctx)
-    require.Nil(t, err)
+    for _, buckets := range [][]string{
+      {"test_bucket"},
+      {"test_bucket1", "test_bucket2"},
+      {"test_bucket1", "test_bucket2", "test_bucket3", "test_bucket4", "test_bucket5"},
+    } {
+      t.Run(fmt.Sprintf("with %d buckets", len(buckets)), func(t *testing.T) {
+        dirName := t.TempDir()
+
+        store, err := New(dirName, dirName, logger, nil)
+        require.Nil(t, err)
+
+        for _, bucket := range buckets {
+          err = store.CreateOrLoadBucket(ctx, bucket)
+          require.Nil(t, err)
+        }
+
+        expiredCtx, cancel := context.WithDeadline(ctx, time.Now())
+        defer cancel()
+
+        err = store.FlushMemtables(expiredCtx)
+        require.NotNil(t, err)
+        assert.Equal(t, "long-running memtable flush in progress: context deadline exceeded", err.Error())
+
+        err = store.Shutdown(ctx)
+        require.Nil(t, err)
+      })
+    }
   })

t.Run("assert that flushes run successfully", func(t *testing.T) {
dirName := t.TempDir()
for _, buckets := range [][]string{
{"test_bucket"},
{"test_bucket1", "test_bucket2"},
{"test_bucket1", "test_bucket2", "test_bucket3", "test_bucket4", "test_bucket5"},
} {
t.Run(fmt.Sprintf("with %d buckets", len(buckets)), func(t *testing.T) {
dirName := t.TempDir()

store, err := New(dirName, dirName, logger, nil)
require.Nil(t, err)

store, err := New(dirName, dirName, logger, nil)
require.Nil(t, err)
err = store.CreateOrLoadBucket(ctx, "test_bucket")
require.Nil(t, err)

err = store.CreateOrLoadBucket(ctx, "test_bucket")
require.Nil(t, err)
for _, bucket := range buckets {
err = store.CreateOrLoadBucket(ctx, bucket)
require.Nil(t, err)

t.Run("insert contents into bucket", func(t *testing.T) {
bucket := store.Bucket("test_bucket")
for i := 0; i < 10; i++ {
err := bucket.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
require.Nil(t, err)
}
})
t.Run("insert contents into bucket", func(t *testing.T) {
bucket := store.Bucket(bucket)
for i := 0; i < 10; i++ {
err := bucket.Put([]byte(fmt.Sprint(i)), []byte(fmt.Sprint(i)))
require.Nil(t, err)
}
})
}

cancelableCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
expirableCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err = store.FlushMemtables(cancelableCtx)
assert.Nil(t, err)
err = store.FlushMemtables(expirableCtx)
assert.Nil(t, err)

err = store.Shutdown(ctx)
require.Nil(t, err)
err = store.Shutdown(ctx)
require.Nil(t, err)
})
}
})

t.Run("assert that readonly bucket fails to flush", func(t *testing.T) {
dirName := t.TempDir()

store, err := New(dirName, dirName, logger, nil)
require.Nil(t, err)
singleErr := errors.Wrap(storagestate.ErrStatusReadOnly, "flush memtable")
expectedErr := func(bucketsCount int) error {
ec := &errorcompounder.ErrorCompounder{}
for i := 0; i < bucketsCount; i++ {
ec.Add(singleErr)
}
return ec.ToError()
}

for _, buckets := range [][]string{
{"test_bucket"},
{"test_bucket1", "test_bucket2"},
{"test_bucket1", "test_bucket2", "test_bucket3", "test_bucket4", "test_bucket5"},
} {
t.Run(fmt.Sprintf("with %d buckets", len(buckets)), func(t *testing.T) {
dirName := t.TempDir()

store, err := New(dirName, dirName, logger, nil)
require.Nil(t, err)

err = store.CreateOrLoadBucket(ctx, "test_bucket")
require.Nil(t, err)
for _, bucket := range buckets {
err = store.CreateOrLoadBucket(ctx, bucket)
require.Nil(t, err)
}

store.UpdateBucketsStatus(storagestate.StatusReadOnly)
store.UpdateBucketsStatus(storagestate.StatusReadOnly)

cancelableCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
expirableCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err = store.FlushMemtables(cancelableCtx)
require.NotNil(t, err)
expectedErr := errors.Wrap(storagestate.ErrStatusReadOnly, "flush memtable")
assert.EqualError(t, expectedErr, err.Error())
err = store.FlushMemtables(expirableCtx)
require.NotNil(t, err)
assert.EqualError(t, expectedErr(len(buckets)), err.Error())

err = store.Shutdown(ctx)
require.Nil(t, err)
err = store.Shutdown(ctx)
require.Nil(t, err)
})
}
})
}
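
Note on the expired-context pattern used in the timeout subtests above: context.WithDeadline(ctx, time.Now()) yields a context that is already past its deadline, so PauseCompaction and FlushMemtables are expected to fail immediately with a deadline error instead of waiting out a real timeout, which is also what the earlier context.WithTimeout(ctx, time.Nanosecond) form achieved. A minimal standard-library sketch of that behaviour, independent of the lsmkv package (waitOrFail is a hypothetical stand-in, not a Weaviate API):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitOrFail stands in for a context-aware operation: it either finishes
// the simulated work or returns the context's error if the context is done.
func waitOrFail(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return fmt.Errorf("long-running operation in progress: %w", ctx.Err())
	}
}

func main() {
	// A deadline of time.Now() means the context is effectively expired on creation.
	expiredCtx, cancel := context.WithDeadline(context.Background(), time.Now())
	defer cancel()

	err := waitOrFail(expiredCtx, time.Second)
	fmt.Println(err)                                      // ends in "context deadline exceeded"
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}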
