Skip to content

Commit

Permalink
cmd/uplink: adjust multipart part size based on file size
Browse files Browse the repository at this point in the history
This change allows the uplink to bump the part size based on the
content length that is being copied. This ensures we are staying
below the 10k part limit currently enforced on the satellites.

If the user specifies the flag, the command errors out when the
chosen value is too low; otherwise the user-chosen value is used.

Change-Id: I00d30f603d941c2f7703ba19d5923e668629a7b9
  • Loading branch information
stefanbenten committed May 13, 2022
1 parent 0633aca commit 5e4ec0b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 8 deletions.
33 changes: 25 additions & 8 deletions cmd/uplink/cmd_cp.go
Expand Up @@ -44,6 +44,8 @@ type cmdCp struct {
dest ulloc.Location
}

// maxPartCount is the maximum number of parts allowed in a multipart
// upload, as currently enforced on the satellite side. Part sizes are
// chosen so that an upload never exceeds this count.
const maxPartCount int64 = 10000

// newCmdCp constructs a cp command bound to the given external interface.
func newCmdCp(ex ulext.External) *cmdCp {
	cmd := cmdCp{ex: ex}
	return &cmd
}
Expand Down Expand Up @@ -82,14 +84,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
return n, nil
}),
).(int)
c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Controls the size of the chunks for parallelism", 64*memory.MiB,
c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.B*0,
clingy.Transform(memory.ParseString),
clingy.Transform(func(n int64) (memory.Size, error) {
if memory.Size(n) < 1*memory.MB {
return 0, errs.New("file chunk size must be at least 1 MB")
}
return memory.Size(n), nil
}),
).(memory.Size)

c.expires = params.Flag("expires",
Expand Down Expand Up @@ -244,15 +240,36 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
defer bar.Finish()
}

partSize, err := c.calculatePartSize(length, c.parallelismChunkSize.Int64())
if err != nil {
return err
}

return errs.Wrap(parallelCopy(
ctx,
mwh, mrh,
c.parallelism, c.parallelismChunkSize.Int64(),
c.parallelism, partSize,
offset, length,
bar,
))
}

// calculatePartSize returns the part size needed to upload a file of size
// 'length' without exceeding the maxPartCount limit enforced on the
// satellites. It respects a client-preferred size (non-zero preferredSize)
// when that is large enough, and errors out when it is too small.
func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) {
	// Grow the part size in 64 MiB steps until at most maxPartCount parts
	// are needed. A negative/unknown length falls through to one step,
	// i.e. the 64 MiB default.
	step := (memory.MiB * 64).Int64()
	segC := (length / maxPartCount / step) + 1
	requiredSize = segC * step

	switch {
	case preferredSize == 0:
		// No preference given: use the computed minimum.
		return requiredSize, nil
	case requiredSize <= preferredSize:
		// The preference is large enough: honor it.
		return preferredSize, nil
	default:
		// errs.New accepts printf-style formatting directly; wrapping
		// fmt.Sprintf would bypass vet's printf checks.
		return 0, errs.New("the specified chunk size %s is too small, requires %s or larger",
			memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize))
	}
}

func copyVerb(source, dest ulloc.Location) string {
switch {
case dest.Remote():
Expand Down
48 changes: 48 additions & 0 deletions cmd/uplink/cmd_cp_test.go
Expand Up @@ -6,6 +6,9 @@ package main
import (
"testing"

"github.com/stretchr/testify/require"

"storj.io/common/memory"
"storj.io/storj/cmd/uplink/ultest"
)

Expand Down Expand Up @@ -93,6 +96,51 @@ func TestCpDownload(t *testing.T) {
})
}

func TestCpPartSize(t *testing.T) {
	c := newCmdCp(nil)

	// Automatic sizing: with no preferred chunk size (the flag default of 0),
	// the part size grows in 64 MiB steps as the file gets larger.
	for _, tc := range []struct {
		name   string
		length int64
		want   memory.Size
	}{
		{"1GiB file", memory.GiB.Int64(), 64 * memory.MiB},
		{"640GB file", memory.GB.Int64() * 640, 64 * memory.MiB},
		{"640GiB file", memory.GiB.Int64() * 640, 128 * memory.MiB},
		{"1TiB file", memory.TiB.Int64(), 128 * memory.MiB},
		{"1.3TiB file", memory.GiB.Int64() * 1300, 192 * memory.MiB},
		{"negative length", -1, 64 * memory.MiB},
	} {
		partSize, err := c.calculatePartSize(tc.length, c.parallelismChunkSize.Int64())
		require.NoError(t, err, tc.name)
		require.EqualValues(t, tc.want, partSize, tc.name)
	}

	// A preferred size at or above the requirement is honored as-is.
	partSize, err := c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64())
	require.NoError(t, err)
	require.EqualValues(t, memory.GiB, partSize)

	// A preferred size below the 192 MiB requirement is rejected with an
	// error, and the returned part size is zero.
	partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64())
	require.Error(t, err)
	require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error())
	require.Zero(t, partSize)
}

func TestCpUpload(t *testing.T) {
state := ultest.Setup(commands,
ultest.WithFile("/home/user/file1.txt", "local"),
Expand Down

1 comment on commit 5e4ec0b

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/release-preparation-v1-56/18583/1

Please sign in to comment.