Implement 'atomic downloads' with copy-to-temp + rename #6913

Merged: 5 commits, merged May 9, 2023
1 change: 1 addition & 0 deletions backend/ftp/ftp.go
@@ -580,6 +580,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (ff fs.Fs
}
f.features = (&fs.Features{
CanHaveEmptyDirectories: true,
PartialUploads: true,
}).Fill(ctx, f)
// set the pool drainer timer going
if f.opt.IdleTimeout > 0 {
1 change: 1 addition & 0 deletions backend/local/local.go
@@ -303,6 +303,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
WriteMetadata: true,
UserMetadata: xattrSupported, // can only R/W general purpose metadata if xattrs are supported
FilterAware: true,
PartialUploads: true,
}).Fill(ctx, f)
if opt.FollowSymlinks {
f.lstat = os.Stat
16 changes: 12 additions & 4 deletions backend/sftp/sftp.go
@@ -994,6 +994,7 @@ func NewFsWithConnection(ctx context.Context, f *Fs, name string, root string, m
f.features = (&fs.Features{
CanHaveEmptyDirectories: true,
SlowHash: true,
PartialUploads: true,
}).Fill(ctx, f)
// Make a connection and pool it to return errors early
c, err := f.getSftpConnection(ctx)
@@ -1329,10 +1330,17 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
if err != nil {
return nil, fmt.Errorf("Move: %w", err)
}
err = c.sftpClient.Rename(
srcObj.path(),
path.Join(f.absRoot, remote),
)
srcPath, dstPath := srcObj.path(), path.Join(f.absRoot, remote)
if _, ok := c.sftpClient.HasExtension("posix-rename@openssh.com"); ok {
err = c.sftpClient.PosixRename(srcPath, dstPath)
} else {
// If we haven't got PosixRename then remove the destination first before renaming
err = c.sftpClient.Remove(dstPath)
if err != nil && !errors.Is(err, iofs.ErrNotExist) {
fs.Errorf(f, "Move: Failed to remove existing file %q: %v", dstPath, err)
}
err = c.sftpClient.Rename(srcPath, dstPath)
}
f.putSftpConnection(&c, err)
if err != nil {
return nil, fmt.Errorf("Move Rename failed: %w", err)
43 changes: 43 additions & 0 deletions docs/content/docs.md
@@ -1257,6 +1257,49 @@ This can be useful as an additional layer of protection for immutable
or append-only data sets (notably backup archives), where modification
implies corruption and should not be propagated.

### --inplace {#inplace}

The `--inplace` flag changes the behaviour of rclone when uploading
files to some backends (backends with the `PartialUploads` feature
flag set) such as:

- local
- ftp
- sftp

Without `--inplace` (the default), rclone will first upload to a
temporary file with a name like this, where `XXXXXX` represents a
random string:

original-file-name.XXXXXX.partial

(rclone will make sure the final name is no longer than 100 characters
by truncating the `original-file-name` part if necessary).

When the upload is complete, rclone will rename the `.partial` file to
the correct name, overwriting any existing file at that point. If the
upload fails then the `.partial` file will be deleted.

This prevents other users of the backend from seeing partially
uploaded files in their new names and prevents overwriting the old
file until the new one is completely uploaded.
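
As a rough illustration of the naming scheme described above (a sketch, not
part of the PR: the `partialName` helper and the `"x1y2z3w4"` string are made
up, but the suffix and 100-character check mirror the `operations.Copy` change
shown later in this diff):

```go
package main

import (
	"fmt"
	"path"
)

// partialName sketches how the temporary ".partial" name is built:
// append ".<random>.partial", but if the leaf name is already longer
// than 100 characters, truncate so the name does not grow any further.
func partialName(remote, rnd string) string {
	suffix := "." + rnd + ".partial"
	if len(path.Base(remote)) > 100 {
		return remote[:len(remote)-len(suffix)] + suffix
	}
	return remote + suffix
}

func main() {
	fmt.Println(partialName("dir/original-file-name", "x1y2z3w4"))
	// Output: dir/original-file-name.x1y2z3w4.partial
}
```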

If the `--inplace` flag is supplied, rclone will upload directly to
the final name without creating a `.partial` file.

This means that an incomplete file will be visible in the directory
listing while the upload is in progress, and any existing file will be
overwritten as soon as the upload starts. If the transfer fails then
the incomplete file will be deleted, so a failed transfer can lose the
existing file's data.

Note that on the local file system, if you don't use `--inplace`,
hard links (Unix only) will be broken. And if you do use `--inplace`,
you won't be able to update in-use executables.

Note also that versions of rclone prior to v1.63.0 behave as if the
`--inplace` flag is always supplied.

### -i, --interactive {#interactive}

This flag can be used to tell rclone that you wish a manual
1 change: 1 addition & 0 deletions fs/config.go
@@ -145,6 +145,7 @@ type ConfigInfo struct {
ServerSideAcrossConfigs bool
TerminalColorMode TerminalColorMode
DefaultTime Time // time that directories with no time should display
Inplace bool // Download directly to destination file instead of atomic download to temp/rename
}

// NewConfig creates a new config with everything set to the default
1 change: 1 addition & 0 deletions fs/config/configflags/configflags.go
@@ -145,6 +145,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.BoolVarP(flagSet, &ci.ServerSideAcrossConfigs, "server-side-across-configs", "", ci.ServerSideAcrossConfigs, "Allow server-side operations (e.g. copy) to work across different configs")
flags.FVarP(flagSet, &ci.TerminalColorMode, "color", "", "When to show colors (and other ANSI codes) AUTO|NEVER|ALWAYS")
flags.FVarP(flagSet, &ci.DefaultTime, "default-time", "", "Time to show if modtime is unknown for files and directories")
flags.BoolVarP(flagSet, &ci.Inplace, "inplace", "", ci.Inplace, "Download directly to destination file instead of atomic download to temp/rename")
}

// ParseHeaders converts the strings passed in via the header flags into HTTPOptions
1 change: 1 addition & 0 deletions fs/features.go
@@ -30,6 +30,7 @@ type Features struct {
WriteMetadata bool // can write metadata to objects
UserMetadata bool // can read/write general purpose metadata
FilterAware bool // can make use of filters if provided for listing
PartialUploads bool // uploaded file can appear incomplete on the fs while it's being uploaded

// Purge all files in the directory specified
//
55 changes: 49 additions & 6 deletions fs/operations/operations.go
@@ -336,6 +336,27 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
doUpdate := dst != nil
hashType, hashOption := CommonHash(ctx, f, src.Fs())

if dst != nil {
remote = dst.Remote()
}

var (
inplace = true
remotePartial = remote
)
if !ci.Inplace && f.Features().Move != nil && f.Features().PartialUploads {
// Avoid making the leaf name longer if it's already lengthy to avoid
// trouble with file name length limits.
suffix := "." + random.String(8) + ".partial"
base := path.Base(remotePartial)
if len(base) > 100 {
remotePartial = remotePartial[:len(remotePartial)-len(suffix)] + suffix
} else {
remotePartial += suffix
}
inplace = false
}

var actionTaken string
for {
// Try server-side copy first - if has optional interface and
@@ -363,6 +384,7 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
dst = newDst
in.ServerSideCopyEnd(dst.Size()) // account the bytes for the server-side transfer
_ = in.Close()
inplace = true
} else {
_ = in.Close()
}
@@ -384,7 +406,10 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if streams < 2 {
streams = 2
}
dst, err = multiThreadCopy(ctx, f, remote, src, int(streams), tr)
dst, err = multiThreadCopy(ctx, f, remotePartial, src, int(streams), tr)
if err == nil {
newDst = dst
}
if doUpdate {
actionTaken = "Multi-thread Copied (replaced existing)"
} else {
@@ -416,14 +441,14 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
}
}
// NB Rcat closes in0
dst, err = Rcat(ctx, f, remote, in0, src.ModTime(ctx), meta)
dst, err = Rcat(ctx, f, remotePartial, in0, src.ModTime(ctx), meta)
newDst = dst
} else {
in := tr.Account(ctx, in0).WithBuffer() // account and buffer the transfer
var wrappedSrc fs.ObjectInfo = src
// We try to pass the original object if possible
if src.Remote() != remote {
wrappedSrc = fs.NewOverrideRemote(src, remote)
if src.Remote() != remotePartial {
wrappedSrc = fs.NewOverrideRemote(src, remotePartial)
}
options := []fs.OpenOption{hashOption}
for _, option := range ci.UploadHeaders {
@@ -432,12 +457,15 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
if ci.MetadataSet != nil {
options = append(options, fs.MetadataOption(ci.MetadataSet))
}
if doUpdate && inplace {
err = dst.Update(ctx, in, wrappedSrc, options...)
} else {
dst, err = f.Put(ctx, in, wrappedSrc, options...)
}
if doUpdate {
actionTaken = "Copied (replaced existing)"
err = dst.Update(ctx, in, wrappedSrc, options...)
} else {
actionTaken = "Copied (new)"
dst, err = f.Put(ctx, in, wrappedSrc, options...)
}
closeErr := in.Close()
if err == nil {
@@ -499,6 +527,21 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj
return newDst, err
}
}

// Move the copied file to its real destination.
if err == nil && !inplace && remotePartial != remote {
dst, err = f.Features().Move(ctx, newDst, remote)
if err == nil {
fs.Debugf(newDst, "renamed to: %s", remote)
newDst = dst
} else {
fs.Errorf(newDst, "partial file rename failed: %v", err)
err = fs.CountError(err)
removeFailedCopy(ctx, newDst)
return newDst, err
Review comment (Contributor Author): I'm a little confused why there's `dst` and `newDst` in this function. It might be because there are a lot of assignments to `dst`, and `newDst` is updated to "carry the newly created file handle forward" when there were no errors. I think the error handling here is correct and the debug prints print sensible output too. I'm just not 100% sure what should be returned from this path and how `newDst` is used by the caller.

Reply (Member): I think this looks correct. The return from this function should be the final object in its final resting place. We try not to update `newDst` unless the function returned `err = nil`, which is why I think there is `dst` and `newDst`. I think the whole `dst` and `newDst` handling needs a careful review, but let's not do it in this PR! I'll do it after we merge it.
}
}

if newDst != nil && src.String() != newDst.String() {
actionTaken = fmt.Sprintf("%s to: %s", actionTaken, newDst.String())
}
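Reading the `Copy` changes above as a whole, the control flow is: upload to `remotePartial` unless `--inplace` is set or the backend lacks `Move`/`PartialUploads`, then rename the partial object onto the real name once the upload succeeds. A minimal toy model of that flow (the `backend`, `put`, `move`, and `copyTo` names are invented for illustration, not rclone's actual API):

```go
package main

import (
	"errors"
	"fmt"
)

// backend is a toy stand-in for an fs.Fs with Move and PartialUploads.
type backend struct {
	files          map[string]string
	canMove        bool
	partialUploads bool
}

func (b *backend) put(name, data string) { b.files[name] = data }

func (b *backend) move(oldName, newName string) error {
	data, ok := b.files[oldName]
	if !ok {
		return errors.New("missing partial file")
	}
	delete(b.files, oldName)
	b.files[newName] = data // replaces any existing file under the final name
	return nil
}

// copyTo mirrors the new Copy control flow: upload to a ".partial" name
// when the backend supports Move and PartialUploads and --inplace is not
// set, then rename onto the final name once the upload has finished.
func copyTo(b *backend, name, data string, inplace bool) error {
	target := name
	if !inplace && b.canMove && b.partialUploads {
		target = name + ".x1y2z3w4.partial"
	}
	b.put(target, data) // the upload itself
	if target != name {
		return b.move(target, name) // reveal the file under its final name
	}
	return nil
}

func main() {
	b := &backend{files: map[string]string{}, canMove: true, partialUploads: true}
	if err := copyTo(b, "report.txt", "hello", false); err != nil {
		panic(err)
	}
	fmt.Println(b.files) // map[report.txt:hello]
}
```

In the real code, a failed upload or rename also removes the partial object (`removeFailedCopy`), so the temporary name does not linger on the destination.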
58 changes: 58 additions & 0 deletions fs/operations/operations_test.go
@@ -1228,6 +1228,64 @@ func TestCopyFileCopyDest(t *testing.T) {
r.CheckRemoteItems(t, file2, file2dst, file3, file4, file4dst, file6, file7dst)
}

func TestCopyInplace(t *testing.T) {
ctx := context.Background()
ctx, ci := fs.AddConfig(ctx)
r := fstest.NewRun(t)

ci.Inplace = true

file1 := r.WriteFile("file1", "file1 contents", t1)
r.CheckLocalItems(t, file1)

file2 := file1
file2.Path = "sub/file2"

err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path)
require.NoError(t, err)
r.CheckLocalItems(t, file1)
r.CheckRemoteItems(t, file2)

err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path)
require.NoError(t, err)
r.CheckLocalItems(t, file1)
r.CheckRemoteItems(t, file2)

err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path)
require.NoError(t, err)
r.CheckLocalItems(t, file1)
r.CheckRemoteItems(t, file2)
}

func TestCopyLongFileName(t *testing.T) {
ctx := context.Background()
ctx, ci := fs.AddConfig(ctx)
r := fstest.NewRun(t)

ci.Inplace = false // the default

file1 := r.WriteFile("file1", "file1 contents", t1)
r.CheckLocalItems(t, file1)

file2 := file1
file2.Path = "sub/" + strings.Repeat("file2", 30)

err := operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path)
require.NoError(t, err)
r.CheckLocalItems(t, file1)
r.CheckRemoteItems(t, file2)

err = operations.CopyFile(ctx, r.Fremote, r.Flocal, file2.Path, file1.Path)
require.NoError(t, err)
r.CheckLocalItems(t, file1)
r.CheckRemoteItems(t, file2)

err = operations.CopyFile(ctx, r.Fremote, r.Fremote, file2.Path, file2.Path)
require.NoError(t, err)
r.CheckLocalItems(t, file1)
r.CheckRemoteItems(t, file2)
}

// testFsInfo is for unit testing fs.Info
type testFsInfo struct {
name string
12 changes: 12 additions & 0 deletions fs/override.go
@@ -12,6 +12,13 @@ type OverrideRemote struct {
// NewOverrideRemote returns an OverrideRemoteObject which will
// return the remote specified
func NewOverrideRemote(oi ObjectInfo, remote string) *OverrideRemote {
// re-wrap an OverrideRemote
if or, ok := oi.(*OverrideRemote); ok {
return &OverrideRemote{
ObjectInfo: or.ObjectInfo,
remote: remote,
}
}
return &OverrideRemote{
ObjectInfo: oi,
remote: remote,
@@ -23,6 +30,11 @@ func (o *OverrideRemote) Remote() string {
return o.remote
}

// String returns the overridden remote name
func (o *OverrideRemote) String() string {
return o.remote
}

// MimeType returns the mime type of the underlying object or "" if it
// can't be worked out
func (o *OverrideRemote) MimeType(ctx context.Context) string {
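The `fs/override.go` changes tie into the partial upload: `Copy` wraps the source with `NewOverrideRemote(src, remotePartial)` so the backend sees the temporary name, the re-wrap branch keeps that wrapping one layer deep, and the new `String()` method makes logs show the overridden name. A self-contained toy sketch (its `objectInfo`, `object`, and `overrideRemote` types are invented here, not rclone's) of why the re-wrap and `String()` matter:

```go
package main

import "fmt"

// Toy stand-ins for fs.ObjectInfo and fs.OverrideRemote.
type objectInfo interface {
	Remote() string
	String() string
}

type object struct{ remote string }

func (o object) Remote() string { return o.remote }
func (o object) String() string { return o.remote }

type overrideRemote struct {
	objectInfo
	remote string
}

// newOverrideRemote re-wraps an existing override instead of stacking
// wrappers, mirroring the check added in NewOverrideRemote.
func newOverrideRemote(oi objectInfo, remote string) overrideRemote {
	if or, ok := oi.(overrideRemote); ok {
		oi = or.objectInfo
	}
	return overrideRemote{objectInfo: oi, remote: remote}
}

func (o overrideRemote) Remote() string { return o.remote }

// String returns the overridden name so log output shows where the data
// is actually being written (the ".partial" name during the upload).
func (o overrideRemote) String() string { return o.remote }

func main() {
	src := object{remote: "dir/file.txt"}
	partial := newOverrideRemote(src, "dir/file.txt.x1y2z3w4.partial")
	back := newOverrideRemote(partial, "dir/file.txt") // still one layer deep
	fmt.Println(partial, back.Remote())
}
```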