-
Notifications
You must be signed in to change notification settings - Fork 387
/
list_segments.go
264 lines (231 loc) · 7.26 KB
/
list_segments.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"database/sql"
"errors"
"time"
"storj.io/common/uuid"
"storj.io/private/dbutil/pgutil"
"storj.io/private/tagsql"
)
// ListSegments contains arguments necessary for listing stream segments.
type ListSegments struct {
// StreamID identifies the stream whose segments are listed.
// DB.ListSegments rejects a zero value with ErrInvalidRequest.
StreamID uuid.UUID
// Cursor restricts the listing to segments whose position is greater than
// it; the query skips this filter when the cursor parameter equals 0.
Cursor SegmentPosition
// Limit is the requested number of segments. DB.ListSegments rejects
// negative values and passes the rest through ListLimit.Ensure before
// querying.
Limit int
}
// ListSegmentsResult is the result of listing segments.
type ListSegmentsResult struct {
// Segments holds the listed segments in ascending position order.
Segments []Segment
// More is set when the query returned more rows than the limit; the extra
// row is dropped from Segments.
More bool
}
// ListSegments lists specified stream segments.
//
// Segments of opts.StreamID are returned in ascending position order, starting
// after opts.Cursor (a cursor the query sees as 0 lists from the beginning).
// One row more than the effective limit is requested; when that extra row is
// present it is dropped from the result and More is set.
//
// When server-side copy is enabled, every listed segment for which
// PiecesInAncestorSegment reports true gets its RootPieceID and Pieces filled
// in from the segment at the same position in the ancestor stream recorded in
// segment_copies.
//
// A zero StreamID or a negative Limit yields ErrInvalidRequest.
func (db *DB) ListSegments(ctx context.Context, opts ListSegments) (result ListSegmentsResult, err error) {
	defer mon.Task()(&ctx)(&err)

	if opts.StreamID.IsZero() {
		return ListSegmentsResult{}, ErrInvalidRequest.New("StreamID missing")
	}

	if opts.Limit < 0 {
		return ListSegmentsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
	}

	ListLimit.Ensure(&opts.Limit)

	// Ask for one extra row so that the presence of further segments can be
	// detected without a second query.
	err = withRows(db.db.QueryContext(ctx, `
		SELECT
			position,
			created_at,
			expires_at,
			root_piece_id, encrypted_key_nonce, encrypted_key,
			encrypted_size, plain_offset, plain_size,
			encrypted_etag,
			redundancy,
			inline_data, remote_alias_pieces
		FROM segments
		WHERE
			stream_id = $1 AND
			($2 = 0::INT8 OR position > $2)
		ORDER BY stream_id, position ASC
		LIMIT $3
	`, opts.StreamID, opts.Cursor, opts.Limit+1))(func(rows tagsql.Rows) error {
		for rows.Next() {
			var segment Segment
			var aliasPieces AliasPieces
			err = rows.Scan(
				&segment.Position,
				&segment.CreatedAt, &segment.ExpiresAt,
				&segment.RootPieceID, &segment.EncryptedKeyNonce, &segment.EncryptedKey,
				&segment.EncryptedSize, &segment.PlainOffset, &segment.PlainSize,
				&segment.EncryptedETag,
				redundancyScheme{&segment.Redundancy},
				&segment.InlineData, &aliasPieces,
			)
			if err != nil {
				return Error.New("failed to scan segments: %w", err)
			}

			segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
			if err != nil {
				return Error.New("failed to convert aliases to pieces: %w", err)
			}

			segment.StreamID = opts.StreamID
			result.Segments = append(result.Segments, segment)
		}
		return nil
	})
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return ListSegmentsResult{}, nil
		}
		return ListSegmentsResult{}, Error.New("unable to fetch object segments: %w", err)
	}

	if db.config.ServerSideCopy {
		// Remember where the copied segments live inside result.Segments so the
		// ancestor data is written into the returned slice itself. Collecting
		// the Segment values instead would only update detached struct copies
		// and the caller would never see the ancestor pieces.
		copyIndexes := make([]int, 0, len(result.Segments))
		copiesPositions := make([]int64, 0, len(result.Segments))
		for i := range result.Segments {
			segment := &result.Segments[i]
			if segment.PiecesInAncestorSegment() {
				copyIndexes = append(copyIndexes, i)
				copiesPositions = append(copiesPositions, int64(segment.Position.Encode()))
			}
		}

		if len(copyIndexes) > 0 {
			// Both result.Segments and the ancestor rows are ordered by
			// position, so the n-th ancestor row belongs to the n-th copied
			// segment; the count check below rejects any mismatch.
			index := 0
			err = withRows(db.db.QueryContext(ctx, `
				SELECT
					root_piece_id,
					remote_alias_pieces
				FROM segments
				WHERE
					stream_id = (SELECT ancestor_stream_id FROM segment_copies WHERE stream_id = $1)
					AND position IN (SELECT position FROM UNNEST($2::INT8[]) as position)
				ORDER BY stream_id, position ASC
			`, opts.StreamID, pgutil.Int8Array(copiesPositions)))(func(rows tagsql.Rows) error {
				for rows.Next() {
					// Guard against an unexpected surplus row instead of
					// panicking on an out-of-range index.
					if index >= len(copyIndexes) {
						return Error.New("number of ancestor segments is different than copies: want %d got %d",
							len(copyIndexes), index+1)
					}

					target := &result.Segments[copyIndexes[index]]

					var aliasPieces AliasPieces
					err = rows.Scan(
						&target.RootPieceID,
						&aliasPieces,
					)
					if err != nil {
						return Error.New("failed to scan segments: %w", err)
					}

					target.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
					if err != nil {
						return Error.New("failed to convert aliases to pieces: %w", err)
					}
					index++
				}
				return nil
			})
			if err != nil {
				return ListSegmentsResult{}, Error.New("unable to fetch object segments: %w", err)
			}

			if index != len(copyIndexes) {
				return ListSegmentsResult{}, Error.New("number of ancestor segments is different than copies: want %d got %d",
					len(copyIndexes), index)
			}
		}
	}

	// Drop the look-ahead row and report that more segments are available.
	if len(result.Segments) > opts.Limit {
		result.More = true
		result.Segments = result.Segments[:len(result.Segments)-1]
	}

	return result, nil
}
// ListStreamPositions contains arguments necessary for listing stream segment positions.
type ListStreamPositions struct {
// StreamID identifies the stream whose segment positions are listed.
// DB.ListStreamPositions rejects a zero value with ErrInvalidRequest.
StreamID uuid.UUID
// Cursor restricts the listing to segments whose position is greater than
// it; the query skips this filter when the cursor parameter equals 0.
Cursor SegmentPosition
// Limit is the requested number of entries. DB.ListStreamPositions rejects
// negative values and passes the rest through ListLimit.Ensure before
// querying.
Limit int
// Range optionally restricts the listing by plain offsets; nil applies no
// such filter.
Range *StreamRange
}
// StreamRange limits the listed stream positions based on plain offsets.
// DB.ListStreamPositions keeps segments that satisfy
// PlainStart < plain_offset + plain_size and plain_offset < PlainLimit, and
// rejects a range whose PlainStart is greater than PlainLimit.
type StreamRange struct {
// PlainStart is the plain offset at which the range begins.
PlainStart int64
PlainLimit int64 // limit is exclusive
}
// ListStreamPositionsResult is the result of listing segment positions.
type ListStreamPositionsResult struct {
// Segments holds the listed entries in ascending position order.
Segments []SegmentPositionInfo
// More is set when the query returned more rows than the limit; the extra
// row is dropped from Segments.
More bool
}
// SegmentPositionInfo contains information for segment position.
// DB.ListStreamPositions fills the fields from the position, plain_size,
// plain_offset, created_at, encrypted_etag, encrypted_key_nonce and
// encrypted_key columns of the segments table.
type SegmentPositionInfo struct {
// Position is the value of the segment's position column.
Position SegmentPosition
// PlainSize is 0 for a migrated object.
PlainSize int32
// PlainOffset is 0 for a migrated object.
PlainOffset int64
// CreatedAt is a pointer so that a missing creation time can be represented as nil.
CreatedAt *time.Time // TODO: make it non-nilable after we migrate all existing segments to have creation time
// EncryptedETag is the value of the encrypted_etag column.
EncryptedETag []byte
// EncryptedKeyNonce is the value of the encrypted_key_nonce column.
EncryptedKeyNonce []byte
// EncryptedKey is the value of the encrypted_key column.
EncryptedKey []byte
}
// ListStreamPositions lists specified stream segment positions.
//
// Entries for opts.StreamID come back in ascending position order, starting
// after opts.Cursor (a cursor the query sees as 0 lists from the beginning).
// When opts.Range is set, only segments overlapping the plain-offset range are
// considered. One row beyond the effective limit is requested; if it shows up
// it is removed from the result and More is set.
//
// A zero StreamID, a negative Limit, or a Range whose PlainStart exceeds its
// PlainLimit yields ErrInvalidRequest.
func (db *DB) ListStreamPositions(ctx context.Context, opts ListStreamPositions) (result ListStreamPositionsResult, err error) {
	defer mon.Task()(&ctx)(&err)

	switch {
	case opts.StreamID.IsZero():
		return ListStreamPositionsResult{}, ErrInvalidRequest.New("StreamID missing")
	case opts.Limit < 0:
		return ListStreamPositionsResult{}, ErrInvalidRequest.New("Invalid limit: %d", opts.Limit)
	}

	ListLimit.Ensure(&opts.Limit)

	if r := opts.Range; r != nil && r.PlainStart > r.PlainLimit {
		return ListStreamPositionsResult{}, ErrInvalidRequest.New("invalid range: %d:%d", r.PlainStart, r.PlainLimit)
	}

	// Pick the query variant: the ranged form adds the plain-offset overlap
	// condition, the plain form lists everything after the cursor.
	var (
		queryRows tagsql.Rows
		queryErr  error
	)
	if r := opts.Range; r != nil {
		queryRows, queryErr = db.db.QueryContext(ctx, `
			SELECT
				position, plain_size, plain_offset, created_at,
				encrypted_etag, encrypted_key_nonce, encrypted_key
			FROM segments
			WHERE
				stream_id = $1 AND
				($2 = 0::INT8 OR position > $2) AND
				$4 < plain_offset + plain_size AND plain_offset < $5
			ORDER BY position ASC
			LIMIT $3
		`, opts.StreamID, opts.Cursor, opts.Limit+1, r.PlainStart, r.PlainLimit)
	} else {
		queryRows, queryErr = db.db.QueryContext(ctx, `
			SELECT
				position, plain_size, plain_offset, created_at,
				encrypted_etag, encrypted_key_nonce, encrypted_key
			FROM segments
			WHERE
				stream_id = $1 AND
				($2 = 0::INT8 OR position > $2)
			ORDER BY position ASC
			LIMIT $3
		`, opts.StreamID, opts.Cursor, opts.Limit+1)
	}

	err = withRows(queryRows, queryErr)(func(rows tagsql.Rows) error {
		for rows.Next() {
			var info SegmentPositionInfo
			if scanErr := rows.Scan(
				&info.Position, &info.PlainSize, &info.PlainOffset, &info.CreatedAt,
				&info.EncryptedETag, &info.EncryptedKeyNonce, &info.EncryptedKey,
			); scanErr != nil {
				return Error.New("failed to scan segments: %w", scanErr)
			}
			result.Segments = append(result.Segments, info)
		}
		return nil
	})
	if errors.Is(err, sql.ErrNoRows) {
		return ListStreamPositionsResult{}, nil
	}
	if err != nil {
		return ListStreamPositionsResult{}, Error.New("unable to fetch object segments: %w", err)
	}

	// The look-ahead row only signals that further entries exist.
	if opts.Limit < len(result.Segments) {
		result.More = true
		lastIndex := len(result.Segments) - 1
		result.Segments = result.Segments[:lastIndex]
	}

	return result, nil
}