diff --git a/command/sync.go b/command/sync.go index c848e7abf..6445aa809 100644 --- a/command/sync.go +++ b/command/sync.go @@ -232,7 +232,7 @@ func (s Sync) Run(c *cli.Context) error { pipeReader, pipeWriter := io.Pipe() // create a reader, writer pipe to pass commands to run // Create commands in background. - go s.planRun(c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch) + go s.planRun(ctx, c, onlySource, onlyDest, commonObjects, dsturl, strategy, pipeWriter, isBatch) err = NewRun(c, pipeReader).Run(ctx) return multierror.Append(err, merrorWaiter).ErrorOrNil() @@ -434,6 +434,7 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, cancel context // planRun prepares the commands and writes them to writer 'w'. func (s Sync) planRun( + ctx context.Context, c *cli.Context, onlySource, onlyDest chan *url.URL, common chan *ObjectPair, @@ -459,14 +460,25 @@ func (s Sync) planRun( wg.Add(1) go func() { defer wg.Done() - for srcurl := range onlySource { - curDestURL := generateDestinationURL(srcurl, dsturl, isBatch) - command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL) - if err != nil { - printDebug(s.op, err, srcurl, curDestURL) - continue + for { + select { + case srcurl := <-onlySource: + if srcurl == nil { + return + } + curDestURL := generateDestinationURL(srcurl, dsturl, isBatch) + command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL) + if err != nil { + printDebug(s.op, err, srcurl, curDestURL) + continue + } + fmt.Fprintln(w, command) + case <-ctx.Done(): + // If the context was cancelled, there was an error. + if err := ctx.Err(); err != nil { + return + } } - fmt.Fprintln(w, command) } }() @@ -474,21 +486,32 @@ func (s Sync) planRun( wg.Add(1) go func() { defer wg.Done() - for commonObject := range common { - sourceObject, destObject := commonObject.src, commonObject.dst - curSourceURL, curDestURL := sourceObject.URL, destObject.URL - err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied. - if err != nil { - printDebug(s.op, err, curSourceURL, curDestURL) - continue - } + for { + select { + case commonObject := <-common: + if commonObject == nil { + return + } + sourceObject, destObject := commonObject.src, commonObject.dst + curSourceURL, curDestURL := sourceObject.URL, destObject.URL + err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied. + if err != nil { + printDebug(s.op, err, curSourceURL, curDestURL) + continue + } - command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL) - if err != nil { - printDebug(s.op, err, curSourceURL, curDestURL) - continue + command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL) + if err != nil { + printDebug(s.op, err, curSourceURL, curDestURL) + continue + } + fmt.Fprintln(w, command) + case <-ctx.Done(): + // If the context was cancelled, there was an error. + if err := ctx.Err(); err != nil { + return + } } - fmt.Fprintln(w, command) } }() @@ -501,8 +524,21 @@ func (s Sync) planRun( // or rewrite generateCommand function? dstURLs := make([]*url.URL, 0, extsortChunkSize) - for d := range onlyDest { - dstURLs = append(dstURLs, d) + finished := false + for !finished { + select { + case d := <-onlyDest: + if d == nil { + finished = true + break + } + dstURLs = append(dstURLs, d) + case <-ctx.Done(): + // If the context was cancelled, there was an error. + if err := ctx.Err(); err != nil { + return + } + } } if len(dstURLs) == 0 { @@ -576,7 +612,7 @@ func (s Sync) shouldStopSync(err error) bool { } if awsErr, ok := err.(awserr.Error); ok { switch awsErr.Code() { - case "AccessDenied", "NoSuchBucket": + case "AccessDenied", "NoSuchBucket", "RequestError", "SerializationError": return true } }