Skip to content

Commit

Permalink
cmd/uplink: fix how we are collecting errors while copy in parallel
Browse files Browse the repository at this point in the history
Main issue was that when one part copy failed while being inside
goroutine (limiter) and another part was still collecting src/dst parts
it was possible to drop errors from failed part copy. It was possible
bacause on fail context was canceled and if we were still getting
part src/dst then it was returning error immediately and error
group with errors from goroutine was ignored.

Change-Id: I75c6799eba358741629795f2971c7a964cb2c9ce
  • Loading branch information
mniewrzal authored and Storj Robot committed May 31, 2022
1 parent 4f668f5 commit ffbb43d
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions cmd/uplink/cmd_cp.go
Expand Up @@ -330,7 +330,6 @@ func (c *cmdCp) parallelCopy(

ctx, cancel := context.WithCancel(clctx)

defer limiter.Wait()
defer func() { _ = src.Close() }()
defer func() {
nocancel := context2.WithoutCancellation(ctx)
Expand All @@ -340,6 +339,16 @@ func (c *cmdCp) parallelCopy(
}()
defer cancel()

addError := func(err error) {
mu.Lock()
defer mu.Unlock()

es.Add(err)

// abort all other concurrenty copies
cancel()
}

for i := 0; length != 0; i++ {
i := i

Expand All @@ -350,25 +359,19 @@ func (c *cmdCp) parallelCopy(
length -= chunk

rh, err := src.NextPart(ctx, chunk)
if errors.Is(err, io.EOF) {
if err != nil {
if !errors.Is(err, io.EOF) {
addError(errs.New("error getting reader for part %d: %v", i, err))
}
break
} else if err != nil {
mu.Lock()
fmt.Fprintln(clctx.Stderr(), "Error getting reader for part", i)
mu.Unlock()

return err
}

wh, err := dst.NextPart(ctx, chunk)
if err != nil {
_ = rh.Close()

mu.Lock()
fmt.Fprintln(clctx.Stderr(), "Error getting writer for part", i)
mu.Unlock()

return err
addError(errs.New("error getting writer for part %d: %v", i, err))
break
}

ok := limiter.Go(ctx, func() {
Expand All @@ -387,8 +390,6 @@ func (c *cmdCp) parallelCopy(
}

if err != nil {
// abort all other concurrenty copies
cancel()
// TODO: it would be also nice to use wh.Abort and rh.Close directly
// to avoid some of the waiting that's caused by sync2.Copy.
//
Expand All @@ -397,9 +398,7 @@ func (c *cmdCp) parallelCopy(
//
// Also, we may want to check that it actually helps, before implementing it.

mu.Lock()
es.Add(errs.New("failed to %s part %d: %v", copyVerb(c.source, c.dest), i, err))
mu.Unlock()
addError(errs.New("failed to %s part %d: %v", copyVerb(c.source, c.dest), i, err))
}
})
if !ok {
Expand Down

1 comment on commit ffbb43d

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/release-preparation-v1-57/18810/1

Please sign in to comment.