Skip to content

Commit

Permalink
satellite/metabase: adjust code for iteration
Browse files Browse the repository at this point in the history
Change-Id: Id3d4efe228a6f2d3642a639ef66a30e178ca001a
  • Loading branch information
egonelbre authored and Storj Robot committed Oct 25, 2023
1 parent 988ebba commit 97c98d7
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 89 deletions.
6 changes: 3 additions & 3 deletions satellite/metabase/bench_test.go
Expand Up @@ -217,7 +217,7 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
err := db.IterateObjectsAllVersionsWithStatus(ctx, metabase.IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: "bucket",
Status: metabase.CommittedUnversioned,
Pending: false,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
var entry metabase.ObjectEntry
for it.Next(ctx, &entry) {
Expand All @@ -241,7 +241,7 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
ProjectID: projectID,
BucketName: "bucket",
Prefix: metabase.ObjectKey(prefixes[i]),
Status: metabase.CommittedUnversioned,
Pending: false,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
var entry metabase.ObjectEntry
for it.Next(ctx, &entry) {
Expand Down Expand Up @@ -333,7 +333,7 @@ func (s *scenario) run(ctx *testcontext.Context, b *testing.B, db *metabase.DB)
Cursor: metabase.IterateCursor{
Key: object.ObjectKey,
},
Status: metabase.CommittedUnversioned,
Pending: false,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, func(ctx context.Context, it metabase.ObjectsIterator) error {
Expand Down
6 changes: 3 additions & 3 deletions satellite/metabase/get.go
Expand Up @@ -336,18 +336,18 @@ func (db *DB) BucketEmpty(ctx context.Context, opts BucketEmpty) (empty bool, er
func (db *DB) TestingAllCommittedObjects(ctx context.Context, projectID uuid.UUID, bucketName string) (objects []ObjectEntry, err error) {
defer mon.Task()(&ctx)(&err)

return db.testingAllObjectsByStatus(ctx, projectID, bucketName, CommittedUnversioned)
return db.testingAllObjectsByStatus(ctx, projectID, bucketName, false)
}

func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, status ObjectStatus) (objects []ObjectEntry, err error) {
func (db *DB) testingAllObjectsByStatus(ctx context.Context, projectID uuid.UUID, bucketName string, pending bool) (objects []ObjectEntry, err error) {
defer mon.Task()(&ctx)(&err)

err = db.IterateObjectsAllVersionsWithStatus(ctx,
IterateObjectsWithStatus{
ProjectID: projectID,
BucketName: bucketName,
Recursive: true,
Status: status,
Pending: pending,
IncludeCustomMetadata: true,
IncludeSystemMetadata: true,
}, func(ctx context.Context, it ObjectsIterator) error {
Expand Down
43 changes: 26 additions & 17 deletions satellite/metabase/iterator.go
Expand Up @@ -20,7 +20,7 @@ type objectsIterator struct {

projectID uuid.UUID
bucketName []byte
status ObjectStatus
pending bool
prefix ObjectKey
prefixLimit ObjectKey
batchSize int
Expand Down Expand Up @@ -54,7 +54,7 @@ func iterateAllVersionsWithStatus(ctx context.Context, db *DB, opts IterateObjec

projectID: opts.ProjectID,
bucketName: []byte(opts.BucketName),
status: opts.Status,
pending: opts.Pending,
prefix: opts.Prefix,
prefixLimit: prefixLimit(opts.Prefix),
batchSize: opts.BatchSize,
Expand Down Expand Up @@ -98,7 +98,7 @@ func iteratePendingObjectsByKey(ctx context.Context, db *DB, opts IteratePending
recursive: true,
includeCustomMetadata: true,
includeSystemMetadata: true,
status: Pending,
pending: true,

curIndex: 0,
cursor: iterateCursor{
Expand Down Expand Up @@ -162,7 +162,12 @@ func (it *objectsIterator) Next(ctx context.Context, item *ObjectEntry) bool {
*item = ObjectEntry{
IsPrefix: true,
ObjectKey: item.ObjectKey[:p+1],
Status: it.status,
}
// TODO(ver): should we return something else here?
if it.pending {
item.Status = Pending
} else {
item.Status = CommittedUnversioned
}
}

Expand Down Expand Up @@ -231,21 +236,25 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
cursorCompare = ">="
}

statusFilter := `AND status <> ` + statusPending
if it.pending {
statusFilter = `AND status = ` + statusPending
}

if it.prefixLimit == "" {
querySelectFields := querySelectorFields("object_key", it)
return it.db.db.QueryContext(ctx, `
SELECT
`+querySelectFields+`
FROM objects
WHERE
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
AND (project_id, bucket_name) < ($1, $7)
AND status = $3
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $3, $4)
AND (project_id, bucket_name) < ($1, $6)
`+statusFilter+`
AND (expires_at IS NULL OR expires_at > now())
ORDER BY (project_id, bucket_name, object_key, version) ASC
LIMIT $6
LIMIT $5
`, it.projectID, it.bucketName,
it.status,
[]byte(it.cursor.Key), int(it.cursor.Version),
it.batchSize,
nextBucket(it.bucketName),
Expand All @@ -257,20 +266,19 @@ func doNextQueryAllVersionsWithStatus(ctx context.Context, it *objectsIterator)
fromSubstring = len(it.prefix) + 1
}

querySelectFields := querySelectorFields("SUBSTRING(object_key FROM $8)", it)
querySelectFields := querySelectorFields("SUBSTRING(object_key FROM $7)", it)
return it.db.db.QueryContext(ctx, `
SELECT
`+querySelectFields+`
FROM objects
WHERE
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $4, $5)
AND (project_id, bucket_name, object_key) < ($1, $2, $6)
AND status = $3
(project_id, bucket_name, object_key, version) `+cursorCompare+` ($1, $2, $3, $4)
AND (project_id, bucket_name, object_key) < ($1, $2, $5)
`+statusFilter+`
AND (expires_at IS NULL OR expires_at > now())
ORDER BY (project_id, bucket_name, object_key, version) ASC
LIMIT $7
LIMIT $6
`, it.projectID, it.bucketName,
it.status,
[]byte(it.cursor.Key), int(it.cursor.Version),
[]byte(it.prefixLimit),
it.batchSize,
Expand All @@ -282,6 +290,7 @@ func querySelectorFields(objectKeyColumn string, it *objectsIterator) string {
querySelectFields := objectKeyColumn + `
,stream_id
,version
,status
,encryption`

if it.includeSystemMetadata {
Expand Down Expand Up @@ -317,7 +326,7 @@ func doNextQueryPendingObjectsByKey(ctx context.Context, it *objectsIterator) (_

return it.db.db.QueryContext(ctx, `
SELECT
object_key, stream_id, version, encryption,
object_key, stream_id, version, status, encryption,
created_at, expires_at,
segment_count,
total_plain_size, total_encrypted_size, fixed_segment_size,
Expand All @@ -339,12 +348,12 @@ func doNextQueryPendingObjectsByKey(ctx context.Context, it *objectsIterator) (_
// scanItem scans doNextQuery results into ObjectEntry.
func (it *objectsIterator) scanItem(item *ObjectEntry) (err error) {
item.IsPrefix = false
item.Status = it.status

fields := []interface{}{
&item.ObjectKey,
&item.StreamID,
&item.Version,
&item.Status,
encryptionParameters{&item.Encryption},
}

Expand Down

0 comments on commit 97c98d7

Please sign in to comment.