From eaa3edcfd864c74e0957d4b5f6b0ec3eedae9a90 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Fri, 26 Jan 2024 09:18:05 +0100 Subject: [PATCH] satellite/metainfo: don't use metabase.ListObjects for unversioned buckets This is workaround for metabase.ListObject performance issues. Earlier IterateObjectsAllVersionsWithStatus was changed to return results in descending order and regular listing was changed to use ListObjects method. With this change IterateObjectsAllVersionsWithStatus is reverted under IterateObjectsAllVersionsWithStatusAscending name and is used to do regular (only latest objects) listing for unversioned buckets and for pending objects listing. Versioned buckets will be still listed with metabase.ListObject. Change-Id: I49b57edc1f7e352c035f602ac71048e8461e0632 --- satellite/metabase/iterator.go | 92 +- satellite/metabase/iterator_test.go | 1911 ++++++++++++++++++++- satellite/metabase/list.go | 13 +- satellite/metabase/metabasetest/create.go | 13 +- satellite/metabase/metabasetest/test.go | 20 + satellite/metainfo/endpoint_object.go | 119 +- 6 files changed, 2119 insertions(+), 49 deletions(-) diff --git a/satellite/metabase/iterator.go b/satellite/metabase/iterator.go index 8c20cac6fe5f..a3c238ec7aac 100644 --- a/satellite/metabase/iterator.go +++ b/satellite/metabase/iterator.go @@ -46,7 +46,7 @@ type iterateCursor struct { Inclusive bool } -func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) { +func iterateAllVersionsWithStatusDescending(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) { defer mon.Task()(&ctx)(&err) it := &objectsIterator{ @@ -78,6 +78,38 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec return iterate(ctx, it, fn) } +func iterateAllVersionsWithStatusAscending(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) { + defer mon.Task()(&ctx)(&err) + + it := &objectsIterator{ + db: db, + + projectID: opts.ProjectID, + bucketName: []byte(opts.BucketName), + pending: opts.Pending, + prefix: opts.Prefix, + prefixLimit: prefixLimit(opts.Prefix), + batchSize: opts.BatchSize, + recursive: opts.Recursive, + includeCustomMetadata: opts.IncludeCustomMetadata, + includeSystemMetadata: opts.IncludeSystemMetadata, + + curIndex: 0, + cursor: firstIterateCursor(opts.Recursive, opts.Cursor, opts.Prefix), + + doNextQuery: doNextQueryAllVersionsWithStatusAscending, + } + + // start from either the cursor or prefix, depending on which is larger + if lessKey(it.cursor.Key, opts.Prefix) { + it.cursor.Key = opts.Prefix + it.cursor.Version = -1 + it.cursor.Inclusive = true + } + + return iterate(ctx, it, fn) +} + func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePendingObjectsByKey, fn func(context.Context, ObjectsIterator) error) (err error) { defer mon.Task()(&ctx)(&err) @@ -287,6 +319,64 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator) ) } +func doNextQueryAllVersionsWithStatusAscending(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) { + defer mon.Task()(&ctx)(&err) + + cursorCompare := ">" + if it.cursor.Inclusive { + cursorCompare = ">=" + } + + statusFilter := `AND status <> ` + statusPending + if it.pending { + statusFilter = `AND status = ` + statusPending + } + + if it.prefixLimit == "" { + querySelectFields := querySelectorFields("object_key", it) + return it.db.db.QueryContext(ctx, ` + SELECT + `+querySelectFields+` + FROM objects + WHERE + (project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $3, $4) + AND (project_id, bucket_name) < ($1, $6) + `+statusFilter+` + AND (expires_at IS NULL OR expires_at > now()) + ORDER BY (project_id, bucket_name, object_key, version) ASC + LIMIT $5 + `, it.projectID, it.bucketName, + []byte(it.cursor.Key), int(it.cursor.Version), + it.batchSize, + nextBucket(it.bucketName), + ) + } + + fromSubstring := 1 + if it.prefix != "" { + fromSubstring = len(it.prefix) + 1 + } + + querySelectFields := querySelectorFields("SUBSTRING(object_key FROM $7)", it) + return it.db.db.QueryContext(ctx, ` + SELECT + `+querySelectFields+` + FROM objects + WHERE + (project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $3, $4) + AND (project_id, bucket_name, object_key) < ($1, $2, $5) + `+statusFilter+` + AND (expires_at IS NULL OR expires_at > now()) + ORDER BY (project_id, bucket_name, object_key, version) ASC + LIMIT $6 + `, it.projectID, it.bucketName, + []byte(it.cursor.Key), int(it.cursor.Version), + []byte(it.prefixLimit), + it.batchSize, + fromSubstring, + ) +} + func querySelectorFields(objectKeyColumn string, it *objectsIterator) string { querySelectFields := objectKeyColumn + ` ,stream_id diff --git a/satellite/metabase/iterator_test.go b/satellite/metabase/iterator_test.go index 4e80f7825d42..12a570529f0d 100644 --- a/satellite/metabase/iterator_test.go +++ b/satellite/metabase/iterator_test.go @@ -1530,7 +1530,7 @@ func TestIterateObjectsWithStatus(t *testing.T) { "c//": {1000, 1001}, "c/1": {1000, 1001}, "g": {1000, 1001}, - }) + }, true) metabasetest.IterateObjectsWithStatus{ Opts: metabase.IterateObjectsWithStatus{ @@ -1691,7 +1691,7 @@ func TestIterateObjectsWithStatus(t *testing.T) { "c//": {1000, 1001}, "c/1": {1000, 1001}, "g": {1000, 1001}, - }) + }, true) metabasetest.IterateObjectsWithStatus{ Opts: metabase.IterateObjectsWithStatus{ @@ -1929,6 +1929,1913 @@ func TestIterateObjectsWithStatus(t *testing.T) { }) } +// TODO this test was copied (and renamed) from v1.95.1 (TestIterateObjectsWithStatus) +// Should be removed when metabase.ListingObjects performance issues will be fixed. +func TestIterateObjectsWithStatusAscending(t *testing.T) { + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { + t.Run("invalid arguments", func(t *testing.T) { + t.Run("ProjectID missing", func(t *testing.T) { + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{}, + BucketName: "sj://mybucket", + Recursive: true, + Pending: false, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "ProjectID missing", + }.Check(ctx, t, db) + }) + t.Run("BucketName missing", func(t *testing.T) { + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "", + Recursive: true, + Pending: false, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "BucketName missing", + }.Check(ctx, t, db) + }) + t.Run("Limit is negative", func(t *testing.T) { + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "mybucket", + BatchSize: -1, + Recursive: true, + Pending: false, + }, + ErrClass: &metabase.ErrInvalidRequest, + ErrText: "BatchSize is negative", + }.Check(ctx, t, db) + }) + }) + + t.Run("empty bucket", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + objects := createObjects(ctx, t, db, 2, uuid.UUID{1}, "mybucket") + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "myemptybucket", + BatchSize: 10, + Recursive: true, + Pending: false, + }, + Result: nil, + }.Check(ctx, t, db) + metabasetest.Verify{Objects: objects}.Check(ctx, t, db) + }) + + t.Run("based on status", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + now := time.Now() + + pending := metabasetest.RandObjectStream() + committed := metabasetest.RandObjectStream() + committed.ProjectID = pending.ProjectID + committed.BucketName = pending.BucketName + + projectID := pending.ProjectID + bucketName := pending.BucketName + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: pending, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + + encryptedMetadata := testrand.Bytes(1024) + encryptedMetadataNonce := testrand.Nonce() + encryptedMetadataKey := testrand.Bytes(265) + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: committed, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + metabasetest.CommitObject{ + Opts: metabase.CommitObject{ + ObjectStream: committed, + OverrideEncryptedMetadata: true, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{{ + ObjectKey: committed.ObjectKey, + Version: committed.Version, + StreamID: committed.StreamID, + CreatedAt: now, + Status: metabase.CommittedUnversioned, + Encryption: metabasetest.DefaultEncryption, + EncryptedMetadataNonce: encryptedMetadataNonce[:], + EncryptedMetadata: encryptedMetadata, + EncryptedMetadataEncryptedKey: encryptedMetadataKey, + }}, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: true, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{{ + ObjectKey: pending.ObjectKey, + Version: pending.Version, + StreamID: pending.StreamID, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + }}, + }.Check(ctx, t, db) + }) + + t.Run("less objects than limit", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + numberOfObjects := 3 + limit := 10 + expected := make([]metabase.ObjectEntry, numberOfObjects) + objects := createObjects(ctx, t, db, numberOfObjects, uuid.UUID{1}, "mybucket") + for i, obj := range objects { + expected[i] = objectEntryFromRaw(obj) + } + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "mybucket", + Recursive: true, + BatchSize: limit, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: expected, + }.Check(ctx, t, db) + metabasetest.Verify{Objects: objects}.Check(ctx, t, db) + }) + + t.Run("more objects than limit", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + numberOfObjects := 10 + limit := 3 + expected := make([]metabase.ObjectEntry, numberOfObjects) + objects := createObjects(ctx, t, db, numberOfObjects, uuid.UUID{1}, "mybucket") + for i, obj := range objects { + expected[i] = objectEntryFromRaw(obj) + } + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "mybucket", + Recursive: true, + BatchSize: limit, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: expected, + }.Check(ctx, t, db) + metabasetest.Verify{Objects: objects}.Check(ctx, t, db) + }) + + t.Run("objects in one bucket in project with 2 buckets", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + numberOfObjectsPerBucket := 5 + + expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket) + + objectsBucketA := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-a") + objectsBucketB := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "bucket-b") + + for i, obj := range objectsBucketA { + expected[i] = objectEntryFromRaw(obj) + } + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "bucket-a", + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: expected, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: append(objectsBucketA, objectsBucketB...), + }.Check(ctx, t, db) + }) + + t.Run("objects in one bucket with same bucketName in another project", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + numberOfObjectsPerBucket := 5 + + expected := make([]metabase.ObjectEntry, numberOfObjectsPerBucket) + + objectsProject1 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{1}, "mybucket") + objectsProject2 := createObjects(ctx, t, db, numberOfObjectsPerBucket, uuid.UUID{2}, "mybucket") + for i, obj := range objectsProject1 { + expected[i] = objectEntryFromRaw(obj) + } + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: uuid.UUID{1}, + BucketName: "mybucket", + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: expected, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: append(objectsProject1, objectsProject2...), + }.Check(ctx, t, db) + }) + + t.Run("recursive", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + + objects := createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{ + "a", + "b/1", + "b/2", + "b/3", + "c", + "c/", + "c//", + "c/1", + "g", + }) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objects["a"], + objects["b/1"], + objects["b/2"], + objects["b/3"], + objects["c"], + objects["c/"], + objects["c//"], + objects["c/1"], + objects["g"], + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "a", Version: objects["a"].Version + 1}, + }, + Result: []metabase.ObjectEntry{ + objects["b/1"], + objects["b/2"], + objects["b/3"], + objects["c"], + objects["c/"], + objects["c//"], + objects["c/1"], + objects["g"], + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "b", Version: 0}, + }, + Result: []metabase.ObjectEntry{ + objects["b/1"], + objects["b/2"], + objects["b/3"], + objects["c"], + objects["c/"], + objects["c//"], + objects["c/1"], + objects["g"], + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + }, + Result: withoutPrefix("b/", + objects["b/1"], + objects["b/2"], + objects["b/3"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "a"}, + }, + Result: withoutPrefix("b/", + objects["b/1"], + objects["b/2"], + objects["b/3"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "b/2", Version: -3}, + }, + Result: withoutPrefix("b/", + objects["b/2"], + objects["b/3"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "c/"}, + }, + Result: nil, + }.Check(ctx, t, db) + }) + + t.Run("non-recursive", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + + objects := createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{ + "a", + "b/1", + "b/2", + "b/3", + "c", + "c/", + "c//", + "c/1", + "g", + }) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objects["a"], + prefixEntry("b/"), + objects["c"], + prefixEntry("c/"), + objects["g"], + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "a", Version: objects["a"].Version + 1}, + }, + Result: []metabase.ObjectEntry{ + prefixEntry("b/"), + objects["c"], + prefixEntry("c/"), + objects["g"], + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "b", Version: 0}, + }, + Result: []metabase.ObjectEntry{ + prefixEntry("b/"), + objects["c"], + prefixEntry("c/"), + objects["g"], + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + }, + Result: withoutPrefix("b/", + objects["b/1"], + objects["b/2"], + objects["b/3"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "a"}, + }, + Result: withoutPrefix("b/", + objects["b/1"], + objects["b/2"], + objects["b/3"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "b/2", Version: -3}, + }, + Result: withoutPrefix("b/", + objects["b/2"], + objects["b/3"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "c/"}, + }, + Result: nil, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "c/", + Cursor: metabase.IterateCursor{Key: "c/"}, + }, + Result: withoutPrefix("c/", + objects["c/"], + prefixEntry("c//"), + objects["c/1"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "c//", + }, + Result: withoutPrefix("c//", + objects["c//"], + ), + }.Check(ctx, t, db) + }) + + t.Run("boundaries", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + + queries := []metabase.ObjectKey{""} + for a := 0; a <= 0xFF; a++ { + if 3 < a && a < 252 { + continue + } + queries = append(queries, metabase.ObjectKey([]byte{byte(a)})) + for b := 0; b <= 0xFF; b++ { + if 4 < b && b < 251 { + continue + } + queries = append(queries, metabase.ObjectKey([]byte{byte(a), byte(b)})) + } + } + + createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries[1:]) + + var collector metabasetest.IterateCollector + for _, cursor := range queries { + for _, prefix := range queries { + collector = collector[:0] + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Cursor: metabase.IterateCursor{ + Key: cursor, + Version: -1, + }, + Prefix: prefix, + Pending: false, + IncludeCustomMetadata: true, + }, collector.Add) + require.NoError(t, err) + + collector = collector[:0] + err = db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Cursor: metabase.IterateCursor{ + Key: cursor, + Version: -1, + }, + Prefix: prefix, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + }, collector.Add) + require.NoError(t, err) + } + } + }) + + t.Run("verify-iterator-boundary", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + queries := []metabase.ObjectKey{"\x00\xFF"} + createObjectsWithKeys(ctx, t, db, projectID, bucketName, queries) + var collector metabasetest.IterateCollector + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Cursor: metabase.IterateCursor{ + Key: metabase.ObjectKey([]byte{}), + Version: -1, + }, + Prefix: metabase.ObjectKey([]byte{1}), + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, collector.Add) + require.NoError(t, err) + }) + + t.Run("verify-cursor-continuation", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + + createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{ + "1", + "a/a", + "a/0", + }) + + var collector metabasetest.IterateCollector + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Prefix: metabase.ObjectKey("a/"), + BatchSize: 1, + Pending: false, + IncludeCustomMetadata: true, + }, collector.Add) + require.NoError(t, err) + }) + + t.Run("include metadata", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + obj1 := metabasetest.RandObjectStream() + metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj1, + Encryption: metabasetest.DefaultEncryption, + OverrideEncryptedMetadata: true, + EncryptedMetadata: []byte{3}, + EncryptedMetadataEncryptedKey: []byte{4}, + EncryptedMetadataNonce: []byte{5}, + }, + }.Run(ctx, t, db, obj1, 4) + + var collector metabasetest.IterateCollector + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: obj1.ProjectID, + BucketName: obj1.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, collector.Add) + + require.NoError(t, err) + + for _, entry := range collector { + require.Equal(t, entry.EncryptedMetadata, []byte{3}) + require.Equal(t, entry.EncryptedMetadataEncryptedKey, []byte{4}) + require.Equal(t, entry.EncryptedMetadataNonce, []byte{5}) + } + }) + + t.Run("exclude custom metadata", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + obj1 := metabasetest.RandObjectStream() + metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj1, + Encryption: metabasetest.DefaultEncryption, + EncryptedMetadata: []byte{3}, + EncryptedMetadataEncryptedKey: []byte{4}, + EncryptedMetadataNonce: []byte{5}, + }, + }.Run(ctx, t, db, obj1, 4) + + var collector metabasetest.IterateCollector + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: obj1.ProjectID, + BucketName: obj1.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: false, + IncludeSystemMetadata: true, + }, collector.Add) + + require.NoError(t, err) + + for _, entry := range collector { + require.Nil(t, entry.EncryptedMetadataNonce) + require.Nil(t, entry.EncryptedMetadata) + require.Nil(t, entry.EncryptedMetadataEncryptedKey) + } + }) + + t.Run("exclude system metadata", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + obj1 := metabasetest.RandObjectStream() + metabasetest.CreateTestObject{ + CommitObject: &metabase.CommitObject{ + ObjectStream: obj1, + Encryption: metabasetest.DefaultEncryption, + OverrideEncryptedMetadata: true, + EncryptedMetadata: []byte{3}, + EncryptedMetadataEncryptedKey: []byte{4}, + EncryptedMetadataNonce: []byte{5}, + }, + }.Run(ctx, t, db, obj1, 4) + + var collector metabasetest.IterateCollector + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: obj1.ProjectID, + BucketName: obj1.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: false, + }, collector.Add) + + require.NoError(t, err) + + for _, entry := range collector { + // fields that should always be set + require.NotEmpty(t, entry.ObjectKey) + require.NotEmpty(t, entry.StreamID) + require.NotZero(t, entry.Version) + require.Equal(t, metabase.CommittedUnversioned, entry.Status) + require.False(t, entry.Encryption.IsZero()) + + require.True(t, entry.CreatedAt.IsZero()) + require.Nil(t, entry.ExpiresAt) + + require.Zero(t, entry.SegmentCount) + require.Zero(t, entry.TotalPlainSize) + require.Zero(t, entry.TotalEncryptedSize) + require.Zero(t, entry.FixedSegmentSize) + + require.NotNil(t, entry.EncryptedMetadataNonce) + require.NotNil(t, entry.EncryptedMetadata) + require.NotNil(t, entry.EncryptedMetadataEncryptedKey) + } + }) + + t.Run("verify-cursor-continuation", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{ + "1", + "a/a", + "a/0", + }) + var collector metabasetest.IterateCollector + err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Prefix: metabase.ObjectKey("a/"), + Pending: false, + BatchSize: 1, + }, collector.Add) + require.NoError(t, err) + require.Equal(t, 2, len(collector)) + }) + t.Run("skip-expired-objects", func(t *testing.T) { + now := time.Now() + type test struct { + notExpired []metabase.ObjectKey + expired []metabase.ObjectKey + } + testCases := []test{ + { + notExpired: []metabase.ObjectKey{"1"}, + expired: []metabase.ObjectKey{"2"}, + }, + { + notExpired: []metabase.ObjectKey{"2"}, + expired: []metabase.ObjectKey{"1"}, + }, + { + notExpired: []metabase.ObjectKey{"2"}, + expired: []metabase.ObjectKey{"1", "3"}, + }, + { + notExpired: []metabase.ObjectKey{"2", "4"}, + expired: []metabase.ObjectKey{"1", "3"}, + }, + { + expired: []metabase.ObjectKey{"1", "2", "3", "4"}, + }, + } + stream := metabase.ObjectStream{ + ProjectID: uuid.UUID{1}, + BucketName: "bucket", + Version: 1, + StreamID: testrand.UUID(), + } + for i, tc := range testCases { + tc := tc + t.Run(strconv.Itoa(i), func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + expectedResult := []metabase.ObjectEntry{} + if len(tc.notExpired) == 0 { + expectedResult = nil + } + for _, key := range tc.notExpired { + stream.ObjectKey = key + object := metabasetest.CreateObject(ctx, t, db, stream, 0) + expectedResult = append(expectedResult, objectEntryFromRaw(metabase.RawObject(object))) + } + for _, key := range tc.expired { + stream.ObjectKey = key + metabasetest.CreateExpiredObject(ctx, t, db, stream, 0, now.Add(-2*time.Hour)) + } + for _, batchSize := range []int{1, 2, 3} { + opts := metabase.IterateObjectsWithStatus{ + ProjectID: stream.ProjectID, + BucketName: stream.BucketName, + BatchSize: batchSize, + Pending: false, + IncludeSystemMetadata: true, + } + metabasetest.IterateObjectsWithStatusAscending{ + Opts: opts, + Result: expectedResult, + }.Check(ctx, t, db) + { + opts := opts + opts.Recursive = true + metabasetest.IterateObjectsWithStatusAscending{ + Opts: opts, + Result: expectedResult, + }.Check(ctx, t, db) + } + } + }) + } + }) + + t.Run("prefix longer than key", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + projectID, bucketName := uuid.UUID{1}, "bucky" + objects := createObjectsWithKeys(ctx, t, db, projectID, bucketName, []metabase.ObjectKey{ + "aaaa/a", + "aaaa/b", + "aaaa/c", + }) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: false, + Prefix: "aaaa/", + Pending: false, + BatchSize: 2, + IncludeSystemMetadata: true, + }, + Result: withoutPrefix("aaaa/", + objects["aaaa/a"], + objects["aaaa/b"], + objects["aaaa/c"], + ), + }.Check(ctx, t, db) + }) + + t.Run("version greater than one", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + projectID, bucketName := uuid.UUID{2}, "bucky" + + id1 := metabasetest.RandObjectStream() + id1.ProjectID = projectID + id1.BucketName = bucketName + id1.Version = metabase.Version(rand.Int31()) + + id2 := metabasetest.RandObjectStream() + id2.ProjectID = projectID + id2.BucketName = bucketName + id2.ObjectKey = id1.ObjectKey + "Z" // for deterministic ordering + id2.Version = 1 + + var objs []metabase.Object + for _, id := range []metabase.ObjectStream{id1, id2} { + obj, _ := metabasetest.CreateTestObject{ + BeginObjectExactVersion: &metabase.BeginObjectExactVersion{ + ObjectStream: id, + }, + CommitObject: &metabase.CommitObject{ + ObjectStream: id, + Encryption: metabasetest.DefaultEncryption, + }, + }.Run(ctx, t, db, id, 1) + objs = append(objs, obj) + } + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + BatchSize: 3, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objs[0])), + objectEntryFromRaw(metabase.RawObject(objs[1])), + }, + }.Check(ctx, t, db) + }) + + t.Run("2 objects, one with multiple versions and one without versioning", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + b0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey + } + + b1 := b0 + b1.Version = 500 + + objA0 := metabasetest.CreateObject(ctx, t, db, a0, 0) + objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0) + objB1 := metabasetest.CreateObjectVersionedOutOfOrder(ctx, t, db, b1, 0, 1001) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objA0)), + objectEntryFromRaw(metabase.RawObject(objB0)), + objectEntryFromRaw(metabase.RawObject(objB1)), + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(objA0), + metabase.RawObject(objB0), + metabase.RawObject(objB1), + }, + }.Check(ctx, t, db) + }) + + t.Run("3 objects, one with versions one without and one pending", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + b0 := metabasetest.RandObjectStream() + c0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + c0.ProjectID = a0.ProjectID + c0.BucketName = a0.BucketName + c0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey + } + + b1 := b0 + b1.Version = 500 + + objA0 := metabasetest.CreateObject(ctx, t, db, a0, 0) + objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0) + objB1 := metabasetest.CreateObjectVersionedOutOfOrder(ctx, t, db, b1, 0, 1001) + metabasetest.CreatePendingObject(ctx, t, db, c0, 0) + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objA0)), + objectEntryFromRaw(metabase.RawObject(objB0)), + objectEntryFromRaw(metabase.RawObject(objB1)), + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(objA0), + metabase.RawObject(objB0), + metabase.RawObject(objB1), + { + ObjectStream: metabase.ObjectStream{ + ProjectID: c0.ProjectID, + BucketName: c0.BucketName, + ObjectKey: c0.ObjectKey, + Version: 1000, + StreamID: c0.StreamID, + }, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("2 objects one with versions and one pending, list pending", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + a0.Version = 1000 + b0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey + } + + a1 := a0 + a1.Version = 1001 + + metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: metabase.ObjectStream{ + ProjectID: b0.ProjectID, + BucketName: b0.BucketName, + ObjectKey: b0.ObjectKey, + Version: b0.Version, + StreamID: b0.StreamID, + }, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0) + objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: true, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + { + ObjectKey: b0.ObjectKey, + Version: 1000, + StreamID: b0.StreamID, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + }, + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(objA0), + metabase.RawObject(objA1), + { + ObjectStream: metabase.ObjectStream{ + ProjectID: b0.ProjectID, + BucketName: b0.BucketName, + ObjectKey: b0.ObjectKey, + Version: 1000, + StreamID: b0.StreamID, + }, + CreatedAt: now, + Status: metabase.Pending, + + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + }, + }, + }.Check(ctx, t, db) + }) + + t.Run("2 objects, each with 2 versions", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + a0.Version = 1000 + b0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey + } + + a1 := a0 + a1.Version = 1001 + b1 := b0 + b1.Version = 500 + + objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0) + objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0) + objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0) + objB1 := metabasetest.CreateObjectVersionedOutOfOrder(ctx, t, db, b1, 0, 1001) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objA0)), + objectEntryFromRaw(metabase.RawObject(objA1)), + objectEntryFromRaw(metabase.RawObject(objB0)), + objectEntryFromRaw(metabase.RawObject(objB1)), + }}.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(objA0), + metabase.RawObject(objA1), + metabase.RawObject(objB0), + metabase.RawObject(objB1), + }, + }.Check(ctx, t, db) + }) + + t.Run("2 objects, each with two versions and one with delete_marker", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + a0.Version = 1000 + b0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey + } + + a1 := a0 + a1.Version = 1001 + b1 := b0 + b1.Version = 500 + + objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0) + objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0) + objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0) + objB1 := metabasetest.CreateObjectVersionedOutOfOrder(ctx, t, db, b1, 0, 1001) + + deletionResult := metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: objA0.Location(), + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Markers: []metabase.Object{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: objA0.ProjectID, + BucketName: objA0.BucketName, + ObjectKey: objA0.ObjectKey, + Version: 1002, + }, + Status: metabase.DeleteMarkerVersioned, + CreatedAt: time.Now(), + }, + }, + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objA0)), + objectEntryFromRaw(metabase.RawObject(objA1)), + objectEntryFromRaw(metabase.RawObject(deletionResult.Markers[0])), + objectEntryFromRaw(metabase.RawObject(objB0)), + objectEntryFromRaw(metabase.RawObject(objB1)), + }}.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(deletionResult.Markers[0]), + metabase.RawObject(objA0), + metabase.RawObject(objA1), + metabase.RawObject(objB0), + metabase.RawObject(objB1), + }, + }.Check(ctx, t, db) + }) + + t.Run("2 objects, each with two versions and multiple delete_markers", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + a0.Version = 1000 + b0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + b0.ObjectKey, a0.ObjectKey = a0.ObjectKey, b0.ObjectKey + } + + objA0 := metabasetest.CreateObjectVersioned(ctx, t, db, a0, 0) + objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0) + + deletionResultA0 := metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: objA0.Location(), + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Markers: []metabase.Object{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: objA0.ProjectID, + BucketName: objA0.BucketName, + ObjectKey: objA0.ObjectKey, + Version: 1001, + }, + Status: metabase.DeleteMarkerVersioned, + CreatedAt: time.Now(), + }, + }, + }, + }.Check(ctx, t, db) + + deletionResultB0 := metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: objB0.Location(), + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Markers: []metabase.Object{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: objB0.ProjectID, + BucketName: objB0.BucketName, + ObjectKey: objB0.ObjectKey, + Version: 1001, + }, + Status: metabase.DeleteMarkerVersioned, + CreatedAt: time.Now(), + }, + }, + }, + }.Check(ctx, t, db) + + a1 := a0 + a1.Version = 1002 + b1 := b0 + b1.Version = 1002 + + objA1 := metabasetest.CreateObjectVersioned(ctx, t, db, a1, 0) + objB1 := metabasetest.CreateObjectVersioned(ctx, t, db, b1, 0) + + deletionResultA1 := metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: objA1.Location(), + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Markers: []metabase.Object{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: objA1.ProjectID, + BucketName: objA1.BucketName, + ObjectKey: objA1.ObjectKey, + Version: 1003, + }, + Status: metabase.DeleteMarkerVersioned, + CreatedAt: time.Now(), + }, + }, + }, + }.Check(ctx, t, db) + + deletionResultB1 := metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: objB1.Location(), + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Markers: []metabase.Object{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: objB1.ProjectID, + BucketName: objB1.BucketName, + ObjectKey: objB1.ObjectKey, + Version: 1003, + }, + Status: metabase.DeleteMarkerVersioned, + CreatedAt: time.Now(), + }, + }, + }, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objA0)), + objectEntryFromRaw(metabase.RawObject(deletionResultA0.Markers[0])), + objectEntryFromRaw(metabase.RawObject(objA1)), + objectEntryFromRaw(metabase.RawObject(deletionResultA1.Markers[0])), + objectEntryFromRaw(metabase.RawObject(objB0)), + objectEntryFromRaw(metabase.RawObject(deletionResultB0.Markers[0])), + objectEntryFromRaw(metabase.RawObject(objB1)), + objectEntryFromRaw(metabase.RawObject(deletionResultB1.Markers[0])), + }}.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(deletionResultA1.Markers[0]), + metabase.RawObject(deletionResultB1.Markers[0]), + metabase.RawObject(objA1), + metabase.RawObject(objB1), + metabase.RawObject(deletionResultA0.Markers[0]), + metabase.RawObject(deletionResultB0.Markers[0]), + metabase.RawObject(objA0), + metabase.RawObject(objB0), + }, + }.Check(ctx, t, db) + }) + + t.Run("3 objects, 1 unversioned, 2 with multiple versions, 1 with and 1 without delete_marker", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + a0 := metabasetest.RandObjectStream() + b0 := metabasetest.RandObjectStream() + b0.ProjectID = a0.ProjectID + b0.BucketName = a0.BucketName + b0.Version = 1000 + c0 := metabasetest.RandObjectStream() + c0.ProjectID = a0.ProjectID + c0.BucketName = a0.BucketName + c0.Version = 1000 + + if a0.ObjectKey > b0.ObjectKey { + a0.ObjectKey, b0.ObjectKey = b0.ObjectKey, a0.ObjectKey + } + if a0.ObjectKey > c0.ObjectKey { + a0.ObjectKey, c0.ObjectKey = c0.ObjectKey, a0.ObjectKey + } + if b0.ObjectKey > c0.ObjectKey { + b0.ObjectKey, c0.ObjectKey = c0.ObjectKey, b0.ObjectKey + } + + objA0 := metabasetest.CreateObject(ctx, t, db, a0, 0) + objB0 := metabasetest.CreateObjectVersioned(ctx, t, db, b0, 0) + objC0 := metabasetest.CreateObjectVersioned(ctx, t, db, c0, 0) + + deletionResultC0 := metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: objC0.Location(), + Versioned: true, + }, + Result: metabase.DeleteObjectResult{ + Markers: []metabase.Object{ + { + ObjectStream: metabase.ObjectStream{ + ProjectID: objC0.ProjectID, + BucketName: objC0.BucketName, + ObjectKey: objC0.ObjectKey, + Version: 1001, + }, + Status: metabase.DeleteMarkerVersioned, + CreatedAt: time.Now(), + }, + }, + }, + }.Check(ctx, t, db) + + b1 := b0 + b1.Version = 1001 + c1 := c0 + c1.Version = 1002 + + objB1 := metabasetest.CreateObjectVersioned(ctx, t, db, b1, 0) + objC1 := metabasetest.CreateObjectVersioned(ctx, t, db, c1, 0) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: a0.ProjectID, + BucketName: a0.BucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: []metabase.ObjectEntry{ + objectEntryFromRaw(metabase.RawObject(objA0)), + objectEntryFromRaw(metabase.RawObject(objB0)), + objectEntryFromRaw(metabase.RawObject(objB1)), + objectEntryFromRaw(metabase.RawObject(objC0)), + objectEntryFromRaw(metabase.RawObject(deletionResultC0.Markers[0])), + objectEntryFromRaw(metabase.RawObject(objC1)), + }, + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(objC1), + metabase.RawObject(objB1), + metabase.RawObject(deletionResultC0.Markers[0]), + metabase.RawObject(objC0), + metabase.RawObject(objB0), + metabase.RawObject(objA0), + }, + }.Check(ctx, t, db) + }) + + t.Run("list recursive objects with versions", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + + objects := metabasetest.CreateVersionedObjectsWithKeysAll(ctx, t, db, projectID, bucketName, map[metabase.ObjectKey][]metabase.Version{ + "a": {1000, 1001}, + "b/1": {1000, 1001}, + "b/2": {1000, 1001}, + "b/3": {1000, 1001}, + "c": {1000, 1001}, + "c/": {1000, 1001}, + "c//": {1000, 1001}, + "c/1": {1000, 1001}, + "g": {1000, 1001}, + }, false) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: concat( + objects["a"], + objects["b/1"], + objects["b/2"], + objects["b/3"], + objects["c"], + objects["c/"], + objects["c//"], + objects["c/1"], + objects["g"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "a", Version: last(objects["a"]).Version + 1}, + }, + Result: concat( + objects["b/1"], + objects["b/2"], + objects["b/3"], + objects["c"], + objects["c/"], + objects["c//"], + objects["c/1"], + objects["g"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "b", Version: 0}, + }, + Result: concat( + objects["b/1"], + objects["b/2"], + objects["b/3"], + objects["c"], + objects["c/"], + objects["c//"], + objects["c/1"], + objects["g"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + }, + Result: withoutPrefix("b/", + concat( + objects["b/1"], + objects["b/2"], + objects["b/3"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "a"}, + }, + Result: withoutPrefix("b/", + concat( + objects["b/1"], + objects["b/2"], + objects["b/3"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "b/2", Version: -3}, + }, + Result: withoutPrefix("b/", + concat( + objects["b/2"], + objects["b/3"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Recursive: true, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "c/"}, + }, + Result: nil, + }.Check(ctx, t, db) + }) + + t.Run("list non-recursive objects with versions", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + projectID, bucketName := uuid.UUID{1}, "bucky" + + objects := metabasetest.CreateVersionedObjectsWithKeysAll(ctx, t, db, projectID, bucketName, map[metabase.ObjectKey][]metabase.Version{ + "a": {1000, 1001}, + "b/1": {1000, 1001}, + "b/2": {1000, 1001}, + "b/3": {1000, 1001}, + "c": {1000, 1001}, + "c/": {1000, 1001}, + "c//": {1000, 1001}, + "c/1": {1000, 1001}, + "g": {1000, 1001}, + }, false) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + }, + Result: concat( + objects["a"], + []metabase.ObjectEntry{prefixEntry("b/")}, + objects["c"], + []metabase.ObjectEntry{prefixEntry("c/")}, + objects["g"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "a", Version: last(objects["a"]).Version + 1}, + }, + Result: concat( + []metabase.ObjectEntry{prefixEntry("b/")}, + objects["c"], + []metabase.ObjectEntry{prefixEntry("c/")}, + objects["g"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Cursor: metabase.IterateCursor{Key: "b", Version: 0}, + }, + Result: concat( + []metabase.ObjectEntry{prefixEntry("b/")}, + objects["c"], + []metabase.ObjectEntry{prefixEntry("c/")}, + objects["g"], + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + }, + Result: withoutPrefix("b/", + concat( + objects["b/1"], + objects["b/2"], + objects["b/3"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "a"}, + }, + Result: withoutPrefix("b/", + concat( + objects["b/1"], + objects["b/2"], + objects["b/3"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "b/2", Version: -3}, + }, + Result: withoutPrefix("b/", + concat( + objects["b/2"], + objects["b/3"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "b/", + Cursor: metabase.IterateCursor{Key: "c/"}, + }, + Result: nil, + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "c/", + Cursor: metabase.IterateCursor{Key: "c/"}, + }, + Result: withoutPrefix("c/", + concat( + objects["c/"], + []metabase.ObjectEntry{prefixEntry("c//")}, + objects["c/1"], + )..., + ), + }.Check(ctx, t, db) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: projectID, + BucketName: bucketName, + Pending: false, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + + Prefix: "c//", + }, + Result: withoutPrefix("c//", + objects["c//"]..., + ), + }.Check(ctx, t, db) + }) + + t.Run("batch iterate committed versioned, unversioned, and delete markers with pending object", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + now := time.Now() + zombieDeadline := now.Add(24 * time.Hour) + + var expected []metabase.ObjectEntry + var objLocation metabase.ObjectLocation + + // create 1 pending object first + pendingStream1 := metabasetest.RandObjectStream() + objLocation = pendingStream1.Location() + pendingStream1.Version = 100 + + metabasetest.CreatePendingObject(ctx, t, db, pendingStream1, 0) + + pendingObject1 := metabase.RawObject{ + ObjectStream: pendingStream1, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + } + expected = append(expected, objectEntryFromRaw(pendingObject1)) + + for i := 0; i < 10; i++ { + unversionedStream := metabasetest.RandObjectStream() + unversionedStream.ProjectID = objLocation.ProjectID + unversionedStream.BucketName = objLocation.BucketName + unversionedStream.ObjectKey = objLocation.ObjectKey + unversionedStream.Version = metabase.Version(200 + i) + if i == 0 { + metabasetest.CreateObject(ctx, t, db, unversionedStream, 0) + } else { + metabasetest.CreateObjectVersioned(ctx, t, db, unversionedStream, 0) + } + } + + // create a second pending object + pendingStream2 := metabasetest.RandObjectStream() + pendingStream2.ProjectID = objLocation.ProjectID + pendingStream2.BucketName = objLocation.BucketName + pendingStream2.ObjectKey = objLocation.ObjectKey + pendingStream2.Version = 300 + + metabasetest.CreatePendingObject(ctx, t, db, pendingStream2, 0) + + pendingObject2 := metabase.RawObject{ + ObjectStream: pendingStream2, + CreatedAt: now, + Status: metabase.Pending, + Encryption: metabasetest.DefaultEncryption, + ZombieDeletionDeadline: &zombieDeadline, + } + expected = append(expected, objectEntryFromRaw(pendingObject2)) + + metabasetest.IterateObjectsWithStatusAscending{ + Opts: metabase.IterateObjectsWithStatus{ + ProjectID: objLocation.ProjectID, + BucketName: objLocation.BucketName, + Pending: true, + IncludeCustomMetadata: true, + IncludeSystemMetadata: true, + BatchSize: 3, + Recursive: true, + }, + Result: expected, + }.Check(ctx, t, db) + }) + }) +} + func TestIterateObjectsSkipCursor(t *testing.T) { metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { projectID, bucketName := uuid.UUID{1}, "bucky" diff --git a/satellite/metabase/list.go b/satellite/metabase/list.go index 1bf9e6bf63b1..127be3c832b1 100644 --- a/satellite/metabase/list.go +++ b/satellite/metabase/list.go @@ -91,7 +91,18 @@ func (db *DB) IterateObjectsAllVersionsWithStatus(ctx context.Context, opts Iter if err = opts.Verify(); err != nil { return err } - return iterateAllVersionsWithStatus(ctx, db, opts, fn) + return iterateAllVersionsWithStatusDescending(ctx, db, opts, fn) +} + +// IterateObjectsAllVersionsWithStatusAscending iterates through all versions of all objects with specified status. Ordered from oldest to latest. +// TODO this method was copied (and renamed) from v1.95.1 as a workaround for issues with metabase.ListObject performance. It should be removed +// when problem with metabase.ListObject will be fixed. +func (db *DB) IterateObjectsAllVersionsWithStatusAscending(ctx context.Context, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) { + defer mon.Task()(&ctx)(&err) + if err = opts.Verify(); err != nil { + return err + } + return iterateAllVersionsWithStatusAscending(ctx, db, opts, fn) } // Verify verifies get object request fields. diff --git a/satellite/metabase/metabasetest/create.go b/satellite/metabase/metabasetest/create.go index d87c0fa20543..a38bbae8364f 100644 --- a/satellite/metabase/metabasetest/create.go +++ b/satellite/metabase/metabasetest/create.go @@ -231,7 +231,7 @@ func CreatePendingObjectsWithKeys(ctx *testcontext.Context, t *testing.T, db *me // CreateVersionedObjectsWithKeysAll creates multiple versioned objects with the specified keys and versions, // and returns a mapping of keys to a slice of all versions. -func CreateVersionedObjectsWithKeysAll(ctx *testcontext.Context, t *testing.T, db *metabase.DB, projectID uuid.UUID, bucketName string, keys map[metabase.ObjectKey][]metabase.Version) map[metabase.ObjectKey][]metabase.ObjectEntry { +func CreateVersionedObjectsWithKeysAll(ctx *testcontext.Context, t *testing.T, db *metabase.DB, projectID uuid.UUID, bucketName string, keys map[metabase.ObjectKey][]metabase.Version, sortDesc bool) map[metabase.ObjectKey][]metabase.ObjectEntry { objects := make(map[metabase.ObjectKey][]metabase.ObjectEntry, len(keys)) for key, versions := range keys { items := []metabase.ObjectEntry{} @@ -254,10 +254,13 @@ func CreateVersionedObjectsWithKeysAll(ctx *testcontext.Context, t *testing.T, d Encryption: DefaultEncryption, }) } - // sort by version descending - sort.Slice(items, func(i, k int) bool { - return items[i].Less(items[k]) - }) + + if sortDesc { + // sort by version descending + sort.Slice(items, func(i, k int) bool { + return items[i].Less(items[k]) + }) + } objects[key] = items } diff --git a/satellite/metabase/metabasetest/test.go b/satellite/metabase/metabasetest/test.go index 875475391c20..7fbf96062a88 100644 --- a/satellite/metabase/metabasetest/test.go +++ b/satellite/metabase/metabasetest/test.go @@ -592,6 +592,26 @@ func (step IterateObjectsWithStatus) Check(ctx *testcontext.Context, t testing.T require.Zero(t, diff) } +// IterateObjectsWithStatusAscending is for testing metabase.IterateObjectsWithStatusAscending. +type IterateObjectsWithStatusAscending struct { + Opts metabase.IterateObjectsWithStatus + + Result []metabase.ObjectEntry + ErrClass *errs.Class + ErrText string +} + +// Check runs the test. +func (step IterateObjectsWithStatusAscending) Check(ctx *testcontext.Context, t testing.TB, db *metabase.DB) { + var result IterateCollector + + err := db.IterateObjectsAllVersionsWithStatusAscending(ctx, step.Opts, result.Add) + checkError(t, err, step.ErrClass, step.ErrText) + + diff := cmp.Diff(step.Result, []metabase.ObjectEntry(result), DefaultTimeDiff()) + require.Zero(t, diff) +} + // IterateLoopObjects is for testing metabase.IterateLoopObjects. type IterateLoopObjects struct { Opts metabase.IterateLoopObjects diff --git a/satellite/metainfo/endpoint_object.go b/satellite/metainfo/endpoint_object.go index 8320d9d4f749..90fd7b5a5657 100644 --- a/satellite/metainfo/endpoint_object.go +++ b/satellite/metainfo/endpoint_object.go @@ -962,66 +962,105 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq // For pending objects, we always need to list the versions. if status == metabase.Pending { - result, err := endpoint.metabase.ListObjects(ctx, - metabase.ListObjects{ + // handles listing pending objects for all types of buckets + err = endpoint.metabase.IterateObjectsAllVersionsWithStatusAscending(ctx, + metabase.IterateObjectsWithStatus{ ProjectID: keyInfo.ProjectID, BucketName: string(req.Bucket), Prefix: prefix, - Cursor: metabase.ListObjectsCursor{ + Cursor: metabase.IterateCursor{ Key: cursorKey, Version: cursorVersion, }, - Pending: true, - AllVersions: true, - Recursive: req.Recursive, - Limit: limit, - + Recursive: req.Recursive, + BatchSize: limit + 1, + Pending: true, IncludeCustomMetadata: includeCustomMetadata, IncludeSystemMetadata: includeSystemMetadata, - }) + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + entry := metabase.ObjectEntry{} + for len(resp.Items) < limit && it.Next(ctx, &entry) { + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) + if err != nil { + return err + } + resp.Items = append(resp.Items, item) + } + + resp.More = it.Next(ctx, &entry) + return nil + }, + ) if err != nil { return nil, endpoint.convertMetabaseErr(err) } + } else if !req.IncludeAllVersions { + // handles regular listing for all type of buckets + if bucket.Versioning == buckets.Unversioned { + err = endpoint.metabase.IterateObjectsAllVersionsWithStatusAscending(ctx, + metabase.IterateObjectsWithStatus{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.IterateCursor{ + Key: cursorKey, + Version: cursorVersion, + }, + Recursive: req.Recursive, + BatchSize: limit + 1, + Pending: false, + IncludeCustomMetadata: includeCustomMetadata, + IncludeSystemMetadata: includeSystemMetadata, + }, func(ctx context.Context, it metabase.ObjectsIterator) error { + entry := metabase.ObjectEntry{} + for len(resp.Items) < limit && it.Next(ctx, &entry) { + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) + if err != nil { + return err + } + resp.Items = append(resp.Items, item) + } - for _, entry := range result.Objects { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) + resp.More = it.Next(ctx, &entry) + return nil + }, + ) if err != nil { return nil, endpoint.convertMetabaseErr(err) } - resp.Items = append(resp.Items, item) - } - resp.More = result.More - } else if !req.IncludeAllVersions { - result, err := endpoint.metabase.ListObjects(ctx, - metabase.ListObjects{ - ProjectID: keyInfo.ProjectID, - BucketName: string(req.Bucket), - Prefix: prefix, - Cursor: metabase.ListObjectsCursor{ - Key: cursorKey, - Version: cursorVersion, - }, - Pending: false, - AllVersions: false, - Recursive: req.Recursive, - Limit: limit, - - IncludeCustomMetadata: includeCustomMetadata, - IncludeSystemMetadata: includeSystemMetadata, - }) - if err != nil { - return nil, endpoint.convertMetabaseErr(err) - } - - for _, entry := range result.Objects { - item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) + } else { + result, err := endpoint.metabase.ListObjects(ctx, + metabase.ListObjects{ + ProjectID: keyInfo.ProjectID, + BucketName: string(req.Bucket), + Prefix: prefix, + Cursor: metabase.ListObjectsCursor{ + Key: cursorKey, + Version: cursorVersion, + }, + Pending: false, + AllVersions: false, + Recursive: req.Recursive, + Limit: limit, + + IncludeCustomMetadata: includeCustomMetadata, + IncludeSystemMetadata: includeSystemMetadata, + }) if err != nil { return nil, endpoint.convertMetabaseErr(err) } - resp.Items = append(resp.Items, item) + + for _, entry := range result.Objects { + item, err := endpoint.objectEntryToProtoListItem(ctx, req.Bucket, entry, prefix, includeSystemMetadata, includeCustomMetadata, bucket.Placement, bucket.Versioning == buckets.VersioningEnabled) + if err != nil { + return nil, endpoint.convertMetabaseErr(err) + } + resp.Items = append(resp.Items, item) + } + resp.More = result.More } - resp.More = result.More } else { + // handles listing all versions err = endpoint.metabase.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{ ProjectID: keyInfo.ProjectID,