-
Notifications
You must be signed in to change notification settings - Fork 397
/
multipart.go
455 lines (371 loc) · 11.2 KB
/
multipart.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package miniogw
import (
"context"
"io"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/pkg/hash"
"storj.io/storj/lib/uplink"
)
// NewMultipartUpload starts a new multipart upload: it verifies the bucket
// exists, registers a pending upload, and spawns a goroutine that streams all
// incoming parts (fed via PutObjectPart through upload.Stream) into a single
// putObject call. The final result is delivered on upload.Done.
func (layer *gatewayLayer) NewMultipartUpload(ctx context.Context, bucket, object string, metadata map[string]string) (uploadID string, err error) {
	defer mon.Task()(&ctx)(&err)
	// Check that the bucket exists
	_, _, err = layer.gateway.project.GetBucketInfo(ctx, bucket)
	if err != nil {
		return "", convertError(err, bucket, "")
	}
	uploads := layer.gateway.multipart
	upload, err := uploads.Create(bucket, object, metadata)
	if err != nil {
		return "", err
	}
	// NOTE(review): this goroutine captures ctx and mutates the caller's
	// metadata map after the handler returns — confirm that ctx outlives the
	// request and that callers never reuse the metadata map concurrently.
	go func() {
		// "content-type" travels as an upload option, not as plain metadata.
		contentType := metadata["content-type"]
		delete(metadata, "content-type")
		opts := uplink.UploadOptions{
			ContentType: contentType,
			Metadata:    metadata,
		}
		objInfo, err := layer.putObject(ctx, bucket, object, upload.Stream, &opts)
		// Deregister before signaling so the ID is invalid for new requests.
		uploads.RemoveByID(upload.ID)
		if err != nil {
			upload.fail(err)
		} else {
			upload.complete(objInfo)
		}
	}()
	return upload.ID, nil
}
// PutObjectPart uploads one part of a pending multipart upload. It queues the
// part's reader on the upload stream, blocks until the stream has consumed the
// part completely, then records and returns the completed part's info.
func (layer *gatewayLayer) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, data *hash.Reader) (info minio.PartInfo, err error) {
	defer mon.Task()(&ctx)(&err)

	pending, err := layer.gateway.multipart.Get(bucket, object, uploadID)
	if err != nil {
		return minio.PartInfo{}, err
	}

	streamPart, err := pending.Stream.AddPart(partID, data)
	if err != nil {
		return minio.PartInfo{}, err
	}

	// Block until the stream has fully read this part (or the upload failed).
	err = <-streamPart.Done
	if err != nil {
		return minio.PartInfo{}, err
	}

	completed := minio.PartInfo{
		PartNumber:   streamPart.Number,
		LastModified: time.Now(),
		ETag:         data.SHA256HexString(),
		Size:         atomic.LoadInt64(&streamPart.Size),
	}
	pending.addCompletedPart(completed)
	return completed, nil
}
// AbortMultipartUpload cancels a pending multipart upload. It removes the
// pending record, aborts the underlying stream with a sentinel error, and
// waits for the background upload goroutine to terminate.
func (layer *gatewayLayer) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string) (err error) {
	defer mon.Task()(&ctx)(&err)

	pending, err := layer.gateway.multipart.Remove(bucket, object, uploadID)
	if err != nil {
		return err
	}

	sentinel := Error.New("abort")
	pending.Stream.Abort(sentinel)

	// Wait for the uploader goroutine; seeing our own sentinel means the
	// abort worked as intended and is not an error for the caller.
	result := <-pending.Done
	if result.Error == sentinel {
		return nil
	}
	return result.Error
}
// CompleteMultipartUpload finalizes a pending multipart upload: it removes
// the pending record, signals the stream that no further parts will arrive,
// and waits for the background upload to report the stored object's info.
func (layer *gatewayLayer) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, uploadedParts []minio.CompletePart) (objInfo minio.ObjectInfo, err error) {
	defer mon.Task()(&ctx)(&err)

	pending, err := layer.gateway.multipart.Remove(bucket, object, uploadID)
	if err != nil {
		return minio.ObjectInfo{}, err
	}

	// Tell the stream there are no more parts coming, then wait for the
	// uploader goroutine to finish and pass its outcome through.
	pending.Stream.Close()
	result := <-pending.Done
	return result.Info, result.Error
}
// ListObjectParts lists the parts completed so far for a pending multipart
// upload, sorted by part number, starting at partNumberMarker and truncated
// to at most maxParts entries (NextPartNumberMarker/IsTruncated are set when
// more parts remain).
func (layer *gatewayLayer) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int) (result minio.ListPartsInfo, err error) {
	defer mon.Task()(&ctx)(&err)

	uploads := layer.gateway.multipart
	upload, err := uploads.Get(bucket, object, uploadID)
	if err != nil {
		return minio.ListPartsInfo{}, err
	}

	list := minio.ListPartsInfo{
		Bucket:           bucket,
		Object:           object,
		UploadID:         uploadID,
		PartNumberMarker: partNumberMarker,
		MaxParts:         maxParts,
		UserDefined:      upload.Metadata,
		Parts:            upload.getCompletedParts(),
	}

	sort.Slice(list.Parts, func(i, k int) bool {
		return list.Parts[i].PartNumber < list.Parts[k].PartNumber
	})

	// Skip parts before the marker. The slice is sorted, so binary search
	// yields the first part with PartNumber >= partNumberMarker; when the
	// marker is past every part, first == len(list.Parts) and the result is
	// empty. (The previous linear scan left `first` at the last index in that
	// case and incorrectly returned the final part.)
	first := sort.Search(len(list.Parts), func(i int) bool {
		return partNumberMarker <= list.Parts[i].PartNumber
	})
	list.Parts = list.Parts[first:]

	if len(list.Parts) > maxParts {
		list.NextPartNumberMarker = list.Parts[maxParts].PartNumber
		list.Parts = list.Parts[:maxParts]
		list.IsTruncated = true
	}
	return list, nil
}
// TODO: implement
// func (layer *gatewayLayer) ListMultipartUploads(ctx context.Context, bucket, prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int) (result minio.ListMultipartsInfo, err error) {
// func (layer *gatewayLayer) CopyObjectPart(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string, partID int, startOffset int64, length int64, srcInfo minio.ObjectInfo) (info minio.PartInfo, err error) {
// MultipartUploads manages pending multipart uploads
type MultipartUploads struct {
	mu      sync.RWMutex                // protects lastID and pending
	lastID  int                         // counter used to mint unique upload IDs
	pending map[string]*MultipartUpload // in-flight uploads keyed by upload ID
}
// NewMultipartUploads creates new MultipartUploads
func NewMultipartUploads() *MultipartUploads {
	uploads := &MultipartUploads{}
	uploads.pending = make(map[string]*MultipartUpload)
	return uploads
}
// Create creates a new upload
//
// Any pending upload targeting the same bucket/object is aborted and removed
// first: a newer upload to a location supersedes the older one.
func (uploads *MultipartUploads) Create(bucket, object string, metadata map[string]string) (*MultipartUpload, error) {
	uploads.mu.Lock()
	defer uploads.mu.Unlock()

	// Evict any pending upload that collides with this location.
	for existingID, existing := range uploads.pending {
		if existing.Bucket == bucket && existing.Object == object {
			existing.Stream.Abort(Error.New("aborted by another upload to the same location"))
			delete(uploads.pending, existingID)
		}
	}

	uploads.lastID++
	id := "Upload" + strconv.Itoa(uploads.lastID)
	upload := NewMultipartUpload(id, bucket, object, metadata)
	uploads.pending[id] = upload
	return upload, nil
}
// Get finds a pending upload
//
// It verifies that the upload ID exists and belongs to the given
// bucket/object pair; otherwise an error is returned.
func (uploads *MultipartUploads) Get(bucket, object, uploadID string) (*MultipartUpload, error) {
	// Read-only lookup: the read lock suffices and avoids serializing
	// concurrent PutObjectPart calls behind the exclusive lock.
	uploads.mu.RLock()
	defer uploads.mu.RUnlock()

	upload, ok := uploads.pending[uploadID]
	if !ok {
		return nil, Error.New("pending upload %q missing", uploadID)
	}
	if upload.Bucket != bucket || upload.Object != object {
		return nil, Error.New("pending upload %q bucket/object name mismatch", uploadID)
	}
	return upload, nil
}
// Remove returns and removes a pending upload
//
// It verifies that the upload ID exists and belongs to the given
// bucket/object pair before removing it.
func (uploads *MultipartUploads) Remove(bucket, object, uploadID string) (*MultipartUpload, error) {
	// This method mutates the map (delete), so it must hold the write lock;
	// deleting under RLock is a data race.
	uploads.mu.Lock()
	defer uploads.mu.Unlock()

	upload, ok := uploads.pending[uploadID]
	if !ok {
		return nil, Error.New("pending upload %q missing", uploadID)
	}
	if upload.Bucket != bucket || upload.Object != object {
		return nil, Error.New("pending upload %q bucket/object name mismatch", uploadID)
	}
	delete(uploads.pending, uploadID)
	return upload, nil
}
// RemoveByID removes pending upload by id
func (uploads *MultipartUploads) RemoveByID(uploadID string) {
	// delete is a map write, so the write lock is required; the previous
	// RLock here raced with concurrent readers and writers.
	uploads.mu.Lock()
	defer uploads.mu.Unlock()
	delete(uploads.pending, uploadID)
}
// MultipartUpload is partial info about a pending upload
type MultipartUpload struct {
	ID       string            // unique upload identifier minted by MultipartUploads.Create
	Bucket   string            // destination bucket name
	Object   string            // destination object key
	Metadata map[string]string // user-supplied metadata for the final object
	Done     chan (*MultipartUploadResult) // receives exactly one result when the upload finishes
	Stream   *MultipartStream  // serialized reader over all uploaded parts

	mu        sync.Mutex       // protects completed
	completed []minio.PartInfo // parts reported as fully uploaded so far
}
// MultipartUploadResult contains either an Error or the uploaded ObjectInfo
type MultipartUploadResult struct {
	Error error            // non-nil when the upload failed or was aborted
	Info  minio.ObjectInfo // final object info when the upload succeeded
}
// NewMultipartUpload creates a new MultipartUpload
func NewMultipartUpload(uploadID string, bucket, object string, metadata map[string]string) *MultipartUpload {
	return &MultipartUpload{
		ID:       uploadID,
		Bucket:   bucket,
		Object:   object,
		Metadata: metadata,
		// Buffered so the uploader goroutine can deliver its single result
		// without blocking even when nobody is receiving yet.
		Done:   make(chan *MultipartUploadResult, 1),
		Stream: NewMultipartStream(),
	}
}
// addCompletedPart adds a completed part to the list
func (upload *MultipartUpload) addCompletedPart(part minio.PartInfo) {
	upload.mu.Lock()
	upload.completed = append(upload.completed, part)
	upload.mu.Unlock()
}
// getCompletedParts returns a copy of the parts completed so far, so callers
// can use the snapshot without holding the upload's lock.
func (upload *MultipartUpload) getCompletedParts() []minio.PartInfo {
	upload.mu.Lock()
	defer upload.mu.Unlock()
	snapshot := make([]minio.PartInfo, len(upload.completed))
	copy(snapshot, upload.completed)
	return snapshot
}
// fail aborts the upload with an error
func (upload *MultipartUpload) fail(err error) {
	result := &MultipartUploadResult{Error: err}
	upload.Done <- result
	close(upload.Done)
}
// complete completes the upload
func (upload *MultipartUpload) complete(info minio.ObjectInfo) {
	result := &MultipartUploadResult{Info: info}
	upload.Done <- result
	close(upload.Done)
}
// MultipartStream serializes multiple readers into a single reader
type MultipartStream struct {
	mu        sync.Mutex // protects every field below
	moreParts sync.Cond  // signaled when a part arrives or the stream closes/aborts

	err      error // sticky failure; once set, Read keeps returning it
	closed   bool  // no further parts will be added
	finished bool  // reading has ended (EOF delivered or aborted)

	nextID      int // ID of the next part Read expects to consume
	nextNumber  int // sequence number handed to the next part added
	currentPart *StreamPart   // part currently being read, if any
	parts       []*StreamPart // queued parts, kept sorted by ID
}
// StreamPart is a reader waiting in MultipartStream
type StreamPart struct {
	Number int          // zero-based arrival sequence number within the upload
	ID     int          // client-provided part ID; parts are consumed in ID order
	Size   int64        // bytes read so far; updated atomically by Read
	Reader *hash.Reader // the part's data source
	Done   chan error   // closed on success; receives the error on abort
}
// NewMultipartStream creates a new MultipartStream
func NewMultipartStream() *MultipartStream {
	// Reading starts at part ID 1; the condition variable shares the
	// stream's mutex.
	stream := &MultipartStream{nextID: 1}
	stream.moreParts.L = &stream.mu
	return stream
}
// Abort aborts the stream reading
//
// The first abort error sticks; every queued part uploader is failed with
// err and the queue is dropped, then any Read waiting on moreParts is woken
// so it can observe the error. Aborting an already finished stream is a
// no-op.
func (stream *MultipartStream) Abort(err error) {
	stream.mu.Lock()
	defer stream.mu.Unlock()
	if stream.finished {
		return
	}
	// Keep the first recorded error; later aborts must not overwrite it.
	if stream.err == nil {
		stream.err = err
	}
	stream.finished = true
	stream.closed = true
	// Fail every uploader still waiting on its part.
	for _, part := range stream.parts {
		part.Done <- err
		close(part.Done)
	}
	stream.parts = nil
	stream.moreParts.Broadcast()
}
// Close closes the stream, but lets it complete
func (stream *MultipartStream) Close() {
	stream.mu.Lock()
	stream.closed = true
	// Wake a Read waiting for more parts so it can observe EOF.
	stream.moreParts.Broadcast()
	stream.mu.Unlock()
}
// Read implements io.Reader interface, blocking when there's no part
//
// It consumes queued parts strictly in increasing ID order, waiting on the
// condition variable when the next part has not arrived yet. When a part's
// reader hits EOF its Done channel is closed and reading advances to the
// next part; a read failure aborts the whole stream.
func (stream *MultipartStream) Read(data []byte) (n int, err error) {
	stream.mu.Lock()
	for {
		// has an error occurred?
		if stream.err != nil {
			// Wrap the stream's sticky error — not the named return value
			// `err`, which is still nil here (the old code wrapped `err`
			// and so returned (0, nil) on an aborted stream).
			failure := stream.err
			stream.mu.Unlock()
			return 0, Error.Wrap(failure)
		}
		// still uploading the current part?
		if stream.currentPart != nil {
			break
		}
		// do we have the next part?
		if len(stream.parts) > 0 && stream.nextID == stream.parts[0].ID {
			stream.currentPart = stream.parts[0]
			stream.parts = stream.parts[1:]
			stream.nextID++
			break
		}
		// we don't have the next part and are closed, hence we are complete
		if stream.closed {
			stream.finished = true
			stream.mu.Unlock()
			return 0, io.EOF
		}
		stream.moreParts.Wait()
	}
	stream.mu.Unlock()

	// read as much as we can
	n, err = stream.currentPart.Reader.Read(data)
	atomic.AddInt64(&stream.currentPart.Size, int64(n))

	if err == io.EOF {
		// the part completed, hence advance to the next one
		err = nil
		close(stream.currentPart.Done)
		stream.currentPart = nil
	} else if err != nil {
		// something bad happened, abort the whole thing
		stream.Abort(err)
		return n, Error.Wrap(err)
	}
	return n, err
}
// AddPart adds a new part to the stream to wait
//
// Parts may arrive in any order: they are queued sorted by ID and consumed
// by Read strictly in increasing ID order starting from nextID. Re-adding a
// part that is still queued replaces its reader; adding a part whose ID has
// already been consumed is an error.
func (stream *MultipartStream) AddPart(partID int, data *hash.Reader) (*StreamPart, error) {
	stream.mu.Lock()
	defer stream.mu.Unlock()
	// Reject parts the reader has already consumed (or started consuming).
	if partID < stream.nextID {
		return nil, Error.New("part %d already uploaded, next part ID is %d", partID, stream.nextID)
	}
	for _, p := range stream.parts {
		if p.ID == partID {
			// Replace the reader of this part with the new one.
			// This could happen if the read timeout for this part has expired
			// and the client tries to upload the part again.
			p.Reader = data
			return p, nil
		}
	}
	stream.nextNumber++
	part := &StreamPart{
		Number: stream.nextNumber - 1, // zero-based arrival order
		ID:     partID,
		Size:   0,
		Reader: data,
		// Buffered so Abort can deliver the error without a waiting receiver.
		Done: make(chan error, 1),
	}
	// Keep the queue sorted by ID so Read can pop the lowest ID first.
	stream.parts = append(stream.parts, part)
	sort.Slice(stream.parts, func(i, k int) bool {
		return stream.parts[i].ID < stream.parts[k].ID
	})
	// Wake a Read that may be waiting for exactly this part.
	stream.moreParts.Broadcast()
	return part, nil
}