From a24fb70246b52e59c31a655236a31f07569e0e04 Mon Sep 17 00:00:00 2001 From: Michal Niewrzal Date: Mon, 12 Feb 2024 13:17:57 +0100 Subject: [PATCH] satellite/metabase: replace ListObjects implementation The ListObjects implementation turns out to be slow with CockroachDB, so for now we decided to replace it with the well-known objects iterator and just filter for the latest objects. We are also changing ListObjects to no longer support listing pending objects or all versions. We keep the old implementation because the plan is to improve it, but we need time to do that. Change-Id: Icd6e629c9ab8519744843d91a985404d92dd1594 --- satellite/metabase/list_objects.go | 522 +++++++++++++----------- satellite/metabase/list_objects_test.go | 2 + satellite/metabase/list_pending_test.go | 8 + 3 files changed, 303 insertions(+), 229 deletions(-) diff --git a/satellite/metabase/list_objects.go b/satellite/metabase/list_objects.go index 81af3ca20b95..1067c20f8416 100644 --- a/satellite/metabase/list_objects.go +++ b/satellite/metabase/list_objects.go @@ -5,11 +5,10 @@ package metabase import ( "context" - "database/sql" - "errors" "strings" - "storj.io/common/tagsql" + "github.com/zeebo/errs" + "storj.io/common/uuid" ) @@ -43,11 +42,16 @@ func (opts *ListObjects) Verify() error { case opts.Limit < 0: return ErrInvalidRequest.New("Invalid limit: %d", opts.Limit) - case opts.Pending && !opts.AllVersions: - return ErrInvalidRequest.New("Not Implemented: Pending && !AllVersions") - case !opts.Pending && opts.AllVersions: - return ErrInvalidRequest.New("Not Implemented: !Pending && AllVersions") + case opts.Pending || opts.AllVersions: + return errs.New("not implemented") + + // TODO old code is disabled for now because of performance problems + // case opts.Pending && !opts.AllVersions: + // return ErrInvalidRequest.New("Not Implemented: Pending && !AllVersions") + // case !opts.Pending && opts.AllVersions: + // return ErrInvalidRequest.New("Not Implemented: !Pending && AllVersions") } + return nil } @@ -67,230 
+71,290 @@ func (db *DB) ListObjects(ctx context.Context, opts ListObjects) (result ListObj ListLimit.Ensure(&opts.Limit) - var entries []ObjectEntry - err = withRows(db.db.QueryContext(ctx, opts.getSQLQuery(), - opts.ProjectID, []byte(opts.BucketName), - opts.startKey(), opts.Cursor.Version, opts.stopKey(), - opts.Limit+1, len(opts.Prefix)+1), - )(func(rows tagsql.Rows) error { - entries, err = scanListObjectsResult(rows, opts) - return err - }) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return ListObjectsResult{}, nil - } - return ListObjectsResult{}, Error.New("unable to list objects: %w", err) - } - - if len(entries) > opts.Limit { - result.More = true - result.Objects = entries[:opts.Limit] - return result, nil - } - - result.Objects = entries - result.More = false - return result, nil -} - -func (opts *ListObjects) getSQLQuery() string { - var indexFields string - if opts.Recursive { - indexFields = `substring(object_key from $7) AS entry_key, version AS entry_version, FALSE AS is_prefix` - } else { - if opts.AllVersions { - indexFields = ` - DISTINCT ON (project_id, bucket_name, entry_key, entry_version) - (CASE - WHEN position('/' IN substring(object_key from $7)) <> 0 - THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) - ELSE substring(object_key from $7) - END) - AS entry_key, - (CASE - WHEN position('/' IN substring(object_key from $7)) <> 0 - THEN 0 - ELSE version - END) - AS entry_version, - position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` - } else { - indexFields = ` - DISTINCT ON (project_id, bucket_name, entry_key) - (CASE - WHEN position('/' IN substring(object_key from $7)) <> 0 - THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) - ELSE substring(object_key from $7) - END) - AS entry_key, - version AS entry_version, - position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` - } - } - - switch 
{ - case opts.Pending && opts.AllVersions: - return `SELECT ` + indexFields + opts.selectedFields() + ` - FROM objects - WHERE - (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) - AND ` + opts.stopCondition() + ` - AND status = ` + statusPending + ` - AND (expires_at IS NULL OR expires_at > now()) - ORDER BY ` + opts.orderBy() + ` - LIMIT $6 - ` - - case !opts.Pending && !opts.AllVersions: - // query committed objects where the latest is not a delete marker - return `SELECT ` + indexFields + opts.selectedFields() + ` - FROM objects main - WHERE - (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) - AND ` + opts.stopCondition() + ` - AND status IN ` + statusesCommitted + ` - AND (expires_at IS NULL OR expires_at > now()) - AND version = ( - SELECT MAX(sub.version) - FROM objects sub - WHERE (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key) - AND status <> ` + statusPending + ` - AND (expires_at IS NULL OR expires_at > now()) - ) - ORDER BY ` + opts.orderBy() + ` - LIMIT $6 - ` - default: - panic("Not supported configuration, should not happen. 
Verify should check this.") - } -} - -func (opts *ListObjects) stopKey() []byte { - if opts.Prefix != "" { - return []byte(prefixLimit(opts.Prefix)) - } - return nextBucket([]byte(opts.BucketName)) -} - -func (opts *ListObjects) stopCondition() string { - if opts.Prefix != "" { - return "(project_id, bucket_name, object_key) < ($1, $2, $5)" - } - return "(project_id, bucket_name) < ($1, $5)" -} - -func (opts *ListObjects) orderBy() string { - if opts.Pending { - return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version ASC" - } else { - return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version DESC" - } -} - -func (opts ListObjects) selectedFields() (selectedFields string) { - selectedFields += ` - ,stream_id - ,status - ,encryption` - - if opts.IncludeSystemMetadata { - selectedFields += ` - ,created_at - ,expires_at - ,segment_count - ,total_plain_size - ,total_encrypted_size - ,fixed_segment_size` - } - - if opts.IncludeCustomMetadata { - selectedFields += ` - ,encrypted_metadata_nonce - ,encrypted_metadata - ,encrypted_metadata_encrypted_key` - } - return selectedFields -} - -// startKey determines what should be the starting key for the given options. -// in the recursive case, or if the cursor key is not in the specified prefix, -// we start at the greatest key between cursor and prefix. -// Otherwise (non-recursive), we start at the prefix after the one in the cursor. 
-func (opts *ListObjects) startKey() ObjectKey { - if opts.Prefix == "" && opts.Cursor.Key == "" { - return "" - } - if opts.Recursive || !strings.HasPrefix(string(opts.Cursor.Key), string(opts.Prefix)) { - if lessKey(opts.Cursor.Key, opts.Prefix) { - return opts.Prefix - } - return opts.Cursor.Key - } - - // in the recursive case - // prefix | cursor | startKey - // a/b/ | a/b/c/d/e | c/d/[0xff] (the first prefix/object key we return ) - key := opts.Cursor.Key - prefixSize := len(opts.Prefix) - subPrefix := key[prefixSize:] // c/d/e - - firstDelimiter := strings.Index(string(subPrefix), string(Delimiter)) - if firstDelimiter == -1 { - return key - } - newKey := []byte(key[:prefixSize+firstDelimiter+1]) // c/d/ - newKey = append(newKey, 0xff) - return ObjectKey(newKey) -} + // TODO old code is disabled for now because of performance problems + // https://github.com/storj/storj/issues/6734 + // var entries []ObjectEntry + // err = withRows(db.db.QueryContext(ctx, opts.getSQLQuery(), + // opts.ProjectID, []byte(opts.BucketName), + // opts.startKey(), opts.Cursor.Version, opts.stopKey(), + // opts.Limit+1, len(opts.Prefix)+1), + // )(func(rows tagsql.Rows) error { + // entries, err = scanListObjectsResult(rows, opts) + // return err + // }) + // if err != nil { + // if errors.Is(err, sql.ErrNoRows) { + // return ListObjectsResult{}, nil + // } + // return ListObjectsResult{}, Error.New("unable to list objects: %w", err) + // } + + // if len(entries) > opts.Limit { + // result.More = true + // result.Objects = entries[:opts.Limit] + // return result, nil + // } + + // result.Objects = entries + // result.More = false + // return result, nil + + err = db.IterateObjectsAllVersionsWithStatus(ctx, + IterateObjectsWithStatus{ + ProjectID: opts.ProjectID, + BucketName: opts.BucketName, + Prefix: opts.Prefix, + Cursor: IterateCursor{ + Key: opts.Cursor.Key, + Version: MaxVersion, + }, + Recursive: opts.Recursive, + // TODO we may need to increase batch size to optimize number 
+ // of DB calls for objects with multiple versions + BatchSize: opts.Limit + 1, + Pending: false, + IncludeCustomMetadata: opts.IncludeCustomMetadata, + IncludeSystemMetadata: opts.IncludeSystemMetadata, + }, func(ctx context.Context, it ObjectsIterator) error { + var previousLatestSet bool + var entry, previousLatest ObjectEntry + prefix := opts.Prefix + if prefix != "" && !strings.HasSuffix(string(prefix), "/") { + prefix += "/" + } -func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []ObjectEntry, err error) { - - for rows.Next() { - var item ObjectEntry - - fields := []interface{}{ - &item.ObjectKey, - &item.Version, - &item.IsPrefix, - &item.StreamID, - &item.Status, - encryptionParameters{&item.Encryption}, - } - - if opts.IncludeSystemMetadata { - fields = append(fields, - &item.CreatedAt, - &item.ExpiresAt, - &item.SegmentCount, - &item.TotalPlainSize, - &item.TotalEncryptedSize, - &item.FixedSegmentSize, - ) - } - - if opts.IncludeCustomMetadata { - fields = append(fields, - &item.EncryptedMetadataNonce, - &item.EncryptedMetadata, - &item.EncryptedMetadataEncryptedKey, - ) - } - - if err := rows.Scan(fields...); err != nil { - return entries, err - } - - if item.IsPrefix { - item = ObjectEntry{ - IsPrefix: true, - ObjectKey: item.ObjectKey, - Status: Prefix, + for len(result.Objects) < opts.Limit && it.Next(ctx, &entry) { + objectKey := prefix + entry.ObjectKey + if opts.Cursor.Key == objectKey && opts.Cursor.Version >= entry.Version { + previousLatestSet = true + previousLatest = entry + continue + } + + if entry.Status.IsDeleteMarker() && (!previousLatestSet || prefix+previousLatest.ObjectKey != objectKey) { + previousLatestSet = true + previousLatest = entry + continue + } + + if !previousLatestSet || prefix+previousLatest.ObjectKey != objectKey { + previousLatestSet = true + previousLatest = entry + + result.Objects = append(result.Objects, entry) + } } - } - entries = append(entries, item) + result.More = it.Next(ctx, &entry) + 
return nil + }, + ) + if err != nil { + return ListObjectsResult{}, err } - - return entries, nil + return result, nil } + +// TODO old code is disabled for now because of performance problems +// https://github.com/storj/storj/issues/6734 + +// func (opts *ListObjects) getSQLQuery() string { +// var indexFields string +// if opts.Recursive { +// indexFields = `substring(object_key from $7) AS entry_key, version AS entry_version, FALSE AS is_prefix` +// } else { +// if opts.AllVersions { +// indexFields = ` +// DISTINCT ON (project_id, bucket_name, entry_key, entry_version) +// (CASE +// WHEN position('/' IN substring(object_key from $7)) <> 0 +// THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) +// ELSE substring(object_key from $7) +// END) +// AS entry_key, +// (CASE +// WHEN position('/' IN substring(object_key from $7)) <> 0 +// THEN 0 +// ELSE version +// END) +// AS entry_version, +// position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` +// } else { +// indexFields = ` +// DISTINCT ON (project_id, bucket_name, entry_key) +// (CASE +// WHEN position('/' IN substring(object_key from $7)) <> 0 +// THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1)) +// ELSE substring(object_key from $7) +// END) +// AS entry_key, +// version AS entry_version, +// position('/' IN substring(object_key from $7)) <> 0 AS is_prefix` +// } +// } + +// switch { +// case opts.Pending && opts.AllVersions: +// return `SELECT ` + indexFields + opts.selectedFields() + ` +// FROM objects +// WHERE +// (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) +// AND ` + opts.stopCondition() + ` +// AND status = ` + statusPending + ` +// AND (expires_at IS NULL OR expires_at > now()) +// ORDER BY ` + opts.orderBy() + ` +// LIMIT $6 +// ` + +// case !opts.Pending && !opts.AllVersions: +// // query committed objects where the latest is not a delete marker 
+// return `SELECT ` + indexFields + opts.selectedFields() + ` +// FROM objects main +// WHERE +// (project_id, bucket_name, object_key, version) > ($1, $2, $3, $4) +// AND ` + opts.stopCondition() + ` +// AND status IN ` + statusesCommitted + ` +// AND (expires_at IS NULL OR expires_at > now()) +// AND version = ( +// SELECT MAX(sub.version) +// FROM objects sub +// WHERE (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key) +// AND status <> ` + statusPending + ` +// AND (expires_at IS NULL OR expires_at > now()) +// ) +// ORDER BY ` + opts.orderBy() + ` +// LIMIT $6 +// ` +// default: +// panic("Not supported configuration, should not happen. Verify should check this.") +// } +// } + +// func (opts *ListObjects) stopKey() []byte { +// if opts.Prefix != "" { +// return []byte(prefixLimit(opts.Prefix)) +// } +// return nextBucket([]byte(opts.BucketName)) +// } + +// func (opts *ListObjects) stopCondition() string { +// if opts.Prefix != "" { +// return "(project_id, bucket_name, object_key) < ($1, $2, $5)" +// } +// return "(project_id, bucket_name) < ($1, $5)" +// } + +// func (opts *ListObjects) orderBy() string { +// if opts.Pending { +// return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version ASC" +// } else { +// return "project_id ASC, bucket_name ASC, entry_key ASC, entry_version DESC" +// } +// } + +// func (opts ListObjects) selectedFields() (selectedFields string) { +// selectedFields += ` +// ,stream_id +// ,status +// ,encryption` + +// if opts.IncludeSystemMetadata { +// selectedFields += ` +// ,created_at +// ,expires_at +// ,segment_count +// ,total_plain_size +// ,total_encrypted_size +// ,fixed_segment_size` +// } + +// if opts.IncludeCustomMetadata { +// selectedFields += ` +// ,encrypted_metadata_nonce +// ,encrypted_metadata +// ,encrypted_metadata_encrypted_key` +// } +// return selectedFields +// } + +// // startKey determines what should be the starting key for the given 
options. +// // in the recursive case, or if the cursor key is not in the specified prefix, +// // we start at the greatest key between cursor and prefix. +// // Otherwise (non-recursive), we start at the prefix after the one in the cursor. +// func (opts *ListObjects) startKey() ObjectKey { +// if opts.Prefix == "" && opts.Cursor.Key == "" { +// return "" +// } +// if opts.Recursive || !strings.HasPrefix(string(opts.Cursor.Key), string(opts.Prefix)) { +// if lessKey(opts.Cursor.Key, opts.Prefix) { +// return opts.Prefix +// } +// return opts.Cursor.Key +// } + +// // in the recursive case +// // prefix | cursor | startKey +// // a/b/ | a/b/c/d/e | c/d/[0xff] (the first prefix/object key we return ) +// key := opts.Cursor.Key +// prefixSize := len(opts.Prefix) +// subPrefix := key[prefixSize:] // c/d/e + +// firstDelimiter := strings.Index(string(subPrefix), string(Delimiter)) +// if firstDelimiter == -1 { +// return key +// } +// newKey := []byte(key[:prefixSize+firstDelimiter+1]) // c/d/ +// newKey = append(newKey, 0xff) +// return ObjectKey(newKey) +// } + +// func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []ObjectEntry, err error) { + +// for rows.Next() { +// var item ObjectEntry + +// fields := []interface{}{ +// &item.ObjectKey, +// &item.Version, +// &item.IsPrefix, +// &item.StreamID, +// &item.Status, +// encryptionParameters{&item.Encryption}, +// } + +// if opts.IncludeSystemMetadata { +// fields = append(fields, +// &item.CreatedAt, +// &item.ExpiresAt, +// &item.SegmentCount, +// &item.TotalPlainSize, +// &item.TotalEncryptedSize, +// &item.FixedSegmentSize, +// ) +// } + +// if opts.IncludeCustomMetadata { +// fields = append(fields, +// &item.EncryptedMetadataNonce, +// &item.EncryptedMetadata, +// &item.EncryptedMetadataEncryptedKey, +// ) +// } + +// if err := rows.Scan(fields...); err != nil { +// return entries, err +// } + +// if item.IsPrefix { +// item = ObjectEntry{ +// IsPrefix: true, +// ObjectKey: item.ObjectKey, 
+// Status: Prefix, +// } +// } + +// entries = append(entries, item) +// } + +// return entries, nil +// } diff --git a/satellite/metabase/list_objects_test.go b/satellite/metabase/list_objects_test.go index 5eeffa21244f..bd41937ac2e1 100644 --- a/satellite/metabase/list_objects_test.go +++ b/satellite/metabase/list_objects_test.go @@ -1035,6 +1035,8 @@ func TestListObjectsVersioned(t *testing.T) { }) t.Run("2 objects one with versions and one pending, list pending", func(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + defer metabasetest.DeleteAll{}.Check(ctx, t, db) a0 := metabasetest.RandObjectStream() diff --git a/satellite/metabase/list_pending_test.go b/satellite/metabase/list_pending_test.go index 63ece3f81294..dd3117ff4eaa 100644 --- a/satellite/metabase/list_pending_test.go +++ b/satellite/metabase/list_pending_test.go @@ -21,6 +21,8 @@ import ( ) func TestListPendingObjects(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { obj := metabasetest.RandObjectStream() @@ -566,6 +568,8 @@ func TestListPendingObjects(t *testing.T) { } func TestListPendingObjectsSkipCursor(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { projectID, bucketName := uuid.UUID{1}, "bucky" @@ -818,6 +822,8 @@ func TestListPendingObjectsSkipCursor(t *testing.T) { } func TestListPendingObjectsVersions(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) { t.Run("2 objects, one with versions one without", func(t *testing.T) { @@ -1415,6 +1421,8 @@ func TestListPendingObjectsVersions(t *testing.T) { } func TestListPendingObjects_Limit(t *testing.T) { + t.Skip("see https://github.com/storj/storj/issues/6734") + metabasetest.Run(t, func(ctx 
*testcontext.Context, t *testing.T, db *metabase.DB) { projectID := testrand.UUID() bucketName := testrand.BucketName()