From 4f2fae4f28737a293e087c357224c4a8d5b4e7c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Niewrza=C5=82?= Date: Fri, 27 May 2022 15:16:58 +0200 Subject: [PATCH] cmd/uplink: better error handling for parallel transfer Few improvements were made to how we are handling errors while doing parallel upload/download for single object: * unhide error under 'context canceled' which was shown in most of cases * add part number to error message * don't try to commit if any error occurs while operation * combine errors into more readable form, example: --- failed to download part 3: uplink: eestream: failed to download stripe 0: error retrieving piece 00: ecclient: piecestore: rpc: tcp connector failed: rpc: dial tcp 97.119.158.36:28967: i/o timeout ... error retrieving piece 89: ecclient: piecestore: rpc: tcp connector failed: rpc: dial tcp 161.129.152.194:28967: i/o timeout failed to download part 1: uplink: eestream: failed to download stripe 0: error retrieving piece 01: io: read/write on closed pipe ... error retrieving piece 97: io: read/write on closed pipe failed to download part 2: uplink: eestream: failed to download stripe 0: error retrieving piece 00: io: read/write on closed pipe ... error retrieving piece 01: ecclient: piecestore: rpc: tcp connector failed: rpc: dial tcp 180.183.132.234:28967: operation was canceled error retrieving piece 96: io: read/write on closed pipe main.(*cmdCp).parallelCopy:418 main.(*cmdCp).copyFile:262 main.(*cmdCp).Execute:156 main.(*external).Wrap:123 github.com/zeebo/clingy.(*Environment).dispatchDesc:126 github.com/zeebo/clingy.(*Environment).dispatch:53 github.com/zeebo/clingy.Environment.Run:34 main.main:26 runtime.main:250 --- Change-Id: I9bb70b3f754567761fa8d17bef8ef59b0709e33b --- cmd/uplink/cmd_cp.go | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/cmd/uplink/cmd_cp.go b/cmd/uplink/cmd_cp.go index 5ea1cc1d0992..97632ca71418 100644 --- a/cmd/uplink/cmd_cp.go +++ b/cmd/uplink/cmd_cp.go @@ -259,7 +259,7 @@ func (c *cmdCp) copyFile(ctx clingy.Context, fs ulfs.Filesystem, source, dest ul return err } - return errs.Wrap(parallelCopy( + return errs.Wrap(c.parallelCopy( ctx, mwh, mrh, c.parallelism, partSize, @@ -308,7 +308,7 @@ func joinDestWith(dest ulloc.Location, suffix string) ulloc.Location { return dest } -func parallelCopy( +func (c *cmdCp) parallelCopy( clctx clingy.Context, dst ulfs.MultiWriteHandle, src ulfs.MultiReadHandle, @@ -396,26 +396,25 @@ func parallelCopy( // to have concurrent safe API with that regards. // // Also, we may want to check that it actually helps, before implementing it. - } - - mu.Lock() - defer mu.Unlock() - es.Add(err) + mu.Lock() + es.Add(errs.New("failed to %s part %d: %v", copyVerb(c.source, c.dest), i, err)) + mu.Unlock() + } }) if !ok { - mu.Lock() - fmt.Fprintln(clctx.Stderr(), "Failed to start part copy", i) - mu.Unlock() - return ctx.Err() + break } } limiter.Wait() - es.Add(dst.Commit(ctx)) + // don't try to commit if any error occur + if len(es) == 0 { + es.Add(dst.Commit(ctx)) + } - return es.Err() + return errs.Wrap(combineErrs(es)) } func parseRange(r string) (offset, length int64, err error) { @@ -466,3 +465,17 @@ func parseRange(r string) (offset, length int64, err error) { return starti, endi - starti + 1, nil } } + +// combineErrs makes a more readable error message from the errors group. +func combineErrs(group errs.Group) error { + if len(group) == 0 { + return nil + } + + errstrings := make([]string, len(group)) + for i, err := range group { + errstrings[i] = err.Error() + } + + return fmt.Errorf("%s", strings.Join(errstrings, "\n")) +}