Skip to content

Commit

Permalink
operations: fix missing metadata for multipart transfers to local disk
Browse files Browse the repository at this point in the history
Before this change multipart downloads to the local disk with
--metadata failed to have their metadata set properly.

This was because the OpenWriterAt interface doesn't receive metadata
when creating the object.

This patch fixes the problem by using the recently introduced
Object.SetMetadata method to set the metadata on the object after the
download has completed (when using --metadata). If the backend we are
copying to is using OpenWriterAt but the Object doesn't support
SetMetadata then it will write an ERROR level log but complete
successfully. This should not happen at the moment as only the local
backend supports metadata and OpenWriterAt but it may in the future.

It also adds a test to check metadata is preserved when doing
multipart transfers.

Fixes #7424
  • Loading branch information
ncw committed May 14, 2024
1 parent 629e895 commit 6a0a54a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 21 deletions.
34 changes: 28 additions & 6 deletions fs/operations/multithread.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
openChunkWriter := f.Features().OpenChunkWriter
ci := fs.GetConfig(ctx)
noBuffering := false
usingOpenWriterAt := false
if openChunkWriter == nil {
openWriterAt := f.Features().OpenWriterAt
if openWriterAt == nil {
Expand All @@ -140,6 +141,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
// If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
noBuffering = true
usingOpenWriterAt = true
} else if src.Fs().Features().IsLocal {
// If the source fs is local we don't need to buffer
fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk")
Expand Down Expand Up @@ -241,12 +243,32 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err)
}

if f.Features().PartialUploads {
err = obj.SetModTime(ctx, src.ModTime(ctx))
switch err {
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
default:
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
// OpenWriterAt doesn't set metadata so we need to set it on completion
if usingOpenWriterAt {
setModTime := true
if ci.Metadata {
do, ok := obj.(fs.SetMetadataer)
if ok {
meta, err := fs.GetMetadataOptions(ctx, f, src, options)
if err != nil {
return nil, fmt.Errorf("multi-thread copy: failed to read metadata from source object: %w", err)
}
err = do.SetMetadata(ctx, meta)
if err != nil {
return nil, fmt.Errorf("multi-thread copy: failed to set metadata: %w", err)
}
setModTime = false
} else {
fs.Errorf(obj, "multi-thread copy: can't set metadata as SetMetadata isn't implemented in: %v", f)
}
}
if setModTime {
err = obj.SetModTime(ctx, src.ModTime(ctx))
switch err {
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
default:
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
}
}
}

Expand Down
57 changes: 42 additions & 15 deletions fs/operations/multithread_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func TestMultithreadCopy(t *testing.T) {
r := fstest.NewRun(t)
ctx := context.Background()
chunkSize := skipIfNotMultithread(ctx, t, r)
// Check every other transfer for metadata
checkMetadata := false
ctx, ci := fs.AddConfig(ctx)

for _, upload := range []bool{false, true} {
for _, test := range []struct {
Expand All @@ -156,52 +159,76 @@ func TestMultithreadCopy(t *testing.T) {
{size: chunkSize * 2, streams: 2},
{size: chunkSize*2 + 1, streams: 2},
} {
checkMetadata = !checkMetadata
ci.Metadata = checkMetadata
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit)
}
var (
contents = random.String(test.size)
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
file1 fstest.Item
src, dst fs.Object
err error
contents = random.String(test.size)
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
file1 fstest.Item
src, dst fs.Object
err error
testMetadata = fs.Metadata{
// System metadata supported by all backends
"mtime": t1.Format(time.RFC3339Nano),
// User metadata
"potato": "jersey",
}
)

var fSrc, fDst fs.Fs
if upload {
file1 = r.WriteFile(fileName, contents, t1)
r.CheckRemoteItems(t)
r.CheckLocalItems(t, file1)
src, err = r.Flocal.NewObject(ctx, fileName)
fDst, fSrc = r.Fremote, r.Flocal
} else {
file1 = r.WriteObject(ctx, fileName, contents, t1)
r.CheckRemoteItems(t, file1)
r.CheckLocalItems(t)
src, err = r.Fremote.NewObject(ctx, fileName)
fDst, fSrc = r.Flocal, r.Fremote
}
src, err = fSrc.NewObject(ctx, fileName)
require.NoError(t, err)

do, canSetMetadata := src.(fs.SetMetadataer)
if checkMetadata && canSetMetadata {
// Set metadata on the source if required
err := do.SetMetadata(ctx, testMetadata)
if err == fs.ErrorNotImplemented {
canSetMetadata = false
} else {
require.NoError(t, err)
fstest.CheckEntryMetadata(ctx, t, r.Flocal, src, testMetadata)
}
}

accounting.GlobalStats().ResetCounters()
tr := accounting.GlobalStats().NewTransfer(src, nil)

defer func() {
tr.Done(ctx, err)
}()

if upload {
dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr)
} else {
dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr)
}

dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr)
require.NoError(t, err)

assert.Equal(t, src.Size(), dst.Size())
assert.Equal(t, fileName, dst.Remote())
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
fstest.CheckListingWithPrecision(t, fSrc, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
fstest.CheckListingWithPrecision(t, fDst, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))

if checkMetadata && canSetMetadata && fDst.Features().ReadMetadata {
fstest.CheckEntryMetadata(ctx, t, fDst, dst, testMetadata)
}

require.NoError(t, dst.Remove(ctx))
require.NoError(t, src.Remove(ctx))

})
}
}
Expand Down

0 comments on commit 6a0a54a

Please sign in to comment.