cmd/uplink: add back parallelism
for machines with very fast connections (>10Gbit) it is still
useful to have parallelism for uploads, because we're actually
bound by getting new pieces from the satellite, so doing
that in parallel provides a big win.

this change adds the parallelism flag back for uploads, and
removes the backwards compatibility code that tied the flag to
maximum-concurrent-pieces, as the two are now independent.

the upload code parallelism story is now this (a sketch of the
hierarchy follows the list):

    - each object is a transfer
    - each transfer happens in N parts (size dynamically
      chosen to avoid having >10000 parts)
    - each part can happen in parallel up to the limit
      specified
    - each parallel part can have up to the limit of
      max concurrent pieces and segments
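
a minimal sketch of how those nested limits compose, using
errgroup's SetLimit for the per-transfer part limit. Part and
uploadPart are hypothetical stand-ins, not the actual uplink
code; the real per-piece and per-segment limits live inside the
uplink scheduler:

    package sketch

    import (
        "context"

        "golang.org/x/sync/errgroup"
    )

    // Part is a hypothetical stand-in for one part of a transfer.
    type Part struct{ Index int }

    // uploadPart is a placeholder; in the real code the uplink piece
    // scheduler enforces maximum-concurrent-pieces and
    // maximum-concurrent-segments within each part.
    func uploadPart(ctx context.Context, p Part) error { return nil }

    // uploadObject bounds how many parts of one transfer upload at
    // once, which is what the parallelism flag controls.
    func uploadObject(ctx context.Context, parts []Part, parallelism int) error {
        g, gctx := errgroup.WithContext(ctx)
        g.SetLimit(parallelism)
        for _, part := range parts {
            part := part
            g.Go(func() error {
                return uploadPart(gctx, part)
            })
        }
        return g.Wait()
    }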

this change also improves some defaults.

    - the connection pool capacity now takes into account
      transfers, parallelism, and max concurrent pieces
    - the default smallest part size is 1GiB to allow the
      new upload code path to upload multiple segments
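
as a worked example of the pool sizing: capacity is
transfers x parallelism x max concurrent pieces, plus a little
headroom for satellite connections. the sketch below mirrors the
Capacity expression in the cmd_cp.go diff; the inputs are made-up
examples, not real defaults:

    // mirrors the Capacity expression in cmd_cp.go below; the inputs
    // here are illustrative, not defaults taken from this diff.
    func poolCapacity(transfers, parallelism, maxConcurrentPieces int) int {
        return transfers*parallelism*maxConcurrentPieces + 5
    }

    // poolCapacity(1, 1, 100) == 105
    // poolCapacity(2, 4, 100) == 805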

Change-Id: Iff6709ae73425fbc2858ed360faa2d3ece297c2d
zeebo committed Aug 15, 2023
1 parent 03690da commit 1cbad0f
Showing 2 changed files with 44 additions and 124 deletions.
121 changes: 18 additions & 103 deletions cmd/uplink/cmd_cp.go
@@ -85,8 +85,7 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
     ).(bool)
     c.byteRange = params.Flag("range", "Downloads the specified range bytes of an object. For more information about the HTTP Range header, see https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35", "").(string)
 
-    parallelism := params.Flag("parallelism", "Controls how many parallel chunks to upload/download from a file", nil,
-        clingy.Optional,
+    c.parallelism = params.Flag("parallelism", "Controls how many parallel parts to upload/download from a file", 1,
         clingy.Short('p'),
         clingy.Transform(strconv.Atoi),
         clingy.Transform(func(n int) (int, error) {
@@ -95,8 +94,8 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
             }
             return n, nil
         }),
-    ).(*int)
-    c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the chunks for parallelism, 0 means automatic adjustment", memory.Size(0),
+    ).(int)
+    c.parallelismChunkSize = params.Flag("parallelism-chunk-size", "Set the size of the parts for parallelism, 0 means automatic adjustment", memory.Size(0),
         clingy.Transform(memory.ParseString),
         clingy.Transform(func(n int64) (memory.Size, error) {
             if n < 0 {
@@ -107,17 +106,16 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
     ).(memory.Size)
 
     c.uploadConfig = testuplink.DefaultConcurrentSegmentUploadsConfig()
-    maxConcurrent := params.Flag(
+    c.uploadConfig.SchedulerOptions.MaximumConcurrent = params.Flag(
         "maximum-concurrent-pieces",
-        "Maximum concurrent pieces to upload at once per transfer",
-        nil,
-        clingy.Optional,
+        "Maximum concurrent pieces to upload at once per part",
+        c.uploadConfig.SchedulerOptions.MaximumConcurrent,
         clingy.Transform(strconv.Atoi),
         clingy.Advanced,
-    ).(*int)
+    ).(int)
     c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles = params.Flag(
         "maximum-concurrent-segments",
-        "Maximum concurrent segments to upload at once per transfer",
+        "Maximum concurrent segments to upload at once per part",
         c.uploadConfig.SchedulerOptions.MaximumConcurrentHandles,
         clingy.Transform(strconv.Atoi),
         clingy.Advanced,
@@ -133,28 +131,6 @@ func (c *cmdCp) Setup(params clingy.Parameters) {
         clingy.Advanced,
     ).(string)
 
-    { // handle backwards compatibility around parallelism and maximum concurrent pieces
-        addr := func(x int) *int { return &x }
-
-        switch {
-        // if neither are actively set, use defaults
-        case parallelism == nil && maxConcurrent == nil:
-            parallelism = addr(1)
-            maxConcurrent = addr(c.uploadConfig.SchedulerOptions.MaximumConcurrent)
-
-        // if parallelism is not set, use a value based on maxConcurrent
-        case parallelism == nil:
-            parallelism = addr((*maxConcurrent + 99) / 100)
-
-        // if maxConcurrent is not set, use a value based on parallelism
-        case maxConcurrent == nil:
-            maxConcurrent = addr(100 * *parallelism)
-        }
-
-        c.uploadConfig.SchedulerOptions.MaximumConcurrent = *maxConcurrent
-        c.parallelism = *parallelism
-    }
-
     c.inmemoryEC = params.Flag("inmemory-erasure-coding", "Keep erasure-coded pieces in-memory instead of writing them on the disk during upload", false,
         clingy.Transform(strconv.ParseBool),
         clingy.Boolean,
@@ -194,9 +170,10 @@ func (c *cmdCp) Execute(ctx context.Context) error {
     fs, err := c.ex.OpenFilesystem(ctx, c.access,
         ulext.ConcurrentSegmentUploadsConfig(c.uploadConfig),
         ulext.ConnectionPoolOptions(rpcpool.Options{
-            // Add a bit more capacity for connections to the satellite
-            Capacity:       c.uploadConfig.SchedulerOptions.MaximumConcurrent + 5,
-            KeyCapacity:    5,
+            // Allow at least as many connections as the maximum concurrent pieces per
+            // parallel part per transfer, plus a few extra for the satellite.
+            Capacity:       c.transfers*c.parallelism*c.uploadConfig.SchedulerOptions.MaximumConcurrent + 5,
+            KeyCapacity:    2,
             IdleExpiration: 2 * time.Minute,
         }))
     if err != nil {
@@ -419,17 +396,6 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u
     }
     defer func() { _ = mwh.Abort(ctx) }()
 
-    // if we're uploading, do a single part of maximum size
-    if dest.Remote() {
-        return errs.Wrap(c.singleCopy(
-            ctx,
-            source, dest,
-            mrh, mwh,
-            offset, length,
-            bar,
-        ))
-    }
-
     partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64())
     if err != nil {
         return err
@@ -448,13 +414,15 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u
 // calculatePartSize returns the needed part size in order to upload the file with size of 'length'.
 // It hereby respects if the client requests/prefers a certain size and only increases if needed.
 func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) {
-    segC := (length / maxPartCount / (memory.MiB * 64).Int64()) + 1
-    requiredSize = segC * (memory.MiB * 64).Int64()
+    segC := (length / maxPartCount / memory.GiB.Int64()) + 1
+    requiredSize = segC * memory.GiB.Int64()
     switch {
     case preferredSize == 0:
         return requiredSize, nil
     case requiredSize <= preferredSize:
         return preferredSize, nil
+    case length < 0: // let the user pick their size if we don't have a length to know better
+        return preferredSize, nil
     default:
         return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger",
             memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize)))
Expand Down Expand Up @@ -535,8 +503,8 @@ func (c *cmdCp) parallelCopy(
}

var readBufs *ulfs.BytesPool
if p > 1 && chunkSize > 0 && (source.Std() || dest.Std()) {
// Create the read buffer pool only for uploads from stdin and downloads to stdout with parallelism > 1.
if p > 1 && chunkSize > 0 && source.Std() {
// Create the read buffer pool only for uploads from stdin with parallelism > 1.
readBufs = ulfs.NewBytesPool(int(chunkSize))
}

@@ -619,59 +587,6 @@
     return errs.Wrap(combineErrs(es))
 }
 
-func (c *cmdCp) singleCopy(
-    ctx context.Context,
-    source, dest ulloc.Location,
-    src ulfs.MultiReadHandle,
-    dst ulfs.MultiWriteHandle,
-    offset, length int64,
-    bar *mpb.Bar) error {
-
-    if offset != 0 {
-        if err := src.SetOffset(offset); err != nil {
-            return err
-        }
-    }
-
-    ctx, cancel := context.WithCancel(ctx)
-    defer cancel()
-
-    rh, err := src.NextPart(ctx, length)
-    if err != nil {
-        return errs.Wrap(err)
-    }
-    defer func() { _ = rh.Close() }()
-
-    wh, err := dst.NextPart(ctx, length)
-    if err != nil {
-        return errs.Wrap(err)
-    }
-    defer func() { _ = wh.Abort() }()
-
-    var w io.Writer = wh
-    if bar != nil {
-        bar.SetTotal(rh.Info().ContentLength, false)
-        bar.EnableTriggerComplete()
-        pw := bar.ProxyWriter(w)
-        defer func() { _ = pw.Close() }()
-        w = pw
-    }
-
-    if _, err := sync2.Copy(ctx, w, rh); err != nil {
-        return errs.Wrap(err)
-    }
-
-    if err := wh.Commit(); err != nil {
-        return errs.Wrap(err)
-    }
-
-    if err := dst.Commit(ctx); err != nil {
-        return errs.Wrap(err)
-    }
-
-    return nil
-}
-
 func newProgressBar(progress *mpb.Progress, name string, which, total int) *mpb.Bar {
     const counterFmt = " % .2f / % .2f"
     const percentageFmt = "%.2f "
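The new part-size rule in calculatePartSize is easier to see with concrete numbers. The sketch below restates just the sizing arithmetic, assuming maxPartCount is 10000 (inferred from the commit message's ">10000 parts" note; the constant itself is not shown in this diff):

    const (
        maxPartCount = 10000          // assumed value, not shown in the diff
        gib          = int64(1) << 30 // 1 GiB
    )

    // requiredPartSize picks the smallest multiple of 1 GiB that keeps
    // the part count at or under maxPartCount.
    func requiredPartSize(length int64) int64 {
        segC := length/maxPartCount/gib + 1
        return segC * gib
    }

    // requiredPartSize(10 * gib)    == 1 GiB
    // requiredPartSize(10000 * gib) == 2 GiB
    // requiredPartSize(20001 * gib) == 3 GiB

These values match the updated expectations in cmd_cp_test.go below.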
47 changes: 26 additions & 21 deletions cmd/uplink/cmd_cp_test.go
@@ -99,46 +99,51 @@ func TestCpDownload(t *testing.T) {
 func TestCpPartSize(t *testing.T) {
     c := newCmdCp(nil)
 
-    // 1GiB file, should return 64MiB
-    partSize, err := c.calculatePartSize(memory.GiB.Int64(), c.parallelismChunkSize.Int64())
+    // 10 GiB file, should return 1 GiB
+    partSize, err := c.calculatePartSize(10*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*64, partSize)
+    require.EqualValues(t, 1*memory.GiB, partSize)
 
-    // 640 GB file, should return 64MiB.
-    partSize, err = c.calculatePartSize(memory.GB.Int64()*640, c.parallelismChunkSize.Int64())
+    // 10000 GB file, should return 1 GiB.
+    partSize, err = c.calculatePartSize(10000*memory.GB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*64, partSize)
+    require.EqualValues(t, 1*memory.GiB, partSize)
 
-    // 640GiB file, should return 128MiB.
-    partSize, err = c.calculatePartSize(memory.GiB.Int64()*640, c.parallelismChunkSize.Int64())
+    // 10000 GiB file, should return 2 GiB.
+    partSize, err = c.calculatePartSize(10000*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*128, partSize)
+    require.EqualValues(t, 2*memory.GiB, partSize)
 
-    // 1TiB file, should return 128MiB.
-    partSize, err = c.calculatePartSize(memory.TiB.Int64(), c.parallelismChunkSize.Int64())
+    // 10 TiB file, should return 2 GiB.
+    partSize, err = c.calculatePartSize(10*memory.TiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*128, partSize)
+    require.EqualValues(t, 2*memory.GiB, partSize)
 
-    // 1.3TiB file, should return 192MiB.
-    partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, c.parallelismChunkSize.Int64())
+    // 20001 GiB file, should return 3 GiB.
+    partSize, err = c.calculatePartSize(20001*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*192, partSize)
+    require.EqualValues(t, 3*memory.GiB, partSize)
 
     // should return 1GiB as requested.
     partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64())
     require.NoError(t, err)
     require.EqualValues(t, memory.GiB, partSize)
 
-    // should return 192 MiB and error, since preferred is too low.
-    partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.MiB.Int64())
+    // should return 1 GiB and error, since preferred is too low.
+    partSize, err = c.calculatePartSize(1300*memory.GiB.Int64(), memory.MiB.Int64())
     require.Error(t, err)
-    require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 192.0 MiB or larger", err.Error())
+    require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 1.0 GiB or larger", err.Error())
     require.Zero(t, partSize)
 
-    // negative length should return 64MiB part size
-    partSize, err = c.calculatePartSize(-1, c.parallelismChunkSize.Int64())
+    // negative length should return asked for amount
+    partSize, err = c.calculatePartSize(-1, 1*memory.GiB.Int64())
     require.NoError(t, err)
-    require.EqualValues(t, memory.MiB*64, partSize)
+    require.EqualValues(t, 1*memory.GiB, partSize)
+
+    // negative length should return specified amount
+    partSize, err = c.calculatePartSize(-1, 100)
+    require.NoError(t, err)
+    require.EqualValues(t, 100, partSize)
 }
 
 func TestCpUpload(t *testing.T) {
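Taken together, the flags defined above compose like this on the command line (illustrative invocation only; the file and bucket names are placeholders, and the chunk-size value is parsed by memory.ParseString):

    uplink cp -p 4 --parallelism-chunk-size 2GiB \
        ~/backups/archive.tar sj://my-bucket/archive.tar

With this change, each of the 4 parallel parts can additionally run up to maximum-concurrent-pieces piece uploads at once, and the connection pool capacity is sized to match.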
