Skip to content

Commit

Permalink
cmd/uplink: better error handling for parallel transfer
Browse files Browse the repository at this point in the history
Few improvements were made to how we are handling errors
while doing parallel upload/download for single object:
* unhide error under 'context canceled' which was shown in most of
cases
* add part number to error message
* don't try to commit if any error occurs while operation
* combine errors into more readable form, example:

---
failed to download part 3: uplink: eestream: failed to download stripe 0:
error retrieving piece 00: ecclient: piecestore: rpc: tcp connector failed: rpc: dial tcp 97.119.158.36:28967: i/o timeout
...
error retrieving piece 89: ecclient: piecestore: rpc: tcp connector failed: rpc: dial tcp 161.129.152.194:28967: i/o timeout
failed to download part 1: uplink: eestream: failed to download stripe 0:
error retrieving piece 01: io: read/write on closed pipe
...
error retrieving piece 97: io: read/write on closed pipe
failed to download part 2: uplink: eestream: failed to download stripe 0:
error retrieving piece 00: io: read/write on closed pipe
...
error retrieving piece 01: ecclient: piecestore: rpc: tcp connector failed: rpc: dial tcp 180.183.132.234:28967: operation was canceled
error retrieving piece 96: io: read/write on closed pipe
	main.(*cmdCp).parallelCopy:418
	main.(*cmdCp).copyFile:262
	main.(*cmdCp).Execute:156
	main.(*external).Wrap:123
	github.com/zeebo/clingy.(*Environment).dispatchDesc:126
	github.com/zeebo/clingy.(*Environment).dispatch:53
	github.com/zeebo/clingy.Environment.Run:34
	main.main:26
	runtime.main:250
---

Change-Id: I9bb70b3f754567761fa8d17bef8ef59b0709e33b
  • Loading branch information
mniewrzal committed May 27, 2022
1 parent 3f13cd7 commit 4f2fae4
Showing 1 changed file with 26 additions and 13 deletions.
39 changes: 26 additions & 13 deletions cmd/uplink/cmd_cp.go
Expand Up @@ -259,7 +259,7 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul
return err
}

return errs.Wrap(parallelCopy(
return errs.Wrap(c.parallelCopy(
ctx,
mwh, mrh,
c.parallelism, partSize,
Expand Down Expand Up @@ -308,7 +308,7 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location {
return dest
}

func parallelCopy(
func (c *cmdCp) parallelCopy(
clctx clingy.Context,
dst ulfs.MultiWriteHandle,
src ulfs.MultiReadHandle,
Expand Down Expand Up @@ -396,26 +396,25 @@ func parallelCopy(
// to have concurrent safe API with that regards.
//
// Also, we may want to check that it actually helps, before implementing it.
}

mu.Lock()
defer mu.Unlock()

es.Add(err)
mu.Lock()
es.Add(errs.New("failed to %s part %d: %v", copyVerb(c.source, c.dest), i, err))
mu.Unlock()
}
})
if !ok {
mu.Lock()
fmt.Fprintln(clctx.Stderr(), "Failed to start part copy", i)
mu.Unlock()
return ctx.Err()
break
}
}

limiter.Wait()

es.Add(dst.Commit(ctx))
// don't try to commit if any error occur
if len(es) == 0 {
es.Add(dst.Commit(ctx))
}

return es.Err()
return errs.Wrap(combineErrs(es))
}

func parseRange(r string) (offset, length int64, err error) {
Expand Down Expand Up @@ -466,3 +465,17 @@ func parseRange(r string) (offset, length int64, err error) {
return starti, endi - starti + 1, nil
}
}

// combineErrs makes a more readable error message from the errors group.
func combineErrs(group errs.Group) error {
if len(group) == 0 {
return nil
}

errstrings := make([]string, len(group))
for i, err := range group {
errstrings[i] = err.Error()
}

return fmt.Errorf("%s", strings.Join(errstrings, "\n"))
}

1 comment on commit 4f2fae4

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/release-preparation-v1-57/18810/1

Please sign in to comment.