diff --git a/satellite/metabase/list_objects.go b/satellite/metabase/list_objects.go index 81af3ca20b95..1067c20f8416 100644 --- a/satellite/metabase/list_objects.go +++ b/satellite/metabase/list_objects.go @@ -5,11 +5,10 @@ package metabase import ( "context" - "database/sql" - "errors" "strings" - "storj.io/common/tagsql" + "github.com/zeebo/errs" + "storj.io/common/uuid" ) @@ -43,11 +42,16 @@ func (opts *ListObjects) Verify() error { case opts.Limit < 0: return ErrInvalidRequest.New("Invalid limit: %d", opts.Limit) - case opts.Pending && !opts.AllVersions: - return ErrInvalidRequest.New("Not Implemented: Pending && !AllVersions") - case !opts.Pending && opts.AllVersions: - return ErrInvalidRequest.New("Not Implemented: !Pending && AllVersions") + case opts.Pending || opts.AllVersions: + return errs.New("not implemented") + + // TODO old code is disabled for now because of performance problems + // case opts.Pending && !opts.AllVersions: + // return ErrInvalidRequest.New("Not Implemented: Pending && !AllVersions") + // case !opts.Pending && opts.AllVersions: + // return ErrInvalidRequest.New("Not Implemented: !Pending && AllVersions") } + return nil } @@ -67,230 +71,290 @@ func (db *DB) ListObjects(ctx context.Context, opts ListObjects) (result ListObj ListLimit.Ensure(&opts.Limit) - var entries []ObjectEntry - err = withRows(db.db.QueryContext(ctx, opts.getSQLQuery(), - opts.ProjectID, []byte(opts.BucketName), - opts.startKey(), opts.Cursor.Version, opts.stopKey(), - opts.Limit+1, len(opts.Prefix)+1), - )(func(rows tagsql.Rows) error { - entries, err = scanListObjectsResult(rows, opts) - return err - }) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return ListObjectsResult{}, nil - } - return ListObjectsResult{}, Error.New("unable to list objects: %w", err) - } - - if len(entries) > opts.Limit { - result.More = true - result.Objects = entries[:opts.Limit] - return result, nil - } - - result.Objects = entries - result.More = false - return result, nil -} - -func (opts *ListObjects) getSQLQuery() string { - var indexFields string - if opts.Recursive { - indexFields = `substring(object_key from $7) AS entry_key, version AS entry_version, FALSE AS is_prefix` - } else { - if opts.AllVersions { - indexFields = ` - DISTINCT ON (project_id, bucket_name, entry_key, entry_version) - (CASE - WHEN position('/' IN substring(object_key from $7)) <> 0 - THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) - ELSE substring(object_key from $7) - END) - AS entry_key, - (CASE - WHEN position('/' IN substring(object_key from $7)) <> 0 - THEN 0 - ELSE version - END) - AS entry_version, - position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` - } else { - indexFields = ` - DISTINCT ON (project_id, bucket_name, entry_key) - (CASE - WHEN position('/' IN substring(object_key from $7)) <> 0 - THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) - ELSE substring(object_key from $7) - END) - AS entry_key, - version AS entry_version, - position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` - } - } - - switch { - case opts.Pending && opts.AllVersions: - return `SELECT ` + indexFields + opts.selectedFields() + ` - FROM objects - WHERE - (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) - AND ` + opts.stopCondition() + ` - AND status = ` + statusPending + ` - AND (expires_at IS NULL OR expires_at > now()) - ORDER BY ` + opts.orderBy() + ` - LIMIT $6 - ` - - case !opts.Pending && !opts.AllVersions: - // query committed objects where the latest is not a delete marker - return `SELECT ` + indexFields + opts.selectedFields() + ` - FROM objects main - WHERE - (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) - AND ` + opts.stopCondition() + ` - AND status IN ` + statusesCommitted + ` - AND (expires_at IS NULL OR expires_at > now()) - AND version = ( - SELECT MAX(sub.version) - FROM objects sub - WHERE (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key) - AND status <> ` + statusPending + ` - AND (expires_at IS NULL OR expires_at > now()) - ) - ORDER BY ` + opts.orderBy() + ` - LIMIT $6 - ` - default: - panic("Not supported configuration, should not happen. Verify should check this.") - } -} - -func (opts *ListObjects) stopKey() []byte { - if opts.Prefix != "" { - return []byte(prefixLimit(opts.Prefix)) - } - return nextBucket([]byte(opts.BucketName)) -} - -func (opts *ListObjects) stopCondition() string { - if opts.Prefix != "" { - return "(project_id, bucket_name, object_key) < ($1, $2, $5)" - } - return "(project_id, bucket_name) < ($1, $5)" -} - -func (opts *ListObjects) orderBy() string { - if opts.Pending { - return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version ASC" - } else { - return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version DESC" - } -} - -func (opts ListObjects) selectedFields() (selectedFields string) { - selectedFields += ` - ,stream_id - ,status - ,encryption` - - if opts.IncludeSystemMetadata { - selectedFields += ` - ,created_at - ,expires_at - ,segment_count - ,total_plain_size - ,total_encrypted_size - ,fixed_segment_size` - } - - if opts.IncludeCustomMetadata { - selectedFields += ` - ,encrypted_metadata_nonce - ,encrypted_metadata - ,encrypted_metadata_encrypted_key` - } - return selectedFields -} - -// startKey determines what should be the starting key for the given options. -// in the recursive case, or if the cursor key is not in the specified prefix, -// we start at the greatest key between cursor and prefix. -// Otherwise (non-recursive), we start at the prefix after the one in the cursor. -func (opts *ListObjects) startKey() ObjectKey { - if opts.Prefix == "" && opts.Cursor.Key == "" { - return "" - } - if opts.Recursive || !strings.HasPrefix(string(opts.Cursor.Key), string(opts.Prefix)) { - if lessKey(opts.Cursor.Key, opts.Prefix) { - return opts.Prefix - } - return opts.Cursor.Key - } - - // in the recursive case - // prefix | cursor | startKey - // a/b/ | a/b/c/d/e | c/d/[0xff] (the first prefix/object key we return ) - key := opts.Cursor.Key - prefixSize := len(opts.Prefix) - subPrefix := key[prefixSize:] // c/d/e - - firstDelimiter := strings.Index(string(subPrefix), string(Delimiter)) - if firstDelimiter == -1 { - return key - } - newKey := []byte(key[:prefixSize+firstDelimiter+1]) // c/d/ - newKey = append(newKey, 0xff) - return ObjectKey(newKey) -} + // TODO old code is disabled for now because of performance problems + // https://github.com/storj/storj/issues/6734 + // var entries []ObjectEntry + // err = withRows(db.db.QueryContext(ctx, opts.getSQLQuery(), + // opts.ProjectID, []byte(opts.BucketName), + // opts.startKey(), opts.Cursor.Version, opts.stopKey(), + // opts.Limit+1, len(opts.Prefix)+1), + // )(func(rows tagsql.Rows) error { + // entries, err = scanListObjectsResult(rows, opts) + // return err + // }) + // if err != nil { + // if errors.Is(err, sql.ErrNoRows) { + // return ListObjectsResult{}, nil + // } + // return ListObjectsResult{}, Error.New("unable to list objects: %w", err) + // } + + // if len(entries) > opts.Limit { + // result.More = true + // result.Objects = entries[:opts.Limit] + // return result, nil + // } + + // result.Objects = entries + // result.More = false + // return result, nil + + err = db.IterateObjectsAllVersionsWithStatus(ctx, + IterateObjectsWithStatus{ + ProjectID: opts.ProjectID, + BucketName: opts.BucketName, + Prefix: opts.Prefix, + Cursor: IterateCursor{ + Key: opts.Cursor.Key, + Version: MaxVersion, + }, + Recursive: opts.Recursive, + // TODO we may need to increase batch size to optimize number + // of DB calls for objects with multiple versions + BatchSize: opts.Limit + 1, + Pending: false, + IncludeCustomMetadata: opts.IncludeCustomMetadata, + IncludeSystemMetadata: opts.IncludeSystemMetadata, + }, func(ctx context.Context, it ObjectsIterator) error { + var previousLatestSet bool + var entry, previousLatest ObjectEntry + prefix := opts.Prefix + if prefix != "" && !strings.HasSuffix(string(prefix), "/") { + prefix += "/" + } -func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []ObjectEntry, err error) { - - for rows.Next() { - var item ObjectEntry - - fields := []interface{}{ - &item.ObjectKey, - &item.Version, - &item.IsPrefix, - &item.StreamID, - &item.Status, - encryptionParameters{&item.Encryption}, - } - - if opts.IncludeSystemMetadata { - fields = append(fields, - &item.CreatedAt, - &item.ExpiresAt, - &item.SegmentCount, - &item.TotalPlainSize, - &item.TotalEncryptedSize, - &item.FixedSegmentSize, - ) - } - - if opts.IncludeCustomMetadata { - fields = append(fields, - &item.EncryptedMetadataNonce, - &item.EncryptedMetadata, - &item.EncryptedMetadataEncryptedKey, - ) - } - - if err := rows.Scan(fields...); err != nil { - return entries, err - } - - if item.IsPrefix { - item = ObjectEntry{ - IsPrefix: true, - ObjectKey: item.ObjectKey, - Status: Prefix, + for len(result.Objects) < opts.Limit && it.Next(ctx, &entry) { + objectKey := prefix + entry.ObjectKey + if opts.Cursor.Key == objectKey && opts.Cursor.Version >= entry.Version { + previousLatestSet = true + previousLatest = entry + continue + } + + if entry.Status.IsDeleteMarker() && (!previousLatestSet || prefix+previousLatest.ObjectKey != objectKey) { + previousLatestSet = true + previousLatest = entry + continue + } + + if !previousLatestSet || prefix+previousLatest.ObjectKey != objectKey { + previousLatestSet = true + previousLatest = entry + + result.Objects = append(result.Objects, entry) + } } - } - entries = append(entries, item) + result.More = it.Next(ctx, &entry) + return nil + }, + ) + if err != nil { + return ListObjectsResult{}, err } - - return entries, nil + return result, nil } + +// TODO old code is disabled for now because of performance problems +// https://github.com/storj/storj/issues/6734 + +// func (opts *ListObjects) getSQLQuery() string { +// var indexFields string +// if opts.Recursive { +// indexFields = `substring(object_key from $7) AS entry_key, version AS entry_version, FALSE AS is_prefix` +// } else { +// if opts.AllVersions { +// indexFields = ` +// DISTINCT ON (project_id, bucket_name, entry_key, entry_version) +// (CASE +// WHEN position('/' IN substring(object_key from $7)) <> 0 +// THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) +// ELSE substring(object_key from $7) +// END) +// AS entry_key, +// (CASE +// WHEN position('/' IN substring(object_key from $7)) <> 0 +// THEN 0 +// ELSE version +// END) +// AS entry_version, +// position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` +// } else { +// indexFields = ` +// DISTINCT ON (project_id, bucket_name, entry_key) +// (CASE +// WHEN position('/' IN substring(object_key from $7)) <> 0 +// THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) +// ELSE substring(object_key from $7) +// END) +// AS entry_key, +// version AS entry_version, +// position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` +// } +// } + +// switch { +// case opts.Pending && opts.AllVersions: +// return `SELECT ` + indexFields + opts.selectedFields() + ` +// FROM objects +// WHERE +// (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) +// AND ` + opts.stopCondition() + ` +// AND status = ` + statusPending + ` +// AND (expires_at IS NULL OR expires_at > now()) +// ORDER BY ` + opts.orderBy() + ` +// LIMIT $6 +// ` + +// case !opts.Pending && !opts.AllVersions: +// // query committed objects where the latest is not a delete marker +// return `SELECT ` + indexFields + opts.selectedFields() + ` +// FROM objects main +// WHERE +// (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) +// AND ` + opts.stopCondition() + ` +// AND status IN ` + statusesCommitted + ` +// AND (expires_at IS NULL OR expires_at > now()) +// AND version = ( +// SELECT MAX(sub.version) +// FROM objects sub +// WHERE (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key) +// AND status <> ` + statusPending + ` +// AND (expires_at IS NULL OR expires_at > now()) +// ) +// ORDER BY ` + opts.orderBy() + ` +// LIMIT $6 +// ` +// default: +// panic("Not supported configuration, should not happen. Verify should check this.") +// } +// } + +// func (opts *ListObjects) stopKey() []byte { +// if opts.Prefix != "" { +// return []byte(prefixLimit(opts.Prefix)) +// } +// return nextBucket([]byte(opts.BucketName)) +// } + +// func (opts *ListObjects) stopCondition() string { +// if opts.Prefix != "" { +// return "(project_id, bucket_name, object_key) < ($1, $2, $5)" +// } +// return "(project_id, bucket_name) < ($1, $5)" +// } + +// func (opts *ListObjects) orderBy() string { +// if opts.Pending { +// return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version ASC" +// } else { +// return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version DESC" +// } +// } + +// func (opts ListObjects) selectedFields() (selectedFields string) { +// selectedFields += ` +// ,stream_id +// ,status +// ,encryption` + +// if opts.IncludeSystemMetadata { +// selectedFields += ` +// ,created_at +// ,expires_at +// ,segment_count +// ,total_plain_size +// ,total_encrypted_size +// ,fixed_segment_size` +// } + +// if opts.IncludeCustomMetadata { +// selectedFields += ` +// ,encrypted_metadata_nonce +// ,encrypted_metadata +// ,encrypted_metadata_encrypted_key` +// } +// return selectedFields +// } + +// // startKey determines what should be the starting key for the given options. +// // in the recursive case, or if the cursor key is not in the specified prefix, +// // we start at the greatest key between cursor and prefix. +// // Otherwise (non-recursive), we start at the prefix after the one in the cursor. +// func (opts *ListObjects) startKey() ObjectKey { +// if opts.Prefix == "" && opts.Cursor.Key == "" { +// return "" +// } +// if opts.Recursive || !strings.HasPrefix(string(opts.Cursor.Key), string(opts.Prefix)) { +// if lessKey(opts.Cursor.Key, opts.Prefix) { +// return opts.Prefix +// } +// return opts.Cursor.Key +// } + +// // in the recursive case +// // prefix | cursor | startKey +// // a/b/ | a/b/c/d/e | c/d/[0xff] (the first prefix/object key we return ) +// key := opts.Cursor.Key +// prefixSize := len(opts.Prefix) +// subPrefix := key[prefixSize:] // c/d/e + +// firstDelimiter := strings.Index(string(subPrefix), string(Delimiter)) +// if firstDelimiter == -1 { +// return key +// } +// newKey := []byte(key[:prefixSize+firstDelimiter+1]) // c/d/ +// newKey = append(newKey, 0xff) +// return ObjectKey(newKey) +// } + +// func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []ObjectEntry, err error) { + +// for rows.Next() { +// var item ObjectEntry + +// fields := []interface{}{ +// &item.ObjectKey, +// &item.Version, +// &item.IsPrefix, +// &item.StreamID, +// &item.Status, +// encryptionParameters{&item.Encryption}, +// } + +// if opts.IncludeSystemMetadata { +// fields = append(fields, +// &item.CreatedAt, +// &item.ExpiresAt, +// &item.SegmentCount, +// &item.TotalPlainSize, +// &item.TotalEncryptedSize, +// &item.FixedSegmentSize, +// ) +// } + +// if opts.IncludeCustomMetadata { +// fields = append(fields, +// &item.EncryptedMetadataNonce, +// &item.EncryptedMetadata, +// &item.EncryptedMetadataEncryptedKey, +// ) +// } + +// if err := rows.Scan(fields...); err != nil { +// return entries, err +// } + +// if item.IsPrefix { +// item = ObjectEntry{ +// IsPrefix: true, +// ObjectKey: item.ObjectKey, +// Status: Prefix, +// } +// } + +// entries = append(entries, item) +// } + +// return entries, nil +// } diff --git a/satellite/metabase/list_objects_test.go b/satellite/metabase/list_objects_test.go index 5eeffa21244f..bd41937ac2e1 100644 --- a/satellite/metabase/list_objects_test.go +++ b/satellite/metabase/list_objects_test.go @@ -1035,6 +1035,8 @@ func TestListObjectsVersioned(t *testing.T) { }) t.Run("2 objects one with versions and one pending, list pending", func(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + defer metabasetest.DeleteAll{}.Check(ctx, t, db) a0 := metabasetest.RandObjectStream() diff --git a/satellite/metabase/list_pending_test.go b/satellite/metabase/list_pending_test.go index 63ece3f81294..dd3117ff4eaa 100644 --- a/satellite/metabase/list_pending_test.go +++ b/satellite/metabase/list_pending_test.go @@ -21,6 +21,8 @@ import ( ) func TestListPendingObjects(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() @@ -566,6 +568,8 @@ func TestListPendingObjects(t *testing.T) { } func TestListPendingObjectsSkipCursor(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { projectID, bucketName := uuid.UUID{1}, "bucky" @@ -818,6 +822,8 @@ func TestListPendingObjectsSkipCursor(t *testing.T) { } func TestListPendingObjectsVersions(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { t.Run("2 objects, one with versions one without", func(t *testing.T) { @@ -1415,6 +1421,8 @@ func TestListPendingObjectsVersions(t *testing.T) { } func TestListPendingObjects_Limit(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { projectID := testrand.UUID() bucketName := testrand.BucketName()