fs: allow setting a write buffer for multithread
When multi-thread downloading is enabled, rclone used to issue a
write to disk after every read, resulting in many small writes to
different locations in the file.

Depending on the underlying filesystem or device, it can be more
efficient to send larger writes.
jorjao81 authored and ncw committed Jun 23, 2023
1 parent 5f938fb commit fcb912a
Showing 4 changed files with 160 additions and 100 deletions.
19 changes: 19 additions & 0 deletions docs/content/docs.md
@@ -1511,6 +1511,25 @@ if you are reading and writing to an OS X filing system this will be

This command line flag allows you to override that computed default.

### --multi-thread-write-buffer-size=SIZE ###

When downloading with multiple threads, rclone will buffer SIZE bytes in
memory before writing to disk for each thread.

This can improve performance if the underlying filesystem does not deal
well with a lot of small writes in different positions of the file, so
if you see downloads being limited by disk write speed, you might want
to experiment with different values. A higher value can be especially
useful for magnetic drives and remote file systems.

Nevertheless, the default of `128k` should be fine for almost all use
cases, so before changing it make sure the network is not actually your
bottleneck.

As a final hint, size is not the only factor: block size (or a similar
concept) can also have an impact. In one case, we observed that exact
multiples of 16k performed much better than other values.

### --multi-thread-cutoff=SIZE ###

When downloading files to the local backend above this size, rclone
192 changes: 97 additions & 95 deletions fs/config.go
@@ -51,101 +51,102 @@ var (

// ConfigInfo is filesystem config options
type ConfigInfo struct {
LogLevel LogLevel
StatsLogLevel LogLevel
UseJSONLog bool
DryRun bool
Interactive bool
CheckSum bool
SizeOnly bool
IgnoreTimes bool
IgnoreExisting bool
IgnoreErrors bool
ModifyWindow time.Duration
Checkers int
Transfers int
ConnectTimeout time.Duration // Connect timeout
Timeout time.Duration // Data channel timeout
ExpectContinueTimeout time.Duration
Dump DumpFlags
InsecureSkipVerify bool // Skip server certificate verification
DeleteMode DeleteMode
MaxDelete int64
MaxDeleteSize SizeSuffix
TrackRenames bool // Track file renames.
TrackRenamesStrategy string // Comma separated list of strategies used to track renames
LowLevelRetries int
UpdateOlder bool // Skip files that are newer on the destination
NoGzip bool // Disable compression
MaxDepth int
IgnoreSize bool
IgnoreChecksum bool
IgnoreCaseSync bool
NoTraverse bool
CheckFirst bool
NoCheckDest bool
NoUnicodeNormalization bool
NoUpdateModTime bool
DataRateUnit string
CompareDest []string
CopyDest []string
BackupDir string
Suffix string
SuffixKeepExtension bool
UseListR bool
BufferSize SizeSuffix
BwLimit BwTimetable
BwLimitFile BwTimetable
TPSLimit float64
TPSLimitBurst int
BindAddr net.IP
DisableFeatures []string
UserAgent string
Immutable bool
AutoConfirm bool
StreamingUploadCutoff SizeSuffix
StatsFileNameLength int
AskPassword bool
PasswordCommand SpaceSepList
UseServerModTime bool
MaxTransfer SizeSuffix
MaxDuration time.Duration
CutoffMode CutoffMode
MaxBacklog int
MaxStatsGroups int
StatsOneLine bool
StatsOneLineDate bool // If we want a date prefix at all
StatsOneLineDateFormat string // If we want to customize the prefix
ErrorOnNoTransfer bool // Set appropriate exit code if no files transferred
Progress bool
ProgressTerminalTitle bool
Cookie bool
UseMmap bool
CaCert []string // Client Side CA
ClientCert string // Client Side Cert
ClientKey string // Client Side Key
MultiThreadCutoff SizeSuffix
MultiThreadStreams int
MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags)
OrderBy string // instructions on how to order the transfer
UploadHeaders []*HTTPOption
DownloadHeaders []*HTTPOption
Headers []*HTTPOption
MetadataSet Metadata // extra metadata to write when uploading
RefreshTimes bool
NoConsole bool
TrafficClass uint8
FsCacheExpireDuration time.Duration
FsCacheExpireInterval time.Duration
DisableHTTP2 bool
HumanReadable bool
KvLockTime time.Duration // maximum time to keep key-value database locked by process
DisableHTTPKeepAlives bool
Metadata bool
ServerSideAcrossConfigs bool
TerminalColorMode TerminalColorMode
DefaultTime Time // time that directories with no time should display
Inplace bool // Download directly to destination file instead of atomic download to temp/rename
LogLevel LogLevel
StatsLogLevel LogLevel
UseJSONLog bool
DryRun bool
Interactive bool
CheckSum bool
SizeOnly bool
IgnoreTimes bool
IgnoreExisting bool
IgnoreErrors bool
ModifyWindow time.Duration
Checkers int
Transfers int
ConnectTimeout time.Duration // Connect timeout
Timeout time.Duration // Data channel timeout
ExpectContinueTimeout time.Duration
Dump DumpFlags
InsecureSkipVerify bool // Skip server certificate verification
DeleteMode DeleteMode
MaxDelete int64
MaxDeleteSize SizeSuffix
TrackRenames bool // Track file renames.
TrackRenamesStrategy string // Comma separated list of strategies used to track renames
LowLevelRetries int
UpdateOlder bool // Skip files that are newer on the destination
NoGzip bool // Disable compression
MaxDepth int
IgnoreSize bool
IgnoreChecksum bool
IgnoreCaseSync bool
NoTraverse bool
CheckFirst bool
NoCheckDest bool
NoUnicodeNormalization bool
NoUpdateModTime bool
DataRateUnit string
CompareDest []string
CopyDest []string
BackupDir string
Suffix string
SuffixKeepExtension bool
UseListR bool
BufferSize SizeSuffix
MultiThreadWriteBufferSize SizeSuffix
BwLimit BwTimetable
BwLimitFile BwTimetable
TPSLimit float64
TPSLimitBurst int
BindAddr net.IP
DisableFeatures []string
UserAgent string
Immutable bool
AutoConfirm bool
StreamingUploadCutoff SizeSuffix
StatsFileNameLength int
AskPassword bool
PasswordCommand SpaceSepList
UseServerModTime bool
MaxTransfer SizeSuffix
MaxDuration time.Duration
CutoffMode CutoffMode
MaxBacklog int
MaxStatsGroups int
StatsOneLine bool
StatsOneLineDate bool // If we want a date prefix at all
StatsOneLineDateFormat string // If we want to customize the prefix
ErrorOnNoTransfer bool // Set appropriate exit code if no files transferred
Progress bool
ProgressTerminalTitle bool
Cookie bool
UseMmap bool
CaCert []string // Client Side CA
ClientCert string // Client Side Cert
ClientKey string // Client Side Key
MultiThreadCutoff SizeSuffix
MultiThreadStreams int
MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags)
OrderBy string // instructions on how to order the transfer
UploadHeaders []*HTTPOption
DownloadHeaders []*HTTPOption
Headers []*HTTPOption
MetadataSet Metadata // extra metadata to write when uploading
RefreshTimes bool
NoConsole bool
TrafficClass uint8
FsCacheExpireDuration time.Duration
FsCacheExpireInterval time.Duration
DisableHTTP2 bool
HumanReadable bool
KvLockTime time.Duration // maximum time to keep key-value database locked by process
DisableHTTPKeepAlives bool
Metadata bool
ServerSideAcrossConfigs bool
TerminalColorMode TerminalColorMode
DefaultTime Time // time that directories with no time should display
Inplace bool // Download directly to destination file instead of atomic download to temp/rename
}

// NewConfig creates a new config with everything set to the default
@@ -170,6 +171,7 @@ func NewConfig() *ConfigInfo {
c.MaxDepth = -1
c.DataRateUnit = "bytes"
c.BufferSize = SizeSuffix(16 << 20)
c.MultiThreadWriteBufferSize = SizeSuffix(128 * 1024)
c.UserAgent = "rclone/" + Version
c.StreamingUploadCutoff = SizeSuffix(100 * 1024)
c.MaxStatsGroups = 1000
1 change: 1 addition & 0 deletions fs/config/configflags/configflags.go
@@ -126,6 +126,7 @@ func AddFlags(ci *fs.ConfigInfo, flagSet *pflag.FlagSet) {
flags.StringVarP(flagSet, &ci.ClientKey, "client-key", "", ci.ClientKey, "Client SSL private key (PEM) for mutual TLS auth")
flags.FVarP(flagSet, &ci.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size")
flags.IntVarP(flagSet, &ci.MultiThreadStreams, "multi-thread-streams", "", ci.MultiThreadStreams, "Max number of streams to use for multi-thread downloads")
flags.FVarP(flagSet, &ci.MultiThreadWriteBufferSize, "multi-thread-write-buffer-size", "", "In memory buffer size for writing when in multi-thread mode")
flags.BoolVarP(flagSet, &ci.UseJSONLog, "use-json-log", "", ci.UseJSONLog, "Use json log format")
flags.StringVarP(flagSet, &ci.OrderBy, "order-by", "", ci.OrderBy, "Instructions on how to order the transfers, e.g. 'size,descending'")
flags.StringArrayVarP(flagSet, &uploadHeaders, "header-upload", "", nil, "Set HTTP header for upload transactions")
48 changes: 43 additions & 5 deletions fs/operations/multithread.go
@@ -1,6 +1,7 @@
package operations

import (
"bufio"
"context"
"errors"
"fmt"
@@ -12,11 +13,32 @@ import (
)

const (
multithreadChunkSize = 64 << 10
multithreadChunkSizeMask = multithreadChunkSize - 1
multithreadBufferSize = 32 * 1024
multithreadChunkSize = 64 << 10
multithreadChunkSizeMask = multithreadChunkSize - 1
multithreadReadBufferSize = 32 * 1024
)

// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
//
// Modified from the go source code. Can be replaced with
// io.OffsetWriter when we no longer need to support go1.19
type offsetWriter struct {
w io.WriterAt
off int64 // the current offset
}

// newOffsetWriter returns an offsetWriter that writes to w
// starting at offset off.
func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter {
return &offsetWriter{w, off}
}

func (o *offsetWriter) Write(p []byte) (n int, err error) {
n, err = o.w.WriteAt(p, o.off)
o.off += int64(n)
return
}
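
To make the role of `offsetWriter` concrete, here is a minimal standalone sketch of several streams writing into disjoint regions of one destination, each through its own offset-tracking writer. The `offsetWriter` type is copied from this commit; `memFile` and `writeStreams` are hypothetical helpers invented for the illustration, and the one-byte-at-a-time writes are only there to exercise the offset bookkeeping.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"sync"
)

// offsetWriter mirrors the type added in this commit: each Write lands at
// the current offset of the underlying io.WriterAt and then advances it.
type offsetWriter struct {
	w   io.WriterAt
	off int64
}

func (o *offsetWriter) Write(p []byte) (n int, err error) {
	n, err = o.w.WriteAt(p, o.off)
	o.off += int64(n)
	return
}

// memFile is a hypothetical fixed-size, in-memory io.WriterAt standing in
// for the destination file.
type memFile struct {
	mu   sync.Mutex
	data []byte
}

func (m *memFile) WriteAt(p []byte, off int64) (int, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if off < 0 || off+int64(len(p)) > int64(len(m.data)) {
		return 0, io.ErrShortWrite
	}
	return copy(m.data[off:], p), nil
}

// writeStreams gives every part its own offsetWriter starting where the
// previous part ends, then writes all parts concurrently.
func writeStreams(dst io.WriterAt, parts []string) error {
	var wg sync.WaitGroup
	errs := make([]error, len(parts))
	off := int64(0)
	for i, part := range parts {
		w := &offsetWriter{w: dst, off: off}
		off += int64(len(part))
		wg.Add(1)
		go func(i int, w io.Writer, part string) {
			defer wg.Done()
			// Write one byte at a time so the offset advances repeatedly.
			for j := 0; j < len(part); j++ {
				if _, err := w.Write([]byte{part[j]}); err != nil {
					errs[i] = err
					return
				}
			}
		}(i, w, part)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	parts := []string{"multi", "-thread", " copy"}
	f := &memFile{data: make([]byte, 17)}
	if err := writeStreams(f, parts); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%q\n", f.data)
}
```

Because each stream owns its writer and its region, the streams need no shared position. As the comment in the commit notes, the standard library's `io.OffsetWriter` (Go 1.20 and later) can take over this role once go1.19 support is no longer needed.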

// Return a boolean as to whether we should use multi thread copy for
// this transfer
func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
@@ -62,6 +84,7 @@ type multiThreadCopyState struct {

// Copy a single stream into place
func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) {
ci := fs.GetConfig(ctx)
defer func() {
if err != nil {
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err)
@@ -84,8 +107,13 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
}
defer fs.CheckClose(rc, &err)

var writer io.Writer = newOffsetWriter(mc.wc, start)
if ci.MultiThreadWriteBufferSize > 0 {
writer = bufio.NewWriterSize(writer, int(ci.MultiThreadWriteBufferSize))
fs.Debugf(mc.src, "multi-thread copy: write buffer set to %v", ci.MultiThreadWriteBufferSize)
}
// Copy the data
buf := make([]byte, multithreadBufferSize)
buf := make([]byte, multithreadReadBufferSize)
offset := start
for {
// Check if context cancelled and exit if so
@@ -98,7 +126,7 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
if err != nil {
return fmt.Errorf("multipart copy: accounting failed: %w", err)
}
nw, ew := mc.wc.WriteAt(buf[0:nr], offset)
nw, ew := writer.Write(buf[0:nr])
if nw > 0 {
offset += int64(nw)
}
@@ -113,6 +141,16 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err
if er != io.EOF {
return fmt.Errorf("multipart copy: read failed: %w", er)
}

// if we were buffering, flush to disk
switch w := writer.(type) {
case *bufio.Writer:
er2 := w.Flush()
if er2 != nil {
return fmt.Errorf("multipart copy: flush failed: %w", er2)
}
}

break
}
}
