-
Notifications
You must be signed in to change notification settings - Fork 397
/
raw.go
215 lines (176 loc) · 5.17 KB
/
raw.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"time"
"github.com/zeebo/errs"
"storj.io/common/storj"
"storj.io/common/uuid"
)
// RawObject defines the full object that is stored in the database. It should be rarely used directly.
type RawObject struct {
ObjectStream
CreatedAt time.Time
ExpiresAt *time.Time
Status ObjectStatus
SegmentCount int32
EncryptedMetadataNonce []byte
EncryptedMetadata []byte
EncryptedMetadataEncryptedKey []byte
TotalPlainSize int64
TotalEncryptedSize int64
FixedSegmentSize int32
Encryption storj.EncryptionParameters
// ZombieDeletionDeadline defines when the pending raw object should be deleted from the database.
// This is as a safeguard against objects that failed to upload and the client has not indicated
// whether they want to continue uploading or delete the already uploaded data.
ZombieDeletionDeadline *time.Time
}
// RawSegment defines the full segment that is stored in the database. It should be rarely used directly.
type RawSegment struct {
StreamID uuid.UUID
Position SegmentPosition
RootPieceID storj.PieceID
EncryptedKeyNonce []byte
EncryptedKey []byte
EncryptedSize int32 // size of the whole segment (not a piece)
PlainSize int32
PlainOffset int64
Redundancy storj.RedundancyScheme
InlineData []byte
Pieces Pieces
}
// RawState contains full state of a table.
type RawState struct {
Objects []RawObject
Segments []RawSegment
}
// TestingGetState returns the state of the database.
func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) {
state := &RawState{}
state.Objects, err = db.testingGetAllObjects(ctx)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
state.Segments, err = db.testingGetAllSegments(ctx)
if err != nil {
return nil, Error.New("GetState: %w", err)
}
return state, nil
}
// TestingDeleteAll deletes all objects and segments from the database.
func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
_, err = db.db.ExecContext(ctx, `
DELETE FROM objects;
DELETE FROM segments;
DELETE FROM node_aliases;
SELECT setval('node_alias_seq', 1, false);
`)
db.aliasCache = NewNodeAliasCache(db)
return Error.Wrap(err)
}
// testingGetAllObjects returns the state of the database.
func (db *DB) testingGetAllObjects(ctx context.Context) (_ []RawObject, err error) {
objs := []RawObject{}
rows, err := db.db.Query(ctx, `
SELECT
project_id, bucket_name, object_key, version, stream_id,
created_at, expires_at,
status, segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption,
zombie_deletion_deadline
FROM objects
ORDER BY project_id ASC, bucket_name ASC, object_key ASC, version ASC
`)
if err != nil {
return nil, Error.New("testingGetAllObjects query: %w", err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var obj RawObject
err := rows.Scan(
&obj.ProjectID,
&obj.BucketName,
&obj.ObjectKey,
&obj.Version,
&obj.StreamID,
&obj.CreatedAt,
&obj.ExpiresAt,
&obj.Status, // TODO: fix encoding
&obj.SegmentCount,
&obj.EncryptedMetadataNonce,
&obj.EncryptedMetadata,
&obj.EncryptedMetadataEncryptedKey,
&obj.TotalPlainSize,
&obj.TotalEncryptedSize,
&obj.FixedSegmentSize,
encryptionParameters{&obj.Encryption},
&obj.ZombieDeletionDeadline,
)
if err != nil {
return nil, Error.New("testingGetAllObjects scan failed: %w", err)
}
objs = append(objs, obj)
}
if err := rows.Err(); err != nil {
return nil, Error.New("testingGetAllObjects scan failed: %w", err)
}
if len(objs) == 0 {
return nil, nil
}
return objs, nil
}
// testingGetAllSegments returns the state of the database.
func (db *DB) testingGetAllSegments(ctx context.Context) (_ []RawSegment, err error) {
segs := []RawSegment{}
rows, err := db.db.Query(ctx, `
SELECT
stream_id, position,
root_piece_id, encrypted_key_nonce, encrypted_key,
encrypted_size,
plain_offset, plain_size,
redundancy,
inline_data, remote_alias_pieces
FROM segments
ORDER BY stream_id ASC, position ASC
`)
if err != nil {
return nil, Error.New("testingGetAllSegments query: %w", err)
}
defer func() { err = errs.Combine(err, rows.Close()) }()
for rows.Next() {
var seg RawSegment
var aliasPieces AliasPieces
err := rows.Scan(
&seg.StreamID,
&seg.Position,
&seg.RootPieceID,
&seg.EncryptedKeyNonce,
&seg.EncryptedKey,
&seg.EncryptedSize,
&seg.PlainOffset,
&seg.PlainSize,
redundancyScheme{&seg.Redundancy},
&seg.InlineData,
&aliasPieces,
)
if err != nil {
return nil, Error.New("testingGetAllSegments scan failed: %w", err)
}
seg.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return nil, Error.New("testingGetAllSegments convert aliases to pieces failed: %w", err)
}
segs = append(segs, seg)
}
if err := rows.Err(); err != nil {
return nil, Error.New("testingGetAllSegments scan failed: %w", err)
}
if len(segs) == 0 {
return nil, nil
}
return segs, nil
}