Skip to content

Commit

Permalink
cmd/uplink/ulfs: single part upload
Browse files Browse the repository at this point in the history
By using UploadObject for uploading a single object instead of multipart
we can avoid a few roundtrips to the satellite.

Updates #6706

Change-Id: I0149b8a7a29283b87df54b3a597cbce6546769f0
  • Loading branch information
egonelbre authored and Storj Robot committed Jan 22, 2024
1 parent 506f284 commit c7ea51f
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 23 deletions.
30 changes: 18 additions & 12 deletions cmd/uplink/cmd_cp.go
Expand Up @@ -389,44 +389,50 @@ func (c *cmdCp) copyFile(ctx context.Context, fs ulfs.Filesystem, source, dest u
}
defer func() { _ = mrh.Close() }()

mwh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{
Expires: c.expires,
Metadata: c.metadata,
})
parallelism := c.parallelism
partSize, singlePart, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64())
if err != nil {
return err
}
defer func() { _ = mwh.Abort(ctx) }()
if singlePart {
parallelism = 1
}
singlePart = singlePart || parallelism == 1

partSize, err := c.calculatePartSize(mrh.Length(), c.parallelismChunkSize.Int64())
mwh, err := fs.Create(ctx, dest, &ulfs.CreateOptions{
Expires: c.expires,
Metadata: c.metadata,
SinglePart: singlePart,
})
if err != nil {
return err
}
defer func() { _ = mwh.Abort(ctx) }()

return errs.Wrap(c.parallelCopy(
ctx,
source, dest,
mrh, mwh,
c.parallelism, partSize,
parallelism, partSize,
offset, length,
bar,
))
}

// calculatePartSize returns the needed part size in order to upload the file with size of 'length'.
// It hereby respects if the client requests/prefers a certain size and only increases if needed.
func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, err error) {
func (c *cmdCp) calculatePartSize(length, preferredSize int64) (requiredSize int64, singlePart bool, err error) {
segC := (length / maxPartCount / memory.GiB.Int64()) + 1
requiredSize = segC * memory.GiB.Int64()
switch {
case preferredSize == 0:
return requiredSize, nil
return requiredSize, requiredSize <= length, nil
case requiredSize <= preferredSize:
return preferredSize, nil
return preferredSize, preferredSize <= length, nil
case length < 0: // let the user pick their size if we don't have a length to know better
return preferredSize, nil
return preferredSize, false, nil
default:
return 0, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger",
return 0, false, errs.New(fmt.Sprintf("the specified chunk size %s is too small, requires %s or larger",
memory.FormatBytes(preferredSize), memory.FormatBytes(requiredSize)))
}
}
Expand Down
27 changes: 18 additions & 9 deletions cmd/uplink/cmd_cp_test.go
Expand Up @@ -100,49 +100,58 @@ func TestCpPartSize(t *testing.T) {
c := newCmdCp(nil)

// 10 GiB file, should return 1 GiB
partSize, err := c.calculatePartSize(10*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
partSize, singlePart, err := c.calculatePartSize(10*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.True(t, singlePart)
require.EqualValues(t, 1*memory.GiB, partSize)

// 10000 GB file, should return 1 GiB.
partSize, err = c.calculatePartSize(10000*memory.GB.Int64(), c.parallelismChunkSize.Int64())
partSize, singlePart, err = c.calculatePartSize(10000*memory.GB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.True(t, singlePart)
require.EqualValues(t, 1*memory.GiB, partSize)

// 10000 GiB file, should return 2 GiB.
partSize, err = c.calculatePartSize(10000*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
partSize, singlePart, err = c.calculatePartSize(10000*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.True(t, singlePart)
require.EqualValues(t, 2*memory.GiB, partSize)

// 10 TiB file, should return 2 GiB.
partSize, err = c.calculatePartSize(10*memory.TiB.Int64(), c.parallelismChunkSize.Int64())
partSize, singlePart, err = c.calculatePartSize(10*memory.TiB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.True(t, singlePart)
require.EqualValues(t, 2*memory.GiB, partSize)

// 20001 GiB file, should return 3 GiB.
partSize, err = c.calculatePartSize(20001*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
partSize, singlePart, err = c.calculatePartSize(20001*memory.GiB.Int64(), c.parallelismChunkSize.Int64())
require.NoError(t, err)
require.True(t, singlePart)
require.EqualValues(t, 3*memory.GiB, partSize)

// should return 1GiB as requested.
partSize, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64())
partSize, singlePart, err = c.calculatePartSize(memory.GiB.Int64()*1300, memory.GiB.Int64())
require.NoError(t, err)
require.True(t, singlePart)
require.EqualValues(t, memory.GiB, partSize)

// should return 1 GiB and error, since preferred is too low.
partSize, err = c.calculatePartSize(1300*memory.GiB.Int64(), memory.MiB.Int64())
partSize, singlePart, err = c.calculatePartSize(1300*memory.GiB.Int64(), memory.MiB.Int64())
require.Error(t, err)
require.False(t, singlePart)
require.Equal(t, "the specified chunk size 1.0 MiB is too small, requires 1.0 GiB or larger", err.Error())
require.Zero(t, partSize)

// negative length should return asked for amount
partSize, err = c.calculatePartSize(-1, 1*memory.GiB.Int64())
partSize, singlePart, err = c.calculatePartSize(-1, 1*memory.GiB.Int64())
require.NoError(t, err)
require.False(t, singlePart)
require.EqualValues(t, 1*memory.GiB, partSize)

// negative length should return specified amount
partSize, err = c.calculatePartSize(-1, 100)
partSize, singlePart, err = c.calculatePartSize(-1, 100)
require.NoError(t, err)
require.False(t, singlePart)
require.EqualValues(t, 100, partSize)
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/uplink/ulfs/filesystem.go
Expand Up @@ -14,8 +14,9 @@ import (

// CreateOptions contains extra options to create an object.
type CreateOptions struct {
	// Expires sets an optional expiration time for the object;
	// the zero value means no expiration.
	Expires time.Time
	// Metadata is attached to the object as custom metadata on commit.
	Metadata map[string]string
	// SinglePart requests a plain single upload (UploadObject) instead
	// of a multipart upload, saving roundtrips for small objects.
	SinglePart bool
}

// ListOptions describes options to the List command.
Expand Down
80 changes: 80 additions & 0 deletions cmd/uplink/ulfs/handle_uplink.go
Expand Up @@ -290,3 +290,83 @@ func (u *uplinkPartWriteHandle) Commit() error {
// Abort cancels the part write by delegating to the wrapped upload's Abort.
func (u *uplinkPartWriteHandle) Abort() error {
	return u.ul.Abort()
}

// uplinkSingleWriteHandle writes an object through a single upload stream,
// avoiding the extra satellite roundtrips of a multipart upload.
type uplinkSingleWriteHandle struct {
	project  *uplink.Project
	bucket   string
	upload   *uplink.Upload
	metadata uplink.CustomMetadata

	// mu guards commitErr and abortErr.
	mu sync.Mutex
	// commitErr is non-nil once Commit has run; it points at the commit
	// result, which may itself be nil on success. The pointer lets later
	// calls distinguish "already committed" from "not committed yet".
	commitErr *error
	// abortErr follows the same pointer convention for Abort.
	abortErr *error

	partCount int // guards against NextPart being called more than once; just for safety
}

// newUplinkSingleWriteHandle wraps an already-started single upload so it can
// be driven through the write-handle interface; metadata is stored for Commit.
func newUplinkSingleWriteHandle(project *uplink.Project, bucket string, upload *uplink.Upload, metadata uplink.CustomMetadata) *uplinkSingleWriteHandle {
	handle := new(uplinkSingleWriteHandle)
	handle.project = project
	handle.bucket = bucket
	handle.upload = upload
	handle.metadata = metadata
	return handle
}

// NextPart hands out the one and only write handle for this upload.
// Calling it a second time is a programming error and panics.
func (u *uplinkSingleWriteHandle) NextPart(ctx context.Context, length int64) (WriteHandle, error) {
	if u.partCount++; u.partCount > 1 {
		panic("invalid use of uplinkSingleWriteHandle")
	}
	return &uplinkSingleWriteHandleRef{ul: u}, nil
}

// Commit attaches the custom metadata and commits the object.
//
// It is idempotent: a repeated call returns the first commit's result, and
// committing after Abort returns an error. If setting the metadata fails,
// the upload is aborted (best effort) so no half-configured object is left.
func (u *uplinkSingleWriteHandle) Commit(ctx context.Context) error {
	u.mu.Lock()
	defer u.mu.Unlock()

	switch {
	case u.abortErr != nil:
		// Fixed wording: this handle is a single-part upload, not multipart.
		return errs.New("cannot commit an aborted upload")
	case u.commitErr != nil:
		// Commit already ran; report the same outcome again.
		return *u.commitErr
	}

	if err := u.upload.SetCustomMetadata(ctx, u.metadata); err != nil {
		u.commitErr = &err
		// Best-effort cleanup; the metadata error is the one reported.
		_ = u.upload.Abort()
		return err
	}

	err := u.upload.Commit()
	// Store a pointer so a later call can tell "committed successfully"
	// (non-nil pointer to a nil error) apart from "not committed yet".
	u.commitErr = &err
	return err
}

// Abort cancels the upload unless it has already been committed.
// It is idempotent: a repeated call returns the first abort's result.
func (u *uplinkSingleWriteHandle) Abort(ctx context.Context) error {
	u.mu.Lock()
	defer u.mu.Unlock()

	switch {
	case u.abortErr != nil:
		return *u.abortErr
	case u.commitErr != nil:
		// Fixed wording: this handle is a single-part upload, not multipart.
		return errs.New("cannot abort a committed upload")
	}
	err := u.upload.Abort()
	u.abortErr = &err
	return err
}

// uplinkSingleWriteHandleRef is the per-part write handle handed out by
// uplinkSingleWriteHandle.NextPart; it forwards writes to the parent's
// single *uplink.Upload stream.
type uplinkSingleWriteHandleRef struct {
	ul *uplinkSingleWriteHandle
}

// Write forwards p to the underlying upload stream.
func (u *uplinkSingleWriteHandleRef) Write(p []byte) (int, error) {
	return u.ul.upload.Write(p)
}

// Commit and Abort are intentionally no-ops here: the object's lifecycle
// is controlled by the parent uplinkSingleWriteHandle's Commit/Abort.
func (u *uplinkSingleWriteHandleRef) Commit() error { return nil }
func (u *uplinkSingleWriteHandleRef) Abort() error { return nil }
10 changes: 10 additions & 0 deletions cmd/uplink/ulfs/remote.go
Expand Up @@ -56,6 +56,16 @@ func (r *Remote) Create(ctx context.Context, bucket, key string, opts *CreateOpt
}
}

if opts.SinglePart {
upload, err := r.project.UploadObject(ctx, bucket, key, &uplink.UploadOptions{
Expires: opts.Expires,
})
if err != nil {
return nil, err
}
return newUplinkSingleWriteHandle(r.project, bucket, upload, customMetadata), nil
}

info, err := r.project.BeginUpload(ctx, bucket, key, &uplink.UploadOptions{
Expires: opts.Expires,
})
Expand Down

0 comments on commit c7ea51f

Please sign in to comment.