Skip to content

Commit

Permalink
satellite/metabase: fix and test ListObjects.Pending
Browse files Browse the repository at this point in the history
Make ListObjects.Pending behavior clearer and add tests.

Updates #6679

Change-Id: I5ff1d65c070386f7b1fcb5df16617c8a8f3098bb
  • Loading branch information
egonelbre committed Jan 12, 2024
1 parent 52bfc87 commit 80cfa9a
Show file tree
Hide file tree
Showing 5 changed files with 1,759 additions and 91 deletions.
127 changes: 79 additions & 48 deletions satellite/metabase/list_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
type ListObjectsCursor IterateCursor

// ListObjects contains arguments necessary for listing objects.
//
// For Pending = false, the versions are in descending order.
// For Pending = true, the versions are in ascending order.
type ListObjects struct {
ProjectID uuid.UUID
BucketName string
Expand All @@ -25,6 +28,7 @@ type ListObjects struct {
Prefix ObjectKey
Cursor ListObjectsCursor
Pending bool
AllVersions bool
IncludeCustomMetadata bool
IncludeSystemMetadata bool
}
Expand All @@ -38,6 +42,11 @@ func (opts *ListObjects) Verify() error {
return ErrInvalidRequest.New("BucketName missing")
case opts.Limit < 0:
return ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)

case opts.Pending && !opts.AllVersions:
return ErrInvalidRequest.New("Not Implemented: Pending && !AllVersions")
case !opts.Pending && opts.AllVersions:
return ErrInvalidRequest.New("Not Implemented: !Pending && AllVersions")
}
return nil
}
Expand Down Expand Up @@ -88,21 +97,40 @@ func (db *DB) ListObjects(ctx context.Context, opts ListObjects) (result ListObj
func (opts *ListObjects) getSQLQuery() string {
var indexFields string
if opts.Recursive {
indexFields = `
substring(object_key from $7), FALSE as is_prefix`
indexFields = `substring(object_key from $7) AS entry_key, version AS entry_version, FALSE AS is_prefix`
} else {
indexFields = `
DISTINCT ON (entry_key)
CASE
WHEN position('/' IN substring(object_key from $7)) <> 0
THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1))
ELSE substring(object_key from $7)
END
AS entry_key,
position('/' IN substring(object_key from $7)) <> 0 AS is_prefix`
if opts.AllVersions {
indexFields = `
DISTINCT ON (entry_key, entry_version)
(CASE
WHEN position('/' IN substring(object_key from $7)) <> 0
THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1))
ELSE substring(object_key from $7)
END)
AS entry_key,
(CASE
WHEN position('/' IN substring(object_key from $7)) <> 0
THEN 0
ELSE version
END)
AS entry_version,
position('/' IN substring(object_key from $7)) <> 0 AS is_prefix`
} else {
indexFields = `
DISTINCT ON (entry_key)
(CASE
WHEN position('/' IN substring(object_key from $7)) <> 0
THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1))
ELSE substring(object_key from $7)
END)
AS entry_key,
version AS entry_version,
position('/' IN substring(object_key from $7)) <> 0 AS is_prefix`
}
}

if opts.Pending {
switch {
case opts.Pending && opts.AllVersions:
return `SELECT ` + indexFields + opts.selectedFields() + `
FROM objects
WHERE
Expand All @@ -113,38 +141,41 @@ func (opts *ListObjects) getSQLQuery() string {
ORDER BY ` + opts.orderBy() + `
LIMIT $6
`
}

// TODO(ver): using the following subquery looks nicer, however CRDB has a bug related to it.
//
// SELECT MAX(sub.version)
// FROM objects sub
// WHERE
// (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
// AND status <> ` + statusPending + `
// AND (expires_at IS NULL OR expires_at > now())

// query committed objects where the latest is not a delete marker
return `SELECT ` + indexFields + opts.selectedFields() + `
FROM objects main
WHERE
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
AND ` + opts.stopCondition() + `
AND status IN ` + statusesCommitted + `
AND (expires_at IS NULL OR expires_at > now())
AND version = (
SELECT sub.version
FROM objects sub
WHERE
(sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
AND status <> ` + statusPending + `
AND (expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
LIMIT 1
)
ORDER BY ` + opts.orderBy() + `
LIMIT $6
`
case !opts.Pending && !opts.AllVersions:
// TODO(ver): using the following subquery looks nicer, however CRDB has a bug related to it.
//
// SELECT MAX(sub.version)
// FROM objects sub
// WHERE
// (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
// AND status <> ` + statusPending + `
// AND (expires_at IS NULL OR expires_at > now())

// query committed objects where the latest is not a delete marker
return `SELECT ` + indexFields + opts.selectedFields() + `
FROM objects main
WHERE
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
AND ` + opts.stopCondition() + `
AND status IN ` + statusesCommitted + `
AND (expires_at IS NULL OR expires_at > now())
AND version = (
SELECT sub.version
FROM objects sub
WHERE
(sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
AND status <> ` + statusPending + `
AND (expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
LIMIT 1
)
ORDER BY ` + opts.orderBy() + `
LIMIT $6
`
default:
panic("Not supported configuration, should not happen. Verify should check this.")
}
}

func (opts *ListObjects) stopKey() []byte {
Expand All @@ -162,16 +193,16 @@ func (opts *ListObjects) stopCondition() string {
}

func (opts *ListObjects) orderBy() string {
if !opts.Recursive {
return "entry_key ASC, version DESC"
if opts.Pending {
return "entry_key ASC, entry_version ASC"
} else {
return "entry_key ASC, entry_version DESC"
}
return "object_key ASC, version DESC"
}

func (opts ListObjects) selectedFields() (selectedFields string) {
selectedFields += `
,stream_id
,version
,status
,encryption`

Expand Down Expand Up @@ -232,9 +263,9 @@ func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []Object

fields := []interface{}{
&item.ObjectKey,
&item.Version,
&item.IsPrefix,
&item.StreamID,
&item.Version,
&item.Status,
encryptionParameters{&item.Encryption},
}
Expand Down

0 comments on commit 80cfa9a

Please sign in to comment.