Skip to content

Commit

Permalink
satellite/metabase: add exhaustive test for metabase listing
Browse files Browse the repository at this point in the history
As we modify the list objects implementation, it would be nice to be
certain that the behavior is correct and it respects all the corner
cases.

Also make TestingBatchInsertObjects more performant.

Change-Id: I05bbb41b3509bf632fb1fc834350d82962a32948
  • Loading branch information
egonelbre authored and Storj Robot committed Feb 28, 2024
1 parent de975c3 commit e923bde
Show file tree
Hide file tree
Showing 7 changed files with 505 additions and 36 deletions.
19 changes: 19 additions & 0 deletions satellite/metabase/common.go
Expand Up @@ -269,6 +269,7 @@ type ObjectStream struct {
}

// Less implements sorting on object streams.
// Where ProjectID asc, BucketName asc, ObjectKey asc, Version desc.
func (obj ObjectStream) Less(b ObjectStream) bool {
if obj.ProjectID != b.ProjectID {
return obj.ProjectID.Less(b.ProjectID)
Expand All @@ -285,6 +286,24 @@ func (obj ObjectStream) Less(b ObjectStream) bool {
return obj.StreamID.Less(b.StreamID)
}

// LessVersionAsc implements sorting on object streams.
// Where ProjectID asc, BucketName asc, ObjectKey asc, Version asc.
func (obj ObjectStream) LessVersionAsc(b ObjectStream) bool {
if obj.ProjectID != b.ProjectID {
return obj.ProjectID.Less(b.ProjectID)
}
if obj.BucketName != b.BucketName {
return obj.BucketName < b.BucketName
}
if obj.ObjectKey != b.ObjectKey {
return obj.ObjectKey < b.ObjectKey
}
if obj.Version != b.Version {
return obj.Version < b.Version
}
return obj.StreamID.Less(b.StreamID)
}

// Verify object stream fields.
func (obj *ObjectStream) Verify() error {
switch {
Expand Down
3 changes: 3 additions & 0 deletions satellite/metabase/db.go
Expand Up @@ -129,6 +129,9 @@ func (db *DB) DestroyTables(ctx context.Context) error {
DROP TABLE IF EXISTS objects;
DROP TABLE IF EXISTS segments;
DROP TABLE IF EXISTS node_aliases;
DROP TABLE IF EXISTS segment_copies;
DROP TABLE IF EXISTS pending_objects;
DROP TABLE IF EXISTS metabase_versions;
DROP SEQUENCE IF EXISTS node_alias_seq;
`)
db.aliasCache = NewNodeAliasCache(db)
Expand Down
13 changes: 13 additions & 0 deletions satellite/metabase/list.go
Expand Up @@ -54,6 +54,19 @@ func (entry ObjectEntry) Less(other ObjectEntry) bool {
})
}

// LessVersionAsc implements sorting on object entries.
func (entry ObjectEntry) LessVersionAsc(other ObjectEntry) bool {
return ObjectStream{
ObjectKey: entry.ObjectKey,
Version: entry.Version,
StreamID: entry.StreamID,
}.LessVersionAsc(ObjectStream{
ObjectKey: other.ObjectKey,
Version: other.Version,
StreamID: other.StreamID,
})
}

// ObjectsIterator iterates over a sequence of ObjectEntry items.
type ObjectsIterator interface {
Next(ctx context.Context, item *ObjectEntry) bool
Expand Down
2 changes: 1 addition & 1 deletion satellite/metabase/list_objects.go
Expand Up @@ -18,7 +18,7 @@ type ListObjectsCursor IterateCursor
// ListObjects contains arguments necessary for listing objects.
//
// For Pending = false, the versions are in descending order.
// For pending = true, the versions are in ascending order.
// For Pending = true, the versions are in ascending order.
type ListObjects struct {
ProjectID uuid.UUID
BucketName string
Expand Down
204 changes: 204 additions & 0 deletions satellite/metabase/list_objects_exhaustive_test.go
@@ -0,0 +1,204 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase_test

import (
"flag"
"fmt"
"testing"

"github.com/stretchr/testify/require"

"storj.io/common/testcontext"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)

var listObjectsExhaustive = flag.Bool("exhaustive", false, "exhaustively test list objects implementation")

func TestListObjects_Exhaustive(t *testing.T) {
if !*listObjectsExhaustive {
t.Skip(`Use "go test -run TestListObjects_Exhaustive -exhaustive" to run this test.`)
}

entries := generateExhaustiveTestData()
raw := objectEntriesToRawObjects(entries)
naive := NewNaiveObjectsDB(entries)

metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
require.NoError(t, db.TestingBatchInsertObjects(ctx, raw))

check := func(opts metabase.ListObjects) {
expResult, expErr := naive.ListObjects(ctx, opts)
gotResult, gotErr := db.ListObjects(ctx, opts)

require.Equal(t, expErr, gotErr, fmt.Sprintf("%#v", opts))
require.Equal(t, expResult, gotResult, fmt.Sprintf("%#v", opts))
}

var opts metabase.ListObjects
opts.ProjectID = uuid.UUID{1}
opts.BucketName = "b"
for _, opts.Prefix = range []metabase.ObjectKey{"", "A/", "B/"} {
for _, opts.Pending = range []bool{false} {
for _, opts.AllVersions = range []bool{false} {
for _, opts.Recursive = range []bool{true, false} {
for _, opts.Limit = range []int{1, 3, 7} {
opts.Cursor.Key = ""
opts.Cursor.Version = 0
check(opts)

opts.Cursor.Version = metabase.MaxVersion
check(opts)

for i := range entries {
entry := &entries[i]
opts.Cursor.Key = entry.ObjectKey

opts.Cursor.Version = 0
check(opts)

opts.Cursor.Version = entry.Version
check(opts)

opts.Cursor.Version = metabase.MaxVersion
check(opts)
}
}
}
}
}
}
})
}

func objectEntriesToRawObjects(entries []metabase.ObjectEntry) (rs []metabase.RawObject) {
rs = make([]metabase.RawObject, len(entries))
for i := range rs {
entry := &entries[i]
rs[i] = metabase.RawObject{
ObjectStream: metabase.ObjectStream{
ProjectID: uuid.UUID{1},
BucketName: "b",
ObjectKey: entry.ObjectKey,
Version: entry.Version,
StreamID: entry.StreamID,
},
Status: entry.Status,
}
}
return rs
}

func generateExhaustiveTestData() []metabase.ObjectEntry {
cornerBytes := []byte{0, 1, 'A', 'B', 254, 255}
streamID := uuid.UUID{1}
entries := []metabase.ObjectEntry{}
for _, a := range cornerBytes {
entries = append(entries,
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a}),
Version: 2,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a}),
Version: 3,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, 0x00}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, 0xFF}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/'}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
)
if a == 'B' {
entries = append(entries,
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a}),
Version: 4,
StreamID: streamID,
Status: metabase.DeleteMarkerVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, 0x00}),
Version: 2,
StreamID: streamID,
Status: metabase.DeleteMarkerVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, 0xFF}),
Version: 2,
StreamID: streamID,
Status: metabase.DeleteMarkerVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/'}),
Version: 2,
StreamID: streamID,
Status: metabase.DeleteMarkerVersioned,
},
)
}

for _, b := range cornerBytes {
entries = append(entries,
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/', b}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/', b}),
Version: 2,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/', b}),
Version: 3,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/', b, 0x00}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
metabase.ObjectEntry{
ObjectKey: metabase.ObjectKey([]byte{a, '/', b, 0xFF}),
Version: 1,
StreamID: streamID,
Status: metabase.CommittedVersioned,
},
)
}
}

return entries
}

0 comments on commit e923bde

Please sign in to comment.