Skip to content

Commit

Permalink
operations: run hashing operations in parallel #3419
Browse files Browse the repository at this point in the history
Before this change for a post copy Hash check we would run the hashes sequentially.

Now we run the hashes in parallel for a useful speedup.

Note that this refactors the hash check in Copy to use the standard
hash checking routine.
  • Loading branch information
ncw committed Aug 12, 2019
1 parent f1e1e5f commit 395b668
Showing 1 changed file with 37 additions and 31 deletions.
68 changes: 37 additions & 31 deletions fs/operations/operations.go
Expand Up @@ -51,30 +51,46 @@ func CheckHashes(ctx context.Context, src fs.ObjectInfo, dst fs.Object) (equal b
if common.Count() == 0 {
return true, hash.None, nil
}
ht = common.GetOne()
srcHash, err := src.Hash(ctx, ht)
equal, ht, _, _, err = checkHashes(ctx, src, dst, common.GetOne())
return equal, ht, err
}

// checkHashes does the work of CheckHashes but takes a hash.Type and
// returns the effective hash type used.
func checkHashes(ctx context.Context, src fs.ObjectInfo, dst fs.Object, ht hash.Type) (equal bool, htOut hash.Type, srcHash, dstHash string, err error) {
// Calculate hashes in parallel
g, ctx := errgroup.WithContext(ctx)
g.Go(func() (err error) {
srcHash, err = src.Hash(ctx, ht)
if err != nil {
fs.CountError(err)
fs.Errorf(src, "Failed to calculate src hash: %v", err)
}
return err
})
g.Go(func() (err error) {
dstHash, err = dst.Hash(ctx, ht)
if err != nil {
fs.CountError(err)
fs.Errorf(dst, "Failed to calculate dst hash: %v", err)
}
return err
})
err = g.Wait()
if err != nil {
fs.CountError(err)
fs.Errorf(src, "Failed to calculate src hash: %v", err)
return false, ht, err
return false, ht, srcHash, dstHash, err
}
if srcHash == "" {
return true, hash.None, nil
}
dstHash, err := dst.Hash(ctx, ht)
if err != nil {
fs.CountError(err)
fs.Errorf(dst, "Failed to calculate dst hash: %v", err)
return false, ht, err
return true, hash.None, srcHash, dstHash, nil
}
if dstHash == "" {
return true, hash.None, nil
return true, hash.None, srcHash, dstHash, nil
}
if srcHash != dstHash {
fs.Debugf(src, "%v = %s (%v)", ht, srcHash, src.Fs())
fs.Debugf(dst, "%v = %s (%v)", ht, dstHash, dst.Fs())
}
return srcHash == dstHash, ht, nil
return srcHash == dstHash, ht, srcHash, dstHash, nil
}

// Equal checks to see if the src and dst objects are equal by looking at
Expand Down Expand Up @@ -377,24 +393,14 @@ func Copy(ctx context.Context, f fs.Fs, dst fs.Object, remote string, src fs.Obj

// Verify hashes are the same after transfer - ignoring blank hashes
if !fs.Config.IgnoreChecksum && hashType != hash.None {
var srcSum string
srcSum, err = src.Hash(ctx, hashType)
if err != nil {
// checkHashes has logged and counted errors
equal, _, srcSum, dstSum, _ := checkHashes(ctx, src, dst, hashType)
if !equal {
err = errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
fs.Errorf(dst, "%v", err)
fs.CountError(err)
fs.Errorf(src, "Failed to read src hash: %v", err)
} else if srcSum != "" {
var dstSum string
dstSum, err = dst.Hash(ctx, hashType)
if err != nil {
fs.CountError(err)
fs.Errorf(dst, "Failed to read hash: %v", err)
} else if !hash.Equals(srcSum, dstSum) {
err = errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
fs.Errorf(dst, "%v", err)
fs.CountError(err)
removeFailedCopy(ctx, dst)
return newDst, err
}
removeFailedCopy(ctx, dst)
return newDst, err
}
}

Expand Down

0 comments on commit 395b668

Please sign in to comment.