forked from cockroachdb/pebble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
remote_backing.go
303 lines (274 loc) · 9.35 KB
/
remote_backing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package objstorageprovider
import (
"bytes"
"encoding/binary"
"io"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/remoteobjcat"
"github.com/cockroachdb/pebble/objstorage/remote"
)
// Tags used in the encoded form of a RemoteObjectBacking. The encoding is a
// sequence of uvarint tags, each followed by a tag-specific payload.
const (
	// tagCreatorID is followed by the creator ID, encoded as a uvarint.
	tagCreatorID = 1
	// tagCreatorFileNum is followed by the creator's file number, encoded as a
	// uvarint.
	tagCreatorFileNum = 2
	// tagCleanupMethod is followed by the cleanup method, encoded as a uvarint.
	tagCleanupMethod = 3
	// tagRefCheckID encodes the information for a ref marker that needs to be
	// checked when attaching this object to another provider. This is set to the
	// creator ID and FileNum for the provider that encodes the backing, and
	// allows the "target" provider to check that the "source" provider kept its
	// reference on the object alive. It is followed by two uvarints: the creator
	// ID and the file number.
	tagRefCheckID = 4
	// tagLocator encodes the remote.Locator; if absent the locator is "". It is
	// followed by the locator string length and the locator string.
	tagLocator = 5
	// tagCustomObjectName encodes a custom object name (if present). It is
	// followed by the custom name string length and the string.
	tagCustomObjectName = 6
	// tagNotSafeToIgnoreMask is a bit that marks a tag as mandatory.
	//
	// Any new tags that don't have the tagNotSafeToIgnoreMask bit set must be
	// followed by the length of the data (so they can be skipped by code that
	// does not know them).
	// Any new tags that have the tagNotSafeToIgnoreMask bit set cause errors if
	// they are encountered by earlier code that doesn't know the tag.
	tagNotSafeToIgnoreMask = 64
)
// encodeRemoteObjectBacking serializes the remote-storage portion of meta into
// a RemoteObjectBacking: a sequence of uvarint tags, each followed by its
// payload.
//
// The creator ID, creator file number and cleanup method are always written.
// The ref-check entry is written only for SharedRefTracking objects, and the
// locator and custom object name only when they are non-empty.
//
// An assertion error is returned if meta does not describe a remote object.
func (p *provider) encodeRemoteObjectBacking(
	meta *objstorage.ObjectMetadata,
) (objstorage.RemoteObjectBacking, error) {
	if !meta.IsRemote() {
		return nil, errors.AssertionFailedf("object %s not on remote storage", meta.DiskFileNum)
	}

	r := &meta.Remote
	out := make([]byte, 0, binary.MaxVarintLen64*4)
	// appendUvarints appends each value to out, in order, as a uvarint.
	appendUvarints := func(vals ...uint64) {
		for _, v := range vals {
			out = binary.AppendUvarint(out, v)
		}
	}

	appendUvarints(tagCreatorID, uint64(r.CreatorID))
	// TODO(radu): encode file type as well?
	appendUvarints(tagCreatorFileNum, uint64(r.CreatorFileNum.FileNum()))
	appendUvarints(tagCleanupMethod, uint64(r.CleanupMethod))

	if r.CleanupMethod == objstorage.SharedRefTracking {
		// Record this provider's own creator ID and file number for the object,
		// so that the reference marker can be checked on attach.
		appendUvarints(
			tagRefCheckID,
			uint64(p.remote.shared.creatorID),
			uint64(meta.DiskFileNum.FileNum()),
		)
	}
	if r.Locator != "" {
		appendUvarints(tagLocator)
		out = encodeString(out, string(r.Locator))
	}
	if r.CustomObjectName != "" {
		appendUvarints(tagCustomObjectName)
		out = encodeString(out, r.CustomObjectName)
	}
	return out, nil
}
// remoteObjectBackingHandle is the handle returned by
// provider.RemoteObjectBacking. It holds an encoded backing and releases the
// protection on the object when closed.
type remoteObjectBackingHandle struct {
	// backing is the encoded backing; it is set to nil by Close.
	backing objstorage.RemoteObjectBacking
	// fileNum is the object that was protected when the handle was created and
	// that is unprotected by Close.
	fileNum base.DiskFileNum
	// p is the provider that created the handle.
	p *provider
}
// Get returns the encoded backing held by the handle. It returns an error if
// the handle no longer holds a backing, which is the state Close leaves it in.
func (s *remoteObjectBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
	if held := s.backing; held != nil {
		return held, nil
	}
	return nil, errors.Errorf("RemoteObjectBackingHandle.Get() called after Close()")
}
// Close drops the backing held by the handle and unprotects the object on the
// provider. Calling Close again is a no-op.
func (s *remoteObjectBackingHandle) Close() {
	if s.backing == nil {
		// Already closed (or never populated); nothing to release.
		return
	}
	s.backing = nil
	s.p.unprotectObject(s.fileNum)
}
// Compile-time check that *remoteObjectBackingHandle implements
// objstorage.RemoteObjectBackingHandle.
var _ objstorage.RemoteObjectBackingHandle = (*remoteObjectBackingHandle)(nil)
// RemoteObjectBacking is part of the objstorage.Provider interface.
//
// It encodes the backing for meta, protects the object on this provider, and
// returns a handle whose Close releases that protection. If encoding fails,
// the object is not protected and the error is returned.
func (p *provider) RemoteObjectBacking(
	meta *objstorage.ObjectMetadata,
) (objstorage.RemoteObjectBackingHandle, error) {
	encoded, err := p.encodeRemoteObjectBacking(meta)
	if err != nil {
		return nil, err
	}

	fileNum := meta.DiskFileNum
	p.protectObject(fileNum)
	handle := &remoteObjectBackingHandle{
		p:       p,
		fileNum: fileNum,
		backing: encoded,
	}
	return handle, nil
}
// CreateExternalObjectBacking is part of the objstorage.Provider interface.
//
// It builds a backing for an object identified only by a locator and a custom
// object name, using the SharedNoCleanup cleanup method. All other metadata
// fields are left at their zero values.
func (p *provider) CreateExternalObjectBacking(
	locator remote.Locator, objName string,
) (objstorage.RemoteObjectBacking, error) {
	meta := &objstorage.ObjectMetadata{}
	meta.Remote.CleanupMethod = objstorage.SharedNoCleanup
	meta.Remote.CustomObjectName = objName
	meta.Remote.Locator = locator
	return p.encodeRemoteObjectBacking(meta)
}
// decodedBacking is the result of decoding a RemoteObjectBacking.
type decodedBacking struct {
	// meta is the object metadata reconstructed from the backing.
	meta objstorage.ObjectMetadata
	// refToCheck is set only when meta.Remote.CleanupMethod is
	// objstorage.SharedRefTracking. It identifies the reference marker of the
	// provider that encoded the backing.
	refToCheck struct {
		// creatorID is the creator ID stored with tagRefCheckID.
		creatorID objstorage.CreatorID
		// fileNum is the file number stored with tagRefCheckID.
		fileNum base.DiskFileNum
	}
}
// decodeRemoteObjectBacking decodes the remote object metadata.
//
// buf is parsed as a sequence of uvarint tags, each followed by a tag-specific
// payload. Unknown tags are skipped using their length prefix, unless they
// have the tagNotSafeToIgnoreMask bit set, in which case an error is returned.
//
// An error is returned if:
//   - the buffer is malformed or truncated (including an unknown tag whose
//     declared data length exceeds the remaining bytes);
//   - no custom object name is present and the creator ID or creator file
//     number is missing (zero);
//   - the cleanup method is SharedRefTracking and the ref-check information is
//     missing (zero).
//
// fileType and fileNum are copied into the resulting metadata unchanged.
//
// Note that the meta.Remote.Storage field is not set.
func decodeRemoteObjectBacking(
	fileType base.FileType, fileNum base.DiskFileNum, buf objstorage.RemoteObjectBacking,
) (decodedBacking, error) {
	var creatorID, creatorFileNum, cleanupMethod, refCheckCreatorID, refCheckFileNum uint64
	var locator, customObjName string
	br := bytes.NewReader(buf)
	for {
		tag, err := binary.ReadUvarint(br)
		if err == io.EOF {
			// Clean end of input: no more tags.
			break
		}
		if err != nil {
			return decodedBacking{}, err
		}
		switch tag {
		case tagCreatorID:
			creatorID, err = binary.ReadUvarint(br)

		case tagCreatorFileNum:
			creatorFileNum, err = binary.ReadUvarint(br)

		case tagCleanupMethod:
			cleanupMethod, err = binary.ReadUvarint(br)

		case tagRefCheckID:
			refCheckCreatorID, err = binary.ReadUvarint(br)
			if err == nil {
				refCheckFileNum, err = binary.ReadUvarint(br)
			}

		case tagLocator:
			locator, err = decodeString(br)

		case tagCustomObjectName:
			customObjName, err = decodeString(br)

		default:
			// Ignore unknown tags, unless they're not safe to ignore.
			if tag&tagNotSafeToIgnoreMask != 0 {
				return decodedBacking{}, errors.Newf("unknown tag %d", tag)
			}
			var dataLen uint64
			dataLen, err = binary.ReadUvarint(br)
			if err == nil {
				// bytes.Reader.Seek happily moves past the end of the buffer, so
				// the declared length must be validated explicitly; otherwise a
				// truncated backing would be silently accepted. This check also
				// guards the int64 conversion below against overflow.
				if remaining := br.Len(); dataLen > uint64(remaining) {
					err = errors.Newf(
						"unknown tag %d: data length %d exceeds remaining %d bytes",
						tag, dataLen, remaining,
					)
				} else {
					_, err = br.Seek(int64(dataLen), io.SeekCurrent)
				}
			}
		}
		if err == io.EOF {
			// The input ended in the middle of a tag's payload; report it as
			// truncation rather than as a clean end of input.
			err = io.ErrUnexpectedEOF
		}
		if err != nil {
			return decodedBacking{}, err
		}
	}
	// Objects without a custom name are identified by their creator ID and
	// creator file number, so both must be present.
	if customObjName == "" {
		if creatorID == 0 {
			return decodedBacking{}, errors.Newf("remote object backing missing creator ID")
		}
		if creatorFileNum == 0 {
			return decodedBacking{}, errors.Newf("remote object backing missing creator file num")
		}
	}
	var res decodedBacking
	res.meta.DiskFileNum = fileNum
	res.meta.FileType = fileType
	res.meta.Remote.CreatorID = objstorage.CreatorID(creatorID)
	res.meta.Remote.CreatorFileNum = base.FileNum(creatorFileNum).DiskFileNum()
	res.meta.Remote.CleanupMethod = objstorage.SharedCleanupMethod(cleanupMethod)
	if res.meta.Remote.CleanupMethod == objstorage.SharedRefTracking {
		if refCheckCreatorID == 0 || refCheckFileNum == 0 {
			return decodedBacking{}, errors.Newf("remote object backing missing ref to check")
		}
		res.refToCheck.creatorID = objstorage.CreatorID(refCheckCreatorID)
		res.refToCheck.fileNum = base.FileNum(refCheckFileNum).DiskFileNum()
	}
	res.meta.Remote.Locator = remote.Locator(locator)
	res.meta.Remote.CustomObjectName = customObjName
	return res, nil
}
// encodeString appends s to buf as a uvarint byte length followed by the raw
// bytes of s, and returns the extended slice.
func encodeString(buf []byte, s string) []byte {
	withLen := binary.AppendUvarint(buf, uint64(len(s)))
	return append(withLen, s...)
}
// decodeString reads a string in the format written by encodeString: a uvarint
// byte length followed by that many raw bytes.
//
// A zero length yields the empty string. If the reader fails before the
// declared number of bytes has been read, the error is returned (with io.EOF
// reported as io.ErrUnexpectedEOF, since the input was truncated mid-string)
// and the returned string is empty.
func decodeString(br io.ByteReader) (string, error) {
	length, err := binary.ReadUvarint(br)
	if err != nil || length == 0 {
		return "", err
	}
	// The length comes straight from the input and may be corrupt. Allocating
	// it up front could panic or exhaust memory, so cap the initial allocation
	// and let the buffer grow only as bytes are actually read.
	const maxPrealloc = 1 << 10
	capHint := length
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	buf := make([]byte, 0, capHint)
	for remaining := length; remaining > 0; remaining-- {
		b, err := br.ReadByte()
		if err != nil {
			if err == io.EOF {
				err = io.ErrUnexpectedEOF
			}
			return "", err
		}
		buf = append(buf, b)
	}
	return string(buf), nil
}
// AttachRemoteObjects is part of the objstorage.Provider interface.
//
// The work proceeds in stages:
//  1. Decode every backing and resolve the storage for its locator.
//  2. For each SharedRefTracking object, create this provider's reference and
//     then verify that the origin's reference marker exists.
//  3. Add all objects to the remote catalog batch and sync it.
//  4. Record the objects in the provider's known-objects map.
//
// Any failure aborts the call and returns the error. As the TODOs below note,
// references created for earlier objects in the same call are not cleaned up
// on failure.
func (p *provider) AttachRemoteObjects(
	objs []objstorage.RemoteObjectToAttach,
) ([]objstorage.ObjectMetadata, error) {
	// Stage 1: decode the backings and resolve their storage.
	backings := make([]decodedBacking, 0, len(objs))
	for i := range objs {
		obj := &objs[i]
		db, err := decodeRemoteObjectBacking(obj.FileType, obj.FileNum, obj.Backing)
		if err != nil {
			return nil, err
		}
		storage, err := p.ensureStorage(db.meta.Remote.Locator)
		if err != nil {
			return nil, err
		}
		db.meta.Remote.Storage = storage
		backings = append(backings, db)
	}

	// Stage 2: create the reference marker objects.
	// TODO(radu): parallelize this.
	for i := range backings {
		b := &backings[i]
		if b.meta.Remote.CleanupMethod != objstorage.SharedRefTracking {
			continue
		}
		if err := p.sharedCreateRef(b.meta); err != nil {
			// TODO(radu): clean up references previously created in this loop.
			return nil, err
		}
		// Check the "origin"'s reference. Our own reference is created first
		// and only then is the origin's marker checked.
		originRef := sharedObjectRefName(b.meta, b.refToCheck.creatorID, b.refToCheck.fileNum)
		_, err := b.meta.Remote.Storage.Size(originRef)
		if err == nil {
			continue
		}
		// Best-effort removal of the reference we just created; the original
		// error is the one reported.
		_ = p.sharedUnref(b.meta)
		// TODO(radu): clean up references previously created in this loop.
		if b.meta.Remote.Storage.IsNotExistError(err) {
			return nil, errors.Errorf("origin marker object %q does not exist;"+
				" object probably removed from the provider which created the backing", originRef)
		}
		return nil, errors.Wrapf(err, "checking origin's marker object %s", originRef)
	}

	// Stage 3: queue the objects in the catalog batch (under the provider
	// mutex) and sync.
	func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		for i := range backings {
			m := &backings[i].meta
			// NOTE(review): m.Remote.CustomObjectName is not passed to the
			// catalog entry here - confirm whether that is intended.
			p.mu.remote.catalogBatch.AddObject(remoteobjcat.RemoteObjectMetadata{
				FileNum:        m.DiskFileNum,
				FileType:       m.FileType,
				CreatorID:      m.Remote.CreatorID,
				CreatorFileNum: m.Remote.CreatorFileNum,
				CleanupMethod:  m.Remote.CleanupMethod,
				Locator:        m.Remote.Locator,
			})
		}
	}()
	if err := p.sharedSync(); err != nil {
		return nil, err
	}

	// Stage 4: publish the metadata in the known-objects map.
	metas := make([]objstorage.ObjectMetadata, 0, len(backings))
	for i := range backings {
		metas = append(metas, backings[i].meta)
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	for i := range metas {
		p.mu.knownObjects[metas[i].DiskFileNum] = metas[i]
	}
	return metas, nil
}