-
Notifications
You must be signed in to change notification settings - Fork 1
/
s3_copier.go
510 lines (434 loc) · 15.4 KB
/
s3_copier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
package s3server
// Copyright (C) 2022 by RStudio, PBC
import (
"context"
"fmt"
"net/url"
"strings"
"sync"
"sync/atomic"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/awsutil"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/rstudio/platform-lib/v2/pkg/rsstorage/internal"
)
// From https://github.com/aws/aws-sdk-go/pull/2653
//
// This implements multi-part copying to support copying files larger than
// 5GB on S3. This may eventually be included in the aws SDK; when that
// happens, we should remove `s3_copier.go`, `s3_copier_internal_test.go`,
// and `s3_copier_test.go`.
// TODO Watch https://github.com/aws/aws-sdk-go/pull/2653 and remove the
// TODO files listed above when this code is incorporated in the AWS SDK.
// So that the Error interface type can be included as an anonymous field
// in the multiUploadError struct and not conflict with the error.Error() method.
type awsError awserr.Error
// A multiUploadError wraps the upload ID of a failed s3 multipart upload.
// Composed of BaseError for code, message, and original error
//
// Should be used for an error that occurred failing a S3 multipart upload,
// and a upload ID is available. If an uploadID is not available a more relevant
type multiUploadError struct {
awsError
// ID for multipart upload which failed.
uploadID string
}
// MaxUploadPartSize is the maximum allowed part size when uploading a part to
// Amazon S3.
const MaxUploadPartSize int64 = 1024 * 1024 * 1024 * 5
// DefaultMultipartCopyThreshold is the default object size threshold (in bytes)
// for using multipart instead of simple copy. The default is 10 MB.
const DefaultMultipartCopyThreshold = 10 * 1024 * 1024
// DefaultCopyConcurrency is the default number of goroutines to spin up when
// using Copy().
const DefaultCopyConcurrency = 10
// Copier is a structure for calling Copy(). It is safe to call Copy() on this
// structure for multiple objects and across concurrent goroutines. Mutating
// the Copier's properties is not safe to be done concurrently.
type Copier struct {
// MaxPartSize is the maximum multipart chunk size to use (in bytes). It
// must be at least 5 MB. The actual size of the chunks will vary, but
// remain capped at MaxPartSize. If set to 0, the value of MaxUploadPartSize
// is used.
MaxPartSize int64
// MultipartCopyThreshold is the minimum size (in bytes) an object should
// have before multipart copy is used instead of simple copy. The minimum
// is 5 MB, and the maximum is 5 GB. If set to 0, the value of
// DefaultMultipartCopyThreshold will be used.
MultipartCopyThreshold int64
// Concurrency is the number of goroutines to use when executing a multipart
// copy. If this value is set to 0, the value of DefaultCopyConcurrency
// will be used.
//
// The concurrency pool is not shared between calls to Copy.
Concurrency int
// Setting this value to true will cause the SDK to avoid calling
// AbortMultipartUpload on a failure, leaving all successfully copied
// parts on S3 for manual recovery.
//
// Note that storing parts of an incomplete multipart upload counts towards
// space usage on S3 and will add additional costs if not cleaned up.
LeavePartsOnError bool
// S3 is the client used for executing the multipart copy.
S3 s3iface.S3API
// RequestOptions as a list of request options that will be passed down to
// individual API operation requests made by the uploader.
RequestOptions []request.Option
}
// WithCopierRequestOptions appends to the Copier's API request options.
func WithCopierRequestOptions(opts ...request.Option) func(*Copier) {
return func(u *Copier) {
u.RequestOptions = append(u.RequestOptions, opts...)
}
}
// NewCopier creates a new Copier instance for copying objects between
// buckets and/or keys in S3. Customisations can be passed in options, or
// otherwise as part of the Copier's Copy method. The ConfigProvider (e.g.
// a *session.Session) will be used to instantiate a S3 service client.
//
// See NewCopierWithClient for examples.
func NewCopier(c client.ConfigProvider, options ...func(*Copier)) *Copier {
return NewCopierWithClient(s3.New(c), options...)
}
// NewCopierWithClient creates a new Copier instance for copying objects
// between buckets and/or keys in S3. Customisations can be passed in options,
// or otherwise as part of the Copier's Copy method.
func NewCopierWithClient(svc s3iface.S3API, options ...func(*Copier)) *Copier {
c := &Copier{
S3: svc,
MaxPartSize: MaxUploadPartSize,
MultipartCopyThreshold: DefaultMultipartCopyThreshold,
Concurrency: DefaultCopyConcurrency,
LeavePartsOnError: false,
}
for _, option := range options {
option(c)
}
return c
}
// Copy copies an object between buckets and/or keys in S3. See CopyWithContext
// for more information.
func (c Copier) Copy(input *s3.CopyObjectInput, options ...func(*Copier)) (*s3.CopyObjectOutput, error) {
return c.CopyWithContext(aws.BackgroundContext(), input, options...)
}
// CopyWithContext copies an object between buckets and/or keys in S3. When the
// size of a source object is large enough, it will use concurrent multipart
// uploads to execute on the copy of an arbitrarily-sized object.
//
// The Copier will use either the CopyObject or UploadPartCopy APIs, and no
// actual object data will pass through it. If you wish to transform the data,
// use a combination of s3manager.Downloader and s3manager.Uploader.
//
// The Copier's AWS credentials must have s3:GetObject permission on the source
// object, and s3:PutObject permission on the destination. It is advisable that
// it also have s3:AbortMultipartUpload on the destination, as otherwise a failed
// copy would leave parts abandoned. To disable aborting in case of a failed
// copy, set LeavePartsOnError to true.
//
// Additional functional options can be provided to configure the individual
// copies. These options are copies of the Copier instance Upload is called from.
// Modifying the options will not impact the original Copier instance.
//
// Use the WithCopierRequestOptions helper function to pass in request
// options that will be applied to all API operations made with this uploader.
//
// It is safe to call this method concurrently across goroutines.
func (c Copier) CopyWithContext(ctx aws.Context, input *s3.CopyObjectInput, options ...func(*Copier)) (*s3.CopyObjectOutput, error) {
it := copier{in: input, cfg: c, ctx: ctx}
for _, opt := range options {
opt(&it.cfg)
}
it.cfg.RequestOptions = append(it.cfg.RequestOptions, request.WithAppendUserAgent("S3Manager"))
return it.copy()
}
// optimalPartSize returns the optimal multipart copy part size. It minimizes the
// number of round trips to UploadPartCopy by maximizing the number of bytes
// copied per goroutine-part.
func optimalPartSize(sourceSize int64, concurrency int) int64 {
return internal.MaxInt64(
s3manager.MinUploadPartSize, internal.MinInt64(
sourceSize/int64(concurrency), MaxUploadPartSize))
}
// copySourceRange produces a value appropriate for the CopySourceRange header
// of a UploadPartCopy call. The result is "bytes=start-end", where start is
// the first, and end is the last byte to be copied. The start and end are
// inclusive byte indices into the source object.
func copySourceRange(sourceSize, partSize, partNum int64) string {
rangeStart := (partNum - 1) * partSize
remainingBytes := sourceSize - rangeStart
rangeEnd := rangeStart + internal.MinInt64(remainingBytes, partSize) - 1
return fmt.Sprintf("bytes=%d-%d", rangeStart, rangeEnd)
}
type copier struct {
ctx aws.Context // context from the call to CopyWithContext()
cfg Copier // copy of the Copier that created this copier
in *s3.CopyObjectInput
src struct {
bucket string
key string
version *string // nil if source object is not versioned
size int64 // size (in bytes) of the source object
}
partSize int64 // derived from source object size and concurrency
partCount int64 // derived from source object size and partSize
}
// copy executes a multipart copy based on the copier's input field. If the
// source object is small, a simple S3 copy is executed instead.
func (c *copier) copy() (*s3.CopyObjectOutput, error) {
if err := c.init(); err != nil {
return nil, err
}
c.partSize = internal.MinInt64(optimalPartSize(c.src.size, c.cfg.Concurrency), c.cfg.MaxPartSize)
c.partCount = (c.src.size + c.partSize - 1) / c.partSize
if c.src.size <= c.cfg.MultipartCopyThreshold || c.partCount < 2 {
return c.simpleCopy()
}
return c.multipartCopy()
}
// init sets up the copier.
func (c *copier) init() error {
if c.cfg.MaxPartSize == 0 {
c.cfg.MaxPartSize = MaxUploadPartSize
}
if c.cfg.MaxPartSize < s3manager.MinUploadPartSize || c.cfg.MaxPartSize > MaxUploadPartSize {
return awserr.New(
"InvalidRequest",
fmt.Sprintf(
"expected %d <= MaxPartSize <= %d",
s3manager.MinUploadPartSize, MaxUploadPartSize),
nil)
}
if c.cfg.MultipartCopyThreshold == 0 {
c.cfg.MultipartCopyThreshold = DefaultMultipartCopyThreshold
}
if c.cfg.MultipartCopyThreshold < s3manager.MinUploadPartSize || c.cfg.MultipartCopyThreshold > MaxUploadPartSize {
return awserr.New(
"InvalidRequest",
fmt.Sprintf(
"expected %d <= MultipartCopyThreshold <= %d",
s3manager.MinUploadPartSize, MaxUploadPartSize),
nil)
}
if c.cfg.Concurrency == 0 {
c.cfg.Concurrency = DefaultCopyConcurrency
}
if c.cfg.Concurrency <= 0 {
return awserr.New(
"InvalidRequest",
"expected 0 < Concurrency",
nil)
}
err := c.initSource()
if err != nil {
return err
}
c.src.size, err = c.getSourceSize()
if err != nil {
return err
}
return nil
}
// initSource derives the bucket, key, and optional versionId from the
// CopySource input field.
func (c *copier) initSource() error {
if c.in.CopySource == nil {
return awserr.New("InvalidRequest", "expected non-nil copy source", nil)
}
src, err := url.QueryUnescape(*c.in.CopySource)
if err != nil {
return awserr.New("InvalidRequest", "invalid copy source", err)
}
a := strings.SplitN(src, "/", 2)
if len(a) != 2 {
return awserr.New(
"InvalidRequest",
"invalid copy source: expected bucket and key",
nil)
}
b := strings.SplitN(a[1], "?versionId=", 2)
if len(a) > 2 {
return awserr.New(
"InvalidRequest",
"invalid copy source: expected at most one versionId",
nil)
}
if b[0] == "" {
return awserr.New(
"InvalidRequest",
"invalid copy source: expected key",
nil)
}
c.src.bucket = a[0]
c.src.key = b[0]
if len(b) == 2 {
c.src.version = aws.String(b[1])
}
return nil
}
// getSourceSize returns the size (in bytes) of the source object.
func (c *copier) getSourceSize() (int64, error) {
out, err := c.cfg.S3.HeadObjectWithContext(
c.ctx, &s3.HeadObjectInput{
Bucket: &c.src.bucket,
Key: &c.src.key,
VersionId: c.src.version,
}, c.cfg.RequestOptions...)
if err != nil {
return 0, err
}
return *out.ContentLength, nil
}
func (c *copier) simpleCopy() (*s3.CopyObjectOutput, error) {
return c.cfg.S3.CopyObjectWithContext(c.ctx, c.in, c.cfg.RequestOptions...)
}
func (c *copier) multipartCopy() (*s3.CopyObjectOutput, error) {
upload, err := c.createUpload()
if err != nil {
return nil, err
}
// set to false once the upload has been completed
shouldAbortUpload := true
defer func() {
if shouldAbortUpload && !c.cfg.LeavePartsOnError {
_, _ = c.abortUpload(upload)
}
}()
completedParts := make([]*s3.CompletedPart, c.partCount)
nextPartNum := int64(0)
wg := sync.WaitGroup{}
cctx, cancel := context.WithCancel(c.ctx)
defer cancel()
firstErr := make(chan error, 1)
var firstPart *s3.UploadPartCopyOutput
for i := int64(0); i < internal.MinInt64(c.partCount, int64(c.cfg.Concurrency)); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-cctx.Done():
return
default:
partNum := atomic.AddInt64(&nextPartNum, 1)
if partNum > c.partCount {
return
}
in, out, err := c.uploadPartCopy(cctx, upload, partNum)
if err != nil {
select {
case firstErr <- err:
// we don't have absolute guarantees about this being
// the first error, but it's close enough.
default:
}
cancel()
return
}
completedParts[partNum-1] = &s3.CompletedPart{
PartNumber: in.PartNumber,
ETag: out.CopyPartResult.ETag,
}
if partNum == 1 {
firstPart = out
}
}
}
}()
}
wg.Wait()
select {
case err := <-firstErr:
return nil, &multiUploadError{
awsError: awserr.New(
"MultipartUpload",
"failed to copy one or more parts",
err),
uploadID: *upload.UploadId,
}
default:
select {
case <-cctx.Done():
// the cancellation must have been external to multipartCopy(),
// and was timed such that none of the uploadPartCopy() calls
// returned a cancellation error. this means we're responsible
// for producing it (as well as aborting what's remained of
// the multipart copy).
return nil, &multiUploadError{
awsError: awserr.New(
"MultipartUpload",
"multipart copy cancelled",
cctx.Err()),
uploadID: *upload.UploadId,
}
default:
}
}
completed, err := c.completeUpload(upload, completedParts)
if err != nil {
return nil, &multiUploadError{
awsError: awserr.New(
"MultipartUpload",
"failed to complete multipart upload",
err),
uploadID: *upload.UploadId,
}
}
shouldAbortUpload = false
out := s3.CopyObjectOutput{}
awsutil.Copy(&out, firstPart)
awsutil.Copy(&out, completed)
out.CopyObjectResult = &s3.CopyObjectResult{
ETag: completed.ETag,
LastModified: nil, // could use a part, but it wouldn't be exact
}
return &out, nil
}
func (c *copier) createUpload() (*s3.CreateMultipartUploadOutput, error) {
in := s3.CreateMultipartUploadInput{}
awsutil.Copy(&in, c.in)
return c.cfg.S3.CreateMultipartUploadWithContext(c.ctx, &in, c.cfg.RequestOptions...)
}
func (c *copier) abortUpload(upload *s3.CreateMultipartUploadOutput) (*s3.AbortMultipartUploadOutput, error) {
in := s3.AbortMultipartUploadInput{}
awsutil.Copy(&in, c.in)
in.UploadId = upload.UploadId
return c.cfg.S3.AbortMultipartUploadWithContext(c.ctx, &in, c.cfg.RequestOptions...)
}
func (c *copier) uploadPartCopy(
ctx aws.Context,
upload *s3.CreateMultipartUploadOutput,
partNum int64,
) (*s3.UploadPartCopyInput, *s3.UploadPartCopyOutput, error) {
in := s3.UploadPartCopyInput{}
awsutil.Copy(&in, c.in)
// disable these options, as the CopySourceIn* headers from c.in would
// pertain to the full object and not the individual parts.
in.CopySourceIfNoneMatch = nil
in.CopySourceIfMatch = nil
in.CopySourceIfModifiedSince = nil
in.CopySourceIfUnmodifiedSince = nil
in.UploadId = upload.UploadId
in.PartNumber = &partNum
in.CopySourceRange = aws.String(copySourceRange(c.src.size, c.partSize, partNum))
out, err := c.cfg.S3.UploadPartCopyWithContext(ctx, &in, c.cfg.RequestOptions...)
if err != nil {
return nil, nil, err
}
return &in, out, nil
}
func (c *copier) completeUpload(
upload *s3.CreateMultipartUploadOutput,
parts []*s3.CompletedPart,
) (*s3.CompleteMultipartUploadOutput, error) {
in := s3.CompleteMultipartUploadInput{}
awsutil.Copy(&in, c.in)
in.UploadId = upload.UploadId
completed := s3.CompletedMultipartUpload{Parts: parts}
in.MultipartUpload = &completed
return c.cfg.S3.CompleteMultipartUploadWithContext(c.ctx, &in, c.cfg.RequestOptions...)
}