Skip to content

Commit

Permalink
satellite/metainfo: don't use metabase.ListObjects for unversioned bu…
Browse files Browse the repository at this point in the history
…ckets

This is a workaround for metabase.ListObjects performance issues.
Earlier, IterateObjectsAllVersionsWithStatus was changed to return
results in descending order, and regular listing was changed to use
the ListObjects method. With this change, the previous implementation of
IterateObjectsAllVersionsWithStatus is restored under the name
IterateObjectsAllVersionsWithStatusAscending and
is used to do regular (only latest objects) listing for unversioned
buckets and for pending objects listing. Versioned buckets will still be
listed with metabase.ListObjects.

Change-Id: I49b57edc1f7e352c035f602ac71048e8461e0632
  • Loading branch information
mniewrzal authored and Storj Robot committed Jan 29, 2024
1 parent e4d6ce8 commit eaa3edc
Show file tree
Hide file tree
Showing 6 changed files with 2,119 additions and 49 deletions.
92 changes: 91 additions & 1 deletion satellite/metabase/iterator.go
Expand Up @@ -46,7 +46,7 @@ type iterateCursor struct {
Inclusive bool
}

func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) {
func iterateAllVersionsWithStatusDescending(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)

it := &objectsIterator{
Expand Down Expand Up @@ -78,6 +78,38 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec
return iterate(ctx, it, fn)
}

// iterateAllVersionsWithStatusAscending configures an objectsIterator from opts
// and passes it, together with fn, to iterate. Batches are loaded through
// doNextQueryAllVersionsWithStatusAscending.
//
// The starting position is the cursor derived from opts; when that cursor's key
// sorts before opts.Prefix, iteration starts at the prefix instead.
func iterateAllVersionsWithStatusAscending(ctx context.Context, db *DB, opts IterateObjectsWithStatus, fn func(context.Context, ObjectsIterator) error) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Pick the larger of the requested cursor and the prefix as the start point.
	start := firstIterateCursor(opts.Recursive, opts.Cursor, opts.Prefix)
	if lessKey(start.Key, opts.Prefix) {
		// Restart from the prefix itself: inclusive, with version -1.
		start.Key = opts.Prefix
		start.Version = -1
		start.Inclusive = true
	}

	iter := &objectsIterator{
		db: db,

		// Scope of the listing.
		projectID:   opts.ProjectID,
		bucketName:  []byte(opts.BucketName),
		pending:     opts.Pending,
		prefix:      opts.Prefix,
		prefixLimit: prefixLimit(opts.Prefix),

		// Paging and output options.
		batchSize:             opts.BatchSize,
		recursive:             opts.Recursive,
		includeCustomMetadata: opts.IncludeCustomMetadata,
		includeSystemMetadata: opts.IncludeSystemMetadata,

		curIndex: 0,
		cursor:   start,

		doNextQuery: doNextQueryAllVersionsWithStatusAscending,
	}

	return iterate(ctx, iter, fn)
}

func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePendingObjectsByKey, fn func(context.Context, ObjectsIterator) error) (err error) {
defer mon.Task()(&ctx)(&err)

Expand Down Expand Up @@ -287,6 +319,64 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
)
}

// doNextQueryAllVersionsWithStatusAscending issues the SQL query that fetches
// the next batch of rows for it, ordered ascending by
// (project_id, bucket_name, object_key, version).
//
// Rows are selected relative to it.cursor (">=" when the cursor is inclusive,
// ">" otherwise), restricted to pending or non-pending status depending on
// it.pending, and filtered to objects that have no expiration or expire in the
// future. At most it.batchSize rows are requested.
//
// When it.prefixLimit is set, the scan is bounded above by that key and the
// selected object key is SUBSTRING(object_key FROM len(prefix)+1). Otherwise
// the scan is bounded above by nextBucket(it.bucketName) and the full
// object_key is selected.
func doNextQueryAllVersionsWithStatusAscending(ctx context.Context, it *objectsIterator) (_ tagsql.Rows, err error) {
	defer mon.Task()(&ctx)(&err)

	// An inclusive cursor must also match the row it points at.
	cmpOp := ">"
	if it.cursor.Inclusive {
		cmpOp = ">="
	}

	// List only pending objects, or everything except pending ones.
	statusClause := `AND status = ` + statusPending
	if !it.pending {
		statusClause = `AND status <> ` + statusPending
	}

	if it.prefixLimit != "" {
		// Prefix-bounded scan: stop before prefixLimit and strip the prefix
		// from returned keys (SUBSTRING positions are 1-based).
		stripFrom := len(it.prefix) + 1
		fields := querySelectorFields("SUBSTRING(object_key FROM $7)", it)

		query := `
SELECT
` + fields + `
FROM objects
WHERE
(project_id, bucket_name, object_key, version) ` + cmpOp + ` ($1, $2, $3, $4)
AND (project_id, bucket_name, object_key) < ($1, $2, $5)
` + statusClause + `
AND (expires_at IS NULL OR expires_at > now())
ORDER BY (project_id, bucket_name, object_key, version) ASC
LIMIT $6
`
		return it.db.db.QueryContext(ctx, query,
			it.projectID, it.bucketName,
			[]byte(it.cursor.Key), int(it.cursor.Version),
			[]byte(it.prefixLimit),
			it.batchSize,
			stripFrom,
		)
	}

	// No upper key bound: scan up to the start of the next bucket.
	fields := querySelectorFields("object_key", it)

	query := `
SELECT
` + fields + `
FROM objects
WHERE
(project_id, bucket_name, object_key, version) ` + cmpOp + ` ($1, $2, $3, $4)
AND (project_id, bucket_name) < ($1, $6)
` + statusClause + `
AND (expires_at IS NULL OR expires_at > now())
ORDER BY (project_id, bucket_name, object_key, version) ASC
LIMIT $5
`
	return it.db.db.QueryContext(ctx, query,
		it.projectID, it.bucketName,
		[]byte(it.cursor.Key), int(it.cursor.Version),
		it.batchSize,
		nextBucket(it.bucketName),
	)
}

func querySelectorFields(objectKeyColumn string, it *objectsIterator) string {
querySelectFields := objectKeyColumn + `
,stream_id
Expand Down

0 comments on commit eaa3edc

Please sign in to comment.