-
Notifications
You must be signed in to change notification settings - Fork 387
/
list_objects.go
277 lines (242 loc) · 7 KB
/
list_objects.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"database/sql"
"errors"
"strings"
"storj.io/common/uuid"
"storj.io/private/tagsql"
)
// ListObjectsCursor is a cursor used during iteration through objects.
//
// It has the same underlying type as IterateCursor. Within this file its Key
// and Version fields determine where a listing resumes.
type ListObjectsCursor IterateCursor
// ListObjects contains arguments necessary for listing objects.
type ListObjects struct {
// ProjectID identifies the project; Verify rejects a zero value.
ProjectID uuid.UUID
// BucketName is the bucket to list; Verify rejects an empty name.
BucketName string
// Recursive selects between returning every key individually (true) and
// collapsing keys at the first '/' after Prefix into prefix entries (false).
Recursive bool
// Limit is the requested number of entries. Verify rejects negative values
// and ListObjects passes it through ListLimit.Ensure before querying.
Limit int
// Prefix bounds the listing: it feeds the start key and, through
// prefixLimit, the stop key. The query strips the leading Prefix-length
// portion from the keys it returns.
Prefix ObjectKey
// Cursor marks the position after which the listing resumes; its Key and
// Version are compared against (object_key, version) in the query.
Cursor ListObjectsCursor
// Pending selects pending objects instead of committed ones.
Pending bool
// IncludeCustomMetadata adds the encrypted metadata columns to each entry.
IncludeCustomMetadata bool
// IncludeSystemMetadata adds creation/expiration times, segment count and
// size columns to each entry.
IncludeSystemMetadata bool
}
// Verify verifies list objects request fields.
//
// The checks run in a fixed order and the first failure wins: a zero
// ProjectID, then an empty BucketName, then a negative Limit. Each failure is
// reported as an ErrInvalidRequest error; a valid request yields nil.
func (opts *ListObjects) Verify() error {
	if opts.ProjectID.IsZero() {
		return ErrInvalidRequest.New("ProjectID missing")
	}
	if opts.BucketName == "" {
		return ErrInvalidRequest.New("BucketName missing")
	}
	if opts.Limit < 0 {
		return ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
	}
	return nil
}
// ListObjectsResult result of listing objects.
type ListObjectsResult struct {
// Objects holds the listed entries; ListObjects truncates it to the
// requested limit.
Objects []ObjectEntry
// More is true when the query returned more entries than the limit, i.e.
// there is at least one further entry to list.
More bool
}
// ListObjects lists objects.
//
// The options are checked with Verify and the limit is passed through
// ListLimit.Ensure before the query runs. The query asks for one row more than
// the limit; when that extra row comes back it is dropped from Objects and
// More is set to true. A sql.ErrNoRows error is reported as an empty result;
// any other failure is wrapped in the package Error class.
func (db *DB) ListObjects(ctx context.Context, opts ListObjects) (result ListObjectsResult, err error) {
	defer mon.Task()(&ctx)(&err)

	if verifyErr := opts.Verify(); verifyErr != nil {
		return ListObjectsResult{}, verifyErr
	}

	ListLimit.Ensure(&opts.Limit)

	var entries []ObjectEntry
	collect := func(rows tagsql.Rows) error {
		var scanErr error
		entries, scanErr = scanListObjectsResult(rows, opts)
		return scanErr
	}

	// Bound parameters, in order: $1 project, $2 bucket, $3 start key,
	// $4 cursor version, $5 stop key, $6 limit+1, $7 len(prefix)+1.
	err = withRows(db.db.QueryContext(ctx, opts.getSQLQuery(),
		opts.ProjectID, []byte(opts.BucketName),
		opts.startKey(), opts.Cursor.Version, opts.stopKey(),
		opts.Limit+1, len(opts.Prefix)+1),
	)(collect)

	switch {
	case err == nil:
		// fall through to result assembly below
	case errors.Is(err, sql.ErrNoRows):
		return ListObjectsResult{}, nil
	default:
		return ListObjectsResult{}, Error.New("unable to list objects: %w", err)
	}

	// The extra row only signals that more entries exist; it is not returned.
	result.More = len(entries) > opts.Limit
	if result.More {
		entries = entries[:opts.Limit]
	}
	result.Objects = entries
	return result, nil
}
// getSQLQuery builds the SQL text that ListObjects executes.
//
// Placeholders, in the order ListObjects binds them:
//
//	$1 project id, $2 bucket name, $3 start key, $4 cursor version,
//	$5 stop key, $6 row limit (Limit+1), $7 len(Prefix)+1.
//
// The select list starts with the entry key and an is_prefix flag, followed by
// the columns from selectedFields. Rows are restricted to those strictly after
// (project, bucket, start key, cursor version), before the stop condition, and
// not expired.
func (opts *ListObjects) getSQLQuery() string {
// indexFields holds the first two select expressions: the key relative to the
// prefix and whether the row stands for a collapsed prefix.
var indexFields string
if opts.Recursive {
// Recursive: every object is its own entry; the key is the part of
// object_key starting at position $7 and is_prefix is always FALSE.
indexFields = `
substring(object_key from $7), FALSE as is_prefix`
} else {
// Non-recursive: the remainder of the key (from position $7) is cut after
// its first '/', if it has one, and DISTINCT ON keeps one row per resulting
// entry_key. is_prefix is true exactly when the remainder contained a '/'.
indexFields = `
DISTINCT ON (entry_key)
CASE
WHEN position('/' IN substring(object_key from $7)) <> 0
THEN substring(substring(object_key from $7) from 0 for (position('/' IN substring(object_key from $7)) +1))
ELSE substring(object_key from $7)
END
AS entry_key,
position('/' IN substring(object_key from $7)) <> 0 AS is_prefix`
}
// Pending listing: only rows whose status equals statusPending.
if opts.Pending {
return `SELECT ` + indexFields + opts.selectedFields() + `
FROM objects
WHERE
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
AND ` + opts.stopCondition() + `
AND status = ` + statusPending + `
AND (expires_at IS NULL OR expires_at > now())
ORDER BY ` + opts.orderBy() + `
LIMIT $6
`
}
// TODO(ver): the following subquery looks nicer, however CRDB has a bug related to it.
//
// SELECT MAX(sub.version)
// FROM objects sub
// WHERE
// (sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
// AND status <> ` + statusPending + `
// AND (expires_at IS NULL OR expires_at > now())
// query committed objects where the latest is not a delete marker
//
// The subquery picks the highest version among the non-pending, unexpired rows
// of the same object key; the outer row must be that version and must have a
// status in statusesCommitted (presumably the set that excludes delete
// markers; confirm against its definition).
return `SELECT ` + indexFields + opts.selectedFields() + `
FROM objects main
WHERE
(project_id, bucket_name, object_key, version) > ($1, $2, $3, $4)
AND ` + opts.stopCondition() + `
AND status IN ` + statusesCommitted + `
AND (expires_at IS NULL OR expires_at > now())
AND version = (
SELECT sub.version
FROM objects sub
WHERE
(sub.project_id, sub.bucket_name, sub.object_key) = (main.project_id, main.bucket_name, main.object_key)
AND status <> ` + statusPending + `
AND (expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
LIMIT 1
)
ORDER BY ` + opts.orderBy() + `
LIMIT $6
`
}
// stopKey returns the value bound to the $5 placeholder of the listing query,
// which stopCondition compares with "<".
//
// Without a prefix the bound is nextBucket of the bucket name; with a prefix
// it is prefixLimit of that prefix.
func (opts *ListObjects) stopKey() []byte {
	if opts.Prefix == "" {
		return nextBucket([]byte(opts.BucketName))
	}
	return []byte(prefixLimit(opts.Prefix))
}
// stopCondition returns the SQL predicate that caps the listed range using
// the $5 placeholder (the value produced by stopKey).
//
// Without a prefix, $5 is compared against the bucket name; with a prefix it
// is compared against the object key within bucket $2.
func (opts *ListObjects) stopCondition() string {
	if opts.Prefix == "" {
		return "(project_id, bucket_name) < ($1, $5)"
	}
	return "(project_id, bucket_name, object_key) < ($1, $2, $5)"
}
// orderBy returns the ORDER BY expression list for the listing query.
//
// Recursive listings sort by the full object_key; non-recursive listings sort
// by the computed entry_key column. Both sort versions in descending order.
func (opts *ListObjects) orderBy() string {
	if opts.Recursive {
		return "object_key ASC, version DESC"
	}
	return "entry_key ASC, version DESC"
}
// selectedFields returns the comma-prefixed column list appended after the
// entry key and is_prefix expressions of the listing query.
//
// The base columns are always present; the system and custom metadata columns
// are appended, in that order, when the matching Include* option is set. The
// column order must stay in step with the scan order in scanListObjectsResult.
func (opts ListObjects) selectedFields() (selectedFields string) {
	const baseColumns = `
,stream_id
,version
,status
,encryption`

	const systemMetadataColumns = `
,created_at
,expires_at
,segment_count
,total_plain_size
,total_encrypted_size
,fixed_segment_size`

	const customMetadataColumns = `
,encrypted_metadata_nonce
,encrypted_metadata
,encrypted_metadata_encrypted_key`

	selectedFields = baseColumns
	if opts.IncludeSystemMetadata {
		selectedFields += systemMetadataColumns
	}
	if opts.IncludeCustomMetadata {
		selectedFields += customMetadataColumns
	}
	return selectedFields
}
// startKey determines the key bound to the $3 placeholder, i.e. the position
// after which the listing query starts returning rows.
//
// With neither prefix nor cursor key it is empty. For recursive listings, or
// when the cursor key does not start with the prefix, it is whichever of the
// cursor key and the prefix is not less than the other according to lessKey.
// Otherwise (non-recursive, cursor inside the prefix) the cursor is advanced
// past the whole sub-prefix it points into, so that sub-prefix is not listed
// again.
func (opts *ListObjects) startKey() ObjectKey {
	cursorKey, prefix := opts.Cursor.Key, opts.Prefix

	if prefix == "" && cursorKey == "" {
		return ""
	}

	cursorInPrefix := strings.HasPrefix(string(cursorKey), string(prefix))
	if opts.Recursive || !cursorInPrefix {
		if lessKey(cursorKey, prefix) {
			return prefix
		}
		return cursorKey
	}

	// Non-recursive case, cursor inside the prefix:
	//
	//	prefix | cursor    | startKey
	//	a/b/   | a/b/c/d/e | a/b/c/[0xff]
	//
	// Everything below "a/b/c/" collapses into the single entry "c/", so the
	// start key is placed behind that whole group.
	rest := cursorKey[len(prefix):] // c/d/e
	slash := strings.Index(string(rest), string(Delimiter))
	if slash < 0 {
		// The cursor points at a plain object directly under the prefix.
		return cursorKey
	}

	group := cursorKey[:len(prefix)+slash+1] // a/b/c/
	return ObjectKey(append([]byte(group), 0xff))
}
// scanListObjectsResult reads every remaining row into an ObjectEntry.
//
// The scan destinations follow the column order of the listing query: entry
// key, is_prefix, the base columns, then the system and custom metadata
// columns when the matching Include* option is set. Rows flagged as prefixes
// are reduced to their key with Status set to Prefix; all other scanned values
// are discarded for them. On a scan error the entries collected so far are
// returned together with the error.
func scanListObjectsResult(rows tagsql.Rows, opts ListObjects) (entries []ObjectEntry, err error) {
	for rows.Next() {
		var entry ObjectEntry

		dest := []interface{}{
			&entry.ObjectKey,
			&entry.IsPrefix,
			&entry.StreamID,
			&entry.Version,
			&entry.Status,
			encryptionParameters{&entry.Encryption},
		}
		if opts.IncludeSystemMetadata {
			dest = append(dest,
				&entry.CreatedAt,
				&entry.ExpiresAt,
				&entry.SegmentCount,
				&entry.TotalPlainSize,
				&entry.TotalEncryptedSize,
				&entry.FixedSegmentSize,
			)
		}
		if opts.IncludeCustomMetadata {
			dest = append(dest,
				&entry.EncryptedMetadataNonce,
				&entry.EncryptedMetadata,
				&entry.EncryptedMetadataEncryptedKey,
			)
		}

		if scanErr := rows.Scan(dest...); scanErr != nil {
			return entries, scanErr
		}

		if entry.IsPrefix {
			// A prefix entry carries no object data, only its key.
			entries = append(entries, ObjectEntry{
				IsPrefix:  true,
				ObjectKey: entry.ObjectKey,
				Status:    Prefix,
			})
			continue
		}
		entries = append(entries, entry)
	}
	return entries, nil
}